kekbit_core/
api.rs

1//! Defines read and write operations for a kekbit channel.
2use kekbit_codecs::codecs::DataFormat;
3use kekbit_codecs::codecs::Encodable;
4use std::io::Error;
///Channel access errors.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so a value can be rendered
/// as a human readable message and propagated through `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub enum ChannelError {
    ///The channel has an invalid signature. The channel signature must be `0x2A54_4942_4B45_4B2A`
    InvalidSignature {
        ///The expected signature, always `0x2A54_4942_4B45_4B2A`
        expected: u64,
        ///The signature read from the kekbit storage
        actual: u64,
    },
    ///The channel's storage is of an incompatible file format
    IncompatibleVersion {
        ///Expected storage version
        expected: u64,
        ///Actual storage version
        actual: u64,
    },
    ///The channel's capacity is invalid. It is either too small or not aligned to 8 bytes.
    InvalidCapacity {
        ///Actual capacity
        capacity: u32,
        ///Reason why the capacity is invalid
        msg: &'static str,
    },
    ///The maximum message length specified is invalid
    InvalidMaxMessageLength {
        ///The specified maximum message length
        msg_len: u32,
        ///Reason why the maximum message length is invalid
        msg: &'static str,
    },
    ///The channel storage does not exist
    StorageNotFound {
        ///The file expected to back the channel storage
        file_name: String,
    },

    ///The channel storage is not ready to access
    StorageNotReady {
        ///The file that backs the channel storage
        file_name: String,
    },
    ///The channel storage already exists
    StorageAlreadyExists {
        ///The file that backs the channel storage
        file_name: String,
    },
    ///The channel storage can't be accessed
    CouldNotAccessStorage {
        ///The file that backs the channel storage
        file_name: String,
    },
    ///Mapping the channel's file to memory had failed
    MemoryMappingFailed {
        ///Description of why the mapping failed
        reason: String,
    },

    ///Accessing the channel had failed.
    // NOTE(review): the original variant was undocumented; confirm which failures map here.
    AccessError {
        ///Description of why the access failed
        reason: String,
    },
}

impl std::fmt::Display for ChannelError {
    ///Renders a single line message which describes the failure and includes the variant's payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Signatures are printed as zero padded hex so they can be compared with the documented constant.
            ChannelError::InvalidSignature { expected, actual } => write!(
                f,
                "invalid channel signature: expected {:#018X}, found {:#018X}",
                expected, actual
            ),
            ChannelError::IncompatibleVersion { expected, actual } => {
                write!(f, "incompatible storage version: expected {}, found {}", expected, actual)
            }
            ChannelError::InvalidCapacity { capacity, msg } => {
                write!(f, "invalid channel capacity {}: {}", capacity, msg)
            }
            ChannelError::InvalidMaxMessageLength { msg_len, msg } => {
                write!(f, "invalid maximum message length {}: {}", msg_len, msg)
            }
            ChannelError::StorageNotFound { file_name } => {
                write!(f, "channel storage not found: {}", file_name)
            }
            ChannelError::StorageNotReady { file_name } => {
                write!(f, "channel storage not ready: {}", file_name)
            }
            ChannelError::StorageAlreadyExists { file_name } => {
                write!(f, "channel storage already exists: {}", file_name)
            }
            ChannelError::CouldNotAccessStorage { file_name } => {
                write!(f, "could not access channel storage: {}", file_name)
            }
            ChannelError::MemoryMappingFailed { reason } => {
                write!(f, "memory mapping failed: {}", reason)
            }
            ChannelError::AccessError { reason } => write!(f, "channel access error: {}", reason),
        }
    }
}

impl std::error::Error for ChannelError {}
66
///Write operation errors.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`]; for `EncodingError` the wrapped
/// I/O error is exposed through `source()`.
#[derive(Debug)]
pub enum WriteError {
    ///There is not enough space available in the channel for any write. The channel is full.
    ChannelFull,
    /// The record was larger than the maximum allowed size or the maximum available space.
    NoSpaceForRecord,
    /// The encoding operation had failed
    EncodingError(Error),
}

