1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
//! Overarching structure to group `JoinPattern`s and their associated channels //! together. Main structure for the public interface, used to create new //! channels and construct `JoinPattern`s based on them. use std::any::Any; use std::ops::Drop; use std::sync::mpsc::{channel, RecvError, Sender}; use super::channels::{BidirChannel, RecvChannel, SendChannel}; use super::controller::Controller; use super::patterns::unary::{BidirPartialPattern, RecvPartialPattern, SendPartialPattern}; use super::types::{ids, ControllerHandle, Packet}; /// Struct managing the creation of new channels and Join Patterns. /// /// This struct is used to group channels, such as `SendChannel`, which can /// be used in conjunction to create new Join Patterns. As such, it offers /// methods to create new channels which are then directly linked to this /// `Junction`. It also offers methods to start off the creation of new /// Join Patterns that rely on the channels created by this struct and can, /// in fact, only consist of channels associated with this struct. pub struct Junction { id: ids::JunctionId, controller_handle: Option<ControllerHandle>, sender: Sender<Packet>, } #[allow(clippy::new_without_default)] impl Junction { /// Create a new `Junction` and start control thread in background. /// /// Create a new `Junction` and spawn a control thread in the background /// that will handle all incoming `Packet`s for this `Junction`. A /// `JoinHandle` to this control thread is stored alongside the `Junction`. pub fn new() -> Junction { let (sender, receiver) = channel::<Packet>(); let controller = Controller::new(); Junction { id: ids::JunctionId::new(), controller_handle: Some(controller.start(sender.clone(), receiver)), sender, } } /// Return handle to internal `Controller` if available. /// /// Each `Junction` has an associated control thread with a `Controller` /// running to handle incoming `Packet`s. This `Controller` is gracefully /// stopped and its thread joined as soon as the `Junction` goes out of /// scope. However, as this is sometimes undesired behavior, the user can /// retrieve the handle to the `Junction`'s `Controller` and its thread /// with this function and stop the `Controller` at a time of their /// choosing. Once this function has executed, the `Junction` will no /// long automatically stop its `Controller` and join the control thread /// upon going out of scope. /// /// Note that this handle can only be retrieved once. pub fn controller_handle(&mut self) -> Option<ControllerHandle> { self.controller_handle.take() } /// Create and return a new `SendChannel` on this `Junction`. /// /// The generic parameter `T` is used to determine the type of values /// that can be sent on this channel. /// /// # Panics /// /// Panics if it received an error while trying to receive a new /// channel ID from the control thread. pub fn send_channel<T>(&self) -> SendChannel<T> where T: Any + Send, { SendChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone()) } /// Create and return a new `RecvChannel` on this `Junction`. /// /// The generic parameter `R` is used to determine the type of values /// that can be received on this channel. /// /// # Panics /// /// Panics if it received an error while trying to receive a new /// channel ID from the control thread. pub fn recv_channel<R>(&self) -> RecvChannel<R> where R: Any + Send, { RecvChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone()) } /// Create and return a new `BidirChannel` on this `Junction`. /// /// The generic parameter `T` is used to determine the type of values /// that can be sent on this channel while `R` is used to determine /// the type of values that can be received on this channel. /// /// # Panics /// /// Panics if it received an error while trying to receive the new /// channel IDs from the control thread. pub fn bidir_channel<T, R>(&self) -> BidirChannel<T, R> where T: Any + Send, R: Any + Send, { BidirChannel::new(self.new_channel_id().unwrap(), self.id, self.sender.clone()) } /// Request ID for a new channel from control thread. /// /// # Panics /// /// Panics if request for new channel id could not be sent to /// control thread. fn new_channel_id(&self) -> Result<ids::ChannelId, RecvError> { let (id_sender, id_receiver) = channel::<ids::ChannelId>(); self.sender .send(Packet::NewChannelIdRequest { return_sender: id_sender, }) .unwrap(); id_receiver.recv() } /// Create new partial Join Pattern starting with a `SendChannel`. /// /// # Panics /// /// Panics if the supplied `SendChannel` does not carry the same /// `JunctionID` as this `Junction`, i.e. has not been created by and is /// associated with this `Junction`. pub fn when<T>(&self, send_channel: &SendChannel<T>) -> SendPartialPattern<T> where T: Any + Send, { if send_channel.junction_id() == self.id { SendPartialPattern::new(self.id, send_channel.strip(), self.sender.clone()) } else { panic!( "SendChannel is not associated with Junction! Please use \ a SendChannel created using the same Junction calling \ this function!" ); } } /// Create new partial Join Pattern starting with a `RecvChannel`. /// /// # Panics /// /// Panics if the supplied `RecvChannel` does not carry the same /// `JunctionID` as this `Junction`, i.e. has not been created by and is /// associated with this `Junction`. pub fn when_recv<R>(&self, recv_channel: &RecvChannel<R>) -> RecvPartialPattern<R> where R: Any + Send, { if recv_channel.junction_id() == self.id { RecvPartialPattern::new(recv_channel.strip(), self.sender.clone()) } else { panic!( "RecvChannel is not associated with Junction! Please use \ a RecvChannel created using the same Junction calling \ this function!" ); } } /// Create a new partial Join Pattern starting with a `BidirChannel`. /// /// # Panics /// /// Panics if the supplied `BidirChannel` does not carry the same /// `JunctionID` as this `Junction`, i.e. has not been created by and is /// associated with this `Junction`. pub fn when_bidir<T, R>(&self, bidir_channel: &BidirChannel<T, R>) -> BidirPartialPattern<T, R> where T: Any + Send, R: Any + Send, { if bidir_channel.junction_id() == self.id { BidirPartialPattern::new(bidir_channel.strip(), self.sender.clone()) } else { panic!( "BidirChannel is not associated with Junction! Please use \ a BidirChannel created using the same Junction calling \ this function!" ); } } } impl Drop for Junction { /// Drop the `Junction` and free its resources. /// /// If there is a `ControllerHandle` still available, use it to stop the /// associated `Controller` and join the control thread. Otherwise, no /// action is needed. fn drop(&mut self) { if self.controller_handle.is_some() { self.controller_handle.take().unwrap().stop(); } } }