Skip to main content

feagi_io/
sensory_intake.rs

1//! Transport-agnostic sensory intake.
2//!
3//! Receives FeagiByteContainer-format bytes from any transport (ZMQ, WebSocket, SHM, etc.)
4//! and exposes them for consumption by the burst engine. FEAGI core does not depend on
5//! a specific transport; producers push bytes here and the burst engine polls.
6
7use std::collections::VecDeque;
8use std::sync::Mutex;
9
/// Thread-safe, latest-wins queue of sensory payloads (FeagiByteContainer bytes).
///
/// Any transport (ZMQ, WebSocket, SHM) pushes here; the burst engine polls.
/// Because every [`push`](Self::push) discards whatever is still pending, the
/// queue holds at most one payload as long as all writes go through `push`.
#[derive(Default)]
pub struct SensoryIntakeQueue {
    inner: Mutex<VecDeque<Vec<u8>>>,
}

impl SensoryIntakeQueue {
    /// Create an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Lock the underlying queue, recovering the guard if the mutex is poisoned.
    ///
    /// Poisoning only records that some thread panicked while holding the lock.
    /// The protected value is a plain `VecDeque<Vec<u8>>`, which is still a valid
    /// (possibly empty) queue after an interrupted `clear`/`push_back`/`pop_front`,
    /// so continuing to use it is sound. Treating poison as fatal would instead
    /// drop every later payload and make `poll_next` return `None` forever.
    fn lock_queue(&self) -> std::sync::MutexGuard<'_, VecDeque<Vec<u8>>> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Push a sensory payload (call from transport layer when data is received).
    ///
    /// Latest-wins semantics are enforced to avoid unbounded stale-frame buildup
    /// when producer rate exceeds burst-consumer rate: any payload that has not
    /// been polled yet is discarded and replaced by `bytes`.
    pub fn push(&self, bytes: Vec<u8>) {
        let mut queue = self.lock_queue();
        // Drop the stale frame (if any) before storing the fresh one.
        queue.clear();
        queue.push_back(bytes);
    }

    /// Take the next payload if any (called by burst engine each burst).
    ///
    /// Returns `None` when nothing has been pushed since the last successful poll.
    pub fn poll_next(&self) -> Option<Vec<u8>> {
        self.lock_queue().pop_front()
    }
}