1use std::{any, marker::PhantomData};
5
6use any::TypeId;
7use reifydb_catalog::{
8 catalog::Catalog,
9 error::{CatalogError, CatalogObjectKind},
10};
11use reifydb_core::{
12 encoded::shape::RowShape,
13 error::CoreError,
14 interface::catalog::id::IndexId,
15 internal_error,
16 key::{EncodableKey, index_entry::IndexEntryKey},
17};
18use reifydb_runtime::context::clock::Clock;
19use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
20use reifydb_type::{
21 fragment::Fragment,
22 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
23};
24
25use super::{
26 BulkInsertResult, RingBufferInsertResult, TableInsertResult,
27 validation::{
28 reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
29 },
30};
31use crate::{
32 Result,
33 bulk_insert::primitive::{
34 ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
35 table::{PendingTableInsert, TableInsertBuilder},
36 },
37 engine::StandardEngine,
38 transaction::operation::{
39 dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
40 },
41 vm::instruction::dml::{
42 primary_key,
43 shape::{get_or_create_ringbuffer_shape, get_or_create_table_shape},
44 },
45};
46
/// Holder for the supertrait used to restrict which types implement
/// [`ValidationMode`].
///
/// NOTE(review): the module and trait are both `pub`, so whether the trait is
/// truly sealed depends on whether this module path is reachable from outside
/// the crate — confirm this is intended.
pub mod sealed {
	use super::{Trusted, Validated};

	/// Supertrait of `ValidationMode`; implemented here for the two marker types.
	pub trait Sealed {}

	impl Sealed for Trusted {}
	impl Sealed for Validated {}
}

/// Type-level switch selecting how a bulk insert treats incoming rows.
///
/// The `'static` bound allows the mode to be identified via `TypeId`.
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Mode in which rows are validated/coerced against the target schema and
/// column constraints are checked (the default mode).
pub struct Validated;

impl ValidationMode for Validated {}

/// Mode in which rows skip validation/coercion and constraint checks and are
/// only reordered into column order.
pub struct Trusted;

