use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use futures::Stream;
use tokio::sync::{mpsc, OwnedSemaphorePermit};
use super::{Runner, RunnerUpdate};
/// A [`Stream`] of [`RunnerUpdate`]s read from an unbounded channel.
///
/// The stream remembers whether it has finished (a terminal update was
/// yielded or the channel closed). If it is dropped before finishing, its
/// `Drop` impl asks the runner to unregister `id`.
pub struct RunnerStream {
/// Receiving half of the channel the updates arrive on.
rx: mpsc::UnboundedReceiver<RunnerUpdate>,
/// Runner whose `unregister` is called if the stream is dropped early.
runner: Arc<Runner>,
/// Identifier passed to `Runner::unregister` on early drop.
id: String,
/// Set once a terminal update has been yielded or the channel has closed.
completed: bool,
/// Never read; stored so the permit is dropped together with the stream.
/// NOTE(review): presumably this bounds the number of concurrent streams — confirm at the call site.
_permit: OwnedSemaphorePermit,
}
impl RunnerStream {
    /// Builds a stream over `rx` that has not yet completed.
    ///
    /// `runner` and `id` are kept so the stream can call `unregister` on the
    /// runner if it is dropped before completing. `permit` is stored
    /// untouched and dropped together with the stream.
    pub(super) fn new(
        rx: mpsc::UnboundedReceiver<RunnerUpdate>,
        runner: Arc<Runner>,
        id: String,
        permit: OwnedSemaphorePermit,
    ) -> Self {
        // A freshly created stream has not observed any terminal update.
        let completed = false;
        Self {
            rx,
            runner,
            id,
            completed,
            _permit: permit,
        }
    }
}
impl Stream for RunnerStream {
    type Item = RunnerUpdate;

    /// Forwards whatever the underlying channel yields.
    ///
    /// The stream is flagged as completed when the channel is closed or when
    /// the yielded update is `End`, `Fatal` or `RunnerExited`; the flag is
    /// what `Drop` consults to decide whether to unregister. The poll result
    /// itself is returned unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        let polled = this.rx.poll_recv(cx);

        // Classify the result by reference so it can be handed back as-is.
        let finished = match &polled {
            Poll::Pending => false,
            Poll::Ready(None) => true,
            Poll::Ready(Some(update)) => matches!(
                update,
                RunnerUpdate::End(_) | RunnerUpdate::Fatal(_) | RunnerUpdate::RunnerExited
            ),
        };
        if finished {
            this.completed = true;
        }

        polled
    }
}
impl Drop for RunnerStream {
fn drop(&mut self) {
if self.completed {
return;
}
let runner = self.runner.clone();
let id = std::mem::take(&mut self.id);
if tokio::runtime::Handle::try_current().is_ok() {
tokio::spawn(async move {
runner.unregister(&id).await;
});
}
}
}