1use std::path::{Path, PathBuf};
4
5use crate::graph::traverse::format_traversal;
6use crate::graph::types::*;
7use crate::graph::GraphMemory;
8
9const GREEN: &str = "\x1b[32m";
10const CYAN: &str = "\x1b[36m";
11const YELLOW: &str = "\x1b[33m";
12const BOLD: &str = "\x1b[1m";
13const DIM: &str = "\x1b[2m";
14const RESET: &str = "\x1b[0m";
15
16pub fn init(memory_dir: &Path) -> Result<(), String> {
18 let graph_dir = memory_dir.join("graph");
19 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
20 rt.block_on(async {
21 GraphMemory::open(&graph_dir)
22 .await
23 .map_err(|e| e.to_string())?;
24 println!(
25 "{GREEN}✓{RESET} Graph store initialized at {}",
26 graph_dir.display()
27 );
28 Ok(())
29 })
30}
31
32pub fn graph_status(memory_dir: &Path) -> Result<(), String> {
34 let graph_dir = memory_dir.join("graph");
35 if !graph_dir.exists() {
36 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
37 }
38 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
39 rt.block_on(async {
40 let gm = GraphMemory::open(&graph_dir)
41 .await
42 .map_err(|e| e.to_string())?;
43 let stats = gm.stats().await.map_err(|e| e.to_string())?;
44
45 println!("{BOLD}Graph Memory Status{RESET}");
46 println!(" Entities: {}", stats.entity_count);
47 println!(" Relationships: {}", stats.relationship_count);
48 println!(" Episodes: {}", stats.episode_count);
49
50 if !stats.entity_type_counts.is_empty() {
51 println!("\n {DIM}By type:{RESET}");
52 let mut types: Vec<_> = stats.entity_type_counts.iter().collect();
53 types.sort_by(|a, b| b.1.cmp(a.1));
54 for (t, count) in types {
55 println!(" {t}: {count}");
56 }
57 }
58 Ok(())
59 })
60}
61
62pub fn add_entity(
64 memory_dir: &Path,
65 name: &str,
66 entity_type: &str,
67 abstract_text: &str,
68 overview: Option<&str>,
69 source: Option<&str>,
70) -> Result<(), String> {
71 let graph_dir = memory_dir.join("graph");
72 let et: EntityType = entity_type.parse().map_err(|e: String| e)?;
73
74 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
75 rt.block_on(async {
76 let gm = GraphMemory::open(&graph_dir)
77 .await
78 .map_err(|e| e.to_string())?;
79
80 let entity = gm
81 .add_entity(NewEntity {
82 name: name.to_string(),
83 entity_type: et,
84 abstract_text: abstract_text.to_string(),
85 overview: overview.map(String::from),
86 content: None,
87 attributes: None,
88 source: source.map(String::from),
89 })
90 .await
91 .map_err(|e| e.to_string())?;
92
93 println!(
94 "{GREEN}✓{RESET} Created entity: {BOLD}{}{RESET} ({}) [{}]",
95 entity.name,
96 entity.entity_type,
97 entity.id_string()
98 );
99 Ok(())
100 })
101}
102
103pub fn relate(
105 memory_dir: &Path,
106 from: &str,
107 rel_type: &str,
108 to: &str,
109 description: Option<&str>,
110 source: Option<&str>,
111) -> Result<(), String> {
112 let graph_dir = memory_dir.join("graph");
113 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
114 rt.block_on(async {
115 let gm = GraphMemory::open(&graph_dir)
116 .await
117 .map_err(|e| e.to_string())?;
118
119 let rel = gm
120 .add_relationship(NewRelationship {
121 from_entity: from.to_string(),
122 to_entity: to.to_string(),
123 rel_type: rel_type.to_string(),
124 description: description.map(String::from),
125 confidence: None,
126 source: source.map(String::from),
127 })
128 .await
129 .map_err(|e| e.to_string())?;
130
131 println!(
132 "{GREEN}✓{RESET} {from} {CYAN}—[{rel_type}]→{RESET} {to} [{}]",
133 rel.id_string()
134 );
135 Ok(())
136 })
137}
138
139pub fn search(
141 memory_dir: &Path,
142 query: &str,
143 limit: usize,
144 entity_type: Option<&str>,
145 keyword: Option<&str>,
146) -> Result<(), String> {
147 let graph_dir = memory_dir.join("graph");
148 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
149 rt.block_on(async {
150 let gm = GraphMemory::open(&graph_dir)
151 .await
152 .map_err(|e| e.to_string())?;
153
154 let options = SearchOptions {
155 limit,
156 entity_type: entity_type.map(String::from),
157 keyword: keyword.map(String::from),
158 };
159
160 let results = gm
161 .search_with_options(query, &options)
162 .await
163 .map_err(|e| e.to_string())?;
164
165 if results.is_empty() {
166 println!("{YELLOW}No results.{RESET}");
167 return Ok(());
168 }
169
170 for (i, r) in results.iter().enumerate() {
171 println!(
172 "{BOLD}{}. {}{RESET} ({}) — score: {:.3}",
173 i + 1,
174 r.entity.name,
175 r.entity.entity_type,
176 r.score
177 );
178 println!(" {DIM}{}{RESET}", r.entity.abstract_text);
179 }
180 Ok(())
181 })
182}
183
184pub fn ingest(memory_dir: &Path, archive_path: &Path) -> Result<(), String> {
186 let graph_dir = memory_dir.join("graph");
187 if !graph_dir.exists() {
188 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
189 }
190
191 let content = std::fs::read_to_string(archive_path)
192 .map_err(|e| format!("Failed to read {}: {e}", archive_path.display()))?;
193
194 let (session_id, log_number) = extract_archive_metadata(&content, archive_path);
196
197 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
198 rt.block_on(async {
199 let gm = GraphMemory::open(&graph_dir)
200 .await
201 .map_err(|e| e.to_string())?;
202
203 let report = gm
204 .ingest_archive(&content, &session_id, log_number, None)
205 .await
206 .map_err(|e| e.to_string())?;
207
208 println!(
209 "{GREEN}✓{RESET} Ingested {}: {} episodes created",
210 archive_path.display(),
211 report.episodes_created
212 );
213 if !report.errors.is_empty() {
214 for err in &report.errors {
215 println!(" {YELLOW}warning:{RESET} {err}");
216 }
217 }
218 Ok(())
219 })
220}
221
222pub fn ingest_all(memory_dir: &Path) -> Result<(), String> {
224 let graph_dir = memory_dir.join("graph");
225 if !graph_dir.exists() {
226 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
227 }
228
229 let conversations_dir = find_conversations_dir(memory_dir)?;
230
231 let mut files: Vec<_> = std::fs::read_dir(&conversations_dir)
233 .map_err(|e| e.to_string())?
234 .filter_map(|e| e.ok())
235 .filter(|e| {
236 let name = e.file_name().to_string_lossy().to_string();
237 name.starts_with("conversation-") || name.starts_with("archive-log-")
238 })
239 .collect();
240 files.sort_by_key(|e| e.file_name());
241
242 if files.is_empty() {
243 println!("{YELLOW}No conversation archives found.{RESET}");
244 return Ok(());
245 }
246
247 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
248 rt.block_on(async {
249 let gm = GraphMemory::open(&graph_dir)
250 .await
251 .map_err(|e| e.to_string())?;
252
253 let mut total_episodes = 0u32;
254 let mut ingested = 0u32;
255 let mut skipped = 0u32;
256
257 for entry in &files {
258 let path = entry.path();
259 let content = std::fs::read_to_string(&path).map_err(|e| e.to_string())?;
260
261 let (session_id, log_number) = extract_archive_metadata(&content, &path);
262
263 if let Some(ln) = log_number {
265 if let Ok(Some(_)) = gm.get_episode_by_log_number(ln).await {
266 skipped += 1;
267 continue;
268 }
269 }
270
271 let report = gm
272 .ingest_archive(&content, &session_id, log_number, None)
273 .await
274 .map_err(|e| e.to_string())?;
275
276 total_episodes += report.episodes_created;
277 ingested += 1;
278
279 println!(
280 " {GREEN}✓{RESET} {} — {} episodes",
281 path.file_name().unwrap_or_default().to_string_lossy(),
282 report.episodes_created
283 );
284 }
285
286 println!(
287 "\n{GREEN}✓{RESET} Ingested {ingested} archives ({total_episodes} episodes), skipped {skipped} already ingested"
288 );
289 Ok(())
290 })
291}
292
293fn extract_archive_metadata(content: &str, path: &Path) -> (String, Option<u32>) {
295 let mut session_id = "unknown".to_string();
296 let mut log_number: Option<u32> = None;
297
298 if let Some(name) = path.file_stem().and_then(|s| s.to_str()) {
300 let num_str = name
301 .strip_prefix("conversation-")
302 .or_else(|| name.strip_prefix("archive-log-"));
303 if let Some(num_str) = num_str {
304 if let Ok(n) = num_str.parse::<u32>() {
305 log_number = Some(n);
306 }
307 }
308 }
309
310 if let Some(stripped) = content.strip_prefix("---") {
312 if let Some(end) = stripped.find("---") {
313 let frontmatter = &stripped[..end];
314 for line in frontmatter.lines() {
315 let line = line.trim();
316 if let Some(val) = line.strip_prefix("session_id:") {
317 session_id = val.trim().trim_matches('"').to_string();
318 }
319 }
320 }
321 }
322
323 (session_id, log_number)
324}
325
326pub fn traverse(
328 memory_dir: &Path,
329 entity_name: &str,
330 depth: u32,
331 type_filter: Option<&str>,
332) -> Result<(), String> {
333 let graph_dir = memory_dir.join("graph");
334 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
335 rt.block_on(async {
336 let gm = GraphMemory::open(&graph_dir)
337 .await
338 .map_err(|e| e.to_string())?;
339
340 let tree = gm
341 .traverse_filtered(entity_name, depth, type_filter)
342 .await
343 .map_err(|e| e.to_string())?;
344
345 let output = format_traversal(&tree, 0);
346 print!("{output}");
347 Ok(())
348 })
349}
350
351pub fn hybrid_query(
353 memory_dir: &Path,
354 query: &str,
355 limit: usize,
356 entity_type: Option<&str>,
357 keyword: Option<&str>,
358 depth: u32,
359 episodes: bool,
360) -> Result<(), String> {
361 let graph_dir = memory_dir.join("graph");
362 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
363 rt.block_on(async {
364 let gm = GraphMemory::open(&graph_dir)
365 .await
366 .map_err(|e| e.to_string())?;
367
368 let options = QueryOptions {
369 limit,
370 entity_type: entity_type.map(String::from),
371 keyword: keyword.map(String::from),
372 graph_depth: depth,
373 include_episodes: episodes,
374 };
375
376 let result = gm.query(query, &options).await.map_err(|e| e.to_string())?;
377
378 if result.entities.is_empty() && result.episodes.is_empty() {
379 println!("{YELLOW}No results.{RESET}");
380 return Ok(());
381 }
382
383 if !result.entities.is_empty() {
384 println!("{BOLD}Entities:{RESET}");
385 for (i, r) in result.entities.iter().enumerate() {
386 let source_tag = match &r.source {
387 MatchSource::Semantic => "semantic".to_string(),
388 MatchSource::Graph { parent, rel_type } => {
389 format!("graph: {parent} —[{rel_type}]")
390 }
391 MatchSource::Keyword => "keyword".to_string(),
392 };
393 println!(
394 " {BOLD}{}. {}{RESET} ({}) — {:.3} [{DIM}{source_tag}{RESET}]",
395 i + 1,
396 r.entity.name,
397 r.entity.entity_type,
398 r.score
399 );
400 println!(" {DIM}{}{RESET}", r.entity.abstract_text);
401 }
402 }
403
404 if !result.episodes.is_empty() {
405 println!("\n{BOLD}Episodes:{RESET}");
406 for (i, ep) in result.episodes.iter().enumerate() {
407 let log = ep
408 .episode
409 .log_number
410 .map(|n| format!("#{n}"))
411 .unwrap_or_default();
412 println!(
413 " {BOLD}{}. {}{RESET} ({}) — {:.3}",
414 i + 1,
415 ep.episode.session_id,
416 log,
417 ep.score
418 );
419 println!(" {DIM}{}{RESET}", ep.episode.abstract_text);
420 }
421 }
422
423 Ok(())
424 })
425}
426
427#[cfg(feature = "llm")]
429pub fn extract(
430 memory_dir: &Path,
431 log: Option<u32>,
432 all: bool,
433 dry_run: bool,
434 model_override: Option<String>,
435 provider_override: Option<String>,
436 delay_ms: u64,
437) -> Result<(), String> {
438 let graph_dir = memory_dir.join("graph");
439 if !graph_dir.exists() {
440 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
441 }
442
443 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
444 rt.block_on(async {
445 let gm = GraphMemory::open(&graph_dir)
446 .await
447 .map_err(|e| e.to_string())?;
448
449 let log_numbers: Vec<u32> = if let Some(ln) = log {
451 vec![ln]
452 } else if all {
453 gm.unextracted_log_numbers()
454 .await
455 .map_err(|e| e.to_string())?
456 .into_iter()
457 .map(|n| n as u32)
458 .collect()
459 } else {
460 return Err("Specify --log <N> or --all".into());
461 };
462
463 if log_numbers.is_empty() {
464 println!("{YELLOW}No unextracted archives found.{RESET}");
465 return Ok(());
466 }
467
468 let conversations_dir = find_conversations_dir(memory_dir)?;
470
471 if dry_run {
472 println!(
473 "{BOLD}Dry run — {}{RESET} archives to extract",
474 log_numbers.len()
475 );
476 for ln in &log_numbers {
477 let path = find_archive_file(&conversations_dir, *ln);
478 let label = match &path {
479 Ok(p) => {
480 p.file_name()
481 .unwrap_or_default()
482 .to_string_lossy()
483 .to_string()
484 }
485 Err(_) => format!("log {ln:03} (file not found)"),
486 };
487 println!(" {label}");
488 }
489 return Ok(());
490 }
491
492 let (llm, model_name) = crate::llm_provider::create_provider(
494 memory_dir,
495 provider_override.as_deref(),
496 model_override.as_deref(),
497 )?;
498
499 println!(
500 "{BOLD}Extracting entities from {} archives using {model_name}{RESET}",
501 log_numbers.len(),
502 );
503
504 let mut total_entities_created = 0u32;
505 let mut total_entities_merged = 0u32;
506 let mut total_entities_skipped = 0u32;
507 let mut total_relationships = 0u32;
508 let mut total_errors = Vec::new();
509 let mut processed = 0u32;
510
511 for ln in &log_numbers {
512 let archive_path = match find_archive_file(&conversations_dir, *ln) {
513 Ok(p) => p,
514 Err(e) => {
515 println!(" {YELLOW}⚠{RESET} log {ln:03}: {e}");
516 total_errors.push(format!("log {ln:03}: {e}"));
517 continue;
518 }
519 };
520
521 let content = std::fs::read_to_string(&archive_path)
522 .map_err(|e| format!("read {}: {e}", archive_path.display()))?;
523
524 let (session_id, _) = extract_archive_metadata(&content, &archive_path);
525
526 let report = gm
527 .extract_from_archive(&content, &session_id, Some(*ln), &*llm)
528 .await
529 .map_err(|e| format!("extraction log {ln:03}: {e}"))?;
530
531 println!(
532 " {GREEN}✓{RESET} log {ln:03}: +{} entities, ~{} merged, -{} skipped, {} rels",
533 report.entities_created,
534 report.entities_merged,
535 report.entities_skipped,
536 report.relationships_created,
537 );
538
539 gm.mark_extracted(*ln).await.map_err(|e| e.to_string())?;
540
541 total_entities_created += report.entities_created;
542 total_entities_merged += report.entities_merged;
543 total_entities_skipped += report.entities_skipped;
544 total_relationships += report.relationships_created;
545 total_errors.extend(report.errors);
546 processed += 1;
547
548 if delay_ms > 0 && *ln != *log_numbers.last().unwrap() {
550 tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
551 }
552 }
553
554 println!(
555 "\n{GREEN}✓{RESET} Done: {processed} archives — +{total_entities_created} created, ~{total_entities_merged} merged, -{total_entities_skipped} skipped, {total_relationships} relationships"
556 );
557
558 if !total_errors.is_empty() {
559 println!("\n{YELLOW}Warnings ({}):{RESET}", total_errors.len());
560 for err in total_errors.iter().take(10) {
561 println!(" {DIM}{err}{RESET}");
562 }
563 if total_errors.len() > 10 {
564 println!(" {DIM}... and {} more{RESET}", total_errors.len() - 10);
565 }
566 }
567
568 Ok(())
569 })
570}
571
572pub fn vigil_sync(
576 memory_dir: &Path,
577 signals_path: Option<&Path>,
578 outcomes_path: Option<&Path>,
579) -> Result<(), String> {
580 let graph_dir = memory_dir.join("graph");
581 if !graph_dir.exists() {
582 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
583 }
584
585 let entity_root = memory_dir
587 .parent()
588 .unwrap_or(memory_dir);
589
590 let default_signals = entity_root.join("vigil").join("signals.json");
591 let default_outcomes = entity_root.join("caliber").join("outcomes.json");
592
593 let sig_path = signals_path.unwrap_or(&default_signals);
594 let out_path = outcomes_path.unwrap_or(&default_outcomes);
595
596 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
597 rt.block_on(async {
598 let gm = GraphMemory::open(&graph_dir)
599 .await
600 .map_err(|e| e.to_string())?;
601
602 let report = gm
603 .sync_vigil(sig_path, out_path)
604 .await
605 .map_err(|e| e.to_string())?;
606
607 println!("{BOLD}Vigil Sync{RESET}");
608 println!(" Measurements: +{}", report.measurements_created);
609 println!(" Outcomes: +{}", report.outcomes_created);
610 println!(" Relationships: +{}", report.relationships_created);
611 println!(" Skipped: {}", report.skipped);
612
613 if !report.errors.is_empty() {
614 println!("\n {YELLOW}Warnings:{RESET}");
615 for err in &report.errors {
616 println!(" {DIM}{err}{RESET}");
617 }
618 }
619
620 if report.measurements_created == 0
621 && report.outcomes_created == 0
622 {
623 println!("\n {DIM}No new data — graph is in sync.{RESET}");
624 }
625
626 Ok(())
627 })
628}
629
630pub fn pipeline_sync(memory_dir: &Path, docs_dir_override: Option<&Path>) -> Result<(), String> {
634 let graph_dir = memory_dir.join("graph");
635 if !graph_dir.exists() {
636 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
637 }
638
639 let docs_dir = if let Some(d) = docs_dir_override {
641 d.to_path_buf()
642 } else {
643 let cfg = crate::config::load_from_dir(memory_dir);
644 match cfg.pipeline.and_then(|p| p.docs_dir) {
645 Some(d) => {
646 let path = PathBuf::from(shellexpand(&d));
647 if !path.exists() {
648 return Err(format!(
649 "Configured docs_dir does not exist: {}",
650 path.display()
651 ));
652 }
653 path
654 }
655 None => {
656 return Err(
657 "No docs directory specified. Use --docs-dir or set [pipeline] docs_dir in config.".into(),
658 );
659 }
660 }
661 };
662
663 let docs = read_pipeline_docs(&docs_dir)?;
665
666 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
667 rt.block_on(async {
668 let gm = GraphMemory::open(&graph_dir)
669 .await
670 .map_err(|e| e.to_string())?;
671
672 let report = gm.sync_pipeline(&docs).await.map_err(|e| e.to_string())?;
673
674 println!("{BOLD}Pipeline Sync{RESET}");
675 println!(" Created: {}", report.entities_created);
676 println!(" Updated: {}", report.entities_updated);
677 println!(" Archived: {}", report.entities_archived);
678 println!(
679 " Relationships: +{} / ~{} skipped",
680 report.relationships_created, report.relationships_skipped
681 );
682
683 if !report.errors.is_empty() {
684 println!("\n {YELLOW}Warnings:{RESET}");
685 for err in &report.errors {
686 println!(" {DIM}{err}{RESET}");
687 }
688 }
689
690 if report.entities_created == 0
691 && report.entities_updated == 0
692 && report.entities_archived == 0
693 {
694 println!("\n {DIM}No changes — graph is in sync.{RESET}");
695 }
696
697 Ok(())
698 })
699}
700
701pub fn pipeline_status(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
703 let graph_dir = memory_dir.join("graph");
704 if !graph_dir.exists() {
705 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
706 }
707
708 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
709 rt.block_on(async {
710 let gm = GraphMemory::open(&graph_dir)
711 .await
712 .map_err(|e| e.to_string())?;
713
714 let stats = gm
715 .pipeline_stats(staleness_days)
716 .await
717 .map_err(|e| e.to_string())?;
718
719 println!(
720 "{BOLD}Pipeline Status{RESET} ({} entities)",
721 stats.total_entities
722 );
723
724 if stats.by_stage.is_empty() {
725 println!(
726 " {DIM}No pipeline entities in graph. Run `graph pipeline sync` first.{RESET}"
727 );
728 return Ok(());
729 }
730
731 let stage_order = ["learning", "thoughts", "curiosity", "reflections", "praxis"];
733 for stage in &stage_order {
734 if let Some(statuses) = stats.by_stage.get(*stage) {
735 println!("\n {CYAN}{}{RESET}", stage.to_uppercase());
736 let mut items: Vec<_> = statuses.iter().collect();
737 items.sort_by_key(|(s, _)| (*s).clone());
738 for (status, count) in items {
739 println!(" {status}: {count}");
740 }
741 }
742 }
743
744 if !stats.stale_thoughts.is_empty() {
745 println!("\n {YELLOW}Stale thoughts (>{staleness_days}d):{RESET}");
746 for entity in &stats.stale_thoughts {
747 println!(" {DIM}•{RESET} {}", entity.name);
748 }
749 }
750
751 if !stats.stale_questions.is_empty() {
752 println!(
753 "\n {YELLOW}Stale questions (>{}d):{RESET}",
754 staleness_days * 2
755 );
756 for entity in &stats.stale_questions {
757 println!(" {DIM}•{RESET} {}", entity.name);
758 }
759 }
760
761 if let Some(ref last) = stats.last_movement {
762 println!("\n {DIM}Last movement: {last}{RESET}");
763 }
764
765 Ok(())
766 })
767}
768
769pub fn pipeline_flow(memory_dir: &Path, entity_name: &str) -> Result<(), String> {
771 let graph_dir = memory_dir.join("graph");
772 if !graph_dir.exists() {
773 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
774 }
775
776 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
777 rt.block_on(async {
778 let gm = GraphMemory::open(&graph_dir)
779 .await
780 .map_err(|e| e.to_string())?;
781
782 let chain = gm
783 .pipeline_flow(entity_name)
784 .await
785 .map_err(|e| e.to_string())?;
786
787 if chain.is_empty() {
788 println!("{YELLOW}No pipeline relationships found for \"{entity_name}\".{RESET}");
789 return Ok(());
790 }
791
792 println!("{BOLD}Pipeline Flow: {entity_name}{RESET}\n");
793 for (source, rel_type, target) in &chain {
794 println!(
795 " {} ({}) {CYAN}—[{rel_type}]→{RESET} {} ({})",
796 source.name, source.entity_type, target.name, target.entity_type
797 );
798 }
799
800 Ok(())
801 })
802}
803
804pub fn pipeline_stale(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
806 let graph_dir = memory_dir.join("graph");
807 if !graph_dir.exists() {
808 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
809 }
810
811 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
812 rt.block_on(async {
813 let gm = GraphMemory::open(&graph_dir)
814 .await
815 .map_err(|e| e.to_string())?;
816
817 let stats = gm
818 .pipeline_stats(staleness_days)
819 .await
820 .map_err(|e| e.to_string())?;
821
822 let total_stale = stats.stale_thoughts.len() + stats.stale_questions.len();
823 if total_stale == 0 {
824 println!("{GREEN}✓{RESET} No stale pipeline entities.");
825 return Ok(());
826 }
827
828 println!("{BOLD}Stale Pipeline Entities{RESET}\n");
829
830 if !stats.stale_thoughts.is_empty() {
831 println!(" {YELLOW}Thoughts (>{staleness_days} days):{RESET}");
832 for entity in &stats.stale_thoughts {
833 println!(" • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
834 }
835 }
836
837 if !stats.stale_questions.is_empty() {
838 println!(" {YELLOW}Questions (>{} days):{RESET}", staleness_days * 2);
839 for entity in &stats.stale_questions {
840 println!(" • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
841 }
842 }
843
844 Ok(())
845 })
846}
847
848fn read_pipeline_docs(dir: &Path) -> Result<PipelineDocuments, String> {
850 let read_or_empty = |name: &str| -> String {
851 let path = dir.join(name);
852 std::fs::read_to_string(&path).unwrap_or_default()
853 };
854
855 Ok(PipelineDocuments {
856 learning: read_or_empty("LEARNING.md"),
857 thoughts: read_or_empty("THOUGHTS.md"),
858 curiosity: read_or_empty("CURIOSITY.md"),
859 reflections: read_or_empty("REFLECTIONS.md"),
860 praxis: read_or_empty("PRAXIS.md"),
861 })
862}
863
864fn shellexpand(path: &str) -> String {
866 if let Some(rest) = path.strip_prefix("~/") {
867 if let Some(home) = dirs::home_dir() {
868 return home.join(rest).to_string_lossy().to_string();
869 }
870 }
871 path.to_string()
872}
873
874fn find_conversations_dir(memory_dir: &Path) -> Result<PathBuf, String> {
876 let conv = memory_dir.join("conversations");
877 if conv.exists() {
878 return Ok(conv);
879 }
880 if let Some(parent) = memory_dir.parent() {
881 let parent_conv = parent.join("conversations");
882 if parent_conv.exists() {
883 return Ok(parent_conv);
884 }
885 }
886 Err("conversations/ directory not found".into())
887}
888
889#[cfg(feature = "llm")]
891fn find_archive_file(conversations_dir: &Path, log_number: u32) -> Result<PathBuf, String> {
892 let patterns = [
894 format!("conversation-{log_number:03}.md"),
895 format!("conversation-{log_number}.md"),
896 format!("archive-log-{log_number:03}.md"),
897 format!("archive-log-{log_number}.md"),
898 ];
899
900 for name in &patterns {
901 let path = conversations_dir.join(name);
902 if path.exists() {
903 return Ok(path);
904 }
905 }
906
907 Err(format!("no archive file for log {log_number:03}"))
908}