reactive_messaging/
config.rs

1//! Contains constants and other configuration information affecting default & fixed behaviors of this library
2
3use std::{ops::RangeInclusive, num::NonZeroU8};
4use std::time::Duration;
5use reactive_mutiny::prelude::Instruments;
6//use strum_macros::FromRepr;
7
8
9// /// Specifies the channels (queues) from `reactive-mutiny` that may be used to send/receive data.\
10// /// On different hardware, the performance characteristics may vary.
11// #[derive(Debug,PartialEq,FromRepr)]
12// pub enum Channels<const CONFIG: u64> {
13//     Atomic { uni: Option<>, sender: Option<> },
14//     FullSync { uni: None, sender: None },
15//     Crossbeam {},
16// }
17
18/// Specifies how to behave when communication failures happen
19#[derive(Debug,PartialEq)]
20pub enum RetryingStrategies {
21
22    /// Simply ignore full buffer failures denials of sending & receiving messages, without retrying nor dropping the connection.\
23    /// This option is acceptable when missing messages don't disrupt the communications and when low latencies / realtime-ish behavior is required.\
24    /// Set [ConstConfig::sender_buffer] & [ConstConfig::receiver_buffer] accordingly.
25    DoNotRetry,
26
27    /// Drops the connection on "buffer is full" errors, also without retrying
28    EndCommunications,
29
30    /// Retries, in case of "buffer is full" errors, ending the communications if success still can't be achieved.\
31    /// Uses an Exponential Backoff strategy with factor 2.526 and 20% jitter, giving the milliseconds to sleep between,
32    /// at most, the given number of attempts.\
33    /// The total retrying time would be the sum of the geometric progression: (-1+2.526^n)/(1.526) -- in milliseconds.\
34    /// Example: for up to 5 minutes retrying, use 14 attempts.
35    RetryWithBackoffUpTo(u8),
36
37    /// Retries, in case of "buffer is full" errors, ending the communications if success still can't be achieved
38    /// during the specified milliseconds -- during which retrying will be performed in a pool loop, yielding
39    /// to tokio before each attempt.\
40    /// Use this option if low latency is desirable -- but see also [Self::RetrySleepingArithmetically]
41    RetryYieldingForUpToMillis(u8),
42
43    /// Deprecated. Do not use -- to be replaced or removed, as spinning doesn't make sense in this lib
44    RetrySpinningForUpToMillis(u8),
45    // /// reconnect if dropped? this may go as normal parameter... and on the client only
46}
47impl RetryingStrategies {
48    /// requires 3+8=11 bits to represent the data; reverse of [Self::from_repr()]
49    const fn as_repr(&self) -> u16 {
50        match self {
51            Self::DoNotRetry => 0,
52            Self::EndCommunications                    => 1,
53            Self::RetryWithBackoffUpTo(n)        => 2 | (*n as u16) << 3,
54            Self::RetryYieldingForUpToMillis(n)  => 3 | (*n as u16) << 3,
55            Self::RetrySpinningForUpToMillis(n)  => 4 | (*n as u16) << 3,
56        }
57    }
58    /// reverse of [Self::from_repr()]
59    const fn from_repr(repr: u16) -> Self {
60        let (variant, n) = (repr & 7, repr >> 3);
61        match variant {
62            0 => Self::DoNotRetry,
63            1 => Self::EndCommunications,
64            2 => Self::RetryWithBackoffUpTo(n as u8),
65            3 => Self::RetryYieldingForUpToMillis(n as u8),
66            4 => Self::RetrySpinningForUpToMillis(n as u8),
67            _ => unreachable!(),    // If this errors out, did a new enum member was added?
68        }
69    }
70}
71
72/// Socket options for the local peer to be set when the connection is established
73#[derive(Debug,PartialEq)]
74pub struct SocketOptions {
75    /// Also known as time-to-live (TTL), specifies how many hops may relay an outgoing packet before it being dropped and an error being returned.\
76    /// If NonZero, will cause the socket configuration function to be called with that value.
77    pub hops_to_live: Option<NonZeroU8>,
78    /// If specified, must be a power of 2 with the number of milliseconds to wait for any unsent messages when closing the connection.\
79    /// In Linux, defaults to 0.
80    pub linger_millis: Option<u32>,
81    /// Set this to `true` if lower latency is preferred over throughput; `false` (default on Linux) to use all the available bandwidth
82    /// (sending full packets, waiting up to 200ms for fulfillment).\
83    /// `None` will leave it as the system's default -- in Linux, false.
84    /// Some hints:
85    ///   - The peer reporting events may prefer to set it to `true`;
86    ///   - The other peer, receiving events from many, many peers and sending messages that won't be used for decision-making, may set it to `false`
87    pub no_delay: Option<bool>,
88}
89impl SocketOptions {
90    /// requires 8+(1+5)+(1+1)=16 bits to represent the data; reverse of [Self::from_repr()]
91    const fn as_repr(&self) -> u32 {
92        (self.no_delay.is_some() as u32)                                                        // << 0       // no delay flag
93        | (unwrap_bool_or_default(self.no_delay) as u32)                                           << 1       // no delay data
94        | (self.linger_millis.is_some() as u32)                                                    << 2       // linger flag
95        | if let Some(linger_millis) = self.linger_millis {
96              if linger_millis > 0 {
97                set_bits_from_power_of_2_u32(0, 3..=7, linger_millis) as u32        // 3..=7       // linger data
98              } else {
99                0
100              }
101          } else {
102            0
103          }
104        | (unwrap_non_zero_u8_or_default(self.hops_to_live) as u32)                                << 8      // hops
105    }
106    /// reverse of [Self::from_repr()]
107    const fn from_repr(repr: u32) -> Self {
108        let (no_delay_flag, no_delay_data, linger_flag, linger_data, hops) = (
109            (repr & (1 << 0) ) > 0,                             // no delay flag
110            (repr & (1 << 1) ) > 0,                             // no delay data
111            (repr & (1 << 2) ) > 0,                             // linger flag
112            get_power_of_2_u32_bits(repr as u64, 3..=7),   // power of 32 bits linger data
113            (repr & (( (!0u8)  as u32) << 8) ) >> 8,            // raw 8 bits hops data
114        );
115        Self {
116            hops_to_live:  if hops > 0      { NonZeroU8::new(hops as u8) } else { None },
117            linger_millis: if linger_flag   { Some(linger_data) }          else { None },
118            no_delay:      if no_delay_flag { Some(no_delay_data) }        else { None },
119        }
120    }
121}
122
123
124/// Implements something that could be called the "Zero-Cost Const Configuration Pattern", that produces a `usize`
125/// whose goal is to be the only const parameter of a generic struct (avoiding the alternative of bloating it with several const params).\
126/// When using the const "query functions" defined here in `if`s, the compiler will have the opportunity to cancel out any unreachable code (zero-cost abstraction).\
127/// Some commonly used combinations may be pre-defined in some enum variants, but you may always build unmapped possibilities through [Self::custom()].\
128/// Usage examples:
129/// ```nocompile
130///     see bellow
131#[derive(Debug,PartialEq)]
132pub struct ConstConfig {
133    /// Pre-allocates the sender/receiver buffers to this value (power of 2).
134    /// Setting it wisely may economize some `realloc` calls
135    pub msg_size_hint: u32,
136    /// How many messages (per peer) may be enqueued for output (power of 2)
137    /// before operations start to fail
138    pub sender_buffer: u32,
139    /// How many messages (per peer) may be enqueued for processing (power of 2)
140    /// before operations start to fail
141    pub receiver_buffer: u32,
142    /// How many milliseconds to wait when flushing messages out to a to-be-closed connection.
143    pub flush_timeout_millis: u16,
144    /// Specifies what to do when operations fail (full buffers / connection droppings)
145    pub retrying_strategy: RetryingStrategies,
146    /// Messes with the low level (system) socket options
147    pub socket_options: SocketOptions,
148    // /// Allows changing the backing queue for the sender/receiver buffers
149    // pub channel: Channels,
150    /// Allows changing the Stream executor options in regard to logging & collected/reported metrics
151    pub executor_instruments: /*reactive_mutiny::*/Instruments,
152}
153
154#[warn(non_snake_case)]
155impl ConstConfig {
156
157    #![allow(non_snake_case)]   // some consts accepts parameters... _/o\_
158
159    // the consts here determine what bits they use
160    // and may also specify ranges for store data (rather than just flags)
161
162    /// u32_value = 2^n
163    const MSG_SIZE_HINT: RangeInclusive<usize> = 0..=4;
164    /// u32_value = 2^n
165    const SENDER_BUFFER: RangeInclusive<usize> = 5..=9;
166    /// u32_value = 2^n
167    const RECEIVER_BUFFER: RangeInclusive<usize> = 10..=14;
168    /// u16_value = 2^n
169    const FLUSH_TIMEOUT_MILLIS: RangeInclusive<usize> = 15..=18;
170    /// One of [RetryingStrategies], converted by [RetryingStrategies::as_repr()]
171    const RETRYING_STRATEGY: RangeInclusive<usize> = 19..=29;
172    /// One of [SocketOptions], converted by [SocketOptions::as_repr()]
173    const SOCKET_OPTIONS: RangeInclusive<usize> = 30..=45;
174    // /// The channel types the synthatic-sugar macros should instantiate
175    // const CHANNEL: RangeInclusive<usize> = 46..=48;
176    /// The 8 bits from `reactive-mutiny`
177    const EXECUTOR_INSTRUMENTS: RangeInclusive<usize> = 49..=57;
178
179
180    /// Contains sane & performant defaults.\
181    /// Usage example:
182    /// ```nocompile
183    ///  const CONFIG: ConstConfig = ConstConfig {
184    ///     receiver_buffer: 1024,
185    ///     ..ConstConfig::default()
186    /// };
187    pub const fn default() -> ConstConfig {
188        ConstConfig {
189            msg_size_hint:                  1024,
190            sender_buffer:                  1024,
191            receiver_buffer:                1024,
192            flush_timeout_millis:           256,
193            retrying_strategy:              RetryingStrategies::RetryWithBackoffUpTo(20),
194            socket_options:                 SocketOptions { hops_to_live: NonZeroU8::new(255), linger_millis: Some(128), no_delay: Some(true) },
195            // channel:                        Channels::Atomic,
196            executor_instruments:           Instruments::from(Instruments::NoInstruments.into()),
197        }
198    }
199
200    /// For use when instantiating a generic struct that uses the "Const Config Pattern"
201    /// -- when choosing a pre-defined configuration.\
202    /// See also [Self::custom()].\
203    /// Example:
204    /// ```nocompile
205    ///     see bellow
206    pub const fn into(self) -> u64 {
207        let mut config = 0u64;
208        config = set_bits_from_power_of_2_u32(config, Self::MSG_SIZE_HINT,         self.msg_size_hint);
209        config = set_bits_from_power_of_2_u32(config, Self::SENDER_BUFFER,         self.sender_buffer);
210        config = set_bits_from_power_of_2_u32(config, Self::RECEIVER_BUFFER,       self.receiver_buffer);
211        config = set_bits_from_power_of_2_u16(config, Self::FLUSH_TIMEOUT_MILLIS,  self.flush_timeout_millis);
212        let retrying_strategy_repr = self.retrying_strategy.as_repr();
213        config = set_bits(config, Self::RETRYING_STRATEGY, retrying_strategy_repr as u64);
214        let socket_options_repr = self.socket_options.as_repr();
215        config = set_bits(config, Self::SOCKET_OPTIONS, socket_options_repr as u64);
216        // let channel_repr = self.channel as u8;
217        // config = set_bits(config, Self::CHANNEL, channel_repr as u64);
218        let executor_instruments_repr = self.executor_instruments.into();
219        config = set_bits(config, Self::EXECUTOR_INSTRUMENTS, executor_instruments_repr as u64);
220        config
221    }
222
223    /// Builds [Self] from the generic `const CONFIGS: usize` parameter used in structs
224    /// by the "Const Config Pattern"
225    pub const fn from(config: u64) -> Self {
226        let msg_size_hint              = get_power_of_2_u32_bits(config, Self::MSG_SIZE_HINT);
227        let sender_buffer              = get_power_of_2_u32_bits(config, Self::SENDER_BUFFER);
228        let receiver_buffer            = get_power_of_2_u32_bits(config, Self::RECEIVER_BUFFER);
229        let flush_timeout_millis       = get_power_of_2_u16_bits(config, Self::FLUSH_TIMEOUT_MILLIS);
230        let retrying_strategy_repr     = get_bits(config, Self::RETRYING_STRATEGY);
231        let socket_options_repr        = get_bits(config, Self::SOCKET_OPTIONS);
232        // let channel_repr                  = get_bits(config, Self::CHANNEL);
233        let executor_instruments_repr  = get_bits(config, Self::EXECUTOR_INSTRUMENTS);
234        Self {
235            msg_size_hint,
236            flush_timeout_millis,
237            sender_buffer,
238            receiver_buffer,
239            retrying_strategy:    RetryingStrategies::from_repr(retrying_strategy_repr as u16),
240            socket_options:       SocketOptions::from_repr(socket_options_repr as u32),
241            // channel:              if let Some(channel) = Channels::from_repr(channel_repr as usize) {channel} else {Channels::Atomic},
242            executor_instruments: Instruments::from(executor_instruments_repr as usize),
243        }
244    }
245
246    // query functions for business logic configuration attributes
247    //////////////////////////////////////////////////////////////
248    // to be used by the struct in which the generic `const CONFIGS: usize` resides
249
250    pub const fn extract_receiver_buffer(config: u64) -> u32 {
251        let config = Self::from(config);
252        config.receiver_buffer
253    }
254
255    pub const fn extract_executor_instruments(config: u64) -> usize {
256        let config = Self::from(config);
257        config.executor_instruments.into()
258    }
259
260    pub const fn extract_msg_size_hint(config: u64) -> u32 {
261        let config = Self::from(config);
262        config.msg_size_hint
263    }
264
265    pub const fn extract_graceful_close_timeout(config: u64) -> Duration {
266        let config = Self::from(config);
267        Duration::from_millis(config.flush_timeout_millis as u64)
268    }
269
270    pub const fn extract_retrying_strategy(config: u64) -> RetryingStrategies {
271        let config = Self::from(config);
272        config.retrying_strategy
273    }
274
275    pub const fn extract_socket_options(config: u64) -> SocketOptions {
276        let config = Self::from(config);
277        config.socket_options
278    }
279}
280
281/// Helper for retrieving data (other than simple flags) from the configuration
282/// -- as stored in the specified `bits` by [Self::set_bits()]
283const fn get_bits(config: u64, bits: RangeInclusive<usize>) -> u64 {
284    let bits_len = *bits.end()-*bits.start()+1;
285    (config>>*bits.start()) & ((1<<bits_len)-1)
286}
287
288/// Helper for storing data (other than simple flags) in the configuration
289/// -- stored in the specified `bits`.\
290/// `value` should not be higher than what fits in the bits.\
291/// Returns the `configs` with the `value` applied to it in a way it may be retrieved by [Self::get_bits()]
292const fn set_bits(mut config: u64, bits: RangeInclusive<usize>, value: u64) -> u64 {
293    let bits_len = *bits.end()-*bits.start()+1;
294    if value > (1<<bits_len)-1 {
295        // "The value specified is above the maximum the reserved bits for it can take"
296        unreachable!();
297    } else {
298        config &= !( ((1<<bits_len)-1) << *bits.start() );   // clear the target bits
299        config |= value << *bits.start();                    // set them
300        config
301    }
302}
303
304/// Retrieves 5 `bits` from `configs` that represents a power of 2 over the `u32` space
305const fn get_power_of_2_u32_bits(config: u64, bits: RangeInclusive<usize>) -> u32 {
306    let value = get_bits(config, bits);
307    1 << value
308}
309
310/// Packs, optimally, the `power_of_2_u32_value` into 5 `bits`, returning the new value for the given `config`
311const fn set_bits_from_power_of_2_u32(config: u64, bits: RangeInclusive<usize>, power_of_2_u32_value: u32) -> u64 {
312    if power_of_2_u32_value.is_power_of_two() {
313        set_bits(config, bits, power_of_2_u32_value.ilog2() as u64)
314    } else {
315        // "The value must be a power of 2"
316        unreachable!();
317    }
318}
319
320/// Retrieves 4 `bits` from `configs` that represents a power of 2 over the `u16` space
321const fn get_power_of_2_u16_bits(config: u64, bits: RangeInclusive<usize>) -> u16 {
322    let value = get_bits(config, bits);
323    1 << value
324}
325
326/// Packs, optimally, the `power_of_2_u16_value` into 4 `bits`, returning the new value for the given `config`
327const fn set_bits_from_power_of_2_u16(config: u64, bits: RangeInclusive<usize>, power_of_2_u16_value: u16) -> u64 {
328    if power_of_2_u16_value.is_power_of_two() {
329        set_bits(config, bits, power_of_2_u16_value.ilog2() as u64)
330    } else {
331        // "The value must be a power of 2"
332        unreachable!();
333    }
334}
335
336/// Retrieves 3 `bits` from `configs` that represents a power of 2 over the `u8` space
337const fn _get_power_of_3_u8_bits(config: u64, bits: RangeInclusive<usize>) -> u8 {
338    let value = get_bits(config, bits);
339    1 << value
340}
341
342/// Packs, optimally, the `power_of_2_u8_value` into 3 `bits`, returning the new value for the given `config`
343const fn _set_bits_from_power_of_2_u8(config: u64, bits: RangeInclusive<usize>, power_of_2_u8_value: u8) -> u64 {
344    if power_of_2_u8_value.is_power_of_two() {
345        set_bits(config, bits, power_of_2_u8_value.ilog2() as u64)
346    } else {
347        // "The value must be a power of 2"
348        unreachable!();
349    }
350}
351
352// const versions of some `Option<>` functions
353//////////////////////////////////////////////
354
355/// same as Option::<bool>::unwrap_or(false), but const
356const fn unwrap_bool_or_default(option: Option<bool>) -> bool {
357    match option {
358        Some(v) => v,
359        None => false,
360    }
361}
362/// same as Option::<u8>::unwrap_or(0), but const
363const fn _unwrap_u8_or_default(option: Option<u8>) -> u8 {
364    match option {
365        Some(v) => v,
366        None => 0,
367    }
368}
369/// same as Option::<u16>::unwrap_or(0), but const
370const fn _unwrap_u16_or_default(option: Option<u16>) -> u16 {
371    match option {
372        Some(v) => v,
373        None => 0,
374    }
375}
376/// same as Option::<u32>::unwrap_or(0), but const
377const fn _unwrap_u32_or_default(option: Option<u32>) -> u32 {
378    match option {
379        Some(v) => v,
380        None => 0,
381    }
382}
383
384// same as Option::<NonZero*>::map(|v| v.get()).unwrap_or(0), but const
385const fn unwrap_non_zero_u8_or_default(option: Option<NonZeroU8>) -> u8 {
386    match option {
387        Some(v) => v.get(),
388        None => 0,
389    }
390}
391
392
393/// Unit tests & enforces the requisites of the [stream_executor](self) module.\
394/// Tests here mixes manual & automated assertions -- you should manually inspect the output of each one and check if the log outputs make sense
395#[cfg(any(test,doc))]
396mod tests {
397    use super::*;
398
399    #[cfg_attr(not(doc),test)]
400    fn retrying_strategies_repr() {
401        let subjects = vec![
402            vec![
403                RetryingStrategies::DoNotRetry,
404                RetryingStrategies::EndCommunications,
405            ].into_iter(),
406            (0..8).map(|n| RetryingStrategies::RetryWithBackoffUpTo(1<<n)).collect::<Vec<_>>().into_iter(),
407            (0..8).map(|n| RetryingStrategies::RetryYieldingForUpToMillis(1<<n)).collect::<Vec<_>>().into_iter(),
408            (0..8).map(|n| RetryingStrategies::RetrySpinningForUpToMillis(1<<n)).collect::<Vec<_>>().into_iter(),
409        ].into_iter().flatten();
410
411        for expected in subjects {
412            let converted = RetryingStrategies::as_repr(&expected);
413            let reconverted = RetryingStrategies::from_repr(converted);
414            assert_eq!(reconverted, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, converted, reconverted);
415        }
416    }
417
418    #[cfg_attr(not(doc),test)]
419    fn socket_options_repr() {
420        let subjects = vec![
421            vec![
422                SocketOptions { hops_to_live: None,                 linger_millis: None,    no_delay: None},
423                SocketOptions { hops_to_live: None,                 linger_millis: None,    no_delay: Some(false)},
424                SocketOptions { hops_to_live: None,                 linger_millis: None,    no_delay: Some(true)},
425                SocketOptions { hops_to_live: None,                 linger_millis: Some(1), no_delay: None},
426                SocketOptions { hops_to_live: NonZeroU8::new(1), linger_millis: None,    no_delay: None},
427            ].into_iter(),
428            (0..31).map(|n| SocketOptions { hops_to_live: None,                    linger_millis: Some(1<<n), no_delay: None }).collect::<Vec<_>>().into_iter(),
429            (0..8) .map(|n| SocketOptions { hops_to_live: NonZeroU8::new(1<<n), linger_millis: None,       no_delay: None }).collect::<Vec<_>>().into_iter(),
430        ].into_iter().flatten();
431
432        for expected in subjects {
433            let converted = SocketOptions::as_repr(&expected);
434            let reconverted = SocketOptions::from_repr(converted);
435            assert_eq!(reconverted, expected, "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected, converted, reconverted);
436        }
437        // for expected in subjects {
438        //     let converted = SocketOptions::into_repr(&expected);
439        //     let reconverted = SocketOptions::from_repr(converted);
440        //     println!("{:?}: repr: 0x{:x}; worked? {} ---- {:?}", expected, converted, reconverted==expected, reconverted);
441        // }
442
443    }
444
445    #[cfg_attr(not(doc),test)]
446    fn const_config() {
447        let expected = || ConstConfig {
448            msg_size_hint:                  1024,
449            sender_buffer:                  2048,
450            receiver_buffer:                2048,
451            flush_timeout_millis:           256,
452            retrying_strategy:              RetryingStrategies::RetryWithBackoffUpTo(14),
453            socket_options:                 SocketOptions { hops_to_live: NonZeroU8::new(255), linger_millis: Some(128), no_delay: Some(true) },
454            // channel:                     Channels::Atomic,
455            executor_instruments:           Instruments::from(Instruments::LogsWithExpensiveMetrics.into()),
456        };
457        let converted = ConstConfig::into(expected());
458        let reconverted = ConstConfig::from(converted);
459        assert_eq!(reconverted, expected(), "FAILED: {:?} (repr: 0x{:x}); reconverted: {:?}", expected(), converted, reconverted);
460    }
461
462}