1mod events;
2mod fsmonitor_cache;
3mod logs;
4mod metrics;
5
6mod logging;
7
8use crate::{
9 logs::LogBook,
10 metrics::{collect_resource_snapshot, MetricsRegistry},
11};
12use async_nng::AsyncSocket;
13use async_trait::async_trait;
14use bincode::Options as BincodeOptions;
15pub use events::{NotificationServer, NotificationStream, NotificationSubscriber};
16pub use fsmonitor_cache::FsMonitorCache;
17use gity_git::{working_tree_status, RepoConfigurator};
18use gity_ipc::{
19 bounded_bincode, validate_message_size, Ack, DaemonCommand, DaemonError, DaemonHealth,
20 DaemonNotification, DaemonResponse, DaemonService, FsMonitorSnapshot, GlobalMetrics,
21 JobEventKind, JobEventNotification, JobKind, LogEntry, RepoGeneration, RepoHealthDetail,
22 RepoMetrics, RepoStatusDetail, RepoSummary, WatchEventKind as IpcWatchEventKind,
23 WatchEventNotification,
24};
25use gity_storage::{MetadataStore, RepoMetadata};
26use gity_watch::{
27 NotifyWatcher, WatchError, WatchEvent, WatchEventKind, WatchHandleRef, WatcherRef,
28};
29use nng::Protocol;
30use std::{
31 collections::{HashMap, HashSet, VecDeque},
32 path::{Path, PathBuf},
33 sync::{
34 atomic::{AtomicBool, Ordering},
35 Arc, Mutex,
36 },
37 time::{Duration, Instant, SystemTime},
38};
39use thiserror::Error;
40use tokio::sync::mpsc::error::TryRecvError;
41use tokio::{
42 process::Command,
43 select,
44 sync::{mpsc, Notify},
45 task::JoinHandle,
46 time::sleep,
47};
48use tracing::{error, info, warn};
49
/// Daemon state shared between the runtime loop and spawned tasks.
type SharedDaemon<S> = Arc<Mutex<Daemon<S>>>;
/// Channel used to push daemon notifications to subscribers.
type NotificationSender = mpsc::UnboundedSender<DaemonNotification>;
52
/// Minimal FIFO job queue; jobs run in the order they were enqueued.
#[derive(Default)]
struct Scheduler {
    queue: VecDeque<QueuedJob>,
}
58
/// A background job waiting to run, bound to the repository it targets.
#[derive(Clone)]
struct QueuedJob {
    repo_path: PathBuf,
    kind: JobKind,
}
64
65impl Scheduler {
66 fn enqueue(&mut self, repo_path: PathBuf, job: JobKind) {
67 self.queue.push_back(QueuedJob {
68 repo_path,
69 kind: job,
70 });
71 }
72
73 fn len(&self) -> usize {
74 self.queue.len()
75 }
76
77 fn next_job(&mut self) -> Option<QueuedJob> {
78 self.queue.pop_front()
79 }
80}
81
/// Core daemon state: the metadata store plus the job queue and the
/// channels/caches used to answer IPC commands.
pub struct Daemon<S: MetadataStore> {
    store: S,
    // FIFO queue of background jobs awaiting execution.
    scheduler: Scheduler,
    // Used to report uptime in health/metrics responses.
    started_at: Instant,
    metrics: MetricsRegistry,
    // When present, status/job/log events are pushed to subscribers.
    notifications: Option<NotificationSender>,
    // Optional fsmonitor helper command applied to repo git config on register.
    fsmonitor_helper: Option<String>,
    logs: LogBook,
    // Optional on-disk cache of fsmonitor snapshots.
    fsmonitor_cache: Option<FsMonitorCache>,
}
93
impl<S: MetadataStore> Daemon<S> {
    /// Build a daemon with default metrics, no notification channel, no
    /// fsmonitor helper, a 200-entry in-memory log book, and no snapshot cache.
    pub fn new(store: S) -> Self {
        Self::with_components(
            store,
            MetricsRegistry::default(),
            None,
            None,
            LogBook::new(200),
            None,
        )
    }

    /// Like `new`, but with a caller-supplied metrics registry.
    pub fn with_metrics(store: S, metrics: MetricsRegistry) -> Self {
        Self::with_components(store, metrics, None, None, LogBook::new(200), None)
    }

    /// Fully parameterized constructor; the other constructors delegate here.
    pub fn with_components(
        store: S,
        metrics: MetricsRegistry,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        logs: LogBook,
        fsmonitor_cache: Option<FsMonitorCache>,
    ) -> Self {
        Self {
            store,
            scheduler: Scheduler::default(),
            started_at: Instant::now(),
            metrics,
            notifications,
            fsmonitor_helper,
            logs,
            fsmonitor_cache,
        }
    }

