vantage_table/traits/table_source.rs
1use std::hash::Hash;
2use std::pin::Pin;
3
4use async_trait::async_trait;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use vantage_dataset::traits::Result;
8use vantage_expressions::{
9 Expression,
10 traits::associated_expressions::AssociatedExpression,
11 traits::datasource::{DataSource, ExprDataSource},
12 traits::expressive::ExpressiveEnum,
13};
14use vantage_types::{Entity, Record};
15
16use crate::{column::core::ColumnType, table::Table, traits::column_like::ColumnLike};
17
18/// Trait for table data sources that defines column type separate from execution
19/// TableSource represents a data source that can create and manage tables
20#[async_trait]
21pub trait TableSource: DataSource + Clone + 'static {
22 type Column<Type>: ColumnLike<Type> + Clone
23 where
24 Type: ColumnType;
25 type AnyType: ColumnType;
26 type Value: Clone + Send + Sync + 'static;
27 type Id: Send + Sync + Clone + Hash + Eq + 'static;
28
29 /// The condition type stored by `Table`. SQL/SurrealDB backends use
30 /// `Expression<Self::Value>`; document-oriented backends like MongoDB
31 /// can use a native filter type (e.g. `bson::Document`).
32 type Condition: Clone + Send + Sync + 'static;
33
34 /// Create a new column with the given name
35 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type>;
36
37 /// Convert a typed column to type-erased column
38 fn to_any_column<Type: ColumnType>(
39 &self,
40 column: Self::Column<Type>,
41 ) -> Self::Column<Self::AnyType>;
42
43 /// Attempt to convert a type-erased column back to typed column
44 fn convert_any_column<Type: ColumnType>(
45 &self,
46 any_column: Self::Column<Self::AnyType>,
47 ) -> Option<Self::Column<Type>>;
48
49 /// Create an expression from a template and parameters, similar to Expression::new
50 fn expr(
51 &self,
52 template: impl Into<String>,
53 parameters: Vec<ExpressiveEnum<Self::Value>>,
54 ) -> Expression<Self::Value>;
55
56 /// Create a search condition for a table (e.g., searches across searchable fields)
57 ///
58 /// Different vendors implement search differently:
59 /// - SQL: `field LIKE '%value%'` (returns Expression)
60 /// - SurrealDB: `field CONTAINS 'value'` (returns Expression)
61 /// - MongoDB: `{ field: { $regex: 'value' } }` (returns MongoCondition)
62 ///
63 /// The implementation should search across appropriate fields in the table.
64 fn search_table_condition<E>(
65 &self,
66 table: &Table<Self, E>,
67 search_value: &str,
68 ) -> Self::Condition
69 where
70 E: Entity<Self::Value>,
71 Self: Sized;
72
73 /// Get all data from a table as Record values with IDs (for ReadableValueSet implementation)
74 async fn list_table_values<E>(
75 &self,
76 table: &Table<Self, E>,
77 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
78 where
79 E: Entity<Self::Value>,
80 Self: Sized;
81
82 /// Get a single record by ID as Record value (for ReadableValueSet implementation).
83 ///
84 /// Returns `Ok(None)` when no record exists with the given ID.
85 async fn get_table_value<E>(
86 &self,
87 table: &Table<Self, E>,
88 id: &Self::Id,
89 ) -> Result<Option<Record<Self::Value>>>
90 where
91 E: Entity<Self::Value>,
92 Self: Sized;
93
94 /// Get some data from a table as Record value with ID (for ReadableValueSet implementation)
95 async fn get_table_some_value<E>(
96 &self,
97 table: &Table<Self, E>,
98 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
99 where
100 E: Entity<Self::Value>,
101 Self: Sized;
102
103 /// Get count of records in the table
104 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
105 where
106 E: Entity<Self::Value>,
107 Self: Sized;
108
109 /// Get sum of a column in the table (returns native value type)
110 async fn get_table_sum<E>(
111 &self,
112 table: &Table<Self, E>,
113 column: &Self::Column<Self::AnyType>,
114 ) -> Result<Self::Value>
115 where
116 E: Entity<Self::Value>,
117 Self: Sized;
118
119 /// Get maximum value of a column in the table (returns native value type)
120 async fn get_table_max<E>(
121 &self,
122 table: &Table<Self, E>,
123 column: &Self::Column<Self::AnyType>,
124 ) -> Result<Self::Value>
125 where
126 E: Entity<Self::Value>,
127 Self: Sized;
128
129 /// Get minimum value of a column in the table (returns native value type)
130 async fn get_table_min<E>(
131 &self,
132 table: &Table<Self, E>,
133 column: &Self::Column<Self::AnyType>,
134 ) -> Result<Self::Value>
135 where
136 E: Entity<Self::Value>,
137 Self: Sized;
138
139 /// Insert a record as Record value (for WritableValueSet implementation)
140 async fn insert_table_value<E>(
141 &self,
142 table: &Table<Self, E>,
143 id: &Self::Id,
144 record: &Record<Self::Value>,
145 ) -> Result<Record<Self::Value>>
146 where
147 E: Entity<Self::Value>,
148 Self: Sized;
149
150 /// Replace a record as Record value (for WritableValueSet implementation)
151 async fn replace_table_value<E>(
152 &self,
153 table: &Table<Self, E>,
154 id: &Self::Id,
155 record: &Record<Self::Value>,
156 ) -> Result<Record<Self::Value>>
157 where
158 E: Entity<Self::Value>,
159 Self: Sized;
160
161 /// Patch a record as Record value (for WritableValueSet implementation)
162 async fn patch_table_value<E>(
163 &self,
164 table: &Table<Self, E>,
165 id: &Self::Id,
166 partial: &Record<Self::Value>,
167 ) -> Result<Record<Self::Value>>
168 where
169 E: Entity<Self::Value>,
170 Self: Sized;
171
172 /// Delete a record by ID (for WritableValueSet implementation)
173 async fn delete_table_value<E>(&self, table: &Table<Self, E>, id: &Self::Id) -> Result<()>
174 where
175 E: Entity<Self::Value>,
176 Self: Sized;
177
178 /// Delete all records (for WritableValueSet implementation)
179 async fn delete_table_all_values<E>(&self, table: &Table<Self, E>) -> Result<()>
180 where
181 E: Entity<Self::Value>,
182 Self: Sized;
183
184 /// Insert a record and return generated ID (for InsertableValueSet implementation)
185 async fn insert_table_return_id_value<E>(
186 &self,
187 table: &Table<Self, E>,
188 record: &Record<Self::Value>,
189 ) -> Result<Self::Id>
190 where
191 E: Entity<Self::Value>,
192 Self: Sized;
193
194 /// Stream all records from a table as (Id, Record) pairs.
195 ///
196 /// Default implementation wraps `list_table_values` into a stream.
197 /// Backends with native streaming (e.g. REST APIs with pagination)
198 /// can override this to yield records incrementally.
199 #[allow(clippy::type_complexity)]
200 fn stream_table_values<'a, E>(
201 &'a self,
202 table: &Table<Self, E>,
203 ) -> Pin<Box<dyn Stream<Item = Result<(Self::Id, Record<Self::Value>)>> + Send + 'a>>
204 where
205 E: Entity<Self::Value> + 'a,
206 Self: Sized,
207 {
208 let table = table.clone();
209 Box::pin(async_stream::stream! {
210 let records = self.list_table_values(&table).await;
211 match records {
212 Ok(map) => {
213 for item in map {
214 yield Ok(item);
215 }
216 }
217 Err(e) => yield Err(e),
218 }
219 })
220 }
221
222 /// Build a condition for "target_field IN (values of source_column from source_table)".
223 ///
224 /// Used by the relationship traversal system (`get_ref_as`) to filter
225 /// a target table by foreign key values from a source table.
226 ///
227 /// Each backend implements this natively:
228 /// - SQL backends build a subquery: `"id" IN (SELECT "bakery_id" FROM "client" WHERE ...)`
229 /// - MongoDB builds a deferred `{ field: { "$in": [...] } }` document
230 /// - CSV fetches values in memory and builds an IN condition
231 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
232 &self,
233 target_field: &str,
234 source_table: &Table<Self, SourceE>,
235 source_column: &str,
236 ) -> Self::Condition
237 where
238 Self: Sized;
239
240 /// Build a correlated condition: `target_table.target_field = source_table.source_column`.
241 ///
242 /// Used by `get_subquery_as` for embedding correlated subqueries inside SELECT
243 /// expressions (e.g. `(SELECT COUNT(*) FROM order WHERE order.client_id = client.id)`).
244 ///
245 /// SQL backends produce table-qualified equality; non-SQL backends may not support
246 /// correlated subqueries and should leave the default (which panics).
247 fn related_correlated_condition(
248 &self,
249 target_table: &str,
250 target_field: &str,
251 source_table: &str,
252 source_column: &str,
253 ) -> Self::Condition {
254 let _ = (target_table, target_field, source_table, source_column);
255 unimplemented!("correlated subqueries not supported by this backend")
256 }
257
258 /// Return an associated expression that, when resolved, yields all values
259 /// of the given typed column from this table (respecting current conditions).
260 ///
261 /// For query-language backends, this can be a subquery expression.
262 /// For simple backends (CSV), this uses a `DeferredFn` that loads data
263 /// and extracts the column values at execution time.
264 ///
265 /// The returned `AssociatedExpression` can be:
266 /// - Executed directly: `.get().await -> Result<Vec<Type>>`
267 /// - Composed into expressions: used via `Expressive` trait in `in_()` conditions
268 ///
269 /// ```rust,ignore
270 /// let fk_col = source.get_column::<String>("bakery_id").unwrap();
271 /// let fk_values = source.data_source().column_table_values_expr(&source, &fk_col);
272 /// // Execute: let ids = fk_values.get().await?;
273 /// // Or compose: target.add_condition(target["id"].in_((fk_values)));
274 /// ```
275 fn column_table_values_expr<'a, E, Type: ColumnType>(
276 &'a self,
277 table: &Table<Self, E>,
278 column: &Self::Column<Type>,
279 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
280 where
281 E: Entity<Self::Value> + 'static,
282 Self: ExprDataSource<Self::Value> + Sized;
283}