use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::PermissionsExt;
use std::os::unix::process::CommandExt;
use std::process::Stdio;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use anyhow::{Context, Result, bail};
use rusqlite::Connection;
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::net::{UnixListener, UnixStream};
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::Notify;
use tokio::time::interval;
use crate::ipc::{Request, Response};
use crate::{catalog, ipc, paths, record};
type Db = Arc<Mutex<Connection>>;
pub fn run(detach: bool) -> Result<()> {
validate_socket_path()?;
if detach {
return spawn_detached();
}
if already_running() {
println!("galdr daemon already running");
return Ok(());
}
let sock = paths::socket_path()?;
if sock.exists() {
let _ = std::fs::remove_file(&sock);
}
paths::ensure_dirs()?;
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("could not start the async runtime")?;
rt.block_on(serve())
}
fn already_running() -> bool {
matches!(ipc::query(&Request::Ping), Ok(Response::Pong { .. }))
}
pub fn stop_and_wait() {
let _ = ipc::query(&Request::Shutdown);
for _ in 0..40 {
if ipc::query(&Request::Ping).is_err() {
return;
}
std::thread::sleep(Duration::from_millis(50));
}
}
fn spawn_detached() -> Result<()> {
let exe = std::env::current_exe().context("could not find the galdr executable")?;
let mut cmd = std::process::Command::new(exe);
cmd.arg("daemon")
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.process_group(0);
let pid = cmd
.spawn()
.context("could not spawn the detached daemon")?
.id();
for _ in 0..40 {
if already_running() {
println!("galdr daemon started (pid {pid})");
return Ok(());
}
std::thread::sleep(Duration::from_millis(50));
}
bail!(
"daemon spawned (pid {pid}) but never answered on the control socket within 2s; \
it likely failed to start. Run `galdr daemon` in the foreground to see the error."
);
}
fn validate_socket_path() -> Result<()> {
const MAX: usize = 100; let sock = paths::socket_path()?;
let len = sock.as_os_str().as_bytes().len();
if len > MAX {
bail!(
"daemon socket path is too long ({len} bytes, limit ~{MAX}): {}\n\
Set GALDR_ROOT to a shorter directory (for example one under /tmp).",
sock.display()
);
}
Ok(())
}
async fn serve() -> Result<()> {
let sock = paths::socket_path()?;
let listener = UnixListener::bind(&sock)
.with_context(|| format!("could not bind the control socket {}", sock.display()))?;
std::fs::set_permissions(&sock, std::fs::Permissions::from_mode(0o600))?;
std::fs::write(paths::pidfile()?, std::process::id().to_string())?;
let mut conn = open_or_heal()?;
let stats = catalog::reindex(&mut conn)?;
eprintln!(
"galdr daemon: indexed {} recordings, {} steps, {} skills",
stats.recordings, stats.steps, stats.skills
);
let db: Db = Arc::new(Mutex::new(conn));
let shutdown = Arc::new(Notify::new());
let mut sigterm = signal(SignalKind::terminate())?;
let mut sigint = signal(SignalKind::interrupt())?;
let mut poll = interval(Duration::from_secs(2));
loop {
tokio::select! {
_ = shutdown.notified() => break,
_ = sigterm.recv() => break,
_ = sigint.recv() => break,
_ = poll.tick() => {
let db = db.clone();
let _ = tokio::task::spawn_blocking(move || {
if let Ok(conn) = db.lock() {
let _ = catalog::reconcile(&conn);
}
}).await;
}
accepted = listener.accept() => {
if let Ok((stream, _addr)) = accepted {
tokio::spawn(handle_conn(stream, db.clone(), shutdown.clone()));
}
}
}
}
if let Ok(conn) = db.lock() {
let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
}
let _ = std::fs::remove_file(&sock);
let _ = std::fs::remove_file(paths::pidfile()?);
Ok(())
}
fn open_or_heal() -> Result<Connection> {
if let Ok(conn) = catalog::open() {
return Ok(conn);
}
let db = paths::catalog_db()?;
let _ = std::fs::remove_file(&db);
let _ = std::fs::remove_file(db.with_extension("sqlite-wal"));
let _ = std::fs::remove_file(db.with_extension("sqlite-shm"));
catalog::open()
}
async fn handle_conn(stream: UnixStream, db: Db, shutdown: Arc<Notify>) {
let _ = process_conn(stream, db, shutdown).await;
}
async fn process_conn(stream: UnixStream, db: Db, shutdown: Arc<Notify>) -> Result<()> {
const MAX_REQUEST_BYTES: u64 = 1024 * 1024;
const READ_TIMEOUT: Duration = Duration::from_secs(10);
let (read_half, mut write_half) = stream.into_split();
let mut reader = BufReader::new(read_half.take(MAX_REQUEST_BYTES));
let mut line = String::new();
match tokio::time::timeout(READ_TIMEOUT, reader.read_line(&mut line)).await {
Ok(Ok(0)) | Err(_) => return Ok(()),
Ok(Ok(_)) => {}
Ok(Err(err)) => return Err(err.into()),
}
let req: Request = match serde_json::from_str(line.trim()) {
Ok(req) => req,
Err(err) => {
let resp = Response::Error {
message: format!("bad request: {err}"),
};
let _ = write_response(&mut write_half, &resp).await;
return Ok(());
}
};
let is_shutdown = matches!(req, Request::Shutdown);
let resp = handle_request(&db, req);
let _ = write_response(&mut write_half, &resp).await;
if is_shutdown {
shutdown.notify_one();
}
Ok(())
}
fn handle_request(db: &Db, req: Request) -> Response {
match req {
Request::Ping => Response::Pong {
version: Some(env!("CARGO_PKG_VERSION").to_string()),
},
Request::Shutdown => Response::Ack,
Request::EventAppended { rec_id, event } => with_db(db, |c| {
catalog::index_event(c, &rec_id, event.as_ref()).map(|()| Response::Ack)
}),
Request::RecordingClosed { recording } => with_db(db, |c| {
catalog::index_recording(c, &recording).map(|()| Response::Ack)
}),
Request::SkillInstalled {
skill_name,
rec_id,
skill_path,
status,
} => with_db(db, |c| {
catalog::upsert_skill(
c,
&skill_name,
Some(&rec_id),
&skill_path,
Some(&record::now_rfc3339()),
&status,
)
.map(|()| Response::Ack)
}),
Request::ListRecordings => with_db(db, |c| {
catalog::list_recordings(c).map(|recordings| Response::Recordings { recordings })
}),
Request::ShowRecording { id } => with_db(db, |c| {
catalog::show_recording(c, &id).map(|recording| Response::Recording { recording })
}),
Request::ListSkills => with_db(db, |c| {
catalog::list_skills(c).map(|skills| Response::Skills { skills })
}),
Request::Reindex => with_db_mut(db, |c| {
catalog::reindex(c).map(|stats| Response::Reindexed { stats })
}),
}
}
fn with_db<F>(db: &Db, f: F) -> Response
where
F: FnOnce(&Connection) -> Result<Response>,
{
match db.lock() {
Ok(conn) => f(&conn).unwrap_or_else(|err| Response::Error {
message: err.to_string(),
}),
Err(_) => Response::Error {
message: "catalog lock poisoned".into(),
},
}
}
fn with_db_mut<F>(db: &Db, f: F) -> Response
where
F: FnOnce(&mut Connection) -> Result<Response>,
{
match db.lock() {
Ok(mut conn) => f(&mut conn).unwrap_or_else(|err| Response::Error {
message: err.to_string(),
}),
Err(_) => Response::Error {
message: "catalog lock poisoned".into(),
},
}
}
async fn write_response(w: &mut (impl AsyncWriteExt + Unpin), resp: &Response) -> Result<()> {
let mut line = serde_json::to_vec(resp)?;
line.push(b'\n');
w.write_all(&line).await?;
w.flush().await?;
Ok(())
}