1use std::pin::Pin;
29use std::task::{Context, Poll};
30
31use bytes::BytesMut;
32use futures_core::Stream;
33use futures_util::StreamExt;
34use reqwest::header::{ACCEPT, AUTHORIZATION, CACHE_CONTROL};
35use serde_json::Value;
36
37use crate::client::PulseClient;
38use crate::error::PulseError;
39
/// Request path of the events stream endpoint.
///
/// It is appended to the client's base URL to form the request URL and is
/// copied into the `path` field of every error this module builds.
const PATH: &str = "/api/pulse/events/stream";
41
42pub struct EventsResource<'c> {
44 pub(crate) client: &'c PulseClient,
45}
46
47impl<'c> EventsResource<'c> {
48 pub async fn stream(self) -> Result<EventsStream, PulseError> {
56 let token = self.client.token().ok_or_else(|| PulseError::NoToken {
57 path: PATH.to_string(),
58 })?;
59 if token.is_empty() {
60 return Err(PulseError::NoToken {
61 path: PATH.to_string(),
62 });
63 }
64
65 let url = format!("{}{PATH}", self.client.inner.base_url);
66 let response = self
72 .client
73 .inner
74 .http
75 .get(url)
76 .header(AUTHORIZATION, format!("Bearer {token}"))
77 .header(ACCEPT, "text/event-stream")
78 .header(CACHE_CONTROL, "no-cache")
79 .send()
80 .await?;
81
82 let status = response.status();
83 if !status.is_success() {
84 let bytes = response.bytes().await?;
85 let body = if bytes.is_empty() {
86 None
87 } else {
88 match serde_json::from_slice::<Value>(&bytes) {
89 Ok(v) => Some(v),
90 Err(_) => {
91 let raw = String::from_utf8_lossy(&bytes);
92 Some(serde_json::json!({ "error": raw.to_string() }))
93 }
94 }
95 };
96 return Err(match status.as_u16() {
97 401 => PulseError::Auth {
98 path: PATH.to_string(),
99 body,
100 },
101 other => PulseError::Api {
102 status: other,
103 path: PATH.to_string(),
104 body,
105 },
106 });
107 }
108
109 Ok(EventsStream {
110 inner: Box::pin(response.bytes_stream()),
111 buffer: BytesMut::with_capacity(4096),
112 data_lines: Vec::new(),
113 done: false,
114 })
115 }
116}
117
118pub struct EventsStream {
122 inner: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>,
123 buffer: BytesMut,
124 data_lines: Vec<String>,
125 done: bool,
126}
127
128impl Stream for EventsStream {
129 type Item = Result<Value, PulseError>;
130
131 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
132 if self.done {
133 return Poll::Ready(None);
134 }
135
136 loop {
137 if let Some(event) = self.try_parse_buffered_event() {
139 return Poll::Ready(Some(Ok(event)));
140 }
141
142 match self.inner.poll_next_unpin(cx) {
144 Poll::Pending => return Poll::Pending,
145 Poll::Ready(None) => {
146 self.done = true;
147 return Poll::Ready(None);
148 }
149 Poll::Ready(Some(Err(e))) => {
150 self.done = true;
151 return Poll::Ready(Some(Err(PulseError::Transport(e))));
152 }
153 Poll::Ready(Some(Ok(chunk))) => {
154 self.buffer.extend_from_slice(&chunk);
155 }
157 }
158 }
159 }
160}
161
162impl EventsStream {
163 fn try_parse_buffered_event(&mut self) -> Option<Value> {
167 loop {
168 let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
169 let line_bytes = self.buffer.split_to(newline_pos + 1);
171 let line_len = if line_bytes.len() >= 2 && line_bytes[line_bytes.len() - 2] == b'\r' {
173 line_bytes.len() - 2
174 } else {
175 line_bytes.len() - 1
176 };
177 let line = std::str::from_utf8(&line_bytes[..line_len]).unwrap_or("");
178
179 if line.is_empty() {
180 if !self.data_lines.is_empty() {
182 let payload = self.data_lines.join("\n");
183 self.data_lines.clear();
184 return Some(match serde_json::from_str::<Value>(&payload) {
185 Ok(v) => v,
186 Err(_) => serde_json::json!({ "data": payload }),
187 });
188 }
189 continue;
190 }
191 if line.starts_with(':') {
192 continue; }
194 if let Some(rest) = line.strip_prefix("data:") {
195 let value = rest.strip_prefix(' ').unwrap_or(rest);
196 self.data_lines.push(value.to_string());
197 }
198 }
200 }
201}
202
203impl std::fmt::Debug for EventsResource<'_> {
204 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
205 f.debug_struct("EventsResource").finish()
206 }
207}
208
209impl std::fmt::Debug for EventsStream {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("EventsStream")
212 .field("done", &self.done)
213 .field("buffered_lines", &self.data_lines.len())
214 .finish()
215 }
216}