use std::{
sync::{Arc, PoisonError, RwLock},
thread,
time::Duration,
};
use crossbeam::channel::Sender;
use log::debug;
use crate::{
buffers::{single_buffers::RtRingBuffer, synchronizers::PacketSynchronizer},
graph::metrics::{BufferMonitor, BufferMonitorBuilder},
packet::work_queue::WorkQueue,
};
use std::collections::HashMap;
use crate::{
buffers::{single_buffers::FixedSizeBuffer, BufferIterator},
packet::typed::PacketSetTrait,
DataVersion,
};
use super::{ChannelError, ChannelID, Packet, ReadChannelTrait, ReceiverChannel};
pub struct BufferReceiver<T: FixedSizeBuffer + ?Sized> {
pub buffer: Box<T>,
pub channel: Option<ReceiverChannel<T::Data>>,
}
impl<T: FixedSizeBuffer + ?Sized> BufferReceiver<T> {
pub fn link(&mut self, receiver: ReceiverChannel<T::Data>) {
if self.channel.is_some() {
panic!("Channel is already linked!");
}
self.channel = Some(receiver);
}
pub fn try_read(&mut self) -> Result<DataVersion, ChannelError> {
if let Some(channel) = self.channel.as_ref() {
let packet = channel.try_receive()?;
let version = packet.version;
self.buffer.insert(packet)?;
return Ok(version);
}
Err(ChannelError::NotInitializedError)
}
}
pub trait ChannelBuffer {
fn available_channels(&self) -> Vec<&ChannelID>;
fn has_version(&self, channel: &ChannelID, version: &DataVersion) -> bool;
fn max_version(&self) -> Option<&DataVersion>;
fn peek(&self, channel: &ChannelID) -> Option<&DataVersion>;
fn iterator(&self, channel: &ChannelID) -> Option<Box<BufferIterator>>;
fn are_buffers_empty(&self) -> bool;
fn try_receive(&mut self, timeout: Duration) -> Result<Option<&ChannelID>, ChannelError>;
fn wait_for_data(&self, timeout: Duration) -> Result<bool, ChannelError>;
}
pub trait InputGenerator {
type INPUT: PacketSetTrait + Send;
fn get_packets_for_version(
&mut self,
data_versions: &HashMap<ChannelID, Option<DataVersion>>,
exact_match: bool,
) -> Option<Self::INPUT>;
fn create_channels(
buffer_size: usize,
block_on_full: bool,
monitor: BufferMonitorBuilder,
) -> Self;
}
pub struct ReadChannel<T: InputGenerator + ChannelBuffer + Send> {
pub synch_strategy: Box<dyn PacketSynchronizer>,
pub work_queue: Option<WorkQueue<T::INPUT>>,
pub channels: Arc<RwLock<T>>,
}
unsafe impl<T: InputGenerator + ChannelBuffer + Send> Sync for ReadChannel<T> {}
unsafe impl<T: InputGenerator + ChannelBuffer + Send> Send for ReadChannel<T> {}
impl<T: InputGenerator + ChannelBuffer + Send + 'static> ReadChannelTrait for ReadChannel<T> {
type Data = T::INPUT;
fn read(&mut self, node_id: String, done_notification: Sender<String>) -> Option<ChannelID> {
let data;
{
let read_locked = self.channels.read().unwrap_or_else(PoisonError::into_inner);
let has_data = read_locked.wait_for_data(Duration::from_millis(50));
if let Err(err) = has_data {
eprintln!("Error while waiting for data {err} on channel {node_id}.");
return None;
}
if let Ok(data) = has_data {
if !data {
return None;
}
}
}
{
let mut write_locked = self
.channels
.write()
.unwrap_or_else(PoisonError::into_inner);
let result = write_locked.try_receive(Duration::from_micros(50));
data = match result {
Ok(has_data) => has_data.cloned(),
Err(err) => {
eprintln!("Node {node_id}: Exception while reading {err:?}");
match err {
crate::channels::ChannelError::ReceiveError(_) => {
if write_locked.are_buffers_empty() {
let _ = done_notification.send(node_id);
}
eprintln!("Channel is disonnected, closing");
thread::sleep(Duration::from_millis(100));
return None;
}
_ => {
if write_locked.are_buffers_empty() {
debug!("Sending done {node_id}");
let _ = done_notification.send(node_id);
}
None
}
}
}
};
}
if data.is_some() {
self.synchronize()
}
data
}
fn start(&mut self, work_queue: WorkQueue<Self::Data>) {
self.work_queue = Some(work_queue);
}
fn stop(&mut self) {}
}
impl<T: InputGenerator + ChannelBuffer + Send + 'static> ReadChannel<T> {
pub fn new(
synch_strategy: Box<dyn PacketSynchronizer>,
work_queue: Option<WorkQueue<T::INPUT>>,
channels: T,
) -> Self {
ReadChannel {
synch_strategy,
work_queue,
channels: Arc::new(RwLock::new(channels)),
}
}
pub fn create(
id: &str,
block_channel_full: bool,
channel_buffer_size: usize,
process_buffer_size: usize,
synch_strategy: Box<dyn PacketSynchronizer>,
monitor: bool,
) -> Self {
let mut monitor_builder = BufferMonitorBuilder::no_monitor();
if monitor {
monitor_builder = BufferMonitorBuilder::new(id);
}
let work_monitor = if monitor {
monitor_builder.make_channel("_work_queue")
} else {
BufferMonitor::default()
};
let work_queue = Some(WorkQueue::<T::INPUT>::new(
process_buffer_size,
work_monitor,
));
let channels = T::create_channels(channel_buffer_size, block_channel_full, monitor_builder);
Self {
synch_strategy,
work_queue,
channels: Arc::new(RwLock::new(channels)),
}
}
pub fn synchronize(&mut self) {
if let Some(queue) = self.work_queue.as_mut() {
let synch = self.synch_strategy.synchronize(self.channels.clone());
if let Some(sync) = synch {
let mut channels = if let Ok(channels) = self.channels.write() {
channels
} else {
return;
};
if let Some(value) = channels.get_packets_for_version(&sync, false) {
queue.push(value);
}
}
}
}
}
pub fn get_data<T>(
buffer: &mut RtRingBuffer<T>,
data_version: &Option<DataVersion>,
exact_match: bool,
) -> Option<Packet<T>> {
if data_version.is_none() {
return None;
}
loop {
let removed_packet = buffer.pop();
if let Some(entry) = removed_packet {
if let Some(data_version) = data_version {
if entry.version == *data_version {
return Some(entry);
} else if exact_match {
break;
}
}
if exact_match {
break;
}
} else {
break;
}
}
None
}
#[cfg(test)]
mod tests {
use crate::buffers::single_buffers::RtRingBuffer;
use crate::buffers::synchronizers::timestamp::TimestampSynchronizer;
use crate::channels::read_channel::ReadChannel;
use crate::channels::read_channel::ReadChannelTrait;
use crate::channels::typed_channel;
use crate::channels::SenderChannel;
use crate::channels::typed_read_channel::ReadChannel2;
use crate::graph::metrics::BufferMonitor;
use crate::packet::typed::ReadChannel2PacketSet;
use crate::packet::work_queue::WorkQueue;
use crate::packet::Packet;
use crate::DataVersion;
fn create_typed_read_channel() -> (
ReadChannel<ReadChannel2<String, String>>,
SenderChannel<String>,
) {
let synch_strategy = Box::<TimestampSynchronizer>::default();
let read_channel2 = ReadChannel2::create(
RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
);
let read_channel =
ReadChannel::new(synch_strategy, Some(WorkQueue::default()), read_channel2);
let (channel_sender, channel_receiver) = typed_channel::<String>();
read_channel
.channels
.write()
.unwrap()
.c1()
.link(channel_receiver);
(read_channel, channel_sender)
}
#[test]
fn test_read_channel_try_read_returns_ok_if_data() {
let (mut read_channel, crossbeam_channels) = create_typed_read_channel();
crossbeam_channels
.send(Packet::new(
"my_data".to_string(),
DataVersion { timestamp_ns: 1 },
))
.unwrap();
read_channel.start(WorkQueue::default());
assert_eq!(
read_channel
.channels
.write()
.unwrap()
.c1()
.try_read()
.ok()
.unwrap(),
DataVersion { timestamp_ns: 1 }
);
}
#[test]
fn test_read_channel_try_read_returns_error_when_push_if_not_initialized() {
let (read_channel, _) = create_typed_read_channel();
assert!(read_channel
.channels
.write()
.unwrap()
.c1()
.try_read()
.is_err());
assert!(read_channel
.channels
.write()
.unwrap()
.c2()
.try_read()
.is_err());
}
#[test]
fn test_read_channel_try_read_returns_error_when_buffer_is_full() {
let synch_strategy = Box::<TimestampSynchronizer>::default();
let read_channel2 = ReadChannel2::create(
RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
RtRingBuffer::<String>::new(2, true, BufferMonitor::default()),
);
let mut read_channel = ReadChannel::new(
synch_strategy,
Some(WorkQueue::<ReadChannel2PacketSet<String, String>>::default()),
read_channel2,
);
let (s1, channel_receiver) = typed_channel::<String>();
read_channel
.channels
.write()
.unwrap()
.c1()
.link(channel_receiver);
let (_, channel_receiver) = typed_channel::<String>();
read_channel
.channels
.write()
.unwrap()
.c2()
.link(channel_receiver);
let mut packet = Packet::new("my_data".to_string(), DataVersion { timestamp_ns: 1 });
read_channel.start(WorkQueue::default());
s1.send(packet.clone()).unwrap();
packet.version.timestamp_ns = 2;
s1.send(packet.clone()).unwrap();
packet.version.timestamp_ns = 3;
s1.send(packet).unwrap();
assert!(read_channel
.channels
.write()
.unwrap()
.c1()
.try_read()
.is_ok());
assert!(read_channel
.channels
.write()
.unwrap()
.c1()
.try_read()
.is_ok());
assert!(read_channel
.channels
.write()
.unwrap()
.c1()
.try_read()
.is_err());
}
#[test]
#[should_panic]
fn test_read_channel_panics_if_already_linked() {
let (_, channel_receiver) = typed_channel::<String>();
let (read_channel, _) = create_typed_read_channel();
read_channel
.channels
.write()
.unwrap()
.c1()
.link(channel_receiver);
}
}