use tracing::{debug, info_span};
use nodedb_cluster::calvin::types::PassiveReadKey;
use nodedb_types::Value;
use crate::bridge::envelope::{ErrorCode, Response};
use crate::data::executor::core_loop::CoreLoop;
use crate::data::executor::response_codec;
use crate::data::executor::task::ExecutionTask;
use crate::types::TenantId;
use nodedb_physical::physical_plan::PhysicalPlan;
use nodedb_physical::physical_plan::meta::PassiveReadKeyId;
use std::collections::BTreeMap;
impl CoreLoop {
pub(in crate::data::executor) fn execute_calvin_execute_static(
&mut self,
task: &ExecutionTask,
epoch: u64,
position: u32,
epoch_system_ms: i64,
tenant_id: &TenantId,
plans: &[PhysicalPlan],
) -> Response {
let vshard_id = task.request.vshard_id.as_u32();
debug!(
core = self.core_id,
epoch,
position,
epoch_system_ms,
vshard_id,
plan_count = plans.len(),
"calvin execute static"
);
let _apply_span = info_span!(
"executor_apply",
epoch,
position,
vshard = vshard_id,
tenant_id = tenant_id.as_u64(),
trace_id = ?task.request.trace_id,
)
.entered();
const NANOS_PER_MS: i64 = 1_000_000;
self.hlc
.update_from_remote(epoch_system_ms.saturating_mul(NANOS_PER_MS));
self.epoch_system_ms = Some(epoch_system_ms);
let result = self.execute_transaction_batch(task, tenant_id.as_u64(), plans);
self.epoch_system_ms = None;
result
}
pub(in crate::data::executor) fn execute_calvin_execute_passive(
&mut self,
task: &ExecutionTask,
epoch: u64,
position: u32,
tenant_id: &TenantId,
keys_to_read: &[PassiveReadKey],
) -> Response {
debug!(
core = self.core_id,
epoch,
position,
vshard_id = task.request.vshard_id.as_u32(),
key_count = keys_to_read.len(),
"calvin execute passive: reading keys"
);
let mut results: Vec<(PassiveReadKeyId, Value)> = Vec::with_capacity(keys_to_read.len());
for passive_key in keys_to_read {
let values = self.read_passive_key(tenant_id, &passive_key.engine_key);
results.extend(values);
}
match response_codec::encode_serde(&results) {
Ok(payload) => self.response_with_payload(task, payload),
Err(e) => self.response_error(
task,
ErrorCode::Internal {
detail: format!("calvin passive read encode: {e}"),
},
),
}
}
#[allow(clippy::too_many_arguments)]
pub(in crate::data::executor) fn execute_calvin_execute_active(
&mut self,
task: &ExecutionTask,
epoch: u64,
position: u32,
epoch_system_ms: i64,
tenant_id: &TenantId,
plans: &[PhysicalPlan],
injected_reads: &BTreeMap<PassiveReadKeyId, Value>,
) -> Response {
let vshard_id = task.request.vshard_id.as_u32();
debug!(
core = self.core_id,
epoch,
position,
epoch_system_ms,
vshard_id,
plan_count = plans.len(),
injected_count = injected_reads.len(),
"calvin execute active"
);
let _apply_span = info_span!(
"executor_apply",
epoch,
position,
vshard = vshard_id,
tenant_id = tenant_id.as_u64(),
trace_id = ?task.request.trace_id,
)
.entered();
const NANOS_PER_MS: i64 = 1_000_000;
self.hlc
.update_from_remote(epoch_system_ms.saturating_mul(NANOS_PER_MS));
self.epoch_system_ms = Some(epoch_system_ms);
let result = self.execute_transaction_batch(task, tenant_id.as_u64(), plans);
self.epoch_system_ms = None;
result
}
fn read_passive_key(
&self,
tenant_id: &TenantId,
engine_key: &nodedb_cluster::calvin::types::EngineKeySet,
) -> Vec<(PassiveReadKeyId, Value)> {
use nodedb_cluster::calvin::types::EngineKeySet;
match engine_key {
EngineKeySet::Document {
collection,
surrogates,
}
| EngineKeySet::Vector {
collection,
surrogates,
} => surrogates
.iter()
.map(|&surrogate| {
let value = self
.read_surrogate_value(tenant_id, collection, surrogate)
.unwrap_or(Value::Null);
(
PassiveReadKeyId {
collection: collection.clone(),
surrogate,
},
value,
)
})
.collect(),
EngineKeySet::Kv { collection, keys } => keys
.iter()
.map(|k| {
let value = self
.read_kv_value(tenant_id, collection, k)
.unwrap_or(Value::Null);
let key_hash = stable_kv_hash(k);
(
PassiveReadKeyId {
collection: collection.clone(),
surrogate: key_hash,
},
value,
)
})
.collect(),
EngineKeySet::Edge { collection, edges } => edges
.iter()
.map(|&(src, dst)| {
let edge_hash = stable_edge_hash(src, dst);
(
PassiveReadKeyId {
collection: collection.clone(),
surrogate: edge_hash,
},
Value::Null, )
})
.collect(),
}
}
fn read_surrogate_value(
&self,
tenant_id: &TenantId,
collection: &str,
surrogate: u32,
) -> Option<Value> {
let _ = (tenant_id, collection, surrogate);
None
}
fn read_kv_value(&self, tenant_id: &TenantId, collection: &str, key: &[u8]) -> Option<Value> {
let _ = (tenant_id, collection, key);
None
}
}
fn stable_kv_hash(key: &[u8]) -> u32 {
const FNV_OFFSET: u32 = 2_166_136_261;
const FNV_PRIME: u32 = 16_777_619;
let mut hash = FNV_OFFSET;
for &byte in key {
hash ^= u32::from(byte);
hash = hash.wrapping_mul(FNV_PRIME);
}
hash
}
fn stable_edge_hash(src: u32, dst: u32) -> u32 {
let combined: u64 = (u64::from(src) << 32) | u64::from(dst);
stable_kv_hash(&combined.to_le_bytes())
}