Skip to main content

smooth_operator_adapter_postgres/
lib.rs

1//! Postgres + pgvector [`StorageAdapter`] — the dogfood backend.
2//!
3//! This is the production Postgres implementation of the one storage seam (see
4//! `docs/STORAGE.md`). It mirrors the smooai monorepo's schema so dogfooding is
5//! a swap, not a rewrite:
6//!
7//! - **OLTP** (conversations / participants / messages / sessions): async CRUD
8//!   over a [`deadpool_postgres`] pool, semantics matching the in-memory baseline
9//!   (conversation idempotency, external-id participant resolve, cursor message
10//!   paging, session status/counts).
11//! - **Checkpoints**: smooth-operator's
12//!   [`PostgresCheckpointStore`](smooth_operator_core::PostgresCheckpointStore) (a
13//!   *synchronous* r2d2-pooled store) constructed against the **same database**
14//!   — so the engine's `with_checkpoint_store` plugs straight in and agent state
15//!   lives next to the conversations it belongs to.
16//! - **Knowledge**: a pgvector-backed [`PgKnowledgeBase`] (dense HNSW cosine ∪
17//!   sparse `tsvector` BM25 → Reciprocal Rank Fusion). Text→vector goes through
18//!   the [`Embedder`] seam — [`DeterministicEmbedder`] by default (reproducible,
19//!   no network), [`GatewayEmbedder`] when a live gateway is configured.
20//! - **Memory**: a pgvector-backed [`PgMemory`] (parity gap Phase 3 /
21//!   SMOODEV-1470) — persistent, semantic, cross-thread agent memory namespaced
22//!   by `(organization_id, user_id)` like the TS `['memories', orgId, userId]`
23//!   store. Implements the core [`Memory`](smooth_operator_core::Memory) trait;
24//!   recall is pgvector cosine top-K under an HNSW index, scoped to the
25//!   namespace. Shares the adapter's [`Embedder`].
26//!
27//! ## Sharing one database between the async pool and the sync checkpoint store
28//!
29//! The OLTP/knowledge slices need async (`tokio-postgres` + `deadpool`); the
30//! checkpoint slice is a sync trait backed by an r2d2 pool inside
31//! smooth-operator. Both are pointed at the **same `conn_str`**: we build the
32//! async deadpool from it for our own queries, and hand the *same* string to
33//! `PostgresCheckpointStore::connect`, which stands up its own small r2d2 pool
34//! and `checkpoints` table in that database. Two pools, two driver stacks, one
35//! Postgres — the tables coexist (ours from [`schema`], theirs from their own
36//! `CREATE TABLE IF NOT EXISTS`).
37
38mod admin;
39mod embedder;
40mod knowledge;
41mod memory;
42mod reranker;
43mod schema;
44
45use std::sync::Arc;
46
47use anyhow::{anyhow, Context, Result};
48use async_trait::async_trait;
49use chrono::Utc;
50use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
51use tokio_postgres::NoTls;
52
53use smooth_operator::access_control::AccessContext;
54use smooth_operator::adapter::{
55    ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
56};
57use smooth_operator::domain::{
58    Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
59    Platform, Session, SessionStatus,
60};
61use smooth_operator_core::checkpoint::PostgresCheckpointStore;
62use smooth_operator_core::{CheckpointStore, KnowledgeBase};
63
64// The shared embedding seam (trait + deterministic default) now lives in core;
65// re-export it here so existing `postgres::{Embedder, DeterministicEmbedder, …}`
66// consumers keep working. Only the adapter-specific `GatewayEmbedder` (+ its
67// 1536-d constant) is defined locally.
68pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
69pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
70pub use knowledge::PgKnowledgeBase;
71pub use memory::PgMemory;
72pub use reranker::{
73    GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
74};
75pub use smooth_operator::embedding::{
76    DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
77};
78
/// Postgres + pgvector storage adapter.
///
/// Owns every handle onto the database: the async pool, the sync checkpoint
/// store, the knowledge base, and the embedder shared by knowledge and memory.
pub struct PostgresAdapter {
    /// Async connection pool used for the adapter's own queries; clones of it
    /// are handed to the knowledge base, memory handles, and admin stores.
    pool: Pool,
    /// `Option` so [`Drop`] can `take()` the checkpoint store and dispose of it on
    /// a dedicated OS thread. The sync `postgres::Client`s inside its r2d2 pool
    /// run `block_on` in their own `Drop`, which panics on a Tokio worker thread
    /// ("Cannot start a runtime from within a runtime"). Disposing off-runtime
    /// keeps the adapter safe to drop from async code.
    checkpoints: Option<Arc<PostgresCheckpointStore>>,
    /// pgvector-backed knowledge base, built over a clone of `pool` and the
    /// same `embedder`.
    knowledge: Arc<PgKnowledgeBase>,
    /// Retained so the [`memory`](PostgresAdapter::memory) accessor can build
    /// namespace-bound [`PgMemory`] handles that embed identically to knowledge.
    embedder: Arc<dyn Embedder>,
    /// Vector width reported by `embedder.dim()` at connect time; the
    /// `knowledge_vectors` and `memories` schemas are created with it.
    embedding_dim: usize,
    /// Captured runtime handle for the sync admin-store bridges (connector
    /// configs / settings / indexing runs over the same async pool).
    handle: tokio::runtime::Handle,
}
97
98impl Drop for PostgresAdapter {
99    fn drop(&mut self) {
100        if let Some(checkpoints) = self.checkpoints.take() {
101            // Move the (possibly last) strong ref off any Tokio worker thread so
102            // the r2d2 pool's blocking `postgres::Client::drop` runs on a plain
103            // OS thread where `block_on` is legal. Join it so disposal is
104            // deterministic (and so a short-lived process actually closes its
105            // connections).
106            if let Ok(handle) = std::thread::Builder::new()
107                .name("pg-checkpoint-drop".into())
108                .spawn(move || drop(checkpoints))
109            {
110                let _ = handle.join();
111            }
112        }
113    }
114}
115
116impl PostgresAdapter {
117    /// Connect to Postgres, build the async pool + sync checkpoint store, and
118    /// apply the schema. Uses the [`DeterministicEmbedder`] (1024-d) by default.
119    ///
120    /// `conn_str` is a libpq URL or `key=value` connection string; it is read
121    /// from `DATABASE_URL` / `SMOOTH_AGENT_DATABASE_URL` by [`Self::from_env`].
122    ///
123    /// # Errors
124    /// Returns an error if the connection string is invalid, either pool fails to
125    /// build, or schema migration fails.
126    pub async fn connect(conn_str: &str) -> Result<Self> {
127        Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
128    }
129
    /// As [`Self::connect`] but with a caller-supplied embedder. The adapter's
    /// vector column width is taken from `embedder.dim()`, so a 1536-d
    /// [`GatewayEmbedder`] and the `vector(1536)` column always agree.
    ///
    /// Setup order: build the async pool, apply the OLTP / admin / pgvector /
    /// knowledge / memory schemas over one pooled connection, construct the
    /// sync checkpoint store on a blocking task, then build the knowledge base.
    ///
    /// Must be called from within a Tokio runtime: it uses `spawn_blocking`
    /// and captures `Handle::current()`.
    ///
    /// # Errors
    /// See [`Self::connect`].
    pub async fn connect_with_embedder(
        conn_str: &str,
        embedder: Arc<dyn Embedder>,
    ) -> Result<Self> {
        // Captured once; drives the width of both vector schemas below.
        let embedding_dim = embedder.dim();

        // --- async pool (OLTP + knowledge) ---
        let pg_config: tokio_postgres::Config = conn_str
            .parse()
            .context("parsing connection string for async pool")?;
        let mut cfg = PoolConfig::new();
        cfg.manager = Some(ManagerConfig {
            recycling_method: RecyclingMethod::Fast,
        });
        // Copy the parsed connection fields into the deadpool config.
        // NOTE(review): only dbname, user, password, the first usable host and
        // the first port are carried over; any other option present in
        // `conn_str` is not forwarded to the async pool. Confirm that is
        // intended.
        cfg.dbname = pg_config.get_dbname().map(str::to_string);
        cfg.user = pg_config.get_user().map(str::to_string);
        // The password is raw bytes in tokio_postgres; invalid UTF-8 is replaced
        // lossily rather than rejected.
        cfg.password = pg_config
            .get_password()
            .map(|p| String::from_utf8_lossy(p).into_owned());
        // First host that can be expressed as a string: TCP hosts as-is, Unix
        // socket paths only when they are valid UTF-8.
        if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
            tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
            tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
        }) {
            cfg.host = Some(host);
        }
        if let Some(port) = pg_config.get_ports().first().copied() {
            cfg.port = Some(port);
        }
        // The pool is always built with `NoTls`.
        let pool = cfg
            .create_pool(Some(Runtime::Tokio1), NoTls)
            .context("building deadpool")?;

        // --- apply schema (OLTP unconditionally; pgvector knowledge table) ---
        // Scoped so the pooled connection is returned before the checkpoint
        // store is set up. The extension is created before the two vector
        // schemas that are sized by `embedding_dim`.
        {
            let client = pool
                .get()
                .await
                .context("acquiring connection for migration")?;
            client
                .batch_execute(schema::OLTP_SCHEMA)
                .await
                .context("applying OLTP schema")?;
            client
                .batch_execute(schema::ADMIN_SCHEMA)
                .await
                .context("applying admin schema")?;
            client
                .batch_execute(schema::VECTOR_EXTENSION)
                .await
                .context("creating pgvector extension")?;
            client
                .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
                .await
                .context("applying knowledge_vectors schema")?;
            client
                .batch_execute(&schema::memories_schema(embedding_dim))
                .await
                .context("applying memories schema")?;
        }

        // --- sync checkpoint store against the SAME database ---
        // PostgresCheckpointStore::connect runs blocking r2d2 setup; keep it off
        // the async worker threads.
        let cs_conn = conn_str.to_string();
        // First `context` covers a failed/panicked join; the second covers the
        // store's own connect error.
        let checkpoints =
            tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
                .await
                .context("checkpoint store setup task panicked")?
                .context("constructing PostgresCheckpointStore")?;
        let checkpoints = Arc::new(checkpoints);

        // --- pgvector knowledge base (shares the async pool) ---
        let handle = tokio::runtime::Handle::current();
        let knowledge = Arc::new(PgKnowledgeBase::new(
            pool.clone(),
            embedder.clone(),
            handle.clone(),
            None,
        ));

        Ok(Self {
            pool,
            checkpoints: Some(checkpoints),
            knowledge,
            embedder,
            embedding_dim,
            handle,
        })
    }
