use std::pin::Pin;
use std::task::{Context, Poll};
use bytes::BytesMut;
use futures_core::Stream;
use futures_util::StreamExt;
use reqwest::header::{ACCEPT, AUTHORIZATION, CACHE_CONTROL};
use serde_json::Value;
use crate::client::PulseClient;
use crate::error::PulseError;
const PATH: &str = "/api/pulse/events/stream";
pub struct EventsResource<'c> {
pub(crate) client: &'c PulseClient,
}
impl<'c> EventsResource<'c> {
pub async fn stream(self) -> Result<EventsStream, PulseError> {
let token = self.client.token().ok_or_else(|| PulseError::NoToken {
path: PATH.to_string(),
})?;
if token.is_empty() {
return Err(PulseError::NoToken {
path: PATH.to_string(),
});
}
let url = format!("{}{PATH}", self.client.inner.base_url);
let response = self
.client
.inner
.http
.get(url)
.header(AUTHORIZATION, format!("Bearer {token}"))
.header(ACCEPT, "text/event-stream")
.header(CACHE_CONTROL, "no-cache")
.send()
.await?;
let status = response.status();
if !status.is_success() {
let bytes = response.bytes().await?;
let body = if bytes.is_empty() {
None
} else {
match serde_json::from_slice::<Value>(&bytes) {
Ok(v) => Some(v),
Err(_) => {
let raw = String::from_utf8_lossy(&bytes);
Some(serde_json::json!({ "error": raw.to_string() }))
}
}
};
return Err(match status.as_u16() {
401 => PulseError::Auth {
path: PATH.to_string(),
body,
},
other => PulseError::Api {
status: other,
path: PATH.to_string(),
body,
},
});
}
Ok(EventsStream {
inner: Box::pin(response.bytes_stream()),
buffer: BytesMut::with_capacity(4096),
data_lines: Vec::new(),
done: false,
})
}
}
pub struct EventsStream {
inner: Pin<Box<dyn Stream<Item = reqwest::Result<bytes::Bytes>> + Send>>,
buffer: BytesMut,
data_lines: Vec<String>,
done: bool,
}
impl Stream for EventsStream {
type Item = Result<Value, PulseError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if self.done {
return Poll::Ready(None);
}
loop {
if let Some(event) = self.try_parse_buffered_event() {
return Poll::Ready(Some(Ok(event)));
}
match self.inner.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(None) => {
self.done = true;
return Poll::Ready(None);
}
Poll::Ready(Some(Err(e))) => {
self.done = true;
return Poll::Ready(Some(Err(PulseError::Transport(e))));
}
Poll::Ready(Some(Ok(chunk))) => {
self.buffer.extend_from_slice(&chunk);
}
}
}
}
}
impl EventsStream {
fn try_parse_buffered_event(&mut self) -> Option<Value> {
loop {
let newline_pos = self.buffer.iter().position(|&b| b == b'\n')?;
let line_bytes = self.buffer.split_to(newline_pos + 1);
let line_len = if line_bytes.len() >= 2 && line_bytes[line_bytes.len() - 2] == b'\r' {
line_bytes.len() - 2
} else {
line_bytes.len() - 1
};
let line = std::str::from_utf8(&line_bytes[..line_len]).unwrap_or("");
if line.is_empty() {
if !self.data_lines.is_empty() {
let payload = self.data_lines.join("\n");
self.data_lines.clear();
return Some(match serde_json::from_str::<Value>(&payload) {
Ok(v) => v,
Err(_) => serde_json::json!({ "data": payload }),
});
}
continue;
}
if line.starts_with(':') {
continue; }
if let Some(rest) = line.strip_prefix("data:") {
let value = rest.strip_prefix(' ').unwrap_or(rest);
self.data_lines.push(value.to_string());
}
}
}
}
impl std::fmt::Debug for EventsResource<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventsResource").finish()
}
}
impl std::fmt::Debug for EventsStream {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventsStream")
.field("done", &self.done)
.field("buffered_lines", &self.data_lines.len())
.finish()
}
}