Skip to main content

recall_echo/
graph_cli.rs

1//! Graph memory CLI subcommands (behind `graph` feature flag).
2
3use std::path::{Path, PathBuf};
4
5use crate::graph::traverse::format_traversal;
6use crate::graph::types::*;
7use crate::graph::GraphMemory;
8
9const GREEN: &str = "\x1b[32m";
10const CYAN: &str = "\x1b[36m";
11const YELLOW: &str = "\x1b[33m";
12const BOLD: &str = "\x1b[1m";
13const DIM: &str = "\x1b[2m";
14const RESET: &str = "\x1b[0m";
15
16/// Initialize the graph store at {memory_dir}/graph/.
17pub fn init(memory_dir: &Path) -> Result<(), String> {
18    let graph_dir = memory_dir.join("graph");
19    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
20    rt.block_on(async {
21        GraphMemory::open(&graph_dir)
22            .await
23            .map_err(|e| e.to_string())?;
24        println!(
25            "{GREEN}✓{RESET} Graph store initialized at {}",
26            graph_dir.display()
27        );
28        Ok(())
29    })
30}
31
32/// Show graph stats.
33pub fn graph_status(memory_dir: &Path) -> Result<(), String> {
34    let graph_dir = memory_dir.join("graph");
35    if !graph_dir.exists() {
36        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
37    }
38    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
39    rt.block_on(async {
40        let gm = GraphMemory::open(&graph_dir)
41            .await
42            .map_err(|e| e.to_string())?;
43        let stats = gm.stats().await.map_err(|e| e.to_string())?;
44
45        println!("{BOLD}Graph Memory Status{RESET}");
46        println!("  Entities:      {}", stats.entity_count);
47        println!("  Relationships: {}", stats.relationship_count);
48        println!("  Episodes:      {}", stats.episode_count);
49
50        if !stats.entity_type_counts.is_empty() {
51            println!("\n  {DIM}By type:{RESET}");
52            let mut types: Vec<_> = stats.entity_type_counts.iter().collect();
53            types.sort_by(|a, b| b.1.cmp(a.1));
54            for (t, count) in types {
55                println!("    {t}: {count}");
56            }
57        }
58        Ok(())
59    })
60}
61
62/// Add an entity to the graph.
63pub fn add_entity(
64    memory_dir: &Path,
65    name: &str,
66    entity_type: &str,
67    abstract_text: &str,
68    overview: Option<&str>,
69    source: Option<&str>,
70) -> Result<(), String> {
71    let graph_dir = memory_dir.join("graph");
72    let et: EntityType = entity_type.parse().map_err(|e: String| e)?;
73
74    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
75    rt.block_on(async {
76        let gm = GraphMemory::open(&graph_dir)
77            .await
78            .map_err(|e| e.to_string())?;
79
80        let entity = gm
81            .add_entity(NewEntity {
82                name: name.to_string(),
83                entity_type: et,
84                abstract_text: abstract_text.to_string(),
85                overview: overview.map(String::from),
86                content: None,
87                attributes: None,
88                source: source.map(String::from),
89            })
90            .await
91            .map_err(|e| e.to_string())?;
92
93        println!(
94            "{GREEN}✓{RESET} Created entity: {BOLD}{}{RESET} ({}) [{}]",
95            entity.name,
96            entity.entity_type,
97            entity.id_string()
98        );
99        Ok(())
100    })
101}
102
103/// Create a relationship between two entities.
104pub fn relate(
105    memory_dir: &Path,
106    from: &str,
107    rel_type: &str,
108    to: &str,
109    description: Option<&str>,
110    source: Option<&str>,
111) -> Result<(), String> {
112    let graph_dir = memory_dir.join("graph");
113    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
114    rt.block_on(async {
115        let gm = GraphMemory::open(&graph_dir)
116            .await
117            .map_err(|e| e.to_string())?;
118
119        let rel = gm
120            .add_relationship(NewRelationship {
121                from_entity: from.to_string(),
122                to_entity: to.to_string(),
123                rel_type: rel_type.to_string(),
124                description: description.map(String::from),
125                confidence: None,
126                source: source.map(String::from),
127            })
128            .await
129            .map_err(|e| e.to_string())?;
130
131        println!(
132            "{GREEN}✓{RESET} {from} {CYAN}—[{rel_type}]→{RESET} {to} [{}]",
133            rel.id_string()
134        );
135        Ok(())
136    })
137}
138
139/// Semantic search across entities.
140pub fn search(
141    memory_dir: &Path,
142    query: &str,
143    limit: usize,
144    entity_type: Option<&str>,
145    keyword: Option<&str>,
146) -> Result<(), String> {
147    let graph_dir = memory_dir.join("graph");
148    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
149    rt.block_on(async {
150        let gm = GraphMemory::open(&graph_dir)
151            .await
152            .map_err(|e| e.to_string())?;
153
154        let options = SearchOptions {
155            limit,
156            entity_type: entity_type.map(String::from),
157            keyword: keyword.map(String::from),
158        };
159
160        let results = gm
161            .search_with_options(query, &options)
162            .await
163            .map_err(|e| e.to_string())?;
164
165        if results.is_empty() {
166            println!("{YELLOW}No results.{RESET}");
167            return Ok(());
168        }
169
170        for (i, r) in results.iter().enumerate() {
171            println!(
172                "{BOLD}{}. {}{RESET} ({}) — score: {:.3}",
173                i + 1,
174                r.entity.name,
175                r.entity.entity_type,
176                r.score
177            );
178            println!("   {DIM}{}{RESET}", r.entity.abstract_text);
179        }
180        Ok(())
181    })
182}
183
184/// Ingest a single archive file into the graph (episodes only, no LLM extraction).
185pub fn ingest(memory_dir: &Path, archive_path: &Path) -> Result<(), String> {
186    let graph_dir = memory_dir.join("graph");
187    if !graph_dir.exists() {
188        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
189    }
190
191    let content = std::fs::read_to_string(archive_path)
192        .map_err(|e| format!("Failed to read {}: {e}", archive_path.display()))?;
193
194    // Extract session_id and log_number from frontmatter if available
195    let (session_id, log_number) = extract_archive_metadata(&content, archive_path);
196
197    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
198    rt.block_on(async {
199        let gm = GraphMemory::open(&graph_dir)
200            .await
201            .map_err(|e| e.to_string())?;
202
203        let report = gm
204            .ingest_archive(&content, &session_id, log_number, None)
205            .await
206            .map_err(|e| e.to_string())?;
207
208        println!(
209            "{GREEN}✓{RESET} Ingested {}: {} episodes created",
210            archive_path.display(),
211            report.episodes_created
212        );
213        if !report.errors.is_empty() {
214            for err in &report.errors {
215                println!("  {YELLOW}warning:{RESET} {err}");
216            }
217        }
218        Ok(())
219    })
220}
221
222/// Ingest all un-ingested archives in conversations/.
223pub fn ingest_all(memory_dir: &Path) -> Result<(), String> {
224    let graph_dir = memory_dir.join("graph");
225    if !graph_dir.exists() {
226        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
227    }
228
229    let conversations_dir = find_conversations_dir(memory_dir)?;
230
231    // Collect all conversation files, sorted
232    let mut files: Vec<_> = std::fs::read_dir(&conversations_dir)
233        .map_err(|e| e.to_string())?
234        .filter_map(|e| e.ok())
235        .filter(|e| {
236            let name = e.file_name().to_string_lossy().to_string();
237            name.starts_with("conversation-") || name.starts_with("archive-log-")
238        })
239        .collect();
240    files.sort_by_key(|e| e.file_name());
241
242    if files.is_empty() {
243        println!("{YELLOW}No conversation archives found.{RESET}");
244        return Ok(());
245    }
246
247    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
248    rt.block_on(async {
249        let gm = GraphMemory::open(&graph_dir)
250            .await
251            .map_err(|e| e.to_string())?;
252
253        let mut total_episodes = 0u32;
254        let mut ingested = 0u32;
255        let mut skipped = 0u32;
256
257        for entry in &files {
258            let path = entry.path();
259            let content = std::fs::read_to_string(&path).map_err(|e| e.to_string())?;
260
261            let (session_id, log_number) = extract_archive_metadata(&content, &path);
262
263            // Check if already ingested (has episodes for this log_number)
264            if let Some(ln) = log_number {
265                if let Ok(Some(_)) = gm.get_episode_by_log_number(ln).await {
266                    skipped += 1;
267                    continue;
268                }
269            }
270
271            let report = gm
272                .ingest_archive(&content, &session_id, log_number, None)
273                .await
274                .map_err(|e| e.to_string())?;
275
276            total_episodes += report.episodes_created;
277            ingested += 1;
278
279            println!(
280                "  {GREEN}✓{RESET} {} — {} episodes",
281                path.file_name().unwrap_or_default().to_string_lossy(),
282                report.episodes_created
283            );
284        }
285
286        println!(
287            "\n{GREEN}✓{RESET} Ingested {ingested} archives ({total_episodes} episodes), skipped {skipped} already ingested"
288        );
289        Ok(())
290    })
291}
292
293/// Extract session_id and log_number from a conversation archive's frontmatter.
294fn extract_archive_metadata(content: &str, path: &Path) -> (String, Option<u32>) {
295    let mut session_id = "unknown".to_string();
296    let mut log_number: Option<u32> = None;
297
298    // Try to extract log number from filename
299    if let Some(name) = path.file_stem().and_then(|s| s.to_str()) {
300        let num_str = name
301            .strip_prefix("conversation-")
302            .or_else(|| name.strip_prefix("archive-log-"));
303        if let Some(num_str) = num_str {
304            if let Ok(n) = num_str.parse::<u32>() {
305                log_number = Some(n);
306            }
307        }
308    }
309
310    // Try to extract session_id from frontmatter
311    if let Some(stripped) = content.strip_prefix("---") {
312        if let Some(end) = stripped.find("---") {
313            let frontmatter = &stripped[..end];
314            for line in frontmatter.lines() {
315                let line = line.trim();
316                if let Some(val) = line.strip_prefix("session_id:") {
317                    session_id = val.trim().trim_matches('"').to_string();
318                }
319            }
320        }
321    }
322
323    (session_id, log_number)
324}
325
326/// Traverse the graph from an entity.
327pub fn traverse(
328    memory_dir: &Path,
329    entity_name: &str,
330    depth: u32,
331    type_filter: Option<&str>,
332) -> Result<(), String> {
333    let graph_dir = memory_dir.join("graph");
334    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
335    rt.block_on(async {
336        let gm = GraphMemory::open(&graph_dir)
337            .await
338            .map_err(|e| e.to_string())?;
339
340        let tree = gm
341            .traverse_filtered(entity_name, depth, type_filter)
342            .await
343            .map_err(|e| e.to_string())?;
344
345        let output = format_traversal(&tree, 0);
346        print!("{output}");
347        Ok(())
348    })
349}
350
351/// Hybrid query: semantic + graph expansion + optional episodes.
352pub fn hybrid_query(
353    memory_dir: &Path,
354    query: &str,
355    limit: usize,
356    entity_type: Option<&str>,
357    keyword: Option<&str>,
358    depth: u32,
359    episodes: bool,
360) -> Result<(), String> {
361    let graph_dir = memory_dir.join("graph");
362    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
363    rt.block_on(async {
364        let gm = GraphMemory::open(&graph_dir)
365            .await
366            .map_err(|e| e.to_string())?;
367
368        let options = QueryOptions {
369            limit,
370            entity_type: entity_type.map(String::from),
371            keyword: keyword.map(String::from),
372            graph_depth: depth,
373            include_episodes: episodes,
374        };
375
376        let result = gm.query(query, &options).await.map_err(|e| e.to_string())?;
377
378        if result.entities.is_empty() && result.episodes.is_empty() {
379            println!("{YELLOW}No results.{RESET}");
380            return Ok(());
381        }
382
383        if !result.entities.is_empty() {
384            println!("{BOLD}Entities:{RESET}");
385            for (i, r) in result.entities.iter().enumerate() {
386                let source_tag = match &r.source {
387                    MatchSource::Semantic => "semantic".to_string(),
388                    MatchSource::Graph { parent, rel_type } => {
389                        format!("graph: {parent} —[{rel_type}]")
390                    }
391                    MatchSource::Keyword => "keyword".to_string(),
392                };
393                println!(
394                    "  {BOLD}{}. {}{RESET} ({}) — {:.3} [{DIM}{source_tag}{RESET}]",
395                    i + 1,
396                    r.entity.name,
397                    r.entity.entity_type,
398                    r.score
399                );
400                println!("     {DIM}{}{RESET}", r.entity.abstract_text);
401            }
402        }
403
404        if !result.episodes.is_empty() {
405            println!("\n{BOLD}Episodes:{RESET}");
406            for (i, ep) in result.episodes.iter().enumerate() {
407                let log = ep
408                    .episode
409                    .log_number
410                    .map(|n| format!("#{n}"))
411                    .unwrap_or_default();
412                println!(
413                    "  {BOLD}{}. {}{RESET} ({}) — {:.3}",
414                    i + 1,
415                    ep.episode.session_id,
416                    log,
417                    ep.score
418                );
419                println!("     {DIM}{}{RESET}", ep.episode.abstract_text);
420            }
421        }
422
423        Ok(())
424    })
425}
426
427/// Extract entities from already-ingested archives using an LLM.
428#[cfg(feature = "llm")]
429pub fn extract(
430    memory_dir: &Path,
431    log: Option<u32>,
432    all: bool,
433    dry_run: bool,
434    model_override: Option<String>,
435    provider_override: Option<String>,
436    delay_ms: u64,
437) -> Result<(), String> {
438    let graph_dir = memory_dir.join("graph");
439    if !graph_dir.exists() {
440        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
441    }
442
443    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
444    rt.block_on(async {
445        let gm = GraphMemory::open(&graph_dir)
446            .await
447            .map_err(|e| e.to_string())?;
448
449        // Determine which log numbers to process
450        let log_numbers: Vec<u32> = if let Some(ln) = log {
451            vec![ln]
452        } else if all {
453            gm.unextracted_log_numbers()
454                .await
455                .map_err(|e| e.to_string())?
456                .into_iter()
457                .map(|n| n as u32)
458                .collect()
459        } else {
460            return Err("Specify --log <N> or --all".into());
461        };
462
463        if log_numbers.is_empty() {
464            println!("{YELLOW}No unextracted archives found.{RESET}");
465            return Ok(());
466        }
467
468        // Find conversations directory
469        let conversations_dir = find_conversations_dir(memory_dir)?;
470
471        if dry_run {
472            println!(
473                "{BOLD}Dry run — {}{RESET} archives to extract",
474                log_numbers.len()
475            );
476            for ln in &log_numbers {
477                let path = find_archive_file(&conversations_dir, *ln);
478                let label = match &path {
479                    Ok(p) => {
480                        p.file_name()
481                            .unwrap_or_default()
482                            .to_string_lossy()
483                            .to_string()
484                    }
485                    Err(_) => format!("log {ln:03} (file not found)"),
486                };
487                println!("  {label}");
488            }
489            return Ok(());
490        }
491
492        // Build LLM provider from .recall-echo.toml (CLI flags override)
493        let (llm, model_name) = crate::llm_provider::create_provider(
494            memory_dir,
495            provider_override.as_deref(),
496            model_override.as_deref(),
497        )?;
498
499        println!(
500            "{BOLD}Extracting entities from {} archives using {model_name}{RESET}",
501            log_numbers.len(),
502        );
503
504        let mut total_entities_created = 0u32;
505        let mut total_entities_merged = 0u32;
506        let mut total_entities_skipped = 0u32;
507        let mut total_relationships = 0u32;
508        let mut total_errors = Vec::new();
509        let mut processed = 0u32;
510
511        for ln in &log_numbers {
512            let archive_path = match find_archive_file(&conversations_dir, *ln) {
513                Ok(p) => p,
514                Err(e) => {
515                    println!("  {YELLOW}⚠{RESET} log {ln:03}: {e}");
516                    total_errors.push(format!("log {ln:03}: {e}"));
517                    continue;
518                }
519            };
520
521            let content = std::fs::read_to_string(&archive_path)
522                .map_err(|e| format!("read {}: {e}", archive_path.display()))?;
523
524            let (session_id, _) = extract_archive_metadata(&content, &archive_path);
525
526            let report = gm
527                .extract_from_archive(&content, &session_id, Some(*ln), &*llm)
528                .await
529                .map_err(|e| format!("extraction log {ln:03}: {e}"))?;
530
531            println!(
532                "  {GREEN}✓{RESET} log {ln:03}: +{} entities, ~{} merged, -{} skipped, {} rels",
533                report.entities_created,
534                report.entities_merged,
535                report.entities_skipped,
536                report.relationships_created,
537            );
538
539            gm.mark_extracted(*ln).await.map_err(|e| e.to_string())?;
540
541            total_entities_created += report.entities_created;
542            total_entities_merged += report.entities_merged;
543            total_entities_skipped += report.entities_skipped;
544            total_relationships += report.relationships_created;
545            total_errors.extend(report.errors);
546            processed += 1;
547
548            // Rate limiting between archives
549            if delay_ms > 0 && *ln != *log_numbers.last().unwrap() {
550                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
551            }
552        }
553
554        println!(
555            "\n{GREEN}✓{RESET} Done: {processed} archives — +{total_entities_created} created, ~{total_entities_merged} merged, -{total_entities_skipped} skipped, {total_relationships} relationships"
556        );
557
558        if !total_errors.is_empty() {
559            println!("\n{YELLOW}Warnings ({}):{RESET}", total_errors.len());
560            for err in total_errors.iter().take(10) {
561                println!("  {DIM}{err}{RESET}");
562            }
563            if total_errors.len() > 10 {
564                println!("  {DIM}... and {} more{RESET}", total_errors.len() - 10);
565            }
566        }
567
568        Ok(())
569    })
570}
571
572// ── Vigil sync commands ──────────────────────────────────────────────
573
574/// Sync vigil-pulse signals and outcomes into the graph.
575pub fn vigil_sync(
576    memory_dir: &Path,
577    signals_path: Option<&Path>,
578    outcomes_path: Option<&Path>,
579) -> Result<(), String> {
580    let graph_dir = memory_dir.join("graph");
581    if !graph_dir.exists() {
582        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
583    }
584
585    // Default paths: look for vigil/ and caliber/ relative to memory_dir's parent (entity root)
586    let entity_root = memory_dir
587        .parent()
588        .unwrap_or(memory_dir);
589
590    let default_signals = entity_root.join("vigil").join("signals.json");
591    let default_outcomes = entity_root.join("caliber").join("outcomes.json");
592
593    let sig_path = signals_path.unwrap_or(&default_signals);
594    let out_path = outcomes_path.unwrap_or(&default_outcomes);
595
596    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
597    rt.block_on(async {
598        let gm = GraphMemory::open(&graph_dir)
599            .await
600            .map_err(|e| e.to_string())?;
601
602        let report = gm
603            .sync_vigil(sig_path, out_path)
604            .await
605            .map_err(|e| e.to_string())?;
606
607        println!("{BOLD}Vigil Sync{RESET}");
608        println!("  Measurements: +{}", report.measurements_created);
609        println!("  Outcomes:     +{}", report.outcomes_created);
610        println!("  Relationships: +{}", report.relationships_created);
611        println!("  Skipped:       {}", report.skipped);
612
613        if !report.errors.is_empty() {
614            println!("\n  {YELLOW}Warnings:{RESET}");
615            for err in &report.errors {
616                println!("    {DIM}{err}{RESET}");
617            }
618        }
619
620        if report.measurements_created == 0
621            && report.outcomes_created == 0
622        {
623            println!("\n  {DIM}No new data — graph is in sync.{RESET}");
624        }
625
626        Ok(())
627    })
628}
629
630// ── Pipeline commands ──────────────────────────────────────────────────
631
632/// Sync pipeline documents into the graph.
633pub fn pipeline_sync(memory_dir: &Path, docs_dir_override: Option<&Path>) -> Result<(), String> {
634    let graph_dir = memory_dir.join("graph");
635    if !graph_dir.exists() {
636        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
637    }
638
639    // Resolve docs directory: CLI flag > config > error
640    let docs_dir = if let Some(d) = docs_dir_override {
641        d.to_path_buf()
642    } else {
643        let cfg = crate::config::load_from_dir(memory_dir);
644        match cfg.pipeline.and_then(|p| p.docs_dir) {
645            Some(d) => {
646                let path = PathBuf::from(shellexpand(&d));
647                if !path.exists() {
648                    return Err(format!(
649                        "Configured docs_dir does not exist: {}",
650                        path.display()
651                    ));
652                }
653                path
654            }
655            None => {
656                return Err(
657                    "No docs directory specified. Use --docs-dir or set [pipeline] docs_dir in config.".into(),
658                );
659            }
660        }
661    };
662
663    // Read pipeline documents
664    let docs = read_pipeline_docs(&docs_dir)?;
665
666    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
667    rt.block_on(async {
668        let gm = GraphMemory::open(&graph_dir)
669            .await
670            .map_err(|e| e.to_string())?;
671
672        let report = gm.sync_pipeline(&docs).await.map_err(|e| e.to_string())?;
673
674        println!("{BOLD}Pipeline Sync{RESET}");
675        println!("  Created:      {}", report.entities_created);
676        println!("  Updated:      {}", report.entities_updated);
677        println!("  Archived:     {}", report.entities_archived);
678        println!(
679            "  Relationships: +{} / ~{} skipped",
680            report.relationships_created, report.relationships_skipped
681        );
682
683        if !report.errors.is_empty() {
684            println!("\n  {YELLOW}Warnings:{RESET}");
685            for err in &report.errors {
686                println!("    {DIM}{err}{RESET}");
687            }
688        }
689
690        if report.entities_created == 0
691            && report.entities_updated == 0
692            && report.entities_archived == 0
693        {
694            println!("\n  {DIM}No changes — graph is in sync.{RESET}");
695        }
696
697        Ok(())
698    })
699}
700
701/// Show pipeline health stats.
702pub fn pipeline_status(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
703    let graph_dir = memory_dir.join("graph");
704    if !graph_dir.exists() {
705        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
706    }
707
708    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
709    rt.block_on(async {
710        let gm = GraphMemory::open(&graph_dir)
711            .await
712            .map_err(|e| e.to_string())?;
713
714        let stats = gm
715            .pipeline_stats(staleness_days)
716            .await
717            .map_err(|e| e.to_string())?;
718
719        println!(
720            "{BOLD}Pipeline Status{RESET} ({} entities)",
721            stats.total_entities
722        );
723
724        if stats.by_stage.is_empty() {
725            println!(
726                "  {DIM}No pipeline entities in graph. Run `graph pipeline sync` first.{RESET}"
727            );
728            return Ok(());
729        }
730
731        // Display stages in pipeline order
732        let stage_order = ["learning", "thoughts", "curiosity", "reflections", "praxis"];
733        for stage in &stage_order {
734            if let Some(statuses) = stats.by_stage.get(*stage) {
735                println!("\n  {CYAN}{}{RESET}", stage.to_uppercase());
736                let mut items: Vec<_> = statuses.iter().collect();
737                items.sort_by_key(|(s, _)| (*s).clone());
738                for (status, count) in items {
739                    println!("    {status}: {count}");
740                }
741            }
742        }
743
744        if !stats.stale_thoughts.is_empty() {
745            println!("\n  {YELLOW}Stale thoughts (>{staleness_days}d):{RESET}");
746            for entity in &stats.stale_thoughts {
747                println!("    {DIM}•{RESET} {}", entity.name);
748            }
749        }
750
751        if !stats.stale_questions.is_empty() {
752            println!(
753                "\n  {YELLOW}Stale questions (>{}d):{RESET}",
754                staleness_days * 2
755            );
756            for entity in &stats.stale_questions {
757                println!("    {DIM}•{RESET} {}", entity.name);
758            }
759        }
760
761        if let Some(ref last) = stats.last_movement {
762            println!("\n  {DIM}Last movement: {last}{RESET}");
763        }
764
765        Ok(())
766    })
767}
768
769/// Trace pipeline flow for an entity.
770pub fn pipeline_flow(memory_dir: &Path, entity_name: &str) -> Result<(), String> {
771    let graph_dir = memory_dir.join("graph");
772    if !graph_dir.exists() {
773        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
774    }
775
776    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
777    rt.block_on(async {
778        let gm = GraphMemory::open(&graph_dir)
779            .await
780            .map_err(|e| e.to_string())?;
781
782        let chain = gm
783            .pipeline_flow(entity_name)
784            .await
785            .map_err(|e| e.to_string())?;
786
787        if chain.is_empty() {
788            println!("{YELLOW}No pipeline relationships found for \"{entity_name}\".{RESET}");
789            return Ok(());
790        }
791
792        println!("{BOLD}Pipeline Flow: {entity_name}{RESET}\n");
793        for (source, rel_type, target) in &chain {
794            println!(
795                "  {} ({}) {CYAN}—[{rel_type}]→{RESET} {} ({})",
796                source.name, source.entity_type, target.name, target.entity_type
797            );
798        }
799
800        Ok(())
801    })
802}
803
804/// List stale pipeline entities.
805pub fn pipeline_stale(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
806    let graph_dir = memory_dir.join("graph");
807    if !graph_dir.exists() {
808        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
809    }
810
811    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
812    rt.block_on(async {
813        let gm = GraphMemory::open(&graph_dir)
814            .await
815            .map_err(|e| e.to_string())?;
816
817        let stats = gm
818            .pipeline_stats(staleness_days)
819            .await
820            .map_err(|e| e.to_string())?;
821
822        let total_stale = stats.stale_thoughts.len() + stats.stale_questions.len();
823        if total_stale == 0 {
824            println!("{GREEN}✓{RESET} No stale pipeline entities.");
825            return Ok(());
826        }
827
828        println!("{BOLD}Stale Pipeline Entities{RESET}\n");
829
830        if !stats.stale_thoughts.is_empty() {
831            println!("  {YELLOW}Thoughts (>{staleness_days} days):{RESET}");
832            for entity in &stats.stale_thoughts {
833                println!("    • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
834            }
835        }
836
837        if !stats.stale_questions.is_empty() {
838            println!("  {YELLOW}Questions (>{} days):{RESET}", staleness_days * 2);
839            for entity in &stats.stale_questions {
840                println!("    • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
841            }
842        }
843
844        Ok(())
845    })
846}
847
848/// Read pipeline documents from a directory.
849fn read_pipeline_docs(dir: &Path) -> Result<PipelineDocuments, String> {
850    let read_or_empty = |name: &str| -> String {
851        let path = dir.join(name);
852        std::fs::read_to_string(&path).unwrap_or_default()
853    };
854
855    Ok(PipelineDocuments {
856        learning: read_or_empty("LEARNING.md"),
857        thoughts: read_or_empty("THOUGHTS.md"),
858        curiosity: read_or_empty("CURIOSITY.md"),
859        reflections: read_or_empty("REFLECTIONS.md"),
860        praxis: read_or_empty("PRAXIS.md"),
861    })
862}
863
864/// Expand ~ to home directory in paths.
865fn shellexpand(path: &str) -> String {
866    if let Some(rest) = path.strip_prefix("~/") {
867        if let Some(home) = dirs::home_dir() {
868            return home.join(rest).to_string_lossy().to_string();
869        }
870    }
871    path.to_string()
872}
873
874/// Find the conversations directory — checks memory_dir/conversations/ then parent/conversations/.
875fn find_conversations_dir(memory_dir: &Path) -> Result<PathBuf, String> {
876    let conv = memory_dir.join("conversations");
877    if conv.exists() {
878        return Ok(conv);
879    }
880    if let Some(parent) = memory_dir.parent() {
881        let parent_conv = parent.join("conversations");
882        if parent_conv.exists() {
883            return Ok(parent_conv);
884        }
885    }
886    Err("conversations/ directory not found".into())
887}
888
889/// Find the archive file for a given log number.
890#[cfg(feature = "llm")]
891fn find_archive_file(conversations_dir: &Path, log_number: u32) -> Result<PathBuf, String> {
892    // Try both naming conventions
893    let patterns = [
894        format!("conversation-{log_number:03}.md"),
895        format!("conversation-{log_number}.md"),
896        format!("archive-log-{log_number:03}.md"),
897        format!("archive-log-{log_number}.md"),
898    ];
899
900    for name in &patterns {
901        let path = conversations_dir.join(name);
902        if path.exists() {
903            return Ok(path);
904        }
905    }
906
907    Err(format!("no archive file for log {log_number:03}"))
908}