Skip to main content

koi_client/
lib.rs

//! HTTP client for communicating with a running Koi daemon.
//!
//! Uses blocking `ureq`, so the client path has no async runtime dependency.
//! mDNS routes live under the `/v1/mdns/` prefix; the other domains (DNS, health,
//! proxy, UDP, certmesh, admin) use their own `/v1/...` prefixes.
5
6use std::io::{BufRead, BufReader, Read};
7use std::time::Duration;
8
9use hickory_proto::rr::RecordType;
10use koi_common::mdns_protocol::{
11    AdminRegistration, DaemonStatus, RegisterPayload, RegistrationResult, RenewalResult,
12};
13use koi_common::net::resolve_localhost;
14use koi_common::types::{ServiceCheckKind, ServiceRecord};
15
/// How long to wait for the TCP connection when issuing a general API request.
const CONNECT_TIMEOUT: Duration = Duration::from_millis(5_000);

/// How long to wait for response data on general (non-streaming) API requests.
const READ_TIMEOUT: Duration = Duration::from_millis(10_000);

/// Timeout used for both connect and read by the fast health check probe.
const HEALTH_TIMEOUT: Duration = Duration::from_millis(200);
24
25// ── Error types ───────────────────────────────────────────────────
26
27#[derive(Debug, thiserror::Error)]
28pub enum ClientError {
29    #[error("Daemon not reachable: {0}")]
30    Unreachable(String),
31
32    #[error("{error}: {message}")]
33    Api { error: String, message: String },
34
35    #[error("Request failed: {0}")]
36    Transport(String),
37
38    #[error("Invalid response: {0}")]
39    Decode(String),
40}
41
42pub type Result<T> = std::result::Result<T, ClientError>;
43
44// ── Client ────────────────────────────────────────────────────────
45
/// Name of the HTTP header that carries the Daemon Access Token (DAT).
const DAT_HEADER: &str = "X-Koi-Token";
48
49pub struct KoiClient {
50    endpoint: String,
51    agent: ureq::Agent,
52    /// Daemon Access Token (empty string means no auth).
53    token: String,
54}
55
56impl KoiClient {
57    pub fn new(endpoint: &str) -> Self {
58        let clean = endpoint.trim_end_matches('/');
59        let resolved = resolve_localhost(clean);
60        let agent = ureq::AgentBuilder::new()
61            .timeout_connect(CONNECT_TIMEOUT)
62            .timeout_read(READ_TIMEOUT)
63            .build();
64        Self {
65            endpoint: resolved,
66            agent,
67            token: String::new(),
68        }
69    }
70
71    /// Create a client with a Daemon Access Token for authenticated requests.
72    pub fn with_token(endpoint: &str, token: &str) -> Self {
73        let mut client = Self::new(endpoint);
74        client.token = token.to_string();
75        client
76    }
77
78    /// Create a client from the breadcrumb file (endpoint + token).
79    ///
80    /// Returns `None` if no breadcrumb exists.
81    pub fn from_breadcrumb() -> Option<Self> {
82        let bc = koi_config::breadcrumb::read_breadcrumb()?;
83        Some(Self::with_token(&bc.endpoint, &bc.token))
84    }
85
86    /// Attach the DAT header to a request if a token is present.
87    fn auth_get(&self, url: &str) -> ureq::Request {
88        let req = self.agent.get(url);
89        if self.token.is_empty() {
90            req
91        } else {
92            req.set(DAT_HEADER, &self.token)
93        }
94    }
95
96    /// Attach the DAT header to a POST request.
97    fn auth_post(&self, url: &str) -> ureq::Request {
98        let req = self.agent.post(url);
99        if self.token.is_empty() {
100            req
101        } else {
102            req.set(DAT_HEADER, &self.token)
103        }
104    }
105
106    /// Attach the DAT header to a PUT request.
107    fn auth_put(&self, url: &str) -> ureq::Request {
108        let req = self.agent.put(url);
109        if self.token.is_empty() {
110            req
111        } else {
112            req.set(DAT_HEADER, &self.token)
113        }
114    }
115
116    /// Attach the DAT header to a DELETE request.
117    fn auth_delete(&self, url: &str) -> ureq::Request {
118        let req = self.agent.delete(url);
119        if self.token.is_empty() {
120            req
121        } else {
122            req.set(DAT_HEADER, &self.token)
123        }
124    }
125
126    // ── Health ────────────────────────────────────────────────────
127
128    /// Quick health check with a 200ms timeout.
129    pub fn health(&self) -> Result<()> {
130        let agent = ureq::AgentBuilder::new()
131            .timeout_connect(HEALTH_TIMEOUT)
132            .timeout_read(HEALTH_TIMEOUT)
133            .build();
134        let url = format!("{}/healthz", self.endpoint);
135        agent.get(&url).call().map_err(map_error)?;
136        Ok(())
137    }
138
139    // ── Service operations (mDNS) ──────────────────────────────────
140
141    pub fn register(&self, payload: &RegisterPayload) -> Result<RegistrationResult> {
142        let url = format!("{}/v1/mdns/announce", self.endpoint);
143        let json_val =
144            serde_json::to_value(payload).map_err(|e| ClientError::Decode(e.to_string()))?;
145        let resp = self
146            .auth_post(&url)
147            .send_json(json_val)
148            .map_err(map_error)?;
149        let json: serde_json::Value = resp
150            .into_json()
151            .map_err(|e| ClientError::Decode(e.to_string()))?;
152        extract(&json, "registered")
153    }
154
155    pub fn unregister(&self, id: &str) -> Result<()> {
156        let url = format!("{}/v1/mdns/unregister/{id}", self.endpoint);
157        self.auth_delete(&url).call().map_err(map_error)?;
158        Ok(())
159    }
160
161    pub fn heartbeat(&self, id: &str) -> Result<RenewalResult> {
162        let url = format!("{}/v1/mdns/heartbeat/{id}", self.endpoint);
163        let resp = self.auth_put(&url).send_bytes(&[]).map_err(map_error)?;
164        let json: serde_json::Value = resp
165            .into_json()
166            .map_err(|e| ClientError::Decode(e.to_string()))?;
167        extract(&json, "renewed")
168    }
169
170    pub fn resolve(&self, instance: &str) -> Result<ServiceRecord> {
171        let url = format!("{}/v1/mdns/resolve", self.endpoint);
172        let resp = self
173            .auth_get(&url)
174            .query("name", instance)
175            .call()
176            .map_err(map_error)?;
177        let json: serde_json::Value = resp
178            .into_json()
179            .map_err(|e| ClientError::Decode(e.to_string()))?;
180        extract(&json, "resolved")
181    }
182
183    /// Start a browse SSE stream. Returns an iterator of JSON events.
184    pub fn browse_stream(&self, service_type: &str) -> Result<SseStream> {
185        let url = format!("{}/v1/mdns/discover", self.endpoint);
186        let mut req = self.stream_agent().get(&url);
187        if !self.token.is_empty() {
188            req = req.set(DAT_HEADER, &self.token);
189        }
190        let resp = req.query("type", service_type).call().map_err(map_error)?;
191        Ok(SseStream::new(Box::new(resp.into_reader())))
192    }
193
194    /// Start an events SSE stream. Returns an iterator of JSON events.
195    pub fn events_stream(&self, service_type: &str) -> Result<SseStream> {
196        let url = format!("{}/v1/mdns/subscribe", self.endpoint);
197        let mut req = self.stream_agent().get(&url);
198        if !self.token.is_empty() {
199            req = req.set(DAT_HEADER, &self.token);
200        }
201        let resp = req.query("type", service_type).call().map_err(map_error)?;
202        Ok(SseStream::new(Box::new(resp.into_reader())))
203    }
204
205    // ── Unified status ─────────────────────────────────────────────
206
207    /// Fetch unified status from `/v1/status`.
208    pub fn unified_status(&self) -> Result<serde_json::Value> {
209        let url = format!("{}/v1/status", self.endpoint);
210        let resp = self.auth_get(&url).call().map_err(map_error)?;
211        resp.into_json()
212            .map_err(|e| ClientError::Decode(e.to_string()))
213    }
214
215    // ── DNS operations (Phase 6) ───────────────────────────────────
216
217    pub fn dns_status(&self) -> Result<serde_json::Value> {
218        self.get_json("/v1/dns/status")
219    }
220
221    pub fn dns_lookup(&self, name: &str, record_type: RecordType) -> Result<serde_json::Value> {
222        let url = format!("{}/v1/dns/lookup", self.endpoint);
223        let resp = self
224            .auth_get(&url)
225            .query("name", name)
226            .query("type", record_type_str(record_type))
227            .call()
228            .map_err(map_error)?;
229        resp.into_json()
230            .map_err(|e| ClientError::Decode(e.to_string()))
231    }
232
233    pub fn dns_list(&self) -> Result<serde_json::Value> {
234        self.get_json("/v1/dns/list")
235    }
236
237    pub fn dns_add(&self, name: &str, ip: &str, ttl: Option<u32>) -> Result<serde_json::Value> {
238        let body = serde_json::json!({
239            "name": name,
240            "ip": ip,
241            "ttl": ttl,
242        });
243        self.post_json("/v1/dns/add", &body)
244    }
245
246    pub fn dns_remove(&self, name: &str) -> Result<serde_json::Value> {
247        let url = format!("{}/v1/dns/remove/{}", self.endpoint, name);
248        let resp = self.auth_delete(&url).call().map_err(map_error)?;
249        resp.into_json()
250            .map_err(|e| ClientError::Decode(e.to_string()))
251    }
252
253    pub fn dns_start(&self) -> Result<serde_json::Value> {
254        self.post_json("/v1/dns/serve", &serde_json::json!({}))
255    }
256
257    pub fn dns_stop(&self) -> Result<serde_json::Value> {
258        self.post_json("/v1/dns/stop", &serde_json::json!({}))
259    }
260
261    // ── Health operations (Phase 7) ───────────────────────────────
262
263    pub fn health_status(&self) -> Result<serde_json::Value> {
264        self.get_json("/v1/health/status")
265    }
266
267    pub fn health_add_check(
268        &self,
269        name: &str,
270        kind: ServiceCheckKind,
271        target: &str,
272        interval_secs: u64,
273        timeout_secs: u64,
274    ) -> Result<serde_json::Value> {
275        let body = serde_json::json!({
276            "name": name,
277            "kind": check_kind_str(kind),
278            "target": target,
279            "interval_secs": interval_secs,
280            "timeout_secs": timeout_secs,
281        });
282        self.post_json("/v1/health/add", &body)
283    }
284
285    pub fn health_remove_check(&self, name: &str) -> Result<serde_json::Value> {
286        let url = format!("{}/v1/health/remove/{}", self.endpoint, name);
287        let resp = self.auth_delete(&url).call().map_err(map_error)?;
288        resp.into_json()
289            .map_err(|e| ClientError::Decode(e.to_string()))
290    }
291
292    // ── Proxy operations (Phase 8) ───────────────────────────────
293
294    pub fn proxy_status(&self) -> Result<serde_json::Value> {
295        self.get_json("/v1/proxy/status")
296    }
297
298    pub fn proxy_list(&self) -> Result<serde_json::Value> {
299        self.get_json("/v1/proxy/list")
300    }
301
302    pub fn proxy_add(
303        &self,
304        name: &str,
305        listen_port: u16,
306        backend: &str,
307        allow_remote: bool,
308    ) -> Result<serde_json::Value> {
309        let body = serde_json::json!({
310            "name": name,
311            "listen_port": listen_port,
312            "backend": backend,
313            "allow_remote": allow_remote,
314        });
315        self.post_json("/v1/proxy/add", &body)
316    }
317
318    pub fn proxy_remove(&self, name: &str) -> Result<serde_json::Value> {
319        let url = format!("{}/v1/proxy/remove/{}", self.endpoint, name);
320        let resp = self.auth_delete(&url).call().map_err(map_error)?;
321        resp.into_json()
322            .map_err(|e| ClientError::Decode(e.to_string()))
323    }
324
325    // ── UDP operations ─────────────────────────────────────────────
326
327    pub fn udp_status(&self) -> Result<serde_json::Value> {
328        self.get_json("/v1/udp/status")
329    }
330
331    pub fn udp_bind(&self, port: u16, addr: &str, lease_secs: u64) -> Result<serde_json::Value> {
332        let body = serde_json::json!({
333            "port": port,
334            "addr": addr,
335            "lease_secs": lease_secs,
336        });
337        self.post_json("/v1/udp/bind", &body)
338    }
339
340    pub fn udp_unbind(&self, id: &str) -> Result<serde_json::Value> {
341        let url = format!("{}/v1/udp/bind/{}", self.endpoint, id);
342        let resp = self.auth_delete(&url).call().map_err(map_error)?;
343        resp.into_json()
344            .map_err(|e| ClientError::Decode(e.to_string()))
345    }
346
347    pub fn udp_send(&self, id: &str, dest: &str, payload_b64: &str) -> Result<serde_json::Value> {
348        let body = serde_json::json!({
349            "dest": dest,
350            "payload": payload_b64,
351        });
352        let path = format!("/v1/udp/send/{id}");
353        self.post_json(&path, &body)
354    }
355
356    pub fn udp_heartbeat(&self, id: &str) -> Result<serde_json::Value> {
357        let path = format!("/v1/udp/heartbeat/{id}");
358        self.put_json(&path, &serde_json::json!({}))
359    }
360
361    // ── Generic operations ─────────────────────────────────────────
362
363    /// POST JSON to an arbitrary path and return the response as a JSON value.
364    pub fn post_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
365        let url = format!("{}{path}", self.endpoint);
366        let resp = self
367            .auth_post(&url)
368            .send_json(body.clone())
369            .map_err(map_error)?;
370        resp.into_json()
371            .map_err(|e| ClientError::Decode(e.to_string()))
372    }
373
374    /// GET JSON from an arbitrary path and return the response as a JSON value.
375    pub fn get_json(&self, path: &str) -> Result<serde_json::Value> {
376        let url = format!("{}{path}", self.endpoint);
377        let resp = self.auth_get(&url).call().map_err(map_error)?;
378        resp.into_json()
379            .map_err(|e| ClientError::Decode(e.to_string()))
380    }
381
382    /// PUT JSON to an arbitrary path and return the response as a JSON value.
383    pub fn put_json(&self, path: &str, body: &serde_json::Value) -> Result<serde_json::Value> {
384        let url = format!("{}{path}", self.endpoint);
385        let resp = self
386            .auth_put(&url)
387            .send_json(body.clone())
388            .map_err(map_error)?;
389        resp.into_json()
390            .map_err(|e| ClientError::Decode(e.to_string()))
391    }
392
393    // ── Admin operations (mDNS) ──────────────────────────────────
394
395    pub fn admin_status(&self) -> Result<DaemonStatus> {
396        let url = format!("{}/v1/mdns/admin/status", self.endpoint);
397        let resp = self.auth_get(&url).call().map_err(map_error)?;
398        resp.into_json()
399            .map_err(|e| ClientError::Decode(e.to_string()))
400    }
401
402    pub fn admin_registrations(&self) -> Result<Vec<AdminRegistration>> {
403        let url = format!("{}/v1/mdns/admin/ls", self.endpoint);
404        let resp = self.auth_get(&url).call().map_err(map_error)?;
405        resp.into_json()
406            .map_err(|e| ClientError::Decode(e.to_string()))
407    }
408
409    pub fn admin_inspect(&self, id: &str) -> Result<AdminRegistration> {
410        let url = format!("{}/v1/mdns/admin/inspect/{id}", self.endpoint);
411        let resp = self.auth_get(&url).call().map_err(map_error)?;
412        resp.into_json()
413            .map_err(|e| ClientError::Decode(e.to_string()))
414    }
415
416    pub fn admin_force_unregister(&self, id: &str) -> Result<()> {
417        let url = format!("{}/v1/mdns/admin/unregister/{id}", self.endpoint);
418        self.auth_delete(&url).call().map_err(map_error)?;
419        Ok(())
420    }
421
422    pub fn admin_drain(&self, id: &str) -> Result<()> {
423        let url = format!("{}/v1/mdns/admin/drain/{id}", self.endpoint);
424        self.auth_post(&url).call().map_err(map_error)?;
425        Ok(())
426    }
427
428    pub fn admin_revive(&self, id: &str) -> Result<()> {
429        let url = format!("{}/v1/mdns/admin/revive/{id}", self.endpoint);
430        self.auth_post(&url).call().map_err(map_error)?;
431        Ok(())
432    }
433
434    // ── Admin operations (system) ────────────────────────────────────
435
436    /// Request a graceful shutdown of the running daemon.
437    pub fn shutdown(&self) -> Result<()> {
438        let url = format!("{}/v1/admin/shutdown", self.endpoint);
439        self.auth_post(&url).call().map_err(map_error)?;
440        Ok(())
441    }
442
443    // ── Certmesh operations (Phase 3) ──────────────────────────────
444
445    /// GET /v1/certmesh/roster - fetch signed roster manifest.
446    pub fn get_roster_manifest(&self) -> Result<serde_json::Value> {
447        let url = format!("{}/v1/certmesh/roster", self.endpoint);
448        let resp = self.auth_get(&url).call().map_err(map_error)?;
449        resp.into_json()
450            .map_err(|e| ClientError::Decode(e.to_string()))
451    }
452
453    /// POST /v1/certmesh/renew - push renewed cert to a member.
454    ///
455    /// `member_endpoint` is the member's HTTP endpoint, not the CA's.
456    /// Used when the primary pushes renewals to remote members.
457    #[allow(dead_code)]
458    pub fn push_renewal(
459        &self,
460        member_endpoint: &str,
461        request: &serde_json::Value,
462    ) -> Result<serde_json::Value> {
463        let url = format!("{member_endpoint}/v1/certmesh/renew");
464        let resp = self
465            .auth_post(&url)
466            .send_json(request.clone())
467            .map_err(map_error)?;
468        resp.into_json()
469            .map_err(|e| ClientError::Decode(e.to_string()))
470    }
471
472    /// POST /v1/certmesh/health - send health heartbeat.
473    pub fn health_heartbeat(&self, request: &serde_json::Value) -> Result<serde_json::Value> {
474        let url = format!("{}/v1/certmesh/health", self.endpoint);
475        let resp = self
476            .auth_post(&url)
477            .send_json(request.clone())
478            .map_err(map_error)?;
479        resp.into_json()
480            .map_err(|e| ClientError::Decode(e.to_string()))
481    }
482
483    // ── Private helpers ───────────────────────────────────────────
484
485    /// Agent without read timeout for SSE streams.
486    fn stream_agent(&self) -> ureq::Agent {
487        ureq::AgentBuilder::new()
488            .timeout_connect(CONNECT_TIMEOUT)
489            .build()
490    }
491}
492
493// ── SSE Stream ────────────────────────────────────────────────────
494
/// Iterator over Server-Sent Events received from the Koi daemon.
///
/// Each `data: <json>` line is yielded as one parsed JSON value; blank lines
/// and all other SSE fields are skipped.
pub struct SseStream {
    /// Buffered reader over the response body, read one line at a time.
    reader: BufReader<Box<dyn Read + Send>>,
}
501
502impl SseStream {
503    fn new(reader: Box<dyn Read + Send>) -> Self {
504        Self {
505            reader: BufReader::new(reader),
506        }
507    }
508}
509
510impl Iterator for SseStream {
511    type Item = Result<serde_json::Value>;
512
513    fn next(&mut self) -> Option<Self::Item> {
514        loop {
515            let mut line = String::new();
516            match self.reader.read_line(&mut line) {
517                Ok(0) => return None,
518                Ok(_) => {
519                    let trimmed = line.trim();
520                    if let Some(data) = trimmed.strip_prefix("data:") {
521                        let data = data.trim_start();
522                        if data.is_empty() {
523                            continue;
524                        }
525                        match serde_json::from_str(data) {
526                            Ok(json) => return Some(Ok(json)),
527                            Err(e) => return Some(Err(ClientError::Decode(e.to_string()))),
528                        }
529                    }
530                    continue;
531                }
532                Err(e) => return Some(Err(ClientError::Transport(e.to_string()))),
533            }
534        }
535    }
536}
537
538// ── Error helpers ─────────────────────────────────────────────────
539
540fn map_error(e: ureq::Error) -> ClientError {
541    match e {
542        ureq::Error::Status(_status, resp) => {
543            let body = resp.into_string().unwrap_or_default();
544            if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
545                let error = json
546                    .get("error")
547                    .and_then(|v| v.as_str())
548                    .unwrap_or("unknown")
549                    .to_string();
550                let message = json
551                    .get("message")
552                    .and_then(|v| v.as_str())
553                    .unwrap_or(&body)
554                    .to_string();
555                ClientError::Api { error, message }
556            } else {
557                ClientError::Api {
558                    error: "http_error".into(),
559                    message: body,
560                }
561            }
562        }
563        ureq::Error::Transport(t) => ClientError::Unreachable(t.to_string()),
564    }
565}
566
567fn record_type_str(record_type: RecordType) -> &'static str {
568    match record_type {
569        RecordType::A => "A",
570        RecordType::AAAA => "AAAA",
571        RecordType::ANY => "ANY",
572        _ => "A",
573    }
574}
575
576fn check_kind_str(kind: ServiceCheckKind) -> &'static str {
577    match kind {
578        ServiceCheckKind::Http => "http",
579        ServiceCheckKind::Tcp => "tcp",
580    }
581}
582
583fn extract<T: serde::de::DeserializeOwned>(json: &serde_json::Value, key: &str) -> Result<T> {
584    if let Some(err_val) = json.get("error") {
585        let error = err_val.as_str().unwrap_or("unknown").to_string();
586        let message = json
587            .get("message")
588            .and_then(|m| m.as_str())
589            .unwrap_or("Unknown error")
590            .to_string();
591        return Err(ClientError::Api { error, message });
592    }
593    json.get(key)
594        .ok_or_else(|| ClientError::Decode(format!("Missing '{key}' in response")))
595        .and_then(|v| {
596            serde_json::from_value(v.clone()).map_err(|e| ClientError::Decode(e.to_string()))
597        })
598}
599
#[cfg(test)]
mod tests {
    use super::*;

