homeassistant_cli/api/
events.rs1use futures_util::StreamExt;
2
3use crate::api::{HaClient, HaError, HaEvent};
4
5pub async fn fire_event(
6 client: &HaClient,
7 event_type: &str,
8 data: Option<&serde_json::Value>,
9) -> Result<serde_json::Value, HaError> {
10 let req = client.post(&format!("/api/events/{event_type}"));
11 let req = if let Some(d) = data { req.json(d) } else { req };
12 let resp = req.send().await?;
13 match resp.status().as_u16() {
14 200 => Ok(resp.json().await?),
15 401 | 403 => Err(HaError::Auth("Unauthorized".into())),
16 404 => Err(HaError::NotFound(format!(
17 "Event type '{event_type}' not found"
18 ))),
19 status => Err(HaError::Api {
20 status,
21 message: resp.text().await.unwrap_or_default(),
22 }),
23 }
24}
25
26pub(crate) fn parse_sse_data(line: &str) -> Option<HaEvent> {
28 let json = line.strip_prefix("data: ")?;
29 serde_json::from_str(json).ok()
30}
31
32pub async fn watch_stream(
35 client: &HaClient,
36 restrict: Option<&str>,
37 mut on_event: impl FnMut(HaEvent) -> bool,
38) -> Result<(), HaError> {
39 let url = match restrict {
40 Some(r) => format!("{}/api/stream?restrict={}", client.base_url, r),
41 None => format!("{}/api/stream", client.base_url),
42 };
43
44 let resp = client
45 .client
46 .get(&url)
47 .bearer_auth(client.token())
48 .send()
49 .await?;
50
51 match resp.status().as_u16() {
52 200 => {}
53 401 | 403 => return Err(HaError::Auth("Unauthorized".into())),
54 status => {
55 return Err(HaError::Api {
56 status,
57 message: resp.text().await.unwrap_or_default(),
58 });
59 }
60 }
61
62 let mut stream = resp.bytes_stream();
63 let mut buffer = String::new();
64
65 while let Some(chunk) = stream.next().await {
66 let chunk = chunk?;
67 buffer.push_str(&String::from_utf8_lossy(&chunk));
68
69 while let Some(pos) = buffer.find('\n') {
70 let line = buffer[..pos].trim_end_matches('\r').to_owned();
71 buffer.drain(..=pos);
72 if let Some(event) = parse_sse_data(&line)
73 && !on_event(event)
74 {
75 return Ok(());
76 }
77 }
78 }
79 Ok(())
80}
81
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::HaClient;
    use wiremock::matchers::{body_json, method, path};
    use wiremock::{Mock, MockServer, ResponseTemplate};

    /// A POST to the event endpoint with no payload succeeds on HTTP 200.
    #[tokio::test]
    async fn fire_event_sends_post() {
        let server = MockServer::start().await;
        Mock::given(method("POST"))
            .and(path("/api/events/my_event"))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(serde_json::json!({"message": "Event my_event fired."})),
            )
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "my_event", None).await;
        assert!(result.is_ok());
    }

    /// The supplied event data must reach the server as the JSON body.
    #[tokio::test]
    async fn fire_event_with_data_includes_body() {
        let server = MockServer::start().await;
        let data = serde_json::json!({"key": "value"});
        Mock::given(method("POST"))
            .and(path("/api/events/custom"))
            // Without this matcher the test would pass even if the payload
            // were dropped; an unmatched request gets a 404 from the mock
            // server, which `fire_event` reports as an error.
            .and(body_json(&data))
            .respond_with(
                ResponseTemplate::new(200)
                    .set_body_json(serde_json::json!({"message": "Event custom fired."})),
            )
            .mount(&server)
            .await;

        let client = HaClient::new(server.uri(), "tok");
        let result = fire_event(&client, "custom", Some(&data)).await;
        assert!(result.is_ok());
    }

    /// A well-formed `data: ` line is deserialized into an event.
    #[test]
    fn parse_sse_line_extracts_data() {
        let line =
            r#"data: {"event_type":"state_changed","data":{},"time_fired":"2026-01-01T00:00:00Z"}"#;
        let event = parse_sse_data(line).unwrap();
        assert_eq!(event.event_type, "state_changed");
    }

    /// Blank lines, comments and non-`data` fields are ignored.
    #[test]
    fn parse_sse_line_returns_none_for_non_data_lines() {
        assert!(parse_sse_data("").is_none());
        assert!(parse_sse_data(": ping").is_none());
        assert!(parse_sse_data("event: state_changed").is_none());
    }
}