use futures_util::TryStreamExt;
use tokio::io::{AsyncBufRead, AsyncBufReadExt, Lines};
use tokio_util::io::StreamReader;
pub type LinesStream = Lines<Box<dyn AsyncBufRead + Send + Unpin>>;
pub(crate) fn lines_from_response(response: reqwest::Response) -> LinesStream {
let bytes = response
.bytes_stream()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e));
let reader = StreamReader::new(bytes);
let boxed: Box<dyn AsyncBufRead + Send + Unpin> = Box::new(reader);
boxed.lines()
}
pub(crate) async fn read_next_sse_event<T: serde::de::DeserializeOwned>(
url: &str,
lines: &mut LinesStream,
) -> Result<T, super::Error> {
let mut payload = String::new();
loop {
let line = lines
.next_line()
.await
.map_err(|e| super::Error::MalformedResponse {
url: url.to_string(),
message: format!("error reading SSE line: {e}"),
})?;
match line {
None => {
return Err(super::Error::MalformedResponse {
url: url.to_string(),
message: "SSE stream ended before a complete event was delivered".into(),
});
}
Some(l) if l.is_empty() => {
if payload.is_empty() {
continue;
}
return serde_json::from_str(&payload).map_err(|e| super::Error::MalformedResponse {
url: url.to_string(),
message: format!(
"SSE event payload did not deserialize as JSON-RPC response: {e}; payload starts with: {}",
payload.chars().take(200).collect::<String>(),
),
});
}
Some(l) => {
if let Some(data) = l.strip_prefix("data: ").or_else(|| l.strip_prefix("data:")) {
payload.push_str(data);
}
}
}
}
}
pub(crate) async fn parse_streamable_http_response<T: serde::de::DeserializeOwned>(
url: &str,
response: reqwest::Response,
) -> Result<T, super::Error> {
let bytes = response.bytes().await.map_err(|source| super::Error::Request {
url: url.to_string(),
source,
})?;
if let Ok(v) = serde_json::from_slice::<T>(&bytes) {
return Ok(v);
}
let text = std::str::from_utf8(&bytes).map_err(|_| {
super::Error::MalformedResponse {
url: url.to_string(),
message: "response body is not valid UTF-8".into(),
}
})?;
let collected: String = text
.lines()
.filter_map(|l| l.strip_prefix("data: ").or_else(|| l.strip_prefix("data:")))
.collect();
serde_json::from_str(&collected).map_err(|e| {
super::Error::MalformedResponse {
url: url.to_string(),
message: format!(
"neither JSON nor SSE-wrapped JSON: {e}; body starts with: {}",
text.chars().take(200).collect::<String>(),
),
}
})
}