springql_core/api/
spring_config.rs

1// This file is part of https://github.com/SpringQL/SpringQL which is licensed under MIT OR Apache-2.0. See file LICENSE-MIT or LICENSE-APACHE for full license details.
2
3use serde::Deserialize;
4
5use crate::api::error::{Result, SpringError};
6
7/// Default configuration.
8///
9/// Default key-values are overwritten by `overwrite_config_toml` parameter in `SpringConfig::new()`.
10const SPRING_CONFIG_DEFAULT: &str = r#"
11[worker]
12# Number of generic worker threads. Generic worker threads deal with internal and sink tasks.
13# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
14n_generic_worker_threads = 1
15
16# Number of source worker threads. Source worker threads collect rows from foreign source.
17# Too many number may may cause row fraud in runtime.
18# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
19n_source_worker_threads = 1
20
21# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream.
22# Small number will improve the initial row's E2E latency but increase the CPU usage.
23sleep_msec_no_row = 100
24
25[memory]
26# How much memory is allowed to be used in SpringQL streaming runtime.
27upper_limit_bytes = 10_000_000
28
29# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe.
30# In Severe state, internal scheduler is changed to exhibit memory-resilience.
31moderate_to_severe_percent = 60
32
33# Percentage over `upper_limit_bytes` to transit from Severe state to Critical.
34# In Critical state, all intermediate rows are purged to release memory.
35severe_to_critical_percent = 95
36
37critical_to_severe_percent = 80
38severe_to_moderate_percent = 40
39
40# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event.
41memory_state_transition_interval_msec = 10
42
43# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event.
44performance_metrics_summary_report_interval_msec = 10
45
46[web_console]
47# Whether to enable POST API request to web console.
48enable_report_post = false
49
50report_interval_msec = 3_000
51
52host = "127.0.0.1"
53port = 8050
54
55timeout_msec = 3_000
56
57[source_reader]
58net_connect_timeout_msec = 1_000
59net_read_timeout_msec = 100
60
61can_read_timeout_msec = 100
62
63[sink_writer]
64net_connect_timeout_msec = 1_000
65net_write_timeout_msec = 100
66
67http_connect_timeout_msec = 1_000
68http_timeout_msec = 100
69"#;
70
71/// Top-level config.
72#[allow(missing_docs)]
73#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
74pub struct SpringConfig {
75    pub worker: SpringWorkerConfig,
76    pub memory: SpringMemoryConfig,
77    pub web_console: SpringWebConsoleConfig,
78    pub source_reader: SpringSourceReaderConfig,
79    pub sink_writer: SpringSinkWriterConfig,
80}
81
82impl Default for SpringConfig {
83    fn default() -> Self {
84        Self::new("").expect("default configuration must be valid")
85    }
86}
87
88impl SpringConfig {
89    /// # Failures
90    ///
91    /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when:
92    ///   - `overwrite_config_toml` includes invalid key and/or value.
93    /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when:
94    ///   - `overwrite_config_toml` is not valid as TOML.
95    pub fn new(overwrite_config_toml: &str) -> Result<Self> {
96        let default_conf = config::Config::builder()
97            .add_source(config::File::from_str(
98                SPRING_CONFIG_DEFAULT,
99                config::FileFormat::Toml,
100            ))
101            .build()
102            .expect("SPRING_CONFIG_DEFAULT is in wrong format");
103
104        let c = config::Config::builder()
105            .add_source(default_conf)
106            .add_source(config::File::from_str(
107                overwrite_config_toml,
108                config::FileFormat::Toml,
109            ))
110            .build()
111            .map_err(|e| SpringError::InvalidFormat {
112                s: overwrite_config_toml.to_string(),
113                source: e.into(),
114            })?;
115
116        c.try_deserialize()
117            .map_err(|e| SpringError::InvalidConfig { source: e.into() })
118    }
119
120    /// Configuration by TOML format string.
121    ///
122    /// # Parameters
123    ///
124    /// - `overwrite_config_toml`: TOML format configuration to overwrite default. See `SPRING_CONFIG_DEFAULT` in [spring_config.rs](https://github.com/SpringQL/SpringQL/tree/main/springql-core/src/api/spring_config.rs) for full-set default configuration.
125    ///
126    /// # Failures
127    ///
128    /// - [SpringError::InvalidConfig](crate::api::error::SpringError::InvalidConfig) when:
129    ///   - `overwrite_config_toml` includes invalid key and/or value.
130    /// - [SpringError::InvalidFormat](crate::api::error::SpringError::InvalidFormat) when:
131    ///   - `overwrite_config_toml` is not valid as TOML.
132    pub fn from_toml(overwrite_config_toml: &str) -> Result<SpringConfig> {
133        SpringConfig::new(overwrite_config_toml)
134    }
135}
136
137/// Config related to worker threads.
138#[allow(missing_docs)]
139#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
140pub struct SpringWorkerConfig {
141    pub n_generic_worker_threads: u16,
142    pub n_source_worker_threads: u16,
143    pub sleep_msec_no_row: u64,
144}
145
146/// Config related to memory management.
147#[allow(missing_docs)]
148#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
149pub struct SpringMemoryConfig {
150    pub upper_limit_bytes: u64,
151
152    pub moderate_to_severe_percent: u8,
153    pub severe_to_critical_percent: u8,
154
155    pub critical_to_severe_percent: u8,
156    pub severe_to_moderate_percent: u8,
157
158    pub memory_state_transition_interval_msec: u32,
159    pub performance_metrics_summary_report_interval_msec: u32,
160}
161
162/// Config related to web console.
163#[allow(missing_docs)]
164#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
165pub struct SpringWebConsoleConfig {
166    pub enable_report_post: bool,
167
168    pub report_interval_msec: u32,
169
170    pub host: String,
171    pub port: u16,
172
173    pub timeout_msec: u32,
174}
175
176/// Config related to source reader
177#[allow(missing_docs)]
178#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
179pub struct SpringSourceReaderConfig {
180    pub net_connect_timeout_msec: u32,
181    pub net_read_timeout_msec: u32,
182
183    pub can_read_timeout_msec: u32,
184}
185
186/// Config related to sink writer.
187#[allow(missing_docs)]
188#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
189pub struct SpringSinkWriterConfig {
190    pub net_connect_timeout_msec: u32,
191    pub net_write_timeout_msec: u32,
192
193    pub http_timeout_msec: u32,
194    pub http_connect_timeout_msec: u32,
195}