use std::time::Duration;
use serde::{Deserialize, Serialize};
use crate::adapter::net::current_timestamp_micros;
use super::state::{FoldEntry, FoldState, MergeAction, NoIndex, NodeId};
use super::{FoldKind, SignedAnnouncement};
/// Identifier of a reservable resource; `ReservationFold` uses it as its fold key.
pub type ResourceId = u64;
/// Identifier of the job recorded in `ReservationState::Active`.
pub type JobId = u64;
/// State of one resource as carried in a `ReservationAnnouncement`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ReservationState {
/// No holder is named; `holder()` returns `None`.
Free,
/// The resource is reserved for `holder`.
Reserved {
/// Node named as the holder of the reservation.
holder: NodeId,
/// Deadline that `reservation_expired` compares against
/// `current_timestamp_micros()`. Presumably Unix-epoch microseconds — TODO confirm.
until_unix_us: u64,
},
/// The resource is held by `holder` for `job_id`.
Active {
/// Node named as the holder of the resource.
holder: NodeId,
/// Job associated with this hold (semantics defined by the caller — not visible here).
job_id: JobId,
},
}
impl ReservationState {
pub fn holder(&self) -> Option<NodeId> {
match self {
ReservationState::Free => None,
ReservationState::Reserved { holder, .. } => Some(*holder),
ReservationState::Active { holder, .. } => Some(*holder),
}
}
}
/// Payload published for one resource; `ReservationFold::key_for` keys entries by `resource_id`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReservationAnnouncement {
/// Resource this announcement is about.
pub resource_id: ResourceId,
/// State being announced for that resource.
pub state: ReservationState,
}
/// Queries answered by `ReservationFold::query`; each yields zero or more `ReservationRow`s.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReservationQuery {
/// Every resource whose stored state is `ReservationState::Free`.
AllFree,
/// Every resource whose stored state names the given node as holder
/// (either `Reserved` or `Active`).
HeldBy(NodeId),
/// The single resource with this id; the result is empty if no entry exists for it.
State(ResourceId),
/// Every resource whose stored state is `ReservationState::Active`.
AllActive,
}
/// One query result row: a resource id paired with a clone of its stored state.
pub type ReservationRow = (ResourceId, ReservationState);
/// Marker type implementing `FoldKind` for resource reservations; it carries no data.
#[derive(Debug)]
pub struct ReservationFold;
impl FoldKind for ReservationFold {
const KIND_ID: u16 = 3;
const CHANNEL_PREFIX: &'static str = "fold:res:";
const DEFAULT_TTL: Duration = Duration::from_secs(30);
type Key = ResourceId;
type Payload = ReservationAnnouncement;
type Query = ReservationQuery;
type Result = Vec<ReservationRow>;
type Index = NoIndex;
fn key_for(_publisher: NodeId, payload: &Self::Payload) -> Self::Key {
payload.resource_id
}
fn build_index() -> NoIndex {
NoIndex
}
fn merge(
existing: Option<&FoldEntry<Self>>,
incoming: &SignedAnnouncement<Self::Payload>,
) -> MergeAction {
let Some(entry) = existing else {
return MergeAction::Insert;
};
let publisher = incoming.node_id;
let same_publisher = entry.node_id == publisher;
if same_publisher {
if incoming.generation <= entry.generation {
return MergeAction::Reject;
}
return if legal_same_publisher(&entry.payload.state, &incoming.payload.state, publisher)
{
MergeAction::Replace
} else {
MergeAction::Reject
};
}
match (&entry.payload.state, &incoming.payload.state) {
(ReservationState::Free, _) => MergeAction::Replace,
(
ReservationState::Reserved {
until_unix_us: deadline,
..
},
ReservationState::Reserved {
holder: new_holder, ..
},
) => {
if *new_holder == publisher && reservation_expired(*deadline) {
MergeAction::Replace
} else {
MergeAction::Reject
}
}
(ReservationState::Active { .. }, _) => MergeAction::Reject,
(ReservationState::Reserved { .. }, _) => MergeAction::Reject,
}
}
fn query(
state: &FoldState<Self>,
_index: &NoIndex,
query: ReservationQuery,
) -> Vec<ReservationRow> {
match query {
ReservationQuery::AllFree => state
.entries
.iter()
.filter(|(_, e)| matches!(e.payload.state, ReservationState::Free))
.map(|(k, e)| (*k, e.payload.state.clone()))
.collect(),
ReservationQuery::AllActive => state
.entries
.iter()
.filter(|(_, e)| matches!(e.payload.state, ReservationState::Active { .. }))
.map(|(k, e)| (*k, e.payload.state.clone()))
.collect(),
ReservationQuery::HeldBy(node_id) => state
.entries
.iter()
.filter(|(_, e)| e.payload.state.holder() == Some(node_id))
.map(|(k, e)| (*k, e.payload.state.clone()))
.collect(),
ReservationQuery::State(resource_id) => state
.entries
.get(&resource_id)
.map(|e| vec![(resource_id, e.payload.state.clone())])
.unwrap_or_default(),
}
}
}
fn legal_same_publisher(from: &ReservationState, to: &ReservationState, publisher: NodeId) -> bool {
let same_holder = |s: &ReservationState| match s.holder() {
None => true,
Some(h) => h == publisher,
};
if !same_holder(to) {
return false;
}
use ReservationState::*;
match (from, to) {
(Free, Free) => false,
(Free, Reserved { .. }) | (Free, Active { .. }) => true,
(Reserved { .. }, _) => true,
(Active { .. }, Free) => true,
(Active { .. }, Active { .. }) => true,
(Active { .. }, Reserved { .. }) => false,
}
}
/// Returns `true` once the current timestamp has reached `until_unix_us`;
/// a deadline equal to the current timestamp counts as expired.
fn reservation_expired(until_unix_us: u64) -> bool {
    let now = current_timestamp_micros();
    until_unix_us <= now
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::Duration;
use super::*;
use crate::adapter::net::behavior::fold::{
ApplyOutcome, EnvelopeMeta, Fold, FoldRegistry, SignedAnnouncement,
};
use crate::adapter::net::identity::EntityKeypair;
// Builds a signed reservation announcement for `resource_id` carrying `state`,
// using default envelope metadata. Panics if signing fails.
fn sign_res(
    keypair: &EntityKeypair,
    node_id: NodeId,
    generation: u64,
    resource_id: ResourceId,
    state: ReservationState,
) -> SignedAnnouncement<ReservationAnnouncement> {
    let payload = ReservationAnnouncement { resource_id, state };
    let meta = EnvelopeMeta::default();
    let signed = SignedAnnouncement::sign(
        keypair,
        ReservationFold::KIND_ID,
        0,
        node_id,
        generation,
        meta,
        payload,
    );
    signed.expect("sign succeeds")
}
// Creates the fold under test with a zero sweep interval.
fn new_fold() -> Fold<ReservationFold> {
    let sweep_interval = Duration::ZERO;
    Fold::with_sweep_interval(sweep_interval)
}
// A deadline 60 seconds (in microseconds) after the current timestamp.
fn fresh_deadline_us() -> u64 {
    const SIXTY_SECONDS_US: u64 = 60_000_000;
    let now = current_timestamp_micros();
    now + SIXTY_SECONDS_US
}
// A deadline 60 seconds (in microseconds) before the current timestamp,
// clamped at zero.
fn expired_deadline_us() -> u64 {
    const SIXTY_SECONDS_US: u64 = 60_000_000;
    let now = current_timestamp_micros();
    now.saturating_sub(SIXTY_SECONDS_US)
}
#[test]
fn first_announcement_installs_regardless_of_state() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();

    // One fresh resource id per initial state, starting at 0.
    let initial_states = [
        ReservationState::Free,
        ReservationState::Reserved {
            holder: 0xA,
            until_unix_us: fresh_deadline_us(),
        },
        ReservationState::Active {
            holder: 0xA,
            job_id: 7,
        },
    ];

    for (resource_id, state) in (0u64..).zip(initial_states) {
        let announcement = sign_res(&kp, 0xA, 1, resource_id, state);
        let outcome = fold.apply(announcement).expect("apply");
        assert_eq!(outcome, ApplyOutcome::Inserted);
    }

    assert_eq!(fold.metrics().applies_inserted(), 3);
}
#[test]
fn holder_can_reserve_then_activate_then_release() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();
    let r: ResourceId = 0x99;

    // Each step is published with the next generation, starting at 1.
    let lifecycle = [
        (
            ReservationState::Reserved {
                holder: 0xA,
                until_unix_us: fresh_deadline_us(),
            },
            ApplyOutcome::Inserted,
        ),
        (
            ReservationState::Reserved {
                holder: 0xA,
                until_unix_us: fresh_deadline_us() + 10_000_000,
            },
            ApplyOutcome::Replaced,
        ),
        (
            ReservationState::Active {
                holder: 0xA,
                job_id: 42,
            },
            ApplyOutcome::Replaced,
        ),
        (
            ReservationState::Active {
                holder: 0xA,
                job_id: 43,
            },
            ApplyOutcome::Replaced,
        ),
        (ReservationState::Free, ApplyOutcome::Replaced),
    ];

    for (generation, (state, expected)) in (1u64..).zip(lifecycle) {
        let outcome = fold
            .apply(sign_res(&kp, 0xA, generation, r, state))
            .unwrap();
        assert_eq!(outcome, expected);
    }

    let rows = fold.query(ReservationQuery::State(r));
    assert_eq!(rows, vec![(r, ReservationState::Free)]);
}
#[test]
fn holder_cannot_transition_active_back_to_reserved() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();
    let r: ResourceId = 0x77;

    let running = ReservationState::Active {
        holder: 0xA,
        job_id: 1,
    };
    fold.apply(sign_res(&kp, 0xA, 1, r, running.clone()))
        .unwrap();

    let downgrade = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: fresh_deadline_us(),
    };
    let outcome = fold.apply(sign_res(&kp, 0xA, 2, r, downgrade)).unwrap();
    assert_eq!(outcome, ApplyOutcome::Rejected);

    // The stored state is still the original Active one.
    let rows = fold.query(ReservationQuery::State(r));
    assert_eq!(rows, vec![(r, running)]);
}
#[test]
fn publisher_cannot_install_state_naming_a_different_holder() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();
    let r = 0x55;
    let reserved_for = |holder: NodeId| ReservationState::Reserved {
        holder,
        until_unix_us: fresh_deadline_us(),
    };

    // The very first announcement is installed without holder validation.
    let first = fold
        .apply(sign_res(&kp, 0xA, 1, r, reserved_for(0xB)))
        .unwrap();
    assert_eq!(first, ApplyOutcome::Inserted);

    // A follow-up from the same publisher naming another holder is refused.
    let second = fold
        .apply(sign_res(&kp, 0xA, 2, r, reserved_for(0xC)))
        .unwrap();
    assert_eq!(second, ApplyOutcome::Rejected);
}
#[test]
fn stale_generation_from_same_publisher_is_rejected() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();
    let r = 0x33;

    let initial = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: fresh_deadline_us(),
    };
    fold.apply(sign_res(&kp, 0xA, 5, r, initial)).unwrap();

    // Neither an equal nor a lower generation may replace the stored entry.
    let stale_attempts = [
        (
            5,
            ReservationState::Reserved {
                holder: 0xA,
                until_unix_us: fresh_deadline_us() + 100,
            },
        ),
        (4, ReservationState::Free),
    ];
    for (generation, state) in stale_attempts {
        let outcome = fold
            .apply(sign_res(&kp, 0xA, generation, r, state))
            .unwrap();
        assert_eq!(outcome, ApplyOutcome::Rejected);
    }
}
#[test]
fn foreign_publisher_can_claim_a_free_resource() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0x11;
    let claim = ReservationState::Reserved {
        holder: 0xB,
        until_unix_us: fresh_deadline_us(),
    };

    fold.apply(sign_res(&kp_a, 0xA, 1, r, ReservationState::Free))
        .unwrap();

    let outcome = fold
        .apply(sign_res(&kp_b, 0xB, 1, r, claim.clone()))
        .unwrap();
    assert_eq!(outcome, ApplyOutcome::Replaced);

    let rows = fold.query(ReservationQuery::State(r));
    assert_eq!(rows[0].1, claim);
}
#[test]
fn foreign_publisher_cannot_steal_a_fresh_reservation() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0x22;
    let reserved_for = |holder: NodeId| ReservationState::Reserved {
        holder,
        until_unix_us: fresh_deadline_us(),
    };

    fold.apply(sign_res(&kp_a, 0xA, 1, r, reserved_for(0xA)))
        .unwrap();

    let steal = fold
        .apply(sign_res(&kp_b, 0xB, 1, r, reserved_for(0xB)))
        .unwrap();
    assert_eq!(steal, ApplyOutcome::Rejected);

    // The original holder is still recorded.
    let rows = fold.query(ReservationQuery::State(r));
    assert_eq!(rows[0].1.holder(), Some(0xA));
}
#[test]
fn foreign_publisher_can_take_over_an_expired_reservation() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0x44;

    let lapsed = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: expired_deadline_us(),
    };
    fold.apply(sign_res(&kp_a, 0xA, 1, r, lapsed)).unwrap();

    let takeover = ReservationState::Reserved {
        holder: 0xB,
        until_unix_us: fresh_deadline_us(),
    };
    let outcome = fold.apply(sign_res(&kp_b, 0xB, 1, r, takeover)).unwrap();
    assert_eq!(outcome, ApplyOutcome::Replaced);

    let rows = fold.query(ReservationQuery::State(r));
    assert_eq!(rows[0].1.holder(), Some(0xB));
}
#[test]
fn foreign_publisher_cannot_release_someone_elses_reservation() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0x66;

    let held_by_a = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: fresh_deadline_us(),
    };
    fold.apply(sign_res(&kp_a, 0xA, 1, r, held_by_a)).unwrap();

    // A different publisher tries to mark the resource Free.
    let release = sign_res(&kp_b, 0xB, 1, r, ReservationState::Free);
    let outcome = fold.apply(release).unwrap();
    assert_eq!(outcome, ApplyOutcome::Rejected);
}
#[test]
fn foreign_publisher_cannot_change_active_state() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0x88;

    let running = ReservationState::Active {
        holder: 0xA,
        job_id: 7,
    };
    fold.apply(sign_res(&kp_a, 0xA, 1, r, running)).unwrap();

    // Neither a release nor a competing reservation gets through.
    let attempts = [
        ReservationState::Free,
        ReservationState::Reserved {
            holder: 0xB,
            until_unix_us: fresh_deadline_us(),
        },
    ];
    for attempt in attempts {
        let outcome = fold.apply(sign_res(&kp_b, 0xB, 1, r, attempt)).unwrap();
        assert_eq!(outcome, ApplyOutcome::Rejected);
    }
}
#[test]
fn first_claim_wins_on_concurrent_reservation() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();
    let r = 0xAA;
    let reserved_for = |holder: NodeId| ReservationState::Reserved {
        holder,
        until_unix_us: fresh_deadline_us(),
    };

    fold.apply(sign_res(&kp_a, 0xA, 1, r, ReservationState::Free))
        .unwrap();

    // A's claim is applied first and succeeds...
    let outcome_a = fold
        .apply(sign_res(&kp_a, 0xA, 2, r, reserved_for(0xA)))
        .unwrap();
    assert_eq!(outcome_a, ApplyOutcome::Replaced);

    // ...so B's competing claim on the same resource is turned away.
    let outcome_b = fold
        .apply(sign_res(&kp_b, 0xB, 1, r, reserved_for(0xB)))
        .unwrap();
    assert_eq!(outcome_b, ApplyOutcome::Rejected);
}
#[test]
fn runtime_ttl_sweeps_stale_reservation_entries() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();
    let r = 0xCC;

    // Envelope with a zero-second TTL override.
    let meta = EnvelopeMeta {
        ttl_secs: Some(0),
        ..Default::default()
    };
    let payload = ReservationAnnouncement {
        resource_id: r,
        state: ReservationState::Reserved {
            holder: 0xA,
            until_unix_us: fresh_deadline_us(),
        },
    };
    let ann = SignedAnnouncement::sign(&kp, ReservationFold::KIND_ID, 0, 0xA, 1, meta, payload)
        .unwrap();

    fold.apply(ann).unwrap();
    assert_eq!(fold.metrics().entries(), 1);

    std::thread::sleep(Duration::from_millis(10));

    let swept = fold.sweep_expired_now();
    assert_eq!(swept, 1);
    assert_eq!(fold.metrics().entries(), 0);
    assert_eq!(fold.metrics().expiries(), 1);
}
#[test]
fn query_returns_only_free_resources_for_all_free() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();

    // Resources 1 and 4 are Free; 2 is Reserved and 3 is Active.
    let seeded = [
        (1, ReservationState::Free),
        (
            2,
            ReservationState::Reserved {
                holder: 0xA,
                until_unix_us: fresh_deadline_us(),
            },
        ),
        (
            3,
            ReservationState::Active {
                holder: 0xA,
                job_id: 7,
            },
        ),
        (4, ReservationState::Free),
    ];
    for (resource_id, state) in seeded {
        fold.apply(sign_res(&kp, 0xA, 1, resource_id, state))
            .unwrap();
    }

    // Deduplicate and order the ids before comparing.
    let free_ids: Vec<ResourceId> = fold
        .query(ReservationQuery::AllFree)
        .into_iter()
        .map(|(id, _)| id)
        .collect::<std::collections::BTreeSet<_>>()
        .into_iter()
        .collect();
    assert_eq!(free_ids, vec![1, 4]);
}
#[test]
fn query_returns_only_active_resources_for_all_active() {
    let fold = new_fold();
    let kp = EntityKeypair::generate();

    // Resource 1 is Free; 2 and 3 are Active with different jobs.
    let seeded = [
        (1, ReservationState::Free),
        (
            2,
            ReservationState::Active {
                holder: 0xA,
                job_id: 7,
            },
        ),
        (
            3,
            ReservationState::Active {
                holder: 0xA,
                job_id: 8,
            },
        ),
    ];
    for (resource_id, state) in seeded {
        fold.apply(sign_res(&kp, 0xA, 1, resource_id, state))
            .unwrap();
    }

    let mut active_ids = std::collections::HashSet::new();
    for (id, _) in fold.query(ReservationQuery::AllActive) {
        active_ids.insert(id);
    }
    assert_eq!(active_ids, std::collections::HashSet::from([2, 3]));
}
#[test]
fn query_held_by_finds_reserved_and_active_resources() {
    let fold = new_fold();
    let kp_a = EntityKeypair::generate();
    let kp_b = EntityKeypair::generate();

    // Node A holds resource 1 (Reserved) and resource 2 (Active).
    let reserved_by_a = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: fresh_deadline_us(),
    };
    fold.apply(sign_res(&kp_a, 0xA, 1, 1, reserved_by_a))
        .unwrap();
    let active_by_a = ReservationState::Active {
        holder: 0xA,
        job_id: 7,
    };
    fold.apply(sign_res(&kp_a, 0xA, 1, 2, active_by_a)).unwrap();

    // Node B holds resource 3 (Reserved).
    let reserved_by_b = ReservationState::Reserved {
        holder: 0xB,
        until_unix_us: fresh_deadline_us(),
    };
    fold.apply(sign_res(&kp_b, 0xB, 1, 3, reserved_by_b))
        .unwrap();

    let mut held_by_a = std::collections::HashSet::new();
    for (id, _) in fold.query(ReservationQuery::HeldBy(0xA)) {
        held_by_a.insert(id);
    }
    assert_eq!(held_by_a, std::collections::HashSet::from([1, 2]));

    let mut held_by_b = Vec::new();
    for (id, _) in fold.query(ReservationQuery::HeldBy(0xB)) {
        held_by_b.push(id);
    }
    assert_eq!(held_by_b, vec![3]);
}
#[test]
fn reservation_fold_plugs_into_registry_and_dispatches_signed_envelopes() {
    let registry = FoldRegistry::new();
    let fold: Arc<Fold<ReservationFold>> = Arc::new(new_fold());
    registry.register(fold.clone());

    let kp = EntityKeypair::generate();
    let reservation = ReservationState::Reserved {
        holder: 0xA,
        until_unix_us: fresh_deadline_us(),
    };
    let ann = sign_res(&kp, 0xA, 1, 0x123, reservation);

    // Round-trip through the encoded form, dispatched under the signer's entity id.
    let bytes = ann.encode().expect("encode");
    let sender = kp.entity_id();
    let outcome = registry.dispatch(&bytes, sender).expect("dispatch");
    assert_eq!(outcome, ApplyOutcome::Inserted);
}
}