Skip to main content

rouchdb_changes/
lib.rs

//! Streaming changes feed for RouchDB.
//!
//! Provides `get_changes`, `LiveChangesStream`, `live_changes`, and
//! `live_changes_events`, all of which wrap the adapter's `changes()` method
//! and support:
//! - One-shot mode: fetch the changes since a sequence and return them
//! - Live/continuous mode: keep waiting or polling for new changes
//! - Filtering by document IDs or by a caller-supplied filter function
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::{broadcast, mpsc};
12use tokio_util::sync::CancellationToken;
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::{ChangeEvent, ChangesOptions, ChangesStyle, Seq};
16
/// A filter function for changes events.
///
/// The function receives a reference to each `ChangeEvent`; returning `true`
/// keeps the event and returning `false` drops it. The `Arc` plus
/// `Send + Sync` bounds let a single filter be cloned and moved into the
/// background tasks spawned by this module.
pub type ChangesFilter = Arc<dyn Fn(&ChangeEvent) -> bool + Send + Sync>;
19
/// Lifecycle events emitted by a live changes stream.
///
/// Mirrors PouchDB's changes event model: `change`, `complete`, `error`,
/// `paused`, and `active`.
///
/// Values of this type are delivered through the channel returned by
/// `live_changes_events`.
#[derive(Debug, Clone)]
pub enum ChangesEvent {
    /// A document changed.
    Change(ChangeEvent),
    /// The stream completed (limit reached or non-live mode ended).
    ///
    /// `live_changes_events` also sends this when the stream is cancelled
    /// through its `ChangesHandle`. `last_seq` is the stream's last recorded
    /// sequence at the moment it stopped.
    Complete { last_seq: Seq },
    /// An error occurred while fetching changes.
    ///
    /// NOTE(review): nothing visible in this file constructs this variant;
    /// `LiveChangesStream::next_change` turns fetch errors into `None`, which
    /// `live_changes_events` reports as `Complete`.
    Error(String),
    /// The stream is caught up and waiting for new changes.
    Paused,
    /// The stream resumed fetching after being paused.
    Active,
}
37use rouchdb_core::error::Result;
38
/// A notification that a change occurred, sent through the broadcast channel.
///
/// Built by `ChangeSender::notify` from the values the caller supplies.
/// `LiveChangesStream` treats a notification purely as a wake-up signal and
/// re-queries the adapter; it does not read these fields.
#[derive(Debug, Clone)]
pub struct ChangeNotification {
    /// Sequence value supplied by the notifier.
    pub seq: Seq,
    /// ID of the document the notification refers to.
    pub doc_id: String,
}
45
46/// A sender for change notifications. Adapters use this to notify listeners
47/// when documents are written.
48#[derive(Debug, Clone)]
49pub struct ChangeSender {
50    tx: broadcast::Sender<ChangeNotification>,
51}
52
53impl ChangeSender {
54    pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
55        let (tx, rx) = broadcast::channel(capacity);
56        (ChangeSender { tx }, ChangeReceiver { rx })
57    }
58
59    pub fn notify(&self, seq: Seq, doc_id: String) {
60        // Ignore send errors (no receivers)
61        let _ = self.tx.send(ChangeNotification { seq, doc_id });
62    }
63
64    pub fn subscribe(&self) -> ChangeReceiver {
65        ChangeReceiver {
66            rx: self.tx.subscribe(),
67        }
68    }
69}
70
71/// A receiver for change notifications.
72pub struct ChangeReceiver {
73    rx: broadcast::Receiver<ChangeNotification>,
74}
75
76impl ChangeReceiver {
77    pub async fn recv(&mut self) -> Option<ChangeNotification> {
78        loop {
79            match self.rx.recv().await {
80                Ok(notification) => return Some(notification),
81                Err(broadcast::error::RecvError::Lagged(_)) => {
82                    // Missed some messages, continue receiving
83                    continue;
84                }
85                Err(broadcast::error::RecvError::Closed) => return None,
86            }
87        }
88    }
89}
90
/// Configuration for a changes stream.
///
/// Used by `get_changes`, `LiveChangesStream`, `live_changes`, and
/// `live_changes_events`. `since`, `limit`, `include_docs`, `doc_ids`,
/// `conflicts`, and `style` are forwarded to the adapter's `changes()` call;
/// the remaining fields control behaviour inside this module.
#[derive(Clone)]
pub struct ChangesStreamOptions {
    /// Sequence after which changes are requested.
    pub since: Seq,
    /// Whether a `LiveChangesStream` keeps waiting for new changes once its
    /// buffer is drained. `get_changes` ignores this, and `live_changes` /
    /// `live_changes_events` always force it to `true`.
    pub live: bool,
    /// Forwarded to the adapter as `include_docs`.
    pub include_docs: bool,
    /// Forwarded to the adapter as `doc_ids`.
    pub doc_ids: Option<Vec<String>>,
    /// NOTE(review): not read by any function visible in this file — confirm
    /// whether it should be forwarded to the adapter.
    pub selector: Option<serde_json::Value>,
    /// Maximum number of events. `get_changes` forwards it unchanged; a
    /// `LiveChangesStream` counts the events it has yielded against it.
    pub limit: Option<u64>,
    /// Include conflicting revisions per change event.
    pub conflicts: bool,
    /// Changes style: `MainOnly` (default) or `AllDocs`.
    pub style: ChangesStyle,
    /// A filter function applied post-fetch to each change event.
    pub filter: Option<ChangesFilter>,
    /// Polling interval for live mode when no broadcast channel is available.
    pub poll_interval: Duration,
    /// How long to keep the connection open before closing in live mode.
    pub timeout: Option<Duration>,
    /// Interval for heartbeat signals in live mode (prevents connection timeout).
    ///
    /// NOTE(review): not read by any function visible in this file.
    pub heartbeat: Option<Duration>,
}
113
114impl Default for ChangesStreamOptions {
115    fn default() -> Self {
116        Self {
117            since: Seq::default(),
118            live: false,
119            include_docs: false,
120            doc_ids: None,
121            selector: None,
122            limit: None,
123            conflicts: false,
124            style: ChangesStyle::default(),
125            filter: None,
126            poll_interval: Duration::from_millis(500),
127            timeout: None,
128            heartbeat: None,
129        }
130    }
131}
132
133impl std::fmt::Debug for ChangesStreamOptions {
134    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135        f.debug_struct("ChangesStreamOptions")
136            .field("since", &self.since)
137            .field("live", &self.live)
138            .field("include_docs", &self.include_docs)
139            .field("doc_ids", &self.doc_ids)
140            .field("selector", &self.selector)
141            .field("limit", &self.limit)
142            .field("conflicts", &self.conflicts)
143            .field("style", &self.style)
144            .field("filter", &self.filter.as_ref().map(|_| "<fn>"))
145            .field("poll_interval", &self.poll_interval)
146            .field("timeout", &self.timeout)
147            .field("heartbeat", &self.heartbeat)
148            .finish()
149    }
150}
151
152/// Fetch changes from an adapter in one-shot mode.
153pub async fn get_changes(
154    adapter: &dyn Adapter,
155    opts: ChangesStreamOptions,
156) -> Result<Vec<ChangeEvent>> {
157    let filter = opts.filter.clone();
158    let changes_opts = ChangesOptions {
159        since: opts.since,
160        limit: opts.limit,
161        descending: false,
162        include_docs: opts.include_docs,
163        live: false,
164        doc_ids: opts.doc_ids,
165        conflicts: opts.conflicts,
166        style: opts.style,
167        ..Default::default()
168    };
169
170    let response = adapter.changes(changes_opts).await?;
171    let results = if let Some(f) = filter {
172        response.results.into_iter().filter(|e| f(e)).collect()
173    } else {
174        response.results
175    };
176    Ok(results)
177}
178
/// A live changes stream that yields change events as they happen.
///
/// In live mode, after fetching existing changes, it waits for
/// notifications via a broadcast channel or polls at regular intervals.
///
/// Events are pulled one at a time with `next_change`.
pub struct LiveChangesStream {
    /// Source of changes; queried through its `changes()` method.
    adapter: Arc<dyn Adapter>,
    /// Optional notification channel. When `None`, the stream sleeps for
    /// `opts.poll_interval` between fetches instead.
    receiver: Option<ChangeReceiver>,
    /// Configuration captured at construction.
    opts: ChangesStreamOptions,
    /// Sequence passed as `since` on the next fetch; starts at `opts.since`
    /// and is advanced only when a fetch returns results.
    last_seq: Seq,
    /// Results of the most recent fetch.
    buffer: Vec<ChangeEvent>,
    /// Index of the next entry of `buffer` that has not been yielded yet.
    buffer_idx: usize,
    /// Current position in the fetch / yield / wait cycle.
    state: LiveStreamState,
    /// Number of events yielded so far; compared against `opts.limit`.
    count: u64,
}
193
/// Internal state machine driving `LiveChangesStream::next_change`.
enum LiveStreamState {
    /// Fetching the initial batch of changes.
    FetchingInitial,
    /// Yielding buffered results.
    Yielding,
    /// Waiting for new notifications.
    ///
    /// Also used when polling: with no receiver the stream sleeps for the
    /// configured poll interval in this state.
    Waiting,
    /// Done (limit reached or adapter closed).
    ///
    /// Entered when a non-live stream has nothing left to yield.
    Done,
}
204
205impl LiveChangesStream {
206    pub fn new(
207        adapter: Arc<dyn Adapter>,
208        receiver: Option<ChangeReceiver>,
209        opts: ChangesStreamOptions,
210    ) -> Self {
211        let last_seq = opts.since.clone();
212        Self {
213            adapter,
214            receiver,
215            opts,
216            last_seq,
217            buffer: Vec::new(),
218            buffer_idx: 0,
219            state: LiveStreamState::FetchingInitial,
220            count: 0,
221        }
222    }
223
224    /// Fetch changes since `last_seq` and buffer them.
225    async fn fetch_changes(&mut self) -> Result<()> {
226        let changes_opts = ChangesOptions {
227            since: self.last_seq.clone(),
228            limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
229            descending: false,
230            include_docs: self.opts.include_docs,
231            live: false,
232            doc_ids: self.opts.doc_ids.clone(),
233            conflicts: self.opts.conflicts,
234            style: self.opts.style.clone(),
235            ..Default::default()
236        };
237
238        let response = self.adapter.changes(changes_opts).await?;
239        if !response.results.is_empty() {
240            self.last_seq = response.last_seq;
241        }
242        self.buffer = response.results;
243        self.buffer_idx = 0;
244        Ok(())
245    }
246
247    /// Get the next change event, blocking if in live mode.
248    pub async fn next_change(&mut self) -> Option<ChangeEvent> {
249        loop {
250            // Check limit
251            if let Some(limit) = self.opts.limit
252                && self.count >= limit
253            {
254                return None;
255            }
256
257            match self.state {
258                LiveStreamState::FetchingInitial => {
259                    if self.fetch_changes().await.is_err() {
260                        return None;
261                    }
262                    self.state = if self.buffer.is_empty() {
263                        if self.opts.live {
264                            LiveStreamState::Waiting
265                        } else {
266                            LiveStreamState::Done
267                        }
268                    } else {
269                        LiveStreamState::Yielding
270                    };
271                }
272                LiveStreamState::Yielding => {
273                    if self.buffer_idx < self.buffer.len() {
274                        let event = self.buffer[self.buffer_idx].clone();
275                        self.buffer_idx += 1;
276                        self.count += 1;
277                        return Some(event);
278                    }
279                    // Buffer exhausted
280                    self.state = if self.opts.live {
281                        LiveStreamState::Waiting
282                    } else {
283                        LiveStreamState::Done
284                    };
285                }
286                LiveStreamState::Waiting => {
287                    // Wait for a notification or poll, with optional timeout
288                    let wait_result = if let Some(ref mut receiver) = self.receiver {
289                        if let Some(timeout_dur) = self.opts.timeout {
290                            match tokio::time::timeout(timeout_dur, receiver.recv()).await {
291                                Ok(Some(_)) => true,
292                                Ok(None) => return None, // Channel closed
293                                Err(_) => return None,   // Timeout elapsed
294                            }
295                        } else {
296                            receiver.recv().await.as_ref().is_some()
297                        }
298                    } else {
299                        // No broadcast channel, poll with interval
300                        if let Some(timeout_dur) = self.opts.timeout {
301                            match tokio::time::timeout(
302                                timeout_dur,
303                                tokio::time::sleep(self.opts.poll_interval),
304                            )
305                            .await
306                            {
307                                Ok(()) => true,
308                                Err(_) => return None, // Timeout elapsed
309                            }
310                        } else {
311                            tokio::time::sleep(self.opts.poll_interval).await;
312                            true
313                        }
314                    };
315
316                    if !wait_result {
317                        return None;
318                    }
319
320                    // Fetch new changes
321                    if self.fetch_changes().await.is_err() {
322                        return None;
323                    }
324                    if !self.buffer.is_empty() {
325                        self.state = LiveStreamState::Yielding;
326                    }
327                    // If still empty, stay in Waiting state
328                }
329                LiveStreamState::Done => {
330                    return None;
331                }
332            }
333        }
334    }
335}
336
337/// Handle for a live changes stream. Dropping or cancelling stops the stream.
338pub struct ChangesHandle {
339    cancel: CancellationToken,
340}
341
342impl ChangesHandle {
343    /// Cancel the live changes stream.
344    pub fn cancel(&self) {
345        self.cancel.cancel();
346    }
347}
348
349impl Drop for ChangesHandle {
350    fn drop(&mut self) {
351        self.cancel.cancel();
352    }
353}
354
355/// Start a live changes stream that sends events through an mpsc channel.
356///
357/// Spawns a background task that polls the adapter for changes and sends
358/// each `ChangeEvent` through the returned receiver. The `ChangesHandle`
359/// controls the stream's lifecycle.
360pub fn live_changes(
361    adapter: Arc<dyn Adapter>,
362    opts: ChangesStreamOptions,
363) -> (mpsc::Receiver<ChangeEvent>, ChangesHandle) {
364    let (tx, rx) = mpsc::channel(64);
365    let cancel = CancellationToken::new();
366    let cancel_clone = cancel.clone();
367    let filter = opts.filter.clone();
368
369    tokio::spawn(async move {
370        let mut stream =
371            LiveChangesStream::new(adapter, None, ChangesStreamOptions { live: true, ..opts });
372
373        loop {
374            tokio::select! {
375                change = stream.next_change() => {
376                    match change {
377                        Some(event) => {
378                            // Apply filter if set
379                            if let Some(ref f) = filter
380                                && !f(&event)
381                            {
382                                continue;
383                            }
384                            if tx.send(event).await.is_err() {
385                                break; // Receiver dropped
386                            }
387                        }
388                        None => break, // Stream ended (limit reached)
389                    }
390                }
391                _ = cancel_clone.cancelled() => break,
392            }
393        }
394    });
395
396    (rx, ChangesHandle { cancel })
397}
398
399/// Start a live changes stream that emits lifecycle events.
400///
401/// Like `live_changes()` but wraps each event in a `ChangesEvent` enum
402/// that includes `Active`, `Paused`, `Complete`, and `Error` lifecycle events
403/// alongside the actual `Change` events.
404pub fn live_changes_events(
405    adapter: Arc<dyn Adapter>,
406    opts: ChangesStreamOptions,
407) -> (mpsc::Receiver<ChangesEvent>, ChangesHandle) {
408    let (tx, rx) = mpsc::channel(64);
409    let cancel = CancellationToken::new();
410    let cancel_clone = cancel.clone();
411    let filter = opts.filter.clone();
412
413    tokio::spawn(async move {
414        let mut stream =
415            LiveChangesStream::new(adapter, None, ChangesStreamOptions { live: true, ..opts });
416
417        let mut was_paused = false;
418
419        loop {
420            tokio::select! {
421                change = stream.next_change() => {
422                    match change {
423                        Some(event) => {
424                            // Emit Active if we were paused
425                            if was_paused {
426                                was_paused = false;
427                                let _ = tx.send(ChangesEvent::Active).await;
428                            }
429
430                            // Apply filter if set
431                            if let Some(ref f) = filter
432                                && !f(&event)
433                            {
434                                continue;
435                            }
436
437                            if tx.send(ChangesEvent::Change(event)).await.is_err() {
438                                break;
439                            }
440                        }
441                        None => {
442                            // Stream ended
443                            let _ = tx.send(ChangesEvent::Complete {
444                                last_seq: stream.last_seq.clone(),
445                            }).await;
446                            break;
447                        }
448                    }
449                }
450                _ = cancel_clone.cancelled() => {
451                    let _ = tx.send(ChangesEvent::Complete {
452                        last_seq: stream.last_seq.clone(),
453                    }).await;
454                    break;
455                },
456            }
457
458            // If the buffer is exhausted and we're in waiting state, emit Paused
459            if stream.buffer_idx >= stream.buffer.len()
460                && matches!(stream.state, LiveStreamState::Waiting)
461                && !was_paused
462            {
463                was_paused = true;
464                let _ = tx.send(ChangesEvent::Paused).await;
465            }
466        }
467    });
468
469    (rx, ChangesHandle { cancel })
470}
471
472// ---------------------------------------------------------------------------
473// Tests
474// ---------------------------------------------------------------------------
475
#[cfg(test)]
mod tests {
    //! Tests for the one-shot and live changes APIs, run against the
    //! in-memory adapter.

    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Creates a fresh in-memory database and a change sender.
    ///
    /// The receiver returned by `ChangeSender::new` is dropped here; tests
    /// that need one call `subscribe()` on the sender.
    async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
        let db = Arc::new(MemoryAdapter::new("test"));
        let (sender, _rx) = ChangeSender::new(64);
        (db, sender)
    }

    /// Writes a new document with the given ID and body through `bulk_docs`
    /// and returns the revision reported for it.
    async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        let results = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        results[0].rev.clone().unwrap()
    }

    /// One-shot fetch with default options returns every change in write order.
    #[tokio::test]
    async fn one_shot_changes() {
        let (db, _sender) = setup().await;
        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;

        let events = get_changes(db.as_ref(), ChangesStreamOptions::default())
            .await
            .unwrap();

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].id, "a");
        assert_eq!(events[1].id, "b");
    }

    /// `since` excludes changes at or before the given sequence.
    #[tokio::test]
    async fn one_shot_changes_since() {
        let (db, _sender) = setup().await;
        put_doc(db.as_ref(), "a", serde_json::json!({})).await;
        put_doc(db.as_ref(), "b", serde_json::json!({})).await;
        put_doc(db.as_ref(), "c", serde_json::json!({})).await;

        let events = get_changes(
            db.as_ref(),
            ChangesStreamOptions {
                since: Seq::Num(2),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, "c");
    }

    /// `limit` caps the number of events returned by a one-shot fetch.
    #[tokio::test]
    async fn one_shot_with_limit() {
        let (db, _sender) = setup().await;
        for i in 0..5 {
            put_doc(db.as_ref(), &format!("d{}", i), serde_json::json!({})).await;
        }

        let events = get_changes(
            db.as_ref(),
            ChangesStreamOptions {
                limit: Some(2),
                ..Default::default()
            },
        )
        .await
        .unwrap();

        assert_eq!(events.len(), 2);
    }

    /// A live stream with a receiver yields the existing change first, then
    /// changes announced via notifications, and ends once the limit is hit.
    #[tokio::test]
    async fn live_stream_initial_then_new() {
        let (db, sender) = setup().await;
        put_doc(db.as_ref(), "existing", serde_json::json!({})).await;

        let receiver = sender.subscribe();
        let db_clone = db.clone();

        let mut stream = LiveChangesStream::new(
            db.clone(),
            Some(receiver),
            ChangesStreamOptions {
                live: true,
                limit: Some(3),
                ..Default::default()
            },
        );

        // First event should be the existing doc
        let event = stream.next_change().await.unwrap();
        assert_eq!(event.id, "existing");

        // Now add more docs in the background
        let sender_clone = sender.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            put_doc(db_clone.as_ref(), "new1", serde_json::json!({})).await;
            sender_clone.notify(Seq::Num(2), "new1".into());
            tokio::time::sleep(Duration::from_millis(50)).await;
            put_doc(db_clone.as_ref(), "new2", serde_json::json!({})).await;
            sender_clone.notify(Seq::Num(3), "new2".into());
        });

        let event = stream.next_change().await.unwrap();
        assert_eq!(event.id, "new1");

        let event = stream.next_change().await.unwrap();
        assert_eq!(event.id, "new2");

        // Limit reached (3)
        assert!(stream.next_change().await.is_none());
    }

    /// `live_changes` delivers the existing change and then, via polling, a
    /// change written after the stream started.
    #[tokio::test]
    async fn live_changes_via_channel() {
        let db = Arc::new(MemoryAdapter::new("test"));
        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;

        let (mut rx, handle) = live_changes(
            db.clone(),
            ChangesStreamOptions {
                live: true,
                poll_interval: Duration::from_millis(50),
                ..Default::default()
            },
        );

        // Should receive the existing doc
        let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(event.id, "a");

        // Add a new doc — should be picked up by polling
        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;

        let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(event.id, "b");

        handle.cancel();
    }

    /// A receiver obtained from `subscribe()` sees notifications sent afterwards.
    #[tokio::test]
    async fn change_sender_subscribe() {
        let (sender, _rx) = ChangeSender::new(16);
        let mut sub = sender.subscribe();

        sender.notify(Seq::Num(1), "doc1".into());

        let notification = sub.recv().await.unwrap();
        assert_eq!(notification.seq, Seq::Num(1));
        assert_eq!(notification.doc_id, "doc1");
    }
}