actr-hyper 0.3.0

Hyper — Actor platform infrastructure: sandbox, transport, scheduler, WASM engine, signing, AIS bootstrap, persistence & crypto primitives
Documentation
use actr_protocol::ActrId;
use std::collections::HashMap;
use std::time::{Duration, Instant};

pub(crate) const DATA_STREAM_ACTIVITY_TTL: Duration = Duration::from_secs(30);

#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct DataStreamDeliveryUncertainNotice {
    pub(crate) stream_id: String,
    pub(crate) session_id: u64,
    pub(crate) reason: String,
}

#[derive(Debug, Clone)]
struct ActiveDataStream {
    last_updated_at: Instant,
    notified: bool,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum DataStreamRecordState {
    Missing,
    Stale,
    Fresh,
}

#[derive(Debug)]
pub(crate) struct DataStreamActivityTracker {
    ttl: Duration,
    streams_by_peer: HashMap<ActrId, HashMap<String, HashMap<u64, ActiveDataStream>>>,
}

impl Default for DataStreamActivityTracker {
    fn default() -> Self {
        Self::new(DATA_STREAM_ACTIVITY_TTL)
    }
}

impl DataStreamActivityTracker {
    pub(crate) fn new(ttl: Duration) -> Self {
        Self {
            ttl,
            streams_by_peer: HashMap::new(),
        }
    }

    pub(crate) fn record_state(
        &self,
        peer_id: &ActrId,
        stream_id: &str,
        session_id: u64,
        now: Instant,
    ) -> DataStreamRecordState {
        let Some(stream) = self
            .streams_by_peer
            .get(peer_id)
            .and_then(|streams| streams.get(stream_id))
            .and_then(|sessions| sessions.get(&session_id))
        else {
            return DataStreamRecordState::Missing;
        };

        let refresh_interval = self.ttl.checked_div(2).unwrap_or(self.ttl);
        if now.duration_since(stream.last_updated_at) > refresh_interval {
            DataStreamRecordState::Stale
        } else {
            DataStreamRecordState::Fresh
        }
    }

    pub(crate) fn record_stream(
        &mut self,
        peer_id: &ActrId,
        stream_id: impl Into<String>,
        session_id: u64,
        now: Instant,
    ) {
        let stream_id = stream_id.into();
        let stream = self
            .streams_by_peer
            .entry(peer_id.clone())
            .or_default()
            .entry(stream_id)
            .or_default()
            .entry(session_id)
            .or_insert_with(|| ActiveDataStream {
                last_updated_at: now,
                notified: false,
            });

        stream.last_updated_at = now;
    }

    pub(crate) fn mark_delivery_uncertain(
        &mut self,
        peer_id: &ActrId,
        session_id: u64,
        reason: impl Into<String>,
        now: Instant,
    ) -> Vec<DataStreamDeliveryUncertainNotice> {
        self.prune_expired(now);

        let reason = reason.into();
        let Some(streams) = self.streams_by_peer.get_mut(peer_id) else {
            return Vec::new();
        };

        streams
            .iter_mut()
            .filter_map(|(stream_id, sessions)| {
                let stream = sessions.get_mut(&session_id)?;
                if stream.notified {
                    return None;
                }
                stream.notified = true;

                Some(DataStreamDeliveryUncertainNotice {
                    stream_id: stream_id.clone(),
                    session_id,
                    reason: reason.clone(),
                })
            })
            .collect()
    }

    #[cfg(test)]
    fn remove_stream(&mut self, peer_id: &ActrId, stream_id: &str) {
        let should_remove_peer = if let Some(streams) = self.streams_by_peer.get_mut(peer_id) {
            streams.remove(stream_id);
            streams.is_empty()
        } else {
            false
        };

        if should_remove_peer {
            self.streams_by_peer.remove(peer_id);
        }
    }

    pub(crate) fn remove_stream_session(
        &mut self,
        peer_id: &ActrId,
        stream_id: &str,
        session_id: u64,
    ) {
        let should_remove_peer = if let Some(streams) = self.streams_by_peer.get_mut(peer_id) {
            let should_remove_stream = if let Some(sessions) = streams.get_mut(stream_id) {
                sessions.remove(&session_id);
                sessions.is_empty()
            } else {
                false
            };

            if should_remove_stream {
                streams.remove(stream_id);
            }

            streams.is_empty()
        } else {
            false
        };

        if should_remove_peer {
            self.streams_by_peer.remove(peer_id);
        }
    }

