1use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::{broadcast, mpsc};
12use tokio_util::sync::CancellationToken;
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::{ChangeEvent, ChangesOptions, ChangesStyle, Seq};
16
17pub type ChangesFilter = Arc<dyn Fn(&ChangeEvent) -> bool + Send + Sync>;
19
20#[derive(Debug, Clone)]
25pub enum ChangesEvent {
26 Change(ChangeEvent),
28 Complete { last_seq: Seq },
30 Error(String),
32 Paused,
34 Active,
36}
37use rouchdb_core::error::Result;
38
39#[derive(Debug, Clone)]
41pub struct ChangeNotification {
42 pub seq: Seq,
43 pub doc_id: String,
44}
45
46#[derive(Debug, Clone)]
49pub struct ChangeSender {
50 tx: broadcast::Sender<ChangeNotification>,
51}
52
53impl ChangeSender {
54 pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
55 let (tx, rx) = broadcast::channel(capacity);
56 (ChangeSender { tx }, ChangeReceiver { rx })
57 }
58
59 pub fn notify(&self, seq: Seq, doc_id: String) {
60 let _ = self.tx.send(ChangeNotification { seq, doc_id });
62 }
63
64 pub fn subscribe(&self) -> ChangeReceiver {
65 ChangeReceiver {
66 rx: self.tx.subscribe(),
67 }
68 }
69}
70
71pub struct ChangeReceiver {
73 rx: broadcast::Receiver<ChangeNotification>,
74}
75
76impl ChangeReceiver {
77 pub async fn recv(&mut self) -> Option<ChangeNotification> {
78 loop {
79 match self.rx.recv().await {
80 Ok(notification) => return Some(notification),
81 Err(broadcast::error::RecvError::Lagged(_)) => {
82 continue;
84 }
85 Err(broadcast::error::RecvError::Closed) => return None,
86 }
87 }
88 }
89}
90
91#[derive(Clone)]
93pub struct ChangesStreamOptions {
94 pub since: Seq,
95 pub live: bool,
96 pub include_docs: bool,
97 pub doc_ids: Option<Vec<String>>,
98 pub selector: Option<serde_json::Value>,
99 pub limit: Option<u64>,
100 pub conflicts: bool,
102 pub style: ChangesStyle,
104 pub filter: Option<ChangesFilter>,
106 pub poll_interval: Duration,
108 pub timeout: Option<Duration>,
110 pub heartbeat: Option<Duration>,
112}
113
114impl Default for ChangesStreamOptions {
115 fn default() -> Self {
116 Self {
117 since: Seq::default(),
118 live: false,
119 include_docs: false,
120 doc_ids: None,
121 selector: None,
122 limit: None,
123 conflicts: false,
124 style: ChangesStyle::default(),
125 filter: None,
126 poll_interval: Duration::from_millis(500),
127 timeout: None,
128 heartbeat: None,
129 }
130 }
131}
132
133impl std::fmt::Debug for ChangesStreamOptions {
134 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
135 f.debug_struct("ChangesStreamOptions")
136 .field("since", &self.since)
137 .field("live", &self.live)
138 .field("include_docs", &self.include_docs)
139 .field("doc_ids", &self.doc_ids)
140 .field("selector", &self.selector)
141 .field("limit", &self.limit)
142 .field("conflicts", &self.conflicts)
143 .field("style", &self.style)
144 .field("filter", &self.filter.as_ref().map(|_| "<fn>"))
145 .field("poll_interval", &self.poll_interval)
146 .field("timeout", &self.timeout)
147 .field("heartbeat", &self.heartbeat)
148 .finish()
149 }
150}
151
152pub async fn get_changes(
154 adapter: &dyn Adapter,
155 opts: ChangesStreamOptions,
156) -> Result<Vec<ChangeEvent>> {
157 let filter = opts.filter.clone();
158 let changes_opts = ChangesOptions {
159 since: opts.since,
160 limit: opts.limit,
161 descending: false,
162 include_docs: opts.include_docs,
163 live: false,
164 doc_ids: opts.doc_ids,
165 conflicts: opts.conflicts,
166 style: opts.style,
167 ..Default::default()
168 };
169
170 let response = adapter.changes(changes_opts).await?;
171 let results = if let Some(f) = filter {
172 response.results.into_iter().filter(|e| f(e)).collect()
173 } else {
174 response.results
175 };
176 Ok(results)
177}
178
179pub struct LiveChangesStream {
184 adapter: Arc<dyn Adapter>,
185 receiver: Option<ChangeReceiver>,
186 opts: ChangesStreamOptions,
187 last_seq: Seq,
188 buffer: Vec<ChangeEvent>,
189 buffer_idx: usize,
190 state: LiveStreamState,
191 count: u64,
192}
193
194enum LiveStreamState {
195 FetchingInitial,
197 Yielding,
199 Waiting,
201 Done,
203}
204
205impl LiveChangesStream {
206 pub fn new(
207 adapter: Arc<dyn Adapter>,
208 receiver: Option<ChangeReceiver>,
209 opts: ChangesStreamOptions,
210 ) -> Self {
211 let last_seq = opts.since.clone();
212 Self {
213 adapter,
214 receiver,
215 opts,
216 last_seq,
217 buffer: Vec::new(),
218 buffer_idx: 0,
219 state: LiveStreamState::FetchingInitial,
220 count: 0,
221 }
222 }
223
224 async fn fetch_changes(&mut self) -> Result<()> {
226 let changes_opts = ChangesOptions {
227 since: self.last_seq.clone(),
228 limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
229 descending: false,
230 include_docs: self.opts.include_docs,
231 live: false,
232 doc_ids: self.opts.doc_ids.clone(),
233 conflicts: self.opts.conflicts,
234 style: self.opts.style.clone(),
235 ..Default::default()
236 };
237
238 let response = self.adapter.changes(changes_opts).await?;
239 if !response.results.is_empty() {
240 self.last_seq = response.last_seq;
241 }
242 self.buffer = response.results;
243 self.buffer_idx = 0;
244 Ok(())
245 }
246
247 pub async fn next_change(&mut self) -> Option<ChangeEvent> {
249 loop {
250 if let Some(limit) = self.opts.limit
252 && self.count >= limit
253 {
254 return None;
255 }
256
257 match self.state {
258 LiveStreamState::FetchingInitial => {
259 if self.fetch_changes().await.is_err() {
260 return None;
261 }
262 self.state = if self.buffer.is_empty() {
263 if self.opts.live {
264 LiveStreamState::Waiting
265 } else {
266 LiveStreamState::Done
267 }
268 } else {
269 LiveStreamState::Yielding
270 };
271 }
272 LiveStreamState::Yielding => {
273 if self.buffer_idx < self.buffer.len() {
274 let event = self.buffer[self.buffer_idx].clone();
275 self.buffer_idx += 1;
276 self.count += 1;
277 return Some(event);
278 }
279 self.state = if self.opts.live {
281 LiveStreamState::Waiting
282 } else {
283 LiveStreamState::Done
284 };
285 }
286 LiveStreamState::Waiting => {
287 let wait_result = if let Some(ref mut receiver) = self.receiver {
289 if let Some(timeout_dur) = self.opts.timeout {
290 match tokio::time::timeout(timeout_dur, receiver.recv()).await {
291 Ok(Some(_)) => true,
292 Ok(None) => return None, Err(_) => return None, }
295 } else {
296 receiver.recv().await.as_ref().is_some()
297 }
298 } else {
299 if let Some(timeout_dur) = self.opts.timeout {
301 match tokio::time::timeout(
302 timeout_dur,
303 tokio::time::sleep(self.opts.poll_interval),
304 )
305 .await
306 {
307 Ok(()) => true,
308 Err(_) => return None, }
310 } else {
311 tokio::time::sleep(self.opts.poll_interval).await;
312 true
313 }
314 };
315
316 if !wait_result {
317 return None;
318 }
319
320 if self.fetch_changes().await.is_err() {
322 return None;
323 }
324 if !self.buffer.is_empty() {
325 self.state = LiveStreamState::Yielding;
326 }
327 }
329 LiveStreamState::Done => {
330 return None;
331 }
332 }
333 }
334 }
335}
336
337pub struct ChangesHandle {
339 cancel: CancellationToken,
340}
341
342impl ChangesHandle {
343 pub fn cancel(&self) {
345 self.cancel.cancel();
346 }
347}
348
349impl Drop for ChangesHandle {
350 fn drop(&mut self) {
351 self.cancel.cancel();
352 }
353}
354
355pub fn live_changes(
361 adapter: Arc<dyn Adapter>,
362 opts: ChangesStreamOptions,
363) -> (mpsc::Receiver<ChangeEvent>, ChangesHandle) {
364 let (tx, rx) = mpsc::channel(64);
365 let cancel = CancellationToken::new();
366 let cancel_clone = cancel.clone();
367 let filter = opts.filter.clone();
368
369 tokio::spawn(async move {
370 let mut stream =
371 LiveChangesStream::new(adapter, None, ChangesStreamOptions { live: true, ..opts });
372
373 loop {
374 tokio::select! {
375 change = stream.next_change() => {
376 match change {
377 Some(event) => {
378 if let Some(ref f) = filter
380 && !f(&event)
381 {
382 continue;
383 }
384 if tx.send(event).await.is_err() {
385 break; }
387 }
388 None => break, }
390 }
391 _ = cancel_clone.cancelled() => break,
392 }
393 }
394 });
395
396 (rx, ChangesHandle { cancel })
397}
398
399pub fn live_changes_events(
405 adapter: Arc<dyn Adapter>,
406 opts: ChangesStreamOptions,
407) -> (mpsc::Receiver<ChangesEvent>, ChangesHandle) {
408 let (tx, rx) = mpsc::channel(64);
409 let cancel = CancellationToken::new();
410 let cancel_clone = cancel.clone();
411 let filter = opts.filter.clone();
412
413 tokio::spawn(async move {
414 let mut stream =
415 LiveChangesStream::new(adapter, None, ChangesStreamOptions { live: true, ..opts });
416
417 let mut was_paused = false;
418
419 loop {
420 tokio::select! {
421 change = stream.next_change() => {
422 match change {
423 Some(event) => {
424 if was_paused {
426 was_paused = false;
427 let _ = tx.send(ChangesEvent::Active).await;
428 }
429
430 if let Some(ref f) = filter
432 && !f(&event)
433 {
434 continue;
435 }
436
437 if tx.send(ChangesEvent::Change(event)).await.is_err() {
438 break;
439 }
440 }
441 None => {
442 let _ = tx.send(ChangesEvent::Complete {
444 last_seq: stream.last_seq.clone(),
445 }).await;
446 break;
447 }
448 }
449 }
450 _ = cancel_clone.cancelled() => {
451 let _ = tx.send(ChangesEvent::Complete {
452 last_seq: stream.last_seq.clone(),
453 }).await;
454 break;
455 },
456 }
457
458 if stream.buffer_idx >= stream.buffer.len()
460 && matches!(stream.state, LiveStreamState::Waiting)
461 && !was_paused
462 {
463 was_paused = true;
464 let _ = tx.send(ChangesEvent::Paused).await;
465 }
466 }
467 });
468
469 (rx, ChangesHandle { cancel })
470}
471
472#[cfg(test)]
477mod tests {
478 use super::*;
479 use rouchdb_adapter_memory::MemoryAdapter;
480 use rouchdb_core::document::{BulkDocsOptions, Document};
481 use std::collections::HashMap;
482
483 async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
484 let db = Arc::new(MemoryAdapter::new("test"));
485 let (sender, _rx) = ChangeSender::new(64);
486 (db, sender)
487 }
488
489 async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
490 let doc = Document {
491 id: id.into(),
492 rev: None,
493 deleted: false,
494 data,
495 attachments: HashMap::new(),
496 };
497 let results = db
498 .bulk_docs(vec![doc], BulkDocsOptions::new())
499 .await
500 .unwrap();
501 results[0].rev.clone().unwrap()
502 }
503
504 #[tokio::test]
505 async fn one_shot_changes() {
506 let (db, _sender) = setup().await;
507 put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
508 put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;
509
510 let events = get_changes(db.as_ref(), ChangesStreamOptions::default())
511 .await
512 .unwrap();
513
514 assert_eq!(events.len(), 2);
515 assert_eq!(events[0].id, "a");
516 assert_eq!(events[1].id, "b");
517 }
518
519 #[tokio::test]
520 async fn one_shot_changes_since() {
521 let (db, _sender) = setup().await;
522 put_doc(db.as_ref(), "a", serde_json::json!({})).await;
523 put_doc(db.as_ref(), "b", serde_json::json!({})).await;
524 put_doc(db.as_ref(), "c", serde_json::json!({})).await;
525
526 let events = get_changes(
527 db.as_ref(),
528 ChangesStreamOptions {
529 since: Seq::Num(2),
530 ..Default::default()
531 },
532 )
533 .await
534 .unwrap();
535
536 assert_eq!(events.len(), 1);
537 assert_eq!(events[0].id, "c");
538 }
539
540 #[tokio::test]
541 async fn one_shot_with_limit() {
542 let (db, _sender) = setup().await;
543 for i in 0..5 {
544 put_doc(db.as_ref(), &format!("d{}", i), serde_json::json!({})).await;
545 }
546
547 let events = get_changes(
548 db.as_ref(),
549 ChangesStreamOptions {
550 limit: Some(2),
551 ..Default::default()
552 },
553 )
554 .await
555 .unwrap();
556
557 assert_eq!(events.len(), 2);
558 }
559
560 #[tokio::test]
561 async fn live_stream_initial_then_new() {
562 let (db, sender) = setup().await;
563 put_doc(db.as_ref(), "existing", serde_json::json!({})).await;
564
565 let receiver = sender.subscribe();
566 let db_clone = db.clone();
567
568 let mut stream = LiveChangesStream::new(
569 db.clone(),
570 Some(receiver),
571 ChangesStreamOptions {
572 live: true,
573 limit: Some(3),
574 ..Default::default()
575 },
576 );
577
578 let event = stream.next_change().await.unwrap();
580 assert_eq!(event.id, "existing");
581
582 let sender_clone = sender.clone();
584 tokio::spawn(async move {
585 tokio::time::sleep(Duration::from_millis(50)).await;
586 put_doc(db_clone.as_ref(), "new1", serde_json::json!({})).await;
587 sender_clone.notify(Seq::Num(2), "new1".into());
588 tokio::time::sleep(Duration::from_millis(50)).await;
589 put_doc(db_clone.as_ref(), "new2", serde_json::json!({})).await;
590 sender_clone.notify(Seq::Num(3), "new2".into());
591 });
592
593 let event = stream.next_change().await.unwrap();
594 assert_eq!(event.id, "new1");
595
596 let event = stream.next_change().await.unwrap();
597 assert_eq!(event.id, "new2");
598
599 assert!(stream.next_change().await.is_none());
601 }
602
603 #[tokio::test]
604 async fn live_changes_via_channel() {
605 let db = Arc::new(MemoryAdapter::new("test"));
606 put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
607
608 let (mut rx, handle) = live_changes(
609 db.clone(),
610 ChangesStreamOptions {
611 live: true,
612 poll_interval: Duration::from_millis(50),
613 ..Default::default()
614 },
615 );
616
617 let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
619 .await
620 .unwrap()
621 .unwrap();
622 assert_eq!(event.id, "a");
623
624 put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;
626
627 let event = tokio::time::timeout(Duration::from_secs(2), rx.recv())
628 .await
629 .unwrap()
630 .unwrap();
631 assert_eq!(event.id, "b");
632
633 handle.cancel();
634 }
635
636 #[tokio::test]
637 async fn change_sender_subscribe() {
638 let (sender, _rx) = ChangeSender::new(16);
639 let mut sub = sender.subscribe();
640
641 sender.notify(Seq::Num(1), "doc1".into());
642
643 let notification = sub.recv().await.unwrap();
644 assert_eq!(notification.seq, Seq::Num(1));
645 assert_eq!(notification.doc_id, "doc1");
646 }
647}