Skip to main content

rouchdb_changes/
lib.rs

1/// Streaming changes feed for RouchDB.
2///
3/// Provides a `ChangesStream` that wraps the adapter's `changes()` method
4/// and supports:
5/// - One-shot mode: fetch changes since a sequence and return
6/// - Live/continuous mode: keep polling for new changes
7/// - Filtering by document IDs
8
9use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::broadcast;
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::{ChangeEvent, ChangesOptions, Seq};
16use rouchdb_core::error::Result;
17
18/// A notification that a change occurred, sent through the broadcast channel.
19#[derive(Debug, Clone)]
20pub struct ChangeNotification {
21    pub seq: Seq,
22    pub doc_id: String,
23}
24
25/// A sender for change notifications. Adapters use this to notify listeners
26/// when documents are written.
27#[derive(Debug, Clone)]
28pub struct ChangeSender {
29    tx: broadcast::Sender<ChangeNotification>,
30}
31
32impl ChangeSender {
33    pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
34        let (tx, rx) = broadcast::channel(capacity);
35        (ChangeSender { tx }, ChangeReceiver { rx })
36    }
37
38    pub fn notify(&self, seq: Seq, doc_id: String) {
39        // Ignore send errors (no receivers)
40        let _ = self.tx.send(ChangeNotification { seq, doc_id });
41    }
42
43    pub fn subscribe(&self) -> ChangeReceiver {
44        ChangeReceiver {
45            rx: self.tx.subscribe(),
46        }
47    }
48}
49
50/// A receiver for change notifications.
51pub struct ChangeReceiver {
52    rx: broadcast::Receiver<ChangeNotification>,
53}
54
55impl ChangeReceiver {
56    pub async fn recv(&mut self) -> Option<ChangeNotification> {
57        loop {
58            match self.rx.recv().await {
59                Ok(notification) => return Some(notification),
60                Err(broadcast::error::RecvError::Lagged(_)) => {
61                    // Missed some messages, continue receiving
62                    continue;
63                }
64                Err(broadcast::error::RecvError::Closed) => return None,
65            }
66        }
67    }
68}
69
70/// Configuration for a changes stream.
71#[derive(Debug, Clone)]
72pub struct ChangesStreamOptions {
73    pub since: Seq,
74    pub live: bool,
75    pub include_docs: bool,
76    pub doc_ids: Option<Vec<String>>,
77    pub limit: Option<u64>,
78    /// Polling interval for live mode when no broadcast channel is available.
79    pub poll_interval: Duration,
80}
81
82impl Default for ChangesStreamOptions {
83    fn default() -> Self {
84        Self {
85            since: Seq::default(),
86            live: false,
87            include_docs: false,
88            doc_ids: None,
89            limit: None,
90            poll_interval: Duration::from_millis(500),
91        }
92    }
93}
94
95/// Fetch changes from an adapter in one-shot mode.
96pub async fn get_changes(
97    adapter: &dyn Adapter,
98    opts: ChangesStreamOptions,
99) -> Result<Vec<ChangeEvent>> {
100    let changes_opts = ChangesOptions {
101        since: opts.since,
102        limit: opts.limit,
103        descending: false,
104        include_docs: opts.include_docs,
105        live: false,
106        doc_ids: opts.doc_ids,
107    };
108
109    let response = adapter.changes(changes_opts).await?;
110    Ok(response.results)
111}
112
113/// A live changes stream that yields change events as they happen.
114///
115/// In live mode, after fetching existing changes, it waits for
116/// notifications via a broadcast channel or polls at regular intervals.
117pub struct LiveChangesStream {
118    adapter: Arc<dyn Adapter>,
119    receiver: Option<ChangeReceiver>,
120    opts: ChangesStreamOptions,
121    last_seq: Seq,
122    buffer: Vec<ChangeEvent>,
123    buffer_idx: usize,
124    state: LiveStreamState,
125    count: u64,
126}
127
128enum LiveStreamState {
129    /// Fetching the initial batch of changes.
130    FetchingInitial,
131    /// Yielding buffered results.
132    Yielding,
133    /// Waiting for new notifications.
134    Waiting,
135    /// Done (limit reached or adapter closed).
136    Done,
137}
138
139impl LiveChangesStream {
140    pub fn new(
141        adapter: Arc<dyn Adapter>,
142        receiver: Option<ChangeReceiver>,
143        opts: ChangesStreamOptions,
144    ) -> Self {
145        let last_seq = opts.since.clone();
146        Self {
147            adapter,
148            receiver,
149            opts,
150            last_seq,
151            buffer: Vec::new(),
152            buffer_idx: 0,
153            state: LiveStreamState::FetchingInitial,
154            count: 0,
155        }
156    }
157
158    /// Fetch changes since `last_seq` and buffer them.
159    async fn fetch_changes(&mut self) -> Result<()> {
160        let changes_opts = ChangesOptions {
161            since: self.last_seq.clone(),
162            limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
163            descending: false,
164            include_docs: self.opts.include_docs,
165            live: false,
166            doc_ids: self.opts.doc_ids.clone(),
167        };
168
169        let response = self.adapter.changes(changes_opts).await?;
170        if !response.results.is_empty() {
171            self.last_seq = response.last_seq;
172        }
173        self.buffer = response.results;
174        self.buffer_idx = 0;
175        Ok(())
176    }
177
178    /// Get the next change event, blocking if in live mode.
179    pub async fn next_change(&mut self) -> Option<ChangeEvent> {
180        loop {
181            // Check limit
182            if let Some(limit) = self.opts.limit {
183                if self.count >= limit {
184                    return None;
185                }
186            }
187
188            match self.state {
189                LiveStreamState::FetchingInitial => {
190                    if self.fetch_changes().await.is_err() {
191                        return None;
192                    }
193                    self.state = if self.buffer.is_empty() {
194                        if self.opts.live {
195                            LiveStreamState::Waiting
196                        } else {
197                            LiveStreamState::Done
198                        }
199                    } else {
200                        LiveStreamState::Yielding
201                    };
202                }
203                LiveStreamState::Yielding => {
204                    if self.buffer_idx < self.buffer.len() {
205                        let event = self.buffer[self.buffer_idx].clone();
206                        self.buffer_idx += 1;
207                        self.count += 1;
208                        return Some(event);
209                    }
210                    // Buffer exhausted
211                    self.state = if self.opts.live {
212                        LiveStreamState::Waiting
213                    } else {
214                        LiveStreamState::Done
215                    };
216                }
217                LiveStreamState::Waiting => {
218                    // Wait for a notification or poll
219                    if let Some(ref mut receiver) = self.receiver {
220                        // Wait for broadcast notification
221                        if receiver.recv().await.is_none() {
222                            return None; // Channel closed
223                        }
224                    } else {
225                        // No broadcast channel, poll with interval
226                        tokio::time::sleep(self.opts.poll_interval).await;
227                    }
228
229                    // Fetch new changes
230                    if self.fetch_changes().await.is_err() {
231                        return None;
232                    }
233                    if !self.buffer.is_empty() {
234                        self.state = LiveStreamState::Yielding;
235                    }
236                    // If still empty, stay in Waiting state
237                }
238                LiveStreamState::Done => {
239                    return None;
240                }
241            }
242        }
243    }
244}
245
246// ---------------------------------------------------------------------------
247// Tests
248// ---------------------------------------------------------------------------
249
250#[cfg(test)]
251mod tests {
252    use super::*;
253    use rouchdb_adapter_memory::MemoryAdapter;
254    use rouchdb_core::document::{BulkDocsOptions, Document};
255    use std::collections::HashMap;
256
257    async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
258        let db = Arc::new(MemoryAdapter::new("test"));
259        let (sender, _rx) = ChangeSender::new(64);
260        (db, sender)
261    }
262
263    async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
264        let doc = Document {
265            id: id.into(),
266            rev: None,
267            deleted: false,
268            data,
269            attachments: HashMap::new(),
270        };
271        let results = db
272            .bulk_docs(vec![doc], BulkDocsOptions::new())
273            .await
274            .unwrap();
275        results[0].rev.clone().unwrap()
276    }
277
278    #[tokio::test]
279    async fn one_shot_changes() {
280        let (db, _sender) = setup().await;
281        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
282        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;
283
284        let events = get_changes(
285            db.as_ref(),
286            ChangesStreamOptions::default(),
287        )
288        .await
289        .unwrap();
290
291        assert_eq!(events.len(), 2);
292        assert_eq!(events[0].id, "a");
293        assert_eq!(events[1].id, "b");
294    }
295
296    #[tokio::test]
297    async fn one_shot_changes_since() {
298        let (db, _sender) = setup().await;
299        put_doc(db.as_ref(), "a", serde_json::json!({})).await;
300        put_doc(db.as_ref(), "b", serde_json::json!({})).await;
301        put_doc(db.as_ref(), "c", serde_json::json!({})).await;
302
303        let events = get_changes(
304            db.as_ref(),
305            ChangesStreamOptions {
306                since: Seq::Num(2),
307                ..Default::default()
308            },
309        )
310        .await
311        .unwrap();
312
313        assert_eq!(events.len(), 1);
314        assert_eq!(events[0].id, "c");
315    }
316
317    #[tokio::test]
318    async fn one_shot_with_limit() {
319        let (db, _sender) = setup().await;
320        for i in 0..5 {
321            put_doc(db.as_ref(), &format!("d{}", i), serde_json::json!({})).await;
322        }
323
324        let events = get_changes(
325            db.as_ref(),
326            ChangesStreamOptions {
327                limit: Some(2),
328                ..Default::default()
329            },
330        )
331        .await
332        .unwrap();
333
334        assert_eq!(events.len(), 2);
335    }
336
337    #[tokio::test]
338    async fn live_stream_initial_then_new() {
339        let (db, sender) = setup().await;
340        put_doc(db.as_ref(), "existing", serde_json::json!({})).await;
341
342        let receiver = sender.subscribe();
343        let db_clone = db.clone();
344
345        let mut stream = LiveChangesStream::new(
346            db.clone(),
347            Some(receiver),
348            ChangesStreamOptions {
349                live: true,
350                limit: Some(3),
351                ..Default::default()
352            },
353        );
354
355        // First event should be the existing doc
356        let event = stream.next_change().await.unwrap();
357        assert_eq!(event.id, "existing");
358
359        // Now add more docs in the background
360        let sender_clone = sender.clone();
361        tokio::spawn(async move {
362            tokio::time::sleep(Duration::from_millis(50)).await;
363            put_doc(db_clone.as_ref(), "new1", serde_json::json!({})).await;
364            sender_clone.notify(Seq::Num(2), "new1".into());
365            tokio::time::sleep(Duration::from_millis(50)).await;
366            put_doc(db_clone.as_ref(), "new2", serde_json::json!({})).await;
367            sender_clone.notify(Seq::Num(3), "new2".into());
368        });
369
370        let event = stream.next_change().await.unwrap();
371        assert_eq!(event.id, "new1");
372
373        let event = stream.next_change().await.unwrap();
374        assert_eq!(event.id, "new2");
375
376        // Limit reached (3)
377        assert!(stream.next_change().await.is_none());
378    }
379
380    #[tokio::test]
381    async fn change_sender_subscribe() {
382        let (sender, _rx) = ChangeSender::new(16);
383        let mut sub = sender.subscribe();
384
385        sender.notify(Seq::Num(1), "doc1".into());
386
387        let notification = sub.recv().await.unwrap();
388        assert_eq!(notification.seq, Seq::Num(1));
389        assert_eq!(notification.doc_id, "doc1");
390    }
391}