use crate::{layers::manager::LayerManager, CortexFilesystem, FilesystemOperations, Result};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum MessageRole {
User,
Assistant,
System,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
pub id: String,
pub role: MessageRole,
pub content: String,
pub timestamp: DateTime<Utc>,
pub created_at: DateTime<Utc>, pub metadata: Option<serde_json::Value>,
}
impl Message {
pub fn new(role: MessageRole, content: impl Into<String>) -> Self {
let timestamp = Utc::now();
Self {
id: uuid::Uuid::new_v4().to_string(),
role,
content: content.into(),
timestamp,
created_at: timestamp,
metadata: None,
}
}
pub fn user(content: impl Into<String>) -> Self {
Self::new(MessageRole::User, content)
}
pub fn assistant(content: impl Into<String>) -> Self {
Self::new(MessageRole::Assistant, content)
}
pub fn system(content: impl Into<String>) -> Self {
Self::new(MessageRole::System, content)
}
pub fn with_metadata(mut self, metadata: serde_json::Value) -> Self {
self.metadata = Some(metadata);
self
}
pub fn to_markdown(&self) -> String {
let role_emoji = match self.role {
MessageRole::User => "👤",
MessageRole::Assistant => "🤖",
MessageRole::System => "⚙️",
};
let timestamp = self.timestamp.format("%Y-%m-%d %H:%M:%S UTC");
let mut md = format!(
"# {} {:?}\n\n**ID**: `{}` \n**Timestamp**: {}\n\n",
role_emoji, self.role, self.id, timestamp
);
md.push_str("## Content\n\n");
md.push_str(&self.content);
md.push_str("\n\n");
if let Some(ref metadata) = self.metadata {
md.push_str("## Metadata\n\n");
md.push_str("```json\n");
md.push_str(&serde_json::to_string_pretty(metadata).unwrap_or_default());
md.push_str("\n```\n");
}
md
}
}
pub struct MessageStorage {
filesystem: Arc<CortexFilesystem>,
}
impl MessageStorage {
pub fn new(filesystem: Arc<CortexFilesystem>) -> Self {
Self { filesystem }
}
pub async fn save_message(&self, thread_id: &str, message: &Message) -> Result<String> {
let timestamp = message.timestamp;
let year_month = timestamp.format("%Y-%m").to_string();
let day = timestamp.format("%d").to_string();
let filename = format!(
"{}_{}.md",
timestamp.format("%H_%M_%S"),
&message.id[..8] );
let uri = format!(
"cortex://session/{}/timeline/{}/{}/{}",
thread_id, year_month, day, filename
);
let content = message.to_markdown();
self.filesystem.write(&uri, &content).await?;
Ok(uri)
}
pub async fn save_message_with_layers(
&self,
thread_id: &str,
message: &Message,
layer_manager: &LayerManager,
) -> Result<String> {
let uri = self.save_message(thread_id, message).await?;
let thread_uri = format!("cortex://session/{}", thread_id);
let content = message.to_markdown();
if let Err(e) = layer_manager
.generate_all_layers(&thread_uri, &content, &[])
.await
{
tracing::warn!("Failed to generate layers for thread {}: {}", thread_id, e);
}
Ok(uri)
}
pub async fn load_message(&self, uri: &str) -> Result<Message> {
let content = self.filesystem.read(uri).await?;
let lines: Vec<&str> = content.lines().collect();
let id = lines
.iter()
.find(|l| l.starts_with("**ID**:"))
.and_then(|l| l.split('`').nth(1))
.unwrap_or("unknown")
.to_string();
let role = if content.contains("User") {
MessageRole::User
} else if content.contains("Assistant") {
MessageRole::Assistant
} else {
MessageRole::System
};
let content_start = content.find("## Content").unwrap_or(0) + 12;
let content_end = content.find("## Metadata").unwrap_or(content.len());
let message_content = content[content_start..content_end].trim().to_string();
let timestamp = Utc::now();
Ok(Message {
id,
role,
content: message_content,
timestamp,
created_at: timestamp,
metadata: None,
})
}
pub async fn list_messages(&self, thread_id: &str) -> Result<Vec<String>> {
let timeline_uri = format!("cortex://session/{}/timeline", thread_id);
let mut messages = Vec::new();
self.collect_message_uris_recursive(&timeline_uri, &mut messages).await?;
Ok(messages)
}
fn collect_message_uris_recursive<'a>(
&'a self,
uri: &'a str,
result: &'a mut Vec<String>,
) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> {
Box::pin(async move {
match self.filesystem.list(uri).await {
Ok(entries) => {
for entry in entries {
if entry.is_directory && !entry.name.starts_with('.') {
self.collect_message_uris_recursive(&entry.uri, result).await?;
} else if entry.name.ends_with(".md") && !entry.name.starts_with('.') {
result.push(entry.uri.clone());
}
}
}
Err(e) => {
tracing::debug!("Failed to list directory {}: {}", uri, e);
}
}
Ok(())
})
}
pub async fn delete_message(&self, uri: &str) -> Result<()> {
self.filesystem.delete(uri).await
}
pub async fn batch_save(&self, thread_id: &str, messages: &[Message]) -> Result<Vec<String>> {
let mut uris = Vec::new();
for message in messages {
let uri = self.save_message(thread_id, message).await?;
uris.push(uri);
}
Ok(uris)
}
}