use std::{any, marker::PhantomData};
use any::TypeId;
use reifydb_catalog::{
catalog::Catalog,
error::{CatalogError, CatalogObjectKind},
};
use reifydb_core::{
encoded::shape::RowShape,
error::CoreError,
interface::catalog::id::IndexId,
internal_error,
key::{EncodableKey, index_entry::IndexEntryKey},
};
use reifydb_runtime::context::clock::Clock;
use reifydb_transaction::transaction::{Transaction, command::CommandTransaction};
use reifydb_type::{
fragment::Fragment,
value::{Value, identity::IdentityId, row_number::RowNumber, r#type::Type},
};
use super::{
BulkInsertResult, RingBufferInsertResult, TableInsertResult,
validation::{
reorder_rows_trusted, reorder_rows_trusted_rb, validate_and_coerce_rows, validate_and_coerce_rows_rb,
},
};
use crate::{
Result,
bulk_insert::primitive::{
ringbuffer::{PendingRingBufferInsert, RingBufferInsertBuilder},
table::{PendingTableInsert, TableInsertBuilder},
},
engine::StandardEngine,
transaction::operation::{
dictionary::DictionaryOperations, ringbuffer::RingBufferOperations, table::TableOperations,
},
vm::instruction::dml::{
primary_key,
shape::{get_or_create_ringbuffer_shape, get_or_create_table_shape},
},
};
/// Holds the supertrait that every [`ValidationMode`] implementor must also implement.
pub mod sealed {
    /// Supertrait required by `ValidationMode`; in this file it is implemented
    /// only for `Validated` and `Trusted`.
    pub trait Sealed {}
}

/// Mode marker: rows go through `validate_and_coerce_rows*` and every column
/// constraint is checked before a row is encoded.
pub struct Validated;

/// Mode marker: rows are only reordered via `reorder_rows_trusted*`; column
/// constraints are not checked.
pub struct Trusted;

impl sealed::Sealed for Validated {}
impl sealed::Sealed for Trusted {}

/// Type-level switch carried by `BulkInsertBuilder` to select the insert mode.
///
/// The `'static` bound is what allows `execute` to obtain `TypeId::of::<V>()`
/// and compare it against `TypeId::of::<Validated>()` at run time.
pub trait ValidationMode: sealed::Sealed + 'static {}

impl ValidationMode for Validated {}
impl ValidationMode for Trusted {}
/// Collects pending table and ring buffer inserts and runs them together in a
/// single command transaction when `execute` is called.
///
/// `V` selects the validation mode and defaults to `Validated`.
pub struct BulkInsertBuilder<'e, V: ValidationMode = Validated> {
/// Engine used by `execute` to open the command transaction and to obtain
/// the catalog and the clock.
engine: &'e StandardEngine,
/// Identity handed to `begin_command` when the transaction is opened.
identity: IdentityId,
/// Table inserts queued through `add_table_insert`; executed in push order.
pending_tables: Vec<PendingTableInsert>,
/// Ring buffer inserts queued through `add_ringbuffer_insert`; executed in
/// push order, after all table inserts.
pending_ringbuffers: Vec<PendingRingBufferInsert>,
/// Zero-sized marker that ties the validation mode `V` to the builder type.
_validation: PhantomData<V>,
}
impl<'e> BulkInsertBuilder<'e, Validated> {
pub(crate) fn new(engine: &'e StandardEngine, identity: IdentityId) -> Self {
Self {
engine,
identity,
pending_tables: Vec::new(),
pending_ringbuffers: Vec::new(),
_validation: PhantomData,
}
}
}
impl<'e> BulkInsertBuilder<'e, Trusted> {
pub(crate) fn new_trusted(engine: &'e StandardEngine, identity: IdentityId) -> Self {
Self {
engine,
identity,
pending_tables: Vec::new(),
pending_ringbuffers: Vec::new(),
_validation: PhantomData,
}
}
}
impl<'e, V: ValidationMode> BulkInsertBuilder<'e, V> {
    /// Starts a table insert for `qualified_name`.
    ///
    /// The name is split by `parse_qualified_name`: the text after the last
    /// `::` is the table, everything before it is the namespace, and the
    /// namespace is `"default"` when no `::` is present.
    pub fn table<'a>(&'a mut self, qualified_name: &str) -> TableInsertBuilder<'a, 'e, V> {
        let (namespace, table) = parse_qualified_name(qualified_name);
        TableInsertBuilder::new(self, namespace, table)
    }

    /// Starts a ring buffer insert for `qualified_name`.
    ///
    /// The name is split the same way as in [`Self::table`].
    pub fn ringbuffer<'a>(&'a mut self, qualified_name: &str) -> RingBufferInsertBuilder<'a, 'e, V> {
        let (namespace, ringbuffer) = parse_qualified_name(qualified_name);
        RingBufferInsertBuilder::new(self, namespace, ringbuffer)
    }

    /// Queues a finished table insert for the next `execute` call.
    pub(super) fn add_table_insert(&mut self, pending: PendingTableInsert) {
        self.pending_tables.push(pending);
    }

    /// Queues a finished ring buffer insert for the next `execute` call.
    pub(super) fn add_ringbuffer_insert(&mut self, pending: PendingRingBufferInsert) {
        self.pending_ringbuffers.push(pending);
    }

    /// Runs every queued insert inside one command transaction and commits it.
    ///
    /// Table inserts run first, then ring buffer inserts, each in the order
    /// they were queued. The transaction is committed only after all of them
    /// succeeded; the first error is returned before `commit` is reached.
    ///
    /// # Errors
    /// Returns the error from `reject_if_read_only`, from opening the
    /// transaction, from any individual insert, or from the commit.
    pub fn execute(self) -> Result<BulkInsertResult> {
        let Self {
            engine,
            identity,
            pending_tables,
            pending_ringbuffers,
            _validation: _,
        } = self;

        engine.reject_if_read_only()?;
        let mut txn = engine.begin_command(identity)?;
        let catalog = engine.catalog();
        let clock = engine.clock();
        // The mode is a compile-time property of the builder; resolve it once.
        let mode = TypeId::of::<V>();

        let mut outcome = BulkInsertResult::default();
        for table_insert in pending_tables {
            let inserted = execute_table_insert(&catalog, &mut txn, &table_insert, mode, clock)?;
            outcome.tables.push(inserted);
        }
        for ringbuffer_insert in pending_ringbuffers {
            let inserted = execute_ringbuffer_insert(&catalog, &mut txn, &ringbuffer_insert, mode, clock)?;
            outcome.ringbuffers.push(inserted);
        }

        txn.commit()?;
        Ok(outcome)
    }
}
/// Inserts the rows of one pending table insert into `txn`.
///
/// Steps, in order:
/// 1. Resolve the namespace and the table by name through `catalog`.
/// 2. Coerce the pending rows: `validate_and_coerce_rows` when `type_id` is the
///    `TypeId` of `Validated`, `reorder_rows_trusted` otherwise.
/// 3. For every row: fill auto-increment columns whose value is `Value::None`
///    from the column sequence, replace values of dictionary-backed columns by
///    their dictionary entry id, check column constraints (validated mode
///    only), encode the row and set both timestamps to `clock.now_nanos()`.
/// 4. Reserve one row number per encoded row, insert each row, and, if the
///    table has a primary key, store an index entry whose value is the row
///    number.
///
/// Returns the number of rows inserted (0 when there were no rows; in that case
/// no row numbers are reserved and the primary key is not looked up).
///
/// # Errors
/// * `CatalogError::NotFound` when the namespace or the table does not exist.
/// * An internal error when a column references a dictionary that cannot be found.
/// * `CoreError::PrimaryKeyViolation` when the index entry for a row's primary
///   key already exists in `txn`.
/// * Any error propagated from the catalog, the transaction, or validation.
fn execute_table_insert(
    catalog: &Catalog,
    txn: &mut CommandTransaction,
    pending: &PendingTableInsert,
    type_id: TypeId,
    clock: &Clock,
) -> Result<TableInsertResult> {
    let namespace = catalog
        .find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)?
        .ok_or_else(|| CatalogError::NotFound {
            kind: CatalogObjectKind::Namespace,
            namespace: pending.namespace.to_string(),
            name: String::new(),
            fragment: Fragment::None,
        })?;
    let table = catalog
        .find_table_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.table)?
        .ok_or_else(|| CatalogError::NotFound {
            kind: CatalogObjectKind::Table,
            namespace: pending.namespace.to_string(),
            name: pending.table.to_string(),
            fragment: Fragment::None,
        })?;
    let shape = get_or_create_table_shape(catalog, &table, &mut Transaction::Command(txn))?;

    let is_validated = type_id == TypeId::of::<Validated>();
    let coerced_rows = if is_validated {
        validate_and_coerce_rows(&pending.rows, &table)?
    } else {
        reorder_rows_trusted(&pending.rows, &table)?
    };

    let mut encoded_rows = Vec::with_capacity(coerced_rows.len());
    for mut values in coerced_rows {
        // Only columns left as `Value::None` draw from the auto-increment sequence.
        for (idx, col) in table.columns.iter().enumerate() {
            if col.auto_increment && matches!(values[idx], Value::None { .. }) {
                values[idx] = catalog.column_sequence_next_value(txn, table.id, col.id)?;
            }
        }

        // Dictionary-backed columns store the dictionary entry id, not the raw value.
        for (idx, col) in table.columns.iter().enumerate() {
            if let Some(dict_id) = col.dictionary_id {
                let dictionary = catalog
                    .find_dictionary(&mut Transaction::Command(txn), dict_id)?
                    .ok_or_else(|| {
                        internal_error!(
                            "Dictionary {:?} not found for column {}",
                            dict_id,
                            col.name
                        )
                    })?;
                let entry_id = txn.insert_into_dictionary(&dictionary, &values[idx])?;
                values[idx] = entry_id.to_value();
            }
        }

        // Constraints are checked on the final values, after the substitutions above.
        if is_validated {
            for (idx, col) in table.columns.iter().enumerate() {
                col.constraint.validate(&values[idx])?;
            }
        }

        let mut row = shape.allocate();
        for (idx, value) in values.iter().enumerate() {
            shape.set_value(&mut row, idx, value);
        }
        let now_nanos = clock.now_nanos();
        row.set_timestamps(now_nanos, now_nanos);
        encoded_rows.push(row);
    }

    let total_rows = encoded_rows.len();
    if total_rows == 0 {
        return Ok(TableInsertResult {
            namespace: pending.namespace.clone(),
            table: pending.table.clone(),
            inserted: 0,
        });
    }

    let row_numbers = catalog.next_row_number_batch(txn, table.id, total_rows as u64)?;

    // Keep the primary-key definition together with the shape used to encode
    // the index value, so the loop below needs no `unwrap` on a parallel Option.
    // NOTE(review): `RowShape::testing` is used on a production path — confirm
    // this constructor is intended here.
    let pk_index = primary_key::get_primary_key(catalog, &mut Transaction::Command(txn), &table)?
        .map(|pk_def| (pk_def, RowShape::testing(&[Type::Uint8])));

    for (row, &row_number) in encoded_rows.iter().zip(row_numbers.iter()) {
        // The row is written before the primary-key check; on a violation this
        // function returns an error and the caller `execute` never commits.
        txn.insert_table(&table, &shape, row.clone(), row_number)?;

        if let Some((pk_def, row_number_shape)) = pk_index.as_ref() {
            let index_key = primary_key::encode_primary_key(pk_def, row, &table, &shape)?;
            // Encode the index entry key once; it is used for both the
            // existence check and the write.
            let encoded_entry_key =
                IndexEntryKey::new(table.id, IndexId::primary(pk_def.id), index_key).encode();

            if txn.contains_key(&encoded_entry_key)? {
                let key_columns = pk_def.columns.iter().map(|c| c.name.clone()).collect();
                return Err(CoreError::PrimaryKeyViolation {
                    fragment: Fragment::None,
                    table_name: table.name.clone(),
                    key_columns,
                }
                .into());
            }

            let mut row_number_encoded = row_number_shape.allocate();
            row_number_shape.set_u64(&mut row_number_encoded, 0, u64::from(row_number));
            txn.set(&encoded_entry_key, row_number_encoded)?;
        }
    }

    Ok(TableInsertResult {
        namespace: pending.namespace.clone(),
        table: pending.table.clone(),
        inserted: total_rows as u64,
    })
}
/// Inserts the rows of one pending ring buffer insert into `txn`.
///
/// The namespace, the ring buffer and its metadata are resolved through
/// `catalog`. Rows are coerced with `validate_and_coerce_rows_rb` when
/// `type_id` is the `TypeId` of `Validated`, and with `reorder_rows_trusted_rb`
/// otherwise. For each row, values of dictionary-backed columns are replaced by
/// their dictionary entry id, column constraints are checked (validated mode
/// only), and the row is encoded with both timestamps set to
/// `clock.now_nanos()`.
///
/// Before each insert, if `metadata.is_full()`, the row at `metadata.head` is
/// removed and `head`/`count` are adjusted. The metadata is written back once,
/// after all rows were processed.
///
/// # Errors
/// * `CatalogError::NotFound` when the namespace, the ring buffer, or its
///   metadata does not exist.
/// * An internal error when a column references a dictionary that cannot be found.
/// * Any error propagated from the catalog, the transaction, or validation.
fn execute_ringbuffer_insert(
    catalog: &Catalog,
    txn: &mut CommandTransaction,
    pending: &PendingRingBufferInsert,
    type_id: TypeId,
    clock: &Clock,
) -> Result<RingBufferInsertResult> {
    let check_constraints = TypeId::of::<Validated>() == type_id;

    let namespace = match catalog.find_namespace_by_name(&mut Transaction::Command(txn), &pending.namespace)? {
        Some(found) => found,
        None => {
            return Err(CatalogError::NotFound {
                kind: CatalogObjectKind::Namespace,
                namespace: pending.namespace.to_string(),
                name: String::new(),
                fragment: Fragment::None,
            }
            .into());
        }
    };

    // The same error is reported whether the ring buffer itself or its metadata is missing.
    let missing_ringbuffer = || CatalogError::NotFound {
        kind: CatalogObjectKind::RingBuffer,
        namespace: pending.namespace.to_string(),
        name: pending.ringbuffer.to_string(),
        fragment: Fragment::None,
    };

    let ringbuffer = catalog
        .find_ringbuffer_by_name(&mut Transaction::Command(txn), namespace.id(), &pending.ringbuffer)?
        .ok_or_else(&missing_ringbuffer)?;
    let mut metadata = catalog
        .find_ringbuffer_metadata(&mut Transaction::Command(txn), ringbuffer.id)?
        .ok_or_else(&missing_ringbuffer)?;

    let shape = get_or_create_ringbuffer_shape(catalog, &ringbuffer, &mut Transaction::Command(txn))?;

    let prepared_rows = if check_constraints {
        validate_and_coerce_rows_rb(&pending.rows, &ringbuffer)?
    } else {
        reorder_rows_trusted_rb(&pending.rows, &ringbuffer)?
    };

    let mut inserted = 0u64;
    for mut row_values in prepared_rows {
        // Dictionary-backed columns store the dictionary entry id, not the raw value.
        for (position, column) in ringbuffer.columns.iter().enumerate() {
            let dict_id = match column.dictionary_id {
                Some(id) => id,
                None => continue,
            };
            let dictionary = catalog
                .find_dictionary(&mut Transaction::Command(txn), dict_id)?
                .ok_or_else(|| {
                    internal_error!(
                        "Dictionary {:?} not found for column {}",
                        dict_id,
                        column.name
                    )
                })?;
            let entry_id = txn.insert_into_dictionary(&dictionary, &row_values[position])?;
            row_values[position] = entry_id.to_value();
        }

        // Constraints are checked on the final values, after dictionary substitution.
        if check_constraints {
            for (position, column) in ringbuffer.columns.iter().enumerate() {
                column.constraint.validate(&row_values[position])?;
            }
        }

        let mut encoded = shape.allocate();
        row_values.iter().enumerate().for_each(|(position, value)| {
            shape.set_value(&mut encoded, position, value);
        });
        let timestamp = clock.now_nanos();
        encoded.set_timestamps(timestamp, timestamp);

        // Make room first: drop the row at `head` when the buffer reports full.
        if metadata.is_full() {
            txn.remove_from_ringbuffer(&ringbuffer, RowNumber(metadata.head))?;
            metadata.head += 1;
            metadata.count -= 1;
        }

        let assigned = catalog.next_row_number_for_ringbuffer(txn, ringbuffer.id)?;
        txn.insert_ringbuffer_at(&ringbuffer, &shape, assigned, encoded)?;

        // The first row of an empty buffer becomes the new head.
        if metadata.is_empty() {
            metadata.head = assigned.0;
        }
        metadata.count += 1;
        metadata.tail = assigned.0 + 1;
        inserted += 1;
    }

    catalog.update_ringbuffer_metadata(txn, metadata)?;

    Ok(RingBufferInsertResult {
        namespace: pending.namespace.clone(),
        ringbuffer: pending.ringbuffer.clone(),
        inserted,
    })
}
/// Splits `qualified_name` into `(namespace, name)` at the last `::`.
///
/// Everything before the last `::` is the namespace (so nested namespaces such
/// as `a::b` stay intact) and everything after it is the name. When the input
/// contains no `::`, the namespace is `"default"` and the whole input is the name.
fn parse_qualified_name(qualified_name: &str) -> (String, String) {
    qualified_name.rsplit_once("::").map_or_else(
        || (String::from("default"), String::from(qualified_name)),
        |(namespace, name)| (String::from(namespace), String::from(name)),
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the owned `(namespace, name)` pair a test expects from the parser.
    fn expected(namespace: &str, name: &str) -> (String, String) {
        (namespace.to_string(), name.to_string())
    }

    /// A bare name falls back to the `default` namespace.
    #[test]
    fn parse_qualified_name_simple() {
        assert_eq!(parse_qualified_name("table"), expected("default", "table"));
    }

    /// A single `::` separates namespace and name.
    #[test]
    fn parse_qualified_name_single_namespace() {
        assert_eq!(parse_qualified_name("ns::table"), expected("ns", "table"));
    }

    /// Only the last `::` splits; the namespace keeps its inner separator.
    #[test]
    fn parse_qualified_name_nested_namespace() {
        assert_eq!(parse_qualified_name("a::b::table"), expected("a::b", "table"));
    }

    /// Deeper nesting behaves the same as a single nested level.
    #[test]
    fn parse_qualified_name_deeply_nested_namespace() {
        assert_eq!(parse_qualified_name("a::b::c::table"), expected("a::b::c", "table"));
    }

    /// The empty input yields the `default` namespace and an empty name.
    #[test]
    fn parse_qualified_name_empty_string() {
        assert_eq!(parse_qualified_name(""), expected("default", ""));
    }
}