1use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21 Expression, Expressive, expr_any,
22 traits::associated_expressions::AssociatedExpression,
23 traits::datasource::ExprDataSource,
24 traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = AwsCondition;
44
45 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
46 Column::new(name)
47 }
48
49 fn to_any_column<Type: ColumnType>(
50 &self,
51 column: Self::Column<Type>,
52 ) -> Self::Column<Self::AnyType> {
53 Column::from_column(column)
54 }
55
56 fn convert_any_column<Type: ColumnType>(
57 &self,
58 any_column: Self::Column<Self::AnyType>,
59 ) -> Option<Self::Column<Type>> {
60 Some(Column::from_column(any_column))
61 }
62
63 fn expr(
64 &self,
65 template: impl Into<String>,
66 parameters: Vec<ExpressiveEnum<Self::Value>>,
67 ) -> Expression<Self::Value> {
68 Expression::new(template, parameters)
69 }
70
71 fn search_table_condition<E>(
72 &self,
73 _table: &Table<Self, E>,
74 search_value: &str,
75 ) -> Self::Condition
76 where
77 E: Entity<Self::Value>,
78 {
79 AwsCondition::eq("__search__", json!(search_value).to_string())
83 }
84
85 async fn list_table_values<E>(
86 &self,
87 table: &Table<Self, E>,
88 ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
89 where
90 E: Entity<Self::Value>,
91 Self: Sized,
92 {
93 let id_field = table.id_field().map(|c| c.name().to_string());
94 let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
95 let resp = self.execute_rpc(table.table_name(), &conditions).await?;
96 Ok(self.parse_records(table.table_name(), resp, id_field.as_deref())?)
97 }
98
99 async fn get_table_value<E>(
100 &self,
101 table: &Table<Self, E>,
102 id: &Self::Id,
103 ) -> DatasetResult<Option<Record<Self::Value>>>
104 where
105 E: Entity<Self::Value>,
106 Self: Sized,
107 {
108 let mut all = self.list_table_values(table).await?;
112 Ok(all.shift_remove(id))
113 }
114
115 async fn get_table_some_value<E>(
116 &self,
117 table: &Table<Self, E>,
118 ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
119 where
120 E: Entity<Self::Value>,
121 Self: Sized,
122 {
123 let all = self.list_table_values(table).await?;
124 Ok(all.into_iter().next())
125 }
126
127 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
128 where
129 E: Entity<Self::Value>,
130 Self: Sized,
131 {
132 let all = self.list_table_values(table).await?;
133 Ok(all.len() as i64)
134 }
135
136 async fn get_table_sum<E>(
137 &self,
138 _table: &Table<Self, E>,
139 _column: &Self::Column<Self::AnyType>,
140 ) -> DatasetResult<Self::Value>
141 where
142 E: Entity<Self::Value>,
143 Self: Sized,
144 {
145 Err(error!("Aggregations not supported by vantage-aws"))
146 }
147
148 async fn get_table_max<E>(
149 &self,
150 _table: &Table<Self, E>,
151 _column: &Self::Column<Self::AnyType>,
152 ) -> DatasetResult<Self::Value>
153 where
154 E: Entity<Self::Value>,
155 Self: Sized,
156 {
157 Err(error!("Aggregations not supported by vantage-aws"))
158 }
159
160 async fn get_table_min<E>(
161 &self,
162 _table: &Table<Self, E>,
163 _column: &Self::Column<Self::AnyType>,
164 ) -> DatasetResult<Self::Value>
165 where
166 E: Entity<Self::Value>,
167 Self: Sized,
168 {
169 Err(error!("Aggregations not supported by vantage-aws"))
170 }
171
172 async fn insert_table_value<E>(
173 &self,
174 _table: &Table<Self, E>,
175 _id: &Self::Id,
176 _record: &Record<Self::Value>,
177 ) -> DatasetResult<Record<Self::Value>>
178 where
179 E: Entity<Self::Value>,
180 Self: Sized,
181 {
182 Err(error!("vantage-aws is read-only in v0"))
183 }
184
185 async fn replace_table_value<E>(
186 &self,
187 _table: &Table<Self, E>,
188 _id: &Self::Id,
189 _record: &Record<Self::Value>,
190 ) -> DatasetResult<Record<Self::Value>>
191 where
192 E: Entity<Self::Value>,
193 Self: Sized,
194 {
195 Err(error!("vantage-aws is read-only in v0"))
196 }
197
198 async fn patch_table_value<E>(
199 &self,
200 _table: &Table<Self, E>,
201 _id: &Self::Id,
202 _partial: &Record<Self::Value>,
203 ) -> DatasetResult<Record<Self::Value>>
204 where
205 E: Entity<Self::Value>,
206 Self: Sized,
207 {
208 Err(error!("vantage-aws is read-only in v0"))
209 }
210
211 async fn delete_table_value<E>(
212 &self,
213 _table: &Table<Self, E>,
214 _id: &Self::Id,
215 ) -> DatasetResult<()>
216 where
217 E: Entity<Self::Value>,
218 Self: Sized,
219 {
220 Err(error!("vantage-aws is read-only in v0"))
221 }
222
223 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
224 where
225 E: Entity<Self::Value>,
226 Self: Sized,
227 {
228 Err(error!("vantage-aws is read-only in v0"))
229 }
230
231 async fn insert_table_return_id_value<E>(
232 &self,
233 _table: &Table<Self, E>,
234 _record: &Record<Self::Value>,
235 ) -> DatasetResult<Self::Id>
236 where
237 E: Entity<Self::Value>,
238 Self: Sized,
239 {
240 Err(error!("vantage-aws is read-only in v0"))
241 }
242
243 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
244 &self,
245 target_field: &str,
246 source_table: &Table<Self, SourceE>,
247 source_column: &str,
248 ) -> Self::Condition
249 where
250 Self: Sized,
251 {
252 let src_col = self.create_column::<Self::AnyType>(source_column);
259 let values_expr = self.column_table_values_expr(source_table, &src_col);
260 AwsCondition::Deferred {
261 field: target_field.to_string(),
262 source: values_expr.expr(),
263 }
264 }
265
266 fn column_table_values_expr<'a, E, Type: ColumnType>(
267 &'a self,
268 table: &Table<Self, E>,
269 column: &Self::Column<Type>,
270 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
271 where
272 E: Entity<Self::Value> + 'static,
273 Self: Sized,
274 {
275 let table_clone = table.clone();
281 let col = column.name().to_string();
282 let aws = self.clone();
283
284 let inner = expr_any!("{}", {
285 DeferredFn::new(move || {
286 let aws = aws.clone();
287 let table = table_clone.clone();
288 let col = col.clone();
289 Box::pin(async move {
290 let records = aws.list_table_values(&table).await?;
291 let values: Vec<CborValue> = records
292 .values()
293 .filter_map(|r| r.get(&col).cloned())
294 .collect();
295 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
296 })
297 })
298 });
299
300 let expr = expr_any!("{}", { self.defer(inner) });
301 AssociatedExpression::new(expr, self)
302 }
303}