1mod spud;
2pub use spud::Spud;
3
4use anyhow::Context;
5use http_body_util::{BodyExt, Full};
6use hyper::body::Bytes;
7use hyper::{Method, Request};
8use hyper_util::client::legacy::Client;
9use hyperlocal::{UnixClientExt, UnixConnector, Uri};
10
11pub enum SseEvent {
15 Started {
16 call_id: String,
17 },
18 Output(serde_json::Value),
19 Error(serde_json::Value),
20 Custom {
21 event: String,
22 data: serde_json::Value,
23 },
24 End,
25}
26
27impl SseEvent {
28 pub fn from_stdout(line: &str) -> Self {
32 Self::from_line(line, "output")
33 }
34
35 pub fn from_stderr(line: &str) -> Self {
39 Self::from_line(line, "error")
40 }
41
42 fn from_line(line: &str, default_event: &str) -> Self {
43 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
44 if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
45 let data = parsed.get("data").cloned().unwrap_or(parsed.clone());
46 return match event {
47 "started" => {
48 let call_id = parsed["data"]["call_id"].as_str().unwrap_or("").to_string();
49 Self::Started { call_id }
50 }
51 "end" => Self::End,
52 "error" => Self::Error(data),
53 "output" => Self::Output(data),
54 _ => Self::Custom {
55 event: event.to_string(),
56 data,
57 },
58 };
59 }
60 if default_event == "error" {
61 return Self::Error(parsed);
62 }
63 return Self::Output(parsed);
64 }
65 let text = serde_json::Value::String(line.to_string());
66 if default_event == "error" {
67 Self::Error(text)
68 } else {
69 Self::Output(text)
70 }
71 }
72
73 pub fn display_data(&self) -> Option<String> {
76 let data = match self {
77 Self::Output(d) | Self::Error(d) | Self::Custom { data: d, .. } => d,
78 Self::Started { .. } | Self::End => return None,
79 };
80 Some(match data {
81 serde_json::Value::String(s) => s.clone(),
82 other => other.to_string(),
83 })
84 }
85
86 pub fn to_json(&self) -> String {
88 let value = match self {
89 Self::Started { call_id } => {
90 serde_json::json!({"event": "started", "data": {"call_id": call_id}})
91 }
92 Self::Output(data) => serde_json::json!({"event": "output", "data": data}),
93 Self::Error(data) => serde_json::json!({"event": "error", "data": data}),
94 Self::Custom { event, data } => serde_json::json!({"event": event, "data": data}),
95 Self::End => serde_json::json!({"event": "end"}),
96 };
97 value.to_string()
98 }
99}
100
101#[derive(Clone)]
103pub struct SpudkitConnection {
104 path: String,
105}
106
107impl SpudkitConnection {
108 pub fn new(path: impl Into<String>) -> Self {
109 Self { path: path.into() }
110 }
111
112 async fn request_raw(
114 &self,
115 method: &str,
116 path: &str,
117 body: Option<&[u8]>,
118 extra_headers: &[(&str, &str)],
119 ) -> anyhow::Result<hyper::Response<hyper::body::Incoming>> {
120 let client: Client<UnixConnector, Full<Bytes>> = Client::unix();
121 let uri: hyper::Uri = Uri::new(&self.path, path).into();
122 let method: Method = method.parse().context("invalid HTTP method")?;
123 let body_bytes = body.unwrap_or(&[]);
124
125 let has_content_type = extra_headers
126 .iter()
127 .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
128
129 let mut builder = Request::builder()
130 .method(method)
131 .uri(uri)
132 .header("Host", "localhost");
133
134 if !has_content_type {
135 builder = builder.header("Content-Type", "application/json");
136 }
137
138 for (key, value) in extra_headers {
139 builder = builder.header(*key, *value);
140 }
141
142 let request = builder
143 .body(Full::new(Bytes::copy_from_slice(body_bytes)))
144 .context("failed to build request")?;
145
146 client.request(request).await.context("request failed")
147 }
148
149 pub async fn fetch(
151 &self,
152 method: &str,
153 path: &str,
154 body: Option<&[u8]>,
155 ) -> anyhow::Result<Vec<u8>> {
156 self.fetch_with_headers(method, path, body, &[]).await
157 }
158
159 pub async fn fetch_with_headers(
161 &self,
162 method: &str,
163 path: &str,
164 body: Option<&[u8]>,
165 headers: &[(&str, &str)],
166 ) -> anyhow::Result<Vec<u8>> {
167 let response = self.request_raw(method, path, body, headers).await?;
168
169 let body = response
170 .into_body()
171 .collect()
172 .await
173 .context("failed to read response body")?
174 .to_bytes();
175
176 Ok(body.to_vec())
177 }
178
179 async fn stream_raw(
182 &self,
183 method: &str,
184 path: &str,
185 body: Option<&[u8]>,
186 mut on_line: impl FnMut(&str),
187 ) -> anyhow::Result<()> {
188 let response = self
189 .request_raw(method, path, body, &[])
190 .await
191 .context("failed to connect")?;
192
193 let mut body = response.into_body();
194 let mut buffer = String::new();
195
196 while let Some(result) = body.frame().await {
197 match result {
198 Ok(frame) => {
199 if let Some(data) = frame.data_ref() {
200 buffer.push_str(&String::from_utf8_lossy(data));
201 process_buffer(&mut buffer, &mut on_line);
202 }
203 }
204 Err(_) => break,
205 }
206 }
207
208 on_line(r#"{"event":"end"}"#);
209
210 Ok(())
211 }
212
213 pub async fn stream(
215 &self,
216 method: &str,
217 path: &str,
218 body: Option<&[u8]>,
219 mut on_event: impl FnMut(SseEvent),
220 ) -> anyhow::Result<()> {
221 self.stream_raw(method, path, body, |data| {
222 if let Some(event) = parse_sse_line(data) {
223 on_event(event);
224 }
225 })
226 .await
227 }
228}
229
230fn parse_sse_line(data: &str) -> Option<SseEvent> {
231 Some(SseEvent::from_stdout(data))
232}
233
234fn process_buffer(buffer: &mut String, on_line: &mut impl FnMut(&str)) {
235 while let Some(pos) = buffer.find('\n') {
236 let line = buffer[..pos].to_string();
237 *buffer = buffer[pos + 1..].to_string();
238
239 if let Some(data) = line.strip_prefix("data:") {
240 let data = data.trim();
241 if !data.is_empty() {
242 on_line(data);
243 }
244 }
245 }
246}