1use std::sync::Arc;
10use std::time::Duration;
11
12use tokio::sync::broadcast;
13
14use rouchdb_core::adapter::Adapter;
15use rouchdb_core::document::{ChangeEvent, ChangesOptions, Seq};
16use rouchdb_core::error::Result;
17
18#[derive(Debug, Clone)]
20pub struct ChangeNotification {
21 pub seq: Seq,
22 pub doc_id: String,
23}
24
25#[derive(Debug, Clone)]
28pub struct ChangeSender {
29 tx: broadcast::Sender<ChangeNotification>,
30}
31
32impl ChangeSender {
33 pub fn new(capacity: usize) -> (Self, ChangeReceiver) {
34 let (tx, rx) = broadcast::channel(capacity);
35 (ChangeSender { tx }, ChangeReceiver { rx })
36 }
37
38 pub fn notify(&self, seq: Seq, doc_id: String) {
39 let _ = self.tx.send(ChangeNotification { seq, doc_id });
41 }
42
43 pub fn subscribe(&self) -> ChangeReceiver {
44 ChangeReceiver {
45 rx: self.tx.subscribe(),
46 }
47 }
48}
49
50pub struct ChangeReceiver {
52 rx: broadcast::Receiver<ChangeNotification>,
53}
54
55impl ChangeReceiver {
56 pub async fn recv(&mut self) -> Option<ChangeNotification> {
57 loop {
58 match self.rx.recv().await {
59 Ok(notification) => return Some(notification),
60 Err(broadcast::error::RecvError::Lagged(_)) => {
61 continue;
63 }
64 Err(broadcast::error::RecvError::Closed) => return None,
65 }
66 }
67 }
68}
69
70#[derive(Debug, Clone)]
72pub struct ChangesStreamOptions {
73 pub since: Seq,
74 pub live: bool,
75 pub include_docs: bool,
76 pub doc_ids: Option<Vec<String>>,
77 pub limit: Option<u64>,
78 pub poll_interval: Duration,
80}
81
82impl Default for ChangesStreamOptions {
83 fn default() -> Self {
84 Self {
85 since: Seq::default(),
86 live: false,
87 include_docs: false,
88 doc_ids: None,
89 limit: None,
90 poll_interval: Duration::from_millis(500),
91 }
92 }
93}
94
95pub async fn get_changes(
97 adapter: &dyn Adapter,
98 opts: ChangesStreamOptions,
99) -> Result<Vec<ChangeEvent>> {
100 let changes_opts = ChangesOptions {
101 since: opts.since,
102 limit: opts.limit,
103 descending: false,
104 include_docs: opts.include_docs,
105 live: false,
106 doc_ids: opts.doc_ids,
107 };
108
109 let response = adapter.changes(changes_opts).await?;
110 Ok(response.results)
111}
112
113pub struct LiveChangesStream {
118 adapter: Arc<dyn Adapter>,
119 receiver: Option<ChangeReceiver>,
120 opts: ChangesStreamOptions,
121 last_seq: Seq,
122 buffer: Vec<ChangeEvent>,
123 buffer_idx: usize,
124 state: LiveStreamState,
125 count: u64,
126}
127
128enum LiveStreamState {
129 FetchingInitial,
131 Yielding,
133 Waiting,
135 Done,
137}
138
139impl LiveChangesStream {
140 pub fn new(
141 adapter: Arc<dyn Adapter>,
142 receiver: Option<ChangeReceiver>,
143 opts: ChangesStreamOptions,
144 ) -> Self {
145 let last_seq = opts.since.clone();
146 Self {
147 adapter,
148 receiver,
149 opts,
150 last_seq,
151 buffer: Vec::new(),
152 buffer_idx: 0,
153 state: LiveStreamState::FetchingInitial,
154 count: 0,
155 }
156 }
157
158 async fn fetch_changes(&mut self) -> Result<()> {
160 let changes_opts = ChangesOptions {
161 since: self.last_seq.clone(),
162 limit: self.opts.limit.map(|l| l.saturating_sub(self.count)),
163 descending: false,
164 include_docs: self.opts.include_docs,
165 live: false,
166 doc_ids: self.opts.doc_ids.clone(),
167 };
168
169 let response = self.adapter.changes(changes_opts).await?;
170 if !response.results.is_empty() {
171 self.last_seq = response.last_seq;
172 }
173 self.buffer = response.results;
174 self.buffer_idx = 0;
175 Ok(())
176 }
177
178 pub async fn next_change(&mut self) -> Option<ChangeEvent> {
180 loop {
181 if let Some(limit) = self.opts.limit {
183 if self.count >= limit {
184 return None;
185 }
186 }
187
188 match self.state {
189 LiveStreamState::FetchingInitial => {
190 if self.fetch_changes().await.is_err() {
191 return None;
192 }
193 self.state = if self.buffer.is_empty() {
194 if self.opts.live {
195 LiveStreamState::Waiting
196 } else {
197 LiveStreamState::Done
198 }
199 } else {
200 LiveStreamState::Yielding
201 };
202 }
203 LiveStreamState::Yielding => {
204 if self.buffer_idx < self.buffer.len() {
205 let event = self.buffer[self.buffer_idx].clone();
206 self.buffer_idx += 1;
207 self.count += 1;
208 return Some(event);
209 }
210 self.state = if self.opts.live {
212 LiveStreamState::Waiting
213 } else {
214 LiveStreamState::Done
215 };
216 }
217 LiveStreamState::Waiting => {
218 if let Some(ref mut receiver) = self.receiver {
220 if receiver.recv().await.is_none() {
222 return None; }
224 } else {
225 tokio::time::sleep(self.opts.poll_interval).await;
227 }
228
229 if self.fetch_changes().await.is_err() {
231 return None;
232 }
233 if !self.buffer.is_empty() {
234 self.state = LiveStreamState::Yielding;
235 }
236 }
238 LiveStreamState::Done => {
239 return None;
240 }
241 }
242 }
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use rouchdb_adapter_memory::MemoryAdapter;
    use rouchdb_core::document::{BulkDocsOptions, Document};
    use std::collections::HashMap;

    /// Builds a fresh in-memory database and a sender whose initial receiver
    /// is dropped; tests subscribe explicitly when they need one.
    async fn setup() -> (Arc<MemoryAdapter>, ChangeSender) {
        let (sender, _initial_rx) = ChangeSender::new(64);
        let db = Arc::new(MemoryAdapter::new("test"));
        (db, sender)
    }

    /// Writes a new document with the given id and body, returning its rev.
    async fn put_doc(db: &dyn Adapter, id: &str, data: serde_json::Value) -> String {
        let doc = Document {
            id: id.into(),
            rev: None,
            deleted: false,
            data,
            attachments: HashMap::new(),
        };
        let written = db
            .bulk_docs(vec![doc], BulkDocsOptions::new())
            .await
            .unwrap();
        written[0].rev.clone().unwrap()
    }

    /// A default one-shot query returns every change in write order.
    #[tokio::test]
    async fn one_shot_changes() {
        let (db, _sender) = setup().await;
        put_doc(db.as_ref(), "a", serde_json::json!({"v": 1})).await;
        put_doc(db.as_ref(), "b", serde_json::json!({"v": 2})).await;

        let opts = ChangesStreamOptions::default();
        let events = get_changes(db.as_ref(), opts).await.unwrap();

        assert_eq!(events.len(), 2);
        assert_eq!(events[0].id, "a");
        assert_eq!(events[1].id, "b");
    }

    /// `since` skips the changes at or before the given sequence.
    #[tokio::test]
    async fn one_shot_changes_since() {
        let (db, _sender) = setup().await;
        for id in ["a", "b", "c"] {
            put_doc(db.as_ref(), id, serde_json::json!({})).await;
        }

        let opts = ChangesStreamOptions {
            since: Seq::Num(2),
            ..Default::default()
        };
        let events = get_changes(db.as_ref(), opts).await.unwrap();

        assert_eq!(events.len(), 1);
        assert_eq!(events[0].id, "c");
    }

    /// `limit` caps the number of returned changes.
    #[tokio::test]
    async fn one_shot_with_limit() {
        let (db, _sender) = setup().await;
        for i in 0..5 {
            let id = format!("d{}", i);
            put_doc(db.as_ref(), &id, serde_json::json!({})).await;
        }

        let opts = ChangesStreamOptions {
            limit: Some(2),
            ..Default::default()
        };
        let events = get_changes(db.as_ref(), opts).await.unwrap();

        assert_eq!(events.len(), 2);
    }

    /// A live stream yields the pre-existing change first, then the ones
    /// written afterwards, and stops once the limit is reached.
    #[tokio::test]
    async fn live_stream_initial_then_new() {
        let (db, sender) = setup().await;
        put_doc(db.as_ref(), "existing", serde_json::json!({})).await;

        // Subscribe before the stream exists so no notification is missed.
        let receiver = sender.subscribe();
        let writer_db = db.clone();

        let opts = ChangesStreamOptions {
            live: true,
            limit: Some(3),
            ..Default::default()
        };
        let mut stream = LiveChangesStream::new(db.clone(), Some(receiver), opts);

        let first = stream.next_change().await.unwrap();
        assert_eq!(first.id, "existing");

        // Background writer: two more documents, each followed by a notify.
        let writer_sender = sender.clone();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(50)).await;
            put_doc(writer_db.as_ref(), "new1", serde_json::json!({})).await;
            writer_sender.notify(Seq::Num(2), "new1".into());
            tokio::time::sleep(Duration::from_millis(50)).await;
            put_doc(writer_db.as_ref(), "new2", serde_json::json!({})).await;
            writer_sender.notify(Seq::Num(3), "new2".into());
        });

        let second = stream.next_change().await.unwrap();
        assert_eq!(second.id, "new1");

        let third = stream.next_change().await.unwrap();
        assert_eq!(third.id, "new2");

        // Limit of three reached: the stream ends.
        assert!(stream.next_change().await.is_none());
    }

    /// A subscriber receives what the sender broadcasts.
    #[tokio::test]
    async fn change_sender_subscribe() {
        let (sender, _rx) = ChangeSender::new(16);
        let mut subscriber = sender.subscribe();

        sender.notify(Seq::Num(1), "doc1".into());

        let received = subscriber.recv().await.unwrap();
        assert_eq!(received.seq, Seq::Num(1));
        assert_eq!(received.doc_id, "doc1");
    }
}