use std::hash::Hash;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use serde::Serialize;
pub mod audit;
pub mod capability;
pub mod capability_aggregation;
pub mod capability_bridge;
pub mod dispatch;
pub mod expiry;
pub mod metrics;
pub mod reservation;
pub mod routing;
pub mod snapshot;
pub mod state;
pub mod wire;
#[cfg(test)]
mod tests;
pub use audit::{FoldAuditSink, NoopSink, RingFoldAuditSink, VecFoldAuditSink};
pub use capability::{
capability_tags_for, capability_tags_for_all, reflex_addr_for, CapabilityFilter,
CapabilityFold, CapabilityMatch, CapabilityMembership, CapabilityQuery, HardwareSummary,
NodeState,
};
pub use capability_aggregation::{Aggregation, CapacityQuery, CapacityRow, GroupBy, TagMatcher};
pub use dispatch::{
DispatchError, FoldChannelRouter, FoldDispatch, FoldDispatchAdapter, FoldRegistry,
SUBPROTOCOL_FOLD,
};
pub use expiry::DEFAULT_SWEEP_INTERVAL;
pub use metrics::{FoldMetrics, FoldStats};
pub use reservation::{
JobId, ReservationAnnouncement, ReservationFold, ReservationQuery, ReservationRow,
ReservationState, ResourceId,
};
pub use routing::{RouteAnnouncement, RouteRow, RoutingFold, RoutingQuery};
pub use snapshot::{FoldSnapshot, FoldSnapshotEntry};
pub use state::{
ApplyOutcome, EntryTransition, FoldEntry, FoldError, FoldIndex, FoldState, MergeAction,
NoIndex, NodeId,
};
pub use wire::{EnvelopeMeta, SignedAnnouncement, WireError};
/// Static description of one kind of fold: its key/payload/query types, its
/// secondary index, and the policies [`Fold`] consults while applying
/// announcements.
///
/// The trait has no `self` methods; everything is resolved at compile time
/// through the `K` type parameter of [`Fold<K>`].
pub trait FoldKind: Send + Sync + Sized + 'static {
    /// Numeric identifier of this kind. Reported by [`Fold::stats`] and
    /// compared against a snapshot's `kind` in [`Fold::restore`].
    const KIND_ID: u16;
    /// Channel prefix associated with this kind (surfaced by [`Fold::stats`]).
    const CHANNEL_PREFIX: &'static str;
    /// Time-to-live used for announcements that do not carry their own
    /// `ttl_secs`.
    const DEFAULT_TTL: Duration;

    /// Key under which an entry is stored.
    type Key: Hash + Eq + Clone + std::fmt::Debug + Send + Sync + Serialize + DeserializeOwned;
    /// Announcement body kept in each entry.
    type Payload: Clone + std::fmt::Debug + Send + Sync + Serialize + DeserializeOwned;
    /// Input accepted by [`FoldKind::query`].
    type Query: Send + Sync;
    /// Value produced by [`FoldKind::query`].
    type Result: Send + Sync;
    /// Secondary index that [`Fold`] keeps in step with the stored entries.
    type Index: FoldIndex<Self>;

    /// Derives the storage key for an announcement from its sender and payload.
    fn key_for(node_id: NodeId, payload: &Self::Payload) -> Self::Key;

    /// Builds the empty index used by a freshly created fold.
    fn build_index() -> Self::Index;

    /// Decides what to do with `incoming` given the entry currently stored
    /// under the same key (if any).
    ///
    /// The default policy inserts when nothing is stored, replaces when the
    /// incoming generation is strictly greater than the stored one, and
    /// rejects otherwise (equal or older generation).
    fn merge(
        existing: Option<&FoldEntry<Self>>,
        incoming: &SignedAnnouncement<Self::Payload>,
    ) -> MergeAction {
        let Some(current) = existing else {
            return MergeAction::Insert;
        };
        if incoming.generation > current.generation {
            MergeAction::Replace
        } else {
            MergeAction::Reject
        }
    }

    /// Answers `query` against the current state and index.
    fn query(state: &FoldState<Self>, index: &Self::Index, query: Self::Query) -> Self::Result;

    /// Maps an entry transition to an audit event.
    ///
    /// Returning `None` means "do not audit this transition"; the default
    /// implementation audits nothing.
    fn audit_event(transition: EntryTransition<'_, Self>) -> Option<AuditEvent> {
        let _ = transition;
        None
    }
}
/// Category tag carried by an [`AuditEvent`].
///
/// The named variants parallel the entry transitions a fold goes through
/// (created, replaced, rejected, evicted) plus expiry. Which variant an event
/// carries is decided by whoever constructs the [`AuditEvent`], typically a
/// `FoldKind::audit_event` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuditKind {
    /// A new entry was stored.
    Created,
    /// An existing entry was superseded by a newer one.
    Replaced,
    /// An incoming announcement was refused.
    Rejected,
    /// An entry was removed by an explicit eviction.
    Evicted,
    /// An entry was removed because it expired.
    /// NOTE(review): presumably produced by the `expiry` module; it is not
    /// constructed in this file.
    Expired,
    /// Kind-specific category identified by a static label.
    Custom(&'static str),
}
/// One audit record handed to a [`FoldAuditSink`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AuditEvent {
    /// Category of the event.
    pub kind: AuditKind,
    /// Textual rendering of the affected key.
    /// NOTE(review): presumably the key's `Debug` output; confirm against the
    /// `FoldKind::audit_event` implementations that build these events.
    pub key_repr: String,
    /// Optional free-form detail supplied by the producer of the event.
    pub detail: Option<String>,
}
/// Shared, lock-protected store of entries for one [`FoldKind`].
///
/// State, index, metrics and the audit sink each live behind their own `Arc`
/// so that the background expiry task can hold weak references to them.
pub struct Fold<K: FoldKind> {
    /// Entries plus the per-node key sets.
    state: Arc<RwLock<FoldState<K>>>,
    /// Kind-specific secondary index, updated alongside `state`.
    index: Arc<RwLock<K::Index>>,
    /// Counters updated by every operation on the fold.
    metrics: Arc<FoldMetrics>,
    /// Optional sink that receives audit events; `None` disables auditing.
    audit_sink: Arc<RwLock<Option<Arc<dyn FoldAuditSink>>>>,
    /// Handle of the background expiry task, if one was spawned.
    sweep_handle: Option<tokio::task::JoinHandle<()>>,
}
impl<K: FoldKind> Fold<K> {
    /// Creates an empty fold whose background expiry sweep uses
    /// [`DEFAULT_SWEEP_INTERVAL`].
    pub fn new() -> Self {
        Self::with_sweep_interval(DEFAULT_SWEEP_INTERVAL)
    }

