use std::path::Path;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::UnixListener;
use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use daemon8_types::Observation;
use crate::Result;
use crate::normalize;
pub async fn run_unix_listener(
path: &Path,
tx: UnboundedSender<Observation>,
cancel: CancellationToken,
) -> Result<()> {
if path.exists() {
let _ = std::fs::remove_file(path);
}
let listener = UnixListener::bind(path)?;
tracing::info!(path = %path.display(), "Unix socket listener bound");
let cleanup_path = path.to_owned();
let _cleanup = CleanupGuard(cleanup_path);
loop {
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, _addr)) => {
let tx = tx.clone();
let cancel = cancel.clone();
tokio::spawn(async move {
handle_connection(stream, tx, cancel).await;
});
}
Err(e) => {
tracing::warn!(error = %e, "Unix socket accept error");
}
}
}
() = cancel.cancelled() => {
tracing::debug!("Unix socket listener stopping");
return Ok(());
}
}
}
}
async fn handle_connection(
stream: tokio::net::UnixStream,
tx: UnboundedSender<Observation>,
cancel: CancellationToken,
) {
let reader = BufReader::new(stream);
let mut lines = reader.lines();
loop {
tokio::select! {
result = lines.next_line() => {
match result {
Ok(Some(line)) => {
match serde_json::from_str::<serde_json::Value>(&line) {
Ok(value) => {
let obs = normalize::normalize(value);
let _ = tx.send(obs);
}
Err(e) => {
tracing::debug!(error = %e, "Unix socket: invalid JSON line, skipping");
}
}
}
Ok(None) => break, Err(e) => {
tracing::debug!(error = %e, "Unix socket read error");
break;
}
}
}
() = cancel.cancelled() => break,
}
}
}
struct CleanupGuard(std::path::PathBuf);
impl Drop for CleanupGuard {
fn drop(&mut self) {
let _ = std::fs::remove_file(&self.0);
}
}