mcp_core_rs/utils/
cleanup.rs1use std::{
2 pin::Pin,
3 task::{Context, Poll},
4};
5
6use futures::Stream as FuturesStream;
7
8pub struct CleanupStream<S> {
9 pub inner: S,
10 pub shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
11}
12
13impl<S, T, E> FuturesStream for CleanupStream<S>
14where
15 S: FuturesStream<Item = Result<T, E>> + Unpin,
16{
17 type Item = Result<T, E>;
18
19 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
20 let poll = Pin::new(&mut self.inner).poll_next(cx);
21 if let Poll::Ready(None) = poll {
22 if let Some(tx) = self.shutdown_tx.take() {
23 let _ = tx.send(());
24 }
25 }
26 poll
27 }
28}