Skip to main content

recall_echo/
graph_cli.rs

1//! Graph memory CLI subcommands (behind `graph` feature flag).
2
3use std::path::{Path, PathBuf};
4
5use crate::graph::traverse::format_traversal;
6use crate::graph::types::*;
7use crate::graph::GraphMemory;
8
9const GREEN: &str = "\x1b[32m";
10const CYAN: &str = "\x1b[36m";
11const YELLOW: &str = "\x1b[33m";
12const BOLD: &str = "\x1b[1m";
13const DIM: &str = "\x1b[2m";
14const RESET: &str = "\x1b[0m";
15
16/// Initialize the graph store at {memory_dir}/graph/.
17pub fn init(memory_dir: &Path) -> Result<(), String> {
18    let graph_dir = memory_dir.join("graph");
19    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
20    rt.block_on(async {
21        GraphMemory::open(&graph_dir)
22            .await
23            .map_err(|e| e.to_string())?;
24        println!(
25            "{GREEN}✓{RESET} Graph store initialized at {}",
26            graph_dir.display()
27        );
28        Ok(())
29    })
30}
31
32/// Show graph stats.
33pub fn graph_status(memory_dir: &Path) -> Result<(), String> {
34    let graph_dir = memory_dir.join("graph");
35    if !graph_dir.exists() {
36        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
37    }
38    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
39    rt.block_on(async {
40        let gm = GraphMemory::open(&graph_dir)
41            .await
42            .map_err(|e| e.to_string())?;
43        let stats = gm.stats().await.map_err(|e| e.to_string())?;
44
45        println!("{BOLD}Graph Memory Status{RESET}");
46        println!("  Entities:      {}", stats.entity_count);
47        println!("  Relationships: {}", stats.relationship_count);
48        println!("  Episodes:      {}", stats.episode_count);
49
50        if !stats.entity_type_counts.is_empty() {
51            println!("\n  {DIM}By type:{RESET}");
52            let mut types: Vec<_> = stats.entity_type_counts.iter().collect();
53            types.sort_by(|a, b| b.1.cmp(a.1));
54            for (t, count) in types {
55                println!("    {t}: {count}");
56            }
57        }
58        Ok(())
59    })
60}
61
62/// Add an entity to the graph.
63pub fn add_entity(
64    memory_dir: &Path,
65    name: &str,
66    entity_type: &str,
67    abstract_text: &str,
68    overview: Option<&str>,
69    source: Option<&str>,
70) -> Result<(), String> {
71    let graph_dir = memory_dir.join("graph");
72    let et: EntityType = entity_type.parse().map_err(|e: String| e)?;
73
74    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
75    rt.block_on(async {
76        let gm = GraphMemory::open(&graph_dir)
77            .await
78            .map_err(|e| e.to_string())?;
79
80        let entity = gm
81            .add_entity(NewEntity {
82                name: name.to_string(),
83                entity_type: et,
84                abstract_text: abstract_text.to_string(),
85                overview: overview.map(String::from),
86                content: None,
87                attributes: None,
88                source: source.map(String::from),
89            })
90            .await
91            .map_err(|e| e.to_string())?;
92
93        println!(
94            "{GREEN}✓{RESET} Created entity: {BOLD}{}{RESET} ({}) [{}]",
95            entity.name,
96            entity.entity_type,
97            entity.id_string()
98        );
99        Ok(())
100    })
101}
102
103/// Create a relationship between two entities.
104pub fn relate(
105    memory_dir: &Path,
106    from: &str,
107    rel_type: &str,
108    to: &str,
109    description: Option<&str>,
110    source: Option<&str>,
111) -> Result<(), String> {
112    let graph_dir = memory_dir.join("graph");
113    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
114    rt.block_on(async {
115        let gm = GraphMemory::open(&graph_dir)
116            .await
117            .map_err(|e| e.to_string())?;
118
119        let rel = gm
120            .add_relationship(NewRelationship {
121                from_entity: from.to_string(),
122                to_entity: to.to_string(),
123                rel_type: rel_type.to_string(),
124                description: description.map(String::from),
125                confidence: None,
126                source: source.map(String::from),
127            })
128            .await
129            .map_err(|e| e.to_string())?;
130
131        println!(
132            "{GREEN}✓{RESET} {from} {CYAN}—[{rel_type}]→{RESET} {to} [{}]",
133            rel.id_string()
134        );
135        Ok(())
136    })
137}
138
139/// Semantic search across entities.
140pub fn search(
141    memory_dir: &Path,
142    query: &str,
143    limit: usize,
144    entity_type: Option<&str>,
145    keyword: Option<&str>,
146) -> Result<(), String> {
147    let graph_dir = memory_dir.join("graph");
148    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
149    rt.block_on(async {
150        let gm = GraphMemory::open(&graph_dir)
151            .await
152            .map_err(|e| e.to_string())?;
153
154        let options = SearchOptions {
155            limit,
156            entity_type: entity_type.map(String::from),
157            keyword: keyword.map(String::from),
158        };
159
160        let results = gm
161            .search_with_options(query, &options)
162            .await
163            .map_err(|e| e.to_string())?;
164
165        if results.is_empty() {
166            println!("{YELLOW}No results.{RESET}");
167            return Ok(());
168        }
169
170        for (i, r) in results.iter().enumerate() {
171            println!(
172                "{BOLD}{}. {}{RESET} ({}) — score: {:.3}",
173                i + 1,
174                r.entity.name,
175                r.entity.entity_type,
176                r.score
177            );
178            println!("   {DIM}{}{RESET}", r.entity.abstract_text);
179        }
180        Ok(())
181    })
182}
183
184/// Ingest a single archive file into the graph (episodes only, no LLM extraction).
185pub fn ingest(memory_dir: &Path, archive_path: &Path) -> Result<(), String> {
186    let graph_dir = memory_dir.join("graph");
187    if !graph_dir.exists() {
188        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
189    }
190
191    let content = std::fs::read_to_string(archive_path)
192        .map_err(|e| format!("Failed to read {}: {e}", archive_path.display()))?;
193
194    // Extract session_id and log_number from frontmatter if available
195    let (session_id, log_number) = extract_archive_metadata(&content, archive_path);
196
197    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
198    rt.block_on(async {
199        let gm = GraphMemory::open(&graph_dir)
200            .await
201            .map_err(|e| e.to_string())?;
202
203        let report = gm
204            .ingest_archive(&content, &session_id, log_number, None)
205            .await
206            .map_err(|e| e.to_string())?;
207
208        println!(
209            "{GREEN}✓{RESET} Ingested {}: {} episodes created",
210            archive_path.display(),
211            report.episodes_created
212        );
213        if !report.errors.is_empty() {
214            for err in &report.errors {
215                println!("  {YELLOW}warning:{RESET} {err}");
216            }
217        }
218        Ok(())
219    })
220}
221
222/// Ingest all un-ingested archives in conversations/.
223pub fn ingest_all(memory_dir: &Path) -> Result<(), String> {
224    let graph_dir = memory_dir.join("graph");
225    if !graph_dir.exists() {
226        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
227    }
228
229    let conversations_dir = find_conversations_dir(memory_dir)?;
230
231    // Collect all conversation files, sorted
232    let mut files: Vec<_> = std::fs::read_dir(&conversations_dir)
233        .map_err(|e| e.to_string())?
234        .filter_map(|e| e.ok())
235        .filter(|e| {
236            let name = e.file_name().to_string_lossy().to_string();
237            name.starts_with("conversation-") || name.starts_with("archive-log-")
238        })
239        .collect();
240    files.sort_by_key(|e| e.file_name());
241
242    if files.is_empty() {
243        println!("{YELLOW}No conversation archives found.{RESET}");
244        return Ok(());
245    }
246
247    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
248    rt.block_on(async {
249        let gm = GraphMemory::open(&graph_dir)
250            .await
251            .map_err(|e| e.to_string())?;
252
253        let mut total_episodes = 0u32;
254        let mut ingested = 0u32;
255        let mut skipped = 0u32;
256
257        for entry in &files {
258            let path = entry.path();
259            let content = std::fs::read_to_string(&path).map_err(|e| e.to_string())?;
260
261            let (session_id, log_number) = extract_archive_metadata(&content, &path);
262
263            // Check if already ingested (has episodes for this log_number)
264            if let Some(ln) = log_number {
265                if let Ok(Some(_)) = gm.get_episode_by_log_number(ln).await {
266                    skipped += 1;
267                    continue;
268                }
269            }
270
271            let report = gm
272                .ingest_archive(&content, &session_id, log_number, None)
273                .await
274                .map_err(|e| e.to_string())?;
275
276            total_episodes += report.episodes_created;
277            ingested += 1;
278
279            println!(
280                "  {GREEN}✓{RESET} {} — {} episodes",
281                path.file_name().unwrap_or_default().to_string_lossy(),
282                report.episodes_created
283            );
284        }
285
286        println!(
287            "\n{GREEN}✓{RESET} Ingested {ingested} archives ({total_episodes} episodes), skipped {skipped} already ingested"
288        );
289        Ok(())
290    })
291}
292
293/// Extract session_id and log_number from a conversation archive's frontmatter.
294fn extract_archive_metadata(content: &str, path: &Path) -> (String, Option<u32>) {
295    let mut session_id = "unknown".to_string();
296    let mut log_number: Option<u32> = None;
297
298    // Try to extract log number from filename
299    if let Some(name) = path.file_stem().and_then(|s| s.to_str()) {
300        let num_str = name
301            .strip_prefix("conversation-")
302            .or_else(|| name.strip_prefix("archive-log-"));
303        if let Some(num_str) = num_str {
304            if let Ok(n) = num_str.parse::<u32>() {
305                log_number = Some(n);
306            }
307        }
308    }
309
310    // Try to extract session_id from frontmatter
311    if let Some(stripped) = content.strip_prefix("---") {
312        if let Some(end) = stripped.find("---") {
313            let frontmatter = &stripped[..end];
314            for line in frontmatter.lines() {
315                let line = line.trim();
316                if let Some(val) = line.strip_prefix("session_id:") {
317                    session_id = val.trim().trim_matches('"').to_string();
318                }
319            }
320        }
321    }
322
323    (session_id, log_number)
324}
325
326/// Traverse the graph from an entity.
327pub fn traverse(
328    memory_dir: &Path,
329    entity_name: &str,
330    depth: u32,
331    type_filter: Option<&str>,
332) -> Result<(), String> {
333    let graph_dir = memory_dir.join("graph");
334    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
335    rt.block_on(async {
336        let gm = GraphMemory::open(&graph_dir)
337            .await
338            .map_err(|e| e.to_string())?;
339
340        let tree = gm
341            .traverse_filtered(entity_name, depth, type_filter)
342            .await
343            .map_err(|e| e.to_string())?;
344
345        let output = format_traversal(&tree, 0);
346        print!("{output}");
347        Ok(())
348    })
349}
350
351/// Hybrid query: semantic + graph expansion + optional episodes.
352pub fn hybrid_query(
353    memory_dir: &Path,
354    query: &str,
355    limit: usize,
356    entity_type: Option<&str>,
357    keyword: Option<&str>,
358    depth: u32,
359    episodes: bool,
360) -> Result<(), String> {
361    let graph_dir = memory_dir.join("graph");
362    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
363    rt.block_on(async {
364        let gm = GraphMemory::open(&graph_dir)
365            .await
366            .map_err(|e| e.to_string())?;
367
368        let options = QueryOptions {
369            limit,
370            entity_type: entity_type.map(String::from),
371            keyword: keyword.map(String::from),
372            graph_depth: depth,
373            include_episodes: episodes,
374        };
375
376        let result = gm.query(query, &options).await.map_err(|e| e.to_string())?;
377
378        if result.entities.is_empty() && result.episodes.is_empty() {
379            println!("{YELLOW}No results.{RESET}");
380            return Ok(());
381        }
382
383        if !result.entities.is_empty() {
384            println!("{BOLD}Entities:{RESET}");
385            for (i, r) in result.entities.iter().enumerate() {
386                let source_tag = match &r.source {
387                    MatchSource::Semantic => "semantic".to_string(),
388                    MatchSource::Graph { parent, rel_type } => {
389                        format!("graph: {parent} —[{rel_type}]")
390                    }
391                    MatchSource::Keyword => "keyword".to_string(),
392                };
393                println!(
394                    "  {BOLD}{}. {}{RESET} ({}) — {:.3} [{DIM}{source_tag}{RESET}]",
395                    i + 1,
396                    r.entity.name,
397                    r.entity.entity_type,
398                    r.score
399                );
400                println!("     {DIM}{}{RESET}", r.entity.abstract_text);
401            }
402        }
403
404        if !result.episodes.is_empty() {
405            println!("\n{BOLD}Episodes:{RESET}");
406            for (i, ep) in result.episodes.iter().enumerate() {
407                let log = ep
408                    .episode
409                    .log_number
410                    .map(|n| format!("#{n}"))
411                    .unwrap_or_default();
412                println!(
413                    "  {BOLD}{}. {}{RESET} ({}) — {:.3}",
414                    i + 1,
415                    ep.episode.session_id,
416                    log,
417                    ep.score
418                );
419                println!("     {DIM}{}{RESET}", ep.episode.abstract_text);
420            }
421        }
422
423        Ok(())
424    })
425}
426
427/// Extract entities from already-ingested archives using an LLM.
428#[cfg(feature = "llm")]
429pub fn extract(
430    memory_dir: &Path,
431    log: Option<u32>,
432    all: bool,
433    dry_run: bool,
434    model_override: Option<String>,
435    provider_override: Option<String>,
436    delay_ms: u64,
437) -> Result<(), String> {
438    let graph_dir = memory_dir.join("graph");
439    if !graph_dir.exists() {
440        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
441    }
442
443    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
444    rt.block_on(async {
445        let gm = GraphMemory::open(&graph_dir)
446            .await
447            .map_err(|e| e.to_string())?;
448
449        // Determine which log numbers to process
450        let log_numbers: Vec<u32> = if let Some(ln) = log {
451            vec![ln]
452        } else if all {
453            gm.unextracted_log_numbers()
454                .await
455                .map_err(|e| e.to_string())?
456                .into_iter()
457                .map(|n| n as u32)
458                .collect()
459        } else {
460            return Err("Specify --log <N> or --all".into());
461        };
462
463        if log_numbers.is_empty() {
464            println!("{YELLOW}No unextracted archives found.{RESET}");
465            return Ok(());
466        }
467
468        // Find conversations directory
469        let conversations_dir = find_conversations_dir(memory_dir)?;
470
471        if dry_run {
472            println!(
473                "{BOLD}Dry run — {}{RESET} archives to extract",
474                log_numbers.len()
475            );
476            for ln in &log_numbers {
477                let path = find_archive_file(&conversations_dir, *ln);
478                let label = match &path {
479                    Ok(p) => {
480                        p.file_name()
481                            .unwrap_or_default()
482                            .to_string_lossy()
483                            .to_string()
484                    }
485                    Err(_) => format!("log {ln:03} (file not found)"),
486                };
487                println!("  {label}");
488            }
489            return Ok(());
490        }
491
492        // Build LLM provider from .recall-echo.toml (CLI flags override)
493        let (llm, model_name) = crate::llm_provider::create_provider(
494            memory_dir,
495            provider_override.as_deref(),
496            model_override.as_deref(),
497        )?;
498
499        println!(
500            "{BOLD}Extracting entities from {} archives using {model_name}{RESET}",
501            log_numbers.len(),
502        );
503
504        let mut total_entities_created = 0u32;
505        let mut total_entities_merged = 0u32;
506        let mut total_entities_skipped = 0u32;
507        let mut total_relationships = 0u32;
508        let mut total_errors = Vec::new();
509        let mut processed = 0u32;
510
511        for ln in &log_numbers {
512            let archive_path = match find_archive_file(&conversations_dir, *ln) {
513                Ok(p) => p,
514                Err(e) => {
515                    println!("  {YELLOW}⚠{RESET} log {ln:03}: {e}");
516                    total_errors.push(format!("log {ln:03}: {e}"));
517                    continue;
518                }
519            };
520
521            let content = std::fs::read_to_string(&archive_path)
522                .map_err(|e| format!("read {}: {e}", archive_path.display()))?;
523
524            let (session_id, _) = extract_archive_metadata(&content, &archive_path);
525
526            let report = gm
527                .extract_from_archive(&content, &session_id, Some(*ln), &*llm)
528                .await
529                .map_err(|e| format!("extraction log {ln:03}: {e}"))?;
530
531            println!(
532                "  {GREEN}✓{RESET} log {ln:03}: +{} entities, ~{} merged, -{} skipped, {} rels",
533                report.entities_created,
534                report.entities_merged,
535                report.entities_skipped,
536                report.relationships_created,
537            );
538
539            gm.mark_extracted(*ln).await.map_err(|e| e.to_string())?;
540
541            total_entities_created += report.entities_created;
542            total_entities_merged += report.entities_merged;
543            total_entities_skipped += report.entities_skipped;
544            total_relationships += report.relationships_created;
545            total_errors.extend(report.errors);
546            processed += 1;
547
548            // Rate limiting between archives
549            if delay_ms > 0 && *ln != *log_numbers.last().unwrap() {
550                tokio::time::sleep(std::time::Duration::from_millis(delay_ms)).await;
551            }
552        }
553
554        println!(
555            "\n{GREEN}✓{RESET} Done: {processed} archives — +{total_entities_created} created, ~{total_entities_merged} merged, -{total_entities_skipped} skipped, {total_relationships} relationships"
556        );
557
558        if !total_errors.is_empty() {
559            println!("\n{YELLOW}Warnings ({}):{RESET}", total_errors.len());
560            for err in total_errors.iter().take(10) {
561                println!("  {DIM}{err}{RESET}");
562            }
563            if total_errors.len() > 10 {
564                println!("  {DIM}... and {} more{RESET}", total_errors.len() - 10);
565            }
566        }
567
568        Ok(())
569    })
570}
571
572// ── Vigil sync commands ──────────────────────────────────────────────
573
574/// Sync vigil-pulse signals and outcomes into the graph.
575pub fn vigil_sync(
576    memory_dir: &Path,
577    signals_path: Option<&Path>,
578    outcomes_path: Option<&Path>,
579) -> Result<(), String> {
580    let graph_dir = memory_dir.join("graph");
581    if !graph_dir.exists() {
582        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
583    }
584
585    // Default paths: look for vigil/ and caliber/ relative to memory_dir's parent (entity root)
586    let entity_root = memory_dir.parent().unwrap_or(memory_dir);
587
588    let default_signals = entity_root.join("vigil").join("signals.json");
589    let default_outcomes = entity_root.join("caliber").join("outcomes.json");
590
591    let sig_path = signals_path.unwrap_or(&default_signals);
592    let out_path = outcomes_path.unwrap_or(&default_outcomes);
593
594    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
595    rt.block_on(async {
596        let gm = GraphMemory::open(&graph_dir)
597            .await
598            .map_err(|e| e.to_string())?;
599
600        let report = gm
601            .sync_vigil(sig_path, out_path)
602            .await
603            .map_err(|e| e.to_string())?;
604
605        println!("{BOLD}Vigil Sync{RESET}");
606        println!("  Measurements: +{}", report.measurements_created);
607        println!("  Outcomes:     +{}", report.outcomes_created);
608        println!("  Relationships: +{}", report.relationships_created);
609        println!("  Skipped:       {}", report.skipped);
610
611        if !report.errors.is_empty() {
612            println!("\n  {YELLOW}Warnings:{RESET}");
613            for err in &report.errors {
614                println!("    {DIM}{err}{RESET}");
615            }
616        }
617
618        if report.measurements_created == 0 && report.outcomes_created == 0 {
619            println!("\n  {DIM}No new data — graph is in sync.{RESET}");
620        }
621
622        Ok(())
623    })
624}
625
626// ── Pipeline commands ──────────────────────────────────────────────────
627
628/// Sync pipeline documents into the graph.
629pub fn pipeline_sync(memory_dir: &Path, docs_dir_override: Option<&Path>) -> Result<(), String> {
630    let graph_dir = memory_dir.join("graph");
631    if !graph_dir.exists() {
632        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
633    }
634
635    // Resolve docs directory: CLI flag > config > error
636    let docs_dir = if let Some(d) = docs_dir_override {
637        d.to_path_buf()
638    } else {
639        let cfg = crate::config::load_from_dir(memory_dir);
640        match cfg.pipeline.and_then(|p| p.docs_dir) {
641            Some(d) => {
642                let path = PathBuf::from(shellexpand(&d));
643                if !path.exists() {
644                    return Err(format!(
645                        "Configured docs_dir does not exist: {}",
646                        path.display()
647                    ));
648                }
649                path
650            }
651            None => {
652                return Err(
653                    "No docs directory specified. Use --docs-dir or set [pipeline] docs_dir in config.".into(),
654                );
655            }
656        }
657    };
658
659    // Read pipeline documents
660    let docs = read_pipeline_docs(&docs_dir)?;
661
662    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
663    rt.block_on(async {
664        let gm = GraphMemory::open(&graph_dir)
665            .await
666            .map_err(|e| e.to_string())?;
667
668        let report = gm.sync_pipeline(&docs).await.map_err(|e| e.to_string())?;
669
670        println!("{BOLD}Pipeline Sync{RESET}");
671        println!("  Created:      {}", report.entities_created);
672        println!("  Updated:      {}", report.entities_updated);
673        println!("  Archived:     {}", report.entities_archived);
674        println!(
675            "  Relationships: +{} / ~{} skipped",
676            report.relationships_created, report.relationships_skipped
677        );
678
679        if !report.errors.is_empty() {
680            println!("\n  {YELLOW}Warnings:{RESET}");
681            for err in &report.errors {
682                println!("    {DIM}{err}{RESET}");
683            }
684        }
685
686        if report.entities_created == 0
687            && report.entities_updated == 0
688            && report.entities_archived == 0
689        {
690            println!("\n  {DIM}No changes — graph is in sync.{RESET}");
691        }
692
693        Ok(())
694    })
695}
696
697/// Show pipeline health stats.
698pub fn pipeline_status(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
699    let graph_dir = memory_dir.join("graph");
700    if !graph_dir.exists() {
701        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
702    }
703
704    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
705    rt.block_on(async {
706        let gm = GraphMemory::open(&graph_dir)
707            .await
708            .map_err(|e| e.to_string())?;
709
710        let stats = gm
711            .pipeline_stats(staleness_days)
712            .await
713            .map_err(|e| e.to_string())?;
714
715        println!(
716            "{BOLD}Pipeline Status{RESET} ({} entities)",
717            stats.total_entities
718        );
719
720        if stats.by_stage.is_empty() {
721            println!(
722                "  {DIM}No pipeline entities in graph. Run `graph pipeline sync` first.{RESET}"
723            );
724            return Ok(());
725        }
726
727        // Display stages in pipeline order
728        let stage_order = ["learning", "thoughts", "curiosity", "reflections", "praxis"];
729        for stage in &stage_order {
730            if let Some(statuses) = stats.by_stage.get(*stage) {
731                println!("\n  {CYAN}{}{RESET}", stage.to_uppercase());
732                let mut items: Vec<_> = statuses.iter().collect();
733                items.sort_by_key(|(s, _)| (*s).clone());
734                for (status, count) in items {
735                    println!("    {status}: {count}");
736                }
737            }
738        }
739
740        if !stats.stale_thoughts.is_empty() {
741            println!("\n  {YELLOW}Stale thoughts (>{staleness_days}d):{RESET}");
742            for entity in &stats.stale_thoughts {
743                println!("    {DIM}•{RESET} {}", entity.name);
744            }
745        }
746
747        if !stats.stale_questions.is_empty() {
748            println!(
749                "\n  {YELLOW}Stale questions (>{}d):{RESET}",
750                staleness_days * 2
751            );
752            for entity in &stats.stale_questions {
753                println!("    {DIM}•{RESET} {}", entity.name);
754            }
755        }
756
757        if let Some(ref last) = stats.last_movement {
758            println!("\n  {DIM}Last movement: {last}{RESET}");
759        }
760
761        Ok(())
762    })
763}
764
765/// Trace pipeline flow for an entity.
766pub fn pipeline_flow(memory_dir: &Path, entity_name: &str) -> Result<(), String> {
767    let graph_dir = memory_dir.join("graph");
768    if !graph_dir.exists() {
769        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
770    }
771
772    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
773    rt.block_on(async {
774        let gm = GraphMemory::open(&graph_dir)
775            .await
776            .map_err(|e| e.to_string())?;
777
778        let chain = gm
779            .pipeline_flow(entity_name)
780            .await
781            .map_err(|e| e.to_string())?;
782
783        if chain.is_empty() {
784            println!("{YELLOW}No pipeline relationships found for \"{entity_name}\".{RESET}");
785            return Ok(());
786        }
787
788        println!("{BOLD}Pipeline Flow: {entity_name}{RESET}\n");
789        for (source, rel_type, target) in &chain {
790            println!(
791                "  {} ({}) {CYAN}—[{rel_type}]→{RESET} {} ({})",
792                source.name, source.entity_type, target.name, target.entity_type
793            );
794        }
795
796        Ok(())
797    })
798}
799
800/// List stale pipeline entities.
801pub fn pipeline_stale(memory_dir: &Path, staleness_days: u32) -> Result<(), String> {
802    let graph_dir = memory_dir.join("graph");
803    if !graph_dir.exists() {
804        return Err("Graph store not initialized. Run `recall-echo graph init` first.".into());
805    }
806
807    let rt = tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;
808    rt.block_on(async {
809        let gm = GraphMemory::open(&graph_dir)
810            .await
811            .map_err(|e| e.to_string())?;
812
813        let stats = gm
814            .pipeline_stats(staleness_days)
815            .await
816            .map_err(|e| e.to_string())?;
817
818        let total_stale = stats.stale_thoughts.len() + stats.stale_questions.len();
819        if total_stale == 0 {
820            println!("{GREEN}✓{RESET} No stale pipeline entities.");
821            return Ok(());
822        }
823
824        println!("{BOLD}Stale Pipeline Entities{RESET}\n");
825
826        if !stats.stale_thoughts.is_empty() {
827            println!("  {YELLOW}Thoughts (>{staleness_days} days):{RESET}");
828            for entity in &stats.stale_thoughts {
829                println!("    • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
830            }
831        }
832
833        if !stats.stale_questions.is_empty() {
834            println!("  {YELLOW}Questions (>{} days):{RESET}", staleness_days * 2);
835            for entity in &stats.stale_questions {
836                println!("    • {} {DIM}({}){RESET}", entity.name, entity.entity_type);
837            }
838        }
839
840        Ok(())
841    })
842}
843
844/// Read pipeline documents from a directory.
845fn read_pipeline_docs(dir: &Path) -> Result<PipelineDocuments, String> {
846    let read_or_empty = |name: &str| -> String {
847        let path = dir.join(name);
848        std::fs::read_to_string(&path).unwrap_or_default()
849    };
850
851    Ok(PipelineDocuments {
852        learning: read_or_empty("LEARNING.md"),
853        thoughts: read_or_empty("THOUGHTS.md"),
854        curiosity: read_or_empty("CURIOSITY.md"),
855        reflections: read_or_empty("REFLECTIONS.md"),
856        praxis: read_or_empty("PRAXIS.md"),
857    })
858}
859
860/// Expand ~ to home directory in paths.
861fn shellexpand(path: &str) -> String {
862    if let Some(rest) = path.strip_prefix("~/") {
863        if let Some(home) = dirs::home_dir() {
864            return home.join(rest).to_string_lossy().to_string();
865        }
866    }
867    path.to_string()
868}
869
870/// Find the conversations directory — checks memory_dir/conversations/ then parent/conversations/.
871fn find_conversations_dir(memory_dir: &Path) -> Result<PathBuf, String> {
872    let conv = memory_dir.join("conversations");
873    if conv.exists() {
874        return Ok(conv);
875    }
876    if let Some(parent) = memory_dir.parent() {
877        let parent_conv = parent.join("conversations");
878        if parent_conv.exists() {
879            return Ok(parent_conv);
880        }
881    }
882    Err("conversations/ directory not found".into())
883}
884
885/// Find the archive file for a given log number.
886#[cfg(feature = "llm")]
887fn find_archive_file(conversations_dir: &Path, log_number: u32) -> Result<PathBuf, String> {
888    // Try both naming conventions
889    let patterns = [
890        format!("conversation-{log_number:03}.md"),
891        format!("conversation-{log_number}.md"),
892        format!("archive-log-{log_number:03}.md"),
893        format!("archive-log-{log_number}.md"),
894    ];
895
896    for name in &patterns {
897        let path = conversations_dir.join(name);
898        if path.exists() {
899            return Ok(path);
900        }
901    }
902
903    Err(format!("no archive file for log {log_number:03}"))
904}