use anyhow::{anyhow, Context, Result};
use serde_json::json;
use crate::backends::base::BackendDialect;
use crate::backends::clickhouse::ClickhouseBackend;
use crate::config::ClickhouseTableSourceConfig;
use crate::sources::base::Document;
/// Document source that reads rows from a single ClickHouse table.
///
/// Configured by a [`ClickhouseTableSourceConfig`]; rows are turned into [`Document`]s by
/// `iter_documents`.
pub struct ClickhouseTableSource {
    /// Source configuration: target table, id/content/title/metadata column mapping,
    /// optional `WHERE` filter, and the env var naming the DSN.
    cfg: ClickhouseTableSourceConfig,
    /// Backend built from `cfg.dsn_env`; used for identifier quoting, table qualification,
    /// and obtaining the query client.
    backend: ClickhouseBackend,
}
impl ClickhouseTableSource {
    /// Creates a source for the table described by `cfg`.
    ///
    /// The ClickHouse backend is constructed from `cfg.dsn_env`.
    pub fn new(cfg: ClickhouseTableSourceConfig) -> Self {
        let backend = ClickhouseBackend::new(cfg.dsn_env.clone());
        Self { cfg, backend }
    }

    /// Fetches all rows selected by the configured query and converts each into a [`Document`].
    ///
    /// Despite the name, this does not stream: the whole `JSONEachRow` response is collected
    /// into memory and every row is parsed before the `Vec` is returned.
    ///
    /// Per-row mapping:
    /// - `id`: the `id_column` value; strings are used as-is, numbers and booleans are
    ///   stringified.
    /// - `content`: the `content_column` value if it is a JSON string, otherwise `""`.
    /// - `title`: the `title_column` value if one is configured and it is a JSON string,
    ///   otherwise `None`.
    /// - `metadata`: an object keyed by each `metadata_columns` name, holding the result of
    ///   `toString(column)` (JSON `null` if the value is absent from the row).
    ///
    /// # Errors
    ///
    /// Returns an error if the client cannot be obtained, the query or response collection
    /// fails, the response is not valid UTF-8, a line is not valid JSON, or a row's
    /// `id_column` is missing or not a string/number/bool.
    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
        let q = self.build_query();
        let client = self.backend.client().await?;
        let mut cursor = client
            .query(&q)
            .fetch_bytes("JSONEachRow")
            .with_context(|| format!("running CH source query: {q}"))?;
        let bytes = cursor
            .collect()
            .await
            .with_context(|| format!("collecting CH source response: {q}"))?;
        let body = std::str::from_utf8(&bytes)
            .context("CH JSONEachRow response was not valid UTF-8")?;
        // One JSON object per line; blank lines are skipped and the first bad row aborts.
        body.lines()
            .map(str::trim)
            .filter(|line| !line.is_empty())
            .map(|line| self.parse_row(line))
            .collect()
    }

    /// Builds the `SELECT` statement for the configured table and columns.
    ///
    /// Metadata columns are selected as `toString(col)` under a synthetic alias (see
    /// [`Self::metadata_alias`]) rather than `AS col`. ClickHouse aliases are global to the
    /// query, so reusing the column name would make `where_clause` (and any other reference
    /// to that column) see the stringified expression instead of the raw column.
    fn build_query(&self) -> String {
        let backend = &self.backend;
        let mut select_cols: Vec<String> = vec![
            backend.quote_ident(&self.cfg.id_column),
            backend.quote_ident(&self.cfg.content_column),
        ];
        if let Some(tc) = &self.cfg.title_column {
            select_cols.push(backend.quote_ident(tc));
        }
        select_cols.extend(
            self.cfg
                .metadata_columns
                .iter()
                .enumerate()
                .map(|(idx, col)| {
                    format!(
                        "toString({}) AS {}",
                        backend.quote_ident(col),
                        backend.quote_ident(&Self::metadata_alias(idx)),
                    )
                }),
        );
        let cols_sql = select_cols.join(", ");
        let fq = backend.fq_table(&self.cfg.database_name, &self.cfg.table);
        // `where_clause` is spliced in verbatim, so it must come from trusted configuration.
        // NOTE(review): the `clickhouse` crate's query template treats `?` as a bind
        // placeholder; a literal `?` inside `where_clause` may need escaping — confirm against
        // the crate version in use.
        match &self.cfg.where_clause {
            Some(w) => format!("SELECT {cols_sql} FROM {fq} WHERE {w}"),
            None => format!("SELECT {cols_sql} FROM {fq}"),
        }
    }

    /// Result-set alias for the `idx`-th entry of `metadata_columns`.
    ///
    /// The name is chosen to be unlikely to collide with a real column of the source table.
    fn metadata_alias(idx: usize) -> String {
        format!("__source_meta_{idx}")
    }

    /// Converts one `JSONEachRow` line produced by [`Self::build_query`] into a [`Document`].
    fn parse_row(&self, line: &str) -> Result<Document> {
        let row: serde_json::Value = serde_json::from_str(line)
            .with_context(|| format!("parsing JSONEachRow line: {line}"))?;
        let id = row
            .get(&self.cfg.id_column)
            .and_then(|v| match v {
                serde_json::Value::String(s) => Some(s.clone()),
                serde_json::Value::Number(n) => Some(n.to_string()),
                serde_json::Value::Bool(b) => Some(b.to_string()),
                _ => None,
            })
            .ok_or_else(|| {
                anyhow!(
                    "id_column {:?} missing or not string/number/bool in row",
                    self.cfg.id_column
                )
            })?;
        // Non-string content (including null) is treated as empty rather than as an error.
        let content = row
            .get(&self.cfg.content_column)
            .and_then(serde_json::Value::as_str)
            .unwrap_or("")
            .to_string();
        let title = self
            .cfg
            .title_column
            .as_ref()
            .and_then(|tc| row.get(tc))
            .and_then(serde_json::Value::as_str)
            .map(str::to_string);
        // Values are read back under their synthetic aliases but exposed under the
        // configured column names.
        let metadata: serde_json::Map<String, serde_json::Value> = self
            .cfg
            .metadata_columns
            .iter()
            .enumerate()
            .map(|(idx, mc)| {
                let value = row
                    .get(Self::metadata_alias(idx))
                    .cloned()
                    .unwrap_or(json!(null));
                (mc.clone(), value)
            })
            .collect();
        Ok(Document {
            id,
            content,
            title,
            metadata: serde_json::Value::Object(metadata),
        })
    }
}