1use clap::{Parser, Subcommand};
2use gity_ipc::{
3 DaemonCommand, DaemonError, DaemonHealth, DaemonMetrics, DaemonResponse, DaemonService,
4 FsMonitorSnapshot, JobKind, LogEntry, RepoHealthDetail, RepoStatusDetail, RepoSummary,
5 ValidatedPath,
6};
7use std::{path::PathBuf, time::SystemTime};
8use thiserror::Error;
9
10fn validated_path(path: PathBuf) -> Result<ValidatedPath, CliError> {
12 ValidatedPath::new(path).map_err(|e| CliError::Message(e.to_string()))
13}
14
15#[derive(Debug, Parser)]
17#[command(author, version, about = "Fast Git helper daemon")]
18pub struct Cli {
19 #[command(subcommand)]
20 pub command: Commands,
21}
22
23#[derive(Debug, Subcommand)]
24pub enum Commands {
25 Register { repo_path: PathBuf },
27 Unregister { repo_path: PathBuf },
29 List {
31 #[arg(long)]
33 stats: bool,
34 },
35 Events,
37 Status { repo_path: PathBuf },
39 Changed {
41 repo_path: PathBuf,
42 #[arg(long)]
43 since: Option<u64>,
44 },
45 Logs {
47 repo_path: PathBuf,
48 #[arg(long)]
49 follow: bool,
50 #[arg(long, default_value_t = 50)]
51 limit: usize,
52 },
53 FsmonitorHelper {
55 #[arg(value_parser = clap::value_parser!(u8).range(1..=2), default_value_t = 2)]
56 version: u8,
57 #[arg()]
58 token: Option<String>,
59 #[arg(long)]
60 repo: Option<PathBuf>,
61 },
62 Prefetch {
64 repo_path: PathBuf,
65 #[arg(long)]
67 now: bool,
68 },
69 Maintain { repo_path: PathBuf },
71 Health { repo_path: PathBuf },
73 Tray,
75 #[command(subcommand)]
77 Daemon(DaemonCommands),
78 #[command(subcommand)]
80 Db(DbCommands),
81}
82
83#[derive(Debug, Subcommand)]
84pub enum DbCommands {
85 Stats,
87 Compact,
89 PruneLogs {
91 #[arg(long, default_value = "7")]
93 older_than: u64,
94 },
95}
96
97#[derive(Debug, Subcommand)]
98pub enum DaemonCommands {
99 Run,
101 Start,
103 Stop,
105 Oneshot { repo_path: PathBuf },
107 Health,
109 Metrics,
111 QueueJob {
113 repo_path: PathBuf,
114 #[arg(value_enum)]
115 job: CliJobKind,
116 },
117}
118
119#[derive(Debug, Clone, Copy, clap::ValueEnum)]
120pub enum CliJobKind {
121 Prefetch,
122 Maintenance,
123}
124
125impl From<CliJobKind> for JobKind {
126 fn from(value: CliJobKind) -> Self {
127 match value {
128 CliJobKind::Prefetch => JobKind::Prefetch,
129 CliJobKind::Maintenance => JobKind::Maintenance,
130 }
131 }
132}
133
134#[derive(Debug)]
136pub enum CliAction {
137 Rpc(DaemonCommand),
138 List {
139 stats: bool,
140 },
141 Logs {
142 repo_path: PathBuf,
143 follow: bool,
144 limit: usize,
145 },
146 FsMonitorHelper {
147 version: u8,
148 token: Option<String>,
149 repo: Option<PathBuf>,
150 },
151 StreamEvents,
152 RunDaemon,
153 StartDaemon,
154 StopDaemon,
155 OneshotDaemon {
156 repo_path: PathBuf,
157 },
158 RunTray,
159 DbStats,
160 DbCompact,
161 DbPruneLogs {
162 older_than_days: u64,
163 },
164}
165
166impl Cli {
167 pub fn into_action(self) -> Result<CliAction, CliError> {
168 match self.command {
169 Commands::Register { repo_path } => Ok(CliAction::Rpc(DaemonCommand::RegisterRepo {
170 repo_path: validated_path(repo_path)?,
171 })),
172 Commands::Unregister { repo_path } => {
173 Ok(CliAction::Rpc(DaemonCommand::UnregisterRepo {
174 repo_path: validated_path(repo_path)?,
175 }))
176 }
177 Commands::List { stats } => Ok(CliAction::List { stats }),
178 Commands::Events => Ok(CliAction::StreamEvents),
179 Commands::Changed { repo_path, since } => {
180 Ok(CliAction::Rpc(DaemonCommand::FsMonitorSnapshot {
181 repo_path: validated_path(repo_path)?,
182 last_seen_generation: since,
183 }))
184 }
185 Commands::Logs {
186 repo_path,
187 follow,
188 limit,
189 } => Ok(CliAction::Logs {
190 repo_path,
191 follow,
192 limit,
193 }),
194 Commands::FsmonitorHelper {
195 version,
196 token,
197 repo,
198 } => Ok(CliAction::FsMonitorHelper {
199 version,
200 token,
201 repo,
202 }),
203 Commands::Status { repo_path } => Ok(CliAction::Rpc(DaemonCommand::Status {
204 repo_path: validated_path(repo_path)?,
205 known_generation: None,
206 })),
207 Commands::Prefetch { repo_path, now: _ } => {
208 Ok(CliAction::Rpc(DaemonCommand::QueueJob {
209 repo_path: validated_path(repo_path)?,
210 job: JobKind::Prefetch,
211 }))
212 }
213 Commands::Maintain { repo_path } => Ok(CliAction::Rpc(DaemonCommand::QueueJob {
214 repo_path: validated_path(repo_path)?,
215 job: JobKind::Maintenance,
216 })),
217 Commands::Health { repo_path } => Ok(CliAction::Rpc(DaemonCommand::RepoHealth {
218 repo_path: validated_path(repo_path)?,
219 })),
220 Commands::Tray => Ok(CliAction::RunTray),
221 Commands::Daemon(cmd) => match cmd {
222 DaemonCommands::Run => Ok(CliAction::RunDaemon),
223 DaemonCommands::Start => Ok(CliAction::StartDaemon),
224 DaemonCommands::Stop => Ok(CliAction::StopDaemon),
225 DaemonCommands::Oneshot { repo_path } => Ok(CliAction::OneshotDaemon { repo_path }),
226 DaemonCommands::Health => Ok(CliAction::Rpc(DaemonCommand::HealthCheck)),
227 DaemonCommands::Metrics => Ok(CliAction::Rpc(DaemonCommand::Metrics)),
228 DaemonCommands::QueueJob { repo_path, job } => {
229 Ok(CliAction::Rpc(DaemonCommand::QueueJob {
230 repo_path: validated_path(repo_path)?,
231 job: job.into(),
232 }))
233 }
234 },
235 Commands::Db(cmd) => match cmd {
236 DbCommands::Stats => Ok(CliAction::DbStats),
237 DbCommands::Compact => Ok(CliAction::DbCompact),
238 DbCommands::PruneLogs { older_than } => Ok(CliAction::DbPruneLogs {
239 older_than_days: older_than,
240 }),
241 },
242 }
243 }
244}
245
246pub struct CliOutput {
247 pub message: String,
248}
249
250#[derive(Debug, Error)]
252pub enum CliError {
253 #[error("{0}")]
254 Message(String),
255 #[error(transparent)]
256 Daemon(#[from] DaemonError),
257}
258
259pub async fn execute_rpc(
260 service: &impl DaemonService,
261 command: DaemonCommand,
262) -> Result<CliOutput, CliError> {
263 let response = service.execute(command).await?;
264 Ok(CliOutput {
265 message: format_response(&response),
266 })
267}
268
269pub fn format_response(response: &DaemonResponse) -> String {
270 match response {
271 DaemonResponse::Ack(ack) => ack.message.clone(),
272 DaemonResponse::RepoList(list) => format_repo_list(list),
273 DaemonResponse::RepoStatus(detail) => format_repo_status(detail),
274 DaemonResponse::RepoStatusUnchanged {
275 repo_path,
276 generation,
277 } => format!(
278 "{}: unchanged (generation {})",
279 repo_path.display(),
280 generation
281 ),
282 DaemonResponse::Health(health) => format_health(health),
283 DaemonResponse::RepoHealth(detail) => format_repo_health(detail),
284 DaemonResponse::Metrics(metrics) => format_metrics(metrics),
285 DaemonResponse::FsMonitorSnapshot(snapshot) => format_fsmonitor_snapshot(snapshot),
286 DaemonResponse::Logs(entries) => format_logs(entries),
287 DaemonResponse::Error(msg) => msg.clone(),
288 }
289}
290
291fn format_repo_list(list: &[RepoSummary]) -> String {
292 if list.is_empty() {
293 "no repositories registered".to_string()
294 } else {
295 list.iter()
296 .map(format_repo_summary_line)
297 .collect::<Vec<_>>()
298 .join("\n")
299 }
300}
301
302fn format_repo_summary_line(summary: &RepoSummary) -> String {
303 format!(
304 "{} [{} jobs, status {}, gen {}]",
305 summary.repo_path.display(),
306 summary.pending_jobs,
307 summary.status.as_str(),
308 summary.generation
309 )
310}
311
312pub fn format_repo_status(detail: &RepoStatusDetail) -> String {
313 if detail.dirty_paths.is_empty() {
314 format!(
315 "{}: clean (generation {})",
316 detail.repo_path.display(),
317 detail.generation
318 )
319 } else {
320 let mut lines = vec![format!(
321 "{} (generation {}):",
322 detail.repo_path.display(),
323 detail.generation
324 )];
325 lines.extend(
326 detail
327 .dirty_paths
328 .iter()
329 .map(|path| format!(" {}", path.display())),
330 );
331 lines.join("\n")
332 }
333}
334
335fn format_health(health: &DaemonHealth) -> String {
336 let mut lines = vec![format!(
337 "repos: {}, pending jobs: {}, uptime: {}s",
338 health.repo_count, health.pending_jobs, health.uptime_seconds
339 )];
340 if !health.repo_generations.is_empty() {
341 lines.push("repo generations:".into());
342 for entry in &health.repo_generations {
343 lines.push(format!(
344 " {} -> generation {}",
345 entry.repo_path.display(),
346 entry.generation
347 ));
348 }
349 }
350 lines.join("\n")
351}
352
353fn format_repo_health(detail: &RepoHealthDetail) -> String {
354 let mut lines = vec![format!("Health report for {}", detail.repo_path.display())];
355 lines.push(format!(" generation: {}", detail.generation));
356 lines.push(format!(" pending jobs: {}", detail.pending_jobs));
357 lines.push(format!(
358 " watcher: {}",
359 if detail.watcher_active {
360 "active"
361 } else {
362 "inactive"
363 }
364 ));
365 if let Some(last_event) = detail.last_event {
366 let timestamp = last_event
367 .duration_since(SystemTime::UNIX_EPOCH)
368 .map(|dur| dur.as_secs())
369 .unwrap_or_default();
370 lines.push(format!(" last event: {} (unix)", timestamp));
371 } else {
372 lines.push(" last event: none".into());
373 }
374 lines.push(format!(" dirty paths: {}", detail.dirty_path_count));
375 lines.push(format!(
376 " sled integrity: {}",
377 if detail.sled_ok { "ok" } else { "ERROR" }
378 ));
379 lines.push(format!(
380 " needs reconciliation: {}",
381 if detail.needs_reconciliation {
382 "yes"
383 } else {
384 "no"
385 }
386 ));
387 lines.push(format!(
388 " throttling: {}",
389 if detail.throttling_active {
390 "active"
391 } else {
392 "off"
393 }
394 ));
395 if let Some(next_job) = &detail.next_scheduled_job {
396 lines.push(format!(" next scheduled job: {}", next_job));
397 }
398 lines.join("\n")
399}
400
401fn format_metrics(metrics: &DaemonMetrics) -> String {
402 let mut lines = Vec::new();
403 lines.push(format!(
404 "daemon: cpu={:.1}%, rss={}, uptime={}s, pending jobs={}",
405 metrics.global.cpu_percent,
406 format_bytes(metrics.global.rss_bytes),
407 metrics.global.uptime_seconds,
408 metrics.global.pending_jobs
409 ));
410 if !metrics.repos.is_empty() {
411 lines.push("repo queue depth:".into());
412 for repo in &metrics.repos {
413 lines.push(format!(
414 " {} -> {} pending",
415 repo.repo_path.display(),
416 repo.pending_jobs
417 ));
418 }
419 }
420 lines.push("job counters:".into());
421 for kind in JobKind::ALL {
422 let counts = metrics.jobs.get(&kind).copied().unwrap_or_default();
423 lines.push(format!(" {}", render_metric_line(kind, counts)));
424 }
425 let mut extras: Vec<_> = metrics
426 .jobs
427 .iter()
428 .filter(|(kind, _)| !JobKind::ALL.contains(kind))
429 .map(|(kind, counts)| format!(" {}", render_metric_line(*kind, *counts)))
430 .collect();
431 lines.append(&mut extras);
432 lines.join("\n")
433}
434
435fn format_bytes(bytes: u64) -> String {
436 const UNITS: [&str; 5] = ["B", "KiB", "MiB", "GiB", "TiB"];
437 let mut value = bytes as f64;
438 let mut unit = 0;
439 while value >= 1024.0 && unit < UNITS.len() - 1 {
440 value /= 1024.0;
441 unit += 1;
442 }
443 if unit == 0 {
444 format!("{bytes} {}", UNITS[unit])
445 } else {
446 format!("{value:.1} {}", UNITS[unit])
447 }
448}
449
450fn render_metric_line(kind: JobKind, counts: gity_ipc::JobMetrics) -> String {
451 format!(
452 "{}: spawned={}, completed={}, failed={}",
453 kind.as_str(),
454 counts.spawned,
455 counts.completed,
456 counts.failed
457 )
458}
459
460fn format_logs(entries: &[LogEntry]) -> String {
461 if entries.is_empty() {
462 "no log entries found".into()
463 } else {
464 entries
465 .iter()
466 .map(format_log_entry)
467 .collect::<Vec<_>>()
468 .join("\n")
469 }
470}
471
472fn format_fsmonitor_snapshot(snapshot: &FsMonitorSnapshot) -> String {
473 if snapshot.dirty_paths.is_empty() {
474 format!(
475 "{}: no changes (generation {})",
476 snapshot.repo_path.display(),
477 snapshot.generation
478 )
479 } else {
480 let mut lines = vec![format!(
481 "{} (generation {}):",
482 snapshot.repo_path.display(),
483 snapshot.generation
484 )];
485 lines.extend(
486 snapshot
487 .dirty_paths
488 .iter()
489 .map(|path| format!(" {}", path.display())),
490 );
491 lines.join("\n")
492 }
493}
494
495fn format_log_entry(entry: &LogEntry) -> String {
496 let timestamp = entry
497 .timestamp
498 .duration_since(SystemTime::UNIX_EPOCH)
499 .map(|dur| dur.as_secs())
500 .unwrap_or_default();
501 format!(
502 "[{}] {}: {}",
503 timestamp,
504 entry.repo_path.display(),
505 entry.message
506 )
507}
508
509#[cfg(test)]
510mod tests {
511 use super::*;
512 use async_trait::async_trait;
513 use gity_ipc::{
514 DaemonHealth, DaemonMetrics, DaemonResponse, FsMonitorSnapshot, JobKind, JobMetrics,
515 RepoStatus, RepoStatusDetail, RepoSummary,
516 };
517 use std::collections::HashMap;
518 use std::sync::{Arc, Mutex};
519
520 #[test]
521 fn cli_action_for_register() {
522 let cli = Cli::parse_from(["gity", "register", "/tmp/demo"]);
523 match cli.into_action().unwrap() {
524 CliAction::Rpc(DaemonCommand::RegisterRepo { repo_path }) => {
525 assert_eq!(repo_path.as_path(), std::path::Path::new("/tmp/demo"));
526 }
527 other => panic!("unexpected action: {other:?}"),
528 }
529 }
530
531 #[test]
532 fn cli_action_for_status() {
533 let cli = Cli::parse_from(["gity", "status", "/tmp/demo"]);
534 match cli.into_action().unwrap() {
535 CliAction::Rpc(DaemonCommand::Status {
536 repo_path,
537 known_generation,
538 }) => {
539 assert_eq!(repo_path.as_path(), std::path::Path::new("/tmp/demo"));
540 assert!(known_generation.is_none());
541 }
542 other => panic!("unexpected action: {other:?}"),
543 }
544 }
545
546 #[test]
547 fn cli_action_for_list_with_stats() {
548 let cli = Cli::parse_from(["gity", "list", "--stats"]);
549 match cli.into_action().unwrap() {
550 CliAction::List { stats } => assert!(stats),
551 other => panic!("unexpected action: {other:?}"),
552 }
553 }
554
555 #[test]
556 fn cli_action_for_daemon_metrics() {
557 let cli = Cli::parse_from(["gity", "daemon", "metrics"]);
558 match cli.into_action().unwrap() {
559 CliAction::Rpc(DaemonCommand::Metrics) => {}
560 other => panic!("unexpected action: {other:?}"),
561 }
562 }
563
564 #[test]
565 fn cli_action_for_events() {
566 let cli = Cli::parse_from(["gity", "events"]);
567 match cli.into_action().unwrap() {
568 CliAction::StreamEvents => {}
569 other => panic!("unexpected action: {other:?}"),
570 }
571 }
572
573 #[test]
574 fn cli_action_for_fsmonitor_helper() {
575 let cli = Cli::parse_from(["gity", "fsmonitor-helper", "2", "123"]);
576 match cli.into_action().unwrap() {
577 CliAction::FsMonitorHelper { version, token, .. } => {
578 assert_eq!(version, 2);
579 assert_eq!(token.as_deref(), Some("123"));
580 }
581 other => panic!("unexpected action: {other:?}"),
582 }
583 }
584
585 #[test]
586 fn cli_action_for_logs_follow() {
587 let cli = Cli::parse_from(["gity", "logs", "/tmp/demo", "--follow", "--limit", "5"]);
588 match cli.into_action().unwrap() {
589 CliAction::Logs {
590 repo_path,
591 follow,
592 limit,
593 } => {
594 assert_eq!(repo_path, PathBuf::from("/tmp/demo"));
595 assert!(follow);
596 assert_eq!(limit, 5);
597 }
598 other => panic!("unexpected action: {other:?}"),
599 }
600 }
601
602 #[test]
603 fn format_fsmonitor_snapshot_lists_paths() {
604 let snapshot = FsMonitorSnapshot {
605 repo_path: PathBuf::from("/tmp/demo"),
606 dirty_paths: vec![PathBuf::from("a.txt"), PathBuf::from("dir/b.txt")],
607 generation: 3,
608 };
609 let text = format_fsmonitor_snapshot(&snapshot);
610 assert!(text.contains("a.txt"));
611 assert!(text.contains("dir/b.txt"));
612 assert!(text.contains("generation 3"));
613 }
614
615 #[tokio::test]
616 async fn prints_repo_list() {
617 let service = TestService::new(vec![DaemonResponse::RepoList(vec![RepoSummary {
618 repo_path: PathBuf::from("/tmp/demo"),
619 status: RepoStatus::Idle,
620 pending_jobs: 1,
621 last_event: None,
622 generation: 0,
623 }])]);
624 let output = execute_rpc(&service, DaemonCommand::ListRepos)
625 .await
626 .expect("list command succeeds");
627 assert!(output.message.contains("/tmp/demo"));
628 assert!(output.message.contains("[1 jobs, status idle, gen 0]"));
629 }
630
631 #[test]
632 fn format_status_includes_dirty_paths() {
633 let output = format_repo_status(&RepoStatusDetail {
634 repo_path: PathBuf::from("/tmp/demo"),
635 dirty_paths: vec![PathBuf::from("file.txt")],
636 generation: 42,
637 });
638 assert!(output.contains("file.txt"));
639 assert!(output.contains("generation 42"));
640 }
641
642 #[test]
643 fn format_metrics_includes_counts() {
644 let mut jobs = HashMap::new();
645 jobs.insert(
646 JobKind::Prefetch,
647 JobMetrics {
648 spawned: 3,
649 completed: 2,
650 failed: 1,
651 },
652 );
653 let metrics = DaemonMetrics {
654 jobs,
655 global: gity_ipc::GlobalMetrics {
656 pending_jobs: 1,
657 uptime_seconds: 2,
658 cpu_percent: 12.5,
659 rss_bytes: 2048,
660 },
661 repos: vec![gity_ipc::RepoMetrics {
662 repo_path: PathBuf::from("/tmp/demo"),
663 pending_jobs: 3,
664 }],
665 };
666 let output = format_metrics(&metrics);
667 assert!(output.contains("daemon: cpu=12.5%"));
668 assert!(output.contains("prefetch: spawned=3, completed=2, failed=1"));
669 assert!(output.contains("maintenance: spawned=0, completed=0, failed=0"));
670 assert!(output.contains("/tmp/demo -> 3 pending"));
671 }
672
673 #[test]
674 fn format_health_lists_generations() {
675 let health = DaemonHealth {
676 repo_count: 2,
677 pending_jobs: 1,
678 uptime_seconds: 10,
679 repo_generations: vec![
680 gity_ipc::RepoGeneration {
681 repo_path: PathBuf::from("/repo/a"),
682 generation: 5,
683 },
684 gity_ipc::RepoGeneration {
685 repo_path: PathBuf::from("/repo/b"),
686 generation: 3,
687 },
688 ],
689 };
690 let output = format_health(&health);
691 assert!(output.contains("repos: 2"));
692 assert!(output.contains("/repo/a -> generation 5"));
693 assert!(output.contains("/repo/b -> generation 3"));
694 }
695
696 struct TestService {
697 responses: Arc<Mutex<Vec<DaemonResponse>>>,
698 }
699
700 impl TestService {
701 fn new(responses: Vec<DaemonResponse>) -> Self {
702 Self {
703 responses: Arc::new(Mutex::new(responses)),
704 }
705 }
706 }
707
708 #[async_trait]
709 impl DaemonService for TestService {
710 async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
711 self.responses
712 .lock()
713 .map_err(|_| DaemonError::Transport("poisoned test service".into()))?
714 .pop()
715 .ok_or_else(|| DaemonError::Rejected(format!("no response for {command:?}")))
716 }
717 }
718}