vantage_vista/source.rs
1use std::pin::Pin;
2
3use async_trait::async_trait;
4use ciborium::Value as CborValue;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use vantage_core::{Result, VantageError, error};
8use vantage_types::Record;
9
10use crate::{
11 capabilities::VistaCapabilities,
12 column::Column,
13 reference::{ContainedSpec, Reference},
14 sort::SortDirection,
15 vista::Vista,
16};
17
18/// Per-driver executor for a `Vista`.
19///
20/// Implementations live in driver crates (vantage-sqlite, vantage-mongodb,
21/// vantage-aws, etc.). Each method receives `&Vista` so the driver can read
22/// the current condition state, columns, and other metadata.
23///
24/// `Id = String` and `Value = ciborium::Value` at this boundary, so every
25/// driver's native id (Mongo `ObjectId`, Surreal `Thing`, …) stringifies
26/// here. Methods are named with the `_vista_` infix to mirror
27/// `TableSource`'s `_table_` convention; `Vista`'s `ValueSet` impls
28/// delegate by stripping the infix.
29///
30/// `id: &String` (rather than `&str`) is intentional: the upstream
31/// `vantage_dataset::ValueSet` trait family fixes `Id = String` and uses
32/// `&Self::Id` in its signatures, so impls receive `&String` and forward
33/// it through unchanged.
34#[async_trait]
35#[allow(clippy::ptr_arg)]
36pub trait TableShell: Send + Sync + 'static {
37 // ---- Schema --------------------------------------------------------------
38 //
39 // The shell owns the schema. `Vista` is a thin wrapper that forwards its
40 // metadata accessors here. No defaults — every impl must answer (an empty
41 // schema is a deliberate choice the impl declares explicitly).
42
43 fn columns(&self) -> &IndexMap<String, Column>;
44
45 fn references(&self) -> &IndexMap<String, Reference>;
46
47 fn id_column(&self) -> Option<&str>;
48
49 // ---- ReadableValueSet delegates ----------------------------------------
50
51 async fn list_vista_values(&self, vista: &Vista)
52 -> Result<IndexMap<String, Record<CborValue>>>;
53
54 async fn get_vista_value(
55 &self,
56 vista: &Vista,
57 id: &String,
58 ) -> Result<Option<Record<CborValue>>>;
59
60 async fn get_vista_some_value(
61 &self,
62 vista: &Vista,
63 ) -> Result<Option<(String, Record<CborValue>)>>;
64
65 /// Default implementation wraps `list_vista_values`. Drivers with native
66 /// streaming (cursor-based queries, paginated REST APIs) override.
67 #[allow(clippy::type_complexity)]
68 fn stream_vista_values<'a>(
69 &'a self,
70 vista: &'a Vista,
71 ) -> Pin<Box<dyn Stream<Item = Result<(String, Record<CborValue>)>> + Send + 'a>>
72 where
73 Self: Sync,
74 {
75 Box::pin(async_stream::stream! {
76 match self.list_vista_values(vista).await {
77 Ok(map) => {
78 for item in map {
79 yield Ok(item);
80 }
81 }
82 Err(e) => yield Err(e),
83 }
84 })
85 }
86
87 // ---- WritableValueSet delegates ----------------------------------------
88 //
89 // Default impls return a typed VantageError via `default_error` — drivers
90 // override only what they actually support. The matching `VistaCapabilities`
91 // flag must be set to `true` for any method the driver implements; if the
92 // flag is `true` but the trait method falls through to the default,
93 // `default_error` produces an `Unimplemented`-kind error (placeholder
94 // detected). If the flag is `false`, it produces `Unsupported`. Both are
95 // emitted as tracing events at construction.
96
97 async fn insert_vista_value(
98 &self,
99 _vista: &Vista,
100 _id: &String,
101 _record: &Record<CborValue>,
102 ) -> Result<Record<CborValue>> {
103 Err(self.default_error("insert_vista_value", "can_insert"))
104 }
105
106 async fn replace_vista_value(
107 &self,
108 _vista: &Vista,
109 _id: &String,
110 _record: &Record<CborValue>,
111 ) -> Result<Record<CborValue>> {
112 Err(self.default_error("replace_vista_value", "can_update"))
113 }
114
115 async fn patch_vista_value(
116 &self,
117 _vista: &Vista,
118 _id: &String,
119 _partial: &Record<CborValue>,
120 ) -> Result<Record<CborValue>> {
121 Err(self.default_error("patch_vista_value", "can_update"))
122 }
123
124 async fn delete_vista_value(&self, _vista: &Vista, _id: &String) -> Result<()> {
125 Err(self.default_error("delete_vista_value", "can_delete"))
126 }
127
128 async fn delete_vista_all_values(&self, _vista: &Vista) -> Result<()> {
129 Err(self.default_error("delete_vista_all_values", "can_delete"))
130 }
131
132 // ---- InsertableValueSet delegate ---------------------------------------
133
134 async fn insert_vista_return_id_value(
135 &self,
136 _vista: &Vista,
137 _record: &Record<CborValue>,
138 ) -> Result<String> {
139 Err(self.default_error("insert_vista_return_id_value", "can_insert"))
140 }
141
142 // ---- Aggregates --------------------------------------------------------
143
144 /// Default impl falls back to `list_vista_values` — drivers with native
145 /// count (`SELECT COUNT(*)`, etc.) override.
146 async fn get_vista_count(&self, vista: &Vista) -> Result<i64> {
147 Ok(self.list_vista_values(vista).await?.len() as i64)
148 }
149
150 // ---- Conditions --------------------------------------------------------
151
152 /// Translate `field == value` into the driver's native condition type and
153 /// apply it to the wrapped table. The default impl returns `Unimplemented`
154 /// — every driver is expected to override.
155 ///
156 /// `value` is the universal CBOR carrier; the driver picks the appropriate
157 /// translation (e.g. `cbor_to_bson` for Mongo, `cbor → AnyCsvType` for CSV).
158 fn add_eq_condition(&mut self, _field: &str, _value: &CborValue) -> Result<()> {
159 Err(error!(
160 format!(
161 "add_eq_condition not implemented for '{}'",
162 std::any::type_name::<Self>()
163 ),
164 method = "add_eq_condition",
165 source_type = std::any::type_name::<Self>()
166 )
167 .is_unimplemented())
168 }
169
170 /// Push a driver-native condition into the wrapped table. The
171 /// caller boxes the condition as `dyn Any` and the driver
172 /// downcasts to its own `T::Condition`. Used by YAML-driven
173 /// relation traversal, where the factory constructs a
174 /// `DeferredFn`-bearing condition outside the value-set surface
175 /// (which only accepts scalar eq) and pushes it through this
176 /// channel. Default is `Unimplemented`.
177 fn add_raw_condition(
178 &mut self,
179 _condition: Box<dyn std::any::Any + Send + Sync>,
180 ) -> Result<()> {
181 Err(error!(
182 format!(
183 "add_raw_condition not implemented for '{}'",
184 std::any::type_name::<Self>()
185 ),
186 method = "add_raw_condition",
187 source_type = std::any::type_name::<Self>()
188 )
189 .is_unimplemented())
190 }
191
192 // ---- Pagination --------------------------------------------------------
193
194 /// Declare how many records constitute one page. Used by both
195 /// [`fetch_page`](Self::fetch_page) and [`fetch_next`](Self::fetch_next).
196 /// Default returns `default_error("set_page_size", "can_set_page_size")`.
197 fn set_page_size(&mut self, _size: usize) -> Result<()> {
198 Err(self.default_error("set_page_size", "can_set_page_size"))
199 }
200
201 /// Fetch a specific page (1-based) using offset-style pagination. The
202 /// per-page count comes from the most recent
203 /// [`set_page_size`](Self::set_page_size).
204 ///
205 /// Drivers without random-access pagination (DynamoDB, most token-paginated
206 /// REST APIs) leave the default in place, which produces `Unsupported`.
207 /// Callers should branch on `vista.capabilities().can_fetch_page` first.
208 async fn fetch_page(
209 &self,
210 _vista: &Vista,
211 _page: usize,
212 ) -> Result<Vec<(String, Record<CborValue>)>> {
213 Err(self.default_error("fetch_page", "can_fetch_page"))
214 }
215
216 /// Cursor-style chain fetch. Pass `None` on the first call; pass the
217 /// previous call's returned token on subsequent calls. Returned token is
218 /// `None` when the result set is exhausted.
219 ///
220 /// The token is **driver-private** — its shape is whatever the backend
221 /// finds convenient (DynamoDB `LastEvaluatedKey` as a CBOR map, REST
222 /// `nextToken` as `CborValue::Text`, offset-based as `CborValue::Integer`).
223 /// Consumers treat it as opaque and round-trip it back unchanged.
224 ///
225 /// Default returns `default_error("fetch_next", "can_fetch_next")`.
226 async fn fetch_next(
227 &self,
228 _vista: &Vista,
229 _token: Option<CborValue>,
230 ) -> Result<(Vec<(String, Record<CborValue>)>, Option<CborValue>)> {
231 Err(self.default_error("fetch_next", "can_fetch_next"))
232 }
233
234 // ---- Quicksearch -------------------------------------------------------
235
236 /// Apply a quicksearch filter — a single string the driver fans out across
237 /// the columns it considers searchable (typically those flagged
238 /// [`SEARCHABLE`](crate::flags::SEARCHABLE), but each driver decides).
239 ///
240 /// **Replace semantics**: calling `add_search` again wipes the previous
241 /// search filter before applying the new one. Default produces
242 /// `Unimplemented` (when `can_search: true`) or `Unsupported` (when
243 /// `can_search: false`).
244 fn add_search(&mut self, _text: &str) -> Result<()> {
245 Err(self.default_error("add_search", "can_search"))
246 }
247
248 /// Drop the search filter previously applied via
249 /// [`add_search`](Self::add_search). Default mirrors `add_search`.
250 fn clear_search(&mut self) -> Result<()> {
251 Err(self.default_error("clear_search", "can_search"))
252 }
253
254 // ---- Ordering ----------------------------------------------------------
255
256 /// Push a single ORDER BY clause onto the wrapped table.
257 ///
258 /// Vista's `add_order` is replace-semantics: the driver shell should clear
259 /// any previously-set order before pushing the new one. Default produces
260 /// `Unimplemented` (when `can_order: true`) or `Unsupported` (when
261 /// `can_order: false`).
262 fn add_order(&mut self, _field: &str, _dir: SortDirection) -> Result<()> {
263 Err(self.default_error("add_order", "can_order"))
264 }
265
266 /// Wipe every order clause. Default mirrors [`add_order`](Self::add_order).
267 fn clear_orders(&mut self) -> Result<()> {
268 Err(self.default_error("clear_orders", "can_order"))
269 }
270
271 // ---- References --------------------------------------------------------
272
273 /// Resolve a same-persistence relation using a known source row, returning
274 /// the related table as a new `Vista`.
275 ///
276 /// Drivers override by forwarding into the wrapped typed `Table`'s
277 /// `get_ref_from_row::<EmptyEntity>(relation, &native_row)` and then
278 /// wrapping the result back as a `Vista` through the driver's factory.
279 /// The default returns `Unimplemented`; cross-persistence refs are
280 /// handled at the `Vista` layer via `Vista::with_foreign` before this
281 /// trait method is reached.
282 fn get_ref(&self, relation: &str, _row: &Record<CborValue>) -> Result<Vista> {
283 Err(error!(
284 format!(
285 "get_ref not implemented for '{}'",
286 std::any::type_name::<Self>()
287 ),
288 method = "get_ref",
289 relation = relation,
290 source_type = std::any::type_name::<Self>()
291 )
292 .is_unimplemented())
293 }
294
295 /// Build the **bare** target of a same-persistence relation as a `Vista` —
296 /// the table a new related row would be inserted into, with no join
297 /// condition applied. Used by Vista's nested insert to reach a has-one /
298 /// has-many child's destination.
299 ///
300 /// Drivers override by forwarding into the wrapped typed `Table`'s
301 /// `get_ref_target::<EmptyEntity>(relation)` and wrapping the result back
302 /// through the driver's factory — the same path as [`get_ref`](Self::get_ref)
303 /// minus the row-derived condition. The default returns `Unimplemented`;
304 /// cross-persistence relations are rejected at the `Vista` layer before
305 /// this is reached.
306 fn get_ref_target(&self, relation: &str) -> Result<Vista> {
307 Err(error!(
308 format!(
309 "get_ref_target not implemented for '{}'",
310 std::any::type_name::<Self>()
311 ),
312 method = "get_ref_target",
313 relation = relation,
314 source_type = std::any::type_name::<Self>()
315 )
316 .is_unimplemented())
317 }
318
319 /// Contained (embedded-in-row) relations this shell exposes, keyed by name.
320 /// Default empty — only shells that model embedded objects/arrays override.
321 fn contained(&self) -> &IndexMap<String, ContainedSpec> {
322 static EMPTY: std::sync::OnceLock<IndexMap<String, ContainedSpec>> =
323 std::sync::OnceLock::new();
324 EMPTY.get_or_init(IndexMap::new)
325 }
326
327 /// Resolve a contained relation against a known parent `row`, returning the
328 /// embedded records as a sub-`Vista`. Writes to that sub-Vista patch the
329 /// host column of `row`'s record back through the shell. Default returns
330 /// `Unimplemented`; shells override to seed [`crate::build_contained_vista`]
331 /// with a writeback that patches the parent.
332 fn get_contained_ref(&self, relation: &str, _row: &Record<CborValue>) -> Result<Vista> {
333 Err(error!(
334 format!(
335 "get_contained_ref not implemented for '{}'",
336 std::any::type_name::<Self>()
337 ),
338 method = "get_contained_ref",
339 relation = relation,
340 source_type = std::any::type_name::<Self>()
341 )
342 .is_unimplemented())
343 }
344
345 /// Names + cardinalities of the shell's same-persistence references.
346 /// Derived from [`references`](Self::references) by default; impls
347 /// should rarely need to override.
348 fn get_ref_kinds(&self) -> Vec<(String, crate::reference::ReferenceKind)> {
349 self.references()
350 .iter()
351 .map(|(name, r)| (name.clone(), r.kind))
352 .collect()
353 }
354
355 // ---- Identity ----------------------------------------------------------
356
357 /// Short human label for the underlying driver (e.g. `"csv"`, `"sqlite"`,
358 /// `"postgres"`, `"mongodb"`). Used for diagnostics and CLI output.
359 /// Drivers should override; the default is a placeholder.
360 fn driver_name(&self) -> &'static str {
361 "unknown"
362 }
363
364 // ---- Capability advertisement -----------------------------------------
365
366 fn capabilities(&self) -> &VistaCapabilities;
367
368 /// Look up a capability flag by name. Used by `default_error` to decide
369 /// between `Unsupported` and `Unimplemented`. Drivers don't normally
370 /// need to override this.
371 fn capability_flag(&self, name: &str) -> bool {
372 let caps = self.capabilities();
373 match name {
374 "can_count" => caps.can_count,
375 "can_insert" => caps.can_insert,
376 "can_update" => caps.can_update,
377 "can_delete" => caps.can_delete,
378 "can_subscribe" => caps.can_subscribe,
379 "can_invalidate" => caps.can_invalidate,
380 "can_order" => caps.can_order,
381 "can_search" => caps.can_search,
382 "can_set_page_size" => caps.can_set_page_size,
383 "can_fetch_page" => caps.can_fetch_page,
384 "can_fetch_next" => caps.can_fetch_next,
385 _ => false,
386 }
387 }
388
389 /// Build the standard error returned by default trait method impls.
390 ///
391 /// Picks the kind based on the capability flag: a `true` flag means the
392 /// driver advertised support but didn't override the method (placeholder
393 /// → `Unimplemented`); a `false` flag means the driver honestly doesn't
394 /// claim the op (caller should have checked → `Unsupported`).
395 ///
396 /// Both kinds emit a `tracing::error!` at construction with `method`,
397 /// `capability`, `source_type`, and `vista_name` as structured fields.
398 fn default_error(&self, method: &str, capability: &str) -> VantageError {
399 let source_type = std::any::type_name::<Self>();
400 if self.capability_flag(capability) {
401 error!(
402 format!(
403 "'{}' is advertised as VistaCapability for '{}' but implementation for '{}' is missing",
404 capability, source_type, method
405 ),
406 method = method,
407 capability = capability,
408 source_type = source_type
409 )
410 .is_unimplemented()
411 } else {
412 error!(
413 format!(
414 "'{}' is not supported by '{}'; '{}' refused",
415 capability, source_type, method
416 ),
417 method = method,
418 capability = capability,
419 source_type = source_type
420 )
421 .is_unsupported()
422 }
423 }
424}