use crate::events::{CortexEvent, EventBus, SessionEvent};
use crate::llm::LLMClient;
use crate::{CortexFilesystem, FilesystemOperations, MessageStorage, ParticipantManager, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tracing::{info, warn};
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum SessionStatus {
Active,
Closed,
Archived,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMetadata {
pub thread_id: String,
pub status: SessionStatus,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub closed_at: Option<DateTime<Utc>>,
pub message_count: usize,
pub participants: Vec<String>, pub tags: Vec<String>,
pub title: Option<String>,
pub description: Option<String>,
pub user_id: Option<String>,
pub agent_id: Option<String>,
}
impl SessionMetadata {
pub fn new(thread_id: impl Into<String>) -> Self {
let now = Utc::now();
Self {
thread_id: thread_id.into(),
status: SessionStatus::Active,
created_at: now,
updated_at: now,
closed_at: None,
message_count: 0,
participants: Vec::new(),
tags: Vec::new(),
title: None,
description: None,
user_id: None,
agent_id: None,
}
}
pub fn with_ids(
thread_id: impl Into<String>,
user_id: Option<String>,
agent_id: Option<String>,
) -> Self {
let mut metadata = Self::new(thread_id);
metadata.user_id = user_id;
metadata.agent_id = agent_id;
metadata
}
pub fn close(&mut self) {
self.status = SessionStatus::Closed;
self.closed_at = Some(Utc::now());
self.updated_at = Utc::now();
}
pub fn archive(&mut self) {
self.status = SessionStatus::Archived;
self.updated_at = Utc::now();
}
pub fn update_message_count(&mut self, count: usize) {
self.message_count = count;
self.updated_at = Utc::now();
}
pub fn add_participant(&mut self, participant_id: impl Into<String>) {
let id = participant_id.into();
if !self.participants.contains(&id) {
self.participants.push(id);
self.updated_at = Utc::now();
}
}
pub fn add_tag(&mut self, tag: impl Into<String>) {
let t = tag.into();
if !self.tags.contains(&t) {
self.tags.push(t);
self.updated_at = Utc::now();
}
}
pub fn set_title(&mut self, title: impl Into<String>) {
self.title = Some(title.into());
self.updated_at = Utc::now();
}
pub fn to_markdown(&self) -> String {
let mut md = String::new();
md.push_str(&format!("# Session: {}\n\n", self.thread_id));
if let Some(ref title) = self.title {
md.push_str(&format!("**Title**: {}\n\n", title));
}
md.push_str(&format!("**Status**: {:?}\n", self.status));
md.push_str(&format!(
"**Created**: {}\n",
self.created_at.format("%Y-%m-%d %H:%M:%S UTC")
));
md.push_str(&format!(
"**Updated**: {}\n",
self.updated_at.format("%Y-%m-%d %H:%M:%S UTC")
));
if let Some(closed_at) = self.closed_at {
md.push_str(&format!(
"**Closed**: {}\n",
closed_at.format("%Y-%m-%d %H:%M:%S UTC")
));
}
md.push_str(&format!("**Messages**: {}\n", self.message_count));
md.push_str(&format!("**Participants**: {}\n", self.participants.len()));
if !self.tags.is_empty() {
md.push_str(&format!("**Tags**: {}\n", self.tags.join(", ")));
}
if let Some(ref description) = self.description {
md.push_str(&format!("\n## Description\n\n{}\n", description));
}
md.push_str("\n## Participants\n\n");
for participant in &self.participants {
md.push_str(&format!("- {}\n", participant));
}
md
}
}
#[derive(Debug, Clone)]
pub struct SessionConfig {
pub max_messages_per_session: Option<usize>,
pub auto_archive_after_days: Option<i64>,
}
impl Default for SessionConfig {
fn default() -> Self {
Self {
max_messages_per_session: None,
auto_archive_after_days: Some(30),
}
}
}
#[derive(Debug, Clone, Default)]
pub struct ExtractionStats {
pub preferences: usize,
pub entities: usize,
pub events: usize,
pub cases: usize,
pub personal_info: usize,
pub work_history: usize,
pub relationships: usize,
pub goals: usize,
}
pub struct SessionManager {
filesystem: Arc<CortexFilesystem>,
message_storage: MessageStorage,
participant_manager: ParticipantManager,
#[allow(dead_code)]
config: SessionConfig,
llm_client: Option<Arc<dyn LLMClient>>,
event_bus: Option<EventBus>,
memory_event_tx: Option<tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>>,
}
impl SessionManager {
pub fn new(filesystem: Arc<CortexFilesystem>, config: SessionConfig) -> Self {
let message_storage = MessageStorage::new(filesystem.clone());
let participant_manager = ParticipantManager::new();
Self {
filesystem,
message_storage,
participant_manager,
config,
llm_client: None,
event_bus: None,
memory_event_tx: None,
}
}
pub fn new_with_llm(
filesystem: Arc<CortexFilesystem>,
config: SessionConfig,
llm_client: Arc<dyn LLMClient>,
) -> Self {
let message_storage = MessageStorage::new(filesystem.clone());
let participant_manager = ParticipantManager::new();
Self {
filesystem,
message_storage,
participant_manager,
config,
llm_client: Some(llm_client),
event_bus: None,
memory_event_tx: None,
}
}
pub fn with_event_bus(
filesystem: Arc<CortexFilesystem>,
config: SessionConfig,
event_bus: EventBus,
) -> Self {
let message_storage = MessageStorage::new(filesystem.clone());
let participant_manager = ParticipantManager::new();
Self {
filesystem,
message_storage,
participant_manager,
config,
llm_client: None,
event_bus: Some(event_bus),
memory_event_tx: None,
}
}
pub fn with_llm_and_events(
filesystem: Arc<CortexFilesystem>,
config: SessionConfig,
llm_client: Arc<dyn LLMClient>,
event_bus: EventBus,
) -> Self {
let message_storage = MessageStorage::new(filesystem.clone());
let participant_manager = ParticipantManager::new();
Self {
filesystem,
message_storage,
participant_manager,
config,
llm_client: Some(llm_client),
event_bus: Some(event_bus),
memory_event_tx: None,
}
}
pub fn with_memory_event_tx(mut self, tx: tokio::sync::mpsc::UnboundedSender<crate::memory_events::MemoryEvent>) -> Self {
self.memory_event_tx = Some(tx);
self
}
pub fn switch_filesystem(&mut self, filesystem: Arc<CortexFilesystem>) {
self.message_storage = MessageStorage::new(filesystem.clone());
self.filesystem = filesystem;
}
pub fn llm_client(&self) -> Option<&Arc<dyn LLMClient>> {
self.llm_client.as_ref()
}
pub async fn create_session_with_ids(
&self,
thread_id: &str,
user_id: Option<String>,
agent_id: Option<String>,
) -> Result<SessionMetadata> {
let metadata = SessionMetadata::with_ids(thread_id, user_id, agent_id);
let metadata_uri = format!("cortex://session/{}/.session.json", thread_id);
let metadata_json = serde_json::to_string_pretty(&metadata)?;
self.filesystem.write(&metadata_uri, &metadata_json).await?;
if let Some(ref bus) = self.event_bus {
let _ = bus.publish(CortexEvent::Session(SessionEvent::Created {
session_id: thread_id.to_string(),
}));
}
Ok(metadata)
}
pub async fn load_session(&self, thread_id: &str) -> Result<SessionMetadata> {
let metadata_uri = format!("cortex://session/{}/.session.json", thread_id);
let metadata_json = self.filesystem.read(&metadata_uri).await?;
let metadata: SessionMetadata = serde_json::from_str(&metadata_json)?;
Ok(metadata)
}
pub async fn update_session(&self, metadata: &SessionMetadata) -> Result<()> {
let metadata_uri = format!("cortex://session/{}/.session.json", metadata.thread_id);
let metadata_json = serde_json::to_string_pretty(metadata)?;
self.filesystem.write(&metadata_uri, &metadata_json).await?;
Ok(())
}
pub async fn close_session_metadata_only(&mut self, thread_id: &str) -> Result<SessionMetadata> {
let mut metadata = self.load_session(thread_id).await?;
metadata.close();
self.update_session(&metadata).await?;
if let Some(ref bus) = self.event_bus {
let _ = bus.publish(CortexEvent::Session(SessionEvent::Closed {
session_id: thread_id.to_string(),
}));
}
info!("Session {} metadata closed (event emission skipped; caller handles processing)", thread_id);
Ok(metadata)
}
pub async fn close_session(&mut self, thread_id: &str) -> Result<SessionMetadata> {
let metadata = self.close_session_metadata_only(thread_id).await?;
if let Some(ref tx) = self.memory_event_tx {
let user_id = metadata.user_id.clone().unwrap_or_else(|| "default".to_string());
let agent_id = metadata.agent_id.clone().unwrap_or_else(|| "default".to_string());
let _ = tx.send(crate::memory_events::MemoryEvent::SessionClosed {
session_id: thread_id.to_string(),
user_id: user_id.clone(),
agent_id: agent_id.clone(),
});
info!(
"Session {} closed, SessionClosed event queued for async processing (user_id={}, agent_id={})",
thread_id, user_id, agent_id
);
} else {
warn!(
"memory_event_tx is None, SessionClosed event NOT sent for session {}",
thread_id
);
}
Ok(metadata)
}
pub async fn archive_session(&self, thread_id: &str) -> Result<SessionMetadata> {
let mut metadata = self.load_session(thread_id).await?;
metadata.archive();
self.update_session(&metadata).await?;
Ok(metadata)
}
pub async fn delete_session(&self, thread_id: &str) -> Result<()> {
let session_uri = format!("cortex://session/{}", thread_id);
self.filesystem.delete(&session_uri).await
}
pub async fn session_exists(&self, thread_id: &str) -> Result<bool> {
let metadata_uri = format!("cortex://session/{}/.session.json", thread_id);
self.filesystem.exists(&metadata_uri).await
}
pub fn message_storage(&self) -> &MessageStorage {
&self.message_storage
}
pub fn participant_manager(&mut self) -> &mut ParticipantManager {
&mut self.participant_manager
}
pub async fn add_message(
&self,
thread_id: &str,
role: crate::session::MessageRole,
content: String,
) -> Result<crate::session::Message> {
use crate::session::Message;
let message = Message::new(role, content);
let message_id = message.id.clone();
self.message_storage
.save_message(thread_id, &message)
.await?;
let mut metadata = self.load_session(thread_id).await?;
metadata.update_message_count(metadata.message_count + 1);
self.update_session(&metadata).await?;
if let Some(ref bus) = self.event_bus {
let _ = bus.publish(CortexEvent::Session(SessionEvent::MessageAdded {
session_id: thread_id.to_string(),
message_id: message_id.clone(),
}));
}
Ok(message)
}
}