Skip to main content

dwbase_stream_local/
lib.rs

1//! Local in-memory stream engine using per-subscriber channels.
2//!
3//! This implementation is synchronous and uses poll semantics (no async streams) to stay WASM-friendly.
4
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicUsize, Ordering};
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::sync::Mutex;
9
10use dwbase_core::{Atom, WorldKey};
11use dwbase_engine::{AtomFilter, DwbaseError, Result, StreamEngine};
12
/// Per-subscription state: the filter applied at publish time plus both ends
/// of the channel that buffers matching atoms until they are polled.
///
/// Both channel halves are stored in the same value, so the sender and the
/// receiver are created and dropped together with the subscription.
#[derive(Debug)]
struct Subscriber {
    /// Criteria an atom must satisfy to be queued for this subscriber.
    filter: AtomFilter,
    /// Producer half; `publish` sends a clone of each matching atom through it.
    sender: Sender<Atom>,
    /// Consumer half; `poll` takes one atom at a time from it without blocking.
    receiver: Receiver<Atom>,
}
19
20/// Local implementation of `StreamEngine` using per-subscriber channels and poll semantics.
21pub struct LocalStreamEngine {
22    next_id: AtomicUsize,
23    subscribers: Mutex<HashMap<usize, Subscriber>>,
24}
25
26impl LocalStreamEngine {
27    pub fn new() -> Self {
28        Self {
29            next_id: AtomicUsize::new(1),
30            subscribers: Mutex::new(HashMap::new()),
31        }
32    }
33
34    fn matches_filter(atom: &Atom, filter: &AtomFilter) -> bool {
35        if let Some(world) = &filter.world {
36            if atom.world() != world {
37                return false;
38            }
39        }
40        if !filter.kinds.is_empty() && !filter.kinds.contains(atom.kind()) {
41            return false;
42        }
43        if !filter.labels.is_empty() && !filter.labels.iter().all(|l| atom.labels().contains(l)) {
44            return false;
45        }
46        if !filter.flags.is_empty() && !filter.flags.iter().all(|f| atom.flags().contains(f)) {
47            return false;
48        }
49        if let Some(since) = &filter.since {
50            if atom.timestamp().0 < since.0 {
51                return false;
52            }
53        }
54        if let Some(until) = &filter.until {
55            if atom.timestamp().0 > until.0 {
56                return false;
57            }
58        }
59        true
60    }
61}
62
63impl Default for LocalStreamEngine {
64    fn default() -> Self {
65        Self::new()
66    }
67}
68
69impl StreamEngine for LocalStreamEngine {
70    type Handle = usize;
71
72    fn publish(&self, atom: &Atom) -> Result<()> {
73        let mut to_remove = Vec::new();
74        let subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
75        for (id, sub) in subscribers.iter() {
76            if Self::matches_filter(atom, &sub.filter) && sub.sender.send(atom.clone()).is_err() {
77                to_remove.push(*id);
78            }
79        }
80        drop(subscribers);
81
82        if !to_remove.is_empty() {
83            let mut subs = self.subscribers.lock().expect("subscribers lock poisoned");
84            for id in to_remove {
85                subs.remove(&id);
86            }
87        }
88
89        Ok(())
90    }
91
92    fn subscribe(&self, world: &WorldKey, filter: AtomFilter) -> Result<Self::Handle> {
93        let (tx, rx) = mpsc::channel();
94        let mut filter = filter;
95        filter.world = Some(world.clone());
96        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
97        let subscriber = Subscriber {
98            filter,
99            sender: tx,
100            receiver: rx,
101        };
102        let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
103        subscribers.insert(id, subscriber);
104        Ok(id)
105    }
106
107    fn poll(&self, handle: &Self::Handle) -> Result<Option<Atom>> {
108        let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
109        if let Some(sub) = subscribers.get_mut(handle) {
110            match sub.receiver.try_recv() {
111                Ok(atom) => Ok(Some(atom)),
112                Err(mpsc::TryRecvError::Empty) => Ok(None),
113                Err(mpsc::TryRecvError::Disconnected) => {
114                    subscribers.remove(handle);
115                    Ok(None)
116                }
117            }
118        } else {
119            Err(DwbaseError::Stream(format!("unknown handle {}", handle)))
120        }
121    }
122
123    fn stop(&self, handle: Self::Handle) -> Result<()> {
124        let mut subscribers = self.subscribers.lock().expect("subscribers lock poisoned");
125        subscribers.remove(&handle);
126        Ok(())
127    }
128}
129
#[cfg(test)]
mod tests {
    use super::*;
    use dwbase_core::{Atom, AtomId, AtomKind, Importance, Timestamp, WorkerKey};

    /// Builds an observation atom in `world` carrying the given labels.
    fn atom(id: &str, world: &str, labels: &[&str]) -> Atom {
        let mut builder = Atom::builder(
            AtomId::new(id),
            WorldKey::new(world),
            WorkerKey::new("wkr"),
            AtomKind::Observation,
            Timestamp::new("2024-01-01T00:00:00Z"),
            Importance::new(0.5).unwrap(),
            "{}",
        );
        for l in labels {
            builder = builder.add_label(*l);
        }
        builder.build()
    }

    #[test]
    fn subscribe_publish_poll_delivers_atoms() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let handle = engine
            .subscribe(&world, AtomFilter::default())
            .expect("subscribe");
        let a1 = atom("a1", "w1", &[]);
        engine.publish(&a1).unwrap();

        let polled = engine.poll(&handle).unwrap();
        assert_eq!(polled.as_ref().unwrap().id(), a1.id());
        // no additional items
        assert!(engine.poll(&handle).unwrap().is_none());
    }

    #[test]
    fn filtering_is_respected() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let filter = AtomFilter {
            world: Some(world.clone()),
            kinds: vec![AtomKind::Observation],
            labels: vec!["keep".into()],
            flags: vec![],
            since: None,
            until: None,
            limit: None,
        };
        let handle = engine.subscribe(&world, filter).unwrap();

        let drop_atom = atom("drop", "w1", &["other"]);
        let keep_atom = atom("keep", "w1", &["keep"]);

        engine.publish(&drop_atom).unwrap();
        engine.publish(&keep_atom).unwrap();

        // The only delivered atom must be the one carrying the requested label;
        // checking the id catches a filter that lets the wrong atom through.
        let polled = engine.poll(&handle).unwrap();
        assert_eq!(polled.as_ref().unwrap().id(), keep_atom.id());
        // drop is filtered out, so nothing else is queued
        assert!(engine.poll(&handle).unwrap().is_none());
    }

    #[test]
    fn atoms_from_other_worlds_are_not_delivered() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let handle = engine
            .subscribe(&world, AtomFilter::default())
            .expect("subscribe");

        engine.publish(&atom("elsewhere", "w2", &[])).unwrap();

        assert!(engine.poll(&handle).unwrap().is_none());
    }

    #[test]
    fn poll_after_stop_reports_unknown_handle() {
        let engine = LocalStreamEngine::new();
        let world = WorldKey::new("w1");
        let handle = engine
            .subscribe(&world, AtomFilter::default())
            .expect("subscribe");

        engine.stop(handle).unwrap();

        assert!(engine.poll(&handle).is_err());
    }
}