impl std::fmt::Display for WriteError {
    ///Renders a single line message; an encoding failure also includes the text of the wrapped error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            WriteError::ChannelFull => f.write_str("channel is full"),
            WriteError::NoSpaceForRecord => f.write_str("no space available for record"),
            WriteError::EncodingError(cause) => write!(f, "record encoding failed: {}", cause),
        }
    }
}

impl std::error::Error for WriteError {
    ///Returns the wrapped I/O error for `EncodingError`, `None` for every other variant.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            WriteError::EncodingError(cause) => Some(cause),
            WriteError::ChannelFull | WriteError::NoSpaceForRecord => None,
        }
    }
}
77
78///The `Writer` trait allows writing chunk of bytes as records into a kekbit channel.
79/// Implementers of this trait are called 'kekbit writers'. Usually a writer is bound to
80/// a given channel, and it is expected that there is only one writer which directly writes into the channel, however
81/// multiple writers may cooperate during the writing process. For any given channel a  [DataFormat](../codecs/trait.DataFormat.html) must be specified.
82pub trait Writer<D: DataFormat> {
83    /// Writes a given record to a kekbit channel.
84    ///
85    /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails.
86    ///
87    /// # Arguments
88    ///
89    /// * `data` - information to be encoded and pushed into channel.
90    ///
91    /// # Errors
92    ///
93    /// If the operation fails, than an error variant will be returned. Some errors such [EncodingError or NoSpaceForRecord](enum.WriteError.html) may
94    /// allow future writes to succeed while others such [ChannelFull](enum.WriteError.html#ChannelFull) signals the end of life for the channel.
95    fn write(&mut self, data: &impl Encodable<D>) -> Result<u32, WriteError>;
96    /// Writes into the stream a heartbeat message. This method shall be used by all writers
97    /// which want to respect to timeout interval associated to a channel. Hearbeating is the
98    /// expected mechanism by which a channel writer will keep the active readers interested in
99    /// the data published on the channel.
100    /// Heartbeat shall be done regularly at a time interval which ensures that at least one heartbeat
101    /// is sent between any two 'timeout' long intervals.
102    ///
103    /// Returns the total amount of bytes wrote into the channel or a `WriteError` if the write operation fails.
104    ///
105    /// # Errors
106    ///
107    /// If this call fails than an error variant will be returned. The errors are not recoverable,
108    /// they signal that the channel had reached the end of its lifetime.
109    fn heartbeat(&mut self) -> Result<u32, WriteError>;
110
111    /// Flushes the stream which possibly backs the kekbit writer.
112    /// By default this method does nothing, and should be implemented only for `Writer`s which it makes sense.
113    /// Returns the success of the operation
114    fn flush(&mut self) -> Result<(), std::io::Error> {
115        Ok(())
116    }
117}
118
///Read operation errors.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so a value can be rendered
/// as a human readable message and propagated through `Box<dyn std::error::Error>`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ReadError {
    ///Read operation had unexpectedly failed. Usually will happen when a channel was corrupted.
    Failed {
        ///The amount of bytes read *before* the error occurred.
        bytes_read: u32,
    },
    ///Writer timeout had been detected. While the writer may resume pushing data into the channel, most likely it has abandoned the channel.
    Timeout {
        ///Last time stamp at which the channel was still considered valid.
        timeout: u64,
    },
    ///Channel is closed, no more data will be pushed into it.
    Closed {
        ///The amount of bytes read *before* the channel close mark was reached.
        bytes_read: u32,
    },
    ///Channel full. There is no more space available in this channel.
    ChannelFull {
        ///The amount of bytes read *before* the end of channel was reached.
        bytes_read: u32,
    },
}

