chunkshop/sources/
clickhouse_table.rs1use anyhow::{anyhow, Context, Result};
20use serde_json::json;
21
22use crate::backends::base::BackendDialect;
23use crate::backends::clickhouse::ClickhouseBackend;
24use crate::config::ClickhouseTableSourceConfig;
25use crate::sources::base::Document;
26
/// Document source that reads rows from a ClickHouse table and turns each row
/// into a `Document`.
pub struct ClickhouseTableSource {
    /// Source configuration. The visible code reads `dsn_env`, `database_name`,
    /// `table`, `id_column`, `content_column`, `title_column`,
    /// `metadata_columns` and `where_clause` from it.
    cfg: ClickhouseTableSourceConfig,
    /// Backend built from `cfg.dsn_env`; used for identifier quoting, building
    /// the qualified table name, and obtaining the query client.
    backend: ClickhouseBackend,
}
31
32impl ClickhouseTableSource {
33 pub fn new(cfg: ClickhouseTableSourceConfig) -> Self {
34 let backend = ClickhouseBackend::new(cfg.dsn_env.clone());
35 Self { cfg, backend }
36 }
37
38 pub async fn iter_documents(&self) -> Result<Vec<Document>> {
39 let mut select_cols: Vec<String> = vec![
42 self.backend.quote_ident(&self.cfg.id_column),
43 self.backend.quote_ident(&self.cfg.content_column),
44 ];
45 if let Some(tc) = &self.cfg.title_column {
46 select_cols.push(self.backend.quote_ident(tc));
47 }
48 for col in &self.cfg.metadata_columns {
49 let q = self.backend.quote_ident(col);
53 select_cols.push(format!("toString({q}) AS {q}"));
54 }
55 let cols_sql = select_cols.join(", ");
56 let fq = self
57 .backend
58 .fq_table(&self.cfg.database_name, &self.cfg.table);
59 let mut q = format!("SELECT {cols_sql} FROM {fq}");
60 if let Some(w) = &self.cfg.where_clause {
61 q.push_str(&format!(" WHERE {w}"));
62 }
63
64 let client = self.backend.client().await?;
65 let mut cursor = client
66 .query(&q)
67 .fetch_bytes("JSONEachRow")
68 .with_context(|| format!("running CH source query: {q}"))?;
69 let bytes = cursor
70 .collect()
71 .await
72 .with_context(|| format!("collecting CH source response: {q}"))?;
73 let body =
74 std::str::from_utf8(&bytes).context("CH JSONEachRow response was not valid UTF-8")?;
75
76 let mut out = Vec::new();
77 for line in body.lines() {
78 let line = line.trim();
79 if line.is_empty() {
80 continue;
81 }
82 let row: serde_json::Value = serde_json::from_str(line)
83 .with_context(|| format!("parsing JSONEachRow line: {line}"))?;
84 let id = row
85 .get(&self.cfg.id_column)
86 .and_then(|v| match v {
87 serde_json::Value::String(s) => Some(s.clone()),
88 serde_json::Value::Number(n) => Some(n.to_string()),
89 serde_json::Value::Bool(b) => Some(b.to_string()),
90 _ => None,
91 })
92 .ok_or_else(|| {
93 anyhow!(
94 "id_column {:?} missing or not string/number in row",
95 self.cfg.id_column
96 )
97 })?;
98 let content = row
99 .get(&self.cfg.content_column)
100 .and_then(|v| v.as_str())
101 .unwrap_or("")
102 .to_string();
103 let title = self
104 .cfg
105 .title_column
106 .as_ref()
107 .and_then(|tc| row.get(tc).and_then(|v| v.as_str()).map(str::to_string));
108 let mut meta = serde_json::Map::new();
109 for mc in &self.cfg.metadata_columns {
110 meta.insert(mc.clone(), row.get(mc).cloned().unwrap_or(json!(null)));
111 }
112 out.push(Document {
113 id,
114 content,
115 title,
116 metadata: serde_json::Value::Object(meta),
117 fingerprint: None,
118 });
119 }
120 Ok(out)
121 }
122}