// reifydb_engine/bulk_insert/builder.rs

// Copyright (c) reifydb.com 2025
// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{CatalogStore, sequence::RowSequence};
7use reifydb_core::interface::{Identity, MultiVersionCommandTransaction, MultiVersionQueryTransaction};
8use reifydb_type::{Fragment, Value};
9
10use super::{
11	BulkInsertResult, RingBufferInsertResult, TableInsertResult,
12	error::BulkInsertError,
13	source::{PendingRingBufferInsert, PendingTableInsert, RingBufferInsertBuilder, TableInsertBuilder},
14	validation::{
15		reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
16	},
17};
18use crate::{
19	StandardCommandTransaction, StandardEngine, encoding::encode_value,
20	transaction::operation::RingBufferOperations,
21};
22
mod sealed {
	/// Private supertrait of [`super::ValidationMode`].
	///
	/// It lives in a private module, so code outside this module cannot name it and
	/// therefore cannot add new validation modes.
	pub trait Sealed {}

	impl Sealed for super::Validated {}
	impl Sealed for super::Trusted {}
}

/// Compile-time marker that selects how a bulk insert treats incoming rows.
///
/// The trait is sealed, and [`Validated`] and [`Trusted`] are its implementors. The
/// `'static` bound allows the active mode to be identified with `std::any::TypeId`.
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Mode that type-checks, coerces and constraint-validates every row.
pub struct Validated;

/// Mode that skips validation; meant for internal data that was already validated.
pub struct Trusted;

