use std::sync::Arc;
use std::time::{Duration, Instant};
use bytes::BytesMut;
use noxu_dbi::{DatabaseConfig, EnvironmentImpl};
use noxu_log::entry::LnLogEntry;
use noxu_log::{LogEntryType, LogManager};
use noxu_rep::{NodeType, RepConfig, RepNode, ReplicatedEnvironment};
use noxu_util::{NULL_LSN, NULL_VLSN};
/// Serializes a single LN log entry for `db_id` holding `key` and `data`.
///
/// Only the database id, the key and the data carry real values; every other
/// constructor argument is passed as `None`, `false`, `0`, `NULL_LSN` or
/// `NULL_VLSN`. The returned vector is a copy of whatever
/// `LnLogEntry::write_to_log` appended to a fresh, empty buffer.
fn ln_payload(db_id: u64, key: &[u8], data: &[u8]) -> Vec<u8> {
    // The entry takes ownership of its key and data, so copy the borrowed slices.
    let owned_key = key.to_vec();
    let owned_data = Some(data.to_vec());

    let ln_entry = LnLogEntry::new(
        db_id,
        None,
        NULL_LSN,
        false,
        None,
        None,
        NULL_VLSN,
        0,
        false,
        owned_key,
        owned_data,
        0,
        NULL_VLSN,
    );

    let mut encoded = BytesMut::new();
    ln_entry.write_to_log(&mut encoded);
    encoded.to_vec()
}
/// Opens an environment at `dir` together with the `"chain_db"` database.
///
/// The database is opened with `allow_create` and `transactional` both set.
///
/// Returns, in order: the shared environment, the database id widened to
/// `u64`, the tree handed back by `replica_tree_for_db` for that id, and the
/// environment's log manager.
///
/// # Panics
///
/// Panics if the environment or database cannot be opened, if no replica
/// tree exists for the database id, or if the environment has no log manager.
fn open_node(
    dir: &std::path::Path,
) -> (
    Arc<EnvironmentImpl>,
    u64,
    Arc<std::sync::RwLock<noxu_tree::Tree>>,
    Arc<LogManager>,
) {
    // NOTE(review): the meaning of the `false, true` flags is not visible
    // from this file — confirm against `EnvironmentImpl::new`.
    let environment = Arc::new(EnvironmentImpl::new(dir, false, true).unwrap());

    let mut db_config = DatabaseConfig::new();
    db_config.set_allow_create(true).set_transactional(true);

    let database = environment.open_database("chain_db", &db_config).unwrap();
    let database_id = database.read().get_id().id() as u64;

    let replica_tree = environment.replica_tree_for_db(database_id).unwrap();
    let log_manager = environment.get_log_manager().expect("log manager");

    (environment, database_id, replica_tree, log_manager)
}
/// Polls `tree` for `key` until it is found or `timeout` elapses.
///
/// The tree is probed immediately and then again after each sleep of at most
/// 50 ms. The read lock is taken only for the duration of a probe and is
/// released before sleeping.
///
/// Returns the `data` of the first lookup whose `found` flag is set (which
/// may itself be `None`), or `None` if no probe up to and including the one
/// made at the deadline finds the key.
///
/// At least one probe is always made, even when `timeout` is zero, and a
/// final probe is made once the deadline has been reached, so a record that
/// lands during the last sleep is not reported as missing.
///
/// # Panics
///
/// Panics if the tree's `RwLock` is poisoned.
fn poll_read(
    tree: &Arc<std::sync::RwLock<noxu_tree::Tree>>,
    key: &[u8],
    timeout: Duration,
) -> Option<Vec<u8>> {
    const POLL_INTERVAL: Duration = Duration::from_millis(50);

    let deadline = Instant::now() + timeout;
    loop {
        // Probe first so that the deadline only decides whether to retry,
        // never whether to look at all.
        if let Some(fetch) = tree.read().unwrap().search_with_data(key)
            && fetch.found
        {
            return fetch.data;
        }

        let remaining = deadline.saturating_duration_since(Instant::now());
        if remaining.is_zero() {
            return None;
        }
        // Never sleep past the deadline; the next iteration is the last probe.
        std::thread::sleep(remaining.min(POLL_INTERVAL));
    }
}
/// End-to-end check of a three-node replication chain: master -> R1 -> R2.
///
/// The test logs four records on the master, attaches R1 as a replica of the
/// master and then R2 as a replica of R1, and asserts that:
///
/// * every record becomes readable from R1's tree and from R2's tree,
/// * R2's VLSN range reaches at least the number of records written, and
/// * both R1 and the master report at least one served WAL feed.
///
/// The order of the steps below is significant: the records are logged before
/// any replica attaches, and R2 attaches only after R1 has been verified.
#[test]
fn test_chain_master_r1_r2_read_on_r2_matches_master() {
// Key/value pairs written on the master and expected verbatim on both replicas.
let records: [(&[u8], &[u8]); 4] = [
(b"alpha", b"one"),
(b"bravo", b"two"),
(b"charlie", b"three"),
(b"delta", b"four"),
];
// --- Master: local environment in its own temp dir, wrapped for replication.
let master_dir = tempfile::TempDir::new().unwrap();
let master_home = master_dir.path().to_path_buf();
// The master's tree is not read by this test; only its log manager is used.
let (master_env_impl, master_db_id, _m_tree, master_log) =
open_node(&master_home);
// NOTE(review): `node_port(0)` presumably requests an OS-assigned port; the
// actual address is read back through `bound_addr()` further down — confirm.
let master_cfg = RepConfig::builder("chain_grp", "master", "127.0.0.1")
.node_port(0)
.env_home(&master_home)
.build();
let master = Arc::new(ReplicatedEnvironment::new(master_cfg).unwrap());
master.init_self_weak();
master.with_environment(Arc::clone(&master_env_impl));
// --- R1: middle of the chain.
let r1_dir = tempfile::TempDir::new().unwrap();
let r1_home = r1_dir.path().to_path_buf();
let (r1_env_impl, r1_db_id, r1_tree, _r1_log) = open_node(&r1_home);
// Each node opens "chain_db" independently; the test requires the ids to agree.
assert_eq!(
r1_db_id, master_db_id,
"the replicated db must share a db id across the chain"
);
// R1 is the only node configured with `cascade_feeding(true)`.
let r1_cfg = RepConfig::builder("chain_grp", "R1", "127.0.0.1")
.node_port(0)
.env_home(&r1_home)
.cascade_feeding(true) .build();
let r1 = Arc::new(ReplicatedEnvironment::new(r1_cfg).unwrap());
r1.init_self_weak();
r1.with_environment(Arc::clone(&r1_env_impl));
// --- R2: end of the chain; it is only ever pointed at R1, never at the master.
let r2_dir = tempfile::TempDir::new().unwrap();
let r2_home = r2_dir.path().to_path_buf();
let (r2_env_impl, r2_db_id, r2_tree, _r2_log) = open_node(&r2_home);
assert_eq!(r2_db_id, master_db_id, "shared db id across the chain");
let r2_cfg = RepConfig::builder("chain_grp", "R2", "127.0.0.1")
.node_port(0)
.env_home(&r2_home)
.build();
let r2 = Arc::new(ReplicatedEnvironment::new(r2_cfg).unwrap());
r2.init_self_weak();
r2.with_environment(Arc::clone(&r2_env_impl));
// --- Peer wiring, using the ports the nodes actually bound to.
let master_addr = master.bound_addr().expect("master binds");
let r1_addr = r1.bound_addr().expect("R1 binds");
// R1 learns about the master.
r1.add_peer(RepNode::new(
"master".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
master_addr.port(),
1,
))
.unwrap();
// R2 learns about R1 only.
// NOTE(review): the last `RepNode::new` argument is 2 here but 1 where the
// master registers the same "R1" peer below. If that argument is a node id,
// the two registrations disagree — confirm which value is intended.
r2.add_peer(RepNode::new(
"R1".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
r1_addr.port(),
2,
))
.unwrap();
// The master learns about R1.
master
.add_peer(RepNode::new(
"R1".to_string(),
NodeType::Electable,
"127.0.0.1".to_string(),
r1_addr.port(),
1,
))
.unwrap();
// NOTE(review): the meaning of the `1` passed to `become_master` is not
// visible from this file — confirm.
master.become_master(1).unwrap();
// --- Write every record into the master's log before any replica attaches.
for (i, (k, v)) in records.iter().enumerate() {
// VLSNs are assigned sequentially starting at 1.
let vlsn = (i + 1) as u64;
let payload = ln_payload(master_db_id, k, v);
let lsn = master_log
.log_with_vlsn(
LogEntryType::InsertLN,
&payload,
vlsn,
true,
false,
)
.expect("master log_with_vlsn");
// Register the VLSN against the LSN (file number + offset) just returned.
master.register_vlsn_typed(
vlsn,
lsn.file_number(),
lsn.file_offset(),
LogEntryType::InsertLN,
);
}
// --- Hop 1: R1 attaches to the master after the records are already logged.
r1.become_replica("master").unwrap();
// Each key gets its own polling budget of up to 15 seconds.
for (k, v) in &records {
let got = poll_read(&r1_tree, k, Duration::from_secs(15));
assert_eq!(
got.as_deref(),
Some(&v[..]),
"R1 must receive + apply '{}' from the master",
std::str::from_utf8(k).unwrap()
);
}
// Diagnostic output only; nothing is asserted on R1's VLSN range.
let r1_range = r1.vlsn_index_arc().get_range();
eprintln!(
"PROBE: R1 vlsn range first={} last={}",
r1_range.first(),
r1_range.last()
);
// --- Hop 2: R2 attaches to R1 once R1 has been verified to hold every record.
r2.become_replica("R1").unwrap();
// Each key gets its own polling budget of up to 20 seconds.
for (k, v) in &records {
let got = poll_read(&r2_tree, k, Duration::from_secs(20));
assert_eq!(
got.as_deref(),
Some(&v[..]),
"FAIL-PRE: R2 read of '{}' returned nothing — without the \
WAL-backed cascade feeder + VLSN-tagged replica WAL, R2 cannot \
source the master's data via R1",
std::str::from_utf8(k).unwrap()
);
}
// R2's VLSN index must extend at least to the last VLSN written (records.len()).
let r2_range = r2.vlsn_index_arc().get_range();
assert!(
r2_range.last() >= records.len() as u64,
"R2 VLSN range must cover all {} entries; got last={}",
records.len(),
r2_range.last()
);
// Both feeding nodes must report at least one served WAL feed.
assert!(
r1.wal_feeds_served() >= 1,
"R1 must have fed R2 via the WAL FeederRunner + EnvironmentLogScanner \
mechanism (JE Feeder + MasterFeederSource), not the in-memory pull \
fallback; wal_feeds_served() == {}",
r1.wal_feeds_served(),
);
assert!(
master.wal_feeds_served() >= 1,
"the master must have fed R1 via the same WAL FeederRunner mechanism; \
wal_feeds_served() == {}",
master.wal_feeds_served(),
);
// Shut down from the tail of the chain back to the master.
r2.close().unwrap();
r1.close().unwrap();
master.close().unwrap();
}