claw_core/store/
session.rs1use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::SqlitePool;
10use uuid::Uuid;
11
12use crate::error::{ClawError, ClawResult};
13
14#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct SessionRecord {
17 pub id: Uuid,
19 pub session_id: String,
21 pub key: String,
23 pub value: serde_json::Value,
25 pub updated_at: DateTime<Utc>,
27}
28
29#[derive(Debug)]
31pub struct SessionStore<'a> {
32 pool: &'a SqlitePool,
33}
34
35impl<'a> SessionStore<'a> {
36 pub fn new(pool: &'a SqlitePool) -> Self {
38 SessionStore { pool }
39 }
40
41 pub async fn upsert(&self, record: &SessionRecord) -> ClawResult<()> {
47 sqlx::query(
48 r#"
49 INSERT INTO session_state (id, session_id, key, value, updated_at)
50 VALUES (?, ?, ?, ?, ?)
51 ON CONFLICT (session_id, key)
52 DO UPDATE SET value = excluded.value,
53 updated_at = excluded.updated_at
54 "#,
55 )
56 .bind(record.id.to_string())
57 .bind(&record.session_id)
58 .bind(&record.key)
59 .bind(serde_json::to_string(&record.value)?)
60 .bind(record.updated_at.to_rfc3339())
61 .execute(self.pool)
62 .await?;
63
64 Ok(())
65 }
66
67 pub async fn get_session(&self, session_id: &str) -> ClawResult<Vec<SessionRecord>> {
73 let rows = sqlx::query_as::<_, (String, String, String, String, String)>(
74 "SELECT id, session_id, key, value, updated_at \
75 FROM session_state WHERE session_id = ? ORDER BY key",
76 )
77 .bind(session_id)
78 .fetch_all(self.pool)
79 .await?;
80
81 rows.into_iter()
82 .map(|(id, session_id, key, value, updated_at)| {
83 Ok(SessionRecord {
84 id: Uuid::parse_str(&id).map_err(|e| ClawError::Store(e.to_string()))?,
85 session_id,
86 key,
87 value: serde_json::from_str(&value)?,
88 updated_at: DateTime::parse_from_rfc3339(&updated_at)
89 .map_err(|e| ClawError::Store(e.to_string()))?
90 .with_timezone(&Utc),
91 })
92 })
93 .collect()
94 }
95
96 pub async fn clear_session(&self, session_id: &str) -> ClawResult<u64> {
102 let result = sqlx::query("DELETE FROM session_state WHERE session_id = ?")
103 .bind(session_id)
104 .execute(self.pool)
105 .await?;
106
107 Ok(result.rows_affected())
108 }
109}