1use async_trait::async_trait;
2use indexmap::IndexMap;
3use serde_json::Value;
4use vantage_core::{Result, error};
5use vantage_expressions::Expression;
6use vantage_expressions::traits::associated_expressions::AssociatedExpression;
7use vantage_expressions::traits::datasource::{DataSource, ExprDataSource};
8use vantage_expressions::traits::expressive::{DeferredFn, ExpressiveEnum};
9use vantage_table::column::core::{Column, ColumnType};
10use vantage_table::table::Table;
11use vantage_table::traits::table_source::TableSource;
12use vantage_types::{Entity, Record};
13
14use crate::log_writer::LogWriter;
15use crate::type_system::AnyJsonType;
16use crate::writer_task::WriteOp;
17
18impl DataSource for LogWriter {}
19
20impl ExprDataSource<Value> for LogWriter {
21 async fn execute(&self, _expr: &Expression<Value>) -> Result<Value> {
22 Err(unsupported("execute"))
23 }
24
25 fn defer(&self, _expr: Expression<Value>) -> DeferredFn<Value>
26 where
27 Value: Clone + Send + Sync + 'static,
28 {
29 DeferredFn::new(move || Box::pin(async move { Err(unsupported("defer")) }))
30 }
31}
32
33fn unsupported(method: &'static str) -> vantage_core::VantageError {
34 error!("log-writer is insert-only", method = method).is_unsupported()
35}
36
37#[async_trait]
38impl TableSource for LogWriter {
39 type Column<Type>
40 = Column<Type>
41 where
42 Type: ColumnType;
43 type AnyType = AnyJsonType;
44 type Value = Value;
45 type Id = String;
46 type Condition = Expression<Self::Value>;
47 type Source = String;
48
49 fn create_column<Type: ColumnType>(&self, name: &str) -> Self::Column<Type> {
50 Column::new(name)
51 }
52
53 fn to_any_column<Type: ColumnType>(
54 &self,
55 column: Self::Column<Type>,
56 ) -> Self::Column<Self::AnyType> {
57 Column::from_column(column)
58 }
59
60 fn convert_any_column<Type: ColumnType>(
61 &self,
62 any_column: Self::Column<Self::AnyType>,
63 ) -> Option<Self::Column<Type>> {
64 Some(Column::from_column(any_column))
65 }
66
67 fn expr(
68 &self,
69 template: impl Into<String>,
70 parameters: Vec<ExpressiveEnum<Self::Value>>,
71 ) -> Expression<Self::Value> {
72 Expression::new(template, parameters)
73 }
74
75 fn search_table_condition<E>(
76 &self,
77 _table: &Table<Self, E>,
78 _search_value: &str,
79 ) -> Self::Condition
80 where
81 E: Entity<Self::Value>,
82 {
83 Expression::new("", vec![])
84 }
85
86 async fn list_table_values<E>(
87 &self,
88 _table: &Table<Self, E>,
89 ) -> Result<IndexMap<Self::Id, Record<Self::Value>>>
90 where
91 E: Entity<Self::Value>,
92 Self: Sized,
93 {
94 Err(unsupported("list_table_values"))
95 }
96
97 async fn get_table_value<E>(
98 &self,
99 _table: &Table<Self, E>,
100 _id: &Self::Id,
101 ) -> Result<Option<Record<Self::Value>>>
102 where
103 E: Entity<Self::Value>,
104 Self: Sized,
105 {
106 Err(unsupported("get_table_value"))
107 }
108
109 async fn get_table_some_value<E>(
110 &self,
111 _table: &Table<Self, E>,
112 ) -> Result<Option<(Self::Id, Record<Self::Value>)>>
113 where
114 E: Entity<Self::Value>,
115 Self: Sized,
116 {
117 Err(unsupported("get_table_some_value"))
118 }
119
120 async fn get_table_count<E>(&self, _table: &Table<Self, E>) -> Result<i64>
121 where
122 E: Entity<Self::Value>,
123 Self: Sized,
124 {
125 Err(unsupported("get_table_count"))
126 }
127
128 async fn get_table_sum<E>(
129 &self,
130 _table: &Table<Self, E>,
131 _column: &Self::Column<Self::AnyType>,
132 ) -> Result<Self::Value>
133 where
134 E: Entity<Self::Value>,
135 Self: Sized,
136 {
137 Err(unsupported("get_table_sum"))
138 }
139
140 async fn get_table_max<E>(
141 &self,
142 _table: &Table<Self, E>,
143 _column: &Self::Column<Self::AnyType>,
144 ) -> Result<Self::Value>
145 where
146 E: Entity<Self::Value>,
147 Self: Sized,
148 {
149 Err(unsupported("get_table_max"))
150 }
151
152 async fn get_table_min<E>(
153 &self,
154 _table: &Table<Self, E>,
155 _column: &Self::Column<Self::AnyType>,
156 ) -> Result<Self::Value>
157 where
158 E: Entity<Self::Value>,
159 Self: Sized,
160 {
161 Err(unsupported("get_table_min"))
162 }
163
164 async fn insert_table_value<E>(
165 &self,
166 table: &Table<Self, E>,
167 id: &Self::Id,
168 record: &Record<Self::Value>,
169 ) -> Result<Record<Self::Value>>
170 where
171 E: Entity<Self::Value>,
172 Self: Sized,
173 {
174 let projected = project_record(table.columns().keys(), record, self.id_column(), id);
175 let line = serialize_line(&projected)?;
176 let path = self.file_path(table.table_name());
177 self.sender()
178 .send(WriteOp::Append { path, line })
179 .await
180 .map_err(|e| error!("log writer channel closed", detail = e.to_string()))?;
181 Ok(projected)
182 }
183
184 async fn replace_table_value<E>(
185 &self,
186 _table: &Table<Self, E>,
187 _id: &Self::Id,
188 _record: &Record<Self::Value>,
189 ) -> Result<Record<Self::Value>>
190 where
191 E: Entity<Self::Value>,
192 Self: Sized,
193 {
194 Err(unsupported("replace_table_value"))
195 }
196
197 async fn patch_table_value<E>(
198 &self,
199 _table: &Table<Self, E>,
200 _id: &Self::Id,
201 _partial: &Record<Self::Value>,
202 ) -> Result<Record<Self::Value>>
203 where
204 E: Entity<Self::Value>,
205 Self: Sized,
206 {
207 Err(unsupported("patch_table_value"))
208 }
209
210 async fn delete_table_value<E>(&self, _table: &Table<Self, E>, _id: &Self::Id) -> Result<()>
211 where
212 E: Entity<Self::Value>,
213 Self: Sized,
214 {
215 Err(unsupported("delete_table_value"))
216 }
217
218 async fn delete_table_all_values<E>(&self, _table: &Table<Self, E>) -> Result<()>
219 where
220 E: Entity<Self::Value>,
221 Self: Sized,
222 {
223 Err(unsupported("delete_table_all_values"))
224 }
225
226 async fn insert_table_return_id_value<E>(
227 &self,
228 table: &Table<Self, E>,
229 record: &Record<Self::Value>,
230 ) -> Result<Self::Id>
231 where
232 E: Entity<Self::Value>,
233 Self: Sized,
234 {
235 let id = extract_or_generate_id(record, self.id_column());
236 let projected = project_record(table.columns().keys(), record, self.id_column(), &id);
237 let line = serialize_line(&projected)?;
238 let path = self.file_path(table.table_name());
239 self.sender()
240 .send(WriteOp::Append { path, line })
241 .await
242 .map_err(|e| error!("log writer channel closed", detail = e.to_string()))?;
243 Ok(id)
244 }
245
246 fn related_in_condition<SourceE: Entity<Self::Value> + 'static>(
247 &self,
248 _target_field: &str,
249 _source_table: &Table<Self, SourceE>,
250 _source_column: &str,
251 ) -> Self::Condition
252 where
253 Self: Sized,
254 {
255 Expression::new("", vec![])
256 }
257
258 fn column_table_values_expr<'a, E, Type: ColumnType>(
259 &'a self,
260 _table: &Table<Self, E>,
261 _column: &Self::Column<Type>,
262 ) -> AssociatedExpression<'a, Self, Self::Value, Vec<Type>>
263 where
264 E: Entity<Self::Value> + 'static,
265 Self: Sized,
266 {
267 unimplemented!("log-writer is insert-only; column_table_values_expr is unreachable")
268 }
269}
270
271fn project_record<'a, I>(
277 column_names: I,
278 record: &Record<Value>,
279 id_column: &str,
280 id: &str,
281) -> Record<Value>
282where
283 I: IntoIterator<Item = &'a String>,
284{
285 let mut out = Record::new();
286 let mut wrote_id = false;
287 for col in column_names {
288 if col == id_column {
289 out.insert(col.clone(), Value::String(id.to_string()));
290 wrote_id = true;
291 } else if let Some(v) = record.get(col) {
292 out.insert(col.clone(), v.clone());
293 }
294 }
295 if !wrote_id {
296 out.insert(id_column.to_string(), Value::String(id.to_string()));
297 }
298 out
299}
300
301fn serialize_line(record: &Record<Value>) -> Result<String> {
302 let map: serde_json::Map<String, Value> = record
303 .as_inner()
304 .iter()
305 .map(|(k, v)| (k.clone(), v.clone()))
306 .collect();
307 let mut s = serde_json::to_string(&Value::Object(map))
308 .map_err(|e| error!("failed to serialize record to JSON", detail = e.to_string()))?;
309 s.push('\n');
310 Ok(s)
311}
312
313fn extract_or_generate_id(record: &Record<Value>, id_column: &str) -> String {
314 if let Some(v) = record.get(id_column) {
315 match v {
316 Value::String(s) if !s.is_empty() => return s.clone(),
317 Value::Number(n) => return n.to_string(),
318 _ => {}
319 }
320 }
321 ulid::Ulid::new().to_string()
322}