1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use serde_json::Value;
8use tokio_stream::StreamExt;
9use vantage_core::error;
10use vantage_dataset::traits::Result;
11use vantage_expressions::traits::associated_expressions::AssociatedExpression;
12use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
13use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
14use vantage_expressions::Expression;
15use vantage_table::column::core::{Column, ColumnType};
16use vantage_table::table::Table;
17use vantage_table::traits::table_source::TableSource;
18use vantage_types::{Entity, Record};
19
20use crate::{AwwPool, PaginatedStream};
21
22#[derive(Clone)]
28pub struct PoolApi {
29 pool: Arc<AwwPool>,
30 id_field: Option<String>,
31}
32
33impl std::fmt::Debug for PoolApi {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("PoolApi")
36 .field("id_field", &self.id_field)
37 .finish()
38 }
39}
40
41impl PoolApi {
42 pub fn new(pool: Arc<AwwPool>) -> Self {
43 Self {
44 pool,
45 id_field: None,
46 }
47 }
48
49 pub fn pool(&self) -> &Arc<AwwPool> {
51 &self.pool
52 }
53
54 fn id_field_for<E: Entity<Value>>(&self, table: &Table<Self, E>) -> Option<String> {
55 table
56 .id_field()
57 .map(|col| col.name().to_string())
58 .or_else(|| self.id_field.clone())
59 }
60}
61
62impl DataSource for PoolApi {}
63
64impl ExprDataSource<Value> for PoolApi {
65 async fn execute(&self, expr: &Expression<Value>) -> vantage_core::Result<Value> {
66 if expr.parameters.is_empty() {
67 Ok(Value::String(expr.template.clone()))
68 } else {
69 Ok(Value::Null)
70 }
71 }
72
73 fn defer(&self, expr: Expression<Value>) -> DeferredFn<Value> {
74 let api = self.clone();
75 DeferredFn::new(move || {
76 let api = api.clone();
77 let expr = expr.clone();
78 Box::pin(async move {
79 let result = api.execute(&expr).await?;
80 Ok(ExpressiveEnum::Scalar(result))
81 })
82 })
83 }
84}
85
86#[async_trait]
87impl TableSource for PoolApi {
88 type Column<Type>
89 = Column<Type>
90 where
91 Type: ColumnType;
92 type AnyType = Value;
93 type Value = Value;
94 type Id = String;
95 type Condition = vantage_expressions::Expression<Self::Value>;
96 type Source = String;
97
98 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
99 Column::new(name)
100 }
101
102 fn to_any_column<Type: ColumnType>(
103 &self,
104 column: Self::Column<Type>,
105 ) -> Self::Column<Self::AnyType> {
106 Column::from_column(column)
107 }
108
109 fn convert_any_column<Type: ColumnType>(
110 &self,
111 any_column: Self::Column<Self::AnyType>,
112 ) -> Option<Self::Column<Type>> {
113 Some(Column::from_column(any_column))
114 }
115
116 fn expr(
117 &self,
118 template: impl Into<String>,
119 parameters: Vec<ExpressiveEnum<Self::Value>>,
120 ) -> Expression<Self::Value> {
121 Expression::new(template, parameters)
122 }
123
124 fn search_table_condition<E>(
125 &self,
126 _table: &Table<Self, E>,
127 search_value: &str,
128 ) -> Expression<Self::Value>
129 where
130 E: Entity<Self::Value>,
131 {
132 Expression::new(format!("SEARCH '{}'", search_value), vec![])
133 }
134
135 async fn list_table_values<E>(
137 &self,
138 table: &Table<Self, E>,
139 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
140 where
141 E: Entity<Self::Value>,
142 Self: Sized,
143 {
144 let id_field = self.id_field_for(table);
145 let endpoint = format!("/{}", table.table_name());
146 let mut stream = PaginatedStream::get(self.pool.clone(), endpoint).prefetch(3);
147
148 let mut records = IndexMap::new();
149 while let Some(item) = stream.next().await {
150 let item = item.map_err(|e| error!("Stream error", detail = e))?;
151 let obj = item
152 .as_object()
153 .ok_or_else(|| error!("API data item is not an object"))?;
154
155 let id = id_field
156 .as_deref()
157 .and_then(|field| obj.get(field))
158 .and_then(|v| match v {
159 Value::String(s) => Some(s.clone()),
160 Value::Number(n) => Some(n.to_string()),
161 _ => None,
162 })
163 .unwrap_or_else(|| records.len().to_string());
164
165 let record: Record<Value> = obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
166 records.insert(id, record);
167 }
168
169 Ok(records)
170 }
171
172 async fn get_table_value<E>(
173 &self,
174 table: &Table<Self, E>,
175 id: &Self::Id,
176 ) -> Result<Option<Record<Self::Value>>>
177 where
178 E: Entity<Self::Value>,
179 Self: Sized,
180 {
181 let records = self.list_table_values(table).await?;
183 Ok(records.get(id).cloned())
184 }
185
186 async fn get_table_some_value<E>(
187 &self,
188 table: &Table<Self, E>,
189 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
190 where
191 E: Entity<Self::Value>,
192 Self: Sized,
193 {
194 let records = self.list_table_values(table).await?;
195 Ok(records.into_iter().next())
196 }
197
198 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
199 where
200 E: Entity<Self::Value>,
201 Self: Sized,
202 {
203 let records = self.list_table_values(table).await?;
204 Ok(records.len() as i64)
205 }
206
207 async fn get_table_sum<E>(
208 &self,
209 _table: &Table<Self, E>,
210 _column: &Self::Column<Self::AnyType>,
211 ) -> Result<Self::Value>
212 where
213 E: Entity<Self::Value>,
214 Self: Sized,
215 {
216 Err(error!("Sum not implemented for API pool backend"))
217 }
218
219 async fn get_table_max<E>(
220 &self,
221 _table: &Table<Self, E>,
222 _column: &Self::Column<Self::AnyType>,
223 ) -> Result<Self::Value>
224 where
225 E: Entity<Self::Value>,
226 Self: Sized,
227 {
228 Err(error!("Max not implemented for API pool backend"))
229 }
230
231 async fn get_table_min<E>(
232 &self,
233 _table: &Table<Self, E>,
234 _column: &Self::Column<Self::AnyType>,
235 ) -> Result<Self::Value>
236 where
237 E: Entity<Self::Value>,
238 Self: Sized,
239 {
240 Err(error!("Min not implemented for API pool backend"))
241 }
242
243 fn stream_table_values<'a, E>(
245 &'a self,
246 table: &Table<Self, E>,
247 ) -> Pin<Box<dyn Stream<Item = Result<(Self::Id, Record<Self::Value>)>> + Send + 'a>>
248 where
249 E: Entity<Self::Value> + 'a,
250 Self: Sized,
251 {
252 let id_field = self.id_field_for(table);
253 let endpoint = format!("/{}", table.table_name());
254 let pool = self.pool.clone();
255
256 Box::pin(async_stream::stream! {
257 let mut stream = PaginatedStream::get(pool, endpoint).prefetch(3);
258 let mut row_idx = 0usize;
259
260 while let Some(item) = stream.next().await {
261 let item = match item {
262 Ok(v) => v,
263 Err(e) => {
264 yield Err(error!("Stream error", detail = e));
265 return;
266 }
267 };
268
269 let result = match item.as_object() {
270 Some(obj) => {
271 let id = id_field
272 .as_deref()
273 .and_then(|field| obj.get(field))
274 .and_then(|v| match v {
275 Value::String(s) => Some(s.clone()),
276 Value::Number(n) => Some(n.to_string()),
277 _ => None,
278 })
279 .unwrap_or_else(|| row_idx.to_string());
280
281 let record: Record<Value> =
282 obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
283 row_idx += 1;
284 Ok((id, record))
285 }
286 None => Err(error!("API data item is not an object")),
287 };
288
289 yield result;
290 }
291 })
292 }
293
294 async fn insert_table_value<E>(
296 &self,
297 _table: &Table<Self, E>,
298 _id: &Self::Id,
299 _record: &Record<Self::Value>,
300 ) -> Result<Record<Self::Value>>
301 where
302 E: Entity<Self::Value>,
303 Self: Sized,
304 {
305 Err(error!("REST API pool is a read-only data source"))
306 }
307
308 async fn replace_table_value<E>(
309 &self,
310 _table: &Table<Self, E>,
311 _id: &Self::Id,
312 _record: &Record<Self::Value>,
313 ) -> Result<Record<Self::Value>>
314 where
315 E: Entity<Self::Value>,
316 Self: Sized,
317 {
318 Err(error!("REST API pool is a read-only data source"))
319 }
320
321 async fn patch_table_value<E>(
322 &self,
323 _table: &Table<Self, E>,
324 _id: &Self::Id,
325 _partial: &Record<Self::Value>,
326 ) -> Result<Record<Self::Value>>
327 where
328 E: Entity<Self::Value>,
329 Self: Sized,
330 {
331 Err(error!("REST API pool is a read-only data source"))
332 }
333
334 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
335 where
336 E: Entity<Self::Value>,
337 Self: Sized,
338 {
339 Err(error!("REST API pool is a read-only data source"))
340 }
341
342 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
343 where
344 E: Entity<Self::Value>,
345 Self: Sized,
346 {
347 Err(error!("REST API pool is a read-only data source"))
348 }
349
350 async fn insert_table_return_id_value<E>(
351 &self,
352 _table: &Table<Self, E>,
353 _record: &Record<Self::Value>,
354 ) -> Result<Self::Id>
355 where
356 E: Entity<Self::Value>,
357 Self: Sized,
358 {
359 Err(error!("REST API pool is a read-only data source"))
360 }
361
362 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
363 &self,
364 _target_field: &str,
365 _source_table: &Table<Self, SourceE>,
366 _source_column: &str,
367 ) -> Self::Condition
368 where
369 Self: Sized,
370 {
371 unimplemented!("related_in_condition not yet supported for API pool")
372 }
373
374 fn column_table_values_expr<'a, E, Type: ColumnType>(
375 &'a self,
376 _table: &Table<Self, E>,
377 _column: &Self::Column<Type>,
378 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
379 where
380 E: Entity<Self::Value> + 'static,
381 Self: Sized,
382 {
383 unimplemented!("column_table_values_expr not yet supported for API pool")
384 }
385}