use std::sync::Arc;
use reifydb_core::{
common::CommitVersion,
encoded::row::EncodedRow,
error::diagnostic::catalog::{namespace_not_found, series_not_found},
interface::{
catalog::{
policy::{DataOp, PolicyTargetType},
series::{SeriesKey, TimestampPrecision},
shape::ShapeId,
},
change::{Change, ChangeOrigin, Diff},
resolved::{ResolvedNamespace, ResolvedSeries, ResolvedShape},
},
key::{EncodableKey, series_row::SeriesRowKey},
value::column::{Column, columns::Columns, data::ColumnData},
};
use reifydb_rql::nodes::InsertSeriesNode;
use reifydb_transaction::{interceptor::series_row::SeriesRowInterceptor, transaction::Transaction};
use reifydb_type::{
fragment::Fragment,
params::Params,
return_error,
util::cowvec::CowVec,
value::{Value, datetime::DateTime, identity::IdentityId, row_number::RowNumber},
};
use tracing::instrument;
use super::{
returning::{decode_rows_to_columns, evaluate_returning},
shape::get_or_create_series_shape,
};
use crate::{
Result,
policy::PolicyEvaluator,
vm::{
services::Services,
stack::SymbolTable,
volcano::{
compile::compile,
query::{QueryContext, QueryNode},
},
},
};
#[instrument(name = "mutate::series::insert", level = "trace", skip_all)]
pub(crate) fn insert_series(
services: &Arc<Services>,
txn: &mut Transaction<'_>,
plan: InsertSeriesNode,
params: Params,
symbols: &SymbolTable,
) -> Result<Columns> {
let namespace_name = plan.target.namespace().name();
let Some(namespace) = services.catalog.find_namespace_by_name(txn, namespace_name)? else {
return_error!(namespace_not_found(Fragment::internal(namespace_name), namespace_name));
};
let series_name = plan.target.name();
let Some(series) = services.catalog.find_series_by_name(txn, namespace.id(), series_name)? else {
let fragment = Fragment::internal(plan.target.name());
return_error!(series_not_found(fragment, namespace_name, series_name));
};
let Some(mut metadata) = services.catalog.find_series_metadata(txn, series.id)? else {
let fragment = Fragment::internal(plan.target.name());
return_error!(series_not_found(fragment, namespace_name, series_name));
};
let has_tag = series.tag.is_some();
let key = &series.key;
let key_column_name = series.key.column();
let namespace_ident = Fragment::internal(namespace.name());
let resolved_namespace = ResolvedNamespace::new(namespace_ident, namespace.clone());
let series_ident = Fragment::internal(series.name.clone());
let resolved_series = ResolvedSeries::new(series_ident, resolved_namespace, series.clone());
let resolved_source = Some(ResolvedShape::Series(resolved_series));
let execution_context = Arc::new(QueryContext {
services: services.clone(),
source: resolved_source,
batch_size: 1024,
params: params.clone(),
symbols: symbols.clone(),
identity: IdentityId::root(),
});
let mut input_node = compile(*plan.input, txn, execution_context.clone());
let mut inserted_count = 0u64;
let mut returned_rows: Vec<(RowNumber, EncodedRow)> = if plan.returning.is_some() {
Vec::with_capacity(16)
} else {
Vec::new()
};
input_node.initialize(txn, &execution_context)?;
let shape = get_or_create_series_shape(&services.catalog, &series, txn)?;
let mut mutable_context = (*execution_context).clone();
while let Some(columns) = input_node.next(txn, &mut mutable_context)? {
PolicyEvaluator::new(services, symbols).enforce_write_policies(
txn,
namespace_name,
series_name,
DataOp::Insert,
&columns,
PolicyTargetType::Series,
)?;
let row_count = columns.row_count();
for row_idx in 0..row_count {
let key_value: u64 = if let Some(key_col) =
columns.iter().find(|col| col.name().text() == key_column_name)
{
match series.key_to_u64(key_col.data().get_value(row_idx)) {
Some(v) => v,
None => match key {
SeriesKey::DateTime {
precision,
..
} => generate_timestamp(services, precision),
SeriesKey::Integer {
..
} => metadata.newest_key + 1,
},
}
} else {
match key {
SeriesKey::DateTime {
precision,
..
} => generate_timestamp(services, precision),
SeriesKey::Integer {
..
} => metadata.newest_key + 1,
}
};
let variant_tag: Option<u8> = if has_tag {
if let Some(tag_col) = columns.iter().find(|col| col.name().text() == "tag") {
match tag_col.data().get_value(row_idx) {
Value::Uint1(t) => Some(t),
Value::Int1(t) => Some(t as u8),
_ => Some(0),
}
} else {
Some(0)
}
} else {
None
};
metadata.sequence_counter += 1;
let sequence = metadata.sequence_counter;
let row_key = SeriesRowKey {
series: series.id,
variant_tag,
key: key_value,
sequence,
};
let encoded_key = row_key.encode();
let data_columns: Vec<_> = series.data_columns().collect();
let mut data_values = Vec::with_capacity(data_columns.len());
for col_def in &data_columns {
let value = if let Some(input_col) =
columns.iter().find(|c| c.name().text() == col_def.name)
{
input_col.data().get_value(row_idx)
} else {
Value::none()
};
data_values.push(value);
}
let key_value_encoded = series.key_from_u64(key_value);
let mut row = shape.allocate();
shape.set_value(&mut row, 0, &key_value_encoded);
for (i, value) in data_values.iter().enumerate() {
shape.set_value(&mut row, i + 1, value);
}
let now_nanos = services.runtime_context.clock.now_nanos();
row.set_timestamps(now_nanos, now_nanos);
let row = SeriesRowInterceptor::pre_insert(txn, &series, row)?;
txn.set(&encoded_key, row.clone())?;
SeriesRowInterceptor::post_insert(txn, &series, &row)?;
if plan.returning.is_some() {
returned_rows.push((RowNumber::from(sequence), row.clone()));
}
{
let row_number = RowNumber::from(sequence);
let mut cols = Vec::with_capacity(1 + data_columns.len());
cols.push(Column {
name: Fragment::internal(key_column_name),
data: series.key_column_data(vec![key_value]),
});
for (i, col_def) in data_columns.iter().enumerate() {
let mut data = ColumnData::with_capacity(col_def.constraint.get_type(), 1);
data.push_value(data_values[i].clone());
cols.push(Column {
name: Fragment::internal(&col_def.name),
data,
});
}
let post = Columns {
row_numbers: CowVec::new(vec![row_number]),
created_at: CowVec::new(vec![DateTime::from_nanos(row.created_at_nanos())]),
updated_at: CowVec::new(vec![DateTime::from_nanos(row.updated_at_nanos())]),
columns: CowVec::new(cols),
};
txn.track_flow_change(Change {
origin: ChangeOrigin::Shape(ShapeId::series(series.id)),
version: CommitVersion(0),
diffs: vec![Diff::Insert {
post,
}],
changed_at: DateTime::default(),
});
}
if metadata.row_count == 0 {
metadata.oldest_key = key_value;
metadata.newest_key = key_value;
} else {
if key_value < metadata.oldest_key {
metadata.oldest_key = key_value;
}
if key_value > metadata.newest_key {
metadata.newest_key = key_value;
}
}
metadata.row_count += 1;
inserted_count += 1;
}
}
services.catalog.update_series_metadata_txn(txn, metadata)?;
if let Some(returning_exprs) = &plan.returning {
let columns = decode_rows_to_columns(&shape, &returned_rows);
return evaluate_returning(services, symbols, returning_exprs, columns);
}
Ok(Columns::single_row([
("namespace", Value::Utf8(namespace.name().to_string())),
("series", Value::Utf8(series.name)),
("inserted", Value::Uint8(inserted_count)),
]))
}
/// Reads the current time from the runtime clock in the unit selected by `precision`.
///
/// Each `TimestampPrecision` variant maps to the clock accessor of the same unit
/// (`now_secs`, `now_millis`, `now_micros`, `now_nanos`).
fn generate_timestamp(services: &Services, precision: &TimestampPrecision) -> u64 {
    let clock = &services.runtime_context.clock;
    match precision {
        TimestampPrecision::Second => clock.now_secs(),
        TimestampPrecision::Millisecond => clock.now_millis(),
        TimestampPrecision::Microsecond => clock.now_micros(),
        TimestampPrecision::Nanosecond => clock.now_nanos(),
    }
}