use std::sync::Arc;
use chrono::Utc;
use uuid::Uuid;
use crate::{
config::ClawDBConfig,
error::{ClawDBError, ClawDBResult},
events::{bus::EventBus, types::ClawEvent},
lifecycle::manager::ComponentLifecycleManager,
session::{context::SessionContext, store::SessionStore},
};
const DEFAULT_SESSION_SECS: i64 = 3_600;
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClawDBSession {
pub id: Uuid,
pub agent_id: Uuid,
pub workspace_id: Uuid,
pub role: String,
pub scopes: Vec<String>,
pub task_type: Option<String>,
pub guard_token: String,
pub created_at: chrono::DateTime<Utc>,
pub expires_at: chrono::DateTime<Utc>,
pub metadata: serde_json::Value,
}
impl ClawDBSession {
pub fn as_context(&self) -> SessionContext {
SessionContext {
session_id: self.id,
agent_id: self.agent_id,
token: self.guard_token.clone(),
role: self.role.clone(),
scopes: self.scopes.clone(),
task_type: self.task_type.clone().unwrap_or_default(),
expires_at: self.expires_at.timestamp(),
}
}
pub fn is_valid(&self) -> bool {
self.expires_at > Utc::now()
}
}
impl ClawDBSession {
pub fn from_context(ctx: SessionContext) -> Self {
let expires_at = chrono::DateTime::from_timestamp(ctx.expires_at, 0)
.unwrap_or_else(|| Utc::now() + chrono::Duration::hours(1));
Self {
id: ctx.session_id,
agent_id: ctx.agent_id,
workspace_id: Uuid::nil(),
role: ctx.role.clone(),
scopes: ctx.scopes.clone(),
task_type: if ctx.task_type.is_empty() {
None
} else {
Some(ctx.task_type.clone())
},
guard_token: ctx.token.clone(),
created_at: Utc::now(),
expires_at,
metadata: serde_json::Value::Null,
}
}
}
pub struct SessionManager {
lifecycle: Arc<ComponentLifecycleManager>,
store: Arc<SessionStore>,
event_bus: Arc<EventBus>,
config: Arc<ClawDBConfig>,
}
impl SessionManager {
pub fn new(
lifecycle: Arc<ComponentLifecycleManager>,
event_bus: Arc<EventBus>,
config: Arc<ClawDBConfig>,
) -> Self {
Self {
lifecycle,
store: Arc::new(SessionStore::new()),
event_bus,
config,
}
}
pub fn store(&self) -> &Arc<SessionStore> {
&self.store
}
#[tracing::instrument(skip(self), fields(agent_id = %agent_id, role = role))]
pub async fn create(
&self,
agent_id: Uuid,
role: &str,
scopes: Vec<String>,
task_type: Option<String>,
) -> ClawDBResult<ClawDBSession> {
let guard = self.lifecycle.guard()?;
let workspace_id = self.config.workspace_id;
let principal = format!("agent:{agent_id}");
let task = task_type.as_deref().unwrap_or("default");
let guard_token = guard
.issue_session_token(&principal, task)
.await
.map_err(ClawDBError::Guard)?;
let now = Utc::now();
let expires_at = now + chrono::Duration::seconds(DEFAULT_SESSION_SECS);
let session = ClawDBSession {
id: Uuid::new_v4(),
agent_id,
workspace_id,
role: role.to_string(),
scopes: scopes.clone(),
task_type: task_type.clone(),
guard_token,
created_at: now,
expires_at,
metadata: serde_json::Value::Null,
};
let ctx = session.as_context();
self.store.insert(ctx);
self.event_bus.emit(ClawEvent::SessionCreated {
agent_id,
session_id: session.id,
});
tracing::info!(session_id = %session.id, agent_id = %agent_id, role, "session created");
Ok(session)
}
#[tracing::instrument(skip(self, guard_token))]
pub async fn validate(&self, guard_token: &str) -> ClawDBResult<SessionContext> {
let guard = self.lifecycle.guard()?;
let claims = guard
.validate_token(guard_token)
.await
.map_err(ClawDBError::Guard)?;
let session_id = claims.session_id();
let ctx = self.store.get(session_id)?;
Ok(ctx)
}
#[tracing::instrument(skip(self), fields(session_id = %session_id))]
pub async fn refresh(&self, session_id: Uuid) -> ClawDBResult<SessionContext> {
let existing = self.store.get(session_id)?;
let guard = self.lifecycle.guard()?;
let principal = format!("agent:{}", existing.agent_id);
let new_token = guard
.issue_session_token(&principal, &existing.task_type)
.await
.map_err(ClawDBError::Guard)?;
let new_expiry =
Utc::now().timestamp() + DEFAULT_SESSION_SECS;
let refreshed = SessionContext {
token: new_token,
expires_at: new_expiry,
..existing
};
self.store.insert(refreshed.clone());
tracing::debug!(session_id = %session_id, "session refreshed");
Ok(refreshed)
}
#[tracing::instrument(skip(self), fields(session_id = %session_id))]
pub async fn revoke(&self, session_id: Uuid) -> ClawDBResult<()> {
let ctx = self
.store
.remove(session_id)
.ok_or(ClawDBError::SessionNotFound(session_id))?;
if let Ok(guard) = self.lifecycle.guard() {
if let Err(e) = guard.revoke_session(session_id).await {
tracing::warn!(session_id = %session_id, "guard revoke_session failed: {e}");
}
}
self.event_bus.emit(ClawEvent::SessionExpired {
agent_id: ctx.agent_id,
session_id,
});
tracing::info!(session_id = %session_id, "session revoked");
Ok(())
}
pub async fn revoke_all_for_agent(&self, agent_id: Uuid) -> ClawDBResult<u32> {
let ids = self.store.ids_for_agent(agent_id);
let mut count = 0u32;
for session_id in ids {
if let Err(e) = self.revoke(session_id).await {
tracing::warn!(session_id = %session_id, "revoke failed: {e}");
} else {
count += 1;
}
}
Ok(count)
}
pub async fn create_session(
&self,
agent_id: Uuid,
role: &str,
scopes: Vec<String>,
task_type: &str,
) -> ClawDBResult<SessionContext> {
let session = self
.create(agent_id, role, scopes, Some(task_type.to_string()))
.await?;
Ok(session.as_context())
}
pub fn get_session(&self, session_id: Uuid) -> ClawDBResult<SessionContext> {
self.store.get(session_id)
}
pub fn invalidate(&self, session_id: Uuid) {
self.store.remove(session_id);
}
}
}
pub fn active_count(&self) -> usize {
self.store.count()
}
pub fn prune_expired(&self) -> usize {
self.store.prune_expired()
}
}