1use std::collections::VecDeque;
33use std::sync::Arc;
34use std::time::Duration;
35
36use async_trait::async_trait;
37use chrono::Utc;
38use sqlx::postgres::PgPoolOptions;
39use sqlx::{PgPool, Postgres, Row, Transaction};
40use uuid::Uuid;
41
42use super::persistence::Persistence;
43use super::state::{Session, SessionConfig, SessionId, SessionMessage};
44use super::types::{CompactRecord, Plan, QueueItem, QueueStatus, SummarySnapshot, TodoItem};
45use super::{SessionError, SessionResult, StorageResultExt};
46
47fn enum_to_db<T: serde::Serialize>(value: &T, default: &str) -> String {
48 serde_json::to_string(value)
49 .map(|s| s.trim_matches('"').to_string())
50 .unwrap_or_else(|_| default.to_string())
51}
52
53fn db_to_enum<T: serde::de::DeserializeOwned>(s: &str) -> Option<T> {
54 serde_json::from_str(&format!("\"{}\"", s)).ok()
55}
56
/// Connection-pool sizing, timeout, and retry tuning for the Postgres
/// persistence layer.
#[derive(Clone, Debug)]
pub struct PgPoolConfig {
    /// Upper bound on pooled connections.
    pub max_connections: u32,
    /// Connections kept open even when idle.
    pub min_connections: u32,
    /// Timeout for establishing a new connection.
    /// NOTE(review): not referenced by `apply()` — confirm where (or
    /// whether) this is consumed.
    pub connect_timeout: Duration,
    /// Idle connections are closed after this duration.
    pub idle_timeout: Duration,
    /// Connections are recycled once they reach this total age.
    pub max_lifetime: Duration,
    /// Timeout for acquiring a connection from the pool.
    pub acquire_timeout: Duration,
    /// Maximum retry attempts for transient storage errors.
    pub max_retries: u32,
    /// Backoff delay before the first retry.
    pub initial_backoff: Duration,
    /// Upper bound on the retry backoff delay.
    pub max_backoff: Duration,
}
77
78impl Default for PgPoolConfig {
79 fn default() -> Self {
80 Self {
81 max_connections: 10,
82 min_connections: 1,
83 connect_timeout: Duration::from_secs(30),
84 idle_timeout: Duration::from_secs(600),
85 max_lifetime: Duration::from_secs(1800),
86 acquire_timeout: Duration::from_secs(30),
87 max_retries: 3,
88 initial_backoff: Duration::from_millis(100),
89 max_backoff: Duration::from_secs(5),
90 }
91 }
92}
93
impl PgPoolConfig {
    /// Preset tuned for high-concurrency workloads: more connections,
    /// shorter timeouts/lifetimes, and a tighter retry backoff than
    /// [`Default`].
    pub fn high_throughput() -> Self {
        Self {
            max_connections: 50,
            min_connections: 5,
            connect_timeout: Duration::from_secs(10),
            idle_timeout: Duration::from_secs(300),
            max_lifetime: Duration::from_secs(900),
            acquire_timeout: Duration::from_secs(10),
            max_retries: 3,
            initial_backoff: Duration::from_millis(50),
            max_backoff: Duration::from_secs(2),
        }
    }

    /// Translate the sizing/timeout fields into sqlx [`PgPoolOptions`].
    ///
    /// The retry fields (`max_retries`, `initial_backoff`, `max_backoff`)
    /// are intentionally not applied here — they are consumed by the
    /// persistence layer's retry wrapper.
    /// NOTE(review): `connect_timeout` is not applied either — confirm this
    /// is intentional.
    pub(crate) fn apply(&self) -> PgPoolOptions {
        PgPoolOptions::new()
            .max_connections(self.max_connections)
            .min_connections(self.min_connections)
            .acquire_timeout(self.acquire_timeout)
            .idle_timeout(Some(self.idle_timeout))
            .max_lifetime(Some(self.max_lifetime))
    }
}
118
/// Table naming, pool, and retention settings for Postgres-backed session
/// storage. All table names share a common, validated prefix.
#[derive(Clone, Debug)]
pub struct PostgresConfig {
    /// Sessions table name.
    pub sessions_table: String,
    /// Messages table name.
    pub messages_table: String,
    /// Compaction-history table name.
    pub compacts_table: String,
    /// Summaries table name.
    pub summaries_table: String,
    /// Queued-operations table name.
    pub queue_table: String,
    /// Todos table name.
    pub todos_table: String,
    /// Plans table name.
    pub plans_table: String,
    /// Connection-pool and retry tuning.
    pub pool: PgPoolConfig,
    /// Data retention window, in days.
    /// NOTE(review): not consumed anywhere in this chunk — presumably read
    /// by a cleanup job elsewhere; confirm.
    pub retention_days: u32,
}
136
137impl Default for PostgresConfig {
138 fn default() -> Self {
139 Self::prefix("claude_").unwrap()
141 }
142}
143
144impl PostgresConfig {
145 pub fn prefix(prefix: &str) -> Result<Self, SessionError> {
146 if !prefix
147 .chars()
148 .all(|c| c.is_ascii_alphanumeric() || c == '_')
149 {
150 return Err(SessionError::Storage {
151 message: format!(
152 "Invalid table prefix '{}': only ASCII alphanumeric and underscore allowed",
153 prefix
154 ),
155 });
156 }
157 Ok(Self {
158 sessions_table: format!("{prefix}sessions"),
159 messages_table: format!("{prefix}messages"),
160 compacts_table: format!("{prefix}compacts"),
161 summaries_table: format!("{prefix}summaries"),
162 queue_table: format!("{prefix}queue"),
163 todos_table: format!("{prefix}todos"),
164 plans_table: format!("{prefix}plans"),
165 pool: PgPoolConfig::default(),
166 retention_days: 30,
167 })
168 }
169
170 pub fn pool(mut self, pool: PgPoolConfig) -> Self {
171 self.pool = pool;
172 self
173 }
174
175 pub fn retention_days(mut self, days: u32) -> Self {
176 self.retention_days = days;
177 self
178 }
179
180 pub fn table_names(&self) -> Vec<&str> {
182 vec![
183 &self.sessions_table,
184 &self.messages_table,
185 &self.compacts_table,
186 &self.summaries_table,
187 &self.queue_table,
188 &self.todos_table,
189 &self.plans_table,
190 ]
191 }
192}
193
/// A discrepancy found when verifying the live database schema against the
/// schema this module expects.
#[derive(Debug, Clone)]
pub enum SchemaIssue {
    /// An expected table does not exist.
    MissingTable(String),
    /// An expected index is absent from a table.
    MissingIndex { table: String, index: String },
    /// An expected column is absent from a table.
    /// NOTE(review): not produced by `PostgresSchema::verify` in this chunk
    /// — confirm whether column verification exists elsewhere.
    MissingColumn { table: String, column: String },
}
205
206impl std::fmt::Display for SchemaIssue {
207 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
208 match self {
209 SchemaIssue::MissingTable(t) => write!(f, "Missing table: {}", t),
210 SchemaIssue::MissingIndex { table, index } => {
211 write!(f, "Missing index '{}' on table '{}'", index, table)
212 }
213 SchemaIssue::MissingColumn { table, column } => {
214 write!(f, "Missing column '{}' in table '{}'", column, table)
215 }
216 }
217 }
218}
219
/// Stateless namespace for generating, applying, and verifying the Postgres
/// schema (tables and indexes) described by a `PostgresConfig`.
pub struct PostgresSchema;
224
impl PostgresSchema {
    /// Render the complete schema (all table DDL followed by all index DDL)
    /// as a single SQL script, e.g. for offline review or manual migration.
    pub fn sql(config: &PostgresConfig) -> String {
        let mut sql = String::new();
        sql.push_str("-- Claude Agent Session Schema\n");
        sql.push_str("-- Generated by claude-agent PostgresSchema\n\n");

        // Table statements already end in ");" — no extra terminator needed.
        for table_sql in Self::table_ddl(config) {
            sql.push_str(&table_sql);
            sql.push_str("\n\n");
        }

        // Index statements carry no trailing ";" so one is appended here.
        sql.push_str("-- Indexes\n");
        for index_sql in Self::index_ddl(config) {
            sql.push_str(&index_sql);
            sql.push_str(";\n");
        }

        sql
    }

