1use std::sync::Arc;
33use std::time::Duration;
34
35use async_trait::async_trait;
36use chrono::Utc;
37use sqlx::postgres::PgPoolOptions;
38use sqlx::{PgPool, Postgres, Row, Transaction};
39use uuid::Uuid;
40
41use super::persistence::Persistence;
42use super::state::{Session, SessionConfig, SessionId, SessionMessage};
43use super::types::{CompactRecord, Plan, QueueItem, QueueStatus, SummarySnapshot, TodoItem};
44use super::{SessionError, SessionResult};
45
46#[derive(Clone, Debug)]
52pub struct PgPoolConfig {
53 pub max_connections: u32,
54 pub min_connections: u32,
55 pub connect_timeout: Duration,
56 pub idle_timeout: Duration,
57 pub max_lifetime: Duration,
58 pub acquire_timeout: Duration,
59}
60
61impl Default for PgPoolConfig {
62 fn default() -> Self {
63 Self {
64 max_connections: 10,
65 min_connections: 1,
66 connect_timeout: Duration::from_secs(30),
67 idle_timeout: Duration::from_secs(600),
68 max_lifetime: Duration::from_secs(1800),
69 acquire_timeout: Duration::from_secs(30),
70 }
71 }
72}
73
74impl PgPoolConfig {
75 pub fn high_throughput() -> Self {
76 Self {
77 max_connections: 50,
78 min_connections: 5,
79 connect_timeout: Duration::from_secs(10),
80 idle_timeout: Duration::from_secs(300),
81 max_lifetime: Duration::from_secs(900),
82 acquire_timeout: Duration::from_secs(10),
83 }
84 }
85
86 pub(crate) fn apply(&self) -> PgPoolOptions {
87 PgPoolOptions::new()
88 .max_connections(self.max_connections)
89 .min_connections(self.min_connections)
90 .acquire_timeout(self.acquire_timeout)
91 .idle_timeout(Some(self.idle_timeout))
92 .max_lifetime(Some(self.max_lifetime))
93 }
94}
95
96#[derive(Clone, Debug)]
98pub struct PostgresConfig {
99 pub sessions_table: String,
100 pub messages_table: String,
101 pub compacts_table: String,
102 pub summaries_table: String,
103 pub queue_table: String,
104 pub todos_table: String,
105 pub plans_table: String,
106 pub pool: PgPoolConfig,
107}
108
109impl Default for PostgresConfig {
110 fn default() -> Self {
111 Self::with_prefix("claude_")
112 }
113}
114
115impl PostgresConfig {
116 pub fn with_prefix(prefix: &str) -> Self {
117 Self {
118 sessions_table: format!("{prefix}sessions"),
119 messages_table: format!("{prefix}messages"),
120 compacts_table: format!("{prefix}compacts"),
121 summaries_table: format!("{prefix}summaries"),
122 queue_table: format!("{prefix}queue"),
123 todos_table: format!("{prefix}todos"),
124 plans_table: format!("{prefix}plans"),
125 pool: PgPoolConfig::default(),
126 }
127 }
128
129 pub fn with_pool(mut self, pool: PgPoolConfig) -> Self {
130 self.pool = pool;
131 self
132 }
133
134 pub fn table_names(&self) -> Vec<&str> {
136 vec![
137 &self.sessions_table,
138 &self.messages_table,
139 &self.compacts_table,
140 &self.summaries_table,
141 &self.queue_table,
142 &self.todos_table,
143 &self.plans_table,
144 ]
145 }
146}
147
148#[derive(Debug, Clone)]
154pub enum SchemaIssue {
155 MissingTable(String),
156 MissingIndex { table: String, index: String },
157 MissingColumn { table: String, column: String },
158}
159
160impl std::fmt::Display for SchemaIssue {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 match self {
163 SchemaIssue::MissingTable(t) => write!(f, "Missing table: {}", t),
164 SchemaIssue::MissingIndex { table, index } => {
165 write!(f, "Missing index '{}' on table '{}'", index, table)
166 }
167 SchemaIssue::MissingColumn { table, column } => {
168 write!(f, "Missing column '{}' in table '{}'", column, table)
169 }
170 }
171 }
172}
173
174pub struct PostgresSchema;
178
179impl PostgresSchema {
180 pub fn sql(config: &PostgresConfig) -> String {
184 let mut sql = String::new();
185 sql.push_str("-- Claude Agent Session Schema\n");
186 sql.push_str("-- Generated by claude-agent PostgresSchema\n\n");
187
188 for table_sql in Self::table_ddl(config) {
189 sql.push_str(&table_sql);
190 sql.push_str("\n\n");
191 }
192
193 sql.push_str("-- Indexes\n");
194 for index_sql in Self::index_ddl(config) {
195 sql.push_str(&index_sql);
196 sql.push_str(";\n");
197 }
198
199 sql
200 }
201
202 pub fn table_ddl(config: &PostgresConfig) -> Vec<String> {
204 let c = config;
205 vec![
206 format!(
207 r#"CREATE TABLE IF NOT EXISTS {sessions} (
208 id VARCHAR(255) PRIMARY KEY,
209 parent_id VARCHAR(255),
210 tenant_id VARCHAR(255),
211 session_type VARCHAR(32) NOT NULL DEFAULT 'main',
212 state VARCHAR(32) NOT NULL DEFAULT 'created',
213 mode VARCHAR(32) NOT NULL DEFAULT 'default',
214 config JSONB NOT NULL DEFAULT '{{}}',
215 permission_policy JSONB NOT NULL DEFAULT '{{}}',
216 summary TEXT,
217 total_input_tokens BIGINT DEFAULT 0,
218 total_output_tokens BIGINT DEFAULT 0,
219 total_cost_usd DECIMAL(12, 6) DEFAULT 0,
220 current_leaf_id VARCHAR(255),
221 static_context_hash VARCHAR(64),
222 error TEXT,
223 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
224 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
225 expires_at TIMESTAMPTZ,
226 CONSTRAINT fk_{sessions}_parent FOREIGN KEY (parent_id) REFERENCES {sessions}(id) ON DELETE CASCADE
227);"#,
228 sessions = c.sessions_table
229 ),
230 format!(
231 r#"CREATE TABLE IF NOT EXISTS {messages} (
232 id VARCHAR(255) PRIMARY KEY,
233 session_id VARCHAR(255) NOT NULL,
234 parent_id VARCHAR(255),
235 role VARCHAR(16) NOT NULL,
236 content JSONB NOT NULL,
237 is_sidechain BOOLEAN DEFAULT FALSE,
238 is_compact_summary BOOLEAN DEFAULT FALSE,
239 model VARCHAR(64),
240 request_id VARCHAR(255),
241 usage JSONB,
242 metadata JSONB,
243 environment JSONB,
244 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
245 CONSTRAINT fk_{messages}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
246);"#,
247 messages = c.messages_table,
248 sessions = c.sessions_table
249 ),
250 format!(
251 r#"CREATE TABLE IF NOT EXISTS {compacts} (
252 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
253 session_id VARCHAR(255) NOT NULL,
254 trigger VARCHAR(32) NOT NULL,
255 pre_tokens INTEGER NOT NULL,
256 post_tokens INTEGER NOT NULL,
257 saved_tokens INTEGER NOT NULL,
258 summary TEXT NOT NULL,
259 original_count INTEGER NOT NULL,
260 new_count INTEGER NOT NULL,
261 logical_parent_id VARCHAR(255),
262 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
263 CONSTRAINT fk_{compacts}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
264);"#,
265 compacts = c.compacts_table,
266 sessions = c.sessions_table
267 ),
268 format!(
269 r#"CREATE TABLE IF NOT EXISTS {summaries} (
270 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
271 session_id VARCHAR(255) NOT NULL,
272 summary TEXT NOT NULL,
273 leaf_message_id VARCHAR(255),
274 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
275 CONSTRAINT fk_{summaries}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
276);"#,
277 summaries = c.summaries_table,
278 sessions = c.sessions_table
279 ),
280 format!(
281 r#"CREATE TABLE IF NOT EXISTS {queue} (
282 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
283 session_id VARCHAR(255) NOT NULL,
284 operation VARCHAR(32) NOT NULL,
285 content TEXT NOT NULL,
286 priority INTEGER DEFAULT 0,
287 status VARCHAR(32) NOT NULL DEFAULT 'pending',
288 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
289 processed_at TIMESTAMPTZ,
290 CONSTRAINT fk_{queue}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
291);"#,
292 queue = c.queue_table,
293 sessions = c.sessions_table
294 ),
295 format!(
296 r#"CREATE TABLE IF NOT EXISTS {todos} (
297 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
298 session_id VARCHAR(255) NOT NULL,
299 plan_id UUID,
300 content TEXT NOT NULL,
301 active_form TEXT NOT NULL,
302 status VARCHAR(32) NOT NULL DEFAULT 'pending',
303 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
304 started_at TIMESTAMPTZ,
305 completed_at TIMESTAMPTZ,
306 CONSTRAINT fk_{todos}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
307);"#,
308 todos = c.todos_table,
309 sessions = c.sessions_table
310 ),
311 format!(
312 r#"CREATE TABLE IF NOT EXISTS {plans} (
313 id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
314 session_id VARCHAR(255) NOT NULL,
315 name VARCHAR(255),
316 content TEXT NOT NULL,
317 status VARCHAR(32) NOT NULL DEFAULT 'draft',
318 error TEXT,
319 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
320 approved_at TIMESTAMPTZ,
321 started_at TIMESTAMPTZ,
322 completed_at TIMESTAMPTZ,
323 CONSTRAINT fk_{plans}_session FOREIGN KEY (session_id) REFERENCES {sessions}(id) ON DELETE CASCADE
324);"#,
325 plans = c.plans_table,
326 sessions = c.sessions_table
327 ),
328 ]
329 }
330
331 pub fn index_ddl(config: &PostgresConfig) -> Vec<String> {
333 let c = config;
334 vec![
335 format!(
337 "CREATE INDEX IF NOT EXISTS idx_{0}_tenant ON {0}(tenant_id)",
338 c.sessions_table
339 ),
340 format!(
341 "CREATE INDEX IF NOT EXISTS idx_{0}_parent ON {0}(parent_id)",
342 c.sessions_table
343 ),
344 format!(
345 "CREATE INDEX IF NOT EXISTS idx_{0}_expires ON {0}(expires_at) WHERE expires_at IS NOT NULL",
346 c.sessions_table
347 ),
348 format!(
349 "CREATE INDEX IF NOT EXISTS idx_{0}_state ON {0}(state)",
350 c.sessions_table
351 ),
352 format!(
354 "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
355 c.messages_table
356 ),
357 format!(
358 "CREATE INDEX IF NOT EXISTS idx_{0}_created ON {0}(session_id, created_at)",
359 c.messages_table
360 ),
361 format!(
363 "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
364 c.compacts_table
365 ),
366 format!(
368 "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
369 c.summaries_table
370 ),
371 format!(
373 "CREATE INDEX IF NOT EXISTS idx_{0}_session_status ON {0}(session_id, status)",
374 c.queue_table
375 ),
376 format!(
378 "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
379 c.todos_table
380 ),
381 format!(
383 "CREATE INDEX IF NOT EXISTS idx_{0}_session ON {0}(session_id)",
384 c.plans_table
385 ),
386 ]
387 }
388
389 pub fn expected_indexes(config: &PostgresConfig) -> Vec<(String, String)> {
391 let c = config;
392 vec![
393 (
394 c.sessions_table.clone(),
395 format!("idx_{}_tenant", c.sessions_table),
396 ),
397 (
398 c.sessions_table.clone(),
399 format!("idx_{}_parent", c.sessions_table),
400 ),
401 (
402 c.sessions_table.clone(),
403 format!("idx_{}_expires", c.sessions_table),
404 ),
405 (
406 c.sessions_table.clone(),
407 format!("idx_{}_state", c.sessions_table),
408 ),
409 (
410 c.messages_table.clone(),
411 format!("idx_{}_session", c.messages_table),
412 ),
413 (
414 c.messages_table.clone(),
415 format!("idx_{}_created", c.messages_table),
416 ),
417 (
418 c.compacts_table.clone(),
419 format!("idx_{}_session", c.compacts_table),
420 ),
421 (
422 c.summaries_table.clone(),
423 format!("idx_{}_session", c.summaries_table),
424 ),
425 (
426 c.queue_table.clone(),
427 format!("idx_{}_session_status", c.queue_table),
428 ),
429 (
430 c.todos_table.clone(),
431 format!("idx_{}_session", c.todos_table),
432 ),
433 (
434 c.plans_table.clone(),
435 format!("idx_{}_session", c.plans_table),
436 ),
437 ]
438 }
439
440 pub async fn migrate(pool: &PgPool, config: &PostgresConfig) -> Result<(), sqlx::Error> {
442 for table_ddl in Self::table_ddl(config) {
443 sqlx::query(&table_ddl).execute(pool).await?;
444 }
445
446 for index_ddl in Self::index_ddl(config) {
447 sqlx::query(&index_ddl).execute(pool).await?;
448 }
449
450 Ok(())
451 }
452
453 pub async fn verify(
455 pool: &PgPool,
456 config: &PostgresConfig,
457 ) -> Result<Vec<SchemaIssue>, sqlx::Error> {
458 let mut issues = Vec::new();
459
460 for table in config.table_names() {
462 let exists: bool = sqlx::query_scalar(
463 "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
464 )
465 .bind(table)
466 .fetch_one(pool)
467 .await?;
468
469 if !exists {
470 issues.push(SchemaIssue::MissingTable(table.to_string()));
471 }
472 }
473
474 for (table, index) in Self::expected_indexes(config) {
476 let exists: bool = sqlx::query_scalar(
477 "SELECT EXISTS (SELECT 1 FROM pg_indexes WHERE tablename = $1 AND indexname = $2)",
478 )
479 .bind(&table)
480 .bind(&index)
481 .fetch_one(pool)
482 .await?;
483
484 if !exists {
485 issues.push(SchemaIssue::MissingIndex { table, index });
486 }
487 }
488
489 Ok(issues)
490 }
491}
492
493pub struct PostgresPersistence {
499 pool: Arc<PgPool>,
500 config: PostgresConfig,
501}
502
503impl PostgresPersistence {
504 pub async fn connect(database_url: &str) -> Result<Self, sqlx::Error> {
508 Self::connect_with_config(database_url, PostgresConfig::default()).await
509 }
510
511 pub async fn connect_with_config(
513 database_url: &str,
514 config: PostgresConfig,
515 ) -> Result<Self, sqlx::Error> {
516 let pool = config.pool.apply().connect(database_url).await?;
517 Ok(Self {
518 pool: Arc::new(pool),
519 config,
520 })
521 }
522
523 pub async fn connect_and_migrate(database_url: &str) -> Result<Self, sqlx::Error> {
527 Self::connect_and_migrate_with_config(database_url, PostgresConfig::default()).await
528 }
529
530 pub async fn connect_and_migrate_with_config(
532 database_url: &str,
533 config: PostgresConfig,
534 ) -> Result<Self, sqlx::Error> {
535 let persistence = Self::connect_with_config(database_url, config).await?;
536 persistence.migrate().await?;
537 Ok(persistence)
538 }
539
540 pub fn with_pool(pool: Arc<PgPool>) -> Self {
542 Self::with_pool_and_config(pool, PostgresConfig::default())
543 }
544
545 pub fn with_pool_and_config(pool: Arc<PgPool>, config: PostgresConfig) -> Self {
547 Self { pool, config }
548 }
549
550 pub async fn migrate(&self) -> Result<(), sqlx::Error> {
552 PostgresSchema::migrate(&self.pool, &self.config).await
553 }
554
555 pub async fn verify_schema(&self) -> Result<Vec<SchemaIssue>, sqlx::Error> {
557 PostgresSchema::verify(&self.pool, &self.config).await
558 }
559
560 pub fn pool(&self) -> &PgPool {
562 &self.pool
563 }
564
565 pub fn config(&self) -> &PostgresConfig {
567 &self.config
568 }
569
570 async fn load_session_row(&self, session_id: &SessionId) -> SessionResult<Session> {
575 let c = &self.config;
576 let id_str = session_id.to_string();
577
578 let row = sqlx::query(&format!(
579 r#"
580 SELECT id, parent_id, tenant_id, session_type, state, mode,
581 config, permission_policy, summary,
582 total_input_tokens, total_output_tokens, total_cost_usd,
583 current_leaf_id, static_context_hash, error,
584 created_at, updated_at, expires_at
585 FROM {sessions}
586 WHERE id = $1
587 "#,
588 sessions = c.sessions_table
589 ))
590 .bind(&id_str)
591 .fetch_optional(self.pool.as_ref())
592 .await
593 .map_err(|e| SessionError::Storage {
594 message: e.to_string(),
595 })?
596 .ok_or_else(|| SessionError::NotFound { id: id_str.clone() })?;
597
598 let messages = self.load_messages(session_id).await?;
599 let compacts = self.load_compacts(session_id).await?;
600 let todos = self.load_todos_internal(session_id).await?;
601 let plan = self.load_plan_internal(session_id).await?;
602
603 let config: SessionConfig = row
604 .try_get::<serde_json::Value, _>("config")
605 .ok()
606 .and_then(|v| serde_json::from_value(v).ok())
607 .unwrap_or_default();
608
609 let permission_policy = row
610 .try_get::<serde_json::Value, _>("permission_policy")
611 .ok()
612 .and_then(|v| serde_json::from_value(v).ok())
613 .unwrap_or_default();
614
615 let session_type = row
616 .try_get::<&str, _>("session_type")
617 .ok()
618 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
619 .unwrap_or_default();
620
621 let mode = row
622 .try_get::<&str, _>("mode")
623 .ok()
624 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
625 .unwrap_or_default();
626
627 let state = row
628 .try_get::<&str, _>("state")
629 .ok()
630 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())
631 .unwrap_or_default();
632
633 let current_leaf_id = row
634 .try_get::<&str, _>("current_leaf_id")
635 .ok()
636 .and_then(|s| s.parse().ok());
637
638 Ok(Session {
639 id: *session_id,
640 parent_id: row
641 .try_get::<&str, _>("parent_id")
642 .ok()
643 .and_then(|s| s.parse().ok()),
644 session_type,
645 tenant_id: row.try_get("tenant_id").ok(),
646 mode,
647 state,
648 config,
649 permission_policy,
650 messages,
651 current_leaf_id,
652 summary: row.try_get("summary").ok(),
653 total_usage: crate::types::TokenUsage {
654 input_tokens: row.try_get::<i64, _>("total_input_tokens").unwrap_or(0) as u64,
655 output_tokens: row.try_get::<i64, _>("total_output_tokens").unwrap_or(0) as u64,
656 ..Default::default()
657 },
658 current_input_tokens: 0,
659 total_cost_usd: row
660 .try_get::<rust_decimal::Decimal, _>("total_cost_usd")
661 .ok()
662 .and_then(|d| d.to_string().parse().ok())
663 .unwrap_or(0.0),
664 static_context_hash: row.try_get("static_context_hash").ok(),
665 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
666 updated_at: row.try_get("updated_at").unwrap_or_else(|_| Utc::now()),
667 expires_at: row.try_get("expires_at").ok(),
668 error: row.try_get("error").ok(),
669 todos,
670 current_plan: plan,
671 compact_history: compacts,
672 })
673 }
674
675 async fn load_messages(&self, session_id: &SessionId) -> SessionResult<Vec<SessionMessage>> {
676 let c = &self.config;
677
678 let rows = sqlx::query(&format!(
679 r#"
680 SELECT id, parent_id, role, content, is_sidechain, is_compact_summary,
681 model, request_id, usage, metadata, environment, created_at
682 FROM {messages}
683 WHERE session_id = $1
684 ORDER BY created_at ASC
685 "#,
686 messages = c.messages_table
687 ))
688 .bind(session_id.to_string())
689 .fetch_all(self.pool.as_ref())
690 .await
691 .map_err(|e| SessionError::Storage {
692 message: e.to_string(),
693 })?;
694
695 Ok(rows
696 .into_iter()
697 .filter_map(|row| {
698 let content: Vec<crate::types::ContentBlock> = row
699 .try_get::<serde_json::Value, _>("content")
700 .ok()
701 .and_then(|v| serde_json::from_value(v).ok())?;
702
703 let role: crate::types::Role = row
704 .try_get::<&str, _>("role")
705 .ok()
706 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
707
708 let usage = row
709 .try_get::<serde_json::Value, _>("usage")
710 .ok()
711 .and_then(|v| serde_json::from_value(v).ok());
712
713 let metadata = row
714 .try_get::<serde_json::Value, _>("metadata")
715 .ok()
716 .and_then(|v| serde_json::from_value(v).ok())
717 .unwrap_or_default();
718
719 let environment = row
720 .try_get::<serde_json::Value, _>("environment")
721 .ok()
722 .and_then(|v| serde_json::from_value(v).ok());
723
724 Some(SessionMessage {
725 id: row.try_get::<&str, _>("id").ok()?.parse().ok()?,
726 parent_id: row
727 .try_get::<&str, _>("parent_id")
728 .ok()
729 .and_then(|s| s.parse().ok()),
730 role,
731 content,
732 is_sidechain: row.try_get("is_sidechain").unwrap_or(false),
733 is_compact_summary: row.try_get("is_compact_summary").unwrap_or(false),
734 usage,
735 timestamp: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
736 metadata,
737 environment,
738 })
739 })
740 .collect())
741 }
742
743 async fn load_compacts(&self, session_id: &SessionId) -> SessionResult<Vec<CompactRecord>> {
744 let c = &self.config;
745
746 let rows = sqlx::query(&format!(
747 r#"
748 SELECT id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
749 summary, original_count, new_count, logical_parent_id, created_at
750 FROM {compacts}
751 WHERE session_id = $1
752 ORDER BY created_at ASC
753 "#,
754 compacts = c.compacts_table
755 ))
756 .bind(session_id.to_string())
757 .fetch_all(self.pool.as_ref())
758 .await
759 .map_err(|e| SessionError::Storage {
760 message: e.to_string(),
761 })?;
762
763 Ok(rows
764 .into_iter()
765 .filter_map(|row| {
766 let trigger = row
767 .try_get::<&str, _>("trigger")
768 .ok()
769 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
770
771 Some(CompactRecord {
772 id: row.try_get("id").ok()?,
773 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
774 trigger,
775 pre_tokens: row.try_get::<i32, _>("pre_tokens").unwrap_or(0) as usize,
776 post_tokens: row.try_get::<i32, _>("post_tokens").unwrap_or(0) as usize,
777 saved_tokens: row.try_get::<i32, _>("saved_tokens").unwrap_or(0) as usize,
778 summary: row.try_get("summary").ok()?,
779 original_count: row.try_get::<i32, _>("original_count").unwrap_or(0) as usize,
780 new_count: row.try_get::<i32, _>("new_count").unwrap_or(0) as usize,
781 logical_parent_id: row
782 .try_get::<&str, _>("logical_parent_id")
783 .ok()
784 .and_then(|s| s.parse().ok()),
785 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
786 })
787 })
788 .collect())
789 }
790
791 async fn load_todos_internal(&self, session_id: &SessionId) -> SessionResult<Vec<TodoItem>> {
792 let c = &self.config;
793
794 let rows = sqlx::query(&format!(
795 r#"
796 SELECT id, session_id, plan_id, content, active_form, status,
797 created_at, started_at, completed_at
798 FROM {todos}
799 WHERE session_id = $1
800 ORDER BY created_at ASC
801 "#,
802 todos = c.todos_table
803 ))
804 .bind(session_id.to_string())
805 .fetch_all(self.pool.as_ref())
806 .await
807 .map_err(|e| SessionError::Storage {
808 message: e.to_string(),
809 })?;
810
811 Ok(rows
812 .into_iter()
813 .filter_map(|row| {
814 let status = row
815 .try_get::<&str, _>("status")
816 .ok()
817 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
818
819 Some(TodoItem {
820 id: row.try_get("id").ok()?,
821 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
822 content: row.try_get("content").ok()?,
823 active_form: row.try_get("active_form").ok()?,
824 status,
825 plan_id: row.try_get("plan_id").ok(),
826 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
827 started_at: row.try_get("started_at").ok(),
828 completed_at: row.try_get("completed_at").ok(),
829 })
830 })
831 .collect())
832 }
833
834 async fn load_plan_internal(&self, session_id: &SessionId) -> SessionResult<Option<Plan>> {
835 let c = &self.config;
836
837 let row = sqlx::query(&format!(
838 r#"
839 SELECT id, session_id, name, content, status, error,
840 created_at, approved_at, started_at, completed_at
841 FROM {plans}
842 WHERE session_id = $1
843 ORDER BY created_at DESC
844 LIMIT 1
845 "#,
846 plans = c.plans_table
847 ))
848 .bind(session_id.to_string())
849 .fetch_optional(self.pool.as_ref())
850 .await
851 .map_err(|e| SessionError::Storage {
852 message: e.to_string(),
853 })?;
854
855 Ok(row.and_then(|row| {
856 let status = row
857 .try_get::<&str, _>("status")
858 .ok()
859 .and_then(|s| serde_json::from_str(&format!("\"{}\"", s)).ok())?;
860
861 Some(Plan {
862 id: row.try_get("id").ok()?,
863 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
864 name: row.try_get("name").ok(),
865 content: row.try_get("content").ok()?,
866 status,
867 error: row.try_get("error").ok(),
868 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
869 approved_at: row.try_get("approved_at").ok(),
870 started_at: row.try_get("started_at").ok(),
871 completed_at: row.try_get("completed_at").ok(),
872 })
873 }))
874 }
875
876 async fn save_todos_tx(
877 &self,
878 tx: &mut Transaction<'_, Postgres>,
879 session_id: &SessionId,
880 todos: &[TodoItem],
881 ) -> SessionResult<()> {
882 let c = &self.config;
883
884 sqlx::query(&format!(
885 "DELETE FROM {todos} WHERE session_id = $1",
886 todos = c.todos_table
887 ))
888 .bind(session_id.to_string())
889 .execute(&mut **tx)
890 .await
891 .map_err(|e| SessionError::Storage {
892 message: e.to_string(),
893 })?;
894
895 for todo in todos {
896 let status = serde_json::to_string(&todo.status)
897 .unwrap_or_else(|_| "\"pending\"".to_string())
898 .trim_matches('"')
899 .to_string();
900
901 sqlx::query(&format!(
902 r#"
903 INSERT INTO {todos} (
904 id, session_id, plan_id, content, active_form, status,
905 created_at, started_at, completed_at
906 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
907 "#,
908 todos = c.todos_table
909 ))
910 .bind(todo.id)
911 .bind(session_id.to_string())
912 .bind(todo.plan_id)
913 .bind(&todo.content)
914 .bind(&todo.active_form)
915 .bind(&status)
916 .bind(todo.created_at)
917 .bind(todo.started_at)
918 .bind(todo.completed_at)
919 .execute(&mut **tx)
920 .await
921 .map_err(|e| SessionError::Storage {
922 message: e.to_string(),
923 })?;
924 }
925
926 Ok(())
927 }
928
929 async fn save_plan_tx(
930 &self,
931 tx: &mut Transaction<'_, Postgres>,
932 plan: &Plan,
933 ) -> SessionResult<()> {
934 let c = &self.config;
935
936 let status = serde_json::to_string(&plan.status)
937 .unwrap_or_else(|_| "\"draft\"".to_string())
938 .trim_matches('"')
939 .to_string();
940
941 sqlx::query(&format!(
942 r#"
943 INSERT INTO {plans} (
944 id, session_id, name, content, status, error,
945 created_at, approved_at, started_at, completed_at
946 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
947 ON CONFLICT (id) DO UPDATE SET
948 name = EXCLUDED.name,
949 content = EXCLUDED.content,
950 status = EXCLUDED.status,
951 error = EXCLUDED.error,
952 approved_at = EXCLUDED.approved_at,
953 started_at = EXCLUDED.started_at,
954 completed_at = EXCLUDED.completed_at
955 "#,
956 plans = c.plans_table
957 ))
958 .bind(plan.id)
959 .bind(plan.session_id.to_string())
960 .bind(&plan.name)
961 .bind(&plan.content)
962 .bind(&status)
963 .bind(&plan.error)
964 .bind(plan.created_at)
965 .bind(plan.approved_at)
966 .bind(plan.started_at)
967 .bind(plan.completed_at)
968 .execute(&mut **tx)
969 .await
970 .map_err(|e| SessionError::Storage {
971 message: e.to_string(),
972 })?;
973
974 Ok(())
975 }
976
977 async fn save_compacts_tx(
978 &self,
979 tx: &mut Transaction<'_, Postgres>,
980 session_id: &SessionId,
981 compacts: &[CompactRecord],
982 ) -> SessionResult<()> {
983 let c = &self.config;
984
985 sqlx::query(&format!(
986 "DELETE FROM {compacts} WHERE session_id = $1",
987 compacts = c.compacts_table
988 ))
989 .bind(session_id.to_string())
990 .execute(&mut **tx)
991 .await
992 .map_err(|e| SessionError::Storage {
993 message: e.to_string(),
994 })?;
995
996 for compact in compacts {
997 let trigger = serde_json::to_string(&compact.trigger)
998 .unwrap_or_else(|_| "\"manual\"".to_string())
999 .trim_matches('"')
1000 .to_string();
1001
1002 sqlx::query(&format!(
1003 r#"
1004 INSERT INTO {compacts} (
1005 id, session_id, trigger, pre_tokens, post_tokens, saved_tokens,
1006 summary, original_count, new_count, logical_parent_id, created_at
1007 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
1008 "#,
1009 compacts = c.compacts_table
1010 ))
1011 .bind(compact.id)
1012 .bind(session_id.to_string())
1013 .bind(&trigger)
1014 .bind(compact.pre_tokens as i32)
1015 .bind(compact.post_tokens as i32)
1016 .bind(compact.saved_tokens as i32)
1017 .bind(&compact.summary)
1018 .bind(compact.original_count as i32)
1019 .bind(compact.new_count as i32)
1020 .bind(compact.logical_parent_id.as_ref().map(|id| id.to_string()))
1021 .bind(compact.created_at)
1022 .execute(&mut **tx)
1023 .await
1024 .map_err(|e| SessionError::Storage {
1025 message: e.to_string(),
1026 })?;
1027 }
1028
1029 Ok(())
1030 }
1031
1032 async fn save_messages_tx(
1033 &self,
1034 tx: &mut Transaction<'_, Postgres>,
1035 session_id: &SessionId,
1036 messages: &[SessionMessage],
1037 ) -> SessionResult<()> {
1038 let c = &self.config;
1039
1040 sqlx::query(&format!(
1041 "DELETE FROM {messages} WHERE session_id = $1",
1042 messages = c.messages_table
1043 ))
1044 .bind(session_id.to_string())
1045 .execute(&mut **tx)
1046 .await
1047 .map_err(|e| SessionError::Storage {
1048 message: e.to_string(),
1049 })?;
1050
1051 for message in messages {
1052 let role = serde_json::to_string(&message.role)
1053 .unwrap_or_else(|_| "\"user\"".to_string())
1054 .trim_matches('"')
1055 .to_string();
1056
1057 sqlx::query(&format!(
1058 r#"
1059 INSERT INTO {messages} (
1060 id, session_id, parent_id, role, content, is_sidechain,
1061 is_compact_summary, model, request_id, usage, metadata,
1062 environment, created_at
1063 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)
1064 "#,
1065 messages = c.messages_table
1066 ))
1067 .bind(message.id.to_string())
1068 .bind(session_id.to_string())
1069 .bind(message.parent_id.as_ref().map(|id| id.to_string()))
1070 .bind(&role)
1071 .bind(serde_json::to_value(&message.content).unwrap_or_default())
1072 .bind(message.is_sidechain)
1073 .bind(message.is_compact_summary)
1074 .bind(&message.metadata.model)
1075 .bind(&message.metadata.request_id)
1076 .bind(
1077 message
1078 .usage
1079 .as_ref()
1080 .and_then(|u| serde_json::to_value(u).ok()),
1081 )
1082 .bind(serde_json::to_value(&message.metadata).unwrap_or_default())
1083 .bind(
1084 message
1085 .environment
1086 .as_ref()
1087 .and_then(|e| serde_json::to_value(e).ok()),
1088 )
1089 .bind(message.timestamp)
1090 .execute(&mut **tx)
1091 .await
1092 .map_err(|e| SessionError::Storage {
1093 message: e.to_string(),
1094 })?;
1095 }
1096
1097 Ok(())
1098 }
1099}
1100
1101#[async_trait]
1102impl Persistence for PostgresPersistence {
1103 fn name(&self) -> &str {
1104 "postgres"
1105 }
1106
1107 async fn save(&self, session: &Session) -> SessionResult<()> {
1108 let c = &self.config;
1109
1110 let mut tx = self.pool.begin().await.map_err(|e| SessionError::Storage {
1111 message: e.to_string(),
1112 })?;
1113
1114 let session_type = serde_json::to_string(&session.session_type)
1115 .unwrap_or_else(|_| "\"main\"".to_string())
1116 .trim_matches('"')
1117 .to_string();
1118
1119 let state = serde_json::to_string(&session.state)
1120 .unwrap_or_else(|_| "\"created\"".to_string())
1121 .trim_matches('"')
1122 .to_string();
1123
1124 let mode = serde_json::to_string(&session.mode)
1125 .unwrap_or_else(|_| "\"default\"".to_string())
1126 .trim_matches('"')
1127 .to_string();
1128
1129 sqlx::query(&format!(
1130 r#"
1131 INSERT INTO {sessions} (
1132 id, parent_id, tenant_id, session_type, state, mode,
1133 config, permission_policy, summary,
1134 total_input_tokens, total_output_tokens, total_cost_usd,
1135 current_leaf_id, static_context_hash, error,
1136 created_at, updated_at, expires_at
1137 ) VALUES (
1138 $1, $2, $3, $4, $5, $6, $7, $8, $9,
1139 $10, $11, $12, $13, $14, $15, $16, $17, $18
1140 )
1141 ON CONFLICT (id) DO UPDATE SET
1142 parent_id = EXCLUDED.parent_id,
1143 tenant_id = EXCLUDED.tenant_id,
1144 session_type = EXCLUDED.session_type,
1145 state = EXCLUDED.state,
1146 mode = EXCLUDED.mode,
1147 config = EXCLUDED.config,
1148 permission_policy = EXCLUDED.permission_policy,
1149 summary = EXCLUDED.summary,
1150 total_input_tokens = EXCLUDED.total_input_tokens,
1151 total_output_tokens = EXCLUDED.total_output_tokens,
1152 total_cost_usd = EXCLUDED.total_cost_usd,
1153 current_leaf_id = EXCLUDED.current_leaf_id,
1154 static_context_hash = EXCLUDED.static_context_hash,
1155 error = EXCLUDED.error,
1156 updated_at = EXCLUDED.updated_at,
1157 expires_at = EXCLUDED.expires_at
1158 "#,
1159 sessions = c.sessions_table
1160 ))
1161 .bind(session.id.to_string())
1162 .bind(session.parent_id.map(|id| id.to_string()))
1163 .bind(&session.tenant_id)
1164 .bind(&session_type)
1165 .bind(&state)
1166 .bind(&mode)
1167 .bind(serde_json::to_value(&session.config).unwrap_or_default())
1168 .bind(serde_json::to_value(&session.permission_policy).unwrap_or_default())
1169 .bind(&session.summary)
1170 .bind(session.total_usage.input_tokens as i64)
1171 .bind(session.total_usage.output_tokens as i64)
1172 .bind(session.total_cost_usd)
1173 .bind(session.current_leaf_id.as_ref().map(|id| id.to_string()))
1174 .bind(&session.static_context_hash)
1175 .bind(&session.error)
1176 .bind(session.created_at)
1177 .bind(session.updated_at)
1178 .bind(session.expires_at)
1179 .execute(&mut *tx)
1180 .await
1181 .map_err(|e| SessionError::Storage {
1182 message: e.to_string(),
1183 })?;
1184
1185 self.save_messages_tx(&mut tx, &session.id, &session.messages)
1186 .await?;
1187 self.save_todos_tx(&mut tx, &session.id, &session.todos)
1188 .await?;
1189 self.save_compacts_tx(&mut tx, &session.id, &session.compact_history)
1190 .await?;
1191
1192 if let Some(ref plan) = session.current_plan {
1193 self.save_plan_tx(&mut tx, plan).await?;
1194 }
1195
1196 tx.commit().await.map_err(|e| SessionError::Storage {
1197 message: e.to_string(),
1198 })?;
1199
1200 Ok(())
1201 }
1202
1203 async fn load(&self, id: &SessionId) -> SessionResult<Option<Session>> {
1204 match self.load_session_row(id).await {
1205 Ok(session) => Ok(Some(session)),
1206 Err(SessionError::NotFound { .. }) => Ok(None),
1207 Err(e) => Err(e),
1208 }
1209 }
1210
1211 async fn delete(&self, id: &SessionId) -> SessionResult<bool> {
1212 let c = &self.config;
1213
1214 let result = sqlx::query(&format!(
1215 "DELETE FROM {sessions} WHERE id = $1",
1216 sessions = c.sessions_table
1217 ))
1218 .bind(id.to_string())
1219 .execute(self.pool.as_ref())
1220 .await
1221 .map_err(|e| SessionError::Storage {
1222 message: e.to_string(),
1223 })?;
1224
1225 Ok(result.rows_affected() > 0)
1226 }
1227
1228 async fn list(&self, tenant_id: Option<&str>) -> SessionResult<Vec<SessionId>> {
1229 let c = &self.config;
1230
1231 let rows = if let Some(tid) = tenant_id {
1232 sqlx::query(&format!(
1233 "SELECT id FROM {sessions} WHERE tenant_id = $1",
1234 sessions = c.sessions_table
1235 ))
1236 .bind(tid)
1237 .fetch_all(self.pool.as_ref())
1238 .await
1239 } else {
1240 sqlx::query(&format!(
1241 "SELECT id FROM {sessions}",
1242 sessions = c.sessions_table
1243 ))
1244 .fetch_all(self.pool.as_ref())
1245 .await
1246 }
1247 .map_err(|e| SessionError::Storage {
1248 message: e.to_string(),
1249 })?;
1250
1251 Ok(rows
1252 .into_iter()
1253 .filter_map(|row| row.try_get::<&str, _>("id").ok()?.parse().ok())
1254 .collect())
1255 }
1256
1257 async fn list_children(&self, parent_id: &SessionId) -> SessionResult<Vec<SessionId>> {
1258 let c = &self.config;
1259
1260 let rows = sqlx::query(&format!(
1261 "SELECT id FROM {sessions} WHERE parent_id = $1",
1262 sessions = c.sessions_table
1263 ))
1264 .bind(parent_id.to_string())
1265 .fetch_all(self.pool.as_ref())
1266 .await
1267 .map_err(|e| SessionError::Storage {
1268 message: e.to_string(),
1269 })?;
1270
1271 Ok(rows
1272 .into_iter()
1273 .filter_map(|row| row.try_get::<&str, _>("id").ok()?.parse().ok())
1274 .collect())
1275 }
1276
1277 async fn add_summary(&self, snapshot: SummarySnapshot) -> SessionResult<()> {
1278 let c = &self.config;
1279
1280 sqlx::query(&format!(
1281 r#"
1282 INSERT INTO {summaries} (id, session_id, summary, leaf_message_id, created_at)
1283 VALUES ($1, $2, $3, $4, $5)
1284 "#,
1285 summaries = c.summaries_table
1286 ))
1287 .bind(snapshot.id)
1288 .bind(snapshot.session_id.to_string())
1289 .bind(&snapshot.summary)
1290 .bind(snapshot.leaf_message_id.map(|id| id.to_string()))
1291 .bind(snapshot.created_at)
1292 .execute(self.pool.as_ref())
1293 .await
1294 .map_err(|e| SessionError::Storage {
1295 message: e.to_string(),
1296 })?;
1297
1298 sqlx::query(&format!(
1299 "UPDATE {sessions} SET summary = $1, updated_at = NOW() WHERE id = $2",
1300 sessions = c.sessions_table
1301 ))
1302 .bind(&snapshot.summary)
1303 .bind(snapshot.session_id.to_string())
1304 .execute(self.pool.as_ref())
1305 .await
1306 .map_err(|e| SessionError::Storage {
1307 message: e.to_string(),
1308 })?;
1309
1310 Ok(())
1311 }
1312
1313 async fn get_summaries(&self, session_id: &SessionId) -> SessionResult<Vec<SummarySnapshot>> {
1314 let c = &self.config;
1315
1316 let rows = sqlx::query(&format!(
1317 r#"
1318 SELECT id, session_id, summary, leaf_message_id, created_at
1319 FROM {summaries}
1320 WHERE session_id = $1
1321 ORDER BY created_at ASC
1322 "#,
1323 summaries = c.summaries_table
1324 ))
1325 .bind(session_id.to_string())
1326 .fetch_all(self.pool.as_ref())
1327 .await
1328 .map_err(|e| SessionError::Storage {
1329 message: e.to_string(),
1330 })?;
1331
1332 Ok(rows
1333 .into_iter()
1334 .filter_map(|row| {
1335 Some(SummarySnapshot {
1336 id: row.try_get("id").ok()?,
1337 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1338 summary: row.try_get("summary").ok()?,
1339 leaf_message_id: row
1340 .try_get::<&str, _>("leaf_message_id")
1341 .ok()
1342 .and_then(|s| s.parse().ok()),
1343 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1344 })
1345 })
1346 .collect())
1347 }
1348
1349 async fn enqueue(
1350 &self,
1351 session_id: &SessionId,
1352 content: String,
1353 priority: i32,
1354 ) -> SessionResult<QueueItem> {
1355 let c = &self.config;
1356 let item = QueueItem::enqueue(*session_id, &content).with_priority(priority);
1357
1358 sqlx::query(&format!(
1359 r#"
1360 INSERT INTO {queue} (id, session_id, operation, content, priority, status, created_at)
1361 VALUES ($1, $2, $3, $4, $5, $6, $7)
1362 "#,
1363 queue = c.queue_table
1364 ))
1365 .bind(item.id)
1366 .bind(session_id.to_string())
1367 .bind("enqueue")
1368 .bind(&content)
1369 .bind(priority)
1370 .bind("pending")
1371 .bind(item.created_at)
1372 .execute(self.pool.as_ref())
1373 .await
1374 .map_err(|e| SessionError::Storage {
1375 message: e.to_string(),
1376 })?;
1377
1378 Ok(item)
1379 }
1380
1381 async fn dequeue(&self, session_id: &SessionId) -> SessionResult<Option<QueueItem>> {
1382 let c = &self.config;
1383
1384 let row = sqlx::query(&format!(
1385 r#"
1386 UPDATE {queue}
1387 SET status = 'processing'
1388 WHERE id = (
1389 SELECT id FROM {queue}
1390 WHERE session_id = $1 AND status = 'pending'
1391 ORDER BY priority DESC, created_at ASC
1392 LIMIT 1
1393 FOR UPDATE SKIP LOCKED
1394 )
1395 RETURNING id, session_id, operation, content, priority, status, created_at, processed_at
1396 "#,
1397 queue = c.queue_table
1398 ))
1399 .bind(session_id.to_string())
1400 .fetch_optional(self.pool.as_ref())
1401 .await
1402 .map_err(|e| SessionError::Storage {
1403 message: e.to_string(),
1404 })?;
1405
1406 Ok(row.and_then(|row| {
1407 Some(QueueItem {
1408 id: row.try_get("id").ok()?,
1409 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1410 operation: super::types::QueueOperation::Enqueue,
1411 content: row.try_get("content").ok()?,
1412 priority: row.try_get("priority").unwrap_or(0),
1413 status: QueueStatus::Processing,
1414 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1415 processed_at: row.try_get("processed_at").ok(),
1416 })
1417 }))
1418 }
1419
1420 async fn cancel_queued(&self, item_id: Uuid) -> SessionResult<bool> {
1421 let c = &self.config;
1422
1423 let result = sqlx::query(&format!(
1424 "UPDATE {queue} SET status = 'cancelled', processed_at = NOW() WHERE id = $1 AND status = 'pending'",
1425 queue = c.queue_table
1426 ))
1427 .bind(item_id)
1428 .execute(self.pool.as_ref())
1429 .await
1430 .map_err(|e| SessionError::Storage { message: e.to_string() })?;
1431
1432 Ok(result.rows_affected() > 0)
1433 }
1434
1435 async fn pending_queue(&self, session_id: &SessionId) -> SessionResult<Vec<QueueItem>> {
1436 let c = &self.config;
1437
1438 let rows = sqlx::query(&format!(
1439 r#"
1440 SELECT id, session_id, operation, content, priority, status, created_at, processed_at
1441 FROM {queue}
1442 WHERE session_id = $1 AND status = 'pending'
1443 ORDER BY priority DESC, created_at ASC
1444 "#,
1445 queue = c.queue_table
1446 ))
1447 .bind(session_id.to_string())
1448 .fetch_all(self.pool.as_ref())
1449 .await
1450 .map_err(|e| SessionError::Storage {
1451 message: e.to_string(),
1452 })?;
1453
1454 Ok(rows
1455 .into_iter()
1456 .filter_map(|row| {
1457 Some(QueueItem {
1458 id: row.try_get("id").ok()?,
1459 session_id: row.try_get::<&str, _>("session_id").ok()?.parse().ok()?,
1460 operation: super::types::QueueOperation::Enqueue,
1461 content: row.try_get("content").ok()?,
1462 priority: row.try_get("priority").unwrap_or(0),
1463 status: QueueStatus::Pending,
1464 created_at: row.try_get("created_at").unwrap_or_else(|_| Utc::now()),
1465 processed_at: row.try_get("processed_at").ok(),
1466 })
1467 })
1468 .collect())
1469 }
1470
1471 async fn cleanup_expired(&self) -> SessionResult<usize> {
1472 let c = &self.config;
1473
1474 let result = sqlx::query(&format!(
1475 "DELETE FROM {sessions} WHERE expires_at IS NOT NULL AND expires_at < NOW()",
1476 sessions = c.sessions_table
1477 ))
1478 .execute(self.pool.as_ref())
1479 .await
1480 .map_err(|e| SessionError::Storage {
1481 message: e.to_string(),
1482 })?;
1483
1484 Ok(result.rows_affected() as usize)
1485 }
1486}