1use std::fs;
11use std::path::Path;
12
13use crate::config;
14use crate::conversation::{self, Conversation};
15use crate::ephemeral::{self, EphemeralEntry};
16use crate::frontmatter::Frontmatter;
17use crate::summarize;
18use crate::tags;
19
20#[derive(Debug, Clone)]
22pub struct SessionMetadata {
23 pub session_id: String,
24 pub started_at: Option<String>,
25 pub ended_at: Option<String>,
26 pub entity_name: String,
27}
28
29pub struct ArchiveResult {
31 pub log_number: u32,
32 pub full_content: String,
33 pub session_id: String,
34}
35
36pub fn highest_conversation_number(conversations_dir: &Path) -> u32 {
38 let entries = match fs::read_dir(conversations_dir) {
39 Ok(e) => e,
40 Err(_) => return 0,
41 };
42
43 let mut max = 0u32;
44 for entry in entries.flatten() {
45 let name = entry.file_name();
46 let name = name.to_string_lossy();
47 if let Some(num_str) = name
48 .strip_prefix("conversation-")
49 .and_then(|s| s.strip_suffix(".md"))
50 {
51 if let Ok(n) = num_str.parse::<u32>() {
52 if n > max {
53 max = n;
54 }
55 }
56 }
57 }
58
59 max
60}
61
62pub fn append_index(
64 archive_path: &Path,
65 log_num: u32,
66 date: &str,
67 session_id: &str,
68 topics: &[String],
69 message_count: u32,
70 duration: &str,
71) -> Result<(), String> {
72 use std::io::Write;
73
74 let needs_header = if archive_path.exists() {
75 fs::read_to_string(archive_path)
76 .unwrap_or_default()
77 .trim()
78 .is_empty()
79 } else {
80 true
81 };
82
83 let mut file = fs::OpenOptions::new()
84 .create(true)
85 .append(true)
86 .open(archive_path)
87 .map_err(|e| format!("Failed to open ARCHIVE.md: {e}"))?;
88
89 if needs_header {
90 writeln!(file, "# Conversation Archive\n")
91 .map_err(|e| format!("Failed to write ARCHIVE.md header: {e}"))?;
92 writeln!(
93 file,
94 "| # | Date | Session | Topics | Messages | Duration |"
95 )
96 .map_err(|e| format!("Failed to write ARCHIVE.md header: {e}"))?;
97 writeln!(
98 file,
99 "|---|------|---------|--------|----------|----------|"
100 )
101 .map_err(|e| format!("Failed to write ARCHIVE.md header: {e}"))?;
102 }
103
104 let topics_str = if topics.is_empty() {
105 "\u{2014}".to_string()
106 } else {
107 topics.join(", ")
108 };
109
110 writeln!(
111 file,
112 "| {log_num:03} | {date} | {session_id} | {topics_str} | {message_count} | {duration} |"
113 )
114 .map_err(|e| format!("Failed to write to ARCHIVE.md: {e}"))?;
115
116 Ok(())
117}
118
119pub fn archive_conversation(
130 memory_dir: &Path,
131 conv: &Conversation,
132 summary: &summarize::ConversationSummary,
133 source: &str,
134) -> Result<ArchiveResult, String> {
135 let conversations_dir = memory_dir.join("conversations");
136 let archive_index = memory_dir.join("ARCHIVE.md");
137 let ephemeral_path = memory_dir.join("EPHEMERAL.md");
138
139 if !conversations_dir.exists() {
140 return Err("conversations/ directory not found. Run init first.".to_string());
141 }
142
143 if conv.user_message_count == 0 {
145 return Ok(ArchiveResult {
146 log_number: 0,
147 full_content: String::new(),
148 session_id: conv.session_id.clone(),
149 });
150 }
151
152 let next_num = highest_conversation_number(&conversations_dir) + 1;
153
154 let now = conversation::utc_now();
155 let date = conversation::date_from_timestamp(&now);
156 let duration = match (&conv.first_timestamp, &conv.last_timestamp) {
157 (Some(start), Some(end)) => conversation::calculate_duration(start, end),
158 _ => "unknown".to_string(),
159 };
160 let total_messages = conv.total_messages();
161
162 let fm = Frontmatter {
164 log: next_num,
165 date: now.clone(),
166 session_id: conv.session_id.clone(),
167 message_count: total_messages,
168 duration: duration.clone(),
169 source: source.to_string(),
170 topics: summary.topics.clone(),
171 };
172
173 let md_body = conversation::conversation_to_markdown(conv, next_num);
175
176 let conv_tags = tags::extract_tags(&conv.entries);
178 let tags_section = tags::format_tags_section(&conv_tags);
179
180 let summary_section = if !summary.summary.is_empty() {
182 let mut s = format!("## Summary\n\n{}\n\n", summary.summary);
183 if !summary.decisions.is_empty() {
184 s.push_str("**Decisions**:\n");
185 for d in &summary.decisions {
186 s.push_str(&format!("- {d}\n"));
187 }
188 s.push('\n');
189 }
190 if !summary.action_items.is_empty() {
191 s.push_str("**Action Items**:\n");
192 for a in &summary.action_items {
193 s.push_str(&format!("- {a}\n"));
194 }
195 s.push('\n');
196 }
197 s
198 } else {
199 String::new()
200 };
201
202 let full_content = format!(
203 "{}\n\n{}{}\n{}",
204 fm.render(),
205 summary_section,
206 md_body,
207 tags_section
208 );
209
210 let conv_file = conversations_dir.join(format!("conversation-{next_num:03}.md"));
212 fs::write(&conv_file, &full_content)
213 .map_err(|e| format!("Failed to write conversation file: {e}"))?;
214
215 append_index(
217 &archive_index,
218 next_num,
219 &date,
220 &conv.session_id,
221 &summary.topics,
222 total_messages,
223 &duration,
224 )?;
225
226 let entry = EphemeralEntry {
228 session_id: conv.session_id.clone(),
229 date: now,
230 duration,
231 message_count: total_messages,
232 archive_file: format!("conversation-{next_num:03}.md"),
233 summary: summary.summary.clone(),
234 };
235 ephemeral::append_entry(&ephemeral_path, &entry)?;
236 let cfg = config::load_from_dir(memory_dir);
237 ephemeral::trim_to_limit(&ephemeral_path, cfg.ephemeral.max_entries)?;
238
239 eprintln!(
240 "recall-echo: archived conversation-{:03}.md ({} messages)",
241 next_num, total_messages
242 );
243
244 Ok(ArchiveResult {
245 log_number: next_num,
246 full_content,
247 session_id: conv.session_id.clone(),
248 })
249}
250
251#[cfg(feature = "graph")]
253pub fn graph_ingest(memory_dir: &Path, result: &ArchiveResult) {
254 if result.log_number == 0 {
255 return;
256 }
257 let rt = match tokio::runtime::Runtime::new() {
258 Ok(rt) => rt,
259 Err(e) => {
260 eprintln!("recall-echo: graph runtime error: {e}");
261 return;
262 }
263 };
264 if let Err(e) = rt.block_on(crate::graph_bridge::ingest_into_graph(
265 memory_dir,
266 &result.full_content,
267 &result.session_id,
268 Some(result.log_number),
269 )) {
270 eprintln!("recall-echo: graph ingestion warning: {e}");
271 }
272}
273
274#[cfg(feature = "graph")]
278pub fn pipeline_sync_on_archive(memory_dir: &Path) {
279 let cfg = config::load_from_dir(memory_dir);
280 let pipeline = match cfg.pipeline {
281 Some(ref p) if p.auto_sync == Some(true) => p,
282 _ => return,
283 };
284
285 let docs_dir = match pipeline.docs_dir {
286 Some(ref d) => {
287 let path = std::path::PathBuf::from(shellexpand_path(d));
288 if !path.exists() {
289 eprintln!(
290 "recall-echo: pipeline docs_dir not found: {}",
291 path.display()
292 );
293 return;
294 }
295 path
296 }
297 None => {
298 eprintln!("recall-echo: pipeline auto_sync enabled but no docs_dir configured");
299 return;
300 }
301 };
302
303 let graph_dir = memory_dir.join("graph");
304 if !graph_dir.exists() {
305 return;
306 }
307
308 let rt = match tokio::runtime::Runtime::new() {
309 Ok(rt) => rt,
310 Err(e) => {
311 eprintln!("recall-echo: pipeline sync runtime error: {e}");
312 return;
313 }
314 };
315
316 if let Err(e) = rt.block_on(async {
317 let gm = recall_graph::GraphMemory::open(&graph_dir)
318 .await
319 .map_err(|e| format!("graph open: {e}"))?;
320
321 let docs = recall_graph::types::PipelineDocuments {
322 learning: read_opt_file(&docs_dir, "LEARNING.md"),
323 thoughts: read_opt_file(&docs_dir, "THOUGHTS.md"),
324 curiosity: read_opt_file(&docs_dir, "CURIOSITY.md"),
325 reflections: read_opt_file(&docs_dir, "REFLECTIONS.md"),
326 praxis: read_opt_file(&docs_dir, "PRAXIS.md"),
327 };
328
329 let report = gm.sync_pipeline(&docs).await.map_err(|e| format!("{e}"))?;
330
331 if report.entities_created > 0
332 || report.entities_updated > 0
333 || report.entities_archived > 0
334 {
335 eprintln!(
336 "recall-echo: pipeline synced — +{} created, ~{} updated, -{} archived",
337 report.entities_created, report.entities_updated, report.entities_archived
338 );
339 }
340
341 Ok::<(), String>(())
342 }) {
343 eprintln!("recall-echo: pipeline sync warning: {e}");
344 }
345}
346
347#[cfg(all(feature = "graph", feature = "pulse-null"))]
349async fn pipeline_sync_on_archive_async(memory_dir: &Path) {
350 let cfg = config::load_from_dir(memory_dir);
351 let pipeline = match cfg.pipeline {
352 Some(ref p) if p.auto_sync == Some(true) => p.clone(),
353 _ => return,
354 };
355
356 let docs_dir = match pipeline.docs_dir {
357 Some(ref d) => {
358 let path = std::path::PathBuf::from(shellexpand_path(d));
359 if !path.exists() {
360 eprintln!(
361 "recall-echo: pipeline docs_dir not found: {}",
362 path.display()
363 );
364 return;
365 }
366 path
367 }
368 None => {
369 eprintln!("recall-echo: pipeline auto_sync enabled but no docs_dir configured");
370 return;
371 }
372 };
373
374 let graph_dir = memory_dir.join("graph");
375 if !graph_dir.exists() {
376 return;
377 }
378
379 let gm = match recall_graph::GraphMemory::open(&graph_dir).await {
380 Ok(gm) => gm,
381 Err(e) => {
382 eprintln!("recall-echo: pipeline sync open error: {e}");
383 return;
384 }
385 };
386
387 let docs = recall_graph::types::PipelineDocuments {
388 learning: read_opt_file(&docs_dir, "LEARNING.md"),
389 thoughts: read_opt_file(&docs_dir, "THOUGHTS.md"),
390 curiosity: read_opt_file(&docs_dir, "CURIOSITY.md"),
391 reflections: read_opt_file(&docs_dir, "REFLECTIONS.md"),
392 praxis: read_opt_file(&docs_dir, "PRAXIS.md"),
393 };
394
395 match gm.sync_pipeline(&docs).await {
396 Ok(report) => {
397 if report.entities_created > 0
398 || report.entities_updated > 0
399 || report.entities_archived > 0
400 {
401 eprintln!(
402 "recall-echo: pipeline synced — +{} created, ~{} updated, -{} archived",
403 report.entities_created, report.entities_updated, report.entities_archived
404 );
405 }
406 }
407 Err(e) => eprintln!("recall-echo: pipeline sync warning: {e}"),
408 }
409}
410
411#[cfg(feature = "graph")]
412fn read_opt_file(dir: &Path, name: &str) -> String {
413 fs::read_to_string(dir.join(name)).unwrap_or_default()
414}
415
416#[cfg(feature = "graph")]
417fn shellexpand_path(path: &str) -> String {
418 if let Some(rest) = path.strip_prefix("~/") {
419 if let Ok(home) = std::env::var("HOME") {
420 return format!("{home}/{rest}");
421 }
422 }
423 path.to_string()
424}
425
426pub fn archive_from_jsonl(
435 base_dir: &Path,
436 session_id: &str,
437 transcript_path: &str,
438) -> Result<u32, String> {
439 let conv = crate::jsonl::parse_transcript(transcript_path, session_id)?;
440 let summary = summarize::algorithmic_summary(&conv);
441 let result = archive_conversation(base_dir, &conv, &summary, "jsonl")?;
442 let log_number = result.log_number;
443
444 #[cfg(feature = "graph")]
445 {
446 graph_ingest(base_dir, &result);
447 pipeline_sync_on_archive(base_dir);
448 }
449
450 Ok(log_number)
451}
452
453pub fn run_from_hook() -> Result<(), String> {
456 let hook_input = crate::jsonl::read_hook_input()?;
457 let base_dir = crate::paths::claude_dir()?;
458 archive_from_jsonl(
459 &base_dir,
460 &hook_input.session_id,
461 &hook_input.transcript_path,
462 )?;
463 Ok(())
464}
465
466pub fn archive_all_unarchived() -> Result<(), String> {
468 let base = crate::paths::claude_dir()?;
469 archive_all_with_base(&base)
470}
471
472pub fn archive_all_with_base(base: &Path) -> Result<(), String> {
473 let conversations_dir = base.join("conversations");
474 if !conversations_dir.exists() {
475 return Err(
476 "conversations/ directory not found. Run `recall-echo init` first.".to_string(),
477 );
478 }
479
480 let archived_sessions = collect_archived_sessions(&conversations_dir);
481
482 let projects_dir = base.join("projects");
483 if !projects_dir.exists() {
484 eprintln!("No projects directory found \u{2014} nothing to archive.");
485 return Ok(());
486 }
487
488 let mut jsonl_files = find_jsonl_files(&projects_dir);
489 jsonl_files.sort_by_key(|p| {
490 fs::metadata(p)
491 .and_then(|m| m.modified())
492 .unwrap_or(std::time::SystemTime::UNIX_EPOCH)
493 });
494
495 let mut archived_count = 0;
496 let mut skipped_count = 0;
497
498 for jsonl_path in &jsonl_files {
499 let session_id = match jsonl_path.file_stem().and_then(|s| s.to_str()) {
500 Some(id) => id.to_string(),
501 None => continue,
502 };
503
504 if archived_sessions.contains(&session_id) {
505 skipped_count += 1;
506 continue;
507 }
508
509 let path_str = jsonl_path.to_string_lossy().to_string();
510 match archive_from_jsonl(base, &session_id, &path_str) {
511 Ok(_) => archived_count += 1,
512 Err(e) => {
513 eprintln!("recall-echo: skipping {} \u{2014} {}", session_id, e);
514 }
515 }
516 }
517
518 eprintln!(
519 "recall-echo: archived {archived_count} conversation{}, skipped {skipped_count} already archived",
520 if archived_count == 1 { "" } else { "s" }
521 );
522
523 Ok(())
524}
525
526fn collect_archived_sessions(conversations_dir: &Path) -> std::collections::HashSet<String> {
527 let mut sessions = std::collections::HashSet::new();
528 if let Ok(entries) = fs::read_dir(conversations_dir) {
529 for entry in entries.flatten() {
530 let name = entry.file_name();
531 let name = name.to_string_lossy();
532 if name.starts_with("conversation-") && name.ends_with(".md") {
533 if let Ok(content) = fs::read_to_string(entry.path()) {
534 for line in content.lines().take(15) {
535 if let Some(sid) = line.strip_prefix("session_id: ") {
536 sessions.insert(sid.trim().trim_matches('"').to_string());
537 break;
538 }
539 }
540 }
541 }
542 }
543 }
544 sessions
545}
546
547fn find_jsonl_files(dir: &Path) -> Vec<std::path::PathBuf> {
548 let mut files = Vec::new();
549 if let Ok(entries) = fs::read_dir(dir) {
550 for entry in entries.flatten() {
551 let path = entry.path();
552 if path.is_dir() {
553 files.extend(find_jsonl_files(&path));
554 } else if path.extension().is_some_and(|e| e == "jsonl") {
555 files.push(path);
556 }
557 }
558 }
559 files
560}
561
562#[cfg(feature = "pulse-null")]
571pub async fn archive_session(
572 memory_dir: &Path,
573 messages: &[pulse_system_types::llm::Message],
574 metadata: &SessionMetadata,
575 provider: Option<&dyn pulse_system_types::llm::LmProvider>,
576) -> Result<u32, String> {
577 let mut conv = crate::pulse_null::messages_to_conversation(messages, &metadata.session_id);
578 conv.first_timestamp = metadata.started_at.clone();
579 conv.last_timestamp = metadata.ended_at.clone();
580
581 let summary = summarize::extract_with_fallback(provider, &conv).await;
582 let result = archive_conversation(memory_dir, &conv, &summary, "session")?;
583 let log_number = result.log_number;
584
585 #[cfg(feature = "graph")]
587 if log_number > 0 {
588 if let Err(e) = crate::graph_bridge::ingest_into_graph(
589 memory_dir,
590 &result.full_content,
591 &result.session_id,
592 Some(log_number),
593 )
594 .await
595 {
596 eprintln!("recall-echo: graph ingestion warning: {e}");
597 }
598 pipeline_sync_on_archive_async(memory_dir).await;
599 }
600
601 Ok(log_number)
602}
603
604#[cfg(test)]
605mod tests {
606 use super::*;
607
608 #[test]
609 fn highest_from_empty_dir() {
610 let tmp = tempfile::tempdir().unwrap();
611 assert_eq!(highest_conversation_number(tmp.path()), 0);
612 }
613
614 #[test]
615 fn highest_from_sequential_files() {
616 let tmp = tempfile::tempdir().unwrap();
617 fs::write(tmp.path().join("conversation-001.md"), "").unwrap();
618 fs::write(tmp.path().join("conversation-002.md"), "").unwrap();
619 fs::write(tmp.path().join("conversation-003.md"), "").unwrap();
620 assert_eq!(highest_conversation_number(tmp.path()), 3);
621 }
622
623 #[test]
624 fn highest_with_gaps() {
625 let tmp = tempfile::tempdir().unwrap();
626 fs::write(tmp.path().join("conversation-001.md"), "").unwrap();
627 fs::write(tmp.path().join("conversation-010.md"), "").unwrap();
628 assert_eq!(highest_conversation_number(tmp.path()), 10);
629 }
630
631 #[test]
632 fn highest_ignores_non_matching() {
633 let tmp = tempfile::tempdir().unwrap();
634 fs::write(tmp.path().join("conversation-003.md"), "").unwrap();
635 fs::write(tmp.path().join("notes.md"), "").unwrap();
636 fs::write(tmp.path().join("conversation-bad.md"), "").unwrap();
637 assert_eq!(highest_conversation_number(tmp.path()), 3);
638 }
639
640 #[test]
641 fn append_index_creates_header_and_appends() {
642 let tmp = tempfile::tempdir().unwrap();
643 let index = tmp.path().join("ARCHIVE.md");
644
645 append_index(
646 &index,
647 1,
648 "2026-03-05",
649 "abc123",
650 &["auth".to_string()],
651 34,
652 "45m",
653 )
654 .unwrap();
655 append_index(
656 &index,
657 2,
658 "2026-03-05",
659 "def456",
660 &["ci".to_string(), "tests".to_string()],
661 22,
662 "20m",
663 )
664 .unwrap();
665
666 let content = fs::read_to_string(&index).unwrap();
667 assert!(content.contains("# Conversation Archive"));
668 assert!(content.contains("| 001 | 2026-03-05 | abc123 | auth | 34 | 45m |"));
669 assert!(content.contains("| 002 | 2026-03-05 | def456 | ci, tests | 22 | 20m |"));
670 }
671
672 #[test]
673 fn append_index_to_existing_file() {
674 let tmp = tempfile::tempdir().unwrap();
675 let index = tmp.path().join("ARCHIVE.md");
676 fs::write(
677 &index,
678 "# Conversation Archive\n\n| # | Date | Session | Topics | Messages | Duration |\n|---|------|---------|--------|----------|----------|\n| 001 | 2026-03-05 | abc | test | 10 | 5m |\n",
679 )
680 .unwrap();
681
682 append_index(&index, 2, "2026-03-05", "def", &[], 20, "10m").unwrap();
683
684 let content = fs::read_to_string(&index).unwrap();
685 assert!(content.contains("| 002 | 2026-03-05 | def | \u{2014} | 20 | 10m |"));
686 assert_eq!(content.matches("# Conversation Archive").count(), 1);
687 }
688
689 #[test]
690 fn archive_conversation_basic() {
691 let tmp = tempfile::tempdir().unwrap();
692 let memory = tmp.path();
693 fs::create_dir_all(memory.join("conversations")).unwrap();
694
695 let conv = Conversation {
696 session_id: "test-abc".to_string(),
697 first_timestamp: Some("2026-03-05T14:30:00Z".to_string()),
698 last_timestamp: Some("2026-03-05T15:00:00Z".to_string()),
699 user_message_count: 1,
700 assistant_message_count: 1,
701 entries: vec![
702 conversation::ConversationEntry::UserMessage("Let's build something".to_string()),
703 conversation::ConversationEntry::AssistantText("Sure, let's do it.".to_string()),
704 ],
705 };
706
707 let summary = summarize::ConversationSummary {
708 summary: "Built something cool".to_string(),
709 topics: vec!["building".to_string()],
710 decisions: vec![],
711 action_items: vec![],
712 };
713
714 let result = archive_conversation(memory, &conv, &summary, "test").unwrap();
715 assert_eq!(result.log_number, 1);
716 assert!(memory.join("conversations/conversation-001.md").exists());
717
718 let content = fs::read_to_string(memory.join("conversations/conversation-001.md")).unwrap();
719 assert!(content.contains("session_id: \"test-abc\""));
720 assert!(content.contains("source: \"test\""));
721 assert!(content.contains("Built something cool"));
722 }
723
724 #[test]
725 fn archive_conversation_skips_empty() {
726 let tmp = tempfile::tempdir().unwrap();
727 let memory = tmp.path();
728 fs::create_dir_all(memory.join("conversations")).unwrap();
729
730 let conv = Conversation::new("empty");
731 let summary = summarize::ConversationSummary::default();
732
733 let result = archive_conversation(memory, &conv, &summary, "test").unwrap();
734 assert_eq!(result.log_number, 0);
735 }
736}