1use std::io::{BufRead, BufReader, Read, Write};
11use std::path::PathBuf;
12
13use anyhow::{Context, Result};
14use interprocess::local_socket::prelude::*;
15#[cfg(unix)]
16use interprocess::local_socket::GenericFilePath;
17#[cfg(not(unix))]
18use interprocess::local_socket::GenericNamespaced;
19use interprocess::local_socket::{ListenerOptions, Name, Stream};
20use kintsugi_core::{Decision, ProposedCommand, Verdict};
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "lowercase")]
26pub enum Request {
27 Propose(ProposedCommand),
29 Resolve(Resolution),
31 Observe(Observation),
35 Record(ProposedCommand),
40 ListPending,
42 PendingStatus { id: String },
46 Approve { id: String },
48 Deny { id: String },
50 Status,
54 AuthBegin { op: String },
59 Shutdown {
61 op: String,
62 nonce: String,
63 proof: String,
64 },
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct Observation {
70 #[serde(rename = "change")]
73 pub kind: String,
74 pub path: String,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct Resolution {
81 pub command: ProposedCommand,
83 pub decision: Decision,
85 pub remember: bool,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(tag = "kind", rename_all = "lowercase")]
92pub enum Response {
93 Verdict(Verdict),
95 Ack,
97 PendingList {
100 items: Vec<kintsugi_core::PendingItem>,
101 },
102 Pending { status: String },
105 Status { scorer: String },
108 Challenge {
112 locked: bool,
113 nonce: String,
114 salt: String,
115 params: kintsugi_core::admin::KdfParams,
116 },
117 Error { message: String },
119}
120
121pub fn socket_path() -> PathBuf {
126 if let Ok(p) = std::env::var("KINTSUGI_SOCKET") {
127 return PathBuf::from(p);
128 }
129 #[cfg(unix)]
130 {
131 if let Ok(rt) = std::env::var("XDG_RUNTIME_DIR") {
133 if !rt.is_empty() {
134 return PathBuf::from(rt).join("kintsugi.sock");
135 }
136 }
137 if let Some(dirs) = directories::ProjectDirs::from("", "", "kintsugi") {
141 return dirs.data_dir().join("kintsugi.sock");
142 }
143 std::env::temp_dir().join("kintsugi.sock")
144 }
145 #[cfg(not(unix))]
146 {
147 PathBuf::from(r"\\.\pipe\kintsugi")
148 }
149}
150
/// Best-effort `chmod`: applies the Unix permission bits in `mode` to `path`.
///
/// Any failure (missing path, insufficient rights, ...) is deliberately ignored;
/// the function never panics and reports nothing to the caller.
#[cfg(unix)]
pub(crate) fn set_mode(path: &std::path::Path, mode: u32) {
    use std::fs::{self, Permissions};
    use std::os::unix::fs::PermissionsExt;

    let wanted = Permissions::from_mode(mode);
    // The outcome is discarded on purpose: callers treat this as hardening only.
    fs::set_permissions(path, wanted).ok();
}
158
159fn make_name() -> Result<Name<'static>> {
161 let path = socket_path();
162 #[cfg(unix)]
163 {
164 path.clone()
165 .to_fs_name::<GenericFilePath>()
166 .with_context(|| format!("invalid socket path {}", path.display()))
167 }
168 #[cfg(not(unix))]
169 {
170 let _ = &path;
171 "kintsugi"
172 .to_ns_name::<GenericNamespaced>()
173 .context("invalid namespaced pipe name")
174 }
175}
176
177fn write_message<W: Write, T: Serialize>(w: &mut W, value: &T) -> Result<()> {
179 let mut line = serde_json::to_string(value).context("serialize IPC message")?;
180 line.push('\n');
181 w.write_all(line.as_bytes()).context("write IPC message")?;
182 w.flush().context("flush IPC message")?;
183 Ok(())
184}
185
/// Upper bound, in bytes, on a single IPC message (16 MiB).
///
/// `bounded` uses it as the read limit and `read_message` reports an error when a
/// line reaches it without a terminating newline.
pub const MAX_FRAME: u64 = 16 << 20;
190
191fn read_message<R: BufRead, T: serde::de::DeserializeOwned>(r: &mut R) -> Result<T> {
193 let mut line = String::new();
194 let n = r.read_line(&mut line).context("read IPC message")?;
195 if n == 0 {
196 anyhow::bail!("connection closed before a message was received");
197 }
198 if !line.ends_with('\n') && n as u64 >= MAX_FRAME {
199 anyhow::bail!("IPC message exceeds {MAX_FRAME} bytes");
200 }
201 serde_json::from_str(line.trim_end()).context("deserialize IPC message")
202}
203
204fn bounded(stream: &mut Stream) -> BufReader<std::io::Take<&mut Stream>> {
206 BufReader::new(stream.take(MAX_FRAME))
207}
208
209fn expect_ack(resp: Response) -> Result<()> {
211 match resp {
212 Response::Ack => Ok(()),
213 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
214 _ => anyhow::bail!("unexpected response (wanted Ack)"),
215 }
216}
217
218fn round_trip(req: &Request) -> Result<Response> {
220 let name = make_name()?;
221 let mut stream =
222 Stream::connect(name).context("connect to kintsugi daemon (is it running?)")?;
223 write_message(&mut stream, req)?;
224 let mut reader = bounded(&mut stream);
225 read_message(&mut reader)
226}
227
228pub struct Client;
230
231impl Client {
232 pub fn send(cmd: &ProposedCommand) -> Result<Verdict> {
234 match round_trip(&Request::Propose(cmd.clone()))? {
235 Response::Verdict(v) => Ok(v),
236 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
237 _ => anyhow::bail!("unexpected response to Propose"),
238 }
239 }
240
241 pub fn resolve(resolution: &Resolution) -> Result<()> {
243 expect_ack(round_trip(&Request::Resolve(resolution.clone()))?)
244 }
245
246 pub fn observe(observation: &Observation) -> Result<()> {
248 expect_ack(round_trip(&Request::Observe(observation.clone()))?)
249 }
250
251 pub fn record(cmd: &ProposedCommand) -> Result<()> {
253 expect_ack(round_trip(&Request::Record(cmd.clone()))?)
254 }
255
256 pub fn list_pending() -> Result<Vec<kintsugi_core::PendingItem>> {
258 match round_trip(&Request::ListPending)? {
259 Response::PendingList { items } => Ok(items),
260 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
261 _ => anyhow::bail!("unexpected response to ListPending"),
262 }
263 }
264
265 pub fn pending_status(id: &str) -> Result<String> {
267 match round_trip(&Request::PendingStatus { id: id.to_string() })? {
268 Response::Pending { status } => Ok(status),
269 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
270 _ => anyhow::bail!("unexpected response to PendingStatus"),
271 }
272 }
273
274 pub fn approve(id: &str) -> Result<()> {
276 expect_ack(round_trip(&Request::Approve { id: id.to_string() })?)
277 }
278
279 pub fn deny(id: &str) -> Result<()> {
281 expect_ack(round_trip(&Request::Deny { id: id.to_string() })?)
282 }
283
284 pub fn status_scorer() -> Result<String> {
287 match round_trip(&Request::Status)? {
288 Response::Status { scorer } => Ok(scorer),
289 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
290 _ => anyhow::bail!("unexpected response to Status"),
291 }
292 }
293
294 pub fn auth_begin(op: &str) -> Result<(bool, String, String, kintsugi_core::admin::KdfParams)> {
296 match round_trip(&Request::AuthBegin { op: op.to_string() })? {
297 Response::Challenge {
298 locked,
299 nonce,
300 salt,
301 params,
302 } => Ok((locked, nonce, salt, params)),
303 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
304 _ => anyhow::bail!("unexpected response to AuthBegin"),
305 }
306 }
307
308 pub fn shutdown(op: &str, nonce: &str, proof: &str) -> Result<()> {
311 expect_ack(round_trip(&Request::Shutdown {
312 op: op.to_string(),
313 nonce: nonce.to_string(),
314 proof: proof.to_string(),
315 })?)
316 }
317
318 pub fn is_daemon_running() -> bool {
320 match make_name() {
321 Ok(name) => Stream::connect(name).is_ok(),
322 Err(_) => false,
323 }
324 }
325}
326
327pub struct Server {
329 listener: interprocess::local_socket::Listener,
330}
331
332impl Server {
333 pub fn bind() -> Result<Self> {
335 #[cfg(unix)]
336 {
337 let path = socket_path();
338 if let Some(parent) = path.parent() {
341 let _ = std::fs::create_dir_all(parent);
342 set_mode(parent, 0o700);
343 }
344 if path.exists() {
345 let _ = std::fs::remove_file(&path);
346 }
347 }
348 let name = make_name()?;
349 let listener = ListenerOptions::new()
350 .name(name)
351 .create_sync()
352 .context("bind kintsugi daemon socket")?;
353 #[cfg(unix)]
356 set_mode(&socket_path(), 0o600);
357 Ok(Self { listener })
358 }
359
360 pub fn endpoint() -> PathBuf {
362 socket_path()
363 }
364
365 pub fn serve<F>(self, mut handler: F) -> Result<()>
367 where
368 F: FnMut(Request) -> Response,
369 {
370 for incoming in self.listener.incoming() {
371 let stream = match incoming {
372 Ok(s) => s,
373 Err(e) => {
374 eprintln!("kintsugi-daemon: accept error: {e}");
375 continue;
376 }
377 };
378 if let Err(e) = Self::handle_one(stream, &mut handler) {
379 eprintln!("kintsugi-daemon: connection error: {e}");
380 }
381 }
382 Ok(())
383 }
384
385 pub fn serve_until<F, S>(self, mut handler: F, stop: S) -> Result<()>
388 where
389 F: FnMut(Request) -> Response,
390 S: Fn() -> bool,
391 {
392 for incoming in self.listener.incoming() {
393 let stream = match incoming {
394 Ok(s) => s,
395 Err(e) => {
396 eprintln!("kintsugi-daemon: accept error: {e}");
397 continue;
398 }
399 };
400 if let Err(e) = Self::handle_one(stream, &mut handler) {
401 eprintln!("kintsugi-daemon: connection error: {e}");
402 }
403 if stop() {
404 break;
405 }
406 }
407 Ok(())
408 }
409
410 pub fn serve_n<F>(self, count: usize, mut handler: F) -> Result<()>
412 where
413 F: FnMut(Request) -> Response,
414 {
415 if count == 0 {
416 return Ok(());
417 }
418 let mut served = 0;
419 for incoming in self.listener.incoming() {
420 let stream = incoming.context("accept connection")?;
421 Self::handle_one(stream, &mut handler)?;
422 served += 1;
423 if served >= count {
424 break;
425 }
426 }
427 Ok(())
428 }
429
430 fn handle_one<F>(mut stream: Stream, handler: &mut F) -> Result<()>
431 where
432 F: FnMut(Request) -> Response,
433 {
434 let req: Request = {
435 let mut reader = bounded(&mut stream);
436 read_message(&mut reader)?
437 };
438 let resp = handler(req);
439 write_message(&mut stream, &resp)?;
440 Ok(())
441 }
442}