// rusted_pipe 0.0.2
//
// Real-time processing library for developing multithreaded ML pipelines, written in Rust.
// Documentation
use super::{
    build::{ProcessorWorker, WorkerStatus},
    metrics::ProfilerTag,
    processor::Processors,
};
use crate::channels::ReadChannelTrait;
use crate::channels::WriteChannelTrait;
use crate::graph::build::GraphStatus;
use crate::{
    channels::read_channel::{ChannelBuffer, InputGenerator},
    RustedPipeError,
};
use crate::{
    channels::{read_channel::ReadChannel, typed_write_channel::TypedWriteChannel},
    packet::work_queue::WorkQueue,
};
use atomic::{Atomic, Ordering};
use crossbeam::channel::Sender;
use lazy_static::lazy_static;
use log::{debug, warn};
use prometheus::{histogram_opts, register_histogram_vec};
use prometheus::{Histogram, HistogramVec};
use rusty_pool::ThreadPool;
use std::{
    sync::{Arc, Condvar, Mutex, PoisonError},
    thread,
    time::Duration,
};

lazy_static! {
    /// Histogram of single processor-run durations, registered via
    /// `register_histogram_vec!` and labelled by `node_id`.
    static ref METRICS_TIMER: HistogramVec = register_histogram_vec!(
        histogram_opts!(
            "processing_time",
            "Timing for a single processor run.",
        ),
        &["node_id"]
    )
    .expect("Cannot create processing_time metrics");
}

/// Repeatedly reads through `read_channel` until the graph status becomes
/// [`GraphStatus::Terminating`], then calls `stop()` on the channel.
///
/// Every `read` call receives a copy of the node `id` and a clone of
/// `done_notification`, because `ReadChannel::read` takes both by value.
pub(super) fn read_channel_data<T>(
    id: String,
    running: Arc<Atomic<GraphStatus>>,
    mut read_channel: ReadChannel<T>,
    done_notification: Sender<String>,
) where
    T: InputGenerator + ChannelBuffer + Send + 'static,
{
    while running.load(Ordering::Relaxed) != GraphStatus::Terminating {
        read_channel.read(id.clone(), done_notification.clone());
    }
    read_channel.stop();
}

/// A `WorkerStatus` behind a `Mutex`, paired with a `Condvar`, shared through an `Arc`.
pub(super) type Wait = Arc<(Mutex<WorkerStatus>, Condvar)>;

/// State owned by the loop that feeds one graph node's processor.
pub(super) struct ConsumerThread<
    INPUT: InputGenerator + ChannelBuffer + Send + 'static,
    OUTPUT: WriteChannelTrait + Send + 'static,
> {
    /// Node id, used in log messages, done notifications, profiler tags and metric labels.
    id: String,
    /// Graph-wide status polled by the consume loop.
    running: Arc<Atomic<GraphStatus>>,
    /// Wait handle received at construction; not read by the code in this file.
    _free: Wait,
    /// Channel on which this node reports its id when it is done.
    done_notification: Sender<String>,
    /// Pool that processor runs are submitted to.
    thread_pool: ThreadPool,
    /// `processing_time` histogram for this node's `node_id` label.
    metrics_timer: Histogram,
    /// Profiler tagged around each processor run.
    profiler: Arc<ProfilerTag>,
    /// Output channel shared with processor runs; `None` when the worker had none.
    shared_writer: Option<Arc<Mutex<TypedWriteChannel<OUTPUT>>>>,
    /// The node's processor, shared with each submitted run.
    shared_processor: Arc<Mutex<Processors<INPUT, OUTPUT>>>,
    /// Worker state; new runs are only submitted while this is `Idle`.
    status: Arc<Atomic<WorkerStatus>>,
    /// Source of input packets; when `None`, runs are started without a packet.
    work_queue: Option<WorkQueue<INPUT::INPUT>>,
}

