Skip to main content

reifydb_engine/bulk_insert/
builder.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::{any, marker::PhantomData};
5
6use any::TypeId;
7use reifydb_catalog::{
8	catalog::Catalog,
9	error::{CatalogError, CatalogObjectKind},
10};
11use reifydb_core::{
12	encoded::schema::Schema,
13	error::CoreError,
14	interface::catalog::id::IndexId,
15	internal_error,
16	key::{EncodableKey, index_entry::IndexEntryKey},
17};
18use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
19use reifydb_type::{
20	fragment::Fragment,
21	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
22};
23
24use super::{
25	BulkInsertResult, RingBufferInsertResult, TableInsertResult,
26	validation::{
27		reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
28	},
29};
30use crate::{
31	Result,
32	bulk_insert::primitive::{
33		ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
34		table::{PendingTableInsert, TableInsertBuilder},
35	},
36	engine::StandardEngine,
37	transaction::operation::{
38		dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
39	},
40	vm::instruction::dml::{
41		primary_key,
42		schema::{get_or_create_ringbuffer_schema, get_or_create_table_schema},
43	},
44};
45
/// Marker trait selecting the validation mode of a bulk insert (sealed).
///
/// The supertrait [`sealed::Sealed`] lives in a crate-visible module, so code
/// outside this crate cannot name it and therefore cannot add further modes.
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Validated mode - performs full type checking and constraint validation.
#[derive(Debug, Clone, Copy)]
pub struct Validated;
impl ValidationMode for Validated {}

/// Trusted mode - skips validation for pre-validated internal data.
#[derive(Debug, Clone, Copy)]
pub struct Trusted;
impl ValidationMode for Trusted {}

/// Holds the sealing supertrait of [`ValidationMode`].
///
/// The module is `pub(crate)` rather than `pub`: a publicly reachable `Sealed`
/// could be implemented by downstream crates, which would defeat the seal.
pub(crate) mod sealed {

	use super::{Trusted, Validated};

