1use std::io::{BufRead, BufReader, Write};
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14use anyhow::{Context as _, Result};
15use interprocess::local_socket::{prelude::*, Stream};
16use vs_protocol::{Envelope, Request, ResponseHead, Warning};
17
18pub struct Client {
20 socket: PathBuf,
21 stream: BufReader<Stream>,
22}
23
24impl Client {
25 pub fn connect(socket: impl Into<PathBuf>) -> Result<Self> {
28 let socket = socket.into();
29 let name = vs_daemon::transport::path_to_name(&socket)
30 .with_context(|| format!("derive ipc name for {}", socket.display()))?;
31 let stream =
32 Stream::connect(name).with_context(|| format!("connect {}", socket.display()))?;
33 Ok(Self {
34 socket,
35 stream: BufReader::new(stream),
36 })
37 }
38
39 pub fn connect_with_retry(socket: impl AsRef<Path>, timeout: Duration) -> Result<Self> {
42 let deadline = Instant::now() + timeout;
43 let mut last_err = anyhow::anyhow!("connect: socket missing");
44 loop {
45 match Self::connect(socket.as_ref()) {
46 Ok(c) => return Ok(c),
47 Err(e) => {
48 last_err = e;
49 if Instant::now() >= deadline {
50 break;
51 }
52 std::thread::sleep(Duration::from_millis(50));
53 }
54 }
55 }
56 Err(last_err)
57 }
58
59 pub fn call(&mut self, req: &Request) -> Result<Response> {
61 let line = req.encode();
63 self.stream
64 .get_mut()
65 .write_all(line.as_bytes())
66 .context("write request")?;
67 self.stream.get_mut().flush().context("flush request")?;
68
69 let mut warnings: Vec<Warning> = Vec::new();
71 let mut envelope: Option<Envelope> = None;
72 let mut body_lines: Vec<String> = Vec::new();
73 loop {
74 let mut buf = String::new();
75 let n = self.stream.read_line(&mut buf).context("read line")?;
76 if n == 0 {
77 anyhow::bail!("daemon closed connection");
78 }
79 if buf.ends_with('\n') {
81 buf.pop();
82 if buf.ends_with('\r') {
83 buf.pop();
84 }
85 }
86 if buf.is_empty() {
87 if envelope.is_some() {
88 break;
89 }
90 continue;
91 }
92 if envelope.is_none() {
93 if let Some(rest) = buf.strip_prefix('?') {
94 let _ = rest;
95 warnings.push(Warning::parse(&buf)?);
96 continue;
97 }
98 if buf.starts_with('@') || buf.starts_with('!') {
99 envelope = Some(Envelope::parse(&buf)?);
100 continue;
101 }
102 anyhow::bail!("expected ?/@/! envelope, got: {buf}");
103 }
104 body_lines.push(buf);
105 }
106
107 Ok(Response {
108 warnings,
109 envelope: envelope.expect("breaks only after envelope"),
110 body: body_lines,
111 })
112 }
113
114 #[must_use]
115 pub fn socket(&self) -> &Path {
116 &self.socket
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct Response {
123 pub warnings: Vec<Warning>,
124 pub envelope: Envelope,
125 pub body: Vec<String>,
126}
127
128impl Response {
129 #[must_use]
132 pub fn head(&self) -> ResponseHead {
133 ResponseHead {
134 warnings: self.warnings.clone(),
135 envelope: self.envelope.clone(),
136 }
137 }
138
139 #[must_use]
141 pub fn is_ok(&self) -> bool {
142 matches!(self.envelope, Envelope::Success(_))
143 }
144
145 #[must_use]
149 pub fn render_wire(&self) -> String {
150 let mut out = self.head().encode();
151 for line in &self.body {
152 out.push_str(line);
153 out.push('\n');
154 }
155 out.push('\n');
156 out
157 }
158}