// kintsugi_daemon/ipc.rs

//! Local IPC transport between interception and the daemon.
//!
//! Wire format: one newline-delimited JSON value per message. The interception
//! side connects and sends a [`Request`]; the daemon replies with a [`Response`].
//! Each connection carries exactly one request and one response.
//! A `Propose` carries a [`ProposedCommand`] and is answered with a [`Verdict`];
//! a `Resolve` records a human's decision on a held command and is answered with
//! `Ack`. Transport is a Unix domain socket (filesystem) or a Windows named pipe
//! (namespaced), abstracted by the `interprocess` crate.
9
10use std::io::{BufRead, BufReader, Read, Write};
11use std::path::PathBuf;
12
13use anyhow::{Context, Result};
14use interprocess::local_socket::prelude::*;
15#[cfg(unix)]
16use interprocess::local_socket::GenericFilePath;
17#[cfg(not(unix))]
18use interprocess::local_socket::GenericNamespaced;
19use interprocess::local_socket::{ListenerOptions, Name, Stream};
20use kintsugi_core::{Decision, ProposedCommand, Verdict};
21use serde::{Deserialize, Serialize};
22
23/// A request from interception to the daemon.
24#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "lowercase")]
26pub enum Request {
27    /// "Here is a command I'm about to run — what's the verdict?"
28    Propose(ProposedCommand),
29    /// "A human resolved a held command; record it (and maybe remember it)."
30    Resolve(Resolution),
31    /// "I observed a filesystem change that bypassed interception — just record
32    /// it." The backstop sends these so the daemon's single writer keeps the hash
33    /// chain intact.
34    Observe(Observation),
35    /// "A human (no AI agent) already ran this shell command — record it for the
36    /// audit trail." Passive session recording: the daemon classifies it (so a
37    /// destructive command is flagged in the timeline) but never blocks or
38    /// snapshots, because by the time we hear about it the command has run.
39    Record(ProposedCommand),
40    /// "List the commands currently held for approval."
41    ListPending,
42    /// "What is the status of this queued command?" (`pending`/`approved`/`denied`).
43    // Struct variants (not newtype-of-String): serde's internally-tagged enums
44    // cannot represent a tagged newtype wrapping a primitive.
45    PendingStatus { id: String },
46    /// "A human approved this queued command id."
47    Approve { id: String },
48    /// "A human denied this queued command id."
49    Deny { id: String },
50    /// "What is the daemon's runtime status?" — currently the active scorer, so
51    /// callers can tell whether the local model loaded or it's on the heuristic
52    /// fallback.
53    Status,
54    /// "I want to perform a privileged operation `op` (e.g. shutdown) — give me a
55    /// challenge." The daemon replies with a [`Response::Challenge`]. Auth is
56    /// enforced by the *daemon*, against the vault IT loaded at startup, so the
57    /// caller's environment can't point the check at a different/empty vault.
58    AuthBegin { op: String },
59    /// "Here is the challenge proof for `op`; do it." Currently only shutdown.
60    Shutdown {
61        op: String,
62        nonce: String,
63        proof: String,
64    },
65}
66
67/// A filesystem change observed by the backstop watcher.
68#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct Observation {
70    /// `created` | `modified` | `removed`. Serialized as `change` so it never
71    /// collides with the enum's internal `kind` tag.
72    #[serde(rename = "change")]
73    pub kind: String,
74    /// The path that changed.
75    pub path: String,
76}
77
78/// A human's resolution of a held command.
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct Resolution {
81    /// The original command being resolved.
82    pub command: ProposedCommand,
83    /// The human's decision — `Allow` or `Deny` (never `Hold`).
84    pub decision: Decision,
85    /// Whether to remember this decision for this exact command in this repo.
86    pub remember: bool,
87}
88
89/// The daemon's reply.
90#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(tag = "kind", rename_all = "lowercase")]
92pub enum Response {
93    /// Verdict for a `Propose`.
94    Verdict(Verdict),
95    /// Acknowledgement of a `Resolve`/`Approve`/`Deny`/`Observe`.
96    Ack,
97    /// The approval queue (reply to `ListPending`). A struct variant because
98    /// serde's internally-tagged enums cannot wrap a bare sequence.
99    PendingList {
100        items: Vec<kintsugi_core::PendingItem>,
101    },
102    /// The status of a queued command (reply to `PendingStatus`): `pending` |
103    /// `approved` | `denied` | `gone` (not in the queue).
104    Pending { status: String },
105    /// The daemon's runtime status (reply to `Status`). `scorer` is the active
106    /// backend id, e.g. `heuristic` or `llama:Qwen3-4B-Instruct-2507-Q4_K_M`.
107    Status { scorer: String },
108    /// Reply to `AuthBegin`: a fresh challenge. `locked` is false when the daemon
109    /// has no vault (then no proof is needed); otherwise the caller derives the
110    /// proof from `nonce` + `salt` + `params` and the admin password.
111    Challenge {
112        locked: bool,
113        nonce: String,
114        salt: String,
115        params: kintsugi_core::admin::KdfParams,
116    },
117    /// Something went wrong handling the request.
118    Error { message: String },
119}
120
121/// Resolve the socket path. Override with `KINTSUGI_SOCKET` (handy in tests).
122///
123/// On Unix this is a filesystem path under `$XDG_RUNTIME_DIR` (falling back to
124/// the temp dir). On Windows a namespaced pipe name is used instead.
125pub fn socket_path() -> PathBuf {
126    if let Ok(p) = std::env::var("KINTSUGI_SOCKET") {
127        return PathBuf::from(p);
128    }
129    #[cfg(unix)]
130    {
131        // $XDG_RUNTIME_DIR is already a per-user 0700 dir — the right home.
132        if let Ok(rt) = std::env::var("XDG_RUNTIME_DIR") {
133            if !rt.is_empty() {
134                return PathBuf::from(rt).join("kintsugi.sock");
135            }
136        }
137        // Otherwise use the per-user data dir (created 0700 at bind), never the
138        // world-writable temp dir, so another local user can't pre-create or
139        // connect to the socket.
140        if let Some(dirs) = directories::ProjectDirs::from("", "", "kintsugi") {
141            return dirs.data_dir().join("kintsugi.sock");
142        }
143        std::env::temp_dir().join("kintsugi.sock")
144    }
145    #[cfg(not(unix))]
146    {
147        PathBuf::from(r"\\.\pipe\kintsugi")
148    }
149}
150
/// Best-effort chmod of `path` to `mode` (Unix only).
///
/// Failures are deliberately ignored. Used to keep the socket and its parent
/// dir private to the owning user.
#[cfg(unix)]
pub(crate) fn set_mode(path: &std::path::Path, mode: u32) {
    use std::fs::{self, Permissions};
    use std::os::unix::fs::PermissionsExt;

    let wanted = Permissions::from_mode(mode);
    // The result is discarded on purpose: this is a hardening step, not a
    // precondition.
    let _ = fs::set_permissions(path, wanted);
}
158
159/// Build the `interprocess` name for the current platform.
160fn make_name() -> Result<Name<'static>> {
161    let path = socket_path();
162    #[cfg(unix)]
163    {
164        path.clone()
165            .to_fs_name::<GenericFilePath>()
166            .with_context(|| format!("invalid socket path {}", path.display()))
167    }
168    #[cfg(not(unix))]
169    {
170        let _ = &path;
171        "kintsugi"
172            .to_ns_name::<GenericNamespaced>()
173            .context("invalid namespaced pipe name")
174    }
175}
176
177/// Write one JSON message followed by a newline.
178fn write_message<W: Write, T: Serialize>(w: &mut W, value: &T) -> Result<()> {
179    let mut line = serde_json::to_string(value).context("serialize IPC message")?;
180    line.push('\n');
181    w.write_all(line.as_bytes()).context("write IPC message")?;
182    w.flush().context("flush IPC message")?;
183    Ok(())
184}
185
/// Upper bound, in bytes, on one IPC message (16 MiB). Caps memory use so that
/// a misbehaving or hostile local peer cannot OOM/stall the single-threaded
/// daemon with a giant or newline-free stream.
pub const MAX_FRAME: u64 = 16 << 20;
190
191/// Read one newline-delimited JSON message from a length-bounded reader.
192fn read_message<R: BufRead, T: serde::de::DeserializeOwned>(r: &mut R) -> Result<T> {
193    let mut line = String::new();
194    let n = r.read_line(&mut line).context("read IPC message")?;
195    if n == 0 {
196        anyhow::bail!("connection closed before a message was received");
197    }
198    if !line.ends_with('\n') && n as u64 >= MAX_FRAME {
199        anyhow::bail!("IPC message exceeds {MAX_FRAME} bytes");
200    }
201    serde_json::from_str(line.trim_end()).context("deserialize IPC message")
202}
203
204/// Wrap a stream in a length-bounded buffered reader (see [`MAX_FRAME`]).
205fn bounded(stream: &mut Stream) -> BufReader<std::io::Take<&mut Stream>> {
206    BufReader::new(stream.take(MAX_FRAME))
207}
208
209/// Expect an `Ack`, mapping anything else to an error.
210fn expect_ack(resp: Response) -> Result<()> {
211    match resp {
212        Response::Ack => Ok(()),
213        Response::Error { message } => anyhow::bail!("daemon error: {message}"),
214        _ => anyhow::bail!("unexpected response (wanted Ack)"),
215    }
216}
217
218/// Send a request and read the response on a fresh connection.
219fn round_trip(req: &Request) -> Result<Response> {
220    let name = make_name()?;
221    let mut stream =
222        Stream::connect(name).context("connect to kintsugi daemon (is it running?)")?;
223    write_message(&mut stream, req)?;
224    let mut reader = bounded(&mut stream);
225    read_message(&mut reader)
226}
227
228/// Client side: connect, send a request, and block for the response.
229pub struct Client;
230
231impl Client {
232    /// Propose a command and await its verdict.
233    pub fn send(cmd: &ProposedCommand) -> Result<Verdict> {
234        match round_trip(&Request::Propose(cmd.clone()))? {
235            Response::Verdict(v) => Ok(v),
236            Response::Error { message } => anyhow::bail!("daemon error: {message}"),
237            _ => anyhow::bail!("unexpected response to Propose"),
238        }
239    }
240
241    /// Record a human's resolution of a held command.
242    pub fn resolve(resolution: &Resolution) -> Result<()> {
243        expect_ack(round_trip(&Request::Resolve(resolution.clone()))?)
244    }
245
246    /// Record an observed filesystem change (backstop).
247    pub fn observe(observation: &Observation) -> Result<()> {
248        expect_ack(round_trip(&Request::Observe(observation.clone()))?)
249    }
250
251    /// Record a shell command a human already ran (passive session recording).
252    pub fn record(cmd: &ProposedCommand) -> Result<()> {
253        expect_ack(round_trip(&Request::Record(cmd.clone()))?)
254    }
255
256    /// List the commands currently held for approval.
257    pub fn list_pending() -> Result<Vec<kintsugi_core::PendingItem>> {
258        match round_trip(&Request::ListPending)? {
259            Response::PendingList { items } => Ok(items),
260            Response::Error { message } => anyhow::bail!("daemon error: {message}"),
261            _ => anyhow::bail!("unexpected response to ListPending"),
262        }
263    }
264
265    /// The status of a queued command: `pending` | `approved` | `denied` | `gone`.
266    pub fn pending_status(id: &str) -> Result<String> {
267        match round_trip(&Request::PendingStatus { id: id.to_string() })? {
268            Response::Pending { status } => Ok(status),
269            Response::Error { message } => anyhow::bail!("daemon error: {message}"),
270            _ => anyhow::bail!("unexpected response to PendingStatus"),
271        }
272    }
273
274    /// Approve a queued command (records the human decision; may snapshot).
275    pub fn approve(id: &str) -> Result<()> {
276        expect_ack(round_trip(&Request::Approve { id: id.to_string() })?)
277    }
278
279    /// Deny a queued command.
280    pub fn deny(id: &str) -> Result<()> {
281        expect_ack(round_trip(&Request::Deny { id: id.to_string() })?)
282    }
283
284    /// The daemon's active scorer backend id (e.g. `heuristic` or
285    /// `llama:<model>`). Lets callers report whether the local model is loaded.
286    pub fn status_scorer() -> Result<String> {
287        match round_trip(&Request::Status)? {
288            Response::Status { scorer } => Ok(scorer),
289            Response::Error { message } => anyhow::bail!("daemon error: {message}"),
290            _ => anyhow::bail!("unexpected response to Status"),
291        }
292    }
293
294    /// Begin authenticating a privileged op; returns (locked, nonce, salt, params).
295    pub fn auth_begin(op: &str) -> Result<(bool, String, String, kintsugi_core::admin::KdfParams)> {
296        match round_trip(&Request::AuthBegin { op: op.to_string() })? {
297            Response::Challenge {
298                locked,
299                nonce,
300                salt,
301                params,
302            } => Ok((locked, nonce, salt, params)),
303            Response::Error { message } => anyhow::bail!("daemon error: {message}"),
304            _ => anyhow::bail!("unexpected response to AuthBegin"),
305        }
306    }
307
308    /// Complete an authenticated shutdown with a challenge proof (hex). On success
309    /// the daemon records the event and exits.
310    pub fn shutdown(op: &str, nonce: &str, proof: &str) -> Result<()> {
311        expect_ack(round_trip(&Request::Shutdown {
312            op: op.to_string(),
313            nonce: nonce.to_string(),
314            proof: proof.to_string(),
315        })?)
316    }
317
318    /// Whether a daemon appears to be listening.
319    pub fn is_daemon_running() -> bool {
320        match make_name() {
321            Ok(name) => Stream::connect(name).is_ok(),
322            Err(_) => false,
323        }
324    }
325}
326
327/// Server side: a bound listener that dispatches each request to a handler.
328pub struct Server {
329    listener: interprocess::local_socket::Listener,
330}
331
332impl Server {
333    /// Bind the listener, clearing any stale Unix socket file first.
334    pub fn bind() -> Result<Self> {
335        #[cfg(unix)]
336        {
337            let path = socket_path();
338            // Ensure a private parent dir (0700) so peers can't pre-create the
339            // socket, then clear any stale socket file.
340            if let Some(parent) = path.parent() {
341                let _ = std::fs::create_dir_all(parent);
342                set_mode(parent, 0o700);
343            }
344            if path.exists() {
345                let _ = std::fs::remove_file(&path);
346            }
347        }
348        let name = make_name()?;
349        let listener = ListenerOptions::new()
350            .name(name)
351            .create_sync()
352            .context("bind kintsugi daemon socket")?;
353        // Restrict the socket to the owning user (no group/other access), so on a
354        // shared host another user can't connect and Approve/Deny/Resolve.
355        #[cfg(unix)]
356        set_mode(&socket_path(), 0o600);
357        Ok(Self { listener })
358    }
359
360    /// The path/name the server is listening on.
361    pub fn endpoint() -> PathBuf {
362        socket_path()
363    }
364
365    /// Serve connections sequentially, calling `handler` for each request.
366    pub fn serve<F>(self, mut handler: F) -> Result<()>
367    where
368        F: FnMut(Request) -> Response,
369    {
370        for incoming in self.listener.incoming() {
371            let stream = match incoming {
372                Ok(s) => s,
373                Err(e) => {
374                    eprintln!("kintsugi-daemon: accept error: {e}");
375                    continue;
376                }
377            };
378            if let Err(e) = Self::handle_one(stream, &mut handler) {
379                eprintln!("kintsugi-daemon: connection error: {e}");
380            }
381        }
382        Ok(())
383    }
384
385    /// Serve connections until `stop()` returns true (checked after each one).
386    /// Lets a handler request its own shutdown (e.g. an authenticated `Shutdown`).
387    pub fn serve_until<F, S>(self, mut handler: F, stop: S) -> Result<()>
388    where
389        F: FnMut(Request) -> Response,
390        S: Fn() -> bool,
391    {
392        for incoming in self.listener.incoming() {
393            let stream = match incoming {
394                Ok(s) => s,
395                Err(e) => {
396                    eprintln!("kintsugi-daemon: accept error: {e}");
397                    continue;
398                }
399            };
400            if let Err(e) = Self::handle_one(stream, &mut handler) {
401                eprintln!("kintsugi-daemon: connection error: {e}");
402            }
403            if stop() {
404                break;
405            }
406        }
407        Ok(())
408    }
409
410    /// Serve exactly `count` connections then stop. Used by tests.
411    pub fn serve_n<F>(self, count: usize, mut handler: F) -> Result<()>
412    where
413        F: FnMut(Request) -> Response,
414    {
415        if count == 0 {
416            return Ok(());
417        }
418        let mut served = 0;
419        for incoming in self.listener.incoming() {
420            let stream = incoming.context("accept connection")?;
421            Self::handle_one(stream, &mut handler)?;
422            served += 1;
423            if served >= count {
424                break;
425            }
426        }
427        Ok(())
428    }
429
430    fn handle_one<F>(mut stream: Stream, handler: &mut F) -> Result<()>
431    where
432        F: FnMut(Request) -> Response,
433    {
434        let req: Request = {
435            let mut reader = bounded(&mut stream);
436            read_message(&mut reader)?
437        };
438        let resp = handler(req);
439        write_message(&mut stream, &resp)?;
440        Ok(())
441    }
442}