//! Overarching structure to group `JoinPattern`s and their associated channels
//! together. Main structure for the public interface, used to create new
//! channels and construct `JoinPattern`s based on them.
use std::any::Any;
use std::ops::Drop;
use std::sync::mpsc::{channel, RecvError, Sender};
use super::channels::{BidirChannel, RecvChannel, SendChannel};
use super::controller::Controller;
use super::patterns::unary::{BidirPartialPattern, RecvPartialPattern, SendPartialPattern};
use super::types::{ids, ControllerHandle, Packet};
/// Struct managing the creation of new channels and Join Patterns.
///
/// This struct is used to group channels, such as `SendChannel`, which can
/// be used in conjunction to create new Join Patterns. As such, it offers
/// methods to create new channels which are then directly linked to this
/// `Junction`. It also offers methods to start off the creation of new
/// Join Patterns that rely on the channels created by this struct and can,
/// in fact, only consist of channels associated with this struct.
pub struct Junction {
id: ids::JunctionId,
controller_handle: Option<ControllerHandle>,
sender: Sender<Packet>,
}
#[allow(clippy::new_without_default)]
impl Junction {
/// Create a new `Junction` and start control thread in background.
///
/// Create a new `Junction` and spawn a control thread in the background
/// that will handle all incoming `Packet`s for this `Junction`. A
/// `JoinHandle` to this control thread is stored alongside the `Junction`.
pub fn new() -> Junction {
let (sender, receiver) = channel::<Packet>();
let controller = Controller::new();
Junction {
id: ids::JunctionId::new(),
controller_handle: Some(controller.start(sender.clone(), receiver)),
sender,
}
}
/// Return handle to internal `Controller` if available.
///
/// Each `Junction` has an associated control thread with a `Controller`
/// running to handle incoming `Packet`s. This `Controller` is gracefully
/// stopped and its thread joined as soon as the `Junction` goes out of
/// scope. However, as this is sometimes undesired behavior, the user can
/// retrieve the handle to the `Junction`'s `Controller` and its thread
/// with this function and stop the `Controller` at a time of their
/// choosing. Once this function has executed, the `Junction` will no
/// long automatically stop its `Controller` and join the control thread
/// upon going out of scope.
///
/// Note that this handle can only be retrieved once.
pub fn controller_handle(&mut self) -> Option<ControllerHandle> {
self.controller_handle.take()
}
/// Create and return a new `SendChannel` on this `Junction`.
///
/// The generic parameter `T` is used to determine the type of values
/// that can be sent on this channel.
///
/// # Panics
///
/// Panics if it received an error while trying to receive a new
/// channel ID from the control thread.
pub fn send_channel<T>(&self) -> SendChannel<T>
where
T: Any + Send,
{
SendChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone())
}
/// Create and return a new `RecvChannel` on this `Junction`.
///
/// The generic parameter `R` is used to determine the type of values
/// that can be received on this channel.
///
/// # Panics
///
/// Panics if it received an error while trying to receive a new
/// channel ID from the control thread.
pub fn recv_channel<R>(&self) -> RecvChannel<R>
where
R: Any + Send,
{
RecvChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone())
}
/// Create and return a new `BidirChannel` on this `Junction`.
///
/// The generic parameter `T` is used to determine the type of values
/// that can be sent on this channel while `R` is used to determine
/// the type of values that can be received on this channel.
///
/// # Panics
///
/// Panics if it received an error while trying to receive the new
/// channel IDs from the control thread.
pub fn bidir_channel<T, R>(&self) -> BidirChannel<T, R>
where
T: Any + Send,
R: Any + Send,
{
BidirChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone())
}
/// Request ID for a new channel from control thread.
///
/// # Panics
///
/// Panics if request for new channel id could not be sent to
/// control thread.
fn new_channel_id(&self) -> Result<ids::ChannelId, RecvError> {
let (id_sender, id_receiver) = channel::<ids::ChannelId>();
self.sender
.send(Packet::NewChannelIdRequest {
return_sender: id_sender,
})
.unwrap();
id_receiver.recv()
}
/// Create new partial Join Pattern starting with a `SendChannel`.
///
/// # Panics
///
/// Panics if the supplied `SendChannel` does not carry the same
/// `JunctionID` as this `Junction`, i.e. has not been created by and is
/// associated with this `Junction`.
pub fn when<T>(&self, send_channel: &SendChannel<T>) -> SendPartialPattern<T>
where
T: Any + Send,
{
if send_channel.junction_id() == self.id {
SendPartialPattern::new(self.id, send_channel.strip(), self.sender.clone())
} else {
panic!(
"SendChannel is not associated with Junction! Please use \
a SendChannel created using the same Junction calling \
this function!"
);
}
}
/// Create new partial Join Pattern starting with a `RecvChannel`.
///
/// # Panics
///
/// Panics if the supplied `RecvChannel` does not carry the same
/// `JunctionID` as this `Junction`, i.e. has not been created by and is
/// associated with this `Junction`.
pub fn when_recv<R>(&self, recv_channel: &RecvChannel<R>) -> RecvPartialPattern<R>
where
R: Any + Send,
{
if recv_channel.junction_id() == self.id {
RecvPartialPattern::new(recv_channel.strip(), self.sender.clone())
} else {
panic!(
"RecvChannel is not associated with Junction! Please use \
a RecvChannel created using the same Junction calling \
this function!"
);
}
}
/// Create a new partial Join Pattern starting with a `BidirChannel`.
///
/// # Panics
///
/// Panics if the supplied `BidirChannel` does not carry the same
/// `JunctionID` as this `Junction`, i.e. has not been created by and is
/// associated with this `Junction`.
pub fn when_bidir<T, R>(&self, bidir_channel: &BidirChannel<T, R>) -> BidirPartialPattern<T, R>
where
T: Any + Send,
R: Any + Send,
{
if bidir_channel.junction_id() == self.id {
BidirPartialPattern::new(bidir_channel.strip(), self.sender.clone())
} else {
panic!(
"BidirChannel is not associated with Junction! Please use \
a BidirChannel created using the same Junction calling \
this function!"
);
}
}
}
impl Drop for Junction {
/// Drop the `Junction` and free its resources.
///
/// If there is a `ControllerHandle` still available, use it to stop the
/// associated `Controller` and join the control thread. Otherwise, no
/// action is needed.
fn drop(&mut self) {
if self.controller_handle.is_some() {
self.controller_handle.take().unwrap().stop();
}
}
}