Skip to main content

vantage_live/live_table/
mod.rs

1//! `LiveTable` — write-through cache wrapper around an `AnyTable` master.
2//!
3//! Reads consult the cache first, fall through to the master on a miss, and
4//! repopulate. Writes are queued on a worker task that applies them to the
5//! master (or a caller-specified alternate target) and invalidates the
6//! cache on success. An optional [`LiveStream`] also invalidates the cache
7//! whenever an external event source observes a change.
8//!
9//! See `DESIGN.md` for the full architectural rationale.
10
11mod event_consumer;
12pub mod impls;
13mod worker;
14mod write_op;
15
16use std::sync::Arc;
17
18use tokio::sync::mpsc;
19use vantage_table::any::AnyTable;
20use vantage_table::pagination::Pagination;
21
22use crate::cache::Cache;
23use crate::live_stream::LiveStream;
24
25pub use write_op::WriteOp;
26
/// Capacity of the bounded channel that feeds the write worker.
///
/// Deliberately small: writes are rare relative to reads, and a larger
/// buffer would add no capability — it would only postpone the moment
/// a slow write target shows up as backpressure on senders.
const WRITE_QUEUE_CAPACITY: usize = 256;
31
32#[derive(Clone)]
33pub struct LiveTable {
34    /// The master table. `AnyTable` is itself `Clone`-cheap (clones the
35    /// inner `Box<dyn TableLike>` via `clone_box`), so we hold it
36    /// directly rather than behind a lock — `TableLike` requires sync
37    /// metadata accessors that don't compose with `tokio::sync::RwLock`.
38    /// Trade-off: the worker task captures its own clone at spawn time,
39    /// so a future `set_master` would have to update both. v1 doesn't
40    /// expose `set_master`.
41    pub(crate) master: AnyTable,
42
43    /// Caller-supplied identifier for the cached view. Combined with a
44    /// page suffix to produce the actual cache key on each read.
45    pub(crate) cache_key: String,
46
47    /// Cache backend. `Arc<dyn Cache>` so the same backend instance can
48    /// be shared by many `LiveTable`s pointing at different caches.
49    pub(crate) cache: Arc<dyn Cache>,
50
51    /// If set, write operations land here instead of the master. Reads
52    /// stay on the master; only writes are diverted.
53    pub(crate) custom_write_target: Option<AnyTable>,
54
55    /// Channel into the write-queue worker task.
56    pub(crate) write_queue: mpsc::Sender<WriteOp>,
57
58    /// Optional event source — pushes fed in here invalidate the cache.
59    /// Currently informational; the worker is wired up in a follow-up.
60    #[allow(dead_code)]
61    pub(crate) live_stream: Option<Arc<dyn LiveStream>>,
62
63    /// The master's items-per-page ceiling. Stored for forward
64    /// compatibility (multi-page glue when UI ipp > master ipp); v1
65    /// trusts the caller to keep UI ipp at or below this.
66    pub(crate) master_ipp: Option<i64>,
67
68    /// Pagination state set by `TableLike::set_pagination`. Used to
69    /// derive the cache key suffix on every read. Plain field, not
70    /// behind a lock — `TableLike::get_pagination` is sync and returns
71    /// a borrow, same shape `AnyTable` uses. Each `clone()` gets its
72    /// own copy; master and cache stay shared.
73    pub(crate) pagination: Option<Pagination>,
74}
75
76impl LiveTable {
77    /// Build a `LiveTable` around `master`, caching results under
78    /// `cache_key`. The worker task is spawned on the current tokio
79    /// runtime; call this from within an async context.
80    pub fn new(master: AnyTable, cache_key: impl Into<String>, cache: Arc<dyn Cache>) -> Self {
81        let cache_key = cache_key.into();
82
83        let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
84
85        // Spawn the worker. It owns the Receiver; when every Sender drops
86        // (i.e. the LiveTable and all its clones), recv() returns None
87        // and the worker loop exits cleanly.
88        worker::spawn(
89            rx,
90            master.clone(),
91            None, // custom_write_target — overridden via builder
92            cache_key.clone(),
93            Arc::clone(&cache),
94        );
95
96        Self {
97            master,
98            cache_key,
99            cache,
100            custom_write_target: None,
101            write_queue: tx,
102            live_stream: None,
103            master_ipp: None,
104            pagination: None,
105        }
106    }
107
108    /// Set the master's max items-per-page hint. Stored, not enforced
109    /// in v1 (caller is responsible for keeping UI ipp at or below).
110    pub fn with_master_ipp(mut self, ipp: i64) -> Self {
111        self.master_ipp = Some(ipp);
112        self
113    }
114
115    /// Route writes to a different table than the master. Reads stay on
116    /// the master; only writes are diverted. Setting this rebuilds the
117    /// worker so it picks up the new target.
118    pub fn with_custom_write_target(mut self, target: AnyTable) -> Self {
119        // Drop the old worker by replacing the channel; old Sender drops
120        // → old Receiver gets None → old worker exits.
121        let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
122        worker::spawn(
123            rx,
124            self.master.clone(),
125            Some(target.clone()),
126            self.cache_key.clone(),
127            Arc::clone(&self.cache),
128        );
129        self.write_queue = tx;
130        self.custom_write_target = Some(target);
131        self
132    }
133
134    /// Attach a live event source. Spawns a background task that
135    /// subscribes to the stream and invalidates the cache on every
136    /// event (sloppy invalidation — see DESIGN.md).
137    pub fn with_live_stream(mut self, stream: Arc<dyn LiveStream>) -> Self {
138        event_consumer::spawn(
139            Arc::clone(&stream),
140            self.cache_key.clone(),
141            Arc::clone(&self.cache),
142        );
143        self.live_stream = Some(stream);
144        self
145    }
146
147    /// The cache key used for a given page number. Public for
148    /// observability / debugging — there's no production reason to call
149    /// this from outside the crate.
150    pub fn page_cache_key(&self, page: i64) -> String {
151        format!("{}/page_{}", self.cache_key, page)
152    }
153
154    /// The cache key used for a single-row `get_value` lookup.
155    pub fn id_cache_key(&self, id: &str) -> String {
156        format!("{}/id/{}", self.cache_key, id)
157    }
158}
159
160impl std::fmt::Debug for LiveTable {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("LiveTable")
163            .field("cache_key", &self.cache_key)
164            .field("master_ipp", &self.master_ipp)
165            .field(
166                "has_custom_write_target",
167                &self.custom_write_target.is_some(),
168            )
169            .field("has_live_stream", &self.live_stream.is_some())
170            .finish()
171    }
172}