impl<INPUT, OUTPUT> ConsumerThread<INPUT, OUTPUT>
where
    INPUT: InputGenerator + ChannelBuffer + Send + 'static,
    OUTPUT: WriteChannelTrait + 'static + Send,
{
    pub(super) fn new(
        id: String,
        running: Arc<Atomic<GraphStatus>>,
        free: Wait,
        worker: ProcessorWorker<INPUT, OUTPUT>,
        done_notification: Sender<String>,
        thread_pool: ThreadPool,
        profiler: ProfilerTag,
    ) -> Self {
        let metrics_timer = METRICS_TIMER.with_label_values(&[&id]);

        let mut shared_writer = None;
        if let Some(channel) = worker.write_channel {
            shared_writer = Some(Arc::new(Mutex::new(channel)));
        }

        let shared_processor = Arc::new(Mutex::new(worker.processor));
        let status = Arc::new(Atomic::new(WorkerStatus::Idle));
        let work_queue = worker.work_queue;
        Self {
            id,
            running,
            _free: free,
            done_notification,
            thread_pool,
            metrics_timer,
            profiler: Arc::new(profiler),
            shared_writer,
            shared_processor,
            status,
            work_queue,
        }
    }

    pub(super) fn consume(&mut self) {
        while self.running.load(Ordering::Relaxed) != GraphStatus::Terminating {
            if self.status.load(Ordering::Relaxed) == WorkerStatus::Idle {
                let lock_status = self.status.clone();

                let mut packet = None;
                if let Some(work_queue) = self.work_queue.as_mut() {
                    let task = work_queue.get(Some(Duration::from_millis(100)));
                    if let Ok(read_event) = task {
                        packet = Some(read_event.packet_data);
                    } else {
                        if self.running.load(Ordering::Relaxed)
                            == GraphStatus::WaitingForDataToTerminate
                        {
                            debug!("Sending done {}", self.id);
                            let _ = self.done_notification.send(self.id.clone());
                        }

                        continue;
                    }
                }
                self.status.store(WorkerStatus::Running, Ordering::Relaxed);

                let processor_clone = self.shared_processor.clone();
                let profiler_clone = self.profiler.clone();
                let id_thread = self.id.clone();
                let arc_write_channel = self.shared_writer.clone();
                let done_clone = self.done_notification.clone();
                let metrics_clone = self.metrics_timer.clone();

                let future = move || {
                    profiler_clone.add("consumer".to_string(), id_thread.clone());
                    let timer = metrics_clone.start_timer();
                    let result = match &mut *processor_clone
                        .lock()
                        .unwrap_or_else(PoisonError::into_inner)
                    {
                        Processors::Processor(proc) => {
                            if let Some(packet) = packet {
                                let write_channel = arc_write_channel.expect(&format!(
                                    "Consumer thread for node {} was created without write channel",
                                    id_thread
                                ));
                                let write_channel =
                                    write_channel.lock().unwrap_or_else(PoisonError::into_inner);

                                proc.handle(packet, write_channel)
                            } else {
                                warn!("Packet is None, not processing");
                                return;
                            }
                        }
                        Processors::TerminalProcessor(proc) => {
                            if let Some(packet) = packet {
                                proc.handle(packet)
                            } else {
                                warn!("Packet is None, not processing");
                                return;
                            }
                        }
                        Processors::SourceProcessor(proc) => {
                            let write_channel = arc_write_channel.expect(&format!(
                                "Consumer thread for node {} was created without write channel",
                                id_thread
                            ));
                            let write_channel =
                                write_channel.lock().unwrap_or_else(PoisonError::into_inner);

                            proc.handle(write_channel)
                        }
                    };

                    profiler_clone.remove("consumer".to_string(), id_thread.clone());
                    timer.observe_duration();
                    match result {
                        Ok(_) => lock_status.store(WorkerStatus::Idle, Ordering::Relaxed),
                        Err(RustedPipeError::EndOfStream()) => {
                            eprintln!("Terminating worker {id_thread:?}");
                            lock_status.store(WorkerStatus::Terminating, Ordering::Relaxed);
                            let _ = done_clone.send(id_thread.clone());
                        }
                        Err(err) => {
                            eprintln!("Error in worker {id_thread:?}: {err:?}");
                            lock_status.store(WorkerStatus::Terminating, Ordering::Relaxed);
                        }
                    };
                };

                let handle = self.thread_pool.evaluate(future);
                if handle.try_await_complete().is_err() {
                    eprintln!("Thread panicked in worker {:?}", self.id.clone());
                    self.status.store(WorkerStatus::Idle, Ordering::Relaxed);
                }
            } else {
                thread::sleep(Duration::from_millis(100));
                if self.running.load(Ordering::Relaxed) == GraphStatus::WaitingForDataToTerminate {
                    debug!("Sending done {}", self.id);
                    let _ = self.done_notification.send(self.id.clone());
                }
            }
        }
        println!("Worker {} exited", self.id);
    }
}