	/// Implemented only for the marker types defined in the parent module.
	pub trait Sealed {}
	impl Sealed for Validated {}
	impl Sealed for Trusted {}
}
64
65/// Main builder for bulk insert operations.
66///
67/// Type parameter `V` tracks the validation mode at compile time.
68pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
69	engine: &'e StandardEngine,
70	_identity: IdentityId,
71	pending_tables: Vec<PendingTableInsert>,
72	pending_ringbuffers: Vec<PendingRingBufferInsert>,
73	_validation: PhantomData<V>,
74}
75
76impl<'e> BulkInsertBuilder<'e, Validated> {
77	/// Create a new bulk insert builder with full validation enabled.
78	pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
79		Self {
80			engine,
81			_identity: identity,
82			pending_tables: Vec::new(),
83			pending_ringbuffers: Vec::new(),
84			_validation: PhantomData,
85		}
86	}
87}
88
89impl<'e> BulkInsertBuilder<'e, Trusted> {
90	/// Create a new bulk insert builder with validation disabled (trusted mode).
91	pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: IdentityId) -> Self {
92		Self {
93			engine,
94			_identity: identity,
95			pending_tables: Vec::new(),
96			pending_ringbuffers: Vec::new(),
97			_validation: PhantomData,
98		}
99	}
100}
101
102impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
103	/// Begin inserting into a table.
104	///
105	/// The qualified name can be either "namespace::table" or just "table"
106	/// (which uses the default namespace).
107	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
108		let (namespace, table) = parse_qualified_name(qualified_name);
109		TableInsertBuilder::new(self, namespace, table)
110	}
111
112	/// Begin inserting into a ring buffer.
113	///
114	/// The qualified name can be either "namespace::ringbuffer" or just "ringbuffer"
115	/// (which uses the default namespace).
116	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
117		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
118		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
119	}
120
121	/// Add a pending table insert (called by TableInsertBuilder::done)
122	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
123		self.pending_tables.push(pending);
124	}
125
126	/// Add a pending ring buffer insert (called by RingBufferInsertBuilder::done)
127	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
128		self.pending_ringbuffers.push(pending);
129	}
130
131	/// Execute all pending inserts in a single transaction.
132	///
133	/// Returns a summary of what was inserted. On error, the entire
134	/// transaction is rolled back (no partial inserts).
135	pub fn execute(self) -> Result<BulkInsertResult> {
136		let mut txn = self.engine.begin_command()?;
137		let catalog = self.engine.catalog();
138		let mut result = BulkInsertResult::default();
139
140		// Process all pending table inserts
141		for pending in self.pending_tables {
142			let table_result = execute_table_insert::<V>(&catalog, &mut txn, &pending, TypeId::of::<V>())?;
143			result.tables.push(table_result);
144		}
145
146		// Process all pending ring buffer inserts
147		for pending in self.pending_ringbuffers {
148			let rb_result =
149				execute_ringbuffer_insert::<V>(&catalog, &mut txn, &pending, TypeId::of::<V>())?;
150			result.ringbuffers.push(rb_result);
151		}
152
153		// Commit the transaction
154		txn.commit()?;
155
156		Ok(result)
157	}
158}
159
160/// Execute a table insert within a transaction
161fn execute_table_insert<V: ValidationMode>(
162	catalog: &Catalog,
163	txn: &mut CommandTransaction,
164	pending: &PendingTableInsert,
165	type_id: TypeId,
166) -> Result<TableInsertResult> {
167	// 1. Look up namespace and table from catalog
168	let namespace = catalog
169		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
170		.ok_or_else(|| CatalogError::NotFound {
171			kind: CatalogObjectKind::Namespace,
172			namespace: pending.namespace.to_string(),
173			name: String::new(),
174			fragment: Fragment::None,
175		})?;
176
177	let table = catalog
178		.find_table_by_name(&mut Transaction::Command(txn), namespace.id, &pending.table)?
179		.ok_or_else(|| CatalogError::NotFound {
180			kind: CatalogObjectKind::Table,
181			namespace: pending.namespace.to_string(),
182			name: pending.table.to_string(),
183			fragment: Fragment::None,
184		})?;
185
186	// 2. Get or create schema with proper field names and constraints
187	let schema = get_or_create_table_schema(catalog, &table, &mut Transaction::Command(txn))?;
188
189	// 3. Validate and coerce all rows in batch (fail-fast)
190	let is_validated = type_id == TypeId::of::<Validated>();
191	let coerced_rows = if is_validated {
192		validate_and_coerce_rows(&pending.rows, &table)?
193	} else {
194		reorder_rows_trusted(&pending.rows, &table)?
195	};
196
197	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
198
199	for mut values in coerced_rows {
200		// Handle auto-increment columns
201		for (idx, col) in table.columns.iter().enumerate() {
202			if col.auto_increment && matches!(values[idx], Value::None { .. }) {
203				values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
204			}
205		}
206
207		// Handle dictionary encoding
208		for (idx, col) in table.columns.iter().enumerate() {
209			if let Some(dict_id) = col.dictionary_id {
210				let dictionary = catalog
211					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
212					.ok_or_else(|| {
213						internal_error!(
214							"Dictionary {:?} not found for column {}",
215							dict_id,
216							col.name
217						)
218					})?;
219				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
220				values[idx] = entry_id.to_value();
221			}
222		}
223
224		// Validate constraints (coercion is done in batch, but final constraint check still needed)
225		if is_validated {
226			for (idx, col) in table.columns.iter().enumerate() {
227				col.constraint.validate(&values[idx])?;
228			}
229		}
230
231		// Encode the row
232		let mut row = schema.allocate();
233		for (idx, value) in values.iter().enumerate() {
234			schema.set_value(&mut row, idx, value);
235		}
236		encoded_rows.push(row);
237	}
238
239	// 4. Batch allocate row numbers
240	let total_rows = encoded_rows.len();
241	if total_rows == 0 {
242		return Ok(TableInsertResult {
243			namespace: pending.namespace.clone(),
244			table: pending.table.clone(),
245			inserted: 0,
246		});
247	}
248
249	let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
250
251	// Hoist loop-invariant computations out of the insertion loop
252	let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)?;
253	let row_number_schema = if pk_def.is_some() {
254		Some(Schema::testing(&[Type::Uint8]))
255	} else {
256		None
257	};
258
259	// 5. Insert all rows with their row numbers
260	for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
261		txn.insert_table(&table, &schema, row.clone(), row_number)?;
262
263		// Handle primary key index if table has one
264		if let Some(ref pk_def) = pk_def {
265			let index_key = primary_key::encode_primary_key(pk_def, row, &table, &schema)?;
266			let index_entry_key =
267				IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
268
269			// Check for primary key violation
270			if txn.contains_key(&index_entry_key.encode())? {
271				let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
272				return Err(CoreError::PrimaryKeyViolation {
273					fragment: Fragment::None,
274					table_name: table.name.clone(),
275					key_columns,
276				}
277				.into());
278			}
279
280			// Store the index entry
281			let rns = row_number_schema.as_ref().unwrap();
282			let mut row_number_encoded = rns.allocate();
283			rns.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
284			txn.set(&index_entry_key.encode(), row_number_encoded)?;
285		}
286	}
287
288	Ok(TableInsertResult {
289		namespace: pending.namespace.clone(),
290		table: pending.table.clone(),
291		inserted: total_rows as u64,
292	})
293}
294
295/// Execute a ring buffer insert within a transaction
296fn execute_ringbuffer_insert<V: ValidationMode>(
297	catalog: &Catalog,
298	txn: &mut CommandTransaction,
299	pending: &PendingRingBufferInsert,
300	type_id: TypeId,
301) -> Result<RingBufferInsertResult> {
302	let namespace = catalog
303		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
304		.ok_or_else(|| CatalogError::NotFound {
305			kind: CatalogObjectKind::Namespace,
306			namespace: pending.namespace.to_string(),
307			name: String::new(),
308			fragment: Fragment::None,
309		})?;
310
311	let ringbuffer = catalog
312		.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id, &pending.ringbuffer)?
313		.ok_or_else(|| CatalogError::NotFound {
314			kind: CatalogObjectKind::RingBuffer,
315			namespace: pending.namespace.to_string(),
316			name: pending.ringbuffer.to_string(),
317			fragment: Fragment::None,
318		})?;
319
320	let mut metadata = catalog
321		.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?
322		.ok_or_else(|| CatalogError::NotFound {
323			kind: CatalogObjectKind::RingBuffer,
324			namespace: pending.namespace.to_string(),
325			name: pending.ringbuffer.to_string(),
326			fragment: Fragment::None,
327		})?;
328
329	// Get or create schema with proper field names and constraints
330	let schema = get_or_create_ringbuffer_schema(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
331
332	// 3. Validate and coerce all rows in batch (fail-fast)
333	let is_validated = type_id == TypeId::of::<Validated>();
334	let coerced_rows = if is_validated {
335		validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
336	} else {
337		reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
338	};
339
340	let mut inserted_count = 0u64;
341
342	// 4. Process each coerced row
343	for mut values in coerced_rows {
344		// Handle dictionary encoding
345		for (idx, col) in ringbuffer.columns.iter().enumerate() {
346			if let Some(dict_id) = col.dictionary_id {
347				let dictionary = catalog
348					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
349					.ok_or_else(|| {
350						internal_error!(
351							"Dictionary {:?} not found for column {}",
352							dict_id,
353							col.name
354						)
355					})?;
356				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
357				values[idx] = entry_id.to_value();
358			}
359		}
360
361		// Validate constraints (coercion is done in batch, but final constraint check still needed)
362		if is_validated {
363			for (idx, col) in ringbuffer.columns.iter().enumerate() {
364				col.constraint.validate(&values[idx])?;
365			}
366		}
367
368		// Encode the row
369		let mut row = schema.allocate();
370		for (idx, value) in values.iter().enumerate() {
371			schema.set_value(&mut row, idx, value);
372		}
373
374		// Handle ring buffer overflow - delete oldest entry if full
375		if metadata.is_full() {
376			let oldest_row = RowNumber(metadata.head);
377			txn.remove_from_ringbuffer(&ringbuffer, oldest_row)?;
378			metadata.head += 1;
379			metadata.count -= 1;
380		}
381
382		// Allocate row number
383		let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
384
385		// Store the row
386		txn.insert_ringbuffer_at(&ringbuffer, &schema, row_number, row)?;
387
388		// Update metadata
389		if metadata.is_empty() {
390			metadata.head = row_number.0;
391		}
392		metadata.count += 1;
393		metadata.tail = row_number.0 + 1;
394
395		inserted_count += 1;
396	}
397
398	// Save updated metadata
399	catalog.update_ringbuffer_metadata(txn, metadata)?;
400
401	Ok(RingBufferInsertResult {
402		namespace: pending.namespace.clone(),
403		ringbuffer: pending.ringbuffer.clone(),
404		inserted: inserted_count,
405	})
406}
407
/// Split a qualified name such as "namespace::table" into `(namespace, name)`.
///
/// The split happens at the first "::" only, so anything after it (including
/// further "::") stays in the name part. Input without "::" is returned as the
/// name, paired with the namespace "default".
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	match qualified_name.split_once("::") {
		Some((namespace, object)) => (namespace.to_owned(), object.to_owned()),
		None => (String::from("default"), qualified_name.to_owned()),
	}
}