//! gity_daemon library root (`lib.rs`): core daemon state, FIFO job
//! scheduling, shutdown coordination, and the background runtime loop.
1mod events;
2mod fsmonitor_cache;
3mod logs;
4mod metrics;
5
6mod logging;
7
8use crate::{
9    logs::LogBook,
10    metrics::{collect_resource_snapshot, MetricsRegistry},
11};
12use async_nng::AsyncSocket;
13use async_trait::async_trait;
14use bincode::Options as BincodeOptions;
15pub use events::{NotificationServer, NotificationStream, NotificationSubscriber};
16pub use fsmonitor_cache::FsMonitorCache;
17use gity_git::{working_tree_status, RepoConfigurator};
18use gity_ipc::{
19    bounded_bincode, validate_message_size, Ack, DaemonCommand, DaemonError, DaemonHealth,
20    DaemonNotification, DaemonResponse, DaemonService, FsMonitorSnapshot, GlobalMetrics,
21    JobEventKind, JobEventNotification, JobKind, LogEntry, RepoGeneration, RepoHealthDetail,
22    RepoMetrics, RepoStatusDetail, RepoSummary, WatchEventKind as IpcWatchEventKind,
23    WatchEventNotification,
24};
25use gity_storage::{MetadataStore, RepoMetadata};
26use gity_watch::{
27    NotifyWatcher, WatchError, WatchEvent, WatchEventKind, WatchHandleRef, WatcherRef,
28};
29use nng::Protocol;
30use std::{
31    collections::{HashMap, HashSet, VecDeque},
32    path::{Path, PathBuf},
33    sync::{
34        atomic::{AtomicBool, Ordering},
35        Arc, Mutex,
36    },
37    time::{Duration, Instant, SystemTime},
38};
39use thiserror::Error;
40use tokio::sync::mpsc::error::TryRecvError;
41use tokio::{
42    process::Command,
43    select,
44    sync::{mpsc, Notify},
45    task::JoinHandle,
46    time::sleep,
47};
48use tracing::{error, info, warn};
49
50type SharedDaemon<S> = Arc<Mutex<Daemon<S>>>;
51type NotificationSender = mpsc::UnboundedSender<DaemonNotification>;
52
/// Scheduler holding a FIFO queue of work. Priority/backoff can layer on later.
#[derive(Default)]
struct Scheduler {
    /// Jobs awaiting execution, drained front-to-back (FIFO order).
    queue: VecDeque<QueuedJob>,
}
58
/// A unit of queued work targeting a single repository.
#[derive(Clone)]
struct QueuedJob {
    /// Repository the job operates on.
    repo_path: PathBuf,
    /// Kind of work to perform (IPC-defined).
    kind: JobKind,
}
64
65impl Scheduler {
66    fn enqueue(&mut self, repo_path: PathBuf, job: JobKind) {
67        self.queue.push_back(QueuedJob {
68            repo_path,
69            kind: job,
70        });
71    }
72
73    fn len(&self) -> usize {
74        self.queue.len()
75    }
76
77    fn next_job(&mut self) -> Option<QueuedJob> {
78        self.queue.pop_front()
79    }
80}
81
/// Core daemon logic. IPC layers wrap this struct.
pub struct Daemon<S: MetadataStore> {
    /// Persistent metadata store: repo registry, generations, dirty paths.
    store: S,
    /// FIFO job queue; see [`Scheduler`].
    scheduler: Scheduler,
    /// Startup instant, used for uptime reporting.
    started_at: Instant,
    /// Counter registry surfaced via the `Metrics` command.
    metrics: MetricsRegistry,
    /// Optional channel for pushing notifications to subscribers.
    notifications: Option<NotificationSender>,
    /// Optional fsmonitor helper value applied to repo git config on register.
    fsmonitor_helper: Option<String>,
    /// Ring of recent per-repo log entries.
    logs: LogBook,
    /// Optional mmap-backed cache for fsmonitor snapshots.
    fsmonitor_cache: Option<FsMonitorCache>,
}
93
impl<S: MetadataStore> Daemon<S> {
    /// Build a daemon with default components: fresh metrics, no
    /// notification channel, no fsmonitor helper, a 200-entry log book,
    /// and no fsmonitor mmap cache.
    pub fn new(store: S) -> Self {
        Self::with_components(
            store,
            MetricsRegistry::default(),
            None,
            None,
            LogBook::new(200),
            None,
        )
    }

    /// Like [`Daemon::new`] but with a caller-supplied metrics registry.
    pub fn with_metrics(store: S, metrics: MetricsRegistry) -> Self {
        Self::with_components(store, metrics, None, None, LogBook::new(200), None)
    }

    /// Fully parameterized constructor; the other constructors delegate here.
    pub fn with_components(
        store: S,
        metrics: MetricsRegistry,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        logs: LogBook,
        fsmonitor_cache: Option<FsMonitorCache>,
    ) -> Self {
        Self {
            store,
            scheduler: Scheduler::default(),
            started_at: Instant::now(),
            metrics,
            notifications,
            fsmonitor_helper,
            logs,
            fsmonitor_cache,
        }
    }

