Skip to main content

tianshu_postgres/
session_store.rs

1// Copyright 2026 Desicool
2//
3// SPDX-License-Identifier: Apache-2.0
4
5use anyhow::Result;
6use async_trait::async_trait;
7use deadpool_postgres::Pool;
8use tracing::{debug, info};
9
10use tianshu::session::Session;
11use tianshu::store::SessionStore;
12
13/// Reference PostgreSQL implementation of `SessionStore`.
14///
15/// Uses a simple `wf_sessions` table with `metadata JSONB`. Production users
16/// with complex session schemas should implement their own `SessionStore`.
17pub struct PostgresSessionStore {
18    pool: Pool,
19}
20
21impl PostgresSessionStore {
22    pub fn new(pool: Pool) -> Self {
23        Self { pool }
24    }
25}
26
27#[async_trait]
28impl SessionStore for PostgresSessionStore {
29    async fn upsert(&self, session: &Session) -> Result<()> {
30        let client = self.pool.get().await?;
31        debug!("Upserting session: session_id={}", session.session_id);
32
33        client
34            .execute(
35                r#"
36                INSERT INTO wf_sessions (session_id, metadata, created_at, updated_at)
37                VALUES ($1, $2, $3, $4)
38                ON CONFLICT (session_id) DO UPDATE SET
39                    metadata   = EXCLUDED.metadata,
40                    updated_at = EXCLUDED.updated_at
41                "#,
42                &[
43                    &session.session_id,
44                    &session.metadata,
45                    &session.created_at,
46                    &session.updated_at,
47                ],
48            )
49            .await?;
50
51        info!("Upserted session: session_id={}", session.session_id);
52        Ok(())
53    }
54
55    async fn get(&self, session_id: &str) -> Result<Option<Session>> {
56        let client = self.pool.get().await?;
57        debug!("Fetching session: session_id={}", session_id);
58
59        let row_opt = client
60            .query_opt(
61                "SELECT session_id, metadata, created_at, updated_at FROM wf_sessions WHERE session_id = $1",
62                &[&session_id],
63            )
64            .await?;
65
66        Ok(row_opt.map(|row| Session {
67            session_id: row.get(0),
68            metadata: row.get(1),
69            created_at: row.get(2),
70            updated_at: row.get(3),
71        }))
72    }
73
74    async fn delete(&self, session_id: &str) -> Result<()> {
75        let client = self.pool.get().await?;
76        client
77            .execute(
78                "DELETE FROM wf_sessions WHERE session_id = $1",
79                &[&session_id],
80            )
81            .await?;
82        info!("Deleted session: session_id={}", session_id);
83        Ok(())
84    }
85
86    async fn setup(&self) -> Result<()> {
87        let client = self.pool.get().await?;
88        client
89            .execute(
90                r#"
91                CREATE TABLE IF NOT EXISTS wf_sessions (
92                    session_id  TEXT PRIMARY KEY,
93                    metadata    JSONB,
94                    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW(),
95                    updated_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
96                )
97                "#,
98                &[],
99            )
100            .await?;
101        info!("wf_sessions table ready");
102        Ok(())
103    }
104}