reifydb_engine/bulk_insert/
builder.rs1use std::marker::PhantomData;
5
6use reifydb_catalog::{CatalogStore, sequence::RowSequence};
7use reifydb_core::interface::{Identity, MultiVersionCommandTransaction, MultiVersionQueryTransaction};
8use reifydb_type::{Fragment, Value};
9
10use super::{
11 BulkInsertResult, RingBufferInsertResult, TableInsertResult,
12 error::BulkInsertError,
13 source::{PendingRingBufferInsert, PendingTableInsert, RingBufferInsertBuilder, TableInsertBuilder},
14 validation::{
15 reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
16 },
17};
18use crate::{
19 StandardCommandTransaction, StandardEngine, encoding::encode_value,
20 transaction::operation::RingBufferOperations,
21};
22
/// Private module holding the sealing trait.
///
/// `Sealed` is public inside a private module, so code outside this file can
/// name [`ValidationMode`] as a bound but cannot implement it for new types.
mod sealed {
    pub trait Sealed {}
}

/// Type-level switch that selects how a bulk insert treats incoming rows.
///
/// The trait has no methods; it is only used as a generic bound. It is
/// implemented for exactly two marker types: [`Validated`] and [`Trusted`].
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Marker for the validating mode: rows go through the validate-and-coerce
/// path and column constraints are checked before encoding.
pub struct Validated;

impl sealed::Sealed for Validated {}
impl ValidationMode for Validated {}

/// Marker for the trusted mode: rows are only reordered to match the column
/// order and per-column constraint validation is skipped.
pub struct Trusted;

