1use async_trait::async_trait;
14use ciborium::Value as CborValue;
15use indexmap::IndexMap;
16use serde_json::json;
17
18use vantage_core::error;
19use vantage_dataset::traits::Result as DatasetResult;
20use vantage_expressions::{
21 Expression, Expressive, expr_any,
22 traits::associated_expressions::AssociatedExpression,
23 traits::datasource::ExprDataSource,
24 traits::expressive::{DeferredFn, ExpressiveEnum},
25};
26use vantage_table::column::core::{Column, ColumnType};
27use vantage_table::table::Table;
28use vantage_table::traits::table_source::TableSource;
29use vantage_types::{Entity, Record};
30
31use crate::account::AwsAccount;
32use crate::condition::AwsCondition;
33
34#[async_trait]
35impl TableSource for AwsAccount {
36 type Column<Type>
37 = Column<Type>
38 where
39 Type: ColumnType;
40 type AnyType = CborValue;
41 type Value = CborValue;
42 type Id = String;
43 type Condition = AwsCondition;
44
45 fn eq_condition(field: &str, value: &str) -> DatasetResult<Self::Condition> {
46 Ok(AwsCondition::eq(field.to_string(), value.to_string()))
47 }
48
49 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
50 Column::new(name)
51 }
52
53 fn to_any_column<Type: ColumnType>(
54 &self,
55 column: Self::Column<Type>,
56 ) -> Self::Column<Self::AnyType> {
57 Column::from_column(column)
58 }
59
60 fn convert_any_column<Type: ColumnType>(
61 &self,
62 any_column: Self::Column<Self::AnyType>,
63 ) -> Option<Self::Column<Type>> {
64 Some(Column::from_column(any_column))
65 }
66
67 fn expr(
68 &self,
69 template: impl Into<String>,
70 parameters: Vec<ExpressiveEnum<Self::Value>>,
71 ) -> Expression<Self::Value> {
72 Expression::new(template, parameters)
73 }
74
75 fn search_table_condition<E>(
76 &self,
77 _table: &Table<Self, E>,
78 search_value: &str,
79 ) -> Self::Condition
80 where
81 E: Entity<Self::Value>,
82 {
83 AwsCondition::eq("__search__", json!(search_value).to_string())
87 }
88
89 async fn list_table_values<E>(
90 &self,
91 table: &Table<Self, E>,
92 ) -> DatasetResult<IndexMap<Self::Id, Record<Self::Value>>>
93 where
94 E: Entity<Self::Value>,
95 Self: Sized,
96 {
97 let id_field = table.id_field().map(|c| c.name().to_string());
98 let conditions: Vec<AwsCondition> = table.conditions().cloned().collect();
99 let resp = self.execute_rpc(table.table_name(), &conditions).await?;
100 let mut records = self.parse_records(table.table_name(), resp, id_field.as_deref())?;
101
102 records.retain(|_id, record| {
110 conditions.iter().all(|c| match c {
111 AwsCondition::Eq { field, value } => match record.get(field) {
112 Some(rec_val) => rec_val == value,
113 None => true,
114 },
115 _ => true,
116 })
117 });
118
119 Ok(records)
120 }
121
122 async fn get_table_value<E>(
123 &self,
124 table: &Table<Self, E>,
125 id: &Self::Id,
126 ) -> DatasetResult<Option<Record<Self::Value>>>
127 where
128 E: Entity<Self::Value>,
129 Self: Sized,
130 {
131 let mut all = self.list_table_values(table).await?;
135 Ok(all.shift_remove(id))
136 }
137
138 async fn get_table_some_value<E>(
139 &self,
140 table: &Table<Self, E>,
141 ) -> DatasetResult<Option<(Self::Id, Record<Self::Value>)>>
142 where
143 E: Entity<Self::Value>,
144 Self: Sized,
145 {
146 let all = self.list_table_values(table).await?;
147 Ok(all.into_iter().next())
148 }
149
150 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> DatasetResult<i64>
151 where
152 E: Entity<Self::Value>,
153 Self: Sized,
154 {
155 let all = self.list_table_values(table).await?;
156 Ok(all.len() as i64)
157 }
158
159 async fn get_table_sum<E>(
160 &self,
161 _table: &Table<Self, E>,
162 _column: &Self::Column<Self::AnyType>,
163 ) -> DatasetResult<Self::Value>
164 where
165 E: Entity<Self::Value>,
166 Self: Sized,
167 {
168 Err(error!("Aggregations not supported by vantage-aws"))
169 }
170
171 async fn get_table_max<E>(
172 &self,
173 _table: &Table<Self, E>,
174 _column: &Self::Column<Self::AnyType>,
175 ) -> DatasetResult<Self::Value>
176 where
177 E: Entity<Self::Value>,
178 Self: Sized,
179 {
180 Err(error!("Aggregations not supported by vantage-aws"))
181 }
182
183 async fn get_table_min<E>(
184 &self,
185 _table: &Table<Self, E>,
186 _column: &Self::Column<Self::AnyType>,
187 ) -> DatasetResult<Self::Value>
188 where
189 E: Entity<Self::Value>,
190 Self: Sized,
191 {
192 Err(error!("Aggregations not supported by vantage-aws"))
193 }
194
195 async fn insert_table_value<E>(
196 &self,
197 _table: &Table<Self, E>,
198 _id: &Self::Id,
199 _record: &Record<Self::Value>,
200 ) -> DatasetResult<Record<Self::Value>>
201 where
202 E: Entity<Self::Value>,
203 Self: Sized,
204 {
205 Err(error!("vantage-aws is read-only in v0"))
206 }
207
208 async fn replace_table_value<E>(
209 &self,
210 _table: &Table<Self, E>,
211 _id: &Self::Id,
212 _record: &Record<Self::Value>,
213 ) -> DatasetResult<Record<Self::Value>>
214 where
215 E: Entity<Self::Value>,
216 Self: Sized,
217 {
218 Err(error!("vantage-aws is read-only in v0"))
219 }
220
221 async fn patch_table_value<E>(
222 &self,
223 _table: &Table<Self, E>,
224 _id: &Self::Id,
225 _partial: &Record<Self::Value>,
226 ) -> DatasetResult<Record<Self::Value>>
227 where
228 E: Entity<Self::Value>,
229 Self: Sized,
230 {
231 Err(error!("vantage-aws is read-only in v0"))
232 }
233
234 async fn delete_table_value<E>(
235 &self,
236 _table: &Table<Self, E>,
237 _id: &Self::Id,
238 ) -> DatasetResult<()>
239 where
240 E: Entity<Self::Value>,
241 Self: Sized,
242 {
243 Err(error!("vantage-aws is read-only in v0"))
244 }
245
246 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> DatasetResult<()>
247 where
248 E: Entity<Self::Value>,
249 Self: Sized,
250 {
251 Err(error!("vantage-aws is read-only in v0"))
252 }
253
254 async fn insert_table_return_id_value<E>(
255 &self,
256 _table: &Table<Self, E>,
257 _record: &Record<Self::Value>,
258 ) -> DatasetResult<Self::Id>
259 where
260 E: Entity<Self::Value>,
261 Self: Sized,
262 {
263 Err(error!("vantage-aws is read-only in v0"))
264 }
265
266 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
267 &self,
268 target_field: &str,
269 source_table: &Table<Self, SourceE>,
270 source_column: &str,
271 ) -> Self::Condition
272 where
273 Self: Sized,
274 {
275 let src_col = self.create_column::<Self::AnyType>(source_column);
282 let values_expr = self.column_table_values_expr(source_table, &src_col);
283 AwsCondition::Deferred {
284 field: target_field.to_string(),
285 source: values_expr.expr(),
286 }
287 }
288
289 fn column_table_values_expr<'a, E, Type: ColumnType>(
290 &'a self,
291 table: &Table<Self, E>,
292 column: &Self::Column<Type>,
293 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
294 where
295 E: Entity<Self::Value> + 'static,
296 Self: Sized,
297 {
298 let table_clone = table.clone();
304 let col = column.name().to_string();
305 let aws = self.clone();
306
307 let inner = expr_any!("{}", {
308 DeferredFn::new(move || {
309 let aws = aws.clone();
310 let table = table_clone.clone();
311 let col = col.clone();
312 Box::pin(async move {
313 let records = aws.list_table_values(&table).await?;
314 let values: Vec<CborValue> = records
315 .values()
316 .filter_map(|r| r.get(&col).cloned())
317 .collect();
318 Ok(ExpressiveEnum::Scalar(CborValue::Array(values)))
319 })
320 })
321 });
322
323 let expr = expr_any!("{}", { self.defer(inner) });
324 AssociatedExpression::new(expr, self)
325 }
326}