use std::thread::JoinHandle;
use crate::element_traits::Element;
use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
pub mod error;
use error::Error;
/// Payload carried by a [`Datagram::Data`] datagram.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so payloads can be logged,
/// duplicated and compared (e.g. in tests), matching the `Debug` derive that
/// `Message` already has.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Data {
    /// An owned text payload.
    Text(String),
    /// A payload-less value (presumably "nothing to deliver"; confirm with
    /// the elements that produce it).
    None,
}
/// Control messages exchanged between the pipeline and its sink element.
#[derive(Debug)]
pub enum Message {
    /// Sent to the sink by `Pipeline::iter` to request one iteration.
    Iter,
    /// Reported through `Parent::send_iter_fin`; `Pipeline::iter` accepts it
    /// as a successful reply.
    IterFin,
    /// Sent to the sink by `SinkPipe::send_quit`.
    Quit,
    /// Reported through `Parent::send_finished`; `Pipeline::iter` also
    /// accepts it as a successful reply.
    Finished,
}
/// Unit of communication sent over the datagram channel to a sink element:
/// either a control message or a data payload.
pub enum Datagram {
    /// A control message such as `Message::Iter` or `Message::Quit`.
    Message(Message),
    /// A data payload.
    Data(Data),
}
/// Handle through which an element reports [`Message`]s back to whoever owns
/// the receiving end of the channel (the `Pipeline` installs one on its sink
/// via `set_parent`).
///
/// The `Default` value holds no sender, so every send attempt fails with
/// `Error::NoParentMessageSender`.
#[derive(Default)]
pub struct Parent {
    /// Sending half of the message channel; `None` when not connected.
    msg_sender: Option<Sender<Message>>,
}
impl Parent {
pub fn new(msg_sender: Sender<Message>) -> Self {
Self {
msg_sender: Some(msg_sender),
}
}
fn send_msg(&self, msg: Message) -> Result<(), Error> {
match self.msg_sender.as_ref() {
Some(msg_sender) => msg_sender.send(msg).map_err(|_| Error::MessageParentFailed),
None => Err(Error::NoParentMessageSender),
}
}
pub fn send_finished(&self) -> Result<(), Error> {
self.send_msg(Message::Finished)
}
pub fn send_iter_fin(&self) -> Result<(), Error> {
self.send_msg(Message::IterFin)
}
}
/// State kept for one sink element: the element itself (until it is moved
/// onto its thread), the thread handle, and the channel endpoints used to
/// talk to it.
///
/// All fields start out as `None` in the `Default` value.
#[derive(Default)]
pub struct SinkPipe {
    /// The sink element; `Pipeline::init` takes it out and moves it onto the
    /// spawned thread.
    pub element: Option<Box<dyn Element>>,
    /// Handle of the thread running the element, once spawned.
    pub thread_handle: Option<JoinHandle<()>>,
    /// Sending half of the channel carrying [`Datagram`]s to the element.
    pub datagram_sender: Option<Sender<Datagram>>,
    /// Receiving half of the channel carrying [`Message`]s from the element.
    pub msg_receiver: Option<Receiver<Message>>,
}
impl SinkPipe {
pub fn set_element(&mut self, element: impl Element + 'static) {
self.element = Some(Box::new(element));
}
pub fn send_quit(&self) -> Result<(), Error> {
match self.datagram_sender.as_ref() {
Some(msg_sender) => msg_sender
.send(Datagram::Message(Message::Quit))
.map_err(|_| Error::MessageSinkFailed),
None => Err(Error::NoSinkMessageSender),
}
}
pub fn join_thread(&mut self) -> Result<(), Error> {
match self.thread_handle.take() {
Some(join_handle) => join_handle.join().map_err(|_| Error::FailedToJoinThread),
None => Err(Error::NoThreadHandle),
}
}
pub fn drop_data_sender(&mut self) {
self.datagram_sender.take();
}
}
/// A pipeline driving a single sink element on its own thread.
pub struct Pipeline {
    /// The sink element together with its thread handle and channel
    /// endpoints.
    head: SinkPipe,
}
impl Pipeline {
pub fn new(element: impl Element + 'static) -> Self {
let mut head = SinkPipe::default();
head.set_element(element);
Self { head }
}
pub fn init(&mut self) -> Result<(), Error> {
let (datagram_sender, datagram_receiver) = bounded(0);
let (msg_sender, my_msg_receiver) = unbounded();
let parent = Parent::new(msg_sender);
let mut sink_element = match self.head.element.take() {
Some(elm) => elm,
None => {
return Err(Error::NoSinkElement);
}
};
sink_element.set_parent(parent);
let datagram_receiver_clone = datagram_receiver.clone();
self.head.thread_handle = Some(std::thread::spawn(move || {
match sink_element.run(datagram_receiver_clone) {
Ok(_) => {}
Err(e) => println!("PIPELINE: Error occurred running sink element: {e}"),
}
}));
self.head.msg_receiver = Some(my_msg_receiver);
self.head.datagram_sender = Some(datagram_sender);
Ok(())
}
pub fn iter(&self) -> Result<(), Error> {
if self.head.thread_handle.is_none() {
return Err(Error::PipelineNotReady);
}
if let Some(datagram_sender) = &self.head.datagram_sender {
datagram_sender
.send(Datagram::Message(Message::Iter))
.map_err(|_| Error::MessageSinkFailed)?;
} else {
return Err(Error::NoSinkDatagramSender);
}
if let Some(message_receiver) = &self.head.msg_receiver {
match message_receiver
.recv()
.map_err(|_| Error::ReceiveFromSinkFailed)?
{
Message::IterFin | Message::Finished => Ok(()),
_ => Err(Error::ReceivedInvalidDatagramFromSink),
}
} else {
Err(Error::NoSinkMessageReceiver)
}
}
pub fn de_init(&mut self) -> Result<(), Error> {
self.head.send_quit()?;
self.head.join_thread()
}
}