use crate::{AgentEvent, AgentStep, model::HistoryEntry};
use crabllm_core::Usage;
use serde::{Deserialize, Serialize};
use std::{
fs::{self, OpenOptions},
io::{BufRead, BufReader, Write},
path::{Path, PathBuf},
time::Instant,
};
/// Metadata header of a conversation file.
///
/// `Conversation::init_file` writes it as the first line of the `.jsonl` file
/// and `Conversation::load_context` parses it back from that first line.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationMeta {
/// Agent name, copied from `Conversation::agent`.
pub agent: String,
/// Creator identifier, copied from `Conversation::created_by`.
pub created_by: String,
/// Timestamp string; this module fills it with an RFC 3339 UTC time.
pub created_at: String,
/// Conversation title; deserializes to an empty string when the key is absent.
#[serde(default)]
pub title: String,
/// Copied from `Conversation::uptime_secs`; deserializes to 0 when the key is absent.
#[serde(default)]
pub uptime_secs: u64,
}
/// One line of a conversation file after the metadata header.
///
/// The enum is untagged, so serde tries the variants in declaration order when
/// deserializing and writes the inner value without any wrapper when
/// serializing. Do not reorder the variants: the order is part of the parsing
/// behavior.
#[derive(Serialize, Deserialize)]
#[serde(untagged)]
enum ConversationLine {
/// Compaction marker. `load_context` restarts the context at the last one
/// and `load_archives` collects all of them.
Compact {
/// Summary text; this is the only required key of the variant.
compact: String,
/// Short title for the summary; omitted from the JSON when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
title: String,
/// Timestamp string of the compaction; omitted from the JSON when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
archived_at: String,
},
/// Trace event (an object tagged with an `event` key, see `EventLine`).
Event(EventLine),
/// Regular history entry.
Entry(HistoryEntry),
}
/// Trace event persisted in a conversation file next to the history entries.
///
/// Serialized as an internally tagged object: the `event` key holds the
/// variant name in snake_case (for example `tool_start`). Every variant carries
/// a `ts` timestamp string, which `from_agent_event` fills with the current
/// UTC time in RFC 3339 format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum EventLine {
/// Built from `AgentEvent::ToolCallsStart`; lists the calls that were started.
ToolStart {
calls: Vec<ToolCallTrace>,
ts: String,
},
/// Built from `AgentEvent::ToolResult`; records the call id and its duration.
ToolResult {
call_id: String,
duration_ms: u64,
ts: String,
},
/// Built from `AgentEvent::Done`; `usage` is the sum over all steps of the response.
Done {
model: String,
iterations: usize,
stop_reason: String,
usage: Usage,
ts: String,
},
/// Built from `AgentEvent::UserSteered`; carries the steering content.
UserSteered { content: String, ts: String },
}
/// Persisted description of a single tool call inside `EventLine::ToolStart`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCallTrace {
/// Identifier of the call.
pub id: String,
/// Name of the function that was called.
pub name: String,
/// Call arguments as a string; omitted from the JSON when empty and
/// defaulted to an empty string when the key is absent.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub arguments: String,
}
impl EventLine {
    /// Converts an [`AgentEvent`] into the trace line persisted for it.
    ///
    /// Only `ToolCallsStart`, `ToolResult`, `Done` and `UserSteered` are
    /// recorded; every other event yields `None`. The resulting line is
    /// stamped with the current UTC time in RFC 3339 format.
    pub fn from_agent_event(event: &AgentEvent) -> Option<Self> {
        // Build the timestamp lazily: events that are not recorded should not
        // pay for formatting and allocating a string that is thrown away.
        let now = || chrono::Utc::now().to_rfc3339();

        let line = match event {
            AgentEvent::ToolCallsStart(calls) => Self::ToolStart {
                calls: calls
                    .iter()
                    .map(|c| ToolCallTrace {
                        id: c.id.clone(),
                        name: c.function.name.to_string(),
                        arguments: c.function.arguments.clone(),
                    })
                    .collect(),
                ts: now(),
            },
            AgentEvent::ToolResult {
                call_id,
                duration_ms,
                ..
            } => Self::ToolResult {
                call_id: call_id.clone(),
                duration_ms: *duration_ms,
                ts: now(),
            },
            AgentEvent::Done(resp) => Self::Done {
                model: resp.model.clone(),
                iterations: resp.iterations,
                stop_reason: resp.stop_reason.to_string(),
                usage: sum_step_usage(&resp.steps),
                ts: now(),
            },
            AgentEvent::UserSteered { content } => Self::UserSteered {
                content: content.clone(),
                ts: now(),
            },
            _ => return None,
        };
        Some(line)
    }
}
/// Adds up the token usage of every step.
///
/// The cache hit/miss counters stay `None` unless at least one step reports
/// them; steps that do not report them contribute nothing to those totals.
fn sum_step_usage(steps: &[AgentStep]) -> Usage {
    let mut total = Usage::default();
    for usage in steps.iter().map(|step| &step.usage) {
        total.prompt_tokens += usage.prompt_tokens;
        total.completion_tokens += usage.completion_tokens;
        total.total_tokens += usage.total_tokens;
        if let Some(hits) = usage.prompt_cache_hit_tokens {
            *total.prompt_cache_hit_tokens.get_or_insert(0) += hits;
        }
        if let Some(misses) = usage.prompt_cache_miss_tokens {
            *total.prompt_cache_miss_tokens.get_or_insert(0) += misses;
        }
    }
    total
}
/// One compaction marker read back from a conversation file by
/// `Conversation::load_archives`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ArchiveSegment {
/// Title stored on the marker (empty when the marker has none).
pub title: String,
/// Summary text, taken from the marker's `compact` field.
pub summary: String,
/// Timestamp string stored on the marker (empty when the marker has none).
pub archived_at: String,
}
/// In-memory conversation state plus the location of its backing file.
#[derive(Debug, Clone)]
pub struct Conversation {
/// Numeric identifier supplied by the caller of `new`.
pub id: u64,
/// Agent name; used as the first component of the file name.
pub agent: String,
/// History entries held in memory.
pub history: Vec<HistoryEntry>,
/// Creator identifier; its slug is the second component of the file name.
pub created_by: String,
/// Conversation title; empty until `set_title` is called.
pub title: String,
/// Value copied into the metadata header; starts at 0.
pub uptime_secs: u64,
/// Instant captured when the value was constructed by `new`.
pub created_at: Instant,
/// Path of the backing `.jsonl` file; `None` until `init_file` succeeds.
pub file_path: Option<PathBuf>,
}
impl Conversation {
/// Builds an empty conversation that is not yet backed by a file.
///
/// The history and title start empty, `uptime_secs` starts at 0 and
/// `created_at` is the instant of this call.
pub fn new(id: u64, agent: impl Into<String>, created_by: impl Into<String>) -> Self {
    let agent = agent.into();
    let created_by = created_by.into();
    Self {
        id,
        agent,
        created_by,
        history: Vec::new(),
        title: String::new(),
        uptime_secs: 0,
        created_at: Instant::now(),
        file_path: None,
    }
}
/// Creates the backing file in the default conversations directory unless
/// this conversation already has one.
pub fn ensure_file(&mut self) {
    if self.file_path.is_none() {
        self.init_file(&crate::paths::CONVERSATIONS_DIR);
    }
}
/// Creates a fresh conversation file inside `conversations_dir` and writes
/// the metadata header as its first line.
///
/// The file is named `{agent}_{creator-slug}_{seq}.jsonl`, where `seq` is one
/// past the highest sequence number already used for that prefix. On success
/// `file_path` points at the new file; if the file cannot be created a
/// warning is logged and `file_path` is left untouched.
pub fn init_file(&mut self, conversations_dir: &Path) {
    // Best-effort: if the directory cannot be created, opening the file
    // below fails and that failure is the one that gets logged.
    let _ = fs::create_dir_all(conversations_dir);

    let slug = sender_slug(&self.created_by);
    let prefix = format!("{}_{slug}_", self.agent);
    let seq = next_seq(conversations_dir, &prefix);
    let path = conversations_dir.join(format!("{prefix}{seq}.jsonl"));

    let meta = ConversationMeta {
        agent: self.agent.clone(),
        created_by: self.created_by.clone(),
        created_at: chrono::Utc::now().to_rfc3339(),
        // Keep a title that was set before the file existed instead of
        // persisting an empty one.
        title: self.title.clone(),
        uptime_secs: self.uptime_secs,
    };

    let mut file = match OpenOptions::new()
        .create(true)
        .truncate(true)
        .write(true)
        .open(&path)
    {
        Ok(f) => f,
        Err(e) => {
            tracing::warn!("failed to create conversation file: {e}");
            return;
        }
    };

    match serde_json::to_string(&meta) {
        Ok(json) => {
            if let Err(e) = writeln!(file, "{json}") {
                tracing::warn!("failed to write conversation header: {e}");
            }
        }
        Err(e) => tracing::warn!("failed to serialize conversation header: {e}"),
    }
    self.file_path = Some(path);
}
pub fn set_title(&mut self, title: &str) {
self.title = title.to_string();
let Some(ref old_path) = self.file_path else {
return;
};
self.rewrite_meta();
let title_slug = sender_slug(title);
if title_slug.is_empty() {
return;
}
let old_name = old_path.file_stem().and_then(|s| s.to_str()).unwrap_or("");
let new_name = format!("{old_name}_{title_slug}.jsonl");
let new_path = old_path.with_file_name(new_name);
if fs::rename(old_path, &new_path).is_ok() {
self.file_path = Some(new_path);
}
}
pub fn rewrite_meta(&self) {
let Some(ref path) = self.file_path else {
return;
};
let Ok(content) = fs::read_to_string(path) else {
return;
};
let meta = ConversationMeta {
agent: self.agent.clone(),
created_by: self.created_by.clone(),
created_at: chrono::Utc::now().to_rfc3339(),
title: self.title.clone(),
uptime_secs: self.uptime_secs,
};
let Ok(meta_json) = serde_json::to_string(&meta) else {
return;
};
let rest = content.find('\n').map(|i| &content[i..]).unwrap_or("");
let new_content = format!("{meta_json}{rest}");
let _ = fs::write(path, new_content);
}
/// Appends history entries to the backing file, one JSON object per line.
///
/// Entries flagged `auto_injected` are not persisted. Without a backing file
/// this is a no-op; an open failure is logged, while entries that fail to
/// serialize or write are skipped silently.
pub fn append_messages(&self, entries: &[HistoryEntry]) {
    let Some(path) = self.file_path.as_ref() else {
        return;
    };
    let opened = OpenOptions::new().append(true).open(path);
    let mut file = match opened {
        Err(e) => {
            tracing::warn!("failed to open conversation file for append: {e}");
            return;
        }
        Ok(handle) => handle,
    };
    entries
        .iter()
        .filter(|entry| !entry.auto_injected)
        .filter_map(|entry| serde_json::to_string(entry).ok())
        .for_each(|json| {
            let _ = writeln!(file, "{json}");
        });
}
/// Appends trace events to the backing file, one JSON object per line.
///
/// Does nothing when `events` is empty or there is no backing file. All
/// failures are logged and otherwise ignored, so a bad event never prevents
/// the following ones from being written.
pub fn append_events(&self, events: &[EventLine]) {
    if events.is_empty() {
        return;
    }
    let Some(ref path) = self.file_path else {
        return;
    };
    let mut file = match OpenOptions::new().append(true).open(path) {
        Ok(f) => f,
        Err(e) => {
            tracing::warn!("failed to open conversation file for events: {e}");
            return;
        }
    };
    for event in events {
        // `ConversationLine` is untagged, so an event serializes to exactly
        // the same JSON on its own as it does wrapped in
        // `ConversationLine::Event`. Serializing it directly avoids cloning
        // every event just to build the wrapper.
        match serde_json::to_string(event) {
            Ok(json) => {
                if let Err(e) = writeln!(file, "{json}") {
                    tracing::warn!("failed to write conversation event: {e}");
                }
            }
            Err(e) => tracing::warn!("failed to serialize conversation event: {e}"),
        }
    }
}
/// Appends a compaction marker holding `summary` to the backing file.
///
/// The marker also records a title derived from the summary and the current
/// UTC time. Without a backing file this is a no-op; an open failure is
/// logged.
pub fn append_compact(&self, summary: &str) {
    let Some(path) = self.file_path.as_deref() else {
        return;
    };
    match OpenOptions::new().append(true).open(path) {
        Err(e) => tracing::warn!("failed to open conversation file for compact: {e}"),
        Ok(mut file) => {
            let marker = ConversationLine::Compact {
                compact: summary.to_string(),
                title: compact_title(summary),
                archived_at: chrono::Utc::now().to_rfc3339(),
            };
            if let Ok(json) = serde_json::to_string(&marker) {
                let _ = writeln!(file, "{json}");
            }
        }
    }
}
/// Loads the metadata header and the working context of a conversation file.
///
/// The context starts at the last compaction marker: its summary becomes a
/// leading user entry and only the history entries after it are returned.
/// Without a marker every history entry in the file is returned. Trace events
/// are skipped and lines that cannot be parsed are logged and skipped.
///
/// # Errors
///
/// Fails when the file cannot be opened or read, when it is empty, or when
/// its first line is not a valid metadata header.
pub fn load_context(path: &Path) -> anyhow::Result<(ConversationMeta, Vec<HistoryEntry>)> {
    let mut lines = BufReader::new(fs::File::open(path)?).lines();
    let meta_line = lines
        .next()
        .ok_or_else(|| anyhow::anyhow!("empty conversation file"))??;
    let meta: ConversationMeta = serde_json::from_str(&meta_line)?;

    // Only the lines after the most recent compaction marker matter, so drop
    // everything buffered so far whenever a marker shows up instead of keeping
    // the whole file in memory.
    let mut summary: Option<String> = None;
    let mut tail: Vec<String> = Vec::new();
    for line in lines {
        let line = line?;
        if line.trim().is_empty() {
            continue;
        }
        // The substring test is a cheap pre-filter that avoids a JSON parse
        // for the vast majority of lines.
        if line.contains("\"compact\"")
            && let Ok(ConversationLine::Compact { compact, .. }) = serde_json::from_str(&line)
        {
            summary = Some(compact);
            tail.clear();
            continue;
        }
        tail.push(line);
    }

    let mut entries = Vec::with_capacity(tail.len() + usize::from(summary.is_some()));
    if let Some(compact) = summary {
        entries.push(HistoryEntry::user(&compact));
    }
    for line in &tail {
        match serde_json::from_str::<ConversationLine>(line) {
            Ok(ConversationLine::Entry(entry)) => entries.push(entry),
            Ok(ConversationLine::Event(_) | ConversationLine::Compact { .. }) => {}
            Err(e) => tracing::warn!("skipping unparsable conversation line: {e}"),
        }
    }
    Ok((meta, entries))
}
/// Collects every compaction marker of a conversation file, in file order.
///
/// The first line (the metadata header) is skipped.
///
/// # Errors
///
/// Fails when the file cannot be opened or a line after the first cannot be
/// read.
pub fn load_archives(path: &Path) -> anyhow::Result<Vec<ArchiveSegment>> {
    let reader = BufReader::new(fs::File::open(path)?);
    let mut segments = Vec::new();
    for raw in reader.lines().skip(1) {
        let raw = raw?;
        // Cheap substring pre-filter before attempting a JSON parse.
        if !raw.contains("\"compact\"") {
            continue;
        }
        if let Ok(ConversationLine::Compact {
            compact,
            title,
            archived_at,
        }) = serde_json::from_str(&raw)
        {
            segments.push(ArchiveSegment {
                title,
                summary: compact,
                archived_at,
            });
        }
    }
    Ok(segments)
}
}
/// Finds the conversation file with the highest sequence number for the given
/// agent and creator.
///
/// Candidates are regular entries of `conversations_dir` named
/// `{agent}_{creator-slug}_{seq}….jsonl`. Entries whose name is not valid
/// UTF-8 or that carry no parsable sequence number are skipped. Returns `None`
/// when the directory cannot be read or holds no candidate.
pub fn find_latest_conversation(
    conversations_dir: &Path,
    agent: &str,
    created_by: &str,
) -> Option<PathBuf> {
    let prefix = format!("{agent}_{}_", sender_slug(created_by));

    // Extracts the sequence number from a file name, or `None` when the name
    // does not belong to this agent/creator pair.
    let parse_seq = |name: &str| -> Option<u32> {
        let rest = name.strip_prefix(prefix.as_str())?;
        if !name.ends_with(".jsonl") {
            return None;
        }
        rest.split(|c: char| !c.is_ascii_digit())
            .next()?
            .parse()
            .ok()
    };

    let mut best: Option<(u32, PathBuf)> = None;
    for entry in fs::read_dir(conversations_dir).ok()?.flatten() {
        let path = entry.path();
        if path.is_dir() {
            continue;
        }
        // A single odd file must not abort the search, so skip it rather than
        // bailing out of the whole function.
        let Some(seq) = path
            .file_name()
            .and_then(|name| name.to_str())
            .and_then(&parse_seq)
        else {
            continue;
        };
        if best.as_ref().is_none_or(|(best_seq, _)| seq > *best_seq) {
            best = Some((seq, path));
        }
    }
    best.map(|(_, path)| path)
}
/// Returns the next free sequence number for files in `dir` whose name starts
/// with `prefix` and ends with `.jsonl`.
///
/// The result is one past the highest number found directly after the prefix,
/// or 1 when the directory cannot be read or contains no matching file.
fn next_seq(dir: &Path, prefix: &str) -> u32 {
    let Ok(entries) = fs::read_dir(dir) else {
        return 1;
    };
    let mut highest = 0u32;
    for entry in entries.flatten() {
        let file_name = entry.file_name();
        let Some(name) = file_name.to_str() else {
            continue;
        };
        let Some(rest) = name.strip_prefix(prefix) else {
            continue;
        };
        if !name.ends_with(".jsonl") {
            continue;
        }
        // The sequence number is the run of digits right after the prefix.
        let digits = rest
            .split(|c: char| !c.is_ascii_digit())
            .next()
            .unwrap_or("");
        if let Ok(seq) = digits.parse::<u32>() {
            highest = highest.max(seq);
        }
    }
    highest + 1
}
/// Derives a short title from a summary.
///
/// The title is the text up to and including the first `.`, `!` or `?`, or
/// the whole summary when none occurs, capped at 60 bytes (cut back to a
/// character boundary) and trimmed of surrounding whitespace.
fn compact_title(summary: &str) -> String {
    const MAX_BYTES: usize = 60;
    let sentence_end = match summary.find(['.', '!', '?']) {
        Some(idx) => idx + 1,
        None => summary.len(),
    };
    let cut = summary.floor_char_boundary(sentence_end.min(MAX_BYTES));
    summary[..cut].trim().to_owned()
}
/// Turns arbitrary text into a file-name friendly slug.
///
/// Alphanumeric characters are kept (ASCII letters lowercased); every run of
/// other characters becomes a single `-`, and no `-` appears at the start or
/// the end of the result.
pub fn sender_slug(s: &str) -> String {
    let mut slug = String::with_capacity(s.len());
    // Set while skipping separators; a dash is emitted only once the next
    // kept character arrives, which drops leading and trailing runs.
    let mut gap = false;
    for ch in s.chars() {
        if ch.is_alphanumeric() {
            if gap && !slug.is_empty() {
                slug.push('-');
            }
            gap = false;
            slug.push(ch.to_ascii_lowercase());
        } else {
            gap = true;
        }
    }
    slug
}