use anyhow::{Context, Result};
use chrono::{DateTime, Datelike, NaiveDate, Utc};
use frankensqlite::compat::{ConnectionExt, RowExt};
use frankensqlite::{Connection, Row};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::path::Path;
use std::time::Instant;
use tracing::info;
/// Common English function words excluded from the top-terms analysis.
///
/// `generate_top_terms` lower-cases each title word before looking it up in a
/// set built from this list, so every entry here is written in lower case.
const STOP_WORDS: &[&str] = &[
"the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
"from", "is", "it", "as", "was", "be", "are", "been", "being", "have", "has", "had", "do",
"does", "did", "will", "would", "could", "should", "may", "might", "must", "shall", "can",
"need", "this", "that", "these", "those", "i", "you", "he", "she", "we", "they", "what",
"which", "who", "when", "where", "why", "how", "all", "each", "every", "both", "few", "more",
"most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than",
"too", "very", "just", "also", "now", "here", "there", "then", "once", "about", "after",
"again", "into", "over", "under", "out", "up", "down", "off", "any", "its", "your", "my",
"our", "their", "his", "her", "him", "them", "me", "us", "if", "else", "while", "during",
"before",
];
/// Corpus-wide totals, serialized to `statistics.json` by `AnalyticsBundle::write_to_dir`.
///
/// Produced either from SQL (`AnalyticsGenerator::generate_statistics`) or from
/// in-memory packets (`Statistics::from_packets`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Statistics {
/// Number of conversations.
pub total_conversations: usize,
/// Number of messages across all conversations.
pub total_messages: usize,
/// Total message content length: SQL `SUM(LENGTH(content))`, or the sum of
/// `chars().count()` on the packet path.
pub total_characters: usize,
/// Conversation and message counts keyed by agent slug.
pub agents: BTreeMap<String, AgentStats>,
/// Message counts keyed by role name.
pub roles: BTreeMap<String, usize>,
/// Earliest and latest conversation `started_at`, rendered as RFC 3339 strings.
pub time_range: TimeRange,
/// RFC 3339 timestamp (from `Utc::now()`) of when these statistics were computed.
pub computed_at: String,
}
/// Per-agent counters stored in [`Statistics::agents`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentStats {
/// Number of conversations attributed to the agent.
pub conversations: usize,
/// Number of messages across those conversations.
pub messages: usize,
}
impl Statistics {
    /// Builds [`Statistics`] from in-memory conversation packets instead of SQL.
    ///
    /// All counters use saturating addition. Message length is measured in
    /// Unicode scalar values (`chars().count()`). Messages whose role is
    /// `"assistant"` are tallied under the `"agent"` key; every other role name
    /// is kept verbatim. `time_range` spans the smallest and largest
    /// `started_at` (epoch milliseconds) among packets that carry one, rendered
    /// as RFC 3339; a bound chrono cannot represent becomes `None`.
    pub fn from_packets(packets: &[crate::model::conversation_packet::ConversationPacket]) -> Self {
        let mut agents: BTreeMap<String, AgentStats> = BTreeMap::new();
        let mut roles: BTreeMap<String, usize> = BTreeMap::new();
        let mut total_messages: usize = 0;
        let mut total_characters: usize = 0;
        // (earliest, latest) `started_at` seen so far.
        let mut started_bounds: Option<(i64, i64)> = None;

        for packet in packets {
            let payload = &packet.payload;
            let message_count = payload.messages.len();
            total_messages = total_messages.saturating_add(message_count);

            let agent_stats = agents
                .entry(payload.identity.agent_slug.clone())
                .or_insert(AgentStats {
                    conversations: 0,
                    messages: 0,
                });
            agent_stats.conversations = agent_stats.conversations.saturating_add(1);
            agent_stats.messages = agent_stats.messages.saturating_add(message_count);

            for message in &payload.messages {
                total_characters =
                    total_characters.saturating_add(message.content.chars().count());
                let role_key = match message.role.as_str() {
                    "assistant" => "agent",
                    other => other,
                };
                *roles.entry(role_key.to_string()).or_insert(0) += 1;
            }

            if let Some(ts) = payload.timestamps.started_at {
                started_bounds = Some(match started_bounds {
                    Some((lo, hi)) => (lo.min(ts), hi.max(ts)),
                    None => (ts, ts),
                });
            }
        }

        let rfc3339 =
            |millis: i64| DateTime::from_timestamp_millis(millis).map(|dt| dt.to_rfc3339());

        Self {
            total_conversations: packets.len(),
            total_messages,
            total_characters,
            agents,
            roles,
            time_range: TimeRange {
                earliest: started_bounds.and_then(|(lo, _)| rfc3339(lo)),
                latest: started_bounds.and_then(|(_, hi)| rfc3339(hi)),
            },
            computed_at: Utc::now().to_rfc3339(),
        }
    }
}
/// Optional earliest/latest bounds rendered as RFC 3339 strings.
///
/// A bound is `None` when no timestamp was available, or when the stored
/// millisecond value could not be converted by `DateTime::from_timestamp_millis`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
pub earliest: Option<String>,
pub latest: Option<String>,
}
/// Message/conversation activity series, serialized to `timeline.json`.
///
/// Only messages with a non-NULL `created_at` contribute; days are derived with
/// SQLite `DATE(created_at/1000, 'unixepoch')`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Timeline {
/// Per-day series, sorted by date.
pub daily: Vec<DailyEntry>,
/// Per-ISO-week series, sorted by week key.
pub weekly: Vec<WeeklyEntry>,
/// Per-month series, sorted by month key.
pub monthly: Vec<MonthlyEntry>,
/// The same three series restricted to each agent, keyed by agent slug.
pub by_agent: BTreeMap<String, AgentTimeline>,
}
/// Daily/weekly/monthly activity series for a single agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentTimeline {
pub daily: Vec<DailyEntry>,
pub weekly: Vec<WeeklyEntry>,
pub monthly: Vec<MonthlyEntry>,
}
/// Activity for one calendar day.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DailyEntry {
/// Day in `YYYY-MM-DD` form.
pub date: String,
/// Messages created on that day.
pub messages: usize,
/// Distinct conversations with at least one message on that day.
pub conversations: usize,
}
/// Activity for one ISO-8601 week.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WeeklyEntry {
/// ISO week key formatted as `YYYY-Www` (ISO week-based year, zero-padded week).
pub week: String,
/// Messages created during the week.
pub messages: usize,
/// Conversation count reported for the week.
pub conversations: usize,
}
/// Activity for one calendar month.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonthlyEntry {
/// Month key: the `YYYY-MM` prefix of the contributing daily dates.
pub month: String,
/// Messages created during the month.
pub messages: usize,
/// Conversation count reported for the month.
pub conversations: usize,
}
/// Per-workspace rollup, serialized to `workspace_summary.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceSummary {
/// One entry per non-NULL workspace, busiest (most conversations) first.
pub workspaces: Vec<WorkspaceEntry>,
}
/// Aggregate figures for a single workspace path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceEntry {
/// Workspace path exactly as stored on the conversation rows.
pub path: String,
/// Final path component of `path`, or the full path when it has none.
pub display_name: String,
/// Conversations in this workspace.
pub conversations: usize,
/// Messages across those conversations.
pub messages: usize,
/// Distinct agent slugs seen in this workspace, sorted ascending.
pub agents: Vec<String>,
/// Earliest/latest conversation `started_at` in this workspace.
pub date_range: TimeRange,
/// Up to five non-NULL conversation titles, most recently started first.
pub recent_titles: Vec<String>,
}
/// Per-agent rollup, serialized to `agent_summary.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSummary {
/// One entry per agent, most conversations first.
pub agents: Vec<AgentEntry>,
}
/// Aggregate figures for a single agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentEntry {
/// Agent slug.
pub name: String,
/// Conversations attributed to the agent.
pub conversations: usize,
/// Messages across those conversations.
pub messages: usize,
/// Distinct non-NULL workspaces the agent worked in, sorted ascending.
pub workspaces: Vec<String>,
/// Earliest/latest conversation `started_at` for the agent.
pub date_range: TimeRange,
/// `messages / conversations`, or `0.0` when there are no conversations.
pub avg_messages_per_conversation: f64,
}
/// Most frequent conversation-title terms, serialized to `top_terms.json`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopTerms {
/// `(term, occurrence_count)` pairs, highest count first, at most 100 entries.
pub terms: Vec<(String, usize)>,
}
/// Every pre-computed analytics artifact; `write_to_dir` emits one JSON file per field.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsBundle {
/// Written to `statistics.json`.
pub statistics: Statistics,
/// Written to `timeline.json`.
pub timeline: Timeline,
/// Written to `workspace_summary.json`.
pub workspace_summary: WorkspaceSummary,
/// Written to `agent_summary.json`.
pub agent_summary: AgentSummary,
/// Written to `top_terms.json`.
pub top_terms: TopTerms,
}
impl AnalyticsBundle {
pub fn write_to_dir(&self, dir: &Path) -> Result<()> {
std::fs::create_dir_all(dir).context("Failed to create analytics directory")?;
let stats_path = dir.join("statistics.json");
let stats_json = serde_json::to_string_pretty(&self.statistics)
.context("Failed to serialize statistics")?;
crate::pages::write_file_durably(&stats_path, stats_json.as_bytes())
.context("Failed to write statistics.json")?;
let timeline_path = dir.join("timeline.json");
let timeline_json =
serde_json::to_string_pretty(&self.timeline).context("Failed to serialize timeline")?;
crate::pages::write_file_durably(&timeline_path, timeline_json.as_bytes())
.context("Failed to write timeline.json")?;
let workspace_path = dir.join("workspace_summary.json");
let workspace_json = serde_json::to_string_pretty(&self.workspace_summary)
.context("Failed to serialize workspace_summary")?;
crate::pages::write_file_durably(&workspace_path, workspace_json.as_bytes())
.context("Failed to write workspace_summary.json")?;
let agent_path = dir.join("agent_summary.json");
let agent_json = serde_json::to_string_pretty(&self.agent_summary)
.context("Failed to serialize agent_summary")?;
crate::pages::write_file_durably(&agent_path, agent_json.as_bytes())
.context("Failed to write agent_summary.json")?;
let terms_path = dir.join("top_terms.json");
let terms_json = serde_json::to_string_pretty(&self.top_terms)
.context("Failed to serialize top_terms")?;
crate::pages::write_file_durably(&terms_path, terms_json.as_bytes())
.context("Failed to write top_terms.json")?;
info!(
"Analytics written to {:?}: statistics.json, timeline.json, workspace_summary.json, agent_summary.json, top_terms.json",
dir
);
Ok(())
}
}
/// Computes the analytics bundle by querying the `conversations` and
/// `messages` tables through a borrowed database connection.
pub struct AnalyticsGenerator<'a> {
db: &'a Connection,
}
impl<'a> AnalyticsGenerator<'a> {
pub fn new(db: &'a Connection) -> Self {
Self { db }
}
pub fn generate_all(&self) -> Result<AnalyticsBundle> {
info!("Generating pre-computed analytics...");
let statistics = self.generate_statistics()?;
let timeline = self.generate_timeline()?;
let workspace_summary = self.generate_workspace_summary()?;
let agent_summary = self.generate_agent_summary()?;
let top_terms = self.generate_top_terms()?;
Ok(AnalyticsBundle {
statistics,
timeline,
workspace_summary,
agent_summary,
top_terms,
})
}
fn generate_statistics(&self) -> Result<Statistics> {
info!("Generating statistics...");
let total_conversations: i64 = self
.db
.query_row_map("SELECT COUNT(*) FROM conversations", &[], |row: &Row| {
row.get_typed(0)
})
.context("Failed to count conversations")?;
let total_messages: i64 = self
.db
.query_row_map("SELECT COUNT(*) FROM messages", &[], |row: &Row| {
row.get_typed(0)
})
.context("Failed to count messages")?;
let total_characters: i64 = self
.db
.query_row_map(
"SELECT SUM(LENGTH(content)) FROM messages",
&[],
|row: &Row| row.get_typed::<Option<i64>>(0),
)
.context("Failed to sum content lengths")?
.unwrap_or(0);
let mut agents: BTreeMap<String, AgentStats> = BTreeMap::new();
let agent_conv_rows: Vec<(String, i64)> = self.db.query_map_collect(
"SELECT agent, COUNT(*) as conv_count FROM conversations GROUP BY agent",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
)?;
for (agent, conv_count) in agent_conv_rows {
agents.insert(
agent.clone(),
AgentStats {
conversations: conv_count as usize,
messages: 0, },
);
}
let msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
"SELECT c.agent, COUNT(m.id) FROM messages m
JOIN conversations c ON m.conversation_id = c.id
GROUP BY c.agent",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
)?;
for (agent, msg_count) in msg_rows {
if let Some(stats) = agents.get_mut(&agent) {
stats.messages = msg_count as usize;
}
}
let mut roles: BTreeMap<String, usize> = BTreeMap::new();
let role_rows: Vec<(String, i64)> = self.db.query_map_collect(
"SELECT role, COUNT(*) FROM messages GROUP BY role",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
)?;
for (role, count) in role_rows {
roles.insert(role, count as usize);
}
let time_range: (Option<i64>, Option<i64>) = self
.db
.query_row_map(
"SELECT MIN(started_at), MAX(started_at) FROM conversations",
&[],
|row: &Row| Ok((row.get_typed(0)?, row.get_typed(1)?)),
)
.context("Failed to get time range")?;
Ok(Statistics {
total_conversations: total_conversations as usize,
total_messages: total_messages as usize,
total_characters: total_characters as usize,
agents,
roles,
time_range: TimeRange {
earliest: time_range
.0
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
latest: time_range
.1
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
},
computed_at: Utc::now().to_rfc3339(),
})
}
fn generate_timeline(&self) -> Result<Timeline> {
info!("Generating timeline...");
let mut daily_map: HashMap<String, DailyEntry> = HashMap::new();
let mut daily_conv_ids: HashMap<String, HashSet<i64>> = HashMap::new();
let timeline_rows: Vec<(Option<String>, i64)> = self.db.query_map_collect(
"SELECT DATE(m.created_at/1000, 'unixepoch') as date, m.conversation_id
FROM messages m
WHERE m.created_at IS NOT NULL
ORDER BY date",
&[],
|row: &Row| {
Ok((
row.get_typed::<Option<String>>(0)?,
row.get_typed::<i64>(1)?,
))
},
)?;
for (date_opt, conv_id) in timeline_rows {
if let Some(date) = date_opt {
let entry = daily_map.entry(date.clone()).or_insert(DailyEntry {
date: date.clone(),
messages: 0,
conversations: 0,
});
entry.messages += 1;
daily_conv_ids.entry(date).or_default().insert(conv_id);
}
}
for (date, conv_ids) in &daily_conv_ids {
if let Some(entry) = daily_map.get_mut(date) {
entry.conversations = conv_ids.len();
}
}
let mut daily: Vec<DailyEntry> = daily_map.into_values().collect();
daily.sort_by(|a, b| a.date.cmp(&b.date));
let weekly = aggregate_to_weekly(&daily);
let monthly = aggregate_to_monthly(&daily);
let mut by_agent: BTreeMap<String, AgentTimeline> = BTreeMap::new();
let mut agent_daily_map: HashMap<String, HashMap<String, DailyEntry>> = HashMap::new();
let mut agent_daily_conv_ids: HashMap<String, HashMap<String, HashSet<i64>>> =
HashMap::new();
let agent_timeline_rows: Vec<(Option<String>, String, i64)> = self.db.query_map_collect(
"SELECT DATE(m.created_at/1000, 'unixepoch') as date, c.agent, m.conversation_id
FROM messages m
JOIN conversations c ON m.conversation_id = c.id
WHERE m.created_at IS NOT NULL
ORDER BY date",
&[],
|row: &Row| {
Ok((
row.get_typed::<Option<String>>(0)?,
row.get_typed::<String>(1)?,
row.get_typed::<i64>(2)?,
))
},
)?;
for (date_opt, agent, conv_id) in agent_timeline_rows {
if let Some(date) = date_opt {
let agent_map = agent_daily_map.entry(agent.clone()).or_default();
let entry = agent_map.entry(date.clone()).or_insert(DailyEntry {
date: date.clone(),
messages: 0,
conversations: 0,
});
entry.messages += 1;
agent_daily_conv_ids
.entry(agent)
.or_default()
.entry(date)
.or_default()
.insert(conv_id);
}
}
for (agent, conv_ids_map) in &agent_daily_conv_ids {
if let Some(daily_map) = agent_daily_map.get_mut(agent) {
for (date, conv_ids) in conv_ids_map {
if let Some(entry) = daily_map.get_mut(date) {
entry.conversations = conv_ids.len();
}
}
}
}
for (agent, daily_map) in agent_daily_map {
let mut agent_daily: Vec<DailyEntry> = daily_map.into_values().collect();
agent_daily.sort_by(|a, b| a.date.cmp(&b.date));
let agent_weekly = aggregate_to_weekly(&agent_daily);
let agent_monthly = aggregate_to_monthly(&agent_daily);
by_agent.insert(
agent,
AgentTimeline {
daily: agent_daily,
weekly: agent_weekly,
monthly: agent_monthly,
},
);
}
Ok(Timeline {
daily,
weekly,
monthly,
by_agent,
})
}
fn generate_workspace_summary(&self) -> Result<WorkspaceSummary> {
info!("Generating workspace summary...");
let started = Instant::now();
let mut workspaces: Vec<WorkspaceEntry> = Vec::new();
let workspace_rows: Vec<(String, i64, Option<i64>, Option<i64>)> =
self.db.query_map_collect(
"SELECT workspace, COUNT(*) as conv_count,
MIN(started_at), MAX(started_at)
FROM conversations
WHERE workspace IS NOT NULL
GROUP BY workspace
ORDER BY conv_count DESC",
&[],
|row: &Row| {
Ok((
row.get_typed::<String>(0)?,
row.get_typed::<i64>(1)?,
row.get_typed::<Option<i64>>(2)?,
row.get_typed::<Option<i64>>(3)?,
))
},
)?;
let mut messages_by_workspace: HashMap<String, i64> = HashMap::new();
let ws_msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
"SELECT c.workspace, COUNT(m.id)
FROM conversations c
LEFT JOIN messages m ON m.conversation_id = c.id
WHERE c.workspace IS NOT NULL
GROUP BY c.workspace",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
)?;
for (workspace, msg_count) in ws_msg_rows {
messages_by_workspace.insert(workspace, msg_count);
}
let mut agents_by_workspace: HashMap<String, Vec<String>> = HashMap::new();
let ws_agent_rows: Vec<(String, String)> = self.db.query_map_collect(
"SELECT workspace, agent
FROM conversations
WHERE workspace IS NOT NULL
GROUP BY workspace, agent
ORDER BY workspace, agent",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
)?;
for (workspace, agent) in ws_agent_rows {
agents_by_workspace
.entry(workspace)
.or_default()
.push(agent);
}
let mut recent_titles_by_workspace: HashMap<String, Vec<String>> = HashMap::new();
let ws_title_rows: Vec<(String, String)> = self.db.query_map_collect(
"SELECT workspace, title
FROM conversations
WHERE workspace IS NOT NULL AND title IS NOT NULL
ORDER BY workspace, started_at DESC",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
)?;
for (workspace, title) in ws_title_rows {
let titles = recent_titles_by_workspace.entry(workspace).or_default();
if titles.len() < 5 {
titles.push(title);
}
}
for (workspace, conv_count, min_ts, max_ts) in workspace_rows {
let msg_count = messages_by_workspace.get(&workspace).copied().unwrap_or(0);
let agents = agents_by_workspace.remove(&workspace).unwrap_or_default();
let recent_titles = recent_titles_by_workspace
.remove(&workspace)
.unwrap_or_default();
let display_name = Path::new(&workspace)
.file_name()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| workspace.clone());
workspaces.push(WorkspaceEntry {
path: workspace,
display_name,
conversations: conv_count as usize,
messages: msg_count as usize,
agents,
date_range: TimeRange {
earliest: min_ts
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
latest: max_ts
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
},
recent_titles,
});
}
info!(
query_count = 4,
workspace_rows = workspaces.len(),
elapsed_ms = started.elapsed().as_millis(),
"Workspace summary generated using set-based aggregation"
);
Ok(WorkspaceSummary { workspaces })
}
fn generate_agent_summary(&self) -> Result<AgentSummary> {
info!("Generating agent summary...");
let started = Instant::now();
let mut agents: Vec<AgentEntry> = Vec::new();
let agent_rows: Vec<(String, i64, Option<i64>, Option<i64>)> = self.db.query_map_collect(
"SELECT agent, COUNT(*) as conv_count,
MIN(started_at), MAX(started_at)
FROM conversations
GROUP BY agent
ORDER BY conv_count DESC",
&[],
|row: &Row| {
Ok((
row.get_typed::<String>(0)?,
row.get_typed::<i64>(1)?,
row.get_typed::<Option<i64>>(2)?,
row.get_typed::<Option<i64>>(3)?,
))
},
)?;
let mut messages_by_agent: HashMap<String, i64> = HashMap::new();
let agent_msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
"SELECT c.agent, COUNT(m.id)
FROM conversations c
LEFT JOIN messages m ON m.conversation_id = c.id
GROUP BY c.agent",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
)?;
for (agent, msg_count) in agent_msg_rows {
messages_by_agent.insert(agent, msg_count);
}
let mut workspaces_by_agent: HashMap<String, Vec<String>> = HashMap::new();
let agent_ws_rows: Vec<(String, String)> = self.db.query_map_collect(
"SELECT agent, workspace
FROM conversations
WHERE workspace IS NOT NULL
GROUP BY agent, workspace
ORDER BY agent, workspace",
&[],
|row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
)?;
for (agent, workspace) in agent_ws_rows {
workspaces_by_agent
.entry(agent)
.or_default()
.push(workspace);
}
for (agent, conv_count, min_ts, max_ts) in agent_rows {
let msg_count = messages_by_agent.get(&agent).copied().unwrap_or(0);
let workspaces = workspaces_by_agent.remove(&agent).unwrap_or_default();
let avg_messages = if conv_count > 0 {
msg_count as f64 / conv_count as f64
} else {
0.0
};
agents.push(AgentEntry {
name: agent,
conversations: conv_count as usize,
messages: msg_count as usize,
workspaces,
date_range: TimeRange {
earliest: min_ts
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
latest: max_ts
.and_then(DateTime::from_timestamp_millis)
.map(|dt| dt.to_rfc3339()),
},
avg_messages_per_conversation: avg_messages,
});
}
info!(
query_count = 3,
agent_rows = agents.len(),
elapsed_ms = started.elapsed().as_millis(),
"Agent summary generated using set-based aggregation"
);
Ok(AgentSummary { agents })
}
fn generate_top_terms(&self) -> Result<TopTerms> {
info!("Generating top terms...");
let stop_words: HashSet<&str> = STOP_WORDS.iter().copied().collect();
let titles: Vec<String> = self.db.query_map_collect(
"SELECT title FROM conversations WHERE title IS NOT NULL",
&[],
|row: &Row| row.get_typed::<String>(0),
)?;
let mut term_counts: HashMap<String, usize> = HashMap::new();
for title in titles {
for word in title.split_whitespace() {
let word: String = word
.chars()
.filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
.collect::<String>()
.to_lowercase();
if word.len() >= 3 && !stop_words.contains(word.as_str()) {
*term_counts.entry(word).or_insert(0) += 1;
}
}
}
let mut top: Vec<(String, usize)> = term_counts.into_iter().collect();
top.sort_by_key(|entry| std::cmp::Reverse(entry.1));
top.truncate(100);
Ok(TopTerms { terms: top })
}
}
pub fn aggregate_to_weekly(daily: &[DailyEntry]) -> Vec<WeeklyEntry> {
let mut weekly_map: HashMap<String, WeeklyEntry> = HashMap::new();
for entry in daily {
if let Ok(date) = NaiveDate::parse_from_str(&entry.date, "%Y-%m-%d") {
let iso_week = date.iso_week();
let week_str = format!("{}-W{:02}", iso_week.year(), iso_week.week());
let weekly = weekly_map.entry(week_str.clone()).or_insert(WeeklyEntry {
week: week_str,
messages: 0,
conversations: 0,
});
weekly.messages += entry.messages;
weekly.conversations += entry.conversations;
}
}
let mut result: Vec<WeeklyEntry> = weekly_map.into_values().collect();
result.sort_by(|a, b| a.week.cmp(&b.week));
result
}
pub fn aggregate_to_monthly(daily: &[DailyEntry]) -> Vec<MonthlyEntry> {
let mut monthly_map: HashMap<String, MonthlyEntry> = HashMap::new();
for entry in daily {
if entry.date.len() >= 7 {
let month_str = entry.date[..7].to_string();
let monthly = monthly_map
.entry(month_str.clone())
.or_insert(MonthlyEntry {
month: month_str,
messages: 0,
conversations: 0,
});
monthly.messages += entry.messages;
monthly.conversations += entry.conversations;
}
}
let mut result: Vec<MonthlyEntry> = monthly_map.into_values().collect();
result.sort_by(|a, b| a.month.cmp(&b.month));
result
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a temporary on-disk database with the minimal `conversations` and
    /// `messages` schema the generator queries. The `TempDir` must be kept alive
    /// for as long as the connection is used.
    fn create_test_db() -> (TempDir, Connection) {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("test.db");
        let conn = Connection::open(db_path.to_string_lossy().as_ref()).unwrap();
        conn.execute_batch(
            "CREATE TABLE conversations (
                id INTEGER PRIMARY KEY,
                agent TEXT NOT NULL,
                workspace TEXT,
                title TEXT,
                source_path TEXT NOT NULL,
                started_at INTEGER,
                ended_at INTEGER,
                message_count INTEGER,
                metadata_json TEXT
            );
            CREATE TABLE messages (
                id INTEGER PRIMARY KEY,
                conversation_id INTEGER NOT NULL,
                idx INTEGER NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                created_at INTEGER,
                FOREIGN KEY (conversation_id) REFERENCES conversations(id)
            );",
        )
        .unwrap();
        (dir, conn)
    }

    /// Inserts the canonical fixture.
    ///
    /// - Conversations 1 and 2: `claude-code` in project-a, with 5 and 3 messages.
    /// - Conversation 3: `codex` in project-b, with 4 messages.
    /// - Roles alternate `user` / `agent`; message 3 of conversation 3 is
    ///   `narrator` and message 1 contains a non-ASCII character.
    fn insert_test_data(conn: &Connection) {
        conn.execute(
            "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
            VALUES (1, 'claude-code', '/home/user/project-a', 'Debug authentication flow', '/path/a.jsonl', 1700000000000, 5)",
        ).unwrap();
        conn.execute(
            "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
            VALUES (2, 'claude-code', '/home/user/project-a', 'Fix database connection', '/path/b.jsonl', 1700100000000, 3)",
        ).unwrap();
        conn.execute(
            "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
            VALUES (3, 'codex', '/home/user/project-b', 'Add user authentication', '/path/c.jsonl', 1700200000000, 4)",
        ).unwrap();
        for conv_id in 1..=3 {
            let msg_count = match conv_id {
                1 => 5,
                2 => 3,
                3 => 4,
                _ => 0,
            };
            for idx in 0..msg_count {
                let role = if conv_id == 3 && idx == 3 {
                    "narrator"
                } else if idx % 2 == 0 {
                    "user"
                } else {
                    "agent"
                };
                let created_at =
                    1700000000000i64 + (conv_id as i64 * 100000000) + (idx as i64 * 1000);
                let content = if conv_id == 3 && idx == 1 {
                    format!("Message {} for conv {} with caf\u{00e9}", idx, conv_id)
                } else {
                    format!("Message {} for conv {}", idx, conv_id)
                };
                conn.execute_compat(
                    "INSERT INTO messages (conversation_id, idx, role, content, created_at)
                    VALUES (?1, ?2, ?3, ?4, ?5)",
                    frankensqlite::params![
                        conv_id as i64,
                        idx as i64,
                        role,
                        content.as_str(),
                        created_at
                    ],
                )
                .unwrap();
            }
        }
    }

    #[test]
    fn test_statistics_generation() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let stats = generator.generate_statistics().unwrap();
        assert_eq!(stats.total_conversations, 3);
        // 5 + 3 + 4 messages.
        assert_eq!(stats.total_messages, 12);
        assert!(stats.agents.contains_key("claude-code"));
        assert!(stats.agents.contains_key("codex"));
        assert_eq!(stats.agents["claude-code"].conversations, 2);
        assert_eq!(stats.agents["codex"].conversations, 1);
    }

    #[test]
    fn analytics_statistics_from_packets_matches_sql_for_canonical_corpus() {
        use crate::model::conversation_packet::{
            ConversationPacket, ConversationPacketMessage, ConversationPacketProvenance,
        };
        use serde_json::Value;
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let sql_stats = AnalyticsGenerator::new(&conn)
            .generate_statistics()
            .unwrap();
        let mut packets: Vec<ConversationPacket> = Vec::new();
        let conv_rows: Vec<(i64, String, Option<String>, Option<i64>)> = conn
            .query_map_collect(
                "SELECT id, agent, source_path, started_at FROM conversations ORDER BY id ASC",
                &[],
                |row: &Row| {
                    Ok((
                        row.get_typed::<i64>(0)?,
                        row.get_typed::<String>(1)?,
                        row.get_typed::<Option<String>>(2)?,
                        row.get_typed::<Option<i64>>(3)?,
                    ))
                },
            )
            .unwrap();
        for (conv_id, agent, source_path, started_at) in conv_rows {
            let msg_rows: Vec<(i64, String, String, Option<i64>)> = conn
                .query_map_collect(
                    "SELECT idx, role, content, created_at
                    FROM messages
                    WHERE conversation_id = ?1
                    ORDER BY idx ASC",
                    &[frankensqlite::compat::ParamValue::from(conv_id)],
                    |row: &Row| {
                        Ok((
                            row.get_typed::<i64>(0)?,
                            row.get_typed::<String>(1)?,
                            row.get_typed::<String>(2)?,
                            row.get_typed::<Option<i64>>(3)?,
                        ))
                    },
                )
                .unwrap();
            use crate::model::types::{
                Conversation, Message, MessageRole, Snippet as CanonicalSnippet,
            };
            // Exhaustive struct literal: breaks the build if `Snippet` gains or
            // loses fields, prompting this fixture to be revisited.
            let _ = CanonicalSnippet {
                id: None,
                file_path: None,
                start_line: None,
                end_line: None,
                language: None,
                snippet_text: None,
            };
            let canonical = Conversation {
                id: Some(conv_id),
                agent_slug: agent.clone(),
                workspace: None,
                external_id: None,
                title: None,
                source_path: source_path
                    .map(std::path::PathBuf::from)
                    .unwrap_or_else(|| std::path::PathBuf::from(format!("/tmp/conv-{conv_id}"))),
                started_at,
                ended_at: None,
                approx_tokens: None,
                metadata_json: Value::Null,
                source_id: "local".to_string(),
                origin_host: None,
                messages: msg_rows
                    .into_iter()
                    .map(|(idx, role, content, created_at)| Message {
                        id: None,
                        idx,
                        role: match role.as_str() {
                            "user" => MessageRole::User,
                            "agent" | "assistant" => MessageRole::Agent,
                            "tool" => MessageRole::Tool,
                            "system" => MessageRole::System,
                            other => MessageRole::Other(other.to_string()),
                        },
                        author: None,
                        created_at,
                        content,
                        extra_json: Value::Null,
                        snippets: Vec::new(),
                    })
                    .collect(),
            };
            packets.push(ConversationPacket::from_canonical_replay(
                &canonical,
                ConversationPacketProvenance::local(),
            ));
            // Compile-time check that payload messages are `ConversationPacketMessage`.
            for msg in &packets.last().unwrap().payload.messages {
                let _: &ConversationPacketMessage = msg;
            }
        }
        let mut packet_stats = Statistics::from_packets(&packets);
        // `computed_at` is a wall-clock stamp; align it so the JSON comparison is meaningful.
        packet_stats.computed_at = sql_stats.computed_at.clone();
        assert_eq!(
            packet_stats.total_conversations, sql_stats.total_conversations,
            "packet path total_conversations must match SQL path"
        );
        assert_eq!(
            packet_stats.total_messages, sql_stats.total_messages,
            "packet path total_messages must match SQL path (12 = 5+3+4)"
        );
        assert_eq!(
            packet_stats.total_characters, sql_stats.total_characters,
            "packet path total_characters must match SUM(LENGTH(content))"
        );
        assert_eq!(
            packet_stats.agents, sql_stats.agents,
            "per-agent (conversations, messages) buckets must match"
        );
        assert_eq!(
            packet_stats.roles, sql_stats.roles,
            "role-count buckets must agree (user/agent/narrator)"
        );
        assert_eq!(
            packet_stats.time_range.earliest, sql_stats.time_range.earliest,
            "earliest started_at must round-trip identically through DateTime::from_timestamp_millis"
        );
        assert_eq!(
            packet_stats.time_range.latest, sql_stats.time_range.latest,
            "latest started_at must round-trip identically"
        );
        let sql_json = serde_json::to_string(&sql_stats).unwrap();
        let packet_json = serde_json::to_string(&packet_stats).unwrap();
        assert_eq!(
            sql_json, packet_json,
            "SQL-driven and packet-driven Statistics must serialize identically"
        );
    }

    #[test]
    fn test_timeline_aggregation() {
        let daily = vec![
            DailyEntry {
                date: "2024-01-01".into(),
                messages: 10,
                conversations: 1,
            },
            DailyEntry {
                date: "2024-01-02".into(),
                messages: 20,
                conversations: 2,
            },
            DailyEntry {
                date: "2024-01-08".into(),
                messages: 15,
                conversations: 1,
            },
        ];
        // 2024-01-01/02 fall in ISO week 1, 2024-01-08 in week 2.
        let weekly = aggregate_to_weekly(&daily);
        assert_eq!(weekly.len(), 2);
        let monthly = aggregate_to_monthly(&daily);
        assert_eq!(monthly.len(), 1);
        // 10 + 20 + 15 messages.
        assert_eq!(monthly[0].messages, 45);
    }

    #[test]
    fn test_top_terms_extraction() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let top = generator.generate_top_terms().unwrap();
        assert!(
            top.terms
                .iter()
                .any(|(term, count)| term == "authentication" && *count >= 2)
        );
    }

    #[test]
    fn test_workspace_summary() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let summary = generator.generate_workspace_summary().unwrap();
        assert_eq!(summary.workspaces.len(), 2);
        let project_a = summary
            .workspaces
            .iter()
            .find(|w| w.path.contains("project-a"));
        assert!(project_a.is_some());
        assert_eq!(project_a.unwrap().conversations, 2);
    }

    #[test]
    fn test_agent_summary() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let summary = generator.generate_agent_summary().unwrap();
        assert_eq!(summary.agents.len(), 2);
        let claude = summary.agents.iter().find(|a| a.name == "claude-code");
        assert!(claude.is_some());
        assert_eq!(claude.unwrap().conversations, 2);
        // 5 + 3 messages.
        assert_eq!(claude.unwrap().messages, 8);
    }

    #[test]
    fn test_workspace_summary_distinct_agents_and_recent_titles() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let summary = generator.generate_workspace_summary().unwrap();
        let project_a = summary
            .workspaces
            .iter()
            .find(|w| w.path == "/home/user/project-a")
            .expect("project-a workspace should exist");
        // 5 + 3 messages.
        assert_eq!(project_a.messages, 8);
        assert_eq!(project_a.agents, vec!["claude-code".to_string()]);
        assert_eq!(project_a.recent_titles.len(), 2);
        // Conversation 2 started later, so its title comes first.
        assert_eq!(
            project_a.recent_titles.first().map(String::as_str),
            Some("Fix database connection")
        );
    }

    #[test]
    fn test_agent_summary_high_cardinality_distribution() {
        let (_dir, conn) = create_test_db();
        let mut conv_id: i64 = 1;
        for i in 0..40 {
            let workspace = format!("/home/user/ws-{}", i % 10);
            let started_at = 1_700_000_000_000i64 + i as i64 * 1_000;
            let title = format!("Claude conversation {}", i);
            let source = format!("/path/{}.jsonl", conv_id);
            conn.execute_compat(
                "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
                VALUES (?1, 'claude-code', ?2, ?3, ?4, ?5, 1)",
                frankensqlite::params![
                    conv_id,
                    workspace.as_str(),
                    title.as_str(),
                    source.as_str(),
                    started_at
                ],
            )
            .unwrap();
            let content = format!("message {}", i);
            conn.execute_compat(
                "INSERT INTO messages (conversation_id, idx, role, content, created_at)
                VALUES (?1, 0, 'assistant', ?2, ?3)",
                frankensqlite::params![conv_id, content.as_str(), started_at],
            )
            .unwrap();
            conv_id += 1;
        }
        for i in 0..5 {
            let started_at = 1_700_100_000_000i64 + i as i64 * 1_000;
            let title = format!("Codex conversation {}", i);
            let source = format!("/path/{}.jsonl", conv_id);
            conn.execute_compat(
                "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
                VALUES (?1, 'codex', '/home/user/codex-ws', ?2, ?3, ?4, 1)",
                frankensqlite::params![
                    conv_id,
                    title.as_str(),
                    source.as_str(),
                    started_at
                ],
            )
            .unwrap();
            let content = format!("codex {}", i);
            conn.execute_compat(
                "INSERT INTO messages (conversation_id, idx, role, content, created_at)
                VALUES (?1, 0, 'assistant', ?2, ?3)",
                frankensqlite::params![conv_id, content.as_str(), started_at],
            )
            .unwrap();
            conv_id += 1;
        }
        let generator = AnalyticsGenerator::new(&conn);
        let summary = generator.generate_agent_summary().unwrap();
        let claude = summary
            .agents
            .iter()
            .find(|a| a.name == "claude-code")
            .expect("claude-code agent should exist");
        assert_eq!(claude.conversations, 40);
        assert_eq!(claude.messages, 40);
        assert_eq!(claude.workspaces.len(), 10);
        assert!((claude.avg_messages_per_conversation - 1.0).abs() < f64::EPSILON);
        let codex = summary
            .agents
            .iter()
            .find(|a| a.name == "codex")
            .expect("codex agent should exist");
        assert_eq!(codex.conversations, 5);
        assert_eq!(codex.messages, 5);
    }

    #[test]
    fn test_bundle_write() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let bundle = generator.generate_all().unwrap();
        let output_dir = TempDir::new().unwrap();
        bundle.write_to_dir(output_dir.path()).unwrap();
        assert!(output_dir.path().join("statistics.json").exists());
        assert!(output_dir.path().join("timeline.json").exists());
        assert!(output_dir.path().join("workspace_summary.json").exists());
        assert!(output_dir.path().join("agent_summary.json").exists());
        assert!(output_dir.path().join("top_terms.json").exists());
    }

    #[test]
    fn test_generate_all() {
        let (_dir, conn) = create_test_db();
        insert_test_data(&conn);
        let generator = AnalyticsGenerator::new(&conn);
        let bundle = generator.generate_all().unwrap();
        assert_eq!(bundle.statistics.total_conversations, 3);
        // Every fixture message has `created_at`, so both series must be populated.
        assert!(!bundle.timeline.daily.is_empty());
        assert!(!bundle.timeline.monthly.is_empty());
        assert!(!bundle.workspace_summary.workspaces.is_empty());
        assert!(!bundle.agent_summary.agents.is_empty());
    }

    #[test]
    fn test_empty_database() {
        let (_dir, conn) = create_test_db();
        let generator = AnalyticsGenerator::new(&conn);
        let bundle = generator.generate_all().unwrap();
        assert_eq!(bundle.statistics.total_conversations, 0);
        assert_eq!(bundle.statistics.total_messages, 0);
        assert!(bundle.timeline.daily.is_empty());
        assert!(bundle.workspace_summary.workspaces.is_empty());
        assert!(bundle.agent_summary.agents.is_empty());
        assert!(bundle.top_terms.terms.is_empty());
    }
}