use std::collections::{HashMap, HashSet};
use std::io::{BufRead, BufReader, Write as IoWrite};
#[cfg(unix)]
use std::os::unix::net::UnixStream as StdUnixStream;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
#[cfg(unix)]
use tokio::net::UnixListener;
use tokio::sync::{Mutex, mpsc};
use crate::config::{
self, AutoApplyPolicyConfig, CfgdConfig, MergedProfile, NotifyMethod, OriginType, PolicyAction,
ResolvedProfile,
};
use crate::errors::{DaemonError, Result};
use crate::output::{Printer, Role};
use crate::providers::{FileAction, PackageAction, PackageManager, ProviderRegistry};
use crate::state::StateStore;
pub trait DaemonHooks: Send + Sync {
fn build_registry(&self, config: &CfgdConfig) -> ProviderRegistry;
fn plan_files(&self, config_dir: &Path, resolved: &ResolvedProfile) -> Result<Vec<FileAction>>;
fn plan_packages(
&self,
profile: &MergedProfile,
managers: &[&dyn PackageManager],
) -> Result<Vec<PackageAction>>;
fn extend_registry_custom_managers(
&self,
registry: &mut ProviderRegistry,
packages: &config::PackagesSpec,
);
fn expand_tilde(&self, path: &Path) -> PathBuf;
}
const DEBOUNCE_MS: u64 = 500;
#[cfg(unix)]
const IPC_SOCKET_FILE: &str = "cfgd.sock";
#[cfg(windows)]
const WINDOWS_PIPE_PATH: &str = r"\\.\pipe\cfgd";
pub(crate) fn resolve_default_ipc_path() -> PathBuf {
if let Some(override_path) = std::env::var_os("CFGD_DAEMON_IPC_PATH") {
return PathBuf::from(override_path);
}
#[cfg(unix)]
{
crate::default_runtime_dir()
.map(|dir| dir.join(IPC_SOCKET_FILE))
.unwrap_or_else(|| PathBuf::from("/tmp/cfgd.sock"))
}
#[cfg(windows)]
{
PathBuf::from(WINDOWS_PIPE_PATH)
}
}
const DEFAULT_RECONCILE_SECS: u64 = 300; const DEFAULT_SYNC_SECS: u64 = 300; #[cfg(unix)]
const LAUNCHD_LABEL: &str = "com.cfgd.daemon";
#[cfg(unix)]
const LAUNCHD_AGENTS_DIR: &str = "Library/LaunchAgents";
#[cfg(unix)]
const SYSTEMD_USER_DIR: &str = ".config/systemd/user";
pub(super) struct SyncTask {
source_name: String,
repo_path: PathBuf,
auto_pull: bool,
auto_push: bool,
auto_apply: bool,
interval: Duration,
last_synced: Option<Instant>,
require_signed_commits: bool,
allow_unsigned: bool,
}
pub(super) struct ReconcileTask {
entity: String,
interval: Duration,
auto_apply: bool,
drift_policy: config::DriftPolicy,
last_reconciled: Option<Instant>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SourceStatus {
pub name: String,
pub last_sync: Option<String>,
pub last_reconcile: Option<String>,
pub drift_count: u32,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct DaemonStatusResponse {
pub running: bool,
pub pid: u32,
pub uptime_secs: u64,
pub last_reconcile: Option<String>,
pub last_sync: Option<String>,
pub drift_count: u32,
pub sources: Vec<SourceStatus>,
#[serde(skip_serializing_if = "Option::is_none")]
pub update_available: Option<String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub module_reconcile: Vec<ModuleReconcileStatus>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModuleReconcileStatus {
pub name: String,
pub interval: String,
pub auto_apply: bool,
pub drift_policy: String,
pub last_reconcile: Option<String>,
}
pub(super) struct DaemonState {
started_at: Instant,
last_reconcile: Option<String>,
last_sync: Option<String>,
drift_count: u32,
sources: Vec<SourceStatus>,
update_available: Option<String>,
module_last_reconcile: HashMap<String, String>,
store_path: Option<PathBuf>,
}
impl DaemonState {
fn new() -> Self {
Self {
started_at: Instant::now(),
last_reconcile: None,
last_sync: None,
drift_count: 0,
sources: vec![SourceStatus {
name: "local".to_string(),
last_sync: None,
last_reconcile: None,
drift_count: 0,
status: "active".to_string(),
}],
update_available: None,
module_last_reconcile: HashMap::new(),
store_path: None,
}
}
fn with_store_path(mut self, path: PathBuf) -> Self {
self.store_path = Some(path);
self
}
#[cfg(test)]
pub(super) fn store_path_for_test(&self) -> Option<&Path> {
self.store_path.as_deref()
}
fn to_response(&self) -> DaemonStatusResponse {
DaemonStatusResponse {
running: true,
pid: std::process::id(),
uptime_secs: self.started_at.elapsed().as_secs(),
last_reconcile: self.last_reconcile.clone(),
last_sync: self.last_sync.clone(),
drift_count: self.drift_count,
sources: self.sources.clone(),
update_available: self.update_available.clone(),
module_reconcile: vec![],
}
}
}
pub(super) struct Notifier {
method: NotifyMethod,
webhook_url: Option<String>,
}
impl Notifier {
fn new(method: NotifyMethod, webhook_url: Option<String>) -> Self {
Self {
method,
webhook_url,
}
}
fn notify(&self, title: &str, message: &str) {
match self.method {
NotifyMethod::Desktop => self.notify_desktop(title, message),
NotifyMethod::Stdout => self.notify_stdout(title, message),
NotifyMethod::Webhook => self.notify_webhook(title, message),
}
}
fn notify_desktop(&self, title: &str, message: &str) {
match notify_rust::Notification::new()
.summary(title)
.body(message)
.appname("cfgd")
.show()
{
Ok(_) => tracing::debug!(title = %title, "desktop notification sent"),
Err(e) => {
tracing::warn!(error = %e, "desktop notification failed, falling back to stdout");
self.notify_stdout(title, message);
}
}
}
fn notify_stdout(&self, title: &str, message: &str) {
tracing::info!(title = %title, message = %message, "notification");
}
fn notify_webhook(&self, title: &str, message: &str) {
let Some(ref url) = self.webhook_url else {
tracing::warn!("webhook notification requested but no webhook-url configured");
return;
};
let url = url.clone();
let body = build_webhook_payload(title, message, &crate::utc_now_iso8601());
tokio::task::spawn_blocking(move || {
match crate::http::http_agent(crate::http::HTTP_WEBHOOK_TIMEOUT)
.post(&url)
.set("Content-Type", "application/json")
.send_string(&body)
{
Ok(_) => tracing::debug!(url = %url, "webhook notification sent"),
Err(e) => tracing::warn!(error = %e, "webhook notification failed"),
}
});
}
}
pub(super) fn build_webhook_payload(title: &str, message: &str, timestamp_iso: &str) -> String {
serde_json::json!({
"event": title,
"message": message,
"timestamp": timestamp_iso,
"source": "cfgd",
})
.to_string()
}
mod checkin;
mod daemon_config;
mod drift;
mod git;
mod health_ipc;
mod reconcile;
mod runner;
mod service;
mod sync;
#[cfg(test)]
mod tests;
use checkin::*;
use daemon_config::*;
#[allow(unused_imports)]
use drift::*;
use git::*;
use health_ipc::*;
use reconcile::*;
use runner::*;
#[allow(unused_imports)]
use service::*;
#[allow(unused_imports)]
use sync::*;
pub use git::git_pull_sync;
pub use health_ipc::query_daemon_status;
pub use service::{install_service, run_as_windows_service, uninstall_service};
pub(super) struct PreLoopSetup {
pub cfg: CfgdConfig,
pub parsed: ParsedDaemonConfig,
pub notifier: Arc<Notifier>,
pub compliance_config: Option<config::ComplianceConfig>,
pub compliance_interval: Option<Duration>,
pub config_dir: PathBuf,
pub sync_tasks: Vec<SyncTask>,
pub initial_source_status: Vec<SourceStatus>,
pub managed_paths: Vec<PathBuf>,
pub reconcile_tasks: Vec<ReconcileTask>,
pub shortest_reconcile: Duration,
pub shortest_sync: Duration,
pub server_checkin_url: Option<String>,
}
pub(super) fn build_pre_loop_setup(
config_path: &Path,
profile_override: Option<&str>,
hooks: &dyn DaemonHooks,
) -> Result<PreLoopSetup> {
let cfg = config::load_config(config_path)?;
let daemon_cfg = cfg.spec.daemon.clone().unwrap_or(config::DaemonConfig {
enabled: true,
reconcile: None,
sync: None,
notify: None,
windows_event_log: false,
});
let parsed = parse_daemon_config(&daemon_cfg);
let notifier = Arc::new(Notifier::new(
parsed.notify_method.clone(),
parsed.webhook_url.clone(),
));
let compliance_config = cfg.spec.compliance.clone();
let compliance_interval = compliance_config
.as_ref()
.filter(|c| c.enabled)
.and_then(|c| crate::parse_duration_str(&c.interval).ok());
let config_dir = config_path
.parent()
.unwrap_or_else(|| Path::new("."))
.to_path_buf();
let allow_unsigned = cfg.spec.security.as_ref().is_some_and(|s| s.allow_unsigned);
let source_cache_dir = crate::sources::SourceManager::default_cache_dir()
.unwrap_or_else(|_| config_dir.join(".cfgd-sources"));
let sync_tasks = build_sync_tasks(
&config_dir,
&parsed,
&cfg.spec.sources,
allow_unsigned,
&source_cache_dir,
|source_dir| {
crate::sources::detect_source_manifest(source_dir)
.ok()
.flatten()
.map(|m| m.spec.policy.constraints.require_signed_commits)
},
);
let initial_source_status = build_initial_source_status(&cfg.spec.sources);
let managed_paths = discover_managed_paths(config_path, profile_override, hooks);
let profiles_dir = config_dir.join("profiles");
let profile_name = profile_override
.or(cfg.spec.profile.as_deref())
.unwrap_or("default");
let resolved_profile = config::resolve_profile(profile_name, &profiles_dir).ok();
let profile_chain: Vec<String> = resolved_profile
.as_ref()
.map(|r| r.layers.iter().map(|l| l.profile_name.clone()).collect())
.unwrap_or_else(|| vec![profile_name.to_string()]);
let chain_refs: Vec<&str> = profile_chain.iter().map(|s| s.as_str()).collect();
let reconcile_tasks = build_reconcile_tasks(
&daemon_cfg,
resolved_profile.as_ref(),
&chain_refs,
parsed.reconcile_interval,
parsed.auto_apply,
);
let shortest_reconcile = reconcile_tasks
.iter()
.map(|t| t.interval)
.min()
.unwrap_or(parsed.reconcile_interval);
let shortest_sync = sync_tasks
.iter()
.map(|t| t.interval)
.min()
.unwrap_or(parsed.sync_interval);
let server_checkin_url = find_server_url(&cfg);
Ok(PreLoopSetup {
cfg,
parsed,
notifier,
compliance_config,
compliance_interval,
config_dir,
sync_tasks,
initial_source_status,
managed_paths,
reconcile_tasks,
shortest_reconcile,
shortest_sync,
server_checkin_url,
})
}
pub async fn run_daemon(
config_path: PathBuf,
profile_override: Option<String>,
printer: Arc<Printer>,
hooks: Arc<dyn DaemonHooks>,
) -> Result<()> {
run_daemon_with(
config_path,
profile_override,
printer,
hooks,
DaemonRunOverrides::default(),
)
.await
}
#[derive(Default)]
pub(super) struct DaemonRunOverrides {
pub ipc_path: Option<PathBuf>,
pub state_dir_override: Option<PathBuf>,
pub skip_health_server: bool,
pub skip_startup_checkin: bool,
pub(in crate::daemon) external_triggers: Option<DaemonTriggers>,
}
struct TriggerSetup {
triggers: DaemonTriggers,
reconcile_pump: Option<tokio::task::JoinHandle<()>>,
sync_pump: Option<tokio::task::JoinHandle<()>>,
version_check_pump: Option<tokio::task::JoinHandle<()>>,
compliance_pump: Option<tokio::task::JoinHandle<()>>,
sighup_pump: Option<tokio::task::JoinHandle<()>>,
shutdown_task: Option<tokio::task::JoinHandle<()>>,
}
pub(super) async fn run_daemon_with(
config_path: PathBuf,
profile_override: Option<String>,
printer: Arc<Printer>,
hooks: Arc<dyn DaemonHooks>,
overrides: DaemonRunOverrides,
) -> Result<()> {
printer.heading("Daemon");
printer.status_simple(Role::Info, "Starting cfgd daemon...");
let setup = build_pre_loop_setup(&config_path, profile_override.as_deref(), &*hooks)?;
let (daemon_state, state_dir_warning) =
init_daemon_state_with_warning(overrides.state_dir_override.as_deref());
if let Some(msg) = state_dir_warning {
printer.status_simple(Role::Warn, msg);
}
let state = Arc::new(Mutex::new(daemon_state));
{
let mut st = state.lock().await;
st.sources.extend(setup.initial_source_status.clone());
}
let using_external_triggers = overrides.external_triggers.is_some();
let (file_rx_for_triggers, _watcher_handle): (
Option<mpsc::Receiver<PathBuf>>,
Option<notify::RecommendedWatcher>,
) = if using_external_triggers {
(None, None)
} else {
let (file_tx, file_rx) = mpsc::channel::<PathBuf>(256);
let watcher = setup_file_watcher(file_tx, &setup.managed_paths, &setup.config_dir)?;
(Some(file_rx), Some(watcher))
};
let ipc_path = overrides
.ipc_path
.clone()
.unwrap_or_else(resolve_default_ipc_path);
check_already_running(&ipc_path)?;
let health_handle = if overrides.skip_health_server {
None
} else {
let health_state = Arc::clone(&state);
let health_ipc_path = ipc_path.to_string_lossy().to_string();
Some(tokio::spawn(async move {
if let Err(e) = run_health_server(&health_ipc_path, health_state).await {
tracing::error!(error = %e, "health server error");
}
}))
};
let intervals = format_interval_lines(&setup.parsed, setup.compliance_interval);
print_startup_banner(&printer, &intervals, &ipc_path.to_string_lossy());
if setup.server_checkin_url.is_some() && !overrides.skip_startup_checkin {
let startup_cfg = setup.cfg.clone();
let startup_config_path = config_path.clone();
let startup_profile_override = profile_override.clone();
tokio::task::spawn_blocking(move || {
run_startup_checkin_blocking(
&startup_config_path,
startup_profile_override.as_deref(),
&startup_cfg,
);
})
.await
.map_err(|e| DaemonError::WatchError {
message: format!("startup check-in task failed: {}", e),
})?;
}
let reconcile_secs = Arc::new(std::sync::atomic::AtomicU64::new(
setup.shortest_reconcile.as_secs(),
));
let sync_secs = Arc::new(std::sync::atomic::AtomicU64::new(
setup.shortest_sync.as_secs(),
));
let TriggerSetup {
triggers,
reconcile_pump,
sync_pump,
version_check_pump,
compliance_pump,
sighup_pump,
shutdown_task,
} = if let Some(t) = overrides.external_triggers {
TriggerSetup {
triggers: t,
reconcile_pump: None,
sync_pump: None,
version_check_pump: None,
compliance_pump: None,
sighup_pump: None,
shutdown_task: None,
}
} else {
let (reconcile_tx, reconcile_rx) = mpsc::channel::<()>(8);
let (sync_tx, sync_rx) = mpsc::channel::<()>(8);
let (version_check_tx, version_check_rx) = mpsc::channel::<()>(8);
let (compliance_tx, compliance_rx) = mpsc::channel::<()>(8);
let (sighup_tx, sighup_rx) = mpsc::channel::<()>(8);
let reconcile_pump = spawn_interval_pump(Arc::clone(&reconcile_secs), reconcile_tx);
let sync_pump = spawn_interval_pump(Arc::clone(&sync_secs), sync_tx);
let version_check_secs = Arc::new(std::sync::atomic::AtomicU64::new(
crate::upgrade::version_check_interval().as_secs(),
));
let version_check_pump = spawn_interval_pump(version_check_secs, version_check_tx);
let compliance_pump = setup.compliance_interval.map(|d| {
let secs = Arc::new(std::sync::atomic::AtomicU64::new(d.as_secs()));
spawn_interval_pump(secs, compliance_tx)
});
#[cfg(unix)]
let sighup_pump = Some(spawn_sighup_pump(sighup_tx)?);
#[cfg(not(unix))]
let sighup_pump: Option<tokio::task::JoinHandle<()>> = {
let _ = sighup_tx; None
};
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
let shutdown_printer = Arc::clone(&printer);
let shutdown_task = tokio::spawn(async move {
wait_for_shutdown(shutdown_printer).await;
let _ = shutdown_tx.send(());
});
let file_rx = file_rx_for_triggers.ok_or_else(|| DaemonError::WatchError {
message: "internal: production path did not initialise file watcher".to_string(),
})?;
TriggerSetup {
triggers: DaemonTriggers {
file_rx,
reconcile_rx,
sync_rx,
version_check_rx,
compliance_rx,
sighup_rx,
shutdown_rx,
},
reconcile_pump: Some(reconcile_pump),
sync_pump: Some(sync_pump),
version_check_pump: Some(version_check_pump),
compliance_pump,
sighup_pump,
shutdown_task: Some(shutdown_task),
}
};
let ctx = DaemonLoopContext {
state: Arc::clone(&state),
hooks: Arc::clone(&hooks),
notifier: Arc::clone(&setup.notifier),
config_path: config_path.clone(),
profile_override: profile_override.clone(),
on_change_reconcile: setup.parsed.on_change_reconcile,
notify_on_drift: setup.parsed.notify_on_drift,
compliance_config: setup.compliance_config.clone(),
printer: Arc::clone(&printer),
state_dir_override: overrides.state_dir_override.clone(),
};
let loop_result = run_daemon_loop(
ctx,
triggers,
setup.reconcile_tasks,
setup.sync_tasks,
reconcile_secs,
sync_secs,
)
.await;
if let Some(h) = reconcile_pump {
h.abort();
}
if let Some(h) = sync_pump {
h.abort();
}
if let Some(h) = version_check_pump {
h.abort();
}
if let Some(h) = compliance_pump {
h.abort();
}
if let Some(h) = sighup_pump {
h.abort();
}
if let Some(h) = shutdown_task {
h.abort();
}
if let Some(h) = health_handle {
h.abort();
let _ = h.await;
}
cleanup_ipc_socket(&ipc_path);
printer.status_simple(Role::Ok, "Daemon stopped");
loop_result
}
#[cfg(test)]
pub(super) fn init_daemon_state(override_dir: Option<&Path>) -> DaemonState {
init_daemon_state_with_warning(override_dir).0
}
pub(super) fn init_daemon_state_with_warning(
override_dir: Option<&Path>,
) -> (DaemonState, Option<String>) {
let dir_result = override_dir
.map(|d| Ok(d.to_path_buf()))
.unwrap_or_else(crate::state::default_state_dir);
match dir_result {
Ok(dir) => (
DaemonState::new().with_store_path(dir.join("state.db")),
None,
),
Err(e) => {
tracing::warn!(error = %e, "cannot resolve default state dir; /drift endpoint disabled");
let banner = format!("Drift endpoint disabled: cannot resolve default state dir ({e})");
(DaemonState::new(), Some(banner))
}
}
}
pub(super) fn check_already_running(_ipc_path: &Path) -> Result<()> {
#[cfg(unix)]
{
if _ipc_path.exists() {
if StdUnixStream::connect(_ipc_path).is_ok() {
return Err(DaemonError::AlreadyRunning {
pid: std::process::id(),
}
.into());
}
let _ = std::fs::remove_file(_ipc_path);
}
}
#[cfg(windows)]
{
if connect_daemon_ipc().is_some() {
return Err(DaemonError::AlreadyRunning {
pid: std::process::id(),
}
.into());
}
}
Ok(())
}
pub(super) fn format_interval_lines(
parsed: &ParsedDaemonConfig,
compliance_interval: Option<Duration>,
) -> Vec<String> {
let mut intervals = vec![format!(
"reconcile={}s",
parsed.reconcile_interval.as_secs()
)];
if parsed.auto_pull || parsed.auto_push {
intervals.push(format!(
"sync={}s (pull={}, push={})",
parsed.sync_interval.as_secs(),
parsed.auto_pull,
parsed.auto_push
));
}
if let Some(interval) = compliance_interval {
intervals.push(format!("compliance={}s", interval.as_secs()));
}
intervals
}
pub(super) fn print_startup_banner(printer: &Printer, intervals: &[String], ipc_path: &str) {
printer.status_simple(Role::Ok, format!("Health: {}", ipc_path));
printer.status_simple(Role::Ok, format!("Intervals: {}", intervals.join(", ")));
printer.status_simple(Role::Info, "Daemon running — press Ctrl+C to stop");
}
pub(super) fn run_startup_checkin_blocking(
config_path: &Path,
profile_override: Option<&str>,
cfg: &CfgdConfig,
) {
let config_dir = config_path
.parent()
.unwrap_or_else(|| Path::new("."))
.to_path_buf();
let profiles_dir = config_dir.join("profiles");
let profile_name = match profile_override.or(cfg.spec.profile.as_deref()) {
Some(p) => p,
None => {
tracing::error!("no profile configured — skipping reconciliation");
return;
}
};
match config::resolve_profile(profile_name, &profiles_dir) {
Ok(resolved) => {
let changed = try_server_checkin(cfg, &resolved);
if changed {
tracing::info!("server reports config changed at startup");
}
match crate::state::load_pending_server_config() {
Ok(Some(_pending)) => {
tracing::info!(
"startup: found pending server config — first reconcile will apply it"
);
if let Err(e) = crate::state::clear_pending_server_config() {
tracing::warn!(error = %e, "startup: failed to clear pending server config");
}
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "startup: failed to load pending server config");
}
}
}
Err(e) => {
tracing::warn!(error = %e, "startup check-in: failed to resolve profile");
}
}
}
#[allow(unused_variables)]
pub(super) fn cleanup_ipc_socket(ipc_path: &Path) {
#[cfg(unix)]
{
if ipc_path.exists() {
let _ = std::fs::remove_file(ipc_path);
}
}
}
fn spawn_interval_pump(
interval_secs: Arc<std::sync::atomic::AtomicU64>,
tx: mpsc::Sender<()>,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
loop {
let secs = interval_secs
.load(std::sync::atomic::Ordering::Relaxed)
.max(1);
tokio::time::sleep(Duration::from_secs(secs)).await;
if tx.send(()).await.is_err() {
break;
}
}
})
}
#[cfg(unix)]
fn spawn_sighup_pump(tx: mpsc::Sender<()>) -> Result<tokio::task::JoinHandle<()>> {
let mut signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
.map_err(|e| DaemonError::WatchError {
message: format!("failed to register SIGHUP handler: {}", e),
})?;
Ok(tokio::spawn(async move {
while signal.recv().await.is_some() {
if tx.send(()).await.is_err() {
break;
}
}
}))
}
async fn wait_for_shutdown(printer: Arc<Printer>) {
#[cfg(unix)]
{
let sigterm = async {
match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
Ok(mut s) => {
s.recv().await;
}
Err(e) => {
tracing::warn!(error = %e, "failed to register SIGTERM handler");
std::future::pending::<()>().await;
}
}
};
tokio::select! {
_ = sigterm => {
printer.status_simple(Role::Info, "Received SIGTERM, shutting down daemon...");
}
_ = tokio::signal::ctrl_c() => {
printer.status_simple(Role::Info, "Shutting down daemon...");
}
}
}
#[cfg(not(unix))]
{
let _ = tokio::signal::ctrl_c().await;
printer.status_simple(Role::Info, "Shutting down daemon...");
}
}
pub(crate) fn parse_duration_or_default(s: &str) -> Duration {
crate::parse_duration_str(s).unwrap_or(Duration::from_secs(DEFAULT_RECONCILE_SECS))
}