use std::collections::BTreeSet;
use std::sync::Arc;
use nodedb_cluster::calvin::sequencer::inbox::Inbox;
use nodedb_cluster::calvin::types::{EngineKeySet, ReadWriteSet, SortedVec, TxClass};
use nodedb_types::TenantId;
use crate::Error;
use crate::control::cluster::calvin::executor::ollp::orchestrator::OllpOrchestrator;
use crate::control::planner::calvin::types::{DispatchClass, DispatchOutcome};
use crate::control::server::pgwire::session::TransactionState;
use crate::control::server::pgwire::session::cross_shard_mode::CrossShardTxnMode;
use crate::types::VShardId;
use nodedb_physical::physical_plan::{
DocumentOp, GraphOp, KvOp, PhysicalPlan, TimeseriesOp, VectorOp,
};
use nodedb_physical::physical_task::PhysicalTask;
pub use crate::control::planner::calvin::predicate::predicate_class;
/// Report whether `plan` mutates engine state.
///
/// Within each engine only the operation variants listed below count as writes;
/// every other operation of that engine is treated as a read. Spatial, text,
/// query, meta and cluster-array plans are never writes.
///
/// The outer match has no wildcard arm. Adding a new [`PhysicalPlan`] variant
/// therefore fails to compile until it is classified here.
pub fn is_write_plan(plan: &PhysicalPlan) -> bool {
    use nodedb_physical::physical_plan::{ArrayOp, ColumnarOp, CrdtOp};

    match plan {
        PhysicalPlan::Spatial(_)
        | PhysicalPlan::Text(_)
        | PhysicalPlan::Query(_)
        | PhysicalPlan::Meta(_)
        | PhysicalPlan::ClusterArray(_) => false,
        PhysicalPlan::Document(doc_op) => matches!(
            doc_op,
            DocumentOp::PointPut { .. }
                | DocumentOp::PointInsert { .. }
                | DocumentOp::PointDelete { .. }
                | DocumentOp::PointUpdate { .. }
                | DocumentOp::BatchInsert { .. }
                | DocumentOp::InsertSelect { .. }
                | DocumentOp::Upsert { .. }
                | DocumentOp::BulkUpdate { .. }
                | DocumentOp::BulkDelete { .. }
                | DocumentOp::UpdateFromJoin { .. }
        ),
        PhysicalPlan::Kv(kv_op) => matches!(
            kv_op,
            KvOp::Put { .. }
                | KvOp::Insert { .. }
                | KvOp::InsertIfAbsent { .. }
                | KvOp::InsertOnConflictUpdate { .. }
                | KvOp::Delete { .. }
                | KvOp::BatchPut { .. }
        ),
        PhysicalPlan::Vector(vector_op) => matches!(
            vector_op,
            VectorOp::Insert { .. }
                | VectorOp::BatchInsert { .. }
                | VectorOp::Delete { .. }
                | VectorOp::DeleteBySurrogate { .. }
                | VectorOp::SparseInsert { .. }
                | VectorOp::SparseDelete { .. }
                | VectorOp::MultiVectorInsert { .. }
        ),
        PhysicalPlan::Graph(graph_op) => matches!(
            graph_op,
            GraphOp::EdgePut { .. } | GraphOp::EdgeDelete { .. }
        ),
        PhysicalPlan::Timeseries(ts_op) => matches!(ts_op, TimeseriesOp::Ingest { .. }),
        PhysicalPlan::Columnar(columnar_op) => matches!(columnar_op, ColumnarOp::Insert { .. }),
        PhysicalPlan::Crdt(crdt_op) => matches!(
            crdt_op,
            CrdtOp::ListInsert { .. } | CrdtOp::ListDelete { .. }
        ),
        PhysicalPlan::Array(array_op) => matches!(
            array_op,
            ArrayOp::Put { .. } | ArrayOp::Delete { .. } | ArrayOp::Flush { .. }
        ),
    }
}
/// Report whether `plan` is a document `BulkUpdate` or `BulkDelete`.
///
/// These are the only plans this module classifies as dependent predicates.
/// NOTE(review): presumably this is because their affected rows depend on a
/// predicate rather than being known up front. Confirm against the OLLP design.
pub fn is_dependent_predicate(plan: &PhysicalPlan) -> bool {
    matches!(
        plan,
        PhysicalPlan::Document(DocumentOp::BulkUpdate { .. } | DocumentOp::BulkDelete { .. })
    )
}
pub fn classify_dispatch(tasks: &[PhysicalTask]) -> DispatchClass {
let mut vshards: BTreeSet<u32> = BTreeSet::new();
let mut last_vshard = None;
for task in tasks {
if is_write_plan(&task.plan) {
let id = task.vshard_id.as_u32();
vshards.insert(id);
last_vshard = Some(task.vshard_id);
}
}
match vshards.len() {
0 => DispatchClass::SingleShard {
vshard: tasks
.first()
.map(|t| t.vshard_id)
.unwrap_or(VShardId::new(0)),
},
1 => DispatchClass::SingleShard {
vshard: last_vshard
.expect("invariant: vshards.len() == 1 means last_vshard was set during the loop"),
},
_ => DispatchClass::MultiShard { vshards },
}
}
/// Build the static Calvin [`TxClass`] for a batch of tasks.
///
/// How the transaction is assembled:
/// - Each task accepted by [`is_write_plan`] contributes the value of
///   `surrogate_from_plan` under the collection named by `collection_name_from_plan`.
/// - The resulting groups become `EngineKeySet::Document` entries, sorted by
///   collection, and form the write set.
/// - The read set is empty.
/// - All task plans (reads included) are msgpack-encoded into the transaction
///   payload.
///
/// NOTE(review): writes for which `collection_name_from_plan` returns an empty
/// string are grouped here under an empty collection name. `build_dependent_tx_class`
/// skips such tasks instead. Confirm which behaviour is intended.
///
/// # Errors
/// - [`Error::Serialization`] if the plans cannot be encoded as msgpack.
/// - [`Error::BadRequest`] if `TxClass::new` rejects the assembled sets.
pub fn build_static_tx_class(
    tasks: &[PhysicalTask],
    tenant_id: TenantId,
) -> crate::Result<TxClass> {
    use std::collections::HashMap;

    let mut by_collection: HashMap<String, Vec<u32>> = HashMap::new();
    for plan in tasks.iter().map(|t| &t.plan).filter(|p| is_write_plan(p)) {
        let name = collection_name_from_plan(plan);
        let surrogate = surrogate_from_plan(plan);
        by_collection.entry(name).or_default().push(surrogate);
    }

    let mut key_sets: Vec<EngineKeySet> = Vec::with_capacity(by_collection.len());
    for (name, surrogates) in by_collection {
        key_sets.push(EngineKeySet::Document {
            collection: name,
            surrogates: SortedVec::new(surrogates),
        });
    }
    // HashMap iteration order is unspecified; sort so the write set is deterministic.
    key_sets.sort_by(|a, b| a.collection().cmp(b.collection()));
    let write_set = ReadWriteSet::new(key_sets);
    let read_set = ReadWriteSet::new(vec![]);

    let plans: Vec<&PhysicalPlan> = tasks.iter().map(|t| &t.plan).collect();
    let plans_bytes = zerompk::to_msgpack_vec(&plans).map_err(|e| Error::Serialization {
        format: "msgpack".to_owned(),
        detail: format!("failed to encode PhysicalPlan vec for Calvin TxClass: {e}"),
    })?;

    TxClass::new(read_set, write_set, plans_bytes, tenant_id, None).map_err(|e| Error::BadRequest {
        detail: format!("invalid TxClass: {e}"),
    })
}
/// Build the Calvin [`TxClass`] for a dependent-predicate transaction.
///
/// How the transaction is assembled:
/// - The write set is seeded with `predicted_surrogates` under `collection`.
/// - Tasks that target `collection` itself are not added individually; the
///   prediction stands in for them.
/// - Tasks for which `collection_name_from_plan` yields an empty name are skipped.
/// - Every other task contributes the value of `surrogate_from_plan` under its
///   collection.
/// - All entries are `EngineKeySet::Document`, sorted by collection, and the read
///   set is empty.
/// - All task plans (reads included) are msgpack-encoded into the payload.
///
/// # Errors
/// - [`Error::Serialization`] if the plans cannot be encoded as msgpack.
/// - [`Error::BadRequest`] if `TxClass::new` rejects the assembled sets.
pub fn build_dependent_tx_class(
    tasks: &[PhysicalTask],
    tenant_id: TenantId,
    collection: &str,
    predicted_surrogates: &[u32],
) -> crate::Result<TxClass> {
    use std::collections::BTreeMap;

    let mut by_collection: BTreeMap<String, Vec<u32>> = BTreeMap::new();
    by_collection.insert(collection.to_owned(), predicted_surrogates.to_vec());

    for plan in tasks.iter().map(|t| &t.plan) {
        let target = collection_name_from_plan(plan);
        let is_other_collection = !target.is_empty() && target != collection;
        if is_other_collection {
            by_collection
                .entry(target)
                .or_default()
                .push(surrogate_from_plan(plan));
        }
    }

    let mut key_sets: Vec<EngineKeySet> = Vec::with_capacity(by_collection.len());
    for (name, surrogates) in by_collection {
        key_sets.push(EngineKeySet::Document {
            collection: name,
            surrogates: SortedVec::new(surrogates),
        });
    }
    // BTreeMap already yields collections in key order. The explicit sort keeps
    // this builder's ordering rule identical to the static builder's.
    key_sets.sort_by(|a, b| a.collection().cmp(b.collection()));
    let write_set = ReadWriteSet::new(key_sets);
    let read_set = ReadWriteSet::new(vec![]);

    let plans: Vec<&PhysicalPlan> = tasks.iter().map(|t| &t.plan).collect();
    let plans_bytes = zerompk::to_msgpack_vec(&plans).map_err(|e| Error::Serialization {
        format: "msgpack".to_owned(),
        detail: format!("failed to encode PhysicalPlan vec for Calvin dependent TxClass: {e}"),
    })?;

    TxClass::new(read_set, write_set, plans_bytes, tenant_id, None).map_err(|e| Error::BadRequest {
        detail: format!("invalid dependent TxClass: {e}"),
    })
}
/// Return the collection targeted by a recognised write plan, or an empty string
/// for any plan not listed below.
///
/// NOTE(review): [`is_write_plan`] also counts several variants as writes that fall
/// through to the empty-string case here, so the write-set builders in this module
/// cannot attribute them to a collection. The affected variants are:
/// - `DocumentOp::InsertSelect` and `DocumentOp::UpdateFromJoin`
/// - `VectorOp::SparseInsert`, `VectorOp::SparseDelete` and
///   `VectorOp::MultiVectorInsert`
/// - the columnar, CRDT and array write ops
///
/// Confirm whether this is intended.
fn collection_name_from_plan(plan: &PhysicalPlan) -> String {
    let target: Option<&String> = match plan {
        PhysicalPlan::Document(
            DocumentOp::PointPut { collection, .. }
            | DocumentOp::PointInsert { collection, .. }
            | DocumentOp::PointDelete { collection, .. }
            | DocumentOp::PointUpdate { collection, .. }
            | DocumentOp::BatchInsert { collection, .. }
            | DocumentOp::Upsert { collection, .. }
            | DocumentOp::BulkUpdate { collection, .. }
            | DocumentOp::BulkDelete { collection, .. },
        ) => Some(collection),
        PhysicalPlan::Kv(
            KvOp::Put { collection, .. }
            | KvOp::Insert { collection, .. }
            | KvOp::InsertIfAbsent { collection, .. }
            | KvOp::InsertOnConflictUpdate { collection, .. }
            | KvOp::Delete { collection, .. }
            | KvOp::BatchPut { collection, .. },
        ) => Some(collection),
        PhysicalPlan::Vector(
            VectorOp::Insert { collection, .. }
            | VectorOp::BatchInsert { collection, .. }
            | VectorOp::Delete { collection, .. }
            | VectorOp::DeleteBySurrogate { collection, .. },
        ) => Some(collection),
        PhysicalPlan::Graph(
            GraphOp::EdgePut { collection, .. } | GraphOp::EdgeDelete { collection, .. },
        ) => Some(collection),
        PhysicalPlan::Timeseries(TimeseriesOp::Ingest { collection, .. }) => Some(collection),
        _ => None,
    };
    target.cloned().unwrap_or_default()
}
/// Return the surrogate of a document point write as a `u32`.
///
/// Every other plan yields `0`: bulk document ops, non-document engines and reads.
/// The write-set builders in this module push that `0` unchanged.
fn surrogate_from_plan(plan: &PhysicalPlan) -> u32 {
    if let PhysicalPlan::Document(
        DocumentOp::PointPut { surrogate, .. }
        | DocumentOp::PointInsert { surrogate, .. }
        | DocumentOp::PointDelete { surrogate, .. }
        | DocumentOp::PointUpdate { surrogate, .. },
    ) = plan
    {
        surrogate.as_u32()
    } else {
        0
    }
}
pub async fn dispatch_calvin_or_fast(
tasks: &[PhysicalTask],
mode: CrossShardTxnMode,
tx_state: TransactionState,
inbox: Option<&Inbox>,
_orchestrator: Option<&Arc<OllpOrchestrator>>,
tenant_id: TenantId,
) -> crate::Result<DispatchOutcome> {
let class = classify_dispatch(tasks);
match &class {
DispatchClass::MultiShard { .. } => {
if tx_state == TransactionState::InBlock {
return Err(Error::CrossShardInExplicitTransaction);
}
match mode {
CrossShardTxnMode::Strict => {
let inbox = inbox.ok_or(Error::SequencerUnavailable)?;
let tx_class = build_static_tx_class(tasks, tenant_id)?;
let inbox_seq = inbox.submit(tx_class).map_err(|e| Error::BadRequest {
detail: format!("Calvin sequencer rejected transaction: {e}"),
})?;
Ok(DispatchOutcome::CalvinStatic { inbox_seq })
}
CrossShardTxnMode::BestEffortNonAtomic => Ok(DispatchOutcome::BestEffortNonAtomic),
}
}
DispatchClass::SingleShard { .. } => Ok(DispatchOutcome::SingleShard),
}
}
pub async fn dispatch_dependent_read(
orchestrator: &OllpOrchestrator,
inbox: &Inbox,
predicate_class_hash: u64,
tenant_id: TenantId,
tx_builder: impl Fn() -> crate::Result<TxClass>,
ollp_max_retries: u8,
) -> crate::Result<u64> {
use crate::control::cluster::calvin::executor::ollp::error::OllpError;
let mut retry_count: u32 = 0;
loop {
let result = orchestrator
.submit_with_retry(inbox, predicate_class_hash, tenant_id, || {
tx_builder().map_err(|_e| {
nodedb_cluster::error::CalvinError::Sequencer(
nodedb_cluster::calvin::sequencer::error::SequencerError::Unavailable,
)
})
})
.await;
match result {
Ok(inbox_seq) => return Ok(inbox_seq),
Err(OllpError::CircuitOpen { .. })
| Err(OllpError::Sequencer(_))
| Err(OllpError::Exhausted { .. })
| Err(OllpError::TenantBudgetExceeded { .. }) => {
if retry_count >= ollp_max_retries as u32 {
return Err(Error::OllpExhausted {
retries: ollp_max_retries,
});
}
orchestrator
.on_retry_required(predicate_class_hash, retry_count)
.await;
retry_count += 1;
}
}
}
}
#[cfg(test)]
#[path = "dispatch_tests.rs"]
mod tests;