Skip to main content

claude_agent/session/
persistence_postgres.rs

1//! PostgreSQL session persistence with explicit schema management.
2//!
3//! # Schema Management
4//!
5//! This module separates schema management from data access, allowing flexible deployment:
6//!
7//! ```rust,no_run
8//! use claude_agent::session::{PostgresPersistence, PostgresSchema, PostgresConfig};
9//!
10//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
11//! // Option 1: Auto-migrate (development/simple deployments)
12//! let persistence = PostgresPersistence::connect_and_migrate("postgres://...").await?;
13//!
14//! // Option 2: Connect only, manage schema externally (production)
15//! let persistence = PostgresPersistence::connect("postgres://...").await?;
16//!
17//! // Option 3: Export SQL for external migration tools (Flyway, Diesel, etc.)
18//! let sql = PostgresSchema::sql(&PostgresConfig::default());
19//! println!("{}", sql);
20//!
21//! // Option 4: Verify schema is correct
22//! let issues = persistence.verify_schema().await?;
23//! if !issues.is_empty() {
24//!     for issue in &issues {
25//!         eprintln!("Schema issue: {:?}", issue);
26//!     }
27//! }
28//! # Ok(())
29//! # }
30//! ```
31
32use std::collections::VecDeque;
33use std::sync::Arc;
34use std::time::Duration;
35
36use async_trait::async_trait;
37use chrono::Utc;
38use sqlx::postgres::PgPoolOptions;
39use sqlx::{PgPool, Postgres, Row, Transaction};
40use uuid::Uuid;
41
42use super::persistence::Persistence;
43use super::state::{Session, SessionConfig, SessionId, SessionMessage};
44use super::types::{CompactRecord, Plan, QueueItem, QueueStatus, SummarySnapshot, TodoItem};
45use super::{SessionError, SessionResult, StorageResultExt};
46
47fn enum_to_db<T: serde::Serialize>(value: &T, default: &str) -> String {
48    serde_json::to_string(value)
49        .map(|s| s.trim_matches('"').to_string())
50        .unwrap_or_else(|_| default.to_string())
51}
52
53fn db_to_enum<T: serde::de::DeserializeOwned>(s: &str) -> Option<T> {
54    serde_json::from_str(&format!("\"{}\"", s)).ok()
55}
56
57// ============================================================================
58// Configuration
59// ============================================================================
60
/// Connection pool configuration for PostgreSQL.
///
/// The sizing and timeout fields are forwarded to `PgPoolOptions` by `apply()`;
/// the retry fields are handed to the shared retry helper by
/// `PostgresPersistence::with_retry`.
#[derive(Clone, Debug)]
pub struct PgPoolConfig {
    /// Forwarded to `PgPoolOptions::max_connections`.
    pub max_connections: u32,
    /// Forwarded to `PgPoolOptions::min_connections`.
    pub min_connections: u32,
    /// NOTE(review): `apply()` does not forward this value to `PgPoolOptions`;
    /// confirm whether it is consumed elsewhere or is unused.
    pub connect_timeout: Duration,
    /// Forwarded to `PgPoolOptions::idle_timeout`.
    pub idle_timeout: Duration,
    /// Forwarded to `PgPoolOptions::max_lifetime`.
    pub max_lifetime: Duration,
    /// Forwarded to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
    /// Maximum retry attempts for transient failures.
    pub max_retries: u32,
    /// Initial backoff duration for retries.
    pub initial_backoff: Duration,
    /// Maximum backoff duration.
    pub max_backoff: Duration,
}
77
78impl Default for PgPoolConfig {
79    fn default() -> Self {
80        Self {
81            max_connections: 10,
82            min_connections: 1,
83            connect_timeout: Duration::from_secs(30),
84            idle_timeout: Duration::from_secs(600),
85            max_lifetime: Duration::from_secs(1800),
86            acquire_timeout: Duration::from_secs(30),
87            max_retries: 3,
88            initial_backoff: Duration::from_millis(100),
89            max_backoff: Duration::from_secs(5),
90        }
91    }
92}
93
94impl PgPoolConfig {
95    pub fn high_throughput() -> Self {
96        Self {
97            max_connections: 50,
98            min_connections: 5,
99            connect_timeout: Duration::from_secs(10),
100            idle_timeout: Duration::from_secs(300),
101            max_lifetime: Duration::from_secs(900),
102            acquire_timeout: Duration::from_secs(10),
103            max_retries: 3,
104            initial_backoff: Duration::from_millis(50),
105            max_backoff: Duration::from_secs(2),
106        }
107    }
108
109    pub(crate) fn apply(&self) -> PgPoolOptions {
110        PgPoolOptions::new()
111            .max_connections(self.max_connections)
112            .min_connections(self.min_connections)
113            .acquire_timeout(self.acquire_timeout)
114            .idle_timeout(Some(self.idle_timeout))
115            .max_lifetime(Some(self.max_lifetime))
116    }
117}
118
/// PostgreSQL persistence configuration.
///
/// Holds the name of every table used by the persistence layer plus pool and
/// retention settings. `PostgresConfig::prefix` fills the table names as
/// `{prefix}sessions`, `{prefix}messages`, and so on.
#[derive(Clone, Debug)]
pub struct PostgresConfig {
    /// Table holding one row per session.
    pub sessions_table: String,
    /// Table holding session messages (references the sessions table).
    pub messages_table: String,
    /// Table holding compaction records (references the sessions table).
    pub compacts_table: String,
    /// Table holding summary snapshots (references the sessions table).
    pub summaries_table: String,
    /// Table holding queued operations (references the sessions table).
    pub queue_table: String,
    /// Table holding todo items (references the sessions table).
    pub todos_table: String,
    /// Table holding plans (references the sessions table).
    pub plans_table: String,
    /// Connection pool and retry settings.
    pub pool: PgPoolConfig,
    /// Session retention period in days (default: 30).
    ///
    /// Sessions without explicit TTL that haven't been updated within
    /// this period are cleaned up by `cleanup_expired()`.
    pub retention_days: u32,
}
136
137impl Default for PostgresConfig {
138    fn default() -> Self {
139        // Safety: "claude_" is a valid prefix (alphanumeric + underscore)
140        Self::prefix("claude_").unwrap()
141    }
142}
143
144impl PostgresConfig {
145    pub fn prefix(prefix: &str) -> Result<Self, SessionError> {
146        if !prefix
147            .chars()
148            .all(|c| c.is_ascii_alphanumeric() || c == '_')
149        {
150            return Err(SessionError::Storage {
151                message: format!(
152                    "Invalid table prefix '{}': only ASCII alphanumeric and underscore allowed",
153                    prefix
154                ),
155            });
156        }
157        Ok(Self {
158            sessions_table: format!("{prefix}sessions"),
159            messages_table: format!("{prefix}messages"),
160            compacts_table: format!("{prefix}compacts"),
161            summaries_table: format!("{prefix}summaries"),
162            queue_table: format!("{prefix}queue"),
163            todos_table: format!("{prefix}todos"),
164            plans_table: format!("{prefix}plans"),
165            pool: PgPoolConfig::default(),
166            retention_days: 30,
167        })
168    }
169
170    pub fn pool(mut self, pool: PgPoolConfig) -> Self {
171        self.pool = pool;
172        self
173    }
174
175    pub fn retention_days(mut self, days: u32) -> Self {
176        self.retention_days = days;
177        self
178    }
179
180    /// Get all table names.
181    pub fn table_names(&self) -> Vec<&str> {
182        vec![
183            &self.sessions_table,
184            &self.messages_table,
185            &self.compacts_table,
186            &self.summaries_table,
187            &self.queue_table,
188            &self.todos_table,
189            &self.plans_table,
190        ]
191    }
192}
193
194// ============================================================================
195// Schema Management
196// ============================================================================
197
/// Schema issue found during verification.
#[derive(Debug, Clone)]
pub enum SchemaIssue {
    /// A configured table was not found; carries the table name.
    MissingTable(String),
    /// An expected index was not found on `table`.
    MissingIndex { table: String, index: String },
    /// An expected column was not found in `table`.
    ///
    /// NOTE(review): `PostgresSchema::verify` as written in this file never
    /// produces this variant — confirm whether another check emits it.
    MissingColumn { table: String, column: String },
}
205
206impl std::fmt::Display for SchemaIssue {
207    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208        match self {
209            SchemaIssue::MissingTable(t) => write!(f, "Missing table: {}", t),
210            SchemaIssue::MissingIndex { table, index } => {
211                write!(f, "Missing index '{}' on table '{}'", index, table)
212            }
213            SchemaIssue::MissingColumn { table, column } => {
214                write!(f, "Missing column '{}' in table '{}'", column, table)
215            }
216        }
217    }
218}
219
/// Schema manager for PostgreSQL persistence.
///
/// Provides utilities for schema creation, migration, and verification.
/// This is a unit struct: every operation is an associated function that takes
/// the `PostgresConfig` (and, where needed, a pool) explicitly.
pub struct PostgresSchema;
224
225impl PostgresSchema {
226    /// Generate complete SQL DDL for all tables and indexes.
227    ///
228    /// Use this to integrate with external migration tools (Flyway, Diesel, etc.).
229    pub fn sql(config: &PostgresConfig) -> String {
230        let mut sql = String::new();
231        sql.push_str("-- Claude Agent Session Schema\n");
232        sql.push_str("-- Generated by claude-agent PostgresSchema\n\n");
233
234        for table_sql in Self::table_ddl(config) {
235            sql.push_str(&table_sql);
236            sql.push_str("\n\n");
237        }
238
239        sql.push_str("-- Indexes\n");
240        for index_sql in Self::index_ddl(config) {
241            sql.push_str(&index_sql);
242            sql.push_str(";\n");
243        }
244
245        sql
246    }
247
248    /// Generate table DDL statements.
249    pub fn table_ddl(config: &PostgresConfig) -> Vec<String> {
250        let c = config;
251        vec![
252            format!(
253                r#"CREATE TABLE IF NOT EXISTS {sessions} (
254    id VARCHAR(255) PRIMARY KEY,
255    parent_id VARCHAR(255),
256    tenant_id VARCHAR(255),
257    session_type VARCHAR(32) NOT NULL DEFAULT 'main',
258    state VARCHAR(32) NOT NULL DEFAULT 'created',
259    mode VARCHAR(32) NOT NULL DEFAULT 'default',
260    config JSONB NOT NULL DEFAULT '{{}}',
261    permissions JSONB NOT NULL DEFAULT '{{}}',
262    summary TEXT,
263    total_input_tokens BIGINT DEFAULT 0,
264    total_output_tokens BIGINT DEFAULT 0,
265    total_cost_usd DECIMAL(12, 6) DEFAULT 0,
266    current_leaf_id VARCHAR(255),
267    static_context_hash VARCHAR(64),
268    error TEXT,
269    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
270    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
271    expires_at TIMESTAMPTZ,
272    CONSTRAINT fk_{sessions}_parent FOREIGN KEY (parent_id) REFERENCES {sessions}(id) ON DELETE CASCADE
273);"#,
274                sessions = c.sessions_table
275            ),
276            format!(
277                r#"CREATE TABLE IF NOT EXISTS {messages} (
278    id VARCHAR(255) PRIMARY KEY,
279    session_id VARCHAR(255) NOT NULL,
280    parent_id VARCHAR(255),
281    role VARCHAR(16) NOT NULL,
282    content JSONB NOT NULL,
283    is_sidechain BOOLEAN DEFAULT FALSE,
284    is_compact_summary BOOLEAN DEFAULT FALSE,
285    model VARCHAR(64),
286    request_id VARCHAR(255),
287    usage JSONB,
288    metadata JSONB,
289    environment JSONB,
290    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
291    CONSTRAINT fk_{messages}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
292);"#,
293                messages = c.messages_table,
294                sessions = c.sessions_table
295            ),
296            format!(
297                r#"CREATE TABLE IF NOT EXISTS {compacts} (
298    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
299    session_id VARCHAR(255) NOT NULL,
300    trigger VARCHAR(32) NOT NULL,
301    pre_tokens INTEGER NOT NULL,
302    post_tokens INTEGER NOT NULL,
303    saved_tokens INTEGER NOT NULL,
304    summary TEXT NOT NULL,
305    original_count INTEGER NOT NULL,
306    new_count INTEGER NOT NULL,
307    logical_parent_id VARCHAR(255),
308    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
309    CONSTRAINT fk_{compacts}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
310);"#,
311                compacts = c.compacts_table,
312                sessions = c.sessions_table
313            ),
314            format!(
315                r#"CREATE TABLE IF NOT EXISTS {summaries} (
316    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
317    session_id VARCHAR(255) NOT NULL,
318    summary TEXT NOT NULL,
319    leaf_message_id VARCHAR(255),
320    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
321    CONSTRAINT fk_{summaries}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
322);"#,
323                summaries = c.summaries_table,
324                sessions = c.sessions_table
325            ),
326            format!(
327                r#"CREATE TABLE IF NOT EXISTS {queue} (
328    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
329    session_id VARCHAR(255) NOT NULL,
330    operation VARCHAR(32) NOT NULL,
331    content TEXT NOT NULL,
332    priority INTEGER DEFAULT 0,
333    status VARCHAR(32) NOT NULL DEFAULT 'pending',
334    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
335    processed_at TIMESTAMPTZ,
336    CONSTRAINT fk_{queue}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
337);"#,
338                queue = c.queue_table,
339                sessions = c.sessions_table
340            ),
341            format!(
342                r#"CREATE TABLE IF NOT EXISTS {todos} (
343    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
344    session_id VARCHAR(255) NOT NULL,
345    plan_id UUID,
346    content TEXT NOT NULL,
347    active_form TEXT NOT NULL,
348    status VARCHAR(32) NOT NULL DEFAULT 'pending',
349    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
350    started_at TIMESTAMPTZ,
351    completed_at TIMESTAMPTZ,
352    CONSTRAINT fk_{todos}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
353);"#,
354                todos = c.todos_table,
355                sessions = c.sessions_table
356            ),
357            format!(
358                r#"CREATE TABLE IF NOT EXISTS {plans} (
359    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
360    session_id VARCHAR(255) NOT NULL,
361    name VARCHAR(255),
362    content TEXT NOT NULL,
363    status VARCHAR(32) NOT NULL DEFAULT 'draft',
364    error TEXT,
365    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
366    approved_at TIMESTAMPTZ,
367    started_at TIMESTAMPTZ,
368    completed_at TIMESTAMPTZ,
369    CONSTRAINT fk_{plans}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
370);"#,
371                plans = c.plans_table,
372                sessions = c.sessions_table
373            ),
374        ]
375    }
376
377    /// Generate index DDL statements.
378    pub fn index_ddl(config: &PostgresConfig) -> Vec<String> {
379        let c = config;
380        vec![
381            // Sessions indexes
382            format!(
383                "CREATE INDEX IF NOT EXISTS idx_{0}_tenant ON {0}(tenant_id)",
384                c.sessions_table
385            ),
386            format!(
387                "CREATE INDEX IF NOT EXISTS idx_{0}_parent ON {0}(parent_id)",
388                c.sessions_table
389            ),
390            format!(
391                "CREATE INDEX IF NOT EXISTS idx_{0}_expires ON {0}(expires_at) WHERE expires_at IS NOT NULL",
392                c.sessions_table
393            ),
394            format!(
395                "CREATE INDEX IF NOT EXISTS idx_{0}_state ON {0}(state)",
396                c.sessions_table
397            ),
398            // Messages indexes
399            format!(
400                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
401                c.messages_table
402            ),
403            format!(
404                "CREATE INDEX IF NOT EXISTS idx_{0}_created ON {0}(session_id, created_at)",
405                c.messages_table
406            ),
407            // Compacts index
408            format!(
409                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
410                c.compacts_table
411            ),
412            // Summaries index
413            format!(
414                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
415                c.summaries_table
416            ),
417            // Queue indexes
418            format!(
419                "CREATE INDEX IF NOT EXISTS idx_{0}_session_status ON {0}(session_id, status)",
420                c.queue_table
421            ),
422            // Todos index
423            format!(
424                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
425                c.todos_table
426            ),
427            // Plans index
428            format!(
429                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
430                c.plans_table
431            ),
432        ]
433    }
434
435    /// Get expected indexes as (table_name, index_name) pairs.
436    pub fn expected_indexes(config: &PostgresConfig) -> Vec<(String, String)> {
437        let c = config;
438        vec![
439            (
440                c.sessions_table.clone(),
441                format!("idx_{}_tenant", c.sessions_table),
442            ),
443            (
444                c.sessions_table.clone(),
445                format!("idx_{}_parent", c.sessions_table),
446            ),
447            (
448                c.sessions_table.clone(),
449                format!("idx_{}_expires", c.sessions_table),
450            ),
451            (
452                c.sessions_table.clone(),
453                format!("idx_{}_state", c.sessions_table),
454            ),
455            (
456                c.messages_table.clone(),
457                format!("idx_{}_session", c.messages_table),
458            ),
459            (
460                c.messages_table.clone(),
461                format!("idx_{}_created", c.messages_table),
462            ),
463            (
464                c.compacts_table.clone(),
465                format!("idx_{}_session", c.compacts_table),
466            ),
467            (
468                c.summaries_table.clone(),
469                format!("idx_{}_session", c.summaries_table),
470            ),
471            (
472                c.queue_table.clone(),
473                format!("idx_{}_session_status", c.queue_table),
474            ),
475            (
476                c.todos_table.clone(),
477                format!("idx_{}_session", c.todos_table),
478            ),
479            (
480                c.plans_table.clone(),
481                format!("idx_{}_session", c.plans_table),
482            ),
483        ]
484    }
485
486    /// Run migration to create tables and indexes.
487    pub async fn migrate(pool: &PgPool, config: &PostgresConfig) -> Result<(), sqlx::Error> {
488        for table_ddl in Self::table_ddl(config) {
489            sqlx::query(&table_ddl).execute(pool).await?;
490        }
491
492        for index_ddl in Self::index_ddl(config) {
493            sqlx::query(&index_ddl).execute(pool).await?;
494        }
495
496        Ok(())
497    }
498
499    /// Verify schema integrity - check tables and indexes exist.
500    pub async fn verify(
501        pool: &PgPool,
502        config: &PostgresConfig,
503    ) -> Result<Vec<SchemaIssue>, sqlx::Error> {
504        let mut issues = Vec::new();
505
506        // Check tables
507        for table in config.table_names() {
508            let exists: bool = sqlx::query_scalar(
509                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
510            )
511            .bind(table)
512            .fetch_one(pool)
513            .await?;
514
515            if !exists {
516                issues.push(SchemaIssue::MissingTable(table.to_string()));
517            }
518        }
519
520        // Check indexes
521        for (table, index) in Self::expected_indexes(config) {
522            let exists: bool = sqlx::query_scalar(
523                "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE tablename = $1 AND indexname = $2)",
524            )
525            .bind(&table)
526            .bind(&index)
527            .fetch_one(pool)
528            .await?;
529
530            if !exists {
531                issues.push(SchemaIssue::MissingIndex { table, index });
532            }
533        }
534
535        Ok(issues)
536    }
537}
538
539// ============================================================================
540// Persistence Implementation
541// ============================================================================
542
/// PostgreSQL session persistence.
///
/// Wraps a shared `sqlx` connection pool together with the table-name, pool and
/// retention configuration used to build every query.
pub struct PostgresPersistence {
    /// Shared connection pool; may be supplied by the caller via `from_pool`.
    pool: Arc<PgPool>,
    /// Table names, pool/retry settings and retention period.
    config: PostgresConfig,
}
548
549impl PostgresPersistence {
550    /// Connect to database without running migrations.
551    ///
552    /// Use this when managing schema externally (production deployments).
553    pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
554        Self::connect_with_config(database_url, PostgresConfig::default()).await
555    }
556
557    /// Connect with custom configuration, without running migrations.
558    pub async fn connect_with_config(
559        database_url: &str,
560        config: PostgresConfig,
561    ) -> Result<Self, sqlx::Error> {
562        let pool = config.pool.apply().connect(database_url).await?;
563        Ok(Self {
564            pool: Arc::new(pool),
565            config,
566        })
567    }
568
569    /// Connect and run migrations automatically.
570    ///
571    /// Convenient for development and simple deployments.
572    pub async fn connect_and_migrate(database_url: &str) -> Result<Self, sqlx::Error> {
573        Self::connect_and_migrate_with_config(database_url, PostgresConfig::default()).await
574    }
575
576    /// Connect with custom configuration and run migrations.
577    pub async fn connect_and_migrate_with_config(
578        database_url: &str,
579        config: PostgresConfig,
580    ) -> Result<Self, sqlx::Error> {
581        let persistence = Self::connect_with_config(database_url, config).await?;
582        persistence.migrate().await?;
583        Ok(persistence)
584    }
585
586    /// Use an existing pool without running migrations.
587    pub fn from_pool(pool: Arc<PgPool>) -> Self {
588        Self::pool_and_config(pool, PostgresConfig::default())
589    }
590
591    /// Use an existing pool with custom configuration.
592    pub fn pool_and_config(pool: Arc<PgPool>, config: PostgresConfig) -> Self {
593        Self { pool, config }
594    }
595
596    /// Run schema migration.
597    pub async fn migrate(&self) -> Result<(), sqlx::Error> {
598        PostgresSchema::migrate(&self.pool, &self.config).await
599    }
600
601    /// Verify schema integrity.
602    pub async fn verify_schema(&self) -> Result<Vec<SchemaIssue>, sqlx::Error> {
603        PostgresSchema::verify(&self.pool, &self.config).await
604    }
605
606    /// Get the underlying connection pool.
607    pub fn pool(&self) -> &PgPool {
608        &self.pool
609    }
610
611    /// Get the configuration.
612    pub fn config(&self) -> &PostgresConfig {
613        &self.config
614    }
615
616    // ========================================================================
617    // Internal helpers
618    // ========================================================================
619
620    async fn with_retry<F, Fut, T>(&self, operation: F) -> SessionResult<T>
621    where
622        F: Fn() -> Fut,
623        Fut: std::future::Future<Output = SessionResult<T>>,
624    {
625        super::with_retry(
626            self.config.pool.max_retries,
627            self.config.pool.initial_backoff,
628            self.config.pool.max_backoff,
629            Self::is_retryable,
630            operation,
631        )
632        .await
633    }
634
635    fn is_retryable(error: &SessionError) -> bool {
636        match error {
637            SessionError::Storage { message } => {
638                message.contains("connection")
639                    || message.contains("timeout")
640                    || message.contains("reset")
641                    || message.contains("broken pipe")
642                    || message.contains("serialization")
643                    || message.contains("deadlock")
644                    || message.contains("could not serialize")
645            }
646            _ => false,
647        }
648    }
649
650    async fn load_session_row(&self, session_id: &SessionId) -> SessionResult<Session> {
651        let c = &self.config;
652        let id_str = session_id.to_string();
653
654        let row = sqlx::query(&format!(
655            r#"
656            SELECT id, parent_id, tenant_id, session_type, state, mode,
657                   config, permissions, summary,
658                   total_input_tokens, total_output_tokens, total_cost_usd,
659                   current_leaf_id, static_context_hash, error,
660                   created_at, updated_at, expires_at
661            FROM {sessions}
662            WHERE id = $1
663            "#,
664            sessions = c.sessions_table
665        ))
666        .bind(&id_str)
667        .fetch_optional(self.pool.as_ref())
668        .await
669        .storage_err()?
670        .ok_or_else(|| SessionError::NotFound { id: id_str.clone() })?;
671
672        let messages = self.load_messages(session_id).await?;
673        let compacts = self.load_compacts(session_id).await?;
674        let todos = self.load_todos_internal(session_id).await?;
675        let plan = self.load_plan_internal(session_id).await?;
676
677        let config: SessionConfig = match row.try_get::<serde_json::Value, _>("config") {
678            Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
679                tracing::warn!(session_id = %session_id, error = %e, "Failed to deserialize session config");
680                Default::default()
681            }),
682            Err(e) => {
683                tracing::warn!(session_id = %session_id, error = %e, "Failed to read session config column");
684                Default::default()
685            }
686        };
687
688        let permissions = match row.try_get::<serde_json::Value, _>("permissions") {
689            Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
690                tracing::warn!(session_id = %session_id, error = %e, "Failed to deserialize session permissions");
691                Default::default()
692            }),
693            Err(e) => {
694                tracing::warn!(session_id = %session_id, error = %e, "Failed to read session permissions column");
695                Default::default()
696            }
697        };
698
699        let session_type = row
700            .try_get::<&str, _>("session_type")
701            .ok()
702            .and_then(db_to_enum)
703            .unwrap_or_default();
704
705        // mode column is ignored; SessionMode was removed (always stateless)
706        let _ = row.try_get::<&str, _>("mode");
707
708        let state = row
709            .try_get::<&str, _>("state")
710            .ok()
711            .and_then(db_to_enum)
712            .unwrap_or_default();
713
714        let current_leaf_id = row
715            .try_get::<&str, _>("current_leaf_id")
716            .ok()
717            .and_then(|s| s.parse().ok());
718
719        Ok(Session {
720            id: *session_id,
721            parent_id: row
722                .try_get::<&str, _>("parent_id")
723                .ok()
724                .and_then(|s| s.parse().ok()),
725            session_type,
726            tenant_id: row.try_get("tenant_id").ok(),
727            state,
728            config,
729            permissions,
730            messages,
731            current_leaf_id,
732            summary: row.try_get("summary").ok(),
733            total_usage: crate::types::TokenUsage {
734                input_tokens: row.try_get::<i64, _>("total_input_tokens").unwrap_or(0) as u64,
735                output_tokens: row.try_get::<i64, _>("total_output_tokens").unwrap_or(0) as u64,
736                ..Default::default()
737            },
738            current_input_tokens: 0,
739            total_cost_usd: row
740                .try_get::<rust_decimal::Decimal, _>("total_cost_usd")
741                .unwrap_or_default(),
742            static_context_hash: row.try_get("static_context_hash").ok(),
743            created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
744            updated_at: row.try_get("updated_at").unwrap_or_else(|_| Utc::now()),
745            expires_at: row.try_get("expires_at").ok(),
746            error: row.try_get("error").ok(),
747            todos,
748            current_plan: plan,
749            compact_history: VecDeque::from(compacts),
750        })
751    }
752
753    async fn load_messages(&self, session_id: &SessionId) -> SessionResult<Vec<SessionMessage>> {
754        let c = &self.config;
755
756        let rows = sqlx::query(&format!(
757            r#"
758            SELECT id, parent_id, role, content, is_sidechain, is_compact_summary,
759                   model, request_id, usage, metadata, environment, created_at
760            FROM {messages}
761            WHERE session_id = $1
762            ORDER BY created_at ASC
763            "#,
764            messages = c.messages_table
765        ))
766        .bind(session_id.to_string())
767        .fetch_all(self.pool.as_ref())
768        .await
769        .storage_err()?;
770
771        let mut messages = Vec::with_capacity(rows.len());
772
773        for row in rows {
774            let id_str = match row.try_get::<&str, _>("id") {
775                Ok(id) => id,
776                Err(e) => {
777                    tracing::warn!(error = %e, "Skipping message row: failed to get id");
778                    continue;
779                }
780            };
781
782            let id = match id_str.parse() {
783                Ok(id) => id,
784                Err(e) => {
785                    tracing::warn!(id = id_str, error = %e, "Skipping message row: failed to parse id");
786                    continue;
787                }
788            };
789
790            let content: Vec<crate::types::ContentBlock> = match row
791                .try_get::<serde_json::Value, _>("content")
792                .ok()
793                .and_then(|v| serde_json::from_value(v).ok())
794            {
795                Some(c) => c,
796                None => {
797                    tracing::warn!(id = id_str, "Skipping message row: failed to parse content");
798                    continue;
799                }
800            };
801
802            let role: crate::types::Role =
803                match row.try_get::<&str, _>("role").ok().and_then(db_to_enum) {
804                    Some(r) => r,
805                    None => {
806                        tracing::warn!(id = id_str, "Skipping message row: failed to parse role");
807                        continue;
808                    }
809                };
810
811            let usage = row
812                .try_get::<serde_json::Value, _>("usage")
813                .ok()
814                .and_then(|v| serde_json::from_value(v).ok());
815
816            let metadata = match row.try_get::<serde_json::Value, _>("metadata") {
817                Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
818                    tracing::warn!(id = id_str, error = %e, "Failed to deserialize message metadata");
819                    Default::default()
820                }),
821                Err(_) => Default::default(),
822            };
823
824            let environment = row
825                .try_get::<serde_json::Value, _>("environment")
826                .ok()
827                .and_then(|v| serde_json::from_value(v).ok());
828
829            messages.push(SessionMessage {
830                id,
831                parent_id: row
832                    .try_get::<&str, _>("parent_id")
833                    .ok()
834                    .and_then(|s| s.parse().ok()),
835                role,
836                content,
837                is_sidechain: row.try_get("is_sidechain").unwrap_or(false),
838                is_compact_summary: row.try_get("is_compact_summary").unwrap_or(false),
839                usage,
840                timestamp: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
841                metadata,
842                environment,
843            });
844        }
845
846        Ok(messages)
847    }
848
849    async fn load_compacts(&self, session_id: &SessionId) -> SessionResult<Vec<CompactRecord>> {
850        let c = &self.config;
851
852        let rows = sqlx::query(&format!(
853            r#"
854            SELECT id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
855                   summary, original_count, new_count, logical_parent_id, created_at
856            FROM {compacts}
857            WHERE session_id = $1
858            ORDER BY created_at ASC
859            "#,
860            compacts = c.compacts_table
861        ))
862        .bind(session_id.to_string())
863        .fetch_all(self.pool.as_ref())
864        .await
865        .storage_err()?;
866
867        let mut compacts = Vec::with_capacity(rows.len());
868
869        for row in rows {
870            let id: Uuid = match row.try_get("id") {
871                Ok(id) => id,
872                Err(e) => {
873                    tracing::warn!(session_id = %session_id, error = %e, "Skipping compact row: failed to get id");
874                    continue;
875                }
876            };
877
878            let trigger = match row.try_get::<&str, _>("trigger").ok().and_then(db_to_enum) {
879                Some(t) => t,
880                None => {
881                    tracing::warn!(session_id = %session_id, compact_id = %id, "Skipping compact row: failed to parse trigger");
882                    continue;
883                }
884            };
885
886            let summary = match row.try_get("summary") {
887                Ok(s) => s,
888                Err(e) => {
889                    tracing::warn!(session_id = %session_id, compact_id = %id, error = %e, "Skipping compact row: failed to get summary");
890                    continue;
891                }
892            };
893
894            compacts.push(CompactRecord {
895                id,
896                session_id: *session_id,
897                trigger,
898                pre_tokens: row.try_get::<i32, _>("pre_tokens").unwrap_or(0) as usize,
899                post_tokens: row.try_get::<i32, _>("post_tokens").unwrap_or(0) as usize,
900                saved_tokens: row.try_get::<i32, _>("saved_tokens").unwrap_or(0) as usize,
901                summary,
902                original_count: row.try_get::<i32, _>("original_count").unwrap_or(0) as usize,
903                new_count: row.try_get::<i32, _>("new_count").unwrap_or(0) as usize,
904                logical_parent_id: row
905                    .try_get::<&str, _>("logical_parent_id")
906                    .ok()
907                    .and_then(|s| s.parse().ok()),
908                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
909            });
910        }
911
912        Ok(compacts)
913    }
914
915    async fn load_todos_internal(&self, session_id: &SessionId) -> SessionResult<Vec<TodoItem>> {
916        let c = &self.config;
917
918        let rows = sqlx::query(&format!(
919            r#"
920            SELECT id, session_id, plan_id, content, active_form, status,
921                   created_at, started_at, completed_at
922            FROM {todos}
923            WHERE session_id = $1
924            ORDER BY created_at ASC
925            "#,
926            todos = c.todos_table
927        ))
928        .bind(session_id.to_string())
929        .fetch_all(self.pool.as_ref())
930        .await
931        .storage_err()?;
932
933        let mut todos = Vec::with_capacity(rows.len());
934
935        for row in rows {
936            let id: Uuid = match row.try_get("id") {
937                Ok(id) => id,
938                Err(e) => {
939                    tracing::warn!(session_id = %session_id, error = %e, "Skipping todo row: failed to get id");
940                    continue;
941                }
942            };
943
944            let status = match row.try_get::<&str, _>("status").ok().and_then(db_to_enum) {
945                Some(s) => s,
946                None => {
947                    tracing::warn!(session_id = %session_id, todo_id = %id, "Skipping todo row: failed to parse status");
948                    continue;
949                }
950            };
951
952            let content = match row.try_get("content") {
953                Ok(c) => c,
954                Err(e) => {
955                    tracing::warn!(session_id = %session_id, todo_id = %id, error = %e, "Skipping todo row: failed to get content");
956                    continue;
957                }
958            };
959
960            let active_form = match row.try_get("active_form") {
961                Ok(f) => f,
962                Err(e) => {
963                    tracing::warn!(session_id = %session_id, todo_id = %id, error = %e, "Skipping todo row: failed to get active_form");
964                    continue;
965                }
966            };
967
968            todos.push(TodoItem {
969                id,
970                session_id: *session_id,
971                content,
972                active_form,
973                status,
974                plan_id: row.try_get("plan_id").ok(),
975                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
976                started_at: row.try_get("started_at").ok(),
977                completed_at: row.try_get("completed_at").ok(),
978            });
979        }
980
981        Ok(todos)
982    }
983
984    async fn load_plan_internal(&self, session_id: &SessionId) -> SessionResult<Option<Plan>> {
985        let c = &self.config;
986
987        let row = sqlx::query(&format!(
988            r#"
989            SELECT id, session_id, name, content, status, error,
990                   created_at, approved_at, started_at, completed_at
991            FROM {plans}
992            WHERE session_id = $1
993            ORDER BY created_at DESC
994            LIMIT 1
995            "#,
996            plans = c.plans_table
997        ))
998        .bind(session_id.to_string())
999        .fetch_optional(self.pool.as_ref())
1000        .await
1001        .storage_err()?;
1002
1003        let Some(row) = row else {
1004            return Ok(None);
1005        };
1006
1007        let id: Uuid = match row.try_get("id") {
1008            Ok(id) => id,
1009            Err(e) => {
1010                tracing::warn!(session_id = %session_id, error = %e, "Skipping plan row: failed to get id");
1011                return Ok(None);
1012            }
1013        };
1014
1015        let status = match row.try_get::<&str, _>("status").ok().and_then(db_to_enum) {
1016            Some(s) => s,
1017            None => {
1018                tracing::warn!(session_id = %session_id, plan_id = %id, "Skipping plan row: failed to parse status");
1019                return Ok(None);
1020            }
1021        };
1022
1023        let content = match row.try_get("content") {
1024            Ok(c) => c,
1025            Err(e) => {
1026                tracing::warn!(session_id = %session_id, plan_id = %id, error = %e, "Skipping plan row: failed to get content");
1027                return Ok(None);
1028            }
1029        };
1030
1031        Ok(Some(Plan {
1032            id,
1033            session_id: *session_id,
1034            name: row.try_get("name").ok(),
1035            content,
1036            status,
1037            error: row.try_get("error").ok(),
1038            created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1039            approved_at: row.try_get("approved_at").ok(),
1040            started_at: row.try_get("started_at").ok(),
1041            completed_at: row.try_get("completed_at").ok(),
1042        }))
1043    }
1044
1045    async fn save_todos_tx(
1046        &self,
1047        tx: &mut Transaction<'_, Postgres>,
1048        session_id: &SessionId,
1049        todos: &[TodoItem],
1050    ) -> SessionResult<()> {
1051        let c = &self.config;
1052
1053        sqlx::query(&format!(
1054            "DELETE FROM {todos} WHERE session_id = $1",
1055            todos = c.todos_table
1056        ))
1057        .bind(session_id.to_string())
1058        .execute(&mut **tx)
1059        .await
1060        .storage_err()?;
1061
1062        for todo in todos {
1063            let status = enum_to_db(&todo.status, "pending");
1064
1065            sqlx::query(&format!(
1066                r#"
1067                INSERT INTO {todos} (
1068                    id, session_id, plan_id, content, active_form, status,
1069                    created_at, started_at, completed_at
1070                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
1071                "#,
1072                todos = c.todos_table
1073            ))
1074            .bind(todo.id)
1075            .bind(session_id.to_string())
1076            .bind(todo.plan_id)
1077            .bind(&todo.content)
1078            .bind(&todo.active_form)
1079            .bind(&status)
1080            .bind(todo.created_at)
1081            .bind(todo.started_at)
1082            .bind(todo.completed_at)
1083            .execute(&mut **tx)
1084            .await
1085            .storage_err()?;
1086        }
1087
1088        Ok(())
1089    }
1090
1091    async fn save_plan_tx(
1092        &self,
1093        tx: &mut Transaction<'_, Postgres>,
1094        plan: &Plan,
1095    ) -> SessionResult<()> {
1096        let c = &self.config;
1097
1098        let status = enum_to_db(&plan.status, "draft");
1099
1100        sqlx::query(&format!(
1101            r#"
1102            INSERT INTO {plans} (
1103                id, session_id, name, content, status, error,
1104                created_at, approved_at, started_at, completed_at
1105            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
1106            ON CONFLICT (id) DO UPDATE SET
1107                name = EXCLUDED.name,
1108                content = EXCLUDED.content,
1109                status = EXCLUDED.status,
1110                error = EXCLUDED.error,
1111                approved_at = EXCLUDED.approved_at,
1112                started_at = EXCLUDED.started_at,
1113                completed_at = EXCLUDED.completed_at
1114            "#,
1115            plans = c.plans_table
1116        ))
1117        .bind(plan.id)
1118        .bind(plan.session_id.to_string())
1119        .bind(&plan.name)
1120        .bind(&plan.content)
1121        .bind(&status)
1122        .bind(&plan.error)
1123        .bind(plan.created_at)
1124        .bind(plan.approved_at)
1125        .bind(plan.started_at)
1126        .bind(plan.completed_at)
1127        .execute(&mut **tx)
1128        .await
1129        .storage_err()?;
1130
1131        Ok(())
1132    }
1133
1134    async fn save_compacts_tx(
1135        &self,
1136        tx: &mut Transaction<'_, Postgres>,
1137        session_id: &SessionId,
1138        compacts: &VecDeque<CompactRecord>,
1139    ) -> SessionResult<()> {
1140        let c = &self.config;
1141
1142        sqlx::query(&format!(
1143            "DELETE FROM {compacts} WHERE session_id = $1",
1144            compacts = c.compacts_table
1145        ))
1146        .bind(session_id.to_string())
1147        .execute(&mut **tx)
1148        .await
1149        .storage_err()?;
1150
1151        for compact in compacts {
1152            let trigger = enum_to_db(&compact.trigger, "manual");
1153
1154            sqlx::query(&format!(
1155                r#"
1156                INSERT INTO {compacts} (
1157                    id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
1158                    summary, original_count, new_count, logical_parent_id, created_at
1159                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
1160                "#,
1161                compacts = c.compacts_table
1162            ))
1163            .bind(compact.id)
1164            .bind(session_id.to_string())
1165            .bind(&trigger)
1166            .bind(compact.pre_tokens as i32)
1167            .bind(compact.post_tokens as i32)
1168            .bind(compact.saved_tokens as i32)
1169            .bind(&compact.summary)
1170            .bind(compact.original_count as i32)
1171            .bind(compact.new_count as i32)
1172            .bind(compact.logical_parent_id.as_ref().map(|id| id.to_string()))
1173            .bind(compact.created_at)
1174            .execute(&mut **tx)
1175            .await
1176            .storage_err()?;
1177        }
1178
1179        Ok(())
1180    }
1181
1182    async fn save_messages_tx(
1183        &self,
1184        tx: &mut Transaction<'_, Postgres>,
1185        session_id: &SessionId,
1186        messages: &[SessionMessage],
1187    ) -> SessionResult<()> {
1188        let c = &self.config;
1189
1190        // Collect current message IDs for orphan cleanup
1191        let current_ids: Vec<String> = messages.iter().map(|m| m.id.to_string()).collect();
1192
1193        // Upsert each message (INSERT ... ON CONFLICT (id) DO UPDATE)
1194        for message in messages {
1195            let role = enum_to_db(&message.role, "user");
1196
1197            sqlx::query(&format!(
1198                r#"
1199                INSERT INTO {messages} (
1200                    id, session_id, parent_id, role, content, is_sidechain,
1201                    is_compact_summary, model, request_id, usage, metadata,
1202                    environment, created_at
1203                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
1204                ON CONFLICT (id) DO UPDATE SET
1205                    parent_id = EXCLUDED.parent_id,
1206                    role = EXCLUDED.role,
1207                    content = EXCLUDED.content,
1208                    is_sidechain = EXCLUDED.is_sidechain,
1209                    is_compact_summary = EXCLUDED.is_compact_summary,
1210                    model = EXCLUDED.model,
1211                    request_id = EXCLUDED.request_id,
1212                    usage = EXCLUDED.usage,
1213                    metadata = EXCLUDED.metadata,
1214                    environment = EXCLUDED.environment
1215                "#,
1216                messages = c.messages_table
1217            ))
1218            .bind(message.id.to_string())
1219            .bind(session_id.to_string())
1220            .bind(message.parent_id.as_ref().map(|id| id.to_string()))
1221            .bind(&role)
1222            .bind(serde_json::to_value(&message.content).unwrap_or_else(|e| {
1223                tracing::warn!(message_id = %message.id, error = %e, "Failed to serialize message content");
1224                serde_json::Value::Array(Vec::new())
1225            }))
1226            .bind(message.is_sidechain)
1227            .bind(message.is_compact_summary)
1228            .bind(&message.metadata.model)
1229            .bind(&message.metadata.request_id)
1230            .bind(
1231                message
1232                    .usage
1233                    .as_ref()
1234                    .and_then(|u| serde_json::to_value(u).ok()),
1235            )
1236            .bind(serde_json::to_value(&message.metadata).unwrap_or_else(|e| {
1237                tracing::warn!(message_id = %message.id, error = %e, "Failed to serialize message metadata");
1238                serde_json::Value::Object(Default::default())
1239            }))
1240            .bind(
1241                message
1242                    .environment
1243                    .as_ref()
1244                    .and_then(|e| serde_json::to_value(e).ok()),
1245            )
1246            .bind(message.timestamp)
1247            .execute(&mut **tx)
1248            .await
1249            .storage_err()?;
1250        }
1251
1252        // Delete messages no longer in the session.
1253        // Guard: skip when current_ids is empty to avoid deleting ALL messages
1254        // (PostgreSQL treats `id != ALL(ARRAY[])` as true for every row).
1255        if !current_ids.is_empty() {
1256            sqlx::query(&format!(
1257                "DELETE FROM {messages} WHERE session_id = $1 AND id != ALL($2)",
1258                messages = c.messages_table
1259            ))
1260            .bind(session_id.to_string())
1261            .bind(&current_ids)
1262            .execute(&mut **tx)
1263            .await
1264            .storage_err()?;
1265        }
1266
1267        Ok(())
1268    }
1269
1270    async fn save_inner(&self, session: &Session) -> SessionResult<()> {
1271        let c = &self.config;
1272
1273        let mut tx = self.pool.begin().await.storage_err()?;
1274
1275        let session_type = enum_to_db(&session.session_type, "main");
1276        let state = enum_to_db(&session.state, "created");
1277        let mode = "stateless";
1278
1279        sqlx::query(&format!(
1280            r#"
1281            INSERT INTO {sessions} (
1282                id, parent_id, tenant_id, session_type, state, mode,
1283                config, permissions, summary,
1284                total_input_tokens, total_output_tokens, total_cost_usd,
1285                current_leaf_id, static_context_hash, error,
1286                created_at, updated_at, expires_at
1287            ) VALUES (
1288                $1, $2, $3, $4, $5, $6, $7, $8, $9,
1289                $10, $11, $12, $13, $14, $15, $16, $17, $18
1290            )
1291            ON CONFLICT (id) DO UPDATE SET
1292                parent_id = EXCLUDED.parent_id,
1293                tenant_id = EXCLUDED.tenant_id,
1294                session_type = EXCLUDED.session_type,
1295                state = EXCLUDED.state,
1296                mode = EXCLUDED.mode,
1297                config = EXCLUDED.config,
1298                permissions = EXCLUDED.permissions,
1299                summary = EXCLUDED.summary,
1300                total_input_tokens = EXCLUDED.total_input_tokens,
1301                total_output_tokens = EXCLUDED.total_output_tokens,
1302                total_cost_usd = EXCLUDED.total_cost_usd,
1303                current_leaf_id = EXCLUDED.current_leaf_id,
1304                static_context_hash = EXCLUDED.static_context_hash,
1305                error = EXCLUDED.error,
1306                updated_at = EXCLUDED.updated_at,
1307                expires_at = EXCLUDED.expires_at
1308            "#,
1309            sessions = c.sessions_table
1310        ))
1311        .bind(session.id.to_string())
1312        .bind(session.parent_id.map(|id| id.to_string()))
1313        .bind(&session.tenant_id)
1314        .bind(&session_type)
1315        .bind(&state)
1316        .bind(mode)
1317        .bind(serde_json::to_value(&session.config).unwrap_or_else(|e| {
1318            tracing::warn!(session_id = %session.id, error = %e, "Failed to serialize session config");
1319            serde_json::Value::Object(Default::default())
1320        }))
1321        .bind(serde_json::to_value(&session.permissions).unwrap_or_else(|e| {
1322            tracing::warn!(session_id = %session.id, error = %e, "Failed to serialize session permissions");
1323            serde_json::Value::Object(Default::default())
1324        }))
1325        .bind(&session.summary)
1326        .bind(session.total_usage.input_tokens as i64)
1327        .bind(session.total_usage.output_tokens as i64)
1328        .bind(session.total_cost_usd)
1329        .bind(session.current_leaf_id.as_ref().map(|id| id.to_string()))
1330        .bind(&session.static_context_hash)
1331        .bind(&session.error)
1332        .bind(session.created_at)
1333        .bind(session.updated_at)
1334        .bind(session.expires_at)
1335        .execute(&mut *tx)
1336        .await
1337        .storage_err()?;
1338
1339        self.save_messages_tx(&mut tx, &session.id, &session.messages)
1340            .await?;
1341        self.save_todos_tx(&mut tx, &session.id, &session.todos)
1342            .await?;
1343        self.save_compacts_tx(&mut tx, &session.id, &session.compact_history)
1344            .await?;
1345
1346        if let Some(ref plan) = session.current_plan {
1347            self.save_plan_tx(&mut tx, plan).await?;
1348        }
1349
1350        tx.commit().await.storage_err()?;
1351
1352        Ok(())
1353    }
1354}
1355
1356#[async_trait]
1357impl Persistence for PostgresPersistence {
1358    fn name(&self) -> &str {
1359        "postgres"
1360    }
1361
1362    async fn save(&self, session: &Session) -> SessionResult<()> {
1363        self.with_retry(|| self.save_inner(session)).await
1364    }
1365
1366    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1367        self.with_retry(|| async {
1368            match self.load_session_row(id).await {
1369                Ok(session) => Ok(Some(session)),
1370                Err(SessionError::NotFound { .. }) => Ok(None),
1371                Err(e) => Err(e),
1372            }
1373        })
1374        .await
1375    }
1376
1377    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1378        let sid = *id;
1379        self.with_retry(|| async move {
1380            let c = &self.config;
1381
1382            let result = sqlx::query(&format!(
1383                "DELETE FROM {sessions} WHERE id = $1",
1384                sessions = c.sessions_table
1385            ))
1386            .bind(sid.to_string())
1387            .execute(self.pool.as_ref())
1388            .await
1389            .storage_err()?;
1390
1391            Ok(result.rows_affected() > 0)
1392        })
1393        .await
1394    }
1395
1396    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1397        let owned_tid = tenant_id.map(|s| s.to_string());
1398        self.with_retry(|| {
1399            let tid = owned_tid.clone();
1400            async move {
1401                let c = &self.config;
1402
1403                let rows = if let Some(ref tid) = tid {
1404                    sqlx::query(&format!(
1405                        "SELECT id FROM {sessions} WHERE tenant_id = $1",
1406                        sessions = c.sessions_table
1407                    ))
1408                    .bind(tid.as_str())
1409                    .fetch_all(self.pool.as_ref())
1410                    .await
1411                } else {
1412                    sqlx::query(&format!(
1413                        "SELECT id FROM {sessions}",
1414                        sessions = c.sessions_table
1415                    ))
1416                    .fetch_all(self.pool.as_ref())
1417                    .await
1418                }
1419                .storage_err()?;
1420
1421                let mut ids = Vec::with_capacity(rows.len());
1422
1423                for row in rows {
1424                    let id_str = match row.try_get::<&str, _>("id") {
1425                        Ok(id) => id,
1426                        Err(e) => {
1427                            tracing::warn!(error = %e, "Skipping session row: failed to get id");
1428                            continue;
1429                        }
1430                    };
1431
1432                    match id_str.parse() {
1433                        Ok(id) => ids.push(id),
1434                        Err(e) => {
1435                            tracing::warn!(id = id_str, error = %e, "Skipping session row: failed to parse id");
1436                        }
1437                    }
1438                }
1439
1440                Ok(ids)
1441            }
1442        })
1443        .await
1444    }
1445
1446    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1447        self.with_retry(|| async {
1448            let c = &self.config;
1449
1450            let mut tx = self.pool.begin().await.storage_err()?;
1451
1452            sqlx::query(&format!(
1453                r#"
1454                INSERT INTO {summaries} (id, session_id, summary, leaf_message_id, created_at)
1455                VALUES ($1, $2, $3, $4, $5)
1456                "#,
1457                summaries = c.summaries_table
1458            ))
1459            .bind(snapshot.id)
1460            .bind(snapshot.session_id.to_string())
1461            .bind(&snapshot.summary)
1462            .bind(snapshot.leaf_message_id.as_ref().map(|id| id.to_string()))
1463            .bind(snapshot.created_at)
1464            .execute(&mut *tx)
1465            .await
1466            .storage_err()?;
1467
1468            sqlx::query(&format!(
1469                "UPDATE {sessions} SET summary = $1, updated_at = NOW() WHERE id = $2",
1470                sessions = c.sessions_table
1471            ))
1472            .bind(&snapshot.summary)
1473            .bind(snapshot.session_id.to_string())
1474            .execute(&mut *tx)
1475            .await
1476            .storage_err()?;
1477
1478            tx.commit().await.storage_err()?;
1479
1480            Ok(())
1481        })
1482        .await
1483    }
1484
1485    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1486        let sid = *session_id;
1487        self.with_retry(|| async move {
1488            let c = &self.config;
1489
1490            let rows = sqlx::query(&format!(
1491                r#"
1492                SELECT id, session_id, summary, leaf_message_id, created_at
1493                FROM {summaries}
1494                WHERE session_id = $1
1495                ORDER BY created_at ASC
1496                "#,
1497                summaries = c.summaries_table
1498            ))
1499            .bind(sid.to_string())
1500            .fetch_all(self.pool.as_ref())
1501            .await
1502            .storage_err()?;
1503
1504            let mut summaries = Vec::with_capacity(rows.len());
1505
1506            for row in rows {
1507                let id: Uuid = match row.try_get("id") {
1508                    Ok(id) => id,
1509                    Err(e) => {
1510                        tracing::warn!(session_id = %sid, error = %e, "Skipping summary row: failed to get id");
1511                        continue;
1512                    }
1513                };
1514
1515                let summary = match row.try_get("summary") {
1516                    Ok(s) => s,
1517                    Err(e) => {
1518                        tracing::warn!(session_id = %sid, summary_id = %id, error = %e, "Skipping summary row: failed to get summary");
1519                        continue;
1520                    }
1521                };
1522
1523                summaries.push(SummarySnapshot {
1524                    id,
1525                    session_id: sid,
1526                    summary,
1527                    leaf_message_id: row
1528                        .try_get::<&str, _>("leaf_message_id")
1529                        .ok()
1530                        .and_then(|s| s.parse().ok()),
1531                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1532                });
1533            }
1534
1535            Ok(summaries)
1536        })
1537        .await
1538    }
1539
1540    async fn enqueue(
1541        &self,
1542        session_id: &SessionId,
1543        content: String,
1544        priority: i32,
1545    ) -> SessionResult<QueueItem> {
1546        let sid = *session_id;
1547        let item = QueueItem::enqueue(sid, &content).priority(priority);
1548        self.with_retry(|| async {
1549            let c = &self.config;
1550
1551            sqlx::query(&format!(
1552                r#"
1553                INSERT INTO {queue} (id, session_id, operation, content, priority, status, created_at)
1554                VALUES ($1, $2, $3, $4, $5, $6, $7)
1555                "#,
1556                queue = c.queue_table
1557            ))
1558            .bind(item.id)
1559            .bind(sid.to_string())
1560            .bind("enqueue")
1561            .bind(&content)
1562            .bind(priority)
1563            .bind("pending")
1564            .bind(item.created_at)
1565            .execute(self.pool.as_ref())
1566            .await
1567            .storage_err()?;
1568
1569            Ok(item.clone())
1570        })
1571        .await
1572    }
1573
1574    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1575        let sid = *session_id;
1576        self.with_retry(|| async move {
1577            let c = &self.config;
1578
1579            let row = sqlx::query(&format!(
1580                r#"
1581                UPDATE {queue}
1582                SET status = 'processing'
1583                WHERE id = (
1584                    SELECT id FROM {queue}
1585                    WHERE session_id = $1 AND status = 'pending'
1586                    ORDER BY priority DESC, created_at ASC
1587                    LIMIT 1
1588                    FOR UPDATE SKIP LOCKED
1589                )
1590                RETURNING id, session_id, operation, content, priority, status, created_at, processed_at
1591                "#,
1592                queue = c.queue_table
1593            ))
1594            .bind(sid.to_string())
1595            .fetch_optional(self.pool.as_ref())
1596            .await
1597            .storage_err()?;
1598
1599            let Some(row) = row else {
1600                return Ok(None);
1601            };
1602
1603            let id: Uuid = match row.try_get("id") {
1604                Ok(id) => id,
1605                Err(e) => {
1606                    tracing::warn!(session_id = %sid, error = %e, "Failed to get dequeued item id");
1607                    return Ok(None);
1608                }
1609            };
1610
1611            let content = match row.try_get("content") {
1612                Ok(c) => c,
1613                Err(e) => {
1614                    tracing::warn!(session_id = %sid, queue_id = %id, error = %e, "Failed to get dequeued item content");
1615                    return Ok(None);
1616                }
1617            };
1618
1619            Ok(Some(QueueItem {
1620                id,
1621                session_id: sid,
1622                operation: super::types::QueueOperation::Enqueue,
1623                content,
1624                priority: row.try_get("priority").unwrap_or(0),
1625                status: QueueStatus::Processing,
1626                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1627                processed_at: row.try_get("processed_at").ok(),
1628            }))
1629        })
1630        .await
1631    }
1632
1633    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1634        self.with_retry(|| async {
1635            let c = &self.config;
1636
1637            let result = sqlx::query(&format!(
1638                "UPDATE {queue} SET status = 'cancelled', processed_at = NOW() WHERE id = $1 AND status = 'pending'",
1639                queue = c.queue_table
1640            ))
1641            .bind(item_id)
1642            .execute(self.pool.as_ref())
1643            .await
1644            .storage_err()?;
1645
1646            Ok(result.rows_affected() > 0)
1647        })
1648        .await
1649    }
1650
1651    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1652        let sid = *session_id;
1653        self.with_retry(|| async move {
1654            let c = &self.config;
1655
1656            let rows = sqlx::query(&format!(
1657                r#"
1658                SELECT id, session_id, operation, content, priority, status, created_at, processed_at
1659                FROM {queue}
1660                WHERE session_id = $1 AND status = 'pending'
1661                ORDER BY priority DESC, created_at ASC
1662                "#,
1663                queue = c.queue_table
1664            ))
1665            .bind(sid.to_string())
1666            .fetch_all(self.pool.as_ref())
1667            .await
1668            .storage_err()?;
1669
1670            let mut items = Vec::with_capacity(rows.len());
1671
1672            for row in rows {
1673                let id: Uuid = match row.try_get("id") {
1674                    Ok(id) => id,
1675                    Err(e) => {
1676                        tracing::warn!(session_id = %sid, error = %e, "Skipping queue row: failed to get id");
1677                        continue;
1678                    }
1679                };
1680
1681                let content = match row.try_get("content") {
1682                    Ok(c) => c,
1683                    Err(e) => {
1684                        tracing::warn!(session_id = %sid, queue_id = %id, error = %e, "Skipping queue row: failed to get content");
1685                        continue;
1686                    }
1687                };
1688
1689                items.push(QueueItem {
1690                    id,
1691                    session_id: sid,
1692                    operation: super::types::QueueOperation::Enqueue,
1693                    content,
1694                    priority: row.try_get("priority").unwrap_or(0),
1695                    status: QueueStatus::Pending,
1696                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1697                    processed_at: row.try_get("processed_at").ok(),
1698                });
1699            }
1700
1701            Ok(items)
1702        })
1703        .await
1704    }
1705
1706    async fn cleanup_expired(&self) -> SessionResult<usize> {
1707        self.with_retry(|| async {
1708            let c = &self.config;
1709
1710            let result = sqlx::query(&format!(
1711                "DELETE FROM {sessions} WHERE \
1712                 (expires_at IS NOT NULL AND expires_at < NOW()) OR \
1713                 (updated_at < NOW() - make_interval(days => $1))",
1714                sessions = c.sessions_table,
1715            ))
1716            .bind(c.retention_days as i32)
1717            .execute(self.pool.as_ref())
1718            .await
1719            .storage_err()?;
1720
1721            Ok(result.rows_affected() as usize)
1722        })
1723        .await
1724    }
1725}