    // ── Test helpers ────────────────────────────────────────────────

    /// Build an `SseStream` that reads from an in-memory copy of `input`.
    fn cursor_stream(input: &str) -> SseStream {
        let cursor = std::io::Cursor::new(input.as_bytes().to_vec());
        SseStream::new(Box::new(cursor))
    }

    // ── KoiClient::new() tests ──────────────────────────────────────

    #[test]
    fn client_new_strips_trailing_slash() {
        // After Happy Eyeballs, localhost is rewritten to a literal IP.
        let client = KoiClient::new("http://localhost:5641/");
        assert!(
            client.endpoint == "http://127.0.0.1:5641"
                || client.endpoint == "http://[::1]:5641"
                || client.endpoint == "http://localhost:5641",
            "unexpected endpoint: {}",
            client.endpoint
        );
        assert!(!client.endpoint.ends_with('/'));
        assert!(client.token.is_empty());
    }

    #[test]
    fn client_with_token_sets_token() {
        let client = KoiClient::with_token("http://10.0.0.1:5641", "my-secret-token");
        assert_eq!(client.endpoint, "http://10.0.0.1:5641");
        assert_eq!(client.token, "my-secret-token");
    }

    #[test]
    fn client_new_preserves_non_localhost() {
        let client = KoiClient::new("http://10.0.0.1:5641");
        assert_eq!(client.endpoint, "http://10.0.0.1:5641");
    }

