deepstrike-core 0.2.33

Cross-language agent runtime kernel — pure computation, zero I/O
Documentation
use std::collections::HashSet;

use compact_str::CompactString;

use super::attention::UrgencyBasedPolicy;
use super::queue::SignalQueue;
use crate::types::policy::{AttentionPolicy, SignalDisposition};
use crate::types::signal::RuntimeSignal;

/// High-performance signal router with deduplication and priority queuing.
pub struct SignalRouter {
    seen: HashSet<CompactString>,
    queue: SignalQueue,
    attention: Box<dyn AttentionPolicy>,
}

impl SignalRouter {
    pub fn new(max_queue_size: usize) -> Self {
        Self::with_policy(max_queue_size, Box::new(UrgencyBasedPolicy))
    }

    pub fn with_policy(max_queue_size: usize, policy: Box<dyn AttentionPolicy>) -> Self {
        Self {
            seen: HashSet::new(),
            queue: SignalQueue::new(max_queue_size),
            attention: policy,
        }
    }

    /// Ingest a signal. Returns the disposition after dedup + attention evaluation.
    /// `Queue` dispositions are buffered; if the queue is full, returns `Dropped`
    /// so the SDK can apply backpressure or surface the loss to telemetry.
    /// All other dispositions are returned directly to the caller.
    pub fn ingest(&mut self, signal: RuntimeSignal, is_running: bool) -> SignalDisposition {
        if let Some(ref key) = signal.dedupe_key {
            if !self.seen.insert(key.clone()) {
                return SignalDisposition::Ignore;
            }
        }

        let disposition = self.attention.evaluate(&signal, is_running);

        if disposition == SignalDisposition::Queue {
            if !self.queue.push(signal) {
                return SignalDisposition::Dropped;
            }
        }

        disposition
    }

    /// Pull next queued signal.
    pub fn next(&mut self) -> Option<RuntimeSignal> {
        self.queue.pop()
    }

    /// Pull the next queued signal visible to `recipient` (broadcasts plus signals
    /// addressed to it); other recipients' signals stay queued. `None` ⇒ no filter.
    pub fn next_for(&mut self, recipient: Option<&str>) -> Option<RuntimeSignal> {
        self.queue.pop_for(recipient)
    }

    /// Number of queued signals.
    pub fn depth(&self) -> usize {
        self.queue.len()
    }

    /// Clear the dedup set (call at session boundaries to prevent unbounded growth).
    pub fn clear_dedup(&mut self) {
        self.seen.clear();
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::signal::{SignalSource, SignalType, Urgency};

    #[test]
    fn deduplicates_signals() {
        let mut router = SignalRouter::new(100);
        let sig = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "tick",
        )
        .with_dedupe("cron-tick-1");

        let d1 = router.ingest(sig.clone(), false);
        assert_ne!(d1, SignalDisposition::Ignore);

        let d2 = router.ingest(sig, false);
        assert_eq!(d2, SignalDisposition::Ignore);
    }

    #[test]
    fn normal_signal_queued() {
        let mut router = SignalRouter::new(100);
        let sig = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "job",
        );

        let d = router.ingest(sig, false);
        assert_eq!(d, SignalDisposition::Queue);
        assert_eq!(router.depth(), 1);
        assert!(router.next().is_some());
    }

    #[test]
    fn interrupt_signals_not_queued() {
        let mut router = SignalRouter::new(100);
        let sig = RuntimeSignal::new(
            SignalSource::Gateway,
            SignalType::Alert,
            Urgency::Critical,
            "fire",
        );

        let d = router.ingest(sig, true);
        assert_eq!(d, SignalDisposition::InterruptNow);
        assert_eq!(router.depth(), 0);
    }

    #[test]
    fn full_queue_drops_signal() {
        let mut router = SignalRouter::new(1);
        let s1 = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "first",
        );
        let s2 = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "second",
        );

        assert_eq!(router.ingest(s1, false), SignalDisposition::Queue);
        assert_eq!(router.ingest(s2, false), SignalDisposition::Dropped);
    }

    #[test]
    fn next_for_drains_recipient_plus_broadcast_only() {
        let mut router = SignalRouter::new(100);
        let to_a = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "a")
            .with_recipient("sess-a")
            .with_timestamp(1);
        let to_b = RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "b")
            .with_recipient("sess-b")
            .with_timestamp(2);
        let broadcast =
            RuntimeSignal::new(SignalSource::Cron, SignalType::Job, Urgency::Normal, "all")
                .with_timestamp(3);
        router.ingest(to_a, false);
        router.ingest(to_b, false);
        router.ingest(broadcast, false);

        // Puller "sess-a" sees only its own + the broadcast, never sess-b's.
        let first = router.next_for(Some("sess-a")).unwrap();
        let second = router.next_for(Some("sess-a")).unwrap();
        let mut got: Vec<_> = [first.summary.as_str(), second.summary.as_str()]
            .iter()
            .map(|s| s.to_string())
            .collect();
        got.sort();
        assert_eq!(got, vec!["a".to_string(), "all".to_string()]);
        assert!(router.next_for(Some("sess-a")).is_none());

        // sess-b's signal is still queued for its own puller.
        assert_eq!(router.next_for(Some("sess-b")).unwrap().summary.as_str(), "b");
    }

    #[test]
    fn next_for_none_drains_anything() {
        let mut router = SignalRouter::new(100);
        router.ingest(
            RuntimeSignal::new(SignalSource::Gateway, SignalType::Event, Urgency::Normal, "x")
                .with_recipient("sess-a"),
            false,
        );
        // No recipient filter ⇒ back-compat: drains the addressed signal too.
        assert_eq!(router.next_for(None).unwrap().summary.as_str(), "x");
    }

    #[test]
    fn clear_dedup_allows_reingest() {
        let mut router = SignalRouter::new(100);
        let sig = RuntimeSignal::new(
            SignalSource::Cron,
            SignalType::Event,
            Urgency::Normal,
            "tick",
        )
        .with_dedupe("key-1");

        router.ingest(sig.clone(), false);
        assert_eq!(router.ingest(sig.clone(), false), SignalDisposition::Ignore);

        router.clear_dedup();
        assert_ne!(router.ingest(sig, false), SignalDisposition::Ignore);
    }
}