1use std::io::{BufRead, BufReader, Read, Write};
11use std::path::PathBuf;
12
13use anyhow::{Context, Result};
14use interprocess::local_socket::prelude::*;
15#[cfg(unix)]
16use interprocess::local_socket::GenericFilePath;
17#[cfg(not(unix))]
18use interprocess::local_socket::GenericNamespaced;
19use interprocess::local_socket::{ListenerOptions, Name, Stream};
20use kintsugi_core::{Decision, ProposedCommand, Verdict};
21use serde::{Deserialize, Serialize};
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25#[serde(tag = "kind", rename_all = "lowercase")]
26pub enum Request {
27 Propose(ProposedCommand),
29 Resolve(Resolution),
31 Observe(Observation),
35 Record(ProposedCommand),
40 ListPending,
42 PendingStatus { id: String },
46 Approve { id: String },
48 Deny { id: String },
50 Status,
54 AuthBegin { op: String },
59 Shutdown {
61 op: String,
62 nonce: String,
63 proof: String,
64 },
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize)]
69pub struct Observation {
70 #[serde(rename = "change")]
73 pub kind: String,
74 pub path: String,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct Resolution {
81 pub command: ProposedCommand,
83 pub decision: Decision,
85 pub remember: bool,
87}
88
89#[derive(Debug, Clone, Serialize, Deserialize)]
91#[serde(tag = "kind", rename_all = "lowercase")]
92pub enum Response {
93 Verdict(Verdict),
95 Ack,
97 PendingList {
100 items: Vec<kintsugi_core::PendingItem>,
101 },
102 Pending { status: String },
105 Status { scorer: String },
108 Challenge {
112 locked: bool,
113 nonce: String,
114 salt: String,
115 params: kintsugi_core::admin::KdfParams,
116 },
117 Error { message: String },
119}
120
121pub fn socket_path() -> PathBuf {
126 if let Ok(p) = std::env::var("KINTSUGI_SOCKET") {
127 return PathBuf::from(p);
128 }
129 #[cfg(unix)]
130 {
131 if let Ok(rt) = std::env::var("XDG_RUNTIME_DIR") {
133 if !rt.is_empty() {
134 return PathBuf::from(rt).join("kintsugi.sock");
135 }
136 }
137 if let Some(dirs) = directories::ProjectDirs::from("", "", "kintsugi") {
141 return dirs.data_dir().join("kintsugi.sock");
142 }
143 std::env::temp_dir().join("kintsugi.sock")
144 }
145 #[cfg(not(unix))]
146 {
147 PathBuf::from(r"\\.\pipe\kintsugi")
148 }
149}
150
151#[cfg(unix)]
154pub(crate) fn set_mode(path: &std::path::Path, mode: u32) {
155 use std::os::unix::fs::PermissionsExt;
156 let _ = std::fs::set_permissions(path, std::fs::Permissions::from_mode(mode));
157}
158
159fn make_name() -> Result<Name<'static>> {
161 let path = socket_path();
162 #[cfg(unix)]
163 {
164 path.clone()
165 .to_fs_name::<GenericFilePath>()
166 .with_context(|| format!("invalid socket path {}", path.display()))
167 }
168 #[cfg(not(unix))]
169 {
170 let _ = &path;
171 "kintsugi"
172 .to_ns_name::<GenericNamespaced>()
173 .context("invalid namespaced pipe name")
174 }
175}
176
177fn write_message<W: Write, T: Serialize>(w: &mut W, value: &T) -> Result<()> {
179 let mut line = serde_json::to_string(value).context("serialize IPC message")?;
180 line.push('\n');
181 w.write_all(line.as_bytes()).context("write IPC message")?;
182 w.flush().context("flush IPC message")?;
183 Ok(())
184}
185
186pub const MAX_FRAME: u64 = 16 * 1024 * 1024;
190
191fn read_message<R: BufRead, T: serde::de::DeserializeOwned>(r: &mut R) -> Result<T> {
193 let mut line = String::new();
194 let n = r.read_line(&mut line).context("read IPC message")?;
195 if n == 0 {
196 anyhow::bail!("connection closed before a message was received");
197 }
198 if !line.ends_with('\n') && n as u64 >= MAX_FRAME {
199 anyhow::bail!("IPC message exceeds {MAX_FRAME} bytes");
200 }
201 serde_json::from_str(line.trim_end()).context("deserialize IPC message")
202}
203
204fn bounded(stream: &mut Stream) -> BufReader<std::io::Take<&mut Stream>> {
206 BufReader::new(stream.take(MAX_FRAME))
207}
208
209fn expect_ack(resp: Response) -> Result<()> {
211 match resp {
212 Response::Ack => Ok(()),
213 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
214 _ => anyhow::bail!("unexpected response (wanted Ack)"),
215 }
216}
217
218fn round_trip(req: &Request) -> Result<Response> {
220 let name = make_name()?;
221 let mut stream =
222 Stream::connect(name).context("connect to kintsugi daemon (is it running?)")?;
223 write_message(&mut stream, req)?;
224 let mut reader = bounded(&mut stream);
225 read_message(&mut reader)
226}
227
228pub struct Client;
230
231impl Client {
232 pub fn send(cmd: &ProposedCommand) -> Result<Verdict> {
234 match round_trip(&Request::Propose(cmd.clone()))? {
235 Response::Verdict(v) => Ok(v),
236 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
237 _ => anyhow::bail!("unexpected response to Propose"),
238 }
239 }
240
241 pub fn resolve(resolution: &Resolution) -> Result<()> {
243 expect_ack(round_trip(&Request::Resolve(resolution.clone()))?)
244 }
245
246 pub fn observe(observation: &Observation) -> Result<()> {
248 expect_ack(round_trip(&Request::Observe(observation.clone()))?)
249 }
250
251 pub fn record(cmd: &ProposedCommand) -> Result<()> {
253 expect_ack(round_trip(&Request::Record(cmd.clone()))?)
254 }
255
256 pub fn list_pending() -> Result<Vec<kintsugi_core::PendingItem>> {
258 match round_trip(&Request::ListPending)? {
259 Response::PendingList { items } => Ok(items),
260 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
261 _ => anyhow::bail!("unexpected response to ListPending"),
262 }
263 }
264
265 pub fn pending_status(id: &str) -> Result<String> {
267 match round_trip(&Request::PendingStatus { id: id.to_string() })? {
268 Response::Pending { status } => Ok(status),
269 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
270 _ => anyhow::bail!("unexpected response to PendingStatus"),
271 }
272 }
273
274 pub fn approve(id: &str) -> Result<()> {
276 expect_ack(round_trip(&Request::Approve { id: id.to_string() })?)
277 }
278
279 pub fn deny(id: &str) -> Result<()> {
281 expect_ack(round_trip(&Request::Deny { id: id.to_string() })?)
282 }
283
284 pub fn status_scorer() -> Result<String> {
287 match round_trip(&Request::Status)? {
288 Response::Status { scorer } => Ok(scorer),
289 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
290 _ => anyhow::bail!("unexpected response to Status"),
291 }
292 }
293
294 pub fn auth_begin(op: &str) -> Result<(bool, String, String, kintsugi_core::admin::KdfParams)> {
296 match round_trip(&Request::AuthBegin { op: op.to_string() })? {
297 Response::Challenge {
298 locked,
299 nonce,
300 salt,
301 params,
302 } => Ok((locked, nonce, salt, params)),
303 Response::Error { message } => anyhow::bail!("daemon error: {message}"),
304 _ => anyhow::bail!("unexpected response to AuthBegin"),
305 }
306 }
307
308 pub fn shutdown(op: &str, nonce: &str, proof: &str) -> Result<()> {
311 expect_ack(round_trip(&Request::Shutdown {
312 op: op.to_string(),
313 nonce: nonce.to_string(),
314 proof: proof.to_string(),
315 })?)
316 }
317
318 pub fn is_daemon_running() -> bool {
320 match make_name() {
321 Ok(name) => Stream::connect(name).is_ok(),
322 Err(_) => false,
323 }
324 }
325}
326
327pub struct Server {
329 listener: interprocess::local_socket::Listener,
330}
331
332impl Server {
333 pub fn bind() -> Result<Self> {
335 if Client::is_daemon_running() {
339 anyhow::bail!(
340 "a kintsugi daemon is already running on {}",
341 socket_path().display()
342 );
343 }
344 #[cfg(unix)]
345 {
346 let path = socket_path();
347 if let Some(parent) = path.parent() {
350 let _ = std::fs::create_dir_all(parent);
351 set_mode(parent, 0o700);
352 }
353 if path.exists() {
354 let _ = std::fs::remove_file(&path);
355 }
356 }
357 let name = make_name()?;
358 let listener = ListenerOptions::new()
359 .name(name)
360 .create_sync()
361 .context("bind kintsugi daemon socket")?;
362 #[cfg(unix)]
365 set_mode(&socket_path(), 0o600);
366 Ok(Self { listener })
367 }
368
369 pub fn endpoint() -> PathBuf {
371 socket_path()
372 }
373
374 pub fn serve<F>(self, mut handler: F) -> Result<()>
376 where
377 F: FnMut(Request) -> Response,
378 {
379 for incoming in self.listener.incoming() {
380 let stream = match incoming {
381 Ok(s) => s,
382 Err(e) => {
383 eprintln!("kintsugi-daemon: accept error: {e}");
384 continue;
385 }
386 };
387 if let Err(e) = Self::handle_one(stream, &mut handler) {
388 eprintln!("kintsugi-daemon: connection error: {e}");
389 }
390 }
391 Ok(())
392 }
393
394 pub fn serve_until<F, S>(self, mut handler: F, stop: S) -> Result<()>
397 where
398 F: FnMut(Request) -> Response,
399 S: Fn() -> bool,
400 {
401 for incoming in self.listener.incoming() {
402 let stream = match incoming {
403 Ok(s) => s,
404 Err(e) => {
405 eprintln!("kintsugi-daemon: accept error: {e}");
406 continue;
407 }
408 };
409 if let Err(e) = Self::handle_one(stream, &mut handler) {
410 eprintln!("kintsugi-daemon: connection error: {e}");
411 }
412 if stop() {
413 break;
414 }
415 }
416 Ok(())
417 }
418
419 pub fn serve_n<F>(self, count: usize, mut handler: F) -> Result<()>
421 where
422 F: FnMut(Request) -> Response,
423 {
424 if count == 0 {
425 return Ok(());
426 }
427 let mut served = 0;
428 for incoming in self.listener.incoming() {
429 let stream = incoming.context("accept connection")?;
430 Self::handle_one(stream, &mut handler)?;
431 served += 1;
432 if served >= count {
433 break;
434 }
435 }
436 Ok(())
437 }
438
439 fn handle_one<F>(mut stream: Stream, handler: &mut F) -> Result<()>
440 where
441 F: FnMut(Request) -> Response,
442 {
443 let req: Request = {
444 let mut reader = bounded(&mut stream);
445 read_message(&mut reader)?
446 };
447 let resp = handler(req);
448 write_message(&mut stream, &resp)?;
449 Ok(())
450 }
451}