use std::sync::Arc;
use anyhow::{Context, Result};
use tokio::signal;
use tokio::sync::mpsc::unbounded_channel;
use crate::cli::config::Config;
use crate::daemon::{fs_watcher::FsWatcher, hook_endpoint, pipeline::Pipeline};
use crate::storage::Ledger;
pub async fn run() -> Result<()> {
println!("Carryover daemon starting...");
let listener = hook_endpoint::bind_loopback()
.await
.context("bind hook endpoint")?;
let local = listener.local_addr().context("local_addr")?;
println!("Hook endpoint listening on {local}");
let (hook_tx, mut hook_rx) = unbounded_channel::<hook_endpoint::HookEvent>();
let (watcher_tx, mut watcher_rx) = unbounded_channel::<crate::daemon::fs_watcher::WatchEvent>();
let ledger_path = Ledger::default_path().context("resolve ledger path")?;
let ledger = Ledger::open(&ledger_path).context("open ledger")?;
let config_path = Config::default_path().context("resolve config path")?;
let config = Config::load_or_default(&config_path).context("load config")?;
let home_dir = dirs::home_dir().context("resolve home directory")?;
let pipeline = Arc::new(Pipeline::build(&config, ledger, home_dir));
let watcher = match FsWatcher::spawn_for_all_tools(watcher_tx) {
Ok(w) => {
println!("fs watcher subscribed to {} root(s)", w.roots.len());
Some(w)
}
Err(e) => {
eprintln!("fs watcher: not started ({e}); hook endpoint still active");
None
}
};
let pl = pipeline.clone();
let drain = tokio::spawn(async move {
loop {
tokio::select! {
Some(evt) = hook_rx.recv() => {
pl.process_hook(&evt);
}
Some(evt) = watcher_rx.recv() => {
pl.process_watch(&evt);
}
else => break,
}
}
});
let app = hook_endpoint::router(hook_tx);
let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal());
if let Err(e) = server.await {
eprintln!("hook endpoint server error: {e}");
}
drop(watcher);
drain.abort();
println!("Carryover daemon stopped.");
Ok(())
}
async fn shutdown_signal() {
let ctrl_c = async {
signal::ctrl_c().await.ok();
};
#[cfg(unix)]
{
use signal::unix::{signal, SignalKind};
let mut sigterm = match signal(SignalKind::terminate()) {
Ok(s) => s,
Err(_) => {
ctrl_c.await;
return;
}
};
tokio::select! {
_ = ctrl_c => {}
_ = sigterm.recv() => {}
}
}
#[cfg(not(unix))]
{
ctrl_c.await;
}
}