Skip to main content

rouchdb_changes/
lib.rs

1/// Streaming changes feed for RouchDB.
2///
3/// Provides a `ChangesStream` that wraps the adapter's `changes()` method
4/// and supports:
5/// - One-shot mode: fetch changes since a sequence and return
6/// - Live/continuous mode: keep polling for new changes
7/// - Filtering by document IDs
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12
13use rouchdb_core::adapter::Adapter;
14use rouchdb_core::document::{ChangeEvent, ChangesOptions, Seq};
15use rouchdb_core::error::Result;
16
17/// A notification that a change occurred, sent through the broadcast channel.
18#[derive(Debug, Clone)]
19pub struct ChangeNotification {
20    pub seq: Seq,
21    pub doc_id: String,
22}
23
24/// A sender for change notifications. Adapters use this to notify listeners
25/// when documents are written.
26#[derive(Debug, Clone)]
27pub struct ChangeSender {
28    tx: broadcast::Sender<ChangeNotification>,
29}
30
31impl ChangeSender {
32    pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
33        let (tx, rx) = broadcast::channel(capacity);
34        (ChangeSender { tx }, ChangeReceiver { rx })
35    }
36
37    pub fn notify(&self, seq: Seq, doc_id: String) {
38        // Ignore send errors (no receivers)
39        let _ = self.tx.send(ChangeNotification { seq, doc_id });
40    }
41
42    pub fn subscribe(&self) -> ChangeReceiver {
43        ChangeReceiver {
44            rx: self.tx.subscribe(),
45        }
46    }
47}
48
49/// A receiver for change notifications.
50pub struct ChangeReceiver {
51    rx: broadcast::Receiver<ChangeNotification>,
52}
53
54impl ChangeReceiver {
55    pub async fn recv(&mut self) -> Option<ChangeNotification> {
56        loop {
57            match self.rx.recv().await {
58                Ok(notification) => return Some(notification),
59                Err(broadcast::error::RecvError::Lagged(_)) => {
60                    // Missed some messages, continue receiving
61                    continue;
62                }
63                Err(broadcast::error::RecvError::Closed) => return None,
64            }
65        }
66    }
67}
68
69/// Configuration for a changes stream.
70#[derive(Debug, Clone)]
71pub struct ChangesStreamOptions {
72    pub since: Seq,
73    pub live: bool,
74    pub include_docs: bool,
75    pub doc_ids: Option<Vec<String>>,
76    pub limit: Option<u64>,
77    /// Polling interval for live mode when no broadcast channel is available.
78    pub poll_interval: Duration,
79}
80
81impl Default for ChangesStreamOptions {
82    fn default() -> Self {
83        Self {
84            since: Seq::default(),
85            live: false,
86            include_docs: false,
87            doc_ids: None,
88            limit: None,
89            poll_interval: Duration::from_millis(500),
90        }
91    }
92}
93
94/// Fetch changes from an adapter in one-shot mode.
95pub async fn get_changes(
96    adapter: &dyn Adapter,
97    opts: ChangesStreamOptions,
98) -> Result<Vec<ChangeEvent>> {
99    let changes_opts = ChangesOptions {
100        since: opts.since,
101        limit: opts.limit,
102        descending: false,
103        include_docs: opts.include_docs,
104        live: false,
105        doc_ids: opts.doc_ids,
106    };
107
108    let response = adapter.changes(changes_opts).await?;
109    Ok(response.results)
110}
111
112/// A live changes stream that yields change events as they happen.
113///
114/// In live mode, after fetching existing changes, it waits for
115/// notifications via a broadcast channel or polls at regular intervals.
116pub struct LiveChangesStream {
117    adapter: Arc<dyn Adapter>,
118    receiver: Option<ChangeReceiver>,
119    opts: ChangesStreamOptions,
120    last_seq: Seq,
121    buffer: Vec<ChangeEvent>,
122    buffer_idx: usize,
123    state: LiveStreamState,
124    count: u64,
125}
126
127enum LiveStreamState {
128    /// Fetching the initial batch of changes.
129    FetchingInitial,
130    /// Yielding buffered results.
131    Yielding,
132    /// Waiting for new notifications.
133    Waiting,
134    /// Done (limit reached or adapter closed).
135    Done,
136}
137
138impl LiveChangesStream {
139    pub fn new(
140        adapter: Arc<dyn Adapter>,
141        receiver: Option<ChangeReceiver>,
142        opts: ChangesStreamOptions,
143    ) -> Self {
144        let last_seq = opts.since.clone();
145        Self {
146            adapter,
147            receiver,
148            opts,
149            last_seq,
150            buffer: Vec::new(),
151            buffer_idx: 0,
152            state: LiveStreamState::FetchingInitial,
153            count: 0,
154        }
155    }
156
157    /// Fetch changes since `last_seq` and buffer them.
158    async fn fetch_changes(&mut self) -> Result<()> {
159        let changes_opts = ChangesOptions {
160            since: self.last_seq.clone(),
161            limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
162            descending: false,
163            include_docs: self.opts.include_docs,
164            live: false,
165            doc_ids: self.opts.doc_ids.clone(),
166        };
167
168        let response = self.adapter.changes(changes_opts).await?;
169        if !response.results.is_empty() {
170            self.last_seq = response.last_seq;
171        }
172        self.buffer = response.results;
173        self.buffer_idx = 0;
174        Ok(())
175    }
176
177    /// Get the next change event, blocking if in live mode.
178    pub async fn next_change(&mut self) -> Option<ChangeEvent> {
179        loop {
180            // Check limit
181            if let Some(limit) = self.opts.limit
182                && self.count >= limit
183            {
184                return None;
185            }
186
187            match self.state {
188                LiveStreamState::FetchingInitial => {
189                    if self.fetch_changes().await.is_err() {
190                        return None;
191                    }
192                    self.state = if self.buffer.is_empty() {
193                        if self.opts.live {
194                            LiveStreamState::Waiting
195                        } else {
196                            LiveStreamState::Done
197                        }
198                    } else {
199                        LiveStreamState::Yielding
200                    };
201                }
202                LiveStreamState::Yielding => {
203                    if self.buffer_idx < self.buffer.len() {
204                        let event = self.buffer[self.buffer_idx].clone();
205                        self.buffer_idx += 1;
206                        self.count += 1;
207                        return Some(event);
208                    }
209                    // Buffer exhausted
210                    self.state = if self.opts.live {
211                        LiveStreamState::Waiting
212                    } else {
213                        LiveStreamState::Done
214                    };
215                }
216                LiveStreamState::Waiting => {
217                    // Wait for a notification or poll
218                    if let Some(ref mut receiver) = self.receiver {
219                        // Wait for broadcast notification
220                        receiver.recv().await.as_ref()?;
221                    } else {
222                        // No broadcast channel, poll with interval
223                        tokio::time::sleep(self.opts.poll_interval).await;
224                    }
225
226                    // Fetch new changes
227                    if self.fetch_changes().await.is_err() {
228                        return None;
229                    }
230                    if !self.buffer.is_empty() {
231                        self.state = LiveStreamState::Yielding;
232                    }
233                    // If still empty, stay in Waiting state
234                }
235                LiveStreamState::Done => {
236                    return None;
237                }
238            }
239        }
240    }
241}
242
243// ---------------------------------------------------------------------------
244// Tests
245// ---------------------------------------------------------------------------
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    use rouchdb_adapter_memory::MemoryAdapter;
251    use rouchdb_core::document::{BulkDocsOptions, Document};
252    use std::collections::HashMap;
253
254    async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
255        let db = Arc::new(MemoryAdapter::new("test"));
256        let (sender, _rx) = ChangeSender::new(64);
257        (db, sender)
258    }
259
260    async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
261        let doc = Document {
262            id: id.into(),
263            rev: None,
264            deleted: false,
265            data,
266            attachments: HashMap::new(),
267        };
268        let results = db
269            .bulk_docs(vec![doc], BulkDocsOptions::new())
270            .await
271            .unwrap();
272        results[0].rev.clone().unwrap()
273    }
274
275    #[tokio::test]
276    async fn one_shot_changes() {
277        let (db, _sender) = setup().await;
278        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
279        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;
280
281        let events = get_changes(db.as_ref(), ChangesStreamOptions::default())
282            .await
283            .unwrap();
284
285        assert_eq!(events.len(), 2);
286        assert_eq!(events[0].id, "a");
287        assert_eq!(events[1].id, "b");
288    }
289
290    #[tokio::test]
291    async fn one_shot_changes_since() {
292        let (db, _sender) = setup().await;
293        put_doc(db.as_ref(), "a", serde_json::json!({})).await;
294        put_doc(db.as_ref(), "b", serde_json::json!({})).await;
295        put_doc(db.as_ref(), "c", serde_json::json!({})).await;
296
297        let events = get_changes(
298            db.as_ref(),
299            ChangesStreamOptions {
300                since: Seq::Num(2),
301                ..Default::default()
302            },
303        )
304        .await
305        .unwrap();
306
307        assert_eq!(events.len(), 1);
308        assert_eq!(events[0].id, "c");
309    }
310
311    #[tokio::test]
312    async fn one_shot_with_limit() {
313        let (db, _sender) = setup().await;
314        for i in 0..5 {
315            put_doc(db.as_ref(), &format!("d{}", i), serde_json::json!({})).await;
316        }
317
318        let events = get_changes(
319            db.as_ref(),
320            ChangesStreamOptions {
321                limit: Some(2),
322                ..Default::default()
323            },
324        )
325        .await
326        .unwrap();
327
328        assert_eq!(events.len(), 2);
329    }
330
331    #[tokio::test]
332    async fn live_stream_initial_then_new() {
333        let (db, sender) = setup().await;
334        put_doc(db.as_ref(), "existing", serde_json::json!({})).await;
335
336        let receiver = sender.subscribe();
337        let db_clone = db.clone();
338
339        let mut stream = LiveChangesStream::new(
340            db.clone(),
341            Some(receiver),
342            ChangesStreamOptions {
343                live: true,
344                limit: Some(3),
345                ..Default::default()
346            },
347        );
348
349        // First event should be the existing doc
350        let event = stream.next_change().await.unwrap();
351        assert_eq!(event.id, "existing");
352
353        // Now add more docs in the background
354        let sender_clone = sender.clone();
355        tokio::spawn(async move {
356            tokio::time::sleep(Duration::from_millis(50)).await;
357            put_doc(db_clone.as_ref(), "new1", serde_json::json!({})).await;
358            sender_clone.notify(Seq::Num(2), "new1".into());
359            tokio::time::sleep(Duration::from_millis(50)).await;
360            put_doc(db_clone.as_ref(), "new2", serde_json::json!({})).await;
361            sender_clone.notify(Seq::Num(3), "new2".into());
362        });
363
364        let event = stream.next_change().await.unwrap();
365        assert_eq!(event.id, "new1");
366
367        let event = stream.next_change().await.unwrap();
368        assert_eq!(event.id, "new2");
369
370        // Limit reached (3)
371        assert!(stream.next_change().await.is_none());
372    }
373
374    #[tokio::test]
375    async fn change_sender_subscribe() {
376        let (sender, _rx) = ChangeSender::new(16);
377        let mut sub = sender.subscribe();
378
379        sender.notify(Seq::Num(1), "doc1".into());
380
381        let notification = sub.recv().await.unwrap();
382        assert_eq!(notification.seq, Seq::Num(1));
383        assert_eq!(notification.doc_id, "doc1");
384    }
385}