Skip to main content

feagi_io/
sensory_intake.rs

1//! Transport-agnostic sensory intake.
2//!
3//! Receives FeagiByteContainer-format bytes from any transport (ZMQ, WebSocket, SHM, etc.)
4//! and exposes them for consumption by the burst engine. FEAGI core does not depend on
5//! a specific transport; producers push bytes here and the burst engine polls.
6
7use std::collections::VecDeque;
8use std::sync::Mutex;
9use std::time::Instant;
10
11const MAX_SENSORY_QUEUE_DEPTH: usize = 512;
12
13#[derive(Clone, Debug)]
14pub struct SensoryPacket {
15    pub bytes: Vec<u8>,
16    pub source_id: Option<String>,
17    pub received_at: Instant,
18}
19
20/// Thread-safe queue of sensory payloads (FeagiByteContainer bytes).
21/// Any transport (ZMQ, WebSocket, SHM) pushes here; burst engine polls.
22#[derive(Default)]
23pub struct SensoryIntakeQueue {
24    inner: Mutex<VecDeque<SensoryPacket>>,
25}
26
27impl SensoryIntakeQueue {
28    /// Create an empty queue.
29    pub fn new() -> Self {
30        Self {
31            inner: Mutex::new(VecDeque::new()),
32        }
33    }
34
35    /// Push a sensory payload (call from transport layer when data is received).
36    ///
37    /// Payloads are queued in arrival order with bounded memory.
38    /// If the queue reaches capacity, the oldest payload is dropped.
39    pub fn push(&self, bytes: Vec<u8>) {
40        self.push_with_source(bytes, None);
41    }
42
43    pub fn push_with_source(&self, bytes: Vec<u8>, source_id: Option<String>) {
44        if let Ok(mut q) = self.inner.lock() {
45            if q.len() >= MAX_SENSORY_QUEUE_DEPTH {
46                q.pop_front();
47            }
48            q.push_back(SensoryPacket {
49                bytes,
50                source_id,
51                received_at: Instant::now(),
52            });
53        }
54    }
55
56    /// Take the next payload if any (called by burst engine each burst).
57    pub fn poll_next(&self) -> Option<SensoryPacket> {
58        self.inner.lock().ok().and_then(|mut q| q.pop_front())
59    }
60
61    /// Drop all queued sensory payloads.
62    ///
63    /// Used by strict genome transitions to guarantee no stale pre-transition
64    /// sensory data can be consumed after a new genome is loaded.
65    pub fn clear(&self) {
66        if let Ok(mut q) = self.inner.lock() {
67            q.clear();
68        }
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use super::SensoryIntakeQueue;
75
76    #[test]
77    fn preserves_arrival_order() {
78        let q = SensoryIntakeQueue::new();
79        q.push(vec![1]);
80        q.push(vec![2]);
81        q.push(vec![3]);
82
83        assert_eq!(q.poll_next().map(|packet| packet.bytes), Some(vec![1]));
84        assert_eq!(q.poll_next().map(|packet| packet.bytes), Some(vec![2]));
85        assert_eq!(q.poll_next().map(|packet| packet.bytes), Some(vec![3]));
86        assert!(q.poll_next().is_none());
87    }
88
89    #[test]
90    fn drops_oldest_when_bounded_capacity_is_reached() {
91        let q = SensoryIntakeQueue::new();
92        for idx in 0..513u16 {
93            q.push(vec![idx as u8]);
94        }
95
96        // Queue depth is capped at 512, so the first element (0) is dropped.
97        assert_eq!(q.poll_next().map(|packet| packet.bytes), Some(vec![1]));
98    }
99
100    #[test]
101    fn preserves_source_metadata() {
102        let q = SensoryIntakeQueue::new();
103        q.push_with_source(vec![9], Some("agent-a".to_string()));
104
105        let packet = q.poll_next().expect("expected one packet");
106        assert_eq!(packet.bytes, vec![9]);
107        assert_eq!(packet.source_id.as_deref(), Some("agent-a"));
108    }
109}