rusted_pipe 0.0.2

Real time processing library for developing multithreaded ML pipelines, written in Rust.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
//! Module for the generic ReadChannel. A ReadChannel is used by processing nodes
//! to receive input data while waiting idle. Read channels' main logic is to
//! allocate space for the incoming data and synchronize that data using the
//! user-configured synchronizer.
use std::{
    sync::{Arc, PoisonError, RwLock},
    thread,
    time::Duration,
};

use crossbeam::channel::Sender;
use log::debug;

use crate::{
    buffers::{single_buffers::RtRingBuffer, synchronizers::PacketSynchronizer},
    graph::metrics::{BufferMonitor, BufferMonitorBuilder},
    packet::work_queue::WorkQueue,
};

use std::collections::HashMap;

use crate::{
    buffers::{single_buffers::FixedSizeBuffer, BufferIterator},
    packet::typed::PacketSetTrait,
    DataVersion,
};

use super::{ChannelError, ChannelID, Packet, ReadChannelTrait, ReceiverChannel};

/// A struct that holds a single FixedSizeBuffer and
/// an optional ReceiverChannel that maps its data into that buffer.
///
/// `T` may be unsized (`?Sized`), which is why the buffer is stored boxed.
pub struct BufferReceiver<T: FixedSizeBuffer + ?Sized> {
    /// The fixed buffer implementation. Packets read from `channel` are
    /// inserted here.
    pub buffer: Box<T>,
    /// An optional ReceiverChannel with the data type.
    /// It is `None` until the channel is connected through `link`.
    pub channel: Option<ReceiverChannel<T::Data>>,
}

impl<T: FixedSizeBuffer + ?Sized> BufferReceiver<T> {
    /// Attaches `receiver` as the data transport feeding this buffer. Once
    /// linked, `try_read` can start pulling packets from it.
    ///
    /// # Panics
    /// Panics if a receiver has already been linked.
    pub fn link(&mut self, receiver: ReceiverChannel<T::Data>) {
        assert!(self.channel.is_none(), "Channel is already linked!");
        self.channel = Some(receiver);
    }

    /// Attempts to move one packet from the linked transport into the buffer.
    ///
    /// # Returns
    /// The version of the packet that was stored in the buffer.
    ///
    /// # Errors
    /// * `ChannelError::NotInitializedError` if no receiver has been linked yet.
    /// * Whatever error the transport's `try_receive` or the buffer's `insert`
    ///   reports, converted into a `ChannelError`.
    pub fn try_read(&mut self) -> Result<DataVersion, ChannelError> {
        let transport = self
            .channel
            .as_ref()
            .ok_or(ChannelError::NotInitializedError)?;
        let incoming = transport.try_receive()?;
        // Remember the version before the packet is moved into the buffer.
        let received_version = incoming.version;
        self.buffer.insert(incoming)?;
        Ok(received_version)
    }
}

/// A trait for buffer data manipulation. Mostly used by the synchronizer
/// to find matching tuples.
pub trait ChannelBuffer {
    /// A list of named channels.
    fn available_channels(&self) -> Vec<&ChannelID>;
    /// True if a channel has the given data version.
    ///
    /// * Arguments
    /// `channel` - The name of the channel to inquire.
    /// `version` - The data version to find.
    fn has_version(&self, channel: &ChannelID, version: &DataVersion) -> bool;
    /// Gets a single version computed over all channels, or `None`.
    ///
    /// NOTE(review): the method is named `max_version` but the previous
    /// documentation said "minimum version"; confirm against the implementors
    /// which of the two is actually returned.
    fn max_version(&self) -> Option<&DataVersion>;
    /// Returns a reference of the head of the buffer in `channel`.
    ///
    /// * Arguments
    /// `channel` - The name of the channel to inquire.
    fn peek(&self, channel: &ChannelID) -> Option<&DataVersion>;
    /// Returns an iterator in `channel`.
    ///
    /// * Arguments
    /// `channel` - The name of the channel to inquire.
    fn iterator(&self, channel: &ChannelID) -> Option<Box<BufferIterator>>;
    /// Returns true if there is no data in any buffer.
    fn are_buffers_empty(&self) -> bool;
    /// Tries to read data for up to 'timeout' duration.
    ///
    /// * Arguments
    /// `timeout` - How long to wait for the data.
    ///
    /// * Returns
    /// Presumably the id of the channel that received data, or `None` when
    /// nothing was read; verify against the implementors.
    fn try_receive(&mut self, timeout: Duration) -> Result<Option<&ChannelID>, ChannelError>;
    /// Waits for timeout for any channel to have data.
    ///
    /// * Arguments
    /// `timeout` - How long to wait for the data.
    ///
    /// * Returns
    /// true if there is data in any channel before timeout.
    fn wait_for_data(&self, timeout: Duration) -> Result<bool, ChannelError>;
}

