kekbit_core/api.rs
1//! Defines read and write operations for a kekbit channel.
2use kekbit_codecs::codecs::DataFormat;
3use kekbit_codecs::codecs::Encodable;
4use std::io::Error;
5///Channel Access errors
6#[derive(Debug)]
7pub enum ChannelError {
8 ///The channel has an invalid signature. The channel signature must be `0x2A54_4942_4B45_4B2A`
9 InvalidSignature {
10 ///The expected signature always `0x2A54_4942_4B45_4B2A`
11 expected: u64,
12 ///The signature red from the kekbit storage
13 actual: u64,
14 },
15 ///The channel's storage is of an incompatible file format
16 IncompatibleVersion {
17 ///Expected storage version
18 expected: u64,
19 ///Actual storage version
20 actual: u64,
21 },
22 ///The channel's capacity is invalid. Either too small or is not aligned to 8 bytes.
23 InvalidCapacity {
24 ///Actual capacity
25 capacity: u32,
26 ///Reason why the capacity is invalid
27 msg: &'static str,
28 },
29 ///The maximum message length specified is invalid
30 InvalidMaxMessageLength {
31 ///The specified maximum message length
32 msg_len: u32,
33 ///Reason why maximum message length is invalid
34 msg: &'static str,
35 },
36 ///The channel storage does not exist
37 StorageNotFound {
38 ///The file expected to back the channel storage
39 file_name: String,
40 },
41
42 ///The channel storage is not ready to access
43 StorageNotReady {
44 ///The file that backs the channel storage
45 file_name: String,
46 },
47 ///The channel storage is not ready to access
48 StorageAlreadyExists {
49 ///The file that backs the channel storage
50 file_name: String,
51 },
52 ///The channel storage can't be accessed
53 CouldNotAccessStorage {
54 ///The file that backs the channel storage
55 file_name: String,
56 },
57 ///Mapping the channel's file to memory had failed
58 MemoryMappingFailed {
59 reason: String,
60 },
61
62 AccessError {
63 reason: String,
64 },
65}
66
67///Write operation errors
68#[derive(Debug)]
69pub enum WriteError {
70 ///There is not enough space available in the channel for any write. The channel is full.
71 ChannelFull,
72 /// The record was larger than the maximum allowed size or the maximum available space.
73 NoSpaceForRecord,
74 /// The encoding operation had failed
75 EncodingError(Error),
76}
77
78///The `Writer` trait allows writing chunk of bytes as records into a kekbit channel.
79/// Implementers of this trait are called 'kekbit writers'. Usually a writer is bound to
80/// a given channel, and it is expected that there is only one writer which directly writes into the channel, however
81/// multiple writers may cooperate during the writing process. For any given channel a [DataFormat](../codecs/trait.DataFormat.html) must be specified.
82pub trait Writer<D: DataFormat> {
83 /// Writes a given record to a kekbit channel.
84 ///
85 /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails.
86 ///
87 /// # Arguments
88 ///
89 /// * `data` - information to be encoded and pushed into channel.
90 ///
91 /// # Errors
92 ///
93 /// If the operation fails, than an error variant will be returned. Some errors such [EncodingError or NoSpaceForRecord](enum.WriteError.html) may
94 /// allow future writes to succeed while others such [ChannelFull](enum.WriteError.html#ChannelFull) signals the end of life for the channel.
95 fn write(&mut self, data: &impl Encodable<D>) -> Result<u32, WriteError>;
96 /// Writes into the stream a heartbeat message. This method shall be used by all writers
97 /// which want to respect to timeout interval associated to a channel. Hearbeating is the
98 /// expected mechanism by which a channel writer will keep the active readers interested in
99 /// the data published on the channel.
100 /// Heartbeat shall be done regularly at a time interval which ensures that at least one heartbeat
101 /// is sent between any two 'timeout' long intervals.
102 ///
103 /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails.
104 ///
105 /// # Errors
106 ///
107 /// If this call fails than an error variant will be returned. The errors are not recoverable,
108 /// they signal that the channel had reached the end of its lifetime.
109 fn heartbeat(&mut self) -> Result<u32, WriteError>;
110
111 /// Flushes the stream which possibly backs the kekbit writer.
112 /// By default this method does nothing, and should be implemented only for `Writer`s which it makes sense.
113 /// Returns the success of the operation
114 fn flush(&mut self) -> Result<(), std::io::Error> {
115 Ok(())
116 }
117}
118
119///Read operation errors
120#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
121pub enum ReadError {
122 ///Read operation had unexpectedly failed. Usually will happen when a channel was corrupted.
123 Failed {
124 ///The amount of bytes read *before* the error occurred.
125 bytes_read: u32,
126 },
127 ///Writer timeout had been detected. While the writer may resume pushing data in to the channel, most likely he had abandoned the channel.
128 Timeout {
129 ///Last time stamp at which the channel was still considered valid.
130 timeout: u64,
131 },
132 ///Channel is closed no more data will be pushed into.
133 Closed {
134 ///The amount of bytes read *before* the channel close mark was reached.
135 bytes_read: u32,
136 },
137 ///Channel full. There is no more space available in this channel.
138 ChannelFull {
139 ///The amount of bytes read *before* the end of channel was reached.
140 bytes_read: u32,
141 },
142}
143///Errors caused by failed [move_to](trait.Reader.html#method.move_to) operation.
144#[derive(Debug)]
145pub enum InvalidPosition {
146 ///Position is not properly aligned with the channel's records
147 Unaligned { position: u32 },
148 ///Position is not available in the channel - it is past the last valid record of the channel.
149 Unavailable { position: u32 },
150}
151
152impl ReadError {
153 ///Returns the number of valid bytes read before an error occurred.
154 pub fn bytes_read(&self) -> u32 {
155 match self {
156 ReadError::Timeout { .. } => 0,
157 ReadError::Closed { bytes_read } => *bytes_read,
158 ReadError::ChannelFull { bytes_read } => *bytes_read,
159 ReadError::Failed { bytes_read } => *bytes_read,
160 }
161 }
162}
163
164///The `Reader` trait allows reading bytes from a kekbit channel. Implementers of this trait
165/// are called 'kekbit readers'. Usually a reader is bound to a given channel, and it is
166/// expected that multiple readers will safely access the same channel simultaneous.
167pub trait Reader {
168 ///Accesses a number of records from the kekbit channel, and for each record, calls
169 ///the given callback.
170 ///
171 /// Returns the amount of bytes read from the channel
172 ///
173 /// # Arguments
174 ///
175 /// * `handler` - The callback function to be called when a record is pulled from the channel.
176 /// The function will receive as parameters the position of the message in the channel, and the message in binary format.
177 /// * `message_count` - A hint about how many records shall be read from the channel before the method completes.
178 /// It is expected that this method will take from the channel at most this many records
179 ///
180 ///
181 /// # Errors
182 ///
183 /// If this function fails, than an error variant will be returned. These errors are not expected to be recoverable. Once any error except `Timeout` occurred, there will never be
184 /// data to read pass the current read marker. However reading from beginning of the channel to the current
185 /// read marker should still be a valid operation. The `Timeout` exception,
186 /// may or may not be recoverable, depends on the channel `Writer` behaviour.
187 fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError>;
188
189 /// Moves the reader to the given position in the channel *if the position is valid and points
190 /// to the beginning of a record*. This method could be used by a reader to resume work from
191 /// a previous session.
192 ///
193 /// Returns the position if the operation succeeds
194 ///
195 /// # Arguments
196 ///
197 /// * `position` - The position in channel where we want the reader to point. The value is accounted
198 /// from the beginning of the channel(e.g. a position of zero means the beginning of the channel). The position
199 /// must be valid, it must be properly aligned, and is should point to the start of a record.
200 ///
201 /// # Errors
202 ///
203 /// If the channel is corrupted or the position is invalid a [InvalidPosition](enum.InvalidPosition.html)
204 /// will occur.
205 ///
206 fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition>;
207}