    /// Creates an empty fold and, when possible, spawns the background expiry
    /// task that runs every `interval`.
    ///
    /// No task is spawned when `interval` is zero or when this is called
    /// outside a Tokio runtime; [`Fold::sweep_expired_now`] can still be
    /// called manually in that case.
    pub fn with_sweep_interval(interval: Duration) -> Self {
        let state = Arc::new(RwLock::new(FoldState::new()));
        let index = Arc::new(RwLock::new(K::build_index()));
        let metrics = Arc::new(FoldMetrics::new());
        let audit_sink: Arc<RwLock<Option<Arc<dyn FoldAuditSink>>>> = Arc::new(RwLock::new(None));
        // The task only receives weak handles, so it never keeps the fold's
        // data alive on its own.
        let sweep_handle = if interval.is_zero() || tokio::runtime::Handle::try_current().is_err() {
            None
        } else {
            Some(expiry::spawn_expiry_task::<K>(
                Arc::downgrade(&state),
                Arc::downgrade(&index),
                Arc::downgrade(&metrics),
                Arc::downgrade(&audit_sink),
                interval,
            ))
        };
        Self {
            state,
            index,
            metrics,
            audit_sink,
            sweep_handle,
        }
    }

    /// Applies one announcement to the fold.
    ///
    /// The key is derived with [`FoldKind::key_for`] and the decision is made
    /// by [`FoldKind::merge`]. State, index, per-node key sets, metrics and
    /// the audit sink are all updated while the state and index write locks
    /// are held.
    ///
    /// NOTE(review): the audit sink is invoked with those locks held; a sink
    /// that calls back into this fold would likely block on them.
    ///
    /// # Returns
    ///
    /// `Ok(ApplyOutcome::Inserted)`, `Ok(ApplyOutcome::Replaced)` or
    /// `Ok(ApplyOutcome::Rejected)` depending on the merge decision.
    ///
    /// # Errors
    ///
    /// Returns [`FoldError::InvalidGeneration`] when `ann.generation` is `0`;
    /// the rejection is counted in the metrics but not audited.
    pub fn apply(&self, ann: SignedAnnouncement<K::Payload>) -> Result<ApplyOutcome, FoldError> {
        if ann.generation == 0 {
            self.metrics.on_reject();
            return Err(FoldError::InvalidGeneration {
                node_id: ann.node_id,
            });
        }
        let key = K::key_for(ann.node_id, &ann.payload);
        let mut state = self.state.write();
        let mut index = self.index.write();
        let existing = state.entries.get(&key);
        // A kind-specific `merge` may answer `Insert` even though an entry is
        // already stored under this key. Blindly inserting would overwrite the
        // entry without telling the index about the removed payload and would
        // leave the key in the previous owner's `by_node` set. Treat that case
        // as a replacement so the old entry is retired properly.
        let action = match (K::merge(existing, &ann), existing) {
            (MergeAction::Insert, Some(_)) => MergeAction::Replace,
            (decided, _) => decided,
        };
        match action {
            MergeAction::Insert => {
                let entry = build_entry::<K>(&ann);
                index.on_insert(&key, &entry.payload);
                state
                    .by_node
                    .entry(ann.node_id)
                    .or_default()
                    .insert(key.clone());
                let audit = K::audit_event(EntryTransition::Created {
                    key: &key,
                    new: &entry,
                });
                self.emit_audit(audit);
                state.entries.insert(key, entry);
                self.metrics.on_insert();
                Ok(ApplyOutcome::Inserted)
            }
            MergeAction::Replace => {
                // `merge` asked for a replacement but nothing is stored:
                // count it as a rejection rather than inventing an entry.
                let Some(old_entry) = state.entries.remove(&key) else {
                    self.metrics.on_reject();
                    return Ok(ApplyOutcome::Rejected);
                };
                // The previous owner may differ from the announcing node, so
                // unlink the key from the old owner before linking the new one.
                if let Some(keys) = state.by_node.get_mut(&old_entry.node_id) {
                    keys.remove(&key);
                    if keys.is_empty() {
                        state.by_node.remove(&old_entry.node_id);
                    }
                }
                index.on_remove(&key, &old_entry.payload);
                let new_entry = build_entry::<K>(&ann);
                index.on_insert(&key, &new_entry.payload);
                state
                    .by_node
                    .entry(ann.node_id)
                    .or_default()
                    .insert(key.clone());
                let audit = K::audit_event(EntryTransition::Replaced {
                    key: &key,
                    old: &old_entry,
                    new: &new_entry,
                });
                self.emit_audit(audit);
                state.entries.insert(key, new_entry);
                self.metrics.on_replace();
                Ok(ApplyOutcome::Replaced)
            }
            MergeAction::Reject => {
                let audit = K::audit_event(EntryTransition::Rejected {
                    key: &key,
                    existing,
                    incoming: &ann,
                });
                self.emit_audit(audit);
                self.metrics.on_reject();
                Ok(ApplyOutcome::Rejected)
            }
        }
    }

