Skip to main content

reifydb_engine/bulk_insert/
builder.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::catalog::Catalog;
7use reifydb_core::{
8	encoded::schema::Schema,
9	interface::{auth::Identity, catalog::id::IndexId},
10	key::{EncodableKey, index_entry::IndexEntryKey},
11};
12use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
13use reifydb_type::{
14	fragment::Fragment,
15	value::{Value, row_number::RowNumber, r#type::Type},
16};
17
18use super::{
19	BulkInsertResult, RingBufferInsertResult, TableInsertResult,
20	error::BulkInsertError,
21	validation::{
22		reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
23	},
24};
25use crate::{
26	bulk_insert::primitive::{
27		ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
28		table::{PendingTableInsert, TableInsertBuilder},
29	},
30	engine::StandardEngine,
31	transaction::operation::{
32		dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
33	},
34};
35
/// Seal for [`ValidationMode`].
///
/// `ValidationMode` has `Sealed` as a supertrait, and this module implements
/// `Sealed` only for the two marker types defined alongside it.
pub mod sealed {
	use super::{Trusted, Validated};

	pub trait Sealed {}
	impl Sealed for Validated {}
	impl Sealed for Trusted {}
}

/// Compile-time marker describing how a bulk insert treats incoming rows.
///
/// Implemented here for [`Validated`] and [`Trusted`].
pub trait ValidationMode: sealed::Sealed + 'static {}

/// Marker: full type checking and constraint validation are performed.
pub struct Validated;

/// Marker: validation is skipped; meant for pre-validated internal data.
pub struct Trusted;

impl ValidationMode for Validated {}
impl ValidationMode for Trusted {}
52
53/// Main builder for bulk insert operations.
54///
55/// Type parameter `V` tracks the validation mode at compile time.
56pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
57	engine: &'e StandardEngine,
58	_identity: &'e Identity,
59	pending_tables: Vec<PendingTableInsert>,
60	pending_ringbuffers: Vec<PendingRingBufferInsert>,
61	_validation: PhantomData<V>,
62}
63
64impl<'e> BulkInsertBuilder<'e, Validated> {
65	/// Create a new bulk insert builder with full validation enabled.
66	pub(crate) fn new(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
67		Self {
68			engine,
69			_identity: identity,
70			pending_tables: Vec::new(),
71			pending_ringbuffers: Vec::new(),
72			_validation: PhantomData,
73		}
74	}
75}
76
77impl<'e> BulkInsertBuilder<'e, Trusted> {
78	/// Create a new bulk insert builder with validation disabled (trusted mode).
79	pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: &'e Identity) -> Self {
80		Self {
81			engine,
82			_identity: identity,
83			pending_tables: Vec::new(),
84			pending_ringbuffers: Vec::new(),
85			_validation: PhantomData,
86		}
87	}
88}
89
90impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
91	/// Begin inserting into a table.
92	///
93	/// The qualified name can be either "namespace.table" or just "table"
94	/// (which uses the default namespace).
95	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
96		let (namespace, table) = parse_qualified_name(qualified_name);
97		TableInsertBuilder::new(self, namespace, table)
98	}
99
100	/// Begin inserting into a ring buffer.
101	///
102	/// The qualified name can be either "namespace.ringbuffer" or just "ringbuffer"
103	/// (which uses the default namespace).
104	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
105		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
106		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
107	}
108
109	/// Add a pending table insert (called by TableInsertBuilder::done)
110	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
111		self.pending_tables.push(pending);
112	}
113
114	/// Add a pending ring buffer insert (called by RingBufferInsertBuilder::done)
115	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
116		self.pending_ringbuffers.push(pending);
117	}
118
119	/// Execute all pending inserts in a single transaction.
120	///
121	/// Returns a summary of what was inserted. On error, the entire
122	/// transaction is rolled back (no partial inserts).
123	pub fn execute(self) -> crate::Result<BulkInsertResult> {
124		let mut txn = self.engine.begin_command()?;
125		let catalog = self.engine.catalog();
126		let mut result = BulkInsertResult::default();
127
128		// Process all pending table inserts
129		for pending in self.pending_tables {
130			let table_result =
131				execute_table_insert::<V>(&catalog, &mut txn, &pending, std::any::TypeId::of::<V>())?;
132			result.tables.push(table_result);
133		}
134
135		// Process all pending ring buffer inserts
136		for pending in self.pending_ringbuffers {
137			let rb_result = execute_ringbuffer_insert::<V>(
138				&catalog,
139				&mut txn,
140				&pending,
141				std::any::TypeId::of::<V>(),
142			)?;
143			result.ringbuffers.push(rb_result);
144		}
145
146		// Commit the transaction
147		txn.commit()?;
148
149		Ok(result)
150	}
151}
152
153/// Execute a table insert within a transaction
154fn execute_table_insert<V: ValidationMode>(
155	catalog: &Catalog,
156	txn: &mut CommandTransaction,
157	pending: &PendingTableInsert,
158	type_id: std::any::TypeId,
159) -> crate::Result<TableInsertResult> {
160	use crate::vm::instruction::dml::primary_key;
161
162	// 1. Look up namespace and table from catalog
163	let namespace = catalog
164		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
165		.ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
166
167	let table = catalog
168		.find_table_by_name(&mut Transaction::Command(txn), namespace.id, &pending.table)?
169		.ok_or_else(|| BulkInsertError::table_not_found(Fragment::None, &pending.namespace, &pending.table))?;
170
171	// 2. Get or create schema with proper field names and constraints
172	let schema = crate::vm::instruction::dml::schema::get_or_create_table_schema(
173		catalog,
174		&table,
175		&mut Transaction::Command(txn),
176	)?;
177
178	// 3. Validate and coerce all rows in batch (fail-fast)
179	let is_validated = type_id == std::any::TypeId::of::<Validated>();
180	let coerced_rows = if is_validated {
181		validate_and_coerce_rows(&pending.rows, &table)?
182	} else {
183		reorder_rows_trusted(&pending.rows, &table)?
184	};
185
186	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
187
188	for mut values in coerced_rows {
189		// Handle auto-increment columns
190		for (idx, col) in table.columns.iter().enumerate() {
191			if col.auto_increment && matches!(values[idx], Value::None { .. }) {
192				values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
193			}
194		}
195
196		// Handle dictionary encoding
197		for (idx, col) in table.columns.iter().enumerate() {
198			if let Some(dict_id) = col.dictionary_id {
199				let dictionary = catalog
200					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
201					.ok_or_else(|| {
202						reifydb_core::internal_error!(
203							"Dictionary {:?} not found for column {}",
204							dict_id,
205							col.name
206						)
207					})?;
208				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
209				values[idx] = entry_id.to_value();
210			}
211		}
212
213		// Validate constraints (coercion is done in batch, but final constraint check still needed)
214		if is_validated {
215			for (idx, col) in table.columns.iter().enumerate() {
216				col.constraint.validate(&values[idx])?;
217			}
218		}
219
220		// Encode the row
221		let mut row = schema.allocate();
222		for (idx, value) in values.iter().enumerate() {
223			schema.set_value(&mut row, idx, value);
224		}
225		encoded_rows.push(row);
226	}
227
228	// 4. Batch allocate row numbers
229	let total_rows = encoded_rows.len();
230	if total_rows == 0 {
231		return Ok(TableInsertResult {
232			namespace: pending.namespace.clone(),
233			table: pending.table.clone(),
234			inserted: 0,
235		});
236	}
237
238	let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
239
240	// 5. Insert all rows with their row numbers
241	for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
242		txn.insert_table(table.clone(), row.clone(), row_number)?;
243
244		// Handle primary key index if table has one
245		if let Some(pk_def) = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)? {
246			let index_key = primary_key::encode_primary_key(&pk_def, row, &table, &schema)?;
247			let index_entry_key =
248				IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key.clone());
249
250			// Check for primary key violation
251			if txn.contains_key(&index_entry_key.encode())? {
252				let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
253				reifydb_type::return_error!(
254					reifydb_core::error::diagnostic::index::primary_key_violation(
255						Fragment::None,
256						table.name.clone(),
257						key_columns,
258					)
259				);
260			}
261
262			// Store the index entry
263			let row_number_schema = Schema::testing(&[Type::Uint8]);
264			let mut row_number_encoded = row_number_schema.allocate();
265			row_number_schema.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
266			txn.set(&index_entry_key.encode(), row_number_encoded)?;
267		}
268	}
269
270	Ok(TableInsertResult {
271		namespace: pending.namespace.clone(),
272		table: pending.table.clone(),
273		inserted: total_rows as u64,
274	})
275}
276
277/// Execute a ring buffer insert within a transaction
278fn execute_ringbuffer_insert<V: ValidationMode>(
279	catalog: &Catalog,
280	txn: &mut CommandTransaction,
281	pending: &PendingRingBufferInsert,
282	type_id: std::any::TypeId,
283) -> crate::Result<RingBufferInsertResult> {
284	let namespace = catalog
285		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
286		.ok_or_else(|| BulkInsertError::namespace_not_found(Fragment::None, &pending.namespace))?;
287
288	let ringbuffer = catalog
289		.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id, &pending.ringbuffer)?
290		.ok_or_else(|| {
291			BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
292		})?;
293
294	let mut metadata =
295		catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
296			BulkInsertError::ringbuffer_not_found(Fragment::None, &pending.namespace, &pending.ringbuffer)
297		})?;
298
299	// Get or create schema with proper field names and constraints
300	let schema = crate::vm::instruction::dml::schema::get_or_create_ringbuffer_schema(
301		catalog,
302		&ringbuffer,
303		&mut Transaction::Command(txn),
304	)?;
305
306	// 3. Validate and coerce all rows in batch (fail-fast)
307	let is_validated = type_id == std::any::TypeId::of::<Validated>();
308	let coerced_rows = if is_validated {
309		validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
310	} else {
311		reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
312	};
313
314	let mut inserted_count = 0u64;
315
316	// 4. Process each coerced row
317	for mut values in coerced_rows {
318		// Handle dictionary encoding
319		for (idx, col) in ringbuffer.columns.iter().enumerate() {
320			if let Some(dict_id) = col.dictionary_id {
321				let dictionary = catalog
322					.find_dictionary(&mut Transaction::Command(txn), dict_id)?
323					.ok_or_else(|| {
324						reifydb_core::internal_error!(
325							"Dictionary {:?} not found for column {}",
326							dict_id,
327							col.name
328						)
329					})?;
330				let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
331				values[idx] = entry_id.to_value();
332			}
333		}
334
335		// Validate constraints (coercion is done in batch, but final constraint check still needed)
336		if is_validated {
337			for (idx, col) in ringbuffer.columns.iter().enumerate() {
338				col.constraint.validate(&values[idx])?;
339			}
340		}
341
342		// Encode the row
343		let mut row = schema.allocate();
344		for (idx, value) in values.iter().enumerate() {
345			schema.set_value(&mut row, idx, value);
346		}
347
348		// Handle ring buffer overflow - delete oldest entry if full
349		if metadata.is_full() {
350			let oldest_row = RowNumber(metadata.head);
351			txn.remove_from_ringbuffer(ringbuffer.clone(), oldest_row)?;
352			metadata.head += 1;
353			metadata.count -= 1;
354		}
355
356		// Allocate row number
357		let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
358
359		// Store the row
360		txn.insert_ringbuffer_at(ringbuffer.clone(), row_number, row)?;
361
362		// Update metadata
363		if metadata.is_empty() {
364			metadata.head = row_number.0;
365		}
366		metadata.count += 1;
367		metadata.tail = row_number.0 + 1;
368
369		inserted_count += 1;
370	}
371
372	// Save updated metadata
373	catalog.update_ringbuffer_metadata(txn, metadata)?;
374
375	Ok(RingBufferInsertResult {
376		namespace: pending.namespace.clone(),
377		ringbuffer: pending.ringbuffer.clone(),
378		inserted: inserted_count,
379	})
380}
381
/// Split a possibly qualified name into `(namespace, name)`.
///
/// The split happens at the first `.` only, so `"a.b.c"` yields `("a", "b.c")`.
/// A name without any `.` is returned unchanged with the namespace `"default"`.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	match qualified_name.split_once('.') {
		Some((namespace, name)) => (namespace.to_owned(), name.to_owned()),
		None => (String::from("default"), qualified_name.to_owned()),
	}
}