Skip to main content

vantage_api_pool/
pool_api.rs

1use std::pin::Pin;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use futures_core::Stream;
6use indexmap::IndexMap;
7use serde_json::Value;
8use tokio_stream::StreamExt;
9use vantage_core::error;
10use vantage_dataset::traits::Result;
11use vantage_expressions::traits::associated_expressions::AssociatedExpression;
12use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
13use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
14use vantage_expressions::Expression;
15use vantage_table::column::core::{Column, ColumnType};
16use vantage_table::table::Table;
17use vantage_table::traits::table_source::TableSource;
18use vantage_types::{Entity, Record};
19
20use crate::{AwwPool, PaginatedStream};
21
22/// Vantage `TableSource` backed by `AwwPool` with automatic pagination.
23///
24/// Wraps `Arc<AwwPool>` so it's cheaply cloneable (required by `TableSource`).
25/// - `list_table_values` auto-paginates, collecting all pages.
26/// - `stream_table_values` returns a `PaginatedStream` for incremental processing.
27#[derive(Clone)]
28pub struct PoolApi {
29    pool: Arc<AwwPool>,
30    id_field: Option<String>,
31}
32
33impl std::fmt::Debug for PoolApi {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        f.debug_struct("PoolApi")
36            .field("id_field", &self.id_field)
37            .finish()
38    }
39}
40
41impl PoolApi {
42    pub fn new(pool: Arc<AwwPool>) -> Self {
43        Self {
44            pool,
45            id_field: None,
46        }
47    }
48
49    /// Get a reference to the underlying pool.
50    pub fn pool(&self) -> &Arc<AwwPool> {
51        &self.pool
52    }
53
54    fn id_field_for<E: Entity<Value>>(&self, table: &Table<Self, E>) -> Option<String> {
55        table
56            .id_field()
57            .map(|col| col.name().to_string())
58            .or_else(|| self.id_field.clone())
59    }
60}
61
62impl DataSource for PoolApi {}
63
64impl ExprDataSource<Value> for PoolApi {
65    async fn execute(&self, expr: &Expression<Value>) -> vantage_core::Result<Value> {
66        if expr.parameters.is_empty() {
67            Ok(Value::String(expr.template.clone()))
68        } else {
69            Ok(Value::Null)
70        }
71    }
72
73    fn defer(&self, expr: Expression<Value>) -> DeferredFn<Value> {
74        let api = self.clone();
75        DeferredFn::new(move || {
76            let api = api.clone();
77            let expr = expr.clone();
78            Box::pin(async move {
79                let result = api.execute(&expr).await?;
80                Ok(ExpressiveEnum::Scalar(result))
81            })
82        })
83    }
84}
85
86#[async_trait]
87impl TableSource for PoolApi {
88    type Column<Type>
89        = Column<Type>
90    where
91        Type: ColumnType;
92    type AnyType = Value;
93    type Value = Value;
94    type Id = String;
95    type Condition = vantage_expressions::Expression<Self::Value>;
96
97    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
98        Column::new(name)
99    }
100
101    fn to_any_column<Type: ColumnType>(
102        &self,
103        column: Self::Column<Type>,
104    ) -> Self::Column<Self::AnyType> {
105        Column::from_column(column)
106    }
107
108    fn convert_any_column<Type: ColumnType>(
109        &self,
110        any_column: Self::Column<Self::AnyType>,
111    ) -> Option<Self::Column<Type>> {
112        Some(Column::from_column(any_column))
113    }
114
115    fn expr(
116        &self,
117        template: impl Into<String>,
118        parameters: Vec<ExpressiveEnum<Self::Value>>,
119    ) -> Expression<Self::Value> {
120        Expression::new(template, parameters)
121    }
122
123    fn search_table_condition<E>(
124        &self,
125        _table: &Table<Self, E>,
126        search_value: &str,
127    ) -> Expression<Self::Value>
128    where
129        E: Entity<Self::Value>,
130    {
131        Expression::new(format!("SEARCH '{}'", search_value), vec![])
132    }
133
134    /// Fetch all records by streaming through all pages.
135    async fn list_table_values<E>(
136        &self,
137        table: &Table<Self, E>,
138    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
139    where
140        E: Entity<Self::Value>,
141        Self: Sized,
142    {
143        let id_field = self.id_field_for(table);
144        let endpoint = format!("/{}", table.table_name());
145        let mut stream = PaginatedStream::get(self.pool.clone(), endpoint).prefetch(3);
146
147        let mut records = IndexMap::new();
148        while let Some(item) = stream.next().await {
149            let item = item.map_err(|e| error!("Stream error", detail = e))?;
150            let obj = item
151                .as_object()
152                .ok_or_else(|| error!("API data item is not an object"))?;
153
154            let id = id_field
155                .as_deref()
156                .and_then(|field| obj.get(field))
157                .and_then(|v| match v {
158                    Value::String(s) => Some(s.clone()),
159                    Value::Number(n) => Some(n.to_string()),
160                    _ => None,
161                })
162                .unwrap_or_else(|| records.len().to_string());
163
164            let record: Record<Value> = obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
165            records.insert(id, record);
166        }
167
168        Ok(records)
169    }
170
171    async fn get_table_value<E>(
172        &self,
173        table: &Table<Self, E>,
174        id: &Self::Id,
175    ) -> Result<Option<Record<Self::Value>>>
176    where
177        E: Entity<Self::Value>,
178        Self: Sized,
179    {
180        // Fetch all and find by id — could be optimized with a direct endpoint later
181        let records = self.list_table_values(table).await?;
182        Ok(records.get(id).cloned())
183    }
184
185    async fn get_table_some_value<E>(
186        &self,
187        table: &Table<Self, E>,
188    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
189    where
190        E: Entity<Self::Value>,
191        Self: Sized,
192    {
193        let records = self.list_table_values(table).await?;
194        Ok(records.into_iter().next())
195    }
196
197    async fn get_table_count<E>(&self, table: &Table<Self, E>) -> Result<i64>
198    where
199        E: Entity<Self::Value>,
200        Self: Sized,
201    {
202        let records = self.list_table_values(table).await?;
203        Ok(records.len() as i64)
204    }
205
206    async fn get_table_sum<E>(
207        &self,
208        _table: &Table<Self, E>,
209        _column: &Self::Column<Self::AnyType>,
210    ) -> Result<Self::Value>
211    where
212        E: Entity<Self::Value>,
213        Self: Sized,
214    {
215        Err(error!("Sum not implemented for API pool backend"))
216    }
217
218    async fn get_table_max<E>(
219        &self,
220        _table: &Table<Self, E>,
221        _column: &Self::Column<Self::AnyType>,
222    ) -> Result<Self::Value>
223    where
224        E: Entity<Self::Value>,
225        Self: Sized,
226    {
227        Err(error!("Max not implemented for API pool backend"))
228    }
229
230    async fn get_table_min<E>(
231        &self,
232        _table: &Table<Self, E>,
233        _column: &Self::Column<Self::AnyType>,
234    ) -> Result<Self::Value>
235    where
236        E: Entity<Self::Value>,
237        Self: Sized,
238    {
239        Err(error!("Min not implemented for API pool backend"))
240    }
241
242    /// Stream records page by page using PaginatedStream with prefetch.
243    fn stream_table_values<'a, E>(
244        &'a self,
245        table: &Table<Self, E>,
246    ) -> Pin<Box<dyn Stream<Item = Result<(Self::Id, Record<Self::Value>)>> + Send + 'a>>
247    where
248        E: Entity<Self::Value> + 'a,
249        Self: Sized,
250    {
251        let id_field = self.id_field_for(table);
252        let endpoint = format!("/{}", table.table_name());
253        let pool = self.pool.clone();
254
255        Box::pin(async_stream::stream! {
256            let mut stream = PaginatedStream::get(pool, endpoint).prefetch(3);
257            let mut row_idx = 0usize;
258
259            while let Some(item) = stream.next().await {
260                let item = match item {
261                    Ok(v) => v,
262                    Err(e) => {
263                        yield Err(error!("Stream error", detail = e));
264                        return;
265                    }
266                };
267
268                let result = match item.as_object() {
269                    Some(obj) => {
270                        let id = id_field
271                            .as_deref()
272                            .and_then(|field| obj.get(field))
273                            .and_then(|v| match v {
274                                Value::String(s) => Some(s.clone()),
275                                Value::Number(n) => Some(n.to_string()),
276                                _ => None,
277                            })
278                            .unwrap_or_else(|| row_idx.to_string());
279
280                        let record: Record<Value> =
281                            obj.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
282                        row_idx += 1;
283                        Ok((id, record))
284                    }
285                    None => Err(error!("API data item is not an object")),
286                };
287
288                yield result;
289            }
290        })
291    }
292
293    // Write operations — not supported
294    async fn insert_table_value<E>(
295        &self,
296        _table: &Table<Self, E>,
297        _id: &Self::Id,
298        _record: &Record<Self::Value>,
299    ) -> Result<Record<Self::Value>>
300    where
301        E: Entity<Self::Value>,
302        Self: Sized,
303    {
304        Err(error!("REST API pool is a read-only data source"))
305    }
306
307    async fn replace_table_value<E>(
308        &self,
309        _table: &Table<Self, E>,
310        _id: &Self::Id,
311        _record: &Record<Self::Value>,
312    ) -> Result<Record<Self::Value>>
313    where
314        E: Entity<Self::Value>,
315        Self: Sized,
316    {
317        Err(error!("REST API pool is a read-only data source"))
318    }
319
320    async fn patch_table_value<E>(
321        &self,
322        _table: &Table<Self, E>,
323        _id: &Self::Id,
324        _partial: &Record<Self::Value>,
325    ) -> Result<Record<Self::Value>>
326    where
327        E: Entity<Self::Value>,
328        Self: Sized,
329    {
330        Err(error!("REST API pool is a read-only data source"))
331    }
332
333    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
334    where
335        E: Entity<Self::Value>,
336        Self: Sized,
337    {
338        Err(error!("REST API pool is a read-only data source"))
339    }
340
341    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
342    where
343        E: Entity<Self::Value>,
344        Self: Sized,
345    {
346        Err(error!("REST API pool is a read-only data source"))
347    }
348
349    async fn insert_table_return_id_value<E>(
350        &self,
351        _table: &Table<Self, E>,
352        _record: &Record<Self::Value>,
353    ) -> Result<Self::Id>
354    where
355        E: Entity<Self::Value>,
356        Self: Sized,
357    {
358        Err(error!("REST API pool is a read-only data source"))
359    }
360
361    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
362        &self,
363        _target_field: &str,
364        _source_table: &Table<Self, SourceE>,
365        _source_column: &str,
366    ) -> Self::Condition
367    where
368        Self: Sized,
369    {
370        unimplemented!("related_in_condition not yet supported for API pool")
371    }
372
373    fn column_table_values_expr<'a, E, Type: ColumnType>(
374        &'a self,
375        _table: &Table<Self, E>,
376        _column: &Self::Column<Type>,
377    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
378    where
379        E: Entity<Self::Value> + 'static,
380        Self: Sized,
381    {
382        unimplemented!("column_table_values_expr not yet supported for API pool")
383    }
384}