Skip to main content

reifydb_engine/bulk_insert/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{any, marker::PhantomData};
5
6use any::TypeId;
7use reifydb_catalog::{
8	catalog::Catalog,
9	error::{CatalogError, CatalogObjectKind},
10};
11use reifydb_core::{
12	encoded::shape::RowShape,
13	error::CoreError,
14	interface::catalog::id::IndexId,
15	internal_error,
16	key::{EncodableKey, index_entry::IndexEntryKey},
17};
18use reifydb_runtime::context::clock::Clock;
19use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
20use reifydb_type::{
21	fragment::Fragment,
22	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
23};
24
25use super::{
26	BulkInsertResult, RingBufferInsertResult, TableInsertResult,
27	validation::{
28		reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
29	},
30};
31use crate::{
32	Result,
33	bulk_insert::primitive::{
34		ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
35		table::{PendingTableInsert, TableInsertBuilder},
36	},
37	engine::StandardEngine,
38	transaction::operation::{
39		dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
40	},
41	vm::instruction::dml::{
42		primary_key,
43		shape::{get_or_create_ringbuffer_shape, get_or_create_table_shape},
44	},
45};
46
/// Holds the supertrait that restricts which types can be a [`ValidationMode`].
///
/// `ValidationMode` requires [`sealed::Sealed`], and the only implementations
/// of `Sealed` in this file are for [`Validated`] and [`Trusted`].
// NOTE(review): both this module and `Sealed` are `pub`. If this path is
// reachable from outside the crate, downstream code could implement `Sealed`
// and thereby `ValidationMode` - confirm the intended visibility.
pub mod sealed {

	use super::{Trusted, Validated};

	/// Supertrait of `ValidationMode`; implemented only for the two mode markers.
	pub trait Sealed {}

	impl Sealed for Trusted {}
	impl Sealed for Validated {}
}

/// Type-level switch selecting how a bulk insert treats its input rows.
///
/// Implemented by the marker types [`Validated`] and [`Trusted`]; the
/// `sealed::Sealed` supertrait ties the set of modes to this module.
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Mode marker: rows are validated and coerced, and column constraints are
/// checked, before they are written.
pub struct Validated;
impl ValidationMode for Validated {}

