use tokio::sync::mpsc;
use super::super::ctx::StreamEvent;
/// Wraps a bounded event sink in an unbounded, non-blocking front end.
///
/// A background task is spawned that pulls events off an internal unbounded
/// channel one at a time and awaits their delivery into `bounded_sink`.
/// Because a single task performs every forward sequentially, events reach
/// the sink in the same order they were sent on the returned handle.
///
/// The forwarding task stops when either:
/// - every clone of the returned sender has been dropped and the queued
///   events have been drained, or
/// - sending into `bounded_sink` fails (its receiving half is gone), in
///   which case any events still queued are discarded.
///
/// # Panics
///
/// Uses [`tokio::spawn`], which panics when called outside a Tokio runtime.
pub fn ordered_relay(
    bounded_sink: mpsc::Sender<StreamEvent>,
) -> mpsc::UnboundedSender<StreamEvent> {
    let (relay_tx, mut relay_rx) = mpsc::unbounded_channel::<StreamEvent>();

    tokio::spawn(async move {
        loop {
            match relay_rx.recv().await {
                // All senders dropped and the queue is empty: nothing left to do.
                None => break,
                Some(pending) => {
                    // A failed send means the sink's receiver is closed, so
                    // forwarding further events would be pointless.
                    if bounded_sink.send(pending).await.is_err() {
                        break;
                    }
                }
            }
        }
    });

    relay_tx
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::ReasoningChunk;

    /// Reduces an event to a short tag so the observed sequence can be
    /// compared against a list of expected tags.
    fn tag(event: StreamEvent) -> &'static str {
        match event {
            StreamEvent::Text(body) if body == "a" => "text-a",
            StreamEvent::Text(body) if body == "b" => "text-b",
            StreamEvent::Reasoning(_) => "reasoning",
            StreamEvent::Done { .. } => "done",
            _ => "other",
        }
    }

    /// Events pushed through the relay must come out of the bounded sink in
    /// exactly the order they were sent, across different event variants.
    #[tokio::test]
    async fn events_arrive_in_order() {
        let (sink_tx, mut sink_rx) = mpsc::channel::<StreamEvent>(16);
        let relay = ordered_relay(sink_tx);

        let outgoing = vec![
            StreamEvent::Text("a".to_string()),
            StreamEvent::Reasoning(ReasoningChunk {
                text: "r1".to_string(),
                signature: None,
            }),
            StreamEvent::Text("b".to_string()),
            StreamEvent::Done {
                usage: None,
                thinking_signature: None,
            },
        ];
        for event in outgoing {
            relay.send(event).unwrap();
        }

        // Dropping the only sender lets the relay task drain and exit, which
        // drops the sink sender and ends the receive loop below.
        drop(relay);

        let mut observed = Vec::new();
        while let Some(event) = sink_rx.recv().await {
            observed.push(tag(event));
        }

        assert_eq!(observed, ["text-a", "reasoning", "text-b", "done"]);
    }
}