feagi_io/sensory_intake.rs
1//! Transport-agnostic sensory intake.
2//!
3//! Receives FeagiByteContainer-format bytes from any transport (ZMQ, WebSocket, SHM, etc.)
4//! and exposes them for consumption by the burst engine. FEAGI core does not depend on
5//! a specific transport; producers push bytes here and the burst engine polls.
6
7use std::collections::VecDeque;
8use std::sync::Mutex;
9
10/// Thread-safe queue of sensory payloads (FeagiByteContainer bytes).
11/// Any transport (ZMQ, WebSocket, SHM) pushes here; burst engine polls.
12#[derive(Default)]
13pub struct SensoryIntakeQueue {
14 inner: Mutex<VecDeque<Vec<u8>>>,
15}
16
17impl SensoryIntakeQueue {
18 /// Create an empty queue.
19 pub fn new() -> Self {
20 Self {
21 inner: Mutex::new(VecDeque::new()),
22 }
23 }
24
25 /// Push a sensory payload (call from transport layer when data is received).
26 ///
27 /// Latest-wins semantics are enforced to avoid unbounded stale-frame buildup
28 /// when producer rate exceeds burst-consumer rate.
29 pub fn push(&self, bytes: Vec<u8>) {
30 if let Ok(mut q) = self.inner.lock() {
31 q.clear();
32 q.push_back(bytes);
33 }
34 }
35
36 /// Take the next payload if any (called by burst engine each burst).
37 pub fn poll_next(&self) -> Option<Vec<u8>> {
38 self.inner.lock().ok().and_then(|mut q| q.pop_front())
39 }
40}