mcp_core_rs/utils/
cleanup.rs

1use std::{
2    pin::Pin,
3    task::{Context, Poll},
4};
5
6use futures::Stream as FuturesStream;
7
8pub struct CleanupStream<S> {
9    pub inner: S,
10    pub shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
11}
12
13impl<S, T, E> FuturesStream for CleanupStream<S>
14where
15    S: FuturesStream<Item = Result<T, E>> + Unpin,
16{
17    type Item = Result<T, E>;
18
19    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
20        let poll = Pin::new(&mut self.inner).poll_next(cx);
21        if let Poll::Ready(None) = poll {
22            if let Some(tx) = self.shutdown_tx.take() {
23                let _ = tx.send(());
24            }
25        }
26        poll
27    }
28}