faucet_transform_sql/
runtime.rs1use crate::compile::{Reloadable, build_connection, sql_escape, validate_query};
4use crate::config::SqlTransformConfig;
5use crate::shovel::{infer_schema, json_to_record_batch, record_batches_to_json, schema_eq};
6use arrow::array::RecordBatch;
7use arrow::datatypes::SchemaRef;
8use duckdb::Connection;
9use duckdb::vtab::arrow::arrow_recordbatch_to_query_params;
10use faucet_core::FaucetError;
11use faucet_core::stage::TransformStage;
12use serde_json::Value;
13use std::sync::{Arc, Mutex};
14
15struct State {
16 conn: Connection,
17 query: String,
18 reloadables: Vec<Reloadable>,
19 cached_schema: Option<SchemaRef>,
20 pages_seen: u64,
21 aggregates: Option<bool>,
22 warned: bool,
23}
24
25pub struct SqlTransform {
27 state: Arc<Mutex<State>>,
28}
29
30impl std::fmt::Debug for SqlTransform {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 let mut d = f.debug_struct("SqlTransform");
33 match self.state.lock() {
34 Ok(st) => d.field("query", &st.query),
35 Err(e) => d.field("query", &e.into_inner().query),
36 };
37 d.finish_non_exhaustive()
38 }
39}
40
41impl SqlTransform {
42 pub fn compile(cfg: &SqlTransformConfig) -> Result<Self, FaucetError> {
44 let (conn, reloadables) = build_connection(cfg)?;
45 validate_query(&conn, &cfg.query)?;
46 Ok(Self {
47 state: Arc::new(Mutex::new(State {
48 conn,
49 query: cfg.query.clone(),
50 reloadables,
51 cached_schema: None,
52 pages_seen: 0,
53 aggregates: None,
54 warned: false,
55 })),
56 })
57 }
58
59 pub fn into_page_stage(self) -> TransformStage {
61 let state = self.state;
62 TransformStage::PageFn(Arc::new(move |records: Vec<Value>| {
63 let mut st = state.lock().unwrap_or_else(|e| e.into_inner());
64 execute_page(&mut st, records)
65 }))
66 }
67}
68
69fn execute_page(st: &mut State, records: Vec<Value>) -> Result<Vec<Value>, FaucetError> {
70 if records.is_empty() {
71 return Ok(Vec::new());
72 }
73 reload_relations(st)?;
74
75 let fresh = infer_schema(&records)?;
78 let schema = match &st.cached_schema {
79 Some(s) if schema_eq(s, &fresh) => s.clone(),
80 _ => {
81 st.cached_schema = Some(fresh.clone());
82 fresh
83 }
84 };
85 let batch = json_to_record_batch(&records, schema)?;
86 let params = arrow_recordbatch_to_query_params(batch);
87 st.conn
88 .execute(
89 "CREATE OR REPLACE TEMP TABLE batch AS SELECT * FROM arrow(?, ?)",
90 params,
91 )
92 .map_err(|e| FaucetError::Transform(format!("sql transform: register batch: {e}")))?;
93
94 if st.aggregates.is_none() {
96 st.aggregates = Some(plan_has_aggregate(&st.conn, &st.query));
97 }
98 st.pages_seen += 1;
99 if st.pages_seen >= 2 && st.aggregates == Some(true) && !st.warned {
100 st.warned = true;
101 tracing::warn!(
102 target: "faucet::transform::sql",
103 "sql transform with aggregation received multiple pages; aggregation is \
104 per-page — set batch_size: 0 for global aggregation"
105 );
106 }
107
108 let out = {
109 let mut stmt = st
110 .conn
111 .prepare(&st.query)
112 .map_err(|e| FaucetError::Transform(format!("sql transform: prepare: {e}")))?;
113 let batches: Vec<RecordBatch> = stmt
114 .query_arrow([])
115 .map_err(|e| FaucetError::Transform(format!("sql transform: execute: {e}")))?
116 .collect();
117 record_batches_to_json(&batches)?
118 };
119 Ok(out)
120}
121
122fn reload_relations(st: &mut State) -> Result<(), FaucetError> {
123 for r in st.reloadables.iter_mut() {
124 let cur = std::fs::metadata(&r.path).and_then(|m| m.modified()).ok();
125 if cur != r.last_mtime {
126 let stmt = if r.is_csv {
127 format!(
128 "CREATE OR REPLACE TABLE \"{}\" AS SELECT * FROM read_csv_auto('{}', header={});",
129 r.name,
130 sql_escape(&r.path),
131 r.has_header
132 )
133 } else {
134 format!(
135 "CREATE OR REPLACE TABLE \"{}\" AS SELECT * FROM read_json_auto('{}', format='newline_delimited');",
136 r.name,
137 sql_escape(&r.path)
138 )
139 };
140 st.conn.execute_batch(&stmt).map_err(|e| {
141 FaucetError::Transform(format!("sql transform: reload '{}': {e}", r.name))
142 })?;
143 r.last_mtime = cur;
144 }
145 }
146 Ok(())
147}
148
149fn plan_has_aggregate(conn: &Connection, query: &str) -> bool {
150 let explain = format!("EXPLAIN {query}");
151 let mut found = false;
152 if let Ok(mut stmt) = conn.prepare(&explain)
153 && let Ok(rows) = stmt.query_map([], |row| row.get::<_, String>(1))
154 {
155 for r in rows.flatten() {
156 let u = r.to_uppercase();
157 if u.contains("AGGREGATE") || u.contains("WINDOW") {
158 found = true;
159 break;
160 }
161 }
162 }
163 found
164}