impl std::fmt::Display for ReadError {
    ///Renders a single line message which describes the failure and includes the variant's payload.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReadError::Failed { bytes_read } => {
                write!(f, "read failed after {} bytes", bytes_read)
            }
            ReadError::Timeout { timeout } => {
                write!(f, "writer timeout detected, channel last valid at {}", timeout)
            }
            ReadError::Closed { bytes_read } => {
                write!(f, "channel closed after {} bytes read", bytes_read)
            }
            ReadError::ChannelFull { bytes_read } => {
                write!(f, "channel full after {} bytes read", bytes_read)
            }
        }
    }
}

impl std::error::Error for ReadError {}
///Errors caused by a failed [move_to](trait.Reader.html#method.move_to) operation.
///
/// Implements [`std::fmt::Display`] and [`std::error::Error`], so a value can be rendered
/// as a human readable message and propagated through `Box<dyn std::error::Error>`.
#[derive(Debug)]
pub enum InvalidPosition {
    ///Position is not properly aligned with the channel's records
    Unaligned {
        ///The rejected position
        position: u32,
    },
    ///Position is not available in the channel - it is past the last valid record of the channel.
    Unavailable {
        ///The rejected position
        position: u32,
    },
}

impl std::fmt::Display for InvalidPosition {
    ///Renders a single line message which names the rejected position.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            InvalidPosition::Unaligned { position } => {
                write!(f, "position {} is not aligned with the channel's records", position)
            }
            InvalidPosition::Unavailable { position } => {
                write!(f, "position {} is past the last valid record of the channel", position)
            }
        }
    }
}

impl std::error::Error for InvalidPosition {}
151
152impl ReadError {
153    ///Returns the number of valid bytes read before an error occurred.
154    pub fn bytes_read(&self) -> u32 {
155        match self {
156            ReadError::Timeout { .. } => 0,
157            ReadError::Closed { bytes_read } => *bytes_read,
158            ReadError::ChannelFull { bytes_read } => *bytes_read,
159            ReadError::Failed { bytes_read } => *bytes_read,
160        }
161    }
162}
163
164///The `Reader` trait allows reading bytes from a kekbit channel. Implementers of this trait
165/// are called 'kekbit readers'. Usually a reader is bound to a given channel, and it is
166/// expected that multiple readers will safely access the same channel simultaneous.
167pub trait Reader {
168    ///Accesses a number of records from the kekbit channel, and for each record, calls
169    ///the given callback.
170    ///
171    /// Returns the amount of bytes read from the channel
172    ///
173    /// # Arguments
174    ///
175    /// * `handler` - The callback function to be called when a record is pulled from the channel.
176    ///               The function will receive as parameters the position of the message in the channel, and the message in binary format.
177    /// * `message_count` - A hint about how many records shall be read from the channel before the method completes.
178    ///                     It is expected that this method will take from the channel at most this many records
179    ///
180    ///
181    /// # Errors
182    ///
183    /// If this function fails, than an error variant will be returned. These errors are not expected to be recoverable. Once any error except `Timeout` occurred, there will never be
184    /// data to read pass the current read marker. However reading from beginning of the channel to the current
185    /// read marker should still be a valid operation. The `Timeout` exception,
186    /// may or may not be recoverable, depends on the channel `Writer` behaviour.
187    fn read(&mut self, handler: &mut impl FnMut(u32, &[u8]) -> (), message_count: u16) -> Result<u32, ReadError>;
188
189    /// Moves the reader to the given position in the channel *if the position is valid and points
190    /// to the beginning of a record*. This method could be used by a reader to resume work from
191    /// a previous session.
192    ///
193    /// Returns the position if the operation succeeds
194    ///
195    /// # Arguments
196    ///
197    /// * `position` - The position in channel where we want the reader to point. The value is accounted
198    ///                 from the beginning of the channel(e.g. a position of zero means the beginning of the channel). The position
199    ///                 must be valid, it must be properly aligned, and is should point to the start of a record.
200    ///
201    /// #  Errors
202    ///
203    /// If the channel is corrupted or the position is invalid a [InvalidPosition](enum.InvalidPosition.html)
204    /// will occur.
205    ///
206    fn move_to(&mut self, position: u32) -> Result<u32, InvalidPosition>;
207}