Skip to main content

vantage_api_pool/
pool_api.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use serde_json::Value;
8use tokio_stream::StreamExt;
9use vantage_core::error;
10use vantage_dataset::traits::Result;
11use vantage_expressions::traits::associated_expressions::AssociatedExpression;
12use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
13use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
14use vantage_expressions::Expression;
15use vantage_table::column::core::{Column, ColumnType};
16use vantage_table::table::Table;
17use vantage_table::traits::table_source::TableSource;
18use vantage_types::{Entity, Record};
19
20use crate::{AwwPool, PaginatedStream};
21
22/// Vantage `TableSource` backed by `AwwPool` with automatic pagination.
23///
24/// Wraps `Arc<AwwPool>` so it's cheaply cloneable (required by `TableSource`).
25/// - `list_table_values` auto-paginates, collecting all pages.
26/// - `stream_table_values` returns a `PaginatedStream` for incremental processing.
27#[derive(Clone)]
28pub struct PoolApi {
29    pool: Arc<AwwPool>,
30    id_field: Option<String>,
31}
32
33impl std::fmt::Debug for PoolApi {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("PoolApi")
36            .field("id_field", &self.id_field)
37            .finish()
38    }
39}
40
41impl PoolApi {
42    pub fn new(pool: Arc<AwwPool>) -> Self {
43        Self {
44            pool,
45            id_field: None,
46        }
47    }
48
49    /// Get a reference to the underlying pool.
50    pub fn pool(&self) -> &Arc<AwwPool> {
51        &self.pool
52    }
53
54    fn id_field_for<E: Entity<Value>>(&self, table: &Table<Self, E>) -> Option<String> {
55        table
56            .id_field()
57            .map(|col| col.name().to_string())
58            .or_else(|| self.id_field.clone())
59    }
60}
61
62impl DataSource for PoolApi {}
63
64impl ExprDataSource<Value> for PoolApi {
65    async fn execute(&self, expr: &Expression<Value>) -> vantage_core::Result<Value> {
66        if expr.parameters.is_empty() {
67            Ok(Value::String(expr.template.clone()))
68        } else {
69            Ok(Value::Null)
70        }
71    }
72
73    fn defer(&self, expr: Expression<Value>) -> DeferredFn<Value> {
74        let api = self.clone();
75        DeferredFn::new(move || {
76            let api = api.clone();
77            let expr = expr.clone();
78            Box::pin(async move {
79                let result = api.execute(&expr).await?;
80                Ok(ExpressiveEnum::Scalar(result))
81            })
82        })
83    }
84}
85
86#[async_trait]
87impl TableSource for PoolApi {
88    type Column<Type>
89        = Column<Type>
90    where
91        Type: ColumnType;
92    type AnyType = Value;
93    type Value = Value;
94    type Id = String;
95    type Condition = vantage_expressions::Expression<Self::Value>;
96    type Source = String;
97
98    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
99        Column::new(name)
100    }
101
102    fn to_any_column<Type: ColumnType>(
103        &self,
104        column: Self::Column<Type>,
105    ) -> Self::Column<Self::AnyType> {
106        Column::from_column(column)
107    }
108
109    fn convert_any_column<Type: ColumnType>(
110        &self,
111        any_column: Self::Column<Self::AnyType>,
112    ) -> Option<Self::Column<Type>> {
113        Some(Column::from_column(any_column))
114    }
115
116    fn expr(
117        &self,
118        template: impl Into<String>,
119        parameters: Vec<ExpressiveEnum<Self::Value>>,
120    ) -> Expression<Self::Value> {
121        Expression::new(template, parameters)
122    }
123
124    fn search_table_condition<E>(
125        &self,
126        _table: &Table<Self, E>,
127        search_value: &str,
128    ) -> Expression<Self::Value>
129    where
130        E: Entity<Self::Value>,
131    {
132        Expression::new(format!("SEARCH '{}'", search_value), vec![])
133    }
134
135    /// Fetch all records by streaming through all pages.
136    async fn list_table_values<E>(
137        &self,
138        table: &Table<Self, E>,
139    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
140    where
141        E: Entity<Self::Value>,
142        Self: Sized,
143    {
144        let id_field = self.id_field_for(table);
145        let endpoint = format!("/{}", table.table_name());
146        let mut stream = PaginatedStream::get(self.pool.clone(), endpoint).prefetch(3);
147
148        let mut records = IndexMap::new();
149        while let Some(item) = stream.next().await {
150            let item = item.map_err(|e| error!("Stream error", detail = e))?;
151            let obj = item
152                .as_object()
153                .ok_or_else(|| error!("API data item is not an object"))?;
154
155            let id = id_field
156                .as_deref()
157                .and_then(|field| obj.get(field))
158                .and_then(|v| match v {
159                    Value::String(s) => Some(s.clone()),
160                    Value::Number(n) => Some(n.to_string()),
161                    _ => None,
162                })
163                .unwrap_or_else(|| records.len().to_string());
164
165            let record: Record<Value> = obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
166            records.insert(id, record);
167        }
168
169        Ok(records)
170    }
171
172    async fn get_table_value<E>(
173        &self,
174        table: &Table<Self, E>,
175        id: &Self::Id,
176    ) -> Result<Option<Record<Self::Value>>>
177    where
178        E: Entity<Self::Value>,
179        Self: Sized,
180    {
181        // Fetch all and find by id — could be optimized with a direct endpoint later
182        let records = self.list_table_values(table).await?;
183        Ok(records.get(id).cloned())
184    }
185
186    async fn get_table_some_value<E>(
187        &self,
188        table: &Table<Self, E>,
189    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
190    where
191        E: Entity<Self::Value>,
192        Self: Sized,
193    {
194        let records = self.list_table_values(table).await?;
195        Ok(records.into_iter().next())
196    }
197
198    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
199    where
200        E: Entity<Self::Value>,
201        Self: Sized,
202    {
203        let records = self.list_table_values(table).await?;
204        Ok(records.len() as i64)
205    }
206
207    async fn get_table_sum<E>(
208        &self,
209        _table: &Table<Self, E>,
210        _column: &Self::Column<Self::AnyType>,
211    ) -> Result<Self::Value>
212    where
213        E: Entity<Self::Value>,
214        Self: Sized,
215    {
216        Err(error!("Sum not implemented for API pool backend"))
217    }
218
219    async fn get_table_max<E>(
220        &self,
221        _table: &Table<Self, E>,
222        _column: &Self::Column<Self::AnyType>,
223    ) -> Result<Self::Value>
224    where
225        E: Entity<Self::Value>,
226        Self: Sized,
227    {
228        Err(error!("Max not implemented for API pool backend"))
229    }
230
231    async fn get_table_min<E>(
232        &self,
233        _table: &Table<Self, E>,
234        _column: &Self::Column<Self::AnyType>,
235    ) -> Result<Self::Value>
236    where
237        E: Entity<Self::Value>,
238        Self: Sized,
239    {
240        Err(error!("Min not implemented for API pool backend"))
241    }
242
243    /// Stream records page by page using PaginatedStream with prefetch.
244    fn stream_table_values<'a, E>(
245        &'a self,
246        table: &Table<Self, E>,
247    ) -> Pin<Box<dyn Stream<Item = Result<(Self::Id, Record<Self::Value>)>> + Send + 'a>>
248    where
249        E: Entity<Self::Value> + 'a,
250        Self: Sized,
251    {
252        let id_field = self.id_field_for(table);
253        let endpoint = format!("/{}", table.table_name());
254        let pool = self.pool.clone();
255
256        Box::pin(async_stream::stream! {
257            let mut stream = PaginatedStream::get(pool, endpoint).prefetch(3);
258            let mut row_idx = 0usize;
259
260            while let Some(item) = stream.next().await {
261                let item = match item {
262                    Ok(v) => v,
263                    Err(e) => {
264                        yield Err(error!("Stream error", detail = e));
265                        return;
266                    }
267                };
268
269                let result = match item.as_object() {
270                    Some(obj) => {
271                        let id = id_field
272                            .as_deref()
273                            .and_then(|field| obj.get(field))
274                            .and_then(|v| match v {
275                                Value::String(s) => Some(s.clone()),
276                                Value::Number(n) => Some(n.to_string()),
277                                _ => None,
278                            })
279                            .unwrap_or_else(|| row_idx.to_string());
280
281                        let record: Record<Value> =
282                            obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
283                        row_idx += 1;
284                        Ok((id, record))
285                    }
286                    None => Err(error!("API data item is not an object")),
287                };
288
289                yield result;
290            }
291        })
292    }
293
294    // Write operations — not supported
295    async fn insert_table_value<E>(
296        &self,
297        _table: &Table<Self, E>,
298        _id: &Self::Id,
299        _record: &Record<Self::Value>,
300    ) -> Result<Record<Self::Value>>
301    where
302        E: Entity<Self::Value>,
303        Self: Sized,
304    {
305        Err(error!("REST API pool is a read-only data source"))
306    }
307
308    async fn replace_table_value<E>(
309        &self,
310        _table: &Table<Self, E>,
311        _id: &Self::Id,
312        _record: &Record<Self::Value>,
313    ) -> Result<Record<Self::Value>>
314    where
315        E: Entity<Self::Value>,
316        Self: Sized,
317    {
318        Err(error!("REST API pool is a read-only data source"))
319    }
320
321    async fn patch_table_value<E>(
322        &self,
323        _table: &Table<Self, E>,
324        _id: &Self::Id,
325        _partial: &Record<Self::Value>,
326    ) -> Result<Record<Self::Value>>
327    where
328        E: Entity<Self::Value>,
329        Self: Sized,
330    {
331        Err(error!("REST API pool is a read-only data source"))
332    }
333
334    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
335    where
336        E: Entity<Self::Value>,
337        Self: Sized,
338    {
339        Err(error!("REST API pool is a read-only data source"))
340    }
341
342    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
343    where
344        E: Entity<Self::Value>,
345        Self: Sized,
346    {
347        Err(error!("REST API pool is a read-only data source"))
348    }
349
350    async fn insert_table_return_id_value<E>(
351        &self,
352        _table: &Table<Self, E>,
353        _record: &Record<Self::Value>,
354    ) -> Result<Self::Id>
355    where
356        E: Entity<Self::Value>,
357        Self: Sized,
358    {
359        Err(error!("REST API pool is a read-only data source"))
360    }
361
362    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
363        &self,
364        _target_field: &str,
365        _source_table: &Table<Self, SourceE>,
366        _source_column: &str,
367    ) -> Self::Condition
368    where
369        Self: Sized,
370    {
371        unimplemented!("related_in_condition not yet supported for API pool")
372    }
373
374    fn column_table_values_expr<'a, E, Type: ColumnType>(
375        &'a self,
376        _table: &Table<Self, E>,
377        _column: &Self::Column<Type>,
378    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
379    where
380        E: Entity<Self::Value> + 'static,
381        Self: Sized,
382    {
383        unimplemented!("column_table_values_expr not yet supported for API pool")
384    }
385}