tianshu_postgres/
session_store.rs1use anyhow::Result;
6use async_trait::async_trait;
7use deadpool_postgres::Pool;
8use tracing::{debug, info};
9
10use tianshu::session::Session;
11use tianshu::store::SessionStore;
12
13pub struct PostgresSessionStore {
18 pool: Pool,
19}
20
21impl PostgresSessionStore {
22 pub fn new(pool: Pool) -> Self {
23 Self { pool }
24 }
25}
26
27#[async_trait]
28impl SessionStore for PostgresSessionStore {
29 async fn upsert(&self, session: &Session) -> Result<()> {
30 let client = self.pool.get().await?;
31 debug!("Upserting session: session_id={}", session.session_id);
32
33 client
34 .execute(
35 r#"
36 INSERT INTO wf_sessions (session_id, metadata, created_at, updated_at)
37 VALUES ($1, $2, $3, $4)
38 ON CONFLICT (session_id) DO UPDATE SET
39 metadata = EXCLUDED.metadata,
40 updated_at = EXCLUDED.updated_at
41 "#,
42 &[
43 &session.session_id,
44 &session.metadata,
45 &session.created_at,
46 &session.updated_at,
47 ],
48 )
49 .await?;
50
51 info!("Upserted session: session_id={}", session.session_id);
52 Ok(())
53 }
54
55 async fn get(&self, session_id: &str) -> Result<Option<Session>> {
56 let client = self.pool.get().await?;
57 debug!("Fetching session: session_id={}", session_id);
58
59 let row_opt = client
60 .query_opt(
61 "SELECT session_id, metadata, created_at, updated_at FROM wf_sessions WHERE session_id = $1",
62 &[&session_id],
63 )
64 .await?;
65
66 Ok(row_opt.map(|row| Session {
67 session_id: row.get(0),
68 metadata: row.get(1),
69 created_at: row.get(2),
70 updated_at: row.get(3),
71 }))
72 }
73
74 async fn delete(&self, session_id: &str) -> Result<()> {
75 let client = self.pool.get().await?;
76 client
77 .execute(
78 "DELETE FROM wf_sessions WHERE session_id = $1",
79 &[&session_id],
80 )
81 .await?;
82 info!("Deleted session: session_id={}", session_id);
83 Ok(())
84 }
85
86 async fn setup(&self) -> Result<()> {
87 let client = self.pool.get().await?;
88 client
89 .execute(
90 r#"
91 CREATE TABLE IF NOT EXISTS wf_sessions (
92 session_id TEXT PRIMARY KEY,
93 metadata JSONB,
94 created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
95 updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
96 )
97 "#,
98 &[],
99 )
100 .await?;
101 info!("wf_sessions table ready");
102 Ok(())
103 }
104}