1use std::path::{Path, PathBuf};
4
5use crate::graph::traverse::format_traversal;
6use crate::graph::types::*;
7use crate::graph::GraphMemory;
8
// ANSI SGR escape sequences used to style the terminal output of the
// commands in this file. Each styled span is closed with `RESET`.
const GREEN: &str = "\x1b[32m"; // green foreground
const CYAN: &str = "\x1b[36m"; // cyan foreground
const YELLOW: &str = "\x1b[33m"; // yellow foreground
const BOLD: &str = "\x1b[1m"; // bold / increased intensity
const DIM: &str = "\x1b[2m"; // faint / decreased intensity
const RESET: &str = "\x1b[0m"; // clear all attributes
15
16pub fn init(memory_dir: &Path) -> Result<(), String> {
18 let graph_dir = memory_dir.join("graph");
19 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
20 rt.block_on(async {
21 GraphMemory::open(&graph_dir)
22 .await
23 .map_err(|e| e.to_string())?;
24 println!(
25 "{GREEN}✓{RESET} Graph store initialized at {}",
26 graph_dir.display()
27 );
28 Ok(())
29 })
30}
31
32pub fn graph_status(memory_dir: &Path) -> Result<(), String> {
34 let graph_dir = memory_dir.join("graph");
35 if !graph_dir.exists() {
36 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
37 }
38 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
39 rt.block_on(async {
40 let gm = GraphMemory::open(&graph_dir)
41 .await
42 .map_err(|e| e.to_string())?;
43 let stats = gm.stats().await.map_err(|e| e.to_string())?;
44
45 println!("{BOLD}Graph Memory Status{RESET}");
46 println!(" Entities: {}", stats.entity_count);
47 println!(" Relationships: {}", stats.relationship_count);
48 println!(" Episodes: {}", stats.episode_count);
49
50 if !stats.entity_type_counts.is_empty() {
51 println!("\n {DIM}By type:{RESET}");
52 let mut types: Vec<_> = stats.entity_type_counts.iter().collect();
53 types.sort_by(|a, b| b.1.cmp(a.1));
54 for (t, count) in types {
55 println!(" {t}: {count}");
56 }
57 }
58 Ok(())
59 })
60}
61
62pub fn add_entity(
64 memory_dir: &Path,
65 name: &str,
66 entity_type: &str,
67 abstract_text: &str,
68 overview: Option<&str>,
69 source: Option<&str>,
70) -> Result<(), String> {
71 let graph_dir = memory_dir.join("graph");
72 let et: EntityType = entity_type.parse().map_err(|e: String| e)?;
73
74 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
75 rt.block_on(async {
76 let gm = GraphMemory::open(&graph_dir)
77 .await
78 .map_err(|e| e.to_string())?;
79
80 let entity = gm
81 .add_entity(NewEntity {
82 name: name.to_string(),
83 entity_type: et,
84 abstract_text: abstract_text.to_string(),
85 overview: overview.map(String::from),
86 content: None,
87 attributes: None,
88 source: source.map(String::from),
89 })
90 .await
91 .map_err(|e| e.to_string())?;
92
93 println!(
94 "{GREEN}✓{RESET} Created entity: {BOLD}{}{RESET} ({}) [{}]",
95 entity.name,
96 entity.entity_type,
97 entity.id_string()
98 );
99 Ok(())
100 })
101}
102
103pub fn relate(
105 memory_dir: &Path,
106 from: &str,
107 rel_type: &str,
108 to: &str,
109 description: Option<&str>,
110 source: Option<&str>,
111) -> Result<(), String> {
112 let graph_dir = memory_dir.join("graph");
113 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
114 rt.block_on(async {
115 let gm = GraphMemory::open(&graph_dir)
116 .await
117 .map_err(|e| e.to_string())?;
118
119 let rel = gm
120 .add_relationship(NewRelationship {
121 from_entity: from.to_string(),
122 to_entity: to.to_string(),
123 rel_type: rel_type.to_string(),
124 description: description.map(String::from),
125 confidence: None,
126 source: source.map(String::from),
127 })
128 .await
129 .map_err(|e| e.to_string())?;
130
131 println!(
132 "{GREEN}✓{RESET} {from} {CYAN}—[{rel_type}]→{RESET} {to} [{}]",
133 rel.id_string()
134 );
135 Ok(())
136 })
137}
138
139pub fn search(
141 memory_dir: &Path,
142 query: &str,
143 limit: usize,
144 entity_type: Option<&str>,
145 keyword: Option<&str>,
146) -> Result<(), String> {
147 let graph_dir = memory_dir.join("graph");
148 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
149 rt.block_on(async {
150 let gm = GraphMemory::open(&graph_dir)
151 .await
152 .map_err(|e| e.to_string())?;
153
154 let options = SearchOptions {
155 limit,
156 entity_type: entity_type.map(String::from),
157 keyword: keyword.map(String::from),
158 };
159
160 let results = gm
161 .search_with_options(query, &options)
162 .await
163 .map_err(|e| e.to_string())?;
164
165 if results.is_empty() {
166 println!("{YELLOW}No results.{RESET}");
167 return Ok(());
168 }
169
170 for (i, r) in results.iter().enumerate() {
171 println!(
172 "{BOLD}{}. {}{RESET} ({}) — score: {:.3}",
173 i + 1,
174 r.entity.name,
175 r.entity.entity_type,
176 r.score
177 );
178 println!(" {DIM}{}{RESET}", r.entity.abstract_text);
179 }
180 Ok(())
181 })
182}
183
184pub fn ingest(memory_dir: &Path, archive_path: &Path) -> Result<(), String> {
186 let graph_dir = memory_dir.join("graph");
187 if !graph_dir.exists() {
188 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
189 }
190
191 let content = std::fs::read_to_string(archive_path)
192 .map_err(|e| format!("Failed to read {}: {e}", archive_path.display()))?;
193
194 let (session_id, log_number) = extract_archive_metadata(&content, archive_path);
196
197 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
198 rt.block_on(async {
199 let gm = GraphMemory::open(&graph_dir)
200 .await
201 .map_err(|e| e.to_string())?;
202
203 let report = gm
204 .ingest_archive(&content, &session_id, log_number, None)
205 .await
206 .map_err(|e| e.to_string())?;
207
208 println!(
209 "{GREEN}✓{RESET} Ingested {}: {} episodes created",
210 archive_path.display(),
211 report.episodes_created
212 );
213 if !report.errors.is_empty() {
214 for err in &report.errors {
215 println!(" {YELLOW}warning:{RESET} {err}");
216 }
217 }
218 Ok(())
219 })
220}
221
222pub fn ingest_all(memory_dir: &Path) -> Result<(), String> {
224 let graph_dir = memory_dir.join("graph");
225 if !graph_dir.exists() {
226 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
227 }
228
229 let conversations_dir = find_conversations_dir(memory_dir)?;
230
231 let mut files: Vec<_> = std::fs::read_dir(&conversations_dir)
233 .map_err(|e| e.to_string())?
234 .filter_map(|e| e.ok())
235 .filter(|e| {
236 let name = e.file_name().to_string_lossy().to_string();
237 name.starts_with("conversation-") || name.starts_with("archive-log-")
238 })
239 .collect();
240 files.sort_by_key(|e| e.file_name());
241
242 if files.is_empty() {
243 println!("{YELLOW}No conversation archives found.{RESET}");
244 return Ok(());
245 }
246
247 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
248 rt.block_on(async {
249 let gm = GraphMemory::open(&graph_dir)
250 .await
251 .map_err(|e| e.to_string())?;
252
253 let mut total_episodes = 0u32;
254 let mut ingested = 0u32;
255 let mut skipped = 0u32;
256
257 for entry in &files {
258 let path = entry.path();
259 let content = std::fs::read_to_string(&path).map_err(|e| e.to_string())?;
260
261 let (session_id, log_number) = extract_archive_metadata(&content, &path);
262
263 if let Some(ln) = log_number {
265 if let Ok(Some(_)) = gm.get_episode_by_log_number(ln).await {
266 skipped += 1;
267 continue;
268 }
269 }
270
271 let report = gm
272 .ingest_archive(&content, &session_id, log_number, None)
273 .await
274 .map_err(|e| e.to_string())?;
275
276 total_episodes += report.episodes_created;
277 ingested += 1;
278
279 println!(
280 " {GREEN}✓{RESET} {} — {} episodes",
281 path.file_name().unwrap_or_default().to_string_lossy(),
282 report.episodes_created
283 );
284 }
285
286 println!(
287 "\n{GREEN}✓{RESET} Ingested {ingested} archives ({total_episodes} episodes), skipped {skipped} already ingested"
288 );
289 Ok(())
290 })
291}
292
/// Derives `(session_id, log_number)` for an archive.
///
/// * `log_number` comes from the file stem: the text after a `conversation-`
///   or `archive-log-` prefix, parsed as `u32` (so `conversation-007.md`
///   yields `Some(7)`). Any other stem yields `None`.
/// * `session_id` comes from a `session_id:` line inside a leading `---`
///   frontmatter block, with surrounding whitespace and double quotes
///   removed. If several such lines exist the last one wins. Without a
///   terminated frontmatter block or a `session_id:` line it is `"unknown"`.
///
/// The frontmatter ends at the first subsequent line that consists solely of
/// `---` (ignoring surrounding whitespace). Dashes inside a value therefore
/// do not end the block.
fn extract_archive_metadata(content: &str, path: &Path) -> (String, Option<u32>) {
    let log_number = path
        .file_stem()
        .and_then(|stem| stem.to_str())
        .and_then(|stem| {
            stem.strip_prefix("conversation-")
                .or_else(|| stem.strip_prefix("archive-log-"))
        })
        .and_then(|digits| digits.parse::<u32>().ok());

    let mut session_id = "unknown".to_string();

    if let Some(rest) = content.strip_prefix("---") {
        let mut candidate: Option<&str> = None;
        let mut terminated = false;

        for (idx, raw) in rest.lines().enumerate() {
            let line = raw.trim();
            // Index 0 is the remainder of the opening delimiter line, so it
            // can never be the closing delimiter.
            if idx > 0 && line == "---" {
                terminated = true;
                break;
            }
            if let Some(val) = line.strip_prefix("session_id:") {
                candidate = Some(val.trim().trim_matches('"'));
            }
        }

        // An unterminated block is not treated as frontmatter.
        if terminated {
            if let Some(id) = candidate {
                session_id = id.to_string();
            }
        }
    }

    (session_id, log_number)
}
325
326pub fn traverse(
328 memory_dir: &Path,
329 entity_name: &str,
330 depth: u32,
331 type_filter: Option<&str>,
332) -> Result<(), String> {
333 let graph_dir = memory_dir.join("graph");
334 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
335 rt.block_on(async {
336 let gm = GraphMemory::open(&graph_dir)
337 .await
338 .map_err(|e| e.to_string())?;
339
340 let tree = gm
341 .traverse_filtered(entity_name, depth, type_filter)
342 .await
343 .map_err(|e| e.to_string())?;
344
345 let output = format_traversal(&tree, 0);
346 print!("{output}");
347 Ok(())
348 })
349}
350
351pub fn hybrid_query(
353 memory_dir: &Path,
354 query: &str,
355 limit: usize,
356 entity_type: Option<&str>,
357 keyword: Option<&str>,
358 depth: u32,
359 episodes: bool,
360) -> Result<(), String> {
361 let graph_dir = memory_dir.join("graph");
362 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
363 rt.block_on(async {
364 let gm = GraphMemory::open(&graph_dir)
365 .await
366 .map_err(|e| e.to_string())?;
367
368 let options = QueryOptions {
369 limit,
370 entity_type: entity_type.map(String::from),
371 keyword: keyword.map(String::from),
372 graph_depth: depth,
373 include_episodes: episodes,
374 };
375
376 let result = gm.query(query, &options).await.map_err(|e| e.to_string())?;
377
378 if result.entities.is_empty() && result.episodes.is_empty() {
379 println!("{YELLOW}No results.{RESET}");
380 return Ok(());
381 }
382
383 if !result.entities.is_empty() {
384 println!("{BOLD}Entities:{RESET}");
385 for (i, r) in result.entities.iter().enumerate() {
386 let source_tag = match &r.source {
387 MatchSource::Semantic => "semantic".to_string(),
388 MatchSource::Graph { parent, rel_type } => {
389 format!("graph: {parent} —[{rel_type}]")
390 }
391 MatchSource::Keyword => "keyword".to_string(),
392 };
393 println!(
394 " {BOLD}{}. {}{RESET} ({}) — {:.3} [{DIM}{source_tag}{RESET}]",
395 i + 1,
396 r.entity.name,
397 r.entity.entity_type,
398 r.score
399 );
400 println!(" {DIM}{}{RESET}", r.entity.abstract_text);
401 }
402 }
403
404 if !result.episodes.is_empty() {
405 println!("\n{BOLD}Episodes:{RESET}");
406 for (i, ep) in result.episodes.iter().enumerate() {
407 let log = ep
408 .episode
409 .log_number
410 .map(|n| format!("#{n}"))
411 .unwrap_or_default();
412 println!(
413 " {BOLD}{}. {}{RESET} ({}) — {:.3}",
414 i + 1,
415 ep.episode.session_id,
416 log,
417 ep.score
418 );
419 println!(" {DIM}{}{RESET}", ep.episode.abstract_text);
420 }
421 }
422
423 Ok(())
424 })
425}
426
/// Runs LLM-based entity extraction over conversation archives.
///
/// * `log` — extract exactly this log number; takes precedence over `all`.
/// * `all` — extract every log number reported by `unextracted_log_numbers`.
/// * `dry_run` — only list the archive files that would be processed.
/// * `model_override` / `provider_override` — forwarded to
///   `crate::llm_provider::create_provider`.
/// * `delay_ms` — pause inserted after each processed archive except the
///   last entry in the work list (0 disables the pause).
///
/// Archives whose file cannot be located are reported as warnings and
/// skipped. After a successful extraction the log is marked as extracted.
/// A totals line and up to ten collected warnings are printed at the end.
///
/// # Errors
/// Fails when the graph store directory is missing, neither `log` nor `all`
/// is given, the conversations directory cannot be found, the provider cannot
/// be created, or reading / extracting / marking an archive fails. Such a
/// failure aborts the remaining archives.
#[cfg(feature = "llm")]
pub fn extract(
    memory_dir: &Path,
    log: Option<u32>,
    all: bool,
    dry_run: bool,
    model_override: Option<String>,
    provider_override: Option<String>,
    delay_ms: u64,
) -> Result<(), String> {
    let graph_dir = memory_dir.join("graph");
    if !graph_dir.exists() {
        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
    }

    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
    rt.block_on(async {
        let gm = GraphMemory::open(&graph_dir)
            .await
            .map_err(|e| e.to_string())?;

        let log_numbers: Vec<u32> = if let Some(ln) = log {
            vec![ln]
        } else if all {
            gm.unextracted_log_numbers()
                .await
                .map_err(|e| e.to_string())?
                .into_iter()
                // NOTE(review): `as` truncates silently if the stored numbers
                // are wider than u32 — confirm the source type.
                .map(|n| n as u32)
                .collect()
        } else {
            return Err("Specify --log <N> or --all".into());
        };

        if log_numbers.is_empty() {
            println!("{YELLOW}No unextracted archives found.{RESET}");
            return Ok(());
        }

        let conversations_dir = find_conversations_dir(memory_dir)?;

        if dry_run {
            println!(
                "{BOLD}Dry run — {}{RESET} archives to extract",
                log_numbers.len()
            );
            for ln in &log_numbers {
                let label = match find_archive_file(&conversations_dir, *ln) {
                    Ok(p) => p
                        .file_name()
                        .unwrap_or_default()
                        .to_string_lossy()
                        .to_string(),
                    Err(_) => format!("log {ln:03} (file not found)"),
                };
                println!(" {label}");
            }
            return Ok(());
        }

        // The provider is only constructed once real work is about to happen.
        let (llm, model_name) = crate::llm_provider::create_provider(
            memory_dir,
            provider_override.as_deref(),
            model_override.as_deref(),
        )?;

        println!(
            "{BOLD}Extracting entities from {} archives using {model_name}{RESET}",
            log_numbers.len(),
        );

        let mut total_entities_created = 0u32;
        let mut total_entities_merged = 0u32;
        let mut total_entities_skipped = 0u32;
        let mut total_relationships = 0u32;
        let mut total_errors = Vec::new();
        let mut processed = 0u32;

        let last_index = log_numbers.len() - 1; // non-empty: checked above

        for (index, ln) in log_numbers.iter().enumerate() {
            let archive_path = match find_archive_file(&conversations_dir, *ln) {
                Ok(p) => p,
                Err(e) => {
                    // A missing file is a warning, not a fatal error.
                    println!(" {YELLOW}⚠{RESET} log {ln:03}: {e}");
                    total_errors.push(format!("log {ln:03}: {e}"));
                    continue;
                }
            };

            let content = std::fs::read_to_string(&archive_path)
                .map_err(|e| format!("read {}: {e}", archive_path.display()))?;

            // The log number comes from the work list, not from the file name.
            let (session_id, _) = extract_archive_metadata(&content, &archive_path);

            let report = gm
                .extract_from_archive(&content, &session_id, Some(*ln), &*llm)
                .await
                .map_err(|e| format!("extraction log {ln:03}: {e}"))?;

            println!(
                " {GREEN}✓{RESET} log {ln:03}: +{} entities, ~{} merged, -{} skipped, {} rels",
                report.entities_created,
                report.entities_merged,
                report.entities_skipped,
                report.relationships_created,
            );

            gm.mark_extracted(*ln).await.map_err(|e| e.to_string())?;

            total_entities_created += report.entities_created;
            total_entities_merged += report.entities_merged;
            total_entities_skipped += report.entities_skipped;
            total_relationships += report.relationships_created;
            total_errors.extend(report.errors);
            processed += 1;

            // Pause between archives. Position is used rather than comparing
            // values so a repeated log number does not suppress the delay.
            if delay_ms > 0 && index < last_index {
                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
            }
        }

        println!(
            "\n{GREEN}✓{RESET} Done: {processed} archives — +{total_entities_created} created, ~{total_entities_merged} merged, -{total_entities_skipped} skipped, {total_relationships} relationships"
        );

        if !total_errors.is_empty() {
            println!("\n{YELLOW}Warnings ({}):{RESET}", total_errors.len());
            // Show at most ten warnings, then summarise the remainder.
            for err in total_errors.iter().take(10) {
                println!(" {DIM}{err}{RESET}");
            }
            if total_errors.len() > 10 {
                println!(" {DIM}... and {} more{RESET}", total_errors.len() - 10);
            }
        }

        Ok(())
    })
}
571
572pub fn vigil_sync(
576 memory_dir: &Path,
577 signals_path: Option<&Path>,
578 outcomes_path: Option<&Path>,
579) -> Result<(), String> {
580 let graph_dir = memory_dir.join("graph");
581 if !graph_dir.exists() {
582 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
583 }
584
585 let entity_root = memory_dir.parent().unwrap_or(memory_dir);
587
588 let default_signals = entity_root.join("vigil").join("signals.json");
589 let default_outcomes = entity_root.join("caliber").join("outcomes.json");
590
591 let sig_path = signals_path.unwrap_or(&default_signals);
592 let out_path = outcomes_path.unwrap_or(&default_outcomes);
593
594 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
595 rt.block_on(async {
596 let gm = GraphMemory::open(&graph_dir)
597 .await
598 .map_err(|e| e.to_string())?;
599
600 let report = gm
601 .sync_vigil(sig_path, out_path)
602 .await
603 .map_err(|e| e.to_string())?;
604
605 println!("{BOLD}Vigil Sync{RESET}");
606 println!(" Measurements: +{}", report.measurements_created);
607 println!(" Outcomes: +{}", report.outcomes_created);
608 println!(" Relationships: +{}", report.relationships_created);
609 println!(" Skipped: {}", report.skipped);
610
611 if !report.errors.is_empty() {
612 println!("\n {YELLOW}Warnings:{RESET}");
613 for err in &report.errors {
614 println!(" {DIM}{err}{RESET}");
615 }
616 }
617
618 if report.measurements_created == 0 && report.outcomes_created == 0 {
619 println!("\n {DIM}No new data — graph is in sync.{RESET}");
620 }
621
622 Ok(())
623 })
624}
625
626pub fn pipeline_sync(memory_dir: &Path, docs_dir_override: Option<&Path>) -> Result<(), String> {
630 let graph_dir = memory_dir.join("graph");
631 if !graph_dir.exists() {
632 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
633 }
634
635 let docs_dir = if let Some(d) = docs_dir_override {
637 d.to_path_buf()
638 } else {
639 let cfg = crate::config::load_from_dir(memory_dir);
640 match cfg.pipeline.and_then(|p| p.docs_dir) {
641 Some(d) => {
642 let path = PathBuf::from(shellexpand(&d));
643 if !path.exists() {
644 return Err(format!(
645 "Configured docs_dir does not exist: {}",
646 path.display()
647 ));
648 }
649 path
650 }
651 None => {
652 return Err(
653 "No docs directory specified. Use --docs-dir or set [pipeline] docs_dir in config.".into(),
654 );
655 }
656 }
657 };
658
659 let docs = read_pipeline_docs(&docs_dir)?;
661
662 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
663 rt.block_on(async {
664 let gm = GraphMemory::open(&graph_dir)
665 .await
666 .map_err(|e| e.to_string())?;
667
668 let report = gm.sync_pipeline(&docs).await.map_err(|e| e.to_string())?;
669
670 println!("{BOLD}Pipeline Sync{RESET}");
671 println!(" Created: {}", report.entities_created);
672 println!(" Updated: {}", report.entities_updated);
673 println!(" Archived: {}", report.entities_archived);
674 println!(
675 " Relationships: +{} / ~{} skipped",
676 report.relationships_created, report.relationships_skipped
677 );
678
679 if !report.errors.is_empty() {
680 println!("\n {YELLOW}Warnings:{RESET}");
681 for err in &report.errors {
682 println!(" {DIM}{err}{RESET}");
683 }
684 }
685
686 if report.entities_created == 0
687 && report.entities_updated == 0
688 && report.entities_archived == 0
689 {
690 println!("\n {DIM}No changes — graph is in sync.{RESET}");
691 }
692
693 Ok(())
694 })
695}
696
697pub fn pipeline_status(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
699 let graph_dir = memory_dir.join("graph");
700 if !graph_dir.exists() {
701 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
702 }
703
704 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
705 rt.block_on(async {
706 let gm = GraphMemory::open(&graph_dir)
707 .await
708 .map_err(|e| e.to_string())?;
709
710 let stats = gm
711 .pipeline_stats(staleness_days)
712 .await
713 .map_err(|e| e.to_string())?;
714
715 println!(
716 "{BOLD}Pipeline Status{RESET} ({} entities)",
717 stats.total_entities
718 );
719
720 if stats.by_stage.is_empty() {
721 println!(
722 " {DIM}No pipeline entities in graph. Run `graph pipeline sync` first.{RESET}"
723 );
724 return Ok(());
725 }
726
727 let stage_order = ["learning", "thoughts", "curiosity", "reflections", "praxis"];
729 for stage in &stage_order {
730 if let Some(statuses) = stats.by_stage.get(*stage) {
731 println!("\n {CYAN}{}{RESET}", stage.to_uppercase());
732 let mut items: Vec<_> = statuses.iter().collect();
733 items.sort_by_key(|(s, _)| (*s).clone());
734 for (status, count) in items {
735 println!(" {status}: {count}");
736 }
737 }
738 }
739
740 if !stats.stale_thoughts.is_empty() {
741 println!("\n {YELLOW}Stale thoughts (>{staleness_days}d):{RESET}");
742 for entity in &stats.stale_thoughts {
743 println!(" {DIM}•{RESET} {}", entity.name);
744 }
745 }
746
747 if !stats.stale_questions.is_empty() {
748 println!(
749 "\n {YELLOW}Stale questions (>{}d):{RESET}",
750 staleness_days * 2
751 );
752 for entity in &stats.stale_questions {
753 println!(" {DIM}•{RESET} {}", entity.name);
754 }
755 }
756
757 if let Some(ref last) = stats.last_movement {
758 println!("\n {DIM}Last movement: {last}{RESET}");
759 }
760
761 Ok(())
762 })
763}
764
765pub fn pipeline_flow(memory_dir: &Path, entity_name: &str) -> Result<(), String> {
767 let graph_dir = memory_dir.join("graph");
768 if !graph_dir.exists() {
769 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
770 }
771
772 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
773 rt.block_on(async {
774 let gm = GraphMemory::open(&graph_dir)
775 .await
776 .map_err(|e| e.to_string())?;
777
778 let chain = gm
779 .pipeline_flow(entity_name)
780 .await
781 .map_err(|e| e.to_string())?;
782
783 if chain.is_empty() {
784 println!("{YELLOW}No pipeline relationships found for \"{entity_name}\".{RESET}");
785 return Ok(());
786 }
787
788 println!("{BOLD}Pipeline Flow: {entity_name}{RESET}\n");
789 for (source, rel_type, target) in &chain {
790 println!(
791 " {} ({}) {CYAN}—[{rel_type}]→{RESET} {} ({})",
792 source.name, source.entity_type, target.name, target.entity_type
793 );
794 }
795
796 Ok(())
797 })
798}
799
800pub fn pipeline_stale(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
802 let graph_dir = memory_dir.join("graph");
803 if !graph_dir.exists() {
804 return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
805 }
806
807 let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
808 rt.block_on(async {
809 let gm = GraphMemory::open(&graph_dir)
810 .await
811 .map_err(|e| e.to_string())?;
812
813 let stats = gm
814 .pipeline_stats(staleness_days)
815 .await
816 .map_err(|e| e.to_string())?;
817
818 let total_stale = stats.stale_thoughts.len() + stats.stale_questions.len();
819 if total_stale == 0 {
820 println!("{GREEN}✓{RESET} No stale pipeline entities.");
821 return Ok(());
822 }
823
824 println!("{BOLD}Stale Pipeline Entities{RESET}\n");
825
826 if !stats.stale_thoughts.is_empty() {
827 println!(" {YELLOW}Thoughts (>{staleness_days} days):{RESET}");
828 for entity in &stats.stale_thoughts {
829 println!(" • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
830 }
831 }
832
833 if !stats.stale_questions.is_empty() {
834 println!(" {YELLOW}Questions (>{} days):{RESET}", staleness_days * 2);
835 for entity in &stats.stale_questions {
836 println!(" • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
837 }
838 }
839
840 Ok(())
841 })
842}
843
844fn read_pipeline_docs(dir: &Path) -> Result<PipelineDocuments, String> {
846 let read_or_empty = |name: &str| -> String {
847 let path = dir.join(name);
848 std::fs::read_to_string(&path).unwrap_or_default()
849 };
850
851 Ok(PipelineDocuments {
852 learning: read_or_empty("LEARNING.md"),
853 thoughts: read_or_empty("THOUGHTS.md"),
854 curiosity: read_or_empty("CURIOSITY.md"),
855 reflections: read_or_empty("REFLECTIONS.md"),
856 praxis: read_or_empty("PRAXIS.md"),
857 })
858}
859
860fn shellexpand(path: &str) -> String {
862 if let Some(rest) = path.strip_prefix("~/") {
863 if let Some(home) = dirs::home_dir() {
864 return home.join(rest).to_string_lossy().to_string();
865 }
866 }
867 path.to_string()
868}
869
/// Locates the `conversations` directory for a memory directory.
///
/// Checks `<memory_dir>/conversations` first, then `conversations` next to
/// `memory_dir` (in its parent). Only real directories (or symlinks to one)
/// are accepted, so a stray regular file named `conversations` neither
/// matches nor hides a valid directory in the parent.
///
/// # Errors
/// Returns `"conversations/ directory not found"` when neither location is a
/// directory.
fn find_conversations_dir(memory_dir: &Path) -> Result<PathBuf, String> {
    let local = memory_dir.join("conversations");
    if local.is_dir() {
        return Ok(local);
    }

    memory_dir
        .parent()
        .map(|parent| parent.join("conversations"))
        .filter(|candidate| candidate.is_dir())
        .ok_or_else(|| "conversations/ directory not found".to_string())
}
884
885#[cfg(feature = "llm")]
/// Resolves the archive file for `log_number` inside `conversations_dir`.
///
/// Candidate names are tried in this order and the first one that exists is
/// returned: `conversation-NNN.md` (zero-padded to three digits),
/// `conversation-N.md`, `archive-log-NNN.md`, `archive-log-N.md`.
///
/// # Errors
/// Returns `no archive file for log NNN` when none of the candidates exists.
fn find_archive_file(conversations_dir: &Path, log_number: u32) -> Result<PathBuf, String> {
    let candidates = [
        format!("conversation-{log_number:03}.md"),
        format!("conversation-{log_number}.md"),
        format!("archive-log-{log_number:03}.md"),
        format!("archive-log-{log_number}.md"),
    ];

    candidates
        .iter()
        .map(|file_name| conversations_dir.join(file_name))
        .find(|candidate| candidate.exists())
        .ok_or_else(|| format!("no archive file for log {log_number:03}"))
}