use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;
use crate::error::{ClawError, ClawResult};
use crate::store::memory::{ListOptions, ListPage};
/// A session record as stored in the `sessions` table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Session {
/// Identifier of the session (the `id` column).
pub id: String,
/// When the session was started, in UTC.
pub started_at: DateTime<Utc>,
/// When the session was ended, in UTC; `None` when no end time is recorded.
pub ended_at: Option<DateTime<Utc>>,
}
/// Store for creating, ending and querying [`Session`] rows through a
/// borrowed SQLite connection pool.
#[derive(Debug)]
pub struct SessionLifecycleStore<'a> {
/// Borrowed pool that every query of this store is executed against.
pool: &'a SqlitePool,
}
impl<'a> SessionLifecycleStore<'a> {
pub fn new(pool: &'a SqlitePool) -> Self {
SessionLifecycleStore { pool }
}
pub async fn start(&self) -> ClawResult<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
sqlx::query("INSERT INTO sessions (id, started_at) VALUES (?, ?)")
.bind(&id)
.bind(&now)
.execute(self.pool)
.await?;
Ok(id)
}
pub async fn end(&self, session_id: &str) -> ClawResult<()> {
let now = Utc::now().to_rfc3339();
let affected = sqlx::query("UPDATE sessions SET ended_at = ? WHERE id = ?")
.bind(&now)
.bind(session_id)
.execute(self.pool)
.await?
.rows_affected();
if affected == 0 {
return Err(ClawError::NotFound {
entity: "Session".to_string(),
id: session_id.to_string(),
});
}
Ok(())
}
pub async fn get(&self, session_id: &str) -> ClawResult<Session> {
let row: Option<(String, String, Option<String>)> =
sqlx::query_as("SELECT id, started_at, ended_at FROM sessions WHERE id = ?")
.bind(session_id)
.fetch_optional(self.pool)
.await?;
let (id, started_at_str, ended_at_str) = row.ok_or_else(|| ClawError::NotFound {
entity: "Session".to_string(),
id: session_id.to_string(),
})?;
Ok(Session {
id,
started_at: DateTime::parse_from_rfc3339(&started_at_str)
.map_err(|e| ClawError::Store(e.to_string()))?
.with_timezone(&Utc),
ended_at: ended_at_str
.map(|s| {
DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| ClawError::Store(e.to_string()))
})
.transpose()?,
})
}
pub async fn list(&self) -> ClawResult<Vec<Session>> {
let rows: Vec<(String, String, Option<String>)> = sqlx::query_as(
"SELECT id, started_at, ended_at FROM sessions ORDER BY started_at DESC",
)
.fetch_all(self.pool)
.await?;
rows.into_iter()
.map(|(id, started_at_str, ended_at_str)| {
Ok(Session {
id,
started_at: DateTime::parse_from_rfc3339(&started_at_str)
.map_err(|e| ClawError::Store(e.to_string()))?
.with_timezone(&Utc),
ended_at: ended_at_str
.map(|s| {
DateTime::parse_from_rfc3339(&s)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| ClawError::Store(e.to_string()))
})
.transpose()?,
})
})
.collect()
}
pub async fn list_paginated(&self, opts: &ListOptions) -> ClawResult<ListPage<Session>> {
let limit = opts.validated_limit() as i64;
let fetch = limit.saturating_add(1);
let rows: Vec<(String, String, Option<String>)> = match &opts.cursor {
None => {
sqlx::query_as(
"SELECT id, started_at, ended_at FROM sessions \
ORDER BY started_at DESC, id DESC \
LIMIT ?",
)
.bind(fetch)
.fetch_all(self.pool)
.await?
}
Some(cursor) => {
sqlx::query_as(
"SELECT id, started_at, ended_at FROM sessions \
WHERE (started_at, id) < \
(SELECT started_at, id FROM sessions WHERE id = ?) \
ORDER BY started_at DESC, id DESC \
LIMIT ?",
)
.bind(cursor)
.bind(fetch)
.fetch_all(self.pool)
.await?
}
};
let has_more = rows.len() as i64 > limit;
let page = if has_more {
&rows[..limit as usize]
} else {
rows.as_slice()
};
let sessions: Vec<Session> = page
.iter()
.map(|(id, started_at_str, ended_at_str)| {
Ok(Session {
id: id.clone(),
started_at: DateTime::parse_from_rfc3339(started_at_str)
.map_err(|e| ClawError::Store(e.to_string()))?
.with_timezone(&Utc),
ended_at: ended_at_str
.as_deref()
.map(|s| {
DateTime::parse_from_rfc3339(s)
.map(|dt| dt.with_timezone(&Utc))
.map_err(|e| ClawError::Store(e.to_string()))
})
.transpose()?,
})
})
.collect::<ClawResult<_>>()?;
let next_cursor = if has_more {
sessions.last().map(|s| s.id.clone())
} else {
None
};
Ok(ListPage {
items: sessions,
next_cursor,
})
}
}