impl ValidationMode for Trusted {}
65
/// Collects pending table and ring-buffer inserts and applies them in a single
/// command transaction when `execute` is called.
///
/// `V` selects the [`ValidationMode`] and defaults to [`Validated`].
pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
	/// Engine used to open, run and commit the command transaction.
	engine: &'e StandardEngine,
	/// Identity the command transaction is opened under.
	identity: IdentityId,
	/// Table inserts queued via `add_table_insert`; executed in queue order.
	pending_tables: Vec<PendingTableInsert>,
	/// Ring-buffer inserts queued via `add_ringbuffer_insert`; executed after
	/// all table inserts, in queue order.
	pending_ringbuffers: Vec<PendingRingBufferInsert>,
	/// Zero-sized marker carrying the validation mode `V`.
	_validation: PhantomData<V>,
}
76
77impl<'e> BulkInsertBuilder<'e, Validated> {
78 pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
80 Self {
81 engine,
82 identity,
83 pending_tables: Vec::new(),
84 pending_ringbuffers: Vec::new(),
85 _validation: PhantomData,
86 }
87 }
88}
89
90impl<'e> BulkInsertBuilder<'e, Trusted> {
91 pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: IdentityId) -> Self {
93 Self {
94 engine,
95 identity,
96 pending_tables: Vec::new(),
97 pending_ringbuffers: Vec::new(),
98 _validation: PhantomData,
99 }
100 }
101}
102
103impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
104 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
109 let (namespace, table) = parse_qualified_name(qualified_name);
110 TableInsertBuilder::new(self, namespace, table)
111 }
112
113 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
118 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
119 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
120 }
121
122 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
124 self.pending_tables.push(pending);
125 }
126
127 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
129 self.pending_ringbuffers.push(pending);
130 }
131
132 pub fn execute(self) -> Result<BulkInsertResult> {
137 self.engine.reject_if_read_only()?;
138 let mut txn = self.engine.begin_command(self.identity)?;
139 let catalog = self.engine.catalog();
140 let clock = self.engine.clock();
141 let mut result = BulkInsertResult::default();
142
143 for pending in self.pending_tables {
145 let table_result =
146 execute_table_insert(&catalog, &mut txn, &pending, TypeId::of::<V>(), clock)?;
147 result.tables.push(table_result);
148 }
149
150 for pending in self.pending_ringbuffers {
152 let rb_result =
153 execute_ringbuffer_insert(&catalog, &mut txn, &pending, TypeId::of::<V>(), clock)?;
154 result.ringbuffers.push(rb_result);
155 }
156
157 txn.commit()?;
159
160 Ok(result)
161 }
162}
163
164fn execute_table_insert(
166 catalog: &Catalog,
167 txn: &mut CommandTransaction,
168 pending: &PendingTableInsert,
169 type_id: TypeId,
170 clock: &Clock,
171) -> Result<TableInsertResult> {
172 let namespace = catalog
174 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
175 .ok_or_else(|| CatalogError::NotFound {
176 kind: CatalogObjectKind::Namespace,
177 namespace: pending.namespace.to_string(),
178 name: String::new(),
179 fragment: Fragment::None,
180 })?;
181
182 let table = catalog
183 .find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?
184 .ok_or_else(|| CatalogError::NotFound {
185 kind: CatalogObjectKind::Table,
186 namespace: pending.namespace.to_string(),
187 name: pending.table.to_string(),
188 fragment: Fragment::None,
189 })?;
190
191 let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
193
194 let is_validated = type_id == TypeId::of::<Validated>();
196 let coerced_rows = if is_validated {
197 validate_and_coerce_rows(&pending.rows, &table)?
198 } else {
199 reorder_rows_trusted(&pending.rows, &table)?
200 };
201
202 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
203
204 for mut values in coerced_rows {
205 for (idx, col) in table.columns.iter().enumerate() {
207 if col.auto_increment && matches!(values[idx], Value::None { .. }) {
208 values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
209 }
210 }
211
212 for (idx, col) in table.columns.iter().enumerate() {
214 if let Some(dict_id) = col.dictionary_id {
215 let dictionary = catalog
216 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
217 .ok_or_else(|| {
218 internal_error!(
219 "Dictionary {:?} not found for column {}",
220 dict_id,
221 col.name
222 )
223 })?;
224 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
225 values[idx] = entry_id.to_value();
226 }
227 }
228
229 if is_validated {
231 for (idx, col) in table.columns.iter().enumerate() {
232 col.constraint.validate(&values[idx])?;
233 }
234 }
235
236 let mut row = shape.allocate();
238 for (idx, value) in values.iter().enumerate() {
239 shape.set_value(&mut row, idx, value);
240 }
241
242 let now_nanos = clock.now_nanos();
243 row.set_timestamps(now_nanos, now_nanos);
244
245 encoded_rows.push(row);
246 }
247
248 let total_rows = encoded_rows.len();
249 if total_rows == 0 {
250 return Ok(TableInsertResult {
251 namespace: pending.namespace.clone(),
252 table: pending.table.clone(),
253 inserted: 0,
254 });
255 }
256
257 let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
258
259 let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)?;
261 let row_number_shape = if pk_def.is_some() {
262 Some(RowShape::testing(&[Type::Uint8]))
263 } else {
264 None
265 };
266
267 for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
269 txn.insert_table(&table, &shape, row.clone(), row_number)?;
270
271 if let Some(ref pk_def) = pk_def {
273 let index_key = primary_key::encode_primary_key(pk_def, row, &table, &shape)?;
274 let index_entry_key =
275 IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
276
277 if txn.contains_key(&index_entry_key.encode())? {
279 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
280 return Err(CoreError::PrimaryKeyViolation {
281 fragment: Fragment::None,
282 table_name: table.name.clone(),
283 key_columns,
284 }
285 .into());
286 }
287
288 let rns = row_number_shape.as_ref().unwrap();
290 let mut row_number_encoded = rns.allocate();
291 rns.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
292 txn.set(&index_entry_key.encode(), row_number_encoded)?;
293 }
294 }
295
296 Ok(TableInsertResult {
297 namespace: pending.namespace.clone(),
298 table: pending.table.clone(),
299 inserted: total_rows as u64,
300 })
301}
302
303fn execute_ringbuffer_insert(
305 catalog: &Catalog,
306 txn: &mut CommandTransaction,
307 pending: &PendingRingBufferInsert,
308 type_id: TypeId,
309 clock: &Clock,
310) -> Result<RingBufferInsertResult> {
311 let namespace = catalog
312 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
313 .ok_or_else(|| CatalogError::NotFound {
314 kind: CatalogObjectKind::Namespace,
315 namespace: pending.namespace.to_string(),
316 name: String::new(),
317 fragment: Fragment::None,
318 })?;
319
320 let ringbuffer = catalog
321 .find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
322 .ok_or_else(|| CatalogError::NotFound {
323 kind: CatalogObjectKind::RingBuffer,
324 namespace: pending.namespace.to_string(),
325 name: pending.ringbuffer.to_string(),
326 fragment: Fragment::None,
327 })?;
328
329 let mut metadata = catalog
330 .find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?
331 .ok_or_else(|| CatalogError::NotFound {
332 kind: CatalogObjectKind::RingBuffer,
333 namespace: pending.namespace.to_string(),
334 name: pending.ringbuffer.to_string(),
335 fragment: Fragment::None,
336 })?;
337
338 let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
340
341 let is_validated = type_id == TypeId::of::<Validated>();
343 let coerced_rows = if is_validated {
344 validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
345 } else {
346 reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
347 };
348
349 let mut inserted_count = 0u64;
350
351 for mut values in coerced_rows {
353 for (idx, col) in ringbuffer.columns.iter().enumerate() {
355 if let Some(dict_id) = col.dictionary_id {
356 let dictionary = catalog
357 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
358 .ok_or_else(|| {
359 internal_error!(
360 "Dictionary {:?} not found for column {}",
361 dict_id,
362 col.name
363 )
364 })?;
365 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
366 values[idx] = entry_id.to_value();
367 }
368 }
369
370 if is_validated {
372 for (idx, col) in ringbuffer.columns.iter().enumerate() {
373 col.constraint.validate(&values[idx])?;
374 }
375 }
376
377 let mut row = shape.allocate();
379 for (idx, value) in values.iter().enumerate() {
380 shape.set_value(&mut row, idx, value);
381 }
382
383 let now_nanos = clock.now_nanos();
384 row.set_timestamps(now_nanos, now_nanos);
385
386 if metadata.is_full() {
387 let oldest_row = RowNumber(metadata.head);
388 txn.remove_from_ringbuffer(&ringbuffer, oldest_row)?;
389 metadata.head += 1;
390 metadata.count -= 1;
391 }
392
393 let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
395
396 txn.insert_ringbuffer_at(&ringbuffer, &shape, row_number, row)?;
398
399 if metadata.is_empty() {
401 metadata.head = row_number.0;
402 }
403 metadata.count += 1;
404 metadata.tail = row_number.0 + 1;
405
406 inserted_count += 1;
407 }
408
409 catalog.update_ringbuffer_metadata(txn, metadata)?;
411
412 Ok(RingBufferInsertResult {
413 namespace: pending.namespace.clone(),
414 ringbuffer: pending.ringbuffer.clone(),
415 inserted: inserted_count,
416 })
417}
418
/// Splits `qualified_name` into an owned `(namespace, name)` pair at its last `::`.
///
/// Everything before the final separator is the namespace, so `a::b::table`
/// yields `("a::b", "table")`. A name containing no `::` (including the empty
/// string) is placed in the `default` namespace.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	match qualified_name.rsplit_once("::") {
		Some((namespace, name)) => (namespace.to_owned(), name.to_owned()),
		None => (String::from("default"), qualified_name.to_owned()),
	}
}
428
#[cfg(test)]
mod tests {
	use super::*;

	/// Builds the owned `(namespace, name)` pair that `parse_qualified_name` returns.
	fn expected(namespace: &str, name: &str) -> (String, String) {
		(String::from(namespace), String::from(name))
	}

	#[test]
	fn parse_qualified_name_simple() {
		let parsed = parse_qualified_name("table");
		assert_eq!(parsed, expected("default", "table"));
	}

	#[test]
	fn parse_qualified_name_single_namespace() {
		let parsed = parse_qualified_name("ns::table");
		assert_eq!(parsed, expected("ns", "table"));
	}

	#[test]
	fn parse_qualified_name_nested_namespace() {
		let parsed = parse_qualified_name("a::b::table");
		assert_eq!(parsed, expected("a::b", "table"));
	}

	#[test]
	fn parse_qualified_name_deeply_nested_namespace() {
		let parsed = parse_qualified_name("a::b::c::table");
		assert_eq!(parsed, expected("a::b::c", "table"));
	}

	#[test]
	fn parse_qualified_name_empty_string() {
		let parsed = parse_qualified_name("");
		assert_eq!(parsed, expected("default", ""));
	}
}