wingfoil 4.0.0

graph based stream processing framework
Documentation
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::{self, JoinHandle};

use crate::{
    ChannelReceiverStream, Element, MutableNode, RunMode, StreamPeekRef, UpStreams,
    channel::{ChannelSender, channel_pair},
};
use tinyvec::TinyVec;

enum State<T: Element + Send> {
    Func(Box<dyn Fn(ChannelSender<T>, Arc<AtomicBool>) -> anyhow::Result<()> + Send + 'static>),
    JoinHandle(JoinHandle<anyhow::Result<()>>),
    Empty,
}

impl<T: Element + Send> State<T> {
    pub fn start(&mut self, channel_sender: ChannelSender<T>, stop: Arc<AtomicBool>) {
        if let State::Func(f) = std::mem::replace(self, State::Empty) {
            let handle = thread::spawn(move || f(channel_sender, stop));
            *self = State::JoinHandle(handle);
        }
    }

    pub fn check_running(&mut self) -> anyhow::Result<()> {
        match &*self {
            State::JoinHandle(handle) if handle.is_finished() => {
                if let State::JoinHandle(handle) = std::mem::replace(self, State::Empty) {
                    return match handle.join() {
                        Err(e) => Err(anyhow::anyhow!("Receiver thread panicked: {e:?}")),
                        Ok(Err(e)) => Err(e),
                        Ok(Ok(())) => Err(anyhow::anyhow!("Receiver thread exited unexpectedly")),
                    };
                }
                Ok(())
            }
            State::JoinHandle(_) => Ok(()),
            _ => Err(anyhow::anyhow!("Receiver thread not running")),
        }
    }

    pub fn stop(&mut self) -> anyhow::Result<()> {
        match std::mem::replace(self, State::Empty) {
            State::JoinHandle(handle) => handle
                .join()
                .map_err(|e| anyhow::anyhow!("Thread panicked: {e:?}"))?,
            _ => Ok(()),
        }
    }
}

pub(crate) struct ReceiverStream<T: Element + Send> {
    inner: ChannelReceiverStream<T>,
    sender: Option<ChannelSender<T>>,
    state: State<T>,
    stop: Arc<AtomicBool>,
    assert_realtime: bool,
}

impl<T: Element + Send> MutableNode for ReceiverStream<T> {
    fn upstreams(&self) -> UpStreams {
        self.inner.upstreams()
    }

    fn cycle(&mut self, state: &mut crate::GraphState) -> anyhow::Result<bool> {
        self.state.check_running()?;
        self.inner.cycle(state)
    }

    fn setup(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
        let mut sender = self
            .sender
            .take()
            .ok_or_else(|| anyhow::anyhow!("missing sender"))?;
        if state.run_mode() == RunMode::RealTime {
            sender.set_notifier(state.ready_notifier());
        }
        self.state.start(sender, self.stop.clone());
        self.inner.setup(state)
    }

    fn start(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
        if self.assert_realtime && state.run_mode() != RunMode::RealTime {
            anyhow::bail!("ReceiverStream only supports real-time mode");
        }
        self.inner.start(state)
    }

    fn stop(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
        self.stop.store(true, Ordering::Relaxed);
        self.state.stop()?;
        self.inner.stop(state)
    }

    fn teardown(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
        self.inner.teardown(state)
    }
}

impl<T: Element + Send> StreamPeekRef<TinyVec<[T; 1]>> for ReceiverStream<T> {
    fn peek_ref(&self) -> &TinyVec<[T; 1]> {
        self.inner.peek_ref()
    }
}

impl<T: Element + Send> ReceiverStream<T> {
    pub(crate) fn new(
        f: impl Fn(ChannelSender<T>, Arc<AtomicBool>) -> anyhow::Result<()> + Send + 'static,
        assert_realtime: bool,
    ) -> Self {
        let (sender, receiver) = channel_pair(None);
        let inner = ChannelReceiverStream::new(receiver, None, None);
        let sender = Some(sender);
        let stop = Arc::new(AtomicBool::new(false));
        let state = State::Func(Box::new(f));
        Self {
            inner,
            sender,
            state,
            stop,
            assert_realtime,
        }
    }
}