use std::collections::VecDeque;
use std::panic::{AssertUnwindSafe, catch_unwind};
use std::pin::Pin;
use futures::stream::{self, Stream, StreamExt as _};
use tokio_util::sync::CancellationToken;
use swink_agent::{AssistantMessageEvent, StopReason};
use crate::finalize::StreamFinalize;
#[derive(Debug, PartialEq, Eq)]
pub enum SseLine {
Event(String),
Data(String),
Done,
Empty,
TransportError(String),
}
pub const SSE_TRANSPORT_ERROR_EVENT: &str = "__swink_transport_error__";
pub struct SseStreamParser {
buffer: String,
byte_carry: Vec<u8>,
pending_data: Option<String>,
}
impl SseStreamParser {
#[must_use]
pub const fn new() -> Self {
Self {
buffer: String::new(),
byte_carry: Vec::new(),
pending_data: None,
}
}
pub fn feed(&mut self, chunk: &[u8]) -> Vec<SseLine> {
let combined: Vec<u8> = if self.byte_carry.is_empty() {
chunk.to_vec()
} else {
let mut v = std::mem::take(&mut self.byte_carry);
v.extend_from_slice(chunk);
v
};
let bytes = combined.as_slice();
let mut cursor = 0;
while cursor < bytes.len() {
match std::str::from_utf8(&bytes[cursor..]) {
Ok(s) => {
self.buffer.push_str(s);
cursor = bytes.len();
}
Err(e) => {
let valid = e.valid_up_to();
if valid > 0 {
let s = std::str::from_utf8(&bytes[cursor..cursor + valid])
.expect("valid utf-8 prefix");
self.buffer.push_str(s);
}
cursor += valid;
match e.error_len() {
None => {
self.byte_carry.extend_from_slice(&bytes[cursor..]);
cursor = bytes.len();
}
Some(n) => {
self.buffer.push('\u{FFFD}');
cursor += n;
}
}
}
}
}
self.drain_lines()
}
pub fn flush(&mut self) -> Vec<SseLine> {
let mut lines = vec![];
if !self.byte_carry.is_empty() {
let carry = std::mem::take(&mut self.byte_carry);
self.buffer.push_str(&String::from_utf8_lossy(&carry));
}
if !self.buffer.trim().is_empty() {
let remaining = std::mem::take(&mut self.buffer);
for line in remaining.lines() {
self.process_raw_line(line, &mut lines);
}
}
self.buffer.clear();
if let Some(data) = self.pending_data.take() {
lines.push(SseLine::Data(data));
}
lines
}
fn drain_lines(&mut self) -> Vec<SseLine> {
let mut lines = vec![];
while let Some(pos) = self.buffer.find('\n') {
let line_end = if pos > 0 && self.buffer.as_bytes().get(pos - 1) == Some(&b'\r') {
pos - 1
} else {
pos
};
let line = self.buffer[..line_end].to_string();
self.buffer.drain(..=pos);
self.process_raw_line(&line, &mut lines);
}
lines
}
fn process_raw_line(&mut self, line: &str, output: &mut Vec<SseLine>) {
if line.is_empty() {
if let Some(data) = self.pending_data.take() {
output.push(SseLine::Data(data));
}
output.push(SseLine::Empty);
return;
}
if line.starts_with(':') {
return;
}
if let Some(event_type) = parse_sse_field_value(line, "event:") {
if let Some(data) = self.pending_data.take() {
output.push(SseLine::Data(data));
}
output.push(SseLine::Event(event_type.to_string()));
return;
}
if let Some(data) = parse_sse_field_value(line, "data:") {
if data == "[DONE]" {
if let Some(pending) = self.pending_data.take() {
output.push(SseLine::Data(pending));
}
output.push(SseLine::Done);
return;
}
if !data.is_empty() {
if let Some(ref mut pending) = self.pending_data {
pending.push('\n');
pending.push_str(data);
} else {
self.pending_data = Some(data.to_string());
}
}
return;
}
if let Some(data) = self.pending_data.take() {
output.push(SseLine::Data(data));
}
}
}
fn parse_sse_field_value<'a>(line: &'a str, field_prefix: &str) -> Option<&'a str> {
let value = line.strip_prefix(field_prefix)?;
Some(value.strip_prefix(' ').unwrap_or(value))
}
impl Default for SseStreamParser {
fn default() -> Self {
Self::new()
}
}
pub fn sse_data_lines(
byte_stream: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
) -> Pin<Box<dyn Stream<Item = SseLine> + Send + 'static>> {
sse_data_lines_with_callback(byte_stream, None)
}
pub fn sse_lines(
byte_stream: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
) -> Pin<Box<dyn Stream<Item = SseLine> + Send + 'static>> {
Box::pin(stream::unfold(
(
Box::pin(byte_stream),
SseStreamParser::new(),
VecDeque::<SseLine>::new(),
),
|(mut stream, mut parser, mut pending)| async move {
loop {
if let Some(line) = pending.pop_front() {
return Some((line, (stream, parser, pending)));
}
if let Some(result) = stream.next().await {
match result {
Ok(bytes) => {
pending.extend(parser.feed(&bytes));
}
Err(err) => {
pending.extend(parser.flush());
pending.push_back(SseLine::TransportError(format!(
"SSE transport error: {err}"
)));
}
}
continue;
}
pending.extend(parser.flush());
if pending.is_empty() {
return None;
}
}
},
))
}
pub fn sse_data_lines_with_callback(
byte_stream: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
on_raw_payload: Option<swink_agent::OnRawPayload>,
) -> Pin<Box<dyn Stream<Item = SseLine> + Send + 'static>> {
Box::pin(stream::unfold(
(Box::pin(sse_lines(byte_stream)), on_raw_payload),
|(mut stream, callback)| async move {
loop {
if let Some(line) = stream.next().await {
if !matches!(
line,
SseLine::Data(_) | SseLine::Done | SseLine::TransportError(_)
) {
continue;
}
if let (SseLine::Data(data), Some(cb)) = (&line, &callback) {
let cb = AssertUnwindSafe(cb);
let data = AssertUnwindSafe(data);
let _ = catch_unwind(|| (cb)(&data));
}
return Some((line, (stream, callback)));
}
return None;
}
},
))
}
#[derive(Debug, PartialEq, Eq)]
pub struct SseEvent {
pub event_type: String,
pub data: String,
}
pub fn sse_paired_events(
byte_stream: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
) -> Pin<Box<dyn Stream<Item = SseEvent> + Send + 'static>> {
sse_paired_events_with_callback(byte_stream, None)
}
pub fn sse_paired_events_with_callback(
byte_stream: impl Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send + 'static,
on_raw_payload: Option<swink_agent::OnRawPayload>,
) -> Pin<Box<dyn Stream<Item = SseEvent> + Send + 'static>> {
Box::pin(stream::unfold(
(
Box::pin(sse_lines(byte_stream)),
Option::<String>::None,
on_raw_payload,
),
|(mut stream, mut current_event, callback)| async move {
loop {
match stream.next().await {
Some(SseLine::Empty | SseLine::Done) => {
current_event = None;
}
Some(SseLine::TransportError(message)) => {
return Some((
SseEvent {
event_type: SSE_TRANSPORT_ERROR_EVENT.to_string(),
data: message,
},
(stream, current_event, callback),
));
}
Some(SseLine::Event(event_type)) => {
current_event = Some(event_type);
}
Some(SseLine::Data(data)) => {
if !data.is_empty() {
if let Some(cb) = &callback {
let cb = AssertUnwindSafe(cb);
let data = AssertUnwindSafe(&data);
let _ = catch_unwind(|| (cb)(&data));
}
let event_type = current_event
.take()
.unwrap_or_else(|| "unknown".to_string());
return Some((
SseEvent { event_type, data },
(stream, current_event, callback),
));
}
}
None => return None,
}
}
},
))
}
pub enum SseAction {
Continue(Vec<AssistantMessageEvent>),
Done(Vec<AssistantMessageEvent>),
Skip,
}
pub fn sse_adapter_stream<S, L, F>(
line_stream: Pin<Box<dyn Stream<Item = L> + Send>>,
cancellation_token: CancellationToken,
state: S,
cancel_message: &'static str,
on_item: F,
) -> Pin<Box<dyn Stream<Item = AssistantMessageEvent> + Send>>
where
S: StreamFinalize + Send + 'static,
L: Send + 'static,
F: FnMut(Option<L>, &mut S) -> SseAction + Send + 'static,
{
Box::pin(
stream::unfold(
(line_stream, cancellation_token, state, false, true, on_item),
move |(mut lines, token, mut state, mut done, first, mut on_item)| async move {
if done {
return None;
}
if first {
return Some((
vec![AssistantMessageEvent::Start],
(lines, token, state, done, false, on_item),
));
}
tokio::select! {
biased;
() = token.cancelled() => {
let mut events = crate::finalize::finalize_blocks(&mut state);
events.push(AssistantMessageEvent::Error {
stop_reason: StopReason::Aborted,
error_message: cancel_message.to_string(),
usage: None,
error_kind: None,
});
done = true;
Some((events, (lines, token, state, done, false, on_item)))
}
item = lines.next() => {
let action = on_item(item, &mut state);
match action {
SseAction::Continue(events) => {
Some((events, (lines, token, state, done, false, on_item)))
}
SseAction::Done(events) => {
done = true;
Some((events, (lines, token, state, done, false, on_item)))
}
SseAction::Skip => {
Some((vec![], (lines, token, state, done, false, on_item)))
}
}
}
}
},
)
.flat_map(stream::iter),
)
}
#[cfg(test)]
mod tests {
use futures::StreamExt as _;
use super::*;
#[test]
fn sse_parser_basic_event_data() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"event: message_start\ndata: {}\n\n");
assert_eq!(
lines,
vec![
SseLine::Event("message_start".to_string()),
SseLine::Data("{}".to_string()),
SseLine::Empty,
]
);
}
#[test]
fn sse_parser_accepts_optional_space_after_colon() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"event:message_start\ndata:{\"ok\":true}\n\n");
assert_eq!(
lines,
vec![
SseLine::Event("message_start".to_string()),
SseLine::Data("{\"ok\":true}".to_string()),
SseLine::Empty,
]
);
}
#[test]
fn sse_parser_accepts_mixed_spaced_and_unspaced_fields() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"event: message_start\ndata:{\"one\":1}\ndata: {\"two\":2}\n\n");
assert_eq!(
lines,
vec![
SseLine::Event("message_start".to_string()),
SseLine::Data("{\"one\":1}\n{\"two\":2}".to_string()),
SseLine::Empty,
]
);
}
#[test]
fn sse_parser_partial_chunk_buffering() {
let mut parser = SseStreamParser::new();
let lines1 = parser.feed(b"event: content");
assert!(lines1.is_empty(), "no newline yet, nothing to yield");
let lines2 = parser.feed(b"_block_delta\ndata: {\"text\":\"hello\"}\n\n");
assert_eq!(
lines2,
vec![
SseLine::Event("content_block_delta".to_string()),
SseLine::Data("{\"text\":\"hello\"}".to_string()),
SseLine::Empty,
]
);
}
#[test]
fn sse_parser_done_sentinel() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: [DONE]\n");
assert_eq!(lines, vec![SseLine::Done]);
}
#[test]
fn sse_parser_empty_line() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"\n");
assert_eq!(lines, vec![SseLine::Empty]);
}
#[test]
fn sse_parser_comment_skipped() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b": this is a comment\n");
assert!(lines.is_empty());
}
#[test]
fn sse_parser_flush_remaining() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: {\"final\":true}");
assert!(lines.is_empty(), "no newline, nothing drained yet");
let flushed = parser.flush();
assert_eq!(flushed, vec![SseLine::Data("{\"final\":true}".to_string())]);
}
#[test]
fn sse_parser_multiline_data_concatenation() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: line1\ndata: line2\ndata: line3\n\n");
assert_eq!(
lines,
vec![
SseLine::Data("line1\nline2\nline3".to_string()),
SseLine::Empty,
]
);
}
#[test]
fn sse_parser_multiline_data_flushed_on_event() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: part1\ndata: part2\nevent: next\n");
assert_eq!(
lines,
vec![
SseLine::Data("part1\npart2".to_string()),
SseLine::Event("next".to_string()),
]
);
}
#[test]
fn sse_parser_multiline_data_across_feeds() {
let mut parser = SseStreamParser::new();
let lines1 = parser.feed(b"data: first\n");
assert!(
lines1.is_empty(),
"pending data not emitted without separator"
);
let lines2 = parser.feed(b"data: second\n\n");
assert_eq!(
lines2,
vec![SseLine::Data("first\nsecond".to_string()), SseLine::Empty,]
);
}
#[test]
fn sse_parser_single_data_emitted_on_empty_line() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: single\n\n");
assert_eq!(
lines,
vec![SseLine::Data("single".to_string()), SseLine::Empty,]
);
}
#[test]
fn sse_parser_pending_data_flushed_at_end() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: orphan\n");
assert!(lines.is_empty());
let flushed = parser.flush();
assert_eq!(flushed, vec![SseLine::Data("orphan".to_string())]);
}
#[test]
fn sse_parser_split_utf8_across_chunks_is_lossless() {
let payload = "data: héllo\n\n".as_bytes();
let split_at = payload
.windows(2)
.position(|w| w == [0xC3, 0xA9])
.expect("payload contains é")
+ 1;
let (first, second) = payload.split_at(split_at);
let mut parser = SseStreamParser::new();
let lines1 = parser.feed(first);
for line in &lines1 {
if let SseLine::Data(d) = line {
assert!(!d.contains('\u{FFFD}'), "split byte produced U+FFFD: {d:?}");
}
}
let lines2 = parser.feed(second);
let combined: Vec<_> = lines1.into_iter().chain(lines2).collect();
assert_eq!(
combined,
vec![SseLine::Data("héllo".to_string()), SseLine::Empty]
);
}
#[test]
fn sse_parser_split_utf8_3byte_and_4byte() {
let payload = "data: €🦀\n\n".as_bytes();
let mut parser = SseStreamParser::new();
let mut all = Vec::new();
for b in payload {
all.extend(parser.feed(&[*b]));
}
assert_eq!(all, vec![SseLine::Data("€🦀".to_string()), SseLine::Empty]);
}
#[test]
fn sse_parser_truly_invalid_byte_uses_replacement_char() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: a\xFFb\n\n");
assert_eq!(
lines,
vec![SseLine::Data("a\u{FFFD}b".to_string()), SseLine::Empty,]
);
}
#[test]
fn sse_parser_done_flushes_pending_data() {
let mut parser = SseStreamParser::new();
let lines = parser.feed(b"data: last\ndata: [DONE]\n");
assert_eq!(
lines,
vec![SseLine::Data("last".to_string()), SseLine::Done,]
);
}
#[tokio::test]
async fn on_raw_payload_fires_for_each_line() {
use std::sync::{Arc, Mutex};
let captured = Arc::new(Mutex::new(Vec::<String>::new()));
let captured_clone = Arc::clone(&captured);
let callback: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(move |data: &str| {
captured_clone.lock().unwrap().push(data.to_owned());
});
let chunks = vec![Ok(bytes::Bytes::from("data: line1\n\ndata: line2\n\n"))];
let byte_stream = futures::stream::iter(chunks);
let mut data_stream = sse_data_lines_with_callback(byte_stream, Some(callback));
let first = data_stream.next().await;
assert_eq!(first, Some(SseLine::Data("line1".to_string())));
let second = data_stream.next().await;
assert_eq!(second, Some(SseLine::Data("line2".to_string())));
let lines = {
let guard = captured.lock().unwrap();
guard.clone()
};
assert_eq!(lines, vec!["line1".to_string(), "line2".to_string()]);
}
#[tokio::test]
async fn on_raw_payload_none_no_overhead() {
let chunks = vec![Ok(bytes::Bytes::from("data: hello\n\n"))];
let byte_stream = futures::stream::iter(chunks);
let mut data_stream = sse_data_lines_with_callback(byte_stream, None);
let first = data_stream.next().await;
assert_eq!(first, Some(SseLine::Data("hello".to_string())));
let done = data_stream.next().await;
assert!(done.is_none());
}
#[tokio::test]
async fn on_raw_payload_panic_caught() {
use std::sync::Arc;
let callback: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|_data: &str| {
panic!("callback panic!");
});
let chunks = vec![Ok(bytes::Bytes::from("data: safe\n\ndata: also_safe\n\n"))];
let byte_stream = futures::stream::iter(chunks);
let mut data_stream = sse_data_lines_with_callback(byte_stream, Some(callback));
let first = data_stream.next().await;
assert_eq!(first, Some(SseLine::Data("safe".to_string())));
let second = data_stream.next().await;
assert_eq!(second, Some(SseLine::Data("also_safe".to_string())));
}
#[tokio::test]
async fn sse_lines_preserves_events_and_separators() {
let chunks = vec![Ok(bytes::Bytes::from(
"event: start\ndata: hello\n\ndata: [DONE]\n",
))];
let byte_stream = futures::stream::iter(chunks);
let lines: Vec<_> = sse_lines(byte_stream).collect().await;
assert_eq!(
lines,
vec![
SseLine::Event("start".to_string()),
SseLine::Data("hello".to_string()),
SseLine::Empty,
SseLine::Done,
]
);
}
#[tokio::test]
async fn paired_events_pairs_event_with_data() {
let chunks = vec![Ok(bytes::Bytes::from(
"event: message_start\ndata: {\"type\":\"start\"}\n\nevent: content_block_delta\ndata: {\"text\":\"hi\"}\n\n",
))];
let byte_stream = futures::stream::iter(chunks);
let events: Vec<_> = super::sse_paired_events(byte_stream).collect().await;
assert_eq!(events.len(), 2);
assert_eq!(events[0].event_type, "message_start");
assert_eq!(events[0].data, "{\"type\":\"start\"}");
assert_eq!(events[1].event_type, "content_block_delta");
assert_eq!(events[1].data, "{\"text\":\"hi\"}");
}
#[tokio::test]
async fn paired_events_data_without_event_uses_unknown() {
let chunks = vec![Ok(bytes::Bytes::from("data: orphan\n\n"))];
let byte_stream = futures::stream::iter(chunks);
let events: Vec<_> = super::sse_paired_events(byte_stream).collect().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type, "unknown");
assert_eq!(events[0].data, "orphan");
}
#[tokio::test]
async fn paired_events_empty_line_resets_event() {
let chunks = vec![Ok(bytes::Bytes::from(
"event: foo\n\ndata: after_reset\n\n",
))];
let byte_stream = futures::stream::iter(chunks);
let events: Vec<_> = super::sse_paired_events(byte_stream).collect().await;
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type, "unknown");
}
#[tokio::test]
async fn paired_events_on_raw_payload_fires_for_each_line() {
use std::sync::{Arc, Mutex};
let captured = Arc::new(Mutex::new(Vec::<String>::new()));
let captured_clone = Arc::clone(&captured);
let callback: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(move |data: &str| {
captured_clone.lock().unwrap().push(data.to_owned());
});
let chunks = vec![Ok(bytes::Bytes::from(
"event: one\ndata: first\n\nevent: two\ndata: second\n\n",
))];
let byte_stream = futures::stream::iter(chunks);
let events: Vec<_> = super::sse_paired_events_with_callback(byte_stream, Some(callback))
.collect()
.await;
assert_eq!(events.len(), 2);
assert_eq!(
captured.lock().unwrap().clone(),
vec!["first".to_string(), "second".to_string()]
);
}
#[tokio::test]
async fn paired_events_on_raw_payload_panic_caught() {
use std::sync::Arc;
let callback: Arc<dyn Fn(&str) + Send + Sync> = Arc::new(|_data: &str| {
panic!("callback panic!");
});
let chunks = vec![Ok(bytes::Bytes::from(
"event: one\ndata: safe\n\nevent: two\ndata: still_safe\n\n",
))];
let byte_stream = futures::stream::iter(chunks);
let events: Vec<_> = super::sse_paired_events_with_callback(byte_stream, Some(callback))
.collect()
.await;
assert_eq!(events.len(), 2);
assert_eq!(events[0].data, "safe");
assert_eq!(events[1].data, "still_safe");
}
#[tokio::test]
async fn adapter_stream_emits_start_first() {
use crate::finalize::{OpenBlock, StreamFinalize};
struct EmptyState;
impl StreamFinalize for EmptyState {
fn drain_open_blocks(&mut self) -> Vec<OpenBlock> {
vec![]
}
}
let line_stream: Pin<Box<dyn Stream<Item = SseLine> + Send>> =
Box::pin(futures::stream::empty());
let token = CancellationToken::new();
let events: Vec<_> = super::sse_adapter_stream(
line_stream,
token,
EmptyState,
"cancelled",
|item, _state| match item {
None => super::SseAction::Done(vec![]),
Some(_) => super::SseAction::Skip,
},
)
.collect()
.await;
assert!(!events.is_empty());
assert!(matches!(events[0], AssistantMessageEvent::Start));
}
#[tokio::test]
async fn adapter_stream_finalizes_on_cancel() {
use crate::finalize::{OpenBlock, StreamFinalize};
struct TextState;
impl StreamFinalize for TextState {
fn drain_open_blocks(&mut self) -> Vec<OpenBlock> {
vec![OpenBlock::Text { content_index: 0 }]
}
}
let token = CancellationToken::new();
token.cancel();
let line_stream: Pin<Box<dyn Stream<Item = SseLine> + Send>> =
Box::pin(futures::stream::pending());
let events: Vec<_> = super::sse_adapter_stream(
line_stream,
token,
TextState,
"test cancelled",
|_item, _state| super::SseAction::Skip,
)
.collect()
.await;
assert_eq!(events.len(), 3);
assert!(matches!(events[0], AssistantMessageEvent::Start));
assert!(matches!(
events[1],
AssistantMessageEvent::TextEnd { content_index: 0 }
));
assert!(matches!(
events[2],
AssistantMessageEvent::Error {
stop_reason: StopReason::Aborted,
..
}
));
}
#[tokio::test]
async fn sse_lines_surfaces_transport_error() {
use tokio::io::AsyncWriteExt;
use tokio::net::TcpListener;
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
if let Ok((mut sock, _)) = listener.accept().await {
let mut buf = [0u8; 1024];
let _ = tokio::io::AsyncReadExt::read(&mut sock, &mut buf).await;
let header = "HTTP/1.1 200 OK\r\n\
Content-Type: text/event-stream\r\n\
Transfer-Encoding: chunked\r\n\r\n\
10\r\ndata: partial\n";
let _ = sock.write_all(header.as_bytes()).await;
drop(sock);
}
});
let client = reqwest::Client::new();
let resp = client
.get(format!("http://{addr}/"))
.send()
.await
.expect("connect");
let lines: Vec<_> = sse_lines(resp.bytes_stream()).collect().await;
assert!(
lines
.iter()
.any(|l| matches!(l, SseLine::TransportError(_))),
"expected TransportError line, got {lines:?}"
);
}
#[tokio::test]
async fn adapter_stream_delegates_lines_to_callback() {
use crate::finalize::{OpenBlock, StreamFinalize};
struct EmptyState;
impl StreamFinalize for EmptyState {
fn drain_open_blocks(&mut self) -> Vec<OpenBlock> {
vec![]
}
}
let line_stream: Pin<Box<dyn Stream<Item = SseLine> + Send>> = Box::pin(
futures::stream::iter(vec![SseLine::Data("hello".to_string())]),
);
let token = CancellationToken::new();
let events: Vec<_> = super::sse_adapter_stream(
line_stream,
token,
EmptyState,
"cancelled",
|item, _state| match item {
Some(SseLine::Data(text)) => {
super::SseAction::Continue(vec![AssistantMessageEvent::TextDelta {
content_index: 0,
delta: text,
}])
}
None => super::SseAction::Done(vec![]),
_ => super::SseAction::Skip,
},
)
.collect()
.await;
assert!(events.len() >= 2);
assert!(matches!(events[0], AssistantMessageEvent::Start));
assert!(matches!(
events[1],
AssistantMessageEvent::TextDelta {
content_index: 0,
..
}
));
}
}