Skip to main content

gity_cli/
lib.rs

1use clap::{Parser, Subcommand};
2use gity_ipc::{
3    DaemonCommand, DaemonError, DaemonHealth, DaemonMetrics, DaemonResponse, DaemonService,
4    FsMonitorSnapshot, JobKind, LogEntry, RepoHealthDetail, RepoStatusDetail, RepoSummary,
5    ValidatedPath,
6};
7use std::{path::PathBuf, time::SystemTime};
8use thiserror::Error;
9
10/// Converts a PathBuf to a ValidatedPath, wrapping any validation errors.
11fn validated_path(path: PathBuf) -> Result<ValidatedPath, CliError> {
12    ValidatedPath::new(path).map_err(|e| CliError::Message(e.to_string()))
13}
14
/// CLI definition shared by the `gity` binary and tests.
// Added notes use plain `//` comments: clap's derive macros can surface `///`
// doc comments as user-visible help text, so those are left untouched.
// `about` is set explicitly in the `#[command(...)]` attribute below.
#[derive(Debug, Parser)]
#[command(author, version, about = "Fast Git helper daemon")]
pub struct Cli {
    // The parsed subcommand; `Cli::into_action` consumes it to decide what
    // the process should do.
    #[command(subcommand)]
    pub command: Commands,
}
22
// Top-level subcommands of the CLI. `Cli::into_action` maps each variant to
// a `CliAction`.
// Added notes use plain `//` comments: clap's derive macros surface `///`
// doc comments as user-visible help text, so those are left untouched.
#[derive(Debug, Subcommand)]
pub enum Commands {
    /// Register a repository with the daemon.
    Register { repo_path: PathBuf },
    /// Remove a repository from the daemon.
    Unregister { repo_path: PathBuf },
    /// List registered repositories.
    List {
        /// Include daemon metrics in the listing output.
        #[arg(long)]
        stats: bool,
    },
    /// Stream daemon notifications (e.g., watcher events).
    Events,
    /// Display cached status for a repository.
    Status { repo_path: PathBuf },
    /// List files changed since a generation token.
    Changed {
        repo_path: PathBuf,
        // Optional `--since`; forwarded as `last_seen_generation` in the
        // `FsMonitorSnapshot` daemon command.
        #[arg(long)]
        since: Option<u64>,
    },
    /// Print daemon logs for a repository.
    Logs {
        repo_path: PathBuf,
        // `--follow` flag; passed through unchanged to `CliAction::Logs`.
        #[arg(long)]
        follow: bool,
        // `--limit`, defaulting to 50; passed through unchanged to
        // `CliAction::Logs`.
        #[arg(long, default_value_t = 50)]
        limit: usize,
    },
    /// Git fsmonitor helper entrypoint (invoked by core.fsmonitor).
    FsmonitorHelper {
        // Positional protocol version, restricted to 1 or 2 by the value
        // parser; defaults to 2.
        #[arg(value_parser = clap::value_parser!(u8).range(1..=2), default_value_t = 2)]
        version: u8,
        // Optional positional token, forwarded as an opaque string.
        #[arg()]
        token: Option<String>,
        // Optional `--repo` path, forwarded as-is (not validated here).
        #[arg(long)]
        repo: Option<PathBuf>,
    },
    /// Trigger background prefetch for a repository.
    Prefetch {
        repo_path: PathBuf,
        /// Run immediately instead of queuing.
        // NOTE(review): `Cli::into_action` currently discards this flag
        // (`now: _`), so it has no effect on the queued job.
        #[arg(long)]
        now: bool,
    },
    /// Force maintenance tasks (commit-graph, GC) for a repository.
    Maintain { repo_path: PathBuf },
    /// Run health diagnostics for a repository.
    Health { repo_path: PathBuf },
    /// Launch the system tray UI.
    Tray,
    /// Talk to the daemon control plane.
    #[command(subcommand)]
    Daemon(DaemonCommands),
    /// Database maintenance operations.
    #[command(subcommand)]
    Db(DbCommands),
}
82
83#[derive(Debug, Subcommand)]
84pub enum DbCommands {
85    /// Show database statistics (size, entry counts).
86    Stats,
87    /// Compact database files to reclaim space.
88    Compact,
89    /// Prune old log entries from persistent storage.
90    PruneLogs {
91        /// Maximum age in days (entries older than this are removed).
92        #[arg(long, default_value = "7")]
93        older_than: u64,
94    },
95}
96
// Subcommands nested under `Commands::Daemon`. `Cli::into_action` maps
// `Run`/`Start`/`Stop`/`Oneshot` to dedicated `CliAction` variants and the
// remaining ones to `CliAction::Rpc`.
// Added notes use plain `//` comments: clap's derive macros surface `///`
// doc comments as user-visible help text, so those are left untouched.
#[derive(Debug, Subcommand)]
pub enum DaemonCommands {
    /// Start the daemon in the current process (foreground).
    Run,
    /// Start the daemon as a detached background process.
    Start,
    /// Stop a running daemon gracefully.
    Stop,
    /// Run daemon for a single repo, service queued jobs, then exit.
    // The path is forwarded to `CliAction::OneshotDaemon` without going
    // through `validated_path`.
    Oneshot { repo_path: PathBuf },
    /// Fetch daemon health from a running instance.
    Health,
    /// Print daemon metrics and exit.
    Metrics,
    /// Request that a background job runs immediately.
    QueueJob {
        repo_path: PathBuf,
        // Job selector parsed through `CliJobKind`'s `ValueEnum` impl and
        // converted to `JobKind` in `Cli::into_action`.
        #[arg(value_enum)]
        job: CliJobKind,
    },
}
118
// Command-line mirror of the two `JobKind` values the CLI can request; the
// `From<CliJobKind> for JobKind` impl performs the conversion.
// Plain `//` comments are used because clap's `ValueEnum` derive can surface
// `///` doc comments on variants as help text.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum CliJobKind {
    // Converts to `JobKind::Prefetch`.
    Prefetch,
    // Converts to `JobKind::Maintenance`.
    Maintenance,
}
124
125impl From<CliJobKind> for JobKind {
126    fn from(value: CliJobKind) -> Self {
127        match value {
128            CliJobKind::Prefetch => JobKind::Prefetch,
129            CliJobKind::Maintenance => JobKind::Maintenance,
130        }
131    }
132}
133
/// High-level action resolved from the CLI.
///
/// Produced by [`Cli::into_action`]; each variant carries exactly the data
/// parsed from the command line for that action.
#[derive(Debug)]
pub enum CliAction {
    /// A [`DaemonCommand`] built from the arguments (see [`execute_rpc`] for
    /// how such a command is run against a `DaemonService`).
    Rpc(DaemonCommand),
    /// Resolved from `Commands::List`.
    List {
        /// Value of the `--stats` flag.
        stats: bool,
    },
    /// Resolved from `Commands::Logs`; all fields are passed through as
    /// parsed (the path is not validated at this stage).
    Logs {
        repo_path: PathBuf,
        follow: bool,
        limit: usize,
    },
    /// Resolved from `Commands::FsmonitorHelper`; all fields are passed
    /// through as parsed.
    FsMonitorHelper {
        version: u8,
        token: Option<String>,
        repo: Option<PathBuf>,
    },
    /// Resolved from `Commands::Events`.
    StreamEvents,
    /// Resolved from `DaemonCommands::Run`.
    RunDaemon,
    /// Resolved from `DaemonCommands::Start`.
    StartDaemon,
    /// Resolved from `DaemonCommands::Stop`.
    StopDaemon,
    /// Resolved from `DaemonCommands::Oneshot`; the path is passed through as
    /// parsed.
    OneshotDaemon {
        repo_path: PathBuf,
    },
    /// Resolved from `Commands::Tray`.
    RunTray,
    /// Resolved from `DbCommands::Stats`.
    DbStats,
    /// Resolved from `DbCommands::Compact`.
    DbCompact,
    /// Resolved from `DbCommands::PruneLogs`.
    DbPruneLogs {
        /// Value of `--older-than`, in days.
        older_than_days: u64,
    },
}
165
166impl Cli {
167    pub fn into_action(self) -> Result<CliAction, CliError> {
168        match self.command {
169            Commands::Register { repo_path } => Ok(CliAction::Rpc(DaemonCommand::RegisterRepo {
170                repo_path: validated_path(repo_path)?,
171            })),
172            Commands::Unregister { repo_path } => {
173                Ok(CliAction::Rpc(DaemonCommand::UnregisterRepo {
174                    repo_path: validated_path(repo_path)?,
175                }))
176            }
177            Commands::List { stats } => Ok(CliAction::List { stats }),
178            Commands::Events => Ok(CliAction::StreamEvents),
179            Commands::Changed { repo_path, since } => {
180                Ok(CliAction::Rpc(DaemonCommand::FsMonitorSnapshot {
181                    repo_path: validated_path(repo_path)?,
182                    last_seen_generation: since,
183                }))
184            }
185            Commands::Logs {
186                repo_path,
187                follow,
188                limit,
189            } => Ok(CliAction::Logs {
190                repo_path,
191                follow,
192                limit,
193            }),
194            Commands::FsmonitorHelper {
195                version,
196                token,
197                repo,
198            } => Ok(CliAction::FsMonitorHelper {
199                version,
200                token,
201                repo,
202            }),
203            Commands::Status { repo_path } => Ok(CliAction::Rpc(DaemonCommand::Status {
204                repo_path: validated_path(repo_path)?,
205                known_generation: None,
206            })),
207            Commands::Prefetch { repo_path, now: _ } => {
208                Ok(CliAction::Rpc(DaemonCommand::QueueJob {
209                    repo_path: validated_path(repo_path)?,
210                    job: JobKind::Prefetch,
211                }))
212            }
213            Commands::Maintain { repo_path } => Ok(CliAction::Rpc(DaemonCommand::QueueJob {
214                repo_path: validated_path(repo_path)?,
215                job: JobKind::Maintenance,
216            })),
217            Commands::Health { repo_path } => Ok(CliAction::Rpc(DaemonCommand::RepoHealth {
218                repo_path: validated_path(repo_path)?,
219            })),
220            Commands::Tray => Ok(CliAction::RunTray),
221            Commands::Daemon(cmd) => match cmd {
222                DaemonCommands::Run => Ok(CliAction::RunDaemon),
223                DaemonCommands::Start => Ok(CliAction::StartDaemon),
224                DaemonCommands::Stop => Ok(CliAction::StopDaemon),
225                DaemonCommands::Oneshot { repo_path } => Ok(CliAction::OneshotDaemon { repo_path }),
226                DaemonCommands::Health => Ok(CliAction::Rpc(DaemonCommand::HealthCheck)),
227                DaemonCommands::Metrics => Ok(CliAction::Rpc(DaemonCommand::Metrics)),
228                DaemonCommands::QueueJob { repo_path, job } => {
229                    Ok(CliAction::Rpc(DaemonCommand::QueueJob {
230                        repo_path: validated_path(repo_path)?,
231                        job: job.into(),
232                    }))
233                }
234            },
235            Commands::Db(cmd) => match cmd {
236                DbCommands::Stats => Ok(CliAction::DbStats),
237                DbCommands::Compact => Ok(CliAction::DbCompact),
238                DbCommands::PruneLogs { older_than } => Ok(CliAction::DbPruneLogs {
239                    older_than_days: older_than,
240                }),
241            },
242        }
243    }
244}
245
/// Text produced by running a CLI action, ready to be shown to the user.
///
/// [`execute_rpc`] fills `message` with the formatted daemon response.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CliOutput {
    /// Human-readable output text.
    pub message: String,
}
249
/// Error type returned by the CLI harness.
#[derive(Debug, Error)]
pub enum CliError {
    /// Free-form failure text, displayed verbatim (`"{0}"`). `validated_path`
    /// builds this variant from path-validation failures.
    #[error("{0}")]
    Message(String),
    /// A [`DaemonError`] propagated from the daemon service. `#[from]` lets
    /// `?` convert it automatically (as in [`execute_rpc`]), and
    /// `transparent` forwards the inner error's display output.
    #[error(transparent)]
    Daemon(#[from] DaemonError),
}
258
259pub async fn execute_rpc(
260    service: &impl DaemonService,
261    command: DaemonCommand,
262) -> Result<CliOutput, CliError> {
263    let response = service.execute(command).await?;
264    Ok(CliOutput {
265        message: format_response(&response),
266    })
267}
268
269pub fn format_response(response: &DaemonResponse) -> String {
270    match response {
271        DaemonResponse::Ack(ack) => ack.message.clone(),
272        DaemonResponse::RepoList(list) => format_repo_list(list),
273        DaemonResponse::RepoStatus(detail) => format_repo_status(detail),
274        DaemonResponse::RepoStatusUnchanged {
275            repo_path,
276            generation,
277        } => format!(
278            "{}: unchanged (generation {})",
279            repo_path.display(),
280            generation
281        ),
282        DaemonResponse::Health(health) => format_health(health),
283        DaemonResponse::RepoHealth(detail) => format_repo_health(detail),
284        DaemonResponse::Metrics(metrics) => format_metrics(metrics),
285        DaemonResponse::FsMonitorSnapshot(snapshot) => format_fsmonitor_snapshot(snapshot),
286        DaemonResponse::Logs(entries) => format_logs(entries),
287        DaemonResponse::Error(msg) => msg.clone(),
288    }
289}
290
291fn format_repo_list(list: &[RepoSummary]) -> String {
292    if list.is_empty() {
293        "no repositories registered".to_string()
294    } else {
295        list.iter()
296            .map(format_repo_summary_line)
297            .collect::<Vec<_>>()
298            .join("\n")
299    }
300}
301
302fn format_repo_summary_line(summary: &RepoSummary) -> String {
303    format!(
304        "{} [{} jobs, status {}, gen {}]",
305        summary.repo_path.display(),
306        summary.pending_jobs,
307        summary.status.as_str(),
308        summary.generation
309    )
310}
311
312pub fn format_repo_status(detail: &RepoStatusDetail) -> String {
313    if detail.dirty_paths.is_empty() {
314        format!(
315            "{}: clean (generation {})",
316            detail.repo_path.display(),
317            detail.generation
318        )
319    } else {
320        let mut lines = vec![format!(
321            "{} (generation {}):",
322            detail.repo_path.display(),
323            detail.generation
324        )];
325        lines.extend(
326            detail
327                .dirty_paths
328                .iter()
329                .map(|path| format!("  {}", path.display())),
330        );
331        lines.join("\n")
332    }
333}
334
335fn format_health(health: &DaemonHealth) -> String {
336    let mut lines = vec![format!(
337        "repos: {}, pending jobs: {}, uptime: {}s",
338        health.repo_count, health.pending_jobs, health.uptime_seconds
339    )];
340    if !health.repo_generations.is_empty() {
341        lines.push("repo generations:".into());
342        for entry in &health.repo_generations {
343            lines.push(format!(
344                "  {} -> generation {}",
345                entry.repo_path.display(),
346                entry.generation
347            ));
348        }
349    }
350    lines.join("\n")
351}
352
353fn format_repo_health(detail: &RepoHealthDetail) -> String {
354    let mut lines = vec![format!("Health report for {}", detail.repo_path.display())];
355    lines.push(format!("  generation: {}", detail.generation));
356    lines.push(format!("  pending jobs: {}", detail.pending_jobs));
357    lines.push(format!(
358        "  watcher: {}",
359        if detail.watcher_active {
360            "active"
361        } else {
362            "inactive"
363        }
364    ));
365    if let Some(last_event) = detail.last_event {
366        let timestamp = last_event
367            .duration_since(SystemTime::UNIX_EPOCH)
368            .map(|dur| dur.as_secs())
369            .unwrap_or_default();
370        lines.push(format!("  last event: {} (unix)", timestamp));
371    } else {
372        lines.push("  last event: none".into());
373    }
374    lines.push(format!("  dirty paths: {}", detail.dirty_path_count));
375    lines.push(format!(
376        "  sled integrity: {}",
377        if detail.sled_ok { "ok" } else { "ERROR" }
378    ));
379    lines.push(format!(
380        "  needs reconciliation: {}",
381        if detail.needs_reconciliation {
382            "yes"
383        } else {
384            "no"
385        }
386    ));
387    lines.push(format!(
388        "  throttling: {}",
389        if detail.throttling_active {
390            "active"
391        } else {
392            "off"
393        }
394    ));
395    if let Some(next_job) = &detail.next_scheduled_job {
396        lines.push(format!("  next scheduled job: {}", next_job));
397    }
398    lines.join("\n")
399}
400
401fn format_metrics(metrics: &DaemonMetrics) -> String {
402    let mut lines = Vec::new();
403    lines.push(format!(
404        "daemon: cpu={:.1}%, rss={}, uptime={}s, pending jobs={}",
405        metrics.global.cpu_percent,
406        format_bytes(metrics.global.rss_bytes),
407        metrics.global.uptime_seconds,
408        metrics.global.pending_jobs
409    ));
410    if !metrics.repos.is_empty() {
411        lines.push("repo queue depth:".into());
412        for repo in &metrics.repos {
413            lines.push(format!(
414                "  {} -> {} pending",
415                repo.repo_path.display(),
416                repo.pending_jobs
417            ));
418        }
419    }
420    lines.push("job counters:".into());
421    for kind in JobKind::ALL {
422        let counts = metrics.jobs.get(&kind).copied().unwrap_or_default();
423        lines.push(format!("  {}", render_metric_line(kind, counts)));
424    }
425    let mut extras: Vec<_> = metrics
426        .jobs
427        .iter()
428        .filter(|(kind, _)| !JobKind::ALL.contains(kind))
429        .map(|(kind, counts)| format!("  {}", render_metric_line(*kind, *counts)))
430        .collect();
431    lines.append(&mut extras);
432    lines.join("\n")
433}
434
/// Formats a byte count using binary units (`B`, `KiB`, `MiB`, `GiB`, `TiB`).
///
/// Counts below 1024 are printed exactly (`"512 B"`); larger counts are
/// printed with one decimal place in the largest unit that keeps the value
/// below 1024. `TiB` is the largest unit, so values there may exceed 1024.
///
/// Promotion to the next unit is decided on the *rounded* value, so a count
/// such as 1 MiB minus one byte renders as `"1.0 MiB"` rather than
/// `"1024.0 KiB"`.
fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];
    const LAST: usize = UNITS.len() - 1;

    let mut value = bytes as f64;
    let mut unit = 0;
    while value >= 1024.0 && unit < LAST {
        value /= 1024.0;
        unit += 1;
    }
    if unit == 0 {
        // Exact integer output; no rounding involved.
        return format!("{bytes} {}", UNITS[unit]);
    }
    // `{:.1}` rounds to one decimal: anything at or above 1023.95 would be
    // displayed as "1024.0", so move it up to the next unit when one exists.
    if unit < LAST && (value * 10.0).round() / 10.0 >= 1024.0 {
        value /= 1024.0;
        unit += 1;
    }
    format!("{value:.1} {}", UNITS[unit])
}
449
450fn render_metric_line(kind: JobKind, counts: gity_ipc::JobMetrics) -> String {
451    format!(
452        "{}: spawned={}, completed={}, failed={}",
453        kind.as_str(),
454        counts.spawned,
455        counts.completed,
456        counts.failed
457    )
458}
459
460fn format_logs(entries: &[LogEntry]) -> String {
461    if entries.is_empty() {
462        "no log entries found".into()
463    } else {
464        entries
465            .iter()
466            .map(format_log_entry)
467            .collect::<Vec<_>>()
468            .join("\n")
469    }
470}
471
472fn format_fsmonitor_snapshot(snapshot: &FsMonitorSnapshot) -> String {
473    if snapshot.dirty_paths.is_empty() {
474        format!(
475            "{}: no changes (generation {})",
476            snapshot.repo_path.display(),
477            snapshot.generation
478        )
479    } else {
480        let mut lines = vec![format!(
481            "{} (generation {}):",
482            snapshot.repo_path.display(),
483            snapshot.generation
484        )];
485        lines.extend(
486            snapshot
487                .dirty_paths
488                .iter()
489                .map(|path| format!("  {}", path.display())),
490        );
491        lines.join("\n")
492    }
493}
494
495fn format_log_entry(entry: &LogEntry) -> String {
496    let timestamp = entry
497        .timestamp
498        .duration_since(SystemTime::UNIX_EPOCH)
499        .map(|dur| dur.as_secs())
500        .unwrap_or_default();
501    format!(
502        "[{}] {}: {}",
503        timestamp,
504        entry.repo_path.display(),
505        entry.message
506    )
507}
508
// Unit tests for CLI parsing (`Cli::parse_from` + `into_action`) and for the
// text formatters. RPC execution is exercised through `TestService`, a canned
// `DaemonService` defined at the bottom of this module.
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use gity_ipc::{
        DaemonHealth, DaemonMetrics, DaemonResponse, FsMonitorSnapshot, JobKind, JobMetrics,
        RepoStatus, RepoStatusDetail, RepoSummary,
    };
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    // `register <path>` resolves to a `RegisterRepo` RPC carrying that path.
    #[test]
    fn cli_action_for_register() {
        let cli = Cli::parse_from(["gity", "register", "/tmp/demo"]);
        match cli.into_action().unwrap() {
            CliAction::Rpc(DaemonCommand::RegisterRepo { repo_path }) => {
                assert_eq!(repo_path.as_path(), std::path::Path::new("/tmp/demo"));
            }
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // `status <path>` resolves to a `Status` RPC with no known generation.
    #[test]
    fn cli_action_for_status() {
        let cli = Cli::parse_from(["gity", "status", "/tmp/demo"]);
        match cli.into_action().unwrap() {
            CliAction::Rpc(DaemonCommand::Status {
                repo_path,
                known_generation,
            }) => {
                assert_eq!(repo_path.as_path(), std::path::Path::new("/tmp/demo"));
                assert!(known_generation.is_none());
            }
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // `list --stats` sets the `stats` flag on `CliAction::List`.
    #[test]
    fn cli_action_for_list_with_stats() {
        let cli = Cli::parse_from(["gity", "list", "--stats"]);
        match cli.into_action().unwrap() {
            CliAction::List { stats } => assert!(stats),
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // `daemon metrics` resolves to the `Metrics` RPC.
    #[test]
    fn cli_action_for_daemon_metrics() {
        let cli = Cli::parse_from(["gity", "daemon", "metrics"]);
        match cli.into_action().unwrap() {
            CliAction::Rpc(DaemonCommand::Metrics) => {}
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // `events` resolves to `CliAction::StreamEvents`.
    #[test]
    fn cli_action_for_events() {
        let cli = Cli::parse_from(["gity", "events"]);
        match cli.into_action().unwrap() {
            CliAction::StreamEvents => {}
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // The two positionals of `fsmonitor-helper` are the version and the token.
    #[test]
    fn cli_action_for_fsmonitor_helper() {
        let cli = Cli::parse_from(["gity", "fsmonitor-helper", "2", "123"]);
        match cli.into_action().unwrap() {
            CliAction::FsMonitorHelper { version, token, .. } => {
                assert_eq!(version, 2);
                assert_eq!(token.as_deref(), Some("123"));
            }
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // `logs` forwards path, `--follow` and `--limit` unchanged.
    #[test]
    fn cli_action_for_logs_follow() {
        let cli = Cli::parse_from(["gity", "logs", "/tmp/demo", "--follow", "--limit", "5"]);
        match cli.into_action().unwrap() {
            CliAction::Logs {
                repo_path,
                follow,
                limit,
            } => {
                assert_eq!(repo_path, PathBuf::from("/tmp/demo"));
                assert!(follow);
                assert_eq!(limit, 5);
            }
            other => panic!("unexpected action: {other:?}"),
        }
    }

    // A snapshot with dirty paths lists every path plus the generation.
    #[test]
    fn format_fsmonitor_snapshot_lists_paths() {
        let snapshot = FsMonitorSnapshot {
            repo_path: PathBuf::from("/tmp/demo"),
            dirty_paths: vec![PathBuf::from("a.txt"), PathBuf::from("dir/b.txt")],
            generation: 3,
        };
        let text = format_fsmonitor_snapshot(&snapshot);
        assert!(text.contains("a.txt"));
        assert!(text.contains("dir/b.txt"));
        assert!(text.contains("generation 3"));
    }

    // End-to-end through `execute_rpc`: the canned `RepoList` response is
    // rendered with the summary-line format.
    #[tokio::test]
    async fn prints_repo_list() {
        let service = TestService::new(vec![DaemonResponse::RepoList(vec![RepoSummary {
            repo_path: PathBuf::from("/tmp/demo"),
            status: RepoStatus::Idle,
            pending_jobs: 1,
            last_event: None,
            generation: 0,
        }])]);
        let output = execute_rpc(&service, DaemonCommand::ListRepos)
            .await
            .expect("list command succeeds");
        assert!(output.message.contains("/tmp/demo"));
        assert!(output.message.contains("[1 jobs, status idle, gen 0]"));
    }

    // A dirty repository status lists the dirty path and the generation.
    #[test]
    fn format_status_includes_dirty_paths() {
        let output = format_repo_status(&RepoStatusDetail {
            repo_path: PathBuf::from("/tmp/demo"),
            dirty_paths: vec![PathBuf::from("file.txt")],
            generation: 42,
        });
        assert!(output.contains("file.txt"));
        assert!(output.contains("generation 42"));
    }

    // Only `Prefetch` counters are supplied; the `maintenance` line is still
    // expected, with all-zero (default) counters.
    #[test]
    fn format_metrics_includes_counts() {
        let mut jobs = HashMap::new();
        jobs.insert(
            JobKind::Prefetch,
            JobMetrics {
                spawned: 3,
                completed: 2,
                failed: 1,
            },
        );
        let metrics = DaemonMetrics {
            jobs,
            global: gity_ipc::GlobalMetrics {
                pending_jobs: 1,
                uptime_seconds: 2,
                cpu_percent: 12.5,
                rss_bytes: 2048,
            },
            repos: vec![gity_ipc::RepoMetrics {
                repo_path: PathBuf::from("/tmp/demo"),
                pending_jobs: 3,
            }],
        };
        let output = format_metrics(&metrics);
        assert!(output.contains("daemon: cpu=12.5%"));
        assert!(output.contains("prefetch: spawned=3, completed=2, failed=1"));
        assert!(output.contains("maintenance: spawned=0, completed=0, failed=0"));
        assert!(output.contains("/tmp/demo -> 3 pending"));
    }

    // Health output contains the summary and one line per repo generation.
    #[test]
    fn format_health_lists_generations() {
        let health = DaemonHealth {
            repo_count: 2,
            pending_jobs: 1,
            uptime_seconds: 10,
            repo_generations: vec![
                gity_ipc::RepoGeneration {
                    repo_path: PathBuf::from("/repo/a"),
                    generation: 5,
                },
                gity_ipc::RepoGeneration {
                    repo_path: PathBuf::from("/repo/b"),
                    generation: 3,
                },
            ],
        };
        let output = format_health(&health);
        assert!(output.contains("repos: 2"));
        assert!(output.contains("/repo/a -> generation 5"));
        assert!(output.contains("/repo/b -> generation 3"));
    }

    // Canned `DaemonService`: ignores the command and replies from a
    // pre-loaded list of responses.
    struct TestService {
        responses: Arc<Mutex<Vec<DaemonResponse>>>,
    }

    impl TestService {
        fn new(responses: Vec<DaemonResponse>) -> Self {
            Self {
                responses: Arc::new(Mutex::new(responses)),
            }
        }
    }

    #[async_trait]
    impl DaemonService for TestService {
        // Responses are taken with `Vec::pop`, i.e. the LAST queued response is
        // returned first. A poisoned mutex is reported as a `Transport` error
        // and an exhausted list as a `Rejected` error naming the command.
        async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
            self.responses
                .lock()
                .map_err(|_| DaemonError::Transport("poisoned test service".into()))?
                .pop()
                .ok_or_else(|| DaemonError::Rejected(format!("no response for {command:?}")))
        }
    }
}