use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader as AsyncBufReader};
use tokio::net::UnixListener;
use crate::monitored::PathEntry;
#[derive(Debug, Serialize, Deserialize)]
pub struct SocketCmd {
pub cmd: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<PathBuf>,
#[serde(skip_serializing_if = "Option::is_none")]
pub recursive: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub types: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub size: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub track_cmd: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum ErrorKind {
Permanent,
Transient,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct SocketResp {
pub ok: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_kind: Option<ErrorKind>,
#[serde(skip_serializing_if = "Option::is_none")]
pub paths: Option<Vec<PathEntry>>,
}
impl SocketResp {
pub fn ok() -> Self {
SocketResp {
ok: true,
error: None,
error_kind: None,
paths: None,
}
}
pub fn err(msg: impl Into<String>) -> Self {
SocketResp {
ok: false,
error: Some(msg.into()),
error_kind: None,
paths: None,
}
}
pub fn permanent_err(msg: impl Into<String>) -> Self {
SocketResp {
ok: false,
error: Some(msg.into()),
error_kind: Some(ErrorKind::Permanent),
paths: None,
}
}
}
fn to_toml_string<T: Serialize>(value: &T) -> Result<String> {
Ok(toml::to_string(value)?)
}
fn from_toml_str<T: serde::de::DeserializeOwned>(s: &str) -> Result<T> {
Ok(toml::from_str(s)?)
}
pub fn send_cmd(socket_path: &Path, cmd: &SocketCmd) -> Result<SocketResp> {
let stream = UnixStream::connect(socket_path).with_context(|| {
format!(
"Failed to connect to fsmon daemon at {}. Is the daemon running? \
Start it with: sudo systemctl start fsmon",
socket_path.display()
)
})?;
let toml = to_toml_string(cmd)?;
{
let mut writer = stream.try_clone()?;
write!(writer, "{toml}\n\n")?;
writer.flush()?;
}
let mut reader = BufReader::new(stream);
let mut response = String::new();
loop {
let mut line = String::new();
let bytes = reader.read_line(&mut line)?;
if bytes == 0 {
break; }
response.push_str(&line);
}
if response.trim().is_empty() {
bail!("Empty response from daemon");
}
let resp: SocketResp =
from_toml_str(response.trim()).with_context(|| "Failed to parse daemon response")?;
Ok(resp)
}
async fn read_toml_message(
reader: &mut AsyncBufReader<tokio::net::unix::OwnedReadHalf>,
) -> Result<String> {
let mut message = String::new();
loop {
let mut line = String::new();
let bytes = reader.read_line(&mut line).await?;
if bytes == 0 {
break; }
if line.trim().is_empty() && !message.is_empty() {
break; }
message.push_str(&line);
}
Ok(message)
}
pub async fn listen(
socket_path: &Path,
handler: impl Fn(SocketCmd) -> Result<SocketResp>,
) -> Result<()> {
if socket_path.exists() {
std::fs::remove_file(socket_path).with_context(|| {
format!("Failed to remove existing socket {}", socket_path.display())
})?;
}
let listener = UnixListener::bind(socket_path)
.with_context(|| format!("Failed to bind socket at {}", socket_path.display()))?;
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let (reader, mut writer) = stream.into_split();
let mut buf_reader = AsyncBufReader::new(reader);
match read_toml_message(&mut buf_reader).await {
Ok(message) if message.trim().is_empty() => continue,
Ok(message) => {
let resp = match from_toml_str::<SocketCmd>(message.trim()) {
Ok(cmd) => match handler(cmd) {
Ok(resp) => resp,
Err(e) => SocketResp::err(e.to_string()),
},
Err(e) => SocketResp::err(format!("Invalid command: {e}")),
};
let resp_toml = match to_toml_string(&resp) {
Ok(t) => t,
Err(e) => {
eprintln!("Failed to serialize response: {e}");
continue;
}
};
let resp_bytes = format!("{resp_toml}\n");
if let Err(e) = writer.write_all(resp_bytes.as_bytes()).await {
eprintln!("Failed to write response: {e}");
}
}
Err(e) => {
eprintln!("Failed to read from socket: {e}");
}
}
}
Err(e) => {
eprintln!("Failed to accept connection: {e}");
}
}
}
}