1use anyhow::{Context, Result};
28use chrono::{DateTime, Datelike, NaiveDate, Utc};
29use frankensqlite::compat::{ConnectionExt, RowExt};
30use frankensqlite::{Connection, Row};
31use serde::{Deserialize, Serialize};
32use std::collections::{BTreeMap, HashMap, HashSet};
33use std::path::Path;
34use std::time::Instant;
35use tracing::info;
36
/// Lowercase English function words that are ignored when ranking title terms.
///
/// `AnalyticsGenerator::generate_top_terms` loads this list into a `HashSet`
/// and drops every normalized title word that appears in it.
const STOP_WORDS: &[&str] = &[
    "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by",
    "from", "is", "it", "as", "was", "be", "are", "been", "being", "have", "has", "had", "do",
    "does", "did", "will", "would", "could", "should", "may", "might", "must", "shall", "can",
    "need", "this", "that", "these", "those", "i", "you", "he", "she", "we", "they", "what",
    "which", "who", "when", "where", "why", "how", "all", "each", "every", "both", "few", "more",
    "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than",
    "too", "very", "just", "also", "now", "here", "there", "then", "once", "about", "after",
    "again", "into", "over", "under", "out", "up", "down", "off", "any", "its", "your", "my",
    "our", "their", "his", "her", "him", "them", "me", "us", "if", "else", "while", "during",
    "before",
];
50
/// Corpus-wide totals plus per-agent and per-role breakdowns.
///
/// Produced either from SQL aggregates (`AnalyticsGenerator::generate_statistics`)
/// or from in-memory packets (`Statistics::from_packets`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Statistics {
    /// Number of conversations counted.
    pub total_conversations: usize,
    /// Number of messages counted across all conversations.
    pub total_messages: usize,
    /// Combined length of all message contents (`from_packets` counts `char`s;
    /// the SQL path uses `SUM(LENGTH(content))`).
    pub total_characters: usize,
    /// Conversation and message counts keyed by agent name.
    pub agents: BTreeMap<String, AgentStats>,
    /// Message counts keyed by role string.
    pub roles: BTreeMap<String, usize>,
    /// Earliest and latest conversation `started_at`, as RFC 3339 strings.
    pub time_range: TimeRange,
    /// RFC 3339 timestamp (`Utc::now()`) taken when the value was built.
    pub computed_at: String,
}
68
/// Per-agent counters stored in [`Statistics::agents`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AgentStats {
    /// Number of conversations attributed to the agent.
    pub conversations: usize,
    /// Number of messages in those conversations.
    pub messages: usize,
}
75
76impl Statistics {
77 pub fn from_packets(packets: &[crate::model::conversation_packet::ConversationPacket]) -> Self {
94 let mut total_messages: usize = 0;
95 let mut total_characters: usize = 0;
96 let mut agents: BTreeMap<String, AgentStats> = BTreeMap::new();
97 let mut roles: BTreeMap<String, usize> = BTreeMap::new();
98 let mut earliest_started_at: Option<i64> = None;
99 let mut latest_started_at: Option<i64> = None;
100
101 for packet in packets {
102 let payload = &packet.payload;
103 let agent_slug = payload.identity.agent_slug.clone();
104 let agent_entry = agents.entry(agent_slug).or_insert(AgentStats {
105 conversations: 0,
106 messages: 0,
107 });
108 agent_entry.conversations = agent_entry.conversations.saturating_add(1);
109
110 let conv_message_count = payload.messages.len();
114 total_messages = total_messages.saturating_add(conv_message_count);
115 agent_entry.messages = agent_entry.messages.saturating_add(conv_message_count);
116
117 for message in &payload.messages {
121 total_characters = total_characters.saturating_add(message.content.chars().count());
122 }
123
124 for message in &payload.messages {
130 let role = if message.role == "assistant" {
131 "agent"
132 } else {
133 message.role.as_str()
134 };
135 *roles.entry(role.to_string()).or_insert(0) += 1;
136 }
137
138 if let Some(started_at) = payload.timestamps.started_at {
139 earliest_started_at = Some(match earliest_started_at {
140 Some(current) => current.min(started_at),
141 None => started_at,
142 });
143 latest_started_at = Some(match latest_started_at {
144 Some(current) => current.max(started_at),
145 None => started_at,
146 });
147 }
148 }
149
150 Self {
151 total_conversations: packets.len(),
152 total_messages,
153 total_characters,
154 agents,
155 roles,
156 time_range: TimeRange {
157 earliest: earliest_started_at
158 .and_then(DateTime::from_timestamp_millis)
159 .map(|dt| dt.to_rfc3339()),
160 latest: latest_started_at
161 .and_then(DateTime::from_timestamp_millis)
162 .map(|dt| dt.to_rfc3339()),
163 },
164 computed_at: Utc::now().to_rfc3339(),
165 }
166 }
167}
168
/// Inclusive time span expressed as optional RFC 3339 strings.
///
/// Both ends are `None` when no timestamp was available or it could not be
/// converted by `DateTime::from_timestamp_millis`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeRange {
    /// Earliest timestamp in the span.
    pub earliest: Option<String>,
    /// Latest timestamp in the span.
    pub latest: Option<String>,
}
177
/// Message activity over time, overall and split by agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Timeline {
    /// One entry per calendar day that has messages, sorted by date.
    pub daily: Vec<DailyEntry>,
    /// `daily` rolled up by ISO week.
    pub weekly: Vec<WeeklyEntry>,
    /// `daily` rolled up by month.
    pub monthly: Vec<MonthlyEntry>,
    /// The same three series computed per agent, keyed by agent name.
    pub by_agent: BTreeMap<String, AgentTimeline>,
}
188
/// Daily, weekly and monthly activity series for a single agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentTimeline {
    /// Per-day entries for the agent, sorted by date.
    pub daily: Vec<DailyEntry>,
    /// `daily` rolled up by ISO week.
    pub weekly: Vec<WeeklyEntry>,
    /// `daily` rolled up by month.
    pub monthly: Vec<MonthlyEntry>,
}
196
/// Activity on one calendar day.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DailyEntry {
    /// Day as produced by SQL `DATE(...)`; the weekly roll-up parses it as `%Y-%m-%d`.
    pub date: String,
    /// Number of messages created on that day.
    pub messages: usize,
    /// Number of distinct conversations with at least one message that day.
    pub conversations: usize,
}
204
/// Activity in one ISO week, produced by `aggregate_to_weekly`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WeeklyEntry {
    /// ISO week label formatted as `{year}-W{week:02}`.
    pub week: String,
    /// Sum of the daily message counts in the week.
    pub messages: usize,
    /// Sum of the daily conversation counts in the week (a conversation active
    /// on several days contributes once per day).
    pub conversations: usize,
}
212
/// Activity in one month, produced by `aggregate_to_monthly`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonthlyEntry {
    /// Month label: the first seven bytes of the daily date (e.g. `2024-01`).
    pub month: String,
    /// Sum of the daily message counts in the month.
    pub messages: usize,
    /// Sum of the daily conversation counts in the month (a conversation active
    /// on several days contributes once per day).
    pub conversations: usize,
}
220
/// Per-workspace roll-up of conversations that have a non-NULL workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceSummary {
    /// One entry per workspace, ordered by conversation count, largest first.
    pub workspaces: Vec<WorkspaceEntry>,
}
226
/// Summary of a single workspace.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkspaceEntry {
    /// Workspace value exactly as stored on the conversation rows.
    pub path: String,
    /// Final path component of `path`, or `path` itself when it has none.
    pub display_name: String,
    /// Number of conversations in the workspace.
    pub conversations: usize,
    /// Number of messages belonging to those conversations.
    pub messages: usize,
    /// Distinct agents seen in the workspace, sorted by name.
    pub agents: Vec<String>,
    /// Earliest and latest conversation `started_at` in the workspace.
    pub date_range: TimeRange,
    /// Up to five conversation titles, most recently started first.
    pub recent_titles: Vec<String>,
}
238
/// Per-agent roll-up of conversations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentSummary {
    /// One entry per agent, ordered by conversation count, largest first.
    pub agents: Vec<AgentEntry>,
}
244
/// Summary of a single agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentEntry {
    /// Agent name as stored on the conversation rows.
    pub name: String,
    /// Number of conversations attributed to the agent.
    pub conversations: usize,
    /// Number of messages in those conversations.
    pub messages: usize,
    /// Distinct non-NULL workspaces the agent was used in, sorted.
    pub workspaces: Vec<String>,
    /// Earliest and latest conversation `started_at` for the agent.
    pub date_range: TimeRange,
    /// `messages / conversations`, or `0.0` when there are no conversations.
    pub avg_messages_per_conversation: f64,
}
255
/// Most frequent words found in conversation titles.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopTerms {
    /// `(term, occurrences)` pairs sorted by descending count, at most 100.
    pub terms: Vec<(String, usize)>,
}
261
/// Every pre-computed analytics report, as returned by
/// `AnalyticsGenerator::generate_all` and persisted by `write_to_dir`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyticsBundle {
    /// Corpus-wide totals (written to `statistics.json`).
    pub statistics: Statistics,
    /// Activity over time (written to `timeline.json`).
    pub timeline: Timeline,
    /// Per-workspace roll-up (written to `workspace_summary.json`).
    pub workspace_summary: WorkspaceSummary,
    /// Per-agent roll-up (written to `agent_summary.json`).
    pub agent_summary: AgentSummary,
    /// Frequent title terms (written to `top_terms.json`).
    pub top_terms: TopTerms,
}
271
272impl AnalyticsBundle {
273 pub fn write_to_dir(&self, dir: &Path) -> Result<()> {
275 std::fs::create_dir_all(dir).context("Failed to create analytics directory")?;
276
277 let stats_path = dir.join("statistics.json");
279 let stats_json = serde_json::to_string_pretty(&self.statistics)
280 .context("Failed to serialize statistics")?;
281 crate::pages::write_file_durably(&stats_path, stats_json.as_bytes())
282 .context("Failed to write statistics.json")?;
283
284 let timeline_path = dir.join("timeline.json");
286 let timeline_json =
287 serde_json::to_string_pretty(&self.timeline).context("Failed to serialize timeline")?;
288 crate::pages::write_file_durably(&timeline_path, timeline_json.as_bytes())
289 .context("Failed to write timeline.json")?;
290
291 let workspace_path = dir.join("workspace_summary.json");
293 let workspace_json = serde_json::to_string_pretty(&self.workspace_summary)
294 .context("Failed to serialize workspace_summary")?;
295 crate::pages::write_file_durably(&workspace_path, workspace_json.as_bytes())
296 .context("Failed to write workspace_summary.json")?;
297
298 let agent_path = dir.join("agent_summary.json");
300 let agent_json = serde_json::to_string_pretty(&self.agent_summary)
301 .context("Failed to serialize agent_summary")?;
302 crate::pages::write_file_durably(&agent_path, agent_json.as_bytes())
303 .context("Failed to write agent_summary.json")?;
304
305 let terms_path = dir.join("top_terms.json");
307 let terms_json = serde_json::to_string_pretty(&self.top_terms)
308 .context("Failed to serialize top_terms")?;
309 crate::pages::write_file_durably(&terms_path, terms_json.as_bytes())
310 .context("Failed to write top_terms.json")?;
311
312 info!(
313 "Analytics written to {:?}: statistics.json, timeline.json, workspace_summary.json, agent_summary.json, top_terms.json",
314 dir
315 );
316
317 Ok(())
318 }
319}
320
/// Computes analytics reports by querying a borrowed database connection.
pub struct AnalyticsGenerator<'a> {
    /// Connection every query is issued against.
    db: &'a Connection,
}
325
326impl<'a> AnalyticsGenerator<'a> {
327 pub fn new(db: &'a Connection) -> Self {
329 Self { db }
330 }
331
332 pub fn generate_all(&self) -> Result<AnalyticsBundle> {
334 info!("Generating pre-computed analytics...");
335
336 let statistics = self.generate_statistics()?;
337 let timeline = self.generate_timeline()?;
338 let workspace_summary = self.generate_workspace_summary()?;
339 let agent_summary = self.generate_agent_summary()?;
340 let top_terms = self.generate_top_terms()?;
341
342 Ok(AnalyticsBundle {
343 statistics,
344 timeline,
345 workspace_summary,
346 agent_summary,
347 top_terms,
348 })
349 }
350
351 fn generate_statistics(&self) -> Result<Statistics> {
353 info!("Generating statistics...");
354
355 let total_conversations: i64 = self
357 .db
358 .query_row_map("SELECT COUNT(*) FROM conversations", &[], |row: &Row| {
359 row.get_typed(0)
360 })
361 .context("Failed to count conversations")?;
362
363 let total_messages: i64 = self
365 .db
366 .query_row_map("SELECT COUNT(*) FROM messages", &[], |row: &Row| {
367 row.get_typed(0)
368 })
369 .context("Failed to count messages")?;
370
371 let total_characters: i64 = self
373 .db
374 .query_row_map(
375 "SELECT SUM(LENGTH(content)) FROM messages",
376 &[],
377 |row: &Row| row.get_typed::<Option<i64>>(0),
378 )
379 .context("Failed to sum content lengths")?
380 .unwrap_or(0);
381
382 let mut agents: BTreeMap<String, AgentStats> = BTreeMap::new();
384 let agent_conv_rows: Vec<(String, i64)> = self.db.query_map_collect(
385 "SELECT agent, COUNT(*) as conv_count FROM conversations GROUP BY agent",
386 &[],
387 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
388 )?;
389 for (agent, conv_count) in agent_conv_rows {
390 agents.insert(
391 agent.clone(),
392 AgentStats {
393 conversations: conv_count as usize,
394 messages: 0, },
396 );
397 }
398
399 let msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
401 "SELECT c.agent, COUNT(m.id) FROM messages m
402 JOIN conversations c ON m.conversation_id = c.id
403 GROUP BY c.agent",
404 &[],
405 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
406 )?;
407 for (agent, msg_count) in msg_rows {
408 if let Some(stats) = agents.get_mut(&agent) {
409 stats.messages = msg_count as usize;
410 }
411 }
412
413 let mut roles: BTreeMap<String, usize> = BTreeMap::new();
415 let role_rows: Vec<(String, i64)> = self.db.query_map_collect(
416 "SELECT role, COUNT(*) FROM messages GROUP BY role",
417 &[],
418 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
419 )?;
420 for (role, count) in role_rows {
421 roles.insert(role, count as usize);
422 }
423
424 let time_range: (Option<i64>, Option<i64>) = self
426 .db
427 .query_row_map(
428 "SELECT MIN(started_at), MAX(started_at) FROM conversations",
429 &[],
430 |row: &Row| Ok((row.get_typed(0)?, row.get_typed(1)?)),
431 )
432 .context("Failed to get time range")?;
433
434 Ok(Statistics {
435 total_conversations: total_conversations as usize,
436 total_messages: total_messages as usize,
437 total_characters: total_characters as usize,
438 agents,
439 roles,
440 time_range: TimeRange {
441 earliest: time_range
442 .0
443 .and_then(DateTime::from_timestamp_millis)
444 .map(|dt| dt.to_rfc3339()),
445 latest: time_range
446 .1
447 .and_then(DateTime::from_timestamp_millis)
448 .map(|dt| dt.to_rfc3339()),
449 },
450 computed_at: Utc::now().to_rfc3339(),
451 })
452 }
453
454 fn generate_timeline(&self) -> Result<Timeline> {
456 info!("Generating timeline...");
457
458 let mut daily_map: HashMap<String, DailyEntry> = HashMap::new();
460 let mut daily_conv_ids: HashMap<String, HashSet<i64>> = HashMap::new();
461
462 let timeline_rows: Vec<(Option<String>, i64)> = self.db.query_map_collect(
463 "SELECT DATE(m.created_at/1000, 'unixepoch') as date, m.conversation_id
464 FROM messages m
465 WHERE m.created_at IS NOT NULL
466 ORDER BY date",
467 &[],
468 |row: &Row| {
469 Ok((
470 row.get_typed::<Option<String>>(0)?,
471 row.get_typed::<i64>(1)?,
472 ))
473 },
474 )?;
475
476 for (date_opt, conv_id) in timeline_rows {
477 if let Some(date) = date_opt {
478 let entry = daily_map.entry(date.clone()).or_insert(DailyEntry {
479 date: date.clone(),
480 messages: 0,
481 conversations: 0,
482 });
483 entry.messages += 1;
484 daily_conv_ids.entry(date).or_default().insert(conv_id);
485 }
486 }
487
488 for (date, conv_ids) in &daily_conv_ids {
490 if let Some(entry) = daily_map.get_mut(date) {
491 entry.conversations = conv_ids.len();
492 }
493 }
494
495 let mut daily: Vec<DailyEntry> = daily_map.into_values().collect();
496 daily.sort_by(|a, b| a.date.cmp(&b.date));
497
498 let weekly = aggregate_to_weekly(&daily);
500
501 let monthly = aggregate_to_monthly(&daily);
503
504 let mut by_agent: BTreeMap<String, AgentTimeline> = BTreeMap::new();
506 let mut agent_daily_map: HashMap<String, HashMap<String, DailyEntry>> = HashMap::new();
507 let mut agent_daily_conv_ids: HashMap<String, HashMap<String, HashSet<i64>>> =
508 HashMap::new();
509
510 let agent_timeline_rows: Vec<(Option<String>, String, i64)> = self.db.query_map_collect(
511 "SELECT DATE(m.created_at/1000, 'unixepoch') as date, c.agent, m.conversation_id
512 FROM messages m
513 JOIN conversations c ON m.conversation_id = c.id
514 WHERE m.created_at IS NOT NULL
515 ORDER BY date",
516 &[],
517 |row: &Row| {
518 Ok((
519 row.get_typed::<Option<String>>(0)?,
520 row.get_typed::<String>(1)?,
521 row.get_typed::<i64>(2)?,
522 ))
523 },
524 )?;
525
526 for (date_opt, agent, conv_id) in agent_timeline_rows {
527 if let Some(date) = date_opt {
528 let agent_map = agent_daily_map.entry(agent.clone()).or_default();
529 let entry = agent_map.entry(date.clone()).or_insert(DailyEntry {
530 date: date.clone(),
531 messages: 0,
532 conversations: 0,
533 });
534 entry.messages += 1;
535
536 agent_daily_conv_ids
537 .entry(agent)
538 .or_default()
539 .entry(date)
540 .or_default()
541 .insert(conv_id);
542 }
543 }
544
545 for (agent, conv_ids_map) in &agent_daily_conv_ids {
547 if let Some(daily_map) = agent_daily_map.get_mut(agent) {
548 for (date, conv_ids) in conv_ids_map {
549 if let Some(entry) = daily_map.get_mut(date) {
550 entry.conversations = conv_ids.len();
551 }
552 }
553 }
554 }
555
556 for (agent, daily_map) in agent_daily_map {
558 let mut agent_daily: Vec<DailyEntry> = daily_map.into_values().collect();
559 agent_daily.sort_by(|a, b| a.date.cmp(&b.date));
560 let agent_weekly = aggregate_to_weekly(&agent_daily);
561 let agent_monthly = aggregate_to_monthly(&agent_daily);
562
563 by_agent.insert(
564 agent,
565 AgentTimeline {
566 daily: agent_daily,
567 weekly: agent_weekly,
568 monthly: agent_monthly,
569 },
570 );
571 }
572
573 Ok(Timeline {
574 daily,
575 weekly,
576 monthly,
577 by_agent,
578 })
579 }
580
581 fn generate_workspace_summary(&self) -> Result<WorkspaceSummary> {
583 info!("Generating workspace summary...");
584 let started = Instant::now();
585
586 let mut workspaces: Vec<WorkspaceEntry> = Vec::new();
587
588 let workspace_rows: Vec<(String, i64, Option<i64>, Option<i64>)> =
590 self.db.query_map_collect(
591 "SELECT workspace, COUNT(*) as conv_count,
592 MIN(started_at), MAX(started_at)
593 FROM conversations
594 WHERE workspace IS NOT NULL
595 GROUP BY workspace
596 ORDER BY conv_count DESC",
597 &[],
598 |row: &Row| {
599 Ok((
600 row.get_typed::<String>(0)?,
601 row.get_typed::<i64>(1)?,
602 row.get_typed::<Option<i64>>(2)?,
603 row.get_typed::<Option<i64>>(3)?,
604 ))
605 },
606 )?;
607
608 let mut messages_by_workspace: HashMap<String, i64> = HashMap::new();
610 let ws_msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
611 "SELECT c.workspace, COUNT(m.id)
612 FROM conversations c
613 LEFT JOIN messages m ON m.conversation_id = c.id
614 WHERE c.workspace IS NOT NULL
615 GROUP BY c.workspace",
616 &[],
617 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
618 )?;
619 for (workspace, msg_count) in ws_msg_rows {
620 messages_by_workspace.insert(workspace, msg_count);
621 }
622
623 let mut agents_by_workspace: HashMap<String, Vec<String>> = HashMap::new();
625 let ws_agent_rows: Vec<(String, String)> = self.db.query_map_collect(
626 "SELECT workspace, agent
627 FROM conversations
628 WHERE workspace IS NOT NULL
629 GROUP BY workspace, agent
630 ORDER BY workspace, agent",
631 &[],
632 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
633 )?;
634 for (workspace, agent) in ws_agent_rows {
635 agents_by_workspace
636 .entry(workspace)
637 .or_default()
638 .push(agent);
639 }
640
641 let mut recent_titles_by_workspace: HashMap<String, Vec<String>> = HashMap::new();
643 let ws_title_rows: Vec<(String, String)> = self.db.query_map_collect(
644 "SELECT workspace, title
645 FROM conversations
646 WHERE workspace IS NOT NULL AND title IS NOT NULL
647 ORDER BY workspace, started_at DESC",
648 &[],
649 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
650 )?;
651 for (workspace, title) in ws_title_rows {
652 let titles = recent_titles_by_workspace.entry(workspace).or_default();
653 if titles.len() < 5 {
654 titles.push(title);
655 }
656 }
657
658 for (workspace, conv_count, min_ts, max_ts) in workspace_rows {
659 let msg_count = messages_by_workspace.get(&workspace).copied().unwrap_or(0);
660 let agents = agents_by_workspace.remove(&workspace).unwrap_or_default();
661 let recent_titles = recent_titles_by_workspace
662 .remove(&workspace)
663 .unwrap_or_default();
664
665 let display_name = Path::new(&workspace)
667 .file_name()
668 .map(|s| s.to_string_lossy().to_string())
669 .unwrap_or_else(|| workspace.clone());
670
671 workspaces.push(WorkspaceEntry {
672 path: workspace,
673 display_name,
674 conversations: conv_count as usize,
675 messages: msg_count as usize,
676 agents,
677 date_range: TimeRange {
678 earliest: min_ts
679 .and_then(DateTime::from_timestamp_millis)
680 .map(|dt| dt.to_rfc3339()),
681 latest: max_ts
682 .and_then(DateTime::from_timestamp_millis)
683 .map(|dt| dt.to_rfc3339()),
684 },
685 recent_titles,
686 });
687 }
688
689 info!(
690 query_count = 4,
691 workspace_rows = workspaces.len(),
692 elapsed_ms = started.elapsed().as_millis(),
693 "Workspace summary generated using set-based aggregation"
694 );
695
696 Ok(WorkspaceSummary { workspaces })
697 }
698
699 fn generate_agent_summary(&self) -> Result<AgentSummary> {
701 info!("Generating agent summary...");
702 let started = Instant::now();
703
704 let mut agents: Vec<AgentEntry> = Vec::new();
705
706 let agent_rows: Vec<(String, i64, Option<i64>, Option<i64>)> = self.db.query_map_collect(
708 "SELECT agent, COUNT(*) as conv_count,
709 MIN(started_at), MAX(started_at)
710 FROM conversations
711 GROUP BY agent
712 ORDER BY conv_count DESC",
713 &[],
714 |row: &Row| {
715 Ok((
716 row.get_typed::<String>(0)?,
717 row.get_typed::<i64>(1)?,
718 row.get_typed::<Option<i64>>(2)?,
719 row.get_typed::<Option<i64>>(3)?,
720 ))
721 },
722 )?;
723
724 let mut messages_by_agent: HashMap<String, i64> = HashMap::new();
726 let agent_msg_rows: Vec<(String, i64)> = self.db.query_map_collect(
727 "SELECT c.agent, COUNT(m.id)
728 FROM conversations c
729 LEFT JOIN messages m ON m.conversation_id = c.id
730 GROUP BY c.agent",
731 &[],
732 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<i64>(1)?)),
733 )?;
734 for (agent, msg_count) in agent_msg_rows {
735 messages_by_agent.insert(agent, msg_count);
736 }
737
738 let mut workspaces_by_agent: HashMap<String, Vec<String>> = HashMap::new();
740 let agent_ws_rows: Vec<(String, String)> = self.db.query_map_collect(
741 "SELECT agent, workspace
742 FROM conversations
743 WHERE workspace IS NOT NULL
744 GROUP BY agent, workspace
745 ORDER BY agent, workspace",
746 &[],
747 |row: &Row| Ok((row.get_typed::<String>(0)?, row.get_typed::<String>(1)?)),
748 )?;
749 for (agent, workspace) in agent_ws_rows {
750 workspaces_by_agent
751 .entry(agent)
752 .or_default()
753 .push(workspace);
754 }
755
756 for (agent, conv_count, min_ts, max_ts) in agent_rows {
757 let msg_count = messages_by_agent.get(&agent).copied().unwrap_or(0);
758 let workspaces = workspaces_by_agent.remove(&agent).unwrap_or_default();
759
760 let avg_messages = if conv_count > 0 {
761 msg_count as f64 / conv_count as f64
762 } else {
763 0.0
764 };
765
766 agents.push(AgentEntry {
767 name: agent,
768 conversations: conv_count as usize,
769 messages: msg_count as usize,
770 workspaces,
771 date_range: TimeRange {
772 earliest: min_ts
773 .and_then(DateTime::from_timestamp_millis)
774 .map(|dt| dt.to_rfc3339()),
775 latest: max_ts
776 .and_then(DateTime::from_timestamp_millis)
777 .map(|dt| dt.to_rfc3339()),
778 },
779 avg_messages_per_conversation: avg_messages,
780 });
781 }
782
783 info!(
784 query_count = 3,
785 agent_rows = agents.len(),
786 elapsed_ms = started.elapsed().as_millis(),
787 "Agent summary generated using set-based aggregation"
788 );
789
790 Ok(AgentSummary { agents })
791 }
792
793 fn generate_top_terms(&self) -> Result<TopTerms> {
795 info!("Generating top terms...");
796
797 let stop_words: HashSet<&str> = STOP_WORDS.iter().copied().collect();
798
799 let titles: Vec<String> = self.db.query_map_collect(
801 "SELECT title FROM conversations WHERE title IS NOT NULL",
802 &[],
803 |row: &Row| row.get_typed::<String>(0),
804 )?;
805
806 let mut term_counts: HashMap<String, usize> = HashMap::new();
807
808 for title in titles {
809 for word in title.split_whitespace() {
810 let word: String = word
812 .chars()
813 .filter(|c| c.is_alphanumeric() || *c == '_' || *c == '-')
814 .collect::<String>()
815 .to_lowercase();
816
817 if word.len() >= 3 && !stop_words.contains(word.as_str()) {
819 *term_counts.entry(word).or_insert(0) += 1;
820 }
821 }
822 }
823
824 let mut top: Vec<(String, usize)> = term_counts.into_iter().collect();
826 top.sort_by_key(|entry| std::cmp::Reverse(entry.1));
827
828 top.truncate(100);
830
831 Ok(TopTerms { terms: top })
832 }
833}
834
835pub fn aggregate_to_weekly(daily: &[DailyEntry]) -> Vec<WeeklyEntry> {
837 let mut weekly_map: HashMap<String, WeeklyEntry> = HashMap::new();
838
839 for entry in daily {
840 if let Ok(date) = NaiveDate::parse_from_str(&entry.date, "%Y-%m-%d") {
842 let iso_week = date.iso_week();
843 let week_str = format!("{}-W{:02}", iso_week.year(), iso_week.week());
844
845 let weekly = weekly_map.entry(week_str.clone()).or_insert(WeeklyEntry {
846 week: week_str,
847 messages: 0,
848 conversations: 0,
849 });
850 weekly.messages += entry.messages;
851 weekly.conversations += entry.conversations;
852 }
853 }
854
855 let mut result: Vec<WeeklyEntry> = weekly_map.into_values().collect();
856 result.sort_by(|a, b| a.week.cmp(&b.week));
857 result
858}
859
860pub fn aggregate_to_monthly(daily: &[DailyEntry]) -> Vec<MonthlyEntry> {
862 let mut monthly_map: HashMap<String, MonthlyEntry> = HashMap::new();
863
864 for entry in daily {
865 if entry.date.len() >= 7 {
867 let month_str = entry.date[..7].to_string();
868
869 let monthly = monthly_map
870 .entry(month_str.clone())
871 .or_insert(MonthlyEntry {
872 month: month_str,
873 messages: 0,
874 conversations: 0,
875 });
876 monthly.messages += entry.messages;
877 monthly.conversations += entry.conversations;
878 }
879 }
880
881 let mut result: Vec<MonthlyEntry> = monthly_map.into_values().collect();
882 result.sort_by(|a, b| a.month.cmp(&b.month));
883 result
884}
885
886#[cfg(test)]
887mod tests {
888 use super::*;
889 use tempfile::TempDir;
890
891 fn create_test_db() -> (TempDir, Connection) {
892 let dir = TempDir::new().unwrap();
893 let db_path = dir.path().join("test.db");
894 let conn = Connection::open(db_path.to_string_lossy().as_ref()).unwrap();
895
896 conn.execute_batch(
898 "CREATE TABLE conversations (
899 id INTEGER PRIMARY KEY,
900 agent TEXT NOT NULL,
901 workspace TEXT,
902 title TEXT,
903 source_path TEXT NOT NULL,
904 started_at INTEGER,
905 ended_at INTEGER,
906 message_count INTEGER,
907 metadata_json TEXT
908 );
909 CREATE TABLE messages (
910 id INTEGER PRIMARY KEY,
911 conversation_id INTEGER NOT NULL,
912 idx INTEGER NOT NULL,
913 role TEXT NOT NULL,
914 content TEXT NOT NULL,
915 created_at INTEGER,
916 FOREIGN KEY (conversation_id) REFERENCES conversations(id)
917 );",
918 )
919 .unwrap();
920
921 (dir, conn)
922 }
923
924 fn insert_test_data(conn: &Connection) {
925 conn.execute(
927 "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
928 VALUES (1, 'claude-code', '/home/user/project-a', 'Debug authentication flow', '/path/a.jsonl', 1700000000000, 5)",
929 ).unwrap();
930 conn.execute(
931 "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
932 VALUES (2, 'claude-code', '/home/user/project-a', 'Fix database connection', '/path/b.jsonl', 1700100000000, 3)",
933 ).unwrap();
934 conn.execute(
935 "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
936 VALUES (3, 'codex', '/home/user/project-b', 'Add user authentication', '/path/c.jsonl', 1700200000000, 4)",
937 ).unwrap();
938
939 for conv_id in 1..=3 {
941 let msg_count = match conv_id {
942 1 => 5,
943 2 => 3,
944 3 => 4,
945 _ => 0,
946 };
947 for idx in 0..msg_count {
948 let role = if conv_id == 3 && idx == 3 {
949 "narrator"
950 } else if idx % 2 == 0 {
951 "user"
952 } else {
953 "agent"
954 };
955 let created_at =
956 1700000000000i64 + (conv_id as i64 * 100000000) + (idx as i64 * 1000);
957 let content = if conv_id == 3 && idx == 1 {
958 format!("Message {} for conv {} with caf\u{00e9}", idx, conv_id)
959 } else {
960 format!("Message {} for conv {}", idx, conv_id)
961 };
962 conn.execute_compat(
963 "INSERT INTO messages (conversation_id, idx, role, content, created_at)
964 VALUES (?1, ?2, ?3, ?4, ?5)",
965 frankensqlite::params![
966 conv_id as i64,
967 idx as i64,
968 role,
969 content.as_str(),
970 created_at
971 ],
972 )
973 .unwrap();
974 }
975 }
976 }
977
/// The SQL statistics path reports the fixture's totals and per-agent
/// conversation counts.
#[test]
fn test_statistics_generation() {
    let (_dir, conn) = create_test_db();
    insert_test_data(&conn);

    let stats = AnalyticsGenerator::new(&conn)
        .generate_statistics()
        .unwrap();

    assert_eq!(stats.total_conversations, 3);
    // 5 + 3 + 4 messages across the three fixture conversations.
    assert_eq!(stats.total_messages, 12);

    for agent in ["claude-code", "codex"] {
        assert!(stats.agents.contains_key(agent));
    }
    assert_eq!(stats.agents["claude-code"].conversations, 2);
    assert_eq!(stats.agents["codex"].conversations, 1);
}
993
/// Parity check: `Statistics::from_packets` must produce the same result as
/// the SQL-driven `generate_statistics` for the canonical fixture.
///
/// The test rebuilds one `ConversationPacket` per conversation row (with its
/// messages) from the same database the SQL path reads, then compares the
/// two `Statistics` values field by field and as serialized JSON.
#[test]
fn analytics_statistics_from_packets_matches_sql_for_canonical_corpus() {
    use crate::model::conversation_packet::{
        ConversationPacket, ConversationPacketMessage, ConversationPacketProvenance,
    };
    use serde_json::Value;

    let (_dir, conn) = create_test_db();
    insert_test_data(&conn);

    // Reference result from the SQL path.
    let sql_stats = AnalyticsGenerator::new(&conn)
        .generate_statistics()
        .unwrap();

    // Rebuild packets from the stored rows, in conversation id order.
    let mut packets: Vec<ConversationPacket> = Vec::new();
    let conv_rows: Vec<(i64, String, Option<String>, Option<i64>)> = conn
        .query_map_collect(
            "SELECT id, agent, source_path, started_at FROM conversations ORDER BY id ASC",
            &[],
            |row: &Row| {
                Ok((
                    row.get_typed::<i64>(0)?,
                    row.get_typed::<String>(1)?,
                    row.get_typed::<Option<String>>(2)?,
                    row.get_typed::<Option<i64>>(3)?,
                ))
            },
        )
        .unwrap();

    for (conv_id, agent, source_path, started_at) in conv_rows {
        // Messages of this conversation, in their stored order.
        let msg_rows: Vec<(i64, String, String, Option<i64>)> = conn
            .query_map_collect(
                "SELECT idx, role, content, created_at
                 FROM messages
                 WHERE conversation_id = ?1
                 ORDER BY idx ASC",
                &[frankensqlite::compat::ParamValue::from(conv_id)],
                |row: &Row| {
                    Ok((
                        row.get_typed::<i64>(0)?,
                        row.get_typed::<String>(1)?,
                        row.get_typed::<String>(2)?,
                        row.get_typed::<Option<i64>>(3)?,
                    ))
                },
            )
            .unwrap();

        use crate::model::types::{
            Conversation, Message, MessageRole, Snippet as CanonicalSnippet,
        };
        // NOTE(review): the value is discarded; presumably this only keeps the
        // `Snippet` import in use and pins its field set at compile time — confirm.
        let _ = CanonicalSnippet {
            id: None,
            file_path: None,
            start_line: None,
            end_line: None,
            language: None,
            snippet_text: None,
        };
        let canonical = Conversation {
            id: Some(conv_id),
            agent_slug: agent.clone(),
            workspace: None,
            external_id: None,
            title: None,
            // Fall back to a synthetic path when the row has no source path.
            source_path: source_path
                .map(std::path::PathBuf::from)
                .unwrap_or_else(|| std::path::PathBuf::from(format!("/tmp/conv-{conv_id}"))),
            started_at,
            ended_at: None,
            approx_tokens: None,
            metadata_json: Value::Null,
            source_id: "local".to_string(),
            origin_host: None,
            messages: msg_rows
                .into_iter()
                .map(|(idx, role, content, created_at)| Message {
                    id: None,
                    idx,
                    // Map stored role strings onto the canonical enum; unknown
                    // roles (the fixture's "narrator") go through `Other`.
                    role: match role.as_str() {
                        "user" => MessageRole::User,
                        "agent" | "assistant" => MessageRole::Agent,
                        "tool" => MessageRole::Tool,
                        "system" => MessageRole::System,
                        other => MessageRole::Other(other.to_string()),
                    },
                    author: None,
                    created_at,
                    content,
                    extra_json: Value::Null,
                    snippets: Vec::new(),
                })
                .collect(),
        };
        packets.push(ConversationPacket::from_canonical_replay(
            &canonical,
            ConversationPacketProvenance::local(),
        ));
        // Compile-time assertion that packet messages are `ConversationPacketMessage`.
        for msg in &packets.last().unwrap().payload.messages {
            let _: &ConversationPacketMessage = msg;
        }
    }

    let mut packet_stats = Statistics::from_packets(&packets);
    // Both paths stamp `Utc::now()`; align the stamps so they cannot differ.
    packet_stats.computed_at = sql_stats.computed_at.clone();

    assert_eq!(
        packet_stats.total_conversations, sql_stats.total_conversations,
        "packet path total_conversations must match SQL path"
    );
    assert_eq!(
        packet_stats.total_messages, sql_stats.total_messages,
        "packet path total_messages must match SQL path (12 = 5+3+4)"
    );
    assert_eq!(
        packet_stats.total_characters, sql_stats.total_characters,
        "packet path total_characters must match SUM(LENGTH(content))"
    );
    assert_eq!(
        packet_stats.agents, sql_stats.agents,
        "per-agent (conversations, messages) buckets must match"
    );
    assert_eq!(
        packet_stats.roles, sql_stats.roles,
        "role-count buckets must agree (user/assistant)"
    );
    assert_eq!(
        packet_stats.time_range.earliest, sql_stats.time_range.earliest,
        "earliest started_at must round-trip identically through DateTime::from_timestamp_millis"
    );
    assert_eq!(
        packet_stats.time_range.latest, sql_stats.time_range.latest,
        "latest started_at must round-trip identically"
    );
    // Final guard: the serialized forms must be byte-identical as well.
    let sql_json = serde_json::to_string(&sql_stats).unwrap();
    let packet_json = serde_json::to_string(&packet_stats).unwrap();
    assert_eq!(
        sql_json, packet_json,
        "SQL-driven and packet-driven Statistics must serialize identically"
    );
}
1162
/// Three January days spread over two ISO weeks collapse into two weekly
/// entries and a single monthly entry.
#[test]
fn test_timeline_aggregation() {
    let daily: Vec<DailyEntry> = [
        ("2024-01-01", 10, 1),
        ("2024-01-02", 20, 2),
        ("2024-01-08", 15, 1),
    ]
    .into_iter()
    .map(|(date, messages, conversations)| DailyEntry {
        date: date.into(),
        messages,
        conversations,
    })
    .collect();

    let weekly = aggregate_to_weekly(&daily);
    assert_eq!(weekly.len(), 2);

    let monthly = aggregate_to_monthly(&daily);
    assert_eq!(monthly.len(), 1);
    // 10 + 20 + 15 messages, all in January.
    assert_eq!(monthly[0].messages, 45);
}
1190
/// "authentication" appears in two fixture titles and must be reported with
/// a count of at least two.
#[test]
fn test_top_terms_extraction() {
    let (_dir, conn) = create_test_db();
    insert_test_data(&conn);

    let top = AnalyticsGenerator::new(&conn)
        .generate_top_terms()
        .unwrap();

    let authentication_counted = top
        .terms
        .iter()
        .filter(|(term, _)| term == "authentication")
        .any(|(_, count)| *count >= 2);
    assert!(authentication_counted);
}
1206
/// The fixture spans two workspaces, and project-a owns two conversations.
#[test]
fn test_workspace_summary() {
    let (_dir, conn) = create_test_db();
    insert_test_data(&conn);

    let summary = AnalyticsGenerator::new(&conn)
        .generate_workspace_summary()
        .unwrap();

    assert_eq!(summary.workspaces.len(), 2);

    let project_a = summary
        .workspaces
        .iter()
        .find(|entry| entry.path.contains("project-a"));
    assert!(project_a.is_some());
    assert_eq!(project_a.map(|entry| entry.conversations), Some(2));
}
1225
/// Checks the agent summary built from the shared fixture: two agents are
/// reported, and "claude-code" has two conversations with eight messages.
#[test]
fn test_agent_summary() {
    let (_dir, conn) = create_test_db();
    insert_test_data(&conn);

    let generator = AnalyticsGenerator::new(&conn);
    let summary = generator.generate_agent_summary().unwrap();

    assert_eq!(summary.agents.len(), 2);

    // Resolve the entry once with `expect` instead of asserting `is_some()`
    // and then unwrapping the same `Option` for every field check.
    let claude = summary
        .agents
        .iter()
        .find(|a| a.name == "claude-code")
        .expect("claude-code agent should exist");
    assert_eq!(claude.conversations, 2);
    assert_eq!(claude.messages, 8);
}
1241
/// Checks the per-workspace details for `/home/user/project-a` in the shared
/// fixture: message total, the agent list, and the recent-titles list.
#[test]
fn test_workspace_summary_distinct_agents_and_recent_titles() {
    let (_tmp, db) = create_test_db();
    insert_test_data(&db);

    let summary = AnalyticsGenerator::new(&db)
        .generate_workspace_summary()
        .unwrap();

    let workspace = summary
        .workspaces
        .iter()
        .find(|candidate| candidate.path == "/home/user/project-a")
        .expect("project-a workspace should exist");

    assert_eq!(workspace.messages, 8);

    // The agent list is expected to contain exactly one entry for this workspace.
    let expected_agents = vec![String::from("claude-code")];
    assert_eq!(workspace.agents, expected_agents);

    // Two titles are reported, and the first one is pinned by value.
    assert_eq!(workspace.recent_titles.len(), 2);
    let first_title = workspace.recent_titles.first().map(|title| title.as_str());
    assert_eq!(first_title, Some("Fix database connection"));
}
1264
/// Checks the agent summary against a larger, uneven data set: 40
/// single-message "claude-code" conversations spread over ten workspaces plus
/// 5 single-message "codex" conversations in one workspace.
#[test]
fn test_agent_summary_high_cardinality_distribution() {
    const CLAUDE_CONVERSATIONS: i64 = 40;
    const CODEX_CONVERSATIONS: i64 = 5;

    let (_tmp, db) = create_test_db();

    // Every conversation in this fixture holds exactly one assistant message
    // at idx 0, so both loops share this insert.
    let insert_message = |conversation_id: i64, content: &str, created_at: i64| {
        db.execute_compat(
            "INSERT INTO messages (conversation_id, idx, role, content, created_at)
             VALUES (?1, 0, 'assistant', ?2, ?3)",
            frankensqlite::params![conversation_id, content, created_at],
        )
        .unwrap();
    };

    // claude-code: ids 1..=40, assigned round-robin to ws-0 .. ws-9.
    for i in 0..CLAUDE_CONVERSATIONS {
        let conv_id = i + 1;
        let workspace = format!("/home/user/ws-{}", i % 10);
        let started_at = 1_700_000_000_000_i64 + i * 1_000;
        let title = format!("Claude conversation {}", i);
        let source = format!("/path/{}.jsonl", conv_id);
        db.execute_compat(
            "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
             VALUES (?1, 'claude-code', ?2, ?3, ?4, ?5, 1)",
            frankensqlite::params![
                conv_id,
                workspace.as_str(),
                title.as_str(),
                source.as_str(),
                started_at
            ],
        )
        .unwrap();
        insert_message(conv_id, &format!("message {}", i), started_at);
    }

    // codex: ids continue after the claude-code block, all in one workspace.
    for i in 0..CODEX_CONVERSATIONS {
        let conv_id = CLAUDE_CONVERSATIONS + 1 + i;
        let started_at = 1_700_100_000_000_i64 + i * 1_000;
        let title = format!("Codex conversation {}", i);
        let source = format!("/path/{}.jsonl", conv_id);
        db.execute_compat(
            "INSERT INTO conversations (id, agent, workspace, title, source_path, started_at, message_count)
             VALUES (?1, 'codex', '/home/user/codex-ws', ?2, ?3, ?4, 1)",
            frankensqlite::params![
                conv_id,
                title.as_str(),
                source.as_str(),
                started_at
            ],
        )
        .unwrap();
        insert_message(conv_id, &format!("codex {}", i), started_at);
    }

    let summary = AnalyticsGenerator::new(&db)
        .generate_agent_summary()
        .unwrap();
    let agent_named = |name: &str| summary.agents.iter().find(|agent| agent.name == name);

    let claude = agent_named("claude-code").expect("claude-code agent should exist");
    assert_eq!(claude.conversations, 40);
    assert_eq!(claude.messages, 40);
    assert_eq!(claude.workspaces.len(), 10);
    // One message per conversation, so the average must be 1.0.
    assert!((claude.avg_messages_per_conversation - 1.0).abs() < f64::EPSILON);

    let codex = agent_named("codex").expect("codex agent should exist");
    assert_eq!(codex.conversations, 5);
    assert_eq!(codex.messages, 5);
}
1346
/// Checks that writing a generated bundle to a directory produces each of the
/// five expected JSON files.
#[test]
fn test_bundle_write() {
    let (_tmp, db) = create_test_db();
    insert_test_data(&db);

    let bundle = AnalyticsGenerator::new(&db).generate_all().unwrap();

    let output_dir = TempDir::new().unwrap();
    let root = output_dir.path();
    bundle.write_to_dir(root).unwrap();

    // File names stay literal in each assertion so a failure names the
    // missing file in the panic output.
    let written = |file_name: &str| root.join(file_name).exists();
    assert!(written("statistics.json"));
    assert!(written("timeline.json"));
    assert!(written("workspace_summary.json"));
    assert!(written("agent_summary.json"));
    assert!(written("top_terms.json"));
}
1365
/// Checks that `generate_all` over the shared fixture fills in the statistics,
/// workspace summary and agent summary, and that the timeline is internally
/// consistent.
#[test]
fn test_generate_all() {
    let (_tmp, db) = create_test_db();
    insert_test_data(&db);

    let bundle = AnalyticsGenerator::new(&db).generate_all().unwrap();

    assert_eq!(bundle.statistics.total_conversations, 3);

    // Monthly buckets imply daily buckets: this fails only when monthly data
    // exists while the daily series is empty.
    // NOTE(review): this also passes when both series are empty; confirm
    // whether the fixture is meant to guarantee a non-empty timeline.
    let timeline = &bundle.timeline;
    let monthly_without_daily = timeline.daily.is_empty() && !timeline.monthly.is_empty();
    assert!(!monthly_without_daily);

    assert!(!bundle.workspace_summary.workspaces.is_empty());
    assert!(!bundle.agent_summary.agents.is_empty());
}
1381
/// Checks that `generate_all` succeeds on a database with no inserted rows and
/// yields zero counts and empty collections throughout.
#[test]
fn test_empty_database() {
    // No fixture data is inserted: the database is used as created.
    let (_tmp, db) = create_test_db();

    let bundle = AnalyticsGenerator::new(&db).generate_all().unwrap();

    let stats = &bundle.statistics;
    assert_eq!(stats.total_conversations, 0);
    assert_eq!(stats.total_messages, 0);

    assert!(bundle.timeline.daily.is_empty());
    assert!(bundle.workspace_summary.workspaces.is_empty());
    assert!(bundle.agent_summary.agents.is_empty());
    assert!(bundle.top_terms.terms.is_empty());
}
1397}