Skip to main content

reifydb_engine/bulk_insert/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	error::{CatalogError, CatalogObjectKind},
9};
10use reifydb_core::{
11	encoded::{row::EncodedRow, shape::RowShape},
12	error::CoreError,
13	interface::catalog::{
14		id::IndexId,
15		key::PrimaryKey,
16		ringbuffer::{RingBuffer, RingBufferMetadata},
17		series::{Series, SeriesMetadata},
18		table::Table,
19	},
20	internal_error,
21	key::{EncodableKey, index_entry::IndexEntryKey, series_row::SeriesRowKey},
22};
23use reifydb_runtime::context::clock::Clock;
24use reifydb_transaction::{
25	interceptor::series_row::SeriesRowInterceptor,
26	transaction::{Transaction, command::CommandTransaction},
27};
28use reifydb_type::{
29	fragment::Fragment,
30	params::Params,
31	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
32};
33
34use super::{
35	BulkInsertResult, RingBufferInsertResult, SeriesInsertResult, TableInsertResult,
36	validation::{
37		reorder_rows_unvalidated, reorder_rows_unvalidated_rb, reorder_rows_unvalidated_series,
38		validate_and_coerce_rows, validate_and_coerce_rows_rb, validate_and_coerce_rows_series,
39	},
40};
41use crate::{
42	Result,
43	bulk_insert::primitive::{
44		ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
45		series::{PendingSeriesInsert, SeriesInsertBuilder},
46		table::{PendingTableInsert, TableInsertBuilder},
47	},
48	engine::StandardEngine,
49	transaction::operation::{
50		dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
51	},
52	vm::instruction::dml::{
53		primary_key,
54		shape::{get_or_create_ringbuffer_shape, get_or_create_series_shape, get_or_create_table_shape},
55	},
56};
57
/// Marker trait for validation mode (sealed)
///
/// `Validated` and `Unchecked` are the implementors defined in this file. The
/// `sealed::Sealed` supertrait bound means a type must also implement `Sealed`
/// before it can implement this trait.
pub trait ValidationMode: sealed::Sealed + 'static {
	/// Whether this mode performs full type checking and constraint validation.
	///
	/// The insert helpers in this file branch on this constant to pick the
	/// validating or the non-validating row-coercion function and to decide
	/// whether column constraints are checked.
	const VALIDATED: bool;

	/// Run `body` inside a transaction in this mode and commit. `Unchecked`
	/// routes through `execute_bulk_unchecked`, which disables conflict tracking
	/// and commits via the bypass path; the others reserve the write-set hint
	/// (when `total_rows > 0`) and commit through the standard path.
	///
	/// `total_rows` is the number of rows the caller is about to write and
	/// `body` performs the writes; the success type `R` is chosen by `body`.
	fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
	where
		F: FnOnce(&mut CommandTransaction) -> Result<R>;
}
71
/// Validated mode - performs full type checking and constraint validation
pub struct Validated;
impl ValidationMode for Validated {
	const VALIDATED: bool = true;

	/// Delegates to `run_checked`, forwarding `txn`, `total_rows` and `body`
	/// unchanged.
	fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
	where
		F: FnOnce(&mut CommandTransaction) -> Result<R>,
	{
		run_checked(txn, total_rows, body)
	}
}
84
/// Unchecked mode - skips validation AND skips registering the commit in the
/// oracle's per-key conflict-detection index. Used by `bulk_insert_unchecked`.
/// See that method's doc for the safety contract.
pub struct Unchecked;
impl ValidationMode for Unchecked {
	const VALIDATED: bool = false;

