mod common;
use std::sync::atomic::Ordering;
use std::time::Duration;
use nodedb_cluster::calvin::{
sequencer::{SequencerConfig, new_inbox},
types::{EngineKeySet, ReadWriteSet, SequencedTxn, SortedVec, TxClass},
};
use nodedb_types::{
TenantId,
id::{DatabaseId, VShardId},
};
use tokio::sync::mpsc;
use common::{spawn_with_sequencer, wait_for_sequencer_leader};
/// Returns two collection names of the form `col_<i>` that map to different
/// vshard ids in `DatabaseId::DEFAULT`.
///
/// The first name is always `col_0`; the second is the lowest-indexed
/// candidate in `col_1..col_511` whose vshard id differs from `col_0`'s.
///
/// # Panics
///
/// Panics if none of the 512 candidate names lands on a different vshard.
fn two_distinct_collections() -> (String, String) {
    let name_for = |i: u32| format!("col_{i}");

    // The first candidate is the anchor every later candidate is compared to.
    let anchor = name_for(0);
    let anchor_vshard =
        VShardId::from_collection_in_database(DatabaseId::DEFAULT, &anchor).as_u32();

    let differing = (1u32..512).map(name_for).find(|candidate| {
        VShardId::from_collection_in_database(DatabaseId::DEFAULT, candidate).as_u32()
            != anchor_vshard
    });

    match differing {
        Some(other) => (anchor, other),
        None => panic!("could not find two distinct-vshard collections in 512 tries"),
    }
}
/// Builds a `TxClass` whose write set touches two document collections that
/// map to different vshards.
///
/// Returns the transaction class together with the vshard ids (as `u32`) of
/// the first and second collection, in that order. The read set and the third
/// `TxClass::new` argument are empty, the tenant is `TenantId::new(1)`, and the
/// final argument is `None`.
///
/// # Panics
///
/// Panics if `TxClass::new` rejects the constructed sets, or if
/// `two_distinct_collections` cannot find two suitable names.
fn make_multishard_txclass() -> (TxClass, u32, u32) {
    let (first_col, second_col) = two_distinct_collections();

    // Resolve the vshard ids before the names are moved into the key sets.
    let first_vshard =
        VShardId::from_collection_in_database(DatabaseId::DEFAULT, &first_col).as_u32();
    let second_vshard =
        VShardId::from_collection_in_database(DatabaseId::DEFAULT, &second_col).as_u32();

    let first_keys = EngineKeySet::Document {
        collection: first_col,
        surrogates: SortedVec::new(vec![1]),
    };
    let second_keys = EngineKeySet::Document {
        collection: second_col,
        surrogates: SortedVec::new(vec![2]),
    };
    let writes = ReadWriteSet::new(vec![first_keys, second_keys]);
    let reads = ReadWriteSet::new(vec![]);

    let tx_class =
        TxClass::new(reads, writes, vec![], TenantId::new(1), None).expect("valid TxClass");

    (tx_class, first_vshard, second_vshard)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn sequencer_normal_path_commit_on_all_replicas() {
let node_ids = vec![1u64, 2, 3];
let nodes = spawn_with_sequencer(node_ids)
.await
.expect("spawn_with_sequencer");
let leader_idx =
wait_for_sequencer_leader(&nodes, Duration::from_secs(10), Duration::from_millis(50)).await;
let config = SequencerConfig {
epoch_duration: Duration::from_millis(10),
..SequencerConfig::default()
};
let (inbox, inbox_receiver) = new_inbox(1024, &config);
let (tx_a, col_b_name) = two_distinct_collections();
let va = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &tx_a).as_u32();
let vb = VShardId::from_collection_in_database(DatabaseId::DEFAULT, &col_b_name).as_u32();
let mut vshard_rxs_a: Vec<mpsc::Receiver<SequencedTxn>> = Vec::new();
let mut vshard_rxs_b: Vec<mpsc::Receiver<SequencedTxn>> = Vec::new();
for node in &nodes {
let (tx_a_ch, rx_a) = mpsc::channel(64);
let (tx_b_ch, rx_b) = mpsc::channel(64);
node.add_vshard_sender(va, tx_a_ch);
node.add_vshard_sender(vb, tx_b_ch);
vshard_rxs_a.push(rx_a);
vshard_rxs_b.push(rx_b);
}
let (_seq_shutdown, _, _seq_handle) =
nodes[leader_idx].start_sequencer_service(inbox_receiver, config.clone());
let (tx_class, _va, _vb) = make_multishard_txclass();
inbox.submit(tx_class).expect("submit");
common::wait_for(
"all 3 nodes apply epoch 0",
Duration::from_secs(10),
Duration::from_millis(20),
|| nodes.iter().all(|n| n.last_applied_epoch().is_some()),
)
.await;
for (i, node) in nodes.iter().enumerate() {
let epoch = node
.last_applied_epoch()
.expect("epoch should be applied on all nodes");
assert_eq!(
epoch, 0,
"node {} should have applied epoch 0, got {}",
i, epoch
);
let applied = node
.state_machine
.lock()
.unwrap()
.metrics
.epochs_applied
.load(Ordering::Relaxed);
assert!(applied >= 1, "node {} metrics show 0 epochs applied", i);
}
for (i, (rx_a, rx_b)) in vshard_rxs_a
.iter_mut()
.zip(vshard_rxs_b.iter_mut())
.enumerate()
{
let got_a = rx_a.try_recv().is_ok();
let got_b = rx_b.try_recv().is_ok();
assert!(
got_a || got_b,
"node {}: neither vshard receiver got the txn fan-out",
i
);
}
for node in nodes {
node.shutdown().await;
}
}