    /// Dispatch a single IPC command and produce its response.
    ///
    /// Store and git-configuration failures are mapped into
    /// `DaemonResponse::Error` rather than propagated.
    fn handle(&mut self, command: DaemonCommand) -> DaemonResponse {
        match command {
            DaemonCommand::RegisterRepo { repo_path } => {
                // Register in the store first, then apply git performance
                // settings (including the optional fsmonitor helper).
                match self.store.register_repo(repo_path.as_path().to_path_buf()) {
                    Ok(_) => match configure_repo(&repo_path, self.fsmonitor_helper.as_deref()) {
                        Ok(_) => DaemonResponse::Ack(Ack::new("registered")),
                        Err(err) => err,
                    },
                    Err(err) => err_response(err),
                }
            }
            DaemonCommand::UnregisterRepo { repo_path } => {
                match self.store.unregister_repo(&repo_path) {
                    // Only clear git config if the repo was actually registered.
                    Ok(Some(_)) => match clear_repo_config(&repo_path) {
                        Ok(_) => DaemonResponse::Ack(Ack::new("unregistered")),
                        Err(err) => err,
                    },
                    Ok(None) => DaemonResponse::Error(format!(
                        "repository not registered: {}",
                        repo_path.display()
                    )),
                    Err(err) => err_response(err),
                }
            }
            DaemonCommand::ListRepos => match self.store.list_repos() {
                Ok(repos) => {
                    DaemonResponse::RepoList(repos.into_iter().map(repo_summary).collect())
                }
                Err(err) => err_response(err),
            },
            DaemonCommand::Status {
                repo_path,
                known_generation,
            } => {
                let current_generation = match self.store.current_generation(&repo_path) {
                    Ok(current) => current,
                    Err(err) => return err_response(err),
                };
                // Draining consumes the accumulated dirty paths; if we take the
                // fast path below they were empty anyway.
                let dirty_paths = match self.store.drain_dirty_paths(&repo_path) {
                    Ok(paths) => paths,
                    Err(err) => return err_response(err),
                };
                // Fast path: nothing changed and the client is already at the
                // current generation.
                if dirty_paths.is_empty()
                    && known_generation
                        .map(|generation| generation == current_generation)
                        .unwrap_or(false)
                {
                    return DaemonResponse::RepoStatusUnchanged {
                        repo_path: repo_path.as_path().to_path_buf(),
                        generation: current_generation,
                    };
                }
                let generation = match self.store.bump_generation(&repo_path) {
                    Ok(new_generation) => new_generation,
                    Err(err) => return err_response(err),
                };
                match self.compute_status(&repo_path, dirty_paths, generation) {
                    Ok(detail) => {
                        self.emit_status_notification(&detail);
                        DaemonResponse::RepoStatus(detail)
                    }
                    Err(err) => err,
                }
            }
            DaemonCommand::QueueJob { repo_path, job } => {
                self.scheduler
                    .enqueue(repo_path.as_path().to_path_buf(), job);
                send_job_notification(&self.notifications, &repo_path, job, JobEventKind::Queued);
                match self.store.increment_jobs(&repo_path, 1) {
                    Ok(_) => DaemonResponse::Ack(Ack::new(format!("queued {:?}", job))),
                    Err(err) => err_response(err),
                }
            }
            DaemonCommand::HealthCheck => match self.store.list_repos() {
                Ok(repos) => {
                    let generations = repos
                        .iter()
                        .map(|meta| RepoGeneration {
                            repo_path: meta.repo_path.clone(),
                            generation: meta.generation,
                        })
                        .collect();
                    DaemonResponse::Health(DaemonHealth {
                        repo_count: repos.len(),
                        pending_jobs: self.scheduler.len(),
                        uptime_seconds: self.started_at.elapsed().as_secs(),
                        repo_generations: generations,
                    })
                }
                Err(err) => err_response(err),
            },
            DaemonCommand::Metrics => match self.store.list_repos() {
                Ok(repos) => {
                    // Combine live process resources with the accumulated
                    // per-job counters and per-repo pending counts.
                    let resource = collect_resource_snapshot();
                    let mut snapshot = self.metrics.snapshot();
                    snapshot.global = GlobalMetrics {
                        pending_jobs: self.scheduler.len(),
                        uptime_seconds: self.started_at.elapsed().as_secs(),
                        cpu_percent: resource.cpu_percent,
                        rss_bytes: resource.rss_bytes,
                    };
                    snapshot.repos = repos
                        .into_iter()
                        .map(|meta| RepoMetrics {
                            repo_path: meta.repo_path,
                            pending_jobs: meta.pending_jobs,
                        })
                        .collect();
                    DaemonResponse::Metrics(snapshot)
                }
                Err(err) => err_response(err),
            },
            DaemonCommand::FsMonitorSnapshot {
                repo_path,
                last_seen_generation,
            } => {
                let current_generation = match self.store.current_generation(&repo_path) {
                    Ok(current) => current,
                    Err(err) => return err_response(err),
                };
                let dirty_paths = match self.store.drain_dirty_paths(&repo_path) {
                    Ok(paths) => paths,
                    Err(err) => return err_response(err),
                };
                // Fast path mirrors the Status handler: no changes and the
                // client already holds the current generation.
                if dirty_paths.is_empty()
                    && last_seen_generation
                        .map(|known| known == current_generation)
                        .unwrap_or(false)
                {
                    let snapshot = FsMonitorSnapshot {
                        repo_path: repo_path.as_path().to_path_buf(),
                        dirty_paths: Vec::new(),
                        generation: current_generation,
                    };
                    // Best-effort cache write; failures are deliberately ignored.
                    if let Some(cache) = &self.fsmonitor_cache {
                        let _ = cache.write(&repo_path, &snapshot);
                    }
                    return DaemonResponse::FsMonitorSnapshot(snapshot);
                }
                let generation = match self.store.bump_generation(&repo_path) {
                    Ok(new_generation) => new_generation,
                    Err(err) => return err_response(err),
                };
                // Paths under .git/ are internal and excluded from fsmonitor output.
                let working_tree_paths = filter_working_tree_paths(dirty_paths);
                let snapshot = FsMonitorSnapshot {
                    repo_path: repo_path.as_path().to_path_buf(),
                    dirty_paths: working_tree_paths,
                    generation,
                };
                if let Some(cache) = &self.fsmonitor_cache {
                    let _ = cache.write(&repo_path, &snapshot);
                }
                DaemonResponse::FsMonitorSnapshot(snapshot)
            }
            DaemonCommand::FetchLogs { repo_path, limit } => {
                let entries = self.logs.recent(&repo_path, limit);
                DaemonResponse::Logs(entries)
            }
            DaemonCommand::RepoHealth { repo_path } => self.compute_repo_health(&repo_path),
            // The transport layer is expected to act on the shutdown itself;
            // this handler only acknowledges.
            DaemonCommand::Shutdown => DaemonResponse::Ack(Ack::new("shutdown requested")),
        }
    }

    /// Store-derived portion of a repo health report.
    ///
    /// `watcher_active`, `throttling_active` and `sled_ok` are placeholders
    /// here; `Runtime::compute_repo_health` overwrites them with live values.
    fn compute_repo_health(&self, repo_path: &Path) -> DaemonResponse {
        let meta = match self.store.get_repo(repo_path) {
            Ok(Some(meta)) => meta,
            Ok(None) => {
                return DaemonResponse::Error(format!(
                    "repository not registered: {}",
                    repo_path.display()
                ));
            }
            Err(err) => return err_response(err),
        };
        // A failed count is reported as 0 rather than failing the whole query.
        let dirty_path_count = match self.store.dirty_path_count(repo_path) {
            Ok(count) => count,
            Err(err) => {
                warn!("failed to get dirty path count: {}", err);
                0
            }
        };
        DaemonResponse::RepoHealth(RepoHealthDetail {
            repo_path: repo_path.to_path_buf(),
            generation: meta.generation,
            pending_jobs: meta.pending_jobs,
            watcher_active: false,
            last_event: meta.last_event,
            dirty_path_count,
            sled_ok: true,
            needs_reconciliation: meta.needs_reconciliation.unwrap_or(false),
            throttling_active: false,
            next_scheduled_job: None,
        })
    }

    /// Pop the next queued job, if any.
    fn next_job(&mut self) -> Option<QueuedJob> {
        self.scheduler.next_job()
    }

    /// Best-effort bookkeeping after a job finishes (success or failure):
    /// decrement the repo's pending-job counter and stamp a completion event.
    fn mark_job_completed(&mut self, job: &QueuedJob) {
        let now = SystemTime::now();
        if let Err(err) = self.store.increment_jobs(&job.repo_path, -1) {
            warn!(
                "failed to decrement job counter for {}: {}",
                job.repo_path.display(),
                err
            );
        }
        if let Err(err) = self.store.record_event(&job.repo_path, now) {
            warn!(
                "failed to record job completion event for {}: {}",
                job.repo_path.display(),
                err
            );
        }
    }

    /// Push a status update to subscribers; send errors are ignored.
    fn emit_status_notification(&self, detail: &RepoStatusDetail) {
        if let Some(tx) = &self.notifications {
            let _ = tx.send(DaemonNotification::RepoStatus(detail.clone()));
        }
    }
}
356
357fn repo_summary(meta: RepoMetadata) -> RepoSummary {
358 RepoSummary {
359 repo_path: meta.repo_path,
360 status: meta.status,
361 pending_jobs: meta.pending_jobs,
362 last_event: meta.last_event,
363 generation: meta.generation,
364 }
365}
366
367fn err_response(err: impl ToString) -> DaemonResponse {
368 DaemonResponse::Error(err.to_string())
369}
370
371fn configure_repo(path: &Path, helper: Option<&str>) -> Result<(), DaemonResponse> {
372 RepoConfigurator::open(path)
373 .and_then(|repo| repo.apply_performance_settings(helper))
374 .map_err(|err| DaemonResponse::Error(format!("failed to configure repo: {err}")))
375}
376
377fn clear_repo_config(path: &Path) -> Result<(), DaemonResponse> {
378 RepoConfigurator::open(path)
379 .and_then(|repo| repo.clear_performance_settings())
380 .map_err(|err| DaemonResponse::Error(format!("failed to clear repo config: {err}")))
381}
382
impl<S: MetadataStore> Daemon<S> {
    /// React to a single filesystem event: stamp repo activity and record
    /// the affected path as dirty.
    fn handle_watch_event(&mut self, repo_path: &Path, event: &WatchEvent) {
        let now = SystemTime::now();
        if let Err(err) = self.store.record_event(repo_path, now) {
            warn!(
                "failed to record watch event for {}: {}",
                repo_path.display(),
                err
            );
        }
        if should_track(&event.kind) {
            // relative_path returns None for the repo root itself, in which
            // case no dirty path is recorded.
            if let Some(relative) = relative_path(repo_path, &event.path) {
                if let Err(err) = self.store.mark_dirty_path(repo_path, relative) {
                    warn!(
                        "failed to mark dirty path for {}: {}",
                        repo_path.display(),
                        err
                    );
                }
            }
        } else if let WatchEventKind::Deleted = event.kind {
            // NOTE(review): this branch is unreachable — should_track() already
            // matches Deleted, so deletions take the branch above. If the
            // intent was to mark the whole repo dirty on deletion, should_track
            // must exclude Deleted; confirm intent before changing.
            if let Err(err) = self.store.mark_dirty_path(repo_path, PathBuf::from(".")) {
                warn!(
                    "failed to mark repo dirty after deletion in {}: {}",
                    repo_path.display(),
                    err
                );
            }
        }
    }

