ocl_stream/executor/
stream.rs1use crossbeam_channel::{Receiver, Sender};
8
9use crate::utils::result::{OCLStreamError, OCLStreamResult};
10use std::any::type_name;
11
12pub fn unbounded<T>() -> (OCLStream<T>, OCLStreamSender<T>)
15where
16 T: Send + Sync,
17{
18 log::trace!("Creating unbounded channel of type {}", type_name::<T>());
19 let (tx, rx) = crossbeam_channel::unbounded();
20 let stream = OCLStream { rx };
21 let sender = OCLStreamSender { tx };
22
23 (stream, sender)
24}
25
26pub fn bounded<T>(size: usize) -> (OCLStream<T>, OCLStreamSender<T>)
29where
30 T: Send + Sync,
31{
32 log::trace!(
33 "Creating bounded channel of type {} with size {}",
34 type_name::<T>(),
35 size
36 );
37
38 let (tx, rx) = crossbeam_channel::bounded(size);
39 let stream = OCLStream { rx };
40 let sender = OCLStreamSender { tx };
41
42 (stream, sender)
43}
44
45#[derive(Clone, Debug)]
47pub struct OCLStream<T>
48where
49 T: Send + Sync,
50{
51 rx: Receiver<OCLStreamResult<T>>,
52}
53
54impl<T> OCLStream<T>
55where
56 T: Send + Sync,
57{
58 pub fn next(&mut self) -> Result<T, OCLStreamError> {
60 log::trace!("Retrieving next value...");
61 self.rx.recv()?
62 }
63
64 pub fn has_next(&self) -> bool {
66 !self.rx.is_empty()
67 }
68}
69
70pub struct OCLStreamSender<T>
72where
73 T: Send + Sync,
74{
75 tx: Sender<OCLStreamResult<T>>,
76}
77
78impl<T> Clone for OCLStreamSender<T>
79where
80 T: Send + Sync,
81{
82 fn clone(&self) -> Self {
83 Self {
84 tx: self.tx.clone(),
85 }
86 }
87}
88
89impl<T> OCLStreamSender<T>
90where
91 T: Send + Sync + 'static,
92{
93 pub fn send(&self, value: T) -> OCLStreamResult<()> {
95 log::trace!("Sending value into channel...");
96 self.tx.send(Ok(value)).map_err(OCLStreamError::from)
97 }
98
99 pub fn err(&self, err: OCLStreamError) -> OCLStreamResult<()> {
101 log::trace!("Sending error into channel...");
102 self.tx.send(Err(err)).map_err(OCLStreamError::from)
103 }
104}