Skip to main content

smooth_operator_adapter_postgres/
lib.rs

1//! Postgres + pgvector [`StorageAdapter`] — the dogfood backend.
2//!
3//! This is the production Postgres implementation of the one storage seam (see
4//! `docs/STORAGE.md`). It mirrors the smooai monorepo's schema so dogfooding is
5//! a swap, not a rewrite:
6//!
7//! - **OLTP** (conversations / participants / messages / sessions): async CRUD
8//!   over a [`deadpool_postgres`] pool, semantics matching the in-memory baseline
9//!   (conversation idempotency, external-id participant resolve, cursor message
10//!   paging, session status/counts).
11//! - **Checkpoints**: smooth-operator's
12//!   [`PostgresCheckpointStore`](smooth_operator_core::PostgresCheckpointStore) (a
13//!   *synchronous* r2d2-pooled store) constructed against the **same database**
14//!   — so the engine's `with_checkpoint_store` plugs straight in and agent state
15//!   lives next to the conversations it belongs to.
16//! - **Knowledge**: a pgvector-backed [`PgKnowledgeBase`] (dense HNSW cosine ∪
17//!   sparse `tsvector` BM25 → Reciprocal Rank Fusion). Text→vector goes through
18//!   the [`Embedder`] seam — [`DeterministicEmbedder`] by default (reproducible,
19//!   no network), [`GatewayEmbedder`] when a live gateway is configured.
20//! - **Memory**: a pgvector-backed [`PgMemory`] (parity gap Phase 3 /
21//!   SMOODEV-1470) — persistent, semantic, cross-thread agent memory namespaced
22//!   by `(organization_id, user_id)` like the TS `['memories', orgId, userId]`
23//!   store. Implements the core [`Memory`](smooth_operator_core::Memory) trait;
24//!   recall is pgvector cosine top-K under an HNSW index, scoped to the
25//!   namespace. Shares the adapter's [`Embedder`].
26//!
27//! ## Sharing one database between the async pool and the sync checkpoint store
28//!
29//! The OLTP/knowledge slices need async (`tokio-postgres` + `deadpool`); the
30//! checkpoint slice is a sync trait backed by an r2d2 pool inside
31//! smooth-operator. Both are pointed at the **same `conn_str`**: we build the
32//! async deadpool from it for our own queries, and hand the *same* string to
33//! `PostgresCheckpointStore::connect`, which stands up its own small r2d2 pool
34//! and `checkpoints` table in that database. Two pools, two driver stacks, one
35//! Postgres — the tables coexist (ours from [`schema`], theirs from their own
36//! `CREATE TABLE IF NOT EXISTS`).
37
38mod admin;
39mod agent_config;
40mod embedder;
41mod knowledge;
42mod memory;
43mod reranker;
44mod schema;
45
46use std::sync::Arc;
47
48use anyhow::{anyhow, Context, Result};
49use async_trait::async_trait;
50use chrono::Utc;
51use deadpool_postgres::{Config as PoolConfig, ManagerConfig, Pool, RecyclingMethod, Runtime};
52use tokio_postgres::NoTls;
53
54use smooth_operator::access_control::AccessContext;
55use smooth_operator::adapter::{
56    ConversationUpdate, MessagePage, MessageQuery, SessionUpdate, StorageAdapter,
57};
58use smooth_operator::domain::{
59    Conversation, Direction, Message, MessageContent, Participant, ParticipantRef, ParticipantType,
60    Platform, Session, SessionStatus,
61};
62use smooth_operator_core::checkpoint::PostgresCheckpointStore;
63use smooth_operator_core::{CheckpointStore, KnowledgeBase};
64
65// The shared embedding seam (trait + deterministic default) now lives in core;
66// re-export it here so existing `postgres::{Embedder, DeterministicEmbedder, …}`
67// consumers keep working. Only the adapter-specific `GatewayEmbedder` (+ its
68// 1536-d constant) is defined locally.
69pub use admin::{PgConnectorConfigStore, PgIndexingStore, PgSettingsStore};
70pub use agent_config::PgAgentConfigResolver;
71pub use embedder::{GatewayEmbedder, OPENAI_SMALL_EMBEDDING_DIM};
72pub use knowledge::PgKnowledgeBase;
73pub use memory::PgMemory;
74pub use reranker::{
75    GatewayReranker, HttpRerankBackend, RerankBackend, RerankScore, DEFAULT_RERANK_MODEL,
76};
77pub use smooth_operator::embedding::{
78    DeterministicEmbedder, Embedder, InputType, DEFAULT_EMBEDDING_DIM,
79};
80
81/// Postgres + pgvector storage adapter.
82pub struct PostgresAdapter {
83    pool: Pool,
84    /// `Option` so [`Drop`] can `take()` the checkpoint store and dispose of it on
85    /// a dedicated OS thread. The sync `postgres::Client`s inside its r2d2 pool
86    /// run `block_on` in their own `Drop`, which panics on a Tokio worker thread
87    /// ("Cannot start a runtime from within a runtime"). Disposing off-runtime
88    /// keeps the adapter safe to drop from async code.
89    checkpoints: Option<Arc<PostgresCheckpointStore>>,
90    knowledge: Arc<PgKnowledgeBase>,
91    /// Retained so the [`memory`](PostgresAdapter::memory) accessor can build
92    /// namespace-bound [`PgMemory`] handles that embed identically to knowledge.
93    embedder: Arc<dyn Embedder>,
94    embedding_dim: usize,
95    /// Captured runtime handle for the sync admin-store bridges (connector
96    /// configs / settings / indexing runs over the same async pool).
97    handle: tokio::runtime::Handle,
98}
99
100impl Drop for PostgresAdapter {
101    fn drop(&mut self) {
102        if let Some(checkpoints) = self.checkpoints.take() {
103            // Move the (possibly last) strong ref off any Tokio worker thread so
104            // the r2d2 pool's blocking `postgres::Client::drop` runs on a plain
105            // OS thread where `block_on` is legal. Join it so disposal is
106            // deterministic (and so a short-lived process actually closes its
107            // connections).
108            if let Ok(handle) = std::thread::Builder::new()
109                .name("pg-checkpoint-drop".into())
110                .spawn(move || drop(checkpoints))
111            {
112                let _ = handle.join();
113            }
114        }
115    }
116}
117
118impl PostgresAdapter {
119    /// Connect to Postgres, build the async pool + sync checkpoint store, and
120    /// apply the schema. Uses the [`DeterministicEmbedder`] (1024-d) by default.
121    ///
122    /// `conn_str` is a libpq URL or `key=value` connection string; it is read
123    /// from `DATABASE_URL` / `SMOOTH_AGENT_DATABASE_URL` by [`Self::from_env`].
124    ///
125    /// # Errors
126    /// Returns an error if the connection string is invalid, either pool fails to
127    /// build, or schema migration fails.
128    pub async fn connect(conn_str: &str) -> Result<Self> {
129        Self::connect_with_embedder(conn_str, Arc::new(DeterministicEmbedder::new())).await
130    }
131
132    /// As [`Self::connect`] but with a caller-supplied embedder. The adapter's
133    /// vector column width is taken from `embedder.dim()`, so a 1536-d
134    /// [`GatewayEmbedder`] and the `vector(1536)` column always agree.
135    ///
136    /// # Errors
137    /// See [`Self::connect`].
138    pub async fn connect_with_embedder(
139        conn_str: &str,
140        embedder: Arc<dyn Embedder>,
141    ) -> Result<Self> {
142        let embedding_dim = embedder.dim();
143
144        // --- async pool (OLTP + knowledge) ---
145        let pg_config: tokio_postgres::Config = conn_str
146            .parse()
147            .context("parsing connection string for async pool")?;
148        let mut cfg = PoolConfig::new();
149        cfg.manager = Some(ManagerConfig {
150            recycling_method: RecyclingMethod::Fast,
151        });
152        // deadpool builds its manager from a tokio_postgres::Config.
153        cfg.dbname = pg_config.get_dbname().map(str::to_string);
154        cfg.user = pg_config.get_user().map(str::to_string);
155        cfg.password = pg_config
156            .get_password()
157            .map(|p| String::from_utf8_lossy(p).into_owned());
158        if let Some(host) = pg_config.get_hosts().iter().find_map(|h| match h {
159            tokio_postgres::config::Host::Tcp(t) => Some(t.clone()),
160            tokio_postgres::config::Host::Unix(p) => p.to_str().map(str::to_string),
161        }) {
162            cfg.host = Some(host);
163        }
164        if let Some(port) = pg_config.get_ports().first().copied() {
165            cfg.port = Some(port);
166        }
167        let pool = cfg
168            .create_pool(Some(Runtime::Tokio1), NoTls)
169            .context("building deadpool")?;
170
171        // --- apply schema (OLTP unconditionally; pgvector knowledge table) ---
172        {
173            let client = pool
174                .get()
175                .await
176                .context("acquiring connection for migration")?;
177            client
178                .batch_execute(schema::OLTP_SCHEMA)
179                .await
180                .context("applying OLTP schema")?;
181            client
182                .batch_execute(schema::ADMIN_SCHEMA)
183                .await
184                .context("applying admin schema")?;
185            client
186                .batch_execute(schema::VECTOR_EXTENSION)
187                .await
188                .context("creating pgvector extension")?;
189            client
190                .batch_execute(&schema::knowledge_vectors_schema(embedding_dim))
191                .await
192                .context("applying knowledge_vectors schema")?;
193            client
194                .batch_execute(&schema::memories_schema(embedding_dim))
195                .await
196                .context("applying memories schema")?;
197        }
198
199        // --- sync checkpoint store against the SAME database ---
200        // PostgresCheckpointStore::connect runs blocking r2d2 setup; keep it off
201        // the async worker threads.
202        let cs_conn = conn_str.to_string();
203        let checkpoints =
204            tokio::task::spawn_blocking(move || PostgresCheckpointStore::connect(&cs_conn))
205                .await
206                .context("checkpoint store setup task panicked")?
207                .context("constructing PostgresCheckpointStore")?;
208        let checkpoints = Arc::new(checkpoints);
209
210        // --- pgvector knowledge base (shares the async pool) ---
211        let handle = tokio::runtime::Handle::current();
212        let knowledge = Arc::new(PgKnowledgeBase::new(
213            pool.clone(),
214            embedder.clone(),
215            handle.clone(),
216            None,
217        ));
218
219        Ok(Self {
220            pool,
221            checkpoints: Some(checkpoints),
222            knowledge,
223            embedder,
224            embedding_dim,
225            handle,
226        })
227    }
228
229    /// Connect using `DATABASE_URL` or `SMOOTH_AGENT_DATABASE_URL` (the latter
230    /// wins if both are set).
231    ///
232    /// # Errors
233    /// Returns an error if neither env var is set, or if [`Self::connect`] fails.
234    pub async fn from_env() -> Result<Self> {
235        let conn_str = std::env::var("SMOOTH_AGENT_DATABASE_URL")
236            .or_else(|_| std::env::var("DATABASE_URL"))
237            .map_err(|_| anyhow!("neither SMOOTH_AGENT_DATABASE_URL nor DATABASE_URL is set"))?;
238        Self::connect(&conn_str).await
239    }
240
241    /// The embedding dimension this adapter's `knowledge_vectors` column uses.
242    #[must_use]
243    pub fn embedding_dim(&self) -> usize {
244        self.embedding_dim
245    }
246
247    /// A Postgres-backed [`ConnectorConfigStore`](smooth_operator::connector_config::ConnectorConfigStore)
248    /// over this adapter's pool (the `connector_configs` table). Cheap to build
249    /// (clones the pool handle); make as many as you like.
250    #[must_use]
251    pub fn connector_config_store(&self) -> PgConnectorConfigStore {
252        PgConnectorConfigStore::new(self.pool.clone(), self.handle.clone())
253    }
254
255    /// A Postgres-backed [`SettingsStore`](smooth_operator::settings::SettingsStore)
256    /// over this adapter's pool (the `agent_settings` table).
257    #[must_use]
258    pub fn settings_store(&self) -> PgSettingsStore {
259        PgSettingsStore::new(self.pool.clone(), self.handle.clone())
260    }
261
262    /// A Postgres-backed [`AgentConfigResolver`](smooth_operator::agent_config::AgentConfigResolver)
263    /// over this adapter's pool (the monorepo `agents` table). Reads a
264    /// connection's per-agent `instructions` / `conversation_workflow` so the
265    /// runner honors them. Degrades to no per-agent config when the table is
266    /// absent or the row is malformed. Cheap to build (clones the pool handle).
267    #[must_use]
268    pub fn agent_config_resolver(&self) -> PgAgentConfigResolver {
269        PgAgentConfigResolver::new(self.pool.clone())
270    }
271
272    /// A Postgres-backed [`IndexingStore`](smooth_operator_ingestion::indexing::IndexingStore)
273    /// over this adapter's pool (the `indexing_runs` table).
274    #[must_use]
275    pub fn indexing_store(&self) -> PgIndexingStore {
276        PgIndexingStore::new(self.pool.clone(), self.handle.clone())
277    }
278
279    /// A Postgres-backed [`Memory`](smooth_operator_core::Memory) over this
280    /// adapter's pool (the `memories` table), bound to one `(organization_id,
281    /// user_id)` namespace — persistent, semantic, cross-thread agent memory
282    /// (parity gap Phase 3 / SMOODEV-1470). Pass `user_id = None` for org-wide
283    /// memory. Embeds with the adapter's configured [`Embedder`] so memory and
284    /// knowledge vectors share the same column width and hashing.
285    ///
286    /// Cheap to build (clones pool + embedder handles); make one per
287    /// `(org, user)` you serve.
288    #[must_use]
289    pub fn memory(&self, organization_id: impl Into<String>, user_id: Option<String>) -> PgMemory {
290        PgMemory::new(
291            self.pool.clone(),
292            self.embedder.clone(),
293            self.handle.clone(),
294            organization_id,
295            user_id,
296        )
297    }
298}
299
300// --- row → domain helpers ---------------------------------------------------
301
302fn platform_to_str(p: Platform) -> &'static str {
303    match p {
304        Platform::Web => "web",
305        Platform::Messenger => "messenger",
306        Platform::Instagram => "instagram",
307        Platform::Email => "email",
308        Platform::Discord => "discord",
309        Platform::Phone => "phone",
310        Platform::Sms => "sms",
311        Platform::Slack => "slack",
312        Platform::Whatsapp => "whatsapp",
313        Platform::Tiktok => "tiktok",
314    }
315}
316
317fn platform_from_str(s: &str) -> Result<Platform> {
318    Ok(match s {
319        "web" => Platform::Web,
320        "messenger" => Platform::Messenger,
321        "instagram" => Platform::Instagram,
322        "email" => Platform::Email,
323        "discord" => Platform::Discord,
324        "phone" => Platform::Phone,
325        "sms" => Platform::Sms,
326        "slack" => Platform::Slack,
327        "whatsapp" => Platform::Whatsapp,
328        "tiktok" => Platform::Tiktok,
329        other => return Err(anyhow!("unknown platform '{other}'")),
330    })
331}
332
333fn participant_type_to_str(t: ParticipantType) -> &'static str {
334    match t {
335        ParticipantType::User => "user",
336        ParticipantType::AiAgent => "ai-agent",
337        ParticipantType::HumanAgent => "human-agent",
338    }
339}
340
341fn participant_type_from_str(s: &str) -> Result<ParticipantType> {
342    Ok(match s {
343        "user" => ParticipantType::User,
344        "ai-agent" => ParticipantType::AiAgent,
345        "human-agent" => ParticipantType::HumanAgent,
346        other => return Err(anyhow!("unknown participant type '{other}'")),
347    })
348}
349
350fn direction_to_str(d: Direction) -> &'static str {
351    match d {
352        Direction::Inbound => "inbound",
353        Direction::Outbound => "outbound",
354    }
355}
356
357fn direction_from_str(s: &str) -> Result<Direction> {
358    Ok(match s {
359        "inbound" => Direction::Inbound,
360        "outbound" => Direction::Outbound,
361        other => return Err(anyhow!("unknown direction '{other}'")),
362    })
363}
364
365fn session_status_to_str(s: SessionStatus) -> &'static str {
366    match s {
367        SessionStatus::Active => "active",
368        SessionStatus::Idle => "idle",
369        SessionStatus::Ended => "ended",
370    }
371}
372
373fn session_status_from_str(s: &str) -> Result<SessionStatus> {
374    Ok(match s {
375        "active" => SessionStatus::Active,
376        "idle" => SessionStatus::Idle,
377        "ended" => SessionStatus::Ended,
378        other => return Err(anyhow!("unknown session status '{other}'")),
379    })
380}
381
382fn row_to_conversation(row: &tokio_postgres::Row) -> Result<Conversation> {
383    Ok(Conversation {
384        id: row.get("id"),
385        platform: platform_from_str(row.get::<_, String>("platform").as_str())?,
386        name: row.get("name"),
387        organization_id: row.get("organization_id"),
388        idempotency_key: row.get("idempotency_key"),
389        metadata_json: row.get("metadata_json"),
390        analytics_json: row.get("analytics_json"),
391        created_at: row.get("created_at"),
392        updated_at: row.get("updated_at"),
393    })
394}
395
396fn row_to_participant(row: &tokio_postgres::Row) -> Result<Participant> {
397    Ok(Participant {
398        id: row.get("id"),
399        conversation_id: row.get("conversation_id"),
400        organization_id: row.get("organization_id"),
401        participant_type: participant_type_from_str(row.get::<_, String>("type").as_str())?,
402        external_id: row.get("external_id"),
403        internal_id: row.get("internal_id"),
404        browser_fingerprint: row.get("browser_fingerprint"),
405        browser_info: row.get("browser_info"),
406        name: row.get("name"),
407        email: row.get("email"),
408        phone: row.get("phone"),
409        crm_contact_id: row.get("crm_contact_id"),
410        metadata_json: row.get("metadata_json"),
411        created_at: row.get("created_at"),
412        updated_at: row.get("updated_at"),
413    })
414}
415
416fn row_to_message(row: &tokio_postgres::Row) -> Result<Message> {
417    let content: serde_json::Value = row.get("content");
418    let content: MessageContent =
419        serde_json::from_value(content).context("decoding message content")?;
420    let from: Option<serde_json::Value> = row.get("from_ref");
421    let to: Option<serde_json::Value> = row.get("to_ref");
422    let from: Option<ParticipantRef> = from
423        .map(serde_json::from_value)
424        .transpose()
425        .context("decoding from_ref")?;
426    let to: Option<ParticipantRef> = to
427        .map(serde_json::from_value)
428        .transpose()
429        .context("decoding to_ref")?;
430    Ok(Message {
431        id: row.get("id"),
432        external_id: row.get("external_id"),
433        organization_id: row.get("organization_id"),
434        conversation_id: row.get("conversation_id"),
435        direction: direction_from_str(row.get::<_, String>("direction").as_str())?,
436        content,
437        from,
438        to,
439        metadata_json: row.get("metadata_json"),
440        analytics_json: row.get("analytics_json"),
441        created_at: row.get("created_at"),
442        updated_at: row.get("updated_at"),
443    })
444}
445
446fn row_to_session(row: &tokio_postgres::Row) -> Result<Session> {
447    let status: Option<String> = row.get("status");
448    let status = status.map(|s| session_status_from_str(&s)).transpose()?;
449    let token_count: Option<i64> = row.get("token_count");
450    let message_count: Option<i64> = row.get("message_count");
451    let metadata: Option<serde_json::Value> = row.get("metadata");
452    let metadata = metadata
453        .map(serde_json::from_value)
454        .transpose()
455        .context("decoding session metadata")?;
456    Ok(Session {
457        session_id: row.get("session_id"),
458        conversation_id: row.get("conversation_id"),
459        organization_id: row.get("organization_id"),
460        agent_id: row.get("agent_id"),
461        agent_name: row.get("agent_name"),
462        user_participant_id: row.get("user_participant_id"),
463        agent_participant_id: row.get("agent_participant_id"),
464        thread_id: row.get("thread_id"),
465        status,
466        token_count: token_count.map(|v| u64::try_from(v).unwrap_or(0)),
467        message_count: message_count.map(|v| u64::try_from(v).unwrap_or(0)),
468        metadata,
469        created_at: row.get("created_at"),
470        updated_at: row.get("updated_at"),
471        ended_at: row.get("ended_at"),
472        last_activity_at: row.get("last_activity_at"),
473    })
474}
475
476#[async_trait]
477impl StorageAdapter for PostgresAdapter {
478    // ---- conversations ---------------------------------------------------
479
480    async fn create_conversation(&self, conversation: Conversation) -> Result<Conversation> {
481        let client = self.pool.get().await?;
482        // Idempotency on (org, idempotencyKey): INSERT, on conflict do nothing,
483        // then read back whichever row owns the key (new or pre-existing).
484        client
485            .execute(
486                "INSERT INTO conversations
487                    (id, platform, name, organization_id, idempotency_key, metadata_json, analytics_json, created_at, updated_at)
488                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
489                 ON CONFLICT (organization_id, idempotency_key) DO NOTHING",
490                &[
491                    &conversation.id,
492                    &platform_to_str(conversation.platform),
493                    &conversation.name,
494                    &conversation.organization_id,
495                    &conversation.idempotency_key,
496                    &conversation.metadata_json,
497                    &conversation.analytics_json,
498                    &conversation.created_at,
499                    &conversation.updated_at,
500                ],
501            )
502            .await?;
503        let row = client
504            .query_one(
505                "SELECT * FROM conversations WHERE organization_id = $1 AND idempotency_key = $2",
506                &[&conversation.organization_id, &conversation.idempotency_key],
507            )
508            .await?;
509        row_to_conversation(&row)
510    }
511
512    async fn get_conversation(&self, id: &str) -> Result<Option<Conversation>> {
513        let client = self.pool.get().await?;
514        let row = client
515            .query_opt("SELECT * FROM conversations WHERE id = $1", &[&id])
516            .await?;
517        row.as_ref().map(row_to_conversation).transpose()
518    }
519
520    async fn list_conversations_by_org(&self, organization_id: &str) -> Result<Vec<Conversation>> {
521        let client = self.pool.get().await?;
522        let rows = client
523            .query(
524                "SELECT * FROM conversations WHERE organization_id = $1 ORDER BY created_at DESC",
525                &[&organization_id],
526            )
527            .await?;
528        rows.iter().map(row_to_conversation).collect()
529    }
530
531    async fn update_conversation(
532        &self,
533        id: &str,
534        update: ConversationUpdate,
535    ) -> Result<Conversation> {
536        let client = self.pool.get().await?;
537        let now = Utc::now();
538        // COALESCE keeps existing values when the update field is NULL; the
539        // metadata/analytics fields are explicitly settable (incl. clearing is
540        // out of scope here, matching the in-memory "set only when Some").
541        let set_metadata = update.metadata_json.is_some();
542        let set_analytics = update.analytics_json.is_some();
543        let row = client
544            .query_one(
545                "UPDATE conversations SET
546                    name = COALESCE($2, name),
547                    metadata_json = CASE WHEN $3 THEN $4 ELSE metadata_json END,
548                    analytics_json = CASE WHEN $5 THEN $6 ELSE analytics_json END,
549                    updated_at = $7
550                 WHERE id = $1
551                 RETURNING *",
552                &[
553                    &id,
554                    &update.name,
555                    &set_metadata,
556                    &update.metadata_json,
557                    &set_analytics,
558                    &update.analytics_json,
559                    &now,
560                ],
561            )
562            .await
563            .with_context(|| format!("conversation '{id}' not found"))?;
564        row_to_conversation(&row)
565    }
566
567    // ---- participants ----------------------------------------------------
568
569    async fn add_participant(&self, participant: Participant) -> Result<Participant> {
570        let client = self.pool.get().await?;
571        client
572            .execute(
573                "INSERT INTO conversation_participants
574                    (id, conversation_id, organization_id, type, external_id, internal_id,
575                     browser_fingerprint, browser_info, name, email, phone, crm_contact_id,
576                     metadata_json, created_at, updated_at)
577                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15)",
578                &[
579                    &participant.id,
580                    &participant.conversation_id,
581                    &participant.organization_id,
582                    &participant_type_to_str(participant.participant_type),
583                    &participant.external_id,
584                    &participant.internal_id,
585                    &participant.browser_fingerprint,
586                    &participant.browser_info,
587                    &participant.name,
588                    &participant.email,
589                    &participant.phone,
590                    &participant.crm_contact_id,
591                    &participant.metadata_json,
592                    &participant.created_at,
593                    &participant.updated_at,
594                ],
595            )
596            .await?;
597        Ok(participant)
598    }
599
600    async fn get_participant(&self, id: &str) -> Result<Option<Participant>> {
601        let client = self.pool.get().await?;
602        let row = client
603            .query_opt(
604                "SELECT * FROM conversation_participants WHERE id = $1",
605                &[&id],
606            )
607            .await?;
608        row.as_ref().map(row_to_participant).transpose()
609    }
610
611    async fn list_participants_by_conversation(
612        &self,
613        conversation_id: &str,
614    ) -> Result<Vec<Participant>> {
615        let client = self.pool.get().await?;
616        let rows = client
617            .query(
618                "SELECT * FROM conversation_participants WHERE conversation_id = $1 ORDER BY created_at, id",
619                &[&conversation_id],
620            )
621            .await?;
622        rows.iter().map(row_to_participant).collect()
623    }
624
625    async fn resolve_participant_by_external_id(
626        &self,
627        conversation_id: &str,
628        external_id: &str,
629    ) -> Result<Option<Participant>> {
630        let client = self.pool.get().await?;
631        let row = client
632            .query_opt(
633                "SELECT * FROM conversation_participants
634                 WHERE conversation_id = $1 AND external_id = $2
635                 ORDER BY created_at LIMIT 1",
636                &[&conversation_id, &external_id],
637            )
638            .await?;
639        row.as_ref().map(row_to_participant).transpose()
640    }
641
642    // ---- messages --------------------------------------------------------
643
644    async fn append_message(&self, message: Message) -> Result<Message> {
645        let client = self.pool.get().await?;
646        let content = serde_json::to_value(&message.content)?;
647        let from = message
648            .from
649            .as_ref()
650            .map(serde_json::to_value)
651            .transpose()?;
652        let to = message.to.as_ref().map(serde_json::to_value).transpose()?;
653        client
654            .execute(
655                "INSERT INTO conversation_messages
656                    (id, external_id, organization_id, conversation_id, direction, content,
657                     from_ref, to_ref, metadata_json, analytics_json, created_at, updated_at)
658                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12)",
659                &[
660                    &message.id,
661                    &message.external_id,
662                    &message.organization_id,
663                    &message.conversation_id,
664                    &direction_to_str(message.direction),
665                    &content,
666                    &from,
667                    &to,
668                    &message.metadata_json,
669                    &message.analytics_json,
670                    &message.created_at,
671                    &message.updated_at,
672                ],
673            )
674            .await?;
675        Ok(message)
676    }
677
678    async fn get_message(&self, id: &str) -> Result<Option<Message>> {
679        let client = self.pool.get().await?;
680        let row = client
681            .query_opt("SELECT * FROM conversation_messages WHERE id = $1", &[&id])
682            .await?;
683        row.as_ref().map(row_to_message).transpose()
684    }
685
686    async fn list_messages_by_conversation(&self, query: MessageQuery) -> Result<MessagePage> {
687        let client = self.pool.get().await?;
688        let limit_i64 = i64::try_from(query.limit).unwrap_or(i64::MAX);
689
690        // Cursor is a message id; page starts strictly after that message's seq
691        // (or before, when descending). Resolve it to a seq first.
692        let cursor_seq: Option<i64> = match &query.cursor {
693            Some(cursor) => {
694                let row = client
695                    .query_opt(
696                        "SELECT seq FROM conversation_messages WHERE id = $1",
697                        &[&cursor],
698                    )
699                    .await?;
700                row.map(|r| r.get::<_, i64>("seq"))
701            }
702            None => None,
703        };
704
705        // Fetch limit + 1 to detect whether another page remains, mirroring the
706        // in-memory "next_cursor is Some iff more rows follow" contract.
707        let probe = limit_i64.saturating_add(1);
708        let rows = if query.descending {
709            // Newest first: seq descending; cursor means "seq < cursor_seq".
710            match cursor_seq {
711                Some(seq) => {
712                    client
713                        .query(
714                            "SELECT * FROM conversation_messages
715                             WHERE conversation_id = $1 AND seq < $2
716                             ORDER BY seq DESC LIMIT $3",
717                            &[&query.conversation_id, &seq, &probe],
718                        )
719                        .await?
720                }
721                None => {
722                    client
723                        .query(
724                            "SELECT * FROM conversation_messages
725                             WHERE conversation_id = $1
726                             ORDER BY seq DESC LIMIT $2",
727                            &[&query.conversation_id, &probe],
728                        )
729                        .await?
730                }
731            }
732        } else {
733            // Oldest first: seq ascending; cursor means "seq > cursor_seq".
734            match cursor_seq {
735                Some(seq) => {
736                    client
737                        .query(
738                            "SELECT * FROM conversation_messages
739                             WHERE conversation_id = $1 AND seq > $2
740                             ORDER BY seq ASC LIMIT $3",
741                            &[&query.conversation_id, &seq, &probe],
742                        )
743                        .await?
744                }
745                None => {
746                    client
747                        .query(
748                            "SELECT * FROM conversation_messages
749                             WHERE conversation_id = $1
750                             ORDER BY seq ASC LIMIT $2",
751                            &[&query.conversation_id, &probe],
752                        )
753                        .await?
754                }
755            }
756        };
757
758        let has_more = rows.len() as i64 > limit_i64;
759        let page_rows = if has_more {
760            &rows[..query.limit]
761        } else {
762            &rows[..]
763        };
764        let messages: Vec<Message> = page_rows
765            .iter()
766            .map(row_to_message)
767            .collect::<Result<_>>()?;
768        let next_cursor = if has_more {
769            messages.last().map(|m| m.id.clone())
770        } else {
771            None
772        };
773
774        Ok(MessagePage {
775            messages,
776            next_cursor,
777        })
778    }
779
780    // ---- sessions --------------------------------------------------------
781
782    async fn create_session(&self, session: Session) -> Result<Session> {
783        let client = self.pool.get().await?;
784        let status = session.status.map(session_status_to_str);
785        let token_count = session
786            .token_count
787            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
788        let message_count = session
789            .message_count
790            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
791        let metadata = session
792            .metadata
793            .as_ref()
794            .map(serde_json::to_value)
795            .transpose()?;
796        client
797            .execute(
798                "INSERT INTO conversation_sessions
799                    (session_id, conversation_id, organization_id, agent_id, agent_name,
800                     user_participant_id, agent_participant_id, thread_id, status, token_count,
801                     message_count, metadata, created_at, updated_at, ended_at, last_activity_at)
802                 VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12,$13,$14,$15,$16)",
803                &[
804                    &session.session_id,
805                    &session.conversation_id,
806                    &session.organization_id,
807                    &session.agent_id,
808                    &session.agent_name,
809                    &session.user_participant_id,
810                    &session.agent_participant_id,
811                    &session.thread_id,
812                    &status,
813                    &token_count,
814                    &message_count,
815                    &metadata,
816                    &session.created_at,
817                    &session.updated_at,
818                    &session.ended_at,
819                    &session.last_activity_at,
820                ],
821            )
822            .await?;
823        Ok(session)
824    }
825
826    async fn get_session(&self, session_id: &str) -> Result<Option<Session>> {
827        let client = self.pool.get().await?;
828        let row = client
829            .query_opt(
830                "SELECT * FROM conversation_sessions WHERE session_id = $1",
831                &[&session_id],
832            )
833            .await?;
834        row.as_ref().map(row_to_session).transpose()
835    }
836
837    async fn update_session(&self, session_id: &str, update: SessionUpdate) -> Result<Session> {
838        let client = self.pool.get().await?;
839        let now = Utc::now();
840        let status = update.status.map(session_status_to_str);
841        let token_count = update
842            .token_count
843            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
844        let message_count = update
845            .message_count
846            .map(|v| i64::try_from(v).unwrap_or(i64::MAX));
847        // last_activity_at / ended_at are set only when Some (mirrors in-memory).
848        let set_last_activity = update.last_activity_at.is_some();
849        let set_ended = update.ended_at.is_some();
850        let row = client
851            .query_one(
852                "UPDATE conversation_sessions SET
853                    status = COALESCE($2, status),
854                    token_count = COALESCE($3, token_count),
855                    message_count = COALESCE($4, message_count),
856                    last_activity_at = CASE WHEN $5 THEN $6 ELSE last_activity_at END,
857                    ended_at = CASE WHEN $7 THEN $8 ELSE ended_at END,
858                    updated_at = $9
859                 WHERE session_id = $1
860                 RETURNING *",
861                &[
862                    &session_id,
863                    &status,
864                    &token_count,
865                    &message_count,
866                    &set_last_activity,
867                    &update.last_activity_at,
868                    &set_ended,
869                    &update.ended_at,
870                    &now,
871                ],
872            )
873            .await
874            .with_context(|| format!("session '{session_id}' not found"))?;
875        row_to_session(&row)
876    }
877
878    async fn list_sessions_by_conversation(&self, conversation_id: &str) -> Result<Vec<Session>> {
879        let client = self.pool.get().await?;
880        let rows = client
881            .query(
882                "SELECT * FROM conversation_sessions WHERE conversation_id = $1 ORDER BY created_at, session_id",
883                &[&conversation_id],
884            )
885            .await?;
886        rows.iter().map(row_to_session).collect()
887    }
888
889    // ---- engine accessors ------------------------------------------------
890
891    fn checkpoints(&self) -> Arc<dyn CheckpointStore> {
892        // Always `Some` between construction and drop.
893        self.checkpoints
894            .as_ref()
895            .expect("checkpoint store present")
896            .clone()
897    }
898
899    fn knowledge(&self) -> Arc<dyn KnowledgeBase> {
900        self.knowledge.clone()
901    }
902
903    fn knowledge_for_access(&self, access: &AccessContext) -> Arc<dyn KnowledgeBase> {
904        // Durable document-level ACL (feature gap G3): the returned handle
905        // filters every query by the requester's entitlements against the stored
906        // `acl` column **in SQL**, so a restricted document is never fetched —
907        // and the filter survives the ingest→serve process boundary (unlike the
908        // in-memory side table). See `knowledge::PgKnowledgeBase::query_async`.
909        Arc::new(self.knowledge.with_access(access.clone()))
910    }
911}