    /// All registered repo paths; empty on store failure.
    fn repo_paths(&self) -> Vec<PathBuf> {
        self.store
            .list_repos()
            .map(|repos| repos.into_iter().map(|meta| meta.repo_path).collect())
            .unwrap_or_default()
    }

    /// Run `git status` restricted to `paths` and package the result with the
    /// generation it corresponds to.
    fn compute_status(
        &self,
        repo_path: &Path,
        paths: Vec<PathBuf>,
        generation: u64,
    ) -> Result<RepoStatusDetail, DaemonResponse> {
        let dirty_paths = working_tree_status(repo_path, &paths)
            .map_err(|err| DaemonResponse::Error(format!("git status failed: {err}")))?;
        Ok(RepoStatusDetail {
            repo_path: repo_path.to_path_buf(),
            dirty_paths,
            generation,
        })
    }
}
436
/// Convert an absolute event path into a path relative to the repository root.
///
/// Returns `None` when the path *is* the repository root (the relative form
/// would be empty). Paths outside the repository are returned unchanged.
fn relative_path(repo_path: &Path, path: &Path) -> Option<PathBuf> {
    match path.strip_prefix(repo_path) {
        Ok(rel) if rel.as_os_str().is_empty() => None,
        Ok(rel) => Some(rel.to_path_buf()),
        Err(_) => Some(path.to_path_buf()),
    }
}
448
/// True when any component of `path` is exactly `.git` (git-internal file).
fn is_git_internal_path(path: &Path) -> bool {
    for component in path.components() {
        if component.as_os_str() == ".git" {
            return true;
        }
    }
    false
}
454
455fn filter_working_tree_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
457 paths
458 .into_iter()
459 .filter(|p| !is_git_internal_path(p))
460 .collect()
461}
462
463fn should_track(kind: &WatchEventKind) -> bool {
464 matches!(
465 kind,
466 WatchEventKind::Created | WatchEventKind::Modified | WatchEventKind::Deleted
467 )
468}
469
470fn map_watch_kind(kind: &WatchEventKind) -> IpcWatchEventKind {
471 match kind {
472 WatchEventKind::Created => IpcWatchEventKind::Created,
473 WatchEventKind::Modified => IpcWatchEventKind::Modified,
474 WatchEventKind::Deleted => IpcWatchEventKind::Deleted,
475 }
476}
477
478fn send_job_notification(
479 notifications: &Option<NotificationSender>,
480 repo_path: &Path,
481 job: JobKind,
482 kind: JobEventKind,
483) {
484 if let Some(tx) = notifications {
485 if let Err(err) = tx.send(DaemonNotification::JobEvent(JobEventNotification {
486 repo_path: repo_path.to_path_buf(),
487 job,
488 kind,
489 })) {
490 warn!("failed to send job notification: {}", err);
491 }
492 }
493}
494
495fn send_log_notification(notifications: &Option<NotificationSender>, entry: &LogEntry) {
496 if let Some(tx) = notifications {
497 if let Err(err) = tx.send(DaemonNotification::Log(entry.clone())) {
498 warn!("failed to send log notification: {}", err);
499 }
500 }
501}
502
/// Cooperative shutdown signal shared across tasks; cloning is cheap
/// (two `Arc` bumps) and all clones observe the same signal.
#[derive(Clone)]
pub struct Shutdown {
    // Set once by `shutdown()`, never cleared.
    flag: Arc<AtomicBool>,
    // Wakes tasks parked in `wait()`.
    notify: Arc<Notify>,
}
509
510impl Shutdown {
511 pub fn new() -> Self {
512 Self {
513 flag: Arc::new(AtomicBool::new(false)),
514 notify: Arc::new(Notify::new()),
515 }
516 }
517}
518
impl Default for Shutdown {
    /// Equivalent to [`Shutdown::new`].
    fn default() -> Self {
        Self::new()
    }
}
524
525impl Shutdown {
526 pub fn shutdown(&self) {
527 if !self.flag.swap(true, Ordering::SeqCst) {
528 self.notify.notify_waiters();
529 }
530 }
531
532 pub fn is_shutdown(&self) -> bool {
533 self.flag.load(Ordering::SeqCst)
534 }
535
536 pub async fn wait(&self) {
537 if self.is_shutdown() {
538 return;
539 }
540 self.notify.notified().await;
541 }
542}
543
/// Tuning knobs for idle-time background work (prefetch, maintenance,
/// log pruning, and database compaction).
#[derive(Clone)]
pub struct IdleScheduleConfig {
    /// Seconds a repo must be idle before a prefetch job is queued.
    pub prefetch_idle_secs: u64,
    /// Seconds a repo must be idle before a maintenance job is queued.
    pub maintenance_idle_secs: u64,
    /// Master switch for all idle scheduling.
    pub enabled: bool,
    /// How often old log entries are pruned, in seconds.
    pub log_prune_interval_secs: u64,
    /// Maximum age of retained log entries, in seconds.
    pub log_max_age_secs: u64,
    /// How often database compaction runs, in seconds.
    pub compact_interval_secs: u64,
}
560
561impl Default for IdleScheduleConfig {
562 fn default() -> Self {
563 Self {
564 prefetch_idle_secs: 300, maintenance_idle_secs: 600, enabled: true,
567 log_prune_interval_secs: 3600, log_max_age_secs: 604800, compact_interval_secs: 3600, }
571 }
572}
573
/// Tracks how long a repo has been quiet and which idle jobs have already
/// been queued during the current idle period.
struct RepoIdleState {
    last_activity: Instant,
    // Set once a prefetch has been queued; reset by `touch()`.
    prefetch_scheduled: bool,
    // Set once a maintenance job has been queued; reset by `touch()`.
    maintenance_scheduled: bool,
}
580
581impl RepoIdleState {
582 fn new() -> Self {
583 Self {
584 last_activity: Instant::now(),
585 prefetch_scheduled: false,
586 maintenance_scheduled: false,
587 }
588 }
589
590 fn touch(&mut self) {
591 self.last_activity = Instant::now();
592 self.prefetch_scheduled = false;
593 self.maintenance_scheduled = false;
594 }
595
596 fn idle_duration(&self) -> Duration {
597 self.last_activity.elapsed()
598 }
599}
600
/// Poll-rate controller: the main loop polls fast while events are arriving
/// and backs off through two idle tiers when the system is quiet.
pub struct AdaptivePollInterval {
    // Last time `touch()` was called.
    last_activity: Instant,
    /// Interval used while activity is recent.
    pub active_interval: Duration,
    /// Interval once `idle_threshold` has elapsed without activity.
    pub idle_interval: Duration,
    /// Interval once `very_idle_threshold` has elapsed without activity.
    pub very_idle_interval: Duration,
    /// Idle time after which polling slows to `idle_interval`.
    pub idle_threshold: Duration,
    /// Idle time after which polling slows to `very_idle_interval`.
    pub very_idle_threshold: Duration,
}
616
617impl Default for AdaptivePollInterval {
618 fn default() -> Self {
619 Self {
620 last_activity: Instant::now(),
621 active_interval: Duration::from_millis(50),
622 idle_interval: Duration::from_millis(250),
623 very_idle_interval: Duration::from_millis(500),
624 idle_threshold: Duration::from_secs(5),
625 very_idle_threshold: Duration::from_secs(30),
626 }
627 }
628}
629
630impl AdaptivePollInterval {
631 pub fn current(&self) -> Duration {
633 let since_activity = self.last_activity.elapsed();
634 if since_activity < self.idle_threshold {
635 self.active_interval
636 } else if since_activity < self.very_idle_threshold {
637 self.idle_interval
638 } else {
639 self.very_idle_interval
640 }
641 }
642
643 pub fn touch(&mut self) {
645 self.last_activity = Instant::now();
646 }
647}
648
/// Orchestrates the daemon: owns the shared `Daemon`, keeps filesystem
/// watchers in sync with registered repos, drains watch events, spawns jobs,
/// and performs idle-time housekeeping.
pub struct Runtime<S: MetadataStore> {
    daemon: SharedDaemon<S>,
    watcher: WatcherRef,
    // One registration (stop handle + forwarding task) per watched repo.
    active_watchers: Arc<Mutex<HashMap<PathBuf, WatchRegistration>>>,
    // All per-repo watcher tasks funnel events into this single channel.
    event_tx: mpsc::UnboundedSender<WatchEventEnvelope>,
    event_rx: Mutex<mpsc::UnboundedReceiver<WatchEventEnvelope>>,
    shutdown: Shutdown,
    adaptive_poll: Mutex<AdaptivePollInterval>,
    metrics: MetricsRegistry,
    notifications: Option<NotificationSender>,
    logs: LogBook,
    idle_config: IdleScheduleConfig,
    // Per-repo idle tracking used to schedule prefetch/maintenance jobs.
    idle_states: Arc<Mutex<HashMap<PathBuf, RepoIdleState>>>,
    // Last time a watch event was seen for each repo.
    last_watcher_activity: Arc<Mutex<HashMap<PathBuf, Instant>>>,
    // Surfaced in repo health reports.
    throttling_active: Arc<AtomicBool>,
    last_prune_time: Arc<Mutex<Instant>>,
    last_compact_time: Arc<Mutex<Instant>>,
    // Storage root; enables compaction and the on-disk fsmonitor cache.
    data_dir: Option<PathBuf>,
}
674
675impl<S: MetadataStore> Runtime<S> {
    /// Build a runtime with the default `notify`-based watcher and no
    /// notification channel or fsmonitor helper.
    pub fn new(store: S, log_tree: Option<sled::Tree>) -> Self {
        Self::with_watcher_and_notifications(
            store,
            Arc::new(NotifyWatcher::new()),
            None,
            None,
            log_tree,
        )
    }

    /// Build a runtime with a caller-supplied watcher implementation.
    pub fn with_watcher(store: S, watcher: WatcherRef, log_tree: Option<sled::Tree>) -> Self {
        Self::with_watcher_and_notifications(store, watcher, None, None, log_tree)
    }

    /// Build a runtime that pushes notifications, using the default watcher
    /// and no data directory (disables compaction and the fsmonitor cache).
    pub fn with_notifications(
        store: S,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        log_tree: Option<sled::Tree>,
    ) -> Self {
        Self::with_data_dir(
            store,
            Arc::new(NotifyWatcher::new()),
            notifications,
            fsmonitor_helper,
            log_tree,
            None,
        )
    }

    /// Like `with_notifications`, but with a data directory for on-disk
    /// state (compaction, fsmonitor snapshot cache).
    pub fn with_notifications_and_data_dir(
        store: S,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        log_tree: Option<sled::Tree>,
        data_dir: PathBuf,
    ) -> Self {
        Self::with_data_dir(
            store,
            Arc::new(NotifyWatcher::new()),
            notifications,
            fsmonitor_helper,
            log_tree,
            Some(data_dir),
        )
    }

    /// Custom watcher plus notifications; no data directory.
    pub fn with_watcher_and_notifications(
        store: S,
        watcher: WatcherRef,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        log_tree: Option<sled::Tree>,
    ) -> Self {
        Self::with_data_dir(store, watcher, notifications, fsmonitor_helper, log_tree, None)
    }
732
    /// Fully parameterized constructor; every other constructor delegates here.
    ///
    /// Builds the shared `Daemon`, the aggregated watch-event channel, and —
    /// when `data_dir` is given — the on-disk fsmonitor snapshot cache.
    pub fn with_data_dir(
        store: S,
        watcher: WatcherRef,
        notifications: Option<NotificationSender>,
        fsmonitor_helper: Option<String>,
        log_tree: Option<sled::Tree>,
        data_dir: Option<PathBuf>,
    ) -> Self {
        let (event_tx, event_rx) = mpsc::unbounded_channel();
        let metrics = MetricsRegistry::default();
        // Persist logs to sled when a tree is provided; memory-only otherwise.
        let logs = log_tree
            .map(|tree| LogBook::with_persistence(200, tree))
            .unwrap_or_else(|| LogBook::new(200));
        let daemon_notifications = notifications.clone();
        // Cache creation is best-effort: a failure is logged and the cache
        // is simply disabled.
        let fsmonitor_cache = data_dir.as_ref().and_then(|dir| {
            FsMonitorCache::new(dir.join("fsmonitor_cache"))
                .map_err(|e| warn!("failed to create fsmonitor cache: {}", e))
                .ok()
        });
        Self {
            daemon: Arc::new(Mutex::new(Daemon::with_components(
                store,
                metrics.clone(),
                daemon_notifications,
                fsmonitor_helper,
                logs.clone(),
                fsmonitor_cache,
            ))),
            watcher,
            active_watchers: Arc::new(Mutex::new(HashMap::new())),
            event_tx,
            event_rx: Mutex::new(event_rx),
            shutdown: Shutdown::new(),
            adaptive_poll: Mutex::new(AdaptivePollInterval::default()),
            metrics,
            notifications,
            logs,
            idle_config: IdleScheduleConfig::default(),
            idle_states: Arc::new(Mutex::new(HashMap::new())),
            last_watcher_activity: Arc::new(Mutex::new(HashMap::new())),
            throttling_active: Arc::new(AtomicBool::new(false)),
            last_prune_time: Arc::new(Mutex::new(Instant::now())),
            last_compact_time: Arc::new(Mutex::new(Instant::now())),
            data_dir,
        }
    }
780
    /// Clone of the shared daemon handle for out-of-loop callers.
    pub fn shared(&self) -> SharedDaemon<S> {
        Arc::clone(&self.daemon)
    }

    /// Clone of the shutdown signal; calling `shutdown()` on it stops `run()`.
    pub fn shutdown_signal(&self) -> Shutdown {
        self.shutdown.clone()
    }

    /// In-process service facade over the shared daemon.
    pub fn service_handle(&self) -> InProcessService<S> {
        InProcessService::from_shared(self.shared())
    }
792
793 pub async fn run(self) {
794 self.check_reconciliation_needed();
796
797 while !self.shutdown.is_shutdown() {
798 self.drain_watch_events();
799 if let Some(job) = self.next_job_to_run() {
800 self.spawn_job(job);
801 }
802 self.check_idle_schedules();
803 if let Err(err) = self.sync_watchers().await {
804 eprintln!("failed to synchronize watchers: {err}");
805 }
806 let poll_duration = self
808 .adaptive_poll
809 .lock()
810 .map(|guard| guard.current())
811 .unwrap_or(Duration::from_millis(250));
812 sleep(poll_duration).await;
813 }
814 self.stop_all_watchers();
815
816 if let Some(data_dir) = &self.data_dir {
818 info!("performing final compaction before shutdown");
819 match gity_storage::StorageContext::new(data_dir) {
820 Ok(storage) => {
821 if let Err(e) = storage.compact_all() {
822 warn!("shutdown compaction failed: {}", e);
823 }
824 }
825 Err(e) => {
826 warn!("failed to open storage for shutdown compaction: {}", e);
827 }
828 }
829 }
830 }
831
    /// On startup, flag any repo whose last recorded event is older than
    /// `max_gap`: it may have missed filesystem events while the daemon was
    /// down and needs a full status refresh.
    fn check_reconciliation_needed(&self) {
        let repos = self.current_repo_paths();
        let now = SystemTime::now();
        let max_gap = Duration::from_secs(300);
        for repo_path in repos {
            let needs_reconciliation = if let Ok(guard) = self.daemon.lock() {
                match guard.store.get_repo(&repo_path) {
                    Ok(Some(meta)) => {
                        if let Some(last_event) = meta.last_event {
                            match now.duration_since(last_event) {
                                Ok(gap) => gap > max_gap,
                                // Clock went backwards; treat the repo as fresh.
                                Err(_) => false,
                            }
                        } else {
                            // No events recorded yet: nothing to reconcile against.
                            false
                        }
                    }
                    _ => false,
                }
            } else {
                continue;
            };

            if needs_reconciliation {
                self.schedule_reconciliation(&repo_path);
            }
        }
    }
863
864 fn schedule_reconciliation(&self, repo_path: &Path) {
866 let entry = self.logs.record(
867 repo_path,
868 "reconciliation needed after daemon downtime - scheduling status refresh",
869 );
870 send_log_notification(&self.notifications, &entry);
871
872 if let Ok(guard) = self.daemon.lock() {
874 if let Err(err) = guard.store.set_needs_reconciliation(repo_path, true) {
875 error!(
876 "failed to mark reconciliation needed for {}: {}",
877 repo_path.display(),
878 err
879 );
880 }
881 }
882
883 if let Ok(guard) = self.daemon.lock() {
885 if let Err(err) = guard.store.mark_dirty_path(repo_path, PathBuf::from(".")) {
886 error!(
887 "failed to mark repo dirty for reconciliation in {}: {}",
888 repo_path.display(),
889 err
890 );
891 }
892 }
893
894 info!("scheduled reconciliation scan for {}", repo_path.display());
895 }
896
897 fn check_idle_schedules(&self) {
898 if !self.idle_config.enabled {
899 return;
900 }
901
902 let repos = self.current_repo_paths();
903 let prefetch_threshold = Duration::from_secs(self.idle_config.prefetch_idle_secs);
904 let maintenance_threshold = Duration::from_secs(self.idle_config.maintenance_idle_secs);
905
906 let mut states = match self.idle_states.lock() {
907 Ok(guard) => guard,
908 Err(_) => return,
909 };
910
911 for repo in repos {
912 let state = states
913 .entry(repo.clone())
914 .or_insert_with(RepoIdleState::new);
915 let idle = state.idle_duration();
916
917 if idle >= prefetch_threshold && !state.prefetch_scheduled {
919 if let Ok(mut guard) = self.daemon.lock() {
920 guard.scheduler.enqueue(repo.clone(), JobKind::Prefetch);
921 send_job_notification(
922 &self.notifications,
923 &repo,
924 JobKind::Prefetch,
925 JobEventKind::Queued,
926 );
927 let entry = self.logs.record(&repo, "idle prefetch scheduled");
928 send_log_notification(&self.notifications, &entry);
929 }
930 state.prefetch_scheduled = true;
931 }
932
933 if idle >= maintenance_threshold && !state.maintenance_scheduled {
935 if let Ok(mut guard) = self.daemon.lock() {
936 guard.scheduler.enqueue(repo.clone(), JobKind::Maintenance);
937 send_job_notification(
938 &self.notifications,
939 &repo,
940 JobKind::Maintenance,
941 JobEventKind::Queued,
942 );
943 let entry = self.logs.record(&repo, "idle maintenance scheduled");
944 send_log_notification(&self.notifications, &entry);
945 }
946 state.maintenance_scheduled = true;
947 }
948 }
949
950 let repo_set: HashSet<_> = self.current_repo_paths().into_iter().collect();
952 states.retain(|path, _| repo_set.contains(path));
953
954 drop(states);
956
957 let prune_interval = Duration::from_secs(self.idle_config.log_prune_interval_secs);
959 if let Ok(mut last_prune) = self.last_prune_time.lock() {
960 if last_prune.elapsed() >= prune_interval {
961 let max_age = Duration::from_secs(self.idle_config.log_max_age_secs);
962 let pruned = self.logs.prune_old_entries(max_age);
963 if pruned > 0 {
964 info!("pruned {} old log entries", pruned);
965 }
966 *last_prune = Instant::now();
967 }
968 }
969
970 if let Some(data_dir) = &self.data_dir {
972 let compact_interval = Duration::from_secs(self.idle_config.compact_interval_secs);
973 if let Ok(mut last_compact) = self.last_compact_time.lock() {
974 if last_compact.elapsed() >= compact_interval {
975 match gity_storage::StorageContext::new(data_dir) {
976 Ok(storage) => {
977 if let Err(e) = storage.compact_all() {
978 warn!("compaction failed: {}", e);
979 } else {
980 info!("database compaction completed");
981 }
982 }
983 Err(e) => {
984 warn!("failed to open storage for compaction: {}", e);
985 }
986 }
987 *last_compact = Instant::now();
988 }
989 }
990 }
991 }
992
    /// Refresh a repo's idle timer. Only touches repos already tracked;
    /// unknown repos get their state created lazily in `check_idle_schedules`.
    fn touch_repo_activity(&self, repo_path: &Path) {
        if let Ok(mut states) = self.idle_states.lock() {
            if let Some(state) = states.get_mut(repo_path) {
                state.touch();
            }
        }
    }
1000
1001 fn next_job_to_run(&self) -> Option<QueuedJob> {
1002 if let Ok(mut guard) = self.daemon.lock() {
1003 guard.next_job()
1004 } else {
1005 None
1006 }
1007 }
1008
1009 async fn sync_watchers(&self) -> Result<(), WatchError> {
1010 let desired = self.current_repo_paths();
1011 let existing: HashSet<PathBuf> = self
1012 .active_watchers
1013 .lock()
1014 .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
1015 .keys()
1016 .cloned()
1017 .collect();
1018
1019 for repo in desired.iter() {
1020 if !existing.contains(repo) {
1021 if let Err(err) = self.start_watcher(repo.clone()).await {
1022 eprintln!("failed to start watcher for {}: {err}", repo.display());
1023 }
1024 }
1025 }
1026
1027 let desired_set: HashSet<PathBuf> = desired.into_iter().collect();
1028 let to_remove: Vec<PathBuf> = self
1029 .active_watchers
1030 .lock()
1031 .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
1032 .keys()
1033 .filter(|path| !desired_set.contains(*path))
1034 .cloned()
1035 .collect();
1036
1037 for repo in to_remove {
1038 self.stop_watcher(&repo);
1039 }
1040
1041 Ok(())
1042 }
1043
    /// Subscribe to filesystem events for `repo_path` and spawn a task that
    /// forwards every event into the runtime's aggregated channel.
    async fn start_watcher(&self, repo_path: PathBuf) -> Result<(), WatchError> {
        let subscription = self.watcher.watch(repo_path.clone()).await?;
        let (handle, mut receiver) = subscription.into_parts();
        let event_tx = self.event_tx.clone();
        let repo_for_task = repo_path.clone();
        let task = tokio::spawn(async move {
            while let Some(event) = receiver.recv().await {
                // Tag each event with its repo so the single consumer can
                // route it; stop forwarding once the runtime side is gone.
                if event_tx
                    .send(WatchEventEnvelope {
                        repo_path: repo_for_task.clone(),
                        event,
                    })
                    .is_err()
                {
                    warn!("watch event channel closed");
                    break;
                }
            }
        });
        // Keep both the stop handle and the forwarding task so stop_watcher
        // can tear the pair down together.
        self.active_watchers
            .lock()
            .map_err(|_| WatchError::Backend("watcher map poisoned".into()))?
            .insert(repo_path, WatchRegistration { handle, task });
        Ok(())
    }
1069
1070 fn stop_watcher(&self, repo_path: &Path) {
1071 if let Ok(mut watchers) = self.active_watchers.lock() {
1072 if let Some(registration) = watchers.remove(repo_path) {
1073 registration.handle.stop();
1074 registration.task.abort();
1075 }
1076 }
1077 }
1078
1079 fn stop_all_watchers(&self) {
1080 if let Ok(mut watchers) = self.active_watchers.lock() {
1081 for (_path, registration) in watchers.drain() {
1082 registration.handle.stop();
1083 registration.task.abort();
1084 }
1085 }
1086 }
1087
1088 fn current_repo_paths(&self) -> Vec<PathBuf> {
1089 if let Ok(guard) = self.daemon.lock() {
1090 guard.repo_paths()
1091 } else {
1092 Vec::new()
1093 }
1094 }
1095
1096 fn spawn_job(&self, job: QueuedJob) {
1097 let daemon = Arc::clone(&self.daemon);
1098 let metrics = self.metrics.clone();
1099 metrics.record_job_spawned(job.kind);
1100 let notifications = self.notifications.clone();
1101 let logs = self.logs.clone();
1102 let entry = logs.record(&job.repo_path, format!("job {:?} started", job.kind));
1103 send_log_notification(¬ifications, &entry);
1104 send_job_notification(
1105 ¬ifications,
1106 &job.repo_path,
1107 job.kind,
1108 JobEventKind::Started,
1109 );
1110 tokio::spawn(async move {
1111 let start = Instant::now();
1112 let result = JobExecutor::run(&job).await;
1113 let duration = start.elapsed();
1114 match result {
1115 Ok(_) => {
1116 metrics.record_job_completed(job.kind);
1117 send_job_notification(
1118 ¬ifications,
1119 &job.repo_path,
1120 job.kind,
1121 JobEventKind::Completed,
1122 );
1123 let entry = logs.record(
1124 &job.repo_path,
1125 format!("job {:?} completed in {:?}", job.kind, duration),
1126 );
1127 send_log_notification(¬ifications, &entry);
1128 info!(
1129 "job {:?} for {} finished in {:?}",
1130 job.kind,
1131 job.repo_path.display(),
1132 duration
1133 );
1134 }
1135 Err(err) => {
1136 metrics.record_job_failed(job.kind);
1137 send_job_notification(
1138 ¬ifications,
1139 &job.repo_path,
1140 job.kind,
1141 JobEventKind::Failed,
1142 );
1143 let entry = logs.record(
1144 &job.repo_path,
1145 format!("job {:?} failed after {:?}: {err}", job.kind, duration),
1146 );
1147 send_log_notification(¬ifications, &entry);
1148 error!(
1149 "job {:?} for {} failed after {:?}: {err}",
1150 job.kind,
1151 job.repo_path.display(),
1152 duration
1153 );
1154 send_log_notification(
1155 ¬ifications,
1156 &LogEntry {
1157 repo_path: job.repo_path.clone(),
1158 message: format!(
1159 "job {:?} failed after {:?}: {err}",
1160 job.kind, duration
1161 ),
1162 timestamp: SystemTime::now(),
1163 },
1164 );
1165 }
1166 }
1167 if let Ok(mut guard) = daemon.lock() {
1168 guard.mark_job_completed(&job);
1169 }
1170 });
1171 }
1172
    #[cfg(test)]
    /// Test-only accessor exposing the live watcher registry.
    pub(crate) fn watcher_state(&self) -> Arc<Mutex<HashMap<PathBuf, WatchRegistration>>> {
        Arc::clone(&self.active_watchers)
    }
1177
    /// Non-blocking drain of every event currently queued by watcher tasks.
    ///
    /// Each event refreshes the repo's idle timers and the adaptive poll
    /// rate, then is applied to the daemon state, forwarded to notification
    /// subscribers, and logged.
    fn drain_watch_events(&self) {
        let mut receiver = match self.event_rx.lock() {
            Ok(rx) => rx,
            Err(_) => {
                warn!("watch event receiver poisoned");
                return;
            }
        };

        loop {
            match receiver.try_recv() {
                Ok(envelope) => {
                    self.record_watcher_activity(&envelope.repo_path);
                    self.touch_repo_activity(&envelope.repo_path);
                    // Activity snaps polling back to the fast interval.
                    if let Ok(mut poll) = self.adaptive_poll.lock() {
                        poll.touch();
                    }
                    if let Ok(mut guard) = self.daemon.lock() {
                        guard.handle_watch_event(&envelope.repo_path, &envelope.event);
                        self.emit_watch_notification(&envelope.repo_path, &envelope.event);
                        self.record_log(
                            &envelope.repo_path,
                            format!(
                                "watch {:?} {}",
                                envelope.event.kind,
                                envelope.event.path.display()
                            ),
                        );
                    } else {
                        warn!("failed to lock daemon for watch event");
                        break;
                    }
                }
                // Queue exhausted for this tick.
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Disconnected) => {
                    warn!("watch event channel disconnected");
                    break;
                }
            }
        }
    }
