Skip to main content

chunkshop/sources/
clickhouse_table.rs

1//! ClickHouse table source. Mirrors `python/src/chunkshop/sources/clickhouse_table.py`.
2//!
//! Fetches rows via the official `clickhouse` crate's HTTP transport using
//! `FORMAT JSONEachRow`; the whole response is buffered in memory and then
//! parsed line by line. Path A (per the R4 plan): the column count is
5//! config-driven (`metadata_columns` is `Vec<String>`), so the typed `Row`
6//! derive — which expects a fixed shape — can't be used. Instead we pull each
7//! row as a JSON object and project into `Document`.
8//!
9//! Metadata columns are coerced via `toString({col})` in the SELECT so they
10//! come back as JSON strings regardless of underlying CH type. This trades
11//! Python's recursive `_json_safe` (which preserves nested list/tuple/dict
12//! structure) for SQL-side stringification — acceptable for chunkshop's flat
13//! metadata model and avoids a second JSON parse pass per row.
14//!
15//! API confirmed against `clickhouse = "0.15"`: `Query::fetch_bytes(format)`
16//! returns `Result<BytesCursor>` (sync) and `BytesCursor::collect(&mut self)`
17//! is async, so we bind the cursor to a `mut` local before collecting.
18
19use anyhow::{anyhow, Context, Result};
20use serde_json::json;
21
22use crate::backends::base::BackendDialect;
23use crate::backends::clickhouse::ClickhouseBackend;
24use crate::config::ClickhouseTableSourceConfig;
25use crate::sources::base::Document;
26
/// Document source that reads rows from a single ClickHouse table and
/// projects each row into a `Document`.
pub struct ClickhouseTableSource {
    /// Source configuration: the column names, table location and optional
    /// `WHERE` clause used to build the SELECT statement.
    cfg: ClickhouseTableSourceConfig,
    /// Backend built from `cfg.dsn_env`; supplies identifier quoting, the
    /// fully-qualified table name and the ClickHouse client.
    backend: ClickhouseBackend,
}
31
32impl ClickhouseTableSource {
33    pub fn new(cfg: ClickhouseTableSourceConfig) -> Self {
34        let backend = ClickhouseBackend::new(cfg.dsn_env.clone());
35        Self { cfg, backend }
36    }
37
38    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
39        // Column order matches Python's clickhouse_table.py:
40        //   [id, content, optional title, *metadata_columns...]
41        let mut select_cols: Vec<String> = vec![
42            self.backend.quote_ident(&self.cfg.id_column),
43            self.backend.quote_ident(&self.cfg.content_column),
44        ];
45        if let Some(tc) = &self.cfg.title_column {
46            select_cols.push(self.backend.quote_ident(tc));
47        }
48        for col in &self.cfg.metadata_columns {
49            // Coerce metadata columns to String for stable JSON transport.
50            // CH's toString() works for scalars; nested types stringify as
51            // their CH text representation. See module docstring for trade-off.
52            let q = self.backend.quote_ident(col);
53            select_cols.push(format!("toString({q}) AS {q}"));
54        }
55        let cols_sql = select_cols.join(", ");
56        let fq = self
57            .backend
58            .fq_table(&self.cfg.database_name, &self.cfg.table);
59        let mut q = format!("SELECT {cols_sql} FROM {fq}");
60        if let Some(w) = &self.cfg.where_clause {
61            q.push_str(&format!(" WHERE {w}"));
62        }
63
64        let client = self.backend.client().await?;
65        let mut cursor = client
66            .query(&q)
67            .fetch_bytes("JSONEachRow")
68            .with_context(|| format!("running CH source query: {q}"))?;
69        let bytes = cursor
70            .collect()
71            .await
72            .with_context(|| format!("collecting CH source response: {q}"))?;
73        let body =
74            std::str::from_utf8(&bytes).context("CH JSONEachRow response was not valid UTF-8")?;
75
76        let mut out = Vec::new();
77        for line in body.lines() {
78            let line = line.trim();
79            if line.is_empty() {
80                continue;
81            }
82            let row: serde_json::Value = serde_json::from_str(line)
83                .with_context(|| format!("parsing JSONEachRow line: {line}"))?;
84            let id = row
85                .get(&self.cfg.id_column)
86                .and_then(|v| match v {
87                    serde_json::Value::String(s) => Some(s.clone()),
88                    serde_json::Value::Number(n) => Some(n.to_string()),
89                    serde_json::Value::Bool(b) => Some(b.to_string()),
90                    _ => None,
91                })
92                .ok_or_else(|| {
93                    anyhow!(
94                        "id_column {:?} missing or not string/number in row",
95                        self.cfg.id_column
96                    )
97                })?;
98            let content = row
99                .get(&self.cfg.content_column)
100                .and_then(|v| v.as_str())
101                .unwrap_or("")
102                .to_string();
103            let title = self
104                .cfg
105                .title_column
106                .as_ref()
107                .and_then(|tc| row.get(tc).and_then(|v| v.as_str()).map(str::to_string));
108            let mut meta = serde_json::Map::new();
109            for mc in &self.cfg.metadata_columns {
110                meta.insert(mc.clone(), row.get(mc).cloned().unwrap_or(json!(null)));
111            }
112            out.push(Document {
113                id,
114                content,
115                title,
116                metadata: serde_json::Value::Object(meta),
117                fingerprint: None,
118            });
119        }
120        Ok(out)
121    }
122}