impl sealed::Sealed for Trusted {}
impl ValidationMode for Trusted {}
39
40pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
44 engine: &'e StandardEngine,
45 _identity: &'e Identity,
46 pending_tables: Vec<PendingTableInsert>,
47 pending_ringbuffers: Vec<PendingRingBufferInsert>,
48 _validation: PhantomData<V>,
49}
50
51impl<'e> BulkInsertBuilder<'e, Validated> {
52 pub(crate) fn new(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
54 Self {
55 engine,
56 _identity: identity,
57 pending_tables: Vec::new(),
58 pending_ringbuffers: Vec::new(),
59 _validation: PhantomData,
60 }
61 }
62}
63
64impl<'e> BulkInsertBuilder<'e, Trusted> {
65 pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
67 Self {
68 engine,
69 _identity: identity,
70 pending_tables: Vec::new(),
71 pending_ringbuffers: Vec::new(),
72 _validation: PhantomData,
73 }
74 }
75}
76
77impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
78 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
83 let (namespace, table) = parse_qualified_name(qualified_name);
84 TableInsertBuilder::new(self, namespace, table)
85 }
86
87 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
92 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
93 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
94 }
95
96 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
98 self.pending_tables.push(pending);
99 }
100
101 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
103 self.pending_ringbuffers.push(pending);
104 }
105
106 pub async fn execute(self) -> crate::Result<BulkInsertResult> {
111 use reifydb_core::interface::Engine;
112
113 let mut txn = self.engine.begin_command().await?;
114 let mut result = BulkInsertResult::default();
115
116 for pending in self.pending_tables {
118 let table_result =
119 execute_table_insert::<V>(&mut txn, &pending, std::any::TypeId::of::<V>()).await?;
120 result.tables.push(table_result);
121 }
122
123 for pending in self.pending_ringbuffers {
125 let rb_result =
126 execute_ringbuffer_insert::<V>(&mut txn, &pending, std::any::TypeId::of::<V>()).await?;
127 result.ringbuffers.push(rb_result);
128 }
129
130 txn.commit().await?;
132
133 Ok(result)
134 }
135}
136
137async fn execute_table_insert<V: ValidationMode>(
139 txn: &mut StandardCommandTransaction,
140 pending: &PendingTableInsert,
141 type_id: std::any::TypeId,
142) -> crate::Result<TableInsertResult> {
143 use reifydb_catalog::sequence::ColumnSequence;
144 use reifydb_core::value::encoded::EncodedValuesLayout;
145 use reifydb_type::Type;
146
147 use crate::{
148 execute::mutate::primary_key,
149 transaction::operation::{DictionaryOperations, TableOperations},
150 };
151
152 let namespace = CatalogStore::find_namespace_by_name(txn, &pending.namespace)
154 .await?
155 .ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
156
157 let table = CatalogStore::find_table_by_name(txn, namespace.id, &pending.table)
158 .await?
159 .ok_or_else(|| BulkInsertError::table_not_found(Fragment::None, &pending.namespace, &pending.table))?;
160
161 let mut table_types: Vec<Type> = Vec::with_capacity(table.columns.len());
163 for c in &table.columns {
164 let ty = if let Some(dict_id) = c.dictionary_id {
165 match CatalogStore::find_dictionary(txn, dict_id).await {
166 Ok(Some(d)) => d.id_type,
167 _ => c.constraint.get_type(),
168 }
169 } else {
170 c.constraint.get_type()
171 };
172 table_types.push(ty);
173 }
174 let layout = EncodedValuesLayout::new(&table_types);
175
176 let is_validated = type_id == std::any::TypeId::of::<Validated>();
178 let coerced_rows = if is_validated {
179 validate_and_coerce_rows(&pending.rows, &table)?
180 } else {
181 reorder_rows_trusted(&pending.rows, &table)?
182 };
183
184 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
185
186 for mut values in coerced_rows {
187 for (idx, col) in table.columns.iter().enumerate() {
189 if col.auto_increment && matches!(values[idx], Value::Undefined) {
190 values[idx] = ColumnSequence::next_value(txn, table.id, col.id).await?;
191 }
192 }
193
194 for (idx, col) in table.columns.iter().enumerate() {
196 if let Some(dict_id) = col.dictionary_id {
197 let dictionary =
198 CatalogStore::find_dictionary(txn, dict_id).await?.ok_or_else(|| {
199 reifydb_type::internal_error!(
200 "Dictionary {:?} not found for column {}",
201 dict_id,
202 col.name
203 )
204 })?;
205 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx]).await?;
206 values[idx] = entry_id.to_value();
207 }
208 }
209
210 if is_validated {
212 for (idx, col) in table.columns.iter().enumerate() {
213 col.constraint.validate(&values[idx])?;
214 }
215 }
216
217 let mut row = layout.allocate();
219 for (idx, value) in values.iter().enumerate() {
220 encode_value(&layout, &mut row, idx, value);
221 }
222 encoded_rows.push(row);
223 }
224
225 let total_rows = encoded_rows.len();
227 if total_rows == 0 {
228 return Ok(TableInsertResult {
229 namespace: pending.namespace.clone(),
230 table: pending.table.clone(),
231 inserted: 0,
232 });
233 }
234
235 let row_numbers = RowSequence::next_row_number_batch(txn, table.id, total_rows as u64).await?;
236
237 for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
239 txn.insert_table(table.clone(), row.clone(), row_number).await?;
240
241 if let Some(pk_def) = primary_key::get_primary_key(txn, &table).await? {
243 use reifydb_core::interface::{EncodableKey, IndexEntryKey, IndexId};
244
245 let index_key = primary_key::encode_primary_key(&pk_def, row, &table, &layout)?;
246 let index_entry_key =
247 IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
248
249 if txn.contains_key(&index_entry_key.encode()).await? {
251 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
252 reifydb_core::return_error!(reifydb_type::diagnostic::index::primary_key_violation(
253 Fragment::None,
254 table.name.clone(),
255 key_columns,
256 ));
257 }
258
259 let row_number_layout = EncodedValuesLayout::new(&[Type::Uint8]);
261 let mut row_number_encoded = row_number_layout.allocate();
262 row_number_layout.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
263 txn.set(&index_entry_key.encode(), row_number_encoded).await?;
264 }
265 }
266
267 Ok(TableInsertResult {
268 namespace: pending.namespace.clone(),
269 table: pending.table.clone(),
270 inserted: total_rows as u64,
271 })
272}
273
274async fn execute_ringbuffer_insert<V: ValidationMode>(
276 txn: &mut StandardCommandTransaction,
277 pending: &PendingRingBufferInsert,
278 type_id: std::any::TypeId,
279) -> crate::Result<RingBufferInsertResult> {
280 use reifydb_core::value::encoded::EncodedValuesLayout;
281 use reifydb_type::{RowNumber, Type};
282
283 use crate::transaction::operation::DictionaryOperations;
284
285 let namespace = CatalogStore::find_namespace_by_name(txn, &pending.namespace)
287 .await?
288 .ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
289
290 let ringbuffer = CatalogStore::find_ringbuffer_by_name(txn, namespace.id, &pending.ringbuffer)
291 .await?
292 .ok_or_else(|| {
293 BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
294 })?;
295
296 let mut metadata = CatalogStore::find_ringbuffer_metadata(txn, ringbuffer.id).await?.ok_or_else(|| {
298 BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
299 })?;
300
301 let mut rb_types: Vec<Type> = Vec::with_capacity(ringbuffer.columns.len());
303 for c in &ringbuffer.columns {
304 let ty = if let Some(dict_id) = c.dictionary_id {
305 match CatalogStore::find_dictionary(txn, dict_id).await {
306 Ok(Some(d)) => d.id_type,
307 _ => c.constraint.get_type(),
308 }
309 } else {
310 c.constraint.get_type()
311 };
312 rb_types.push(ty);
313 }
314 let layout = EncodedValuesLayout::new(&rb_types);
315
316 let is_validated = type_id == std::any::TypeId::of::<Validated>();
318 let coerced_rows = if is_validated {
319 validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
320 } else {
321 reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
322 };
323
324 let mut inserted_count = 0u64;
325
326 for mut values in coerced_rows {
328 for (idx, col) in ringbuffer.columns.iter().enumerate() {
330 if let Some(dict_id) = col.dictionary_id {
331 let dictionary =
332 CatalogStore::find_dictionary(txn, dict_id).await?.ok_or_else(|| {
333 reifydb_type::internal_error!(
334 "Dictionary {:?} not found for column {}",
335 dict_id,
336 col.name
337 )
338 })?;
339 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx]).await?;
340 values[idx] = entry_id.to_value();
341 }
342 }
343
344 if is_validated {
346 for (idx, col) in ringbuffer.columns.iter().enumerate() {
347 col.constraint.validate(&values[idx])?;
348 }
349 }
350
351 let mut row = layout.allocate();
353 for (idx, value) in values.iter().enumerate() {
354 encode_value(&layout, &mut row, idx, value);
355 }
356
357 if metadata.is_full() {
359 let oldest_row = RowNumber(metadata.head);
360 txn.remove_from_ringbuffer(ringbuffer.clone(), oldest_row).await?;
361 metadata.head += 1;
362 metadata.count -= 1;
363 }
364
365 let row_number = RowSequence::next_row_number_for_ringbuffer(txn, ringbuffer.id).await?;
367
368 txn.insert_ringbuffer_at(ringbuffer.clone(), row_number, row).await?;
370
371 if metadata.is_empty() {
373 metadata.head = row_number.0;
374 }
375 metadata.count += 1;
376 metadata.tail = row_number.0 + 1;
377
378 inserted_count += 1;
379 }
380
381 CatalogStore::update_ringbuffer_metadata(txn, metadata).await?;
383
384 Ok(RingBufferInsertResult {
385 namespace: pending.namespace.clone(),
386 ringbuffer: pending.ringbuffer.clone(),
387 inserted: inserted_count,
388 })
389}
390
/// Splits `qualified_name` into `(namespace, name)`.
///
/// The split happens at the first `.`; everything after it, including any
/// further dots, becomes the name. A name without a dot is returned with the
/// namespace `"default"`.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
    match qualified_name.split_once('.') {
        Some((ns_part, name_part)) => (ns_part.to_owned(), name_part.to_owned()),
        None => (String::from("default"), qualified_name.to_owned()),
    }
}