dwbase_stream_local/
lib.rs1use std::collections::HashMap;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::sync::Mutex;
9
10use dwbase_core::{Atom, WorldKey};
11use dwbase_engine::{AtomFilter, DwbaseError, Result, StreamEngine};
12
13#[derive(Debug)]
14struct Subscriber {
15 filter: AtomFilter,
16 sender: Sender<Atom>,
17 receiver: Receiver<Atom>,
18}
19
20pub struct LocalStreamEngine {
22 next_id: AtomicUsize,
23 subscribers: Mutex<HashMap<usize, Subscriber>>,
24}
25
26impl LocalStreamEngine {
27 pub fn new() -> Self {
28 Self {
29 next_id: AtomicUsize::new(1),
30 subscribers: Mutex::new(HashMap::new()),
31 }
32 }
33
34 fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
35 if let Some(world) = &filter.world {
36 if atom.world() != world {
37 return false;
38 }
39 }
40 if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
41 return false;
42 }
43 if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
44 return false;
45 }
46 if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
47 return false;
48 }
49 if let Some(since) = &filter.since {
50 if atom.timestamp().0 < since.0 {
51 return false;
52 }
53 }
54 if let Some(until) = &filter.until {
55 if atom.timestamp().0 > until.0 {
56 return false;
57 }
58 }
59 true
60 }
61}
62
63impl Default for LocalStreamEngine {
64 fn default() -> Self {
65 Self::new()
66 }
67}
68
69impl StreamEngine for LocalStreamEngine {
70 type Handle = usize;
71
72 fn publish(&self, atom: &Atom) -> Result<()> {
73 let mut to_remove = Vec::new();
74 let subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
75 for (id, sub) in subscribers.iter() {
76 if Self::matches_filter(atom, &sub.filter) && sub.sender.send(atom.clone()).is_err() {
77 to_remove.push(*id);
78 }
79 }
80 drop(subscribers);
81
82 if !to_remove.is_empty() {
83 let mut subs = self.subscribers.lock().expect("subscribers lock poisoned");
84 for id in to_remove {
85 subs.remove(&id);
86 }
87 }
88
89 Ok(())
90 }
91
92 fn subscribe(&self, world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle> {
93 let (tx, rx) = mpsc::channel();
94 let mut filter = filter;
95 filter.world = Some(world.clone());
96 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
97 let subscriber = Subscriber {
98 filter,
99 sender: tx,
100 receiver: rx,
101 };
102 let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
103 subscribers.insert(id, subscriber);
104 Ok(id)
105 }
106
107 fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>> {
108 let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
109 if let Some(sub) = subscribers.get_mut(handle) {
110 match sub.receiver.try_recv() {
111 Ok(atom) => Ok(Some(atom)),
112 Err(mpsc::TryRecvError::Empty) => Ok(None),
113 Err(mpsc::TryRecvError::Disconnected) => {
114 subscribers.remove(handle);
115 Ok(None)
116 }
117 }
118 } else {
119 Err(DwbaseError::Stream(format!("unknown handle {}", handle)))
120 }
121 }
122
123 fn stop(&self, handle: Self::Handle) -> Result<()> {
124 let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
125 subscribers.remove(&handle);
126 Ok(())
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use dwbase_core::{Atom, AtomId, AtomKind, Importance, Timestamp, WorkerKey};

    /// Builds an `Observation` atom in `world` carrying the given labels;
    /// worker, timestamp, importance and payload are fixed.
    fn atom(id: &str, world: &str, labels: &[&str]) -> Atom {
        let mut builder = Atom::builder(
            AtomId::new(id),
            WorldKey::new(world),
            WorkerKey::new("wkr"),
            AtomKind::Observation,
            Timestamp::new("2024-01-01T00:00:00Z"),
            Importance::new(0.5).unwrap(),
            "{}",
        );
        for label in labels {
            builder = builder.add_label(*label);
        }
        builder.build()
    }

    #[test]
    fn subscribe_publish_poll_delivers_atoms() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let handle = engine
            .subscribe(&world, AtomFilter::default())
            .expect("subscribe");
        let a1 = atom("a1", "w1", &[]);
        engine.publish(&a1).unwrap();

        let polled = engine.poll(&handle).unwrap();
        assert_eq!(polled.as_ref().unwrap().id(), a1.id());
        // The queue is drained after the single published atom.
        assert!(engine.poll(&handle).unwrap().is_none());
    }

    #[test]
    fn filtering_is_respected() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let filter = AtomFilter {
            world: Some(world.clone()),
            kinds: vec![AtomKind::Observation],
            labels: vec!["keep".into()],
            flags: vec![],
            since: None,
            until: None,
            limit: None,
        };
        let handle = engine.subscribe(&world, filter).unwrap();

        let drop_atom = atom("drop", "w1", &["other"]);
        let keep_atom = atom("keep", "w1", &["keep"]);

        engine.publish(&drop_atom).unwrap();
        engine.publish(&keep_atom).unwrap();

        // Check identity, not just presence: the test must fail if the
        // non-matching atom is the one that got through.
        let delivered = engine
            .poll(&handle)
            .unwrap()
            .expect("matching atom should be delivered");
        assert_eq!(delivered.id(), keep_atom.id());
        assert!(engine.poll(&handle).unwrap().is_none());
    }

    #[test]
    fn poll_after_stop_reports_unknown_handle() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let handle = engine
            .subscribe(&world, AtomFilter::default())
            .expect("subscribe");

        engine.stop(handle).unwrap();

        // Once removed, the handle is no longer registered.
        assert!(engine.poll(&handle).is_err());
        // Stopping again is accepted silently.
        assert!(engine.stop(handle).is_ok());
    }
}