feagi_io/
sensory_intake.rs1use std::collections::VecDeque;
8use std::sync::Mutex;
9use std::time::Instant;
10
/// Upper bound on the number of packets the queue retains.
///
/// When a push finds the queue at this depth, the oldest packet is evicted
/// first so the newest data is always kept.
const MAX_SENSORY_QUEUE_DEPTH: usize = 512;

/// One sensory payload plus the metadata recorded when it was queued.
#[derive(Clone, Debug)]
pub struct SensoryPacket {
    /// Raw payload exactly as it was handed to the queue.
    pub bytes: Vec<u8>,
    /// Identifier of the producer, when the caller supplied one.
    pub source_id: Option<String>,
    /// Moment the packet was placed into the queue.
    pub received_at: Instant,
}

/// Bounded first-in/first-out queue of [`SensoryPacket`]s guarded by a mutex.
///
/// All methods take `&self`; the internal lock is held only for the duration
/// of a single queue operation.
#[derive(Default)]
pub struct SensoryIntakeQueue {
    inner: Mutex<VecDeque<SensoryPacket>>,
}

impl SensoryIntakeQueue {
    /// Creates an empty queue.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enqueues `bytes` without any source identifier.
    ///
    /// Equivalent to `push_with_source(bytes, None)`.
    pub fn push(&self, bytes: Vec<u8>) {
        self.push_with_source(bytes, None);
    }

    /// Enqueues `bytes`, tagging the packet with `source_id` and the current time.
    ///
    /// If the queue already holds `MAX_SENSORY_QUEUE_DEPTH` packets, the oldest
    /// one is dropped to make room.
    pub fn push_with_source(&self, bytes: Vec<u8>, source_id: Option<String>) {
        let mut queue = self.lock_queue();
        if queue.len() >= MAX_SENSORY_QUEUE_DEPTH {
            queue.pop_front();
        }
        queue.push_back(SensoryPacket {
            bytes,
            source_id,
            // Stamped while the lock is held so timestamps never decrease
            // along the queue order.
            received_at: Instant::now(),
        });
    }

    /// Removes and returns the oldest queued packet, or `None` if the queue is empty.
    pub fn poll_next(&self) -> Option<SensoryPacket> {
        self.lock_queue().pop_front()
    }

    /// Discards every queued packet.
    pub fn clear(&self) {
        self.lock_queue().clear();
    }

    /// Acquires the queue lock, recovering the guard if the mutex is poisoned.
    ///
    /// A poisoned mutex only records that some thread panicked while holding
    /// the lock. Ignoring the lock in that case would silently drop every
    /// later push and make every poll report an empty queue, so the guard is
    /// recovered and the queue keeps operating on whatever it still holds.
    fn lock_queue(&self) -> std::sync::MutexGuard<'_, VecDeque<SensoryPacket>> {
        self.inner
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }
}
71
#[cfg(test)]
mod tests {
    use super::{SensoryIntakeQueue, MAX_SENSORY_QUEUE_DEPTH};

    /// Pops the next packet and returns only its payload.
    fn next_bytes(queue: &SensoryIntakeQueue) -> Option<Vec<u8>> {
        queue.poll_next().map(|packet| packet.bytes)
    }

    #[test]
    fn preserves_arrival_order() {
        let q = SensoryIntakeQueue::new();
        for value in 1..=3u8 {
            q.push(vec![value]);
        }

        for expected in 1..=3u8 {
            assert_eq!(next_bytes(&q), Some(vec![expected]));
        }
        assert!(q.poll_next().is_none());
    }

    #[test]
    fn drops_oldest_when_bounded_capacity_is_reached() {
        let q = SensoryIntakeQueue::new();
        // Push one packet more than the queue can hold. The whole index is
        // encoded in the payload so every packet is unique; a single byte
        // would wrap after 255 and let a wrong eviction count go unnoticed.
        for idx in 0..=MAX_SENSORY_QUEUE_DEPTH {
            q.push(idx.to_be_bytes().to_vec());
        }

        // Only packet 0 may have been evicted; the rest must come out in order.
        for expected in 1..=MAX_SENSORY_QUEUE_DEPTH {
            assert_eq!(next_bytes(&q), Some(expected.to_be_bytes().to_vec()));
        }
        assert!(q.poll_next().is_none());
    }

    #[test]
    fn preserves_source_metadata() {
        let q = SensoryIntakeQueue::new();
        q.push_with_source(vec![9], Some("agent-a".to_string()));

        let packet = q.poll_next().expect("expected one packet");
        assert_eq!(packet.bytes, vec![9]);
        assert_eq!(packet.source_id.as_deref(), Some("agent-a"));
    }

    #[test]
    fn clear_discards_pending_packets() {
        let q = SensoryIntakeQueue::new();
        q.push(vec![1]);
        q.push(vec![2]);

        q.clear();

        assert!(q.poll_next().is_none());
    }
}