use std::sync::Arc;
use reifydb_core::{
error::diagnostic::catalog::{dictionary_not_found, namespace_not_found},
interface::catalog::policy::PolicyTargetType,
value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_rql::nodes::InsertDictionaryNode;
use reifydb_transaction::transaction::Transaction;
use reifydb_type::{
fragment::Fragment,
params::Params,
return_error,
value::{Value, dictionary::DictionaryEntryId, identity::IdentityId, r#type::Type},
};
use super::returning::evaluate_returning;
use crate::{
Result,
policy::PolicyEvaluator,
transaction::operation::dictionary::DictionaryOperations,
vm::{
services::Services,
stack::SymbolTable,
volcano::{
compile::compile,
query::{QueryContext, QueryNode},
},
},
};
pub(crate) fn insert_dictionary(
services: &Arc<Services>,
txn: &mut Transaction<'_>,
plan: InsertDictionaryNode,
symbols: &mut SymbolTable,
) -> Result<Columns> {
let namespace_name = plan.target.namespace().name();
let Some(namespace) = services.catalog.find_namespace_by_name(txn, namespace_name)? else {
return_error!(namespace_not_found(Fragment::internal(namespace_name), namespace_name));
};
let dictionary_name = plan.target.name();
let Some(dictionary) = services.catalog.find_dictionary_by_name(txn, namespace.id(), dictionary_name)? else {
let fragment = plan.target.identifier().clone();
return_error!(dictionary_not_found(fragment.clone(), namespace_name, dictionary_name,));
};
let execution_context = Arc::new(QueryContext {
services: services.clone(),
source: None,
batch_size: 1024,
params: Params::None,
symbols: symbols.clone(),
identity: IdentityId::root(),
});
let mut input_node = compile(*plan.input, txn, execution_context.clone());
input_node.initialize(txn, &execution_context)?;
let mut ids: Vec<Value> = Vec::new();
let mut values: Vec<Value> = Vec::new();
let mut mutable_context = (*execution_context).clone();
while let Some(columns) = input_node.next(txn, &mut mutable_context)? {
PolicyEvaluator::new(services, symbols).enforce_write_policies(
txn,
namespace_name,
dictionary_name,
"insert",
&columns,
PolicyTargetType::Dictionary,
)?;
let row_count = columns.row_count();
for row_idx in 0..row_count {
let value = if let Some(value_column) = columns.iter().find(|col| col.name() == "value") {
value_column.data().get_value(row_idx)
} else if let Some(first_column) = columns.iter().next() {
first_column.data().get_value(row_idx)
} else {
Value::none()
};
if matches!(value, Value::None { .. }) {
continue;
}
let coerced_value = coerce_value_to_dictionary_type(value, dictionary.value_type.clone())?;
let entry_id = txn.insert_into_dictionary(&dictionary, &coerced_value)?;
let id_value = match entry_id {
DictionaryEntryId::U1(v) => Value::Uint1(v),
DictionaryEntryId::U2(v) => Value::Uint2(v),
DictionaryEntryId::U4(v) => Value::Uint4(v),
DictionaryEntryId::U8(v) => Value::Uint8(v),
DictionaryEntryId::U16(v) => Value::Uint16(v),
};
ids.push(id_value);
values.push(coerced_value);
}
}
if let Some(returning_exprs) = &plan.returning {
if ids.is_empty() {
return evaluate_returning(services, symbols, returning_exprs, Columns::empty());
}
let id_column = build_id_column(&ids, dictionary.id_type)?;
let value_column = build_value_column(&values, dictionary.value_type)?;
let columns = Columns::new(vec![id_column, value_column]);
return evaluate_returning(services, symbols, returning_exprs, columns);
}
if ids.is_empty() {
return Ok(Columns::new(vec![
Column {
name: Fragment::internal("namespace"),
data: ColumnData::utf8(vec![namespace.name()]),
},
Column {
name: Fragment::internal("dictionary"),
data: ColumnData::utf8(vec![dictionary.name.clone()]),
},
Column {
name: Fragment::internal("inserted"),
data: ColumnData::uint8(vec![0]),
},
]));
}
let id_column = build_id_column(&ids, dictionary.id_type)?;
let value_column = build_value_column(&values, dictionary.value_type)?;
Ok(Columns::new(vec![
Column {
name: Fragment::internal("namespace"),
data: ColumnData::utf8(vec![namespace.name(); ids.len()]),
},
Column {
name: Fragment::internal("dictionary"),
data: ColumnData::utf8(vec![dictionary.name.clone(); ids.len()]),
},
id_column,
value_column,
]))
}
fn coerce_value_to_dictionary_type(value: Value, target_type: Type) -> Result<Value> {
match (&value, target_type) {
(Value::Utf8(_), Type::Utf8) => Ok(value),
(Value::Int1(_), Type::Int1) => Ok(value),
(Value::Int2(_), Type::Int2) => Ok(value),
(Value::Int4(_), Type::Int4) => Ok(value),
(Value::Int8(_), Type::Int8) => Ok(value),
(Value::Int16(_), Type::Int16) => Ok(value),
(Value::Uint1(_), Type::Uint1) => Ok(value),
(Value::Uint2(_), Type::Uint2) => Ok(value),
(Value::Uint4(_), Type::Uint4) => Ok(value),
(Value::Uint8(_), Type::Uint8) => Ok(value),
(Value::Uint16(_), Type::Uint16) => Ok(value),
(Value::Float4(_), Type::Float4) => Ok(value),
(Value::Float8(_), Type::Float8) => Ok(value),
(Value::Boolean(_), Type::Boolean) => Ok(value),
(Value::Date(_), Type::Date) => Ok(value),
(Value::DateTime(_), Type::DateTime) => Ok(value),
(Value::Time(_), Type::Time) => Ok(value),
(Value::Duration(_), Type::Duration) => Ok(value),
(Value::Uuid4(_), Type::Uuid4) => Ok(value),
(Value::Uuid7(_), Type::Uuid7) => Ok(value),
(Value::Blob(_), Type::Blob) => Ok(value),
(Value::Decimal(_), Type::Decimal) => Ok(value),
_ => {
Ok(value)
}
}
}
fn build_id_column(ids: &[Value], id_type: Type) -> Result<Column> {
let data = match id_type {
Type::Uint1 => {
let vals: Vec<u8> = ids
.iter()
.map(|v| match v {
Value::Uint1(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint1(vals)
}
Type::Uint2 => {
let vals: Vec<u16> = ids
.iter()
.map(|v| match v {
Value::Uint2(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint2(vals)
}
Type::Uint4 => {
let vals: Vec<u32> = ids
.iter()
.map(|v| match v {
Value::Uint4(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint4(vals)
}
Type::Uint8 => {
let vals: Vec<u64> = ids
.iter()
.map(|v| match v {
Value::Uint8(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint8(vals)
}
Type::Uint16 => {
let vals: Vec<u128> = ids
.iter()
.map(|v| match v {
Value::Uint16(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint16(vals)
}
_ => {
let vals: Vec<u64> = ids
.iter()
.map(|v| match v {
Value::Uint8(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint8(vals)
}
};
Ok(Column {
name: Fragment::internal("id"),
data,
})
}
fn build_value_column(values: &[Value], value_type: Type) -> Result<Column> {
let data = match value_type {
Type::Utf8 => {
let vals: Vec<String> = values
.iter()
.map(|v| match v {
Value::Utf8(s) => s.clone(),
_ => format!("{:?}", v),
})
.collect();
ColumnData::utf8(vals)
}
Type::Int1 => {
let vals: Vec<i8> = values
.iter()
.map(|v| match v {
Value::Int1(n) => *n,
_ => 0,
})
.collect();
ColumnData::int1(vals)
}
Type::Int2 => {
let vals: Vec<i16> = values
.iter()
.map(|v| match v {
Value::Int2(n) => *n,
_ => 0,
})
.collect();
ColumnData::int2(vals)
}
Type::Int4 => {
let vals: Vec<i32> = values
.iter()
.map(|v| match v {
Value::Int4(n) => *n,
_ => 0,
})
.collect();
ColumnData::int4(vals)
}
Type::Int8 => {
let vals: Vec<i64> = values
.iter()
.map(|v| match v {
Value::Int8(n) => *n,
_ => 0,
})
.collect();
ColumnData::int8(vals)
}
Type::Uint1 => {
let vals: Vec<u8> = values
.iter()
.map(|v| match v {
Value::Uint1(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint1(vals)
}
Type::Uint2 => {
let vals: Vec<u16> = values
.iter()
.map(|v| match v {
Value::Uint2(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint2(vals)
}
Type::Uint4 => {
let vals: Vec<u32> = values
.iter()
.map(|v| match v {
Value::Uint4(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint4(vals)
}
Type::Uint8 => {
let vals: Vec<u64> = values
.iter()
.map(|v| match v {
Value::Uint8(n) => *n,
_ => 0,
})
.collect();
ColumnData::uint8(vals)
}
_ => {
let vals: Vec<String> = values.iter().map(|v| format!("{:?}", v)).collect();
ColumnData::utf8(vals)
}
};
Ok(Column {
name: Fragment::internal("value"),
data,
})
}