1use std::{any, marker::PhantomData};
5
6use any::TypeId;
7use reifydb_catalog::{
8 catalog::Catalog,
9 error::{CatalogError, CatalogObjectKind},
10};
11use reifydb_core::{
12 encoded::schema::Schema,
13 error::CoreError,
14 interface::catalog::id::IndexId,
15 internal_error,
16 key::{EncodableKey, index_entry::IndexEntryKey},
17};
18use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
19use reifydb_type::{
20 fragment::Fragment,
21 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
22};
23
24use super::{
25 BulkInsertResult, RingBufferInsertResult, TableInsertResult,
26 validation::{
27 reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
28 },
29};
30use crate::{
31 Result,
32 bulk_insert::primitive::{
33 ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
34 table::{PendingTableInsert, TableInsertBuilder},
35 },
36 engine::StandardEngine,
37 transaction::operation::{
38 dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
39 },
40 vm::instruction::dml::{
41 primary_key,
42 schema::{get_or_create_ringbuffer_schema, get_or_create_table_schema},
43 },
44};
45
/// Marker trait selecting how a [`BulkInsertBuilder`] treats the rows it is given.
///
/// The `sealed::Sealed` supertrait is implemented in this file only for
/// [`Validated`] and [`Trusted`], which are the two modes the execute helpers
/// distinguish between.
pub trait ValidationMode: sealed::Sealed + 'static {}
48
/// Mode marker: rows go through the `validate_and_coerce_rows*` helpers and
/// every value is checked against its column constraint before being written.
///
/// This is the default mode parameter of [`BulkInsertBuilder`].
pub struct Validated;
impl ValidationMode for Validated {}
52
/// Mode marker: rows are passed through the `reorder_rows_trusted*` helpers
/// instead of the validating ones, and the per-column constraint checks in the
/// execute helpers are not run.
pub struct Trusted;
impl ValidationMode for Trusted {}
56
57pub mod sealed {
58
59 use super::{Trusted, Validated};
60 pub trait Sealed {}
61 impl Sealed for Validated {}
62 impl Sealed for Trusted {}
63}
64
/// Builder that queues table and ring buffer inserts and applies all of them
/// inside a single command transaction when `execute` is called.
///
/// `V` selects the validation mode and defaults to [`Validated`].
pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
    /// Engine used to begin the command transaction and obtain the catalog.
    engine: &'e StandardEngine,
    /// Identity supplied at construction; it is stored but not read in this file.
    _identity: IdentityId,
    /// Table inserts queued through `add_table_insert`, in queue order.
    pending_tables: Vec<PendingTableInsert>,
    /// Ring buffer inserts queued through `add_ringbuffer_insert`, in queue order.
    pending_ringbuffers: Vec<PendingRingBufferInsert>,
    /// Zero-sized marker that carries the validation mode `V`.
    _validation: PhantomData<V>,
}
75
76impl<'e> BulkInsertBuilder<'e, Validated> {
77 pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
79 Self {
80 engine,
81 _identity: identity,
82 pending_tables: Vec::new(),
83 pending_ringbuffers: Vec::new(),
84 _validation: PhantomData,
85 }
86 }
87}
88
89impl<'e> BulkInsertBuilder<'e, Trusted> {
90 pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: IdentityId) -> Self {
92 Self {
93 engine,
94 _identity: identity,
95 pending_tables: Vec::new(),
96 pending_ringbuffers: Vec::new(),
97 _validation: PhantomData,
98 }
99 }
100}
101
102impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
103 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
108 let (namespace, table) = parse_qualified_name(qualified_name);
109 TableInsertBuilder::new(self, namespace, table)
110 }
111
112 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
117 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
118 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
119 }
120
121 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
123 self.pending_tables.push(pending);
124 }
125
126 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
128 self.pending_ringbuffers.push(pending);
129 }
130
131 pub fn execute(self) -> Result<BulkInsertResult> {
136 let mut txn = self.engine.begin_command()?;
137 let catalog = self.engine.catalog();
138 let mut result = BulkInsertResult::default();
139
140 for pending in self.pending_tables {
142 let table_result = execute_table_insert::<V>(&catalog, &mut txn, &pending, TypeId::of::<V>())?;
143 result.tables.push(table_result);
144 }
145
146 for pending in self.pending_ringbuffers {
148 let rb_result =
149 execute_ringbuffer_insert::<V>(&catalog, &mut txn, &pending, TypeId::of::<V>())?;
150 result.ringbuffers.push(rb_result);
151 }
152
153 txn.commit()?;
155
156 Ok(result)
157 }
158}
159
160fn execute_table_insert<V: ValidationMode>(
162 catalog: &Catalog,
163 txn: &mut CommandTransaction,
164 pending: &PendingTableInsert,
165 type_id: TypeId,
166) -> Result<TableInsertResult> {
167 let namespace = catalog
169 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
170 .ok_or_else(|| CatalogError::NotFound {
171 kind: CatalogObjectKind::Namespace,
172 namespace: pending.namespace.to_string(),
173 name: String::new(),
174 fragment: Fragment::None,
175 })?;
176
177 let table = catalog
178 .find_table_by_name(&mut Transaction::Command(txn), namespace.id, &pending.table)?
179 .ok_or_else(|| CatalogError::NotFound {
180 kind: CatalogObjectKind::Table,
181 namespace: pending.namespace.to_string(),
182 name: pending.table.to_string(),
183 fragment: Fragment::None,
184 })?;
185
186 let schema = get_or_create_table_schema(catalog, &table, &mut Transaction::Command(txn))?;
188
189 let is_validated = type_id == TypeId::of::<Validated>();
191 let coerced_rows = if is_validated {
192 validate_and_coerce_rows(&pending.rows, &table)?
193 } else {
194 reorder_rows_trusted(&pending.rows, &table)?
195 };
196
197 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
198
199 for mut values in coerced_rows {
200 for (idx, col) in table.columns.iter().enumerate() {
202 if col.auto_increment && matches!(values[idx], Value::None { .. }) {
203 values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
204 }
205 }
206
207 for (idx, col) in table.columns.iter().enumerate() {
209 if let Some(dict_id) = col.dictionary_id {
210 let dictionary = catalog
211 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
212 .ok_or_else(|| {
213 internal_error!(
214 "Dictionary {:?} not found for column {}",
215 dict_id,
216 col.name
217 )
218 })?;
219 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
220 values[idx] = entry_id.to_value();
221 }
222 }
223
224 if is_validated {
226 for (idx, col) in table.columns.iter().enumerate() {
227 col.constraint.validate(&values[idx])?;
228 }
229 }
230
231 let mut row = schema.allocate();
233 for (idx, value) in values.iter().enumerate() {
234 schema.set_value(&mut row, idx, value);
235 }
236 encoded_rows.push(row);
237 }
238
239 let total_rows = encoded_rows.len();
241 if total_rows == 0 {
242 return Ok(TableInsertResult {
243 namespace: pending.namespace.clone(),
244 table: pending.table.clone(),
245 inserted: 0,
246 });
247 }
248
249 let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
250
251 let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)?;
253 let row_number_schema = if pk_def.is_some() {
254 Some(Schema::testing(&[Type::Uint8]))
255 } else {
256 None
257 };
258
259 for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
261 txn.insert_table(&table, &schema, row.clone(), row_number)?;
262
263 if let Some(ref pk_def) = pk_def {
265 let index_key = primary_key::encode_primary_key(pk_def, row, &table, &schema)?;
266 let index_entry_key =
267 IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
268
269 if txn.contains_key(&index_entry_key.encode())? {
271 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
272 return Err(CoreError::PrimaryKeyViolation {
273 fragment: Fragment::None,
274 table_name: table.name.clone(),
275 key_columns,
276 }
277 .into());
278 }
279
280 let rns = row_number_schema.as_ref().unwrap();
282 let mut row_number_encoded = rns.allocate();
283 rns.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
284 txn.set(&index_entry_key.encode(), row_number_encoded)?;
285 }
286 }
287
288 Ok(TableInsertResult {
289 namespace: pending.namespace.clone(),
290 table: pending.table.clone(),
291 inserted: total_rows as u64,
292 })
293}
294
295fn execute_ringbuffer_insert<V: ValidationMode>(
297 catalog: &Catalog,
298 txn: &mut CommandTransaction,
299 pending: &PendingRingBufferInsert,
300 type_id: TypeId,
301) -> Result<RingBufferInsertResult> {
302 let namespace = catalog
303 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
304 .ok_or_else(|| CatalogError::NotFound {
305 kind: CatalogObjectKind::Namespace,
306 namespace: pending.namespace.to_string(),
307 name: String::new(),
308 fragment: Fragment::None,
309 })?;
310
311 let ringbuffer = catalog
312 .find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id, &pending.ringbuffer)?
313 .ok_or_else(|| CatalogError::NotFound {
314 kind: CatalogObjectKind::RingBuffer,
315 namespace: pending.namespace.to_string(),
316 name: pending.ringbuffer.to_string(),
317 fragment: Fragment::None,
318 })?;
319
320 let mut metadata = catalog
321 .find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?
322 .ok_or_else(|| CatalogError::NotFound {
323 kind: CatalogObjectKind::RingBuffer,
324 namespace: pending.namespace.to_string(),
325 name: pending.ringbuffer.to_string(),
326 fragment: Fragment::None,
327 })?;
328
329 let schema = get_or_create_ringbuffer_schema(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
331
332 let is_validated = type_id == TypeId::of::<Validated>();
334 let coerced_rows = if is_validated {
335 validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
336 } else {
337 reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
338 };
339
340 let mut inserted_count = 0u64;
341
342 for mut values in coerced_rows {
344 for (idx, col) in ringbuffer.columns.iter().enumerate() {
346 if let Some(dict_id) = col.dictionary_id {
347 let dictionary = catalog
348 .find_dictionary(&mut Transaction::Command(txn), dict_id)?
349 .ok_or_else(|| {
350 internal_error!(
351 "Dictionary {:?} not found for column {}",
352 dict_id,
353 col.name
354 )
355 })?;
356 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
357 values[idx] = entry_id.to_value();
358 }
359 }
360
361 if is_validated {
363 for (idx, col) in ringbuffer.columns.iter().enumerate() {
364 col.constraint.validate(&values[idx])?;
365 }
366 }
367
368 let mut row = schema.allocate();
370 for (idx, value) in values.iter().enumerate() {
371 schema.set_value(&mut row, idx, value);
372 }
373
374 if metadata.is_full() {
376 let oldest_row = RowNumber(metadata.head);
377 txn.remove_from_ringbuffer(&ringbuffer, oldest_row)?;
378 metadata.head += 1;
379 metadata.count -= 1;
380 }
381
382 let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
384
385 txn.insert_ringbuffer_at(&ringbuffer, &schema, row_number, row)?;
387
388 if metadata.is_empty() {
390 metadata.head = row_number.0;
391 }
392 metadata.count += 1;
393 metadata.tail = row_number.0 + 1;
394
395 inserted_count += 1;
396 }
397
398 catalog.update_ringbuffer_metadata(txn, metadata)?;
400
401 Ok(RingBufferInsertResult {
402 namespace: pending.namespace.clone(),
403 ringbuffer: pending.ringbuffer.clone(),
404 inserted: inserted_count,
405 })
406}
407
/// Splits a qualified name into `(namespace, object)`.
///
/// The split happens at the first `::`. A name without `::` is returned
/// unchanged as the object, paired with the namespace `"default"`.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
    match qualified_name.split_once("::") {
        Some((namespace, object)) => (namespace.to_owned(), object.to_owned()),
        None => (String::from("default"), qualified_name.to_owned()),
    }
}