    /// Runs a kind-specific query against the current state and index.
    ///
    /// Both are read-locked for the duration of [`FoldKind::query`].
    pub fn query(&self, q: K::Query) -> K::Result {
        self.metrics.on_query();
        let state = self.state.read();
        let index = self.index.read();
        K::query(&state, &index, q)
    }

    /// Removes every entry recorded for `node_id`.
    ///
    /// Each removed entry is taken out of the index, reported as an
    /// `Evicted` transition (carrying `reason`) and counted in the metrics.
    /// Does nothing when the node has no recorded keys.
    pub fn evict_node(&self, node_id: NodeId, reason: &str) {
        let mut state = self.state.write();
        let mut index = self.index.write();
        let Some(keys) = state.by_node.remove(&node_id) else {
            return;
        };
        for key in keys {
            if let Some(old_entry) = state.entries.remove(&key) {
                index.on_remove(&key, &old_entry.payload);
                let audit = K::audit_event(EntryTransition::Evicted {
                    key: &key,
                    old: &old_entry,
                    reason,
                });
                self.emit_audit(audit);
                self.metrics.on_evict();
            }
        }
    }

    /// Captures the current state as a [`FoldSnapshot`] under a read lock.
    pub fn snapshot(&self) -> FoldSnapshot<K> {
        let state = self.state.read();
        self.metrics.on_snapshot_taken();
        FoldSnapshot::from_state(&state)
    }