	/// Hands `body` to `execute_bulk_unchecked` on the transaction. The row
	/// total is ignored, so no write-set reservation is made in this mode.
	// NOTE(review): how and when the commit happens is decided inside
	// `execute_bulk_unchecked`, which is not visible from this file.
	fn run<F, R>(txn: &mut CommandTransaction, _total_rows: usize, body: F) -> Result<R>
	where
		F: FnOnce(&mut CommandTransaction) -> Result<R>,
	{
		txn.execute_bulk_unchecked(body)
	}
}
99
100fn run_checked<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
101where
102	F: FnOnce(&mut CommandTransaction) -> Result<R>,
103{
104	// Pre-size the conflict-tracker write set so a known-size bulk insert doesn't
105	// rehash its HashSet thousands of times. Each row produces one row write plus
106	// up to one primary-index write, so reserve 2x the row total.
107	if total_rows > 0 {
108		txn.reserve_writes(total_rows.saturating_mul(2))?;
109	}
110	let r = body(txn)?;
111	txn.commit()?;
112	Ok(r)
113}
114
/// Holds the `Sealed` supertrait required by `ValidationMode`. Within this module
/// only `Validated` and `Unchecked` implement it.
// NOTE(review): this module is declared `pub`. If it is reachable from outside the
// crate, downstream code could implement `Sealed` and therefore `ValidationMode` -
// confirm the visibility of the parent module.
pub mod sealed {

