oxide_mirror/source.rs
1//! The [`SyncSource`] trait + reusable test helpers.
2
3use std::pin::Pin;
4use std::sync::Mutex;
5
6use async_trait::async_trait;
7use futures_util::stream;
8use futures_util::Stream;
9
10use crate::error::Result;
11use crate::event::Delta;
12
13/// A batch of deltas returned by [`SyncSource::pull`].
14#[derive(Debug, Clone, Default)]
15pub struct PullResult {
16 /// The deltas to apply, in the order the source observed them.
17 pub deltas: Vec<Delta>,
18 /// Opaque cursor that callers should store and pass back on the next
19 /// pull. `None` means "no further pages" — the syncer treats this as
20 /// "leave the existing cursor untouched" so a future call resumes from
21 /// the same point.
22 pub next_cursor: Option<String>,
23 /// Whether the source has more pages immediately available.
24 pub has_more: bool,
25}
26
27/// Pinned, heap-allocated delta stream.
28pub type DeltaStream = Pin<Box<dyn Stream<Item = Result<Delta>> + Send>>;
29
30/// Anything that can emit deltas into the local mirror.
31///
32/// Generated `oxide-gen` clients implement this by translating their API's
33/// list / change-feed endpoints into delta batches.
34#[async_trait]
35pub trait SyncSource: Send + Sync {
36 /// Stable id of the source (matches `Provenance::source`).
37 fn id(&self) -> &str;
38
39 /// Pull the next batch of deltas. `cursor` is the value the source
40 /// returned from the previous call, or `None` for the first pull.
41 async fn pull(&self, cursor: Option<String>) -> Result<PullResult>;
42
43 /// Subscribe to a live stream of deltas starting from `cursor`.
44 ///
45 /// The default implementation does a **single** `pull` call and wraps
46 /// the resulting batch as a finite stream. Override this method for
47 /// sources that support real push (WebSocket, SSE, change-feeds, etc.).
48 async fn subscribe(&self, cursor: Option<String>) -> Result<DeltaStream> {
49 let batch = self.pull(cursor).await?;
50 let deltas: Vec<Result<Delta>> = batch.deltas.into_iter().map(Ok).collect();
51 Ok(Box::pin(stream::iter(deltas)))
52 }
53}
54
55/// In-memory [`SyncSource`] used by tests.
56///
57/// Holds a list of pre-baked [`PullResult`] batches and serves them in order;
58/// once exhausted, subsequent calls return an empty batch with `has_more =
59/// false`.
60pub struct StaticSource {
61 id: String,
62 batches: Mutex<Vec<PullResult>>,
63}
64
65impl StaticSource {
66 /// Build a static source with the given id and batches.
67 pub fn new(id: impl Into<String>, batches: Vec<PullResult>) -> Self {
68 Self {
69 id: id.into(),
70 batches: Mutex::new(batches),
71 }
72 }
73
74 /// Convenience: build a single-batch source from a flat delta list.
75 pub fn from_deltas(id: impl Into<String>, deltas: Vec<Delta>) -> Self {
76 Self::new(
77 id,
78 vec![PullResult {
79 deltas,
80 next_cursor: None,
81 has_more: false,
82 }],
83 )
84 }
85
86 /// Push another batch to be served on the next pull.
87 pub fn enqueue(&self, batch: PullResult) {
88 self.batches.lock().unwrap().push(batch);
89 }
90}
91
92#[async_trait]
93impl SyncSource for StaticSource {
94 fn id(&self) -> &str {
95 &self.id
96 }
97
98 async fn pull(&self, _cursor: Option<String>) -> Result<PullResult> {
99 let mut batches = self.batches.lock().unwrap();
100 if batches.is_empty() {
101 Ok(PullResult::default())
102 } else {
103 Ok(batches.remove(0))
104 }
105 }
106}