/// A trait for generating packet set from an existing ReadChannel.
pub trait InputGenerator {
    /// The packet set type produced by `get_packets_for_version`.
    type INPUT: PacketSetTrait + Send;
    /// Pulls the data specified in data_versions out from the buffers.
    /// For each channel it drops the data before the chosen version.
    ///
    /// * Arguments
    /// `data_versions` - A map containing a channel name and an optional data version.
    /// `exact_match` - Only returns data if an exact match exists.
    ///
    /// Returns None if the data cannot be fetched.
    fn get_packets_for_version(
        &mut self,
        data_versions: &HashMap<ChannelID, Option<DataVersion>>,
        exact_match: bool,
    ) -> Option<Self::INPUT>;

    /// Builds the implementor together with its internal channels.
    ///
    /// * Arguments
    /// `buffer_size` - Presumably the capacity of each channel buffer; verify
    /// against the implementors.
    /// `block_on_full` - Presumably whether a full buffer blocks instead of
    /// overwriting data; verify against the implementors.
    /// `monitor` - The builder used to create the buffer monitors.
    fn create_channels(
        buffer_size: usize,
        block_on_full: bool,
        monitor: BufferMonitorBuilder,
    ) -> Self;
}

/// A generic ReadChannel that holds a reference to a struct that has
/// a set of traits for managing the internal channels.
///
/// `T` provides both the buffers (`ChannelBuffer`) and the logic that turns
/// matched versions into a packet set (`InputGenerator`).
pub struct ReadChannel<T: InputGenerator + ChannelBuffer + Send> {
    /// What synch strategy to use when trying to synchronize the buffers.
    pub synch_strategy: Box<dyn PacketSynchronizer>,
    /// A work queue that holds the already matched tuples.
    /// When it is `None`, `synchronize` does nothing.
    pub work_queue: Option<WorkQueue<T::INPUT>>,
    /// A reference to the channels of the ReadChannel, shared behind a
    /// read/write lock.
    pub channels: Arc<RwLock<T>>,
}

// SAFETY: NOTE(review): no invariant justifying these impls is stated in this
// file. `T` is only required to be `Send`, and `synch_strategy` is a
// `Box<dyn PacketSynchronizer>` with no `Send`/`Sync` bound visible here.
// Confirm that every field is actually safe to share and send across threads,
// or replace these with bounds the compiler can check.
unsafe impl<T: InputGenerator + ChannelBuffer + Send> Sync for ReadChannel<T> {}
unsafe impl<T: InputGenerator + ChannelBuffer + Send> Send for ReadChannel<T> {}

