1use std::collections::{HashMap, HashSet};
14use std::io::{BufRead, BufReader, Write as IoWrite};
15#[cfg(unix)]
16use std::os::unix::net::UnixStream as StdUnixStream;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use serde::{Deserialize, Serialize};
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
24#[cfg(unix)]
25use tokio::net::UnixListener;
26use tokio::sync::{Mutex, mpsc};
27
28use crate::config::{
29 self, AutoApplyPolicyConfig, CfgdConfig, MergedProfile, NotifyMethod, OriginType, PolicyAction,
30 ResolvedProfile,
31};
32use crate::errors::{DaemonError, Result};
33use crate::output::{Printer, Role};
34use crate::providers::{FileAction, PackageAction, PackageManager, ProviderRegistry};
35use crate::state::StateStore;
36
37pub trait DaemonHooks: Send + Sync {
40 fn build_registry(&self, config: &CfgdConfig) -> ProviderRegistry;
42
43 fn plan_files(&self, config_dir: &Path, resolved: &ResolvedProfile) -> Result<Vec<FileAction>>;
45
46 fn plan_packages(
48 &self,
49 profile: &MergedProfile,
50 managers: &[&dyn PackageManager],
51 ) -> Result<Vec<PackageAction>>;
52
53 fn extend_registry_custom_managers(
55 &self,
56 registry: &mut ProviderRegistry,
57 packages: &config::PackagesSpec,
58 );
59
60 fn expand_tilde(&self, path: &Path) -> PathBuf;
62}
63
64const DEBOUNCE_MS: u64 = 500;
65
66#[cfg(unix)]
68const IPC_SOCKET_FILE: &str = "cfgd.sock";
69
70#[cfg(windows)]
74const WINDOWS_PIPE_PATH: &str = r"\\.\pipe\cfgd";
75
76pub(crate) fn resolve_default_ipc_path() -> PathBuf {
93 if let Some(override_path) = std::env::var_os("CFGD_DAEMON_IPC_PATH") {
94 return PathBuf::from(override_path);
95 }
96 #[cfg(unix)]
97 {
98 crate::default_runtime_dir()
99 .map(|dir| dir.join(IPC_SOCKET_FILE))
100 .unwrap_or_else(|| PathBuf::from("/tmp/cfgd.sock"))
101 }
102 #[cfg(windows)]
103 {
104 PathBuf::from(WINDOWS_PIPE_PATH)
105 }
106}
107const DEFAULT_RECONCILE_SECS: u64 = 300; const DEFAULT_SYNC_SECS: u64 = 300; #[cfg(unix)]
110const LAUNCHD_LABEL: &str = "com.cfgd.daemon";
111#[cfg(unix)]
112const LAUNCHD_AGENTS_DIR: &str = "Library/LaunchAgents";
113#[cfg(unix)]
114const SYSTEMD_USER_DIR: &str = ".config/systemd/user";
115
116pub(super) struct SyncTask {
119 source_name: String,
120 repo_path: PathBuf,
121 auto_pull: bool,
122 auto_push: bool,
123 auto_apply: bool,
124 interval: Duration,
125 last_synced: Option<Instant>,
126 require_signed_commits: bool,
128 allow_unsigned: bool,
130}
131
132pub(super) struct ReconcileTask {
135 entity: String,
137 interval: Duration,
138 auto_apply: bool,
139 drift_policy: config::DriftPolicy,
140 last_reconciled: Option<Instant>,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
146#[serde(rename_all = "camelCase")]
147pub struct SourceStatus {
148 pub name: String,
149 pub last_sync: Option<String>,
150 pub last_reconcile: Option<String>,
151 pub drift_count: u32,
152 pub status: String,
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
158#[serde(rename_all = "camelCase")]
159pub struct DaemonStatusResponse {
160 pub running: bool,
161 pub pid: u32,
162 pub uptime_secs: u64,
163 pub last_reconcile: Option<String>,
164 pub last_sync: Option<String>,
165 pub drift_count: u32,
166 pub sources: Vec<SourceStatus>,
167 #[serde(skip_serializing_if = "Option::is_none")]
168 pub update_available: Option<String>,
169 #[serde(default, skip_serializing_if = "Vec::is_empty")]
170 pub module_reconcile: Vec<ModuleReconcileStatus>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize)]
174#[serde(rename_all = "camelCase")]
175pub struct ModuleReconcileStatus {
176 pub name: String,
177 pub interval: String,
178 pub auto_apply: bool,
179 pub drift_policy: String,
180 pub last_reconcile: Option<String>,
181}
182
183pub(super) struct DaemonState {
184 started_at: Instant,
185 last_reconcile: Option<String>,
186 last_sync: Option<String>,
187 drift_count: u32,
188 sources: Vec<SourceStatus>,
189 update_available: Option<String>,
190 module_last_reconcile: HashMap<String, String>,
191 store_path: Option<PathBuf>,
195}
196
197impl DaemonState {
198 fn new() -> Self {
199 Self {
200 started_at: Instant::now(),
201 last_reconcile: None,
202 last_sync: None,
203 drift_count: 0,
204 sources: vec![SourceStatus {
205 name: "local".to_string(),
206 last_sync: None,
207 last_reconcile: None,
208 drift_count: 0,
209 status: "active".to_string(),
210 }],
211 update_available: None,
212 module_last_reconcile: HashMap::new(),
213 store_path: None,
214 }
215 }
216
217 fn with_store_path(mut self, path: PathBuf) -> Self {
218 self.store_path = Some(path);
219 self
220 }
221
222 #[cfg(test)]
223 pub(super) fn store_path_for_test(&self) -> Option<&Path> {
224 self.store_path.as_deref()
225 }
226
227 fn to_response(&self) -> DaemonStatusResponse {
228 DaemonStatusResponse {
229 running: true,
230 pid: std::process::id(),
231 uptime_secs: self.started_at.elapsed().as_secs(),
232 last_reconcile: self.last_reconcile.clone(),
233 last_sync: self.last_sync.clone(),
234 drift_count: self.drift_count,
235 sources: self.sources.clone(),
236 update_available: self.update_available.clone(),
237 module_reconcile: vec![],
238 }
239 }
240}
241
242pub(super) struct Notifier {
245 method: NotifyMethod,
246 webhook_url: Option<String>,
247}
248
249impl Notifier {
250 fn new(method: NotifyMethod, webhook_url: Option<String>) -> Self {
251 Self {
252 method,
253 webhook_url,
254 }
255 }
256
257 fn notify(&self, title: &str, message: &str) {
258 match self.method {
259 NotifyMethod::Desktop => self.notify_desktop(title, message),
260 NotifyMethod::Stdout => self.notify_stdout(title, message),
261 NotifyMethod::Webhook => self.notify_webhook(title, message),
262 }
263 }
264
265 fn notify_desktop(&self, title: &str, message: &str) {
266 match notify_rust::Notification::new()
267 .summary(title)
268 .body(message)
269 .appname("cfgd")
270 .show()
271 {
272 Ok(_) => tracing::debug!(title = %title, "desktop notification sent"),
273 Err(e) => {
274 tracing::warn!(error = %e, "desktop notification failed, falling back to stdout");
275 self.notify_stdout(title, message);
276 }
277 }
278 }
279
280 fn notify_stdout(&self, title: &str, message: &str) {
281 tracing::info!(title = %title, message = %message, "notification");
282 }
283
284 fn notify_webhook(&self, title: &str, message: &str) {
285 let Some(ref url) = self.webhook_url else {
286 tracing::warn!("webhook notification requested but no webhook-url configured");
287 return;
288 };
289
290 let url = url.clone();
291 let body = build_webhook_payload(title, message, &crate::utc_now_iso8601());
292
293 tokio::task::spawn_blocking(move || {
295 match crate::http::http_agent(crate::http::HTTP_WEBHOOK_TIMEOUT)
296 .post(&url)
297 .set("Content-Type", "application/json")
298 .send_string(&body)
299 {
300 Ok(_) => tracing::debug!(url = %url, "webhook notification sent"),
301 Err(e) => tracing::warn!(error = %e, "webhook notification failed"),
302 }
303 });
304 }
305}
306pub(super) fn build_webhook_payload(title: &str, message: &str, timestamp_iso: &str) -> String {
311 serde_json::json!({
312 "event": title,
313 "message": message,
314 "timestamp": timestamp_iso,
315 "source": "cfgd",
316 })
317 .to_string()
318}
319
320mod checkin;
323mod daemon_config;
324mod drift;
325mod git;
326mod health_ipc;
327mod reconcile;
328mod runner;
329mod service;
330mod sync;
331
332#[cfg(test)]
333mod tests;
334
335use checkin::*;
336use daemon_config::*;
337#[allow(unused_imports)]
342use drift::*;
343use git::*;
344use health_ipc::*;
345use reconcile::*;
346use runner::*;
347#[allow(unused_imports)]
352use service::*;
353#[allow(unused_imports)]
358use sync::*;
359
360pub use git::git_pull_sync;
363pub use health_ipc::query_daemon_status;
364pub use service::{install_service, run_as_windows_service, uninstall_service};
365
366pub(super) struct PreLoopSetup {
376 pub cfg: CfgdConfig,
377 pub parsed: ParsedDaemonConfig,
378 pub notifier: Arc<Notifier>,
379 pub compliance_config: Option<config::ComplianceConfig>,
380 pub compliance_interval: Option<Duration>,
381 pub config_dir: PathBuf,
382 pub sync_tasks: Vec<SyncTask>,
383 pub initial_source_status: Vec<SourceStatus>,
384 pub managed_paths: Vec<PathBuf>,
385 pub reconcile_tasks: Vec<ReconcileTask>,
386 pub shortest_reconcile: Duration,
387 pub shortest_sync: Duration,
388 pub server_checkin_url: Option<String>,
389}
390
391pub(super) fn build_pre_loop_setup(
398 config_path: &Path,
399 profile_override: Option<&str>,
400 hooks: &dyn DaemonHooks,
401) -> Result<PreLoopSetup> {
402 let cfg = config::load_config(config_path)?;
403 let daemon_cfg = cfg.spec.daemon.clone().unwrap_or(config::DaemonConfig {
404 enabled: true,
405 reconcile: None,
406 sync: None,
407 notify: None,
408 windows_event_log: false,
409 });
410 let parsed = parse_daemon_config(&daemon_cfg);
411 let notifier = Arc::new(Notifier::new(
412 parsed.notify_method.clone(),
413 parsed.webhook_url.clone(),
414 ));
415
416 let compliance_config = cfg.spec.compliance.clone();
417 let compliance_interval = compliance_config
418 .as_ref()
419 .filter(|c| c.enabled)
420 .and_then(|c| crate::parse_duration_str(&c.interval).ok());
421
422 let config_dir = config_path
423 .parent()
424 .unwrap_or_else(|| Path::new("."))
425 .to_path_buf();
426 let allow_unsigned = cfg.spec.security.as_ref().is_some_and(|s| s.allow_unsigned);
427
428 let source_cache_dir = crate::sources::SourceManager::default_cache_dir()
429 .unwrap_or_else(|_| config_dir.join(".cfgd-sources"));
430 let sync_tasks = build_sync_tasks(
431 &config_dir,
432 &parsed,
433 &cfg.spec.sources,
434 allow_unsigned,
435 &source_cache_dir,
436 |source_dir| {
437 crate::sources::detect_source_manifest(source_dir)
438 .ok()
439 .flatten()
440 .map(|m| m.spec.policy.constraints.require_signed_commits)
441 },
442 );
443 let initial_source_status = build_initial_source_status(&cfg.spec.sources);
444
445 let managed_paths = discover_managed_paths(config_path, profile_override, hooks);
446
447 let profiles_dir = config_dir.join("profiles");
448 let profile_name = profile_override
449 .or(cfg.spec.profile.as_deref())
450 .unwrap_or("default");
451 let resolved_profile = config::resolve_profile(profile_name, &profiles_dir).ok();
452 let profile_chain: Vec<String> = resolved_profile
453 .as_ref()
454 .map(|r| r.layers.iter().map(|l| l.profile_name.clone()).collect())
455 .unwrap_or_else(|| vec![profile_name.to_string()]);
456 let chain_refs: Vec<&str> = profile_chain.iter().map(|s| s.as_str()).collect();
457 let reconcile_tasks = build_reconcile_tasks(
458 &daemon_cfg,
459 resolved_profile.as_ref(),
460 &chain_refs,
461 parsed.reconcile_interval,
462 parsed.auto_apply,
463 );
464
465 let shortest_reconcile = reconcile_tasks
466 .iter()
467 .map(|t| t.interval)
468 .min()
469 .unwrap_or(parsed.reconcile_interval);
470 let shortest_sync = sync_tasks
471 .iter()
472 .map(|t| t.interval)
473 .min()
474 .unwrap_or(parsed.sync_interval);
475
476 let server_checkin_url = find_server_url(&cfg);
477
478 Ok(PreLoopSetup {
479 cfg,
480 parsed,
481 notifier,
482 compliance_config,
483 compliance_interval,
484 config_dir,
485 sync_tasks,
486 initial_source_status,
487 managed_paths,
488 reconcile_tasks,
489 shortest_reconcile,
490 shortest_sync,
491 server_checkin_url,
492 })
493}
494
495pub async fn run_daemon(
498 config_path: PathBuf,
499 profile_override: Option<String>,
500 printer: Arc<Printer>,
501 hooks: Arc<dyn DaemonHooks>,
502) -> Result<()> {
503 run_daemon_with(
504 config_path,
505 profile_override,
506 printer,
507 hooks,
508 DaemonRunOverrides::default(),
509 )
510 .await
511}
512
513#[derive(Default)]
535pub(super) struct DaemonRunOverrides {
536 pub ipc_path: Option<PathBuf>,
537 pub state_dir_override: Option<PathBuf>,
538 pub skip_health_server: bool,
539 pub skip_startup_checkin: bool,
540 pub(in crate::daemon) external_triggers: Option<DaemonTriggers>,
541}
542
543struct TriggerSetup {
548 triggers: DaemonTriggers,
549 reconcile_pump: Option<tokio::task::JoinHandle<()>>,
550 sync_pump: Option<tokio::task::JoinHandle<()>>,
551 version_check_pump: Option<tokio::task::JoinHandle<()>>,
552 compliance_pump: Option<tokio::task::JoinHandle<()>>,
553 sighup_pump: Option<tokio::task::JoinHandle<()>>,
554 shutdown_task: Option<tokio::task::JoinHandle<()>>,
555}
556
557pub(super) async fn run_daemon_with(
558 config_path: PathBuf,
559 profile_override: Option<String>,
560 printer: Arc<Printer>,
561 hooks: Arc<dyn DaemonHooks>,
562 overrides: DaemonRunOverrides,
563) -> Result<()> {
564 printer.heading("Daemon");
565 printer.status_simple(Role::Info, "Starting cfgd daemon...");
566
567 let setup = build_pre_loop_setup(&config_path, profile_override.as_deref(), &*hooks)?;
568
569 let (daemon_state, state_dir_warning) =
570 init_daemon_state_with_warning(overrides.state_dir_override.as_deref());
571 if let Some(msg) = state_dir_warning {
572 printer.status_simple(Role::Warn, msg);
573 }
574 let state = Arc::new(Mutex::new(daemon_state));
575
576 {
578 let mut st = state.lock().await;
579 st.sources.extend(setup.initial_source_status.clone());
580 }
581
582 let using_external_triggers = overrides.external_triggers.is_some();
585 let (file_rx_for_triggers, _watcher_handle): (
586 Option<mpsc::Receiver<PathBuf>>,
587 Option<notify::RecommendedWatcher>,
588 ) = if using_external_triggers {
589 (None, None)
590 } else {
591 let (file_tx, file_rx) = mpsc::channel::<PathBuf>(256);
592 let watcher = setup_file_watcher(file_tx, &setup.managed_paths, &setup.config_dir)?;
593 (Some(file_rx), Some(watcher))
594 };
595
596 let ipc_path = overrides
597 .ipc_path
598 .clone()
599 .unwrap_or_else(resolve_default_ipc_path);
600 check_already_running(&ipc_path)?;
601
602 let health_handle = if overrides.skip_health_server {
604 None
605 } else {
606 let health_state = Arc::clone(&state);
607 let health_ipc_path = ipc_path.to_string_lossy().to_string();
608 Some(tokio::spawn(async move {
609 if let Err(e) = run_health_server(&health_ipc_path, health_state).await {
610 tracing::error!(error = %e, "health server error");
611 }
612 }))
613 };
614
615 let intervals = format_interval_lines(&setup.parsed, setup.compliance_interval);
616 print_startup_banner(&printer, &intervals, &ipc_path.to_string_lossy());
617
618 if setup.server_checkin_url.is_some() && !overrides.skip_startup_checkin {
620 let startup_cfg = setup.cfg.clone();
621 let startup_config_path = config_path.clone();
622 let startup_profile_override = profile_override.clone();
623 tokio::task::spawn_blocking(move || {
624 run_startup_checkin_blocking(
625 &startup_config_path,
626 startup_profile_override.as_deref(),
627 &startup_cfg,
628 );
629 })
630 .await
631 .map_err(|e| DaemonError::WatchError {
632 message: format!("startup check-in task failed: {}", e),
633 })?;
634 }
635
636 let reconcile_secs = Arc::new(std::sync::atomic::AtomicU64::new(
639 setup.shortest_reconcile.as_secs(),
640 ));
641 let sync_secs = Arc::new(std::sync::atomic::AtomicU64::new(
642 setup.shortest_sync.as_secs(),
643 ));
644
645 let TriggerSetup {
649 triggers,
650 reconcile_pump,
651 sync_pump,
652 version_check_pump,
653 compliance_pump,
654 sighup_pump,
655 shutdown_task,
656 } = if let Some(t) = overrides.external_triggers {
657 TriggerSetup {
658 triggers: t,
659 reconcile_pump: None,
660 sync_pump: None,
661 version_check_pump: None,
662 compliance_pump: None,
663 sighup_pump: None,
664 shutdown_task: None,
665 }
666 } else {
667 let (reconcile_tx, reconcile_rx) = mpsc::channel::<()>(8);
668 let (sync_tx, sync_rx) = mpsc::channel::<()>(8);
669 let (version_check_tx, version_check_rx) = mpsc::channel::<()>(8);
670 let (compliance_tx, compliance_rx) = mpsc::channel::<()>(8);
671 let (sighup_tx, sighup_rx) = mpsc::channel::<()>(8);
672
673 let reconcile_pump = spawn_interval_pump(Arc::clone(&reconcile_secs), reconcile_tx);
674 let sync_pump = spawn_interval_pump(Arc::clone(&sync_secs), sync_tx);
675
676 let version_check_secs = Arc::new(std::sync::atomic::AtomicU64::new(
677 crate::upgrade::version_check_interval().as_secs(),
678 ));
679 let version_check_pump = spawn_interval_pump(version_check_secs, version_check_tx);
680
681 let compliance_pump = setup.compliance_interval.map(|d| {
682 let secs = Arc::new(std::sync::atomic::AtomicU64::new(d.as_secs()));
683 spawn_interval_pump(secs, compliance_tx)
684 });
685
686 #[cfg(unix)]
687 let sighup_pump = Some(spawn_sighup_pump(sighup_tx)?);
688 #[cfg(not(unix))]
689 let sighup_pump: Option<tokio::task::JoinHandle<()>> = {
690 let _ = sighup_tx; None
692 };
693
694 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
695 let shutdown_printer = Arc::clone(&printer);
696 let shutdown_task = tokio::spawn(async move {
697 wait_for_shutdown(shutdown_printer).await;
698 let _ = shutdown_tx.send(());
699 });
700
701 let file_rx = file_rx_for_triggers.ok_or_else(|| DaemonError::WatchError {
702 message: "internal: production path did not initialise file watcher".to_string(),
703 })?;
704 TriggerSetup {
705 triggers: DaemonTriggers {
706 file_rx,
707 reconcile_rx,
708 sync_rx,
709 version_check_rx,
710 compliance_rx,
711 sighup_rx,
712 shutdown_rx,
713 },
714 reconcile_pump: Some(reconcile_pump),
715 sync_pump: Some(sync_pump),
716 version_check_pump: Some(version_check_pump),
717 compliance_pump,
718 sighup_pump,
719 shutdown_task: Some(shutdown_task),
720 }
721 };
722
723 let ctx = DaemonLoopContext {
724 state: Arc::clone(&state),
725 hooks: Arc::clone(&hooks),
726 notifier: Arc::clone(&setup.notifier),
727 config_path: config_path.clone(),
728 profile_override: profile_override.clone(),
729 on_change_reconcile: setup.parsed.on_change_reconcile,
730 notify_on_drift: setup.parsed.notify_on_drift,
731 compliance_config: setup.compliance_config.clone(),
732 printer: Arc::clone(&printer),
733 state_dir_override: overrides.state_dir_override.clone(),
734 };
735
736 let loop_result = run_daemon_loop(
737 ctx,
738 triggers,
739 setup.reconcile_tasks,
740 setup.sync_tasks,
741 reconcile_secs,
742 sync_secs,
743 )
744 .await;
745
746 if let Some(h) = reconcile_pump {
748 h.abort();
749 }
750 if let Some(h) = sync_pump {
751 h.abort();
752 }
753 if let Some(h) = version_check_pump {
754 h.abort();
755 }
756 if let Some(h) = compliance_pump {
757 h.abort();
758 }
759 if let Some(h) = sighup_pump {
760 h.abort();
761 }
762 if let Some(h) = shutdown_task {
763 h.abort();
764 }
765
766 if let Some(h) = health_handle {
768 h.abort();
769 let _ = h.await;
772 }
773 cleanup_ipc_socket(&ipc_path);
774
775 printer.status_simple(Role::Ok, "Daemon stopped");
776 loop_result
777}
778
779#[cfg(test)]
788pub(super) fn init_daemon_state(override_dir: Option<&Path>) -> DaemonState {
789 init_daemon_state_with_warning(override_dir).0
790}
791
792pub(super) fn init_daemon_state_with_warning(
797 override_dir: Option<&Path>,
798) -> (DaemonState, Option<String>) {
799 let dir_result = override_dir
800 .map(|d| Ok(d.to_path_buf()))
801 .unwrap_or_else(crate::state::default_state_dir);
802 match dir_result {
803 Ok(dir) => (
804 DaemonState::new().with_store_path(dir.join("state.db")),
805 None,
806 ),
807 Err(e) => {
808 tracing::warn!(error = %e, "cannot resolve default state dir; /drift endpoint disabled");
809 let banner = format!("Drift endpoint disabled: cannot resolve default state dir ({e})");
810 (DaemonState::new(), Some(banner))
811 }
812 }
813}
814
815pub(super) fn check_already_running(_ipc_path: &Path) -> Result<()> {
821 #[cfg(unix)]
822 {
823 if _ipc_path.exists() {
824 if StdUnixStream::connect(_ipc_path).is_ok() {
825 return Err(DaemonError::AlreadyRunning {
826 pid: std::process::id(),
827 }
828 .into());
829 }
830 let _ = std::fs::remove_file(_ipc_path);
832 }
833 }
834 #[cfg(windows)]
835 {
836 if connect_daemon_ipc().is_some() {
837 return Err(DaemonError::AlreadyRunning {
838 pid: std::process::id(),
839 }
840 .into());
841 }
842 }
843 Ok(())
844}
845
846pub(super) fn format_interval_lines(
850 parsed: &ParsedDaemonConfig,
851 compliance_interval: Option<Duration>,
852) -> Vec<String> {
853 let mut intervals = vec![format!(
854 "reconcile={}s",
855 parsed.reconcile_interval.as_secs()
856 )];
857 if parsed.auto_pull || parsed.auto_push {
858 intervals.push(format!(
859 "sync={}s (pull={}, push={})",
860 parsed.sync_interval.as_secs(),
861 parsed.auto_pull,
862 parsed.auto_push
863 ));
864 }
865 if let Some(interval) = compliance_interval {
866 intervals.push(format!("compliance={}s", interval.as_secs()));
867 }
868 intervals
869}
870
871pub(super) fn print_startup_banner(printer: &Printer, intervals: &[String], ipc_path: &str) {
875 printer.status_simple(Role::Ok, format!("Health: {}", ipc_path));
876 printer.status_simple(Role::Ok, format!("Intervals: {}", intervals.join(", ")));
877 printer.status_simple(Role::Info, "Daemon running — press Ctrl+C to stop");
878}
879
880pub(super) fn run_startup_checkin_blocking(
886 config_path: &Path,
887 profile_override: Option<&str>,
888 cfg: &CfgdConfig,
889) {
890 let config_dir = config_path
891 .parent()
892 .unwrap_or_else(|| Path::new("."))
893 .to_path_buf();
894 let profiles_dir = config_dir.join("profiles");
895 let profile_name = match profile_override.or(cfg.spec.profile.as_deref()) {
896 Some(p) => p,
897 None => {
898 tracing::error!("no profile configured — skipping reconciliation");
899 return;
900 }
901 };
902 match config::resolve_profile(profile_name, &profiles_dir) {
903 Ok(resolved) => {
904 let changed = try_server_checkin(cfg, &resolved);
905 if changed {
906 tracing::info!("server reports config changed at startup");
907 }
908 match crate::state::load_pending_server_config() {
911 Ok(Some(_pending)) => {
912 tracing::info!(
913 "startup: found pending server config — first reconcile will apply it"
914 );
915 if let Err(e) = crate::state::clear_pending_server_config() {
916 tracing::warn!(error = %e, "startup: failed to clear pending server config");
917 }
918 }
919 Ok(None) => {}
920 Err(e) => {
921 tracing::warn!(error = %e, "startup: failed to load pending server config");
922 }
923 }
924 }
925 Err(e) => {
926 tracing::warn!(error = %e, "startup check-in: failed to resolve profile");
927 }
928 }
929}
930
931#[allow(unused_variables)]
934pub(super) fn cleanup_ipc_socket(ipc_path: &Path) {
935 #[cfg(unix)]
936 {
937 if ipc_path.exists() {
938 let _ = std::fs::remove_file(ipc_path);
939 }
940 }
941}
942
943fn spawn_interval_pump(
949 interval_secs: Arc<std::sync::atomic::AtomicU64>,
950 tx: mpsc::Sender<()>,
951) -> tokio::task::JoinHandle<()> {
952 tokio::spawn(async move {
953 loop {
954 let secs = interval_secs
955 .load(std::sync::atomic::Ordering::Relaxed)
956 .max(1);
957 tokio::time::sleep(Duration::from_secs(secs)).await;
958 if tx.send(()).await.is_err() {
959 break;
960 }
961 }
962 })
963}
964
965#[cfg(unix)]
967fn spawn_sighup_pump(tx: mpsc::Sender<()>) -> Result<tokio::task::JoinHandle<()>> {
968 let mut signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
969 .map_err(|e| DaemonError::WatchError {
970 message: format!("failed to register SIGHUP handler: {}", e),
971 })?;
972 Ok(tokio::spawn(async move {
973 while signal.recv().await.is_some() {
974 if tx.send(()).await.is_err() {
975 break;
976 }
977 }
978 }))
979}
980
981async fn wait_for_shutdown(printer: Arc<Printer>) {
984 #[cfg(unix)]
985 {
986 let sigterm = async {
987 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
988 Ok(mut s) => {
989 s.recv().await;
990 }
991 Err(e) => {
992 tracing::warn!(error = %e, "failed to register SIGTERM handler");
993 std::future::pending::<()>().await;
994 }
995 }
996 };
997 tokio::select! {
998 _ = sigterm => {
999 printer.status_simple(Role::Info, "Received SIGTERM, shutting down daemon...");
1000 }
1001 _ = tokio::signal::ctrl_c() => {
1002 printer.status_simple(Role::Info, "Shutting down daemon...");
1003 }
1004 }
1005 }
1006 #[cfg(not(unix))]
1007 {
1008 let _ = tokio::signal::ctrl_c().await;
1009 printer.status_simple(Role::Info, "Shutting down daemon...");
1010 }
1011}
1012
1013pub(crate) fn parse_duration_or_default(s: &str) -> Duration {
1025 crate::parse_duration_str(s).unwrap_or(Duration::from_secs(DEFAULT_RECONCILE_SECS))
1026}