1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::error::Error;
use std::fs::File;
use std::io::prelude::*;
use std::time::Duration;

/**
 * Top-level middleware configuration.
 *
 * The type derives `Serialize`/`Deserialize`; `read_configuration_file`
 * builds it by parsing the contents of a TOML file.
 */
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Configuration {
    /// Stack size, in bytes, of the spawned Reader/Sender threads.
    pub thread_stack_size: usize,

    /// Stack size, in bytes, of the main Middleware thread.
    pub middleware_thread_stack_size: usize,

    /// Timeout, in microseconds, used by the Sender thread when reading
    /// messages from the channel. `get_stream_sender_timeout` returns the
    /// same value as a `Duration`.
    pub stream_sender_timeout: u64,

    /// Stability calculation flag (presumably enables causal stability
    /// tracking when `true` - confirm against the code that reads it).
    pub track_causal_stability: bool,

    /// Parameters that control message batching.
    pub batching: Batching,
}

impl Configuration {
    /**
     * Converts `stream_sender_timeout` into a `Duration`.
     *
     * The stored value is interpreted as a number of microseconds.
     */
    pub fn get_stream_sender_timeout(&self) -> Duration {
        let micros: u64 = self.stream_sender_timeout;
        Duration::from_micros(micros)
    }
}

/**
 * Loads the middleware configuration from a TOML file.
 *
 * The whole file is read into memory as a string and then deserialized
 * into a `Configuration`.
 *
 * # Arguments
 *
 * `configuration_file_path` - path to the TOML configuration file.
 *
 * # Errors
 *
 * Returns an error if the file cannot be opened, if its contents cannot be
 * read into a string, or if the text cannot be deserialized into a
 * `Configuration`.
 */
pub fn read_configuration_file(
    configuration_file_path: String,
) -> Result<Configuration, Box<dyn Error>> {
    // Read the entire file up front; the parser works on a string slice.
    let mut raw_toml = String::new();
    File::open(configuration_file_path)?.read_to_string(&mut raw_toml)?;

    // The target type is inferred from the function's return type.
    Ok(toml::from_str(&raw_toml)?)
}

/**
 * Configuration parameters for the Sender threads' message batching.
 *
 * The timeout fields hold microsecond counts; `get_lower_timeout` and
 * `get_upper_timeout` return them as `Duration` values.
 */
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Batching {
    /// Number of bytes to be buffered before calling Write.
    pub size: u64,

    /// Number of messages to be buffered before calling Write.
    pub message_number: usize,

    /// Lower value of the ACK timeout, in microseconds.
    pub lower_timeout: u64,

    /// Upper value of the ACK timeout, in microseconds.
    pub upper_timeout: u64,
}

impl Batching {
    /**
     * Returns `lower_timeout`, interpreted as microseconds, as a `Duration`.
     */
    pub fn get_lower_timeout(&self) -> Duration {
        let Self { lower_timeout, .. } = self;
        Duration::from_micros(*lower_timeout)
    }

    /**
     * Returns `upper_timeout`, interpreted as microseconds, as a `Duration`.
     */
    pub fn get_upper_timeout(&self) -> Duration {
        let Self { upper_timeout, .. } = self;
        Duration::from_micros(*upper_timeout)
    }
}