/// Mode marker: rows are taken as already valid, so validation and
/// constraint checks are skipped.
pub struct Trusted;
impl ValidationMode for Trusted {}
65
66/// Main builder for bulk insert operations.
67///
68/// Type parameter `V` tracks the validation mode at compile time.
69pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
70	engine: &'e StandardEngine,
71	identity: IdentityId,
72	pending_tables: Vec<PendingTableInsert>,
73	pending_ringbuffers: Vec<PendingRingBufferInsert>,
74	_validation: PhantomData<V>,
75}
76
77impl<'e> BulkInsertBuilder<'e, Validated> {
78	/// Create a new bulk insert builder with full validation enabled.
79	pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
80		Self {
81			engine,
82			identity,
83			pending_tables: Vec::new(),
84			pending_ringbuffers: Vec::new(),
85			_validation: PhantomData,
86		}
87	}
88}
89
90impl<'e> BulkInsertBuilder<'e, Trusted> {
91	/// Create a new bulk insert builder with validation disabled (trusted mode).
92	pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: IdentityId) -> Self {
93		Self {
94			engine,
95			identity,
96			pending_tables: Vec::new(),
97			pending_ringbuffers: Vec::new(),
98			_validation: PhantomData,
99		}
100	}
101}
102
103impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
104	/// Begin inserting into a table.
105	///
106	/// The qualified name can be either "namespace::table" or just "table"
107	/// (which uses the default namespace).
108	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
109		let (namespace, table) = parse_qualified_name(qualified_name);
110		TableInsertBuilder::new(self, namespace, table)
111	}
112
113	/// Begin inserting into a ring buffer.
114	///
115	/// The qualified name can be either "namespace::ringbuffer" or just "ringbuffer"
116	/// (which uses the default namespace).
117	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
118		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
119		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
120	}
121
122	/// Add a pending table insert (called by TableInsertBuilder::done)
123	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
124		self.pending_tables.push(pending);
125	}
126
127	/// Add a pending ring buffer insert (called by RingBufferInsertBuilder::done)
128	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
129		self.pending_ringbuffers.push(pending);
130	}
131
132	/// Execute all pending inserts in a single transaction.
133	///
134	/// Returns a summary of what was inserted. On error, the entire
135	/// transaction is rolled back (no partial inserts).
136	pub fn execute(self) -> Result<BulkInsertResult> {
137		self.engine.reject_if_read_only()?;
138		let mut txn = self.engine.begin_command(self.identity)?;
139		let catalog = self.engine.catalog();
140		let clock = self.engine.clock();
141		let mut result = BulkInsertResult::default();
142
143		// Process all pending table inserts
144		for pending in self.pending_tables {
145			let table_result =
146				execute_table_insert(&catalog, &mut txn, &pending, TypeId::of::<V>(), clock)?;
147			result.tables.push(table_result);
148		}
149
150		// Process all pending ring buffer inserts
151		for pending in self.pending_ringbuffers {
152			let rb_result =
153				execute_ringbuffer_insert(&catalog, &mut txn, &pending, TypeId::of::<V>(), clock)?;
154			result.ringbuffers.push(rb_result);
155		}
156
157		// Commit the transaction
158		txn.commit()?;
159
160		Ok(result)
161	}
162}
163
164/// Execute a table insert within a transaction
165fn execute_table_insert(
166	catalog: &Catalog,
167	txn: &mut CommandTransaction,
168	pending: &PendingTableInsert,
169	type_id: TypeId,
170	clock: &Clock,
171) -> Result<TableInsertResult> {
172	// 1. Look up namespace and table from catalog
173	let namespace = catalog
174		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
175		.ok_or_else(|| CatalogError::NotFound {
176			kind: CatalogObjectKind::Namespace,
177			namespace: pending.namespace.to_string(),
178			name: String::new(),
179			fragment: Fragment::None,
180		})?;
181
182	let table = catalog
183		.find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?
184		.ok_or_else(|| CatalogError::NotFound {
185			kind: CatalogObjectKind::Table,
186			namespace: pending.namespace.to_string(),
187			name: pending.table.to_string(),
188			fragment: Fragment::None,
189		})?;
190
191	// 2. Get or create shape with proper field names and constraints
192	let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
193
194	// 3. Validate and coerce all rows in batch (fail-fast)
195	let is_validated = type_id == TypeId::of::<Validated>();
196	let coerced_rows = if is_validated {
197		validate_and_coerce_rows(&pending.rows, &table)?
198	} else {
199		reorder_rows_trusted(&pending.rows, &table)?
200	};
201
202	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
203
204	for mut values in coerced_rows {
205		// Handle auto-increment columns
206		for (idx, col) in table.columns.iter().enumerate() {
207			if col.auto_increment && matches!(values[idx], Value::None { .. }) {
208				values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
209			}
210		}
211
212		// Handle dictionary encoding
213		for (idx, col) in table.columns.iter().enumerate() {
214			if let Some(dict_id) = col.dictionary_id {
215				let dictionary = catalog
216					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
217					.ok_or_else(|| {
218						internal_error!(
219							"Dictionary {:?} not found for column {}",
220							dict_id,
221							col.name
222						)
223					})?;
224				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
225				values[idx] = entry_id.to_value();
226			}
227		}
228
229		// Validate constraints (coercion is done in batch, but final constraint check still needed)
230		if is_validated {
231			for (idx, col) in table.columns.iter().enumerate() {
232				col.constraint.validate(&values[idx])?;
233			}
234		}
235
236		// Encode the row
237		let mut row = shape.allocate();
238		for (idx, value) in values.iter().enumerate() {
239			shape.set_value(&mut row, idx, value);
240		}
241
242		let now_nanos = clock.now_nanos();
243		row.set_timestamps(now_nanos, now_nanos);
244
245		encoded_rows.push(row);
246	}
247
248	let total_rows = encoded_rows.len();
249	if total_rows == 0 {
250		return Ok(TableInsertResult {
251			namespace: pending.namespace.clone(),
252			table: pending.table.clone(),
253			inserted: 0,
254		});
255	}
256
257	let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
258
259	// Hoist loop-invariant computations out of the insertion loop
260	let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)?;
261	let row_number_shape = if pk_def.is_some() {
262		Some(RowShape::testing(&[Type::Uint8]))
263	} else {
264		None
265	};
266
267	// 5. Insert all rows with their row numbers
268	for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
269		txn.insert_table(&table, &shape, row.clone(), row_number)?;
270
271		// Handle primary key index if table has one
272		if let Some(ref pk_def) = pk_def {
273			let index_key = primary_key::encode_primary_key(pk_def, row, &table, &shape)?;
274			let index_entry_key =
275				IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
276
277			// Check for primary key violation
278			if txn.contains_key(&index_entry_key.encode())? {
279				let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
280				return Err(CoreError::PrimaryKeyViolation {
281					fragment: Fragment::None,
282					table_name: table.name.clone(),
283					key_columns,
284				}
285				.into());
286			}
287
288			// Store the index entry
289			let rns = row_number_shape.as_ref().unwrap();
290			let mut row_number_encoded = rns.allocate();
291			rns.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
292			txn.set(&index_entry_key.encode(), row_number_encoded)?;
293		}
294	}
295
296	Ok(TableInsertResult {
297		namespace: pending.namespace.clone(),
298		table: pending.table.clone(),
299		inserted: total_rows as u64,
300	})
301}
302
303/// Execute a ring buffer insert within a transaction
304fn execute_ringbuffer_insert(
305	catalog: &Catalog,
306	txn: &mut CommandTransaction,
307	pending: &PendingRingBufferInsert,
308	type_id: TypeId,
309	clock: &Clock,
310) -> Result<RingBufferInsertResult> {
311	let namespace = catalog
312		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
313		.ok_or_else(|| CatalogError::NotFound {
314			kind: CatalogObjectKind::Namespace,
315			namespace: pending.namespace.to_string(),
316			name: String::new(),
317			fragment: Fragment::None,
318		})?;
319
320	let ringbuffer = catalog
321		.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
322		.ok_or_else(|| CatalogError::NotFound {
323			kind: CatalogObjectKind::RingBuffer,
324			namespace: pending.namespace.to_string(),
325			name: pending.ringbuffer.to_string(),
326			fragment: Fragment::None,
327		})?;
328
329	let mut metadata = catalog
330		.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?
331		.ok_or_else(|| CatalogError::NotFound {
332			kind: CatalogObjectKind::RingBuffer,
333			namespace: pending.namespace.to_string(),
334			name: pending.ringbuffer.to_string(),
335			fragment: Fragment::None,
336		})?;
337
338	// Get or create shape with proper field names and constraints
339	let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
340
341	// 3. Validate and coerce all rows in batch (fail-fast)
342	let is_validated = type_id == TypeId::of::<Validated>();
343	let coerced_rows = if is_validated {
344		validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
345	} else {
346		reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
347	};
348
349	let mut inserted_count = 0u64;
350
351	// 4. Process each coerced row
352	for mut values in coerced_rows {
353		// Handle dictionary encoding
354		for (idx, col) in ringbuffer.columns.iter().enumerate() {
355			if let Some(dict_id) = col.dictionary_id {
356				let dictionary = catalog
357					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
358					.ok_or_else(|| {
359						internal_error!(
360							"Dictionary {:?} not found for column {}",
361							dict_id,
362							col.name
363						)
364					})?;
365				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
366				values[idx] = entry_id.to_value();
367			}
368		}
369
370		// Validate constraints (coercion is done in batch, but final constraint check still needed)
371		if is_validated {
372			for (idx, col) in ringbuffer.columns.iter().enumerate() {
373				col.constraint.validate(&values[idx])?;
374			}
375		}
376
377		// Encode the row
378		let mut row = shape.allocate();
379		for (idx, value) in values.iter().enumerate() {
380			shape.set_value(&mut row, idx, value);
381		}
382
383		let now_nanos = clock.now_nanos();
384		row.set_timestamps(now_nanos, now_nanos);
385
386		if metadata.is_full() {
387			let oldest_row = RowNumber(metadata.head);
388			txn.remove_from_ringbuffer(&ringbuffer, oldest_row)?;
389			metadata.head += 1;
390			metadata.count -= 1;
391		}
392
393		// Allocate row number
394		let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
395
396		// Store the row
397		txn.insert_ringbuffer_at(&ringbuffer, &shape, row_number, row)?;
398
399		// Update metadata
400		if metadata.is_empty() {
401			metadata.head = row_number.0;
402		}
403		metadata.count += 1;
404		metadata.tail = row_number.0 + 1;
405
406		inserted_count += 1;
407	}
408
409	// Save updated metadata
410	catalog.update_ringbuffer_metadata(txn, metadata)?;
411
412	Ok(RingBufferInsertResult {
413		namespace: pending.namespace.clone(),
414		ringbuffer: pending.ringbuffer.clone(),
415		inserted: inserted_count,
416	})
417}
418
/// Splits a qualified name such as "namespace::table" into `(namespace, name)`.
///
/// The split happens at the last `"::"`, so everything before it - including
/// further `"::"` separators - is the namespace. A name without `"::"` is
/// returned with the namespace "default".
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	let (namespace, name) = match qualified_name.rsplit_once("::") {
		Some(parts) => parts,
		None => ("default", qualified_name),
	};

	(namespace.to_string(), name.to_string())
}
428
#[cfg(test)]
mod tests {
	use super::*;

	/// Asserts that `input` parses into the given namespace and object name.
	fn assert_parsed(input: &str, namespace: &str, name: &str) {
		assert_eq!(parse_qualified_name(input), (namespace.to_string(), name.to_string()));
	}

	// A bare name falls back to the "default" namespace.
	#[test]
	fn parse_qualified_name_simple() {
		assert_parsed("table", "default", "table");
	}

	#[test]
	fn parse_qualified_name_single_namespace() {
		assert_parsed("ns::table", "ns", "table");
	}

	// Only the last "::" separates the namespace from the name.
	#[test]
	fn parse_qualified_name_nested_namespace() {
		assert_parsed("a::b::table", "a::b", "table");
	}

	#[test]
	fn parse_qualified_name_deeply_nested_namespace() {
		assert_parsed("a::b::c::table", "a::b::c", "table");
	}

	// An empty input has no separator, so it becomes an empty name in "default".
	#[test]
	fn parse_qualified_name_empty_string() {
		assert_parsed("", "default", "");
	}
}