vantage_live/live_table/mod.rs
1//! `LiveTable` — write-through cache wrapper around an `AnyTable` master.
2//!
3//! Reads consult the cache first, fall through to the master on a miss, and
4//! repopulate. Writes are queued on a worker task that applies them to the
5//! master (or a caller-specified alternate target) and invalidates the
6//! cache on success. An optional [`LiveStream`] also invalidates the cache
7//! whenever an external event source observes a change.
8//!
9//! See `DESIGN.md` for the full architectural rationale.
10
11mod event_consumer;
12pub mod impls;
13mod worker;
14mod write_op;
15
16use std::sync::Arc;
17
18use tokio::sync::mpsc;
19use vantage_table::any::AnyTable;
20use vantage_table::pagination::Pagination;
21
22use crate::cache::Cache;
23use crate::live_stream::LiveStream;
24
25pub use write_op::WriteOp;
26
27/// Bounded queue capacity. Picked low because writes are infrequent
28/// compared to reads; bumping it doesn't unlock new behaviour, just
29/// hides backpressure.
30const WRITE_QUEUE_CAPACITY: usize = 256;
31
32#[derive(Clone)]
33pub struct LiveTable {
34 /// The master table. `AnyTable` is itself `Clone`-cheap (clones the
35 /// inner `Box<dyn TableLike>` via `clone_box`), so we hold it
36 /// directly rather than behind a lock — `TableLike` requires sync
37 /// metadata accessors that don't compose with `tokio::sync::RwLock`.
38 /// Trade-off: the worker task captures its own clone at spawn time,
39 /// so a future `set_master` would have to update both. v1 doesn't
40 /// expose `set_master`.
41 pub(crate) master: AnyTable,
42
43 /// Caller-supplied identifier for the cached view. Combined with a
44 /// page suffix to produce the actual cache key on each read.
45 pub(crate) cache_key: String,
46
47 /// Cache backend. `Arc<dyn Cache>` so the same backend instance can
48 /// be shared by many `LiveTable`s pointing at different caches.
49 pub(crate) cache: Arc<dyn Cache>,
50
51 /// If set, write operations land here instead of the master. Reads
52 /// stay on the master; only writes are diverted.
53 pub(crate) custom_write_target: Option<AnyTable>,
54
55 /// Channel into the write-queue worker task.
56 pub(crate) write_queue: mpsc::Sender<WriteOp>,
57
58 /// Optional event source — pushes fed in here invalidate the cache.
59 /// Currently informational; the worker is wired up in a follow-up.
60 #[allow(dead_code)]
61 pub(crate) live_stream: Option<Arc<dyn LiveStream>>,
62
63 /// The master's items-per-page ceiling. Stored for forward
64 /// compatibility (multi-page glue when UI ipp > master ipp); v1
65 /// trusts the caller to keep UI ipp at or below this.
66 pub(crate) master_ipp: Option<i64>,
67
68 /// Pagination state set by `TableLike::set_pagination`. Used to
69 /// derive the cache key suffix on every read. Plain field, not
70 /// behind a lock — `TableLike::get_pagination` is sync and returns
71 /// a borrow, same shape `AnyTable` uses. Each `clone()` gets its
72 /// own copy; master and cache stay shared.
73 pub(crate) pagination: Option<Pagination>,
74}
75
76impl LiveTable {
77 /// Build a `LiveTable` around `master`, caching results under
78 /// `cache_key`. The worker task is spawned on the current tokio
79 /// runtime; call this from within an async context.
80 pub fn new(master: AnyTable, cache_key: impl Into<String>, cache: Arc<dyn Cache>) -> Self {
81 let cache_key = cache_key.into();
82
83 let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
84
85 // Spawn the worker. It owns the Receiver; when every Sender drops
86 // (i.e. the LiveTable and all its clones), recv() returns None
87 // and the worker loop exits cleanly.
88 worker::spawn(
89 rx,
90 master.clone(),
91 None, // custom_write_target — overridden via builder
92 cache_key.clone(),
93 Arc::clone(&cache),
94 );
95
96 Self {
97 master,
98 cache_key,
99 cache,
100 custom_write_target: None,
101 write_queue: tx,
102 live_stream: None,
103 master_ipp: None,
104 pagination: None,
105 }
106 }
107
108 /// Set the master's max items-per-page hint. Stored, not enforced
109 /// in v1 (caller is responsible for keeping UI ipp at or below).
110 pub fn with_master_ipp(mut self, ipp: i64) -> Self {
111 self.master_ipp = Some(ipp);
112 self
113 }
114
115 /// Route writes to a different table than the master. Reads stay on
116 /// the master; only writes are diverted. Setting this rebuilds the
117 /// worker so it picks up the new target.
118 pub fn with_custom_write_target(mut self, target: AnyTable) -> Self {
119 // Drop the old worker by replacing the channel; old Sender drops
120 // → old Receiver gets None → old worker exits.
121 let (tx, rx) = mpsc::channel::<WriteOp>(WRITE_QUEUE_CAPACITY);
122 worker::spawn(
123 rx,
124 self.master.clone(),
125 Some(target.clone()),
126 self.cache_key.clone(),
127 Arc::clone(&self.cache),
128 );
129 self.write_queue = tx;
130 self.custom_write_target = Some(target);
131 self
132 }
133
134 /// Attach a live event source. Spawns a background task that
135 /// subscribes to the stream and invalidates the cache on every
136 /// event (sloppy invalidation — see DESIGN.md).
137 pub fn with_live_stream(mut self, stream: Arc<dyn LiveStream>) -> Self {
138 event_consumer::spawn(
139 Arc::clone(&stream),
140 self.cache_key.clone(),
141 Arc::clone(&self.cache),
142 );
143 self.live_stream = Some(stream);
144 self
145 }
146
147 /// The cache key used for a given page number. Public for
148 /// observability / debugging — there's no production reason to call
149 /// this from outside the crate.
150 pub fn page_cache_key(&self, page: i64) -> String {
151 format!("{}/page_{}", self.cache_key, page)
152 }
153
154 /// The cache key used for a single-row `get_value` lookup.
155 pub fn id_cache_key(&self, id: &str) -> String {
156 format!("{}/id/{}", self.cache_key, id)
157 }
158}
159
160impl std::fmt::Debug for LiveTable {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("LiveTable")
163 .field("cache_key", &self.cache_key)
164 .field("master_ipp", &self.master_ipp)
165 .field(
166 "has_custom_write_target",
167 &self.custom_write_target.is_some(),
168 )
169 .field("has_live_stream", &self.live_stream.is_some())
170 .finish()
171 }
172}