impl<T: InputGenerator + ChannelBuffer + Send + 'static> ReadChannelTrait for ReadChannel<T> {
    type Data = T::INPUT;

    /// Performs one read cycle: waits for data, receives it into the buffers
    /// and, when something was received, runs the synchronizer.
    ///
    /// * Arguments
    /// `node_id` - Id of the node owning this channel. Used in log messages and
    /// sent on `done_notification`.
    /// `done_notification` - Receives `node_id` when a receive fails and all
    /// buffers are empty. Send failures are ignored.
    ///
    /// * Returns
    /// The id reported by `try_receive`, or `None` when no data arrived in
    /// time or an error occurred.
    fn read(&mut self, node_id: String, done_notification: Sender<String>) -> Option<ChannelID> {
        // Phase 1: wait for data while holding only the read lock.
        {
            let channels = self.channels.read().unwrap_or_else(PoisonError::into_inner);
            match channels.wait_for_data(Duration::from_millis(50)) {
                Ok(true) => {}
                Ok(false) => return None,
                Err(err) => {
                    eprintln!("Error while waiting for data {err} on channel {node_id}.");
                    return None;
                }
            }
        }

        // Phase 2: move the data into the buffers under the write lock.
        let received = {
            let mut channels = self
                .channels
                .write()
                .unwrap_or_else(PoisonError::into_inner);
            // Clone the id right away so the result no longer borrows the guard.
            let outcome = channels
                .try_receive(Duration::from_micros(50))
                .map(|channel_id| channel_id.cloned());

            match outcome {
                Ok(channel_id) => channel_id,
                Err(err) => {
                    eprintln!("Node {node_id}: Exception while reading {err:?}");
                    let disconnected =
                        matches!(err, crate::channels::ChannelError::ReceiveError(_));

                    // Nothing left to process: tell the listener this node is done.
                    if channels.are_buffers_empty() {
                        if !disconnected {
                            debug!("Sending done {node_id}");
                        }
                        let _ = done_notification.send(node_id);
                    }

                    if disconnected {
                        eprintln!("Channel is disonnected, closing");
                        // Release the write lock before backing off so other
                        // users of `channels` are not stalled for the whole sleep.
                        drop(channels);
                        thread::sleep(Duration::from_millis(100));
                    }
                    None
                }
            }
        };

        if received.is_some() {
            self.synchronize();
        }
        received
    }

    /// Installs `work_queue` as the destination for synchronized packet sets,
    /// replacing any queue set previously.
    fn start(&mut self, work_queue: WorkQueue<Self::Data>) {
        self.work_queue = Some(work_queue);
    }

    /// Currently a no-op.
    fn stop(&mut self) {}
}

impl<T: InputGenerator + ChannelBuffer + Send + 'static> ReadChannel<T> {
    pub fn new(
        synch_strategy: Box<dyn PacketSynchronizer>,
        work_queue: Option<WorkQueue<T::INPUT>>,
        channels: T,
    ) -> Self {
        ReadChannel {
            synch_strategy,
            work_queue,
            channels: Arc::new(RwLock::new(channels)),
        }
    }

    pub fn create(
        id: &str,
        block_channel_full: bool,
        channel_buffer_size: usize,
        process_buffer_size: usize,
        synch_strategy: Box<dyn PacketSynchronizer>,
        monitor: bool,
    ) -> Self {
        let mut monitor_builder = BufferMonitorBuilder::no_monitor();
        if monitor {
            monitor_builder = BufferMonitorBuilder::new(id);
        }
        let work_monitor = if monitor {
            monitor_builder.make_channel("_work_queue")
        } else {
            BufferMonitor::default()
        };

        let work_queue = Some(WorkQueue::<T::INPUT>::new(
            process_buffer_size,
            work_monitor,
        ));

        let channels = T::create_channels(channel_buffer_size, block_channel_full, monitor_builder);

        Self {
            synch_strategy,
            work_queue,
            channels: Arc::new(RwLock::new(channels)),
        }
    }

    pub fn synchronize(&mut self) {
        if let Some(queue) = self.work_queue.as_mut() {
            let synch = self.synch_strategy.synchronize(self.channels.clone());
            if let Some(sync) = synch {
                let mut channels = if let Ok(channels) = self.channels.write() {
                    channels
                } else {
                    return;
                };

                if let Some(value) = channels.get_packets_for_version(&sync, false) {
                    queue.push(value);
                }
            }
        }
    }
}

/// Pops packets from `buffer` until one with the requested version is found.
///
/// * Arguments
/// `buffer` - The ring buffer to consume from. Every packet popped before the
/// match is dropped.
/// `data_version` - The version to look for. `None` returns immediately
/// without touching the buffer.
/// `exact_match` - When true only the first popped packet is examined; if its
/// version differs the search stops (that packet is still consumed).
///
/// * Returns
/// The matching packet, or `None` if the buffer ran out or the search stopped
/// before a match was found.
pub fn get_data<T>(
    buffer: &mut RtRingBuffer<T>,
    data_version: &Option<DataVersion>,
    exact_match: bool,
) -> Option<Packet<T>> {
    let wanted = data_version.as_ref()?;

    while let Some(candidate) = buffer.pop() {
        if candidate.version == *wanted {
            return Some(candidate);
        }
        if exact_match {
            break;
        }
    }
    None
}

/// Unit tests for `BufferReceiver::link` / `BufferReceiver::try_read` as
/// reached through a `ReadChannel` wrapping a two-channel `ReadChannel2`.
#[cfg(test)]
mod tests {
    use crate::buffers::single_buffers::RtRingBuffer;
    use crate::buffers::synchronizers::timestamp::TimestampSynchronizer;

