Skip to main content

oxide_mirror/
source.rs

1//! The [`SyncSource`] trait + reusable test helpers.
2
3use std::pin::Pin;
4use std::sync::Mutex;
5
6use async_trait::async_trait;
7use futures_util::stream;
8use futures_util::Stream;
9
10use crate::error::Result;
11use crate::event::Delta;
12
13/// A batch of deltas returned by [`SyncSource::pull`].
14#[derive(Debug, Clone, Default)]
15pub struct PullResult {
16    /// The deltas to apply, in the order the source observed them.
17    pub deltas: Vec<Delta>,
18    /// Opaque cursor that callers should store and pass back on the next
19    /// pull. `None` means "no further pages" — the syncer treats this as
20    /// "leave the existing cursor untouched" so a future call resumes from
21    /// the same point.
22    pub next_cursor: Option<String>,
23    /// Whether the source has more pages immediately available.
24    pub has_more: bool,
25}
26
27/// Pinned, heap-allocated delta stream.
28pub type DeltaStream = Pin<Box<dyn Stream<Item = Result<Delta>> + Send>>;
29
30/// Anything that can emit deltas into the local mirror.
31///
32/// Generated `oxide-gen` clients implement this by translating their API's
33/// list / change-feed endpoints into delta batches.
34#[async_trait]
35pub trait SyncSource: Send + Sync {
36    /// Stable id of the source (matches `Provenance::source`).
37    fn id(&self) -> &str;
38
39    /// Pull the next batch of deltas. `cursor` is the value the source
40    /// returned from the previous call, or `None` for the first pull.
41    async fn pull(&self, cursor: Option<String>) -> Result<PullResult>;
42
43    /// Subscribe to a live stream of deltas starting from `cursor`.
44    ///
45    /// The default implementation does a **single** `pull` call and wraps
46    /// the resulting batch as a finite stream. Override this method for
47    /// sources that support real push (WebSocket, SSE, change-feeds, etc.).
48    async fn subscribe(&self, cursor: Option<String>) -> Result<DeltaStream> {
49        let batch = self.pull(cursor).await?;
50        let deltas: Vec<Result<Delta>> = batch.deltas.into_iter().map(Ok).collect();
51        Ok(Box::pin(stream::iter(deltas)))
52    }
53}
54
55/// In-memory [`SyncSource`] used by tests.
56///
57/// Holds a list of pre-baked [`PullResult`] batches and serves them in order;
58/// once exhausted, subsequent calls return an empty batch with `has_more =
59/// false`.
60pub struct StaticSource {
61    id: String,
62    batches: Mutex<Vec<PullResult>>,
63}
64
65impl StaticSource {
66    /// Build a static source with the given id and batches.
67    pub fn new(id: impl Into<String>, batches: Vec<PullResult>) -> Self {
68        Self {
69            id: id.into(),
70            batches: Mutex::new(batches),
71        }
72    }
73
74    /// Convenience: build a single-batch source from a flat delta list.
75    pub fn from_deltas(id: impl Into<String>, deltas: Vec<Delta>) -> Self {
76        Self::new(
77            id,
78            vec![PullResult {
79                deltas,
80                next_cursor: None,
81                has_more: false,
82            }],
83        )
84    }
85
86    /// Push another batch to be served on the next pull.
87    pub fn enqueue(&self, batch: PullResult) {
88        self.batches.lock().unwrap().push(batch);
89    }
90}
91
92#[async_trait]
93impl SyncSource for StaticSource {
94    fn id(&self) -> &str {
95        &self.id
96    }
97
98    async fn pull(&self, _cursor: Option<String>) -> Result<PullResult> {
99        let mut batches = self.batches.lock().unwrap();
100        if batches.is_empty() {
101            Ok(PullResult::default())
102        } else {
103            Ok(batches.remove(0))
104        }
105    }
106}