Skip to main content

cfgd_core/daemon/
mod.rs

1// Daemon — file watchers, reconciliation loop, sync, notifications, health endpoint, service management
2//
3// Locking convention (enforced by code review, not the compiler):
4//   * `DaemonState` lives behind `Arc<tokio::sync::Mutex<_>>`.
5//   * Every `.lock().await` MUST drop the guard before any `.await` on
6//     network / filesystem / subprocess I/O. The pattern is: acquire,
7//     clone out the fields needed, drop the guard, then do work.
8//   * Holding the lock across an await would serialize the daemon onto
9//     one in-flight request and invites deadlock when handlers call
10//     each other. All 19 current `.lock().await` sites follow this rule;
11//     new sites must too.
12
13use std::collections::{HashMap, HashSet};
14use std::io::{BufRead, BufReader, Write as IoWrite};
15#[cfg(unix)]
16use std::os::unix::net::UnixStream as StdUnixStream;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
22use serde::{Deserialize, Serialize};
23use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
24#[cfg(unix)]
25use tokio::net::UnixListener;
26use tokio::sync::{Mutex, mpsc};
27
28use crate::config::{
29    self, AutoApplyPolicyConfig, CfgdConfig, MergedProfile, NotifyMethod, OriginType, PolicyAction,
30    ResolvedProfile,
31};
32use crate::errors::{DaemonError, Result};
33use crate::output::{Printer, Role};
34use crate::providers::{FileAction, PackageAction, PackageManager, ProviderRegistry};
35use crate::state::StateStore;
36
37/// Trait for binary-specific operations the daemon needs.
38/// The workstation binary (`cfgd`) implements this with concrete provider types.
39pub trait DaemonHooks: Send + Sync {
40    /// Build a ProviderRegistry with all available providers for this binary.
41    fn build_registry(&self, config: &CfgdConfig) -> ProviderRegistry;
42
43    /// Plan file actions by comparing desired vs actual state.
44    fn plan_files(&self, config_dir: &Path, resolved: &ResolvedProfile) -> Result<Vec<FileAction>>;
45
46    /// Plan package actions by comparing installed vs desired.
47    fn plan_packages(
48        &self,
49        profile: &MergedProfile,
50        managers: &[&dyn PackageManager],
51    ) -> Result<Vec<PackageAction>>;
52
53    /// Extend the registry with custom (user-defined) package managers from the profile.
54    fn extend_registry_custom_managers(
55        &self,
56        registry: &mut ProviderRegistry,
57        packages: &config::PackagesSpec,
58    );
59
60    /// Expand tilde (~) to home directory in a path.
61    fn expand_tilde(&self, path: &Path) -> PathBuf;
62}
63
/// Debounce window in milliseconds. It is not referenced in this part of the
/// file; presumably a submodule uses it for file-watcher events (confirm).
const DEBOUNCE_MS: u64 = 500;

/// Socket file name created inside the resolved per-user runtime directory.
#[cfg(unix)]
const IPC_SOCKET_FILE: &str = "cfgd.sock";

/// Windows IPC endpoint. Named pipes live in the kernel's `\\.\pipe\`
/// namespace rather than on the file system, so the per-user directory
/// handling that Unix sockets need does not apply.
#[cfg(windows)]
const WINDOWS_PIPE_PATH: &str = r"\\.\pipe\cfgd";
75
76/// Resolve the daemon IPC endpoint when no explicit override is supplied.
77///
78/// Honors `CFGD_DAEMON_IPC_PATH` first so test harnesses and operators can
79/// isolate the socket. Otherwise:
80/// - Unix: places `cfgd.sock` under [`crate::default_runtime_dir`], which is
81///   `$XDG_RUNTIME_DIR/cfgd` on Linux when available (per-user tmpfs),
82///   `$HOME/.cache/cfgd` as the Linux fallback, and
83///   `$HOME/Library/Application Support/cfgd` on macOS. World-writable
84///   `/tmp` is deliberately avoided — see the v0.4.0 hijack-vector audit.
85///   A last-ditch fallback to `/tmp/cfgd.sock` only fires when home
86///   resolution fails entirely (no `$HOME`, no override); the bind path
87///   later refuses to listen if the parent dir is not owner-only.
88/// - Windows: returns the named-pipe path verbatim.
89///
90/// Used by both the server-side bind (`run_daemon_with`) and the client-side
91/// connect (`connect_daemon_ipc`) so the two stay in sync.
92pub(crate) fn resolve_default_ipc_path() -> PathBuf {
93    if let Some(override_path) = std::env::var_os("CFGD_DAEMON_IPC_PATH") {
94        return PathBuf::from(override_path);
95    }
96    #[cfg(unix)]
97    {
98        crate::default_runtime_dir()
99            .map(|dir| dir.join(IPC_SOCKET_FILE))
100            .unwrap_or_else(|| PathBuf::from("/tmp/cfgd.sock"))
101    }
102    #[cfg(windows)]
103    {
104        PathBuf::from(WINDOWS_PIPE_PATH)
105    }
106}
/// Default reconcile interval, in seconds (five minutes).
const DEFAULT_RECONCILE_SECS: u64 = 5 * 60;
/// Default sync interval, in seconds (five minutes).
const DEFAULT_SYNC_SECS: u64 = 5 * 60;

