1use std::marker::PhantomData;
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 error::{CatalogError, CatalogObjectKind},
9};
10use reifydb_core::{
11 encoded::{row::EncodedRow, shape::RowShape},
12 error::CoreError,
13 interface::catalog::{
14 id::IndexId,
15 key::PrimaryKey,
16 ringbuffer::{RingBuffer, RingBufferMetadata},
17 series::{Series, SeriesMetadata},
18 table::Table,
19 },
20 internal_error,
21 key::{EncodableKey, index_entry::IndexEntryKey, series_row::SeriesRowKey},
22};
23use reifydb_runtime::context::clock::Clock;
24use reifydb_transaction::{
25 interceptor::series_row::SeriesRowInterceptor,
26 transaction::{Transaction, command::CommandTransaction},
27};
28use reifydb_type::{
29 fragment::Fragment,
30 params::Params,
31 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
32};
33
34use super::{
35 BulkInsertResult, RingBufferInsertResult, SeriesInsertResult, TableInsertResult,
36 validation::{
37 reorder_rows_unvalidated, reorder_rows_unvalidated_rb, reorder_rows_unvalidated_series,
38 validate_and_coerce_rows, validate_and_coerce_rows_rb, validate_and_coerce_rows_series,
39 },
40};
41use crate::{
42 Result,
43 bulk_insert::primitive::{
44 ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
45 series::{PendingSeriesInsert, SeriesInsertBuilder},
46 table::{PendingTableInsert, TableInsertBuilder},
47 },
48 engine::StandardEngine,
49 transaction::operation::{
50 dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
51 },
52 vm::instruction::dml::{
53 primary_key,
54 shape::{get_or_create_ringbuffer_shape, get_or_create_series_shape, get_or_create_table_shape},
55 },
56};
57
/// Compile-time switch between the validated and the unchecked bulk-insert paths.
///
/// The trait requires `sealed::Sealed` as a supertrait; the implementors in
/// this file are `Validated` and `Unchecked`.
pub trait ValidationMode: sealed::Sealed + 'static {
    /// `true` selects the `validate_and_coerce_*` helpers plus per-column
    /// constraint checks; `false` selects the `reorder_rows_unvalidated*`
    /// helpers and skips the constraint checks.
    const VALIDATED: bool;

    /// Runs `body` against `txn` using this mode's execution strategy.
    ///
    /// `total_rows` is the number of rows queued across all pending inserts;
    /// an implementation is free to ignore it.
    fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
    where
        F: FnOnce(&mut CommandTransaction) -> Result<R>;
}
71
72pub struct Validated;
74impl ValidationMode for Validated {
75 const VALIDATED: bool = true;
76
77 fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
78 where
79 F: FnOnce(&mut CommandTransaction) -> Result<R>,
80 {
81 run_checked(txn, total_rows, body)
82 }
83}
84
85pub struct Unchecked;
89impl ValidationMode for Unchecked {
90 const VALIDATED: bool = false;
91
92 fn run<F, R>(txn: &mut CommandTransaction, _total_rows: usize, body: F) -> Result<R>
93 where
94 F: FnOnce(&mut CommandTransaction) -> Result<R>,
95 {
96 txn.execute_bulk_unchecked(body)
97 }
98}
99
100fn run_checked<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
101where
102 F: FnOnce(&mut CommandTransaction) -> Result<R>,
103{
104 if total_rows > 0 {
108 txn.reserve_writes(total_rows.saturating_mul(2))?;
109 }
110 let r = body(txn)?;
111 txn.commit()?;
112 Ok(r)
113}
114
115pub mod sealed {
116
117 use super::{Unchecked, Validated};
118 pub trait Sealed {}
119 impl Sealed for Validated {}
120 impl Sealed for Unchecked {}
121}
122
/// Collects pending table, ring buffer and series inserts and runs them in a
/// single command transaction when `execute` is called.
///
/// `V` selects the validation mode and defaults to `Validated`.
pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
    /// Engine used to open the transaction and to obtain the catalog and clock.
    engine: &'e StandardEngine,
    /// Identity passed to `begin_command` when the transaction is opened.
    identity: IdentityId,
    /// Table inserts queued through `add_table_insert`, in insertion order.
    pending_tables: Vec<PendingTableInsert>,
    /// Ring buffer inserts queued through `add_ringbuffer_insert`, in insertion order.
    pending_ringbuffers: Vec<PendingRingBufferInsert>,
    /// Series inserts queued through `add_series_insert`, in insertion order.
    pending_series: Vec<PendingSeriesInsert>,
    /// Carries the validation mode at the type level only.
    _validation: PhantomData<V>,
}
134
135impl<'e> BulkInsertBuilder<'e, Validated> {
136 pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
138 Self {
139 engine,
140 identity,
141 pending_tables: Vec::new(),
142 pending_ringbuffers: Vec::new(),
143 pending_series: Vec::new(),
144 _validation: PhantomData,
145 }
146 }
147}
148
149impl<'e> BulkInsertBuilder<'e, Unchecked> {
150 pub(crate) fn new_unchecked(engine: &'e StandardEngine, identity: IdentityId) -> Self {
153 Self {
154 engine,
155 identity,
156 pending_tables: Vec::new(),
157 pending_ringbuffers: Vec::new(),
158 pending_series: Vec::new(),
159 _validation: PhantomData,
160 }
161 }
162}
163
164impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
165 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
170 let (namespace, table) = parse_qualified_name(qualified_name);
171 TableInsertBuilder::new(self, namespace, table)
172 }
173
174 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
179 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
180 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
181 }
182
183 pub fn series<'a>(&'a mut self, qualified_name: &str) -> SeriesInsertBuilder<'a, 'e, V> {
188 let (namespace, series) = parse_qualified_name(qualified_name);
189 SeriesInsertBuilder::new(self, namespace, series)
190 }
191
192 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
194 self.pending_tables.push(pending);
195 }
196
197 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
199 self.pending_ringbuffers.push(pending);
200 }
201
202 pub(super) fn add_series_insert(&mut self, pending: PendingSeriesInsert) {
204 self.pending_series.push(pending);
205 }
206
207 pub fn execute(self) -> Result<BulkInsertResult> {
212 self.engine.reject_if_read_only()?;
213 let mut txn = self.engine.begin_command(self.identity)?;
214 let catalog = self.engine.catalog();
215 let clock = self.engine.clock();
216 let total_rows = self.total_pending_rows();
217 let pending_tables = self.pending_tables;
218 let pending_ringbuffers = self.pending_ringbuffers;
219 let pending_series = self.pending_series;
220
221 V::run(&mut txn, total_rows, move |txn| {
222 run_all_pending::<V>(catalog, clock, txn, pending_tables, pending_ringbuffers, pending_series)
223 })
224 }
225
226 #[inline]
227 fn total_pending_rows(&self) -> usize {
228 self.pending_tables.iter().map(|p| p.rows.len()).sum::<usize>()
229 + self.pending_ringbuffers.iter().map(|p| p.rows.len()).sum::<usize>()
230 + self.pending_series.iter().map(|p| p.rows.len()).sum::<usize>()
231 }
232}
233
234#[inline]
235fn run_all_pending<V: ValidationMode>(
236 catalog: Catalog,
237 clock: &Clock,
238 txn: &mut CommandTransaction,
239 pending_tables: Vec<PendingTableInsert>,
240 pending_ringbuffers: Vec<PendingRingBufferInsert>,
241 pending_series: Vec<PendingSeriesInsert>,
242) -> Result<BulkInsertResult> {
243 let mut result = BulkInsertResult::default();
244 for pending in pending_tables {
245 result.tables.push(execute_table_insert::<V>(&catalog, txn, &pending, clock)?);
246 }
247 for pending in pending_ringbuffers {
248 result.ringbuffers.push(execute_ringbuffer_insert::<V>(&catalog, txn, &pending, clock)?);
249 }
250 for pending in pending_series {
251 result.series.push(execute_series_insert::<V>(&catalog, txn, &pending, clock)?);
252 }
253 Ok(result)
254}
255
256fn execute_table_insert<V: ValidationMode>(
257 catalog: &Catalog,
258 txn: &mut CommandTransaction,
259 pending: &PendingTableInsert,
260 clock: &Clock,
261) -> Result<TableInsertResult> {
262 let table = resolve_table(catalog, txn, pending)?;
263 let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
264 let encoded_rows = encode_table_rows::<V>(catalog, txn, pending, &table, &shape, clock)?;
265 if encoded_rows.is_empty() {
266 return Ok(empty_table_result(pending));
267 }
268 write_table_rows(catalog, txn, &table, &shape, pending, encoded_rows)
269}
270
271#[inline]
272fn empty_table_result(pending: &PendingTableInsert) -> TableInsertResult {
273 TableInsertResult {
274 namespace: pending.namespace.clone(),
275 table: pending.table.clone(),
276 inserted: 0,
277 }
278}
279
280#[inline]
281fn write_table_rows(
282 catalog: &Catalog,
283 txn: &mut CommandTransaction,
284 table: &Table,
285 shape: &RowShape,
286 pending: &PendingTableInsert,
287 encoded_rows: Vec<EncodedRow>,
288) -> Result<TableInsertResult> {
289 let total_rows = encoded_rows.len();
290 let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
291 let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), table)?;
292 let row_number_shape = pk_def.as_ref().map(|_| RowShape::testing(&[Type::Uint8]));
293
294 for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
295 txn.insert_table(table, shape, row.clone(), row_number)?;
296
297 if let Some(ref pk_def) = pk_def {
298 write_primary_key_index(
299 txn,
300 table,
301 shape,
302 pk_def,
303 row,
304 row_number,
305 row_number_shape.as_ref().unwrap(),
306 )?;
307 }
308 }
309
310 Ok(TableInsertResult {
311 namespace: pending.namespace.clone(),
312 table: pending.table.clone(),
313 inserted: total_rows as u64,
314 })
315}
316
317fn resolve_table(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingTableInsert) -> Result<Table> {
318 let namespace = catalog
319 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
320 .ok_or_else(|| CatalogError::NotFound {
321 kind: CatalogObjectKind::Namespace,
322 namespace: pending.namespace.to_string(),
323 name: String::new(),
324 fragment: Fragment::None,
325 })?;
326
327 catalog.find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?.ok_or_else(|| {
328 CatalogError::NotFound {
329 kind: CatalogObjectKind::Table,
330 namespace: pending.namespace.to_string(),
331 name: pending.table.to_string(),
332 fragment: Fragment::None,
333 }
334 .into()
335 })
336}
337
338fn encode_table_rows<V: ValidationMode>(
339 catalog: &Catalog,
340 txn: &mut CommandTransaction,
341 pending: &PendingTableInsert,
342 table: &Table,
343 shape: &RowShape,
344 clock: &Clock,
345) -> Result<Vec<EncodedRow>> {
346 let coerced_rows = coerce_table_rows::<V>(&pending.rows, table)?;
347 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
348 for values in coerced_rows {
349 encoded_rows.push(prepare_table_row::<V>(catalog, txn, table, shape, clock, values)?);
350 }
351 Ok(encoded_rows)
352}
353
354#[inline]
355fn coerce_table_rows<V: ValidationMode>(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
356 if V::VALIDATED {
357 validate_and_coerce_rows(rows, table)
358 } else {
359 reorder_rows_unvalidated(rows, table)
360 }
361}
362
363#[inline]
364fn prepare_table_row<V: ValidationMode>(
365 catalog: &Catalog,
366 txn: &mut CommandTransaction,
367 table: &Table,
368 shape: &RowShape,
369 clock: &Clock,
370 mut values: Vec<Value>,
371) -> Result<EncodedRow> {
372 fill_auto_increment_table(catalog, txn, table, &mut values)?;
373 dictionary_encode_table(catalog, txn, table, &mut values)?;
374 if V::VALIDATED {
375 validate_table_constraints(table, &values)?;
376 }
377 Ok(encode_row(shape, &values, clock))
378}
379
380#[inline]
381fn validate_table_constraints(table: &Table, values: &[Value]) -> Result<()> {
382 for (idx, col) in table.columns.iter().enumerate() {
383 col.constraint.validate(&values[idx])?;
384 }
385 Ok(())
386}
387
388fn fill_auto_increment_table(
389 catalog: &Catalog,
390 txn: &mut CommandTransaction,
391 table: &Table,
392 values: &mut [Value],
393) -> Result<()> {
394 for (idx, col) in table.columns.iter().enumerate() {
395 if col.auto_increment && matches!(values[idx], Value::None { .. }) {
396 values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
397 }
398 }
399 Ok(())
400}
401
402fn dictionary_encode_table(
403 catalog: &Catalog,
404 txn: &mut CommandTransaction,
405 table: &Table,
406 values: &mut [Value],
407) -> Result<()> {
408 for (idx, col) in table.columns.iter().enumerate() {
409 if let Some(dict_id) = col.dictionary_id {
410 let dictionary =
411 catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
412 internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
413 })?;
414 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
415 values[idx] = entry_id.to_value();
416 }
417 }
418 Ok(())
419}
420
421fn encode_row(shape: &RowShape, values: &[Value], clock: &Clock) -> EncodedRow {
422 let mut row = shape.allocate();
423 for (idx, value) in values.iter().enumerate() {
424 shape.set_value(&mut row, idx, value);
425 }
426 let now_nanos = clock.now_nanos();
427 row.set_timestamps(now_nanos, now_nanos);
428 row
429}
430
431fn write_primary_key_index(
432 txn: &mut CommandTransaction,
433 table: &Table,
434 shape: &RowShape,
435 pk_def: &PrimaryKey,
436 row: &EncodedRow,
437 row_number: RowNumber,
438 row_number_shape: &RowShape,
439) -> Result<()> {
440 let index_key = primary_key::encode_primary_key(pk_def, row, table, shape)?;
441 let index_entry_key = IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key);
442
443 if txn.contains_key(&index_entry_key.encode())? {
444 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
445 return Err(CoreError::PrimaryKeyViolation {
446 fragment: Fragment::None,
447 table_name: table.name.clone(),
448 key_columns,
449 }
450 .into());
451 }
452
453 let mut row_number_encoded = row_number_shape.allocate();
454 row_number_shape.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
455 txn.set(&index_entry_key.encode(), row_number_encoded)?;
456 Ok(())
457}
458
459fn execute_ringbuffer_insert<V: ValidationMode>(
460 catalog: &Catalog,
461 txn: &mut CommandTransaction,
462 pending: &PendingRingBufferInsert,
463 clock: &Clock,
464) -> Result<RingBufferInsertResult> {
465 let ringbuffer = resolve_ringbuffer(catalog, txn, pending)?;
466 let mut metadata = load_ringbuffer_metadata(catalog, txn, pending, &ringbuffer)?;
467 let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
468 let coerced_rows = coerce_ringbuffer_rows::<V>(pending, &ringbuffer)?;
469 let inserted =
470 insert_ringbuffer_rows::<V>(catalog, txn, &ringbuffer, &shape, coerced_rows, &mut metadata, clock)?;
471 catalog.update_ringbuffer_metadata(txn, metadata)?;
472 Ok(RingBufferInsertResult {
473 namespace: pending.namespace.clone(),
474 ringbuffer: pending.ringbuffer.clone(),
475 inserted,
476 })
477}
478
479#[inline]
480fn resolve_ringbuffer(
481 catalog: &Catalog,
482 txn: &mut CommandTransaction,
483 pending: &PendingRingBufferInsert,
484) -> Result<RingBuffer> {
485 let namespace = catalog
486 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
487 .ok_or_else(|| CatalogError::NotFound {
488 kind: CatalogObjectKind::Namespace,
489 namespace: pending.namespace.to_string(),
490 name: String::new(),
491 fragment: Fragment::None,
492 })?;
493
494 catalog.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
495 .ok_or_else(|| {
496 CatalogError::NotFound {
497 kind: CatalogObjectKind::RingBuffer,
498 namespace: pending.namespace.to_string(),
499 name: pending.ringbuffer.to_string(),
500 fragment: Fragment::None,
501 }
502 .into()
503 })
504}
505
506#[inline]
507fn load_ringbuffer_metadata(
508 catalog: &Catalog,
509 txn: &mut CommandTransaction,
510 pending: &PendingRingBufferInsert,
511 ringbuffer: &RingBuffer,
512) -> Result<RingBufferMetadata> {
513 catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
514 CatalogError::NotFound {
515 kind: CatalogObjectKind::RingBuffer,
516 namespace: pending.namespace.to_string(),
517 name: pending.ringbuffer.to_string(),
518 fragment: Fragment::None,
519 }
520 .into()
521 })
522}
523
524#[inline]
525fn coerce_ringbuffer_rows<V: ValidationMode>(
526 pending: &PendingRingBufferInsert,
527 ringbuffer: &RingBuffer,
528) -> Result<Vec<Vec<Value>>> {
529 if V::VALIDATED {
530 validate_and_coerce_rows_rb(&pending.rows, ringbuffer)
531 } else {
532 reorder_rows_unvalidated_rb(&pending.rows, ringbuffer)
533 }
534}
535
536fn insert_ringbuffer_rows<V: ValidationMode>(
537 catalog: &Catalog,
538 txn: &mut CommandTransaction,
539 ringbuffer: &RingBuffer,
540 shape: &RowShape,
541 coerced_rows: Vec<Vec<Value>>,
542 metadata: &mut RingBufferMetadata,
543 clock: &Clock,
544) -> Result<u64> {
545 let mut inserted_count = 0u64;
546 for mut values in coerced_rows {
547 dict_encode_ringbuffer_row(catalog, txn, ringbuffer, &mut values)?;
548
549 if V::VALIDATED {
550 for (idx, col) in ringbuffer.columns.iter().enumerate() {
551 col.constraint.validate(&values[idx])?;
552 }
553 }
554
555 let mut row = shape.allocate();
556 for (idx, value) in values.iter().enumerate() {
557 shape.set_value(&mut row, idx, value);
558 }
559 let now_nanos = clock.now_nanos();
560 row.set_timestamps(now_nanos, now_nanos);
561
562 evict_oldest_if_full(txn, ringbuffer, metadata)?;
563
564 let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
565 txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row)?;
566
567 if metadata.is_empty() {
568 metadata.head = row_number.0;
569 }
570 metadata.count += 1;
571 metadata.tail = row_number.0 + 1;
572
573 inserted_count += 1;
574 }
575 Ok(inserted_count)
576}
577
578#[inline]
579fn evict_oldest_if_full(
580 txn: &mut CommandTransaction,
581 ringbuffer: &RingBuffer,
582 metadata: &mut RingBufferMetadata,
583) -> Result<()> {
584 if metadata.is_full() {
585 let oldest_row = RowNumber(metadata.head);
586 txn.remove_from_ringbuffer(ringbuffer, oldest_row)?;
587 metadata.head += 1;
588 metadata.count -= 1;
589 }
590 Ok(())
591}
592
593#[inline]
594fn dict_encode_ringbuffer_row(
595 catalog: &Catalog,
596 txn: &mut CommandTransaction,
597 ringbuffer: &RingBuffer,
598 values: &mut [Value],
599) -> Result<()> {
600 for (idx, col) in ringbuffer.columns.iter().enumerate() {
601 if let Some(dict_id) = col.dictionary_id {
602 let dictionary =
603 catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
604 internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
605 })?;
606 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
607 values[idx] = entry_id.to_value();
608 }
609 }
610 Ok(())
611}
612
613fn execute_series_insert<V: ValidationMode>(
614 catalog: &Catalog,
615 txn: &mut CommandTransaction,
616 pending: &PendingSeriesInsert,
617 clock: &Clock,
618) -> Result<SeriesInsertResult> {
619 let series = resolve_series(catalog, txn, pending)?;
620 let mut metadata = load_series_metadata(catalog, txn, pending, &series)?;
621 let shape = get_or_create_series_shape(catalog, &series, &mut Transaction::Command(txn))?;
622 let coerced_rows = coerce_series_rows::<V>(pending, &series)?;
623 let inserted = insert_series_rows::<V>(txn, &series, &shape, coerced_rows, &mut metadata, clock)?;
624 catalog.update_series_metadata_txn(&mut Transaction::Command(txn), metadata)?;
625 Ok(SeriesInsertResult {
626 namespace: pending.namespace.clone(),
627 series: pending.series.clone(),
628 inserted,
629 })
630}
631
632#[inline]
633fn resolve_series(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingSeriesInsert) -> Result<Series> {
634 let namespace = catalog
635 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
636 .ok_or_else(|| CatalogError::NotFound {
637 kind: CatalogObjectKind::Namespace,
638 namespace: pending.namespace.to_string(),
639 name: String::new(),
640 fragment: Fragment::None,
641 })?;
642
643 catalog.find_series_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.series)?.ok_or_else(|| {
644 CatalogError::NotFound {
645 kind: CatalogObjectKind::Series,
646 namespace: pending.namespace.to_string(),
647 name: pending.series.to_string(),
648 fragment: Fragment::None,
649 }
650 .into()
651 })
652}
653
654#[inline]
655fn load_series_metadata(
656 catalog: &Catalog,
657 txn: &mut CommandTransaction,
658 pending: &PendingSeriesInsert,
659 series: &Series,
660) -> Result<SeriesMetadata> {
661 catalog.find_series_metadata(&mut Transaction::Command(txn), series.id)?.ok_or_else(|| {
662 CatalogError::NotFound {
663 kind: CatalogObjectKind::Series,
664 namespace: pending.namespace.to_string(),
665 name: pending.series.to_string(),
666 fragment: Fragment::None,
667 }
668 .into()
669 })
670}
671
672#[inline]
673fn coerce_series_rows<V: ValidationMode>(pending: &PendingSeriesInsert, series: &Series) -> Result<Vec<Vec<Value>>> {
674 if V::VALIDATED {
675 validate_and_coerce_rows_series(&pending.rows, series)
676 } else {
677 reorder_rows_unvalidated_series(&pending.rows, series)
678 }
679}
680
681fn insert_series_rows<V: ValidationMode>(
682 txn: &mut CommandTransaction,
683 series: &Series,
684 shape: &RowShape,
685 coerced_rows: Vec<Vec<Value>>,
686 metadata: &mut SeriesMetadata,
687 clock: &Clock,
688) -> Result<u64> {
689 let key_col_name = series.key.column();
690 let key_col_idx =
691 series.columns.iter().position(|c| c.name == key_col_name).ok_or_else(|| {
692 internal_error!("series {} key column {} not found", series.name, key_col_name)
693 })?;
694
695 let mut inserted_count = 0u64;
696 for values in coerced_rows {
697 if V::VALIDATED {
698 for (idx, col) in series.columns.iter().enumerate() {
699 col.constraint.validate(&values[idx])?;
700 }
701 }
702
703 let key_value = series.key_to_u64(values[key_col_idx].clone()).unwrap_or(0);
704
705 metadata.sequence_counter += 1;
706 let sequence = metadata.sequence_counter;
707 let row_key = SeriesRowKey {
708 series: series.id,
709 variant_tag: None,
710 key: key_value,
711 sequence,
712 };
713 let encoded_key = row_key.encode();
714
715 let row = encode_series_row(series, shape, key_value, &values, key_col_idx, clock);
716
717 let row = SeriesRowInterceptor::pre_insert(txn, series, row)?;
718 txn.set(&encoded_key, row.clone())?;
719 SeriesRowInterceptor::post_insert(txn, series, &row)?;
720
721 update_series_metadata_for_insert(metadata, key_value);
722 inserted_count += 1;
723 }
724 Ok(inserted_count)
725}
726
727#[inline]
728fn encode_series_row(
729 series: &Series,
730 shape: &RowShape,
731 key_value: u64,
732 values: &[Value],
733 key_col_idx: usize,
734 clock: &Clock,
735) -> EncodedRow {
736 let key_value_encoded = series.key_from_u64(key_value);
737 let mut row = shape.allocate();
738 shape.set_value(&mut row, 0, &key_value_encoded);
739 let mut shape_idx = 1;
740 for (col_idx, value) in values.iter().enumerate() {
741 if col_idx == key_col_idx {
742 continue;
743 }
744 shape.set_value(&mut row, shape_idx, value);
745 shape_idx += 1;
746 }
747 let now_nanos = clock.now_nanos();
748 row.set_timestamps(now_nanos, now_nanos);
749 row
750}
751
752#[inline]
753fn update_series_metadata_for_insert(metadata: &mut SeriesMetadata, key_value: u64) {
754 if metadata.row_count == 0 {
755 metadata.oldest_key = key_value;
756 metadata.newest_key = key_value;
757 } else {
758 if key_value < metadata.oldest_key {
759 metadata.oldest_key = key_value;
760 }
761 if key_value > metadata.newest_key {
762 metadata.newest_key = key_value;
763 }
764 }
765 metadata.row_count += 1;
766}
767
/// Splits `qualified_name` into `(namespace, object name)` at the last `::`.
///
/// Everything before the last `::` is the namespace, so nested namespaces
/// such as `a::b::table` yield `("a::b", "table")`. A name without `::` is
/// placed in the `default` namespace.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
    match qualified_name.rsplit_once("::") {
        Some((namespace, object)) => (namespace.to_owned(), object.to_owned()),
        None => (String::from("default"), qualified_name.to_owned()),
    }
}
777
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the expected `(namespace, name)` pair from string slices.
    fn pair(namespace: &str, name: &str) -> (String, String) {
        (namespace.to_string(), name.to_string())
    }

    #[test]
    fn parse_qualified_name_simple() {
        assert_eq!(parse_qualified_name("table"), pair("default", "table"));
    }

    #[test]
    fn parse_qualified_name_single_namespace() {
        assert_eq!(parse_qualified_name("ns::table"), pair("ns", "table"));
    }

    #[test]
    fn parse_qualified_name_nested_namespace() {
        assert_eq!(parse_qualified_name("a::b::table"), pair("a::b", "table"));
    }

    #[test]
    fn parse_qualified_name_deeply_nested_namespace() {
        assert_eq!(parse_qualified_name("a::b::c::table"), pair("a::b::c", "table"));
    }

    #[test]
    fn parse_qualified_name_empty_string() {
        assert_eq!(parse_qualified_name(""), pair("default", ""));
    }

    // The cases below pin how separators at the edges are treated.

    #[test]
    fn parse_qualified_name_trailing_separator() {
        assert_eq!(parse_qualified_name("ns::"), pair("ns", ""));
    }

    #[test]
    fn parse_qualified_name_leading_separator() {
        assert_eq!(parse_qualified_name("::table"), pair("", "table"));
    }

    #[test]
    fn parse_qualified_name_single_colon_is_not_a_separator() {
        assert_eq!(parse_qualified_name("ns:table"), pair("default", "ns:table"));
    }
}