Skip to main content

chunkshop/sources/
sqlite_table.rs

1//! SQLite source. Mirrors `python/src/chunkshop/sources/sqlite_table.py`.
2
3use anyhow::{Context, Result};
4use serde_json::json;
5
6use crate::backends::base::BackendDialect;
7use crate::backends::sqlite::SQLiteBackend;
8use crate::config::SqliteTableSourceConfig;
9use crate::sources::base::Document;
10
/// Document source that reads rows from a SQLite table.
///
/// Holds the source configuration together with the backend that is built
/// from it in [`SqliteTableSource::new`].
#[derive(Clone)]
pub struct SqliteTableSource {
    /// Source configuration: supplies the column names, table, database name
    /// and optional filter used to build the query.
    pub(crate) cfg: SqliteTableSourceConfig,
    /// Backend used to quote identifiers, qualify the table name and open the
    /// connection.
    pub(crate) backend: SQLiteBackend,
}
16
17impl SqliteTableSource {
18    pub fn new(cfg: SqliteTableSourceConfig) -> Self {
19        let backend = SQLiteBackend::new(cfg.dsn_env.clone());
20        Self { cfg, backend }
21    }
22
23    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
24        // Build column list: [id, content, optional title, *metadata...]
25        let mut cols: Vec<&str> = vec![&self.cfg.id_column, &self.cfg.content_column];
26        let title_idx: Option<usize> = if let Some(tc) = &self.cfg.title_column {
27            cols.push(tc);
28            Some(2)
29        } else {
30            None
31        };
32        let meta_start = if title_idx.is_some() { 3 } else { 2 };
33        for col in &self.cfg.metadata_columns {
34            cols.push(col);
35        }
36
37        let cols_sql: String = cols
38            .iter()
39            .map(|c| self.backend.quote_ident(c))
40            .collect::<Vec<_>>()
41            .join(", ");
42        let fq = self
43            .backend
44            .fq_table(&self.cfg.database_name, &self.cfg.table);
45        let mut select = format!("SELECT {cols_sql} FROM {fq}");
46        if let Some(w) = &self.cfg.where_clause {
47            select.push_str(&format!(" WHERE {w}"));
48        }
49
50        let conn = self.backend.connect().await?;
51        let metadata_columns = self.cfg.metadata_columns.clone();
52        tokio::task::spawn_blocking(move || -> Result<Vec<Document>> {
53            let g = conn.blocking_lock();
54            let mut q = g.prepare(&select).context("prepare source query")?;
55            let n_cols = q.column_count();
56            let rows = q
57                .query_map([], |r| {
58                    let id_v: rusqlite::types::Value = r.get(0)?;
59                    let id = match id_v {
60                        rusqlite::types::Value::Integer(i) => i.to_string(),
61                        rusqlite::types::Value::Real(f) => f.to_string(),
62                        rusqlite::types::Value::Text(s) => s,
63                        rusqlite::types::Value::Null => String::new(),
64                        rusqlite::types::Value::Blob(_) => "<blob>".to_string(),
65                    };
66                    let content: String = r.get(1)?;
67                    let title: Option<String> = match title_idx {
68                        Some(i) => r.get::<_, Option<String>>(i).ok().flatten(),
69                        None => None,
70                    };
71                    let mut meta = serde_json::Map::new();
72                    for (i, col) in metadata_columns.iter().enumerate() {
73                        let idx = meta_start + i;
74                        if idx >= n_cols {
75                            break;
76                        }
77                        let v: rusqlite::types::Value = r.get(idx)?;
78                        let jv = match v {
79                            rusqlite::types::Value::Null => serde_json::Value::Null,
80                            rusqlite::types::Value::Integer(i) => json!(i),
81                            rusqlite::types::Value::Real(f) => json!(f),
82                            rusqlite::types::Value::Text(s) => serde_json::Value::String(s),
83                            rusqlite::types::Value::Blob(_) => serde_json::Value::Null,
84                        };
85                        meta.insert(col.clone(), jv);
86                    }
87                    Ok(Document {
88                        id,
89                        content,
90                        title,
91                        metadata: serde_json::Value::Object(meta),
92                        fingerprint: None,
93                    })
94                })
95                .context("query source rows")?;
96            let out: rusqlite::Result<Vec<Document>> = rows.collect();
97            Ok(out.context("collect source rows")?)
98        })
99        .await
100        .context("spawn_blocking iter_documents")?
101    }
102}