1use std::marker::PhantomData;
5
6use reifydb_catalog::catalog::Catalog;
7use reifydb_core::{
8 encoded::schema::Schema,
9 interface::{auth::Identity, catalog::id::IndexId},
10 key::{EncodableKey, index_entry::IndexEntryKey},
11};
12use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
13use reifydb_type::{
14 fragment::Fragment,
15 value::{Value, row_number::RowNumber, r#type::Type},
16};
17
18use super::{
19 BulkInsertResult, RingBufferInsertResult, TableInsertResult,
20 error::BulkInsertError,
21 validation::{
22 reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
23 },
24};
25use crate::{
26 bulk_insert::primitive::{
27 ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
28 table::{PendingTableInsert, TableInsertBuilder},
29 },
30 engine::StandardEngine,
31 transaction::operation::{
32 dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
33 },
34};
35
36pub trait ValidationMode: sealed::Sealed + 'static {}
38
39pub struct Validated;
41impl ValidationMode for Validated {}
42
43pub struct Trusted;
45impl ValidationMode for Trusted {}
46
47pub mod sealed {
48 pub trait Sealed {}
49 impl Sealed for super::Validated {}
50 impl Sealed for super::Trusted {}
51}
52
53pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
57 engine: &'e StandardEngine,
58 _identity: &'e Identity,
59 pending_tables: Vec<PendingTableInsert>,
60 pending_ringbuffers: Vec<PendingRingBufferInsert>,
61 _validation: PhantomData<V>,
62}
63
64impl<'e> BulkInsertBuilder<'e, Validated> {
65 pub(crate) fn new(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
67 Self {
68 engine,
69 _identity: identity,
70 pending_tables: Vec::new(),
71 pending_ringbuffers: Vec::new(),
72 _validation: PhantomData,
73 }
74 }
75}
76
77impl<'e> BulkInsertBuilder<'e, Trusted> {
78 pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
80 Self {
81 engine,
82 _identity: identity,
83 pending_tables: Vec::new(),
84 pending_ringbuffers: Vec::new(),
85 _validation: PhantomData,
86 }
87 }
88}
89
90impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
91 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
96 let (namespace, table) = parse_qualified_name(qualified_name);
97 TableInsertBuilder::new(self, namespace, table)
98 }
99
100 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
105 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
106 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
107 }
108
109 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
111 self.pending_tables.push(pending);
112 }
113
114 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
116 self.pending_ringbuffers.push(pending);
117 }
118
119 pub fn execute(self) -> crate::Result<BulkInsertResult> {
124 let mut txn = self.engine.begin_command()?;
125 let catalog = self.engine.catalog();
126 let mut result = BulkInsertResult::default();
127
128 for pending in self.pending_tables {
130 let table_result =
131 execute_table_insert::<V>(&catalog, &mut txn, &pending, std::any::TypeId::of::<V>())?;
132 result.tables.push(table_result);
133 }
134
135 for pending in self.pending_ringbuffers {
137 let rb_result = execute_ringbuffer_insert::<V>(
138 &catalog,
139 &mut txn,
140 &pending,
141 std::any::TypeId::of::<V>(),
142 )?;
143 result.ringbuffers.push(rb_result);
144 }
145
146 txn.commit()?;
148
149 Ok(result)
150 }
151}
152
153fn execute_table_insert<V: ValidationMode>(
155 catalog: &Catalog,
156 txn: &mut CommandTransaction,
157 pending: &PendingTableInsert,
158 type_id: std::any::TypeId,
159) -> crate::Result<TableInsertResult> {
160 use crate::vm::instruction::dml::primary_key;
161
162 let namespace = catalog
164 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
165 .ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
166
167 let table = catalog
168 .find_table_by_name(&mut Transaction::Command(txn), namespace.id, &pending.table)?
169 .ok_or_else(|| BulkInsertError::table_not_found(Fragment::None, &pending.namespace, &pending.table))?;
170
171 let schema = crate::vm::instruction::dml::schema::get_or_create_table_schema(
173 catalog,
174 &table,
175 &mut Transaction::Command(txn),
176 )?;
177
178 let is_validated = type_id == std::any::TypeId::of::<Validated>();
180 let coerced_rows = if is_validated {
181 validate_and_coerce_rows(&pending.rows, &table)?
182 } else {
183 reorder_rows_trusted(&pending.rows, &table)?
184 };
185
186 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
187
188 for mut values in coerced_rows {
189 for (idx, col) in table.columns.iter().enumerate() {
191 if col.auto_increment && matches!(values[idx], Value::None { .. }) {
192 values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
193 }
194 }
195
196 for (idx, col) in table.columns.iter().enumerate() {
198 if let Some(dict_id) = col.dictionary_id {
199 let dictionary = catalog
200 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
201 .ok_or_else(|| {
202 reifydb_core::internal_error!(
203 "Dictionary {:?} not found for column {}",
204 dict_id,
205 col.name
206 )
207 })?;
208 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
209 values[idx] = entry_id.to_value();
210 }
211 }
212
213 if is_validated {
215 for (idx, col) in table.columns.iter().enumerate() {
216 col.constraint.validate(&values[idx])?;
217 }
218 }
219
220 let mut row = schema.allocate();
222 for (idx, value) in values.iter().enumerate() {
223 schema.set_value(&mut row, idx, value);
224 }
225 encoded_rows.push(row);
226 }
227
228 let total_rows = encoded_rows.len();
230 if total_rows == 0 {
231 return Ok(TableInsertResult {
232 namespace: pending.namespace.clone(),
233 table: pending.table.clone(),
234 inserted: 0,
235 });
236 }
237
238 let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
239
240 for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
242 txn.insert_table(table.clone(), row.clone(), row_number)?;
243
244 if let Some(pk_def) = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)? {
246 let index_key = primary_key::encode_primary_key(&pk_def, row, &table, &schema)?;
247 let index_entry_key =
248 IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
249
250 if txn.contains_key(&index_entry_key.encode())? {
252 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
253 reifydb_type::return_error!(
254 reifydb_core::error::diagnostic::index::primary_key_violation(
255 Fragment::None,
256 table.name.clone(),
257 key_columns,
258 )
259 );
260 }
261
262 let row_number_schema = Schema::testing(&[Type::Uint8]);
264 let mut row_number_encoded = row_number_schema.allocate();
265 row_number_schema.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
266 txn.set(&index_entry_key.encode(), row_number_encoded)?;
267 }
268 }
269
270 Ok(TableInsertResult {
271 namespace: pending.namespace.clone(),
272 table: pending.table.clone(),
273 inserted: total_rows as u64,
274 })
275}
276
277fn execute_ringbuffer_insert<V: ValidationMode>(
279 catalog: &Catalog,
280 txn: &mut CommandTransaction,
281 pending: &PendingRingBufferInsert,
282 type_id: std::any::TypeId,
283) -> crate::Result<RingBufferInsertResult> {
284 let namespace = catalog
285 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
286 .ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
287
288 let ringbuffer = catalog
289 .find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id, &pending.ringbuffer)?
290 .ok_or_else(|| {
291 BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
292 })?;
293
294 let mut metadata =
295 catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
296 BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
297 })?;
298
299 let schema = crate::vm::instruction::dml::schema::get_or_create_ringbuffer_schema(
301 catalog,
302 &ringbuffer,
303 &mut Transaction::Command(txn),
304 )?;
305
306 let is_validated = type_id == std::any::TypeId::of::<Validated>();
308 let coerced_rows = if is_validated {
309 validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
310 } else {
311 reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
312 };
313
314 let mut inserted_count = 0u64;
315
316 for mut values in coerced_rows {
318 for (idx, col) in ringbuffer.columns.iter().enumerate() {
320 if let Some(dict_id) = col.dictionary_id {
321 let dictionary = catalog
322 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
323 .ok_or_else(|| {
324 reifydb_core::internal_error!(
325 "Dictionary {:?} not found for column {}",
326 dict_id,
327 col.name
328 )
329 })?;
330 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
331 values[idx] = entry_id.to_value();
332 }
333 }
334
335 if is_validated {
337 for (idx, col) in ringbuffer.columns.iter().enumerate() {
338 col.constraint.validate(&values[idx])?;
339 }
340 }
341
342 let mut row = schema.allocate();
344 for (idx, value) in values.iter().enumerate() {
345 schema.set_value(&mut row, idx, value);
346 }
347
348 if metadata.is_full() {
350 let oldest_row = RowNumber(metadata.head);
351 txn.remove_from_ringbuffer(ringbuffer.clone(), oldest_row)?;
352 metadata.head += 1;
353 metadata.count -= 1;
354 }
355
356 let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
358
359 txn.insert_ringbuffer_at(ringbuffer.clone(), row_number, row)?;
361
362 if metadata.is_empty() {
364 metadata.head = row_number.0;
365 }
366 metadata.count += 1;
367 metadata.tail = row_number.0 + 1;
368
369 inserted_count += 1;
370 }
371
372 catalog.update_ringbuffer_metadata(txn, metadata)?;
374
375 Ok(RingBufferInsertResult {
376 namespace: pending.namespace.clone(),
377 ringbuffer: pending.ringbuffer.clone(),
378 inserted: inserted_count,
379 })
380}
381
382fn parse_qualified_name(qualified_name: &str) -> (String, String) {
385 if let Some((ns, name)) = qualified_name.split_once('.') {
386 (ns.to_string(), name.to_string())
387 } else {
388 ("default".to_string(), qualified_name.to_string())
389 }
390}