use std::time::{Duration, Instant};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use thiserror::Error;
use uuid::Uuid;
/// How traffic to a peer is currently carried.
///
/// Serialized with serde's `snake_case` variant renaming (`direct`, `p2p`, `relay`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum TransportMode {
    /// Mode reported for peers with no recorded state; packets go through
    /// `TransportDispatcher::send_direct`.
    Direct,
    /// Peer-to-peer path via `TransportDispatcher::send_p2p`; a
    /// `PathUnavailable` failure makes the manager fall back to relay.
    P2p,
    /// Packets go through `TransportDispatcher::send_relay`.
    Relay,
}
/// Why a [`ModeSwitchSignal`] was produced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ModeSwitchReason {
    /// Supplied by callers of `TransportManager::switch_mode`; the manager
    /// never produces it on its own.
    Requested,
    /// Emitted by the manager when consecutive P2P failures reach the
    /// policy's `failure_threshold` and the peer is moved to relay.
    PathDegraded,
    /// Emitted by `TransportManager::maybe_retry_p2p` when a relay peer's
    /// backoff has elapsed and it is moved back to P2P.
    PathRecovered,
    /// Supplied by callers of `TransportManager::switch_mode`; the manager
    /// never produces it on its own.
    OperatorOverride,
}
/// Per-peer routing state stored in the manager's map.
#[derive(Debug, Clone)]
struct ModeState {
    /// Current transport mode.
    mode: TransportMode,
    /// Incremented on every mode change and copied into the emitted
    /// [`ModeSwitchSignal`].
    generation: u64,
    /// Set when the entry is created and refreshed by `switch_mode` and
    /// `maybe_retry_p2p`. NOTE(review): not read in the code visible here.
    updated_at: Instant,
    /// Time of the most recent P2P `PathUnavailable` failure.
    /// NOTE(review): written but not read in the code visible here.
    last_failure: Option<Instant>,
    /// P2P failure streak and retry backoff.
    health: PathHealth,
    /// Per-channel cursor watermark; `route` skips packets whose cursor is
    /// `<=` the stored value.
    last_cursors: HashMap<String, u64>,
}
/// Failure bookkeeping for a peer's P2P path.
///
/// The derived `Default` is the clean state: no failures and no pending backoff.
#[derive(Debug, Clone, Default)]
struct PathHealth {
    /// Number of P2P `PathUnavailable` failures since the last reset.
    consecutive_failures: u32,
    /// When set, the earliest instant at which a downgraded peer may retry P2P.
    backoff_until: Option<Instant>,
}
impl PathHealth {
    /// Returns a clean record (equivalent to `PathHealth::default()`).
    fn new() -> Self {
        Self::default()
    }
}
/// Tuning knobs for falling back from P2P to relay.
#[derive(Debug, Clone, Copy)]
pub struct FailoverPolicy {
    /// Once this many consecutive P2P `PathUnavailable` failures accumulate,
    /// the peer is downgraded to relay.
    pub failure_threshold: u32,
    /// How long after a downgrade the peer must stay on relay before
    /// `maybe_retry_p2p` may re-enable P2P.
    pub retry_backoff: Duration,
}
impl Default for FailoverPolicy {
    /// Three consecutive failures trigger a downgrade; retry after ten seconds.
    fn default() -> Self {
        const THRESHOLD: u32 = 3;
        const BACKOFF_SECS: u64 = 10;
        FailoverPolicy {
            retry_backoff: Duration::from_secs(BACKOFF_SECS),
            failure_threshold: THRESHOLD,
        }
    }
}
/// Describes a change of a peer's transport mode.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ModeSwitchSignal {
    /// Peer whose mode changed.
    pub peer_id: Uuid,
    /// Mode after the change.
    pub mode: TransportMode,
    /// Mode before the change.
    pub previous_mode: TransportMode,
    /// Why the change happened.
    pub reason: ModeSwitchReason,
    /// The peer's generation after the change. It is incremented on every
    /// switch, so consumers can presumably use it to order or discard stale
    /// signals.
    pub generation: u64,
}
/// A JSON payload addressed to a named channel, optionally sequenced by a cursor.
#[derive(Debug, Clone)]
pub struct TransportPacket {
    /// Channel name; cursors are tracked per peer and per channel.
    pub channel: String,
    /// Optional sequence number. When set, `TransportManager::route` skips the
    /// packet if a cursor `>=` this one was already recorded for the channel.
    pub cursor: Option<u64>,
    /// Body handed unchanged to the dispatcher.
    pub payload: serde_json::Value,
}
impl TransportPacket {
    /// Creates an unsequenced packet (no cursor) for `channel`.
    #[must_use]
    pub fn new(channel: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            channel: channel.into(),
            cursor: None,
            payload,
        }
    }
    /// Returns the packet with its cursor set to `cursor`, enabling duplicate
    /// suppression in `TransportManager::route`.
    ///
    /// Consumes `self`; the result must be used or the packet is lost.
    #[must_use]
    pub fn with_cursor(mut self, cursor: u64) -> Self {
        self.cursor = Some(cursor);
        self
    }
}
/// The dispatcher path a packet was sent over (or, for a skipped duplicate,
/// the path of the peer's current mode).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransportRoute {
    /// `TransportDispatcher::send_direct`.
    Direct,
    /// `TransportDispatcher::send_p2p`.
    P2p,
    /// `TransportDispatcher::send_relay`.
    Relay,
}
/// Result of a successful `TransportManager::route` call.
#[derive(Debug, Clone)]
pub struct RouteOutcome {
    /// Path the packet went over (for a skipped duplicate, the current mode's path).
    pub route: TransportRoute,
    /// Peer mode reported by `route` for this send.
    pub active_mode: TransportMode,
    /// Set when this call triggered a mode change (the P2P-to-relay downgrade).
    pub switch: Option<ModeSwitchSignal>,
    /// `false` when the packet was skipped as a duplicate cursor.
    pub delivered: bool,
}
/// Errors returned by a [`TransportDispatcher`] and propagated by
/// `TransportManager::route`.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum TransportError {
    /// The path cannot be used. From `send_p2p`, this triggers relay fallback
    /// and counts toward the failover threshold; from other paths it is
    /// returned as-is.
    #[error("path unavailable: {0}")]
    PathUnavailable(&'static str),
    /// A dispatch error; `route` returns it without attempting fallback.
    #[error("dispatch failed: {0}")]
    DispatchFailed(&'static str),
    /// A rate-limit error; `route` returns it without attempting fallback.
    #[error("rate limited: {0}")]
    RateLimited(&'static str),
}
/// Performs the actual sends for each transport path.
///
/// `TransportManager::route` treats [`TransportError::PathUnavailable`] from
/// `send_p2p` as a signal to fall back to `send_relay`. Every other error, and
/// any error from `send_direct`/`send_relay`, is returned to the caller.
pub trait TransportDispatcher: Send + Sync {
    /// Sends `packet` to `peer` over the direct path.
    fn send_direct(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError>;
    /// Sends `packet` to `peer` over the peer-to-peer path.
    fn send_p2p(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError>;
    /// Sends `packet` to `peer` through the relay.
    fn send_relay(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError>;
}
/// Tracks per-peer transport modes and routes packets through a
/// [`TransportDispatcher`], falling back from P2P to relay on path failures.
pub struct TransportManager {
    /// Per-peer state keyed by peer id; peers without an entry are reported
    /// as [`TransportMode::Direct`].
    modes: DashMap<Uuid, ModeState>,
    /// Failover thresholds applied to every peer.
    policy: FailoverPolicy,
}
impl Default for TransportManager {
fn default() -> Self {
Self::new()
}
}
impl TransportManager {
pub fn new() -> Self {
Self {
modes: DashMap::new(),
policy: FailoverPolicy::default(),
}
}
pub fn with_policy(policy: FailoverPolicy) -> Self {
Self {
modes: DashMap::new(),
policy,
}
}
pub fn mode_for(&self, peer: Uuid) -> TransportMode {
self.modes
.get(&peer)
.map(|s| s.mode)
.unwrap_or(TransportMode::Direct)
}
pub fn switch_mode(
&self,
peer: Uuid,
new_mode: TransportMode,
reason: ModeSwitchReason,
) -> Option<ModeSwitchSignal> {
let mut state = self.ensure_state(peer);
if state.mode == new_mode {
return None;
}
let previous_mode = state.mode;
state.mode = new_mode;
state.generation += 1;
state.updated_at = Instant::now();
Some(ModeSwitchSignal {
peer_id: peer,
mode: new_mode,
previous_mode,
reason,
generation: state.generation,
})
}
pub fn route<D: TransportDispatcher>(
&self,
peer: Uuid,
packet: TransportPacket,
dispatcher: &D,
) -> Result<RouteOutcome, TransportError> {
if self.is_duplicate(peer, &packet) {
let mode = self.mode_for(peer);
return Ok(RouteOutcome {
route: match mode {
TransportMode::Direct => TransportRoute::Direct,
TransportMode::P2p => TransportRoute::P2p,
TransportMode::Relay => TransportRoute::Relay,
},
active_mode: mode,
switch: None,
delivered: false,
});
}
let mode = self.mode_for(peer);
match mode {
TransportMode::Direct => {
let record = packet.clone();
dispatcher.send_direct(peer, packet)?;
self.record_cursor(peer, &record);
Ok(RouteOutcome {
route: TransportRoute::Direct,
active_mode: TransportMode::Direct,
switch: None,
delivered: true,
})
}
TransportMode::Relay => {
let record = packet.clone();
dispatcher.send_relay(peer, packet)?;
self.record_cursor(peer, &record);
Ok(RouteOutcome {
route: TransportRoute::Relay,
active_mode: TransportMode::Relay,
switch: None,
delivered: true,
})
}
TransportMode::P2p => match dispatcher.send_p2p(peer, packet.clone()) {
Ok(_) => {
self.reset_failures(peer);
self.record_cursor(peer, &packet);
Ok(RouteOutcome {
route: TransportRoute::P2p,
active_mode: TransportMode::P2p,
switch: None,
delivered: true,
})
}
Err(TransportError::PathUnavailable(_)) => {
let switch = self.mark_failure_and_maybe_downgrade(peer);
let record = packet.clone();
dispatcher.send_relay(peer, packet)?;
self.record_cursor(peer, &record);
Ok(RouteOutcome {
route: TransportRoute::Relay,
active_mode: TransportMode::Relay,
switch,
delivered: true,
})
}
Err(err) => Err(err),
},
}
}
pub fn maybe_retry_p2p(&self, peer: Uuid) -> Option<ModeSwitchSignal> {
let mut state = self.ensure_state(peer);
if state.mode != TransportMode::Relay {
return None;
}
let now = Instant::now();
let ready = state
.health
.backoff_until
.map(|until| now >= until)
.unwrap_or(false);
if !ready {
return None;
}
state.health.backoff_until = None;
state.health.consecutive_failures = 0;
let previous_mode = state.mode;
state.mode = TransportMode::P2p;
state.generation += 1;
state.updated_at = now;
Some(ModeSwitchSignal {
peer_id: peer,
mode: TransportMode::P2p,
previous_mode,
reason: ModeSwitchReason::PathRecovered,
generation: state.generation,
})
}
fn ensure_state(&self, peer: Uuid) -> dashmap::mapref::one::RefMut<'_, Uuid, ModeState> {
self.modes.entry(peer).or_insert_with(|| ModeState {
mode: TransportMode::Direct,
generation: 0,
updated_at: Instant::now(),
last_failure: None,
health: PathHealth::new(),
last_cursors: HashMap::new(),
})
}
fn mark_failure_and_maybe_downgrade(&self, peer: Uuid) -> Option<ModeSwitchSignal> {
let mut state = self.ensure_state(peer);
let now = Instant::now();
state.last_failure = Some(now);
state.health.consecutive_failures = state.health.consecutive_failures.saturating_add(1);
if state.health.consecutive_failures < self.policy.failure_threshold {
return None;
}
state.health.backoff_until = Some(now + self.policy.retry_backoff);
Some(ModeSwitchSignal {
peer_id: peer,
mode: TransportMode::Relay,
previous_mode: state.mode,
reason: ModeSwitchReason::PathDegraded,
generation: {
state.mode = TransportMode::Relay;
state.generation += 1;
state.generation
},
})
}
fn reset_failures(&self, peer: Uuid) {
if let Some(mut state) = self.modes.get_mut(&peer) {
state.health.consecutive_failures = 0;
state.health.backoff_until = None;
}
}
fn is_duplicate(&self, peer: Uuid, packet: &TransportPacket) -> bool {
let Some(cursor) = packet.cursor else {
return false;
};
let state = self.ensure_state(peer);
if let Some(last) = state.last_cursors.get(&packet.channel) {
return cursor <= *last;
}
false
}
fn record_cursor(&self, peer: Uuid, packet: &TransportPacket) {
let Some(cursor) = packet.cursor else {
return;
};
if let Some(mut state) = self.modes.get_mut(&peer) {
state.last_cursors.insert(packet.channel.clone(), cursor);
}
}
}
#[cfg(test)]
mod tests {
    // Behavioral tests for `TransportManager`. Each test builds its own manager
    // and `MockDispatcher`, so the tests share no state.
    use super::*;
    use serde_json::json;
    use std::sync::Mutex;

    /// Recording `TransportDispatcher` double.
    ///
    /// Every successful send is appended to `calls` as `(route, peer, packet)`.
    /// While `fail_p2p` is `true`, `send_p2p` returns `PathUnavailable` and
    /// records nothing. The derived `Default` starts with `fail_p2p == false`.
    #[derive(Default)]
    struct MockDispatcher {
        fail_p2p: Mutex<bool>,
        calls: Mutex<Vec<(TransportRoute, Uuid, TransportPacket)>>,
    }
    impl MockDispatcher {
        /// Dispatcher whose P2P sends fail until `allow_p2p` is called.
        fn with_p2p_failure() -> Self {
            Self {
                fail_p2p: Mutex::new(true),
                calls: Mutex::new(Vec::new()),
            }
        }
        /// Lets subsequent `send_p2p` calls succeed.
        fn allow_p2p(&self) {
            *self.fail_p2p.lock().unwrap() = false;
        }
        /// Copy of all successful sends so far, in call order.
        fn calls(&self) -> Vec<(TransportRoute, Uuid, TransportPacket)> {
            self.calls.lock().unwrap().clone()
        }
    }
    impl TransportDispatcher for MockDispatcher {
        fn send_direct(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError> {
            self.calls
                .lock()
                .unwrap()
                .push((TransportRoute::Direct, peer, packet));
            Ok(())
        }
        // Fails without recording while `fail_p2p` is set, so `calls` only ever
        // contains P2P sends that succeeded.
        fn send_p2p(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError> {
            if *self.fail_p2p.lock().unwrap() {
                return Err(TransportError::PathUnavailable("p2p unavailable"));
            }
            self.calls
                .lock()
                .unwrap()
                .push((TransportRoute::P2p, peer, packet));
            Ok(())
        }
        fn send_relay(&self, peer: Uuid, packet: TransportPacket) -> Result<(), TransportError> {
            self.calls
                .lock()
                .unwrap()
                .push((TransportRoute::Relay, peer, packet));
            Ok(())
        }
    }
    // Unknown peers report `Direct`. Each explicit switch bumps the generation
    // by one, and switching to the current mode returns `None`.
    #[test]
    fn tracks_modes_and_generations() {
        let manager = TransportManager::new();
        let peer = Uuid::new_v4();
        assert_eq!(manager.mode_for(peer), TransportMode::Direct);
        let switch = manager
            .switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested)
            .unwrap();
        assert_eq!(switch.previous_mode, TransportMode::Direct);
        assert_eq!(switch.mode, TransportMode::P2p);
        assert_eq!(switch.generation, 1);
        assert!(
            manager
                .switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested)
                .is_none()
        );
        let switch = manager
            .switch_mode(peer, TransportMode::Relay, ModeSwitchReason::PathDegraded)
            .unwrap();
        assert_eq!(switch.generation, 2);
        assert_eq!(manager.mode_for(peer), TransportMode::Relay);
    }
    // A P2P peer gets its packet over P2P, with no mode change.
    #[test]
    fn routes_using_current_mode() {
        let manager = TransportManager::new();
        let dispatcher = MockDispatcher::default();
        dispatcher.allow_p2p();
        let peer = Uuid::new_v4();
        let _ = manager.switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested);
        let packet = TransportPacket::new("presence", json!({ "message": "hi" }));
        let outcome = manager.route(peer, packet, &dispatcher).unwrap();
        assert_eq!(outcome.route, TransportRoute::P2p);
        assert_eq!(outcome.active_mode, TransportMode::P2p);
        assert!(outcome.delivered);
        assert!(outcome.switch.is_none());
        let calls = dispatcher.calls();
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].0, TransportRoute::P2p);
        assert_eq!(calls[0].2.channel, "presence");
    }
    // With `failure_threshold: 1`, a single `PathUnavailable` both downgrades
    // the peer and re-sends the same packet (cursor and channel intact) over relay.
    #[test]
    fn falls_back_to_relay_on_p2p_failure() {
        let manager = TransportManager::with_policy(FailoverPolicy {
            failure_threshold: 1,
            retry_backoff: Duration::from_secs(1),
        });
        let dispatcher = MockDispatcher::with_p2p_failure();
        let peer = Uuid::new_v4();
        let _ = manager.switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested);
        let packet = TransportPacket::new("data", json!({ "seq": 1 })).with_cursor(42);
        let outcome = manager.route(peer, packet, &dispatcher).unwrap();
        assert_eq!(outcome.route, TransportRoute::Relay);
        assert_eq!(outcome.active_mode, TransportMode::Relay);
        assert!(outcome.delivered);
        assert_eq!(
            manager.mode_for(peer),
            TransportMode::Relay,
            "mode should be downgraded after failure"
        );
        let switch = outcome.switch.expect("switch signal missing");
        assert_eq!(switch.reason, ModeSwitchReason::PathDegraded);
        assert_eq!(switch.previous_mode, TransportMode::P2p);
        assert_eq!(switch.mode, TransportMode::Relay);
        // The failed P2P attempt is not recorded by the mock, so only the
        // relay retry appears.
        let calls = dispatcher.calls();
        assert_eq!(calls.len(), 1, "relay should get the retried packet");
        assert_eq!(calls[0].0, TransportRoute::Relay);
        assert_eq!(calls[0].2.cursor, Some(42));
        assert_eq!(calls[0].2.channel, "data");
    }
    // Below the threshold no switch signal is emitted. Reaching it downgrades
    // the peer to relay, and `maybe_retry_p2p` declines while the 5 s backoff
    // is still pending.
    #[test]
    fn respects_failure_threshold_and_backoff() {
        let manager = TransportManager::with_policy(FailoverPolicy {
            failure_threshold: 3,
            retry_backoff: Duration::from_secs(5),
        });
        let dispatcher = MockDispatcher::with_p2p_failure();
        let peer = Uuid::new_v4();
        let _ = manager.switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested);
        for _ in 0..(manager.policy.failure_threshold - 1) {
            let packet = TransportPacket::new("data", json!({ "seq": 1 }));
            let outcome = manager.route(peer, packet, &dispatcher).unwrap();
            assert!(outcome.switch.is_none(), "should not downgrade yet");
        }
        let packet = TransportPacket::new("data", json!({ "seq": 2 }));
        let outcome = manager.route(peer, packet, &dispatcher).unwrap();
        assert_eq!(outcome.active_mode, TransportMode::Relay);
        assert!(outcome.delivered);
        assert!(manager.mode_for(peer) == TransportMode::Relay);
        assert!(manager.maybe_retry_p2p(peer).is_none());
    }
    // After a downgrade (one failure at threshold 1), `maybe_retry_p2p` moves
    // the peer back to P2P once `backoff_until` has passed. The test writes
    // the private map directly to set the deadline to "now".
    #[test]
    fn retries_after_backoff() {
        let manager = TransportManager::with_policy(FailoverPolicy {
            failure_threshold: 1,
            retry_backoff: Duration::from_millis(0),
        });
        let dispatcher = MockDispatcher::with_p2p_failure();
        let peer = Uuid::new_v4();
        let _ = manager.switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested);
        for _ in 0..manager.policy.failure_threshold {
            let packet = TransportPacket::new("data", json!({ "seq": 1 }));
            let _ = manager.route(peer, packet.clone(), &dispatcher).unwrap();
        }
        if let Some(mut state) = manager.modes.get_mut(&peer) {
            state.health.backoff_until = Some(Instant::now());
        }
        let switch = manager.maybe_retry_p2p(peer).expect("should retry");
        assert_eq!(switch.mode, TransportMode::P2p);
        assert_eq!(manager.mode_for(peer), TransportMode::P2p);
    }
    // Re-sending cursor 10 is skipped (`delivered == false`) and never reaches
    // the dispatcher. The next higher cursor (11) is delivered normally.
    #[test]
    fn drops_duplicates_and_preserves_ordering() {
        let manager = TransportManager::with_policy(FailoverPolicy {
            failure_threshold: 1,
            retry_backoff: Duration::from_secs(1),
        });
        let dispatcher = MockDispatcher::default();
        dispatcher.allow_p2p();
        let peer = Uuid::new_v4();
        let _ = manager.switch_mode(peer, TransportMode::P2p, ModeSwitchReason::Requested);
        let packet1 = TransportPacket::new("data", json!({ "seq": 1 })).with_cursor(10);
        let outcome1 = manager.route(peer, packet1.clone(), &dispatcher).unwrap();
        assert!(outcome1.delivered);
        let outcome_dup = manager.route(peer, packet1, &dispatcher).unwrap();
        assert!(!outcome_dup.delivered);
        let packet2 = TransportPacket::new("data", json!({ "seq": 2 })).with_cursor(11);
        let outcome2 = manager.route(peer, packet2, &dispatcher).unwrap();
        assert!(outcome2.delivered);
        let calls = dispatcher.calls();
        assert_eq!(calls.len(), 2);
        assert_eq!(calls[0].2.cursor, Some(10));
        assert_eq!(calls[1].2.cursor, Some(11));
    }
}