    /// Dispatch one IPC command to its response. Store failures are folded
    /// into `DaemonResponse::Error` via [`err_response`].
    fn handle(&mut self, command: DaemonCommand) -> DaemonResponse {
        match command {
            // Register in the store, then apply git performance settings
            // (including the fsmonitor helper, when one is configured).
            DaemonCommand::RegisterRepo { repo_path } => {
                match self.store.register_repo(repo_path.as_path().to_path_buf()) {
                    Ok(_) => match configure_repo(&repo_path, self.fsmonitor_helper.as_deref()) {
                        Ok(_) => DaemonResponse::Ack(Ack::new("registered")),
                        Err(err) => err,
                    },
                    Err(err) => err_response(err),
                }
            }
            // Remove from the store and undo the git config changes we made.
            DaemonCommand::UnregisterRepo { repo_path } => {
                match self.store.unregister_repo(&repo_path) {
                    Ok(Some(_)) => match clear_repo_config(&repo_path) {
                        Ok(_) => DaemonResponse::Ack(Ack::new("unregistered")),
                        Err(err) => err,
                    },
                    Ok(None) => DaemonResponse::Error(format!(
                        "repository not registered: {}",
                        repo_path.display()
                    )),
                    Err(err) => err_response(err),
                }
            }
            DaemonCommand::ListRepos => match self.store.list_repos() {
                Ok(repos) => {
                    DaemonResponse::RepoList(repos.into_iter().map(repo_summary).collect())
                }
                Err(err) => err_response(err),
            },
            // Generation-based status: if nothing is dirty and the client's
            // known generation matches, answer "unchanged"; otherwise bump
            // the generation and compute a fresh status over the drained
            // dirty paths.
            DaemonCommand::Status {
                repo_path,
                known_generation,
            } => {
                let current_generation = match self.store.current_generation(&repo_path) {
                    Ok(current) => current,
                    Err(err) => return err_response(err),
                };
                // Draining consumes the dirty set.
                // NOTE(review): if `compute_status` below fails, the drained
                // paths are not re-marked and are lost — confirm whether that
                // is acceptable.
                let dirty_paths = match self.store.drain_dirty_paths(&repo_path) {
                    Ok(paths) => paths,
                    Err(err) => return err_response(err),
                };
                if dirty_paths.is_empty()
                    && known_generation
                        .map(|generation| generation == current_generation)
                        .unwrap_or(false)
                {
                    return DaemonResponse::RepoStatusUnchanged {
                        repo_path: repo_path.as_path().to_path_buf(),
                        generation: current_generation,
                    };
                }
                let generation = match self.store.bump_generation(&repo_path) {
                    Ok(new_generation) => new_generation,
                    Err(err) => return err_response(err),
                };
                match self.compute_status(&repo_path, dirty_paths, generation) {
                    Ok(detail) => {
                        self.emit_status_notification(&detail);
                        DaemonResponse::RepoStatus(detail)
                    }
                    Err(err) => err,
                }
            }
            // Enqueue work, announce it to subscribers, and bump the repo's
            // pending-job counter in the store.
            DaemonCommand::QueueJob { repo_path, job } => {
                self.scheduler
                    .enqueue(repo_path.as_path().to_path_buf(), job);
                send_job_notification(&self.notifications, &repo_path, job, JobEventKind::Queued);
                match self.store.increment_jobs(&repo_path, 1) {
                    Ok(_) => DaemonResponse::Ack(Ack::new(format!("queued {:?}", job))),
                    Err(err) => err_response(err),
                }
            }
            // Cheap liveness summary: counts, uptime, per-repo generations.
            DaemonCommand::HealthCheck => match self.store.list_repos() {
                Ok(repos) => {
                    let generations = repos
                        .iter()
                        .map(|meta| RepoGeneration {
                            repo_path: meta.repo_path.clone(),
                            generation: meta.generation,
                        })
                        .collect();
                    DaemonResponse::Health(DaemonHealth {
                        repo_count: repos.len(),
                        pending_jobs: self.scheduler.len(),
                        uptime_seconds: self.started_at.elapsed().as_secs(),
                        repo_generations: generations,
                    })
                }
                Err(err) => err_response(err),
            },
            // Metrics snapshot: registry counters plus live process resource
            // usage and per-repo pending-job counts.
            DaemonCommand::Metrics => match self.store.list_repos() {
                Ok(repos) => {
                    let resource = collect_resource_snapshot();
                    let mut snapshot = self.metrics.snapshot();
                    snapshot.global = GlobalMetrics {
                        pending_jobs: self.scheduler.len(),
                        uptime_seconds: self.started_at.elapsed().as_secs(),
                        cpu_percent: resource.cpu_percent,
                        rss_bytes: resource.rss_bytes,
                    };
                    snapshot.repos = repos
                        .into_iter()
                        .map(|meta| RepoMetrics {
                            repo_path: meta.repo_path,
                            pending_jobs: meta.pending_jobs,
                        })
                        .collect();
                    DaemonResponse::Metrics(snapshot)
                }
                Err(err) => err_response(err),
            },
            // Same generation protocol as `Status`, tailored for git's
            // fsmonitor: only working-tree paths are reported, and results
            // are mirrored into the mmap cache for fast subsequent reads.
            DaemonCommand::FsMonitorSnapshot {
                repo_path,
                last_seen_generation,
            } => {
                let current_generation = match self.store.current_generation(&repo_path) {
                    Ok(current) => current,
                    Err(err) => return err_response(err),
                };
                let dirty_paths = match self.store.drain_dirty_paths(&repo_path) {
                    Ok(paths) => paths,
                    Err(err) => return err_response(err),
                };
                if dirty_paths.is_empty()
                    && last_seen_generation
                        .map(|known| known == current_generation)
                        .unwrap_or(false)
                {
                    let snapshot = FsMonitorSnapshot {
                        repo_path: repo_path.as_path().to_path_buf(),
                        dirty_paths: Vec::new(),
                        generation: current_generation,
                    };
                    // Write to mmap cache for fast subsequent reads
                    if let Some(cache) = &self.fsmonitor_cache {
                        let _ = cache.write(&repo_path, &snapshot);
                    }
                    return DaemonResponse::FsMonitorSnapshot(snapshot);
                }
                let generation = match self.store.bump_generation(&repo_path) {
                    Ok(new_generation) => new_generation,
                    Err(err) => return err_response(err),
                };
                // Filter out .git internal paths - fsmonitor only wants working tree files
                let working_tree_paths = filter_working_tree_paths(dirty_paths);
                let snapshot = FsMonitorSnapshot {
                    repo_path: repo_path.as_path().to_path_buf(),
                    dirty_paths: working_tree_paths,
                    generation,
                };
                // Write to mmap cache for fast subsequent reads
                if let Some(cache) = &self.fsmonitor_cache {
                    let _ = cache.write(&repo_path, &snapshot);
                }
                DaemonResponse::FsMonitorSnapshot(snapshot)
            }
            // Return up to `limit` recent log entries for the repo.
            DaemonCommand::FetchLogs { repo_path, limit } => {
                let entries = self.logs.recent(&repo_path, limit);
                DaemonResponse::Logs(entries)
            }
            DaemonCommand::RepoHealth { repo_path } => self.compute_repo_health(&repo_path),
            // Only acknowledges; the hosting server layer is expected to act
            // on the shutdown itself.
            DaemonCommand::Shutdown => DaemonResponse::Ack(Ack::new("shutdown requested")),
        }
    }

    /// Detailed per-repo health. Several fields are placeholders that the
    /// `Runtime` overrides with live watcher/throttling state.
    fn compute_repo_health(&self, repo_path: &Path) -> DaemonResponse {
        let meta = match self.store.get_repo(repo_path) {
            Ok(Some(meta)) => meta,
            Ok(None) => {
                return DaemonResponse::Error(format!(
                    "repository not registered: {}",
                    repo_path.display()
                ));
            }
            Err(err) => return err_response(err),
        };
        // A count failure degrades to 0 rather than failing the whole check.
        let dirty_path_count = match self.store.dirty_path_count(repo_path) {
            Ok(count) => count,
            Err(err) => {
                warn!("failed to get dirty path count: {}", err);
                0
            }
        };
        DaemonResponse::RepoHealth(RepoHealthDetail {
            repo_path: repo_path.to_path_buf(),
            generation: meta.generation,
            pending_jobs: meta.pending_jobs,
            watcher_active: false, // Placeholder - Runtime will override with actual state
            last_event: meta.last_event,
            dirty_path_count,
            sled_ok: true, // Placeholder - checked via store operations
            needs_reconciliation: meta.needs_reconciliation.unwrap_or(false),
            throttling_active: false, // Placeholder - Runtime will override
            next_scheduled_job: None,
        })
    }

    /// Pop the next queued job (FIFO order).
    fn next_job(&mut self) -> Option<QueuedJob> {
        self.scheduler.next_job()
    }

    /// Book-keeping after a job finishes: decrement the repo's pending-job
    /// counter and stamp a last-event time. Failures are logged, not fatal.
    fn mark_job_completed(&mut self, job: &QueuedJob) {
        let now = SystemTime::now();
        if let Err(err) = self.store.increment_jobs(&job.repo_path, -1) {
            warn!(
                "failed to decrement job counter for {}: {}",
                job.repo_path.display(),
                err
            );
        }
        if let Err(err) = self.store.record_event(&job.repo_path, now) {
            warn!(
                "failed to record job completion event for {}: {}",
                job.repo_path.display(),
                err
            );
        }
    }

    /// Push a status-change notification to subscribers, if a channel exists.
    /// Send errors (no receivers) are deliberately ignored.
    fn emit_status_notification(&self, detail: &RepoStatusDetail) {
        if let Some(tx) = &self.notifications {
            let _ = tx.send(DaemonNotification::RepoStatus(detail.clone()));
        }
    }
}
356
357fn repo_summary(meta: RepoMetadata) -> RepoSummary {
358    RepoSummary {
359        repo_path: meta.repo_path,
360        status: meta.status,
361        pending_jobs: meta.pending_jobs,
362        last_event: meta.last_event,
363        generation: meta.generation,
364    }
365}
366
367fn err_response(err: impl ToString) -> DaemonResponse {
368    DaemonResponse::Error(err.to_string())
369}
370
371fn configure_repo(path: &Path, helper: Option<&str>) -> Result<(), DaemonResponse> {
372    RepoConfigurator::open(path)
373        .and_then(|repo| repo.apply_performance_settings(helper))
374        .map_err(|err| DaemonResponse::Error(format!("failed to configure repo: {err}")))
375}
376
377fn clear_repo_config(path: &Path) -> Result<(), DaemonResponse> {
378    RepoConfigurator::open(path)
379        .and_then(|repo| repo.clear_performance_settings())
380        .map_err(|err| DaemonResponse::Error(format!("failed to clear repo config: {err}")))
381}
382
impl<S: MetadataStore> Daemon<S> {
    /// React to one filesystem event: stamp the repo's last-event time and
    /// mark the affected path dirty for the next status computation.
    fn handle_watch_event(&mut self, repo_path: &Path, event: &WatchEvent) {
        let now = SystemTime::now();
        if let Err(err) = self.store.record_event(repo_path, now) {
            warn!(
                "failed to record watch event for {}: {}",
                repo_path.display(),
                err
            );
        }
        if should_track(&event.kind) {
            // Events on the repo root itself yield `None` and are skipped.
            if let Some(relative) = relative_path(repo_path, &event.path) {
                if let Err(err) = self.store.mark_dirty_path(repo_path, relative) {
                    warn!(
                        "failed to mark dirty path for {}: {}",
                        repo_path.display(),
                        err
                    );
                }
            }
        } else if let WatchEventKind::Deleted = event.kind {
            // NOTE(review): `should_track` currently matches every
            // `WatchEventKind` variant (including `Deleted`), so this
            // fallback looks unreachable as written — confirm intent.
            if let Err(err) = self.store.mark_dirty_path(repo_path, PathBuf::from(".")) {
                warn!(
                    "failed to mark repo dirty after deletion in {}: {}",
                    repo_path.display(),
                    err
                );
            }
        }
    }