    use crate::channels::read_channel::ReadChannel;
    use crate::channels::read_channel::ReadChannelTrait;
    use crate::channels::typed_channel;

    use crate::channels::SenderChannel;

    use crate::channels::typed_read_channel::ReadChannel2;

    use crate::graph::metrics::BufferMonitor;
    use crate::packet::typed::ReadChannel2PacketSet;
    use crate::packet::work_queue::WorkQueue;
    use crate::packet::Packet;
    use crate::DataVersion;

    /// Builds a two-channel ReadChannel whose first channel (`c1`) is linked
    /// to a fresh typed channel, and returns it together with the sender side.
    /// The second channel (`c2`) is left unlinked.
    fn create_typed_read_channel() -> (
        ReadChannel<ReadChannel2<String, String>>,
        SenderChannel<String>,
    ) {
        let synch_strategy = Box::<TimestampSynchronizer>::default();
        let read_channel2 = ReadChannel2::create(
            RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
            RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
        );
        let read_channel =
            ReadChannel::new(synch_strategy, Some(WorkQueue::default()), read_channel2);

        let (channel_sender, channel_receiver) = typed_channel::<String>();
        read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .link(channel_receiver);

        (read_channel, channel_sender)
    }

    /// A packet sent on the linked channel is read back with its version.
    #[test]
    fn test_read_channel_try_read_returns_ok_if_data() {
        let (mut read_channel, crossbeam_channels) = create_typed_read_channel();
        crossbeam_channels
            .send(Packet::new(
                "my_data".to_string(),
                DataVersion { timestamp_ns: 1 },
            ))
            .unwrap();
        read_channel.start(WorkQueue::default());
        assert_eq!(
            read_channel
                .channels
                .write()
                .unwrap()
                .c1()
                .try_read()
                .ok()
                .unwrap(),
            DataVersion { timestamp_ns: 1 }
        );
    }

    /// `try_read` fails on both channels: nothing was ever sent on `c1` (its
    /// sender is discarded by the `_` pattern), and `c2` was never linked.
    #[test]
    fn test_read_channel_try_read_returns_error_when_push_if_not_initialized() {
        let (read_channel, _) = create_typed_read_channel();
        assert!(read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .try_read()
            .is_err());
        assert!(read_channel
            .channels
            .write()
            .unwrap()
            .c2()
            .try_read()
            .is_err());
    }

    /// With buffers created with size argument 2, the first two reads succeed
    /// and the third one fails.
    /// NOTE(review): presumably the `true` argument makes the buffer reject
    /// inserts when full; confirm against `RtRingBuffer::new`.
    #[test]
    fn test_read_channel_try_read_returns_error_when_buffer_is_full() {
        let synch_strategy = Box::<TimestampSynchronizer>::default();
        let read_channel2 = ReadChannel2::create(
            RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
            RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
        );
        let mut read_channel = ReadChannel::new(
            synch_strategy,
            Some(WorkQueue::<ReadChannel2PacketSet<String, String>>::default()),
            read_channel2,
        );

        let (s1, channel_receiver) = typed_channel::<String>();
        read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .link(channel_receiver);

        let (_, channel_receiver) = typed_channel::<String>();
        read_channel
            .channels
            .write()
            .unwrap()
            .c2()
            .link(channel_receiver);

        let mut packet = Packet::new("my_data".to_string(), DataVersion { timestamp_ns: 1 });

        read_channel.start(WorkQueue::default());
        // Send three packets with timestamps 1, 2 and 3 on the first channel.
        s1.send(packet.clone()).unwrap();

        packet.version.timestamp_ns = 2;
        s1.send(packet.clone()).unwrap();

        packet.version.timestamp_ns = 3;
        s1.send(packet).unwrap();
        // Disabled check kept from the original source:
        //assert!(read_channel.read("c1".to_string(), done.0));
        assert!(read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .try_read()
            .is_ok());
        assert!(read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .try_read()
            .is_ok());
        assert!(read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .try_read()
            .is_err());
    }

    /// Linking `c1` a second time panics, since the helper already linked it.
    #[test]
    #[should_panic]
    fn test_read_channel_panics_if_already_linked() {
        let (_, channel_receiver) = typed_channel::<String>();
        let (read_channel, _) = create_typed_read_channel();
        read_channel
            .channels
            .write()
            .unwrap()
            .c1()
            .link(channel_receiver);
    }
}