Skip to main content

reifydb_engine/bulk_insert/
builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::marker::PhantomData;
5
6use reifydb_catalog::{
7	catalog::Catalog,
8	error::{CatalogError, CatalogObjectKind},
9};
10use reifydb_core::{
11	encoded::{row::EncodedRow, shape::RowShape},
12	error::CoreError,
13	interface::catalog::{
14		id::IndexId,
15		key::PrimaryKey,
16		ringbuffer::{RingBuffer, RingBufferMetadata},
17		series::{Series, SeriesMetadata},
18		table::Table,
19	},
20	internal_error,
21	key::{EncodableKey, index_entry::IndexEntryKey, series_row::SeriesRowKey},
22};
23use reifydb_runtime::context::clock::Clock;
24use reifydb_transaction::{
25	interceptor::series_row::SeriesRowInterceptor,
26	transaction::{Transaction, command::CommandTransaction},
27};
28use reifydb_type::{
29	fragment::Fragment,
30	params::Params,
31	value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
32};
33
34use super::{
35	BulkInsertResult, RingBufferInsertResult, SeriesInsertResult, TableInsertResult,
36	validation::{
37		reorder_rows_unvalidated, reorder_rows_unvalidated_rb, reorder_rows_unvalidated_series,
38		validate_and_coerce_rows, validate_and_coerce_rows_rb, validate_and_coerce_rows_series,
39	},
40};
41use crate::{
42	Result,
43	bulk_insert::primitive::{
44		ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
45		series::{PendingSeriesInsert, SeriesInsertBuilder},
46		table::{PendingTableInsert, TableInsertBuilder},
47	},
48	engine::StandardEngine,
49	transaction::operation::{
50		dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
51	},
52	vm::instruction::dml::{
53		primary_key,
54		shape::{get_or_create_ringbuffer_shape, get_or_create_series_shape, get_or_create_table_shape},
55	},
56};
57
58pub trait ValidationMode: sealed::Sealed + 'static {
59	const VALIDATED: bool;
60
61	fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
62	where
63		F: FnOnce(&mut CommandTransaction) -> Result<R>;
64}
65
66pub struct Validated;
67impl ValidationMode for Validated {
68	const VALIDATED: bool = true;
69
70	fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
71	where
72		F: FnOnce(&mut CommandTransaction) -> Result<R>,
73	{
74		run_checked(txn, total_rows, body)
75	}
76}
77
78pub struct Unchecked;
79impl ValidationMode for Unchecked {
80	const VALIDATED: bool = false;
81
82	fn run<F, R>(txn: &mut CommandTransaction, _total_rows: usize, body: F) -> Result<R>
83	where
84		F: FnOnce(&mut CommandTransaction) -> Result<R>,
85	{
86		txn.execute_bulk_unchecked(body)
87	}
88}
89
90fn run_checked<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
91where
92	F: FnOnce(&mut CommandTransaction) -> Result<R>,
93{
94	if total_rows > 0 {
95		txn.reserve_writes(total_rows.saturating_mul(2))?;
96	}
97	let r = body(txn)?;
98	txn.commit()?;
99	Ok(r)
100}
101
102pub mod sealed {
103
104	use super::{Unchecked, Validated};
105	pub trait Sealed {}
106	impl Sealed for Validated {}
107	impl Sealed for Unchecked {}
108}
109
110pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
111	engine: &'e StandardEngine,
112	identity: IdentityId,
113	pending_tables: Vec<PendingTableInsert>,
114	pending_ringbuffers: Vec<PendingRingBufferInsert>,
115	pending_series: Vec<PendingSeriesInsert>,
116	_validation: PhantomData<V>,
117}
118
119impl<'e> BulkInsertBuilder<'e, Validated> {
120	pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
121		Self {
122			engine,
123			identity,
124			pending_tables: Vec::new(),
125			pending_ringbuffers: Vec::new(),
126			pending_series: Vec::new(),
127			_validation: PhantomData,
128		}
129	}
130}
131
132impl<'e> BulkInsertBuilder<'e, Unchecked> {
133	pub(crate) fn new_unchecked(engine: &'e StandardEngine, identity: IdentityId) -> Self {
134		Self {
135			engine,
136			identity,
137			pending_tables: Vec::new(),
138			pending_ringbuffers: Vec::new(),
139			pending_series: Vec::new(),
140			_validation: PhantomData,
141		}
142	}
143}
144
145impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
146	pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
147		let (namespace, table) = parse_qualified_name(qualified_name);
148		TableInsertBuilder::new(self, namespace, table)
149	}
150
151	pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
152		let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
153		RingBufferInsertBuilder::new(self, namespace, ringbuffer)
154	}
155
156	pub fn series<'a>(&'a mut self, qualified_name: &str) -> SeriesInsertBuilder<'a, 'e, V> {
157		let (namespace, series) = parse_qualified_name(qualified_name);
158		SeriesInsertBuilder::new(self, namespace, series)
159	}
160
161	pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
162		self.pending_tables.push(pending);
163	}
164
165	pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
166		self.pending_ringbuffers.push(pending);
167	}
168
169	pub(super) fn add_series_insert(&mut self, pending: PendingSeriesInsert) {
170		self.pending_series.push(pending);
171	}
172
173	pub fn execute(self) -> Result<BulkInsertResult> {
174		self.engine.reject_if_read_only()?;
175		let mut txn = self.engine.begin_command(self.identity)?;
176		let catalog = self.engine.catalog();
177		let clock = self.engine.clock();
178		let total_rows = self.total_pending_rows();
179		let pending_tables = self.pending_tables;
180		let pending_ringbuffers = self.pending_ringbuffers;
181		let pending_series = self.pending_series;
182
183		V::run(&mut txn, total_rows, move |txn| {
184			run_all_pending::<V>(catalog, clock, txn, pending_tables, pending_ringbuffers, pending_series)
185		})
186	}
187
188	#[inline]
189	fn total_pending_rows(&self) -> usize {
190		self.pending_tables.iter().map(|p| p.rows.len()).sum::<usize>()
191			+ self.pending_ringbuffers.iter().map(|p| p.rows.len()).sum::<usize>()
192			+ self.pending_series.iter().map(|p| p.rows.len()).sum::<usize>()
193	}
194}
195
196#[inline]
197fn run_all_pending<V: ValidationMode>(
198	catalog: Catalog,
199	clock: &Clock,
200	txn: &mut CommandTransaction,
201	pending_tables: Vec<PendingTableInsert>,
202	pending_ringbuffers: Vec<PendingRingBufferInsert>,
203	pending_series: Vec<PendingSeriesInsert>,
204) -> Result<BulkInsertResult> {
205	let mut result = BulkInsertResult::default();
206	for pending in pending_tables {
207		result.tables.push(execute_table_insert::<V>(&catalog, txn, &pending, clock)?);
208	}
209	for pending in pending_ringbuffers {
210		result.ringbuffers.push(execute_ringbuffer_insert::<V>(&catalog, txn, &pending, clock)?);
211	}
212	for pending in pending_series {
213		result.series.push(execute_series_insert::<V>(&catalog, txn, &pending, clock)?);
214	}
215	Ok(result)
216}
217
218fn execute_table_insert<V: ValidationMode>(
219	catalog: &Catalog,
220	txn: &mut CommandTransaction,
221	pending: &PendingTableInsert,
222	clock: &Clock,
223) -> Result<TableInsertResult> {
224	let table = resolve_table(catalog, txn, pending)?;
225	let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
226	let encoded_rows = encode_table_rows::<V>(catalog, txn, pending, &table, &shape, clock)?;
227	if encoded_rows.is_empty() {
228		return Ok(empty_table_result(pending));
229	}
230	write_table_rows(catalog, txn, &table, &shape, pending, encoded_rows)
231}
232
233#[inline]
234fn empty_table_result(pending: &PendingTableInsert) -> TableInsertResult {
235	TableInsertResult {
236		namespace: pending.namespace.clone(),
237		table: pending.table.clone(),
238		inserted: 0,
239	}
240}
241
242#[inline]
243fn write_table_rows(
244	catalog: &Catalog,
245	txn: &mut CommandTransaction,
246	table: &Table,
247	shape: &RowShape,
248	pending: &PendingTableInsert,
249	encoded_rows: Vec<EncodedRow>,
250) -> Result<TableInsertResult> {
251	let total_rows = encoded_rows.len();
252	let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
253	let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), table)?;
254	let row_number_shape = pk_def.as_ref().map(|_| RowShape::testing(&[Type::Uint8]));
255
256	let mut owned_rows = encoded_rows;
257	txn.insert_table(table, shape, &row_numbers, &mut owned_rows)?;
258
259	if let Some(ref pk_def) = pk_def {
260		for (row, &row_number) in owned_rows.iter().zip(row_numbers.iter()) {
261			write_primary_key_index(
262				txn,
263				table,
264				shape,
265				pk_def,
266				row,
267				row_number,
268				row_number_shape.as_ref().unwrap(),
269			)?;
270		}
271	}
272
273	Ok(TableInsertResult {
274		namespace: pending.namespace.clone(),
275		table: pending.table.clone(),
276		inserted: total_rows as u64,
277	})
278}
279
280fn resolve_table(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingTableInsert) -> Result<Table> {
281	let namespace = catalog
282		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
283		.ok_or_else(|| CatalogError::NotFound {
284			kind: CatalogObjectKind::Namespace,
285			namespace: pending.namespace.to_string(),
286			name: String::new(),
287			fragment: Fragment::None,
288		})?;
289
290	catalog.find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?.ok_or_else(|| {
291		CatalogError::NotFound {
292			kind: CatalogObjectKind::Table,
293			namespace: pending.namespace.to_string(),
294			name: pending.table.to_string(),
295			fragment: Fragment::None,
296		}
297		.into()
298	})
299}
300
301fn encode_table_rows<V: ValidationMode>(
302	catalog: &Catalog,
303	txn: &mut CommandTransaction,
304	pending: &PendingTableInsert,
305	table: &Table,
306	shape: &RowShape,
307	clock: &Clock,
308) -> Result<Vec<EncodedRow>> {
309	let coerced_rows = coerce_table_rows::<V>(&pending.rows, table)?;
310	let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
311	for values in coerced_rows {
312		encoded_rows.push(prepare_table_row::<V>(catalog, txn, table, shape, clock, values)?);
313	}
314	Ok(encoded_rows)
315}
316
317#[inline]
318fn coerce_table_rows<V: ValidationMode>(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
319	if V::VALIDATED {
320		validate_and_coerce_rows(rows, table)
321	} else {
322		reorder_rows_unvalidated(rows, table)
323	}
324}
325
326#[inline]
327fn prepare_table_row<V: ValidationMode>(
328	catalog: &Catalog,
329	txn: &mut CommandTransaction,
330	table: &Table,
331	shape: &RowShape,
332	clock: &Clock,
333	mut values: Vec<Value>,
334) -> Result<EncodedRow> {
335	fill_auto_increment_table(catalog, txn, table, &mut values)?;
336	dictionary_encode_table(catalog, txn, table, &mut values)?;
337	if V::VALIDATED {
338		validate_table_constraints(table, &values)?;
339	}
340	Ok(encode_row(shape, &values, clock))
341}
342
343#[inline]
344fn validate_table_constraints(table: &Table, values: &[Value]) -> Result<()> {
345	for (idx, col) in table.columns.iter().enumerate() {
346		col.constraint.validate(&values[idx])?;
347	}
348	Ok(())
349}
350
351fn fill_auto_increment_table(
352	catalog: &Catalog,
353	txn: &mut CommandTransaction,
354	table: &Table,
355	values: &mut [Value],
356) -> Result<()> {
357	for (idx, col) in table.columns.iter().enumerate() {
358		if col.auto_increment && matches!(values[idx], Value::None { .. }) {
359			values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
360		}
361	}
362	Ok(())
363}
364
365fn dictionary_encode_table(
366	catalog: &Catalog,
367	txn: &mut CommandTransaction,
368	table: &Table,
369	values: &mut [Value],
370) -> Result<()> {
371	for (idx, col) in table.columns.iter().enumerate() {
372		if let Some(dict_id) = col.dictionary_id {
373			let dictionary =
374				catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
375					internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
376				})?;
377			let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
378			values[idx] = entry_id.to_value();
379		}
380	}
381	Ok(())
382}
383
384fn encode_row(shape: &RowShape, values: &[Value], clock: &Clock) -> EncodedRow {
385	let mut row = shape.allocate();
386	for (idx, value) in values.iter().enumerate() {
387		shape.set_value(&mut row, idx, value);
388	}
389	let now_nanos = clock.now_nanos();
390	row.set_timestamps(now_nanos, now_nanos);
391	row
392}
393
394fn write_primary_key_index(
395	txn: &mut CommandTransaction,
396	table: &Table,
397	shape: &RowShape,
398	pk_def: &PrimaryKey,
399	row: &EncodedRow,
400	row_number: RowNumber,
401	row_number_shape: &RowShape,
402) -> Result<()> {
403	let index_key = primary_key::encode_primary_key(pk_def, row, table, shape)?;
404	let index_entry_key = IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key);
405
406	if txn.contains_key(&index_entry_key.encode())? {
407		let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
408		return Err(CoreError::PrimaryKeyViolation {
409			fragment: Fragment::None,
410			table_name: table.name.clone(),
411			key_columns,
412		}
413		.into());
414	}
415
416	let mut row_number_encoded = row_number_shape.allocate();
417	row_number_shape.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
418	txn.set(&index_entry_key.encode(), row_number_encoded)?;
419	Ok(())
420}
421
422fn execute_ringbuffer_insert<V: ValidationMode>(
423	catalog: &Catalog,
424	txn: &mut CommandTransaction,
425	pending: &PendingRingBufferInsert,
426	clock: &Clock,
427) -> Result<RingBufferInsertResult> {
428	let ringbuffer = resolve_ringbuffer(catalog, txn, pending)?;
429	let mut metadata = load_ringbuffer_metadata(catalog, txn, pending, &ringbuffer)?;
430	let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
431	let coerced_rows = coerce_ringbuffer_rows::<V>(pending, &ringbuffer)?;
432	let inserted =
433		insert_ringbuffer_rows::<V>(catalog, txn, &ringbuffer, &shape, coerced_rows, &mut metadata, clock)?;
434	catalog.update_ringbuffer_metadata(txn, metadata)?;
435	Ok(RingBufferInsertResult {
436		namespace: pending.namespace.clone(),
437		ringbuffer: pending.ringbuffer.clone(),
438		inserted,
439	})
440}
441
442#[inline]
443fn resolve_ringbuffer(
444	catalog: &Catalog,
445	txn: &mut CommandTransaction,
446	pending: &PendingRingBufferInsert,
447) -> Result<RingBuffer> {
448	let namespace = catalog
449		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
450		.ok_or_else(|| CatalogError::NotFound {
451			kind: CatalogObjectKind::Namespace,
452			namespace: pending.namespace.to_string(),
453			name: String::new(),
454			fragment: Fragment::None,
455		})?;
456
457	catalog.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
458		.ok_or_else(|| {
459			CatalogError::NotFound {
460				kind: CatalogObjectKind::RingBuffer,
461				namespace: pending.namespace.to_string(),
462				name: pending.ringbuffer.to_string(),
463				fragment: Fragment::None,
464			}
465			.into()
466		})
467}
468
469#[inline]
470fn load_ringbuffer_metadata(
471	catalog: &Catalog,
472	txn: &mut CommandTransaction,
473	pending: &PendingRingBufferInsert,
474	ringbuffer: &RingBuffer,
475) -> Result<RingBufferMetadata> {
476	catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
477		CatalogError::NotFound {
478			kind: CatalogObjectKind::RingBuffer,
479			namespace: pending.namespace.to_string(),
480			name: pending.ringbuffer.to_string(),
481			fragment: Fragment::None,
482		}
483		.into()
484	})
485}
486
487#[inline]
488fn coerce_ringbuffer_rows<V: ValidationMode>(
489	pending: &PendingRingBufferInsert,
490	ringbuffer: &RingBuffer,
491) -> Result<Vec<Vec<Value>>> {
492	if V::VALIDATED {
493		validate_and_coerce_rows_rb(&pending.rows, ringbuffer)
494	} else {
495		reorder_rows_unvalidated_rb(&pending.rows, ringbuffer)
496	}
497}
498
499fn insert_ringbuffer_rows<V: ValidationMode>(
500	catalog: &Catalog,
501	txn: &mut CommandTransaction,
502	ringbuffer: &RingBuffer,
503	shape: &RowShape,
504	coerced_rows: Vec<Vec<Value>>,
505	metadata: &mut RingBufferMetadata,
506	clock: &Clock,
507) -> Result<u64> {
508	let mut inserted_count = 0u64;
509	for mut values in coerced_rows {
510		dict_encode_ringbuffer_row(catalog, txn, ringbuffer, &mut values)?;
511
512		if V::VALIDATED {
513			for (idx, col) in ringbuffer.columns.iter().enumerate() {
514				col.constraint.validate(&values[idx])?;
515			}
516		}
517
518		let mut row = shape.allocate();
519		for (idx, value) in values.iter().enumerate() {
520			shape.set_value(&mut row, idx, value);
521		}
522		let now_nanos = clock.now_nanos();
523		row.set_timestamps(now_nanos, now_nanos);
524
525		evict_oldest_if_full(txn, ringbuffer, metadata)?;
526
527		let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
528		txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row)?;
529
530		if metadata.is_empty() {
531			metadata.head = row_number.0;
532		}
533		metadata.count += 1;
534		metadata.tail = row_number.0 + 1;
535
536		inserted_count += 1;
537	}
538	Ok(inserted_count)
539}
540
541#[inline]
542fn evict_oldest_if_full(
543	txn: &mut CommandTransaction,
544	ringbuffer: &RingBuffer,
545	metadata: &mut RingBufferMetadata,
546) -> Result<()> {
547	if metadata.is_full() {
548		let oldest_row = RowNumber(metadata.head);
549		txn.remove_from_ringbuffer(ringbuffer, oldest_row)?;
550		metadata.head += 1;
551		metadata.count -= 1;
552	}
553	Ok(())
554}
555
556#[inline]
557fn dict_encode_ringbuffer_row(
558	catalog: &Catalog,
559	txn: &mut CommandTransaction,
560	ringbuffer: &RingBuffer,
561	values: &mut [Value],
562) -> Result<()> {
563	for (idx, col) in ringbuffer.columns.iter().enumerate() {
564		if let Some(dict_id) = col.dictionary_id {
565			let dictionary =
566				catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
567					internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
568				})?;
569			let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
570			values[idx] = entry_id.to_value();
571		}
572	}
573	Ok(())
574}
575
576fn execute_series_insert<V: ValidationMode>(
577	catalog: &Catalog,
578	txn: &mut CommandTransaction,
579	pending: &PendingSeriesInsert,
580	clock: &Clock,
581) -> Result<SeriesInsertResult> {
582	let series = resolve_series(catalog, txn, pending)?;
583	let mut metadata = load_series_metadata(catalog, txn, pending, &series)?;
584	let shape = get_or_create_series_shape(catalog, &series, &mut Transaction::Command(txn))?;
585	let coerced_rows = coerce_series_rows::<V>(pending, &series)?;
586	let inserted = insert_series_rows::<V>(txn, &series, &shape, coerced_rows, &mut metadata, clock)?;
587	catalog.update_series_metadata_txn(&mut Transaction::Command(txn), metadata)?;
588	Ok(SeriesInsertResult {
589		namespace: pending.namespace.clone(),
590		series: pending.series.clone(),
591		inserted,
592	})
593}
594
595#[inline]
596fn resolve_series(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingSeriesInsert) -> Result<Series> {
597	let namespace = catalog
598		.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
599		.ok_or_else(|| CatalogError::NotFound {
600			kind: CatalogObjectKind::Namespace,
601			namespace: pending.namespace.to_string(),
602			name: String::new(),
603			fragment: Fragment::None,
604		})?;
605
606	catalog.find_series_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.series)?.ok_or_else(|| {
607		CatalogError::NotFound {
608			kind: CatalogObjectKind::Series,
609			namespace: pending.namespace.to_string(),
610			name: pending.series.to_string(),
611			fragment: Fragment::None,
612		}
613		.into()
614	})
615}
616
617#[inline]
618fn load_series_metadata(
619	catalog: &Catalog,
620	txn: &mut CommandTransaction,
621	pending: &PendingSeriesInsert,
622	series: &Series,
623) -> Result<SeriesMetadata> {
624	catalog.find_series_metadata(&mut Transaction::Command(txn), series.id)?.ok_or_else(|| {
625		CatalogError::NotFound {
626			kind: CatalogObjectKind::Series,
627			namespace: pending.namespace.to_string(),
628			name: pending.series.to_string(),
629			fragment: Fragment::None,
630		}
631		.into()
632	})
633}
634
635#[inline]
636fn coerce_series_rows<V: ValidationMode>(pending: &PendingSeriesInsert, series: &Series) -> Result<Vec<Vec<Value>>> {
637	if V::VALIDATED {
638		validate_and_coerce_rows_series(&pending.rows, series)
639	} else {
640		reorder_rows_unvalidated_series(&pending.rows, series)
641	}
642}
643
644fn insert_series_rows<V: ValidationMode>(
645	txn: &mut CommandTransaction,
646	series: &Series,
647	shape: &RowShape,
648	coerced_rows: Vec<Vec<Value>>,
649	metadata: &mut SeriesMetadata,
650	clock: &Clock,
651) -> Result<u64> {
652	let key_col_name = series.key.column();
653	let key_col_idx =
654		series.columns.iter().position(|c| c.name == key_col_name).ok_or_else(|| {
655			internal_error!("series {} key column {} not found", series.name, key_col_name)
656		})?;
657
658	let mut inserted_count = 0u64;
659	for values in coerced_rows {
660		if V::VALIDATED {
661			for (idx, col) in series.columns.iter().enumerate() {
662				col.constraint.validate(&values[idx])?;
663			}
664		}
665
666		let key_value = series.key_to_u64(values[key_col_idx].clone()).unwrap_or(0);
667
668		metadata.sequence_counter += 1;
669		let sequence = metadata.sequence_counter;
670		let row_key = SeriesRowKey {
671			series: series.id,
672			variant_tag: None,
673			key: key_value,
674			sequence,
675		};
676		let encoded_key = row_key.encode();
677
678		let row = encode_series_row(series, shape, key_value, &values, key_col_idx, clock);
679
680		let mut rows_buf = [row];
681		SeriesRowInterceptor::pre_insert(txn, series, &mut rows_buf)?;
682		let [row] = rows_buf;
683		txn.set(&encoded_key, row.clone())?;
684		let rows = [row.clone()];
685		SeriesRowInterceptor::post_insert(txn, series, &rows)?;
686
687		update_series_metadata_for_insert(metadata, key_value);
688		inserted_count += 1;
689	}
690	Ok(inserted_count)
691}
692
693#[inline]
694fn encode_series_row(
695	series: &Series,
696	shape: &RowShape,
697	key_value: u64,
698	values: &[Value],
699	key_col_idx: usize,
700	clock: &Clock,
701) -> EncodedRow {
702	let key_value_encoded = series.key_from_u64(key_value);
703	let mut row = shape.allocate();
704	shape.set_value(&mut row, 0, &key_value_encoded);
705	let mut shape_idx = 1;
706	for (col_idx, value) in values.iter().enumerate() {
707		if col_idx == key_col_idx {
708			continue;
709		}
710		shape.set_value(&mut row, shape_idx, value);
711		shape_idx += 1;
712	}
713	let now_nanos = clock.now_nanos();
714	row.set_timestamps(now_nanos, now_nanos);
715	row
716}
717
718#[inline]
719fn update_series_metadata_for_insert(metadata: &mut SeriesMetadata, key_value: u64) {
720	if metadata.row_count == 0 {
721		metadata.oldest_key = key_value;
722		metadata.newest_key = key_value;
723	} else {
724		if key_value < metadata.oldest_key {
725			metadata.oldest_key = key_value;
726		}
727		if key_value > metadata.newest_key {
728			metadata.newest_key = key_value;
729		}
730	}
731	metadata.row_count += 1;
732}
733
/// Splits a qualified name into `(namespace, name)`.
///
/// The split happens at the last `::`, so `"a::b::table"` yields
/// `("a::b", "table")`. A name without `::` is placed in the `"default"`
/// namespace.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
	match qualified_name.rsplit_once("::") {
		Some((namespace, object)) => (namespace.to_owned(), object.to_owned()),
		None => (String::from("default"), qualified_name.to_owned()),
	}
}
741
#[cfg(test)]
mod tests {
	use super::parse_qualified_name;

	/// Builds the owned `(namespace, name)` pair the parser is expected to return.
	fn expected(namespace: &str, name: &str) -> (String, String) {
		(namespace.to_owned(), name.to_owned())
	}

	// A bare name is placed in the "default" namespace.
	#[test]
	fn parse_qualified_name_simple() {
		assert_eq!(parse_qualified_name("table"), expected("default", "table"));
	}

	#[test]
	fn parse_qualified_name_single_namespace() {
		assert_eq!(parse_qualified_name("ns::table"), expected("ns", "table"));
	}

	// Only the last `::` separates the namespace from the name.
	#[test]
	fn parse_qualified_name_nested_namespace() {
		assert_eq!(parse_qualified_name("a::b::table"), expected("a::b", "table"));
	}

	#[test]
	fn parse_qualified_name_deeply_nested_namespace() {
		assert_eq!(parse_qualified_name("a::b::c::table"), expected("a::b::c", "table"));
	}

	// An empty input yields the default namespace and an empty name.
	#[test]
	fn parse_qualified_name_empty_string() {
		assert_eq!(parse_qualified_name(""), expected("default", ""));
	}
}