use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
use tracing::{error, warn};
use crate::calvin::CalvinCompletionRegistry;
use crate::calvin::sequencer::entry::SequencerEntry;
use crate::calvin::types::SequencedTxn;
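/// Counters maintained by the sequencer state machine (updated with
/// `Ordering::Relaxed`).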
pub struct StateMachineMetrics {
pub epochs_applied: AtomicU64,
pub txns_fanned_out: AtomicU64,
pub txns_dropped_backpressure: AtomicU64,
pub epochs_skipped_gap: AtomicU64,
}
impl StateMachineMetrics {
    pub fn new() -> Arc<Self> {
        Arc::new(Self::default())
    }
}
impl Default for StateMachineMetrics {
fn default() -> Self {
Self {
epochs_applied: AtomicU64::new(0),
txns_fanned_out: AtomicU64::new(0),
txns_dropped_backpressure: AtomicU64::new(0),
epochs_skipped_gap: AtomicU64::new(0),
}
}
}
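/// Raft state machine for the Calvin sequencer: applies replicated
/// `SequencerEntry` records in order, tracking the last applied epoch,
/// fanning sequenced transactions out to per-vshard scheduler channels, and
/// forwarding completion acks to the completion registry.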
pub struct SequencerStateMachine {
last_applied_epoch: u64,
vshard_senders: HashMap<u32, mpsc::Sender<SequencedTxn>>,
pub metrics: Arc<StateMachineMetrics>,
completion_registry: Arc<CalvinCompletionRegistry>,
}
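/// Sentinel for `last_applied_epoch` while no epoch has been applied yet.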
const NOT_YET_APPLIED: u64 = u64::MAX;
impl SequencerStateMachine {
pub fn new(
vshard_senders: HashMap<u32, mpsc::Sender<SequencedTxn>>,
completion_registry: Arc<CalvinCompletionRegistry>,
) -> Self {
Self {
last_applied_epoch: NOT_YET_APPLIED,
vshard_senders,
metrics: StateMachineMetrics::new(),
completion_registry,
}
}
pub fn last_applied_epoch(&self) -> Option<u64> {
if self.last_applied_epoch == NOT_YET_APPLIED {
None
} else {
Some(self.last_applied_epoch)
}
}
pub fn next_epoch(&self) -> u64 {
if self.last_applied_epoch == NOT_YET_APPLIED {
0
} else {
self.last_applied_epoch + 1
}
}
pub fn set_vshard_sender(&mut self, vshard: u32, sender: mpsc::Sender<SequencedTxn>) {
self.vshard_senders.insert(vshard, sender);
}
pub fn remove_vshard_sender(&mut self, vshard: u32) {
self.vshard_senders.remove(&vshard);
}
pub fn current_committed_epoch(&self) -> Option<u64> {
self.last_applied_epoch()
}
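    /// Re-decodes raw raft log entries and returns the sequenced transactions in
    /// `[from_epoch, to_epoch]` that involve `vshard_id`, ordered by
    /// `(epoch, position)`. Entries that fail to decode are logged and skipped.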
pub fn replay_epochs_for_vshard(
&self,
entries: &[nodedb_raft::LogEntry],
vshard_id: u32,
from_epoch: u64,
to_epoch: u64,
) -> Vec<SequencedTxn> {
let mut result = std::collections::BTreeMap::<(u64, u32), SequencedTxn>::new();
for entry in entries {
if entry.data.is_empty() {
continue;
}
let seq_entry: SequencerEntry = match zerompk::from_msgpack(&entry.data) {
Ok(e) => e,
Err(err) => {
tracing::warn!(
raft_index = entry.index,
error = %err,
"calvin rebuild: failed to decode sequencer entry; skipping"
);
continue;
}
};
match seq_entry {
SequencerEntry::EpochBatch { mut batch } => {
if batch.epoch < from_epoch || batch.epoch > to_epoch {
continue;
}
for txn in &mut batch.txns {
txn.tx_class.restore_derived();
let participates = txn
.tx_class
.participating_vshards()
.iter()
.any(|v| v.as_u32() == vshard_id);
if participates {
result.insert((txn.epoch, txn.position), txn.clone());
}
}
}
SequencerEntry::CompletionAck { .. } => {}
}
}
result.into_values().collect()
}
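    /// Applies one replicated entry. An `EpochBatch` is checked against
    /// `next_epoch()`: on a gap the batch is skipped (counted in
    /// `epochs_skipped_gap`) and `last_applied_epoch` is fast-forwarded;
    /// otherwise each txn is fanned out with a non-blocking `try_send` to every
    /// participating vshard, dropping on backpressure or closed channels.
    /// A `CompletionAck` is forwarded to the completion registry.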
pub fn apply(&mut self, data: &[u8]) {
let entry: SequencerEntry = match zerompk::from_msgpack(data) {
Ok(e) => e,
Err(err) => {
error!(error = %err, "sequencer state machine: failed to decode entry; skipping");
return;
}
};
match entry {
SequencerEntry::EpochBatch { mut batch } => {
for txn in &mut batch.txns {
txn.tx_class.restore_derived();
}
let expected = self.next_epoch();
if batch.epoch != expected {
error!(
epoch = batch.epoch,
expected,
"sequencer state machine: epoch gap detected; \
this node may have missed entries. Skipping batch."
);
self.metrics
.epochs_skipped_gap
.fetch_add(1, Ordering::Relaxed);
self.last_applied_epoch = batch.epoch;
return;
}
let mut fanned_out = 0u64;
let mut dropped = 0u64;
for txn in &batch.txns {
let mut txn_with_ts = txn.clone();
txn_with_ts.epoch_system_ms = batch.epoch_system_ms;
let vshards = txn.tx_class.participating_vshards();
for vshard_id in vshards {
let vshard = vshard_id.as_u32();
if let Some(sender) = self.vshard_senders.get(&vshard) {
match sender.try_send(txn_with_ts.clone()) {
Ok(()) => {
fanned_out += 1;
}
Err(mpsc::error::TrySendError::Full(_)) => {
warn!(
epoch = batch.epoch,
position = txn.position,
vshard,
"sequencer apply: vshard channel full (backpressure); \
dropping txn. Scheduler will catch up via log replay."
);
dropped += 1;
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!(
vshard,
epoch = batch.epoch,
"sequencer apply: vshard sender gone; \
scheduler may have exited"
);
dropped += 1;
}
}
}
}
}
self.metrics
.txns_fanned_out
.fetch_add(fanned_out, Ordering::Relaxed);
self.metrics
.txns_dropped_backpressure
.fetch_add(dropped, Ordering::Relaxed);
self.metrics.epochs_applied.fetch_add(1, Ordering::Relaxed);
self.last_applied_epoch = batch.epoch;
}
SequencerEntry::CompletionAck {
epoch,
position,
vshard_id,
} => {
self.completion_registry
.note_completion_ack(crate::calvin::TxnId::new(epoch, position), vshard_id);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::calvin::types::{
EngineKeySet, EpochBatch, ReadWriteSet, SequencedTxn, SortedVec, TxClass,
};
use nodedb_types::{
TenantId,
id::{DatabaseId, VShardId},
};
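    /// Probes generated collection names until two that map to different
    /// vshards are found.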
fn find_two_distinct_collections() -> (String, String) {
let mut first: Option<(String, u32)> = None;
for i in 0u32..512 {
let name = format!("col_{i}");
let vshard = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &name).as_u32();
if let Some((ref fname, fv)) = first {
if fv != vshard {
return (fname.clone(), name);
}
} else {
first = Some((name, vshard));
}
}
panic!("could not find two distinct-vshard collections in 512 tries");
}
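    /// Builds a `TxClass` whose write set spans two collections on different
    /// vshards, returning it along with both vshard ids.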
    fn make_tx_class_with_two_vshards() -> (TxClass, u32, u32) {
        let (col_a, col_b) = find_two_distinct_collections();
        let real_va = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &col_a).as_u32();
let real_vb = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &col_b).as_u32();
let write_set = ReadWriteSet::new(vec![
EngineKeySet::Document {
collection: col_a,
surrogates: SortedVec::new(vec![1]),
},
EngineKeySet::Document {
collection: col_b,
surrogates: SortedVec::new(vec![2]),
},
]);
let tx_class = TxClass::new(
ReadWriteSet::new(vec![]),
write_set,
vec![],
TenantId::new(1),
None,
)
.expect("valid TxClass");
(tx_class, real_va, real_vb)
}
fn make_batch_with_two_vshards() -> (EpochBatch, u32, u32) {
        let (tx_class, va, vb) = make_tx_class_with_two_vshards();
let batch = EpochBatch {
epoch: 0,
txns: vec![SequencedTxn {
epoch: 0,
position: 0,
tx_class,
epoch_system_ms: 1_700_000_000_000,
}],
epoch_system_ms: 1_700_000_000_000,
};
(batch, va, vb)
}
fn encode_entry(entry: &SequencerEntry) -> Vec<u8> {
zerompk::to_msgpack_vec(entry).expect("encode")
}
#[test]
fn apply_on_fresh_state_increments_last_applied_epoch() {
let (batch, va, vb) = make_batch_with_two_vshards();
        let (tx_a, _rx_a) = mpsc::channel(64);
        let (tx_b, _rx_b) = mpsc::channel(64);
let mut senders = HashMap::new();
senders.insert(va, tx_a);
senders.insert(vb, tx_b);
let mut sm = SequencerStateMachine::new(senders, CalvinCompletionRegistry::new());
assert_eq!(sm.last_applied_epoch(), None);
let data = encode_entry(&SequencerEntry::EpochBatch { batch });
sm.apply(&data);
assert_eq!(sm.last_applied_epoch(), Some(0));
assert_eq!(sm.metrics.epochs_applied.load(Ordering::Relaxed), 1);
}
#[test]
fn gap_detection_rejects_out_of_order_epochs() {
let (mut batch, va, vb) = make_batch_with_two_vshards();
        let (tx_a, _rx_a) = mpsc::channel(64);
        let (tx_b, _rx_b) = mpsc::channel(64);
let mut senders = HashMap::new();
senders.insert(va, tx_a);
senders.insert(vb, tx_b);
let mut sm = SequencerStateMachine::new(senders, CalvinCompletionRegistry::new());
let data0 = encode_entry(&SequencerEntry::EpochBatch {
batch: batch.clone(),
});
sm.apply(&data0);
assert_eq!(sm.last_applied_epoch(), Some(0));
batch.epoch = 2;
for txn in &mut batch.txns {
txn.epoch = 2;
}
let data2 = encode_entry(&SequencerEntry::EpochBatch { batch });
sm.apply(&data2);
assert_eq!(sm.metrics.epochs_skipped_gap.load(Ordering::Relaxed), 1);
assert_eq!(sm.last_applied_epoch(), Some(2));
}
#[test]
fn per_vshard_fanout_sends_only_to_participating_vshards() {
let (batch, va, vb) = make_batch_with_two_vshards();
let (tx_a, mut rx_a) = mpsc::channel(64);
let (tx_b, mut rx_b) = mpsc::channel(64);
let (tx_c, mut rx_c) = mpsc::channel(64);
let mut senders = HashMap::new();
senders.insert(va, tx_a);
senders.insert(vb, tx_b);
senders.insert(999, tx_c);
let mut sm = SequencerStateMachine::new(senders, CalvinCompletionRegistry::new());
let data = encode_entry(&SequencerEntry::EpochBatch { batch });
sm.apply(&data);
assert!(rx_a.try_recv().is_ok(), "vshard A should have received txn");
assert!(rx_b.try_recv().is_ok(), "vshard B should have received txn");
assert!(
rx_c.try_recv().is_err(),
"vshard C should not have received txn"
);
}
#[test]
fn try_send_on_full_channel_logs_and_drops_without_blocking() {
let (batch, va, vb) = make_batch_with_two_vshards();
let (tx_a, _rx_a) = mpsc::channel(1);
let (tx_b, _rx_b) = mpsc::channel(1);
let pre_fill: SequencedTxn = batch.txns[0].clone();
let _ = tx_a.try_send(pre_fill);
let mut senders = HashMap::new();
senders.insert(va, tx_a);
senders.insert(vb, tx_b);
let mut sm = SequencerStateMachine::new(senders, CalvinCompletionRegistry::new());
let data = encode_entry(&SequencerEntry::EpochBatch { batch });
sm.apply(&data);
assert!(sm.metrics.txns_dropped_backpressure.load(Ordering::Relaxed) >= 1);
}
#[test]
fn next_epoch_is_zero_on_fresh_state_machine() {
let sm = SequencerStateMachine::new(HashMap::new(), CalvinCompletionRegistry::new());
assert_eq!(sm.next_epoch(), 0);
}
#[test]
fn next_epoch_increments_after_apply() {
let (batch, va, vb) = make_batch_with_two_vshards();
        let (tx_a, _rx_a) = mpsc::channel(64);
        let (tx_b, _rx_b) = mpsc::channel(64);
let mut senders = HashMap::new();
senders.insert(va, tx_a);
senders.insert(vb, tx_b);
let mut sm = SequencerStateMachine::new(senders, CalvinCompletionRegistry::new());
let data = encode_entry(&SequencerEntry::EpochBatch { batch });
sm.apply(&data);
assert_eq!(sm.next_epoch(), 1);
}
}