ocl_stream/executor/
stream.rs

1/*
2 * opencl stream executor
3 * Copyright (C) 2021 trivernis
4 * See LICENSE for more information
5 */
6
7use crossbeam_channel::{Receiver, Sender};
8
9use crate::utils::result::{OCLStreamError, OCLStreamResult};
10use std::any::type_name;
11
12/// Creates a new OCLStream with the corresponding sender
13/// to communicate between the scheduler thread and the receiver thread
14pub fn unbounded<T>() -> (OCLStream<T>, OCLStreamSender<T>)
15where
16    T: Send + Sync,
17{
18    log::trace!("Creating unbounded channel of type {}", type_name::<T>());
19    let (tx, rx) = crossbeam_channel::unbounded();
20    let stream = OCLStream { rx };
21    let sender = OCLStreamSender { tx };
22
23    (stream, sender)
24}
25
26/// Creates a new OCLStream with the corresponding sender and a maximum capacity
27/// to communicate between the scheduler thread and the receiver thread
28pub fn bounded<T>(size: usize) -> (OCLStream<T>, OCLStreamSender<T>)
29where
30    T: Send + Sync,
31{
32    log::trace!(
33        "Creating bounded channel of type {} with size {}",
34        type_name::<T>(),
35        size
36    );
37
38    let (tx, rx) = crossbeam_channel::bounded(size);
39    let stream = OCLStream { rx };
40    let sender = OCLStreamSender { tx };
41
42    (stream, sender)
43}
44
45/// Receiver for OCL Data
46#[derive(Clone, Debug)]
47pub struct OCLStream<T>
48where
49    T: Send + Sync,
50{
51    rx: Receiver<OCLStreamResult<T>>,
52}
53
54impl<T> OCLStream<T>
55where
56    T: Send + Sync,
57{
58    /// Reads the next value from the channel
59    pub fn next(&mut self) -> Result<T, OCLStreamError> {
60        log::trace!("Retrieving next value...");
61        self.rx.recv()?
62    }
63
64    /// Returns if there is a value in the channel
65    pub fn has_next(&self) -> bool {
66        !self.rx.is_empty()
67    }
68}
69
70/// Sender for OCL Data
71pub struct OCLStreamSender<T>
72where
73    T: Send + Sync,
74{
75    tx: Sender<OCLStreamResult<T>>,
76}
77
78impl<T> Clone for OCLStreamSender<T>
79where
80    T: Send + Sync,
81{
82    fn clone(&self) -> Self {
83        Self {
84            tx: self.tx.clone(),
85        }
86    }
87}
88
89impl<T> OCLStreamSender<T>
90where
91    T: Send + Sync + 'static,
92{
93    /// Sends a value into the channel
94    pub fn send(&self, value: T) -> OCLStreamResult<()> {
95        log::trace!("Sending value into channel...");
96        self.tx.send(Ok(value)).map_err(OCLStreamError::from)
97    }
98
99    /// Sends an error into the channel
100    pub fn err(&self, err: OCLStreamError) -> OCLStreamResult<()> {
101        log::trace!("Sending error into channel...");
102        self.tx.send(Err(err)).map_err(OCLStreamError::from)
103    }
104}