1use std::marker::PhantomData;
5
6use reifydb_catalog::{
7 catalog::Catalog,
8 error::{CatalogError, CatalogObjectKind},
9};
10use reifydb_core::{
11 encoded::{row::EncodedRow, shape::RowShape},
12 error::CoreError,
13 interface::catalog::{
14 id::IndexId,
15 key::PrimaryKey,
16 ringbuffer::{RingBuffer, RingBufferMetadata},
17 series::{Series, SeriesMetadata},
18 table::Table,
19 },
20 internal_error,
21 key::{EncodableKey, index_entry::IndexEntryKey, series_row::SeriesRowKey},
22};
23use reifydb_runtime::context::clock::Clock;
24use reifydb_transaction::{
25 interceptor::series_row::SeriesRowInterceptor,
26 transaction::{Transaction, command::CommandTransaction},
27};
28use reifydb_type::{
29 fragment::Fragment,
30 params::Params,
31 value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
32};
33
34use super::{
35 BulkInsertResult, RingBufferInsertResult, SeriesInsertResult, TableInsertResult,
36 validation::{
37 reorder_rows_unvalidated, reorder_rows_unvalidated_rb, reorder_rows_unvalidated_series,
38 validate_and_coerce_rows, validate_and_coerce_rows_rb, validate_and_coerce_rows_series,
39 },
40};
41use crate::{
42 Result,
43 bulk_insert::primitive::{
44 ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
45 series::{PendingSeriesInsert, SeriesInsertBuilder},
46 table::{PendingTableInsert, TableInsertBuilder},
47 },
48 engine::StandardEngine,
49 transaction::operation::{
50 dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
51 },
52 vm::instruction::dml::{
53 primary_key,
54 shape::{get_or_create_ringbuffer_shape, get_or_create_series_shape, get_or_create_table_shape},
55 },
56};
57
58pub trait ValidationMode: sealed::Sealed + 'static {
59 const VALIDATED: bool;
60
61 fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
62 where
63 F: FnOnce(&mut CommandTransaction) -> Result<R>;
64}
65
66pub struct Validated;
67impl ValidationMode for Validated {
68 const VALIDATED: bool = true;
69
70 fn run<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
71 where
72 F: FnOnce(&mut CommandTransaction) -> Result<R>,
73 {
74 run_checked(txn, total_rows, body)
75 }
76}
77
78pub struct Unchecked;
79impl ValidationMode for Unchecked {
80 const VALIDATED: bool = false;
81
82 fn run<F, R>(txn: &mut CommandTransaction, _total_rows: usize, body: F) -> Result<R>
83 where
84 F: FnOnce(&mut CommandTransaction) -> Result<R>,
85 {
86 txn.execute_bulk_unchecked(body)
87 }
88}
89
90fn run_checked<F, R>(txn: &mut CommandTransaction, total_rows: usize, body: F) -> Result<R>
91where
92 F: FnOnce(&mut CommandTransaction) -> Result<R>,
93{
94 if total_rows > 0 {
95 txn.reserve_writes(total_rows.saturating_mul(2))?;
96 }
97 let r = body(txn)?;
98 txn.commit()?;
99 Ok(r)
100}
101
102pub mod sealed {
103
104 use super::{Unchecked, Validated};
105 pub trait Sealed {}
106 impl Sealed for Validated {}
107 impl Sealed for Unchecked {}
108}
109
110pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
111 engine: &'e StandardEngine,
112 identity: IdentityId,
113 pending_tables: Vec<PendingTableInsert>,
114 pending_ringbuffers: Vec<PendingRingBufferInsert>,
115 pending_series: Vec<PendingSeriesInsert>,
116 _validation: PhantomData<V>,
117}
118
119impl<'e> BulkInsertBuilder<'e, Validated> {
120 pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
121 Self {
122 engine,
123 identity,
124 pending_tables: Vec::new(),
125 pending_ringbuffers: Vec::new(),
126 pending_series: Vec::new(),
127 _validation: PhantomData,
128 }
129 }
130}
131
132impl<'e> BulkInsertBuilder<'e, Unchecked> {
133 pub(crate) fn new_unchecked(engine: &'e StandardEngine, identity: IdentityId) -> Self {
134 Self {
135 engine,
136 identity,
137 pending_tables: Vec::new(),
138 pending_ringbuffers: Vec::new(),
139 pending_series: Vec::new(),
140 _validation: PhantomData,
141 }
142 }
143}
144
145impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
146 pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
147 let (namespace, table) = parse_qualified_name(qualified_name);
148 TableInsertBuilder::new(self, namespace, table)
149 }
150
151 pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
152 let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
153 RingBufferInsertBuilder::new(self, namespace, ringbuffer)
154 }
155
156 pub fn series<'a>(&'a mut self, qualified_name: &str) -> SeriesInsertBuilder<'a, 'e, V> {
157 let (namespace, series) = parse_qualified_name(qualified_name);
158 SeriesInsertBuilder::new(self, namespace, series)
159 }
160
161 pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
162 self.pending_tables.push(pending);
163 }
164
165 pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
166 self.pending_ringbuffers.push(pending);
167 }
168
169 pub(super) fn add_series_insert(&mut self, pending: PendingSeriesInsert) {
170 self.pending_series.push(pending);
171 }
172
173 pub fn execute(self) -> Result<BulkInsertResult> {
174 self.engine.reject_if_read_only()?;
175 let mut txn = self.engine.begin_command(self.identity)?;
176 let catalog = self.engine.catalog();
177 let clock = self.engine.clock();
178 let total_rows = self.total_pending_rows();
179 let pending_tables = self.pending_tables;
180 let pending_ringbuffers = self.pending_ringbuffers;
181 let pending_series = self.pending_series;
182
183 V::run(&mut txn, total_rows, move |txn| {
184 run_all_pending::<V>(catalog, clock, txn, pending_tables, pending_ringbuffers, pending_series)
185 })
186 }
187
188 #[inline]
189 fn total_pending_rows(&self) -> usize {
190 self.pending_tables.iter().map(|p| p.rows.len()).sum::<usize>()
191 + self.pending_ringbuffers.iter().map(|p| p.rows.len()).sum::<usize>()
192 + self.pending_series.iter().map(|p| p.rows.len()).sum::<usize>()
193 }
194}
195
196#[inline]
197fn run_all_pending<V: ValidationMode>(
198 catalog: Catalog,
199 clock: &Clock,
200 txn: &mut CommandTransaction,
201 pending_tables: Vec<PendingTableInsert>,
202 pending_ringbuffers: Vec<PendingRingBufferInsert>,
203 pending_series: Vec<PendingSeriesInsert>,
204) -> Result<BulkInsertResult> {
205 let mut result = BulkInsertResult::default();
206 for pending in pending_tables {
207 result.tables.push(execute_table_insert::<V>(&catalog, txn, &pending, clock)?);
208 }
209 for pending in pending_ringbuffers {
210 result.ringbuffers.push(execute_ringbuffer_insert::<V>(&catalog, txn, &pending, clock)?);
211 }
212 for pending in pending_series {
213 result.series.push(execute_series_insert::<V>(&catalog, txn, &pending, clock)?);
214 }
215 Ok(result)
216}
217
218fn execute_table_insert<V: ValidationMode>(
219 catalog: &Catalog,
220 txn: &mut CommandTransaction,
221 pending: &PendingTableInsert,
222 clock: &Clock,
223) -> Result<TableInsertResult> {
224 let table = resolve_table(catalog, txn, pending)?;
225 let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;
226 let encoded_rows = encode_table_rows::<V>(catalog, txn, pending, &table, &shape, clock)?;
227 if encoded_rows.is_empty() {
228 return Ok(empty_table_result(pending));
229 }
230 write_table_rows(catalog, txn, &table, &shape, pending, encoded_rows)
231}
232
233#[inline]
234fn empty_table_result(pending: &PendingTableInsert) -> TableInsertResult {
235 TableInsertResult {
236 namespace: pending.namespace.clone(),
237 table: pending.table.clone(),
238 inserted: 0,
239 }
240}
241
242#[inline]
243fn write_table_rows(
244 catalog: &Catalog,
245 txn: &mut CommandTransaction,
246 table: &Table,
247 shape: &RowShape,
248 pending: &PendingTableInsert,
249 encoded_rows: Vec<EncodedRow>,
250) -> Result<TableInsertResult> {
251 let total_rows = encoded_rows.len();
252 let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;
253 let pk_def = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), table)?;
254 let row_number_shape = pk_def.as_ref().map(|_| RowShape::testing(&[Type::Uint8]));
255
256 let mut owned_rows = encoded_rows;
257 txn.insert_table(table, shape, &row_numbers, &mut owned_rows)?;
258
259 if let Some(ref pk_def) = pk_def {
260 for (row, &row_number) in owned_rows.iter().zip(row_numbers.iter()) {
261 write_primary_key_index(
262 txn,
263 table,
264 shape,
265 pk_def,
266 row,
267 row_number,
268 row_number_shape.as_ref().unwrap(),
269 )?;
270 }
271 }
272
273 Ok(TableInsertResult {
274 namespace: pending.namespace.clone(),
275 table: pending.table.clone(),
276 inserted: total_rows as u64,
277 })
278}
279
280fn resolve_table(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingTableInsert) -> Result<Table> {
281 let namespace = catalog
282 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
283 .ok_or_else(|| CatalogError::NotFound {
284 kind: CatalogObjectKind::Namespace,
285 namespace: pending.namespace.to_string(),
286 name: String::new(),
287 fragment: Fragment::None,
288 })?;
289
290 catalog.find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?.ok_or_else(|| {
291 CatalogError::NotFound {
292 kind: CatalogObjectKind::Table,
293 namespace: pending.namespace.to_string(),
294 name: pending.table.to_string(),
295 fragment: Fragment::None,
296 }
297 .into()
298 })
299}
300
301fn encode_table_rows<V: ValidationMode>(
302 catalog: &Catalog,
303 txn: &mut CommandTransaction,
304 pending: &PendingTableInsert,
305 table: &Table,
306 shape: &RowShape,
307 clock: &Clock,
308) -> Result<Vec<EncodedRow>> {
309 let coerced_rows = coerce_table_rows::<V>(&pending.rows, table)?;
310 let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
311 for values in coerced_rows {
312 encoded_rows.push(prepare_table_row::<V>(catalog, txn, table, shape, clock, values)?);
313 }
314 Ok(encoded_rows)
315}
316
317#[inline]
318fn coerce_table_rows<V: ValidationMode>(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
319 if V::VALIDATED {
320 validate_and_coerce_rows(rows, table)
321 } else {
322 reorder_rows_unvalidated(rows, table)
323 }
324}
325
326#[inline]
327fn prepare_table_row<V: ValidationMode>(
328 catalog: &Catalog,
329 txn: &mut CommandTransaction,
330 table: &Table,
331 shape: &RowShape,
332 clock: &Clock,
333 mut values: Vec<Value>,
334) -> Result<EncodedRow> {
335 fill_auto_increment_table(catalog, txn, table, &mut values)?;
336 dictionary_encode_table(catalog, txn, table, &mut values)?;
337 if V::VALIDATED {
338 validate_table_constraints(table, &values)?;
339 }
340 Ok(encode_row(shape, &values, clock))
341}
342
343#[inline]
344fn validate_table_constraints(table: &Table, values: &[Value]) -> Result<()> {
345 for (idx, col) in table.columns.iter().enumerate() {
346 col.constraint.validate(&values[idx])?;
347 }
348 Ok(())
349}
350
351fn fill_auto_increment_table(
352 catalog: &Catalog,
353 txn: &mut CommandTransaction,
354 table: &Table,
355 values: &mut [Value],
356) -> Result<()> {
357 for (idx, col) in table.columns.iter().enumerate() {
358 if col.auto_increment && matches!(values[idx], Value::None { .. }) {
359 values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
360 }
361 }
362 Ok(())
363}
364
365fn dictionary_encode_table(
366 catalog: &Catalog,
367 txn: &mut CommandTransaction,
368 table: &Table,
369 values: &mut [Value],
370) -> Result<()> {
371 for (idx, col) in table.columns.iter().enumerate() {
372 if let Some(dict_id) = col.dictionary_id {
373 let dictionary =
374 catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
375 internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
376 })?;
377 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
378 values[idx] = entry_id.to_value();
379 }
380 }
381 Ok(())
382}
383
384fn encode_row(shape: &RowShape, values: &[Value], clock: &Clock) -> EncodedRow {
385 let mut row = shape.allocate();
386 for (idx, value) in values.iter().enumerate() {
387 shape.set_value(&mut row, idx, value);
388 }
389 let now_nanos = clock.now_nanos();
390 row.set_timestamps(now_nanos, now_nanos);
391 row
392}
393
394fn write_primary_key_index(
395 txn: &mut CommandTransaction,
396 table: &Table,
397 shape: &RowShape,
398 pk_def: &PrimaryKey,
399 row: &EncodedRow,
400 row_number: RowNumber,
401 row_number_shape: &RowShape,
402) -> Result<()> {
403 let index_key = primary_key::encode_primary_key(pk_def, row, table, shape)?;
404 let index_entry_key = IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key);
405
406 if txn.contains_key(&index_entry_key.encode())? {
407 let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
408 return Err(CoreError::PrimaryKeyViolation {
409 fragment: Fragment::None,
410 table_name: table.name.clone(),
411 key_columns,
412 }
413 .into());
414 }
415
416 let mut row_number_encoded = row_number_shape.allocate();
417 row_number_shape.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
418 txn.set(&index_entry_key.encode(), row_number_encoded)?;
419 Ok(())
420}
421
422fn execute_ringbuffer_insert<V: ValidationMode>(
423 catalog: &Catalog,
424 txn: &mut CommandTransaction,
425 pending: &PendingRingBufferInsert,
426 clock: &Clock,
427) -> Result<RingBufferInsertResult> {
428 let ringbuffer = resolve_ringbuffer(catalog, txn, pending)?;
429 let mut metadata = load_ringbuffer_metadata(catalog, txn, pending, &ringbuffer)?;
430 let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;
431 let coerced_rows = coerce_ringbuffer_rows::<V>(pending, &ringbuffer)?;
432 let inserted =
433 insert_ringbuffer_rows::<V>(catalog, txn, &ringbuffer, &shape, coerced_rows, &mut metadata, clock)?;
434 catalog.update_ringbuffer_metadata(txn, metadata)?;
435 Ok(RingBufferInsertResult {
436 namespace: pending.namespace.clone(),
437 ringbuffer: pending.ringbuffer.clone(),
438 inserted,
439 })
440}
441
442#[inline]
443fn resolve_ringbuffer(
444 catalog: &Catalog,
445 txn: &mut CommandTransaction,
446 pending: &PendingRingBufferInsert,
447) -> Result<RingBuffer> {
448 let namespace = catalog
449 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
450 .ok_or_else(|| CatalogError::NotFound {
451 kind: CatalogObjectKind::Namespace,
452 namespace: pending.namespace.to_string(),
453 name: String::new(),
454 fragment: Fragment::None,
455 })?;
456
457 catalog.find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
458 .ok_or_else(|| {
459 CatalogError::NotFound {
460 kind: CatalogObjectKind::RingBuffer,
461 namespace: pending.namespace.to_string(),
462 name: pending.ringbuffer.to_string(),
463 fragment: Fragment::None,
464 }
465 .into()
466 })
467}
468
469#[inline]
470fn load_ringbuffer_metadata(
471 catalog: &Catalog,
472 txn: &mut CommandTransaction,
473 pending: &PendingRingBufferInsert,
474 ringbuffer: &RingBuffer,
475) -> Result<RingBufferMetadata> {
476 catalog.find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?.ok_or_else(|| {
477 CatalogError::NotFound {
478 kind: CatalogObjectKind::RingBuffer,
479 namespace: pending.namespace.to_string(),
480 name: pending.ringbuffer.to_string(),
481 fragment: Fragment::None,
482 }
483 .into()
484 })
485}
486
487#[inline]
488fn coerce_ringbuffer_rows<V: ValidationMode>(
489 pending: &PendingRingBufferInsert,
490 ringbuffer: &RingBuffer,
491) -> Result<Vec<Vec<Value>>> {
492 if V::VALIDATED {
493 validate_and_coerce_rows_rb(&pending.rows, ringbuffer)
494 } else {
495 reorder_rows_unvalidated_rb(&pending.rows, ringbuffer)
496 }
497}
498
499fn insert_ringbuffer_rows<V: ValidationMode>(
500 catalog: &Catalog,
501 txn: &mut CommandTransaction,
502 ringbuffer: &RingBuffer,
503 shape: &RowShape,
504 coerced_rows: Vec<Vec<Value>>,
505 metadata: &mut RingBufferMetadata,
506 clock: &Clock,
507) -> Result<u64> {
508 let mut inserted_count = 0u64;
509 for mut values in coerced_rows {
510 dict_encode_ringbuffer_row(catalog, txn, ringbuffer, &mut values)?;
511
512 if V::VALIDATED {
513 for (idx, col) in ringbuffer.columns.iter().enumerate() {
514 col.constraint.validate(&values[idx])?;
515 }
516 }
517
518 let mut row = shape.allocate();
519 for (idx, value) in values.iter().enumerate() {
520 shape.set_value(&mut row, idx, value);
521 }
522 let now_nanos = clock.now_nanos();
523 row.set_timestamps(now_nanos, now_nanos);
524
525 evict_oldest_if_full(txn, ringbuffer, metadata)?;
526
527 let row_number = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
528 txn.insert_ringbuffer_at(ringbuffer, shape, row_number, row)?;
529
530 if metadata.is_empty() {
531 metadata.head = row_number.0;
532 }
533 metadata.count += 1;
534 metadata.tail = row_number.0 + 1;
535
536 inserted_count += 1;
537 }
538 Ok(inserted_count)
539}
540
541#[inline]
542fn evict_oldest_if_full(
543 txn: &mut CommandTransaction,
544 ringbuffer: &RingBuffer,
545 metadata: &mut RingBufferMetadata,
546) -> Result<()> {
547 if metadata.is_full() {
548 let oldest_row = RowNumber(metadata.head);
549 txn.remove_from_ringbuffer(ringbuffer, oldest_row)?;
550 metadata.head += 1;
551 metadata.count -= 1;
552 }
553 Ok(())
554}
555
556#[inline]
557fn dict_encode_ringbuffer_row(
558 catalog: &Catalog,
559 txn: &mut CommandTransaction,
560 ringbuffer: &RingBuffer,
561 values: &mut [Value],
562) -> Result<()> {
563 for (idx, col) in ringbuffer.columns.iter().enumerate() {
564 if let Some(dict_id) = col.dictionary_id {
565 let dictionary =
566 catalog.find_dictionary(&mut Transaction::Command(txn), dict_id)?.ok_or_else(|| {
567 internal_error!("Dictionary {:?} not found for column {}", dict_id, col.name)
568 })?;
569 let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
570 values[idx] = entry_id.to_value();
571 }
572 }
573 Ok(())
574}
575
576fn execute_series_insert<V: ValidationMode>(
577 catalog: &Catalog,
578 txn: &mut CommandTransaction,
579 pending: &PendingSeriesInsert,
580 clock: &Clock,
581) -> Result<SeriesInsertResult> {
582 let series = resolve_series(catalog, txn, pending)?;
583 let mut metadata = load_series_metadata(catalog, txn, pending, &series)?;
584 let shape = get_or_create_series_shape(catalog, &series, &mut Transaction::Command(txn))?;
585 let coerced_rows = coerce_series_rows::<V>(pending, &series)?;
586 let inserted = insert_series_rows::<V>(txn, &series, &shape, coerced_rows, &mut metadata, clock)?;
587 catalog.update_series_metadata_txn(&mut Transaction::Command(txn), metadata)?;
588 Ok(SeriesInsertResult {
589 namespace: pending.namespace.clone(),
590 series: pending.series.clone(),
591 inserted,
592 })
593}
594
595#[inline]
596fn resolve_series(catalog: &Catalog, txn: &mut CommandTransaction, pending: &PendingSeriesInsert) -> Result<Series> {
597 let namespace = catalog
598 .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
599 .ok_or_else(|| CatalogError::NotFound {
600 kind: CatalogObjectKind::Namespace,
601 namespace: pending.namespace.to_string(),
602 name: String::new(),
603 fragment: Fragment::None,
604 })?;
605
606 catalog.find_series_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.series)?.ok_or_else(|| {
607 CatalogError::NotFound {
608 kind: CatalogObjectKind::Series,
609 namespace: pending.namespace.to_string(),
610 name: pending.series.to_string(),
611 fragment: Fragment::None,
612 }
613 .into()
614 })
615}
616
617#[inline]
618fn load_series_metadata(
619 catalog: &Catalog,
620 txn: &mut CommandTransaction,
621 pending: &PendingSeriesInsert,
622 series: &Series,
623) -> Result<SeriesMetadata> {
624 catalog.find_series_metadata(&mut Transaction::Command(txn), series.id)?.ok_or_else(|| {
625 CatalogError::NotFound {
626 kind: CatalogObjectKind::Series,
627 namespace: pending.namespace.to_string(),
628 name: pending.series.to_string(),
629 fragment: Fragment::None,
630 }
631 .into()
632 })
633}
634
635#[inline]
636fn coerce_series_rows<V: ValidationMode>(pending: &PendingSeriesInsert, series: &Series) -> Result<Vec<Vec<Value>>> {
637 if V::VALIDATED {
638 validate_and_coerce_rows_series(&pending.rows, series)
639 } else {
640 reorder_rows_unvalidated_series(&pending.rows, series)
641 }
642}
643
644fn insert_series_rows<V: ValidationMode>(
645 txn: &mut CommandTransaction,
646 series: &Series,
647 shape: &RowShape,
648 coerced_rows: Vec<Vec<Value>>,
649 metadata: &mut SeriesMetadata,
650 clock: &Clock,
651) -> Result<u64> {
652 let key_col_name = series.key.column();
653 let key_col_idx =
654 series.columns.iter().position(|c| c.name == key_col_name).ok_or_else(|| {
655 internal_error!("series {} key column {} not found", series.name, key_col_name)
656 })?;
657
658 let mut inserted_count = 0u64;
659 for values in coerced_rows {
660 if V::VALIDATED {
661 for (idx, col) in series.columns.iter().enumerate() {
662 col.constraint.validate(&values[idx])?;
663 }
664 }
665
666 let key_value = series.key_to_u64(values[key_col_idx].clone()).unwrap_or(0);
667
668 metadata.sequence_counter += 1;
669 let sequence = metadata.sequence_counter;
670 let row_key = SeriesRowKey {
671 series: series.id,
672 variant_tag: None,
673 key: key_value,
674 sequence,
675 };
676 let encoded_key = row_key.encode();
677
678 let row = encode_series_row(series, shape, key_value, &values, key_col_idx, clock);
679
680 let mut rows_buf = [row];
681 SeriesRowInterceptor::pre_insert(txn, series, &mut rows_buf)?;
682 let [row] = rows_buf;
683 txn.set(&encoded_key, row.clone())?;
684 let rows = [row.clone()];
685 SeriesRowInterceptor::post_insert(txn, series, &rows)?;
686
687 update_series_metadata_for_insert(metadata, key_value);
688 inserted_count += 1;
689 }
690 Ok(inserted_count)
691}
692
693#[inline]
694fn encode_series_row(
695 series: &Series,
696 shape: &RowShape,
697 key_value: u64,
698 values: &[Value],
699 key_col_idx: usize,
700 clock: &Clock,
701) -> EncodedRow {
702 let key_value_encoded = series.key_from_u64(key_value);
703 let mut row = shape.allocate();
704 shape.set_value(&mut row, 0, &key_value_encoded);
705 let mut shape_idx = 1;
706 for (col_idx, value) in values.iter().enumerate() {
707 if col_idx == key_col_idx {
708 continue;
709 }
710 shape.set_value(&mut row, shape_idx, value);
711 shape_idx += 1;
712 }
713 let now_nanos = clock.now_nanos();
714 row.set_timestamps(now_nanos, now_nanos);
715 row
716}
717
718#[inline]
719fn update_series_metadata_for_insert(metadata: &mut SeriesMetadata, key_value: u64) {
720 if metadata.row_count == 0 {
721 metadata.oldest_key = key_value;
722 metadata.newest_key = key_value;
723 } else {
724 if key_value < metadata.oldest_key {
725 metadata.oldest_key = key_value;
726 }
727 if key_value > metadata.newest_key {
728 metadata.newest_key = key_value;
729 }
730 }
731 metadata.row_count += 1;
732}
733
/// Splits `qualified_name` at its last `::` into `(namespace, name)`.
///
/// Without a `::` separator the namespace is `"default"` and the whole input
/// is the name. Everything before the last separator — including any earlier
/// `::` — is kept as the namespace.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
    let (namespace, name) = qualified_name.rsplit_once("::").unwrap_or(("default", qualified_name));
    (namespace.to_owned(), name.to_owned())
}
741
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that `input` parses into the given namespace and object name.
    #[track_caller]
    fn assert_parses_to(input: &str, namespace: &str, name: &str) {
        assert_eq!(parse_qualified_name(input), (namespace.to_string(), name.to_string()));
    }

    #[test]
    fn parse_qualified_name_simple() {
        assert_parses_to("table", "default", "table");
    }

    #[test]
    fn parse_qualified_name_single_namespace() {
        assert_parses_to("ns::table", "ns", "table");
    }

    #[test]
    fn parse_qualified_name_nested_namespace() {
        assert_parses_to("a::b::table", "a::b", "table");
    }

    #[test]
    fn parse_qualified_name_deeply_nested_namespace() {
        assert_parses_to("a::b::c::table", "a::b::c", "table");
    }

    #[test]
    fn parse_qualified_name_empty_string() {
        assert_parses_to("", "default", "");
    }
}