datum-core 0.6.0

Rust stream-processing library mirroring Akka/Pekko Streams Typed, built on Ractor actors
Documentation
use super::*;

/// Cloneable handle around a shared cancellation flag.
///
/// Every clone shares the same `cancelled` flag, so a cancellation requested
/// through one handle is observed through all of them.
#[derive(Clone)]
pub struct Cancellable {
    // Shared flag: `cancel` sets it and `is_cancelled` reads it.
    pub(super) cancelled: Arc<AtomicBool>,
    // Never read. Holding the `Arc` keeps its target from being dropped while
    // this handle (or any clone of it) is still alive.
    _keep_alive: Option<Arc<dyn Send + Sync>>,
}

impl Cancellable {
    /// Builds a handle whose flag starts out cleared.
    ///
    /// `keep_alive` is stored untouched; it is only held so that the value it
    /// points at outlives this handle and its clones.
    pub(super) fn new_with_keep_alive(keep_alive: Option<Arc<dyn Send + Sync>>) -> Self {
        let cancelled = Arc::new(AtomicBool::new(false));
        Self {
            cancelled,
            _keep_alive: keep_alive,
        }
    }

    /// Raises the cancellation flag.
    ///
    /// Returns `true` only for the call that actually flipped the flag, and
    /// `false` when the flag had already been raised.
    pub fn cancel(&self) -> bool {
        let already_cancelled = self.cancelled.swap(true, Ordering::SeqCst);
        !already_cancelled
    }

    /// Reports whether the cancellation flag is currently raised.
    #[must_use]
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::SeqCst)
    }
}

impl fmt::Debug for Cancellable {
    /// Prints only the type name; the fields are deliberately left out of the
    /// output and represented by the non-exhaustive marker.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("Cancellable");
        opaque.finish_non_exhaustive()
    }
}

/// Handle to the eventual `StreamResult<T>` of a stream.
///
/// The result can be retrieved with `wait`, with `try_wait`, or by polling the
/// handle as a `Future`. When the handle is dropped, the attached
/// `StreamCancellation` (if there is one) is cancelled.
#[must_use = "dropping the StreamCompletion cancels the running stream; call .wait()/.try_wait() or keep it alive"]
pub struct StreamCompletion<T> {
    // Either a result that is already available or the channel it arrives on.
    state: StreamCompletionState<T>,
    // Cancelled from `Drop`; `None` means dropping the handle cancels nothing.
    cancel_on_drop: Option<StreamCancellation>,
}

/// Where a `StreamCompletion` obtains its result from.
enum StreamCompletionState<T> {
    /// The result is stored inline; the slot becomes `None` once it is taken.
    Ready(Option<StreamResult<T>>),
    /// The result will be delivered through this oneshot channel.
    Receiver(oneshot::Receiver<StreamResult<T>>),
}

/// Cloneable cancellation signal for a stream: a shared flag plus a slot
/// holding the thread to unpark when the flag is raised.
#[derive(Clone)]
pub(crate) struct StreamCancellation {
    // Set to `true` by `cancel`; handed out to other code via `cancelled()`.
    cancelled: Arc<AtomicBool>,
    // Thread stored by `register_current_worker`, or `None` when no worker is
    // registered. `cancel` unparks it.
    worker: Arc<Mutex<Option<thread::Thread>>>,
}

/// Guard returned by `StreamCancellation::register_current_worker`.
///
/// Dropping it resets the worker slot of the associated `StreamCancellation`
/// back to `None`.
pub(super) struct RegisteredStreamWorker {
    // Clone of the cancellation whose worker slot is cleared on drop.
    cancellation: StreamCancellation,
}

impl StreamCancellation {
    /// Creates a signal that is not cancelled and has no registered worker.
    pub(super) fn new() -> Self {
        Self {
            cancelled: Arc::new(AtomicBool::new(false)),
            worker: Arc::new(Mutex::new(None)),
        }
    }

    /// Creates a signal for a completion that is driven from outside this
    /// module. Currently this is identical to `new`.
    pub(crate) fn for_external_completion() -> Self {
        Self::new()
    }

    /// Returns another handle to the shared cancellation flag.
    pub(crate) fn cancelled(&self) -> Arc<AtomicBool> {
        Arc::clone(&self.cancelled)
    }

    /// Stores the calling thread in the worker slot so that `cancel` can
    /// unpark it, and returns a guard that clears the slot when dropped.
    ///
    /// # Panics
    ///
    /// Panics if the worker slot mutex is poisoned.
    pub(super) fn register_current_worker(&self) -> RegisteredStreamWorker {
        *self.worker.lock().expect("stream worker slot poisoned") = Some(thread::current());
        RegisteredStreamWorker {
            cancellation: self.clone(),
        }
    }