    /// All registered repo paths; a store failure degrades to an empty list.
    fn repo_paths(&self) -> Vec<PathBuf> {
        self.store
            .list_repos()
            .map(|repos| repos.into_iter().map(|meta| meta.repo_path).collect())
            .unwrap_or_default()
    }

    /// Run a working-tree status over the candidate paths and package the
    /// result with the supplied generation number.
    fn compute_status(
        &self,
        repo_path: &Path,
        paths: Vec<PathBuf>,
        generation: u64,
    ) -> Result<RepoStatusDetail, DaemonResponse> {
        let dirty_paths = working_tree_status(repo_path, &paths)
            .map_err(|err| DaemonResponse::Error(format!("git status failed: {err}")))?;
        Ok(RepoStatusDetail {
            repo_path: repo_path.to_path_buf(),
            dirty_paths,
            generation,
        })
    }
}
436
/// Rebase `path` onto `repo_path`, yielding the repo-relative path.
/// Returns `None` when `path` is the repo root itself (empty relative path);
/// paths that are not under the repo are returned unchanged.
fn relative_path(repo_path: &Path, path: &Path) -> Option<PathBuf> {
    match path.strip_prefix(repo_path) {
        Ok(rel) if rel.as_os_str().is_empty() => None,
        Ok(rel) => Some(rel.to_path_buf()),
        Err(_) => Some(path.to_path_buf()),
    }
}
448
/// Returns true if the path is inside the `.git` directory.
/// Git's fsmonitor only wants working tree paths, not internal Git files.
fn is_git_internal_path(path: &Path) -> bool {
    for component in path.components() {
        if component.as_os_str() == ".git" {
            return true;
        }
    }
    false
}
454
455/// Filter out `.git` internal paths for fsmonitor output.
456fn filter_working_tree_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
457    paths
458        .into_iter()
459        .filter(|p| !is_git_internal_path(p))
460        .collect()
461}
462
463fn should_track(kind: &WatchEventKind) -> bool {
464    matches!(
465        kind,
466        WatchEventKind::Created | WatchEventKind::Modified | WatchEventKind::Deleted
467    )
468}
469
470fn map_watch_kind(kind: &WatchEventKind) -> IpcWatchEventKind {
471    match kind {
472        WatchEventKind::Created => IpcWatchEventKind::Created,
473        WatchEventKind::Modified => IpcWatchEventKind::Modified,
474        WatchEventKind::Deleted => IpcWatchEventKind::Deleted,
475    }
476}
477
478fn send_job_notification(
479    notifications: &Option<NotificationSender>,
480    repo_path: &Path,
481    job: JobKind,
482    kind: JobEventKind,
483) {
484    if let Some(tx) = notifications {
485        if let Err(err) = tx.send(DaemonNotification::JobEvent(JobEventNotification {
486            repo_path: repo_path.to_path_buf(),
487            job,
488            kind,
489        })) {
490            warn!("failed to send job notification: {}", err);
491        }
492    }
493}
494
495fn send_log_notification(notifications: &Option<NotificationSender>, entry: &LogEntry) {
496    if let Some(tx) = notifications {
497        if let Err(err) = tx.send(DaemonNotification::Log(entry.clone())) {
498            warn!("failed to send log notification: {}", err);
499        }
500    }
501}
502
/// Signal shared by runtime/server tasks to coordinate shutdown.
#[derive(Clone)]
pub struct Shutdown {
    /// Latched once shutdown is requested; never cleared.
    flag: Arc<AtomicBool>,
    /// Wakes tasks parked in [`Shutdown::wait`].
    notify: Arc<Notify>,
}
509
510impl Shutdown {
511    pub fn new() -> Self {
512        Self {
513            flag: Arc::new(AtomicBool::new(false)),
514            notify: Arc::new(Notify::new()),
515        }
516    }
517}
518
519impl Default for Shutdown {
520    fn default() -> Self {
521        Self::new()
522    }
523}
524
525impl Shutdown {
526    pub fn shutdown(&self) {
527        if !self.flag.swap(true, Ordering::SeqCst) {
528            self.notify.notify_waiters();
529        }
530    }
531
532    pub fn is_shutdown(&self) -> bool {
533        self.flag.load(Ordering::SeqCst)
534    }
535
536    pub async fn wait(&self) {
537        if self.is_shutdown() {
538            return;
539        }
540        self.notify.notified().await;
541    }
542}
543
/// Configuration for idle-time scheduling.
/// All intervals and ages are expressed in whole seconds.
#[derive(Clone)]
pub struct IdleScheduleConfig {
    /// Duration of inactivity before triggering prefetch.
    pub prefetch_idle_secs: u64,
    /// Duration of inactivity before triggering maintenance.
    pub maintenance_idle_secs: u64,
    /// Whether idle scheduling is enabled.
    pub enabled: bool,
    /// Interval in seconds between log pruning checks.
    pub log_prune_interval_secs: u64,
    /// Maximum age in seconds for log entries before pruning.
    pub log_max_age_secs: u64,
    /// Interval in seconds between database compaction.
    pub compact_interval_secs: u64,
}
560
561impl Default for IdleScheduleConfig {
562    fn default() -> Self {
563        Self {
564            prefetch_idle_secs: 300,        // 5 minutes
565            maintenance_idle_secs: 600,     // 10 minutes
566            enabled: true,
567            log_prune_interval_secs: 3600,  // 1 hour
568            log_max_age_secs: 604800,       // 7 days
569            compact_interval_secs: 3600,    // 1 hour
570        }
571    }
572}
573
/// Tracks per-repo idle state for scheduling.
struct RepoIdleState {
    /// Most recent activity observed for the repo.
    last_activity: Instant,
    /// Whether an idle-triggered prefetch has already fired this idle period.
    prefetch_scheduled: bool,
    /// Whether idle-triggered maintenance has already fired this idle period.
    maintenance_scheduled: bool,
}
580
581impl RepoIdleState {
582    fn new() -> Self {
583        Self {
584            last_activity: Instant::now(),
585            prefetch_scheduled: false,
586            maintenance_scheduled: false,
587        }
588    }
589
590    fn touch(&mut self) {
591        self.last_activity = Instant::now();
592        self.prefetch_scheduled = false;
593        self.maintenance_scheduled = false;
594    }
595
596    fn idle_duration(&self) -> Duration {
597        self.last_activity.elapsed()
598    }
599}
600
/// Adaptive poll interval that adjusts based on activity.
/// Polls fast when active, slow when idle to save CPU.
pub struct AdaptivePollInterval {
    /// Most recent activity; drives the interval selection in `current`.
    last_activity: Instant,
    /// Poll interval when recently active (default: 50ms)
    pub active_interval: Duration,
    /// Poll interval when idle (default: 250ms)
    pub idle_interval: Duration,
    /// Poll interval when very idle (default: 500ms)
    pub very_idle_interval: Duration,
    /// Time before considered idle (default: 5s)
    pub idle_threshold: Duration,
    /// Time before considered very idle (default: 30s)
    pub very_idle_threshold: Duration,
}
616
617impl Default for AdaptivePollInterval {
618    fn default() -> Self {
619        Self {
620            last_activity: Instant::now(),
621            active_interval: Duration::from_millis(50),
622            idle_interval: Duration::from_millis(250),
623            very_idle_interval: Duration::from_millis(500),
624            idle_threshold: Duration::from_secs(5),
625            very_idle_threshold: Duration::from_secs(30),
626        }
627    }
628}
629
630impl AdaptivePollInterval {
631    /// Returns the current poll interval based on time since last activity.
632    pub fn current(&self) -> Duration {
633        let since_activity = self.last_activity.elapsed();
634        if since_activity < self.idle_threshold {
635            self.active_interval
636        } else if since_activity < self.very_idle_threshold {
637            self.idle_interval
638        } else {
639            self.very_idle_interval
640        }
641    }
642
643    /// Record activity to reset to fast polling.
644    pub fn touch(&mut self) {
645        self.last_activity = Instant::now();
646    }
647}
648
/// Background scheduler loop.
pub struct Runtime<S: MetadataStore> {
    /// Daemon state shared with IPC servers via [`Runtime::shared`].
    daemon: SharedDaemon<S>,
    /// Filesystem watcher implementation.
    watcher: WatcherRef,
    /// Live watch registrations, keyed by repo path.
    active_watchers: Arc<Mutex<HashMap<PathBuf, WatchRegistration>>>,
    /// Sender handed to watcher callbacks for forwarding events.
    event_tx: mpsc::UnboundedSender<WatchEventEnvelope>,
    /// Receiving end drained by the runtime loop.
    event_rx: Mutex<mpsc::UnboundedReceiver<WatchEventEnvelope>>,
    /// Cooperative shutdown signal shared with server tasks.
    shutdown: Shutdown,
    /// Poll cadence that speeds up with activity; see [`AdaptivePollInterval`].
    adaptive_poll: Mutex<AdaptivePollInterval>,
    /// Metrics registry shared with the inner daemon.
    metrics: MetricsRegistry,
    /// Optional notification channel shared with the inner daemon.
    notifications: Option<NotificationSender>,
    /// Log book shared with the inner daemon.
    logs: LogBook,
    /// Idle-time scheduling knobs.
    idle_config: IdleScheduleConfig,
    /// Per-repo idle tracking used by `check_idle_schedules`.
    idle_states: Arc<Mutex<HashMap<PathBuf, RepoIdleState>>>,
    /// Tracks last watcher activity time per repository for health checks
    last_watcher_activity: Arc<Mutex<HashMap<PathBuf, Instant>>>,
    /// Tracks whether resource throttling is currently active
    throttling_active: Arc<AtomicBool>,
    /// Last time log entries were pruned
    last_prune_time: Arc<Mutex<Instant>>,
    /// Last time database was compacted
    last_compact_time: Arc<Mutex<Instant>>,
    /// Data directory path for compaction (optional)
    data_dir: Option<PathBuf>,
}
674
675impl<S: MetadataStore> Runtime<S> {
676    pub fn new(store: S, log_tree: Option<sled::Tree>) -> Self {
677        Self::with_watcher_and_notifications(
678            store,
679            Arc::new(NotifyWatcher::new()),
680            None,
681            None,
682            log_tree,
683        )
684    }
685
686    pub fn with_watcher(store: S, watcher: WatcherRef, log_tree: Option<sled::Tree>) -> Self {
687        Self::with_watcher_and_notifications(store, watcher, None, None, log_tree)
688    }
689
690    pub fn with_notifications(
691        store: S,
692        notifications: Option<NotificationSender>,
693        fsmonitor_helper: Option<String>,
694        log_tree: Option<sled::Tree>,
695    ) -> Self {
696        Self::with_data_dir(
697            store,
698            Arc::new(NotifyWatcher::new()),
699            notifications,
700            fsmonitor_helper,
701            log_tree,
702            None,
703        )
704    }
705
706    pub fn with_notifications_and_data_dir(
707        store: S,
708        notifications: Option<NotificationSender>,
709        fsmonitor_helper: Option<String>,
710        log_tree: Option<sled::Tree>,
711        data_dir: PathBuf,
712    ) -> Self {
713        Self::with_data_dir(
714            store,
715            Arc::new(NotifyWatcher::new()),
716            notifications,
717            fsmonitor_helper,
718            log_tree,
719            Some(data_dir),
720        )
721    }
722
723    pub fn with_watcher_and_notifications(
724        store: S,
725        watcher: WatcherRef,
726        notifications: Option<NotificationSender>,
727        fsmonitor_helper: Option<String>,
728        log_tree: Option<sled::Tree>,
729    ) -> Self {
730        Self::with_data_dir(store, watcher, notifications, fsmonitor_helper, log_tree, None)
731    }
732
    /// Fully parameterized constructor; all other constructors delegate here.
    /// Wires the shared metrics registry, log book, notification channel and
    /// (when `data_dir` is given) the fsmonitor mmap cache into the inner
    /// [`Daemon`], then sets up the runtime's own bookkeeping state.
    pub fn with_data_dir(
        store: S,
        watcher: WatcherRef,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        log_tree: Option<sled::Tree>,
        data_dir: Option<PathBuf>,
    ) -> Self {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let metrics = MetricsRegistry::default();
        // Persist logs to sled when a tree is provided; otherwise keep an
        // in-memory ring of 200 entries.
        let logs = log_tree
            .map(|tree| LogBook::with_persistence(200, tree))
            .unwrap_or_else(|| LogBook::new(200));
        let daemon_notifications = notifications.clone();
        // Create fsmonitor cache if data_dir is provided; creation failure is
        // logged and the cache is simply disabled.
        let fsmonitor_cache = data_dir.as_ref().and_then(|dir| {
            FsMonitorCache::new(dir.join("fsmonitor_cache"))
                .map_err(|e| warn!("failed to create fsmonitor cache: {}", e))
                .ok()
        });
        Self {
            daemon: Arc::new(Mutex::new(Daemon::with_components(
                store,
                metrics.clone(),
                daemon_notifications,
                fsmonitor_helper,
                logs.clone(),
                fsmonitor_cache,
            ))),
            watcher,
            active_watchers: Arc::new(Mutex::new(HashMap::new())),
            event_tx,
            event_rx: Mutex::new(event_rx),
            shutdown: Shutdown::new(),
            adaptive_poll: Mutex::new(AdaptivePollInterval::default()),
            metrics,
            notifications,
            logs,
            idle_config: IdleScheduleConfig::default(),
            idle_states: Arc::new(Mutex::new(HashMap::new())),
            last_watcher_activity: Arc::new(Mutex::new(HashMap::new())),
            throttling_active: Arc::new(AtomicBool::new(false)),
            last_prune_time: Arc::new(Mutex::new(Instant::now())),
            last_compact_time: Arc::new(Mutex::new(Instant::now())),
            data_dir,
        }
    }
780
781    pub fn shared(&self) -> SharedDaemon<S> {
782        Arc::clone(&self.daemon)
783    }
784
785    pub fn shutdown_signal(&self) -> Shutdown {
786        self.shutdown.clone()
787    }
788
789    pub fn service_handle(&self) -> InProcessService<S> {
790        InProcessService::from_shared(self.shared())
791    }
792
793    pub async fn run(self) {
794        // Check for repos that need reconciliation on startup
795        self.check_reconciliation_needed();
796
797        while !self.shutdown.is_shutdown() {
798            self.drain_watch_events();
799            if let Some(job) = self.next_job_to_run() {
800                self.spawn_job(job);
801            }
802            self.check_idle_schedules();
803            if let Err(err) = self.sync_watchers().await {
804                eprintln!("failed to synchronize watchers: {err}");
805            }
806            // Use adaptive poll interval: fast when active, slow when idle
807            let poll_duration = self
808                .adaptive_poll
809                .lock()
810                .map(|guard| guard.current())
811                .unwrap_or(Duration::from_millis(250));
812            sleep(poll_duration).await;
813        }
814        self.stop_all_watchers();
815
816        // Perform final compaction on shutdown
817        if let Some(data_dir) = &self.data_dir {
818            info!("performing final compaction before shutdown");
819            match gity_storage::StorageContext::new(data_dir) {
820                Ok(storage) => {
821                    if let Err(e) = storage.compact_all() {
822                        warn!("shutdown compaction failed: {}", e);
823                    }
824                }
825                Err(e) => {
826                    warn!("failed to open storage for shutdown compaction: {}", e);
827                }
828            }
829        }
830    }
831
    /// Check for repos that need reconciliation after daemon restart/downtime.
    ///
    /// A repo is flagged when its last recorded event is more than five
    /// minutes in the past — filesystem events may have been missed while
    /// no watcher was running.
    fn check_reconciliation_needed(&self) {
        let repos = self.current_repo_paths();
        let now = SystemTime::now();
        let max_gap = Duration::from_secs(300); // 5 minute gap triggers reconciliation

        for repo_path in repos {
            let needs_reconciliation = if let Ok(guard) = self.daemon.lock() {
                match guard.store.get_repo(&repo_path) {
                    Ok(Some(meta)) => {
                        if let Some(last_event) = meta.last_event {
                            match now.duration_since(last_event) {
                                Ok(gap) => gap > max_gap,
                                // duration_since errs when last_event is in
                                // the future (clock skew); treat as "no gap".
                                Err(_) => false,
                            }
                        } else {
                            // No last event recorded, assume fresh registration
                            false
                        }
                    }
                    _ => false,
                }
            } else {
                // Poisoned lock: skip this repo rather than panic.
                continue;
            };

            if needs_reconciliation {
                self.schedule_reconciliation(&repo_path);
            }
        }
    }
863
864    /// Schedule a reconciliation scan for a repository.
865    fn schedule_reconciliation(&self, repo_path: &Path) {
866        let entry = self.logs.record(
867            repo_path,
868            "reconciliation needed after daemon downtime - scheduling status refresh",
869        );
870        send_log_notification(&self.notifications, &entry);
871
872        // Mark the repo as needing reconciliation
873        if let Ok(guard) = self.daemon.lock() {
874            if let Err(err) = guard.store.set_needs_reconciliation(repo_path, true) {
875                error!(
876                    "failed to mark reconciliation needed for {}: {}",
877                    repo_path.display(),
878                    err
879                );
880            }
881        }
882
883        // Mark the entire repo as dirty to force a full status check
884        if let Ok(guard) = self.daemon.lock() {
885            if let Err(err) = guard.store.mark_dirty_path(repo_path, PathBuf::from(".")) {
886                error!(
887                    "failed to mark repo dirty for reconciliation in {}: {}",
888                    repo_path.display(),
889                    err
890                );
891            }
892        }
893
894        info!("scheduled reconciliation scan for {}", repo_path.display());
895    }
896
897    fn check_idle_schedules(&self) {
898        if !self.idle_config.enabled {
899            return;
900        }
901
902        let repos = self.current_repo_paths();
903        let prefetch_threshold = Duration::from_secs(self.idle_config.prefetch_idle_secs);
904        let maintenance_threshold = Duration::from_secs(self.idle_config.maintenance_idle_secs);
905
906        let mut states = match self.idle_states.lock() {
907            Ok(guard) => guard,
908            Err(_) => return,
909        };
910
911        for repo in repos {
912            let state = states
913                .entry(repo.clone())
914                .or_insert_with(RepoIdleState::new);
915            let idle = state.idle_duration();
916
917            // Schedule prefetch if idle long enough
918            if idle >= prefetch_threshold && !state.prefetch_scheduled {
919                if let Ok(mut guard) = self.daemon.lock() {
920                    guard.scheduler.enqueue(repo.clone(), JobKind::Prefetch);
921                    send_job_notification(
922                        &self.notifications,
923                        &repo,
924                        JobKind::Prefetch,
925                        JobEventKind::Queued,
926                    );
927                    let entry = self.logs.record(&repo, "idle prefetch scheduled");
928                    send_log_notification(&self.notifications, &entry);
929                }
930                state.prefetch_scheduled = true;
931            }
932
933            // Schedule maintenance if idle even longer
934            if idle >= maintenance_threshold && !state.maintenance_scheduled {
935                if let Ok(mut guard) = self.daemon.lock() {
936                    guard.scheduler.enqueue(repo.clone(), JobKind::Maintenance);
937                    send_job_notification(
938                        &self.notifications,
939                        &repo,
940                        JobKind::Maintenance,
941                        JobEventKind::Queued,
942                    );
943                    let entry = self.logs.record(&repo, "idle maintenance scheduled");
944                    send_log_notification(&self.notifications, &entry);
945                }
946                state.maintenance_scheduled = true;
947            }
948        }
949
950        // Clean up states for repos that are no longer registered
951        let repo_set: HashSet<_> = self.current_repo_paths().into_iter().collect();
952        states.retain(|path, _| repo_set.contains(path));
953
954        // Drop the lock before global maintenance tasks
955        drop(states);
956
957        // Global maintenance: Log pruning
958        let prune_interval = Duration::from_secs(self.idle_config.log_prune_interval_secs);
959        if let Ok(mut last_prune) = self.last_prune_time.lock() {
960            if last_prune.elapsed() >= prune_interval {
961                let max_age = Duration::from_secs(self.idle_config.log_max_age_secs);
962                let pruned = self.logs.prune_old_entries(max_age);
963                if pruned > 0 {
964                    info!("pruned {} old log entries", pruned);
965                }
966                *last_prune = Instant::now();
967            }
968        }
969
970        // Global maintenance: Database compaction
971        if let Some(data_dir) = &self.data_dir {
972            let compact_interval = Duration::from_secs(self.idle_config.compact_interval_secs);
973            if let Ok(mut last_compact) = self.last_compact_time.lock() {
974                if last_compact.elapsed() >= compact_interval {
975                    match gity_storage::StorageContext::new(data_dir) {
976                        Ok(storage) => {
977                            if let Err(e) = storage.compact_all() {
978                                warn!("compaction failed: {}", e);
979                            } else {
980                                info!("database compaction completed");
981                            }
982                        }
983                        Err(e) => {
984                            warn!("failed to open storage for compaction: {}", e);
985                        }
986                    }
987                    *last_compact = Instant::now();
988                }
989            }
990        }
991    }
992
993    fn touch_repo_activity(&self, repo_path: &Path) {
994        if let Ok(mut states) = self.idle_states.lock() {
995            if let Some(state) = states.get_mut(repo_path) {
996                state.touch();
997            }
998        }
999    }
1000
1001    fn next_job_to_run(&self) -> Option<QueuedJob> {
1002        if let Ok(mut guard) = self.daemon.lock() {
1003            guard.next_job()
1004        } else {
1005            None
1006        }
1007    }
1008
1009    async fn sync_watchers(&self) -> Result<(), WatchError> {
1010        let desired = self.current_repo_paths();
1011        let existing: HashSet<PathBuf> = self
1012            .active_watchers
1013            .lock()
1014            .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
1015            .keys()
1016            .cloned()
1017            .collect();
1018
1019        for repo in desired.iter() {
1020            if !existing.contains(repo) {
1021                if let Err(err) = self.start_watcher(repo.clone()).await {
1022                    eprintln!("failed to start watcher for {}: {err}", repo.display());
1023                }
1024            }
1025        }
1026
1027        let desired_set: HashSet<PathBuf> = desired.into_iter().collect();
1028        let to_remove: Vec<PathBuf> = self
1029            .active_watchers
1030            .lock()
1031            .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
1032            .keys()
1033            .filter(|path| !desired_set.contains(*path))
1034            .cloned()
1035            .collect();
1036
1037        for repo in to_remove {
1038            self.stop_watcher(&repo);
1039        }
1040
1041        Ok(())
1042    }
1043
1044    async fn start_watcher(&self, repo_path: PathBuf) -> Result<(), WatchError> {
1045        let subscription = self.watcher.watch(repo_path.clone()).await?;
1046        let (handle, mut receiver) = subscription.into_parts();
1047        let event_tx = self.event_tx.clone();
1048        let repo_for_task = repo_path.clone();
1049        let task = tokio::spawn(async move {
1050            while let Some(event) = receiver.recv().await {
1051                if event_tx
1052                    .send(WatchEventEnvelope {
1053                        repo_path: repo_for_task.clone(),
1054                        event,
1055                    })
1056                    .is_err()
1057                {
1058                    warn!("watch event channel closed");
1059                    break;
1060                }
1061            }
1062        });
1063        self.active_watchers
1064            .lock()
1065            .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
1066            .insert(repo_path, WatchRegistration { handle, task });
1067        Ok(())
1068    }
1069
1070    fn stop_watcher(&self, repo_path: &Path) {
1071        if let Ok(mut watchers) = self.active_watchers.lock() {
1072            if let Some(registration) = watchers.remove(repo_path) {
1073                registration.handle.stop();
1074                registration.task.abort();
1075            }
1076        }
1077    }
1078
1079    fn stop_all_watchers(&self) {
1080        if let Ok(mut watchers) = self.active_watchers.lock() {
1081            for (_path, registration) in watchers.drain() {
1082                registration.handle.stop();
1083                registration.task.abort();
1084            }
1085        }
1086    }
1087
1088    fn current_repo_paths(&self) -> Vec<PathBuf> {
1089        if let Ok(guard) = self.daemon.lock() {
1090            guard.repo_paths()
1091        } else {
1092            Vec::new()
1093        }
1094    }
1095
1096    fn spawn_job(&self, job: QueuedJob) {
1097        let daemon = Arc::clone(&self.daemon);
1098        let metrics = self.metrics.clone();
1099        metrics.record_job_spawned(job.kind);
1100        let notifications = self.notifications.clone();
1101        let logs = self.logs.clone();
1102        let entry = logs.record(&job.repo_path, format!("job {:?} started", job.kind));
1103        send_log_notification(&notifications, &entry);
1104        send_job_notification(
1105            &notifications,
1106            &job.repo_path,
1107            job.kind,
1108            JobEventKind::Started,
1109        );
1110        tokio::spawn(async move {
1111            let start = Instant::now();
1112            let result = JobExecutor::run(&job).await;
1113            let duration = start.elapsed();
1114            match result {
1115                Ok(_) => {
1116                    metrics.record_job_completed(job.kind);
1117                    send_job_notification(
1118                        &notifications,
1119                        &job.repo_path,
1120                        job.kind,
1121                        JobEventKind::Completed,
1122                    );
1123                    let entry = logs.record(
1124                        &job.repo_path,
1125                        format!("job {:?} completed in {:?}", job.kind, duration),
1126                    );
1127                    send_log_notification(&notifications, &entry);
1128                    info!(
1129                        "job {:?} for {} finished in {:?}",
1130                        job.kind,
1131                        job.repo_path.display(),
1132                        duration
1133                    );
1134                }
1135                Err(err) => {
1136                    metrics.record_job_failed(job.kind);
1137                    send_job_notification(
1138                        &notifications,
1139                        &job.repo_path,
1140                        job.kind,
1141                        JobEventKind::Failed,
1142                    );
1143                    let entry = logs.record(
1144                        &job.repo_path,
1145                        format!("job {:?} failed after {:?}: {err}", job.kind, duration),
1146                    );
1147                    send_log_notification(&notifications, &entry);
1148                    error!(
1149                        "job {:?} for {} failed after {:?}: {err}",
1150                        job.kind,
1151                        job.repo_path.display(),
1152                        duration
1153                    );
1154                    send_log_notification(
1155                        &notifications,
1156                        &LogEntry {
1157                            repo_path: job.repo_path.clone(),
1158                            message: format!(
1159                                "job {:?} failed after {:?}: {err}",
1160                                job.kind, duration
1161                            ),
1162                            timestamp: SystemTime::now(),
1163                        },
1164                    );
1165                }
1166            }
1167            if let Ok(mut guard) = daemon.lock() {
1168                guard.mark_job_completed(&job);
1169            }
1170        });
1171    }
1172
1173    #[cfg(test)]
1174    pub(crate) fn watcher_state(&self) -> Arc<Mutex<HashMap<PathBuf, WatchRegistration>>> {
1175        Arc::clone(&self.active_watchers)
1176    }
1177
    /// Drain all currently-pending watcher events from the shared channel
    /// without blocking for new ones.
    ///
    /// For each event: records watcher activity (for health checks), resets
    /// the repo's idle timer, bumps the adaptive poll back to fast mode, then
    /// — under the daemon lock — updates dirty-path state, notifies
    /// subscribers, and records a log entry.
    fn drain_watch_events(&self) {
        // Hold the receiver lock for the whole drain; this is the only
        // consumer of the channel.
        let mut receiver = match self.event_rx.lock() {
            Ok(rx) => rx,
            Err(_) => {
                warn!("watch event receiver poisoned");
                return;
            }
        };

        loop {
            match receiver.try_recv() {
                Ok(envelope) => {
                    // Record watcher activity for this repo
                    self.record_watcher_activity(&envelope.repo_path);
                    // Reset idle timer for this repo
                    self.touch_repo_activity(&envelope.repo_path);
                    // Reset adaptive poll to fast mode on activity
                    if let Ok(mut poll) = self.adaptive_poll.lock() {
                        poll.touch();
                    }
                    if let Ok(mut guard) = self.daemon.lock() {
                        guard.handle_watch_event(&envelope.repo_path, &envelope.event);
                        // Notify subscribers and the log book while still
                        // holding the daemon lock.
                        self.emit_watch_notification(&envelope.repo_path, &envelope.event);
                        self.record_log(
                            &envelope.repo_path,
                            format!(
                                "watch {:?} {}",
                                envelope.event.kind,
                                envelope.event.path.display()
                            ),
                        );
                    } else {
                        // Poisoned daemon lock: stop draining. NOTE(review):
                        // the current envelope is dropped in this case.
                        warn!("failed to lock daemon for watch event");
                        break;
                    }
                }
                // Channel empty: done for this cycle.
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    warn!("watch event channel disconnected");
                    break;
                }
            }
        }
    }