    fn prune_expired(&mut self, now: Instant) {
        let ttl = self.ttl;
        self.streams_by_peer.retain(|_, streams| {
            streams.retain(|_, sessions| {
                sessions.retain(|_, stream| now.duration_since(stream.last_updated_at) <= ttl);
                !sessions.is_empty()
            });
            !streams.is_empty()
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use actr_protocol::{ActrType, Realm};

    fn actr_id(serial_number: u64) -> ActrId {
        ActrId {
            realm: Realm { realm_id: 1 },
            serial_number,
            r#type: ActrType {
                manufacturer: "acme".to_string(),
                name: "node".to_string(),
                version: "1.0.0".to_string(),
            },
        }
    }

    #[test]
    fn records_stream_once_per_session() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        assert_eq!(
            tracker.record_state(&peer, "stream-a", 42, now),
            DataStreamRecordState::Missing
        );
        tracker.record_stream(&peer, "stream-a", 42, now);
        assert_eq!(
            tracker.record_state(&peer, "stream-a", 42, now + Duration::from_secs(1)),
            DataStreamRecordState::Fresh
        );
        assert_eq!(
            tracker.record_state(&peer, "stream-a", 42, now + Duration::from_secs(16)),
            DataStreamRecordState::Stale
        );

        let notices = tracker.mark_delivery_uncertain(
            &peer,
            42,
            "data channel closed",
            now + Duration::from_secs(2),
        );

        assert_eq!(notices.len(), 1);
        assert_eq!(notices[0].stream_id, "stream-a");
        assert_eq!(notices[0].session_id, 42);
    }

    #[test]
    fn tracks_multiple_streams_for_same_peer() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        tracker.record_stream(&peer, "stream-a", 42, now);
        tracker.record_stream(&peer, "stream-b", 42, now);

        let mut notices = tracker.mark_delivery_uncertain(&peer, 42, "webrtc disconnected", now);
        notices.sort_by(|a, b| a.stream_id.cmp(&b.stream_id));

        assert_eq!(notices.len(), 2);
        assert_eq!(notices[0].stream_id, "stream-a");
        assert_eq!(notices[1].stream_id, "stream-b");
    }

    #[test]
    fn expires_streams_outside_ttl() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(5));

        tracker.record_stream(&peer, "stream-a", 42, now);

        let notices =
            tracker.mark_delivery_uncertain(&peer, 42, "late close", now + Duration::from_secs(6));

        assert!(notices.is_empty());
    }

    #[test]
    fn filters_and_deduplicates_by_session() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        tracker.record_stream(&peer, "stream-a", 42, now);
        let first = tracker.mark_delivery_uncertain(&peer, 42, "state disconnected", now);
        let duplicate = tracker.mark_delivery_uncertain(&peer, 42, "data channel closed", now);
        let stale_session = tracker.mark_delivery_uncertain(&peer, 43, "stale close", now);

        tracker.record_stream(&peer, "stream-a", 43, now + Duration::from_secs(1));
        let next_session = tracker.mark_delivery_uncertain(
            &peer,
            43,
            "new session failed",
            now + Duration::from_secs(2),
        );

        assert_eq!(first.len(), 1);
        assert!(duplicate.is_empty());
        assert!(stale_session.is_empty());
        assert_eq!(next_session.len(), 1);
        assert_eq!(next_session[0].session_id, 43);
    }

    #[test]
    fn keeps_same_stream_records_for_overlapping_sessions() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        tracker.record_stream(&peer, "stream-a", 42, now);
        tracker.record_stream(&peer, "stream-a", 43, now + Duration::from_secs(1));

        let old_session = tracker.mark_delivery_uncertain(
            &peer,
            42,
            "old data channel closed late",
            now + Duration::from_secs(2),
        );
        let new_session = tracker.mark_delivery_uncertain(
            &peer,
            43,
            "new data channel closed",
            now + Duration::from_secs(3),
        );

        assert_eq!(old_session.len(), 1);
        assert_eq!(old_session[0].session_id, 42);
        assert_eq!(new_session.len(), 1);
        assert_eq!(new_session[0].session_id, 43);
    }

    #[test]
    fn remove_stream_drops_failed_inflight_marker() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        tracker.record_stream(&peer, "stream-a", 42, now);
        tracker.remove_stream(&peer, "stream-a");

        let notices = tracker.mark_delivery_uncertain(&peer, 42, "late close", now);
        assert!(notices.is_empty());
    }

    #[test]
    fn remove_stream_session_keeps_other_session_marker() {
        let peer = actr_id(100);
        let now = Instant::now();
        let mut tracker = DataStreamActivityTracker::new(Duration::from_secs(30));

        tracker.record_stream(&peer, "stream-a", 42, now);
        tracker.record_stream(&peer, "stream-a", 43, now + Duration::from_secs(1));
        tracker.remove_stream_session(&peer, "stream-a", 43);

        let old_session = tracker.mark_delivery_uncertain(
            &peer,
            42,
            "old data channel closed",
            now + Duration::from_secs(2),
        );
        let removed_session = tracker.mark_delivery_uncertain(
            &peer,
            43,
            "new send failed before reaching transport",
            now + Duration::from_secs(2),
        );

        assert_eq!(old_session.len(), 1);
        assert!(removed_session.is_empty());
    }
}