    /// Raises the cancellation flag and unparks the registered worker, if any.
    ///
    /// This runs from `StreamCompletion`'s `Drop` implementation, so it must
    /// not panic: a panic raised while another panic is unwinding aborts the
    /// process. A poisoned slot is therefore recovered instead of unwrapped,
    /// which is sound because the slot only ever holds an `Option<Thread>` and
    /// cannot be observed half-updated.
    fn cancel(&self) {
        // Publish the flag first so a worker woken below already observes it.
        self.cancelled.store(true, Ordering::SeqCst);
        // The guard is a temporary of this statement, so the lock is released
        // before `unpark` is called.
        let worker = self
            .worker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .clone();
        if let Some(worker) = worker {
            worker.unpark();
        }
    }
}

impl Drop for RegisteredStreamWorker {
    /// Clears the worker slot so a later `cancel` no longer unparks this
    /// thread.
    ///
    /// The guard may be dropped while the worker is unwinding from a panic,
    /// and a second panic at that point aborts the process. A poisoned slot is
    /// therefore recovered and cleared instead of being treated as fatal; the
    /// slot only holds an `Option<Thread>`, so overwriting it is always valid.
    fn drop(&mut self) {
        let mut slot = self
            .cancellation
            .worker
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        *slot = None;
    }
}

impl<T> StreamCompletion<T> {
    /// Wraps a oneshot receiver on which the stream result will arrive.
    ///
    /// `cancel_on_drop`, when present, is cancelled once the returned handle
    /// is dropped.
    pub(crate) fn from_receiver(
        receiver: oneshot::Receiver<StreamResult<T>>,
        cancel_on_drop: Option<StreamCancellation>,
    ) -> Self {
        let state = StreamCompletionState::Receiver(receiver);
        Self {
            state,
            cancel_on_drop,
        }
    }

    /// Builds a handle whose result is already known; dropping it cancels
    /// nothing.
    pub(crate) fn ready(result: StreamResult<T>) -> Self {
        let state = StreamCompletionState::Ready(Some(result));
        Self {
            state,
            cancel_on_drop: None,
        }
    }

    /// Consumes the handle and drives it to completion with `block_on`,
    /// returning the stream result.
    pub fn wait(self) -> StreamResult<T> {
        block_on(self)
    }

    /// Fetches the result without waiting.
    ///
    /// Returns `None` when nothing can be taken right now: either the channel
    /// has not delivered a value yet, or an inline result was already taken.
    /// If the channel reports an error, the outcome is
    /// `Some(Err(StreamError::AbruptTermination))`.
    #[must_use]
    pub fn try_wait(&mut self) -> Option<StreamResult<T>> {
        let receiver = match &mut self.state {
            // An inline result is handed out at most once.
            StreamCompletionState::Ready(slot) => return slot.take(),
            StreamCompletionState::Receiver(receiver) => receiver,
        };
        match receiver.try_recv() {
            Ok(outcome) => outcome,
            Err(_) => Some(Err(StreamError::AbruptTermination)),
        }
    }
}

impl<T> Future for StreamCompletion<T> {
    type Output = StreamResult<T>;

    /// Resolves to the stream result.
    ///
    /// An inline result is returned immediately; if it was already taken the
    /// future resolves to `Err(StreamError::AbruptTermination)`. Otherwise the
    /// underlying receiver is polled, and a channel error is likewise mapped
    /// to `Err(StreamError::AbruptTermination)`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let receiver = match &mut self.state {
            StreamCompletionState::Ready(slot) => {
                let outcome = slot.take().unwrap_or(Err(StreamError::AbruptTermination));
                return Poll::Ready(outcome);
            }
            StreamCompletionState::Receiver(receiver) => receiver,
        };
        // `Poll::map` leaves `Pending` untouched and rewrites only a ready value.
        Pin::new(receiver).poll(cx).map(|received| match received {
            Ok(result) => result,
            Err(_) => Err(StreamError::AbruptTermination),
        })
    }
}

// `poll` reaches `self.state` through `Pin<&mut Self>` with plain mutable
// access, which is only available for `Unpin` types, so `Unpin` is declared
// for every `T`.
impl<T> Unpin for StreamCompletion<T> {}

impl<T> Drop for StreamCompletion<T> {
    /// Cancels the attached `StreamCancellation`, if there is one, when the
    /// handle goes away.
    fn drop(&mut self) {
        if let Some(signal) = self.cancel_on_drop.as_ref() {
            signal.cancel();
        }
    }
}

impl<T> fmt::Debug for StreamCompletion<T> {
    /// Prints only the type name, so no `Debug` bound on `T` is needed; the
    /// fields are represented by the non-exhaustive marker.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut opaque = f.debug_struct("StreamCompletion");
        opaque.finish_non_exhaustive()
    }
}