    /// `CREATE TABLE IF NOT EXISTS` statements for all seven tables, in
    /// dependency order (sessions first; every other table has an
    /// ON DELETE CASCADE foreign key to it).
    pub fn table_ddl(config: &PostgresConfig) -> Vec<String> {
        let c = config;
        vec![
            // Sessions: the root table. `{{}}` renders as the literal '{}'
            // JSONB default once format! is applied.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {sessions} (
    id VARCHAR(255) PRIMARY KEY,
    parent_id VARCHAR(255),
    tenant_id VARCHAR(255),
    session_type VARCHAR(32) NOT NULL DEFAULT 'main',
    state VARCHAR(32) NOT NULL DEFAULT 'created',
    mode VARCHAR(32) NOT NULL DEFAULT 'default',
    config JSONB NOT NULL DEFAULT '{{}}',
    permissions JSONB NOT NULL DEFAULT '{{}}',
    summary TEXT,
    total_input_tokens BIGINT DEFAULT 0,
    total_output_tokens BIGINT DEFAULT 0,
    total_cost_usd DECIMAL(12, 6) DEFAULT 0,
    current_leaf_id VARCHAR(255),
    static_context_hash VARCHAR(64),
    error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    expires_at TIMESTAMPTZ,
    CONSTRAINT fk_{sessions}_parent FOREIGN KEY (parent_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                sessions = c.sessions_table
            ),
            // Messages: one row per conversation message.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {messages} (
    id VARCHAR(255) PRIMARY KEY,
    session_id VARCHAR(255) NOT NULL,
    parent_id VARCHAR(255),
    role VARCHAR(16) NOT NULL,
    content JSONB NOT NULL,
    is_sidechain BOOLEAN DEFAULT FALSE,
    is_compact_summary BOOLEAN DEFAULT FALSE,
    model VARCHAR(64),
    request_id VARCHAR(255),
    usage JSONB,
    metadata JSONB,
    environment JSONB,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{messages}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                messages = c.messages_table,
                sessions = c.sessions_table
            ),
            // Compacts: history of context-compaction operations.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {compacts} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    trigger VARCHAR(32) NOT NULL,
    pre_tokens INTEGER NOT NULL,
    post_tokens INTEGER NOT NULL,
    saved_tokens INTEGER NOT NULL,
    summary TEXT NOT NULL,
    original_count INTEGER NOT NULL,
    new_count INTEGER NOT NULL,
    logical_parent_id VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{compacts}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                compacts = c.compacts_table,
                sessions = c.sessions_table
            ),
            // Summaries: point-in-time session summaries.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {summaries} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    summary TEXT NOT NULL,
    leaf_message_id VARCHAR(255),
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    CONSTRAINT fk_{summaries}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                summaries = c.summaries_table,
                sessions = c.sessions_table
            ),
            // Queue: pending operations per session.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {queue} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    operation VARCHAR(32) NOT NULL,
    content TEXT NOT NULL,
    priority INTEGER DEFAULT 0,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMPTZ,
    CONSTRAINT fk_{queue}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                queue = c.queue_table,
                sessions = c.sessions_table
            ),
            // Todos: per-session task list, optionally linked to a plan.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {todos} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    plan_id UUID,
    content TEXT NOT NULL,
    active_form TEXT NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'pending',
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    CONSTRAINT fk_{todos}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                todos = c.todos_table,
                sessions = c.sessions_table
            ),
            // Plans: per-session plans with a lifecycle status.
            format!(
                r#"CREATE TABLE IF NOT EXISTS {plans} (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id VARCHAR(255) NOT NULL,
    name VARCHAR(255),
    content TEXT NOT NULL,
    status VARCHAR(32) NOT NULL DEFAULT 'draft',
    error TEXT,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    approved_at TIMESTAMPTZ,
    started_at TIMESTAMPTZ,
    completed_at TIMESTAMPTZ,
    CONSTRAINT fk_{plans}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
);"#,
                plans = c.plans_table,
                sessions = c.sessions_table
            ),
        ]
    }

    /// `CREATE INDEX IF NOT EXISTS` statements (no trailing semicolons).
    /// Must stay in sync with [`Self::expected_indexes`].
    pub fn index_ddl(config: &PostgresConfig) -> Vec<String> {
        let c = config;
        vec![
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_tenant ON {0}(tenant_id)",
                c.sessions_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_parent ON {0}(parent_id)",
                c.sessions_table
            ),
            // Partial index: only sessions with an expiry are indexed.
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_expires ON {0}(expires_at) WHERE expires_at IS NOT NULL",
                c.sessions_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_state ON {0}(state)",
                c.sessions_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
                c.messages_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_created ON {0}(session_id, created_at)",
                c.messages_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
                c.compacts_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
                c.summaries_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session_status ON {0}(session_id, status)",
                c.queue_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
                c.todos_table
            ),
            format!(
                "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
                c.plans_table
            ),
        ]
    }

    /// `(table, index)` pairs that [`Self::verify`] checks for existence.
    /// Must stay in sync with [`Self::index_ddl`].
    pub fn expected_indexes(config: &PostgresConfig) -> Vec<(String, String)> {
        let c = config;
        vec![
            (
                c.sessions_table.clone(),
                format!("idx_{}_tenant", c.sessions_table),
            ),
            (
                c.sessions_table.clone(),
                format!("idx_{}_parent", c.sessions_table),
            ),
            (
                c.sessions_table.clone(),
                format!("idx_{}_expires", c.sessions_table),
            ),
            (
                c.sessions_table.clone(),
                format!("idx_{}_state", c.sessions_table),
            ),
            (
                c.messages_table.clone(),
                format!("idx_{}_session", c.messages_table),
            ),
            (
                c.messages_table.clone(),
                format!("idx_{}_created", c.messages_table),
            ),
            (
                c.compacts_table.clone(),
                format!("idx_{}_session", c.compacts_table),
            ),
            (
                c.summaries_table.clone(),
                format!("idx_{}_session", c.summaries_table),
            ),
            (
                c.queue_table.clone(),
                format!("idx_{}_session_status", c.queue_table),
            ),
            (
                c.todos_table.clone(),
                format!("idx_{}_session", c.todos_table),
            ),
            (
                c.plans_table.clone(),
                format!("idx_{}_session", c.plans_table),
            ),
        ]
    }

    /// Create all tables and indexes. Idempotent (all DDL is
    /// `IF NOT EXISTS`); statements run sequentially, not in a transaction.
    pub async fn migrate(pool: &PgPool, config: &PostgresConfig) -> Result<(), sqlx::Error> {
        for table_ddl in Self::table_ddl(config) {
            sqlx::query(&table_ddl).execute(pool).await?;
        }

        for index_ddl in Self::index_ddl(config) {
            sqlx::query(&index_ddl).execute(pool).await?;
        }

        Ok(())
    }

    /// Check the live database for missing tables/indexes without modifying
    /// anything; returns an empty vec when the schema is complete.
    ///
    /// NOTE(review): neither catalog query filters by schema
    /// (`table_schema` / `schemaname`), so a same-named object in another
    /// schema could mask a missing one — confirm single-schema deployments
    /// are assumed.
    pub async fn verify(
        pool: &PgPool,
        config: &PostgresConfig,
    ) -> Result<Vec<SchemaIssue>, sqlx::Error> {
        let mut issues = Vec::new();

        for table in config.table_names() {
            let exists: bool = sqlx::query_scalar(
                "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
            )
            .bind(table)
            .fetch_one(pool)
            .await?;

            if !exists {
                issues.push(SchemaIssue::MissingTable(table.to_string()));
            }
        }

        for (table, index) in Self::expected_indexes(config) {
            let exists: bool = sqlx::query_scalar(
                "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE tablename = $1 AND indexname = $2)",
            )
            .bind(&table)
            .bind(&index)
            .fetch_one(pool)
            .await?;

            if !exists {
                issues.push(SchemaIssue::MissingIndex { table, index });
            }
        }

        Ok(issues)
    }
}
538
/// Postgres-backed session persistence built on a shared sqlx `PgPool`.
pub struct PostgresPersistence {
    // Shared pool; `Arc` so it can be reused by other components.
    pool: Arc<PgPool>,
    // Table names, pool tuning, and retention settings.
    config: PostgresConfig,
}
548
549impl PostgresPersistence {
    /// Connect using the default configuration (table prefix `claude_`).
    pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
        Self::connect_with_config(database_url, PostgresConfig::default()).await
    }

    /// Connect using an explicit configuration. No schema migration is run;
    /// call [`Self::migrate`] separately if needed.
    pub async fn connect_with_config(
        database_url: &str,
        config: PostgresConfig,
    ) -> Result<Self, sqlx::Error> {
        let pool = config.pool.apply().connect(database_url).await?;
        Ok(Self {
            pool: Arc::new(pool),
            config,
        })
    }

    /// Connect with defaults and create any missing tables/indexes.
    pub async fn connect_and_migrate(database_url: &str) -> Result<Self, sqlx::Error> {
        Self::connect_and_migrate_with_config(database_url, PostgresConfig::default()).await
    }

    /// Connect with an explicit configuration and create any missing
    /// tables/indexes.
    pub async fn connect_and_migrate_with_config(
        database_url: &str,
        config: PostgresConfig,
    ) -> Result<Self, sqlx::Error> {
        let persistence = Self::connect_with_config(database_url, config).await?;
        persistence.migrate().await?;
        Ok(persistence)
    }

    /// Wrap an existing pool using the default configuration.
    pub fn from_pool(pool: Arc<PgPool>) -> Self {
        Self::pool_and_config(pool, PostgresConfig::default())
    }

    /// Wrap an existing pool with an explicit configuration.
    pub fn pool_and_config(pool: Arc<PgPool>, config: PostgresConfig) -> Self {
        Self { pool, config }
    }
