claude_agent/session/
persistence_postgres.rs

1//! PostgreSQL session persistence with explicit schema management.
2//!
3//! # Schema Management
4//!
5//! This module separates schema management from data access, allowing flexible deployment:
6//!
7//! ```rust,no_run
8//! use claude_agent::session::{PostgresPersistence, PostgresSchema, PostgresConfig};
9//!
10//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
11//! // Option 1: Auto-migrate (development/simple deployments)
12//! let persistence = PostgresPersistence::connect_and_migrate("postgres://...").await?;
13//!
14//! // Option 2: Connect only, manage schema externally (production)
15//! let persistence = PostgresPersistence::connect("postgres://...").await?;
16//!
17//! // Option 3: Export SQL for external migration tools (Flyway, Diesel, etc.)
18//! let sql = PostgresSchema::sql(&PostgresConfig::default());
19//! println!("{}", sql);
20//!
21//! // Option 4: Verify schema is correct
22//! let issues = persistence.verify_schema().await?;
23//! if !issues.is_empty() {
24//!     for issue in &issues {
25//!         eprintln!("Schema issue: {:?}", issue);
26//!     }
27//! }
28//! # Ok(())
29//! # }
30//! ```
31
32use std::sync::Arc;
33use std::time::Duration;
34
35use async_trait::async_trait;
36use chrono::Utc;
37use sqlx::postgres::PgPoolOptions;
38use sqlx::{PgPool, Postgres, Row, Transaction};
39use uuid::Uuid;
40
41use super::persistence::Persistence;
42use super::state::{Session, SessionConfig, SessionId, SessionMessage};
43use super::types::{CompactRecord, Plan, QueueItem, QueueStatus, SummarySnapshot, TodoItem};
44use super::{SessionError, SessionResult};
45
46// ============================================================================
47// Configuration
48// ============================================================================
49
/// Connection pool configuration for PostgreSQL.
///
/// The values are turned into a `PgPoolOptions` by [`PgPoolConfig::apply`].
/// Note that `apply` forwards every field except `connect_timeout`, which it
/// does not read.
#[derive(Clone, Debug)]
pub struct PgPoolConfig {
    /// Passed to `PgPoolOptions::max_connections`.
    pub max_connections: u32,
    /// Passed to `PgPoolOptions::min_connections`.
    pub min_connections: u32,
    /// Stored on the config but not applied to the pool options by `apply`.
    pub connect_timeout: Duration,
    /// Passed (wrapped in `Some`) to `PgPoolOptions::idle_timeout`.
    pub idle_timeout: Duration,
    /// Passed (wrapped in `Some`) to `PgPoolOptions::max_lifetime`.
    pub max_lifetime: Duration,
    /// Passed to `PgPoolOptions::acquire_timeout`.
    pub acquire_timeout: Duration,
}
60
61impl Default for PgPoolConfig {
62    fn default() -> Self {
63        Self {
64            max_connections: 10,
65            min_connections: 1,
66            connect_timeout: Duration::from_secs(30),
67            idle_timeout: Duration::from_secs(600),
68            max_lifetime: Duration::from_secs(1800),
69            acquire_timeout: Duration::from_secs(30),
70        }
71    }
72}
73
74impl PgPoolConfig {
75    pub fn high_throughput() -> Self {
76        Self {
77            max_connections: 50,
78            min_connections: 5,
79            connect_timeout: Duration::from_secs(10),
80            idle_timeout: Duration::from_secs(300),
81            max_lifetime: Duration::from_secs(900),
82            acquire_timeout: Duration::from_secs(10),
83        }
84    }
85
86    pub(crate) fn apply(&self) -> PgPoolOptions {
87        PgPoolOptions::new()
88            .max_connections(self.max_connections)
89            .min_connections(self.min_connections)
90            .acquire_timeout(self.acquire_timeout)
91            .idle_timeout(Some(self.idle_timeout))
92            .max_lifetime(Some(self.max_lifetime))
93    }
94}
95
/// PostgreSQL persistence configuration.
///
/// Holds the name of every table used by the persistence layer plus the
/// connection pool settings. Table names are interpolated directly into the
/// generated SQL text.
#[derive(Clone, Debug)]
pub struct PostgresConfig {
    /// Table storing one row per session.
    pub sessions_table: String,
    /// Table storing session messages.
    pub messages_table: String,
    /// Table storing compaction records.
    pub compacts_table: String,
    /// Table storing summary snapshots.
    pub summaries_table: String,
    /// Table storing queued operations.
    pub queue_table: String,
    /// Table storing todo items.
    pub todos_table: String,
    /// Table storing plans.
    pub plans_table: String,
    /// Connection pool settings used when this module opens the pool itself.
    pub pool: PgPoolConfig,
}
108
109impl Default for PostgresConfig {
110    fn default() -> Self {
111        Self::with_prefix("claude_")
112    }
113}
114
115impl PostgresConfig {
116    pub fn with_prefix(prefix: &str) -> Self {
117        Self {
118            sessions_table: format!("{prefix}sessions"),
119            messages_table: format!("{prefix}messages"),
120            compacts_table: format!("{prefix}compacts"),
121            summaries_table: format!("{prefix}summaries"),
122            queue_table: format!("{prefix}queue"),
123            todos_table: format!("{prefix}todos"),
124            plans_table: format!("{prefix}plans"),
125            pool: PgPoolConfig::default(),
126        }
127    }
128
129    pub fn with_pool(mut self, pool: PgPoolConfig) -> Self {
130        self.pool = pool;
131        self
132    }
133
134    /// Get all table names.
135    pub fn table_names(&self) -> Vec<&str> {
136        vec![
137            &self.sessions_table,
138            &self.messages_table,
139            &self.compacts_table,
140            &self.summaries_table,
141            &self.queue_table,
142            &self.todos_table,
143            &self.plans_table,
144        ]
145    }
146}
147
148// ============================================================================
149// Schema Management
150// ============================================================================
151
/// Schema issue found during verification.
///
/// `PostgresSchema::verify` reports `MissingTable` and `MissingIndex`; it
/// contains no column check, so it never produces `MissingColumn`.
#[derive(Debug, Clone)]
pub enum SchemaIssue {
    /// An expected table was not found; carries the table name.
    MissingTable(String),
    /// An expected index was not found on the given table.
    MissingIndex { table: String, index: String },
    /// An expected column was not found in the given table.
    MissingColumn { table: String, column: String },
}
159
160impl std::fmt::Display for SchemaIssue {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        match self {
163            SchemaIssue::MissingTable(t) => write!(f, "Missing table: {}", t),
164            SchemaIssue::MissingIndex { table, index } => {
165                write!(f, "Missing index '{}' on table '{}'", index, table)
166            }
167            SchemaIssue::MissingColumn { table, column } => {
168                write!(f, "Missing column '{}' in table '{}'", column, table)
169            }
170        }
171    }
172}
173
/// Schema manager for PostgreSQL persistence.
///
/// Provides utilities for schema creation, migration, and verification.
/// The type carries no state; all functionality lives in associated
/// functions that take a [`PostgresConfig`] (and a pool where a database
/// round trip is needed).
pub struct PostgresSchema;
178
179impl PostgresSchema {
180    /// Generate complete SQL DDL for all tables and indexes.
181    ///
182    /// Use this to integrate with external migration tools (Flyway, Diesel, etc.).
183    pub fn sql(config: &PostgresConfig) -> String {
184        let mut sql = String::new();
185        sql.push_str("-- Claude Agent Session Schema\n");
186        sql.push_str("-- Generated by claude-agent PostgresSchema\n\n");
187
188        for table_sql in Self::table_ddl(config) {
189            sql.push_str(&table_sql);
190            sql.push_str("\n\n");
191        }
192
193        sql.push_str("-- Indexes\n");
194        for index_sql in Self::index_ddl(config) {
195            sql.push_str(&index_sql);
196            sql.push_str(";\n");
197        }
198
199        sql
200    }
201
    /// Generate table DDL statements.
    ///
    /// Returns one `CREATE TABLE IF NOT EXISTS` statement per table, in the
    /// order sessions, messages, compacts, summaries, queue, todos, plans.
    /// The sessions table comes first because every other table declares a
    /// foreign key to `sessions(id)` with `ON DELETE CASCADE`.
    ///
    /// Table names from `config` are interpolated into the SQL text as-is
    /// (unquoted), including inside the constraint names.
    ///
    /// NOTE(review): several tables default their `id` to `gen_random_uuid()`;
    /// presumably this needs PostgreSQL 13+ or the `pgcrypto` extension —
    /// confirm against the supported server versions.
    pub fn table_ddl(config: &PostgresConfig) -> Vec<String> {
        let c = config;
        vec![
            // Sessions: one row per session, self-referencing via parent_id.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {sessions} (
    id VARCHAR(255) PRIMARY KEY,
    parent_id VARCHAR(255),
    tenant_id VARCHAR(255),
    session_type VARCHAR(32) NOT NULL DEFAULT 'main',
    state VARCHAR(32) NOT NULL DEFAULT 'created',
    mode VARCHAR(32) NOT NULL DEFAULT 'default',
    config JSONB NOT NULL DEFAULT '{{}}',
    permission_policy JSONB NOT NULL DEFAULT '{{}}',
    summary TEXT,
    total_input_tokens BIGINT DEFAULT 0,
    total_output_tokens BIGINT DEFAULT 0,
    total_cost_usd DECIMAL(12, 6) DEFAULT 0,
    current_leaf_id VARCHAR(255),
    static_context_hash VARCHAR(64),
    error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMPTZ,
    CONSTRAINT fk_{sessions}_parent FOREIGN KEY (parent_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                sessions = c.sessions_table
            ),
            // Messages: content, usage, metadata and environment are JSONB.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {messages} (
    id VARCHAR(255) PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    parent_id VARCHAR(255),
    role VARCHAR(16) NOT NULL,
    content JSONB NOT NULL,
    is_sidechain BOOLEAN DEFAULT FALSE,
    is_compact_summary BOOLEAN DEFAULT FALSE,
    model VARCHAR(64),
    request_id VARCHAR(255),
    usage JSONB,
    metadata JSONB,
    environment JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{messages}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                messages = c.messages_table,
                sessions = c.sessions_table
            ),
            // Compacts: one row per compaction record.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {compacts} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    trigger VARCHAR(32) NOT NULL,
    pre_tokens INTEGER NOT NULL,
    post_tokens INTEGER NOT NULL,
    saved_tokens INTEGER NOT NULL,
    summary TEXT NOT NULL,
    original_count INTEGER NOT NULL,
    new_count INTEGER NOT NULL,
    logical_parent_id VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{compacts}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                compacts = c.compacts_table,
                sessions = c.sessions_table
            ),
            // Summaries: summary text plus an optional leaf message id.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {summaries} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    summary TEXT NOT NULL,
    leaf_message_id VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{summaries}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                summaries = c.summaries_table,
                sessions = c.sessions_table
            ),
            // Queue: queued operations; status defaults to 'pending'.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {queue} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    operation VARCHAR(32) NOT NULL,
    content TEXT NOT NULL,
    priority INTEGER DEFAULT 0,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    CONSTRAINT fk_{queue}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                queue = c.queue_table,
                sessions = c.sessions_table
            ),
            // Todos: plan_id is a plain UUID column with no foreign key.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {todos} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    plan_id UUID,
    content TEXT NOT NULL,
    active_form TEXT NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    CONSTRAINT fk_{todos}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                todos = c.todos_table,
                sessions = c.sessions_table
            ),
            // Plans: status defaults to 'draft'.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {plans} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    name VARCHAR(255),
    content TEXT NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'draft',
    error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    approved_at TIMESTAMPTZ,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    CONSTRAINT fk_{plans}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                plans = c.plans_table,
                sessions = c.sessions_table
            ),
        ]
    }
