orbit-rs 0.1.0

Fleet-aware shared-memory rings over POSIX shared memory.
Documentation
use std::collections::BTreeMap;
use std::time::Duration;

use crate::NodeId;
use crate::fleet::Fleet;
use crate::fleet::heartbeat::record::{
    FLEET_HEARTBEAT_FRAME_KIND, FLEET_HEARTBEAT_RING_KIND, FleetHeartbeatRecord,
};
use crate::id::NetId64;
use crate::ring::Frame;
use crate::tick::OrbitEpoch;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FleetHeartbeat {
    pub id: NetId64,
    pub node_id: NodeId,
    pub captured_at: OrbitEpoch,
}

impl FleetHeartbeat {
    pub fn counter(&self) -> u64 {
        self.id.counter()
    }

    pub fn age_at(&self, now: OrbitEpoch) -> Duration {
        self.captured_at.age_at(now)
    }

    pub fn is_fresh_at(&self, now: OrbitEpoch, max_age: Duration) -> bool {
        self.captured_at.is_fresh_at(now, max_age)
    }

    pub(crate) fn from_frame(frame: Frame) -> Option<Self> {
        if frame.id.kind() != FLEET_HEARTBEAT_RING_KIND {
            return None;
        }
        if frame.kind != FLEET_HEARTBEAT_FRAME_KIND {
            return None;
        }

        Some(Self {
            id: frame.id,
            node_id: NodeId::new(frame.id.node()),
            captured_at: OrbitEpoch::from_unix_ms(frame.ver),
        })
    }
}

pub(crate) fn latest_heartbeats(fleet: &Fleet) -> Vec<FleetHeartbeat> {
    let head = fleet.head::<FleetHeartbeatRecord>();
    if head == 0 {
        return Vec::new();
    }

    let capacity = fleet.ring_capacity::<FleetHeartbeatRecord>() as u64;
    let walk_count = head.min(capacity);
    let expected_nodes = fleet.fleet_size() as usize;
    let mut by_node = BTreeMap::new();

    for offset in 0..walk_count {
        let counter = head - 1 - offset;
        let Some(frame) = fleet.read_at::<FleetHeartbeatRecord>(counter) else {
            if counter == 0 {
                break;
            }
            continue;
        };
        if frame.id.counter() != counter {
            if counter == 0 {
                break;
            }
            continue;
        }
        let Some(sample) = FleetHeartbeat::from_frame(frame) else {
            if counter == 0 {
                break;
            }
            continue;
        };

        by_node.entry(sample.node_id.get()).or_insert(sample);
        if expected_nodes > 0 && by_node.len() >= expected_nodes {
            break;
        }
        if counter == 0 {
            break;
        }
    }

    by_node.into_values().collect()
}