use anyhow::Result;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fs::{self, File};
use std::io::{BufRead, BufReader, Write};
use std::path::PathBuf;
use uuid::Uuid;
use super::providers::{LLMProvider, Message, Role, ToolCall, Usage};
pub const CURRENT_SESSION_VERSION: u32 = 1;
#[derive(Debug, Clone)]
pub struct Session {
id: String,
created_at: DateTime<Utc>,
cwd: String,
messages: Vec<SessionMessage>,
system_context: Option<String>,
token_count: usize,
compaction_count: u32,
memory_flush_compaction_count: u32,
}
#[derive(Debug, Clone)]
pub struct SessionMessage {
pub message: Message,
pub provider: Option<String>,
pub model: Option<String>,
pub api: Option<String>,
pub usage: Option<MessageUsage>,
pub stop_reason: Option<String>,
pub timestamp: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MessageUsage {
pub input: u64,
pub output: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_read: Option<u64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub cache_write: Option<u64>,
pub total_tokens: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub cost: Option<MessageCost>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MessageCost {
pub input: f64,
pub output: f64,
pub total: f64,
}
impl From<&Usage> for MessageUsage {
fn from(usage: &Usage) -> Self {
Self {
input: usage.input_tokens,
output: usage.output_tokens,
cache_read: None,
cache_write: None,
total_tokens: usage.total(),
cost: None, }
}
}
impl SessionMessage {
pub fn new(message: Message) -> Self {
Self {
message,
provider: None,
model: None,
api: None,
usage: None,
stop_reason: None,
timestamp: Utc::now().timestamp_millis() as u64,
}
}
pub fn with_metadata(
message: Message,
provider: Option<&str>,
model: Option<&str>,
usage: Option<&Usage>,
stop_reason: Option<&str>,
) -> Self {
Self {
message,
provider: provider.map(|s| s.to_string()),
model: model.map(|s| s.to_string()),
api: provider.map(|p| format!("{}-messages", p)), usage: usage.map(MessageUsage::from),
stop_reason: stop_reason.map(|s| s.to_string()),
timestamp: Utc::now().timestamp_millis() as u64,
}
}
}
#[derive(Debug, Clone)]
pub struct SessionStatus {
pub id: String,
pub message_count: usize,
pub token_count: usize,
pub compaction_count: u32,
pub api_input_tokens: u64,
pub api_output_tokens: u64,
}
impl Session {
pub fn new() -> Self {
Self::new_with_cwd(
std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| ".".to_string()),
)
}
pub fn new_with_cwd(cwd: String) -> Self {
Self {
id: Uuid::new_v4().to_string(),
created_at: Utc::now(),
cwd,
messages: Vec::new(),
system_context: None,
token_count: 0,
compaction_count: 0,
memory_flush_compaction_count: 0,
}
}
pub fn id(&self) -> &str {
&self.id
}
pub fn token_count(&self) -> usize {
self.token_count
}
pub fn compaction_count(&self) -> u32 {
self.compaction_count
}
pub fn should_memory_flush(&self) -> bool {
self.memory_flush_compaction_count <= self.compaction_count
}
pub fn mark_memory_flushed(&mut self) {
self.memory_flush_compaction_count = self.compaction_count + 1;
}
pub fn set_system_context(&mut self, context: String) {
self.system_context = Some(context);
self.recalculate_tokens();
}
pub fn add_message(&mut self, message: Message) {
let tokens = estimate_tokens(&message.content);
self.token_count += tokens;
self.messages.push(SessionMessage::new(message));
}
pub fn add_message_with_metadata(
&mut self,
message: Message,
provider: Option<&str>,
model: Option<&str>,
usage: Option<&Usage>,
stop_reason: Option<&str>,
) {
let tokens = estimate_tokens(&message.content);
self.token_count += tokens;
self.messages.push(SessionMessage::with_metadata(
message,
provider,
model,
usage,
stop_reason,
));
}
pub fn messages_for_llm(&self) -> Vec<Message> {
let mut messages = Vec::new();
if let Some(ref context) = self.system_context {
messages.push(Message {
role: Role::System,
content: context.clone(),
tool_calls: None,
tool_call_id: None,
images: Vec::new(),
});
}
messages.extend(self.messages.iter().map(|sm| sm.message.clone()));
messages
}
pub fn messages(&self) -> Vec<&Message> {
self.messages.iter().map(|sm| &sm.message).collect()
}
pub fn raw_messages(&self) -> &[SessionMessage] {
&self.messages
}
pub fn user_assistant_messages(&self) -> Vec<Message> {
self.messages
.iter()
.filter(|sm| matches!(sm.message.role, Role::User | Role::Assistant))
.map(|sm| sm.message.clone())
.collect()
}
pub async fn compact(&mut self, provider: &dyn LLMProvider) -> Result<()> {
if self.messages.len() < 4 {
return Ok(());
}
let keep_count = 4;
let to_summarize = &self.messages[..self.messages.len() - keep_count];
let text: String = to_summarize
.iter()
.map(|sm| format!("{:?}: {}", sm.message.role, sm.message.content))
.collect::<Vec<_>>()
.join("\n\n");
let summary = provider.summarize(&text).await?;
let mut new_messages = vec![SessionMessage::new(Message {
role: Role::System,
content: format!("Previous conversation summary:\n\n{}", summary),
tool_calls: None,
tool_call_id: None,
images: Vec::new(),
})];
new_messages.extend(self.messages[self.messages.len() - keep_count..].to_vec());
self.messages = new_messages;
self.compaction_count += 1;
self.recalculate_tokens();
Ok(())
}
fn recalculate_tokens(&mut self) {
self.token_count = 0;
if let Some(ref context) = self.system_context {
self.token_count += estimate_tokens(context);
}
for sm in &self.messages {
self.token_count += estimate_tokens(&sm.message.content);
}
}
pub fn save(&self) -> Result<PathBuf> {
let dir = get_sessions_dir()?;
fs::create_dir_all(&dir)?;
let path = dir.join(format!("{}.jsonl", self.id));
self.save_to_path(&path)?;
Ok(path)
}
pub fn save_for_agent(&self, agent_id: &str) -> Result<PathBuf> {
let dir = get_sessions_dir_for_agent(agent_id)?;
fs::create_dir_all(&dir)?;
let path = dir.join(format!("{}.jsonl", self.id));
self.save_to_path(&path)?;
Ok(path)
}
fn save_to_path(&self, path: &PathBuf) -> Result<()> {
let mut file = File::create(path)?;
let header = json!({
"type": "session",
"version": CURRENT_SESSION_VERSION,
"id": self.id,
"timestamp": self.created_at.to_rfc3339(),
"cwd": self.cwd,
"compactionCount": self.compaction_count,
"memoryFlushCompactionCount": self.memory_flush_compaction_count
});
writeln!(file, "{}", serde_json::to_string(&header)?)?;
if let Some(ref context) = self.system_context {
let system_msg = self.format_message_entry(&SessionMessage::new(Message {
role: Role::System,
content: context.clone(),
tool_calls: None,
tool_call_id: None,
images: Vec::new(),
}));
writeln!(file, "{}", serde_json::to_string(&system_msg)?)?;
}
for sm in &self.messages {
let entry = self.format_message_entry(sm);
writeln!(file, "{}", serde_json::to_string(&entry)?)?;
}
Ok(())
}
fn format_message_entry(&self, sm: &SessionMessage) -> serde_json::Value {
let role = match sm.message.role {
Role::User => "user",
Role::Assistant => "assistant",
Role::System => "system",
Role::Tool => "toolResult",
};
let mut content = Vec::new();
if !sm.message.content.is_empty() {
content.push(json!({
"type": "text",
"text": sm.message.content
}));
}
for img in &sm.message.images {
content.push(json!({
"type": "image_url",
"image_url": {
"url": format!("data:{};base64,{}", img.media_type, img.data)
}
}));
}
let mut message = json!({
"role": role,
"content": content
});
if let Some(ref tool_calls) = sm.message.tool_calls {
let tc: Vec<serde_json::Value> = tool_calls
.iter()
.map(|tc| {
json!({
"id": tc.id,
"name": tc.name,
"arguments": tc.arguments
})
})
.collect();
message["toolCalls"] = json!(tc);
}
if let Some(ref id) = sm.message.tool_call_id {
message["toolCallId"] = json!(id);
}
if let Some(ref provider) = sm.provider {
message["provider"] = json!(provider);
}
if let Some(ref model) = sm.model {
message["model"] = json!(model);
}
if let Some(ref api) = sm.api {
message["api"] = json!(api);
}
if let Some(ref usage) = sm.usage {
message["usage"] = serde_json::to_value(usage).unwrap_or(json!(null));
}
if let Some(ref reason) = sm.stop_reason {
message["stopReason"] = json!(reason);
}
message["timestamp"] = json!(sm.timestamp);
json!({
"type": "message",
"message": message
})
}
pub fn load(session_id: &str) -> Result<Self> {
let dir = get_sessions_dir()?;
let path = dir.join(format!("{}.jsonl", session_id));
if !path.exists() {
anyhow::bail!("Session not found: {}", session_id);
}
Self::load_from_path(&path, session_id)
}
fn load_from_path(path: &PathBuf, session_id: &str) -> Result<Self> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut session = Session {
id: session_id.to_string(),
created_at: Utc::now(),
cwd: ".".to_string(),
messages: Vec::new(),
system_context: None,
token_count: 0,
compaction_count: 0,
memory_flush_compaction_count: 0,
};
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
let entry: serde_json::Value = match serde_json::from_str(&line) {
Ok(v) => v,
Err(_) => continue, };
match entry["type"].as_str() {
Some("session") => {
if let Some(ts) = entry["timestamp"].as_str()
&& let Ok(dt) = DateTime::parse_from_rfc3339(ts)
{
session.created_at = dt.with_timezone(&Utc);
}
if let Some(cwd) = entry["cwd"].as_str() {
session.cwd = cwd.to_string();
}
if let Some(count) = entry["compactionCount"].as_u64() {
session.compaction_count = count as u32;
}
if let Some(count) = entry["memoryFlushCompactionCount"].as_u64() {
session.memory_flush_compaction_count = count as u32;
}
}
Some("message") => {
if let Some(msg_obj) = entry.get("message")
&& let Some(sm) = Self::parse_pi_message(msg_obj)
{
if sm.message.role == Role::System && session.system_context.is_none() {
session.system_context = Some(sm.message.content);
} else {
session.messages.push(sm);
}
}
}
_ => {}
}
}
session.recalculate_tokens();
Ok(session)
}
fn parse_pi_message(msg: &serde_json::Value) -> Option<SessionMessage> {
let role = match msg["role"].as_str()? {
"user" => Role::User,
"assistant" => Role::Assistant,
"system" => Role::System,
"toolResult" | "tool" => Role::Tool,
_ => return None,
};
let content = if let Some(arr) = msg["content"].as_array() {
arr.iter()
.filter_map(|item| {
if item["type"].as_str() == Some("text") {
item["text"].as_str().map(|s| s.to_string())
} else {
None
}
})
.collect::<Vec<_>>()
.join("")
} else if let Some(s) = msg["content"].as_str() {
s.to_string()
} else {
String::new()
};
let tool_calls = msg["toolCalls"].as_array().map(|arr| {
arr.iter()
.filter_map(|tc| {
Some(ToolCall {
id: tc["id"].as_str()?.to_string(),
name: tc["name"].as_str()?.to_string(),
arguments: tc["arguments"].as_str().unwrap_or("{}").to_string(),
})
})
.collect()
});
let tool_call_id = msg["toolCallId"].as_str().map(|s| s.to_string());
let usage = serde_json::from_value(msg["usage"].clone()).ok();
Some(SessionMessage {
message: Message {
role,
content,
tool_calls,
tool_call_id,
images: Vec::new(), },
provider: msg["provider"].as_str().map(|s| s.to_string()),
model: msg["model"].as_str().map(|s| s.to_string()),
api: msg["api"].as_str().map(|s| s.to_string()),
usage,
stop_reason: msg["stopReason"].as_str().map(|s| s.to_string()),
timestamp: msg["timestamp"].as_u64().unwrap_or(0),
})
}
pub fn status(&self) -> SessionStatus {
SessionStatus {
id: self.id.clone(),
message_count: self.messages.len(),
token_count: self.token_count,
compaction_count: self.compaction_count,
api_input_tokens: 0,
api_output_tokens: 0,
}
}
pub fn status_with_usage(&self, input_tokens: u64, output_tokens: u64) -> SessionStatus {
SessionStatus {
id: self.id.clone(),
message_count: self.messages.len(),
token_count: self.token_count,
compaction_count: self.compaction_count,
api_input_tokens: input_tokens,
api_output_tokens: output_tokens,
}
}
pub fn auto_save(&self) -> Result<()> {
if self.messages.is_empty() {
return Ok(());
}
self.save()?;
Ok(())
}
}
impl Default for Session {
fn default() -> Self {
Self::new()
}
}
pub const DEFAULT_AGENT_ID: &str = "main";
fn get_sessions_dir() -> Result<PathBuf> {
get_sessions_dir_for_agent(DEFAULT_AGENT_ID)
}
pub fn get_sessions_dir_for_agent(agent_id: &str) -> Result<PathBuf> {
let base = directories::BaseDirs::new()
.ok_or_else(|| anyhow::anyhow!("Could not determine home directory"))?;
Ok(base
.home_dir()
.join(".localgpt")
.join("agents")
.join(agent_id)
.join("sessions"))
}
pub fn get_state_dir() -> Result<PathBuf> {
let base = directories::BaseDirs::new()
.ok_or_else(|| anyhow::anyhow!("Could not determine home directory"))?;
Ok(base.home_dir().join(".localgpt"))
}
fn estimate_tokens(text: &str) -> usize {
text.len() / 4
}
#[derive(Debug, Clone)]
pub struct SessionInfo {
pub id: String,
pub created_at: DateTime<Utc>,
pub message_count: usize,
pub file_size: u64,
}
pub fn list_sessions() -> Result<Vec<SessionInfo>> {
list_sessions_for_agent(DEFAULT_AGENT_ID)
}
pub fn list_sessions_for_agent(agent_id: &str) -> Result<Vec<SessionInfo>> {
let sessions_dir = get_sessions_dir_for_agent(agent_id)?;
if !sessions_dir.exists() {
return Ok(Vec::new());
}
let mut sessions = Vec::new();
for entry in fs::read_dir(&sessions_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().map(|e| e != "jsonl").unwrap_or(true) {
continue;
}
let filename = path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
if filename.len() < 32 {
continue;
}
let metadata = fs::metadata(&path)?;
let file_size = metadata.len();
if let Ok(file) = File::open(&path) {
let reader = BufReader::new(file);
if let Some(Ok(first_line)) = reader.lines().next()
&& let Ok(header) = serde_json::from_str::<serde_json::Value>(&first_line)
{
if header["type"].as_str() == Some("session") {
let created_at = header["timestamp"]
.as_str()
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(Utc::now);
let message_count = fs::read_to_string(&path)
.map(|s| s.lines().count().saturating_sub(1))
.unwrap_or(0);
sessions.push(SessionInfo {
id: filename.to_string(),
created_at,
message_count,
file_size,
});
}
}
}
}
sessions.sort_by(|a, b| b.created_at.cmp(&a.created_at));
Ok(sessions)
}
pub fn get_last_session_id() -> Result<Option<String>> {
get_last_session_id_for_agent(DEFAULT_AGENT_ID)
}
pub fn get_last_session_id_for_agent(agent_id: &str) -> Result<Option<String>> {
let sessions = list_sessions_for_agent(agent_id)?;
Ok(sessions.first().map(|s| s.id.clone()))
}
#[derive(Debug, Clone)]
pub struct SessionSearchResult {
pub session_id: String,
pub created_at: DateTime<Utc>,
pub message_preview: String,
pub match_count: usize,
}
pub fn search_sessions(query: &str) -> Result<Vec<SessionSearchResult>> {
search_sessions_for_agent(DEFAULT_AGENT_ID, query)
}
pub fn search_sessions_for_agent(agent_id: &str, query: &str) -> Result<Vec<SessionSearchResult>> {
let sessions_dir = get_sessions_dir_for_agent(agent_id)?;
if !sessions_dir.exists() {
return Ok(Vec::new());
}
let query_lower = query.to_lowercase();
let mut results = Vec::new();
for entry in fs::read_dir(&sessions_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().map(|e| e == "jsonl").unwrap_or(false)
&& let Some(filename) = path.file_stem().and_then(|s| s.to_str())
&& let Ok(content) = fs::read_to_string(&path)
{
let content_lower = content.to_lowercase();
let match_count = content_lower.matches(&query_lower).count();
if match_count > 0 {
let preview = extract_match_preview(&content, &query_lower, 100);
let created_at = fs::metadata(&path)
.and_then(|m| m.created())
.map(DateTime::<Utc>::from)
.unwrap_or_else(|_| Utc::now());
results.push(SessionSearchResult {
session_id: filename.to_string(),
created_at,
message_preview: preview,
match_count,
});
}
}
}
results.sort_by(|a, b| b.match_count.cmp(&a.match_count));
Ok(results)
}
fn extract_match_preview(content: &str, query_lower: &str, max_len: usize) -> String {
let content_lower = content.to_lowercase();
if let Some(pos) = content_lower.find(query_lower) {
let half_len = max_len / 2;
let start = pos.saturating_sub(half_len);
let end = (pos + query_lower.len() + half_len).min(content.len());
let slice = &content[start..end];
let cleaned: String = slice
.chars()
.map(|c| if c.is_whitespace() { ' ' } else { c })
.collect();
let trimmed = cleaned.trim();
let prefix = if start > 0 { "..." } else { "" };
let suffix = if end < content.len() { "..." } else { "" };
format!("{}{}{}", prefix, trimmed, suffix)
} else {
String::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_session_new() {
let session = Session::new();
assert!(!session.id().is_empty());
assert_eq!(session.token_count(), 0);
assert_eq!(session.compaction_count(), 0);
}
#[test]
fn test_message_usage_from() {
let usage = Usage {
input_tokens: 100,
output_tokens: 50,
};
let msg_usage = MessageUsage::from(&usage);
assert_eq!(msg_usage.input, 100);
assert_eq!(msg_usage.output, 50);
assert_eq!(msg_usage.total_tokens, 150);
}
}