use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::mpsc;
use tracing::warn;
use crate::calvin::sequencer::config::SequencerConfig;
use crate::calvin::sequencer::error::SequencerError;
use crate::calvin::types::TxClass;
use nodedb_types::TenantId;
#[derive(Debug, Clone)]
pub struct AdmittedTx {
pub inbox_seq: u64,
pub tx_class: TxClass,
}
#[derive(Debug)]
pub struct RejectedTx {
pub admitted: AdmittedTx,
pub reason: SequencerError,
pub conflict_context: Option<crate::calvin::sequencer::metrics::ConflictKey>,
}
#[derive(Clone)]
pub struct Inbox {
tx: mpsc::Sender<AdmittedTx>,
next_seq: Arc<AtomicU64>,
depth_counter: Arc<AtomicU64>,
tenant_in_flight: Arc<Mutex<BTreeMap<u64, u64>>>,
max_plans_bytes: usize,
max_participating_vshards: usize,
tenant_quota: usize,
max_dependent_read_bytes: usize,
max_dependent_read_passives: usize,
}
pub struct InboxReceiver {
rx: mpsc::Receiver<AdmittedTx>,
capacity: usize,
depth_counter: Arc<AtomicU64>,
tenant_in_flight: Arc<Mutex<BTreeMap<u64, u64>>>,
pending: Option<AdmittedTx>,
}
impl Inbox {
pub fn submit(&self, tx_class: TxClass) -> Result<u64, SequencerError> {
if tx_class.plans.len() > self.max_plans_bytes {
return Err(SequencerError::TxnTooLarge {
bytes: tx_class.plans.len(),
limit: self.max_plans_bytes,
});
}
let vshards = tx_class.participating_vshards().len();
if vshards > self.max_participating_vshards {
return Err(SequencerError::FanoutTooWide {
vshards,
limit: self.max_participating_vshards,
});
}
if let Some(spec) = &tx_class.dependent_reads {
let total_bytes = spec.total_bytes();
if total_bytes > self.max_dependent_read_bytes {
return Err(SequencerError::DependentReadTooLarge {
bytes: total_bytes,
limit: self.max_dependent_read_bytes,
});
}
let passives = spec.passive_reads.len();
if passives > self.max_dependent_read_passives {
return Err(SequencerError::DependentReadFanoutTooWide {
passives,
limit: self.max_dependent_read_passives,
});
}
}
let tenant = tx_class.tenant_id;
let tenant_key = tenant.as_u64();
{
let mut map = self
.tenant_in_flight
.lock()
.unwrap_or_else(|p| p.into_inner());
let in_flight = map.entry(tenant_key).or_insert(0);
if *in_flight >= self.tenant_quota as u64 {
return Err(SequencerError::TenantQuotaExceeded {
tenant: tenant.as_u64(),
quota: self.tenant_quota,
in_flight: *in_flight as usize,
});
}
*in_flight += 1;
}
let seq = self.next_seq.fetch_add(1, Ordering::Relaxed);
let admitted = AdmittedTx {
inbox_seq: seq,
tx_class,
};
let _admit_span = tracing::info_span!(
"sequencer_admit",
tenant_id = admitted.tx_class.tenant_id.as_u64(),
inbox_seq = seq,
);
match self.tx.try_send(admitted) {
Ok(()) => {
self.depth_counter.fetch_add(1, Ordering::Relaxed);
Ok(seq)
}
Err(e) => {
{
let mut map = self
.tenant_in_flight
.lock()
.unwrap_or_else(|p| p.into_inner());
if let Some(count) = map.get_mut(&tenant_key) {
*count = count.saturating_sub(1);
}
}
match e {
mpsc::error::TrySendError::Full(_) => Err(SequencerError::Overloaded),
mpsc::error::TrySendError::Closed(_) => {
warn!("sequencer inbox channel closed; service may have exited");
Err(SequencerError::Unavailable)
}
}
}
}
}
pub fn depth(&self) -> usize {
self.depth_counter.load(Ordering::Relaxed) as usize
}
}
impl InboxReceiver {
pub fn drain_into_capped(
&mut self,
out: &mut Vec<AdmittedTx>,
max_count: usize,
max_bytes: usize,
) -> usize {
let mut count = 0;
let mut bytes_so_far: usize = 0;
if let Some(pending) = self.pending.take() {
let tx_bytes = pending.tx_class.plans.len();
if count < max_count && bytes_so_far.saturating_add(tx_bytes) <= max_bytes {
bytes_so_far += tx_bytes;
self.decrement_tenant(&pending.tx_class.tenant_id);
self.depth_counter.fetch_sub(1, Ordering::Relaxed);
out.push(pending);
count += 1;
} else {
self.pending = Some(pending);
return 0;
}
}
while count < max_count {
match self.rx.try_recv() {
Ok(tx) => {
let tx_bytes = tx.tx_class.plans.len();
let new_total = bytes_so_far.saturating_add(tx_bytes);
if new_total > max_bytes {
self.pending = Some(tx);
break;
}
bytes_so_far = new_total;
self.decrement_tenant(&tx.tx_class.tenant_id);
self.depth_counter.fetch_sub(1, Ordering::Relaxed);
out.push(tx);
count += 1;
}
Err(_) => break,
}
}
count
}
pub fn drain_all_discard(&mut self) -> usize {
let mut count = 0;
if let Some(pending) = self.pending.take() {
self.decrement_tenant(&pending.tx_class.tenant_id);
self.depth_counter.fetch_sub(1, Ordering::Relaxed);
count += 1;
}
while let Ok(tx) = self.rx.try_recv() {
self.decrement_tenant(&tx.tx_class.tenant_id);
self.depth_counter.fetch_sub(1, Ordering::Relaxed);
count += 1;
}
count
}
pub fn capacity(&self) -> usize {
self.capacity
}
pub fn depth(&self) -> u64 {
self.depth_counter.load(Ordering::Relaxed)
}
fn decrement_tenant(&self, tenant: &TenantId) {
let key = tenant.as_u64();
let mut map = self
.tenant_in_flight
.lock()
.unwrap_or_else(|p| p.into_inner());
if let Some(count) = map.get_mut(&key) {
*count = count.saturating_sub(1);
}
}
}
pub fn new_inbox(capacity: usize, config: &SequencerConfig) -> (Inbox, InboxReceiver) {
let (tx, rx) = mpsc::channel(capacity);
let next_seq = Arc::new(AtomicU64::new(0));
let depth_counter = Arc::new(AtomicU64::new(0));
let tenant_in_flight: Arc<Mutex<BTreeMap<u64, u64>>> = Arc::new(Mutex::new(BTreeMap::new()));
let inbox = Inbox {
tx,
next_seq,
depth_counter: Arc::clone(&depth_counter),
tenant_in_flight: Arc::clone(&tenant_in_flight),
max_plans_bytes: config.max_plans_bytes_per_txn,
max_participating_vshards: config.max_participating_vshards_per_txn,
tenant_quota: config.tenant_inbox_quota,
max_dependent_read_bytes: config.max_dependent_read_bytes_per_txn,
max_dependent_read_passives: config.max_dependent_read_passives_per_txn,
};
let receiver = InboxReceiver {
rx,
capacity,
depth_counter,
tenant_in_flight,
pending: None,
};
(inbox, receiver)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::calvin::types::{EngineKeySet, ReadWriteSet, SortedVec, TxClass};
use nodedb_types::id::{DatabaseId, VShardId};
fn default_config() -> SequencerConfig {
SequencerConfig::default()
}
fn find_two_distinct_collections() -> (String, String) {
let mut first: Option<(String, u32)> = None;
for i in 0u32..512 {
let name = format!("col_{i}");
let vshard = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &name).as_u32();
if let Some((ref fname, fv)) = first {
if fv != vshard {
return (fname.clone(), name);
}
} else {
first = Some((name, vshard));
}
}
panic!("could not find two distinct-vshard collections in 512 tries");
}
fn make_tx_class_with_tenant(tenant: u64) -> TxClass {
let (col_a, col_b) = find_two_distinct_collections();
let write_set = ReadWriteSet::new(vec![
EngineKeySet::Document {
collection: col_a,
surrogates: SortedVec::new(vec![1]),
},
EngineKeySet::Document {
collection: col_b,
surrogates: SortedVec::new(vec![2]),
},
]);
TxClass::new(
ReadWriteSet::new(vec![]),
write_set,
vec![],
TenantId::new(tenant),
None,
)
.expect("valid TxClass")
}
fn make_tx_class() -> TxClass {
make_tx_class_with_tenant(1)
}
#[test]
fn submit_succeeds_when_capacity_available() {
let config = default_config();
let (inbox, mut rx) = new_inbox(10, &config);
let seq = inbox
.submit(make_tx_class())
.expect("submit should succeed");
assert_eq!(seq, 0);
let mut out = Vec::new();
let drained = rx.drain_into_capped(&mut out, 100, usize::MAX);
assert_eq!(drained, 1);
assert_eq!(out[0].inbox_seq, 0);
}
#[test]
fn submit_returns_overloaded_when_full() {
let config = SequencerConfig {
inbox_capacity: 1,
tenant_inbox_quota: 10,
..default_config()
};
let (inbox, _rx) = new_inbox(1, &config);
let _ = inbox.submit(make_tx_class()).expect("first submit");
let err = inbox
.submit(make_tx_class())
.expect_err("should be overloaded");
assert_eq!(err, SequencerError::Overloaded);
}
#[test]
fn inbox_seq_is_monotonically_increasing() {
let config = default_config();
let (inbox, mut rx) = new_inbox(10, &config);
for _ in 0..5 {
inbox.submit(make_tx_class()).expect("submit");
}
let mut out = Vec::new();
rx.drain_into_capped(&mut out, 100, usize::MAX);
let seqs: Vec<u64> = out.iter().map(|t| t.inbox_seq).collect();
let mut sorted = seqs.clone();
sorted.sort();
assert_eq!(seqs, sorted, "inbox_seq must be monotonically increasing");
}
#[test]
fn drain_into_returns_count_and_clears_buffer() {
let config = default_config();
let (inbox, mut rx) = new_inbox(10, &config);
inbox.submit(make_tx_class()).expect("s1");
inbox.submit(make_tx_class()).expect("s2");
inbox.submit(make_tx_class()).expect("s3");
let mut out = Vec::new();
let n = rx.drain_into_capped(&mut out, 100, usize::MAX);
assert_eq!(n, 3);
assert_eq!(out.len(), 3);
let n2 = rx.drain_into_capped(&mut out, 100, usize::MAX);
assert_eq!(n2, 0);
}
#[test]
fn capacity_is_reported_correctly() {
let config = default_config();
let (_inbox, rx) = new_inbox(42, &config);
assert_eq!(rx.capacity(), 42);
}
#[test]
fn submit_rejects_oversized_plans() {
let config = SequencerConfig {
max_plans_bytes_per_txn: 4,
tenant_inbox_quota: 100,
..default_config()
};
let (inbox, _rx) = new_inbox(10, &config);
let mut tx = make_tx_class();
tx.plans = vec![0u8; 5]; let err = inbox.submit(tx).expect_err("should be rejected");
assert!(
matches!(err, SequencerError::TxnTooLarge { bytes: 5, limit: 4 }),
"unexpected error: {err:?}"
);
}
#[test]
fn submit_rejects_wide_fanout() {
let config = SequencerConfig {
max_participating_vshards_per_txn: 1,
tenant_inbox_quota: 100,
..default_config()
};
let (inbox, _rx) = new_inbox(10, &config);
let tx = make_tx_class();
let err = inbox.submit(tx).expect_err("should be rejected");
assert!(
matches!(err, SequencerError::FanoutTooWide { .. }),
"unexpected error: {err:?}"
);
}
#[test]
fn tenant_quota_isolation() {
let config = SequencerConfig {
inbox_capacity: 100,
tenant_inbox_quota: 2,
..default_config()
};
let (inbox, _rx) = new_inbox(100, &config);
inbox
.submit(make_tx_class_with_tenant(1))
.expect("t1 first");
inbox
.submit(make_tx_class_with_tenant(1))
.expect("t1 second");
let err = inbox
.submit(make_tx_class_with_tenant(1))
.expect_err("t1 quota exceeded");
assert!(
matches!(err, SequencerError::TenantQuotaExceeded { tenant: 1, .. }),
"unexpected error: {err:?}"
);
inbox
.submit(make_tx_class_with_tenant(2))
.expect("t2 unaffected by t1 quota");
}
#[test]
fn tenant_quota_decrements_on_drain() {
let config = SequencerConfig {
inbox_capacity: 10,
tenant_inbox_quota: 1,
..default_config()
};
let (inbox, mut rx) = new_inbox(10, &config);
inbox
.submit(make_tx_class_with_tenant(7))
.expect("first submit");
assert!(inbox.submit(make_tx_class_with_tenant(7)).is_err());
let mut out = Vec::new();
let n = rx.drain_into_capped(&mut out, 10, usize::MAX);
assert_eq!(n, 1);
inbox
.submit(make_tx_class_with_tenant(7))
.expect("submit after drain");
}
#[test]
fn tenant_counter_rolls_back_on_send_failure() {
let config = SequencerConfig {
inbox_capacity: 1,
tenant_inbox_quota: 10,
..default_config()
};
let (inbox, _rx) = new_inbox(1, &config);
inbox
.submit(make_tx_class_with_tenant(3))
.expect("first submit fills channel");
let err = inbox
.submit(make_tx_class_with_tenant(3))
.expect_err("channel full");
assert_eq!(err, SequencerError::Overloaded);
let map = inbox
.tenant_in_flight
.lock()
.unwrap_or_else(|p| p.into_inner());
let in_flight = *map.get(&TenantId::new(3).as_u64()).unwrap_or(&0);
assert_eq!(
in_flight, 1,
"tenant counter must be 1 (rolled back from 2 on Overloaded), got {in_flight}"
);
}
#[test]
fn submit_rejects_dependent_read_too_large() {
use crate::calvin::types::{DependentReadSpec, PassiveReadKey};
use std::collections::BTreeMap;
let config = SequencerConfig {
max_dependent_read_bytes_per_txn: 4,
tenant_inbox_quota: 100,
..default_config()
};
let (inbox, _rx) = new_inbox(10, &config);
let mut tx = make_tx_class();
tx.dependent_reads = Some(DependentReadSpec {
passive_reads: {
let mut m = BTreeMap::new();
m.insert(
999u32,
vec![PassiveReadKey {
engine_key: crate::calvin::types::EngineKeySet::Document {
collection: "passive_col".to_owned(),
surrogates: crate::calvin::types::SortedVec::new(vec![1u32, 2u32]),
},
}],
);
m
},
});
let err = inbox.submit(tx).expect_err("should be rejected");
assert!(
matches!(
err,
SequencerError::DependentReadTooLarge { bytes: 8, limit: 4 }
),
"unexpected error: {err:?}"
);
}
#[test]
fn submit_rejects_dependent_read_fanout_too_wide() {
use crate::calvin::types::{DependentReadSpec, EngineKeySet, PassiveReadKey, SortedVec};
use std::collections::BTreeMap;
let config = SequencerConfig {
max_dependent_read_passives_per_txn: 1,
tenant_inbox_quota: 100,
..default_config()
};
let (inbox, _rx) = new_inbox(10, &config);
let mut tx = make_tx_class();
tx.dependent_reads = Some(DependentReadSpec {
passive_reads: {
let mut m = BTreeMap::new();
for vshard in [101u32, 102u32] {
m.insert(
vshard,
vec![PassiveReadKey {
engine_key: EngineKeySet::Document {
collection: format!("col_{vshard}"),
surrogates: SortedVec::new(vec![vshard]),
},
}],
);
}
m
},
});
let err = inbox.submit(tx).expect_err("should be rejected");
assert!(
matches!(
err,
SequencerError::DependentReadFanoutTooWide {
passives: 2,
limit: 1
}
),
"unexpected error: {err:?}"
);
}
}