1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
//! Defines read and write operations for a kekbit channel. ///Channel Access errors #[derive(Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum ChannelError { ///The channel has an invalid signature. The channel signature must be `0x2A54_4942_4B45_4B2A` InvalidSignature { ///The expected signature always `0x2A54_4942_4B45_4B2A` expected: u64, ///The signature red from the kekbit storage actual: u64, }, ///The channel's storage is of an incompatible file format IncompatibleVersion { ///Expected storage version expected: u64, ///Actual storage version actual: u64, }, ///The channel's capacity is invalid. Either too small or is not aligned to 8 bytes. InvalidCapacity { ///Actual capacity capacity: u32, ///Reason why the capacity is invalid msg: &'static str, }, ///The maximum message length specified is invalid InvalidMaxMessageLength { ///The specified maximum message length msg_len: u32, ///Reason why maximum message length is invalid msg: &'static str, }, ///The channel storage does not exist StorageNotFound { ///The file expected to back the channel storage file_name: String, }, ///The channel storage is not ready to access StorageNotReady { ///The file that backs the channel storage file_name: String, }, ///The channel storage is not ready to access StorageAlreadyExists { ///The file that backs the channel storage file_name: String, }, ///The channel storage can't be accessed CouldNotAccessStorage { ///The file that backs the channel storage file_name: String, }, ///Mapping the channel's file to memory had failed MemoryMappingFailed { reason: String, }, AccessError { reason: String, }, } ///Write operation errors #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum WriteError { /// There is not enough space available in the channel for such an operation. NoSpaceAvailable { ///The space amount required by the record. It will be larger than the record size. required: u32, ///The space amount available in the channel. left: u32, }, /// The record was longer than the maximum allowed. MaxRecordLenExceed { ///The size of the record to be written. rec_len: u32, ///The maximum allowed size for a record. max_allowed: u32, }, } static HEARTBEAT_MSG: &[u8] = &[]; ///The `Writer` trait allows writing chunk of bytes as records into a kekbit channel. /// Implementers of this trait are called 'kekbit writers'. Usually a writer is bound to /// a given channel, and it is expected that there is only one writer which directly writes into the channel, however /// multiple writers may cooperate during the writing process. pub trait Writer { /// Writes a given record to a kekbit channel. /// /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails. /// /// # Arguments /// /// * `data` - The buffer which contains the record data to be written. /// * `len` - The amount of data to be write in the channel, the record length. /// /// # Errors /// /// If the operation fails, than an error variant will be returned. /// Regardless the error variant a future write with a smaller record size may be successful. /// fn write(&mut self, data: &[u8], len: u32) -> Result<u32, WriteError>; /// Writes into the stream a heartbeat message. This method shall be used by all writers /// which want to respect to timeout interval associated to a channel. Hearbeating is the /// expected mechanism by which a channel writer will keep the active readers interested in /// the data published on the channel. /// Heartbeat shall be done regularly, at a time interval which ensures that at least one heartbeat /// is sent between any two 'timeout' long intervals. /// /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails. /// /// # Errors /// /// If this call fails, than an error variant will be returned. However /// in this case the errors are not recoverable, they signal that the channel is at the /// end of its lifetime. #[inline(always)] fn heartbeat(&mut self) -> Result<u32, WriteError> { self.write(HEARTBEAT_MSG, 0) } /// Flushes the stream which possibly backs the kekbit writer. /// By default this method does nothing, and should be implemented only for `Writer`s which /// it makes sense. /// Returns the success of the operation fn flush(&mut self) -> Result<(), std::io::Error> { Ok(()) } } ///Read operation errors #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum ReadError { ///Read operation had unexpectedly failed. Usually will happen when a channel was corrupted. Failed { ///The amount of bytes read *before* the error occurred. bytes_read: u32, }, ///Writer timeout had been detected. While the writer may resume pushing data in to the channel, most likely he had abandoned the channel. Timeout { ///Last time stamp at which the channel was still considered valid. timeout: u64, }, ///Channel is closed no more data will be pushed into. Closed { ///The amount of bytes read *before* the channel close mark was reached. bytes_read: u32, }, ///Channel full. There is no more space available in this channel. ChannelFull { ///The amount of bytes read *before* the end of channel was reached. bytes_read: u32, }, } ///Errors caused by failed [move_to](trait.Reader.html#method.move_to) operation. #[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)] pub enum InvalidPosition { ///Position is not properly aligned with the channel's records Unaligned { position: u32 }, ///Position is not available in the channel - it is past the last valid record of the channel. Unavailable { position: u32 }, } impl ReadError { ///Returns the number of valid bytes read before an error occurred. pub fn bytes_read(&self) -> u32 { match self { ReadError::Timeout { .. } => 0, ReadError::Closed { bytes_read } => *bytes_read, ReadError::ChannelFull { bytes_read } => *bytes_read, ReadError::Failed { bytes_read } => *bytes_read, } } } ///The `Reader` trait allows reading bytes from a kekbit channel. Implementers of this trait /// are called 'kekbit readers'. Usually a reader is bound to a given channel, and it is /// expected that multiple readers will safely access the same channel simultaneous. pub trait Reader { ///Accesses a number of records from the kekbit channel, and for each record, calls ///the given callback. /// /// Returns the amount of bytes read from the channel /// /// # Arguments /// /// * `handler` - The callback function to be called when a record is pulled from the channel. /// The function will receive as parameters the position of the message in the channel, and the message in binary format. /// * `message_count` - A hint about how many records shall be read from the channel before the method completes. /// It is expected that this method will take from the channel at most this many records /// /// /// # Errors /// /// If this function fails, than an error variant will be returned. These errors are not expected to be recoverable. Once any error except `Timeout` occurred, there will never be /// data to read pass the current read marker. However reading from beginning of the channel to the current /// read marker should still be a valid operation. The `Timeout` exception, /// may or may not be recoverable, depends on the channel `Writer` behaviour. fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError>; /// Moves the reader to the given position in the channel *if the position is valid and points /// to the beginning of a record*. This method could be used by a reader to resume work from /// a previous session. /// /// Returns the position if the operation succeeds /// /// # Arguments /// /// * `position` - The position in channel where we want the reader to point. The value is accounted /// from the beginning of the channel(e.g. a position of zero means the beginning of the channel). The position /// must be valid, it must be properly aligned, and is should point to the start of a record. /// /// # Errors /// /// If the channel is corrupted or the position is invalid a [InvalidPosition](enum.InvalidPosition.html) /// will occur. /// fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition>; }