Skip to main content

smooth_operator_adapter_postgres/
lib.rs

1//! Postgres + pgvector [`StorageAdapter`] — the dogfood backend.
2//!
3//! This is the production Postgres implementation of the one storage seam (see
4//! `docs/STORAGE.md`). It mirrors the smooai monorepo's schema so dogfooding is
5//! a swap, not a rewrite:
6//!
7//! - **OLTP** (conversations / participants / messages / sessions): async CRUD
8//!   over a [`deadpool_postgres`] pool, semantics matching the in-memory baseline
9//!   (conversation idempotency, external-id participant resolve, cursor message
10//!   paging, session status/counts).
11//! - **Checkpoints**: smooth-operator's
12//!   [`PostgresCheckpointStore`](smooth_operator_core::PostgresCheckpointStore) (a
13//!   *synchronous* r2d2-pooled store) constructed against the **same database**
14//!   — so the engine's `with_checkpoint_store` plugs straight in and agent state
15//!   lives next to the conversations it belongs to.
16//! - **Knowledge**: a pgvector-backed [`PgKnowledgeBase`] (dense HNSW cosine ∪
17//!   sparse `tsvector` BM25 → Reciprocal Rank Fusion). Text→vector goes through
18//!   the [`Embedder`] seam — [`DeterministicEmbedder`] by default (reproducible,
19//!   no network), [`GatewayEmbedder`] when a live gateway is configured.
20//! - **Memory**: a pgvector-backed [`PgMemory`] (parity gap Phase 3 /
21//!   SMOODEV-1470) — persistent, semantic, cross-thread agent memory namespaced
22//!   by `(organization_id, user_id)` like the TS `['memories', orgId, userId]`
23//!   store. Implements the core [`Memory`](smooth_operator_core::Memory) trait;
24//!   recall is pgvector cosine top-K under an HNSW index, scoped to the
25//!   namespace. Shares the adapter's [`Embedder`].
26//!
27//! ## Sharing one database between the async pool and the sync checkpoint store
28//!
29//! The OLTP/knowledge slices need async (`tokio-postgres` + `deadpool`); the
30//! checkpoint slice is a sync trait backed by an r2d2 pool inside
31//! smooth-operator. Both are pointed at the **same `conn_str`**: we build the
32//! async deadpool from it for our own queries, and hand the *same* string to
33//! `PostgresCheckpointStore::connect`, which stands up its own small r2d2 pool
34//! and `checkpoints` table in that database. Two pools, two driver stacks, one
35//! Postgres — the tables coexist (ours from [`schema`], theirs from their own
36//! `CREATE TABLE IF NOT EXISTS`).
37
38mod admin;
39mod embedder;
40mod knowledge;
41mod memory;
42mod reranker;
43mod schema;
44
45use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use async_trait::async_trait;
49use chrono::Utc;
50use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
51use tokio_postgres::NoTls;
52
53use smooth_operator::access_control::AccessContext;
54use smooth_operator::adapter::{
55    ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
56};
57use smooth_operator::domain::{
58    Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
59    Platform, Session, SessionStatus,
60};
61use smooth_operator_core::checkpoint::PostgresCheckpointStore;
62use smooth_operator_core::{CheckpointStore, KnowledgeBase};
63
64// The shared embedding seam (trait + deterministic default) now lives in core;
65// re-export it here so existing `postgres::{Embedder, DeterministicEmbedder, …}`
66// consumers keep working. Only the adapter-specific `GatewayEmbedder` (+ its
67// 1536-d constant) is defined locally.
68pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
69pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
70pub use knowledge::PgKnowledgeBase;
71pub use memory::PgMemory;
72pub use reranker::{
73    GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
74};
75pub use smooth_operator::embedding::{
76    DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
77};
78
/// Postgres + pgvector storage adapter.
///
/// Holds the async `deadpool` pool used by the OLTP methods and shared (by
/// cheap clone) with the knowledge base, memory handles and admin stores; the
/// synchronous checkpoint store pointed at the same database; and the embedder
/// whose `dim()` fixes the width of the vector columns.
pub struct PostgresAdapter {
    /// Async connection pool. Cloned into [`PgKnowledgeBase`], [`PgMemory`] and
    /// the admin stores so they all run against the same database.
    pool: Pool,
    /// `Option` so [`Drop`] can `take()` the checkpoint store and dispose of it on
    /// a dedicated OS thread. The sync `postgres::Client`s inside its r2d2 pool
    /// run `block_on` in their own `Drop`, which panics on a Tokio worker thread
    /// ("Cannot start a runtime from within a runtime"). Disposing off-runtime
    /// keeps the adapter safe to drop from async code.
    checkpoints: Option<Arc<PostgresCheckpointStore>>,
    /// pgvector knowledge base built over `pool` and `embedder` at connect time.
    knowledge: Arc<PgKnowledgeBase>,
    /// Retained so the [`memory`](PostgresAdapter::memory) accessor can build
    /// namespace-bound [`PgMemory`] handles that embed identically to knowledge.
    embedder: Arc<dyn Embedder>,
    /// Vector column width, captured from `embedder.dim()` when connecting.
    embedding_dim: usize,
    /// Captured runtime handle for the sync admin-store bridges (connector
    /// configs / settings / indexing runs over the same async pool).
    handle: tokio::runtime::Handle,
}
97
98impl Drop for PostgresAdapter {
99    fn drop(&mut self) {
100        if let Some(checkpoints) = self.checkpoints.take() {
101            // Move the (possibly last) strong ref off any Tokio worker thread so
102            // the r2d2 pool's blocking `postgres::Client::drop` runs on a plain
103            // OS thread where `block_on` is legal. Join it so disposal is
104            // deterministic (and so a short-lived process actually closes its
105            // connections).
106            if let Ok(handle) = std::thread::Builder::new()
107                .name("pg-checkpoint-drop".into())
108                .spawn(move || drop(checkpoints))
109            {
110                let _ = handle.join();
111            }
112        }
113    }
114}
115
116impl PostgresAdapter {
117    /// Connect to Postgres, build the async pool + sync checkpoint store, and
118    /// apply the schema. Uses the [`DeterministicEmbedder`] (1024-d) by default.
119    ///
120    /// `conn_str` is a libpq URL or `key=value` connection string; it is read
121    /// from `DATABASE_URL` / `SMOOTH_AGENT_DATABASE_URL` by [`Self::from_env`].
122    ///
123    /// # Errors
124    /// Returns an error if the connection string is invalid, either pool fails to
125    /// build, or schema migration fails.
126    pub async fn connect(conn_str: &str) -> Result<Self> {
127        Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
128    }
129
130    /// As [`Self::connect`] but with a caller-supplied embedder. The adapter's
131    /// vector column width is taken from `embedder.dim()`, so a 1536-d
132    /// [`GatewayEmbedder`] and the `vector(1536)` column always agree.
133    ///
134    /// # Errors
135    /// See [`Self::connect`].
136    pub async fn connect_with_embedder(
137        conn_str: &str,
138        embedder: Arc<dyn Embedder>,
139    ) -> Result<Self> {
140        let embedding_dim = embedder.dim();
141
142        // --- async pool (OLTP + knowledge) ---
143        let pg_config: tokio_postgres::Config = conn_str
144            .parse()
145            .context("parsing connection string for async pool")?;
146        let mut cfg = PoolConfig::new();
147        cfg.manager = Some(ManagerConfig {
148            recycling_method: RecyclingMethod::Fast,
149        });
150        // deadpool builds its manager from a tokio_postgres::Config.
151        cfg.dbname = pg_config.get_dbname().map(str::to_string);
152        cfg.user = pg_config.get_user().map(str::to_string);
153        cfg.password = pg_config
154            .get_password()
155            .map(|p| String::from_utf8_lossy(p).into_owned());
156        if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
157            tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
158            tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
159        }) {
160            cfg.host = Some(host);
161        }
162        if let Some(port) = pg_config.get_ports().first().copied() {
163            cfg.port = Some(port);
164        }
165        let pool = cfg
166            .create_pool(Some(Runtime::Tokio1), NoTls)
167            .context("building deadpool")?;
168
169        // --- apply schema (OLTP unconditionally; pgvector knowledge table) ---
170        {
171            let client = pool
172                .get()
173                .await
174                .context("acquiring connection for migration")?;
175            client
176                .batch_execute(schema::OLTP_SCHEMA)
177                .await
178                .context("applying OLTP schema")?;
179            client
180                .batch_execute(schema::ADMIN_SCHEMA)
181                .await
182                .context("applying admin schema")?;
183            client
184                .batch_execute(schema::VECTOR_EXTENSION)
185                .await
186                .context("creating pgvector extension")?;
187            client
188                .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
189                .await
190                .context("applying knowledge_vectors schema")?;
191            client
192                .batch_execute(&schema::memories_schema(embedding_dim))
193                .await
194                .context("applying memories schema")?;
195        }
196
197        // --- sync checkpoint store against the SAME database ---
198        // PostgresCheckpointStore::connect runs blocking r2d2 setup; keep it off
199        // the async worker threads.
200        let cs_conn = conn_str.to_string();
201        let checkpoints =
202            tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
203                .await
204                .context("checkpoint store setup task panicked")?
205                .context("constructing PostgresCheckpointStore")?;
206        let checkpoints = Arc::new(checkpoints);
207
208        // --- pgvector knowledge base (shares the async pool) ---
209        let handle = tokio::runtime::Handle::current();
210        let knowledge = Arc::new(PgKnowledgeBase::new(
211            pool.clone(),
212            embedder.clone(),
213            handle.clone(),
214            None,
215        ));
216
217        Ok(Self {
218            pool,
219            checkpoints: Some(checkpoints),
220            knowledge,
221            embedder,
222            embedding_dim,
223            handle,
224        })
225    }
226
227    /// Connect using `DATABASE_URL` or `SMOOTH_AGENT_DATABASE_URL` (the latter
228    /// wins if both are set).
229    ///
230    /// # Errors
231    /// Returns an error if neither env var is set, or if [`Self::connect`] fails.
232    pub async fn from_env() -> Result<Self> {
233        let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
234            .or_else(|_| std::env::var("DATABASE_URL"))
235            .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
236        Self::connect(&conn_str).await
237    }
238
239    /// The embedding dimension this adapter's `knowledge_vectors` column uses.
240    #[must_use]
241    pub fn embedding_dim(&self) -> usize {
242        self.embedding_dim
243    }
244
245    /// A Postgres-backed [`ConnectorConfigStore`](smooth_operator::connector_config::ConnectorConfigStore)
246    /// over this adapter's pool (the `connector_configs` table). Cheap to build
247    /// (clones the pool handle); make as many as you like.
248    #[must_use]
249    pub fn connector_config_store(&self) -> PgConnectorConfigStore {
250        PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
251    }
252
253    /// A Postgres-backed [`SettingsStore`](smooth_operator::settings::SettingsStore)
254    /// over this adapter's pool (the `agent_settings` table).
255    #[must_use]
256    pub fn settings_store(&self) -> PgSettingsStore {
257        PgSettingsStore::new(self.pool.clone(), self.handle.clone())
258    }
259
260    /// A Postgres-backed [`IndexingStore`](smooth_operator_ingestion::indexing::IndexingStore)
261    /// over this adapter's pool (the `indexing_runs` table).
262    #[must_use]
263    pub fn indexing_store(&self) -> PgIndexingStore {
264        PgIndexingStore::new(self.pool.clone(), self.handle.clone())
265    }
266
267    /// A Postgres-backed [`Memory`](smooth_operator_core::Memory) over this
268    /// adapter's pool (the `memories` table), bound to one `(organization_id,
269    /// user_id)` namespace — persistent, semantic, cross-thread agent memory
270    /// (parity gap Phase 3 / SMOODEV-1470). Pass `user_id = None` for org-wide
271    /// memory. Embeds with the adapter's configured [`Embedder`] so memory and
272    /// knowledge vectors share the same column width and hashing.
273    ///
274    /// Cheap to build (clones pool + embedder handles); make one per
275    /// `(org, user)` you serve.
276    #[must_use]
277    pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
278        PgMemory::new(
279            self.pool.clone(),
280            self.embedder.clone(),
281            self.handle.clone(),
282            organization_id,
283            user_id,
284        )
285    }
286}
287
288// --- row → domain helpers ---------------------------------------------------
289
290fn platform_to_str(p: Platform) -> &'static str {
291    match p {
292        Platform::Web => "web",
293        Platform::Messenger => "messenger",
294        Platform::Instagram => "instagram",
295        Platform::Email => "email",
296        Platform::Discord => "discord",
297        Platform::Phone => "phone",
298        Platform::Sms => "sms",
299        Platform::Slack => "slack",
300        Platform::Whatsapp => "whatsapp",
301        Platform::Tiktok => "tiktok",
302    }
303}
304
305fn platform_from_str(s: &str) -> Result<Platform> {
306    Ok(match s {
307        "web" => Platform::Web,
308        "messenger" => Platform::Messenger,
309        "instagram" => Platform::Instagram,
310        "email" => Platform::Email,
311        "discord" => Platform::Discord,
312        "phone" => Platform::Phone,
313        "sms" => Platform::Sms,
314        "slack" => Platform::Slack,
315        "whatsapp" => Platform::Whatsapp,
316        "tiktok" => Platform::Tiktok,
317        other => return Err(anyhow!("unknown platform '{other}'")),
318    })
319}
320
321fn participant_type_to_str(t: ParticipantType) -> &'static str {
322    match t {
323        ParticipantType::User => "user",
324        ParticipantType::AiAgent => "ai-agent",
325        ParticipantType::HumanAgent => "human-agent",
326    }
327}
328
329fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
330    Ok(match s {
331        "user" => ParticipantType::User,
332        "ai-agent" => ParticipantType::AiAgent,
333        "human-agent" => ParticipantType::HumanAgent,
334        other => return Err(anyhow!("unknown participant type '{other}'")),
335    })
336}
337
338fn direction_to_str(d: Direction) -> &'static str {
339    match d {
340        Direction::Inbound => "inbound",
341        Direction::Outbound => "outbound",
342    }
343}
344
345fn direction_from_str(s: &str) -> Result<Direction> {
346    Ok(match s {
347        "inbound" => Direction::Inbound,
348        "outbound" => Direction::Outbound,
349        other => return Err(anyhow!("unknown direction '{other}'")),
350    })
351}
352
353fn session_status_to_str(s: SessionStatus) -> &'static str {
354    match s {
355        SessionStatus::Active => "active",
356        SessionStatus::Idle => "idle",
357        SessionStatus::Ended => "ended",
358    }
359}
360
361fn session_status_from_str(s: &str) -> Result<SessionStatus> {
362    Ok(match s {
363        "active" => SessionStatus::Active,
364        "idle" => SessionStatus::Idle,
365        "ended" => SessionStatus::Ended,
366        other => return Err(anyhow!("unknown session status '{other}'")),
367    })
368}
369
370fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
371    Ok(Conversation {
372        id: row.get("id"),
373        platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
374        name: row.get("name"),
375        organization_id: row.get("organization_id"),
376        idempotency_key: row.get("idempotency_key"),
377        metadata_json: row.get("metadata_json"),
378        analytics_json: row.get("analytics_json"),
379        created_at: row.get("created_at"),
380        updated_at: row.get("updated_at"),
381    })
382}
383
384fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
385    Ok(Participant {
386        id: row.get("id"),
387        conversation_id: row.get("conversation_id"),
388        organization_id: row.get("organization_id"),
389        participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
390        external_id: row.get("external_id"),
391        internal_id: row.get("internal_id"),
392        browser_fingerprint: row.get("browser_fingerprint"),
393        browser_info: row.get("browser_info"),
394        name: row.get("name"),
395        email: row.get("email"),
396        phone: row.get("phone"),
397        crm_contact_id: row.get("crm_contact_id"),
398        metadata_json: row.get("metadata_json"),
399        created_at: row.get("created_at"),
400        updated_at: row.get("updated_at"),
401    })
402}
403
404fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
405    let content: serde_json::Value = row.get("content");
406    let content: MessageContent =
407        serde_json::from_value(content).context("decoding message content")?;
408    let from: Option<serde_json::Value> = row.get("from_ref");
409    let to: Option<serde_json::Value> = row.get("to_ref");
410    let from: Option<ParticipantRef> = from
411        .map(serde_json::from_value)
412        .transpose()
413        .context("decoding from_ref")?;
414    let to: Option<ParticipantRef> = to
415        .map(serde_json::from_value)
416        .transpose()
417        .context("decoding to_ref")?;
418    Ok(Message {
419        id: row.get("id"),
420        external_id: row.get("external_id"),
421        organization_id: row.get("organization_id"),
422        conversation_id: row.get("conversation_id"),
423        direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
424        content,
425        from,
426        to,
427        metadata_json: row.get("metadata_json"),
428        analytics_json: row.get("analytics_json"),
429        created_at: row.get("created_at"),
430        updated_at: row.get("updated_at"),
431    })
432}
433
434fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
435    let status: Option<String> = row.get("status");
436    let status = status.map(|s| session_status_from_str(&s)).transpose()?;
437    let token_count: Option<i64> = row.get("token_count");
438    let message_count: Option<i64> = row.get("message_count");
439    let metadata: Option<serde_json::Value> = row.get("metadata");
440    let metadata = metadata
441        .map(serde_json::from_value)
442        .transpose()
443        .context("decoding session metadata")?;
444    Ok(Session {
445        session_id: row.get("session_id"),
446        conversation_id: row.get("conversation_id"),
447        agent_id: row.get("agent_id"),
448        agent_name: row.get("agent_name"),
449        user_participant_id: row.get("user_participant_id"),
450        agent_participant_id: row.get("agent_participant_id"),
451        thread_id: row.get("thread_id"),
452        status,
453        token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
454        message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
455        metadata,
456        created_at: row.get("created_at"),
457        updated_at: row.get("updated_at"),
458        ended_at: row.get("ended_at"),
459        last_activity_at: row.get("last_activity_at"),
460    })
461}
462
463#[async_trait]
464impl StorageAdapter for PostgresAdapter {
465    // ---- conversations ---------------------------------------------------
466
467    async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
468        let client = self.pool.get().await?;
469        // Idempotency on (org, idempotencyKey): INSERT, on conflict do nothing,
470        // then read back whichever row owns the key (new or pre-existing).
471        client
472            .execute(
473                "INSERT INTO conversations
474                    (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
475                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
476                 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
477                &[
478                    &conversation.id,
479                    &platform_to_str(conversation.platform),
480                    &conversation.name,
481                    &conversation.organization_id,
482                    &conversation.idempotency_key,
483                    &conversation.metadata_json,
484                    &conversation.analytics_json,
485                    &conversation.created_at,
486                    &conversation.updated_at,
487                ],
488            )
489            .await?;
490        let row = client
491            .query_one(
492                "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
493                &[&conversation.organization_id, &conversation.idempotency_key],
494            )
495            .await?;
496        row_to_conversation(&row)
497    }
498
499    async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
500        let client = self.pool.get().await?;
501        let row = client
502            .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
503            .await?;
504        row.as_ref().map(row_to_conversation).transpose()
505    }
506
507    async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
508        let client = self.pool.get().await?;
509        let rows = client
510            .query(
511                "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
512                &[&organization_id],
513            )
514            .await?;
515        rows.iter().map(row_to_conversation).collect()
516    }
517
518    async fn update_conversation(
519        &self,
520        id: &str,
521        update: ConversationUpdate,
522    ) -> Result<Conversation> {
523        let client = self.pool.get().await?;
524        let now = Utc::now();
525        // COALESCE keeps existing values when the update field is NULL; the
526        // metadata/analytics fields are explicitly settable (incl. clearing is
527        // out of scope here, matching the in-memory "set only when Some").
528        let set_metadata = update.metadata_json.is_some();
529        let set_analytics = update.analytics_json.is_some();
530        let row = client
531            .query_one(
532                "UPDATE conversations SET
533                    name = COALESCE($2, name),
534                    metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
535                    analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
536                    updated_at = $7
537                 WHERE id = $1
538                 RETURNING *",
539                &[
540                    &id,
541                    &update.name,
542                    &set_metadata,
543                    &update.metadata_json,
544                    &set_analytics,
545                    &update.analytics_json,
546                    &now,
547                ],
548            )
549            .await
550            .with_context(|| format!("conversation '{id}' not found"))?;
551        row_to_conversation(&row)
552    }
553
554    // ---- participants ----------------------------------------------------
555
556    async fn add_participant(&self, participant: Participant) -> Result<Participant> {
557        let client = self.pool.get().await?;
558        client
559            .execute(
560                "INSERT INTO conversation_participants
561                    (id, conversation_id, organization_id, type, external_id, internal_id,
562                     browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
563                     metadata_json, created_at, updated_at)
564                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
565                &[
566                    &participant.id,
567                    &participant.conversation_id,
568                    &participant.organization_id,
569                    &participant_type_to_str(participant.participant_type),
570                    &participant.external_id,
571                    &participant.internal_id,
572                    &participant.browser_fingerprint,
573                    &participant.browser_info,
574                    &participant.name,
575                    &participant.email,
576                    &participant.phone,
577                    &participant.crm_contact_id,
578                    &participant.metadata_json,
579                    &participant.created_at,
580                    &participant.updated_at,
581                ],
582            )
583            .await?;
584        Ok(participant)
585    }
586
587    async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
588        let client = self.pool.get().await?;
589        let row = client
590            .query_opt(
591                "SELECT * FROM conversation_participants WHERE id = $1",
592                &[&id],
593            )
594            .await?;
595        row.as_ref().map(row_to_participant).transpose()
596    }
597
598    async fn list_participants_by_conversation(
599        &self,
600        conversation_id: &str,
601    ) -> Result<Vec<Participant>> {
602        let client = self.pool.get().await?;
603        let rows = client
604            .query(
605                "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
606                &[&conversation_id],
607            )
608            .await?;
609        rows.iter().map(row_to_participant).collect()
610    }
611
612    async fn resolve_participant_by_external_id(
613        &self,
614        conversation_id: &str,
615        external_id: &str,
616    ) -> Result<Option<Participant>> {
617        let client = self.pool.get().await?;
618        let row = client
619            .query_opt(
620                "SELECT * FROM conversation_participants
621                 WHERE conversation_id = $1 AND external_id = $2
622                 ORDER BY created_at LIMIT 1",
623                &[&conversation_id, &external_id],
624            )
625            .await?;
626        row.as_ref().map(row_to_participant).transpose()
627    }
628
629    // ---- messages --------------------------------------------------------
630
631    async fn append_message(&self, message: Message) -> Result<Message> {
632        let client = self.pool.get().await?;
633        let content = serde_json::to_value(&message.content)?;
634        let from = message
635            .from
636            .as_ref()
637            .map(serde_json::to_value)
638            .transpose()?;
639        let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
640        client
641            .execute(
642                "INSERT INTO conversation_messages
643                    (id, external_id, organization_id, conversation_id, direction, content,
644                     from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
645                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
646                &[
647                    &message.id,
648                    &message.external_id,
649                    &message.organization_id,
650                    &message.conversation_id,
651                    &direction_to_str(message.direction),
652                    &content,
653                    &from,
654                    &to,
655                    &message.metadata_json,
656                    &message.analytics_json,
657                    &message.created_at,
658                    &message.updated_at,
659                ],
660            )
661            .await?;
662        Ok(message)
663    }
664
665    async fn get_message(&self, id: &str) -> Result<Option<Message>> {
666        let client = self.pool.get().await?;
667        let row = client
668            .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
669            .await?;
670        row.as_ref().map(row_to_message).transpose()
671    }
672
673    async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
674        let client = self.pool.get().await?;
675        let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
676
677        // Cursor is a message id; page starts strictly after that message's seq
678        // (or before, when descending). Resolve it to a seq first.
679        let cursor_seq: Option<i64> = match &query.cursor {
680            Some(cursor) => {
681                let row = client
682                    .query_opt(
683                        "SELECT seq FROM conversation_messages WHERE id = $1",
684                        &[&cursor],
685                    )
686                    .await?;
687                row.map(|r| r.get::<_, i64>("seq"))
688            }
689            None => None,
690        };
691
692        // Fetch limit + 1 to detect whether another page remains, mirroring the
693        // in-memory "next_cursor is Some iff more rows follow" contract.
694        let probe = limit_i64.saturating_add(1);
695        let rows = if query.descending {
696            // Newest first: seq descending; cursor means "seq < cursor_seq".
697            match cursor_seq {
698                Some(seq) => {
699                    client
700                        .query(
701                            "SELECT * FROM conversation_messages
702                             WHERE conversation_id = $1 AND seq < $2
703                             ORDER BY seq DESC LIMIT $3",
704                            &[&query.conversation_id, &seq, &probe],
705                        )
706                        .await?
707                }
708                None => {
709                    client
710                        .query(
711                            "SELECT * FROM conversation_messages
712                             WHERE conversation_id = $1
713                             ORDER BY seq DESC LIMIT $2",
714                            &[&query.conversation_id, &probe],
715                        )
716                        .await?
717                }
718            }
719        } else {
720            // Oldest first: seq ascending; cursor means "seq > cursor_seq".
721            match cursor_seq {
722                Some(seq) => {
723                    client
724                        .query(
725                            "SELECT * FROM conversation_messages
726                             WHERE conversation_id = $1 AND seq > $2
727                             ORDER BY seq ASC LIMIT $3",
728                            &[&query.conversation_id, &seq, &probe],
729                        )
730                        .await?
731                }
732                None => {
733                    client
734                        .query(
735                            "SELECT * FROM conversation_messages
736                             WHERE conversation_id = $1
737                             ORDER BY seq ASC LIMIT $2",
738                            &[&query.conversation_id, &probe],
739                        )
740                        .await?
741                }
742            }
743        };
744
745        let has_more = rows.len() as i64 > limit_i64;
746        let page_rows = if has_more {
747            &rows[..query.limit]
748        } else {
749            &rows[..]
750        };
751        let messages: Vec<Message> = page_rows
752            .iter()
753            .map(row_to_message)
754            .collect::<Result<_>>()?;
755        let next_cursor = if has_more {
756            messages.last().map(|m| m.id.clone())
757        } else {
758            None
759        };
760
761        Ok(MessagePage {
762            messages,
763            next_cursor,
764        })
765    }
766
767    // ---- sessions --------------------------------------------------------
768
769    async fn create_session(&self, session: Session) -> Result<Session> {
770        let client = self.pool.get().await?;
771        let status = session.status.map(session_status_to_str);
772        let token_count = session
773            .token_count
774            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
775        let message_count = session
776            .message_count
777            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
778        let metadata = session
779            .metadata
780            .as_ref()
781            .map(serde_json::to_value)
782            .transpose()?;
783        client
784            .execute(
785                "INSERT INTO conversation_sessions
786                    (session_id, conversation_id, agent_id, agent_name, user_participant_id,
787                     agent_participant_id, thread_id, status, token_count, message_count,
788                     metadata, created_at, updated_at, ended_at, last_activity_at)
789                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
790                &[
791                    &session.session_id,
792                    &session.conversation_id,
793                    &session.agent_id,
794                    &session.agent_name,
795                    &session.user_participant_id,
796                    &session.agent_participant_id,
797                    &session.thread_id,
798                    &status,
799                    &token_count,
800                    &message_count,
801                    &metadata,
802                    &session.created_at,
803                    &session.updated_at,
804                    &session.ended_at,
805                    &session.last_activity_at,
806                ],
807            )
808            .await?;
809        Ok(session)
810    }
811
812    async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
813        let client = self.pool.get().await?;
814        let row = client
815            .query_opt(
816                "SELECT * FROM conversation_sessions WHERE session_id = $1",
817                &[&session_id],
818            )
819            .await?;
820        row.as_ref().map(row_to_session).transpose()
821    }
822
823    async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
824        let client = self.pool.get().await?;
825        let now = Utc::now();
826        let status = update.status.map(session_status_to_str);
827        let token_count = update
828            .token_count
829            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
830        let message_count = update
831            .message_count
832            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
833        // last_activity_at / ended_at are set only when Some (mirrors in-memory).
834        let set_last_activity = update.last_activity_at.is_some();
835        let set_ended = update.ended_at.is_some();
836        let row = client
837            .query_one(
838                "UPDATE conversation_sessions SET
839                    status = COALESCE($2, status),
840                    token_count = COALESCE($3, token_count),
841                    message_count = COALESCE($4, message_count),
842                    last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
843                    ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
844                    updated_at = $9
845                 WHERE session_id = $1
846                 RETURNING *",
847                &[
848                    &session_id,
849                    &status,
850                    &token_count,
851                    &message_count,
852                    &set_last_activity,
853                    &update.last_activity_at,
854                    &set_ended,
855                    &update.ended_at,
856                    &now,
857                ],
858            )
859            .await
860            .with_context(|| format!("session '{session_id}' not found"))?;
861        row_to_session(&row)
862    }
863
864    async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
865        let client = self.pool.get().await?;
866        let rows = client
867            .query(
868                "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
869                &[&conversation_id],
870            )
871            .await?;
872        rows.iter().map(row_to_session).collect()
873    }
874
875    // ---- engine accessors ------------------------------------------------
876
877    fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
878        // Always `Some` between construction and drop.
879        self.checkpoints
880            .as_ref()
881            .expect("checkpoint store present")
882            .clone()
883    }
884
885    fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
886        self.knowledge.clone()
887    }
888
889    fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
890        // Durable document-level ACL (feature gap G3): the returned handle
891        // filters every query by the requester's entitlements against the stored
892        // `acl` column **in SQL**, so a restricted document is never fetched —
893        // and the filter survives the ingest→serve process boundary (unlike the
894        // in-memory side table). See `knowledge::PgKnowledgeBase::query_async`.
895        Arc::new(self.knowledge.with_access(access.clone()))
896    }
897}