    #[test]
    fn client_new_strips_multiple_trailing_slashes() {
        let client = KoiClient::new("http://localhost:5641///");
        assert!(!client.endpoint.ends_with('/'));
    }

    // ── SSE parsing tests ───────────────────────────────────────────

    #[test]
    fn sse_stream_yields_parsed_json() {
        let input = "data: {\"foo\": 1}\n\n";
        let mut stream = cursor_stream(input);
        let item = stream.next().unwrap().unwrap();
        assert_eq!(item["foo"], 1);
    }

    #[test]
    fn sse_stream_skips_empty_lines() {
        let input = "\n\n\n\n";
        let mut stream = cursor_stream(input);
        assert!(stream.next().is_none());
    }

    #[test]
    fn sse_stream_skips_non_data_lines() {
        let input = "event: message\nretry: 1000\n\n";
        let mut stream = cursor_stream(input);
        assert!(stream.next().is_none());
    }

    #[test]
    fn sse_stream_handles_leading_space() {
        let input = "data:   {\"hello\": \"world\"}\n";
        let mut stream = cursor_stream(input);
        let item = stream.next().unwrap().unwrap();
        assert_eq!(item["hello"], "world");
    }

    #[test]
    fn sse_stream_handles_no_space() {
        let input = "data:{\"hello\":\"world\"}\n";
        let mut stream = cursor_stream(input);
        let item = stream.next().unwrap().unwrap();
        assert_eq!(item["hello"], "world");
    }

