1use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::broadcast;
12
13use rouchdb_core::adapter::Adapter;
14use rouchdb_core::document::{ChangeEvent, ChangesOptions, Seq};
15use rouchdb_core::error::Result;
16
17#[derive(Debug, Clone)]
19pub struct ChangeNotification {
20 pub seq: Seq,
21 pub doc_id: String,
22}
23
24#[derive(Debug, Clone)]
27pub struct ChangeSender {
28 tx: broadcast::Sender<ChangeNotification>,
29}
30
31impl ChangeSender {
32 pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
33 let (tx, rx) = broadcast::channel(capacity);
34 (ChangeSender { tx }, ChangeReceiver { rx })
35 }
36
37 pub fn notify(&self, seq: Seq, doc_id: String) {
38 let _ = self.tx.send(ChangeNotification { seq, doc_id });
40 }
41
42 pub fn subscribe(&self) -> ChangeReceiver {
43 ChangeReceiver {
44 rx: self.tx.subscribe(),
45 }
46 }
47}
48
49pub struct ChangeReceiver {
51 rx: broadcast::Receiver<ChangeNotification>,
52}
53
54impl ChangeReceiver {
55 pub async fn recv(&mut self) -> Option<ChangeNotification> {
56 loop {
57 match self.rx.recv().await {
58 Ok(notification) => return Some(notification),
59 Err(broadcast::error::RecvError::Lagged(_)) => {
60 continue;
62 }
63 Err(broadcast::error::RecvError::Closed) => return None,
64 }
65 }
66 }
67}
68
69#[derive(Debug, Clone)]
71pub struct ChangesStreamOptions {
72 pub since: Seq,
73 pub live: bool,
74 pub include_docs: bool,
75 pub doc_ids: Option<Vec<String>>,
76 pub limit: Option<u64>,
77 pub poll_interval: Duration,
79}
80
81impl Default for ChangesStreamOptions {
82 fn default() -> Self {
83 Self {
84 since: Seq::default(),
85 live: false,
86 include_docs: false,
87 doc_ids: None,
88 limit: None,
89 poll_interval: Duration::from_millis(500),
90 }
91 }
92}
93
94pub async fn get_changes(
96 adapter: &dyn Adapter,
97 opts: ChangesStreamOptions,
98) -> Result<Vec<ChangeEvent>> {
99 let changes_opts = ChangesOptions {
100 since: opts.since,
101 limit: opts.limit,
102 descending: false,
103 include_docs: opts.include_docs,
104 live: false,
105 doc_ids: opts.doc_ids,
106 };
107
108 let response = adapter.changes(changes_opts).await?;
109 Ok(response.results)
110}
111
112pub struct LiveChangesStream {
117 adapter: Arc<dyn Adapter>,
118 receiver: Option<ChangeReceiver>,
119 opts: ChangesStreamOptions,
120 last_seq: Seq,
121 buffer: Vec<ChangeEvent>,
122 buffer_idx: usize,
123 state: LiveStreamState,
124 count: u64,
125}
126
/// Phases a `LiveChangesStream` moves through while `next_change` is polled.
enum LiveStreamState {
    /// The first batch has not been fetched yet.
    FetchingInitial,
    /// Buffered events are being handed out one per call.
    Yielding,
    /// Live mode: waiting for a notification or poll timeout before refetching.
    Waiting,
    /// The stream has finished; every further call yields `None`.
    Done,
}
137
138impl LiveChangesStream {
139 pub fn new(
140 adapter: Arc<dyn Adapter>,
141 receiver: Option<ChangeReceiver>,
142 opts: ChangesStreamOptions,
143 ) -> Self {
144 let last_seq = opts.since.clone();
145 Self {
146 adapter,
147 receiver,
148 opts,
149 last_seq,
150 buffer: Vec::new(),
151 buffer_idx: 0,
152 state: LiveStreamState::FetchingInitial,
153 count: 0,
154 }
155 }
156
157 async fn fetch_changes(&mut self) -> Result<()> {
159 let changes_opts = ChangesOptions {
160 since: self.last_seq.clone(),
161 limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
162 descending: false,
163 include_docs: self.opts.include_docs,
164 live: false,
165 doc_ids: self.opts.doc_ids.clone(),
166 };
167
168 let response = self.adapter.changes(changes_opts).await?;
169 if !response.results.is_empty() {
170 self.last_seq = response.last_seq;
171 }
172 self.buffer = response.results;
173 self.buffer_idx = 0;
174 Ok(())
175 }
176
177 pub async fn next_change(&mut self) -> Option<ChangeEvent> {
179 loop {
180 if let Some(limit) = self.opts.limit
182 && self.count >= limit
183 {
184 return None;
185 }
186
187 match self.state {
188 LiveStreamState::FetchingInitial => {
189 if self.fetch_changes().await.is_err() {
190 return None;
191 }
192 self.state = if self.buffer.is_empty() {
193 if self.opts.live {
194 LiveStreamState::Waiting
195 } else {
196 LiveStreamState::Done
197 }
198 } else {
199 LiveStreamState::Yielding
200 };
201 }
202 LiveStreamState::Yielding => {
203 if self.buffer_idx < self.buffer.len() {
204 let event = self.buffer[self.buffer_idx].clone();
205 self.buffer_idx += 1;
206 self.count += 1;
207 return Some(event);
208 }
209 self.state = if self.opts.live {
211 LiveStreamState::Waiting
212 } else {
213 LiveStreamState::Done
214 };
215 }
216 LiveStreamState::Waiting => {
217 if let Some(ref mut receiver) = self.receiver {
219 receiver.recv().await.as_ref()?;
221 } else {
222 tokio::time::sleep(self.opts.poll_interval).await;
224 }
225
226 if self.fetch_changes().await.is_err() {
228 return None;
229 }
230 if !self.buffer.is_empty() {
231 self.state = LiveStreamState::Yielding;
232 }
233 }
235 LiveStreamState::Done => {
236 return None;
237 }
238 }
239 }
240 }
241}
242
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Creates an in-memory database named "test" and a sender with capacity 64.
    async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
        let (sender, _initial_rx) = ChangeSender::new(64);
        let db = Arc::new(MemoryAdapter::new("test"));
        (db, sender)
    }

    /// Writes one new document through `bulk_docs` and returns its revision.
    async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
        let batch = vec![Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        }];
        let outcome = db.bulk_docs(batch, BulkDocsOptions::new()).await.unwrap();
        outcome[0].rev.clone().unwrap()
    }

    /// A default one-shot query returns every document in insertion order.
    #[tokio::test]
    async fn one_shot_changes() {
        let (db, _sender) = setup().await;
        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;

        let events = get_changes(db.as_ref(), ChangesStreamOptions::default())
            .await
            .unwrap();

        assert_eq!(events.len(), 2);
        for (event, expected_id) in events.iter().zip(["a", "b"]) {
            assert_eq!(event.id, expected_id);
        }
    }

    /// `since` skips the changes at or before the given sequence.
    #[tokio::test]
    async fn one_shot_changes_since() {
        let (db, _sender) = setup().await;
        for id in ["a", "b", "c"] {
            put_doc(db.as_ref(), id, serde_json::json!({})).await;
        }

        let opts = ChangesStreamOptions {
            since: Seq::Num(2),
            ..Default::default()
        };
        let events = get_changes(db.as_ref(), opts).await.unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, "c");
    }

    /// `limit` caps the number of events returned by a one-shot query.
    #[tokio::test]
    async fn one_shot_with_limit() {
        let (db, _sender) = setup().await;
        for i in 0..5 {
            put_doc(db.as_ref(), &format!("d{i}"), serde_json::json!({})).await;
        }

        let opts = ChangesStreamOptions {
            limit: Some(2),
            ..Default::default()
        };
        let events = get_changes(db.as_ref(), opts).await.unwrap();

        assert_eq!(events.len(), 2);
    }

    /// A live stream yields the existing document, then documents written
    /// later, and stops once the limit of three is reached.
    #[tokio::test]
    async fn live_stream_initial_then_new() {
        let (db, sender) = setup().await;
        put_doc(db.as_ref(), "existing", serde_json::json!({})).await;

        let receiver = sender.subscribe();
        let writer_db = db.clone();

        let opts = ChangesStreamOptions {
            live: true,
            limit: Some(3),
            ..Default::default()
        };
        let mut stream = LiveChangesStream::new(db.clone(), Some(receiver), opts);

        let first = stream.next_change().await.unwrap();
        assert_eq!(first.id, "existing");

        // Background writer: two more documents, each followed by a notification.
        let writer_tx = sender.clone();
        tokio::spawn(async move {
            for (seq, id) in [(2, "new1"), (3, "new2")] {
                tokio::time::sleep(Duration::from_millis(50)).await;
                put_doc(writer_db.as_ref(), id, serde_json::json!({})).await;
                writer_tx.notify(Seq::Num(seq), id.into());
            }
        });

        for expected_id in ["new1", "new2"] {
            let event = stream.next_change().await.unwrap();
            assert_eq!(event.id, expected_id);
        }

        assert!(stream.next_change().await.is_none());
    }

    /// A receiver obtained via `subscribe` sees notifications sent afterwards.
    #[tokio::test]
    async fn change_sender_subscribe() {
        let (sender, _rx) = ChangeSender::new(16);
        let mut subscriber = sender.subscribe();

        sender.notify(Seq::Num(1), "doc1".into());

        let received = subscriber.recv().await.unwrap();
        assert_eq!(received.seq, Seq::Num(1));
        assert_eq!(received.doc_id, "doc1");
    }
}