// vs_cli/client.rs

//! Synchronous local-socket client for the daemon wire protocol.
//!
//! The CLI is short-lived; one connection per invocation, no async.
//! [`Client`] connects (AF_UNIX socket on Unix, named pipe on
//! Windows — both via [`interprocess`]), sends one request line,
//! reads response lines up to and including the blank-line
//! terminator, and exposes the parsed warnings + envelope + body to
//! the caller.
9
10use std::io::{BufRead, BufReader, Write};
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use anyhow::{Context as _, Result};
15use interprocess::local_socket::{prelude::*, Stream};
16use vs_protocol::{Envelope, Request, ResponseHead, Warning};
17
18/// One CLI ↔ daemon connection.
19pub struct Client {
20    socket: PathBuf,
21    stream: BufReader<Stream>,
22}
23
24impl Client {
25    /// Connect to the daemon at `socket`. Returns the connected client
26    /// or an error if the socket is missing / unreachable.
27    pub fn connect(socket: impl Into<PathBuf>) -> Result<Self> {
28        let socket = socket.into();
29        let name = vs_daemon::transport::path_to_name(&socket)
30            .with_context(|| format!("derive ipc name for {}", socket.display()))?;
31        let stream =
32            Stream::connect(name).with_context(|| format!("connect {}", socket.display()))?;
33        Ok(Self {
34            socket,
35            stream: BufReader::new(stream),
36        })
37    }
38
39    /// Connect, retrying for `timeout` if the socket is missing —
40    /// useful immediately after spawning the daemon.
41    pub fn connect_with_retry(socket: impl AsRef<Path>, timeout: Duration) -> Result<Self> {
42        let deadline = Instant::now() + timeout;
43        let mut last_err = anyhow::anyhow!("connect: socket missing");
44        loop {
45            match Self::connect(socket.as_ref()) {
46                Ok(c) => return Ok(c),
47                Err(e) => {
48                    last_err = e;
49                    if Instant::now() >= deadline {
50                        break;
51                    }
52                    std::thread::sleep(Duration::from_millis(50));
53                }
54            }
55        }
56        Err(last_err)
57    }
58
59    /// Send `req` and read one full response.
60    pub fn call(&mut self, req: &Request) -> Result<Response> {
61        // Send.
62        let line = req.encode();
63        self.stream
64            .get_mut()
65            .write_all(line.as_bytes())
66            .context("write request")?;
67        self.stream.get_mut().flush().context("flush request")?;
68
69        // Read until blank-line terminator.
70        let mut warnings: Vec<Warning> = Vec::new();
71        let mut envelope: Option<Envelope> = None;
72        let mut body_lines: Vec<String> = Vec::new();
73        loop {
74            let mut buf = String::new();
75            let n = self.stream.read_line(&mut buf).context("read line")?;
76            if n == 0 {
77                anyhow::bail!("daemon closed connection");
78            }
79            // Strip trailing \n (or \r\n defensively).
80            if buf.ends_with('\n') {
81                buf.pop();
82                if buf.ends_with('\r') {
83                    buf.pop();
84                }
85            }
86            if buf.is_empty() {
87                if envelope.is_some() {
88                    break;
89                }
90                continue;
91            }
92            if envelope.is_none() {
93                if let Some(rest) = buf.strip_prefix('?') {
94                    let _ = rest;
95                    warnings.push(Warning::parse(&buf)?);
96                    continue;
97                }
98                if buf.starts_with('@') || buf.starts_with('!') {
99                    envelope = Some(Envelope::parse(&buf)?);
100                    continue;
101                }
102                anyhow::bail!("expected ?/@/! envelope, got: {buf}");
103            }
104            body_lines.push(buf);
105        }
106
107        Ok(Response {
108            warnings,
109            envelope: envelope.expect("breaks only after envelope"),
110            body: body_lines,
111        })
112    }
113
114    #[must_use]
115    pub fn socket(&self) -> &Path {
116        &self.socket
117    }
118}
119
120/// One full response: warnings + envelope + body lines.
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct Response {
123    pub warnings: Vec<Warning>,
124    pub envelope: Envelope,
125    pub body: Vec<String>,
126}
127
128impl Response {
129    /// Return the response head, useful when the body is irrelevant
130    /// (e.g. for re-encoding to print to the user).
131    #[must_use]
132    pub fn head(&self) -> ResponseHead {
133        ResponseHead {
134            warnings: self.warnings.clone(),
135            envelope: self.envelope.clone(),
136        }
137    }
138
139    /// True if the envelope is a success.
140    #[must_use]
141    pub fn is_ok(&self) -> bool {
142        matches!(self.envelope, Envelope::Success(_))
143    }
144
145    /// Re-emit the response in canonical wire form, including the
146    /// trailing blank line. Used by `--json=off` (the default) to
147    /// stream the daemon's response to stdout verbatim.
148    #[must_use]
149    pub fn render_wire(&self) -> String {
150        let mut out = self.head().encode();
151        for line in &self.body {
152            out.push_str(line);
153            out.push('\n');
154        }
155        out.push('\n');
156        out
157    }
158}