    /// Replaces the fold's contents with the entries of `snap`.
    ///
    /// Existing entries, per-node key sets and the index are cleared first.
    /// Each snapshot entry is rehydrated relative to "now", using the time
    /// elapsed since the snapshot was taken; entries for which rehydration
    /// yields nothing are skipped (presumably because they have already
    /// expired — confirm in `FoldSnapshot::rehydrate_entry`). No audit events
    /// are emitted for restored entries.
    ///
    /// A snapshot whose `kind` differs from `K::KIND_ID` trips a
    /// `debug_assert!`; release builds do not check it.
    ///
    /// # Errors
    ///
    /// Returns [`FoldError::RestoreOverLiveState`] when the fold already holds
    /// entries and `force` is `false`; the fold is left untouched.
    pub fn restore(&self, snap: FoldSnapshot<K>, force: bool) -> Result<(), FoldError> {
        debug_assert!(
            snap.kind == K::KIND_ID,
            "FoldSnapshot::kind={} does not match K::KIND_ID={}",
            snap.kind,
            K::KIND_ID,
        );
        let mut state = self.state.write();
        let mut index = self.index.write();
        if !force && !state.entries.is_empty() {
            return Err(FoldError::RestoreOverLiveState {
                current_len: state.entries.len(),
            });
        }
        state.entries.clear();
        state.by_node.clear();
        index.clear();
        // One monotonic anchor for all entries, paired with the wall-clock age
        // of the snapshot (saturating at zero if the snapshot claims to be
        // from the future).
        let anchor = Instant::now();
        let now_unix_us = crate::adapter::net::current_timestamp_micros();
        let elapsed_since_dump =
            Duration::from_micros(now_unix_us.saturating_sub(snap.taken_at_unix_us));
        for snap_entry in &snap.entries {
            let Some(entry) =
                FoldSnapshot::<K>::rehydrate_entry(snap_entry, anchor, elapsed_since_dump)
            else {
                continue;
            };
            let key = snap_entry.key.clone();
            index.on_insert(&key, &entry.payload);
            state
                .by_node
                .entry(entry.node_id)
                .or_default()
                .insert(key.clone());
            state.entries.insert(key, entry);
        }
        let new_len = state.entries.len() as u64;
        self.metrics.on_snapshot_restored(new_len);
        Ok(())
    }

    /// Borrows the fold's live metrics counters.
    pub fn metrics(&self) -> &FoldMetrics {
        &self.metrics
    }

