Skip to main content

homeassistant_cli/api/
events.rs

1use futures_util::StreamExt;
2
3use crate::api::{HaClient, HaError, HaEvent};
4
5pub async fn fire_event(
6    client: &HaClient,
7    event_type: &str,
8    data: Option<&serde_json::Value>,
9) -> Result<serde_json::Value, HaError> {
10    let req = client.post(&format!("/api/events/{event_type}"));
11    let req = if let Some(d) = data { req.json(d) } else { req };
12    let resp = req.send().await?;
13    match resp.status().as_u16() {
14        200 => Ok(resp.json().await?),
15        401 | 403 => Err(HaError::Auth("Unauthorized".into())),
16        404 => Err(HaError::NotFound(format!(
17            "Event type '{event_type}' not found"
18        ))),
19        status => Err(HaError::Api {
20            status,
21            message: resp.text().await.unwrap_or_default(),
22        }),
23    }
24}
25
26/// Parse a single SSE line of the form `data: <json>` into an HaEvent.
27pub(crate) fn parse_sse_data(line: &str) -> Option<HaEvent> {
28    let json = line.strip_prefix("data: ")?;
29    serde_json::from_str(json).ok()
30}
31
32/// Stream SSE events from /api/stream, calling `on_event` for each.
33/// Returns when `on_event` returns false or the stream ends.
34pub async fn watch_stream(
35    client: &HaClient,
36    restrict: Option<&str>,
37    mut on_event: impl FnMut(HaEvent) -> bool,
38) -> Result<(), HaError> {
39    let url = match restrict {
40        Some(r) => format!("{}/api/stream?restrict={}", client.base_url, r),
41        None => format!("{}/api/stream", client.base_url),
42    };
43
44    let resp = client
45        .client
46        .get(&url)
47        .bearer_auth(client.token())
48        .send()
49        .await?;
50
51    match resp.status().as_u16() {
52        200 => {}
53        401 | 403 => return Err(HaError::Auth("Unauthorized".into())),
54        status => {
55            return Err(HaError::Api {
56                status,
57                message: resp.text().await.unwrap_or_default(),
58            });
59        }
60    }
61
62    let mut stream = resp.bytes_stream();
63    let mut buffer = String::new();
64
65    while let Some(chunk) = stream.next().await {
66        let chunk = chunk?;
67        buffer.push_str(&String::from_utf8_lossy(&chunk));
68
69        while let Some(pos) = buffer.find('\n') {
70            let line = buffer[..pos].trim_end_matches('\r').to_owned();
71            buffer.drain(..=pos);
72            if let Some(event) = parse_sse_data(&line)
73                && !on_event(event)
74            {
75                return Ok(());
76            }
77        }
78    }
79    Ok(())
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::{HaClient, HaError};
    use wiremock::matchers::{body_json, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// A 200 response to the POST is returned to the caller as parsed JSON.
    #[tokio::test]
    async fn fire_event_sends_post() {
        let server = MockServer::start().await;
        let reply = serde_json::json!({"message": "Event my_event fired."});
        Mock::given(method("POST"))
            .and(path("/api/events/my_event"))
            .respond_with(ResponseTemplate::new(200).set_body_json(reply.clone()))
            .expect(1)
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "my_event", None).await;
        assert_eq!(result.ok(), Some(reply));
    }

    /// The mock only matches when the request body equals `data`, so a missing
    /// or altered body makes the call fail instead of passing unnoticed.
    #[tokio::test]
    async fn fire_event_with_data_includes_body() {
        let server = MockServer::start().await;
        let data = serde_json::json!({"key": "value"});
        let reply = serde_json::json!({"message": "Event custom fired."});
        Mock::given(method("POST"))
            .and(path("/api/events/custom"))
            .and(body_json(&data))
            .respond_with(ResponseTemplate::new(200).set_body_json(reply.clone()))
            .expect(1)
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "custom", Some(&data)).await;
        assert_eq!(result.ok(), Some(reply));
    }

    /// A 401 response is reported as an authentication error.
    #[tokio::test]
    async fn fire_event_maps_unauthorized_to_auth_error() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/api/events/my_event"))
            .respond_with(ResponseTemplate::new(401))
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "my_event", None).await;
        assert!(matches!(result, Err(HaError::Auth(_))));
    }

    /// A 404 response is reported as a not-found error naming the event type.
    #[tokio::test]
    async fn fire_event_maps_missing_event_to_not_found() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/api/events/missing"))
            .respond_with(ResponseTemplate::new(404))
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "missing", None).await;
        assert!(matches!(
            result,
            Err(HaError::NotFound(message)) if message.contains("missing")
        ));
    }

    #[test]
    fn parse_sse_line_extracts_data() {
        let line =
            r#"data: {"event_type":"state_changed","data":{},"time_fired":"2026-01-01T00:00:00Z"}"#;
        let event = parse_sse_data(line).unwrap();
        assert_eq!(event.event_type, "state_changed");
    }

    #[test]
    fn parse_sse_line_returns_none_for_non_data_lines() {
        assert!(parse_sse_data("").is_none());
        assert!(parse_sse_data(": ping").is_none());
        assert!(parse_sse_data("event: state_changed").is_none());
    }

    /// A `data` field whose payload is not JSON is ignored rather than treated as an event.
    #[test]
    fn parse_sse_line_returns_none_for_non_json_payload() {
        assert!(parse_sse_data("data: ping").is_none());
    }
}