1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use serde_json::Value;
8use tokio_stream::StreamExt;
9use vantage_core::error;
10use vantage_dataset::traits::Result;
11use vantage_expressions::traits::associated_expressions::AssociatedExpression;
12use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
13use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
14use vantage_expressions::Expression;
15use vantage_table::column::core::{Column, ColumnType};
16use vantage_table::table::Table;
17use vantage_table::traits::table_source::TableSource;
18use vantage_types::{Entity, Record};
19
20use crate::{AwwPool, PaginatedStream};
21
22#[derive(Clone)]
28pub struct PoolApi {
29 pool: Arc<AwwPool>,
30 id_field: Option<String>,
31}
32
33impl std::fmt::Debug for PoolApi {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 f.debug_struct("PoolApi")
36 .field("id_field", &self.id_field)
37 .finish()
38 }
39}
40
41impl PoolApi {
42 pub fn new(pool: Arc<AwwPool>) -> Self {
43 Self {
44 pool,
45 id_field: None,
46 }
47 }
48
49 pub fn pool(&self) -> &Arc<AwwPool> {
51 &self.pool
52 }
53
54 fn id_field_for<E: Entity<Value>>(&self, table: &Table<Self, E>) -> Option<String> {
55 table
56 .id_field()
57 .map(|col| col.name().to_string())
58 .or_else(|| self.id_field.clone())
59 }
60}
61
62impl DataSource for PoolApi {}
63
64impl ExprDataSource<Value> for PoolApi {
65 async fn execute(&self, expr: &Expression<Value>) -> vantage_core::Result<Value> {
66 if expr.parameters.is_empty() {
67 Ok(Value::String(expr.template.clone()))
68 } else {
69 Ok(Value::Null)
70 }
71 }
72
73 fn defer(&self, expr: Expression<Value>) -> DeferredFn<Value> {
74 let api = self.clone();
75 DeferredFn::new(move || {
76 let api = api.clone();
77 let expr = expr.clone();
78 Box::pin(async move {
79 let result = api.execute(&expr).await?;
80 Ok(ExpressiveEnum::Scalar(result))
81 })
82 })
83 }
84}
85
86#[async_trait]
87impl TableSource for PoolApi {
88 type Column<Type>
89 = Column<Type>
90 where
91 Type: ColumnType;
92 type AnyType = Value;
93 type Value = Value;
94 type Id = String;
95 type Condition = vantage_expressions::Expression<Self::Value>;
96
97 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
98 Column::new(name)
99 }
100
101 fn to_any_column<Type: ColumnType>(
102 &self,
103 column: Self::Column<Type>,
104 ) -> Self::Column<Self::AnyType> {
105 Column::from_column(column)
106 }
107
108 fn convert_any_column<Type: ColumnType>(
109 &self,
110 any_column: Self::Column<Self::AnyType>,
111 ) -> Option<Self::Column<Type>> {
112 Some(Column::from_column(any_column))
113 }
114
115 fn expr(
116 &self,
117 template: impl Into<String>,
118 parameters: Vec<ExpressiveEnum<Self::Value>>,
119 ) -> Expression<Self::Value> {
120 Expression::new(template, parameters)
121 }
122
123 fn search_table_condition<E>(
124 &self,
125 _table: &Table<Self, E>,
126 search_value: &str,
127 ) -> Expression<Self::Value>
128 where
129 E: Entity<Self::Value>,
130 {
131 Expression::new(format!("SEARCH '{}'", search_value), vec![])
132 }
133
134 async fn list_table_values<E>(
136 &self,
137 table: &Table<Self, E>,
138 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
139 where
140 E: Entity<Self::Value>,
141 Self: Sized,
142 {
143 let id_field = self.id_field_for(table);
144 let endpoint = format!("/{}", table.table_name());
145 let mut stream = PaginatedStream::get(self.pool.clone(), endpoint).prefetch(3);
146
147 let mut records = IndexMap::new();
148 while let Some(item) = stream.next().await {
149 let item = item.map_err(|e| error!("Stream error", detail = e))?;
150 let obj = item
151 .as_object()
152 .ok_or_else(|| error!("API data item is not an object"))?;
153
154 let id = id_field
155 .as_deref()
156 .and_then(|field| obj.get(field))
157 .and_then(|v| match v {
158 Value::String(s) => Some(s.clone()),
159 Value::Number(n) => Some(n.to_string()),
160 _ => None,
161 })
162 .unwrap_or_else(|| records.len().to_string());
163
164 let record: Record<Value> = obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
165 records.insert(id, record);
166 }
167
168 Ok(records)
169 }
170
171 async fn get_table_value<E>(
172 &self,
173 table: &Table<Self, E>,
174 id: &Self::Id,
175 ) -> Result<Option<Record<Self::Value>>>
176 where
177 E: Entity<Self::Value>,
178 Self: Sized,
179 {
180 let records = self.list_table_values(table).await?;
182 Ok(records.get(id).cloned())
183 }
184
185 async fn get_table_some_value<E>(
186 &self,
187 table: &Table<Self, E>,
188 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
189 where
190 E: Entity<Self::Value>,
191 Self: Sized,
192 {
193 let records = self.list_table_values(table).await?;
194 Ok(records.into_iter().next())
195 }
196
197 async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
198 where
199 E: Entity<Self::Value>,
200 Self: Sized,
201 {
202 let records = self.list_table_values(table).await?;
203 Ok(records.len() as i64)
204 }
205
206 async fn get_table_sum<E>(
207 &self,
208 _table: &Table<Self, E>,
209 _column: &Self::Column<Self::AnyType>,
210 ) -> Result<Self::Value>
211 where
212 E: Entity<Self::Value>,
213 Self: Sized,
214 {
215 Err(error!("Sum not implemented for API pool backend"))
216 }
217
218 async fn get_table_max<E>(
219 &self,
220 _table: &Table<Self, E>,
221 _column: &Self::Column<Self::AnyType>,
222 ) -> Result<Self::Value>
223 where
224 E: Entity<Self::Value>,
225 Self: Sized,
226 {
227 Err(error!("Max not implemented for API pool backend"))
228 }
229
230 async fn get_table_min<E>(
231 &self,
232 _table: &Table<Self, E>,
233 _column: &Self::Column<Self::AnyType>,
234 ) -> Result<Self::Value>
235 where
236 E: Entity<Self::Value>,
237 Self: Sized,
238 {
239 Err(error!("Min not implemented for API pool backend"))
240 }
241
242 fn stream_table_values<'a, E>(
244 &'a self,
245 table: &Table<Self, E>,
246 ) -> Pin<Box<dyn Stream<Item = Result<(Self::Id, Record<Self::Value>)>> + Send + 'a>>
247 where
248 E: Entity<Self::Value> + 'a,
249 Self: Sized,
250 {
251 let id_field = self.id_field_for(table);
252 let endpoint = format!("/{}", table.table_name());
253 let pool = self.pool.clone();
254
255 Box::pin(async_stream::stream! {
256 let mut stream = PaginatedStream::get(pool, endpoint).prefetch(3);
257 let mut row_idx = 0usize;
258
259 while let Some(item) = stream.next().await {
260 let item = match item {
261 Ok(v) => v,
262 Err(e) => {
263 yield Err(error!("Stream error", detail = e));
264 return;
265 }
266 };
267
268 let result = match item.as_object() {
269 Some(obj) => {
270 let id = id_field
271 .as_deref()
272 .and_then(|field| obj.get(field))
273 .and_then(|v| match v {
274 Value::String(s) => Some(s.clone()),
275 Value::Number(n) => Some(n.to_string()),
276 _ => None,
277 })
278 .unwrap_or_else(|| row_idx.to_string());
279
280 let record: Record<Value> =
281 obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
282 row_idx += 1;
283 Ok((id, record))
284 }
285 None => Err(error!("API data item is not an object")),
286 };
287
288 yield result;
289 }
290 })
291 }
292
293 async fn insert_table_value<E>(
295 &self,
296 _table: &Table<Self, E>,
297 _id: &Self::Id,
298 _record: &Record<Self::Value>,
299 ) -> Result<Record<Self::Value>>
300 where
301 E: Entity<Self::Value>,
302 Self: Sized,
303 {
304 Err(error!("REST API pool is a read-only data source"))
305 }
306
307 async fn replace_table_value<E>(
308 &self,
309 _table: &Table<Self, E>,
310 _id: &Self::Id,
311 _record: &Record<Self::Value>,
312 ) -> Result<Record<Self::Value>>
313 where
314 E: Entity<Self::Value>,
315 Self: Sized,
316 {
317 Err(error!("REST API pool is a read-only data source"))
318 }
319
320 async fn patch_table_value<E>(
321 &self,
322 _table: &Table<Self, E>,
323 _id: &Self::Id,
324 _partial: &Record<Self::Value>,
325 ) -> Result<Record<Self::Value>>
326 where
327 E: Entity<Self::Value>,
328 Self: Sized,
329 {
330 Err(error!("REST API pool is a read-only data source"))
331 }
332
333 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
334 where
335 E: Entity<Self::Value>,
336 Self: Sized,
337 {
338 Err(error!("REST API pool is a read-only data source"))
339 }
340
341 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
342 where
343 E: Entity<Self::Value>,
344 Self: Sized,
345 {
346 Err(error!("REST API pool is a read-only data source"))
347 }
348
349 async fn insert_table_return_id_value<E>(
350 &self,
351 _table: &Table<Self, E>,
352 _record: &Record<Self::Value>,
353 ) -> Result<Self::Id>
354 where
355 E: Entity<Self::Value>,
356 Self: Sized,
357 {
358 Err(error!("REST API pool is a read-only data source"))
359 }
360
361 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
362 &self,
363 _target_field: &str,
364 _source_table: &Table<Self, SourceE>,
365 _source_column: &str,
366 ) -> Self::Condition
367 where
368 Self: Sized,
369 {
370 unimplemented!("related_in_condition not yet supported for API pool")
371 }
372
373 fn column_table_values_expr<'a, E, Type: ColumnType>(
374 &'a self,
375 _table: &Table<Self, E>,
376 _column: &Self::Column<Type>,
377 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
378 where
379 E: Entity<Self::Value> + 'static,
380 Self: Sized,
381 {
382 unimplemented!("column_table_values_expr not yet supported for API pool")
383 }
384}