Skip to main content

chunkshop/sources/
pg_table.rs

//! Postgres source. Mirrors `python/src/chunkshop/sources/pg_table.py`.
//! Uses PostgresBackend for connection + identifier quoting (v0.4.0 modular shape).
//!
//! RM-B Task 2: adds `IncrementalSource` impl with tuple cursor
//! `(updated_at, id::text)` for boundary-row safety. Mirror of Python commit
//! `ff01268`. Activated by setting `updated_at_column` in the config.
7
8use anyhow::{Context, Result};
9use serde::{Deserialize, Serialize};
10use serde_json::json;
11use std::future::Future;
12
13use crate::backends::base::{BackendConn, BackendDialect};
14use crate::backends::postgres::PostgresBackend;
15use crate::config::PgTableSourceConfig;
16use crate::sources::base::{Document, IncrementalSource};
17
18pub struct PgTableSource {
19    cfg: PgTableSourceConfig,
20    backend: PostgresBackend,
21}
22
23/// Cursor for `PgTableSource::iter_changes_since`.
24///
25/// Tuple cursor `(after_ts, after_id)` mirrors Python commit `ff01268`. The
26/// `(updated_at, id::text)` comparison defends against silent row loss when
27/// multiple rows commit at the same boundary timestamp:
28///
29/// 1. Row `c1@T` arrives → cursor advances to `{after_ts: T, after_id: c1}`.
30/// 2. Row `c2@T` arrives (same timestamp).
31/// 3. Next sync emits `c2` because `(T, "c2") > (T, "c1")` is true.
32///
33/// Without the tuple cursor, a single-column `updated_at > T` predicate would
34/// skip `c2` forever.
35#[derive(Debug, Clone, Default, Serialize, Deserialize)]
36pub struct PgTableCursor {
37    #[serde(default, skip_serializing_if = "Option::is_none")]
38    pub after_ts: Option<String>,
39    #[serde(default, skip_serializing_if = "Option::is_none")]
40    pub after_id: Option<String>,
41}
42
43impl PgTableSource {
44    pub fn new(cfg: PgTableSourceConfig) -> Self {
45        let backend = PostgresBackend::new(cfg.dsn_env.clone());
46        Self { cfg, backend }
47    }
48
49    pub async fn iter_documents(&self) -> Result<Vec<Document>> {
50        // Column order matches Python's pg_table.py:
51        //   [id, content, optional title, *metadata_columns...]
52        let mut select = format!(
53            "SELECT {id_col}, {content_col}",
54            id_col = self.backend.quote_ident(&self.cfg.id_column),
55            content_col = self.backend.quote_ident(&self.cfg.content_column),
56        );
57        let mut title_idx: Option<usize> = None;
58        if let Some(tc) = &self.cfg.title_column {
59            title_idx = Some(2);
60            select.push_str(&format!(", {}", self.backend.quote_ident(tc)));
61        }
62        let meta_start = if title_idx.is_some() { 3 } else { 2 };
63        for col in &self.cfg.metadata_columns {
64            select.push_str(&format!(", {}", self.backend.quote_ident(col)));
65        }
66        select.push_str(&format!(
67            " FROM {fq}",
68            fq = self
69                .backend
70                .fq_table(&self.cfg.schema_name, &self.cfg.table)
71        ));
72        if let Some(w) = &self.cfg.where_clause {
73            select.push_str(&format!(" WHERE {w}"));
74        }
75
76        self.backend.connect().await?;
77        let pool = self.backend.pool().await?;
78        let rows = sqlx::query(&select)
79            .fetch_all(pool)
80            .await
81            .with_context(|| format!("running query: {select}"))?;
82
83        let mut out = Vec::with_capacity(rows.len());
84        for row in rows {
85            use sqlx::Row;
86            let id: String = row
87                .try_get::<String, _>(0)
88                .or_else(|_| row.try_get::<i64, _>(0).map(|n| n.to_string()))
89                .or_else(|_| row.try_get::<i32, _>(0).map(|n| n.to_string()))
90                .with_context(|| "reading id column from row".to_string())?;
91            let content: String = row.try_get(1).context("reading content column")?;
92            let title: Option<String> = match title_idx {
93                Some(i) => row.try_get::<Option<String>, _>(i).unwrap_or(None),
94                None => None,
95            };
96            let mut meta = serde_json::Map::new();
97            for (i, col) in self.cfg.metadata_columns.iter().enumerate() {
98                let idx = meta_start + i;
99                let v = read_meta_value(&row, idx);
100                meta.insert(col.clone(), v);
101            }
102            out.push(Document {
103                id,
104                content,
105                title,
106                metadata: serde_json::Value::Object(meta),
107                fingerprint: None,
108            });
109        }
110        Ok(out)
111    }
112
113    /// Internal: the cursor-aware SELECT path. Mirrors Python's
114    /// `iter_changes_since` exactly. The caller decides whether to dispatch
115    /// here or to `iter_documents` based on `updated_at_column` presence.
116    async fn iter_changes_since_inner(&self, cursor: &PgTableCursor) -> Result<Vec<Document>> {
117        // Caller guarantees `updated_at_column` is Some when calling this.
118        let ua_col_name = self
119            .cfg
120            .updated_at_column
121            .as_ref()
122            .expect("iter_changes_since_inner called without updated_at_column");
123        let id_col = self.backend.quote_ident(&self.cfg.id_column);
124        let content_col = self.backend.quote_ident(&self.cfg.content_column);
125        let ua_col = self.backend.quote_ident(ua_col_name);
126
127        // Column order: id, content, [title,] updated_at, *metadata_columns
128        let mut select = format!("SELECT {id_col}, {content_col}");
129        let mut title_idx: Option<usize> = None;
130        if let Some(tc) = &self.cfg.title_column {
131            title_idx = Some(2);
132            select.push_str(&format!(", {}", self.backend.quote_ident(tc)));
133        }
134        // updated_at always comes next, so the metadata columns start at its
135        // index + 1.
136        let ua_idx = if title_idx.is_some() { 3 } else { 2 };
137        select.push_str(&format!(", {ua_col}"));
138        let meta_start = ua_idx + 1;
139        for col in &self.cfg.metadata_columns {
140            select.push_str(&format!(", {}", self.backend.quote_ident(col)));
141        }
142        select.push_str(&format!(
143            " FROM {fq}",
144            fq = self
145                .backend
146                .fq_table(&self.cfg.schema_name, &self.cfg.table)
147        ));
148
149        // Compose WHERE: optional `cfg.where_clause` AND optional tuple cursor.
150        let mut where_parts: Vec<String> = Vec::new();
151        if let Some(w) = &self.cfg.where_clause {
152            where_parts.push(format!("({w})"));
153        }
154        let have_cursor = cursor.after_ts.is_some() && cursor.after_id.is_some();
155        if have_cursor {
156            // (updated_at, id::text) > ($1, $2)
157            where_parts.push(format!(
158                "({ua_col}, {id_col}::text) > ($1::timestamptz, $2)"
159            ));
160        }
161        if !where_parts.is_empty() {
162            select.push_str(" WHERE ");
163            select.push_str(&where_parts.join(" AND "));
164        }
165        // Canonical order = (updated_at, id::text) ascending. Same as Python.
166        select.push_str(&format!(" ORDER BY {ua_col}, {id_col}::text"));
167
168        self.backend.connect().await?;
169        let pool = self.backend.pool().await?;
170
171        let mut q = sqlx::query(&select);
172        if have_cursor {
173            let ts = cursor.after_ts.as_deref().unwrap();
174            let id = cursor.after_id.as_deref().unwrap();
175            // Bind timestamp as text + cast in the SQL via $1::timestamptz so
176            // we don't need chrono parsing client-side. Postgres parses ISO
177            // strings consistently in any sane locale.
178            q = q.bind(ts).bind(id);
179        }
180        let rows = q
181            .fetch_all(pool)
182            .await
183            .with_context(|| format!("running query: {select}"))?;
184
185        let mut out = Vec::with_capacity(rows.len());
186        for row in rows {
187            use sqlx::Row;
188            let id: String = row
189                .try_get::<String, _>(0)
190                .or_else(|_| row.try_get::<i64, _>(0).map(|n| n.to_string()))
191                .or_else(|_| row.try_get::<i32, _>(0).map(|n| n.to_string()))
192                .with_context(|| "reading id column from row".to_string())?;
193            let content: String = row.try_get(1).context("reading content column")?;
194            let title: Option<String> = match title_idx {
195                Some(i) => row.try_get::<Option<String>, _>(i).unwrap_or(None),
196                None => None,
197            };
198            let updated_at_iso = read_timestamp_as_iso(&row, ua_idx);
199            let mut meta = serde_json::Map::new();
200            for (i, col) in self.cfg.metadata_columns.iter().enumerate() {
201                let idx = meta_start + i;
202                let v = read_meta_value(&row, idx);
203                meta.insert(col.clone(), v);
204            }
205            // Mirror Python: stamp `_updated_at` so `cursor_from` can read it.
206            if let Some(iso) = updated_at_iso {
207                meta.insert("_updated_at".to_string(), serde_json::Value::String(iso));
208            }
209            out.push(Document {
210                id,
211                content,
212                title,
213                metadata: serde_json::Value::Object(meta),
214                fingerprint: None,
215            });
216        }
217        Ok(out)
218    }
219}
220
221impl IncrementalSource for PgTableSource {
222    type Cursor = PgTableCursor;
223
224    fn empty_cursor(&self) -> Self::Cursor {
225        PgTableCursor::default()
226    }
227
228    fn iter_changes_since(
229        &self,
230        cursor: &Self::Cursor,
231    ) -> impl Future<Output = Result<Vec<Document>>> + Send {
232        let cursor = cursor.clone();
233        async move {
234            if self.cfg.updated_at_column.is_none() {
235                // No cursor column → fall back to full-resync semantics, same
236                // as Python's behavior. The cursor argument is ignored.
237                return self.iter_documents().await;
238            }
239            self.iter_changes_since_inner(&cursor).await
240        }
241    }
242
243    fn cursor_from(&self, last_document: &Document) -> Self::Cursor {
244        let after_ts = last_document
245            .metadata
246            .get("_updated_at")
247            .and_then(|v| v.as_str())
248            .map(|s| s.to_string());
249        PgTableCursor {
250            after_ts,
251            after_id: Some(last_document.id.clone()),
252        }
253    }
254}
255
256fn read_meta_value(row: &sqlx::postgres::PgRow, idx: usize) -> serde_json::Value {
257    use sqlx::Row;
258    if let Ok(v) = row.try_get::<Option<String>, _>(idx) {
259        return v
260            .map(serde_json::Value::String)
261            .unwrap_or(serde_json::Value::Null);
262    }
263    if let Ok(v) = row.try_get::<Option<i64>, _>(idx) {
264        return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
265    }
266    if let Ok(v) = row.try_get::<Option<i32>, _>(idx) {
267        return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
268    }
269    if let Ok(v) = row.try_get::<Option<f64>, _>(idx) {
270        return v.map(|n| json!(n)).unwrap_or(serde_json::Value::Null);
271    }
272    if let Ok(v) = row.try_get::<Option<bool>, _>(idx) {
273        return v.map(|b| json!(b)).unwrap_or(serde_json::Value::Null);
274    }
275    if let Ok(v) = row.try_get::<Option<Vec<String>>, _>(idx) {
276        return v.map(|a| json!(a)).unwrap_or(serde_json::Value::Null);
277    }
278    serde_json::Value::Null
279}
280
281/// Read a Postgres `timestamptz` column and emit ISO-8601 UTC text.
282///
283/// Format matches Python `datetime.fromisoformat` round-trip-safe output
284/// (`2026-05-25T12:00:00+00:00`), which is what consumers persist and what
285/// `iter_changes_since` accepts back via `$1::timestamptz`.
286fn read_timestamp_as_iso(row: &sqlx::postgres::PgRow, idx: usize) -> Option<String> {
287    use sqlx::Row;
288    if let Ok(v) =
289        row.try_get::<Option<sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>>, _>(idx)
290    {
291        return v.map(|dt| dt.to_rfc3339());
292    }
293    if let Ok(v) = row.try_get::<Option<sqlx::types::chrono::NaiveDateTime>, _>(idx) {
294        return v.map(|dt| dt.and_utc().to_rfc3339());
295    }
296    None
297}