Skip to main content

spudkit_core/
lib.rs

1mod spud;
2pub use spud::Spud;
3
4use anyhow::Context;
5use http_body_util::{BodyExt, Full};
6use hyper::body::Bytes;
7use hyper::{Method, Request};
8use hyper_util::client::legacy::Client;
9use hyperlocal::{UnixClientExt, UnixConnector, Uri};
10
11/// A tagged event in the spudkit protocol.
12/// Used on both the server side (creating events from container output)
13/// and the client side (parsing events from SSE streams).
14pub enum SseEvent {
15    Started {
16        call_id: String,
17    },
18    Output(serde_json::Value),
19    Error(serde_json::Value),
20    Custom {
21        event: String,
22        data: serde_json::Value,
23    },
24    End,
25}
26
27impl SseEvent {
28    /// Create an event from a raw stdout line.
29    /// If the line is already tagged JSON (has an "event" field), it is parsed as-is.
30    /// Otherwise, it is wrapped as an Output event.
31    pub fn from_stdout(line: &str) -> Self {
32        Self::from_line(line, "output")
33    }
34
35    /// Create an event from a raw stderr line.
36    /// If the line is already tagged JSON (has an "event" field), it is parsed as-is.
37    /// Otherwise, it is wrapped as an Error event.
38    pub fn from_stderr(line: &str) -> Self {
39        Self::from_line(line, "error")
40    }
41
42    fn from_line(line: &str, default_event: &str) -> Self {
43        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(line) {
44            if let Some(event) = parsed.get("event").and_then(|e| e.as_str()) {
45                let data = parsed.get("data").cloned().unwrap_or(parsed.clone());
46                return match event {
47                    "started" => {
48                        let call_id = parsed["data"]["call_id"].as_str().unwrap_or("").to_string();
49                        Self::Started { call_id }
50                    }
51                    "end" => Self::End,
52                    "error" => Self::Error(data),
53                    "output" => Self::Output(data),
54                    _ => Self::Custom {
55                        event: event.to_string(),
56                        data,
57                    },
58                };
59            }
60            if default_event == "error" {
61                return Self::Error(parsed);
62            }
63            return Self::Output(parsed);
64        }
65        let text = serde_json::Value::String(line.to_string());
66        if default_event == "error" {
67            Self::Error(text)
68        } else {
69            Self::Output(text)
70        }
71    }
72
73    /// Format the event's data for human-readable display.
74    /// Strings are returned unwrapped, everything else as JSON.
75    pub fn display_data(&self) -> Option<String> {
76        let data = match self {
77            Self::Output(d) | Self::Error(d) | Self::Custom { data: d, .. } => d,
78            Self::Started { .. } | Self::End => return None,
79        };
80        Some(match data {
81            serde_json::Value::String(s) => s.clone(),
82            other => other.to_string(),
83        })
84    }
85
86    /// Serialize the event to a JSON string suitable for SSE data.
87    pub fn to_json(&self) -> String {
88        let value = match self {
89            Self::Started { call_id } => {
90                serde_json::json!({"event": "started", "data": {"call_id": call_id}})
91            }
92            Self::Output(data) => serde_json::json!({"event": "output", "data": data}),
93            Self::Error(data) => serde_json::json!({"event": "error", "data": data}),
94            Self::Custom { event, data } => serde_json::json!({"event": event, "data": data}),
95            Self::End => serde_json::json!({"event": "end"}),
96        };
97        value.to_string()
98    }
99}
100
/// A connection to a Unix socket endpoint.
///
/// Only the socket path is stored, so cloning copies just that string.
#[derive(Debug, Clone)]
pub struct SpudkitConnection {
    /// Filesystem path of the Unix socket that requests are sent to.
    path: String,
}
106
107impl SpudkitConnection {
108    pub fn new(path: impl Into<String>) -> Self {
109        Self { path: path.into() }
110    }
111
112    /// Send an HTTP request and return the raw streaming response.
113    async fn request_raw(
114        &self,
115        method: &str,
116        path: &str,
117        body: Option<&[u8]>,
118        extra_headers: &[(&str, &str)],
119    ) -> anyhow::Result<hyper::Response<hyper::body::Incoming>> {
120        let client: Client<UnixConnector, Full<Bytes>> = Client::unix();
121        let uri: hyper::Uri = Uri::new(&self.path, path).into();
122        let method: Method = method.parse().context("invalid HTTP method")?;
123        let body_bytes = body.unwrap_or(&[]);
124
125        let has_content_type = extra_headers
126            .iter()
127            .any(|(k, _)| k.eq_ignore_ascii_case("content-type"));
128
129        let mut builder = Request::builder()
130            .method(method)
131            .uri(uri)
132            .header("Host", "localhost");
133
134        if !has_content_type {
135            builder = builder.header("Content-Type", "application/json");
136        }
137
138        for (key, value) in extra_headers {
139            builder = builder.header(*key, *value);
140        }
141
142        let request = builder
143            .body(Full::new(Bytes::copy_from_slice(body_bytes)))
144            .context("failed to build request")?;
145
146        client.request(request).await.context("request failed")
147    }
148
149    /// Send an HTTP request and return the full response body.
150    pub async fn fetch(
151        &self,
152        method: &str,
153        path: &str,
154        body: Option<&[u8]>,
155    ) -> anyhow::Result<Vec<u8>> {
156        self.fetch_with_headers(method, path, body, &[]).await
157    }
158
159    /// Send an HTTP request with custom headers and return the full response body.
160    pub async fn fetch_with_headers(
161        &self,
162        method: &str,
163        path: &str,
164        body: Option<&[u8]>,
165        headers: &[(&str, &str)],
166    ) -> anyhow::Result<Vec<u8>> {
167        let response = self.request_raw(method, path, body, headers).await?;
168
169        let body = response
170            .into_body()
171            .collect()
172            .await
173            .context("failed to read response body")?
174            .to_bytes();
175
176        Ok(body.to_vec())
177    }
178
179    /// Stream raw SSE data lines. Calls `on_line` for each `data:` line
180    /// (with prefix stripped). Sends `{"event":"end"}` when the stream closes.
181    async fn stream_raw(
182        &self,
183        method: &str,
184        path: &str,
185        body: Option<&[u8]>,
186        mut on_line: impl FnMut(&str),
187    ) -> anyhow::Result<()> {
188        let response = self
189            .request_raw(method, path, body, &[])
190            .await
191            .context("failed to connect")?;
192
193        let mut body = response.into_body();
194        let mut buffer = String::new();
195
196        while let Some(result) = body.frame().await {
197            match result {
198                Ok(frame) => {
199                    if let Some(data) = frame.data_ref() {
200                        buffer.push_str(&String::from_utf8_lossy(data));
201                        process_buffer(&mut buffer, &mut on_line);
202                    }
203                }
204                Err(_) => break,
205            }
206        }
207
208        on_line(r#"{"event":"end"}"#);
209
210        Ok(())
211    }
212
213    /// Stream SSE events. Calls `on_event` for each parsed event.
214    pub async fn stream(
215        &self,
216        method: &str,
217        path: &str,
218        body: Option<&[u8]>,
219        mut on_event: impl FnMut(SseEvent),
220    ) -> anyhow::Result<()> {
221        self.stream_raw(method, path, body, |data| {
222            if let Some(event) = parse_sse_line(data) {
223                on_event(event);
224            }
225        })
226        .await
227    }
228}
229
230fn parse_sse_line(data: &str) -> Option<SseEvent> {
231    Some(SseEvent::from_stdout(data))
232}
233
/// Consume every complete (newline-terminated) line from `buffer`.
///
/// For each line starting with `data:`, the remainder is trimmed and, if
/// non-empty, passed to `on_line`. All other lines are ignored. Any trailing
/// text without a terminating newline is left in `buffer` so it can be
/// completed by later input.
fn process_buffer(buffer: &mut String, on_line: &mut impl FnMut(&str)) {
    // Byte offset just past the last newline handled so far. Scanning with an
    // offset and trimming the buffer once at the end avoids re-allocating the
    // whole remainder for every line.
    let mut consumed = 0;

    while let Some(offset) = buffer[consumed..].find('\n') {
        let line = &buffer[consumed..consumed + offset];
        consumed += offset + 1;

        if let Some(data) = line.strip_prefix("data:") {
            // `trim` also removes the '\r' of CRLF-terminated lines.
            let data = data.trim();
            if !data.is_empty() {
                on_line(data);
            }
        }
    }

    // `consumed` always sits right after a '\n', i.e. on a char boundary.
    buffer.replace_range(..consumed, "");
}
246}