chunkshop/sources/
sqlite_table.rs1use anyhow::{Context, Result};
4use serde_json::json;
5
6use crate::backends::base::BackendDialect;
7use crate::backends::sqlite::SQLiteBackend;
8use crate::config::SqliteTableSourceConfig;
9use crate::sources::base::Document;
10
11#[derive(Clone)]
12pub struct SqliteTableSource {
13 pub(crate) cfg: SqliteTableSourceConfig,
14 pub(crate) backend: SQLiteBackend,
15}
16
17impl SqliteTableSource {
18 pub fn new(cfg: SqliteTableSourceConfig) -> Self {
19 let backend = SQLiteBackend::new(cfg.dsn_env.clone());
20 Self { cfg, backend }
21 }
22
23 pub async fn iter_documents(&self) -> Result<Vec<Document>> {
24 let mut cols: Vec<&str> = vec![&self.cfg.id_column, &self.cfg.content_column];
26 let title_idx: Option<usize> = if let Some(tc) = &self.cfg.title_column {
27 cols.push(tc);
28 Some(2)
29 } else {
30 None
31 };
32 let meta_start = if title_idx.is_some() { 3 } else { 2 };
33 for col in &self.cfg.metadata_columns {
34 cols.push(col);
35 }
36
37 let cols_sql: String = cols
38 .iter()
39 .map(|c| self.backend.quote_ident(c))
40 .collect::<Vec<_>>()
41 .join(", ");
42 let fq = self
43 .backend
44 .fq_table(&self.cfg.database_name, &self.cfg.table);
45 let mut select = format!("SELECT {cols_sql} FROM {fq}");
46 if let Some(w) = &self.cfg.where_clause {
47 select.push_str(&format!(" WHERE {w}"));
48 }
49
50 let conn = self.backend.connect().await?;
51 let metadata_columns = self.cfg.metadata_columns.clone();
52 tokio::task::spawn_blocking(move || -> Result<Vec<Document>> {
53 let g = conn.blocking_lock();
54 let mut q = g.prepare(&select).context("prepare source query")?;
55 let n_cols = q.column_count();
56 let rows = q
57 .query_map([], |r| {
58 let id_v: rusqlite::types::Value = r.get(0)?;
59 let id = match id_v {
60 rusqlite::types::Value::Integer(i) => i.to_string(),
61 rusqlite::types::Value::Real(f) => f.to_string(),
62 rusqlite::types::Value::Text(s) => s,
63 rusqlite::types::Value::Null => String::new(),
64 rusqlite::types::Value::Blob(_) => "<blob>".to_string(),
65 };
66 let content: String = r.get(1)?;
67 let title: Option<String> = match title_idx {
68 Some(i) => r.get::<_, Option<String>>(i).ok().flatten(),
69 None => None,
70 };
71 let mut meta = serde_json::Map::new();
72 for (i, col) in metadata_columns.iter().enumerate() {
73 let idx = meta_start + i;
74 if idx >= n_cols {
75 break;
76 }
77 let v: rusqlite::types::Value = r.get(idx)?;
78 let jv = match v {
79 rusqlite::types::Value::Null => serde_json::Value::Null,
80 rusqlite::types::Value::Integer(i) => json!(i),
81 rusqlite::types::Value::Real(f) => json!(f),
82 rusqlite::types::Value::Text(s) => serde_json::Value::String(s),
83 rusqlite::types::Value::Blob(_) => serde_json::Value::Null,
84 };
85 meta.insert(col.clone(), jv);
86 }
87 Ok(Document {
88 id,
89 content,
90 title,
91 metadata: serde_json::Value::Object(meta),
92 fingerprint: None,
93 })
94 })
95 .context("query source rows")?;
96 let out: rusqlite::Result<Vec<Document>> = rows.collect();
97 Ok(out.context("collect source rows")?)
98 })
99 .await
100 .context("spawn_blocking iter_documents")?
101 }
102}