1222
1223 fn record_watcher_activity(&self, repo_path: &Path) {
1225 if let Ok(mut activity) = self.last_watcher_activity.lock() {
1226 activity.insert(repo_path.to_path_buf(), Instant::now());
1227 }
1228 }
1229
1230 fn emit_watch_notification(&self, repo_path: &Path, event: &WatchEvent) {
1231 if let Some(tx) = &self.notifications {
1232 let path = relative_path(repo_path, &event.path).unwrap_or_else(|| event.path.clone());
1233 let notification = DaemonNotification::WatchEvent(WatchEventNotification {
1234 repo_path: repo_path.to_path_buf(),
1235 path,
1236 kind: map_watch_kind(&event.kind),
1237 });
1238 if let Err(err) = tx.send(notification) {
1239 warn!("failed to send watch notification: {}", err);
1240 }
1241 }
1242 }
1243
1244 fn record_log(&self, repo_path: &Path, message: impl Into<String>) {
1245 let entry = self.logs.record(repo_path, message);
1246 send_log_notification(&self.notifications, &entry);
1247 }
1248
1249 pub fn compute_repo_health(&self, repo_path: &Path) -> DaemonResponse {
1251 if let Ok(guard) = self.daemon.lock() {
1252 let response = guard.compute_repo_health(repo_path);
1253 match response {
1254 DaemonResponse::RepoHealth(mut detail) => {
1255 detail.watcher_active = self.is_watcher_active(repo_path);
1257 detail.throttling_active = self.throttling_active.load(Ordering::SeqCst);
1259 detail.sled_ok = true;
1261 DaemonResponse::RepoHealth(detail)
1262 }
1263 _ => response,
1264 }
1265 } else {
1266 DaemonResponse::Error("failed to acquire daemon lock".to_string())
1267 }
1268 }
1269
1270 fn is_watcher_active(&self, repo_path: &Path) -> bool {
1272 if let Ok(activity) = self.last_watcher_activity.lock() {
1273 if let Some(last_activity) = activity.get(repo_path) {
1274 last_activity.elapsed().as_secs() < 60
1275 } else {
1276 if let Ok(guard) = self.daemon.lock() {
1278 guard.store.get_repo(repo_path).ok().flatten().is_some()
1279 } else {
1280 false
1281 }
1282 }
1283 } else {
1284 true }
1286 }
1287}
1288
/// Bookkeeping for one repository's active filesystem watcher.
struct WatchRegistration {
    /// Control handle for the underlying watcher.
    handle: WatchHandleRef,
    /// Background task tied to this watch (event forwarding, presumably —
    /// confirm against the registration site, which is outside this view).
    task: JoinHandle<()>,
}
1293
1294struct JobExecutor;
1295
1296impl JobExecutor {
1297 async fn run(job: &QueuedJob) -> Result<(), JobExecutionError> {
1298 match job.kind {
1299 JobKind::Prefetch => {
1300 Self::run_git(
1305 &job.repo_path,
1306 &["maintenance", "run", "--task=prefetch", "--quiet"],
1307 )
1308 .await
1309 }
1310 JobKind::Maintenance => {
1311 Self::run_git(&job.repo_path, &["maintenance", "run", "--auto", "--quiet"]).await
1315 }
1316 }
1317 }
1318
1319 async fn run_git(repo_path: &Path, args: &[&str]) -> Result<(), JobExecutionError> {
1320 let output = Command::new("git")
1321 .args(args)
1322 .current_dir(repo_path)
1323 .stdout(std::process::Stdio::null())
1324 .stderr(std::process::Stdio::piped())
1325 .output()
1326 .await
1327 .map_err(JobExecutionError::Io)?;
1328
1329 if output.status.success() {
1330 Ok(())
1331 } else {
1332 let stderr = String::from_utf8_lossy(&output.stderr);
1335 if stderr.contains("No remote") || stderr.contains("not a git repository") {
1336 Ok(())
1338 } else {
1339 Err(JobExecutionError::Exit(output.status.code().unwrap_or(-1)))
1340 }
1341 }
1342 }
1343}
1344
/// Failure modes when shelling out to `git` for a queued job.
#[derive(Debug, Error)]
enum JobExecutionError {
    /// The `git` process could not be spawned or awaited.
    #[error("io error: {0}")]
    Io(#[from] std::io::Error),
    /// `git` ran but exited non-zero (-1 stands in when no code is available).
    #[error("git command exited with status {0}")]
    Exit(i32),
}
1352
/// A filesystem watch event paired with the repository it came from, as
/// carried on the runtime's internal event channel.
#[derive(Clone)]
struct WatchEventEnvelope {
    /// Root path of the repository whose watcher produced the event.
    repo_path: PathBuf,
    /// The underlying filesystem event.
    event: WatchEvent,
}
1358
/// [`DaemonService`] implementation that executes commands against a daemon
/// living in the same process — no serialization or transport involved.
pub struct InProcessService<S: MetadataStore> {
    /// Shared, mutex-guarded daemon instance.
    inner: SharedDaemon<S>,
}
1363
1364impl<S: MetadataStore> InProcessService<S> {
1365 pub fn new(store: S) -> Self {
1366 Self {
1367 inner: Arc::new(Mutex::new(Daemon::new(store))),
1368 }
1369 }
1370
1371 pub fn from_shared(inner: SharedDaemon<S>) -> Self {
1372 Self { inner }
1373 }
1374}
1375
1376#[async_trait]
1377impl<S: MetadataStore> DaemonService for InProcessService<S> {
1378 async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1379 let mut guard = self
1380 .inner
1381 .lock()
1382 .map_err(|e| DaemonError::Transport(format!("failed to acquire daemon lock: {}", e)))?;
1383 Ok(guard.handle(command))
1384 }
1385}
1386
/// REP0 server that listens on an nng address and dispatches decoded
/// [`DaemonCommand`]s to a shared daemon until shutdown is signalled.
pub struct NngServer<S: MetadataStore> {
    /// nng listen address (the tests use `tcp://...`).
    address: String,
    /// Daemon that handles the decoded commands.
    daemon: SharedDaemon<S>,
    /// Cooperative shutdown signal observed by the accept loop.
    shutdown: Shutdown,
}
1393
1394impl<S: MetadataStore> NngServer<S> {
1395 pub fn new(address: impl Into<String>, daemon: SharedDaemon<S>, shutdown: Shutdown) -> Self {
1396 Self {
1397 address: address.into(),
1398 daemon,
1399 shutdown,
1400 }
1401 }
1402
1403 pub async fn run(&self) -> Result<(), ServerError> {
1404 let socket = nng::Socket::new(Protocol::Rep0)?;
1405 socket.listen(&self.address)?;
1406 let mut async_socket = AsyncSocket::try_from(socket)?;
1407
1408 loop {
1409 select! {
1410 _ = self.shutdown.wait() => break,
1411 recv = async_socket.receive(None) => {
1412 let message = match recv {
1413 Ok(msg) => msg,
1414 Err(nng::Error::Canceled) => continue,
1415 Err(err) => return Err(ServerError::Socket(err)),
1416 };
1417 let reply = self.process_message(message)?;
1418 async_socket
1419 .send(reply, None)
1420 .await
1421 .map_err(|(_, err)| ServerError::Socket(err))?;
1422 }
1423 }
1424 }
1425
1426 Ok(())
1427 }
1428
1429 fn process_message(&self, message: nng::Message) -> Result<nng::Message, ServerError> {
1430 let data = message.as_slice();
1431
1432 validate_message_size(data).map_err(|err| ServerError::Deserialization(err.to_string()))?;
1434
1435 let command: DaemonCommand = bounded_bincode()
1436 .deserialize(data)
1437 .map_err(|err| ServerError::Deserialization(err.to_string()))?;
1438
1439 let response = {
1440 let mut guard = self
1441 .daemon
1442 .lock()
1443 .map_err(|e| ServerError::Poisoned(e.to_string()))?;
1444 guard.handle(command)
1445 };
1446
1447 let payload = bounded_bincode()
1448 .serialize(&response)
1449 .map_err(|err| ServerError::Serialization(err.to_string()))?;
1450
1451 let mut reply = nng::Message::new();
1452 reply.push_back(&payload);
1453 Ok(reply)
1454 }
1455}
1456
/// REQ0 client for the daemon's nng endpoint; dials a fresh socket for each
/// request.
pub struct NngClient {
    /// nng address to dial.
    address: String,
}
1461
1462impl NngClient {
1463 pub fn new(address: impl Into<String>) -> Self {
1464 Self {
1465 address: address.into(),
1466 }
1467 }
1468
1469 async fn request(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1470 let socket = nng::Socket::new(Protocol::Req0).map_err(map_client_error)?;
1471 socket.dial(&self.address).map_err(map_client_error)?;
1472 let mut async_socket = AsyncSocket::try_from(socket).map_err(map_client_error)?;
1473
1474 let payload = bounded_bincode()
1475 .serialize(&command)
1476 .map_err(|err| DaemonError::Transport(err.to_string()))?;
1477
1478 let mut message = nng::Message::new();
1479 message.push_back(&payload);
1480 async_socket
1481 .send(message, None)
1482 .await
1483 .map_err(|(_, err)| map_client_error(err))?;
1484
1485 let reply = async_socket.receive(None).await.map_err(map_client_error)?;
1486 let data = reply.as_slice();
1487
1488 validate_message_size(data).map_err(|err| DaemonError::Transport(err.to_string()))?;
1489
1490 let response: DaemonResponse = bounded_bincode()
1491 .deserialize(data)
1492 .map_err(|err| DaemonError::Transport(err.to_string()))?;
1493
1494 Ok(response)
1495 }
1496}
1497
1498#[async_trait]
1499impl DaemonService for NngClient {
1500 async fn execute(&self, command: DaemonCommand) -> Result<DaemonResponse, DaemonError> {
1501 self.request(command).await
1502 }
1503}
1504
1505fn map_client_error(err: nng::Error) -> DaemonError {
1506 DaemonError::Transport(err.to_string())
1507}
1508
/// Errors surfaced by [`NngServer`].
#[derive(Debug, Error)]
pub enum ServerError {
    /// Underlying nng socket failure (listen / receive / send).
    #[error("nng error: {0}")]
    Socket(#[from] nng::Error),
    /// Incoming request failed size validation or bincode decoding.
    #[error("deserialization error: {0}")]
    Deserialization(String),
    /// Outgoing response could not be bincode-encoded.
    #[error("serialization error: {0}")]
    Serialization(String),
    /// The shared daemon mutex was poisoned by a panicking holder.
    #[error("daemon lock poisoned: {0}")]
    Poisoned(String),
}
1520
#[cfg(test)]
mod tests {
    use super::*;
    use git2::Repository;
    use gity_ipc::ValidatedPath;
    use gity_storage::InMemoryMetadataStore;
    use gity_watch::{ManualWatchHandle, ManualWatcher, WatchEvent, WatchEventKind};
    use std::time::Duration;
    use tempfile::tempdir;

