use std::collections::BTreeMap;
use anyhow::Result;
use iroh::EndpointId;
use n0_future::time::{Instant, SystemTime};
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
use crate::{
net::{AbortReason, AcceptOutcome, SyncFinished},
NamespaceId,
};
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, Copy)]
pub enum SyncReason {
DirectJoin,
NewNeighbor,
SyncReport,
Resync,
}
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum Origin {
Connect(SyncReason),
Accept,
}
#[derive(Debug, Clone, Default)]
pub enum SyncState {
#[default]
Idle,
Running {
start: SystemTime,
origin: Origin,
},
}
#[derive(Default)]
pub struct NamespaceStates(BTreeMap<NamespaceId, NamespaceState>);
#[derive(Default)]
struct NamespaceState {
nodes: BTreeMap<EndpointId, PeerState>,
may_emit_ready: bool,
}
impl NamespaceStates {
pub fn is_syncing(&self, namespace: &NamespaceId) -> bool {
self.0.contains_key(namespace)
}
pub fn insert(&mut self, namespace: NamespaceId) {
self.0.entry(namespace).or_default();
}
pub fn start_connect(
&mut self,
namespace: &NamespaceId,
node: EndpointId,
reason: SyncReason,
) -> bool {
match self.entry(namespace, node) {
None => {
debug!("abort connect: namespace is not in sync set");
false
}
Some(state) => state.start_connect(reason),
}
}
pub fn accept_request(
&mut self,
me: &EndpointId,
namespace: &NamespaceId,
node: EndpointId,
) -> AcceptOutcome {
let Some(state) = self.entry(namespace, node) else {
return AcceptOutcome::Reject(AbortReason::NotFound);
};
state.accept_request(me, &node)
}
pub fn finish(
&mut self,
namespace: &NamespaceId,
node: EndpointId,
origin: &Origin,
result: Result<SyncFinished>,
) -> Option<(SystemTime, bool)> {
let state = self.entry(namespace, node)?;
state.finish(origin, result)
}
pub fn set_may_emit_ready(&mut self, namespace: &NamespaceId, value: bool) -> Option<()> {
let state = self.0.get_mut(namespace)?;
state.may_emit_ready = value;
Some(())
}
pub fn may_emit_ready(&mut self, namespace: &NamespaceId) -> Option<bool> {
let state = self.0.get_mut(namespace)?;
if state.may_emit_ready {
state.may_emit_ready = false;
Some(true)
} else {
Some(false)
}
}
pub fn remove(&mut self, namespace: &NamespaceId) -> bool {
self.0.remove(namespace).is_some()
}
fn entry(&mut self, namespace: &NamespaceId, node: EndpointId) -> Option<&mut PeerState> {
self.0
.get_mut(namespace)
.map(|n| n.nodes.entry(node).or_default())
}
}
#[derive(Default)]
struct PeerState {
state: SyncState,
resync_requested: bool,
last_sync: Option<(Instant, Result<SyncFinished>)>,
}
impl PeerState {
fn finish(
&mut self,
origin: &Origin,
result: Result<SyncFinished>,
) -> Option<(SystemTime, bool)> {
let start = match &self.state {
SyncState::Running {
start,
origin: origin2,
} => {
if origin2 != origin {
warn!(actual = ?origin, expected = ?origin2, "finished sync origin does not match state")
}
Some(*start)
}
SyncState::Idle => {
warn!("sync state finish called but not in running state");
None
}
};
self.last_sync = Some((Instant::now(), result));
self.state = SyncState::Idle;
start.map(|s| (s, self.resync_requested))
}
fn start_connect(&mut self, reason: SyncReason) -> bool {
debug!(?reason, "start connect");
match self.state {
SyncState::Running { .. } => {
debug!("abort connect: sync already running");
if matches!(reason, SyncReason::SyncReport) {
debug!("resync queued");
self.resync_requested = true;
}
false
}
SyncState::Idle => {
self.set_sync_running(Origin::Connect(reason));
true
}
}
}
fn accept_request(&mut self, me: &EndpointId, node: &EndpointId) -> AcceptOutcome {
let outcome = match &self.state {
SyncState::Idle => AcceptOutcome::Allow,
SyncState::Running { origin, .. } => match origin {
Origin::Accept => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
Origin::Connect(_reason) => match expected_sync_direction(me, node) {
SyncDirection::Accept => AcceptOutcome::Allow,
SyncDirection::Connect => AcceptOutcome::Reject(AbortReason::AlreadySyncing),
},
},
};
if let AcceptOutcome::Allow = outcome {
self.set_sync_running(Origin::Accept);
}
outcome
}
fn set_sync_running(&mut self, origin: Origin) {
self.state = SyncState::Running {
origin,
start: SystemTime::now(),
};
self.resync_requested = false;
}
}
#[derive(Debug)]
enum SyncDirection {
Accept,
Connect,
}
fn expected_sync_direction(self_node_id: &EndpointId, other_node_id: &EndpointId) -> SyncDirection {
if self_node_id.as_bytes() > other_node_id.as_bytes() {
SyncDirection::Accept
} else {
SyncDirection::Connect
}
}