Skip to main content

vantage_aws/impls/
table_source.rs

//! `TableSource` impl for `AwsAccount`.
//!
//! Protocol-agnostic — `execute_rpc` and `parse_records` (in
//! `crate::dispatch`) pick the wire protocol from the table-name
//! prefix. Read-only in v0: writes are stubbed to error, and so are
//! aggregations (sum/min/max). The two interesting methods are
//! `list_table_values` (folds conditions into a request body and
//! parses the response) and `column_table_values_expr` (returns a
//! deferred expression over the column's values — same shape as
//! `vantage-csv`). `related_in_condition` builds on top of that to
//! make `with_one` / `with_many` work for cross-resource navigation.
12
13use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21    Expression, Expressive, expr_any,
22    traits::associated_expressions::AssociatedExpression,
23    traits::datasource::ExprDataSource,
24    traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36    type Column<Type>
37        = Column<Type>
38    where
39        Type: ColumnType;
40    type AnyType = CborValue;
41    type Value = CborValue;
42    type Id = String;
43    type Condition = AwsCondition;
44
45    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
46        Column::new(name)
47    }
48
49    fn to_any_column<Type: ColumnType>(
50        &self,
51        column: Self::Column<Type>,
52    ) -> Self::Column<Self::AnyType> {
53        Column::from_column(column)
54    }
55
56    fn convert_any_column<Type: ColumnType>(
57        &self,
58        any_column: Self::Column<Self::AnyType>,
59    ) -> Option<Self::Column<Type>> {
60        Some(Column::from_column(any_column))
61    }
62
63    fn expr(
64        &self,
65        template: impl Into<String>,
66        parameters: Vec<ExpressiveEnum<Self::Value>>,
67    ) -> Expression<Self::Value> {
68        Expression::new(template, parameters)
69    }
70
71    fn search_table_condition<E>(
72        &self,
73        _table: &Table<Self, E>,
74        search_value: &str,
75    ) -> Self::Condition
76    where
77        E: Entity<Self::Value>,
78    {
79        // No notion of which field is "searchable" at this layer.
80        // Models that want full-text search add their own Eq on the
81        // service-specific field (e.g. CloudWatch's filterPattern).
82        AwsCondition::eq("__search__", json!(search_value).to_string())
83    }
84
85    async fn list_table_values<E>(
86        &self,
87        table: &Table<Self, E>,
88    ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
89    where
90        E: Entity<Self::Value>,
91        Self: Sized,
92    {
93        let id_field = table.id_field().map(|c| c.name().to_string());
94        let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
95        let resp = self.execute_rpc(table.table_name(), &conditions).await?;
96        Ok(self.parse_records(table.table_name(), resp, id_field.as_deref())?)
97    }
98
99    async fn get_table_value<E>(
100        &self,
101        table: &Table<Self, E>,
102        id: &Self::Id,
103    ) -> DatasetResult<Option<Record<Self::Value>>>
104    where
105        E: Entity<Self::Value>,
106        Self: Sized,
107    {
108        // No native point-get for most JSON-1.1 APIs — list with the
109        // table's conditions and pluck. Same honest cost as Mongo's
110        // get-by-id without an index.
111        let mut all = self.list_table_values(table).await?;
112        Ok(all.shift_remove(id))
113    }
114
115    async fn get_table_some_value<E>(
116        &self,
117        table: &Table<Self, E>,
118    ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
119    where
120        E: Entity<Self::Value>,
121        Self: Sized,
122    {
123        let all = self.list_table_values(table).await?;
124        Ok(all.into_iter().next())
125    }
126
127    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
128    where
129        E: Entity<Self::Value>,
130        Self: Sized,
131    {
132        let all = self.list_table_values(table).await?;
133        Ok(all.len() as i64)
134    }
135
136    async fn get_table_sum<E>(
137        &self,
138        _table: &Table<Self, E>,
139        _column: &Self::Column<Self::AnyType>,
140    ) -> DatasetResult<Self::Value>
141    where
142        E: Entity<Self::Value>,
143        Self: Sized,
144    {
145        Err(error!("Aggregations not supported by vantage-aws"))
146    }
147
148    async fn get_table_max<E>(
149        &self,
150        _table: &Table<Self, E>,
151        _column: &Self::Column<Self::AnyType>,
152    ) -> DatasetResult<Self::Value>
153    where
154        E: Entity<Self::Value>,
155        Self: Sized,
156    {
157        Err(error!("Aggregations not supported by vantage-aws"))
158    }
159
160    async fn get_table_min<E>(
161        &self,
162        _table: &Table<Self, E>,
163        _column: &Self::Column<Self::AnyType>,
164    ) -> DatasetResult<Self::Value>
165    where
166        E: Entity<Self::Value>,
167        Self: Sized,
168    {
169        Err(error!("Aggregations not supported by vantage-aws"))
170    }
171
172    async fn insert_table_value<E>(
173        &self,
174        _table: &Table<Self, E>,
175        _id: &Self::Id,
176        _record: &Record<Self::Value>,
177    ) -> DatasetResult<Record<Self::Value>>
178    where
179        E: Entity<Self::Value>,
180        Self: Sized,
181    {
182        Err(error!("vantage-aws is read-only in v0"))
183    }
184
185    async fn replace_table_value<E>(
186        &self,
187        _table: &Table<Self, E>,
188        _id: &Self::Id,
189        _record: &Record<Self::Value>,
190    ) -> DatasetResult<Record<Self::Value>>
191    where
192        E: Entity<Self::Value>,
193        Self: Sized,
194    {
195        Err(error!("vantage-aws is read-only in v0"))
196    }
197
198    async fn patch_table_value<E>(
199        &self,
200        _table: &Table<Self, E>,
201        _id: &Self::Id,
202        _partial: &Record<Self::Value>,
203    ) -> DatasetResult<Record<Self::Value>>
204    where
205        E: Entity<Self::Value>,
206        Self: Sized,
207    {
208        Err(error!("vantage-aws is read-only in v0"))
209    }
210
211    async fn delete_table_value<E>(
212        &self,
213        _table: &Table<Self, E>,
214        _id: &Self::Id,
215    ) -> DatasetResult<()>
216    where
217        E: Entity<Self::Value>,
218        Self: Sized,
219    {
220        Err(error!("vantage-aws is read-only in v0"))
221    }
222
223    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
224    where
225        E: Entity<Self::Value>,
226        Self: Sized,
227    {
228        Err(error!("vantage-aws is read-only in v0"))
229    }
230
231    async fn insert_table_return_id_value<E>(
232        &self,
233        _table: &Table<Self, E>,
234        _record: &Record<Self::Value>,
235    ) -> DatasetResult<Self::Id>
236    where
237        E: Entity<Self::Value>,
238        Self: Sized,
239    {
240        Err(error!("vantage-aws is read-only in v0"))
241    }
242
243    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
244        &self,
245        target_field: &str,
246        source_table: &Table<Self, SourceE>,
247        source_column: &str,
248    ) -> Self::Condition
249    where
250        Self: Sized,
251    {
252        // Build "target_field IN (subquery)" as a Deferred condition:
253        // at execute time, the embedded expression runs the source
254        // query, projects `source_column`, and we apply the same
255        // take-1-or-error rule as any other Deferred. AWS doesn't
256        // accept multi-value filters, so traversal is implicitly
257        // single-parent — multi-row sources error loudly.
258        let src_col = self.create_column::<Self::AnyType>(source_column);
259        let values_expr = self.column_table_values_expr(source_table, &src_col);
260        AwsCondition::Deferred {
261            field: target_field.to_string(),
262            source: values_expr.expr(),
263        }
264    }
265
266    fn column_table_values_expr<'a, E, Type: ColumnType>(
267        &'a self,
268        table: &Table<Self, E>,
269        column: &Self::Column<Type>,
270    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
271    where
272        E: Entity<Self::Value> + 'static,
273        Self: Sized,
274    {
275        // Same shape as `vantage-csv`'s impl: wrap a `DeferredFn` that
276        // runs `list_table_values` and projects the named column at
277        // execute time. Caller goes through `AssociatedExpression::get`
278        // for direct execution, or `.expr()` to embed in a bigger
279        // expression (e.g. a deferred condition on another table).
280        let table_clone = table.clone();
281        let col = column.name().to_string();
282        let aws = self.clone();
283
284        let inner = expr_any!("{}", {
285            DeferredFn::new(move || {
286                let aws = aws.clone();
287                let table = table_clone.clone();
288                let col = col.clone();
289                Box::pin(async move {
290                    let records = aws.list_table_values(&table).await?;
291                    let values: Vec<CborValue> = records
292                        .values()
293                        .filter_map(|r| r.get(&col).cloned())
294                        .collect();
295                    Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
296                })
297            })
298        });
299
300        let expr = expr_any!("{}", { self.defer(inner) });
301        AssociatedExpression::new(expr, self)
302    }
303}