Skip to main content

vantage_log_writer/
table_source.rs

1use async_trait::async_trait;
2use indexmap::IndexMap;
3use serde_json::Value;
4use vantage_core::{Result, error};
5use vantage_expressions::Expression;
6use vantage_expressions::traits::associated_expressions::AssociatedExpression;
7use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
8use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
9use vantage_table::column::core::{Column, ColumnType};
10use vantage_table::table::Table;
11use vantage_table::traits::table_source::TableSource;
12use vantage_types::{Entity, Record};
13
14use crate::log_writer::LogWriter;
15use crate::type_system::AnyJsonType;
16use crate::writer_task::WriteOp;
17
18impl DataSource for LogWriter {}
19
20impl ExprDataSource<Value> for LogWriter {
21    async fn execute(&self, _expr: &Expression<Value>) -> Result<Value> {
22        Err(unsupported("execute"))
23    }
24
25    fn defer(&self, _expr: Expression<Value>) -> DeferredFn<Value>
26    where
27        Value: Clone + Send + Sync + 'static,
28    {
29        DeferredFn::new(move || Box::pin(async move { Err(unsupported("defer")) }))
30    }
31}
32
33fn unsupported(method: &'static str) -> vantage_core::VantageError {
34    error!("log-writer is insert-only", method = method).is_unsupported()
35}
36
37#[async_trait]
38impl TableSource for LogWriter {
39    type Column<Type>
40        = Column<Type>
41    where
42        Type: ColumnType;
43    type AnyType = AnyJsonType;
44    type Value = Value;
45    type Id = String;
46    type Condition = Expression<Self::Value>;
47    type Source = String;
48
49    fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
50        Column::new(name)
51    }
52
53    fn to_any_column<Type: ColumnType>(
54        &self,
55        column: Self::Column<Type>,
56    ) -> Self::Column<Self::AnyType> {
57        Column::from_column(column)
58    }
59
60    fn convert_any_column<Type: ColumnType>(
61        &self,
62        any_column: Self::Column<Self::AnyType>,
63    ) -> Option<Self::Column<Type>> {
64        Some(Column::from_column(any_column))
65    }
66
67    fn expr(
68        &self,
69        template: impl Into<String>,
70        parameters: Vec<ExpressiveEnum<Self::Value>>,
71    ) -> Expression<Self::Value> {
72        Expression::new(template, parameters)
73    }
74
75    fn search_table_condition<E>(
76        &self,
77        _table: &Table<Self, E>,
78        _search_value: &str,
79    ) -> Self::Condition
80    where
81        E: Entity<Self::Value>,
82    {
83        Expression::new("", vec![])
84    }
85
86    async fn list_table_values<E>(
87        &self,
88        _table: &Table<Self, E>,
89    ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
90    where
91        E: Entity<Self::Value>,
92        Self: Sized,
93    {
94        Err(unsupported("list_table_values"))
95    }
96
97    async fn get_table_value<E>(
98        &self,
99        _table: &Table<Self, E>,
100        _id: &Self::Id,
101    ) -> Result<Option<Record<Self::Value>>>
102    where
103        E: Entity<Self::Value>,
104        Self: Sized,
105    {
106        Err(unsupported("get_table_value"))
107    }
108
109    async fn get_table_some_value<E>(
110        &self,
111        _table: &Table<Self, E>,
112    ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
113    where
114        E: Entity<Self::Value>,
115        Self: Sized,
116    {
117        Err(unsupported("get_table_some_value"))
118    }
119
120    async fn get_table_count<E>(&self, _table: &Table<Self, E>) -> Result<i64>
121    where
122        E: Entity<Self::Value>,
123        Self: Sized,
124    {
125        Err(unsupported("get_table_count"))
126    }
127
128    async fn get_table_sum<E>(
129        &self,
130        _table: &Table<Self, E>,
131        _column: &Self::Column<Self::AnyType>,
132    ) -> Result<Self::Value>
133    where
134        E: Entity<Self::Value>,
135        Self: Sized,
136    {
137        Err(unsupported("get_table_sum"))
138    }
139
140    async fn get_table_max<E>(
141        &self,
142        _table: &Table<Self, E>,
143        _column: &Self::Column<Self::AnyType>,
144    ) -> Result<Self::Value>
145    where
146        E: Entity<Self::Value>,
147        Self: Sized,
148    {
149        Err(unsupported("get_table_max"))
150    }
151
152    async fn get_table_min<E>(
153        &self,
154        _table: &Table<Self, E>,
155        _column: &Self::Column<Self::AnyType>,
156    ) -> Result<Self::Value>
157    where
158        E: Entity<Self::Value>,
159        Self: Sized,
160    {
161        Err(unsupported("get_table_min"))
162    }
163
164    async fn insert_table_value<E>(
165        &self,
166        table: &Table<Self, E>,
167        id: &Self::Id,
168        record: &Record<Self::Value>,
169    ) -> Result<Record<Self::Value>>
170    where
171        E: Entity<Self::Value>,
172        Self: Sized,
173    {
174        let projected = project_record(table.columns().keys(), record, self.id_column(), id);
175        let line = serialize_line(&projected)?;
176        let path = self.file_path(table.table_name());
177        self.sender()
178            .send(WriteOp::Append { path, line })
179            .await
180            .map_err(|e| error!("log writer channel closed", detail = e.to_string()))?;
181        Ok(projected)
182    }
183
184    async fn replace_table_value<E>(
185        &self,
186        _table: &Table<Self, E>,
187        _id: &Self::Id,
188        _record: &Record<Self::Value>,
189    ) -> Result<Record<Self::Value>>
190    where
191        E: Entity<Self::Value>,
192        Self: Sized,
193    {
194        Err(unsupported("replace_table_value"))
195    }
196
197    async fn patch_table_value<E>(
198        &self,
199        _table: &Table<Self, E>,
200        _id: &Self::Id,
201        _partial: &Record<Self::Value>,
202    ) -> Result<Record<Self::Value>>
203    where
204        E: Entity<Self::Value>,
205        Self: Sized,
206    {
207        Err(unsupported("patch_table_value"))
208    }
209
210    async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
211    where
212        E: Entity<Self::Value>,
213        Self: Sized,
214    {
215        Err(unsupported("delete_table_value"))
216    }
217
218    async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
219    where
220        E: Entity<Self::Value>,
221        Self: Sized,
222    {
223        Err(unsupported("delete_table_all_values"))
224    }
225
226    async fn insert_table_return_id_value<E>(
227        &self,
228        table: &Table<Self, E>,
229        record: &Record<Self::Value>,
230    ) -> Result<Self::Id>
231    where
232        E: Entity<Self::Value>,
233        Self: Sized,
234    {
235        let id = extract_or_generate_id(record, self.id_column());
236        let projected = project_record(table.columns().keys(), record, self.id_column(), &id);
237        let line = serialize_line(&projected)?;
238        let path = self.file_path(table.table_name());
239        self.sender()
240            .send(WriteOp::Append { path, line })
241            .await
242            .map_err(|e| error!("log writer channel closed", detail = e.to_string()))?;
243        Ok(id)
244    }
245
246    fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
247        &self,
248        _target_field: &str,
249        _source_table: &Table<Self, SourceE>,
250        _source_column: &str,
251    ) -> Self::Condition
252    where
253        Self: Sized,
254    {
255        Expression::new("", vec![])
256    }
257
258    fn column_table_values_expr<'a, E, Type: ColumnType>(
259        &'a self,
260        _table: &Table<Self, E>,
261        _column: &Self::Column<Type>,
262    ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
263    where
264        E: Entity<Self::Value> + 'static,
265        Self: Sized,
266    {
267        unimplemented!("log-writer is insert-only; column_table_values_expr is unreachable")
268    }
269}
270
271/// Project a record onto the table's declared column set, then attach the id.
272///
273/// Entity fields not declared as columns are dropped — this is the contract
274/// the user pinned down: "entity values with non-existent columns would be
275/// dropped".
276fn project_record<'a, I>(
277    column_names: I,
278    record: &Record<Value>,
279    id_column: &str,
280    id: &str,
281) -> Record<Value>
282where
283    I: IntoIterator<Item = &'a String>,
284{
285    let mut out = Record::new();
286    let mut wrote_id = false;
287    for col in column_names {
288        if col == id_column {
289            out.insert(col.clone(), Value::String(id.to_string()));
290            wrote_id = true;
291        } else if let Some(v) = record.get(col) {
292            out.insert(col.clone(), v.clone());
293        }
294    }
295    if !wrote_id {
296        out.insert(id_column.to_string(), Value::String(id.to_string()));
297    }
298    out
299}
300
301fn serialize_line(record: &Record<Value>) -> Result<String> {
302    let map: serde_json::Map<String, Value> = record
303        .as_inner()
304        .iter()
305        .map(|(k, v)| (k.clone(), v.clone()))
306        .collect();
307    let mut s = serde_json::to_string(&Value::Object(map))
308        .map_err(|e| error!("failed to serialize record to JSON", detail = e.to_string()))?;
309    s.push('\n');
310    Ok(s)
311}
312
313fn extract_or_generate_id(record: &Record<Value>, id_column: &str) -> String {
314    if let Some(v) = record.get(id_column) {
315        match v {
316            Value::String(s) if !s.is_empty() => return s.clone(),
317            Value::Number(n) => return n.to_string(),
318            _ => {}
319        }
320    }
321    ulid::Ulid::new().to_string()
322}