595
    /// Create any missing tables and indexes (idempotent).
    pub async fn migrate(&self) -> Result<(), sqlx::Error> {
        PostgresSchema::migrate(&self.pool, &self.config).await
    }

    /// Report missing tables/indexes without modifying anything.
    pub async fn verify_schema(&self) -> Result<Vec<SchemaIssue>, sqlx::Error> {
        PostgresSchema::verify(&self.pool, &self.config).await
    }

    /// The underlying connection pool.
    pub fn pool(&self) -> &PgPool {
        &self.pool
    }

    /// The active configuration.
    pub fn config(&self) -> &PostgresConfig {
        &self.config
    }
615
    /// Run `operation` under the retry policy configured on the pool
    /// (attempt count and backoff bounds). Retryability of a failure is
    /// decided by [`Self::is_retryable`]; the backoff schedule itself is
    /// implemented by `super::with_retry`.
    async fn with_retry<F, Fut, T>(&self, operation: F) -> SessionResult<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = SessionResult<T>>,
    {
        super::with_retry(
            self.config.pool.max_retries,
            self.config.pool.initial_backoff,
            self.config.pool.max_backoff,
            Self::is_retryable,
            operation,
        )
        .await
    }
634
635 fn is_retryable(error: &SessionError) -> bool {
636 match error {
637 SessionError::Storage { message } => {
638 message.contains("connection")
639 || message.contains("timeout")
640 || message.contains("reset")
641 || message.contains("broken pipe")
642 || message.contains("serialization")
643 || message.contains("deadlock")
644 || message.contains("could not serialize")
645 }
646 _ => false,
647 }
648 }
649
    /// Load a full [`Session`]: the session row plus its messages,
    /// compaction history, todos, and most recent plan.
    ///
    /// Decoding is lenient: malformed JSON columns degrade to defaults with
    /// a warning rather than failing the whole load. Returns
    /// `SessionError::NotFound` when no row exists.
    ///
    /// NOTE(review): the child loads below are separate queries, not a
    /// consistent snapshot — confirm read-consistency requirements.
    async fn load_session_row(&self, session_id: &SessionId) -> SessionResult<Session> {
        let c = &self.config;
        let id_str = session_id.to_string();

        let row = sqlx::query(&format!(
            r#"
            SELECT id, parent_id, tenant_id, session_type, state, mode,
                   config, permissions, summary,
                   total_input_tokens, total_output_tokens, total_cost_usd,
                   current_leaf_id, static_context_hash, error,
                   created_at, updated_at, expires_at
            FROM {sessions}
            WHERE id = $1
            "#,
            sessions = c.sessions_table
        ))
        .bind(&id_str)
        .fetch_optional(self.pool.as_ref())
        .await
        .storage_err()?
        .ok_or_else(|| SessionError::NotFound { id: id_str.clone() })?;

        let messages = self.load_messages(session_id).await?;
        let compacts = self.load_compacts(session_id).await?;
        let todos = self.load_todos_internal(session_id).await?;
        let plan = self.load_plan_internal(session_id).await?;

        // JSONB config: default (with a warning) on read or deserialize
        // failure instead of failing the load.
        let config: SessionConfig = match row.try_get::<serde_json::Value, _>("config") {
            Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
                tracing::warn!(session_id = %session_id, error = %e, "Failed to deserialize session config");
                Default::default()
            }),
            Err(e) => {
                tracing::warn!(session_id = %session_id, error = %e, "Failed to read session config column");
                Default::default()
            }
        };

        // Same lenient strategy for the permissions JSONB column.
        let permissions = match row.try_get::<serde_json::Value, _>("permissions") {
            Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
                tracing::warn!(session_id = %session_id, error = %e, "Failed to deserialize session permissions");
                Default::default()
            }),
            Err(e) => {
                tracing::warn!(session_id = %session_id, error = %e, "Failed to read session permissions column");
                Default::default()
            }
        };

        let session_type = row
            .try_get::<&str, _>("session_type")
            .ok()
            .and_then(db_to_enum)
            .unwrap_or_default();

        // The mode column is read and deliberately discarded — presumably
        // kept for schema compatibility; TODO confirm.
        let _ = row.try_get::<&str, _>("mode");

        let state = row
            .try_get::<&str, _>("state")
            .ok()
            .and_then(db_to_enum)
            .unwrap_or_default();

        // Unparseable leaf ids degrade silently to None.
        let current_leaf_id = row
            .try_get::<&str, _>("current_leaf_id")
            .ok()
            .and_then(|s| s.parse().ok());

        Ok(Session {
            id: *session_id,
            parent_id: row
                .try_get::<&str, _>("parent_id")
                .ok()
                .and_then(|s| s.parse().ok()),
            session_type,
            tenant_id: row.try_get("tenant_id").ok(),
            state,
            config,
            permissions,
            messages,
            current_leaf_id,
            summary: row.try_get("summary").ok(),
            total_usage: crate::types::TokenUsage {
                input_tokens: row.try_get::<i64, _>("total_input_tokens").unwrap_or(0) as u64,
                output_tokens: row.try_get::<i64, _>("total_output_tokens").unwrap_or(0) as u64,
                ..Default::default()
            },
            // Not persisted in this table; starts at zero on every load —
            // NOTE(review): confirm it is recomputed elsewhere.
            current_input_tokens: 0,
            total_cost_usd: row
                .try_get::<rust_decimal::Decimal, _>("total_cost_usd")
                .unwrap_or_default(),
            static_context_hash: row.try_get("static_context_hash").ok(),
            created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
            updated_at: row.try_get("updated_at").unwrap_or_else(|_| Utc::now()),
            expires_at: row.try_get("expires_at").ok(),
            error: row.try_get("error").ok(),
            todos,
            current_plan: plan,
            compact_history: VecDeque::from(compacts),
        })
    }