	use super::{Unchecked, Validated};
	/// Empty trait used purely as a supertrait bound; it declares no items.
	pub trait Sealed {}
	impl Sealed for Validated {}
	impl Sealed for Unchecked {}
}
122
/// Main builder for bulk insert operations.
///
/// Type parameter `V` tracks the validation mode at compile time.
///
/// Pending inserts are collected through the `add_*_insert` methods and are only
/// applied when `execute` is called.
pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
	/// Engine used by `execute` to open the command transaction and to obtain the
	/// catalog and the clock.
	engine: &'e StandardEngine,
	/// Identity passed to `begin_command` when the transaction is opened.
	identity: IdentityId,
	/// Queued table inserts; `execute` processes these first.
	pending_tables: Vec<PendingTableInsert>,
	/// Queued ring buffer inserts; processed after the table inserts.
	pending_ringbuffers: Vec<PendingRingBufferInsert>,
	/// Queued series inserts; processed last.
	pending_series: Vec<PendingSeriesInsert>,
	/// Zero-sized marker that carries the validation mode `V` in the type.
	_validation: PhantomData<V>,
}
134
impl<'e> BulkInsertBuilder<'e, Validated> {
	/// Create a new bulk insert builder with full validation enabled.
	///
	/// The builder starts with no pending table, ring buffer or series inserts.
	pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
		Self {
			engine,
			identity,
			pending_tables: Vec::new(),
			pending_ringbuffers: Vec::new(),
			pending_series: Vec::new(),
			_validation: PhantomData,
		}
	}
}
148
impl<'e> BulkInsertBuilder<'e, Unchecked> {
	/// Create a new bulk insert builder with validation AND oracle conflict
	/// tracking disabled (unchecked mode).
	///
	/// The builder starts with no pending table, ring buffer or series inserts.
	pub(crate) fn new_unchecked(engine: &'e StandardEngine, identity: IdentityId) -> Self {
		Self {
			engine,
			identity,
			pending_tables: Vec::new(),
			pending_ringbuffers: Vec::new(),
			pending_series: Vec::new(),
			_validation: PhantomData,
		}
	}
}
163
164impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
165	/// Begin inserting into a table.
166	///
167	/// The qualified name can be either "namespace::table" or just "table"
168	/// (which uses the default namespace).
169	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
170		let (namespace, table) = parse_qualified_name(qualified_name);
171		TableInsertBuilder::new(self, namespace, table)
172	}
173
174	/// Begin inserting into a ring buffer.
175	///
176	/// The qualified name can be either "namespace::ringbuffer" or just "ringbuffer"
177	/// (which uses the default namespace).
178	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
179		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
180		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
181	}
182
183	/// Begin inserting into a series.
184	///
185	/// The qualified name can be either "namespace::series" or just "series"
186	/// (which uses the default namespace).
187	pub fn series<'a>(&'a mut self, qualified_name: &str) -> SeriesInsertBuilder<'a, 'e, V> {
188		let (namespace, series) = parse_qualified_name(qualified_name);
189		SeriesInsertBuilder::new(self, namespace, series)
190	}
191
192	/// Add a pending table insert (called by TableInsertBuilder::done)
193	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
194		self.pending_tables.push(pending);
195	}
196
197	/// Add a pending ring buffer insert (called by RingBufferInsertBuilder::done)
198	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
199		self.pending_ringbuffers.push(pending);
200	}
201
202	/// Add a pending series insert (called by SeriesInsertBuilder::done)
203	pub(super) fn add_series_insert(&mut self, pending: PendingSeriesInsert) {
204		self.pending_series.push(pending);
205	}
206
207	/// Execute all pending inserts in a single transaction.
208	///
209	/// Returns a summary of what was inserted. On error, the entire
210	/// transaction is rolled back (no partial inserts).
211	pub fn execute(self) -> Result<BulkInsertResult> {
212		self.engine.reject_if_read_only()?;
213		let mut txn = self.engine.begin_command(self.identity)?;
214		let catalog = self.engine.catalog();
215		let clock = self.engine.clock();
216		let total_rows = self.total_pending_rows();
217		let pending_tables = self.pending_tables;
218		let pending_ringbuffers = self.pending_ringbuffers;
219		let pending_series = self.pending_series;
220
221		V::run(&mut txn, total_rows, move |txn| {
222			run_all_pending::<V>(catalog, clock, txn, pending_tables, pending_ringbuffers, pending_series)
223		})
224	}
225
226	#[inline]
227	fn total_pending_rows(&self) -> usize {
228		self.pending_tables.iter().map(|p| p.rows.len()).sum::<usize>()
229			+ self.pending_ringbuffers.iter().map(|p| p.rows.len()).sum::<usize>()
230			+ self.pending_series.iter().map(|p| p.rows.len()).sum::<usize>()
231	}
232}
233
234#[inline]
235fn run_all_pending<V: ValidationMode>(
236	catalog: Catalog,
237	clock: &Clock,
238	txn: &mut CommandTransaction,
239	pending_tables: Vec<PendingTableInsert>,
240	pending_ringbuffers: Vec<PendingRingBufferInsert>,
241	pending_series: Vec<PendingSeriesInsert>,
242) -> Result<BulkInsertResult> {
243	let mut result = BulkInsertResult::default();
244	for pending in pending_tables {
245		result.tables.push(execute_table_insert::<V>(&catalog, txn, &pending, clock)?);
246	}
247	for pending in pending_ringbuffers {
248		result.ringbuffers.push(execute_ringbuffer_insert::<V>(&catalog, txn, &pending, clock)?);
249	}
250	for pending in pending_series {
251		result.series.push(execute_series_insert::<V>(&catalog, txn, &pending, clock)?);
252	}
253	Ok(result)
254}
255
256fn execute_table_insert<V: ValidationMode>(
257	catalog: &Catalog,
258	txn: &mut CommandTransaction,
259	pending: &PendingTableInsert,
260	clock: &Clock,
261) -> Result<TableInsertResult> {
262	let table = resolve_table(catalog, txn, pending)?;
263	let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
264	let encoded_rows = encode_table_rows::<V>(catalog, txn, pending, &table, &shape, clock)?;
265	if encoded_rows.is_empty() {
266		return Ok(empty_table_result(pending));
267	}
268	write_table_rows(catalog, txn, &table, &shape, pending, encoded_rows)
269}
270
/// Build the result reported for a table insert that wrote no rows: the pending
/// insert's namespace and table name with an `inserted` count of zero.
#[inline]
fn empty_table_result(pending: &PendingTableInsert) -> TableInsertResult {
	TableInsertResult {
		namespace: pending.namespace.clone(),
		table: pending.table.clone(),
		inserted: 0,
	}
}
279
280#[inline]
281fn write_table_rows(
282	catalog: &Catalog,
283	txn: &mut CommandTransaction,
284	table: &Table,
285	shape: &RowShape,
286	pending: &PendingTableInsert,
287	encoded_rows: Vec<EncodedRow>,
288) -> Result<TableInsertResult> {
289	let total_rows = encoded_rows.len();
290	let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
291	let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), table)?;
292	let row_number_shape = pk_def.as_ref().map(|_| RowShape::testing(&[Type::Uint8]));
293
294	for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
295		txn.insert_table(table, shape, row.clone(), row_number)?;
296
297		if let Some(ref pk_def) = pk_def {
298			write_primary_key_index(
299				txn,
300				table,
301				shape,
302				pk_def,
303				row,
304				row_number,
305				row_number_shape.as_ref().unwrap(),
306			)?;
307		}
308	}
309
310	Ok(TableInsertResult {
311		namespace: pending.namespace.clone(),
312		table: pending.table.clone(),
313		inserted: total_rows as u64,
314	})
315}
316
317fn resolve_table(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingTableInsert) -> Result<Table> {
318	let namespace = catalog
319		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
320		.ok_or_else(|| CatalogError::NotFound {
321			kind: CatalogObjectKind::Namespace,
322			namespace: pending.namespace.to_string(),
323			name: String::new(),
324			fragment: Fragment::None,
325		})?;
326
327	catalog.find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?.ok_or_else(|| {
328		CatalogError::NotFound {
329			kind: CatalogObjectKind::Table,
330			namespace: pending.namespace.to_string(),
331			name: pending.table.to_string(),
332			fragment: Fragment::None,
333		}
334		.into()
335	})
336}
337
338fn encode_table_rows<V: ValidationMode>(
339	catalog: &Catalog,
340	txn: &mut CommandTransaction,
341	pending: &PendingTableInsert,
342	table: &Table,
343	shape: &RowShape,
344	clock: &Clock,
345) -> Result<Vec<EncodedRow>> {
346	let coerced_rows = coerce_table_rows::<V>(&pending.rows, table)?;
347	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
348	for values in coerced_rows {
349		encoded_rows.push(prepare_table_row::<V>(catalog, txn, table, shape, clock, values)?);
350	}
351	Ok(encoded_rows)
352}
353
354#[inline]
355fn coerce_table_rows<V: ValidationMode>(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
356	if V::VALIDATED {
357		validate_and_coerce_rows(rows, table)
358	} else {
359		reorder_rows_unvalidated(rows, table)
360	}
361}
362
/// Finish one table row and encode it.
///
/// Steps, in order: fill auto-increment columns, replace dictionary-column values
/// with dictionary entry ids, check column constraints (validated mode only), then
/// encode the values with `shape`.
#[inline]
fn prepare_table_row<V: ValidationMode>(
	catalog: &Catalog,
	txn: &mut CommandTransaction,
	table: &Table,
	shape: &RowShape,
	clock: &Clock,
	mut values: Vec<Value>,
) -> Result<EncodedRow> {
	fill_auto_increment_table(catalog, txn, table, &mut values)?;
	dictionary_encode_table(catalog, txn, table, &mut values)?;
	// Constraints see the values as they are after the two rewriting steps above.
	if V::VALIDATED {
		validate_table_constraints(table, &values)?;
	}
	Ok(encode_row(shape, &values, clock))
}
379
380#[inline]
381fn validate_table_constraints(table: &Table, values: &[Value]) -> Result<()> {
382	for (idx, col) in table.columns.iter().enumerate() {
383		col.constraint.validate(&values[idx])?;
384	}
385	Ok(())
386}
387
388fn fill_auto_increment_table(
389	catalog: &Catalog,
390	txn: &mut CommandTransaction,
391	table: &Table,
392	values: &mut [Value],
393) -> Result<()> {
394	for (idx, col) in table.columns.iter().enumerate() {
395		if col.auto_increment && matches!(values[idx], Value::None { .. }) {
396			values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
397		}
398	}
399	Ok(())
400}
401
402fn dictionary_encode_table(
403	catalog: &Catalog,
404	txn: &mut CommandTransaction,
405	table: &Table,
406	values: &mut [Value],
407) -> Result<()> {
408	for (idx, col) in table.columns.iter().enumerate() {
409		if let Some(dict_id) = col.dictionary_id {
410			let dictionary =
411				catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
412					internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
413				})?;
414			let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
415			values[idx] = entry_id.to_value();
416		}
417	}
418	Ok(())
419}
420
421fn encode_row(shape: &RowShape, values: &[Value], clock: &Clock) -> EncodedRow {
422	let mut row = shape.allocate();
423	for (idx, value) in values.iter().enumerate() {
424		shape.set_value(&mut row, idx, value);
425	}
426	let now_nanos = clock.now_nanos();
427	row.set_timestamps(now_nanos, now_nanos);
428	row
429}
430
431fn write_primary_key_index(
432	txn: &mut CommandTransaction,
433	table: &Table,
434	shape: &RowShape,
435	pk_def: &PrimaryKey,
436	row: &EncodedRow,
437	row_number: RowNumber,
438	row_number_shape: &RowShape,
439) -> Result<()> {
440	let index_key = primary_key::encode_primary_key(pk_def, row, table, shape)?;
441	let index_entry_key = IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key);
442
443	if txn.contains_key(&index_entry_key.encode())? {
444		let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
445		return Err(CoreError::PrimaryKeyViolation {
446			fragment: Fragment::None,
447			table_name: table.name.clone(),
448			key_columns,
449		}
450		.into());
451	}
452
453	let mut row_number_encoded = row_number_shape.allocate();
454	row_number_shape.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
455	txn.set(&index_entry_key.encode(), row_number_encoded)?;
456	Ok(())
457}
458
/// Run one queued ring buffer insert.
///
/// Resolves the ring buffer, loads its metadata, obtains the row shape, coerces the
/// pending rows, inserts them, and finally writes the updated metadata back through
/// the catalog. Returns the pending insert's namespace and ring buffer name together
/// with the number of rows inserted.
fn execute_ringbuffer_insert<V: ValidationMode>(
	catalog: &Catalog,
	txn: &mut CommandTransaction,
	pending: &PendingRingBufferInsert,
	clock: &Clock,
) -> Result<RingBufferInsertResult> {
	let ringbuffer = resolve_ringbuffer(catalog, txn, pending)?;
	// Working copy of the metadata; the row loop mutates it and it is persisted once below.
	let mut metadata = load_ringbuffer_metadata(catalog, txn, pending, &ringbuffer)?;
	let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
	let coerced_rows = coerce_ringbuffer_rows::<V>(pending, &ringbuffer)?;
	let inserted =
		insert_ringbuffer_rows::<V>(catalog, txn, &ringbuffer, &shape, coerced_rows, &mut metadata, clock)?;
	catalog.update_ringbuffer_metadata(txn, metadata)?;
	Ok(RingBufferInsertResult {
		namespace: pending.namespace.clone(),
		ringbuffer: pending.ringbuffer.clone(),
		inserted,
	})
}
478
479#[inline]
480fn resolve_ringbuffer(
481	catalog: &Catalog,
482	txn: &mut CommandTransaction,
483	pending: &PendingRingBufferInsert,
484) -> Result<RingBuffer> {
485	let namespace = catalog
486		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
487		.ok_or_else(|| CatalogError::NotFound {
488			kind: CatalogObjectKind::Namespace,
489			namespace: pending.namespace.to_string(),
490			name: String::new(),
491			fragment: Fragment::None,
492		})?;
493
494	catalog.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
495		.ok_or_else(|| {
496			CatalogError::NotFound {
497				kind: CatalogObjectKind::RingBuffer,
498				namespace: pending.namespace.to_string(),
499				name: pending.ringbuffer.to_string(),
500				fragment: Fragment::None,
501			}
502			.into()
503		})
504}
505
506#[inline]
507fn load_ringbuffer_metadata(
508	catalog: &Catalog,
509	txn: &mut CommandTransaction,
510	pending: &PendingRingBufferInsert,
511	ringbuffer: &RingBuffer,
512) -> Result<RingBufferMetadata> {
513	catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
514		CatalogError::NotFound {
515			kind: CatalogObjectKind::RingBuffer,
516			namespace: pending.namespace.to_string(),
517			name: pending.ringbuffer.to_string(),
518			fragment: Fragment::None,
519		}
520		.into()
521	})
522}
523
524#[inline]
525fn coerce_ringbuffer_rows<V: ValidationMode>(
526	pending: &PendingRingBufferInsert,
527	ringbuffer: &RingBuffer,
528) -> Result<Vec<Vec<Value>>> {
529	if V::VALIDATED {
530		validate_and_coerce_rows_rb(&pending.rows, ringbuffer)
531	} else {
532		reorder_rows_unvalidated_rb(&pending.rows, ringbuffer)
533	}
534}
535
536fn insert_ringbuffer_rows<V: ValidationMode>(
537	catalog: &Catalog,
538	txn: &mut CommandTransaction,
539	ringbuffer: &RingBuffer,
540	shape: &RowShape,
541	coerced_rows: Vec<Vec<Value>>,
542	metadata: &mut RingBufferMetadata,
543	clock: &Clock,
544) -> Result<u64> {
545	let mut inserted_count = 0u64;
546	for mut values in coerced_rows {
547		dict_encode_ringbuffer_row(catalog, txn, ringbuffer, &mut values)?;
548
549		if V::VALIDATED {
550			for (idx, col) in ringbuffer.columns.iter().enumerate() {
551				col.constraint.validate(&values[idx])?;
552			}
553		}
554
555		let mut row = shape.allocate();
556		for (idx, value) in values.iter().enumerate() {
557			shape.set_value(&mut row, idx, value);
558		}
559		let now_nanos = clock.now_nanos();
560		row.set_timestamps(now_nanos, now_nanos);
561
562		evict_oldest_if_full(txn, ringbuffer, metadata)?;
563
564		let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
565		txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row)?;
566
567		if metadata.is_empty() {
568			metadata.head = row_number.0;
569		}
570		metadata.count += 1;
571		metadata.tail = row_number.0 + 1;
572
573		inserted_count += 1;
574	}
575	Ok(inserted_count)
576}
577
578#[inline]
579fn evict_oldest_if_full(
580	txn: &mut CommandTransaction,
581	ringbuffer: &RingBuffer,
582	metadata: &mut RingBufferMetadata,
583) -> Result<()> {
584	if metadata.is_full() {
585		let oldest_row = RowNumber(metadata.head);
586		txn.remove_from_ringbuffer(ringbuffer, oldest_row)?;
587		metadata.head += 1;
588		metadata.count -= 1;
589	}
590	Ok(())
591}
592
593#[inline]
594fn dict_encode_ringbuffer_row(
595	catalog: &Catalog,
596	txn: &mut CommandTransaction,
597	ringbuffer: &RingBuffer,
598	values: &mut [Value],
599) -> Result<()> {
600	for (idx, col) in ringbuffer.columns.iter().enumerate() {
601		if let Some(dict_id) = col.dictionary_id {
602			let dictionary =
603				catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
604					internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
605				})?;
606			let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
607			values[idx] = entry_id.to_value();
608		}
609	}
610	Ok(())
611}
612
/// Run one queued series insert.
///
/// Resolves the series, loads its metadata, obtains the row shape, coerces the
/// pending rows, inserts them, and finally writes the updated metadata back through
/// the catalog. Returns the pending insert's namespace and series name together with
/// the number of rows inserted.
fn execute_series_insert<V: ValidationMode>(
	catalog: &Catalog,
	txn: &mut CommandTransaction,
	pending: &PendingSeriesInsert,
	clock: &Clock,
) -> Result<SeriesInsertResult> {
	let series = resolve_series(catalog, txn, pending)?;
	// Working copy of the metadata; the row loop mutates it and it is persisted once below.
	let mut metadata = load_series_metadata(catalog, txn, pending, &series)?;
	let shape = get_or_create_series_shape(catalog, &series, &mut Transaction::Command(txn))?;
	let coerced_rows = coerce_series_rows::<V>(pending, &series)?;
	let inserted = insert_series_rows::<V>(txn, &series, &shape, coerced_rows, &mut metadata, clock)?;
	catalog.update_series_metadata_txn(&mut Transaction::Command(txn), metadata)?;
	Ok(SeriesInsertResult {
		namespace: pending.namespace.clone(),
		series: pending.series.clone(),
		inserted,
	})
}
631
632#[inline]
633fn resolve_series(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingSeriesInsert) -> Result<Series> {
634	let namespace = catalog
635		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
636		.ok_or_else(|| CatalogError::NotFound {
637			kind: CatalogObjectKind::Namespace,
638			namespace: pending.namespace.to_string(),
639			name: String::new(),
640			fragment: Fragment::None,
641		})?;
642
643	catalog.find_series_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.series)?.ok_or_else(|| {
644		CatalogError::NotFound {
645			kind: CatalogObjectKind::Series,
646			namespace: pending.namespace.to_string(),
647			name: pending.series.to_string(),
648			fragment: Fragment::None,
649		}
650		.into()
651	})
652}
653
654#[inline]
655fn load_series_metadata(
656	catalog: &Catalog,
657	txn: &mut CommandTransaction,
658	pending: &PendingSeriesInsert,
659	series: &Series,
660) -> Result<SeriesMetadata> {
661	catalog.find_series_metadata(&mut Transaction::Command(txn), series.id)?.ok_or_else(|| {
662		CatalogError::NotFound {
663			kind: CatalogObjectKind::Series,
664			namespace: pending.namespace.to_string(),
665			name: pending.series.to_string(),
666			fragment: Fragment::None,
667		}
668		.into()
669	})
670}
671
672#[inline]
673fn coerce_series_rows<V: ValidationMode>(pending: &PendingSeriesInsert, series: &Series) -> Result<Vec<Vec<Value>>> {
674	if V::VALIDATED {
675		validate_and_coerce_rows_series(&pending.rows, series)
676	} else {
677		reorder_rows_unvalidated_series(&pending.rows, series)
678	}
679}
680
/// Insert coerced rows into `series` and keep `metadata` (sequence counter, row
/// count, oldest/newest key) in step.
///
/// Per row: check constraints (validated mode only), derive the numeric key,
/// allocate the next sequence number, encode the row, then write it between the
/// series-row interceptor's `pre_insert` and `post_insert` hooks. Returns the number
/// of rows written. `metadata` is only updated in memory here.
fn insert_series_rows<V: ValidationMode>(
	txn: &mut CommandTransaction,
	series: &Series,
	shape: &RowShape,
	coerced_rows: Vec<Vec<Value>>,
	metadata: &mut SeriesMetadata,
	clock: &Clock,
) -> Result<u64> {
	// Position of the key column among the series columns; a missing key column is
	// reported as an internal error.
	let key_col_name = series.key.column();
	let key_col_idx =
		series.columns.iter().position(|c| c.name == key_col_name).ok_or_else(|| {
			internal_error!("series {} key column {} not found", series.name, key_col_name)
		})?;

	let mut inserted_count = 0u64;
	for values in coerced_rows {
		if V::VALIDATED {
			for (idx, col) in series.columns.iter().enumerate() {
				col.constraint.validate(&values[idx])?;
			}
		}

		// NOTE(review): when `key_to_u64` yields no value the row is stored under
		// key 0 instead of the failure being reported - confirm this is intended.
		let key_value = series.key_to_u64(values[key_col_idx].clone()).unwrap_or(0);

		// The counter is advanced before use, so the row gets the incremented value.
		metadata.sequence_counter += 1;
		let sequence = metadata.sequence_counter;
		let row_key = SeriesRowKey {
			series: series.id,
			variant_tag: None,
			key: key_value,
			sequence,
		};
		let encoded_key = row_key.encode();

		let row = encode_series_row(series, shape, key_value, &values, key_col_idx, clock);

		// The row returned by `pre_insert` is the one that is stored and then
		// passed to `post_insert`.
		let row = SeriesRowInterceptor::pre_insert(txn, series, row)?;
		txn.set(&encoded_key, row.clone())?;
		SeriesRowInterceptor::post_insert(txn, series, &row)?;

		update_series_metadata_for_insert(metadata, key_value);
		inserted_count += 1;
	}
	Ok(inserted_count)
}
726
727#[inline]
728fn encode_series_row(
729	series: &Series,
730	shape: &RowShape,
731	key_value: u64,
732	values: &[Value],
733	key_col_idx: usize,
734	clock: &Clock,
735) -> EncodedRow {
736	let key_value_encoded = series.key_from_u64(key_value);
737	let mut row = shape.allocate();
738	shape.set_value(&mut row, 0, &key_value_encoded);
739	let mut shape_idx = 1;
740	for (col_idx, value) in values.iter().enumerate() {
741		if col_idx == key_col_idx {
742			continue;
743		}
744		shape.set_value(&mut row, shape_idx, value);
745		shape_idx += 1;
746	}
747	let now_nanos = clock.now_nanos();
748	row.set_timestamps(now_nanos, now_nanos);
749	row
750}
751
752#[inline]
753fn update_series_metadata_for_insert(metadata: &mut SeriesMetadata, key_value: u64) {
754	if metadata.row_count == 0 {
755		metadata.oldest_key = key_value;
756		metadata.newest_key = key_value;
757	} else {
758		if key_value < metadata.oldest_key {
759			metadata.oldest_key = key_value;
760		}
761		if key_value > metadata.newest_key {
762			metadata.newest_key = key_value;
763		}
764	}
765	metadata.row_count += 1;
766}
767
/// Split a qualified name such as "namespace::table" into `(namespace, name)`.
///
/// The split happens at the last `"::"`, so "a::b::table" yields namespace "a::b".
/// A name without `"::"` is placed in the "default" namespace.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	let (namespace, name) = qualified_name.rsplit_once("::").unwrap_or(("default", qualified_name));
	(namespace.to_owned(), name.to_owned())
}
777
/// Unit tests for `parse_qualified_name`.
#[cfg(test)]
mod tests {
	use super::*;

	// A bare name falls back to the "default" namespace.
	#[test]
	fn parse_qualified_name_simple() {
		assert_eq!(parse_qualified_name("table"), ("default".to_string(), "table".to_string()));
	}

	// A single "::" separates namespace and name.
	#[test]
	fn parse_qualified_name_single_namespace() {
		assert_eq!(parse_qualified_name("ns::table"), ("ns".to_string(), "table".to_string()));
	}

	// With several separators only the last one splits; the rest stays in the namespace.
	#[test]
	fn parse_qualified_name_nested_namespace() {
		assert_eq!(parse_qualified_name("a::b::table"), ("a::b".to_string(), "table".to_string()));
	}

	// Same rule with a deeper namespace path.
	#[test]
	fn parse_qualified_name_deeply_nested_namespace() {
		assert_eq!(parse_qualified_name("a::b::c::table"), ("a::b::c".to_string(), "table".to_string()));
	}

	// An empty input yields the "default" namespace and an empty name.
	#[test]
	fn parse_qualified_name_empty_string() {
		assert_eq!(parse_qualified_name(""), ("default".to_string(), "".to_string()));
	}
}