use std::sync::Arc;
use dashmap::DashMap;
use super::replication::ChannelId;
use super::replication_runtime::{Inbound, ReplicationInboundRouter, ReplicationRuntimeHandle};
/// Registry that maps each [`ChannelId`] to the [`ReplicationRuntimeHandle`]
/// registered for that channel.
///
/// The map is a [`DashMap`]; every operation on the router takes `&self`.
#[derive(Default)]
pub struct RedexReplicationRouter {
    /// Registered runtime handles, keyed by channel id (one handle per channel).
    runtimes: DashMap<ChannelId, Arc<ReplicationRuntimeHandle>>,
}
impl RedexReplicationRouter {
pub fn new() -> Self {
Self {
runtimes: DashMap::new(),
}
}
pub fn register(
&self,
channel_id: ChannelId,
handle: Arc<ReplicationRuntimeHandle>,
) -> Option<Arc<ReplicationRuntimeHandle>> {
self.runtimes.insert(channel_id, handle)
}
pub fn get(&self, channel_id: &ChannelId) -> Option<Arc<ReplicationRuntimeHandle>> {
self.runtimes.get(channel_id).map(|e| e.value().clone())
}
pub fn unregister(&self, channel_id: &ChannelId) -> Option<Arc<ReplicationRuntimeHandle>> {
self.runtimes.remove(channel_id).map(|(_, v)| v)
}
pub fn len(&self) -> usize {
self.runtimes.len()
}
pub fn is_empty(&self) -> bool {
self.runtimes.is_empty()
}
pub fn snapshot_handles(&self) -> Vec<(ChannelId, Arc<ReplicationRuntimeHandle>)> {
self.runtimes
.iter()
.map(|e| (*e.key(), e.value().clone()))
.collect()
}
}
impl ReplicationInboundRouter for RedexReplicationRouter {
    /// Hands `inbound` to the runtime handle registered for `channel_id`.
    ///
    /// When no handle is registered for the channel, the event is given back
    /// to the caller as `Err(inbound)`. Otherwise the result is whatever the
    /// handle's `try_dispatch` returns.
    fn try_route(&self, channel_id: ChannelId, inbound: Inbound) -> Result<(), Inbound> {
        if let Some(entry) = self.runtimes.get(&channel_id) {
            // The map entry is held while `try_dispatch` runs.
            entry.value().try_dispatch(inbound)
        } else {
            Err(inbound)
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::adapter::net::channel::ChannelName;
use crate::adapter::net::redex::replication::{ReplicaRole, SyncHeartbeat};
/// Test helper: derives the [`ChannelId`] for the channel called `name`.
///
/// Panics if [`ChannelName::new`] rejects `name`.
fn cid_for(name: &str) -> ChannelId {
    ChannelId::from_name(&ChannelName::new(name).unwrap())
}
/// Test helper: builds a heartbeat [`Inbound`] for `channel_id`.
///
/// The event claims to come from node `0xAA` and carries a `Replica`-role
/// heartbeat with a zero tail sequence and a zero wall clock.
fn dummy_inbound(channel_id: ChannelId) -> Inbound {
    let heartbeat = SyncHeartbeat {
        channel_id,
        tail_seq: 0,
        role: ReplicaRole::Replica,
        wall_clock_ms: 0,
    };
    Inbound::Heartbeat {
        from: 0xAA,
        msg: heartbeat,
    }
}
/// Routing to a channel that was never registered must fail and hand the
/// event back to the caller inside the `Err`.
#[test]
fn unknown_channel_returns_inbound_back() {
    let router = RedexReplicationRouter::new();
    let cid = cid_for("test/unknown");
    let event = dummy_inbound(cid);
    let result = router.try_route(cid, event);
    assert!(result.is_err(), "unknown channel must reject");
    // `dummy_inbound` builds a heartbeat from node 0xAA; seeing that same
    // shape in the error shows the event was returned, not swallowed.
    assert!(
        matches!(result, Err(Inbound::Heartbeat { from: 0xAA, .. })),
        "rejected event must be handed back to the caller"
    );
}
/// A freshly constructed router reports itself empty and resolves no channel.
#[test]
fn empty_router_reports_empty() {
    let router = RedexReplicationRouter::new();
    assert!(router.is_empty());
    assert_eq!(router.len(), 0);
    let unregistered = cid_for("nothing");
    assert!(router.get(&unregistered).is_none());
}
/// `unregister` must return the very handle that was registered and leave the
/// router unable to route to that channel afterwards.
#[test]
fn unregister_returns_handle_and_drops_registration() {
    // The runtime is entered before the handle is built and kept entered for
    // the whole test.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let cid = cid_for("test/unregister");
    let handle = build_dummy_handle();
    let router = RedexReplicationRouter::new();
    router.register(cid, handle.clone());
    assert_eq!(router.len(), 1);
    let removed = router.unregister(&cid);
    assert!(removed.is_some(), "unregister must return the handle");
    // Identity check: it must be the same allocation we registered, not
    // merely *some* handle.
    assert!(
        matches!(&removed, Some(h) if Arc::ptr_eq(h, &handle)),
        "unregister must return the handle that was registered"
    );
    assert!(router.is_empty());
    let result = router.try_route(cid, dummy_inbound(cid));
    assert!(result.is_err());
    rt.block_on(handle.cancel());
}
/// Registering a second handle for the same channel must replace the first,
/// return the first to the caller, and leave exactly one registration.
#[test]
fn register_replaces_returns_previous_handle() {
    // The runtime is entered before the handles are built and kept entered
    // for the whole test.
    let rt = tokio::runtime::Runtime::new().unwrap();
    let _guard = rt.enter();
    let cid = cid_for("test/replace");
    let first = build_dummy_handle();
    let second = build_dummy_handle();
    let router = RedexReplicationRouter::new();
    assert!(router.register(cid, first.clone()).is_none());
    let previous = router.register(cid, second.clone());
    assert!(
        previous.is_some(),
        "second register must return the prior handle"
    );
    // Identity check: the displaced handle must be `first` specifically.
    assert!(
        matches!(&previous, Some(h) if Arc::ptr_eq(h, &first)),
        "displaced handle must be the first one registered"
    );
    assert_eq!(router.len(), 1, "still one channel — second replaced first");
    // ...and the channel must now resolve to `second`.
    let current = router.get(&cid);
    assert!(
        matches!(&current, Some(h) if Arc::ptr_eq(h, &second)),
        "lookup must resolve to the replacement handle"
    );
    rt.block_on(first.cancel());
    rt.block_on(second.cancel());
}
/// Routing an event to a channel with a registered handle must succeed.
#[test]
fn try_route_to_registered_channel_dispatches() {
    // The runtime is entered before the handle is built and kept entered for
    // the whole test.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let _entered = runtime.enter();
    let cid = cid_for("test/runtime");
    let handle = build_dummy_handle();
    let router = RedexReplicationRouter::new();
    router.register(cid, Arc::clone(&handle));
    let event = dummy_inbound(cid);
    let outcome = router.try_route(cid, event);
    assert!(outcome.is_ok(), "registered channel must route");
    runtime.block_on(handle.cancel());
}
/// Test helper: spawns a replication runtime wired to no-op collaborators and
/// returns its handle wrapped in an `Arc`.
///
/// The runtime is built for channel `"test/runtime"` with a two-node replica
/// set (`0x10`, `0x20`) where this node is `0x10`.
///
/// NOTE(review): every caller enters a Tokio runtime before calling this, so
/// `spawn_replication_runtime` presumably requires a runtime context — confirm.
fn build_dummy_handle() -> Arc<ReplicationRuntimeHandle> {
    use super::super::file::RedexFile;
    use super::super::manager::Redex;
    use super::super::replication_budget::BandwidthBudget;
    use super::super::replication_config::ReplicationConfig;
    use super::super::replication_coordinator::{
        ChainTagSink, ChannelIdentity, ReplicationCoordinator,
    };
    use super::super::replication_metrics::ReplicationMetricsRegistry;
    use super::super::replication_runtime::{
        spawn_replication_runtime, ReplicationDispatcher, RuntimeInputs,
    };
    use crate::adapter::net::behavior::placement::NodeId;
    use crate::adapter::net::channel::ChannelName;
    use crate::adapter::net::redex::config::RedexFileConfig;
    use crate::error::AdapterError;
    use parking_lot::Mutex;
    use std::time::{Duration, Instant};

    /// Channel name shared by the file, the coordinator and the runtime inputs.
    const CHANNEL: &str = "test/runtime";

    /// Chain-tag sink that accepts every announcement and withdrawal.
    struct NoopSink;

    #[async_trait::async_trait]
    impl ChainTagSink for NoopSink {
        async fn announce_chain(
            &self,
            _origin_hash: u64,
            _tip_seq: u64,
        ) -> Result<(), AdapterError> {
            Ok(())
        }

        async fn withdraw_chain(&self, _origin_hash: u64) -> Result<(), AdapterError> {
            Ok(())
        }
    }

    /// Dispatcher that drops every outbound message and reports success.
    struct NoopDispatcher;

    #[async_trait::async_trait]
    impl ReplicationDispatcher for NoopDispatcher {
        async fn send_heartbeat(
            &self,
            _target: NodeId,
            _msg: SyncHeartbeat,
        ) -> Result<(), AdapterError> {
            Ok(())
        }

        async fn send_sync_request(
            &self,
            _target: NodeId,
            _msg: super::super::replication::SyncRequest,
        ) -> Result<(), AdapterError> {
            Ok(())
        }

        async fn send_sync_response(
            &self,
            _target: NodeId,
            _msg: super::super::replication::SyncResponse,
        ) -> Result<(), AdapterError> {
            Ok(())
        }

        async fn send_sync_nack(
            &self,
            _target: NodeId,
            _msg: super::super::replication::SyncNack,
        ) -> Result<(), AdapterError> {
            Ok(())
        }
    }

    // The coordinator and the runtime inputs each take their own identity
    // value, so build it on demand instead of repeating the literal.
    let identity = || ChannelIdentity {
        channel_name: CHANNEL.to_string(),
        origin_hash: 0xCAFE_BABE,
    };

    let cn = ChannelName::new(CHANNEL).unwrap();
    let redex = Redex::new();
    let file: RedexFile = redex.open_file(&cn, RedexFileConfig::default()).unwrap();
    let registry = ReplicationMetricsRegistry::new();
    let coordinator = Arc::new(ReplicationCoordinator::new(
        identity(),
        ReplicationConfig::new(),
        Arc::new(NoopSink) as Arc<dyn ChainTagSink>,
        // Restored from the mangled `®istry` (an HTML-entity corruption of
        // `&registry`), which did not compile.
        &registry,
    ));
    let inputs = RuntimeInputs {
        channel: identity(),
        channel_id: cid_for(CHANNEL),
        self_node_id: 0x10,
        replica_set: vec![0x10, 0x20],
        // NOTE(review): presumably long enough that no periodic heartbeat
        // fires during a short-lived test — confirm.
        heartbeat_ms: 60_000,
        wall_clock_provider: Arc::new(|| 0),
        tail_provider: Arc::new(|| 0),
        rtt_lookup: Arc::new(|_| Some(Duration::from_millis(5))),
        file,
        default_bandwidth_class: Default::default(),
        background_fraction: 0.3,
    };
    let budget = Arc::new(Mutex::new(BandwidthBudget::new(
        0.5,
        1_000_000,
        Instant::now(),
    )));
    Arc::new(spawn_replication_runtime(
        inputs,
        coordinator,
        Arc::new(NoopDispatcher),
        budget,
    ))
}
}