use std::io::{BufRead, BufReader, Read, Write};
use std::path::PathBuf;
use anyhow::{Context, Result};
use interprocess::local_socket::prelude::*;
#[cfg(unix)]
use interprocess::local_socket::GenericFilePath;
#[cfg(not(unix))]
use interprocess::local_socket::GenericNamespaced;
use interprocess::local_socket::{ListenerOptions, Name, Stream};
use kintsugi_core::{Decision, ProposedCommand, Verdict};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum Request {
Propose(ProposedCommand),
Resolve(Resolution),
Observe(Observation),
Record(ProposedCommand),
ListPending,
PendingStatus { id: String },
Approve { id: String },
Deny { id: String },
Status,
AuthBegin { op: String },
Shutdown {
op: String,
nonce: String,
proof: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Observation {
#[serde(rename = "change")]
pub kind: String,
pub path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Resolution {
pub command: ProposedCommand,
pub decision: Decision,
pub remember: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "lowercase")]
pub enum Response {
Verdict(Verdict),
Ack,
PendingList {
items: Vec<kintsugi_core::PendingItem>,
},
Pending { status: String },
Status { scorer: String },
Challenge {
locked: bool,
nonce: String,
salt: String,
params: kintsugi_core::admin::KdfParams,
},
Error { message: String },
}
pub fn socket_path() -> PathBuf {
if let Ok(p) = std::env::var("KINTSUGI_SOCKET") {
return PathBuf::from(p);
}
#[cfg(unix)]
{
if let Ok(rt) = std::env::var("XDG_RUNTIME_DIR") {
if !rt.is_empty() {
return PathBuf::from(rt).join("kintsugi.sock");
}
}
if let Some(dirs) = directories::ProjectDirs::from("", "", "kintsugi") {
return dirs.data_dir().join("kintsugi.sock");
}
std::env::temp_dir().join("kintsugi.sock")
}
#[cfg(not(unix))]
{
PathBuf::from(r"\\.\pipe\kintsugi")
}
}
#[cfg(unix)]
pub(crate) fn set_mode(path: &std::path::Path, mode: u32) {
use std::os::unix::fs::PermissionsExt;
let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(mode));
}
fn make_name() -> Result<Name<'static>> {
let path = socket_path();
#[cfg(unix)]
{
path.clone()
.to_fs_name::<GenericFilePath>()
.with_context(|| format!("invalid socket path {}", path.display()))
}
#[cfg(not(unix))]
{
let _ = &path;
"kintsugi"
.to_ns_name::<GenericNamespaced>()
.context("invalid namespaced pipe name")
}
}
fn write_message<W: Write, T: Serialize>(w: &mut W, value: &T) -> Result<()> {
let mut line = serde_json::to_string(value).context("serialize IPC message")?;
line.push('\n');
w.write_all(line.as_bytes()).context("write IPC message")?;
w.flush().context("flush IPC message")?;
Ok(())
}
pub const MAX_FRAME: u64 = 16 * 1024 * 1024;
fn read_message<R: BufRead, T: serde::de::DeserializeOwned>(r: &mut R) -> Result<T> {
let mut line = String::new();
let n = r.read_line(&mut line).context("read IPC message")?;
if n == 0 {
anyhow::bail!("connection closed before a message was received");
}
if !line.ends_with('\n') && n as u64 >= MAX_FRAME {
anyhow::bail!("IPC message exceeds {MAX_FRAME} bytes");
}
serde_json::from_str(line.trim_end()).context("deserialize IPC message")
}
fn bounded(stream: &mut Stream) -> BufReader<std::io::Take<&mut Stream>> {
BufReader::new(stream.take(MAX_FRAME))
}
fn expect_ack(resp: Response) -> Result<()> {
match resp {
Response::Ack => Ok(()),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response (wanted Ack)"),
}
}
fn round_trip(req: &Request) -> Result<Response> {
let name = make_name()?;
let mut stream =
Stream::connect(name).context("connect to kintsugi daemon (is it running?)")?;
write_message(&mut stream, req)?;
let mut reader = bounded(&mut stream);
read_message(&mut reader)
}
pub struct Client;
impl Client {
pub fn send(cmd: &ProposedCommand) -> Result<Verdict> {
match round_trip(&Request::Propose(cmd.clone()))? {
Response::Verdict(v) => Ok(v),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response to Propose"),
}
}
pub fn resolve(resolution: &Resolution) -> Result<()> {
expect_ack(round_trip(&Request::Resolve(resolution.clone()))?)
}
pub fn observe(observation: &Observation) -> Result<()> {
expect_ack(round_trip(&Request::Observe(observation.clone()))?)
}
pub fn record(cmd: &ProposedCommand) -> Result<()> {
expect_ack(round_trip(&Request::Record(cmd.clone()))?)
}
pub fn list_pending() -> Result<Vec<kintsugi_core::PendingItem>> {
match round_trip(&Request::ListPending)? {
Response::PendingList { items } => Ok(items),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response to ListPending"),
}
}
pub fn pending_status(id: &str) -> Result<String> {
match round_trip(&Request::PendingStatus { id: id.to_string() })? {
Response::Pending { status } => Ok(status),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response to PendingStatus"),
}
}
pub fn approve(id: &str) -> Result<()> {
expect_ack(round_trip(&Request::Approve { id: id.to_string() })?)
}
pub fn deny(id: &str) -> Result<()> {
expect_ack(round_trip(&Request::Deny { id: id.to_string() })?)
}
pub fn status_scorer() -> Result<String> {
match round_trip(&Request::Status)? {
Response::Status { scorer } => Ok(scorer),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response to Status"),
}
}
pub fn auth_begin(op: &str) -> Result<(bool, String, String, kintsugi_core::admin::KdfParams)> {
match round_trip(&Request::AuthBegin { op: op.to_string() })? {
Response::Challenge {
locked,
nonce,
salt,
params,
} => Ok((locked, nonce, salt, params)),
Response::Error { message } => anyhow::bail!("daemon error: {message}"),
_ => anyhow::bail!("unexpected response to AuthBegin"),
}
}
pub fn shutdown(op: &str, nonce: &str, proof: &str) -> Result<()> {
expect_ack(round_trip(&Request::Shutdown {
op: op.to_string(),
nonce: nonce.to_string(),
proof: proof.to_string(),
})?)
}
pub fn is_daemon_running() -> bool {
match make_name() {
Ok(name) => Stream::connect(name).is_ok(),
Err(_) => false,
}
}
}
pub struct Server {
listener: interprocess::local_socket::Listener,
}
impl Server {
pub fn bind() -> Result<Self> {
#[cfg(unix)]
{
let path = socket_path();
if let Some(parent) = path.parent() {
let _ = std::fs::create_dir_all(parent);
set_mode(parent, 0o700);
}
if path.exists() {
let _ = std::fs::remove_file(&path);
}
}
let name = make_name()?;
let listener = ListenerOptions::new()
.name(name)
.create_sync()
.context("bind kintsugi daemon socket")?;
#[cfg(unix)]
set_mode(&socket_path(), 0o600);
Ok(Self { listener })
}
pub fn endpoint() -> PathBuf {
socket_path()
}
pub fn serve<F>(self, mut handler: F) -> Result<()>
where
F: FnMut(Request) -> Response,
{
for incoming in self.listener.incoming() {
let stream = match incoming {
Ok(s) => s,
Err(e) => {
eprintln!("kintsugi-daemon: accept error: {e}");
continue;
}
};
if let Err(e) = Self::handle_one(stream, &mut handler) {
eprintln!("kintsugi-daemon: connection error: {e}");
}
}
Ok(())
}
pub fn serve_until<F, S>(self, mut handler: F, stop: S) -> Result<()>
where
F: FnMut(Request) -> Response,
S: Fn() -> bool,
{
for incoming in self.listener.incoming() {
let stream = match incoming {
Ok(s) => s,
Err(e) => {
eprintln!("kintsugi-daemon: accept error: {e}");
continue;
}
};
if let Err(e) = Self::handle_one(stream, &mut handler) {
eprintln!("kintsugi-daemon: connection error: {e}");
}
if stop() {
break;
}
}
Ok(())
}
pub fn serve_n<F>(self, count: usize, mut handler: F) -> Result<()>
where
F: FnMut(Request) -> Response,
{
if count == 0 {
return Ok(());
}
let mut served = 0;
for incoming in self.listener.incoming() {
let stream = incoming.context("accept connection")?;
Self::handle_one(stream, &mut handler)?;
served += 1;
if served >= count {
break;
}
}
Ok(())
}
fn handle_one<F>(mut stream: Stream, handler: &mut F) -> Result<()>
where
F: FnMut(Request) -> Response,
{
let req: Request = {
let mut reader = bounded(&mut stream);
read_message(&mut reader)?
};
let resp = handler(req);
write_message(&mut stream, &resp)?;
Ok(())
}
}