springql_core/api/
spring_config.rs1use serde::Deserialize;
4
5use crate::api::error::{Result, SpringError};
6
/// Built-in default configuration, written as TOML.
///
/// `SpringConfig::new` parses this text first and layers the caller-supplied
/// TOML on top of it, and `SpringConfig::default()` uses it with no overrides.
/// It therefore has to stay valid TOML and has to provide a value for every
/// field of the `Spring*Config` structs.
const SPRING_CONFIG_DEFAULT: &str = r#"
[worker]
# Number of generic worker threads. Generic worker threads deal with internal and sink tasks.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_generic_worker_threads = 1

# Number of source worker threads. Source worker threads collect rows from foreign source.
# Too large a number may cause row fraud in runtime.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_source_worker_threads = 1

# How long a generic worker or a source worker sleeps if it does not receive any row from the upstream.
# Small number will improve the initial row's E2E latency but increase the CPU usage.
sleep_msec_no_row = 100

[memory]
# How much memory is allowed to be used in SpringQL streaming runtime.
upper_limit_bytes = 10_000_000

# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe.
# In Severe state, internal scheduler is changed to exhibit memory-resilience.
moderate_to_severe_percent = 60

# Percentage over `upper_limit_bytes` to transit from Severe state to Critical.
# In Critical state, all intermediate rows are purged to release memory.
severe_to_critical_percent = 95

critical_to_severe_percent = 80
severe_to_moderate_percent = 40

# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event.
memory_state_transition_interval_msec = 10

# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event.
performance_metrics_summary_report_interval_msec = 10

[web_console]
# Whether to enable POST API request to web console.
enable_report_post = false

report_interval_msec = 3_000

host = "127.0.0.1"
port = 8050

timeout_msec = 3_000

[source_reader]
net_connect_timeout_msec = 1_000
net_read_timeout_msec = 100

can_read_timeout_msec = 100

[sink_writer]
net_connect_timeout_msec = 1_000
net_write_timeout_msec = 100

http_connect_timeout_msec = 1_000
http_timeout_msec = 100
"#;
70
71#[allow(missing_docs)]
73#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
74pub struct SpringConfig {
75 pub worker: SpringWorkerConfig,
76 pub memory: SpringMemoryConfig,
77 pub web_console: SpringWebConsoleConfig,
78 pub source_reader: SpringSourceReaderConfig,
79 pub sink_writer: SpringSinkWriterConfig,
80}
81
82impl Default for SpringConfig {
83 fn default() -> Self {
84 Self::new("").expect("default configuration must be valid")
85 }
86}
87
88impl SpringConfig {
89 pub fn new(overwrite_config_toml: &str) -> Result<Self> {
96 let default_conf = config::Config::builder()
97 .add_source(config::File::from_str(
98 SPRING_CONFIG_DEFAULT,
99 config::FileFormat::Toml,
100 ))
101 .build()
102 .expect("SPRING_CONFIG_DEFAULT is in wrong format");
103
104 let c = config::Config::builder()
105 .add_source(default_conf)
106 .add_source(config::File::from_str(
107 overwrite_config_toml,
108 config::FileFormat::Toml,
109 ))
110 .build()
111 .map_err(|e| SpringError::InvalidFormat {
112 s: overwrite_config_toml.to_string(),
113 source: e.into(),
114 })?;
115
116 c.try_deserialize()
117 .map_err(|e| SpringError::InvalidConfig { source: e.into() })
118 }
119
120 pub fn from_toml(overwrite_config_toml: &str) -> Result<SpringConfig> {
133 SpringConfig::new(overwrite_config_toml)
134 }
135}
136
137#[allow(missing_docs)]
139#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
140pub struct SpringWorkerConfig {
141 pub n_generic_worker_threads: u16,
142 pub n_source_worker_threads: u16,
143 pub sleep_msec_no_row: u64,
144}
145
146#[allow(missing_docs)]
148#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
149pub struct SpringMemoryConfig {
150 pub upper_limit_bytes: u64,
151
152 pub moderate_to_severe_percent: u8,
153 pub severe_to_critical_percent: u8,
154
155 pub critical_to_severe_percent: u8,
156 pub severe_to_moderate_percent: u8,
157
158 pub memory_state_transition_interval_msec: u32,
159 pub performance_metrics_summary_report_interval_msec: u32,
160}
161
162#[allow(missing_docs)]
164#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
165pub struct SpringWebConsoleConfig {
166 pub enable_report_post: bool,
167
168 pub report_interval_msec: u32,
169
170 pub host: String,
171 pub port: u16,
172
173 pub timeout_msec: u32,
174}
175
176#[allow(missing_docs)]
178#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
179pub struct SpringSourceReaderConfig {
180 pub net_connect_timeout_msec: u32,
181 pub net_read_timeout_msec: u32,
182
183 pub can_read_timeout_msec: u32,
184}
185
186#[allow(missing_docs)]
188#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
189pub struct SpringSinkWriterConfig {
190 pub net_connect_timeout_msec: u32,
191 pub net_write_timeout_msec: u32,
192
193 pub http_timeout_msec: u32,
194 pub http_connect_timeout_msec: u32,
195}