1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165
// Copyright 2018-2020 Cargill Incorporated
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Traits for defining a connection matrix.
//!
//! A connection matrix is a collection containing [`Connection`] items, which allows sending to
//! and receiving from those connections via a connection identifier.
//!
//! A connection matrix must implement the following operations:
//!
//! - add: Add a connection to the connection matrix
//! - remove: Remove a connection from the connection matrix
//! - send: Send a message to a connection
//! - recv: Receive a message from any ready connection in the connection matrix
//!
//! Traits are defined in a granular manner which matches how a connection matrix is used. For
//! example, lifecycle operations (add and remove) are performed in a different component than send
//! and receive, and are thus handled via a separate trait.
//!
[`Connection`]: ../trait.Connection.html use std::time::Duration; use super::Connection; pub use super::error::{ ConnectionMatrixAddError, ConnectionMatrixRecvError, ConnectionMatrixRecvTimeoutError, ConnectionMatrixRemoveError, ConnectionMatrixSendError, }; /// Contains a payload and the identifier for the connection on which the payload was received #[derive(Debug, Default, PartialEq)] pub struct ConnectionMatrixEnvelope { /// The connection identifier id: String, /// The message payload bytes payload: Vec<u8>, } impl ConnectionMatrixEnvelope { /// Creates a new `ConnectionMatrixEnvelope` /// /// This is used by the implementation of a [`ConnectionMatrixReceiver`] to create the envelope /// returned by [`recv`] or [`recv_timeout`]. /// /// [`ConnectionMatrixReceiver`]: trait.ConnectionMatrixReceiver.html /// [`recv`]: trait.ConnectionMatrixReceiver.html#tymethod.recv /// [`recv_timeout`]: trait.ConnectionMatrixReceiver.html#tymethod.recv_timeout pub fn new(id: String, payload: Vec<u8>) -> Self { ConnectionMatrixEnvelope { id, payload } } /// Returns the connection identifier of the connection on which the payload was received pub fn id(&self) -> &str { &self.id } /// Returns the bytes of the payload pub fn payload(&self) -> &[u8] { &self.payload } /// Returns the bytes of the payload while consuming the `ConnectionMatrixEnvelope` pub fn take_payload(self) -> Vec<u8> { self.payload } /// Returns the payload and consumes the ConnectionMatrixEnvelope pub fn into_inner(self) -> Vec<u8> { self.payload } } impl From<ConnectionMatrixEnvelope> for Vec<u8> { fn from(envelope: ConnectionMatrixEnvelope) -> Self { envelope.payload.to_vec() } } /// Defines connection lifecycle operations (addition and removal of a `Connection`) /// /// This trait is distinct from the sender/receiver traits because the lifecycle operations /// typically occur in a separate component. 
Thus, we can expose this trait only where the /// lifecycle operations are performed and nowhere else in the system. pub trait ConnectionMatrixLifeCycle: Clone + Send { /// Adds a connection to the connection matrix /// /// # Arguments /// /// * `connection` - Connection being added to the connection matrix /// * `id` - Connection identifier; must be unique within the connection matrix /// /// If the add failed, a `ConnectionMatrixAddError` will be returned. fn add( &self, connection: Box<dyn Connection>, id: String, ) -> Result<usize, ConnectionMatrixAddError>; /// Removes a connection from the connection matrix /// /// # Arguments /// /// * `id` - the connection identifier for the connection being removed /// /// If the remove failed, a `ConnectionMatrixRemoveError` will be returned. fn remove(&self, id: &str) -> Result<Box<dyn Connection>, ConnectionMatrixRemoveError>; } /// Defines a function to send a message using a connection identifier pub trait ConnectionMatrixSender: Clone + Send { /// Sends a message over the specified connection. /// /// # Arguments /// /// * `id` - the identifier of the connection on which the message should be sent /// * `message` - the bytes of the message /// /// If the send failed, a `ConnectionMatrixSendError` will be returned. fn send(&self, id: String, message: Vec<u8>) -> Result<(), ConnectionMatrixSendError>; } /// Defines functions to receive messages from connections within the connection matrix pub trait ConnectionMatrixReceiver: Clone + Send { /// Attempts to receive a message. The envelope returned contains both the payload (message) /// and the identifier of the connection on which it was received. This function will block /// until there is a message to receive. The message will come from the first ready connection /// detected. /// /// If the receive failed, a `ConnectionMatrixRecvError` is returned. 
fn recv(&self) -> Result<ConnectionMatrixEnvelope, ConnectionMatrixRecvError>; /// Attempts to receive a message, with a timeout. The envelope returned contains both the /// payload (message) and the identifier of the connection on which it was received. This /// function will block until there is a message to receive or the specified timeout expires. /// The message will come from the first ready connection detected. /// /// # Arguments /// /// * `timeout` - `Duration` for the amount of time the function should block waiting on an /// envelope to arrive /// /// If the receive failed or timed out, a `ConnectionMatrixRecvTimeoutError` is returned. fn recv_timeout( &self, timeout: Duration, ) -> Result<ConnectionMatrixEnvelope, ConnectionMatrixRecvTimeoutError>; } /// Defines a function to shutdown the connection matrix pub trait ConnectionMatrixShutdown: Clone + Send { /// Notifies the underlying connection matrix to shutdown fn shutdown(&self); }