/// launchd job label used for the per-user agent.
#[cfg(unix)]
const LAUNCHD_LABEL: &str = "com.cfgd.daemon";
/// Relative location of launchd user agents (presumably joined onto `$HOME`).
#[cfg(unix)]
const LAUNCHD_AGENTS_DIR: &str = "Library/LaunchAgents";
/// Relative location of systemd user units (presumably joined onto `$HOME`).
#[cfg(unix)]
const SYSTEMD_USER_DIR: &str = ".config/systemd/user";
115
116// --- Sync Task ---
117
118pub(super) struct SyncTask {
119    source_name: String,
120    repo_path: PathBuf,
121    auto_pull: bool,
122    auto_push: bool,
123    auto_apply: bool,
124    interval: Duration,
125    last_synced: Option<Instant>,
126    /// When true, verify commit signatures after pull (source requires it).
127    require_signed_commits: bool,
128    /// When true, skip signature verification (global allow-unsigned).
129    allow_unsigned: bool,
130}
131
132// --- Reconcile Task (per-module or default) ---
133
134pub(super) struct ReconcileTask {
135    /// Module name, or `"__default__"` for non-patched resources.
136    entity: String,
137    interval: Duration,
138    auto_apply: bool,
139    drift_policy: config::DriftPolicy,
140    last_reconciled: Option<Instant>,
141}
142
143// --- Per-source status ---
144
/// Status of one configured source as reported by the daemon.
///
/// Serialized with camelCase keys (`lastSync`, `driftCount`, ...). Field
/// order is also the JSON key order, so keep it stable.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SourceStatus {
    /// Source name; the built-in entry seeded by `DaemonState::new` is `"local"`.
    pub name: String,
    /// Time of the last sync as a string, or `None` if none recorded.
    pub last_sync: Option<String>,
    /// Time of the last reconcile as a string, or `None` if none recorded.
    pub last_reconcile: Option<String>,
    /// Number of drifted items attributed to this source.
    pub drift_count: u32,
    /// Free-form status label; `DaemonState::new` seeds `"active"`.
    pub status: String,
}
154
155// --- Shared Daemon State ---
156
/// Serializable status snapshot of the running daemon, produced by
/// `DaemonState::to_response`.
///
/// Serialized with camelCase keys; field order is the JSON key order.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DaemonStatusResponse {
    /// Always `true` when produced by `DaemonState::to_response`.
    pub running: bool,
    /// Process id of the daemon (`std::process::id()` at snapshot time).
    pub pid: u32,
    /// Whole seconds since the daemon state was created.
    pub uptime_secs: u64,
    /// Time of the last reconcile as a string, if any.
    pub last_reconcile: Option<String>,
    /// Time of the last sync as a string, if any.
    pub last_sync: Option<String>,
    /// Aggregate drift count.
    pub drift_count: u32,
    /// Per-source status entries.
    pub sources: Vec<SourceStatus>,
    /// Omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub update_available: Option<String>,
    /// Per-module reconcile status. Omitted from the JSON when empty and
    /// defaulted to empty when absent on deserialize.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub module_reconcile: Vec<ModuleReconcileStatus>,
}
172
/// Reconcile schedule and last-run info for one module, as reported in
/// `DaemonStatusResponse::module_reconcile`. Serialized with camelCase keys.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModuleReconcileStatus {
    /// Module name.
    pub name: String,
    /// Reconcile interval rendered as a string (format set by the producer).
    pub interval: String,
    /// Whether the module auto-applies.
    pub auto_apply: bool,
    /// Drift policy rendered as a string.
    pub drift_policy: String,
    /// Time of the module's last reconcile as a string, if any.
    pub last_reconcile: Option<String>,
}
182
183pub(super) struct DaemonState {
184    started_at: Instant,
185    last_reconcile: Option<String>,
186    last_sync: Option<String>,
187    drift_count: u32,
188    sources: Vec<SourceStatus>,
189    update_available: Option<String>,
190    module_last_reconcile: HashMap<String, String>,
191    // State DB path the `/drift` endpoint should read. `None` means "no store"
192    // (used in tests so endpoint returns empty events without touching the
193    // user's real `~/.local/share/cfgd/state.db`).
194    store_path: Option<PathBuf>,
195}
196
197impl DaemonState {
198    fn new() -> Self {
199        Self {
200            started_at: Instant::now(),
201            last_reconcile: None,
202            last_sync: None,
203            drift_count: 0,
204            sources: vec![SourceStatus {
205                name: "local".to_string(),
206                last_sync: None,
207                last_reconcile: None,
208                drift_count: 0,
209                status: "active".to_string(),
210            }],
211            update_available: None,
212            module_last_reconcile: HashMap::new(),
213            store_path: None,
214        }
215    }
216
217    fn with_store_path(mut self, path: PathBuf) -> Self {
218        self.store_path = Some(path);
219        self
220    }
221
222    #[cfg(test)]
223    pub(super) fn store_path_for_test(&self) -> Option<&Path> {
224        self.store_path.as_deref()
225    }
226
227    fn to_response(&self) -> DaemonStatusResponse {
228        DaemonStatusResponse {
229            running: true,
230            pid: std::process::id(),
231            uptime_secs: self.started_at.elapsed().as_secs(),
232            last_reconcile: self.last_reconcile.clone(),
233            last_sync: self.last_sync.clone(),
234            drift_count: self.drift_count,
235            sources: self.sources.clone(),
236            update_available: self.update_available.clone(),
237            module_reconcile: vec![],
238        }
239    }
240}
241
242// --- Notifier ---
243
244pub(super) struct Notifier {
245    method: NotifyMethod,
246    webhook_url: Option<String>,
247}
248
249impl Notifier {
250    fn new(method: NotifyMethod, webhook_url: Option<String>) -> Self {
251        Self {
252            method,
253            webhook_url,
254        }
255    }
256
257    fn notify(&self, title: &str, message: &str) {
258        match self.method {
259            NotifyMethod::Desktop => self.notify_desktop(title, message),
260            NotifyMethod::Stdout => self.notify_stdout(title, message),
261            NotifyMethod::Webhook => self.notify_webhook(title, message),
262        }
263    }
264
265    fn notify_desktop(&self, title: &str, message: &str) {
266        match notify_rust::Notification::new()
267            .summary(title)
268            .body(message)
269            .appname("cfgd")
270            .show()
271        {
272            Ok(_) => tracing::debug!(title = %title, "desktop notification sent"),
273            Err(e) => {
274                tracing::warn!(error = %e, "desktop notification failed, falling back to stdout");
275                self.notify_stdout(title, message);
276            }
277        }
278    }
279
280    fn notify_stdout(&self, title: &str, message: &str) {
281        tracing::info!(title = %title, message = %message, "notification");
282    }
283
284    fn notify_webhook(&self, title: &str, message: &str) {
285        let Some(ref url) = self.webhook_url else {
286            tracing::warn!("webhook notification requested but no webhook-url configured");
287            return;
288        };
289
290        let url = url.clone();
291        let body = build_webhook_payload(title, message, &crate::utc_now_iso8601());
292
293        // Run webhook POST via spawn_blocking (uses tokio's bounded threadpool)
294        tokio::task::spawn_blocking(move || {
295            match crate::http::http_agent(crate::http::HTTP_WEBHOOK_TIMEOUT)
296                .post(&url)
297                .set("Content-Type", "application/json")
298                .send_string(&body)
299            {
300                Ok(_) => tracing::debug!(url = %url, "webhook notification sent"),
301                Err(e) => tracing::warn!(error = %e, "webhook notification failed"),
302            }
303        });
304    }
305}
306/// Build the JSON payload posted by `Notifier::notify_webhook`. Split out so
307/// the schema is testable without spawning a tokio thread or hitting the
308/// network. The `timestamp_iso` is injected so tests get deterministic
309/// output rather than `utc_now_iso8601()` at call time.
310pub(super) fn build_webhook_payload(title: &str, message: &str, timestamp_iso: &str) -> String {
311    serde_json::json!({
312        "event": title,
313        "message": message,
314        "timestamp": timestamp_iso,
315        "source": "cfgd",
316    })
317    .to_string()
318}
319
320// --- Submodule declarations ---
321
322mod checkin;
323mod daemon_config;
324mod drift;
325mod git;
326mod health_ipc;
327mod reconcile;
328mod runner;
329mod service;
330mod sync;
331
332#[cfg(test)]
333mod tests;
334
335use checkin::*;
336use daemon_config::*;
337// drift::* exposes record_file_drift{,_to} — the wildcard re-exports them
338// for future tick handlers; today only direct `super::record_file_drift`
339// call-sites in reconcile.rs use them, so the parent-scope wildcard
340// appears unused under -D warnings.
341#[allow(unused_imports)]
342use drift::*;
343use git::*;
344use health_ipc::*;
345use reconcile::*;
346use runner::*;
347// service::* contains cfg-gated launchd/systemd/windows wrappers — the parent
348// wildcard appears unused on the platform that DOESN'T match its arm. Keep
349// the import live across all platforms so the cross-platform call sites
350// (install_service/uninstall_service/run_as_windows_service) compile uniformly.
351#[allow(unused_imports)]
352use service::*;
// sync::* exposes handle_sync / handle_version_check / handle_compliance_snapshot.
// None of them appears in the public `pub use` re-exports below. The wildcard at
// this scope exists so direct `super::handle_*` call sites in runner.rs resolve,
// so the import can look unused from this module under -D warnings.
357#[allow(unused_imports)]
358use sync::*;
359
360// --- Public re-exports (preserve crate::daemon::<name> API) ---
361
362pub use git::git_pull_sync;
363pub use health_ipc::query_daemon_status;
364pub use service::{install_service, run_as_windows_service, uninstall_service};
365
366// --- Pre-loop setup (synchronous; pulled out so the SETUP arms are unit-testable) ---
367
368/// Bundle of values built up from config + profile resolution before the
369/// daemon loop spawns its watcher, health server, and timer pumps.
370///
371/// Constructed by [`build_pre_loop_setup`]. Tests can drive that function
372/// against tempdir fixtures and assert on the populated fields without the
373/// rest of `run_daemon`'s side-effect machinery (mpsc pumps, signal handlers,
374/// Unix socket binds, network startup check-ins).
375pub(super) struct PreLoopSetup {
376    pub cfg: CfgdConfig,
377    pub parsed: ParsedDaemonConfig,
378    pub notifier: Arc<Notifier>,
379    pub compliance_config: Option<config::ComplianceConfig>,
380    pub compliance_interval: Option<Duration>,
381    pub config_dir: PathBuf,
382    pub sync_tasks: Vec<SyncTask>,
383    pub initial_source_status: Vec<SourceStatus>,
384    pub managed_paths: Vec<PathBuf>,
385    pub reconcile_tasks: Vec<ReconcileTask>,
386    pub shortest_reconcile: Duration,
387    pub shortest_sync: Duration,
388    pub server_checkin_url: Option<String>,
389}
390
391/// Build everything `run_daemon` needs before it starts spawning tasks.
392///
393/// This is purely synchronous: config load + profile resolution + pure
394/// helpers from `daemon_config`, `checkin`, and `reconcile` submodules. No
395/// sockets, no spawned tasks, no network. Production callers use this from
396/// `run_daemon`; tests use it to exercise the SETUP arms directly.
397pub(super) fn build_pre_loop_setup(
398    config_path: &Path,
399    profile_override: Option<&str>,
400    hooks: &dyn DaemonHooks,
401) -> Result<PreLoopSetup> {
402    let cfg = config::load_config(config_path)?;
403    let daemon_cfg = cfg.spec.daemon.clone().unwrap_or(config::DaemonConfig {
404        enabled: true,
405        reconcile: None,
406        sync: None,
407        notify: None,
408        windows_event_log: false,
409    });
410    let parsed = parse_daemon_config(&daemon_cfg);
411    let notifier = Arc::new(Notifier::new(
412        parsed.notify_method.clone(),
413        parsed.webhook_url.clone(),
414    ));
415
416    let compliance_config = cfg.spec.compliance.clone();
417    let compliance_interval = compliance_config
418        .as_ref()
419        .filter(|c| c.enabled)
420        .and_then(|c| crate::parse_duration_str(&c.interval).ok());
421
422    let config_dir = config_path
423        .parent()
424        .unwrap_or_else(|| Path::new("."))
425        .to_path_buf();
426    let allow_unsigned = cfg.spec.security.as_ref().is_some_and(|s| s.allow_unsigned);
427
428    let source_cache_dir = crate::sources::SourceManager::default_cache_dir()
429        .unwrap_or_else(|_| config_dir.join(".cfgd-sources"));
430    let sync_tasks = build_sync_tasks(
431        &config_dir,
432        &parsed,
433        &cfg.spec.sources,
434        allow_unsigned,
435        &source_cache_dir,
436        |source_dir| {
437            crate::sources::detect_source_manifest(source_dir)
438                .ok()
439                .flatten()
440                .map(|m| m.spec.policy.constraints.require_signed_commits)
441        },
442    );
443    let initial_source_status = build_initial_source_status(&cfg.spec.sources);
444
445    let managed_paths = discover_managed_paths(config_path, profile_override, hooks);
446
447    let profiles_dir = config_dir.join("profiles");
448    let profile_name = profile_override
449        .or(cfg.spec.profile.as_deref())
450        .unwrap_or("default");
451    let resolved_profile = config::resolve_profile(profile_name, &profiles_dir).ok();
452    let profile_chain: Vec<String> = resolved_profile
453        .as_ref()
454        .map(|r| r.layers.iter().map(|l| l.profile_name.clone()).collect())
455        .unwrap_or_else(|| vec![profile_name.to_string()]);
456    let chain_refs: Vec<&str> = profile_chain.iter().map(|s| s.as_str()).collect();
457    let reconcile_tasks = build_reconcile_tasks(
458        &daemon_cfg,
459        resolved_profile.as_ref(),
460        &chain_refs,
461        parsed.reconcile_interval,
462        parsed.auto_apply,
463    );
464
465    let shortest_reconcile = reconcile_tasks
466        .iter()
467        .map(|t| t.interval)
468        .min()
469        .unwrap_or(parsed.reconcile_interval);
470    let shortest_sync = sync_tasks
471        .iter()
472        .map(|t| t.interval)
473        .min()
474        .unwrap_or(parsed.sync_interval);
475
476    let server_checkin_url = find_server_url(&cfg);
477
478    Ok(PreLoopSetup {
479        cfg,
480        parsed,
481        notifier,
482        compliance_config,
483        compliance_interval,
484        config_dir,
485        sync_tasks,
486        initial_source_status,
487        managed_paths,
488        reconcile_tasks,
489        shortest_reconcile,
490        shortest_sync,
491        server_checkin_url,
492    })
493}
494
495// --- Main Daemon Entry Point ---
496
497pub async fn run_daemon(
498    config_path: PathBuf,
499    profile_override: Option<String>,
500    printer: Arc<Printer>,
501    hooks: Arc<dyn DaemonHooks>,
502) -> Result<()> {
503    run_daemon_with(
504        config_path,
505        profile_override,
506        printer,
507        hooks,
508        DaemonRunOverrides::default(),
509    )
510    .await
511}
512
513/// Test-shaped knobs for [`run_daemon_with`]. Production callers go through
514/// [`run_daemon`] which uses `DaemonRunOverrides::default()` and matches the
515/// pre-refactor behaviour byte-for-byte. Tests set the fields they need to
516/// bypass real-world side effects:
517///
518/// * `ipc_path` — point the health socket / already-running check at a
519///   tempdir so concurrent tests don't fight over the per-user runtime
520///   socket resolved by [`resolve_default_ipc_path`].
521/// * `state_dir_override` — redirect both the `DaemonState` store path and
522///   the per-tick `handle_reconcile` / `handle_compliance_snapshot` state
523///   dir to a tempdir so the real `~/.local/share/cfgd/` is never touched.
524/// * `skip_health_server` — don't spawn the HTTP/IPC health server. Useful
525///   when a test doesn't need `/healthz` or `/drift` and wants to avoid the
526///   socket bind entirely.
527/// * `skip_startup_checkin` — even if the parsed config has a Server origin,
528///   suppress the startup `try_server_checkin` call. Keeps tests offline.
529/// * `external_triggers` — when supplied, the function bypasses all
530///   real-world trigger sources (file_watcher, interval pumps, SIGHUP /
531///   SIGINT / SIGTERM handlers) and drives the loop entirely from the
532///   provided receivers. The test owns the matching senders and pushes
533///   events to drive specific arms in `run_daemon_loop`.
534#[derive(Default)]
535pub(super) struct DaemonRunOverrides {
536    pub ipc_path: Option<PathBuf>,
537    pub state_dir_override: Option<PathBuf>,
538    pub skip_health_server: bool,
539    pub skip_startup_checkin: bool,
540    pub(in crate::daemon) external_triggers: Option<DaemonTriggers>,
541}
542
543/// Bundle of trigger receivers + the task handles that feed them. Production
544/// callers build this from spawned pumps + signal handlers; tests build it
545/// from externally-owned senders with `pump` / `shutdown_task` fields left
546/// `None`. Lives in `run_daemon_with` only — not exposed.
547struct TriggerSetup {
548    triggers: DaemonTriggers,
549    reconcile_pump: Option<tokio::task::JoinHandle<()>>,
550    sync_pump: Option<tokio::task::JoinHandle<()>>,
551    version_check_pump: Option<tokio::task::JoinHandle<()>>,
552    compliance_pump: Option<tokio::task::JoinHandle<()>>,
553    sighup_pump: Option<tokio::task::JoinHandle<()>>,
554    shutdown_task: Option<tokio::task::JoinHandle<()>>,
555}
556
557pub(super) async fn run_daemon_with(
558    config_path: PathBuf,
559    profile_override: Option<String>,
560    printer: Arc<Printer>,
561    hooks: Arc<dyn DaemonHooks>,
562    overrides: DaemonRunOverrides,
563) -> Result<()> {
564    printer.heading("Daemon");
565    printer.status_simple(Role::Info, "Starting cfgd daemon...");
566
567    let setup = build_pre_loop_setup(&config_path, profile_override.as_deref(), &*hooks)?;
568
569    let (daemon_state, state_dir_warning) =
570        init_daemon_state_with_warning(overrides.state_dir_override.as_deref());
571    if let Some(msg) = state_dir_warning {
572        printer.status_simple(Role::Warn, msg);
573    }
574    let state = Arc::new(Mutex::new(daemon_state));
575
576    // Initialize per-source status entries
577    {
578        let mut st = state.lock().await;
579        st.sources.extend(setup.initial_source_status.clone());
580    }
581
582    // External-triggers mode supplies its own file_rx; production wires up a
583    // notify-based watcher and pushes via file_tx → file_rx.
584    let using_external_triggers = overrides.external_triggers.is_some();
585    let (file_rx_for_triggers, _watcher_handle): (
586        Option<mpsc::Receiver<PathBuf>>,
587        Option<notify::RecommendedWatcher>,
588    ) = if using_external_triggers {
589        (None, None)
590    } else {
591        let (file_tx, file_rx) = mpsc::channel::<PathBuf>(256);
592        let watcher = setup_file_watcher(file_tx, &setup.managed_paths, &setup.config_dir)?;
593        (Some(file_rx), Some(watcher))
594    };
595
596    let ipc_path = overrides
597        .ipc_path
598        .clone()
599        .unwrap_or_else(resolve_default_ipc_path);
600    check_already_running(&ipc_path)?;
601
602    // Start health server (skippable in tests that don't need /healthz).
603    let health_handle = if overrides.skip_health_server {
604        None
605    } else {
606        let health_state = Arc::clone(&state);
607        let health_ipc_path = ipc_path.to_string_lossy().to_string();
608        Some(tokio::spawn(async move {
609            if let Err(e) = run_health_server(&health_ipc_path, health_state).await {
610                tracing::error!(error = %e, "health server error");
611            }
612        }))
613    };
614
615    let intervals = format_interval_lines(&setup.parsed, setup.compliance_interval);
616    print_startup_banner(&printer, &intervals, &ipc_path.to_string_lossy());
617
618    // Initial server check-in at startup (skippable for offline tests).
619    if setup.server_checkin_url.is_some() && !overrides.skip_startup_checkin {
620        let startup_cfg = setup.cfg.clone();
621        let startup_config_path = config_path.clone();
622        let startup_profile_override = profile_override.clone();
623        tokio::task::spawn_blocking(move || {
624            run_startup_checkin_blocking(
625                &startup_config_path,
626                startup_profile_override.as_deref(),
627                &startup_cfg,
628            );
629        })
630        .await
631        .map_err(|e| DaemonError::WatchError {
632            message: format!("startup check-in task failed: {}", e),
633        })?;
634    }
635
636    // Shared atomics: SIGHUP updates these so pump tasks pick up the new
637    // cadence on the next tick. (See `runner::apply_sighup_reload`.)
638    let reconcile_secs = Arc::new(std::sync::atomic::AtomicU64::new(
639        setup.shortest_reconcile.as_secs(),
640    ));
641    let sync_secs = Arc::new(std::sync::atomic::AtomicU64::new(
642        setup.shortest_sync.as_secs(),
643    ));
644
645    // Build the triggers + spawn the production pumps/signal handlers, OR
646    // adopt the externally-supplied triggers verbatim. The cleanup path at
647    // the bottom only aborts what was actually spawned.
648    let TriggerSetup {
649        triggers,
650        reconcile_pump,
651        sync_pump,
652        version_check_pump,
653        compliance_pump,
654        sighup_pump,
655        shutdown_task,
656    } = if let Some(t) = overrides.external_triggers {
657        TriggerSetup {
658            triggers: t,
659            reconcile_pump: None,
660            sync_pump: None,
661            version_check_pump: None,
662            compliance_pump: None,
663            sighup_pump: None,
664            shutdown_task: None,
665        }
666    } else {
667        let (reconcile_tx, reconcile_rx) = mpsc::channel::<()>(8);
668        let (sync_tx, sync_rx) = mpsc::channel::<()>(8);
669        let (version_check_tx, version_check_rx) = mpsc::channel::<()>(8);
670        let (compliance_tx, compliance_rx) = mpsc::channel::<()>(8);
671        let (sighup_tx, sighup_rx) = mpsc::channel::<()>(8);
672
673        let reconcile_pump = spawn_interval_pump(Arc::clone(&reconcile_secs), reconcile_tx);
674        let sync_pump = spawn_interval_pump(Arc::clone(&sync_secs), sync_tx);
675
676        let version_check_secs = Arc::new(std::sync::atomic::AtomicU64::new(
677            crate::upgrade::version_check_interval().as_secs(),
678        ));
679        let version_check_pump = spawn_interval_pump(version_check_secs, version_check_tx);
680
681        let compliance_pump = setup.compliance_interval.map(|d| {
682            let secs = Arc::new(std::sync::atomic::AtomicU64::new(d.as_secs()));
683            spawn_interval_pump(secs, compliance_tx)
684        });
685
686        #[cfg(unix)]
687        let sighup_pump = Some(spawn_sighup_pump(sighup_tx)?);
688        #[cfg(not(unix))]
689        let sighup_pump: Option<tokio::task::JoinHandle<()>> = {
690            let _ = sighup_tx; // suppress unused warning on Windows
691            None
692        };
693
694        let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
695        let shutdown_printer = Arc::clone(&printer);
696        let shutdown_task = tokio::spawn(async move {
697            wait_for_shutdown(shutdown_printer).await;
698            let _ = shutdown_tx.send(());
699        });
700
701        let file_rx = file_rx_for_triggers.ok_or_else(|| DaemonError::WatchError {
702            message: "internal: production path did not initialise file watcher".to_string(),
703        })?;
704        TriggerSetup {
705            triggers: DaemonTriggers {
706                file_rx,
707                reconcile_rx,
708                sync_rx,
709                version_check_rx,
710                compliance_rx,
711                sighup_rx,
712                shutdown_rx,
713            },
714            reconcile_pump: Some(reconcile_pump),
715            sync_pump: Some(sync_pump),
716            version_check_pump: Some(version_check_pump),
717            compliance_pump,
718            sighup_pump,
719            shutdown_task: Some(shutdown_task),
720        }
721    };
722
723    let ctx = DaemonLoopContext {
724        state: Arc::clone(&state),
725        hooks: Arc::clone(&hooks),
726        notifier: Arc::clone(&setup.notifier),
727        config_path: config_path.clone(),
728        profile_override: profile_override.clone(),
729        on_change_reconcile: setup.parsed.on_change_reconcile,
730        notify_on_drift: setup.parsed.notify_on_drift,
731        compliance_config: setup.compliance_config.clone(),
732        printer: Arc::clone(&printer),
733        state_dir_override: overrides.state_dir_override.clone(),
734    };
735
736    let loop_result = run_daemon_loop(
737        ctx,
738        triggers,
739        setup.reconcile_tasks,
740        setup.sync_tasks,
741        reconcile_secs,
742        sync_secs,
743    )
744    .await;
745
746    // Shut down whatever the trigger-builder block actually spawned.
747    if let Some(h) = reconcile_pump {
748        h.abort();
749    }
750    if let Some(h) = sync_pump {
751        h.abort();
752    }
753    if let Some(h) = version_check_pump {
754        h.abort();
755    }
756    if let Some(h) = compliance_pump {
757        h.abort();
758    }
759    if let Some(h) = sighup_pump {
760        h.abort();
761    }
762    if let Some(h) = shutdown_task {
763        h.abort();
764    }
765
766    // Shutdown health server (only present when not skipped).
767    if let Some(h) = health_handle {
768        h.abort();
769        // Drain the cancellation; the JoinError is always Cancelled here
770        // (we just sent abort), nothing actionable to surface.
771        let _ = h.await;
772    }
773    cleanup_ipc_socket(&ipc_path);
774
775    printer.status_simple(Role::Ok, "Daemon stopped");
776    loop_result
777}
778
/// Test-only shorthand for [`init_daemon_state_with_warning`] that keeps the
/// state and discards the printer-facing warning.
///
/// With `Some(dir)` the store path is `dir/state.db` and the platform lookup
/// is skipped. With `None` the platform default state dir is used. If that
/// cannot be resolved, the returned state has no store path, and the
/// `/drift` IPC endpoint then returns empty events rather than crashing.
#[cfg(test)]
pub(super) fn init_daemon_state(override_dir: Option<&Path>) -> DaemonState {
    let (state, _discarded_warning) = init_daemon_state_with_warning(override_dir);
    state
}
791
792/// Like [`init_daemon_state`] but also returns a printer-facing warning
793/// message when the platform default state dir resolution fails — callers
794/// can surface it in the startup banner so operators aren't dependent on
795/// catching the `tracing::warn!` line.
796pub(super) fn init_daemon_state_with_warning(
797    override_dir: Option<&Path>,
798) -> (DaemonState, Option<String>) {
799    let dir_result = override_dir
800        .map(|d| Ok(d.to_path_buf()))
801        .unwrap_or_else(crate::state::default_state_dir);
802    match dir_result {
803        Ok(dir) => (
804            DaemonState::new().with_store_path(dir.join("state.db")),
805            None,
806        ),
807        Err(e) => {
808            tracing::warn!(error = %e, "cannot resolve default state dir; /drift endpoint disabled");
809            let banner = format!("Drift endpoint disabled: cannot resolve default state dir ({e})");
810            (DaemonState::new(), Some(banner))
811        }
812    }
813}
814
815/// Verify no other cfgd daemon is reachable via the IPC endpoint. Returns
816/// `Err(AlreadyRunning)` if a connect succeeds; clears a stale socket file
817/// (Unix) otherwise. On Windows, falls back to the shared
818/// `connect_daemon_ipc()` probe and ignores `_ipc_path` — named pipes are
819/// kernel objects with no on-disk cleanup.
820pub(super) fn check_already_running(_ipc_path: &Path) -> Result<()> {
821    #[cfg(unix)]
822    {
823        if _ipc_path.exists() {
824            if StdUnixStream::connect(_ipc_path).is_ok() {
825                return Err(DaemonError::AlreadyRunning {
826                    pid: std::process::id(),
827                }
828                .into());
829            }
830            // Stale socket from crashed daemon — remove it
831            let _ = std::fs::remove_file(_ipc_path);
832        }
833    }
834    #[cfg(windows)]
835    {
836        if connect_daemon_ipc().is_some() {
837            return Err(DaemonError::AlreadyRunning {
838                pid: std::process::id(),
839            }
840            .into());
841        }
842    }
843    Ok(())
844}
845
846/// Build the "Intervals: ..." line components for the startup banner. Returns
847/// a vector of `key=value` segments the printer joins with `, `. Sync and
848/// compliance lines are conditional; reconcile is always present.
849pub(super) fn format_interval_lines(
850    parsed: &ParsedDaemonConfig,
851    compliance_interval: Option<Duration>,
852) -> Vec<String> {
853    let mut intervals = vec![format!(
854        "reconcile={}s",
855        parsed.reconcile_interval.as_secs()
856    )];
857    if parsed.auto_pull || parsed.auto_push {
858        intervals.push(format!(
859            "sync={}s (pull={}, push={})",
860            parsed.sync_interval.as_secs(),
861            parsed.auto_pull,
862            parsed.auto_push
863        ));
864    }
865    if let Some(interval) = compliance_interval {
866        intervals.push(format!("compliance={}s", interval.as_secs()));
867    }
868    intervals
869}
870
871/// Emit the three-line startup banner: health endpoint, interval summary,
872/// run hint. Pure-output; testable via `Printer::for_test_at(Verbosity::Normal)`
873/// (Quiet suppresses Ok/Info statuses).
874pub(super) fn print_startup_banner(printer: &Printer, intervals: &[String], ipc_path: &str) {
875    printer.status_simple(Role::Ok, format!("Health: {}", ipc_path));
876    printer.status_simple(Role::Ok, format!("Intervals: {}", intervals.join(", ")));
877    printer.status_simple(Role::Info, "Daemon running — press Ctrl+C to stop");
878}
879
880/// Synchronous body of the startup server check-in. Resolves the profile,
881/// posts the check-in payload, and clears any pending server config so the
882/// first reconcile tick picks it up. Extracted from the `spawn_blocking`
883/// closure so tests can drive the no-profile and resolve-failure arms without
884/// scheduling onto a tokio runtime.
885pub(super) fn run_startup_checkin_blocking(
886    config_path: &Path,
887    profile_override: Option<&str>,
888    cfg: &CfgdConfig,
889) {
890    let config_dir = config_path
891        .parent()
892        .unwrap_or_else(|| Path::new("."))
893        .to_path_buf();
894    let profiles_dir = config_dir.join("profiles");
895    let profile_name = match profile_override.or(cfg.spec.profile.as_deref()) {
896        Some(p) => p,
897        None => {
898            tracing::error!("no profile configured — skipping reconciliation");
899            return;
900        }
901    };
902    match config::resolve_profile(profile_name, &profiles_dir) {
903        Ok(resolved) => {
904            let changed = try_server_checkin(cfg, &resolved);
905            if changed {
906                tracing::info!("server reports config changed at startup");
907            }
908            // Consume any pending server config at startup so the first
909            // reconcile tick picks up the changes.
910            match crate::state::load_pending_server_config() {
911                Ok(Some(_pending)) => {
912                    tracing::info!(
913                        "startup: found pending server config — first reconcile will apply it"
914                    );
915                    if let Err(e) = crate::state::clear_pending_server_config() {
916                        tracing::warn!(error = %e, "startup: failed to clear pending server config");
917                    }
918                }
919                Ok(None) => {}
920                Err(e) => {
921                    tracing::warn!(error = %e, "startup: failed to load pending server config");
922                }
923            }
924        }
925        Err(e) => {
926            tracing::warn!(error = %e, "startup check-in: failed to resolve profile");
927        }
928    }
929}
930
931/// Remove the daemon's IPC socket file at shutdown. No-op on Windows (named
932/// pipes are kernel objects with no on-disk artifact).
933#[allow(unused_variables)]
934pub(super) fn cleanup_ipc_socket(ipc_path: &Path) {
935    #[cfg(unix)]
936    {
937        if ipc_path.exists() {
938            let _ = std::fs::remove_file(ipc_path);
939        }
940    }
941}
942
943// --- Pump / shutdown task helpers ---
944
945/// Spawn a task that pumps fixed-cadence ticks into `tx`. The interval is read
946/// from `interval_secs` before every sleep, so SIGHUP-driven updates take
947/// effect on the next iteration. Aborting the returned handle stops the pump.
948fn spawn_interval_pump(
949    interval_secs: Arc<std::sync::atomic::AtomicU64>,
950    tx: mpsc::Sender<()>,
951) -> tokio::task::JoinHandle<()> {
952    tokio::spawn(async move {
953        loop {
954            let secs = interval_secs
955                .load(std::sync::atomic::Ordering::Relaxed)
956                .max(1);
957            tokio::time::sleep(Duration::from_secs(secs)).await;
958            if tx.send(()).await.is_err() {
959                break;
960            }
961        }
962    })
963}
964
965/// Spawn a task that pushes `()` to `tx` on every SIGHUP. Unix only.
966#[cfg(unix)]
967fn spawn_sighup_pump(tx: mpsc::Sender<()>) -> Result<tokio::task::JoinHandle<()>> {
968    let mut signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
969        .map_err(|e| DaemonError::WatchError {
970            message: format!("failed to register SIGHUP handler: {}", e),
971        })?;
972    Ok(tokio::spawn(async move {
973        while signal.recv().await.is_some() {
974            if tx.send(()).await.is_err() {
975                break;
976            }
977        }
978    }))
979}
980
981/// Wait for SIGTERM (Unix) or Ctrl+C (any platform) and print the matching
982/// shutdown message. Returns when either fires.
983async fn wait_for_shutdown(printer: Arc<Printer>) {
984    #[cfg(unix)]
985    {
986        let sigterm = async {
987            match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
988                Ok(mut s) => {
989                    s.recv().await;
990                }
991                Err(e) => {
992                    tracing::warn!(error = %e, "failed to register SIGTERM handler");
993                    std::future::pending::<()>().await;
994                }
995            }
996        };
997        tokio::select! {
998            _ = sigterm => {
999                printer.status_simple(Role::Info, "Received SIGTERM, shutting down daemon...");
1000            }
1001            _ = tokio::signal::ctrl_c() => {
1002                printer.status_simple(Role::Info, "Shutting down daemon...");
1003            }
1004        }
1005    }
1006    #[cfg(not(unix))]
1007    {
1008        let _ = tokio::signal::ctrl_c().await;
1009        printer.status_simple(Role::Info, "Shutting down daemon...");
1010    }
1011}
1012
1013// --- Helpers ---
1014
1015/// Module-local wrapper around [`crate::parse_duration_str`] that returns the
1016/// daemon's `DEFAULT_RECONCILE_SECS` (5 minutes) fallback when parsing fails.
1017///
1018/// Intentional duplication with `cfgd-operator::leader::parse_duration_secs`:
1019/// the two callers want different fallbacks (daemon reconcile loop default vs.
1020/// leader-election lease-window default), so a single shared helper with a
1021/// parameterised default would just push the default decision back to every
1022/// call site without saving any code. Kept local and documented per
1023/// dedup-audit S1 (decision: keep + document).
1024pub(crate) fn parse_duration_or_default(s: &str) -> Duration {
1025    crate::parse_duration_str(s).unwrap_or(Duration::from_secs(DEFAULT_RECONCILE_SECS))
1026}