// turul_http_mcp_server/handler.rs
use std::pin::Pin;
use std::task::{Context, Poll};

use bytes::Bytes;
use futures::Stream;
use http_body::Body;
12
13pub struct SseStreamBody {
15 stream: Pin<
16 Box<
17 dyn Stream<Item = std::result::Result<String, tokio::sync::broadcast::error::RecvError>>
18 + Send,
19 >,
20 >,
21}
22
23impl SseStreamBody {
24 pub fn new<S>(stream: S) -> Self
25 where
26 S: Stream<Item = std::result::Result<String, tokio::sync::broadcast::error::RecvError>>
27 + Send
28 + 'static,
29 {
30 Self {
31 stream: Box::pin(stream),
32 }
33 }
34}
35
36impl Body for SseStreamBody {
37 type Data = Bytes;
38 type Error = Box<dyn std::error::Error + Send + Sync>;
39
40 fn poll_frame(
41 mut self: Pin<&mut Self>,
42 cx: &mut Context<'_>,
43 ) -> Poll<Option<std::result::Result<http_body::Frame<Self::Data>, Self::Error>>> {
44 match self.stream.as_mut().poll_next(cx) {
45 Poll::Ready(Some(Ok(data))) => {
46 let bytes = Bytes::from(data);
47 Poll::Ready(Some(Ok(http_body::Frame::data(bytes))))
48 }
49 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(Box::new(e)))),
50 Poll::Ready(None) => Poll::Ready(None),
51 Poll::Pending => Poll::Pending,
52 }
53 }
54}
55
56