Skip to main content

ubiquisync_core/sync/
cursors.rs

1//! Per-origin replication cursors — the version vector replicas exchange.
2
3use std::collections::HashMap;
4use std::pin::Pin;
5
6use async_trait::async_trait;
7use futures_core::Stream;
8
9use crate::uuid::Uuid;
10
11use super::error::SyncError;
12
13/// Per-origin position: `peer id → next entry index`. The value for a peer is
14/// what to pass [`read_since`](super::LogSource::read_since) for the next entry;
15/// an absent key means `0`.
16///
17/// A version vector: merge by pointwise `max`, diff to get the gap to pull.
18pub type PeerCursors = HashMap<Uuid, u64>;
19
20/// An event from [`watch_cursors`](HasCursors::watch_cursors).
21///
22/// First event is a full `Snapshot`, later events `Advanced` deltas of the
23/// origins that moved; fold each in by pointwise `max`. Carrying cursor state
24/// (not entries) makes it loss-tolerant: a dropped or coalesced event costs
25/// nothing — the next one still names the current position, and the gap is
26/// refetched via `read_since`.
27#[derive(Debug, Clone)]
28pub enum CursorsEvent {
29    /// Full cursor vector at subscription time.
30    Snapshot(PeerCursors),
31    /// Origins that advanced since the last event, mapped to their new index.
32    Advanced(PeerCursors),
33}
34
35/// Stream from [`watch_cursors`](HasCursors::watch_cursors); ending means the
36/// watch closed.
37pub type CursorStream = Pin<Box<dyn Stream<Item = CursorsEvent> + Send>>;
38
39/// Snapshot or watch a replica's cursor vector — the shared base of
40/// [`LogProcessor`](super::LogProcessor) and [`LogSource`](super::LogSource).
41#[async_trait]
42pub trait HasCursors: Send + Sync {
43    /// Snapshot of the current cursor vector.
44    async fn cursors(&self) -> Result<PeerCursors, SyncError>;
45
46    /// Live cursor progress: a first [`Snapshot`](CursorsEvent::Snapshot), then
47    /// [`Advanced`](CursorsEvent::Advanced) deltas.
48    fn watch_cursors(&self) -> CursorStream;
49}