Skip to main content

vantage_aws/impls/
table_source.rs

1//! `TableSource` impl for `AwsAccount`.
2//!
3//! Protocol-agnostic — `execute_rpc` and `parse_records` (in
4//! `crate::dispatch`) pick the wire protocol from the table-name
5//! prefix. Read-only in v0. Writes are stubbed to error. Aggregations
6//! (sum/min/max) likewise. The two interesting methods are
7//! `list_table_values` (folds conditions into a request body and
8//! parses the response) and `column_table_values_expr` (returns a
9//! deferred expression over the column's values — same shape as
10//! `vantage-csv`). `related_in_condition` builds on top of that to
11//! make `with_one` / `with_many` work for cross-resource navigation.
12
13use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21    Expression, Expressive, expr_any,
22    traits::associated_expressions::AssociatedExpression,
23    traits::datasource::ExprDataSource,
24    traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = AwsCondition;
44    type Source = String;
45
46    fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
47        Ok(AwsCondition::eq(field.to_string(), value.to_string()))
48    }
49
50    /// Build `field == value` from a typed `Self::Value` (CBOR). Used by
51    /// `Reference::resolve_from_row` when traversing `with_many` /
52    /// `with_one` relations — the join value is pulled out of a
53    /// `Record<CborValue>` and pushed onto the child table verbatim,
54    /// without round-tripping through a string.
55    fn eq_value_condition(
56        &self,
57        field: &str,
58        value: Self::Value,
59    ) -> DatasetResult<Self::Condition> {
60        Ok(AwsCondition::eq(field.to_string(), value))
61    }
62
63    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
64        Column::new(name)
65    }
66
67    fn to_any_column<Type: ColumnType>(
68        &self,
69        column: Self::Column<Type>,
70    ) -> Self::Column<Self::AnyType> {
71        Column::from_column(column)
72    }
73
74    fn convert_any_column<Type: ColumnType>(
75        &self,
76        any_column: Self::Column<Self::AnyType>,
77    ) -> Option<Self::Column<Type>> {
78        Some(Column::from_column(any_column))
79    }
80
81    fn expr(
82        &self,
83        template: impl Into<String>,
84        parameters: Vec<ExpressiveEnum<Self::Value>>,
85    ) -> Expression<Self::Value> {
86        Expression::new(template, parameters)
87    }
88
89    fn search_table_condition<E>(
90        &self,
91        _table: &Table<Self, E>,
92        search_value: &str,
93    ) -> Self::Condition
94    where
95        E: Entity<Self::Value>,
96    {
97        // No notion of which field is "searchable" at this layer.
98        // Models that want full-text search add their own Eq on the
99        // service-specific field (e.g. CloudWatch's filterPattern).
100        AwsCondition::eq("__search__", json!(search_value).to_string())
101    }
102
103    async fn list_table_values<E>(
104        &self,
105        table: &Table<Self, E>,
106    ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
107    where
108        E: Entity<Self::Value>,
109        Self: Sized,
110    {
111        let id_field = table.id_field().map(|c| c.name().to_string());
112        let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
113        let resp = self.execute_rpc(table.table_name(), &conditions).await?;
114        let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
115
116        // AWS APIs only push down their own request-param filters
117        // (e.g. `PathPrefix`, `logGroupNamePrefix`, `cluster`). Eq
118        // conditions naming actual record fields (`UserName`, `Path`,
119        // `clusterArn`) are still useful — apply them post-hoc so
120        // narrowing works regardless of what the API filters
121        // server-side. Field names that don't appear on any record are
122        // assumed to be request params and skipped.
123        records.retain(|_id, record| {
124            conditions.iter().all(|c| match c {
125                AwsCondition::Eq { field, value } => match record.get(field) {
126                    Some(rec_val) => rec_val == value,
127                    None => true,
128                },
129                _ => true,
130            })
131        });
132
133        Ok(records)
134    }
135
136    async fn get_table_value<E>(
137        &self,
138        table: &Table<Self, E>,
139        id: &Self::Id,
140    ) -> DatasetResult<Option<Record<Self::Value>>>
141    where
142        E: Entity<Self::Value>,
143        Self: Sized,
144    {
145        // No native point-get for most JSON-1.1 APIs — list with the
146        // table's conditions and pluck. Same honest cost as Mongo's
147        // get-by-id without an index.
148        let mut all = self.list_table_values(table).await?;
149        Ok(all.shift_remove(id))
150    }
151
152    async fn get_table_some_value<E>(
153        &self,
154        table: &Table<Self, E>,
155    ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
156    where
157        E: Entity<Self::Value>,
158        Self: Sized,
159    {
160        let all = self.list_table_values(table).await?;
161        Ok(all.into_iter().next())
162    }
163
164    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
165    where
166        E: Entity<Self::Value>,
167        Self: Sized,
168    {
169        let all = self.list_table_values(table).await?;
170        Ok(all.len() as i64)
171    }
172
173    async fn get_table_sum<E>(
174        &self,
175        _table: &Table<Self, E>,
176        _column: &Self::Column<Self::AnyType>,
177    ) -> DatasetResult<Self::Value>
178    where
179        E: Entity<Self::Value>,
180        Self: Sized,
181    {
182        Err(error!("Aggregations not supported by vantage-aws"))
183    }
184
185    async fn get_table_max<E>(
186        &self,
187        _table: &Table<Self, E>,
188        _column: &Self::Column<Self::AnyType>,
189    ) -> DatasetResult<Self::Value>
190    where
191        E: Entity<Self::Value>,
192        Self: Sized,
193    {
194        Err(error!("Aggregations not supported by vantage-aws"))
195    }
196
197    async fn get_table_min<E>(
198        &self,
199        _table: &Table<Self, E>,
200        _column: &Self::Column<Self::AnyType>,
201    ) -> DatasetResult<Self::Value>
202    where
203        E: Entity<Self::Value>,
204        Self: Sized,
205    {
206        Err(error!("Aggregations not supported by vantage-aws"))
207    }
208
209    async fn insert_table_value<E>(
210        &self,
211        _table: &Table<Self, E>,
212        _id: &Self::Id,
213        _record: &Record<Self::Value>,
214    ) -> DatasetResult<Record<Self::Value>>
215    where
216        E: Entity<Self::Value>,
217        Self: Sized,
218    {
219        Err(error!("vantage-aws is read-only in v0"))
220    }
221
222    async fn replace_table_value<E>(
223        &self,
224        _table: &Table<Self, E>,
225        _id: &Self::Id,
226        _record: &Record<Self::Value>,
227    ) -> DatasetResult<Record<Self::Value>>
228    where
229        E: Entity<Self::Value>,
230        Self: Sized,
231    {
232        Err(error!("vantage-aws is read-only in v0"))
233    }
234
235    async fn patch_table_value<E>(
236        &self,
237        _table: &Table<Self, E>,
238        _id: &Self::Id,
239        _partial: &Record<Self::Value>,
240    ) -> DatasetResult<Record<Self::Value>>
241    where
242        E: Entity<Self::Value>,
243        Self: Sized,
244    {
245        Err(error!("vantage-aws is read-only in v0"))
246    }
247
248    async fn delete_table_value<E>(
249        &self,
250        _table: &Table<Self, E>,
251        _id: &Self::Id,
252    ) -> DatasetResult<()>
253    where
254        E: Entity<Self::Value>,
255        Self: Sized,
256    {
257        Err(error!("vantage-aws is read-only in v0"))
258    }
259
260    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
261    where
262        E: Entity<Self::Value>,
263        Self: Sized,
264    {
265        Err(error!("vantage-aws is read-only in v0"))
266    }
267
268    async fn insert_table_return_id_value<E>(
269        &self,
270        _table: &Table<Self, E>,
271        _record: &Record<Self::Value>,
272    ) -> DatasetResult<Self::Id>
273    where
274        E: Entity<Self::Value>,
275        Self: Sized,
276    {
277        Err(error!("vantage-aws is read-only in v0"))
278    }
279
280    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
281        &self,
282        target_field: &str,
283        source_table: &Table<Self, SourceE>,
284        source_column: &str,
285    ) -> Self::Condition
286    where
287        Self: Sized,
288    {
289        // Build "target_field IN (subquery)" as a Deferred condition:
290        // at execute time, the embedded expression runs the source
291        // query, projects `source_column`, and we apply the same
292        // take-1-or-error rule as any other Deferred. AWS doesn't
293        // accept multi-value filters, so traversal is implicitly
294        // single-parent — multi-row sources error loudly.
295        let src_col = self.create_column::<Self::AnyType>(source_column);
296        let values_expr = self.column_table_values_expr(source_table, &src_col);
297        AwsCondition::Deferred {
298            field: target_field.to_string(),
299            source: values_expr.expr(),
300        }
301    }
302
303    fn column_table_values_expr<'a, E, Type: ColumnType>(
304        &'a self,
305        table: &Table<Self, E>,
306        column: &Self::Column<Type>,
307    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
308    where
309        E: Entity<Self::Value> + 'static,
310        Self: Sized,
311    {
312        // Same shape as `vantage-csv`'s impl: wrap a `DeferredFn` that
313        // runs `list_table_values` and projects the named column at
314        // execute time. Caller goes through `AssociatedExpression::get`
315        // for direct execution, or `.expr()` to embed in a bigger
316        // expression (e.g. a deferred condition on another table).
317        let table_clone = table.clone();
318        let col = column.name().to_string();
319        let aws = self.clone();
320
321        let inner = expr_any!("{}", {
322            DeferredFn::new(move || {
323                let aws = aws.clone();
324                let table = table_clone.clone();
325                let col = col.clone();
326                Box::pin(async move {
327                    let records = aws.list_table_values(&table).await?;
328                    let values: Vec<CborValue> = records
329                        .values()
330                        .filter_map(|r| r.get(&col).cloned())
331                        .collect();
332                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
333                })
334            })
335        });
336
337        let expr = expr_any!("{}", { self.defer(inner) });
338        AssociatedExpression::new(expr, self)
339    }
340}