    #[test]
    fn sse_stream_yields_multiple_events() {
        let input = "data: {\"n\": 1}\n\ndata: {\"n\": 2}\n\n";
        let mut stream = cursor_stream(input);
        let first = stream.next().unwrap().unwrap();
        let second = stream.next().unwrap().unwrap();
        assert_eq!(first["n"], 1);
        assert_eq!(second["n"], 2);
    }

    #[test]
    fn sse_stream_returns_none_on_eof() {
        let input = "data: {\"n\": 1}\n";
        let mut stream = cursor_stream(input);
        // The single event must come through before the stream reports EOF.
        assert!(matches!(stream.next(), Some(Ok(_))));
        assert!(stream.next().is_none());
    }

    #[test]
    fn sse_stream_decode_error_on_invalid_json() {
        let input = "data: {bad json}\n";
        let mut stream = cursor_stream(input);
        let item = stream.next().unwrap();
        // Check the variant, not just "some error", so the test matches its name.
        assert!(
            matches!(item, Err(ClientError::Decode(_))),
            "expected Decode error, got {item:?}"
        );
    }

    #[test]
    fn sse_stream_transport_error_on_read_failure() {
        /// Reader whose every read fails.
        struct BrokenReader;
        impl Read for BrokenReader {
            fn read(&mut self, _buf: &mut [u8]) -> std::io::Result<usize> {
                Err(std::io::Error::other("boom"))
            }
        }

        let mut stream = SseStream::new(Box::new(BrokenReader));
        let item = stream.next().unwrap();
        assert!(
            matches!(item, Err(ClientError::Transport(_))),
            "expected Transport error, got {item:?}"
        );
    }
}