Skip to main content

koi_client/
lib.rs

1//! HTTP client for communicating with a running Koi daemon.
2//!
3//! Uses blocking `ureq` - no async runtime dependency on the client path.
4//! All paths use `/v1/mdns/` prefix for mDNS domain routes.
5
6use std::io::{BufRead, BufReader, Read};
7use std::time::Duration;
8
9use hickory_proto::rr::RecordType;
10use koi_common::net::resolve_localhost;
11use koi_common::types::ServiceRecord;
12use koi_health::ServiceCheckKind;
13use koi_mdns::protocol::{
14    AdminRegistration, DaemonStatus, RegisterPayload, RegistrationResult, RenewalResult,
15};
16
17/// TCP connection timeout for general API requests.
18const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
19
20/// Read timeout for general (non-streaming) API requests.
21const READ_TIMEOUT: Duration = Duration::from_secs(10);
22
23/// Timeout for the fast health check probe.
24const HEALTH_TIMEOUT: Duration = Duration::from_millis(200);
25
26// ── Error types ───────────────────────────────────────────────────
27
28#[derive(Debug, thiserror::Error)]
29pub enum ClientError {
30    #[error("Daemon not reachable: {0}")]
31    Unreachable(String),
32
33    #[error("{error}: {message}")]
34    Api { error: String, message: String },
35
36    #[error("Request failed: {0}")]
37    Transport(String),
38
39    #[error("Invalid response: {0}")]
40    Decode(String),
41}
42
43pub type Result<T> = std::result::Result<T, ClientError>;
44
45// ── Client ────────────────────────────────────────────────────────
46
47/// Header name for Daemon Access Token authentication.
48const DAT_HEADER: &str = "X-Koi-Token";
49
50pub struct KoiClient {
51    endpoint: String,
52    agent: ureq::Agent,
53    /// Daemon Access Token (empty string means no auth).
54    token: String,
55}
56
57impl KoiClient {
58    pub fn new(endpoint: &str) -> Self {
59        let clean = endpoint.trim_end_matches('/');
60        let resolved = resolve_localhost(clean);
61        let agent = ureq::AgentBuilder::new()
62            .timeout_connect(CONNECT_TIMEOUT)
63            .timeout_read(READ_TIMEOUT)
64            .build();
65        Self {
66            endpoint: resolved,
67            agent,
68            token: String::new(),
69        }
70    }
71
72    /// Create a client with a Daemon Access Token for authenticated requests.
73    pub fn with_token(endpoint: &str, token: &str) -> Self {
74        let mut client = Self::new(endpoint);
75        client.token = token.to_string();
76        client
77    }
78
79    /// Create a client from the breadcrumb file (endpoint + token).
80    ///
81    /// Returns `None` if no breadcrumb exists.
82    pub fn from_breadcrumb() -> Option<Self> {
83        let bc = koi_config::breadcrumb::read_breadcrumb()?;
84        Some(Self::with_token(&bc.endpoint, &bc.token))
85    }
86
87    /// Attach the DAT header to a request if a token is present.
88    fn auth_get(&self, url: &str) -> ureq::Request {
89        let req = self.agent.get(url);
90        if self.token.is_empty() {
91            req
92        } else {
93            req.set(DAT_HEADER, &self.token)
94        }
95    }
96
97    /// Attach the DAT header to a POST request.
98    fn auth_post(&self, url: &str) -> ureq::Request {
99        let req = self.agent.post(url);
100        if self.token.is_empty() {
101            req
102        } else {
103            req.set(DAT_HEADER, &self.token)
104        }
105    }
106
107    /// Attach the DAT header to a PUT request.
108    fn auth_put(&self, url: &str) -> ureq::Request {
109        let req = self.agent.put(url);
110        if self.token.is_empty() {
111            req
112        } else {
113            req.set(DAT_HEADER, &self.token)
114        }
115    }
116
117    /// Attach the DAT header to a DELETE request.
118    fn auth_delete(&self, url: &str) -> ureq::Request {
119        let req = self.agent.delete(url);
120        if self.token.is_empty() {
121            req
122        } else {
123            req.set(DAT_HEADER, &self.token)
124        }
125    }
126
127    // ── Health ────────────────────────────────────────────────────
128
129    /// Quick health check with a 200ms timeout.
130    pub fn health(&self) -> Result<()> {
131        let agent = ureq::AgentBuilder::new()
132            .timeout_connect(HEALTH_TIMEOUT)
133            .timeout_read(HEALTH_TIMEOUT)
134            .build();
135        let url = format!("{}/healthz", self.endpoint);
136        agent.get(&url).call().map_err(map_error)?;
137        Ok(())
138    }
139
140    // ── Service operations (mDNS) ──────────────────────────────────
141
142    pub fn register(&self, payload: &RegisterPayload) -> Result<RegistrationResult> {
143        let url = format!("{}/v1/mdns/announce", self.endpoint);
144        let json_val =
145            serde_json::to_value(payload).map_err(|e| ClientError::Decode(e.to_string()))?;
146        let resp = self
147            .auth_post(&url)
148            .send_json(json_val)
149            .map_err(map_error)?;
150        let json: serde_json::Value = resp
151            .into_json()
152            .map_err(|e| ClientError::Decode(e.to_string()))?;
153        extract(&json, "registered")
154    }
155
156    pub fn unregister(&self, id: &str) -> Result<()> {
157        let url = format!("{}/v1/mdns/unregister/{id}", self.endpoint);
158        self.auth_delete(&url).call().map_err(map_error)?;
159        Ok(())
160    }
161
162    pub fn heartbeat(&self, id: &str) -> Result<RenewalResult> {
163        let url = format!("{}/v1/mdns/heartbeat/{id}", self.endpoint);
164        let resp = self.auth_put(&url).send_bytes(&[]).map_err(map_error)?;
165        let json: serde_json::Value = resp
166            .into_json()
167            .map_err(|e| ClientError::Decode(e.to_string()))?;
168        extract(&json, "renewed")
169    }
170
171    pub fn resolve(&self, instance: &str) -> Result<ServiceRecord> {
172        let url = format!("{}/v1/mdns/resolve", self.endpoint);
173        let resp = self
174            .auth_get(&url)
175            .query("name", instance)
176            .call()
177            .map_err(map_error)?;
178        let json: serde_json::Value = resp
179            .into_json()
180            .map_err(|e| ClientError::Decode(e.to_string()))?;
181        extract(&json, "resolved")
182    }
183
184    /// Start a browse SSE stream. Returns an iterator of JSON events.
185    pub fn browse_stream(&self, service_type: &str) -> Result<SseStream> {
186        let url = format!("{}/v1/mdns/discover", self.endpoint);
187        let mut req = self.stream_agent().get(&url);
188        if !self.token.is_empty() {
189            req = req.set(DAT_HEADER, &self.token);
190        }
191        let resp = req.query("type", service_type).call().map_err(map_error)?;
192        Ok(SseStream::new(Box::new(resp.into_reader())))
193    }
194
195    /// Start an events SSE stream. Returns an iterator of JSON events.
196    pub fn events_stream(&self, service_type: &str) -> Result<SseStream> {
197        let url = format!("{}/v1/mdns/subscribe", self.endpoint);
198        let mut req = self.stream_agent().get(&url);
199        if !self.token.is_empty() {
200            req = req.set(DAT_HEADER, &self.token);
201        }
202        let resp = req.query("type", service_type).call().map_err(map_error)?;
203        Ok(SseStream::new(Box::new(resp.into_reader())))
204    }
205
206    // ── Unified status ─────────────────────────────────────────────
207
208    /// Fetch unified status from `/v1/status`.
209    pub fn unified_status(&self) -> Result<serde_json::Value> {
210        let url = format!("{}/v1/status", self.endpoint);
211        let resp = self.auth_get(&url).call().map_err(map_error)?;
212        resp.into_json()
213            .map_err(|e| ClientError::Decode(e.to_string()))
214    }
215
216    // ── DNS operations (Phase 6) ───────────────────────────────────
217
218    pub fn dns_status(&self) -> Result<serde_json::Value> {
219        self.get_json("/v1/dns/status")
220    }
221
222    pub fn dns_lookup(&self, name: &str, record_type: RecordType) -> Result<serde_json::Value> {
223        let url = format!("{}/v1/dns/lookup", self.endpoint);
224        let resp = self
225            .auth_get(&url)
226            .query("name", name)
227            .query("type", record_type_str(record_type))
228            .call()
229            .map_err(map_error)?;
230        resp.into_json()
231            .map_err(|e| ClientError::Decode(e.to_string()))
232    }
233
234    pub fn dns_list(&self) -> Result<serde_json::Value> {
235        self.get_json("/v1/dns/list")
236    }
237
238    pub fn dns_add(&self, name: &str, ip: &str, ttl: Option<u32>) -> Result<serde_json::Value> {
239        let body = serde_json::json!({
240            "name": name,
241            "ip": ip,
242            "ttl": ttl,
243        });
244        self.post_json("/v1/dns/add", &body)
245    }
246
247    pub fn dns_remove(&self, name: &str) -> Result<serde_json::Value> {
248        let url = format!("{}/v1/dns/remove/{}", self.endpoint, name);
249        let resp = self.auth_delete(&url).call().map_err(map_error)?;
250        resp.into_json()
251            .map_err(|e| ClientError::Decode(e.to_string()))
252    }
253
254    pub fn dns_start(&self) -> Result<serde_json::Value> {
255        self.post_json("/v1/dns/serve", &serde_json::json!({}))
256    }
257
258    pub fn dns_stop(&self) -> Result<serde_json::Value> {
259        self.post_json("/v1/dns/stop", &serde_json::json!({}))
260    }
261
262    // ── Health operations (Phase 7) ───────────────────────────────
263
264    pub fn health_status(&self) -> Result<serde_json::Value> {
265        self.get_json("/v1/health/status")
266    }
267
268    pub fn health_add_check(
269        &self,
270        name: &str,
271        kind: ServiceCheckKind,
272        target: &str,
273        interval_secs: u64,
274        timeout_secs: u64,
275    ) -> Result<serde_json::Value> {
276        let body = serde_json::json!({
277            "name": name,
278            "kind": check_kind_str(kind),
279            "target": target,
280            "interval_secs": interval_secs,
281            "timeout_secs": timeout_secs,
282        });
283        self.post_json("/v1/health/add", &body)
284    }
285
286    pub fn health_remove_check(&self, name: &str) -> Result<serde_json::Value> {
287        let url = format!("{}/v1/health/remove/{}", self.endpoint, name);
288        let resp = self.auth_delete(&url).call().map_err(map_error)?;
289        resp.into_json()
290            .map_err(|e| ClientError::Decode(e.to_string()))
291    }
292
293    // ── Proxy operations (Phase 8) ───────────────────────────────
294
295    pub fn proxy_status(&self) -> Result<serde_json::Value> {
296        self.get_json("/v1/proxy/status")
297    }
298
299    pub fn proxy_list(&self) -> Result<serde_json::Value> {
300        self.get_json("/v1/proxy/list")
301    }
302
303    pub fn proxy_add(
304        &self,
305        name: &str,
306        listen_port: u16,
307        backend: &str,
308        allow_remote: bool,
309    ) -> Result<serde_json::Value> {
310        let body = serde_json::json!({
311            "name": name,
312            "listen_port": listen_port,
313            "backend": backend,
314            "allow_remote": allow_remote,
315        });
316        self.post_json("/v1/proxy/add", &body)
317    }
318
319    pub fn proxy_remove(&self, name: &str) -> Result<serde_json::Value> {
320        let url = format!("{}/v1/proxy/remove/{}", self.endpoint, name);
321        let resp = self.auth_delete(&url).call().map_err(map_error)?;
322        resp.into_json()
323            .map_err(|e| ClientError::Decode(e.to_string()))
324    }
325
326    // ── UDP operations ─────────────────────────────────────────────
327
328    pub fn udp_status(&self) -> Result<serde_json::Value> {
329        self.get_json("/v1/udp/status")
330    }
331
332    pub fn udp_bind(&self, port: u16, addr: &str, lease_secs: u64) -> Result<serde_json::Value> {
333        let body = serde_json::json!({
334            "port": port,
335            "addr": addr,
336            "lease_secs": lease_secs,
337        });
338        self.post_json("/v1/udp/bind", &body)
339    }
340
341    pub fn udp_unbind(&self, id: &str) -> Result<serde_json::Value> {
342        let url = format!("{}/v1/udp/bind/{}", self.endpoint, id);
343        let resp = self.auth_delete(&url).call().map_err(map_error)?;
344        resp.into_json()
345            .map_err(|e| ClientError::Decode(e.to_string()))
346    }
347
348    pub fn udp_send(&self, id: &str, dest: &str, payload_b64: &str) -> Result<serde_json::Value> {
349        let body = serde_json::json!({
350            "dest": dest,
351            "payload": payload_b64,
352        });
353        let path = format!("/v1/udp/send/{id}");
354        self.post_json(&path, &body)
355    }
356
357    pub fn udp_heartbeat(&self, id: &str) -> Result<serde_json::Value> {
358        let path = format!("/v1/udp/heartbeat/{id}");
359        self.put_json(&path, &serde_json::json!({}))
360    }
361
362    // ── Generic operations ─────────────────────────────────────────
363
364    /// POST JSON to an arbitrary path and return the response as a JSON value.
365    pub fn post_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
366        let url = format!("{}{path}", self.endpoint);
367        let resp = self
368            .auth_post(&url)
369            .send_json(body.clone())
370            .map_err(map_error)?;
371        resp.into_json()
372            .map_err(|e| ClientError::Decode(e.to_string()))
373    }
374
375    /// GET JSON from an arbitrary path and return the response as a JSON value.
376    pub fn get_json(&self, path: &str) -> Result<serde_json::Value> {
377        let url = format!("{}{path}", self.endpoint);
378        let resp = self.auth_get(&url).call().map_err(map_error)?;
379        resp.into_json()
380            .map_err(|e| ClientError::Decode(e.to_string()))
381    }
382
383    /// PUT JSON to an arbitrary path and return the response as a JSON value.
384    pub fn put_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
385        let url = format!("{}{path}", self.endpoint);
386        let resp = self
387            .auth_put(&url)
388            .send_json(body.clone())
389            .map_err(map_error)?;
390        resp.into_json()
391            .map_err(|e| ClientError::Decode(e.to_string()))
392    }
393
394    // ── Admin operations (mDNS) ──────────────────────────────────
395
396    pub fn admin_status(&self) -> Result<DaemonStatus> {
397        let url = format!("{}/v1/mdns/admin/status", self.endpoint);
398        let resp = self.auth_get(&url).call().map_err(map_error)?;
399        resp.into_json()
400            .map_err(|e| ClientError::Decode(e.to_string()))
401    }
402
403    pub fn admin_registrations(&self) -> Result<Vec<AdminRegistration>> {
404        let url = format!("{}/v1/mdns/admin/ls", self.endpoint);
405        let resp = self.auth_get(&url).call().map_err(map_error)?;
406        resp.into_json()
407            .map_err(|e| ClientError::Decode(e.to_string()))
408    }
409
410    pub fn admin_inspect(&self, id: &str) -> Result<AdminRegistration> {
411        let url = format!("{}/v1/mdns/admin/inspect/{id}", self.endpoint);
412        let resp = self.auth_get(&url).call().map_err(map_error)?;
413        resp.into_json()
414            .map_err(|e| ClientError::Decode(e.to_string()))
415    }
416
417    pub fn admin_force_unregister(&self, id: &str) -> Result<()> {
418        let url = format!("{}/v1/mdns/admin/unregister/{id}", self.endpoint);
419        self.auth_delete(&url).call().map_err(map_error)?;
420        Ok(())
421    }
422
423    pub fn admin_drain(&self, id: &str) -> Result<()> {
424        let url = format!("{}/v1/mdns/admin/drain/{id}", self.endpoint);
425        self.auth_post(&url).call().map_err(map_error)?;
426        Ok(())
427    }
428
429    pub fn admin_revive(&self, id: &str) -> Result<()> {
430        let url = format!("{}/v1/mdns/admin/revive/{id}", self.endpoint);
431        self.auth_post(&url).call().map_err(map_error)?;
432        Ok(())
433    }
434
435    // ── Admin operations (system) ────────────────────────────────────
436
437    /// Request a graceful shutdown of the running daemon.
438    pub fn shutdown(&self) -> Result<()> {
439        let url = format!("{}/v1/admin/shutdown", self.endpoint);
440        self.auth_post(&url).call().map_err(map_error)?;
441        Ok(())
442    }
443
444    // ── Certmesh operations (Phase 3) ──────────────────────────────
445
446    /// GET /v1/certmesh/roster - fetch signed roster manifest.
447    pub fn get_roster_manifest(&self) -> Result<serde_json::Value> {
448        let url = format!("{}/v1/certmesh/roster", self.endpoint);
449        let resp = self.auth_get(&url).call().map_err(map_error)?;
450        resp.into_json()
451            .map_err(|e| ClientError::Decode(e.to_string()))
452    }
453
454    /// POST /v1/certmesh/renew - push renewed cert to a member.
455    ///
456    /// `member_endpoint` is the member's HTTP endpoint, not the CA's.
457    /// Used when the primary pushes renewals to remote members.
458    #[allow(dead_code)]
459    pub fn push_renewal(
460        &self,
461        member_endpoint: &str,
462        request: &serde_json::Value,
463    ) -> Result<serde_json::Value> {
464        let url = format!("{member_endpoint}/v1/certmesh/renew");
465        let resp = self
466            .auth_post(&url)
467            .send_json(request.clone())
468            .map_err(map_error)?;
469        resp.into_json()
470            .map_err(|e| ClientError::Decode(e.to_string()))
471    }
472
473    /// POST /v1/certmesh/health - send health heartbeat.
474    pub fn health_heartbeat(&self, request: &serde_json::Value) -> Result<serde_json::Value> {
475        let url = format!("{}/v1/certmesh/health", self.endpoint);
476        let resp = self
477            .auth_post(&url)
478            .send_json(request.clone())
479            .map_err(map_error)?;
480        resp.into_json()
481            .map_err(|e| ClientError::Decode(e.to_string()))
482    }
483
484    // ── Private helpers ───────────────────────────────────────────
485
486    /// Agent without read timeout for SSE streams.
487    fn stream_agent(&self) -> ureq::Agent {
488        ureq::AgentBuilder::new()
489            .timeout_connect(CONNECT_TIMEOUT)
490            .build()
491    }
492}
493
494// ── SSE Stream ────────────────────────────────────────────────────
495
496/// Iterator over Server-Sent Events from the Koi daemon.
497///
498/// Parses `data: <json>` lines, skipping empty lines and event metadata.
499pub struct SseStream {
500    reader: BufReader<Box<dyn Read + Send>>,
501}
502
503impl SseStream {
504    fn new(reader: Box<dyn Read + Send>) -> Self {
505        Self {
506            reader: BufReader::new(reader),
507        }
508    }
509}
510
511impl Iterator for SseStream {
512    type Item = Result<serde_json::Value>;
513
514    fn next(&mut self) -> Option<Self::Item> {
515        loop {
516            let mut line = String::new();
517            match self.reader.read_line(&mut line) {
518                Ok(0) => return None,
519                Ok(_) => {
520                    let trimmed = line.trim();
521                    if let Some(data) = trimmed.strip_prefix("data:") {
522                        let data = data.trim_start();
523                        if data.is_empty() {
524                            continue;
525                        }
526                        match serde_json::from_str(data) {
527                            Ok(json) => return Some(Ok(json)),
528                            Err(e) => return Some(Err(ClientError::Decode(e.to_string()))),
529                        }
530                    }
531                    continue;
532                }
533                Err(e) => return Some(Err(ClientError::Transport(e.to_string()))),
534            }
535        }
536    }
537}
538
539// ── Error helpers ─────────────────────────────────────────────────
540
541fn map_error(e: ureq::Error) -> ClientError {
542    match e {
543        ureq::Error::Status(_status, resp) => {
544            let body = resp.into_string().unwrap_or_default();
545            if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
546                let error = json
547                    .get("error")
548                    .and_then(|v| v.as_str())
549                    .unwrap_or("unknown")
550                    .to_string();
551                let message = json
552                    .get("message")
553                    .and_then(|v| v.as_str())
554                    .unwrap_or(&body)
555                    .to_string();
556                ClientError::Api { error, message }
557            } else {
558                ClientError::Api {
559                    error: "http_error".into(),
560                    message: body,
561                }
562            }
563        }
564        ureq::Error::Transport(t) => ClientError::Unreachable(t.to_string()),
565    }
566}
567
568fn record_type_str(record_type: RecordType) -> &'static str {
569    match record_type {
570        RecordType::A => "A",
571        RecordType::AAAA => "AAAA",
572        RecordType::ANY => "ANY",
573        _ => "A",
574    }
575}
576
577fn check_kind_str(kind: ServiceCheckKind) -> &'static str {
578    match kind {
579        ServiceCheckKind::Http => "http",
580        ServiceCheckKind::Tcp => "tcp",
581    }
582}
583
584fn extract<T: serde::de::DeserializeOwned>(json: &serde_json::Value, key: &str) -> Result<T> {
585    if let Some(err_val) = json.get("error") {
586        let error = err_val.as_str().unwrap_or("unknown").to_string();
587        let message = json
588            .get("message")
589            .and_then(|m| m.as_str())
590            .unwrap_or("Unknown error")
591            .to_string();
592        return Err(ClientError::Api { error, message });
593    }
594    json.get(key)
595        .ok_or_else(|| ClientError::Decode(format!("Missing '{key}' in response")))
596        .and_then(|v| {
597            serde_json::from_value(v.clone()).map_err(|e| ClientError::Decode(e.to_string()))
598        })
599}
600
601#[cfg(test)]
602mod tests {
603    use super::*;
604
605    // ── Test helpers ────────────────────────────────────────────────
606
607    fn cursor_stream(input: &str) -> SseStream {
608        let cursor = std::io::Cursor::new(input.as_bytes().to_vec());
609        SseStream::new(Box::new(cursor))
610    }
611
612    // ── KoiClient::new() tests ──────────────────────────────────────
613
614    #[test]
615    fn client_new_strips_trailing_slash() {
616        // After Happy Eyeballs, localhost is rewritten to a literal IP.
617        let client = KoiClient::new("http://localhost:5641/");
618        assert!(
619            client.endpoint == "http://127.0.0.1:5641"
620                || client.endpoint == "http://[::1]:5641"
621                || client.endpoint == "http://localhost:5641",
622            "unexpected endpoint: {}",
623            client.endpoint
624        );
625        assert!(!client.endpoint.ends_with("/"));
626        assert!(client.token.is_empty());
627    }
628
629    #[test]
630    fn client_with_token_sets_token() {
631        let client = KoiClient::with_token("http://10.0.0.1:5641", "my-secret-token");
632        assert_eq!(client.endpoint, "http://10.0.0.1:5641");
633        assert_eq!(client.token, "my-secret-token");
634    }
635
636    #[test]
637    fn client_new_preserves_non_localhost() {
638        let client = KoiClient::new("http://10.0.0.1:5641");
639        assert_eq!(client.endpoint, "http://10.0.0.1:5641");
640    }
641
642    #[test]
643    fn client_new_strips_multiple_trailing_slashes() {
644        let client = KoiClient::new("http://localhost:5641///");
645        assert!(!client.endpoint.ends_with("/"));
646    }
647
648    // ── SSE parsing tests ───────────────────────────────────────────
649
650    #[test]
651    fn sse_stream_yields_parsed_json() {
652        let input = "data: {\"foo\": 1}\n\n";
653        let mut stream = cursor_stream(input);
654        let item = stream.next().unwrap().unwrap();
655        assert_eq!(item["foo"], 1);
656    }
657
658    #[test]
659    fn sse_stream_skips_empty_lines() {
660        let input = "\n\n\n\n";
661        let mut stream = cursor_stream(input);
662        assert!(stream.next().is_none());
663    }
664
665    #[test]
666    fn sse_stream_skips_non_data_lines() {
667        let input = "event: message\nretry: 1000\n\n";
668        let mut stream = cursor_stream(input);
669        assert!(stream.next().is_none());
670    }
671
672    #[test]
673    fn sse_stream_handles_leading_space() {
674        let input = "data:   {\"hello\": \"world\"}\n";
675        let mut stream = cursor_stream(input);
676        let item = stream.next().unwrap().unwrap();
677        assert_eq!(item["hello"], "world");
678    }
679
680    #[test]
681    fn sse_stream_handles_no_space() {
682        let input = "data:{\"hello\":\"world\"}\n";
683        let mut stream = cursor_stream(input);
684        let item = stream.next().unwrap().unwrap();
685        assert_eq!(item["hello"], "world");
686    }
687
688    #[test]
689    fn sse_stream_yields_multiple_events() {
690        let input = "data: {\"n\": 1}\n\ndata: {\"n\": 2}\n\n";
691        let mut stream = cursor_stream(input);
692        let first = stream.next().unwrap().unwrap();
693        let second = stream.next().unwrap().unwrap();
694        assert_eq!(first["n"], 1);
695        assert_eq!(second["n"], 2);
696    }
697
698    #[test]
699    fn sse_stream_returns_none_on_eof() {
700        let input = "data: {\"n\": 1}\n";
701        let mut stream = cursor_stream(input);
702        let _ = stream.next();
703        assert!(stream.next().is_none());
704    }
705
706    #[test]
707    fn sse_stream_decode_error_on_invalid_json() {
708        let input = "data: {bad json}\n";
709        let mut stream = cursor_stream(input);
710        let item = stream.next().unwrap();
711        assert!(item.is_err());
712    }
713
714    #[test]
715    fn sse_stream_transport_error_on_read_failure() {
716        struct BrokenReader;
717        impl Read for BrokenReader {
718            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
719                Err(std::io::Error::other("boom"))
720            }
721        }
722
723        let stream = SseStream::new(Box::new(BrokenReader));
724        let mut stream = stream;
725        let item = stream.next().unwrap();
726        assert!(item.is_err());
727    }
728}