    /// Returns a point-in-time copy of the counters together with the kind's
    /// id, its channel prefix and whether an audit sink is installed.
    pub fn stats(&self) -> metrics::FoldStats {
        metrics::FoldStats {
            kind: K::KIND_ID,
            channel_prefix: K::CHANNEL_PREFIX.to_string(),
            entries: self.metrics.entries(),
            applies_inserted: self.metrics.applies_inserted(),
            applies_replaced: self.metrics.applies_replaced(),
            applies_rejected: self.metrics.applies_rejected(),
            applies_total: self.metrics.applies_total(),
            expiries: self.metrics.expiries(),
            evictions: self.metrics.evictions(),
            queries: self.metrics.queries(),
            snapshots_taken: self.metrics.snapshots_taken(),
            snapshots_restored: self.metrics.snapshots_restored(),
            has_audit_sink: self.has_audit_sink(),
        }
    }

    /// Runs `f` with read access to the raw state and returns its result.
    ///
    /// The state read lock is held for as long as `f` runs.
    pub fn with_state<R>(&self, f: impl FnOnce(&FoldState<K>) -> R) -> R {
        let state = self.state.read();
        f(&state)
    }

    /// Installs (`Some`) or removes (`None`) the audit sink.
    pub fn set_audit_sink(&self, sink: Option<Arc<dyn FoldAuditSink>>) {
        *self.audit_sink.write() = sink;
    }

    /// Reports whether an audit sink is currently installed.
    pub fn has_audit_sink(&self) -> bool {
        self.audit_sink.read().is_some()
    }

    /// Runs one expiry sweep immediately and returns the `usize` reported by
    /// `expiry::sweep_expired` (presumably the number of entries removed).
    ///
    /// The audit sink slot stays read-locked for the duration of the sweep.
    pub fn sweep_expired_now(&self) -> usize {
        // Borrow the sink straight from `self`; cloning the outer `Arc` just
        // to take a read guard is unnecessary.
        let sink_guard = self.audit_sink.read();
        let sink_ref = sink_guard.as_ref();
        expiry::sweep_expired::<K>(&self.state, &self.index, &self.metrics, sink_ref)
    }

    /// Forwards `event` to the installed audit sink.
    ///
    /// Does nothing when the event is `None` or when no sink is installed.
    #[inline]
    fn emit_audit(&self, event: Option<AuditEvent>) {
        let Some(event) = event else {
            return;
        };
        if let Some(sink) = self.audit_sink.read().as_ref() {
            sink.record(event);
        }
    }
}
impl<K: FoldKind> Default for Fold<K> {
fn default() -> Self {
Self::new()
}
}
/// Dropping the fold stops its background expiry task.
impl<K: FoldKind> Drop for Fold<K> {
    fn drop(&mut self) {
        // Nothing to stop when no sweep task was ever spawned.
        let Some(task) = self.sweep_handle.take() else {
            return;
        };
        task.abort();
    }
}
/// Builds the stored entry for an accepted announcement.
///
/// The entry is stamped with the current instant and expires after the
/// announcement's own `ttl_secs`, or after `K::DEFAULT_TTL` when the
/// announcement carries none. If adding the TTL to the current instant
/// overflows, the expiry falls back to the receive instant itself.
fn build_entry<K: FoldKind>(ann: &SignedAnnouncement<K::Payload>) -> FoldEntry<K> {
    let received_at = Instant::now();
    let lifetime = match ann.ttl_secs {
        Some(secs) => Duration::from_secs(secs as u64),
        None => K::DEFAULT_TTL,
    };
    let expires_at = match received_at.checked_add(lifetime) {
        Some(deadline) => deadline,
        // Unrepresentable deadline: keep the original fallback of "now".
        None => received_at,
    };
    FoldEntry {
        payload: ann.payload.clone(),
        node_id: ann.node_id,
        generation: ann.generation,
        received_at,
        expires_at,
    }
}