Skip to main content

smooth_operator_adapter_postgres/
admin.rs

1//! Persistent admin stores (Phase 12 follow-up) — Postgres-backed.
2//!
3//! The three management-console stores ship with process-local in-memory
4//! implementations (`InMemoryConnectorConfigStore`, `InMemorySettingsStore`,
5//! `InMemoryIndexingStore`) that lose everything on restart. This module makes
6//! them durable against the same Postgres the rest of the adapter dogfoods,
7//! preserving the in-memory semantics exactly:
8//!
9//! - [`PgConnectorConfigStore`] — org-scoped CRUD over `connector_configs`
10//!   (PK `(org_id, id)`). `list` is sorted by `(name, id)`; cross-org `get` /
11//!   `delete` never touch another org's row.
12//! - [`PgSettingsStore`] — per-org `agent_settings` (PK `org_id`); `get` of an
13//!   unset org returns [`AgentSettings::defaults`], `put` is an upsert.
14//! - [`PgIndexingStore`] — the `indexing_runs` ledger (PK `id`). `record_run`
15//!   upserts by id (so a `Running` row can be promoted to a terminal state),
16//!   `list_runs` returns a connector's runs oldest-first, and `latest_cursor`
17//!   is `max(cursor)` over **succeeded** runs only — a failed run never advances
18//!   the cursor.
19//!
20//! ## Sync trait over an async pool
21//!
//! All three store traits are **synchronous** (the engine / admin API call them
//! directly), but `deadpool` is async. We bridge with a private `run_blocking`
//! helper — a copy of the bridge the knowledge base uses: it `spawn`s the async
//! work onto a captured runtime [`Handle`] (so its I/O makes progress on that
//! runtime's reactor) and waits on the task's `JoinHandle` from a throwaway OS
//! thread while the calling thread blocks for the result — it never calls
//! `Handle::block_on` on a runtime worker thread.
28
29use std::future::Future;
30
31use anyhow::{anyhow, Result};
32use chrono::{DateTime, Utc};
33use deadpool_postgres::Pool;
34use tokio::runtime::Handle;
35
36use smooth_operator::connector_config::{ConnectorConfig, ConnectorConfigStore, ConnectorKind};
37use smooth_operator::settings::{AgentSettings, SettingsStore};
38use smooth_operator_ingestion::indexing::{IndexingRun, IndexingRunStatus, IndexingStore};
39use smooth_operator_ingestion::Timestamp;
40
41/// Drive an async future to completion from a *synchronous* trait method.
42///
43/// Identical bridge to `PgKnowledgeBase::run_blocking`: `spawn` onto the
44/// captured runtime so the async I/O makes progress on that runtime's reactor,
45/// then block on the `JoinHandle` from a throwaway OS thread running a tiny
46/// current-thread runtime. This never calls `Handle::block_on` on a runtime
47/// worker thread (which panics "Cannot start a runtime from within a runtime"),
48/// so it is safe whether the caller is on a worker or a plain OS thread.
49fn run_blocking<F, T>(handle: &Handle, fut: F) -> Result<T>
50where
51    F: Future<Output = Result<T>> + Send + 'static,
52    T: Send + 'static,
53{
54    let join = handle.spawn(fut);
55    let (tx, rx) = std::sync::mpsc::channel();
56    std::thread::spawn(move || {
57        let result = (|| -> Result<T> {
58            let rt = tokio::runtime::Builder::new_current_thread()
59                .enable_all()
60                .build()?;
61            let joined = rt.block_on(join);
62            joined.map_err(|e| anyhow!("admin store task panicked or was cancelled: {e}"))?
63        })();
64        let _ = tx.send(result);
65    });
66    rx.recv()
67        .map_err(|e| anyhow!("admin store task channel closed: {e}"))?
68}
69
70// ---------------------------------------------------------------------------
71// Connector config store
72// ---------------------------------------------------------------------------
73
74/// Postgres-backed [`ConnectorConfigStore`] over `connector_configs`.
75#[derive(Clone)]
76pub struct PgConnectorConfigStore {
77    pool: Pool,
78    handle: Handle,
79}
80
81impl PgConnectorConfigStore {
82    /// Build over the adapter's async pool + captured runtime handle.
83    #[must_use]
84    pub fn new(pool: Pool, handle: Handle) -> Self {
85        Self { pool, handle }
86    }
87
88    async fn list_async(&self, org_id: String) -> Result<Vec<ConnectorConfig>> {
89        let client = self.pool.get().await?;
90        let rows = client
91            .query(
92                "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
93                 FROM connector_configs
94                 WHERE org_id = $1
95                 ORDER BY name, id",
96                &[&org_id],
97            )
98            .await?;
99        rows.iter().map(row_to_connector).collect()
100    }
101
102    async fn get_async(&self, org_id: String, id: String) -> Result<Option<ConnectorConfig>> {
103        let client = self.pool.get().await?;
104        let row = client
105            .query_opt(
106                "SELECT id, org_id, name, kind, config, enabled, created_at, updated_at
107                 FROM connector_configs
108                 WHERE org_id = $1 AND id = $2",
109                &[&org_id, &id],
110            )
111            .await?;
112        row.as_ref().map(row_to_connector).transpose()
113    }
114
115    async fn upsert_async(&self, cfg: ConnectorConfig) -> Result<()> {
116        let client = self.pool.get().await?;
117        client
118            .execute(
119                "INSERT INTO connector_configs
120                    (org_id, id, name, kind, config, enabled, created_at, updated_at)
121                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8)
122                 ON CONFLICT (org_id, id) DO UPDATE SET
123                    name       = EXCLUDED.name,
124                    kind       = EXCLUDED.kind,
125                    config     = EXCLUDED.config,
126                    enabled    = EXCLUDED.enabled,
127                    created_at = EXCLUDED.created_at,
128                    updated_at = EXCLUDED.updated_at",
129                &[
130                    &cfg.org_id,
131                    &cfg.id,
132                    &cfg.name,
133                    &cfg.kind.as_str(),
134                    &cfg.config,
135                    &cfg.enabled,
136                    &cfg.created_at,
137                    &cfg.updated_at,
138                ],
139            )
140            .await?;
141        Ok(())
142    }
143
144    async fn delete_async(&self, org_id: String, id: String) -> Result<bool> {
145        let client = self.pool.get().await?;
146        let n = client
147            .execute(
148                "DELETE FROM connector_configs WHERE org_id = $1 AND id = $2",
149                &[&org_id, &id],
150            )
151            .await?;
152        Ok(n > 0)
153    }
154}
155
156fn row_to_connector(row: &tokio_postgres::Row) -> Result<ConnectorConfig> {
157    let kind_str: String = row.get("kind");
158    let kind = ConnectorKind::parse(&kind_str)
159        .map_err(|bad| anyhow!("unknown connector kind '{bad}' in connector_configs row"))?;
160    Ok(ConnectorConfig {
161        id: row.get("id"),
162        org_id: row.get("org_id"),
163        name: row.get("name"),
164        kind,
165        config: row.get("config"),
166        enabled: row.get("enabled"),
167        created_at: row.get("created_at"),
168        updated_at: row.get("updated_at"),
169    })
170}
171
172impl ConnectorConfigStore for PgConnectorConfigStore {
173    fn list(&self, org_id: &str) -> Vec<ConnectorConfig> {
174        let this = self.clone();
175        let org_id = org_id.to_string();
176        run_blocking(&self.handle, async move { this.list_async(org_id).await }).unwrap_or_default()
177    }
178
179    fn get(&self, org_id: &str, id: &str) -> Option<ConnectorConfig> {
180        let this = self.clone();
181        let org_id = org_id.to_string();
182        let id = id.to_string();
183        run_blocking(
184            &self.handle,
185            async move { this.get_async(org_id, id).await },
186        )
187        .ok()
188        .flatten()
189    }
190
191    fn upsert(&self, config: ConnectorConfig) {
192        let this = self.clone();
193        let _ = run_blocking(&self.handle, async move { this.upsert_async(config).await });
194    }
195
196    fn delete(&self, org_id: &str, id: &str) -> bool {
197        let this = self.clone();
198        let org_id = org_id.to_string();
199        let id = id.to_string();
200        run_blocking(
201            &self.handle,
202            async move { this.delete_async(org_id, id).await },
203        )
204        .unwrap_or(false)
205    }
206}
207
208// ---------------------------------------------------------------------------
209// Settings store
210// ---------------------------------------------------------------------------
211
212/// Postgres-backed [`SettingsStore`] over `agent_settings`.
213#[derive(Clone)]
214pub struct PgSettingsStore {
215    pool: Pool,
216    handle: Handle,
217}
218
219impl PgSettingsStore {
220    /// Build over the adapter's async pool + captured runtime handle.
221    #[must_use]
222    pub fn new(pool: Pool, handle: Handle) -> Self {
223        Self { pool, handle }
224    }
225
226    async fn get_async(&self, org_id: String) -> Result<Option<AgentSettings>> {
227        let client = self.pool.get().await?;
228        let row = client
229            .query_opt(
230                "SELECT org_id, model, system_prompt, default_tools, updated_at
231                 FROM agent_settings WHERE org_id = $1",
232                &[&org_id],
233            )
234            .await?;
235        match row {
236            Some(row) => {
237                let default_tools: serde_json::Value = row.get("default_tools");
238                let default_tools: Vec<String> = serde_json::from_value(default_tools)?;
239                Ok(Some(AgentSettings {
240                    org_id: row.get("org_id"),
241                    model: row.get("model"),
242                    system_prompt: row.get("system_prompt"),
243                    default_tools,
244                    updated_at: row.get("updated_at"),
245                }))
246            }
247            None => Ok(None),
248        }
249    }
250
251    async fn put_async(&self, settings: AgentSettings) -> Result<()> {
252        let client = self.pool.get().await?;
253        let default_tools = serde_json::to_value(&settings.default_tools)?;
254        client
255            .execute(
256                "INSERT INTO agent_settings
257                    (org_id, model, system_prompt, default_tools, updated_at)
258                 VALUES ($1,$2,$3,$4,$5)
259                 ON CONFLICT (org_id) DO UPDATE SET
260                    model         = EXCLUDED.model,
261                    system_prompt = EXCLUDED.system_prompt,
262                    default_tools = EXCLUDED.default_tools,
263                    updated_at    = EXCLUDED.updated_at",
264                &[
265                    &settings.org_id,
266                    &settings.model,
267                    &settings.system_prompt,
268                    &default_tools,
269                    &settings.updated_at,
270                ],
271            )
272            .await?;
273        Ok(())
274    }
275}
276
277impl SettingsStore for PgSettingsStore {
278    fn get(&self, org_id: &str) -> AgentSettings {
279        let this = self.clone();
280        let org = org_id.to_string();
281        // Absent row (or a transient read failure) falls back to defaults so the
282        // console always has a populated form, matching the in-memory store.
283        run_blocking(&self.handle, async move { this.get_async(org).await })
284            .ok()
285            .flatten()
286            .unwrap_or_else(|| AgentSettings::defaults(org_id))
287    }
288
289    fn put(&self, settings: AgentSettings) {
290        let this = self.clone();
291        let _ = run_blocking(&self.handle, async move { this.put_async(settings).await });
292    }
293}
294
295// ---------------------------------------------------------------------------
296// Indexing store
297// ---------------------------------------------------------------------------
298
299/// Postgres-backed [`IndexingStore`] over `indexing_runs`.
300#[derive(Clone)]
301pub struct PgIndexingStore {
302    pool: Pool,
303    handle: Handle,
304}
305
306impl PgIndexingStore {
307    /// Build over the adapter's async pool + captured runtime handle.
308    #[must_use]
309    pub fn new(pool: Pool, handle: Handle) -> Self {
310        Self { pool, handle }
311    }
312
313    async fn record_run_async(&self, run: IndexingRun) -> Result<()> {
314        let client = self.pool.get().await?;
315        let status = status_to_str(run.status);
316        let documents_seen = i64::try_from(run.documents_seen).unwrap_or(i64::MAX);
317        let chunks_indexed = i64::try_from(run.chunks_indexed).unwrap_or(i64::MAX);
318        let documents_skipped = i64::try_from(run.documents_skipped).unwrap_or(i64::MAX);
319        client
320            .execute(
321                "INSERT INTO indexing_runs
322                    (id, connector_name, status, started_at, finished_at,
323                     documents_seen, chunks_indexed, documents_skipped, cursor, error)
324                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
325                 ON CONFLICT (id) DO UPDATE SET
326                    connector_name    = EXCLUDED.connector_name,
327                    status            = EXCLUDED.status,
328                    started_at        = EXCLUDED.started_at,
329                    finished_at       = EXCLUDED.finished_at,
330                    documents_seen    = EXCLUDED.documents_seen,
331                    chunks_indexed    = EXCLUDED.chunks_indexed,
332                    documents_skipped = EXCLUDED.documents_skipped,
333                    cursor            = EXCLUDED.cursor,
334                    error             = EXCLUDED.error",
335                &[
336                    &run.id,
337                    &run.connector_name,
338                    &status,
339                    &run.started_at,
340                    &run.finished_at,
341                    &documents_seen,
342                    &chunks_indexed,
343                    &documents_skipped,
344                    &run.cursor,
345                    &run.error,
346                ],
347            )
348            .await?;
349        Ok(())
350    }
351
352    async fn latest_cursor_async(&self, connector_name: String) -> Result<Option<Timestamp>> {
353        let client = self.pool.get().await?;
354        // Max cursor over SUCCEEDED runs only — a failed run never advances it.
355        let row = client
356            .query_one(
357                "SELECT max(cursor) AS c
358                 FROM indexing_runs
359                 WHERE connector_name = $1 AND status = 'succeeded'",
360                &[&connector_name],
361            )
362            .await?;
363        Ok(row.get::<_, Option<DateTime<Utc>>>("c"))
364    }
365
366    async fn list_runs_async(&self, connector_name: String) -> Result<Vec<IndexingRun>> {
367        let client = self.pool.get().await?;
368        // Oldest-first to match the in-memory insertion-order contract.
369        let rows = client
370            .query(
371                "SELECT id, connector_name, status, started_at, finished_at,
372                        documents_seen, chunks_indexed, documents_skipped, cursor, error
373                 FROM indexing_runs
374                 WHERE connector_name = $1
375                 ORDER BY started_at ASC, id ASC",
376                &[&connector_name],
377            )
378            .await?;
379        rows.iter().map(row_to_run).collect()
380    }
381}
382
383fn status_to_str(status: IndexingRunStatus) -> &'static str {
384    match status {
385        IndexingRunStatus::Running => "running",
386        IndexingRunStatus::Succeeded => "succeeded",
387        IndexingRunStatus::Failed => "failed",
388    }
389}
390
391fn status_from_str(s: &str) -> Result<IndexingRunStatus> {
392    Ok(match s {
393        "running" => IndexingRunStatus::Running,
394        "succeeded" => IndexingRunStatus::Succeeded,
395        "failed" => IndexingRunStatus::Failed,
396        other => return Err(anyhow!("unknown indexing run status '{other}'")),
397    })
398}
399
400fn row_to_run(row: &tokio_postgres::Row) -> Result<IndexingRun> {
401    let status = status_from_str(row.get::<_, String>("status").as_str())?;
402    let documents_seen: i64 = row.get("documents_seen");
403    let chunks_indexed: i64 = row.get("chunks_indexed");
404    let documents_skipped: i64 = row.get("documents_skipped");
405    Ok(IndexingRun {
406        id: row.get("id"),
407        connector_name: row.get("connector_name"),
408        status,
409        started_at: row.get("started_at"),
410        finished_at: row.get("finished_at"),
411        documents_seen: usize::try_from(documents_seen).unwrap_or(0),
412        chunks_indexed: usize::try_from(chunks_indexed).unwrap_or(0),
413        documents_skipped: usize::try_from(documents_skipped).unwrap_or(0),
414        cursor: row.get("cursor"),
415        error: row.get("error"),
416    })
417}
418
419impl IndexingStore for PgIndexingStore {
420    fn record_run(&self, run: &IndexingRun) {
421        let this = self.clone();
422        let run = run.clone();
423        let _ = run_blocking(
424            &self.handle,
425            async move { this.record_run_async(run).await },
426        );
427    }
428
429    fn latest_cursor(&self, connector_name: &str) -> Option<Timestamp> {
430        let this = self.clone();
431        let name = connector_name.to_string();
432        run_blocking(
433            &self.handle,
434            async move { this.latest_cursor_async(name).await },
435        )
436        .ok()
437        .flatten()
438    }
439
440    fn list_runs(&self, connector_name: &str) -> Vec<IndexingRun> {
441        let this = self.clone();
442        let name = connector_name.to_string();
443        run_blocking(
444            &self.handle,
445            async move { this.list_runs_async(name).await },
446        )
447        .unwrap_or_default()
448    }
449}