1222
1223    /// Records watcher activity timestamp for health check verification
1224    fn record_watcher_activity(&self, repo_path: &Path) {
1225        if let Ok(mut activity) = self.last_watcher_activity.lock() {
1226            activity.insert(repo_path.to_path_buf(), Instant::now());
1227        }
1228    }
1229
1230    fn emit_watch_notification(&self, repo_path: &Path, event: &WatchEvent) {
1231        if let Some(tx) = &self.notifications {
1232            let path = relative_path(repo_path, &event.path).unwrap_or_else(|| event.path.clone());
1233            let notification = DaemonNotification::WatchEvent(WatchEventNotification {
1234                repo_path: repo_path.to_path_buf(),
1235                path,
1236                kind: map_watch_kind(&event.kind),
1237            });
1238            if let Err(err) = tx.send(notification) {
1239                warn!("failed to send watch notification: {}", err);
1240            }
1241        }
1242    }
1243
1244    fn record_log(&self, repo_path: &Path, message: impl Into<String>) {
1245        let entry = self.logs.record(repo_path, message);
1246        send_log_notification(&self.notifications, &entry);
1247    }
1248
1249    /// Computes repository health using both daemon state and runtime state.
1250    pub fn compute_repo_health(&self, repo_path: &Path) -> DaemonResponse {
1251        if let Ok(guard) = self.daemon.lock() {
1252            let response = guard.compute_repo_health(repo_path);
1253            match response {
1254                DaemonResponse::RepoHealth(mut detail) => {
1255                    // Check watcher activity
1256                    detail.watcher_active = self.is_watcher_active(repo_path);
1257                    // Check throttling state
1258                    detail.throttling_active = self.throttling_active.load(Ordering::SeqCst);
1259                    // Sled is OK if we could acquire the lock
1260                    detail.sled_ok = true;
1261                    DaemonResponse::RepoHealth(detail)
1262                }
1263                _ => response,
1264            }
1265        } else {
1266            DaemonResponse::Error("failed to acquire daemon lock".to_string())
1267        }
1268    }
1269
1270    /// Checks if the watcher is active for a repository (saw activity in last 60 seconds)
1271    fn is_watcher_active(&self, repo_path: &Path) -> bool {
1272        if let Ok(activity) = self.last_watcher_activity.lock() {
1273            if let Some(last_activity) = activity.get(repo_path) {
1274                last_activity.elapsed().as_secs() < 60
1275            } else {
1276                // No activity recorded - check if repo was just registered
1277                if let Ok(guard) = self.daemon.lock() {
1278                    guard.store.get_repo(repo_path).ok().flatten().is_some()
1279                } else {
1280                    false
1281                }
1282            }
1283        } else {
1284            true // Default to true if lock fails
1285        }
1286    }
1287}
1288
/// Bookkeeping for one active repository watcher.
struct WatchRegistration {
    /// Backend handle used to stop the subscription.
    handle: WatchHandleRef,
    /// Tokio task forwarding backend events onto the shared channel;
    /// aborted when the watcher is stopped.
    task: JoinHandle<()>,
}
1293
1294struct JobExecutor;
1295
1296impl JobExecutor {
1297    async fn run(job: &QueuedJob) -> Result<(), JobExecutionError> {
1298        match job.kind {
1299            JobKind::Prefetch => {
1300                // Use git maintenance's prefetch task which:
1301                // - Fetches into refs/prefetch/ namespace (doesn't update local refs)
1302                // - Is safe to run in background without disrupting user
1303                // - Handles multiple remotes correctly
1304                Self::run_git(
1305                    &job.repo_path,
1306                    &["maintenance", "run", "--task=prefetch", "--quiet"],
1307                )
1308                .await
1309            }
1310            JobKind::Maintenance => {
1311                // Run maintenance with --auto to let Git decide what needs running
1312                // based on repository state (loose object count, commit-graph age, etc.)
1313                // This runs: commit-graph, loose-objects, incremental-repack as needed
1314                Self::run_git(&job.repo_path, &["maintenance", "run", "--auto", "--quiet"]).await
1315            }
1316        }
1317    }
1318
1319    async fn run_git(repo_path: &Path, args: &[&str]) -> Result<(), JobExecutionError> {
1320        let output = Command::new("git")
1321            .args(args)
1322            .current_dir(repo_path)
1323            .stdout(std::process::Stdio::null())
1324            .stderr(std::process::Stdio::piped())
1325            .output()
1326            .await
1327            .map_err(JobExecutionError::Io)?;
1328
1329        if output.status.success() {
1330            Ok(())
1331        } else {
1332            // Log stderr for debugging but don't fail on expected errors
1333            // (e.g., no remote configured, already up-to-date)
1334            let stderr = String::from_utf8_lossy(&output.stderr);
1335            if stderr.contains("No remote") || stderr.contains("not a git repository") {
1336                // Expected for local-only repos
1337                Ok(())
1338            } else {
1339                Err(JobExecutionError::Exit(output.status.code().unwrap_or(-1)))
1340            }
1341        }
1342    }
1343}
1344
/// Failure modes for a background `git` invocation.
#[derive(Debug, Error)]
enum JobExecutionError {
    /// Spawning or waiting on the `git` process failed.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// `git` ran but exited non-zero (or -1 if killed without an exit code).
    #[error("git command exited with status {0}")]
    Exit(i32),
}
1352
/// A watch event paired with the repository it originated from, as sent over
/// the shared watcher event channel.
#[derive(Clone)]
struct WatchEventEnvelope {
    repo_path: PathBuf,
    event: WatchEvent,
}
1358
1359/// Shared in-process service used by tests.
1360pub struct InProcessService<S: MetadataStore> {
1361    inner: SharedDaemon<S>,
1362}
1363
1364impl<S: MetadataStore> InProcessService<S> {
1365    pub fn new(store: S) -> Self {
1366        Self {
1367            inner: Arc::new(Mutex::new(Daemon::new(store))),
1368        }
1369    }
1370
1371    pub fn from_shared(inner: SharedDaemon<S>) -> Self {
1372        Self { inner }
1373    }
1374}
1375
1376#[async_trait]
1377impl<S: MetadataStore> DaemonService for InProcessService<S> {
1378    async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1379        let mut guard = self
1380            .inner
1381            .lock()
1382            .map_err(|e| DaemonError::Transport(format!("failed to acquire daemon lock: {}", e)))?;
1383        Ok(guard.handle(command))
1384    }
1385}
1386
/// IPC server using async-nng REP sockets.
pub struct NngServer<S: MetadataStore> {
    /// Address the REP socket listens on.
    address: String,
    /// Daemon shared with the background runtime; locked per request.
    daemon: SharedDaemon<S>,
    /// Cooperative shutdown signal that ends the accept loop.
    shutdown: Shutdown,
}
1393
1394impl<S: MetadataStore> NngServer<S> {
1395    pub fn new(address: impl Into<String>, daemon: SharedDaemon<S>, shutdown: Shutdown) -> Self {
1396        Self {
1397            address: address.into(),
1398            daemon,
1399            shutdown,
1400        }
1401    }
1402
1403    pub async fn run(&self) -> Result<(), ServerError> {
1404        let socket = nng::Socket::new(Protocol::Rep0)?;
1405        socket.listen(&self.address)?;
1406        let mut async_socket = AsyncSocket::try_from(socket)?;
1407
1408        loop {
1409            select! {
1410                _ = self.shutdown.wait() => break,
1411                recv = async_socket.receive(None) => {
1412                    let message = match recv {
1413                        Ok(msg) => msg,
1414                        Err(nng::Error::Canceled) => continue,
1415                        Err(err) => return Err(ServerError::Socket(err)),
1416                    };
1417                    let reply = self.process_message(message)?;
1418                    async_socket
1419                        .send(reply, None)
1420                        .await
1421                        .map_err(|(_, err)| ServerError::Socket(err))?;
1422                }
1423            }
1424        }
1425
1426        Ok(())
1427    }
1428
1429    fn process_message(&self, message: nng::Message) -> Result<nng::Message, ServerError> {
1430        let data = message.as_slice();
1431
1432        // Validate message size before deserialization
1433        validate_message_size(data).map_err(|err| ServerError::Deserialization(err.to_string()))?;
1434
1435        let command: DaemonCommand = bounded_bincode()
1436            .deserialize(data)
1437            .map_err(|err| ServerError::Deserialization(err.to_string()))?;
1438
1439        let response = {
1440            let mut guard = self
1441                .daemon
1442                .lock()
1443                .map_err(|e| ServerError::Poisoned(e.to_string()))?;
1444            guard.handle(command)
1445        };
1446
1447        let payload = bounded_bincode()
1448            .serialize(&response)
1449            .map_err(|err| ServerError::Serialization(err.to_string()))?;
1450
1451        let mut reply = nng::Message::new();
1452        reply.push_back(&payload);
1453        Ok(reply)
1454    }
1455}
1456
/// IPC client implementation using async-nng REQ sockets.
pub struct NngClient {
    /// Address of the daemon's REP socket, dialed on every request.
    address: String,
}
1461
1462impl NngClient {
1463    pub fn new(address: impl Into<String>) -> Self {
1464        Self {
1465            address: address.into(),
1466        }
1467    }
1468
1469    async fn request(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1470        let socket = nng::Socket::new(Protocol::Req0).map_err(map_client_error)?;
1471        socket.dial(&self.address).map_err(map_client_error)?;
1472        let mut async_socket = AsyncSocket::try_from(socket).map_err(map_client_error)?;
1473
1474        let payload = bounded_bincode()
1475            .serialize(&command)
1476            .map_err(|err| DaemonError::Transport(err.to_string()))?;
1477
1478        let mut message = nng::Message::new();
1479        message.push_back(&payload);
1480        async_socket
1481            .send(message, None)
1482            .await
1483            .map_err(|(_, err)| map_client_error(err))?;
1484
1485        let reply = async_socket.receive(None).await.map_err(map_client_error)?;
1486        let data = reply.as_slice();
1487
1488        validate_message_size(data).map_err(|err| DaemonError::Transport(err.to_string()))?;
1489
1490        let response: DaemonResponse = bounded_bincode()
1491            .deserialize(data)
1492            .map_err(|err| DaemonError::Transport(err.to_string()))?;
1493
1494        Ok(response)
1495    }
1496}
1497
1498#[async_trait]
1499impl DaemonService for NngClient {
1500    async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1501        self.request(command).await
1502    }
1503}
1504
1505fn map_client_error(err: nng::Error) -> DaemonError {
1506    DaemonError::Transport(err.to_string())
1507}
1508
/// Errors surfaced by `NngServer::run`.
#[derive(Debug, Error)]
pub enum ServerError {
    /// Underlying nng socket operation failed.
    #[error("nng error: {0}")]
    Socket(#[from] nng::Error),
    /// Incoming request could not be decoded.
    #[error("deserialization error: {0}")]
    Deserialization(String),
    /// Outgoing response could not be encoded.
    #[error("serialization error: {0}")]
    Serialization(String),
    /// The shared daemon mutex was poisoned by a panicking thread.
    #[error("daemon lock poisoned: {0}")]
    Poisoned(String),
}
1520
1521#[cfg(test)]
1522mod tests {
1523    use super::*;
1524    use git2::Repository;
1525    use gity_ipc::ValidatedPath;
1526    use gity_storage::InMemoryMetadataStore;
1527    use gity_watch::{ManualWatchHandle, ManualWatcher, WatchEvent, WatchEventKind};
1528    use std::time::Duration;
1529    use tempfile::tempdir;
1530
    /// Test helper: wrap `path` in a `ValidatedPath`, panicking on rejection.
    fn validated_path(path: PathBuf) -> ValidatedPath {
        ValidatedPath::new(path).unwrap()
    }
1534
1535    #[test]
1536    fn scheduler_drops_completed_jobs() {
1537        let store = InMemoryMetadataStore::new();
1538        let mut daemon = Daemon::new(store);
1539        let (_repo_dir, repo) = init_temp_repo();
1540        assert!(matches!(
1541            daemon.handle(DaemonCommand::RegisterRepo {
1542                repo_path: validated_path(repo.clone())
1543            }),
1544            DaemonResponse::Ack(_)
1545        ));
1546        daemon.handle(DaemonCommand::QueueJob {
1547            repo_path: validated_path(repo.clone()),
1548            job: JobKind::Prefetch,
1549        });
1550        if let Some(job) = daemon.next_job() {
1551            daemon.mark_job_completed(&job);
1552        }
1553        match daemon.handle(DaemonCommand::ListRepos) {
1554            DaemonResponse::RepoList(entries) => assert_eq!(entries[0].pending_jobs, 0),
1555            other => panic!("unexpected response: {other:?}"),
1556        }
1557    }
1558
1559    #[test]
1560    fn metrics_snapshot_reflects_registry() {
1561        let store = InMemoryMetadataStore::new();
1562        let metrics = MetricsRegistry::default();
1563        let mut daemon = Daemon::with_metrics(store, metrics.clone());
1564
1565        metrics.record_job_spawned(JobKind::Prefetch);
1566        metrics.record_job_failed(JobKind::Prefetch);
1567        metrics.record_job_completed(JobKind::Maintenance);
1568
1569        match daemon.handle(DaemonCommand::Metrics) {
1570            DaemonResponse::Metrics(snapshot) => {
1571                let prefetch = snapshot.jobs.get(&JobKind::Prefetch).unwrap();
1572                assert_eq!(prefetch.spawned, 1);
1573                assert_eq!(prefetch.failed, 1);
1574                assert_eq!(prefetch.completed, 0);
1575                let maintenance = snapshot.jobs.get(&JobKind::Maintenance).unwrap();
1576                assert_eq!(maintenance.completed, 1);
1577                assert!(!snapshot.repos.is_empty() || snapshot.global.pending_jobs == 0);
1578            }
1579            other => panic!("unexpected response: {other:?}"),
1580        }
1581    }
1582
1583    #[test]
1584    fn fsmonitor_snapshot_returns_dirty_paths() {
1585        let store = InMemoryMetadataStore::new();
1586        let mut daemon = Daemon::new(store);
1587        let (_repo_dir, repo) = init_temp_repo();
1588        match daemon.handle(DaemonCommand::RegisterRepo {
1589            repo_path: validated_path(repo.clone()),
1590        }) {
1591            DaemonResponse::Ack(_) => {}
1592            other => panic!("unexpected response: {other:?}"),
1593        }
1594        daemon.handle_watch_event(
1595            &repo,
1596            &WatchEvent::new(repo.join("file.txt"), WatchEventKind::Modified),
1597        );
1598        let response = daemon.handle(DaemonCommand::FsMonitorSnapshot {
1599            repo_path: validated_path(repo.clone()),
1600            last_seen_generation: None,
1601        });
1602        match response {
1603            DaemonResponse::FsMonitorSnapshot(snapshot) => {
1604                assert_eq!(snapshot.repo_path, repo);
1605                assert!(snapshot.dirty_paths.contains(&PathBuf::from("file.txt")));
1606                assert!(snapshot.generation > 0);
1607            }
1608            other => panic!("unexpected response: {other:?}"),
1609        }
1610    }
1611
    #[tokio::test]
    async fn runtime_processes_jobs() {
        // End-to-end: a queued maintenance job should be picked up by the
        // runtime loop and drained from the pending queue.
        let runtime = Runtime::new(InMemoryMetadataStore::new(), None);
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let runtime_task = tokio::spawn(runtime.run());

        let service = InProcessService::from_shared(shared.clone());
        let (_repo_dir, repo) = init_temp_repo();
        service
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo.clone()),
            })
            .await
            .unwrap();
        service
            .execute(DaemonCommand::QueueJob {
                repo_path: validated_path(repo.clone()),
                job: JobKind::Maintenance,
            })
            .await
            .unwrap();

        // Give the runtime loop time to dispatch and finish the job.
        // NOTE(review): timing-based wait — could flake on a loaded machine.
        tokio::time::sleep(Duration::from_millis(300)).await;
        shutdown.shutdown();
        runtime_task.await.unwrap();

        match service.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => assert_eq!(entries[0].pending_jobs, 0),
            other => panic!("unexpected response: {other:?}"),
        }
    }
