use async_trait::async_trait;
use tracing::{debug, error};
use nodedb_types::value::Value;
use super::session::SyncSession;
use super::wire::*;
use crate::types::{DatabaseId, TenantId, TraceId, VShardId};
#[async_trait]
pub trait ColumnarDispatcher: Send + Sync {
async fn dispatch_insert(
&self,
tenant_id: TenantId,
vshard: VShardId,
collection: String,
rows: Vec<Vec<Value>>,
schema_bytes: Vec<u8>,
) -> crate::Result<u64>;
}
pub struct SharedStateColumnarDispatcher<'a> {
pub shared: &'a crate::control::state::SharedState,
}
#[async_trait]
impl<'a> ColumnarDispatcher for SharedStateColumnarDispatcher<'a> {
async fn dispatch_insert(
&self,
tenant_id: TenantId,
vshard: VShardId,
collection: String,
rows: Vec<Vec<Value>>,
schema_bytes: Vec<u8>,
) -> crate::Result<u64> {
use crate::bridge::envelope::PhysicalPlan;
use crate::control::server::dispatch_utils::dispatch_to_data_plane_with_source;
use crate::event::EventSource;
use nodedb_physical::physical_plan::columnar::{ColumnarInsertIntent, ColumnarOp};
use nodedb_types::columnar::ColumnarSchema;
use nodedb_types::value::Value;
use std::collections::HashMap;
let column_names: Vec<String> = if schema_bytes.is_empty() {
Vec::new()
} else {
zerompk::from_msgpack::<ColumnarSchema>(&schema_bytes)
.map(|s| s.columns.into_iter().map(|c| c.name).collect())
.unwrap_or_default()
};
let row_count = rows.len() as u64;
let object_rows: Vec<Value> = rows
.into_iter()
.map(|row| {
let mut map = HashMap::with_capacity(row.len());
for (i, val) in row.into_iter().enumerate() {
let key = column_names
.get(i)
.cloned()
.unwrap_or_else(|| format!("col{i}"));
map.insert(key, val);
}
Value::Object(map)
})
.collect();
let array_value = Value::Array(object_rows);
let payload =
nodedb_types::value_to_msgpack(&array_value).map_err(|e| crate::Error::Internal {
detail: format!("columnar sync: msgpack serialize rows: {e}"),
})?;
let plan = PhysicalPlan::Columnar(ColumnarOp::Insert {
collection,
payload,
format: "msgpack".to_string(),
intent: ColumnarInsertIntent::Insert,
on_conflict_updates: Vec::new(),
surrogates: Vec::new(),
schema_bytes,
});
dispatch_to_data_plane_with_source(
self.shared,
tenant_id,
vshard,
plan,
TraceId::ZERO,
EventSource::CrdtSync,
)
.await
.map(|_| row_count)
}
}
pub struct NoOpColumnarDispatcher;
#[async_trait]
impl ColumnarDispatcher for NoOpColumnarDispatcher {
async fn dispatch_insert(
&self,
_tenant_id: TenantId,
_vshard: VShardId,
_collection: String,
_rows: Vec<Vec<Value>>,
_schema_bytes: Vec<u8>,
) -> crate::Result<u64> {
Err(crate::Error::Internal {
detail: "columnar insert routed through path lacking SharedState; \
check listener wiring — insert was ACKed but NOT applied"
.to_string(),
})
}
}
impl SyncSession {
pub async fn handle_columnar_insert<D: ColumnarDispatcher>(
&mut self,
msg: &ColumnarInsertMsg,
dispatcher: &D,
) -> Option<SyncFrame> {
self.last_activity = std::time::Instant::now();
if !self.authenticated {
let ack = ColumnarInsertAckMsg {
collection: msg.collection.clone(),
batch_id: msg.batch_id,
accepted: 0,
rejected: msg.rows.len() as u64,
reject_reason: Some("unauthenticated".to_string()),
};
return SyncFrame::try_encode(SyncMessageType::ColumnarInsertAck, &ack);
}
let total = msg.rows.len() as u64;
let mut decoded_rows: Vec<Vec<Value>> = Vec::with_capacity(msg.rows.len());
for (i, row_bytes) in msg.rows.iter().enumerate() {
match zerompk::from_msgpack::<Vec<Value>>(row_bytes) {
Ok(row) => decoded_rows.push(row),
Err(e) => {
error!(
session = %self.session_id,
collection = %msg.collection,
batch_id = msg.batch_id,
row_index = i,
error = %e,
"columnar sync: row decode failed; rejecting entire batch"
);
let ack = ColumnarInsertAckMsg {
collection: msg.collection.clone(),
batch_id: msg.batch_id,
accepted: 0,
rejected: total,
reject_reason: Some(format!("row {i} msgpack decode failed: {e}")),
};
return SyncFrame::try_encode(SyncMessageType::ColumnarInsertAck, &ack);
}
}
}
let decoded = decoded_rows.len() as u64;
let tenant_id = self.tenant_id.unwrap_or(TenantId::new(0));
let vshard = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &msg.collection);
debug!(
session = %self.session_id,
collection = %msg.collection,
batch_id = msg.batch_id,
rows = decoded,
lite_id = %msg.lite_id,
"columnar insert: dispatching to Data Plane"
);
match dispatcher
.dispatch_insert(
tenant_id,
vshard,
msg.collection.clone(),
decoded_rows,
msg.schema_bytes.clone(),
)
.await
{
Ok(accepted) => {
self.mutations_processed += accepted;
let ack = ColumnarInsertAckMsg {
collection: msg.collection.clone(),
batch_id: msg.batch_id,
accepted,
rejected: total.saturating_sub(accepted),
reject_reason: None,
};
SyncFrame::try_encode(SyncMessageType::ColumnarInsertAck, &ack)
}
Err(e) => {
error!(
session = %self.session_id,
collection = %msg.collection,
batch_id = msg.batch_id,
error = %e,
"columnar insert dispatch failed; reporting rows as rejected"
);
let ack = ColumnarInsertAckMsg {
collection: msg.collection.clone(),
batch_id: msg.batch_id,
accepted: 0,
rejected: total,
reject_reason: Some(e.to_string()),
};
SyncFrame::try_encode(SyncMessageType::ColumnarInsertAck, &ack)
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use super::*;
type MockCallLog = Arc<Mutex<Vec<(TenantId, String, Vec<Vec<Value>>)>>>;
struct MockDispatcher {
calls: MockCallLog,
result: crate::Result<u64>,
}
impl MockDispatcher {
fn ok(n: u64) -> (Self, MockCallLog) {
let calls = Arc::new(Mutex::new(Vec::new()));
(
Self {
calls: calls.clone(),
result: Ok(n),
},
calls,
)
}
fn err() -> Self {
Self {
calls: Arc::new(Mutex::new(Vec::new())),
result: Err(crate::Error::Internal {
detail: "mock failure".to_string(),
}),
}
}
}
#[async_trait]
impl ColumnarDispatcher for MockDispatcher {
async fn dispatch_insert(
&self,
tenant_id: TenantId,
_vshard: VShardId,
collection: String,
rows: Vec<Vec<Value>>,
_schema_bytes: Vec<u8>,
) -> crate::Result<u64> {
self.calls
.lock()
.unwrap()
.push((tenant_id, collection, rows));
match &self.result {
Ok(n) => Ok(*n),
Err(e) => Err(crate::Error::Internal {
detail: e.to_string(),
}),
}
}
}
fn make_session() -> SyncSession {
SyncSession::new("test-columnar-session".to_string())
}
fn encode_row(values: Vec<Value>) -> Vec<u8> {
zerompk::to_msgpack_vec(&values).expect("encode row")
}
fn make_insert_msg(collection: &str, rows: Vec<Vec<Value>>) -> ColumnarInsertMsg {
ColumnarInsertMsg {
lite_id: "lite-test".to_string(),
collection: collection.to_string(),
rows: rows.iter().map(|r| encode_row(r.clone())).collect(),
batch_id: 1,
schema_bytes: Vec::new(),
}
}
#[tokio::test]
async fn unauthenticated_returns_rejection() {
let mut session = make_session();
let (mock, calls) = MockDispatcher::ok(0);
let msg = make_insert_msg(
"metrics",
vec![vec![Value::Integer(1), Value::Float(std::f64::consts::PI)]],
);
let frame = session.handle_columnar_insert(&msg, &mock).await;
assert!(frame.is_some());
let ack: ColumnarInsertAckMsg = frame.unwrap().decode_body().unwrap();
assert_eq!(ack.accepted, 0);
assert_eq!(ack.rejected, 1);
assert!(calls.lock().unwrap().is_empty());
}
#[tokio::test]
async fn authenticated_dispatches_and_acks() {
let mut session = make_session();
session.authenticated = true;
let (mock, calls) = MockDispatcher::ok(2);
let msg = make_insert_msg(
"metrics",
vec![
vec![Value::Integer(1), Value::Float(1.0)],
vec![Value::Integer(2), Value::Float(2.0)],
],
);
let frame = session.handle_columnar_insert(&msg, &mock).await;
assert!(frame.is_some());
let ack: ColumnarInsertAckMsg = frame.unwrap().decode_body().unwrap();
assert_eq!(ack.accepted, 2);
assert_eq!(ack.rejected, 0);
let log = calls.lock().unwrap();
assert_eq!(log.len(), 1);
assert_eq!(log[0].1, "metrics");
assert_eq!(log[0].2.len(), 2);
}
#[tokio::test]
async fn dispatch_failure_rejects_all() {
let mut session = make_session();
session.authenticated = true;
let mock = MockDispatcher::err();
let msg = make_insert_msg("metrics", vec![vec![Value::Integer(1), Value::Float(1.0)]]);
let frame = session.handle_columnar_insert(&msg, &mock).await;
assert!(frame.is_some());
let ack: ColumnarInsertAckMsg = frame.unwrap().decode_body().unwrap();
assert_eq!(ack.accepted, 0);
assert_eq!(ack.rejected, 1);
assert!(ack.reject_reason.is_some());
}
}