752
    /// Load all messages for a session, oldest first (by `created_at`).
    ///
    /// Rows whose id, content, or role cannot be decoded are skipped with a
    /// warning so one corrupt row does not fail the whole session load.
    async fn load_messages(&self, session_id: &SessionId) -> SessionResult<Vec<SessionMessage>> {
        let c = &self.config;

        let rows = sqlx::query(&format!(
            r#"
            SELECT id, parent_id, role, content, is_sidechain, is_compact_summary,
                   model, request_id, usage, metadata, environment, created_at
            FROM {messages}
            WHERE session_id = $1
            ORDER BY created_at ASC
            "#,
            messages = c.messages_table
        ))
        .bind(session_id.to_string())
        .fetch_all(self.pool.as_ref())
        .await
        .storage_err()?;

        let mut messages = Vec::with_capacity(rows.len());

        for row in rows {
            // Required field: id (string column parsed to a typed id).
            let id_str = match row.try_get::<&str, _>("id") {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(error = %e, "Skipping message row: failed to get id");
                    continue;
                }
            };

            let id = match id_str.parse() {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(id = id_str, error = %e, "Skipping message row: failed to parse id");
                    continue;
                }
            };

            // Required field: content (JSONB → Vec<ContentBlock>).
            let content: Vec<crate::types::ContentBlock> = match row
                .try_get::<serde_json::Value, _>("content")
                .ok()
                .and_then(|v| serde_json::from_value(v).ok())
            {
                Some(c) => c,
                None => {
                    tracing::warn!(id = id_str, "Skipping message row: failed to parse content");
                    continue;
                }
            };

            // Required field: role (string → enum).
            let role: crate::types::Role =
                match row.try_get::<&str, _>("role").ok().and_then(db_to_enum) {
                    Some(r) => r,
                    None => {
                        tracing::warn!(id = id_str, "Skipping message row: failed to parse role");
                        continue;
                    }
                };

            // Optional fields below degrade to None/defaults silently
            // (usage/environment) or with a warning (metadata).
            let usage = row
                .try_get::<serde_json::Value, _>("usage")
                .ok()
                .and_then(|v| serde_json::from_value(v).ok());

            let metadata = match row.try_get::<serde_json::Value, _>("metadata") {
                Ok(v) => serde_json::from_value(v).unwrap_or_else(|e| {
                    tracing::warn!(id = id_str, error = %e, "Failed to deserialize message metadata");
                    Default::default()
                }),
                Err(_) => Default::default(),
            };

            let environment = row
                .try_get::<serde_json::Value, _>("environment")
                .ok()
                .and_then(|v| serde_json::from_value(v).ok());

            messages.push(SessionMessage {
                id,
                parent_id: row
                    .try_get::<&str, _>("parent_id")
                    .ok()
                    .and_then(|s| s.parse().ok()),
                role,
                content,
                is_sidechain: row.try_get("is_sidechain").unwrap_or(false),
                is_compact_summary: row.try_get("is_compact_summary").unwrap_or(false),
                usage,
                timestamp: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
                metadata,
                environment,
            });
        }

        Ok(messages)
    }
848
    /// Load the compaction history for a session, oldest first.
    ///
    /// Rows missing a decodable id, trigger, or summary are skipped with a
    /// warning; numeric columns degrade to zero on read failure.
    async fn load_compacts(&self, session_id: &SessionId) -> SessionResult<Vec<CompactRecord>> {
        let c = &self.config;

        let rows = sqlx::query(&format!(
            r#"
            SELECT id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
                   summary, original_count, new_count, logical_parent_id, created_at
            FROM {compacts}
            WHERE session_id = $1
            ORDER BY created_at ASC
            "#,
            compacts = c.compacts_table
        ))
        .bind(session_id.to_string())
        .fetch_all(self.pool.as_ref())
        .await
        .storage_err()?;

        let mut compacts = Vec::with_capacity(rows.len());

        for row in rows {
            let id: Uuid = match row.try_get("id") {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(session_id = %session_id, error = %e, "Skipping compact row: failed to get id");
                    continue;
                }
            };

            let trigger = match row.try_get::<&str, _>("trigger").ok().and_then(db_to_enum) {
                Some(t) => t,
                None => {
                    tracing::warn!(session_id = %session_id, compact_id = %id, "Skipping compact row: failed to parse trigger");
                    continue;
                }
            };

            let summary = match row.try_get("summary") {
                Ok(s) => s,
                Err(e) => {
                    tracing::warn!(session_id = %session_id, compact_id = %id, error = %e, "Skipping compact row: failed to get summary");
                    continue;
                }
            };

            compacts.push(CompactRecord {
                id,
                session_id: *session_id,
                trigger,
                // Counts are stored as INTEGER; widen to usize for the
                // in-memory record.
                pre_tokens: row.try_get::<i32, _>("pre_tokens").unwrap_or(0) as usize,
                post_tokens: row.try_get::<i32, _>("post_tokens").unwrap_or(0) as usize,
                saved_tokens: row.try_get::<i32, _>("saved_tokens").unwrap_or(0) as usize,
                summary,
                original_count: row.try_get::<i32, _>("original_count").unwrap_or(0) as usize,
                new_count: row.try_get::<i32, _>("new_count").unwrap_or(0) as usize,
                logical_parent_id: row
                    .try_get::<&str, _>("logical_parent_id")
                    .ok()
                    .and_then(|s| s.parse().ok()),
                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
            });
        }

        Ok(compacts)
    }