1644
    #[tokio::test]
    async fn nng_client_server_roundtrip() {
        // Full IPC path: register a repo through a real REQ/REP socket pair
        // and read it back via ListRepos.
        let runtime = Runtime::new(InMemoryMetadataStore::new(), None);
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let address = test_address();
        let server = NngServer::new(address.clone(), shared, shutdown.clone());
        let runtime_task = tokio::spawn(runtime.run());
        let server_task = tokio::spawn(async move {
            server.run().await.expect("server exits cleanly");
        });

        // Give the server time to bind before dialing.
        // NOTE(review): timing-based wait — could flake on a loaded machine.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let client = NngClient::new(address);
        let (_repo_dir, repo_path) = init_temp_repo();
        client
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo_path.clone()),
            })
            .await
            .unwrap();

        match client.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => assert_eq!(entries.len(), 1),
            other => panic!("unexpected response: {other:?}"),
        }

        // Tear down both tasks; run() exits on the shutdown signal.
        shutdown.shutdown();
        runtime_task.await.unwrap();
        server_task.await.unwrap();
    }
1677
    #[tokio::test]
    async fn watcher_events_record_last_event() {
        // Use a manually driven watcher so the test controls exactly when a
        // filesystem event is delivered to the runtime.
        let runtime = Runtime::with_watcher(
            InMemoryMetadataStore::new(),
            Arc::new(ManualWatcher::new()),
            None,
        );
        let watcher_state = runtime.watcher_state();
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let runtime_task = tokio::spawn(runtime.run());

        let service = InProcessService::from_shared(shared);
        let (_dir, repo_path) = init_temp_repo();
        // Create a dirty file before registration so status has something to report.
        std::fs::write(repo_path.join("file.txt"), "data").expect("write file");
        service
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo_path.clone()),
            })
            .await
            .unwrap();

        // Give the runtime time to set up a watcher for the new repo.
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Clone the handle out of the registry inside a block so the std
        // mutex guard is dropped before the awaits below.
        let manual_handle = {
            let state = watcher_state.lock().expect("watcher state poisoned");
            let registration = state.values().next().expect("watcher registered");
            registration.handle.clone()
        };
        // Downcast the trait-object handle back to the manual test watcher.
        let manual = manual_handle
            .as_any()
            .downcast_ref::<ManualWatchHandle>()
            .expect("manual handle");
        // Simulate a modification event for the file written above.
        manual
            .emit(WatchEvent::new(
                repo_path.join("file.txt"),
                WatchEventKind::Modified,
            ))
            .unwrap();

        // Let the event propagate through the runtime's event loop.
        tokio::time::sleep(Duration::from_millis(300)).await;

        // The emitted event should be recorded as the repo's last event.
        match service.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => {
                assert!(entries[0].last_event.is_some());
            }
            other => panic!("unexpected response: {other:?}"),
        }

        // Status should report the dirty file and a positive generation.
        let status_response = service
            .execute(DaemonCommand::Status {
                repo_path: validated_path(repo_path.clone()),
                known_generation: None,
            })
            .await
            .unwrap();
        match status_response {
            DaemonResponse::RepoStatus(detail) => {
                assert!(
                    detail.dirty_paths.contains(&PathBuf::from("file.txt")),
                    "status should include modified file: {:?}",
                    detail.dirty_paths
                );
                assert!(detail.generation > 0);
            }
            other => panic!("unexpected response: {other:?}"),
        }

        shutdown.shutdown();
        runtime_task.await.unwrap();
    }
1749
1750    fn init_temp_repo() -> (tempfile::TempDir, PathBuf) {
1751        let dir = tempdir().expect("create temp dir");
1752        Repository::init(dir.path()).expect("init repo");
1753        let path = dir.path().to_path_buf();
1754        (dir, path)
1755    }
1756
1757    fn test_address() -> String {
1758        use std::net::TcpListener;
1759        let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
1760        let addr = listener.local_addr().unwrap();
1761        drop(listener);
1762        format!("tcp://{}", addr)
1763    }
1764}