Skip to main content

vantage_aws/impls/
table_source.rs

1//! `TableSource` impl for `AwsAccount`.
2//!
3//! Protocol-agnostic — `execute_rpc` and `parse_records` (in
4//! `crate::dispatch`) pick the wire protocol from the table-name
5//! prefix. Read-only in v0. Writes are stubbed to error. Aggregations
6//! (sum/min/max) likewise. The two interesting methods are
7//! `list_table_values` (folds conditions into a request body and
8//! parses the response) and `column_table_values_expr` (returns a
9//! deferred expression over the column's values — same shape as
10//! `vantage-csv`). `related_in_condition` builds on top of that to
11//! make `with_one` / `with_many` work for cross-resource navigation.
12
13use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21    Expression, Expressive, expr_any,
22    traits::associated_expressions::AssociatedExpression,
23    traits::datasource::ExprDataSource,
24    traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = AwsCondition;
44
45    fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
46        Ok(AwsCondition::eq(field.to_string(), value.to_string()))
47    }
48
49    /// Build `field == value` from a typed `Self::Value` (CBOR). Used by
50    /// `Reference::resolve_from_row` when traversing `with_many` /
51    /// `with_one` relations — the join value is pulled out of a
52    /// `Record<CborValue>` and pushed onto the child table verbatim,
53    /// without round-tripping through a string.
54    fn eq_value_condition(
55        &self,
56        field: &str,
57        value: Self::Value,
58    ) -> DatasetResult<Self::Condition> {
59        Ok(AwsCondition::eq(field.to_string(), value))
60    }
61
62    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
63        Column::new(name)
64    }
65
66    fn to_any_column<Type: ColumnType>(
67        &self,
68        column: Self::Column<Type>,
69    ) -> Self::Column<Self::AnyType> {
70        Column::from_column(column)
71    }
72
73    fn convert_any_column<Type: ColumnType>(
74        &self,
75        any_column: Self::Column<Self::AnyType>,
76    ) -> Option<Self::Column<Type>> {
77        Some(Column::from_column(any_column))
78    }
79
80    fn expr(
81        &self,
82        template: impl Into<String>,
83        parameters: Vec<ExpressiveEnum<Self::Value>>,
84    ) -> Expression<Self::Value> {
85        Expression::new(template, parameters)
86    }
87
88    fn search_table_condition<E>(
89        &self,
90        _table: &Table<Self, E>,
91        search_value: &str,
92    ) -> Self::Condition
93    where
94        E: Entity<Self::Value>,
95    {
96        // No notion of which field is "searchable" at this layer.
97        // Models that want full-text search add their own Eq on the
98        // service-specific field (e.g. CloudWatch's filterPattern).
99        AwsCondition::eq("__search__", json!(search_value).to_string())
100    }
101
102    async fn list_table_values<E>(
103        &self,
104        table: &Table<Self, E>,
105    ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
106    where
107        E: Entity<Self::Value>,
108        Self: Sized,
109    {
110        let id_field = table.id_field().map(|c| c.name().to_string());
111        let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
112        let resp = self.execute_rpc(table.table_name(), &conditions).await?;
113        let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
114
115        // AWS APIs only push down their own request-param filters
116        // (e.g. `PathPrefix`, `logGroupNamePrefix`, `cluster`). Eq
117        // conditions naming actual record fields (`UserName`, `Path`,
118        // `clusterArn`) are still useful — apply them post-hoc so
119        // narrowing works regardless of what the API filters
120        // server-side. Field names that don't appear on any record are
121        // assumed to be request params and skipped.
122        records.retain(|_id, record| {
123            conditions.iter().all(|c| match c {
124                AwsCondition::Eq { field, value } => match record.get(field) {
125                    Some(rec_val) => rec_val == value,
126                    None => true,
127                },
128                _ => true,
129            })
130        });
131
132        Ok(records)
133    }
134
135    async fn get_table_value<E>(
136        &self,
137        table: &Table<Self, E>,
138        id: &Self::Id,
139    ) -> DatasetResult<Option<Record<Self::Value>>>
140    where
141        E: Entity<Self::Value>,
142        Self: Sized,
143    {
144        // No native point-get for most JSON-1.1 APIs — list with the
145        // table's conditions and pluck. Same honest cost as Mongo's
146        // get-by-id without an index.
147        let mut all = self.list_table_values(table).await?;
148        Ok(all.shift_remove(id))
149    }
150
151    async fn get_table_some_value<E>(
152        &self,
153        table: &Table<Self, E>,
154    ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
155    where
156        E: Entity<Self::Value>,
157        Self: Sized,
158    {
159        let all = self.list_table_values(table).await?;
160        Ok(all.into_iter().next())
161    }
162
163    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
164    where
165        E: Entity<Self::Value>,
166        Self: Sized,
167    {
168        let all = self.list_table_values(table).await?;
169        Ok(all.len() as i64)
170    }
171
172    async fn get_table_sum<E>(
173        &self,
174        _table: &Table<Self, E>,
175        _column: &Self::Column<Self::AnyType>,
176    ) -> DatasetResult<Self::Value>
177    where
178        E: Entity<Self::Value>,
179        Self: Sized,
180    {
181        Err(error!("Aggregations not supported by vantage-aws"))
182    }
183
184    async fn get_table_max<E>(
185        &self,
186        _table: &Table<Self, E>,
187        _column: &Self::Column<Self::AnyType>,
188    ) -> DatasetResult<Self::Value>
189    where
190        E: Entity<Self::Value>,
191        Self: Sized,
192    {
193        Err(error!("Aggregations not supported by vantage-aws"))
194    }
195
196    async fn get_table_min<E>(
197        &self,
198        _table: &Table<Self, E>,
199        _column: &Self::Column<Self::AnyType>,
200    ) -> DatasetResult<Self::Value>
201    where
202        E: Entity<Self::Value>,
203        Self: Sized,
204    {
205        Err(error!("Aggregations not supported by vantage-aws"))
206    }
207
208    async fn insert_table_value<E>(
209        &self,
210        _table: &Table<Self, E>,
211        _id: &Self::Id,
212        _record: &Record<Self::Value>,
213    ) -> DatasetResult<Record<Self::Value>>
214    where
215        E: Entity<Self::Value>,
216        Self: Sized,
217    {
218        Err(error!("vantage-aws is read-only in v0"))
219    }
220
221    async fn replace_table_value<E>(
222        &self,
223        _table: &Table<Self, E>,
224        _id: &Self::Id,
225        _record: &Record<Self::Value>,
226    ) -> DatasetResult<Record<Self::Value>>
227    where
228        E: Entity<Self::Value>,
229        Self: Sized,
230    {
231        Err(error!("vantage-aws is read-only in v0"))
232    }
233
234    async fn patch_table_value<E>(
235        &self,
236        _table: &Table<Self, E>,
237        _id: &Self::Id,
238        _partial: &Record<Self::Value>,
239    ) -> DatasetResult<Record<Self::Value>>
240    where
241        E: Entity<Self::Value>,
242        Self: Sized,
243    {
244        Err(error!("vantage-aws is read-only in v0"))
245    }
246
247    async fn delete_table_value<E>(
248        &self,
249        _table: &Table<Self, E>,
250        _id: &Self::Id,
251    ) -> DatasetResult<()>
252    where
253        E: Entity<Self::Value>,
254        Self: Sized,
255    {
256        Err(error!("vantage-aws is read-only in v0"))
257    }
258
259    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
260    where
261        E: Entity<Self::Value>,
262        Self: Sized,
263    {
264        Err(error!("vantage-aws is read-only in v0"))
265    }
266
267    async fn insert_table_return_id_value<E>(
268        &self,
269        _table: &Table<Self, E>,
270        _record: &Record<Self::Value>,
271    ) -> DatasetResult<Self::Id>
272    where
273        E: Entity<Self::Value>,
274        Self: Sized,
275    {
276        Err(error!("vantage-aws is read-only in v0"))
277    }
278
279    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
280        &self,
281        target_field: &str,
282        source_table: &Table<Self, SourceE>,
283        source_column: &str,
284    ) -> Self::Condition
285    where
286        Self: Sized,
287    {
288        // Build "target_field IN (subquery)" as a Deferred condition:
289        // at execute time, the embedded expression runs the source
290        // query, projects `source_column`, and we apply the same
291        // take-1-or-error rule as any other Deferred. AWS doesn't
292        // accept multi-value filters, so traversal is implicitly
293        // single-parent — multi-row sources error loudly.
294        let src_col = self.create_column::<Self::AnyType>(source_column);
295        let values_expr = self.column_table_values_expr(source_table, &src_col);
296        AwsCondition::Deferred {
297            field: target_field.to_string(),
298            source: values_expr.expr(),
299        }
300    }
301
302    fn column_table_values_expr<'a, E, Type: ColumnType>(
303        &'a self,
304        table: &Table<Self, E>,
305        column: &Self::Column<Type>,
306    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
307    where
308        E: Entity<Self::Value> + 'static,
309        Self: Sized,
310    {
311        // Same shape as `vantage-csv`'s impl: wrap a `DeferredFn` that
312        // runs `list_table_values` and projects the named column at
313        // execute time. Caller goes through `AssociatedExpression::get`
314        // for direct execution, or `.expr()` to embed in a bigger
315        // expression (e.g. a deferred condition on another table).
316        let table_clone = table.clone();
317        let col = column.name().to_string();
318        let aws = self.clone();
319
320        let inner = expr_any!("{}", {
321            DeferredFn::new(move || {
322                let aws = aws.clone();
323                let table = table_clone.clone();
324                let col = col.clone();
325                Box::pin(async move {
326                    let records = aws.list_table_values(&table).await?;
327                    let values: Vec<CborValue> = records
328                        .values()
329                        .filter_map(|r| r.get(&col).cloned())
330                        .collect();
331                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
332                })
333            })
334        });
335
336        let expr = expr_any!("{}", { self.defer(inner) });
337        AssociatedExpression::new(expr, self)
338    }
339}