914
    /// Load all todos for a session, oldest first.
    ///
    /// Rows missing a decodable id, status, content, or active_form are
    /// skipped with a warning.
    async fn load_todos_internal(&self, session_id: &SessionId) -> SessionResult<Vec<TodoItem>> {
        let c = &self.config;

        let rows = sqlx::query(&format!(
            r#"
            SELECT id, session_id, plan_id, content, active_form, status,
                   created_at, started_at, completed_at
            FROM {todos}
            WHERE session_id = $1
            ORDER BY created_at ASC
            "#,
            todos = c.todos_table
        ))
        .bind(session_id.to_string())
        .fetch_all(self.pool.as_ref())
        .await
        .storage_err()?;

        let mut todos = Vec::with_capacity(rows.len());

        for row in rows {
            let id: Uuid = match row.try_get("id") {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(session_id = %session_id, error = %e, "Skipping todo row: failed to get id");
                    continue;
                }
            };

            let status = match row.try_get::<&str, _>("status").ok().and_then(db_to_enum) {
                Some(s) => s,
                None => {
                    tracing::warn!(session_id = %session_id, todo_id = %id, "Skipping todo row: failed to parse status");
                    continue;
                }
            };

            let content = match row.try_get("content") {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(session_id = %session_id, todo_id = %id, error = %e, "Skipping todo row: failed to get content");
                    continue;
                }
            };

            let active_form = match row.try_get("active_form") {
                Ok(f) => f,
                Err(e) => {
                    tracing::warn!(session_id = %session_id, todo_id = %id, error = %e, "Skipping todo row: failed to get active_form");
                    continue;
                }
            };

            todos.push(TodoItem {
                id,
                session_id: *session_id,
                content,
                active_form,
                status,
                plan_id: row.try_get("plan_id").ok(),
                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
                started_at: row.try_get("started_at").ok(),
                completed_at: row.try_get("completed_at").ok(),
            });
        }

        Ok(todos)
    }
983
    /// Load the single most recent plan for a session (newest `created_at`),
    /// or `None` when the session has no plan or the stored row cannot be
    /// decoded (a warning is logged and the plan is treated as absent).
    async fn load_plan_internal(&self, session_id: &SessionId) -> SessionResult<Option<Plan>> {
        let c = &self.config;

        let row = sqlx::query(&format!(
            r#"
            SELECT id, session_id, name, content, status, error,
                   created_at, approved_at, started_at, completed_at
            FROM {plans}
            WHERE session_id = $1
            ORDER BY created_at DESC
            LIMIT 1
            "#,
            plans = c.plans_table
        ))
        .bind(session_id.to_string())
        .fetch_optional(self.pool.as_ref())
        .await
        .storage_err()?;

        let Some(row) = row else {
            return Ok(None);
        };

        let id: Uuid = match row.try_get("id") {
            Ok(id) => id,
            Err(e) => {
                tracing::warn!(session_id = %session_id, error = %e, "Skipping plan row: failed to get id");
                return Ok(None);
            }
        };

        let status = match row.try_get::<&str, _>("status").ok().and_then(db_to_enum) {
            Some(s) => s,
            None => {
                tracing::warn!(session_id = %session_id, plan_id = %id, "Skipping plan row: failed to parse status");
                return Ok(None);
            }
        };

        let content = match row.try_get("content") {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(session_id = %session_id, plan_id = %id, error = %e, "Skipping plan row: failed to get content");
                return Ok(None);
            }
        };

        Ok(Some(Plan {
            id,
            session_id: *session_id,
            name: row.try_get("name").ok(),
            content,
            status,
            error: row.try_get("error").ok(),
            created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
            approved_at: row.try_get("approved_at").ok(),
            started_at: row.try_get("started_at").ok(),
            completed_at: row.try_get("completed_at").ok(),
        }))
    }
