chunkshop/sources/
pg_table.rs1use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use std::future::Future;
12
13use crate::backends::base::{BackendConn, BackendDialect};
14use crate::backends::postgres::PostgresBackend;
15use crate::config::PgTableSourceConfig;
16use crate::sources::base::{Document, IncrementalSource};
17
18pub struct PgTableSource {
19 cfg: PgTableSourceConfig,
20 backend: PostgresBackend,
21}
22
23#[derive(Debug, Clone, Default, Serialize, Deserialize)]
36pub struct PgTableCursor {
37 #[serde(default, skip_serializing_if = "Option::is_none")]
38 pub after_ts: Option<String>,
39 #[serde(default, skip_serializing_if = "Option::is_none")]
40 pub after_id: Option<String>,
41}
42
43impl PgTableSource {
44 pub fn new(cfg: PgTableSourceConfig) -> Self {
45 let backend = PostgresBackend::new(cfg.dsn_env.clone());
46 Self { cfg, backend }
47 }
48
49 pub async fn iter_documents(&self) -> Result<Vec<Document>> {
50 let mut select = format!(
53 "SELECT {id_col}, {content_col}",
54 id_col = self.backend.quote_ident(&self.cfg.id_column),
55 content_col = self.backend.quote_ident(&self.cfg.content_column),
56 );
57 let mut title_idx: Option<usize> = None;
58 if let Some(tc) = &self.cfg.title_column {
59 title_idx = Some(2);
60 select.push_str(&format!(", {}", self.backend.quote_ident(tc)));
61 }
62 let meta_start = if title_idx.is_some() { 3 } else { 2 };
63 for col in &self.cfg.metadata_columns {
64 select.push_str(&format!(", {}", self.backend.quote_ident(col)));
65 }
66 select.push_str(&format!(
67 " FROM {fq}",
68 fq = self
69 .backend
70 .fq_table(&self.cfg.schema_name, &self.cfg.table)
71 ));
72 if let Some(w) = &self.cfg.where_clause {
73 select.push_str(&format!(" WHERE {w}"));
74 }
75
76 self.backend.connect().await?;
77 let pool = self.backend.pool().await?;
78 let rows = sqlx::query(&select)
79 .fetch_all(pool)
80 .await
81 .with_context(|| format!("running query: {select}"))?;
82
83 let mut out = Vec::with_capacity(rows.len());
84 for row in rows {
85 use sqlx::Row;
86 let id: String = row
87 .try_get::<String, _>(0)
88 .or_else(|_| row.try_get::<i64, _>(0).map(|n| n.to_string()))
89 .or_else(|_| row.try_get::<i32, _>(0).map(|n| n.to_string()))
90 .with_context(|| "reading id column from row".to_string())?;
91 let content: String = row.try_get(1).context("reading content column")?;
92 let title: Option<String> = match title_idx {
93 Some(i) => row.try_get::<Option<String>, _>(i).unwrap_or(None),
94 None => None,
95 };
96 let mut meta = serde_json::Map::new();
97 for (i, col) in self.cfg.metadata_columns.iter().enumerate() {
98 let idx = meta_start + i;
99 let v = read_meta_value(&row, idx);
100 meta.insert(col.clone(), v);
101 }
102 out.push(Document {
103 id,
104 content,
105 title,
106 metadata: serde_json::Value::Object(meta),
107 fingerprint: None,
108 });
109 }
110 Ok(out)
111 }
112
113 async fn iter_changes_since_inner(&self, cursor: &PgTableCursor) -> Result<Vec<Document>> {
117 let ua_col_name = self
119 .cfg
120 .updated_at_column
121 .as_ref()
122 .expect("iter_changes_since_inner called without updated_at_column");
123 let id_col = self.backend.quote_ident(&self.cfg.id_column);
124 let content_col = self.backend.quote_ident(&self.cfg.content_column);
125 let ua_col = self.backend.quote_ident(ua_col_name);
126
127 let mut select = format!("SELECT {id_col}, {content_col}");
129 let mut title_idx: Option<usize> = None;
130 if let Some(tc) = &self.cfg.title_column {
131 title_idx = Some(2);
132 select.push_str(&format!(", {}", self.backend.quote_ident(tc)));
133 }
134 let ua_idx = if title_idx.is_some() { 3 } else { 2 };
137 select.push_str(&format!(", {ua_col}"));
138 let meta_start = ua_idx + 1;
139 for col in &self.cfg.metadata_columns {
140 select.push_str(&format!(", {}", self.backend.quote_ident(col)));
141 }
142 select.push_str(&format!(
143 " FROM {fq}",
144 fq = self
145 .backend
146 .fq_table(&self.cfg.schema_name, &self.cfg.table)
147 ));
148
149 let mut where_parts: Vec<String> = Vec::new();
151 if let Some(w) = &self.cfg.where_clause {
152 where_parts.push(format!("({w})"));
153 }
154 let have_cursor = cursor.after_ts.is_some() && cursor.after_id.is_some();
155 if have_cursor {
156 where_parts.push(format!(
158 "({ua_col}, {id_col}::text) > ($1::timestamptz, $2)"
159 ));
160 }
161 if !where_parts.is_empty() {
162 select.push_str(" WHERE ");
163 select.push_str(&where_parts.join(" AND "));
164 }
165 select.push_str(&format!(" ORDER BY {ua_col}, {id_col}::text"));
167
168 self.backend.connect().await?;
169 let pool = self.backend.pool().await?;
170
171 let mut q = sqlx::query(&select);
172 if have_cursor {
173 let ts = cursor.after_ts.as_deref().unwrap();
174 let id = cursor.after_id.as_deref().unwrap();
175 q = q.bind(ts).bind(id);
179 }
180 let rows = q
181 .fetch_all(pool)
182 .await
183 .with_context(|| format!("running query: {select}"))?;
184
185 let mut out = Vec::with_capacity(rows.len());
186 for row in rows {
187 use sqlx::Row;
188 let id: String = row
189 .try_get::<String, _>(0)
190 .or_else(|_| row.try_get::<i64, _>(0).map(|n| n.to_string()))
191 .or_else(|_| row.try_get::<i32, _>(0).map(|n| n.to_string()))
192 .with_context(|| "reading id column from row".to_string())?;
193 let content: String = row.try_get(1).context("reading content column")?;
194 let title: Option<String> = match title_idx {
195 Some(i) => row.try_get::<Option<String>, _>(i).unwrap_or(None),
196 None => None,
197 };
198 let updated_at_iso = read_timestamp_as_iso(&row, ua_idx);
199 let mut meta = serde_json::Map::new();
200 for (i, col) in self.cfg.metadata_columns.iter().enumerate() {
201 let idx = meta_start + i;
202 let v = read_meta_value(&row, idx);
203 meta.insert(col.clone(), v);
204 }
205 if let Some(iso) = updated_at_iso {
207 meta.insert("_updated_at".to_string(), serde_json::Value::String(iso));
208 }
209 out.push(Document {
210 id,
211 content,
212 title,
213 metadata: serde_json::Value::Object(meta),
214 fingerprint: None,
215 });
216 }
217 Ok(out)
218 }
219}
220
221impl IncrementalSource for PgTableSource {
222 type Cursor = PgTableCursor;
223
224 fn empty_cursor(&self) -> Self::Cursor {
225 PgTableCursor::default()
226 }
227
228 fn iter_changes_since(
229 &self,
230 cursor: &Self::Cursor,
231 ) -> impl Future<Output = Result<Vec<Document>>> + Send {
232 let cursor = cursor.clone();
233 async move {
234 if self.cfg.updated_at_column.is_none() {
235 return self.iter_documents().await;
238 }
239 self.iter_changes_since_inner(&cursor).await
240 }
241 }
242
243 fn cursor_from(&self, last_document: &Document) -> Self::Cursor {
244 let after_ts = last_document
245 .metadata
246 .get("_updated_at")
247 .and_then(|v| v.as_str())
248 .map(|s| s.to_string());
249 PgTableCursor {
250 after_ts,
251 after_id: Some(last_document.id.clone()),
252 }
253 }
254}
255
256fn read_meta_value(row: &sqlx::postgres::PgRow, idx: usize) -> serde_json::Value {
257 use sqlx::Row;
258 if let Ok(v) = row.try_get::<Option<String>, _>(idx) {
259 return v
260 .map(serde_json::Value::String)
261 .unwrap_or(serde_json::Value::Null);
262 }
263 if let Ok(v) = row.try_get::<Option<i64>, _>(idx) {
264 return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
265 }
266 if let Ok(v) = row.try_get::<Option<i32>, _>(idx) {
267 return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
268 }
269 if let Ok(v) = row.try_get::<Option<f64>, _>(idx) {
270 return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
271 }
272 if let Ok(v) = row.try_get::<Option<bool>, _>(idx) {
273 return v.map(|b| json!(b)).unwrap_or(serde_json::Value::Null);
274 }
275 if let Ok(v) = row.try_get::<Option<Vec<String>>, _>(idx) {
276 return v.map(|a| json!(a)).unwrap_or(serde_json::Value::Null);
277 }
278 serde_json::Value::Null
279}
280
281fn read_timestamp_as_iso(row: &sqlx::postgres::PgRow, idx: usize) -> Option<String> {
287 use sqlx::Row;
288 if let Ok(v) =
289 row.try_get::<Option<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>>, _>(idx)
290 {
291 return v.map(|dt| dt.to_rfc3339());
292 }
293 if let Ok(v) = row.try_get::<Option<sqlx::types::chrono::NaiveDateTime>, _>(idx) {
294 return v.map(|dt| dt.and_utc().to_rfc3339());
295 }
296 None
297}