    // Wrap a path in the IPC validation type; test paths are always valid,
    // so unwrap is acceptable here.
    fn validated_path(path: PathBuf) -> ValidatedPath {
        ValidatedPath::new(path).unwrap()
    }

    // Completing a queued job must bring the repo's pending-job count back
    // to zero in the ListRepos view.
    #[test]
    fn scheduler_drops_completed_jobs() {
        let store = InMemoryMetadataStore::new();
        let mut daemon = Daemon::new(store);
        let (_repo_dir, repo) = init_temp_repo();
        assert!(matches!(
            daemon.handle(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo.clone())
            }),
            DaemonResponse::Ack(_)
        ));
        daemon.handle(DaemonCommand::QueueJob {
            repo_path: validated_path(repo.clone()),
            job: JobKind::Prefetch,
        });
        // Pull the job out of the scheduler and mark it done, as the runtime
        // worker would.
        if let Some(job) = daemon.next_job() {
            daemon.mark_job_completed(&job);
        }
        match daemon.handle(DaemonCommand::ListRepos) {
            DaemonResponse::RepoList(entries) => assert_eq!(entries[0].pending_jobs, 0),
            other => panic!("unexpected response: {other:?}"),
        }
    }

    // Counters recorded on a shared MetricsRegistry must show up in the
    // daemon's Metrics snapshot.
    #[test]
    fn metrics_snapshot_reflects_registry() {
        let store = InMemoryMetadataStore::new();
        let metrics = MetricsRegistry::default();
        let mut daemon = Daemon::with_metrics(store, metrics.clone());

        metrics.record_job_spawned(JobKind::Prefetch);
        metrics.record_job_failed(JobKind::Prefetch);
        metrics.record_job_completed(JobKind::Maintenance);

        match daemon.handle(DaemonCommand::Metrics) {
            DaemonResponse::Metrics(snapshot) => {
                let prefetch = snapshot.jobs.get(&JobKind::Prefetch).unwrap();
                assert_eq!(prefetch.spawned, 1);
                assert_eq!(prefetch.failed, 1);
                assert_eq!(prefetch.completed, 0);
                let maintenance = snapshot.jobs.get(&JobKind::Maintenance).unwrap();
                assert_eq!(maintenance.completed, 1);
                assert!(!snapshot.repos.is_empty() || snapshot.global.pending_jobs == 0);
            }
            other => panic!("unexpected response: {other:?}"),
        }
    }

    // A watch event for a file must surface as a repo-relative dirty path
    // with a positive generation in the fsmonitor snapshot.
    #[test]
    fn fsmonitor_snapshot_returns_dirty_paths() {
        let store = InMemoryMetadataStore::new();
        let mut daemon = Daemon::new(store);
        let (_repo_dir, repo) = init_temp_repo();
        match daemon.handle(DaemonCommand::RegisterRepo {
            repo_path: validated_path(repo.clone()),
        }) {
            DaemonResponse::Ack(_) => {}
            other => panic!("unexpected response: {other:?}"),
        }
        daemon.handle_watch_event(
            &repo,
            &WatchEvent::new(repo.join("file.txt"), WatchEventKind::Modified),
        );
        let response = daemon.handle(DaemonCommand::FsMonitorSnapshot {
            repo_path: validated_path(repo.clone()),
            last_seen_generation: None,
        });
        match response {
            DaemonResponse::FsMonitorSnapshot(snapshot) => {
                assert_eq!(snapshot.repo_path, repo);
                assert!(snapshot.dirty_paths.contains(&PathBuf::from("file.txt")));
                assert!(snapshot.generation > 0);
            }
            other => panic!("unexpected response: {other:?}"),
        }
    }

    // End-to-end: a job queued through the service is drained to zero by the
    // running runtime. Uses a fixed sleep, so timing-sensitive on slow hosts.
    #[tokio::test]
    async fn runtime_processes_jobs() {
        let runtime = Runtime::new(InMemoryMetadataStore::new(), None);
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let runtime_task = tokio::spawn(runtime.run());

        let service = InProcessService::from_shared(shared.clone());
        let (_repo_dir, repo) = init_temp_repo();
        service
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo.clone()),
            })
            .await
            .unwrap();
        service
            .execute(DaemonCommand::QueueJob {
                repo_path: validated_path(repo.clone()),
                job: JobKind::Maintenance,
            })
            .await
            .unwrap();

        // Give the runtime loop time to pick up and finish the job.
        tokio::time::sleep(Duration::from_millis(300)).await;
        shutdown.shutdown();
        runtime_task.await.unwrap();

        match service.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => assert_eq!(entries[0].pending_jobs, 0),
            other => panic!("unexpected response: {other:?}"),
        }
    }

    // Full REQ/REP round trip over a real TCP loopback socket: register a
    // repo through the client and read it back via ListRepos.
    #[tokio::test]
    async fn nng_client_server_roundtrip() {
        let runtime = Runtime::new(InMemoryMetadataStore::new(), None);
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let address = test_address();
        let server = NngServer::new(address.clone(), shared, shutdown.clone());
        let runtime_task = tokio::spawn(runtime.run());
        let server_task = tokio::spawn(async move {
            server.run().await.expect("server exits cleanly");
        });

        // Let the server start listening before the client dials.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let client = NngClient::new(address);
        let (_repo_dir, repo_path) = init_temp_repo();
        client
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo_path.clone()),
            })
            .await
            .unwrap();

        match client.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => assert_eq!(entries.len(), 1),
            other => panic!("unexpected response: {other:?}"),
        }

        shutdown.shutdown();
        runtime_task.await.unwrap();
        server_task.await.unwrap();
    }

    // Drive a ManualWatcher by hand: an emitted event must be reflected both
    // in ListRepos' last_event and in the repo's Status dirty paths.
    #[tokio::test]
    async fn watcher_events_record_last_event() {
        let runtime = Runtime::with_watcher(
            InMemoryMetadataStore::new(),
            Arc::new(ManualWatcher::new()),
            None,
        );
        let watcher_state = runtime.watcher_state();
        let shutdown = runtime.shutdown_signal();
        let shared = runtime.shared();
        let runtime_task = tokio::spawn(runtime.run());

        let service = InProcessService::from_shared(shared);
        let (_dir, repo_path) = init_temp_repo();
        std::fs::write(repo_path.join("file.txt"), "data").expect("write file");
        service
            .execute(DaemonCommand::RegisterRepo {
                repo_path: validated_path(repo_path.clone()),
            })
            .await
            .unwrap();

        // Wait for the runtime to register a watcher for the new repo.
        tokio::time::sleep(Duration::from_millis(300)).await;

        // Fish the manual handle out of the runtime's registration map so we
        // can inject an event directly.
        let manual_handle = {
            let state = watcher_state.lock().expect("watcher state poisoned");
            let registration = state.values().next().expect("watcher registered");
            registration.handle.clone()
        };
        let manual = manual_handle
            .as_any()
            .downcast_ref::<ManualWatchHandle>()
            .expect("manual handle");
        manual
            .emit(WatchEvent::new(
                repo_path.join("file.txt"),
                WatchEventKind::Modified,
            ))
            .unwrap();

        // Give the runtime time to drain the injected event.
        tokio::time::sleep(Duration::from_millis(300)).await;

        match service.execute(DaemonCommand::ListRepos).await.unwrap() {
            DaemonResponse::RepoList(entries) => {
                assert!(entries[0].last_event.is_some());
            }
            other => panic!("unexpected response: {other:?}"),
        }

        let status_response = service
            .execute(DaemonCommand::Status {
                repo_path: validated_path(repo_path.clone()),
                known_generation: None,
            })
            .await
            .unwrap();
        match status_response {
            DaemonResponse::RepoStatus(detail) => {
                assert!(
                    detail.dirty_paths.contains(&PathBuf::from("file.txt")),
                    "status should include modified file: {:?}",
                    detail.dirty_paths
                );
                assert!(detail.generation > 0);
            }
            other => panic!("unexpected response: {other:?}"),
        }

        shutdown.shutdown();
        runtime_task.await.unwrap();
    }

    // Create an empty git repository in a temp dir; the TempDir must be kept
    // alive by the caller so the path stays valid.
    fn init_temp_repo() -> (tempfile::TempDir, PathBuf) {
        let dir = tempdir().expect("create temp dir");
        Repository::init(dir.path()).expect("init repo");
        let path = dir.path().to_path_buf();
        (dir, path)
    }

    // Reserve an ephemeral TCP port and hand its address to nng. Note the
    // port is released before nng binds it, so a racing process could grab
    // it — acceptable flake risk for tests.
    fn test_address() -> String {
        use std::net::TcpListener;
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind ephemeral port");
        let addr = listener.local_addr().unwrap();
        drop(listener);
        format!("tcp://{}", addr)
    }
}