1044
    /// Replace all todos for a session inside the caller's transaction:
    /// delete existing rows, then re-insert the given slice. Atomicity is
    /// provided by `tx`.
    async fn save_todos_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        session_id: &SessionId,
        todos: &[TodoItem],
    ) -> SessionResult<()> {
        let c = &self.config;

        // Wipe-and-rewrite keeps the table an exact mirror of the in-memory
        // list (handles removals and reorderings without diffing).
        sqlx::query(&format!(
            "DELETE FROM {todos} WHERE session_id = $1",
            todos = c.todos_table
        ))
        .bind(session_id.to_string())
        .execute(&mut **tx)
        .await
        .storage_err()?;

        for todo in todos {
            let status = enum_to_db(&todo.status, "pending");

            sqlx::query(&format!(
                r#"
                INSERT INTO {todos} (
                    id, session_id, plan_id, content, active_form, status,
                    created_at, started_at, completed_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                "#,
                todos = c.todos_table
            ))
            .bind(todo.id)
            .bind(session_id.to_string())
            .bind(todo.plan_id)
            .bind(&todo.content)
            .bind(&todo.active_form)
            .bind(&status)
            .bind(todo.created_at)
            .bind(todo.started_at)
            .bind(todo.completed_at)
            .execute(&mut **tx)
            .await
            .storage_err()?;
        }

        Ok(())
    }
1090
    /// Upsert a plan (keyed on `id`) inside the caller's transaction.
    ///
    /// On conflict all mutable fields are overwritten; `session_id` and
    /// `created_at` are intentionally left as first inserted.
    async fn save_plan_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        plan: &Plan,
    ) -> SessionResult<()> {
        let c = &self.config;

        let status = enum_to_db(&plan.status, "draft");

        sqlx::query(&format!(
            r#"
            INSERT INTO {plans} (
                id, session_id, name, content, status, error,
                created_at, approved_at, started_at, completed_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ON CONFLICT (id) DO UPDATE SET
                name = EXCLUDED.name,
                content = EXCLUDED.content,
                status = EXCLUDED.status,
                error = EXCLUDED.error,
                approved_at = EXCLUDED.approved_at,
                started_at = EXCLUDED.started_at,
                completed_at = EXCLUDED.completed_at
            "#,
            plans = c.plans_table
        ))
        .bind(plan.id)
        .bind(plan.session_id.to_string())
        .bind(&plan.name)
        .bind(&plan.content)
        .bind(&status)
        .bind(&plan.error)
        .bind(plan.created_at)
        .bind(plan.approved_at)
        .bind(plan.started_at)
        .bind(plan.completed_at)
        .execute(&mut **tx)
        .await
        .storage_err()?;

        Ok(())
    }
1133
    /// Replace the compaction history for a session inside the caller's
    /// transaction: delete existing rows, then re-insert the full deque.
    async fn save_compacts_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        session_id: &SessionId,
        compacts: &VecDeque<CompactRecord>,
    ) -> SessionResult<()> {
        let c = &self.config;

        // Wipe-and-rewrite mirrors the in-memory history exactly.
        sqlx::query(&format!(
            "DELETE FROM {compacts} WHERE session_id = $1",
            compacts = c.compacts_table
        ))
        .bind(session_id.to_string())
        .execute(&mut **tx)
        .await
        .storage_err()?;

        for compact in compacts {
            let trigger = enum_to_db(&compact.trigger, "manual");

            sqlx::query(&format!(
                r#"
                INSERT INTO {compacts} (
                    id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
                    summary, original_count, new_count, logical_parent_id, created_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
                "#,
                compacts = c.compacts_table
            ))
            .bind(compact.id)
            .bind(session_id.to_string())
            .bind(&trigger)
            // Token/message counts are narrowed from usize to the INTEGER
            // column type; values beyond i32::MAX would wrap —
            // NOTE(review): confirm counts stay within i32 range.
            .bind(compact.pre_tokens as i32)
            .bind(compact.post_tokens as i32)
            .bind(compact.saved_tokens as i32)
            .bind(&compact.summary)
            .bind(compact.original_count as i32)
            .bind(compact.new_count as i32)
            .bind(compact.logical_parent_id.as_ref().map(|id| id.to_string()))
            .bind(compact.created_at)
            .execute(&mut **tx)
            .await
            .storage_err()?;
        }

        Ok(())
    }
1181
    /// Synchronize the messages table with the in-memory list inside the
    /// caller's transaction: upsert every message (keyed on `id`), then
    /// delete rows for this session that are no longer present.
    ///
    /// On conflict, `session_id` and `created_at` are intentionally not
    /// updated. Serialization failures for JSON columns are logged and
    /// replaced with empty values rather than aborting the save.
    async fn save_messages_tx(
        &self,
        tx: &mut Transaction<'_, Postgres>,
        session_id: &SessionId,
        messages: &[SessionMessage],
    ) -> SessionResult<()> {
        let c = &self.config;

        // Collected up front so the prune step below can exclude them.
        let current_ids: Vec<String> = messages.iter().map(|m| m.id.to_string()).collect();

        for message in messages {
            let role = enum_to_db(&message.role, "user");

            sqlx::query(&format!(
                r#"
                INSERT INTO {messages} (
                    id, session_id, parent_id, role, content, is_sidechain,
                    is_compact_summary, model, request_id, usage, metadata,
                    environment, created_at
                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
                ON CONFLICT (id) DO UPDATE SET
                    parent_id = EXCLUDED.parent_id,
                    role = EXCLUDED.role,
                    content = EXCLUDED.content,
                    is_sidechain = EXCLUDED.is_sidechain,
                    is_compact_summary = EXCLUDED.is_compact_summary,
                    model = EXCLUDED.model,
                    request_id = EXCLUDED.request_id,
                    usage = EXCLUDED.usage,
                    metadata = EXCLUDED.metadata,
                    environment = EXCLUDED.environment
                "#,
                messages = c.messages_table
            ))
            .bind(message.id.to_string())
            .bind(session_id.to_string())
            .bind(message.parent_id.as_ref().map(|id| id.to_string()))
            .bind(&role)
            // Unserializable content degrades to an empty JSON array with a
            // warning instead of failing the save.
            .bind(serde_json::to_value(&message.content).unwrap_or_else(|e| {
                tracing::warn!(message_id = %message.id, error = %e, "Failed to serialize message content");
                serde_json::Value::Array(Vec::new())
            }))
            .bind(message.is_sidechain)
            .bind(message.is_compact_summary)
            .bind(&message.metadata.model)
            .bind(&message.metadata.request_id)
            .bind(
                message
                    .usage
                    .as_ref()
                    .and_then(|u| serde_json::to_value(u).ok()),
            )
            .bind(serde_json::to_value(&message.metadata).unwrap_or_else(|e| {
                tracing::warn!(message_id = %message.id, error = %e, "Failed to serialize message metadata");
                serde_json::Value::Object(Default::default())
            }))
            .bind(
                message
                    .environment
                    .as_ref()
                    .and_then(|e| serde_json::to_value(e).ok()),
            )
            .bind(message.timestamp)
            .execute(&mut **tx)
            .await
            .storage_err()?;
        }

        // Prune rows whose ids are no longer in the in-memory list.
        // NOTE(review): when `messages` is empty the prune is skipped
        // entirely, leaving previously stored rows in place — confirm this
        // guard (vs. deleting all rows) is intentional.
        if !current_ids.is_empty() {
            sqlx::query(&format!(
                "DELETE FROM {messages} WHERE session_id = $1 AND id != ALL($2)",
                messages = c.messages_table
            ))
            .bind(session_id.to_string())
            .bind(&current_ids)
            .execute(&mut **tx)
            .await
            .storage_err()?;
        }

        Ok(())
    }
1269
1270 async fn save_inner(&self, session: &Session) -> SessionResult<()> {
1271 let c = &self.config;
1272
1273 let mut tx = self.pool.begin().await.storage_err()?;
1274
1275 let session_type = enum_to_db(&session.session_type, "main");
1276 let state = enum_to_db(&session.state, "created");
1277 let mode = "stateless";
1278
1279 sqlx::query(&format!(
1280 r#"
1281 INSERT INTO {sessions} (
1282 id, parent_id, tenant_id, session_type, state, mode,
1283 config, permissions, summary,
1284 total_input_tokens, total_output_tokens, total_cost_usd,
1285 current_leaf_id, static_context_hash, error,
1286 created_at, updated_at, expires_at
1287 ) VALUES (
1288 $1, $2, $3, $4, $5, $6, $7, $8, $9,
1289 $10, $11, $12, $13, $14, $15, $16, $17, $18
1290 )
1291 ON CONFLICT (id) DO UPDATE SET
1292 parent_id = EXCLUDED.parent_id,
1293 tenant_id = EXCLUDED.tenant_id,
1294 session_type = EXCLUDED.session_type,
1295 state = EXCLUDED.state,
1296 mode = EXCLUDED.mode,
1297 config = EXCLUDED.config,
1298 permissions = EXCLUDED.permissions,
1299 summary = EXCLUDED.summary,
1300 total_input_tokens = EXCLUDED.total_input_tokens,
1301 total_output_tokens = EXCLUDED.total_output_tokens,
1302 total_cost_usd = EXCLUDED.total_cost_usd,
1303 current_leaf_id = EXCLUDED.current_leaf_id,
1304 static_context_hash = EXCLUDED.static_context_hash,
1305 error = EXCLUDED.error,
1306 updated_at = EXCLUDED.updated_at,
1307 expires_at = EXCLUDED.expires_at
1308 "#,
1309 sessions = c.sessions_table
1310 ))
1311 .bind(session.id.to_string())
1312 .bind(session.parent_id.map(|id| id.to_string()))
1313 .bind(&session.tenant_id)
1314 .bind(&session_type)
1315 .bind(&state)
1316 .bind(mode)
1317 .bind(serde_json::to_value(&session.config).unwrap_or_else(|e| {
1318 tracing::warn!(session_id = %session.id, error = %e, "Failed to serialize session config");
1319 serde_json::Value::Object(Default::default())
1320 }))
1321 .bind(serde_json::to_value(&session.permissions).unwrap_or_else(|e| {
1322 tracing::warn!(session_id = %session.id, error = %e, "Failed to serialize session permissions");
1323 serde_json::Value::Object(Default::default())
1324 }))
1325 .bind(&session.summary)
1326 .bind(session.total_usage.input_tokens as i64)
1327 .bind(session.total_usage.output_tokens as i64)
1328 .bind(session.total_cost_usd)
1329 .bind(session.current_leaf_id.as_ref().map(|id| id.to_string()))
1330 .bind(&session.static_context_hash)
1331 .bind(&session.error)
1332 .bind(session.created_at)
1333 .bind(session.updated_at)
1334 .bind(session.expires_at)
1335 .execute(&mut *tx)
1336 .await
1337 .storage_err()?;
1338
1339 self.save_messages_tx(&mut tx, &session.id, &session.messages)
1340 .await?;
1341 self.save_todos_tx(&mut tx, &session.id, &session.todos)
1342 .await?;
1343 self.save_compacts_tx(&mut tx, &session.id, &session.compact_history)
1344 .await?;
1345
1346 if let Some(ref plan) = session.current_plan {
1347 self.save_plan_tx(&mut tx, plan).await?;
1348 }
1349
1350 tx.commit().await.storage_err()?;
1351
1352 Ok(())
1353 }
1354}
1355
/// Postgres-backed implementation of the [`Persistence`] trait.
///
/// Every operation goes through `self.with_retry` (defined elsewhere in this
/// file), so transient database errors are retried per the pool's retry
/// configuration. Row decoding is deliberately lenient: malformed rows are
/// logged and skipped (or defaulted) rather than failing the whole call.
#[async_trait]
impl Persistence for PostgresPersistence {
    /// Backend identifier used for diagnostics.
    fn name(&self) -> &str {
        "postgres"
    }

    /// Persists the full session graph transactionally (see `save_inner`),
    /// with retries on transient failures.
    async fn save(&self, session: &Session) -> SessionResult<()> {
        self.with_retry(|| self.save_inner(session)).await
    }

    /// Loads a session by id. A `NotFound` error from the row loader maps to
    /// `Ok(None)`; any other error propagates (and is retried).
    async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
        self.with_retry(|| async {
            match self.load_session_row(id).await {
                Ok(session) => Ok(Some(session)),
                Err(SessionError::NotFound { .. }) => Ok(None),
                Err(e) => Err(e),
            }
        })
        .await
    }

    /// Deletes the session row; returns `true` if a row was actually removed.
    ///
    /// NOTE(review): only the sessions table is touched here — presumably
    /// child rows (messages, todos, …) are removed by `ON DELETE CASCADE`;
    /// the schema is not visible in this file, so confirm before relying on it.
    async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
        // Copy the id so the retry closure can be `move` without borrowing `id`.
        let sid = *id;
        self.with_retry(|| async move {
            let c = &self.config;

            let result = sqlx::query(&format!(
                "DELETE FROM {sessions} WHERE id = $1",
                sessions = c.sessions_table
            ))
            .bind(sid.to_string())
            .execute(self.pool.as_ref())
            .await
            .storage_err()?;

            Ok(result.rows_affected() > 0)
        })
        .await
    }

    /// Lists session ids, optionally filtered by tenant. Rows whose id cannot
    /// be read or parsed are logged and skipped rather than failing the call.
    async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
        // Own the filter so the retry closure can be re-invoked; each attempt
        // gets its own clone.
        let owned_tid = tenant_id.map(|s| s.to_string());
        self.with_retry(|| {
            let tid = owned_tid.clone();
            async move {
                let c = &self.config;

                let rows = if let Some(ref tid) = tid {
                    sqlx::query(&format!(
                        "SELECT id FROM {sessions} WHERE tenant_id = $1",
                        sessions = c.sessions_table
                    ))
                    .bind(tid.as_str())
                    .fetch_all(self.pool.as_ref())
                    .await
                } else {
                    sqlx::query(&format!(
                        "SELECT id FROM {sessions}",
                        sessions = c.sessions_table
                    ))
                    .fetch_all(self.pool.as_ref())
                    .await
                }
                .storage_err()?;

                let mut ids = Vec::with_capacity(rows.len());

                for row in rows {
                    // Lenient decode: skip (with a warning) instead of erroring.
                    let id_str = match row.try_get::<&str, _>("id") {
                        Ok(id) => id,
                        Err(e) => {
                            tracing::warn!(error = %e, "Skipping session row: failed to get id");
                            continue;
                        }
                    };

                    match id_str.parse() {
                        Ok(id) => ids.push(id),
                        Err(e) => {
                            tracing::warn!(id = id_str, error = %e, "Skipping session row: failed to parse id");
                        }
                    }
                }

                Ok(ids)
            }
        })
        .await
    }

    /// Inserts a summary snapshot and mirrors its text onto the session row's
    /// `summary` column, both under one transaction.
    ///
    /// NOTE(review): the snapshot id is fixed across retry attempts, so a
    /// retry after a commit whose acknowledgement was lost would hit a
    /// duplicate-key error — verify `with_retry` only retries pre-commit
    /// failures.
    async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
        self.with_retry(|| async {
            let c = &self.config;

            let mut tx = self.pool.begin().await.storage_err()?;

            sqlx::query(&format!(
                r#"
                INSERT INTO {summaries} (id, session_id, summary, leaf_message_id, created_at)
                VALUES ($1, $2, $3, $4, $5)
                "#,
                summaries = c.summaries_table
            ))
            .bind(snapshot.id)
            .bind(snapshot.session_id.to_string())
            .bind(&snapshot.summary)
            .bind(snapshot.leaf_message_id.as_ref().map(|id| id.to_string()))
            .bind(snapshot.created_at)
            .execute(&mut *tx)
            .await
            .storage_err()?;

            // Keep the denormalized copy on the session row in sync.
            sqlx::query(&format!(
                "UPDATE {sessions} SET summary = $1, updated_at = NOW() WHERE id = $2",
                sessions = c.sessions_table
            ))
            .bind(&snapshot.summary)
            .bind(snapshot.session_id.to_string())
            .execute(&mut *tx)
            .await
            .storage_err()?;

            tx.commit().await.storage_err()?;

            Ok(())
        })
        .await
    }

    /// Returns all summary snapshots for a session, oldest first. Rows missing
    /// id or summary are skipped with a warning; a missing/invalid
    /// `leaf_message_id` becomes `None` and a missing `created_at` defaults to
    /// now.
    async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
        let sid = *session_id;
        self.with_retry(|| async move {
            let c = &self.config;

            let rows = sqlx::query(&format!(
                r#"
                SELECT id, session_id, summary, leaf_message_id, created_at
                FROM {summaries}
                WHERE session_id = $1
                ORDER BY created_at ASC
                "#,
                summaries = c.summaries_table
            ))
            .bind(sid.to_string())
            .fetch_all(self.pool.as_ref())
            .await
            .storage_err()?;

            let mut summaries = Vec::with_capacity(rows.len());

            for row in rows {
                let id: Uuid = match row.try_get("id") {
                    Ok(id) => id,
                    Err(e) => {
                        tracing::warn!(session_id = %sid, error = %e, "Skipping summary row: failed to get id");
                        continue;
                    }
                };

                let summary = match row.try_get("summary") {
                    Ok(s) => s,
                    Err(e) => {
                        tracing::warn!(session_id = %sid, summary_id = %id, error = %e, "Skipping summary row: failed to get summary");
                        continue;
                    }
                };

                summaries.push(SummarySnapshot {
                    id,
                    session_id: sid,
                    summary,
                    // NULL or unparseable leaf id degrades to None.
                    leaf_message_id: row
                        .try_get::<&str, _>("leaf_message_id")
                        .ok()
                        .and_then(|s| s.parse().ok()),
                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
                });
            }

            Ok(summaries)
        })
        .await
    }

    /// Inserts a new pending queue item for the session and returns it.
    ///
    /// The item (and therefore its id/created_at) is built once, outside the
    /// retry closure, so all retry attempts target the same row.
    /// NOTE(review): as with `add_summary`, a retry after a lost commit ack
    /// would collide on the primary key.
    async fn enqueue(
        &self,
        session_id: &SessionId,
        content: String,
        priority: i32,
    ) -> SessionResult<QueueItem> {
        let sid = *session_id;
        let item = QueueItem::enqueue(sid, &content).priority(priority);
        self.with_retry(|| async {
            let c = &self.config;

            sqlx::query(&format!(
                r#"
                INSERT INTO {queue} (id, session_id, operation, content, priority, status, created_at)
                VALUES ($1, $2, $3, $4, $5, $6, $7)
                "#,
                queue = c.queue_table
            ))
            .bind(item.id)
            .bind(sid.to_string())
            .bind("enqueue")
            .bind(&content)
            .bind(priority)
            .bind("pending")
            .bind(item.created_at)
            .execute(self.pool.as_ref())
            .await
            .storage_err()?;

            // Clone because the retry closure may run more than once.
            Ok(item.clone())
        })
        .await
    }

    /// Atomically claims the highest-priority pending item for the session and
    /// flips it to 'processing'. `FOR UPDATE SKIP LOCKED` lets concurrent
    /// workers claim different rows without blocking on each other.
    ///
    /// Returns `Ok(None)` when nothing is pending, and also when a claimed
    /// row's id/content cannot be decoded (the row stays in 'processing').
    async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
        let sid = *session_id;
        self.with_retry(|| async move {
            let c = &self.config;

            let row = sqlx::query(&format!(
                r#"
                UPDATE {queue}
                SET status = 'processing'
                WHERE id = (
                    SELECT id FROM {queue}
                    WHERE session_id = $1 AND status = 'pending'
                    ORDER BY priority DESC, created_at ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED
                )
                RETURNING id, session_id, operation, content, priority, status, created_at, processed_at
                "#,
                queue = c.queue_table
            ))
            .bind(sid.to_string())
            .fetch_optional(self.pool.as_ref())
            .await
            .storage_err()?;

            let Some(row) = row else {
                return Ok(None);
            };

            let id: Uuid = match row.try_get("id") {
                Ok(id) => id,
                Err(e) => {
                    tracing::warn!(session_id = %sid, error = %e, "Failed to get dequeued item id");
                    return Ok(None);
                }
            };

            let content = match row.try_get("content") {
                Ok(c) => c,
                Err(e) => {
                    tracing::warn!(session_id = %sid, queue_id = %id, error = %e, "Failed to get dequeued item content");
                    return Ok(None);
                }
            };

            Ok(Some(QueueItem {
                id,
                session_id: sid,
                // NOTE(review): the 'operation' column is selected but not
                // decoded — the returned item always reports Enqueue,
                // presumably the only operation persisted via this path.
                operation: super::types::QueueOperation::Enqueue,
                content,
                priority: row.try_get("priority").unwrap_or(0),
                status: QueueStatus::Processing,
                created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
                processed_at: row.try_get("processed_at").ok(),
            }))
        })
        .await
    }

    /// Cancels a queue item, but only while it is still pending; returns
    /// whether a row was updated. Items already processing cannot be cancelled
    /// through this path.
    async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
        self.with_retry(|| async {
            let c = &self.config;

            let result = sqlx::query(&format!(
                "UPDATE {queue} SET status = 'cancelled', processed_at = NOW() WHERE id = $1 AND status = 'pending'",
                queue = c.queue_table
            ))
            .bind(item_id)
            .execute(self.pool.as_ref())
            .await
            .storage_err()?;

            Ok(result.rows_affected() > 0)
        })
        .await
    }

    /// Returns the session's pending queue items in dispatch order
    /// (priority descending, then FIFO). Undecodable rows are skipped with a
    /// warning.
    async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
        let sid = *session_id;
        self.with_retry(|| async move {
            let c = &self.config;

            let rows = sqlx::query(&format!(
                r#"
                SELECT id, session_id, operation, content, priority, status, created_at, processed_at
                FROM {queue}
                WHERE session_id = $1 AND status = 'pending'
                ORDER BY priority DESC, created_at ASC
                "#,
                queue = c.queue_table
            ))
            .bind(sid.to_string())
            .fetch_all(self.pool.as_ref())
            .await
            .storage_err()?;

            let mut items = Vec::with_capacity(rows.len());

            for row in rows {
                let id: Uuid = match row.try_get("id") {
                    Ok(id) => id,
                    Err(e) => {
                        tracing::warn!(session_id = %sid, error = %e, "Skipping queue row: failed to get id");
                        continue;
                    }
                };

                let content = match row.try_get("content") {
                    Ok(c) => c,
                    Err(e) => {
                        tracing::warn!(session_id = %sid, queue_id = %id, error = %e, "Skipping queue row: failed to get content");
                        continue;
                    }
                };

                items.push(QueueItem {
                    id,
                    session_id: sid,
                    // NOTE(review): same as `dequeue` — operation is not decoded
                    // from the row; every item reports Enqueue.
                    operation: super::types::QueueOperation::Enqueue,
                    content,
                    priority: row.try_get("priority").unwrap_or(0),
                    status: QueueStatus::Pending,
                    created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
                    processed_at: row.try_get("processed_at").ok(),
                });
            }

            Ok(items)
        })
        .await
    }

    /// Deletes sessions that are past their explicit `expires_at` or have not
    /// been updated within `retention_days`; returns how many were removed.
    async fn cleanup_expired(&self) -> SessionResult<usize> {
        self.with_retry(|| async {
            let c = &self.config;

            let result = sqlx::query(&format!(
                "DELETE FROM {sessions} WHERE \
                 (expires_at IS NOT NULL AND expires_at < NOW()) OR \
                 (updated_at < NOW() - make_interval(days => $1))",
                sessions = c.sessions_table,
            ))
            .bind(c.retention_days as i32)
            .execute(self.pool.as_ref())
            .await
            .storage_err()?;

            Ok(result.rows_affected() as usize)
        })
        .await
    }
}