330
331    /// Generate index DDL statements.
332    pub fn index_ddl(config: &PostgresConfig) -> Vec<String> {
333        let c = config;
334        vec![
335            // Sessions indexes
336            format!(
337                "CREATE INDEX IF NOT EXISTS idx_{0}_tenant ON {0}(tenant_id)",
338                c.sessions_table
339            ),
340            format!(
341                "CREATE INDEX IF NOT EXISTS idx_{0}_parent ON {0}(parent_id)",
342                c.sessions_table
343            ),
344            format!(
345                "CREATE INDEX IF NOT EXISTS idx_{0}_expires ON {0}(expires_at) WHERE expires_at IS NOT NULL",
346                c.sessions_table
347            ),
348            format!(
349                "CREATE INDEX IF NOT EXISTS idx_{0}_state ON {0}(state)",
350                c.sessions_table
351            ),
352            // Messages indexes
353            format!(
354                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
355                c.messages_table
356            ),
357            format!(
358                "CREATE INDEX IF NOT EXISTS idx_{0}_created ON {0}(session_id, created_at)",
359                c.messages_table
360            ),
361            // Compacts index
362            format!(
363                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
364                c.compacts_table
365            ),
366            // Summaries index
367            format!(
368                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
369                c.summaries_table
370            ),
371            // Queue indexes
372            format!(
373                "CREATE INDEX IF NOT EXISTS idx_{0}_session_status ON {0}(session_id, status)",
374                c.queue_table
375            ),
376            // Todos index
377            format!(
378                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
379                c.todos_table
380            ),
381            // Plans index
382            format!(
383                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
384                c.plans_table
385            ),
386        ]
387    }
388
389    /// Get expected indexes as (table_name, index_name) pairs.
390    pub fn expected_indexes(config: &PostgresConfig) -> Vec<(String, String)> {
391        let c = config;
392        vec![
393            (
394                c.sessions_table.clone(),
395                format!("idx_{}_tenant", c.sessions_table),
396            ),
397            (
398                c.sessions_table.clone(),
399                format!("idx_{}_parent", c.sessions_table),
400            ),
401            (
402                c.sessions_table.clone(),
403                format!("idx_{}_expires", c.sessions_table),
404            ),
405            (
406                c.sessions_table.clone(),
407                format!("idx_{}_state", c.sessions_table),
408            ),
409            (
410                c.messages_table.clone(),
411                format!("idx_{}_session", c.messages_table),
412            ),
413            (
414                c.messages_table.clone(),
415                format!("idx_{}_created", c.messages_table),
416            ),
417            (
418                c.compacts_table.clone(),
419                format!("idx_{}_session", c.compacts_table),
420            ),
421            (
422                c.summaries_table.clone(),
423                format!("idx_{}_session", c.summaries_table),
424            ),
425            (
426                c.queue_table.clone(),
427                format!("idx_{}_session_status", c.queue_table),
428            ),
429            (
430                c.todos_table.clone(),
431                format!("idx_{}_session", c.todos_table),
432            ),
433            (
434                c.plans_table.clone(),
435                format!("idx_{}_session", c.plans_table),
436            ),
437        ]
438    }
439
440    /// Run migration to create tables and indexes.
441    pub async fn migrate(pool: &PgPool, config: &PostgresConfig) -> Result<(), sqlx::Error> {
442        for table_ddl in Self::table_ddl(config) {
443            sqlx::query(&table_ddl).execute(pool).await?;
444        }
445
446        for index_ddl in Self::index_ddl(config) {
447            sqlx::query(&index_ddl).execute(pool).await?;
448        }
449
450        Ok(())
451    }
452
453    /// Verify schema integrity - check tables and indexes exist.
454    pub async fn verify(
455        pool: &PgPool,
456        config: &PostgresConfig,
457    ) -> Result<Vec<SchemaIssue>, sqlx::Error> {
458        let mut issues = Vec::new();
459
460        // Check tables
461        for table in config.table_names() {
462            let exists: bool = sqlx::query_scalar(
463                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
464            )
465            .bind(table)
466            .fetch_one(pool)
467            .await?;
468
469            if !exists {
470                issues.push(SchemaIssue::MissingTable(table.to_string()));
471            }
472        }
473
474        // Check indexes
475        for (table, index) in Self::expected_indexes(config) {
476            let exists: bool = sqlx::query_scalar(
477                "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE tablename = $1 AND indexname = $2)",
478            )
479            .bind(&table)
480            .bind(&index)
481            .fetch_one(pool)
482            .await?;
483
484            if !exists {
485                issues.push(SchemaIssue::MissingIndex { table, index });
486            }
487        }
488
489        Ok(issues)
490    }
491}
492
493// ============================================================================
494// Persistence Implementation
495// ============================================================================
496
/// PostgreSQL session persistence.
///
/// Wraps a shared connection pool together with the table-name
/// configuration used to build every query.
pub struct PostgresPersistence {
    /// Shared connection pool; may be created here or supplied by the caller.
    pool: Arc<PgPool>,
    /// Table names and pool settings in effect for this instance.
    config: PostgresConfig,
}
502
503impl PostgresPersistence {
504    /// Connect to database without running migrations.
505    ///
506    /// Use this when managing schema externally (production deployments).
507    pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
508        Self::connect_with_config(database_url, PostgresConfig::default()).await
509    }
510
511    /// Connect with custom configuration, without running migrations.
512    pub async fn connect_with_config(
513        database_url: &str,
514        config: PostgresConfig,
515    ) -> Result<Self, sqlx::Error> {
516        let pool = config.pool.apply().connect(database_url).await?;
517        Ok(Self {
518            pool: Arc::new(pool),
519            config,
520        })
521    }
522
523    /// Connect and run migrations automatically.
524    ///
525    /// Convenient for development and simple deployments.
526    pub async fn connect_and_migrate(database_url: &str) -> Result<Self, sqlx::Error> {
527        Self::connect_and_migrate_with_config(database_url, PostgresConfig::default()).await
528    }
529
530    /// Connect with custom configuration and run migrations.
531    pub async fn connect_and_migrate_with_config(
532        database_url: &str,
533        config: PostgresConfig,
534    ) -> Result<Self, sqlx::Error> {
535        let persistence = Self::connect_with_config(database_url, config).await?;
536        persistence.migrate().await?;
537        Ok(persistence)
538    }
539
540    /// Use an existing pool without running migrations.
541    pub fn with_pool(pool: Arc<PgPool>) -> Self {
542        Self::with_pool_and_config(pool, PostgresConfig::default())
543    }
544
545    /// Use an existing pool with custom configuration.
546    pub fn with_pool_and_config(pool: Arc<PgPool>, config: PostgresConfig) -> Self {
547        Self { pool, config }
548    }
549
550    /// Run schema migration.
551    pub async fn migrate(&self) -> Result<(), sqlx::Error> {
552        PostgresSchema::migrate(&self.pool, &self.config).await
553    }
554
555    /// Verify schema integrity.
556    pub async fn verify_schema(&self) -> Result<Vec<SchemaIssue>, sqlx::Error> {
557        PostgresSchema::verify(&self.pool, &self.config).await
558    }
559
560    /// Get the underlying connection pool.
561    pub fn pool(&self) -> &PgPool {
562        &self.pool
563    }
564
565    /// Get the configuration.
566    pub fn config(&self) -> &PostgresConfig {
567        &self.config
568    }
569
570    // ========================================================================
571    // Internal helpers
572    // ========================================================================
573
574    async fn load_session_row(&self, session_id: &SessionId) -> SessionResult<Session> {
575        let c = &self.config;
576        let id_str = session_id.to_string();
577
578        let row = sqlx::query(&format!(
579            r#"
580            SELECT id, parent_id, tenant_id, session_type, state, mode,
581                   config, permission_policy, summary,
582                   total_input_tokens, total_output_tokens, total_cost_usd,
583                   current_leaf_id, static_context_hash, error,
584                   created_at, updated_at, expires_at
585            FROM {sessions}
586            WHERE id = $1
587            "#,
588            sessions = c.sessions_table
589        ))
590        .bind(&id_str)
591        .fetch_optional(self.pool.as_ref())
592        .await
593        .map_err(|e| SessionError::Storage {
594            message: e.to_string(),
595        })?
596        .ok_or_else(|| SessionError::NotFound { id: id_str.clone() })?;
597
598        let messages = self.load_messages(session_id).await?;
599        let compacts = self.load_compacts(session_id).await?;
600        let todos = self.load_todos_internal(session_id).await?;
601        let plan = self.load_plan_internal(session_id).await?;
602
603        let config: SessionConfig = row
604            .try_get::<serde_json::Value, _>("config")
605            .ok()
606            .and_then(|v| serde_json::from_value(v).ok())
607            .unwrap_or_default();
608
609        let permission_policy = row
610            .try_get::<serde_json::Value, _>("permission_policy")
611            .ok()
612            .and_then(|v| serde_json::from_value(v).ok())
613            .unwrap_or_default();
614
615        let session_type = row
616            .try_get::<&str, _>("session_type")
617            .ok()
618            .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
619            .unwrap_or_default();
620
621        let mode = row
622            .try_get::<&str, _>("mode")
623            .ok()
624            .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
625            .unwrap_or_default();
626
627        let state = row
628            .try_get::<&str, _>("state")
629            .ok()
630            .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
631            .unwrap_or_default();
632
633        let current_leaf_id = row
634            .try_get::<&str, _>("current_leaf_id")
635            .ok()
636            .and_then(|s| s.parse().ok());
637
638        Ok(Session {
639            id: *session_id,
640            parent_id: row
641                .try_get::<&str, _>("parent_id")
642                .ok()
643                .and_then(|s| s.parse().ok()),
644            session_type,
645            tenant_id: row.try_get("tenant_id").ok(),
646            mode,
647            state,
648            config,
649            permission_policy,
650            messages,
651            current_leaf_id,
652            summary: row.try_get("summary").ok(),
653            total_usage: crate::types::TokenUsage {
654                input_tokens: row.try_get::<i64, _>("total_input_tokens").unwrap_or(0) as u64,
655                output_tokens: row.try_get::<i64, _>("total_output_tokens").unwrap_or(0) as u64,
656                ..Default::default()
657            },
658            current_input_tokens: 0,
659            total_cost_usd: row
660                .try_get::<rust_decimal::Decimal, _>("total_cost_usd")
661                .ok()
662                .and_then(|d| d.to_string().parse().ok())
663                .unwrap_or(0.0),
664            static_context_hash: row.try_get("static_context_hash").ok(),
665            created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
666            updated_at: row.try_get("updated_at").unwrap_or_else(|_| Utc::now()),
667            expires_at: row.try_get("expires_at").ok(),
668            error: row.try_get("error").ok(),
669            todos,
670            current_plan: plan,
671            compact_history: compacts,
672        })
673    }
674
675    async fn load_messages(&self, session_id: &SessionId) -> SessionResult<Vec<SessionMessage>> {
676        let c = &self.config;
677
678        let rows = sqlx::query(&format!(
679            r#"
680            SELECT id, parent_id, role, content, is_sidechain, is_compact_summary,
681                   model, request_id, usage, metadata, environment, created_at
682            FROM {messages}
683            WHERE session_id = $1
684            ORDER BY created_at ASC
685            "#,
686            messages = c.messages_table
687        ))
688        .bind(session_id.to_string())
689        .fetch_all(self.pool.as_ref())
690        .await
691        .map_err(|e| SessionError::Storage {
692            message: e.to_string(),
693        })?;
694
695        Ok(rows
696            .into_iter()
697            .filter_map(|row| {
698                let content: Vec<crate::types::ContentBlock> = row
699                    .try_get::<serde_json::Value, _>("content")
700                    .ok()
701                    .and_then(|v| serde_json::from_value(v).ok())?;
702
703                let role: crate::types::Role = row
704                    .try_get::<&str, _>("role")
705                    .ok()
706                    .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
707
708                let usage = row
709                    .try_get::<serde_json::Value, _>("usage")
710                    .ok()
711                    .and_then(|v| serde_json::from_value(v).ok());
712
713                let metadata = row
714                    .try_get::<serde_json::Value, _>("metadata")
715                    .ok()
716                    .and_then(|v| serde_json::from_value(v).ok())
717                    .unwrap_or_default();
718
719                let environment = row
720                    .try_get::<serde_json::Value, _>("environment")
721                    .ok()
722                    .and_then(|v| serde_json::from_value(v).ok());
723
724                Some(SessionMessage {
725                    id: row.try_get::<&str, _>("id").ok()?.parse().ok()?,
726                    parent_id: row
727                        .try_get::<&str, _>("parent_id")
728                        .ok()
729                        .and_then(|s| s.parse().ok()),
730                    role,
731                    content,
732                    is_sidechain: row.try_get("is_sidechain").unwrap_or(false),
733                    is_compact_summary: row.try_get("is_compact_summary").unwrap_or(false),
734                    usage,
735                    timestamp: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
736                    metadata,
737                    environment,
738                })
739            })
740            .collect())
741    }
742
743    async fn load_compacts(&self, session_id: &SessionId) -> SessionResult<Vec<CompactRecord>> {
744        let c = &self.config;
745
746        let rows = sqlx::query(&format!(
747            r#"
748            SELECT id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
749                   summary, original_count, new_count, logical_parent_id, created_at
750            FROM {compacts}
751            WHERE session_id = $1
752            ORDER BY created_at ASC
753            "#,
754            compacts = c.compacts_table
755        ))
756        .bind(session_id.to_string())
757        .fetch_all(self.pool.as_ref())
758        .await
759        .map_err(|e| SessionError::Storage {
760            message: e.to_string(),
761        })?;
762
763        Ok(rows
764            .into_iter()
765            .filter_map(|row| {
766                let trigger = row
767                    .try_get::<&str, _>("trigger")
768                    .ok()
769                    .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
770
771                Some(CompactRecord {
772                    id: row.try_get("id").ok()?,
773                    session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
774                    trigger,
775                    pre_tokens: row.try_get::<i32, _>("pre_tokens").unwrap_or(0) as usize,
776                    post_tokens: row.try_get::<i32, _>("post_tokens").unwrap_or(0) as usize,
777                    saved_tokens: row.try_get::<i32, _>("saved_tokens").unwrap_or(0) as usize,
778                    summary: row.try_get("summary").ok()?,
779                    original_count: row.try_get::<i32, _>("original_count").unwrap_or(0) as usize,
780                    new_count: row.try_get::<i32, _>("new_count").unwrap_or(0) as usize,
781                    logical_parent_id: row
782                        .try_get::<&str, _>("logical_parent_id")
783                        .ok()
784                        .and_then(|s| s.parse().ok()),
785                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
786                })
787            })
788            .collect())
789    }
790
791    async fn load_todos_internal(&self, session_id: &SessionId) -> SessionResult<Vec<TodoItem>> {
792        let c = &self.config;
793
794        let rows = sqlx::query(&format!(
795            r#"
796            SELECT id, session_id, plan_id, content, active_form, status,
797                   created_at, started_at, completed_at
798            FROM {todos}
799            WHERE session_id = $1
800            ORDER BY created_at ASC
801            "#,
802            todos = c.todos_table
803        ))
804        .bind(session_id.to_string())
805        .fetch_all(self.pool.as_ref())
806        .await
807        .map_err(|e| SessionError::Storage {
808            message: e.to_string(),
809        })?;
810
811        Ok(rows
812            .into_iter()
813            .filter_map(|row| {
814                let status = row
815                    .try_get::<&str, _>("status")
816                    .ok()
817                    .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
818
819                Some(TodoItem {
820                    id: row.try_get("id").ok()?,
821                    session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
822                    content: row.try_get("content").ok()?,
823                    active_form: row.try_get("active_form").ok()?,
824                    status,
825                    plan_id: row.try_get("plan_id").ok(),
826                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
827                    started_at: row.try_get("started_at").ok(),
828                    completed_at: row.try_get("completed_at").ok(),
829                })
830            })
831            .collect())
832    }
833
834    async fn load_plan_internal(&self, session_id: &SessionId) -> SessionResult<Option<Plan>> {
835        let c = &self.config;
836
837        let row = sqlx::query(&format!(
838            r#"
839            SELECT id, session_id, name, content, status, error,
840                   created_at, approved_at, started_at, completed_at
841            FROM {plans}
842            WHERE session_id = $1
843            ORDER BY created_at DESC
844            LIMIT 1
845            "#,
846            plans = c.plans_table
847        ))
848        .bind(session_id.to_string())
849        .fetch_optional(self.pool.as_ref())
850        .await
851        .map_err(|e| SessionError::Storage {
852            message: e.to_string(),
853        })?;
854
855        Ok(row.and_then(|row| {
856            let status = row
857                .try_get::<&str, _>("status")
858                .ok()
859                .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
860
861            Some(Plan {
862                id: row.try_get("id").ok()?,
863                session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
864                name: row.try_get("name").ok(),
865                content: row.try_get("content").ok()?,
866                status,
867                error: row.try_get("error").ok(),
868                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
869                approved_at: row.try_get("approved_at").ok(),
870                started_at: row.try_get("started_at").ok(),
871                completed_at: row.try_get("completed_at").ok(),
872            })
873        }))
874    }
875
876    async fn save_todos_tx(
877        &self,
878        tx: &mut Transaction<'_, Postgres>,
879        session_id: &SessionId,
880        todos: &[TodoItem],
881    ) -> SessionResult<()> {
882        let c = &self.config;
883
884        sqlx::query(&format!(
885            "DELETE FROM {todos} WHERE session_id = $1",
886            todos = c.todos_table
887        ))
888        .bind(session_id.to_string())
889        .execute(&mut **tx)
890        .await
891        .map_err(|e| SessionError::Storage {
892            message: e.to_string(),
893        })?;
894
895        for todo in todos {
896            let status = serde_json::to_string(&todo.status)
897                .unwrap_or_else(|_| "\"pending\"".to_string())
898                .trim_matches('"')
899                .to_string();
900
901            sqlx::query(&format!(
902                r#"
903                INSERT INTO {todos} (
904                    id, session_id, plan_id, content, active_form, status,
905                    created_at, started_at, completed_at
906                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
907                "#,
908                todos = c.todos_table
909            ))
910            .bind(todo.id)
911            .bind(session_id.to_string())
912            .bind(todo.plan_id)
913            .bind(&todo.content)
914            .bind(&todo.active_form)
915            .bind(&status)
916            .bind(todo.created_at)
917            .bind(todo.started_at)
918            .bind(todo.completed_at)
919            .execute(&mut **tx)
920            .await
921            .map_err(|e| SessionError::Storage {
922                message: e.to_string(),
923            })?;
924        }
925
926        Ok(())
927    }
928
929    async fn save_plan_tx(
930        &self,
931        tx: &mut Transaction<'_, Postgres>,
932        plan: &Plan,
933    ) -> SessionResult<()> {
934        let c = &self.config;
935
936        let status = serde_json::to_string(&plan.status)
937            .unwrap_or_else(|_| "\"draft\"".to_string())
938            .trim_matches('"')
939            .to_string();
940
941        sqlx::query(&format!(
942            r#"
943            INSERT INTO {plans} (
944                id, session_id, name, content, status, error,
945                created_at, approved_at, started_at, completed_at
946            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
947            ON CONFLICT (id) DO UPDATE SET
948                name = EXCLUDED.name,
949                content = EXCLUDED.content,
950                status = EXCLUDED.status,
951                error = EXCLUDED.error,
952                approved_at = EXCLUDED.approved_at,
953                started_at = EXCLUDED.started_at,
954                completed_at = EXCLUDED.completed_at
955            "#,
956            plans = c.plans_table
957        ))
958        .bind(plan.id)
959        .bind(plan.session_id.to_string())
960        .bind(&plan.name)
961        .bind(&plan.content)
962        .bind(&status)
963        .bind(&plan.error)
964        .bind(plan.created_at)
965        .bind(plan.approved_at)
966        .bind(plan.started_at)
967        .bind(plan.completed_at)
968        .execute(&mut **tx)
969        .await
970        .map_err(|e| SessionError::Storage {
971            message: e.to_string(),
972        })?;
973
974        Ok(())
975    }
976
977    async fn save_compacts_tx(
978        &self,
979        tx: &mut Transaction<'_, Postgres>,
980        session_id: &SessionId,
981        compacts: &[CompactRecord],
982    ) -> SessionResult<()> {
983        let c = &self.config;
984
985        sqlx::query(&format!(
986            "DELETE FROM {compacts} WHERE session_id = $1",
987            compacts = c.compacts_table
988        ))
989        .bind(session_id.to_string())
990        .execute(&mut **tx)
991        .await
992        .map_err(|e| SessionError::Storage {
993            message: e.to_string(),
994        })?;
995
996        for compact in compacts {
997            let trigger = serde_json::to_string(&compact.trigger)
998                .unwrap_or_else(|_| "\"manual\"".to_string())
999                .trim_matches('"')
1000                .to_string();
1001
1002            sqlx::query(&format!(
1003                r#"
1004                INSERT INTO {compacts} (
1005                    id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
1006                    summary, original_count, new_count, logical_parent_id, created_at
1007                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
1008                "#,
1009                compacts = c.compacts_table
1010            ))
1011            .bind(compact.id)
1012            .bind(session_id.to_string())
1013            .bind(&trigger)
1014            .bind(compact.pre_tokens as i32)
1015            .bind(compact.post_tokens as i32)
1016            .bind(compact.saved_tokens as i32)
1017            .bind(&compact.summary)
1018            .bind(compact.original_count as i32)
1019            .bind(compact.new_count as i32)
1020            .bind(compact.logical_parent_id.as_ref().map(|id| id.to_string()))
1021            .bind(compact.created_at)
1022            .execute(&mut **tx)
1023            .await
1024            .map_err(|e| SessionError::Storage {
1025                message: e.to_string(),
1026            })?;
1027        }
1028
1029        Ok(())
1030    }
1031
1032    async fn save_messages_tx(
1033        &self,
1034        tx: &mut Transaction<'_, Postgres>,
1035        session_id: &SessionId,
1036        messages: &[SessionMessage],
1037    ) -> SessionResult<()> {
1038        let c = &self.config;
1039
1040        sqlx::query(&format!(
1041            "DELETE FROM {messages} WHERE session_id = $1",
1042            messages = c.messages_table
1043        ))
1044        .bind(session_id.to_string())
1045        .execute(&mut **tx)
1046        .await
1047        .map_err(|e| SessionError::Storage {
1048            message: e.to_string(),
1049        })?;
1050
1051        for message in messages {
1052            let role = serde_json::to_string(&message.role)
1053                .unwrap_or_else(|_| "\"user\"".to_string())
1054                .trim_matches('"')
1055                .to_string();
1056
1057            sqlx::query(&format!(
1058                r#"
1059                INSERT INTO {messages} (
1060                    id, session_id, parent_id, role, content, is_sidechain,
1061                    is_compact_summary, model, request_id, usage, metadata,
1062                    environment, created_at
1063                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
1064                "#,
1065                messages = c.messages_table
1066            ))
1067            .bind(message.id.to_string())
1068            .bind(session_id.to_string())
1069            .bind(message.parent_id.as_ref().map(|id| id.to_string()))
1070            .bind(&role)
1071            .bind(serde_json::to_value(&message.content).unwrap_or_default())
1072            .bind(message.is_sidechain)
1073            .bind(message.is_compact_summary)
1074            .bind(&message.metadata.model)
1075            .bind(&message.metadata.request_id)
1076            .bind(
1077                message
1078                    .usage
1079                    .as_ref()
1080                    .and_then(|u| serde_json::to_value(u).ok()),
1081            )
1082            .bind(serde_json::to_value(&message.metadata).unwrap_or_default())
1083            .bind(
1084                message
1085                    .environment
1086                    .as_ref()
1087                    .and_then(|e| serde_json::to_value(e).ok()),
1088            )
1089            .bind(message.timestamp)
1090            .execute(&mut **tx)
1091            .await
1092            .map_err(|e| SessionError::Storage {
1093                message: e.to_string(),
1094            })?;
1095        }
1096
1097        Ok(())
1098    }
1099}
1100
1101#[async_trait]
1102impl Persistence for PostgresPersistence {
1103    fn name(&self) -> &str {
1104        "postgres"
1105    }
1106
1107    async fn save(&self, session: &Session) -> SessionResult<()> {
1108        let c = &self.config;
1109
1110        let mut tx = self.pool.begin().await.map_err(|e| SessionError::Storage {
1111            message: e.to_string(),
1112        })?;
1113
1114        let session_type = serde_json::to_string(&session.session_type)
1115            .unwrap_or_else(|_| "\"main\"".to_string())
1116            .trim_matches('"')
1117            .to_string();
1118
1119        let state = serde_json::to_string(&session.state)
1120            .unwrap_or_else(|_| "\"created\"".to_string())
1121            .trim_matches('"')
1122            .to_string();
1123
1124        let mode = serde_json::to_string(&session.mode)
1125            .unwrap_or_else(|_| "\"default\"".to_string())
1126            .trim_matches('"')
1127            .to_string();
1128
1129        sqlx::query(&format!(
1130            r#"
1131            INSERT INTO {sessions} (
1132                id, parent_id, tenant_id, session_type, state, mode,
1133                config, permission_policy, summary,
1134                total_input_tokens, total_output_tokens, total_cost_usd,
1135                current_leaf_id, static_context_hash, error,
1136                created_at, updated_at, expires_at
1137            ) VALUES (
1138                $1, $2, $3, $4, $5, $6, $7, $8, $9,
1139                $10, $11, $12, $13, $14, $15, $16, $17, $18
1140            )
1141            ON CONFLICT (id) DO UPDATE SET
1142                parent_id = EXCLUDED.parent_id,
1143                tenant_id = EXCLUDED.tenant_id,
1144                session_type = EXCLUDED.session_type,
1145                state = EXCLUDED.state,
1146                mode = EXCLUDED.mode,
1147                config = EXCLUDED.config,
1148                permission_policy = EXCLUDED.permission_policy,
1149                summary = EXCLUDED.summary,
1150                total_input_tokens = EXCLUDED.total_input_tokens,
1151                total_output_tokens = EXCLUDED.total_output_tokens,
1152                total_cost_usd = EXCLUDED.total_cost_usd,
1153                current_leaf_id = EXCLUDED.current_leaf_id,
1154                static_context_hash = EXCLUDED.static_context_hash,
1155                error = EXCLUDED.error,
1156                updated_at = EXCLUDED.updated_at,
1157                expires_at = EXCLUDED.expires_at
1158            "#,
1159            sessions = c.sessions_table
1160        ))
1161        .bind(session.id.to_string())
1162        .bind(session.parent_id.map(|id| id.to_string()))
1163        .bind(&session.tenant_id)
1164        .bind(&session_type)
1165        .bind(&state)
1166        .bind(&mode)
1167        .bind(serde_json::to_value(&session.config).unwrap_or_default())
1168        .bind(serde_json::to_value(&session.permission_policy).unwrap_or_default())
1169        .bind(&session.summary)
1170        .bind(session.total_usage.input_tokens as i64)
1171        .bind(session.total_usage.output_tokens as i64)
1172        .bind(session.total_cost_usd)
1173        .bind(session.current_leaf_id.as_ref().map(|id| id.to_string()))
1174        .bind(&session.static_context_hash)
1175        .bind(&session.error)
1176        .bind(session.created_at)
1177        .bind(session.updated_at)
1178        .bind(session.expires_at)
1179        .execute(&mut *tx)
1180        .await
1181        .map_err(|e| SessionError::Storage {
1182            message: e.to_string(),
1183        })?;
1184
1185        self.save_messages_tx(&mut tx, &session.id, &session.messages)
1186            .await?;
1187        self.save_todos_tx(&mut tx, &session.id, &session.todos)
1188            .await?;
1189        self.save_compacts_tx(&mut tx, &session.id, &session.compact_history)
1190            .await?;
1191
1192        if let Some(ref plan) = session.current_plan {
1193            self.save_plan_tx(&mut tx, plan).await?;
1194        }
1195
1196        tx.commit().await.map_err(|e| SessionError::Storage {
1197            message: e.to_string(),
1198        })?;
1199
1200        Ok(())
1201    }
1202
1203    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1204        match self.load_session_row(id).await {
1205            Ok(session) => Ok(Some(session)),
1206            Err(SessionError::NotFound { .. }) => Ok(None),
1207            Err(e) => Err(e),
1208        }
1209    }
1210
1211    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1212        let c = &self.config;
1213
1214        let result = sqlx::query(&format!(
1215            "DELETE FROM {sessions} WHERE id = $1",
1216            sessions = c.sessions_table
1217        ))
1218        .bind(id.to_string())
1219        .execute(self.pool.as_ref())
1220        .await
1221        .map_err(|e| SessionError::Storage {
1222            message: e.to_string(),
1223        })?;
1224
1225        Ok(result.rows_affected() > 0)
1226    }
1227
1228    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1229        let c = &self.config;
1230
1231        let rows = if let Some(tid) = tenant_id {
1232            sqlx::query(&format!(
1233                "SELECT id FROM {sessions} WHERE tenant_id = $1",
1234                sessions = c.sessions_table
1235            ))
1236            .bind(tid)
1237            .fetch_all(self.pool.as_ref())
1238            .await
1239        } else {
1240            sqlx::query(&format!(
1241                "SELECT id FROM {sessions}",
1242                sessions = c.sessions_table
1243            ))
1244            .fetch_all(self.pool.as_ref())
1245            .await
1246        }
1247        .map_err(|e| SessionError::Storage {
1248            message: e.to_string(),
1249        })?;
1250
1251        Ok(rows
1252            .into_iter()
1253            .filter_map(|row| row.try_get::<&str, _>("id").ok()?.parse().ok())
1254            .collect())
1255    }
1256
1257    async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1258        let c = &self.config;
1259
1260        let rows = sqlx::query(&format!(
1261            "SELECT id FROM {sessions} WHERE parent_id = $1",
1262            sessions = c.sessions_table
1263        ))
1264        .bind(parent_id.to_string())
1265        .fetch_all(self.pool.as_ref())
1266        .await
1267        .map_err(|e| SessionError::Storage {
1268            message: e.to_string(),
1269        })?;
1270
1271        Ok(rows
1272            .into_iter()
1273            .filter_map(|row| row.try_get::<&str, _>("id").ok()?.parse().ok())
1274            .collect())
1275    }
1276
1277    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1278        let c = &self.config;
1279
1280        sqlx::query(&format!(
1281            r#"
1282            INSERT INTO {summaries} (id, session_id, summary, leaf_message_id, created_at)
1283            VALUES ($1, $2, $3, $4, $5)
1284            "#,
1285            summaries = c.summaries_table
1286        ))
1287        .bind(snapshot.id)
1288        .bind(snapshot.session_id.to_string())
1289        .bind(&snapshot.summary)
1290        .bind(snapshot.leaf_message_id.map(|id| id.to_string()))
1291        .bind(snapshot.created_at)
1292        .execute(self.pool.as_ref())
1293        .await
1294        .map_err(|e| SessionError::Storage {
1295            message: e.to_string(),
1296        })?;
1297
1298        sqlx::query(&format!(
1299            "UPDATE {sessions} SET summary = $1, updated_at = NOW() WHERE id = $2",
1300            sessions = c.sessions_table
1301        ))
1302        .bind(&snapshot.summary)
1303        .bind(snapshot.session_id.to_string())
1304        .execute(self.pool.as_ref())
1305        .await
1306        .map_err(|e| SessionError::Storage {
1307            message: e.to_string(),
1308        })?;
1309
1310        Ok(())
1311    }
1312
1313    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1314        let c = &self.config;
1315
1316        let rows = sqlx::query(&format!(
1317            r#"
1318            SELECT id, session_id, summary, leaf_message_id, created_at
1319            FROM {summaries}
1320            WHERE session_id = $1
1321            ORDER BY created_at ASC
1322            "#,
1323            summaries = c.summaries_table
1324        ))
1325        .bind(session_id.to_string())
1326        .fetch_all(self.pool.as_ref())
1327        .await
1328        .map_err(|e| SessionError::Storage {
1329            message: e.to_string(),
1330        })?;
1331
1332        Ok(rows
1333            .into_iter()
1334            .filter_map(|row| {
1335                Some(SummarySnapshot {
1336                    id: row.try_get("id").ok()?,
1337                    session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1338                    summary: row.try_get("summary").ok()?,
1339                    leaf_message_id: row
1340                        .try_get::<&str, _>("leaf_message_id")
1341                        .ok()
1342                        .and_then(|s| s.parse().ok()),
1343                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1344                })
1345            })
1346            .collect())
1347    }
1348
1349    async fn enqueue(
1350        &self,
1351        session_id: &SessionId,
1352        content: String,
1353        priority: i32,
1354    ) -> SessionResult<QueueItem> {
1355        let c = &self.config;
1356        let item = QueueItem::enqueue(*session_id, &content).with_priority(priority);
1357
1358        sqlx::query(&format!(
1359            r#"
1360            INSERT INTO {queue} (id, session_id, operation, content, priority, status, created_at)
1361            VALUES ($1, $2, $3, $4, $5, $6, $7)
1362            "#,
1363            queue = c.queue_table
1364        ))
1365        .bind(item.id)
1366        .bind(session_id.to_string())
1367        .bind("enqueue")
1368        .bind(&content)
1369        .bind(priority)
1370        .bind("pending")
1371        .bind(item.created_at)
1372        .execute(self.pool.as_ref())
1373        .await
1374        .map_err(|e| SessionError::Storage {
1375            message: e.to_string(),
1376        })?;
1377
1378        Ok(item)
1379    }
1380
1381    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1382        let c = &self.config;
1383
1384        let row = sqlx::query(&format!(
1385            r#"
1386            UPDATE {queue}
1387            SET status = 'processing'
1388            WHERE id = (
1389                SELECT id FROM {queue}
1390                WHERE session_id = $1 AND status = 'pending'
1391                ORDER BY priority DESC, created_at ASC
1392                LIMIT 1
1393                FOR UPDATE SKIP LOCKED
1394            )
1395            RETURNING id, session_id, operation, content, priority, status, created_at, processed_at
1396            "#,
1397            queue = c.queue_table
1398        ))
1399        .bind(session_id.to_string())
1400        .fetch_optional(self.pool.as_ref())
1401        .await
1402        .map_err(|e| SessionError::Storage {
1403            message: e.to_string(),
1404        })?;
1405
1406        Ok(row.and_then(|row| {
1407            Some(QueueItem {
1408                id: row.try_get("id").ok()?,
1409                session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1410                operation: super::types::QueueOperation::Enqueue,
1411                content: row.try_get("content").ok()?,
1412                priority: row.try_get("priority").unwrap_or(0),
1413                status: QueueStatus::Processing,
1414                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1415                processed_at: row.try_get("processed_at").ok(),
1416            })
1417        }))
1418    }
1419
1420    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1421        let c = &self.config;
1422
1423        let result = sqlx::query(&format!(
1424            "UPDATE {queue} SET status = 'cancelled', processed_at = NOW() WHERE id = $1 AND status = 'pending'",
1425            queue = c.queue_table
1426        ))
1427        .bind(item_id)
1428        .execute(self.pool.as_ref())
1429        .await
1430        .map_err(|e| SessionError::Storage { message: e.to_string() })?;
1431
1432        Ok(result.rows_affected() > 0)
1433    }
1434
1435    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1436        let c = &self.config;
1437
1438        let rows = sqlx::query(&format!(
1439            r#"
1440            SELECT id, session_id, operation, content, priority, status, created_at, processed_at
1441            FROM {queue}
1442            WHERE session_id = $1 AND status = 'pending'
1443            ORDER BY priority DESC, created_at ASC
1444            "#,
1445            queue = c.queue_table
1446        ))
1447        .bind(session_id.to_string())
1448        .fetch_all(self.pool.as_ref())
1449        .await
1450        .map_err(|e| SessionError::Storage {
1451            message: e.to_string(),
1452        })?;
1453
1454        Ok(rows
1455            .into_iter()
1456            .filter_map(|row| {
1457                Some(QueueItem {
1458                    id: row.try_get("id").ok()?,
1459                    session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1460                    operation: super::types::QueueOperation::Enqueue,
1461                    content: row.try_get("content").ok()?,
1462                    priority: row.try_get("priority").unwrap_or(0),
1463                    status: QueueStatus::Pending,
1464                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1465                    processed_at: row.try_get("processed_at").ok(),
1466                })
1467            })
1468            .collect())
1469    }
1470
1471    async fn cleanup_expired(&self) -> SessionResult<usize> {
1472        let c = &self.config;
1473
1474        let result = sqlx::query(&format!(
1475            "DELETE FROM {sessions} WHERE expires_at IS NOT NULL AND expires_at < NOW()",
1476            sessions = c.sessions_table
1477        ))
1478        .execute(self.pool.as_ref())
1479        .await
1480        .map_err(|e| SessionError::Storage {
1481            message: e.to_string(),
1482        })?;
1483
1484        Ok(result.rows_affected() as usize)
1485    }
1486}