226
227    /// Connect using `DATABASE_URL` or `SMOOTH_AGENT_DATABASE_URL` (the latter
228    /// wins if both are set).
229    ///
230    /// # Errors
231    /// Returns an error if neither env var is set, or if [`Self::connect`] fails.
232    pub async fn from_env() -> Result<Self> {
233        let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
234            .or_else(|_| std::env::var("DATABASE_URL"))
235            .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
236        Self::connect(&conn_str).await
237    }
238
    /// The embedding dimension this adapter was connected with, i.e. the value
    /// the embedder's `dim()` reported at connect time. The vector columns
    /// created during schema setup use this width.
    #[must_use]
    pub fn embedding_dim(&self) -> usize {
        self.embedding_dim
    }
244
245    /// A Postgres-backed [`ConnectorConfigStore`](smooth_operator::connector_config::ConnectorConfigStore)
246    /// over this adapter's pool (the `connector_configs` table). Cheap to build
247    /// (clones the pool handle); make as many as you like.
248    #[must_use]
249    pub fn connector_config_store(&self) -> PgConnectorConfigStore {
250        PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
251    }
252
253    /// A Postgres-backed [`SettingsStore`](smooth_operator::settings::SettingsStore)
254    /// over this adapter's pool (the `agent_settings` table).
255    #[must_use]
256    pub fn settings_store(&self) -> PgSettingsStore {
257        PgSettingsStore::new(self.pool.clone(), self.handle.clone())
258    }
259
260    /// A Postgres-backed [`IndexingStore`](smooth_operator_ingestion::indexing::IndexingStore)
261    /// over this adapter's pool (the `indexing_runs` table).
262    #[must_use]
263    pub fn indexing_store(&self) -> PgIndexingStore {
264        PgIndexingStore::new(self.pool.clone(), self.handle.clone())
265    }
266
267    /// A Postgres-backed [`Memory`](smooth_operator_core::Memory) over this
268    /// adapter's pool (the `memories` table), bound to one `(organization_id,
269    /// user_id)` namespace — persistent, semantic, cross-thread agent memory
270    /// (parity gap Phase 3 / SMOODEV-1470). Pass `user_id = None` for org-wide
271    /// memory. Embeds with the adapter's configured [`Embedder`] so memory and
272    /// knowledge vectors share the same column width and hashing.
273    ///
274    /// Cheap to build (clones pool + embedder handles); make one per
275    /// `(org, user)` you serve.
276    #[must_use]
277    pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
278        PgMemory::new(
279            self.pool.clone(),
280            self.embedder.clone(),
281            self.handle.clone(),
282            organization_id,
283            user_id,
284        )
285    }
286}
287
288// --- row → domain helpers ---------------------------------------------------
289
290fn platform_to_str(p: Platform) -> &'static str {
291    match p {
292        Platform::Web => "web",
293        Platform::Messenger => "messenger",
294        Platform::Instagram => "instagram",
295        Platform::Email => "email",
296        Platform::Discord => "discord",
297        Platform::Phone => "phone",
298        Platform::Sms => "sms",
299        Platform::Slack => "slack",
300        Platform::Whatsapp => "whatsapp",
301        Platform::Tiktok => "tiktok",
302    }
303}
304
305fn platform_from_str(s: &str) -> Result<Platform> {
306    Ok(match s {
307        "web" => Platform::Web,
308        "messenger" => Platform::Messenger,
309        "instagram" => Platform::Instagram,
310        "email" => Platform::Email,
311        "discord" => Platform::Discord,
312        "phone" => Platform::Phone,
313        "sms" => Platform::Sms,
314        "slack" => Platform::Slack,
315        "whatsapp" => Platform::Whatsapp,
316        "tiktok" => Platform::Tiktok,
317        other => return Err(anyhow!("unknown platform '{other}'")),
318    })
319}
320
321fn participant_type_to_str(t: ParticipantType) -> &'static str {
322    match t {
323        ParticipantType::User => "user",
324        ParticipantType::AiAgent => "ai-agent",
325        ParticipantType::HumanAgent => "human-agent",
326    }
327}
328
329fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
330    Ok(match s {
331        "user" => ParticipantType::User,
332        "ai-agent" => ParticipantType::AiAgent,
333        "human-agent" => ParticipantType::HumanAgent,
334        other => return Err(anyhow!("unknown participant type '{other}'")),
335    })
336}
337
338fn direction_to_str(d: Direction) -> &'static str {
339    match d {
340        Direction::Inbound => "inbound",
341        Direction::Outbound => "outbound",
342    }
343}
344
345fn direction_from_str(s: &str) -> Result<Direction> {
346    Ok(match s {
347        "inbound" => Direction::Inbound,
348        "outbound" => Direction::Outbound,
349        other => return Err(anyhow!("unknown direction '{other}'")),
350    })
351}
352
353fn session_status_to_str(s: SessionStatus) -> &'static str {
354    match s {
355        SessionStatus::Active => "active",
356        SessionStatus::Idle => "idle",
357        SessionStatus::Ended => "ended",
358    }
359}
360
361fn session_status_from_str(s: &str) -> Result<SessionStatus> {
362    Ok(match s {
363        "active" => SessionStatus::Active,
364        "idle" => SessionStatus::Idle,
365        "ended" => SessionStatus::Ended,
366        other => return Err(anyhow!("unknown session status '{other}'")),
367    })
368}
369
370fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
371    Ok(Conversation {
372        id: row.get("id"),
373        platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
374        name: row.get("name"),
375        organization_id: row.get("organization_id"),
376        idempotency_key: row.get("idempotency_key"),
377        metadata_json: row.get("metadata_json"),
378        analytics_json: row.get("analytics_json"),
379        created_at: row.get("created_at"),
380        updated_at: row.get("updated_at"),
381    })
382}
383
384fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
385    Ok(Participant {
386        id: row.get("id"),
387        conversation_id: row.get("conversation_id"),
388        organization_id: row.get("organization_id"),
389        participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
390        external_id: row.get("external_id"),
391        internal_id: row.get("internal_id"),
392        browser_fingerprint: row.get("browser_fingerprint"),
393        browser_info: row.get("browser_info"),
394        name: row.get("name"),
395        email: row.get("email"),
396        phone: row.get("phone"),
397        crm_contact_id: row.get("crm_contact_id"),
398        metadata_json: row.get("metadata_json"),
399        created_at: row.get("created_at"),
400        updated_at: row.get("updated_at"),
401    })
402}
403
404fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
405    let content: serde_json::Value = row.get("content");
406    let content: MessageContent =
407        serde_json::from_value(content).context("decoding message content")?;
408    let from: Option<serde_json::Value> = row.get("from_ref");
409    let to: Option<serde_json::Value> = row.get("to_ref");
410    let from: Option<ParticipantRef> = from
411        .map(serde_json::from_value)
412        .transpose()
413        .context("decoding from_ref")?;
414    let to: Option<ParticipantRef> = to
415        .map(serde_json::from_value)
416        .transpose()
417        .context("decoding to_ref")?;
418    Ok(Message {
419        id: row.get("id"),
420        external_id: row.get("external_id"),
421        organization_id: row.get("organization_id"),
422        conversation_id: row.get("conversation_id"),
423        direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
424        content,
425        from,
426        to,
427        metadata_json: row.get("metadata_json"),
428        analytics_json: row.get("analytics_json"),
429        created_at: row.get("created_at"),
430        updated_at: row.get("updated_at"),
431    })
432}
433
434fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
435    let status: Option<String> = row.get("status");
436    let status = status.map(|s| session_status_from_str(&s)).transpose()?;
437    let token_count: Option<i64> = row.get("token_count");
438    let message_count: Option<i64> = row.get("message_count");
439    let metadata: Option<serde_json::Value> = row.get("metadata");
440    let metadata = metadata
441        .map(serde_json::from_value)
442        .transpose()
443        .context("decoding session metadata")?;
444    Ok(Session {
445        session_id: row.get("session_id"),
446        conversation_id: row.get("conversation_id"),
447        organization_id: row.get("organization_id"),
448        agent_id: row.get("agent_id"),
449        agent_name: row.get("agent_name"),
450        user_participant_id: row.get("user_participant_id"),
451        agent_participant_id: row.get("agent_participant_id"),
452        thread_id: row.get("thread_id"),
453        status,
454        token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
455        message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
456        metadata,
457        created_at: row.get("created_at"),
458        updated_at: row.get("updated_at"),
459        ended_at: row.get("ended_at"),
460        last_activity_at: row.get("last_activity_at"),
461    })
462}
463
464#[async_trait]
465impl StorageAdapter for PostgresAdapter {
466    // ---- conversations ---------------------------------------------------
467
468    async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
469        let client = self.pool.get().await?;
470        // Idempotency on (org, idempotencyKey): INSERT, on conflict do nothing,
471        // then read back whichever row owns the key (new or pre-existing).
472        client
473            .execute(
474                "INSERT INTO conversations
475                    (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
476                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
477                 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
478                &[
479                    &conversation.id,
480                    &platform_to_str(conversation.platform),
481                    &conversation.name,
482                    &conversation.organization_id,
483                    &conversation.idempotency_key,
484                    &conversation.metadata_json,
485                    &conversation.analytics_json,
486                    &conversation.created_at,
487                    &conversation.updated_at,
488                ],
489            )
490            .await?;
491        let row = client
492            .query_one(
493                "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
494                &[&conversation.organization_id, &conversation.idempotency_key],
495            )
496            .await?;
497        row_to_conversation(&row)
498    }
499
500    async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
501        let client = self.pool.get().await?;
502        let row = client
503            .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
504            .await?;
505        row.as_ref().map(row_to_conversation).transpose()
506    }
507
508    async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
509        let client = self.pool.get().await?;
510        let rows = client
511            .query(
512                "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
513                &[&organization_id],
514            )
515            .await?;
516        rows.iter().map(row_to_conversation).collect()
517    }
518
519    async fn update_conversation(
520        &self,
521        id: &str,
522        update: ConversationUpdate,
523    ) -> Result<Conversation> {
524        let client = self.pool.get().await?;
525        let now = Utc::now();
526        // COALESCE keeps existing values when the update field is NULL; the
527        // metadata/analytics fields are explicitly settable (incl. clearing is
528        // out of scope here, matching the in-memory "set only when Some").
529        let set_metadata = update.metadata_json.is_some();
530        let set_analytics = update.analytics_json.is_some();
531        let row = client
532            .query_one(
533                "UPDATE conversations SET
534                    name = COALESCE($2, name),
535                    metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
536                    analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
537                    updated_at = $7
538                 WHERE id = $1
539                 RETURNING *",
540                &[
541                    &id,
542                    &update.name,
543                    &set_metadata,
544                    &update.metadata_json,
545                    &set_analytics,
546                    &update.analytics_json,
547                    &now,
548                ],
549            )
550            .await
551            .with_context(|| format!("conversation '{id}' not found"))?;
552        row_to_conversation(&row)
553    }
554
555    // ---- participants ----------------------------------------------------
556
557    async fn add_participant(&self, participant: Participant) -> Result<Participant> {
558        let client = self.pool.get().await?;
559        client
560            .execute(
561                "INSERT INTO conversation_participants
562                    (id, conversation_id, organization_id, type, external_id, internal_id,
563                     browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
564                     metadata_json, created_at, updated_at)
565                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
566                &[
567                    &participant.id,
568                    &participant.conversation_id,
569                    &participant.organization_id,
570                    &participant_type_to_str(participant.participant_type),
571                    &participant.external_id,
572                    &participant.internal_id,
573                    &participant.browser_fingerprint,
574                    &participant.browser_info,
575                    &participant.name,
576                    &participant.email,
577                    &participant.phone,
578                    &participant.crm_contact_id,
579                    &participant.metadata_json,
580                    &participant.created_at,
581                    &participant.updated_at,
582                ],
583            )
584            .await?;
585        Ok(participant)
586    }
587
588    async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
589        let client = self.pool.get().await?;
590        let row = client
591            .query_opt(
592                "SELECT * FROM conversation_participants WHERE id = $1",
593                &[&id],
594            )
595            .await?;
596        row.as_ref().map(row_to_participant).transpose()
597    }
598
599    async fn list_participants_by_conversation(
600        &self,
601        conversation_id: &str,
602    ) -> Result<Vec<Participant>> {
603        let client = self.pool.get().await?;
604        let rows = client
605            .query(
606                "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
607                &[&conversation_id],
608            )
609            .await?;
610        rows.iter().map(row_to_participant).collect()
611    }
612
613    async fn resolve_participant_by_external_id(
614        &self,
615        conversation_id: &str,
616        external_id: &str,
617    ) -> Result<Option<Participant>> {
618        let client = self.pool.get().await?;
619        let row = client
620            .query_opt(
621                "SELECT * FROM conversation_participants
622                 WHERE conversation_id = $1 AND external_id = $2
623                 ORDER BY created_at LIMIT 1",
624                &[&conversation_id, &external_id],
625            )
626            .await?;
627        row.as_ref().map(row_to_participant).transpose()
628    }
629
630    // ---- messages --------------------------------------------------------
631
632    async fn append_message(&self, message: Message) -> Result<Message> {
633        let client = self.pool.get().await?;
634        let content = serde_json::to_value(&message.content)?;
635        let from = message
636            .from
637            .as_ref()
638            .map(serde_json::to_value)
639            .transpose()?;
640        let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
641        client
642            .execute(
643                "INSERT INTO conversation_messages
644                    (id, external_id, organization_id, conversation_id, direction, content,
645                     from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
646                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
647                &[
648                    &message.id,
649                    &message.external_id,
650                    &message.organization_id,
651                    &message.conversation_id,
652                    &direction_to_str(message.direction),
653                    &content,
654                    &from,
655                    &to,
656                    &message.metadata_json,
657                    &message.analytics_json,
658                    &message.created_at,
659                    &message.updated_at,
660                ],
661            )
662            .await?;
663        Ok(message)
664    }
665
666    async fn get_message(&self, id: &str) -> Result<Option<Message>> {
667        let client = self.pool.get().await?;
668        let row = client
669            .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
670            .await?;
671        row.as_ref().map(row_to_message).transpose()
672    }
673
674    async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
675        let client = self.pool.get().await?;
676        let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
677
678        // Cursor is a message id; page starts strictly after that message's seq
679        // (or before, when descending). Resolve it to a seq first.
680        let cursor_seq: Option<i64> = match &query.cursor {
681            Some(cursor) => {
682                let row = client
683                    .query_opt(
684                        "SELECT seq FROM conversation_messages WHERE id = $1",
685                        &[&cursor],
686                    )
687                    .await?;
688                row.map(|r| r.get::<_, i64>("seq"))
689            }
690            None => None,
691        };
692
693        // Fetch limit + 1 to detect whether another page remains, mirroring the
694        // in-memory "next_cursor is Some iff more rows follow" contract.
695        let probe = limit_i64.saturating_add(1);
696        let rows = if query.descending {
697            // Newest first: seq descending; cursor means "seq < cursor_seq".
698            match cursor_seq {
699                Some(seq) => {
700                    client
701                        .query(
702                            "SELECT * FROM conversation_messages
703                             WHERE conversation_id = $1 AND seq < $2
704                             ORDER BY seq DESC LIMIT $3",
705                            &[&query.conversation_id, &seq, &probe],
706                        )
707                        .await?
708                }
709                None => {
710                    client
711                        .query(
712                            "SELECT * FROM conversation_messages
713                             WHERE conversation_id = $1
714                             ORDER BY seq DESC LIMIT $2",
715                            &[&query.conversation_id, &probe],
716                        )
717                        .await?
718                }
719            }
720        } else {
721            // Oldest first: seq ascending; cursor means "seq > cursor_seq".
722            match cursor_seq {
723                Some(seq) => {
724                    client
725                        .query(
726                            "SELECT * FROM conversation_messages
727                             WHERE conversation_id = $1 AND seq > $2
728                             ORDER BY seq ASC LIMIT $3",
729                            &[&query.conversation_id, &seq, &probe],
730                        )
731                        .await?
732                }
733                None => {
734                    client
735                        .query(
736                            "SELECT * FROM conversation_messages
737                             WHERE conversation_id = $1
738                             ORDER BY seq ASC LIMIT $2",
739                            &[&query.conversation_id, &probe],
740                        )
741                        .await?
742                }
743            }
744        };
745
746        let has_more = rows.len() as i64 > limit_i64;
747        let page_rows = if has_more {
748            &rows[..query.limit]
749        } else {
750            &rows[..]
751        };
752        let messages: Vec<Message> = page_rows
753            .iter()
754            .map(row_to_message)
755            .collect::<Result<_>>()?;
756        let next_cursor = if has_more {
757            messages.last().map(|m| m.id.clone())
758        } else {
759            None
760        };
761
762        Ok(MessagePage {
763            messages,
764            next_cursor,
765        })
766    }
767
768    // ---- sessions --------------------------------------------------------
769
770    async fn create_session(&self, session: Session) -> Result<Session> {
771        let client = self.pool.get().await?;
772        let status = session.status.map(session_status_to_str);
773        let token_count = session
774            .token_count
775            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
776        let message_count = session
777            .message_count
778            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
779        let metadata = session
780            .metadata
781            .as_ref()
782            .map(serde_json::to_value)
783            .transpose()?;
784        client
785            .execute(
786                "INSERT INTO conversation_sessions
787                    (session_id, conversation_id, organization_id, agent_id, agent_name,
788                     user_participant_id, agent_participant_id, thread_id, status, token_count,
789                     message_count, metadata, created_at, updated_at, ended_at, last_activity_at)
790                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)",
791                &[
792                    &session.session_id,
793                    &session.conversation_id,
794                    &session.organization_id,
795                    &session.agent_id,
796                    &session.agent_name,
797                    &session.user_participant_id,
798                    &session.agent_participant_id,
799                    &session.thread_id,
800                    &status,
801                    &token_count,
802                    &message_count,
803                    &metadata,
804                    &session.created_at,
805                    &session.updated_at,
806                    &session.ended_at,
807                    &session.last_activity_at,
808                ],
809            )
810            .await?;
811        Ok(session)
812    }
813
814    async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
815        let client = self.pool.get().await?;
816        let row = client
817            .query_opt(
818                "SELECT * FROM conversation_sessions WHERE session_id = $1",
819                &[&session_id],
820            )
821            .await?;
822        row.as_ref().map(row_to_session).transpose()
823    }
824
825    async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
826        let client = self.pool.get().await?;
827        let now = Utc::now();
828        let status = update.status.map(session_status_to_str);
829        let token_count = update
830            .token_count
831            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
832        let message_count = update
833            .message_count
834            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
835        // last_activity_at / ended_at are set only when Some (mirrors in-memory).
836        let set_last_activity = update.last_activity_at.is_some();
837        let set_ended = update.ended_at.is_some();
838        let row = client
839            .query_one(
840                "UPDATE conversation_sessions SET
841                    status = COALESCE($2, status),
842                    token_count = COALESCE($3, token_count),
843                    message_count = COALESCE($4, message_count),
844                    last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
845                    ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
846                    updated_at = $9
847                 WHERE session_id = $1
848                 RETURNING *",
849                &[
850                    &session_id,
851                    &status,
852                    &token_count,
853                    &message_count,
854                    &set_last_activity,
855                    &update.last_activity_at,
856                    &set_ended,
857                    &update.ended_at,
858                    &now,
859                ],
860            )
861            .await
862            .with_context(|| format!("session '{session_id}' not found"))?;
863        row_to_session(&row)
864    }
865
866    async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
867        let client = self.pool.get().await?;
868        let rows = client
869            .query(
870                "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
871                &[&conversation_id],
872            )
873            .await?;
874        rows.iter().map(row_to_session).collect()
875    }
876
877    // ---- engine accessors ------------------------------------------------
878
879    fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
880        // Always `Some` between construction and drop.
881        self.checkpoints
882            .as_ref()
883            .expect("checkpoint store present")
884            .clone()
885    }
886
887    fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
888        self.knowledge.clone()
889    }
890
891    fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
892        // Durable document-level ACL (feature gap G3): the returned handle
893        // filters every query by the requester's entitlements against the stored
894        // `acl` column **in SQL**, so a restricted document is never fetched —
895        // and the filter survives the ingest→serve process boundary (unlike the
896        // in-memory side table). See `knowledge::PgKnowledgeBase::query_async`.
897        Arc::new(self.knowledge.with_access(access.clone()))
898    }
899}