impl ValidationMode for Validated {}
impl ValidationMode for Trusted {}
39
40/// Main builder for bulk insert operations.
41///
42/// Type parameter `V` tracks the validation mode at compile time.
43pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
44	engine: &'e StandardEngine,
45	_identity: &'e Identity,
46	pending_tables: Vec<PendingTableInsert>,
47	pending_ringbuffers: Vec<PendingRingBufferInsert>,
48	_validation: PhantomData<V>,
49}
50
51impl<'e> BulkInsertBuilder<'e, Validated> {
52	/// Create a new bulk insert builder with full validation enabled.
53	pub(crate) fn new(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
54		Self {
55			engine,
56			_identity: identity,
57			pending_tables: Vec::new(),
58			pending_ringbuffers: Vec::new(),
59			_validation: PhantomData,
60		}
61	}
62}
63
64impl<'e> BulkInsertBuilder<'e, Trusted> {
65	/// Create a new bulk insert builder with validation disabled (trusted mode).
66	pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
67		Self {
68			engine,
69			_identity: identity,
70			pending_tables: Vec::new(),
71			pending_ringbuffers: Vec::new(),
72			_validation: PhantomData,
73		}
74	}
75}
76
77impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
78	/// Begin inserting into a table.
79	///
80	/// The qualified name can be either "namespace.table" or just "table"
81	/// (which uses the default namespace).
82	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
83		let (namespace, table) = parse_qualified_name(qualified_name);
84		TableInsertBuilder::new(self, namespace, table)
85	}
86
87	/// Begin inserting into a ring buffer.
88	///
89	/// The qualified name can be either "namespace.ringbuffer" or just "ringbuffer"
90	/// (which uses the default namespace).
91	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
92		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
93		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
94	}
95
96	/// Add a pending table insert (called by TableInsertBuilder::done)
97	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
98		self.pending_tables.push(pending);
99	}
100
101	/// Add a pending ring buffer insert (called by RingBufferInsertBuilder::done)
102	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
103		self.pending_ringbuffers.push(pending);
104	}
105
106	/// Execute all pending inserts in a single transaction.
107	///
108	/// Returns a summary of what was inserted. On error, the entire
109	/// transaction is rolled back (no partial inserts).
110	pub async fn execute(self) -> crate::Result<BulkInsertResult> {
111		use reifydb_core::interface::Engine;
112
113		let mut txn = self.engine.begin_command().await?;
114		let mut result = BulkInsertResult::default();
115
116		// Process all pending table inserts
117		for pending in self.pending_tables {
118			let table_result =
119				execute_table_insert::<V>(&mut txn, &pending, std::any::TypeId::of::<V>()).await?;
120			result.tables.push(table_result);
121		}
122
123		// Process all pending ring buffer inserts
124		for pending in self.pending_ringbuffers {
125			let rb_result =
126				execute_ringbuffer_insert::<V>(&mut txn, &pending, std::any::TypeId::of::<V>()).await?;
127			result.ringbuffers.push(rb_result);
128		}
129
130		// Commit the transaction
131		txn.commit().await?;
132
133		Ok(result)
134	}
135}
136
137/// Execute a table insert within a transaction
138async fn execute_table_insert<V: ValidationMode>(
139	txn: &mut StandardCommandTransaction,
140	pending: &PendingTableInsert,
141	type_id: std::any::TypeId,
142) -> crate::Result<TableInsertResult> {
143	use reifydb_catalog::sequence::ColumnSequence;
144	use reifydb_core::value::encoded::EncodedValuesLayout;
145	use reifydb_type::Type;
146
147	use crate::{
148		execute::mutate::primary_key,
149		transaction::operation::{DictionaryOperations, TableOperations},
150	};
151
152	// 1. Look up namespace and table from catalog
153	let namespace = CatalogStore::find_namespace_by_name(txn, &pending.namespace)
154		.await?
155		.ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
156
157	let table = CatalogStore::find_table_by_name(txn, namespace.id, &pending.table)
158		.await?
159		.ok_or_else(|| BulkInsertError::table_not_found(Fragment::None, &pending.namespace, &pending.table))?;
160
161	// 2. Build layout for encoding - use dictionary ID type for dictionary-encoded columns
162	let mut table_types: Vec<Type> = Vec::with_capacity(table.columns.len());
163	for c in &table.columns {
164		let ty = if let Some(dict_id) = c.dictionary_id {
165			match CatalogStore::find_dictionary(txn, dict_id).await {
166				Ok(Some(d)) => d.id_type,
167				_ => c.constraint.get_type(),
168			}
169		} else {
170			c.constraint.get_type()
171		};
172		table_types.push(ty);
173	}
174	let layout = EncodedValuesLayout::new(&table_types);
175
176	// 3. Validate and coerce all rows in batch (fail-fast)
177	let is_validated = type_id == std::any::TypeId::of::<Validated>();
178	let coerced_rows = if is_validated {
179		validate_and_coerce_rows(&pending.rows, &table)?
180	} else {
181		reorder_rows_trusted(&pending.rows, &table)?
182	};
183
184	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
185
186	for mut values in coerced_rows {
187		// Handle auto-increment columns
188		for (idx, col) in table.columns.iter().enumerate() {
189			if col.auto_increment && matches!(values[idx], Value::Undefined) {
190				values[idx] = ColumnSequence::next_value(txn, table.id, col.id).await?;
191			}
192		}
193
194		// Handle dictionary encoding
195		for (idx, col) in table.columns.iter().enumerate() {
196			if let Some(dict_id) = col.dictionary_id {
197				let dictionary =
198					CatalogStore::find_dictionary(txn, dict_id).await?.ok_or_else(|| {
199						reifydb_type::internal_error!(
200							"Dictionary {:?} not found for column {}",
201							dict_id,
202							col.name
203						)
204					})?;
205				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx]).await?;
206				values[idx] = entry_id.to_value();
207			}
208		}
209
210		// Validate constraints (coercion is done in batch, but final constraint check still needed)
211		if is_validated {
212			for (idx, col) in table.columns.iter().enumerate() {
213				col.constraint.validate(&values[idx])?;
214			}
215		}
216
217		// Encode the row
218		let mut row = layout.allocate();
219		for (idx, value) in values.iter().enumerate() {
220			encode_value(&layout, &mut row, idx, value);
221		}
222		encoded_rows.push(row);
223	}
224
225	// 4. Batch allocate row numbers
226	let total_rows = encoded_rows.len();
227	if total_rows == 0 {
228		return Ok(TableInsertResult {
229			namespace: pending.namespace.clone(),
230			table: pending.table.clone(),
231			inserted: 0,
232		});
233	}
234
235	let row_numbers = RowSequence::next_row_number_batch(txn, table.id, total_rows as u64).await?;
236
237	// 5. Insert all rows with their row numbers
238	for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
239		txn.insert_table(table.clone(), row.clone(), row_number).await?;
240
241		// Handle primary key index if table has one
242		if let Some(pk_def) = primary_key::get_primary_key(txn, &table).await? {
243			use reifydb_core::interface::{EncodableKey, IndexEntryKey, IndexId};
244
245			let index_key = primary_key::encode_primary_key(&pk_def, row, &table, &layout)?;
246			let index_entry_key =
247				IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
248
249			// Check for primary key violation
250			if txn.contains_key(&index_entry_key.encode()).await? {
251				let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
252				reifydb_core::return_error!(reifydb_type::diagnostic::index::primary_key_violation(
253					Fragment::None,
254					table.name.clone(),
255					key_columns,
256				));
257			}
258
259			// Store the index entry
260			let row_number_layout = EncodedValuesLayout::new(&[Type::Uint8]);
261			let mut row_number_encoded = row_number_layout.allocate();
262			row_number_layout.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
263			txn.set(&index_entry_key.encode(), row_number_encoded).await?;
264		}
265	}
266
267	Ok(TableInsertResult {
268		namespace: pending.namespace.clone(),
269		table: pending.table.clone(),
270		inserted: total_rows as u64,
271	})
272}
273
274/// Execute a ring buffer insert within a transaction
275async fn execute_ringbuffer_insert<V: ValidationMode>(
276	txn: &mut StandardCommandTransaction,
277	pending: &PendingRingBufferInsert,
278	type_id: std::any::TypeId,
279) -> crate::Result<RingBufferInsertResult> {
280	use reifydb_core::value::encoded::EncodedValuesLayout;
281	use reifydb_type::{RowNumber, Type};
282
283	use crate::transaction::operation::DictionaryOperations;
284
285	// 1. Look up namespace and ring buffer from catalog
286	let namespace = CatalogStore::find_namespace_by_name(txn, &pending.namespace)
287		.await?
288		.ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
289
290	let ringbuffer = CatalogStore::find_ringbuffer_by_name(txn, namespace.id, &pending.ringbuffer)
291		.await?
292		.ok_or_else(|| {
293			BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
294		})?;
295
296	// Get current metadata
297	let mut metadata = CatalogStore::find_ringbuffer_metadata(txn, ringbuffer.id).await?.ok_or_else(|| {
298		BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
299	})?;
300
301	// 2. Build layout for encoding
302	let mut rb_types: Vec<Type> = Vec::with_capacity(ringbuffer.columns.len());
303	for c in &ringbuffer.columns {
304		let ty = if let Some(dict_id) = c.dictionary_id {
305			match CatalogStore::find_dictionary(txn, dict_id).await {
306				Ok(Some(d)) => d.id_type,
307				_ => c.constraint.get_type(),
308			}
309		} else {
310			c.constraint.get_type()
311		};
312		rb_types.push(ty);
313	}
314	let layout = EncodedValuesLayout::new(&rb_types);
315
316	// 3. Validate and coerce all rows in batch (fail-fast)
317	let is_validated = type_id == std::any::TypeId::of::<Validated>();
318	let coerced_rows = if is_validated {
319		validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
320	} else {
321		reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
322	};
323
324	let mut inserted_count = 0u64;
325
326	// 4. Process each coerced row
327	for mut values in coerced_rows {
328		// Handle dictionary encoding
329		for (idx, col) in ringbuffer.columns.iter().enumerate() {
330			if let Some(dict_id) = col.dictionary_id {
331				let dictionary =
332					CatalogStore::find_dictionary(txn, dict_id).await?.ok_or_else(|| {
333						reifydb_type::internal_error!(
334							"Dictionary {:?} not found for column {}",
335							dict_id,
336							col.name
337						)
338					})?;
339				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx]).await?;
340				values[idx] = entry_id.to_value();
341			}
342		}
343
344		// Validate constraints (coercion is done in batch, but final constraint check still needed)
345		if is_validated {
346			for (idx, col) in ringbuffer.columns.iter().enumerate() {
347				col.constraint.validate(&values[idx])?;
348			}
349		}
350
351		// Encode the row
352		let mut row = layout.allocate();
353		for (idx, value) in values.iter().enumerate() {
354			encode_value(&layout, &mut row, idx, value);
355		}
356
357		// Handle ring buffer overflow - delete oldest entry if full
358		if metadata.is_full() {
359			let oldest_row = RowNumber(metadata.head);
360			txn.remove_from_ringbuffer(ringbuffer.clone(), oldest_row).await?;
361			metadata.head += 1;
362			metadata.count -= 1;
363		}
364
365		// Allocate row number
366		let row_number = RowSequence::next_row_number_for_ringbuffer(txn, ringbuffer.id).await?;
367
368		// Store the row
369		txn.insert_ringbuffer_at(ringbuffer.clone(), row_number, row).await?;
370
371		// Update metadata
372		if metadata.is_empty() {
373			metadata.head = row_number.0;
374		}
375		metadata.count += 1;
376		metadata.tail = row_number.0 + 1;
377
378		inserted_count += 1;
379	}
380
381	// Save updated metadata
382	CatalogStore::update_ringbuffer_metadata(txn, metadata).await?;
383
384	Ok(RingBufferInsertResult {
385		namespace: pending.namespace.clone(),
386		ringbuffer: pending.ringbuffer.clone(),
387		inserted: inserted_count,
388	})
389}
390
/// Splits a `"namespace.name"` string at its first `.` into `(namespace, name)`.
///
/// A name without any `.` belongs to the `"default"` namespace. Everything after the
/// first `.` is kept verbatim as the name, so `"a.b.c"` yields `("a", "b.c")`.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	match qualified_name.split_once('.') {
		Some((namespace, name)) => (namespace.to_owned(), name.to_owned()),
		None => (String::from("default"), qualified_name.to_owned()),
	}
}