1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
use serde::Deserialize;
use crate::api::error::{Result, SpringError};
/// Built-in default configuration, written as TOML.
///
/// `SpringConfig::new` loads this text as its first configuration source and
/// then adds the caller-supplied TOML as a second source, so this constant
/// provides a value for every key read by the `Spring*Config` structs in this
/// file. `SpringConfig::new` panics if this text cannot be loaded.
///
/// The `#` lines inside the literal are TOML comments; they are part of the
/// string itself, not Rust comments.
const SPRING_CONFIG_DEFAULT: &str = r#"
[worker]
# Number of generic worker threads. Generic worker threads deal with internal and sink tasks.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_generic_worker_threads = 1
# Number of source worker threads. Source worker threads collect rows from foreign source.
# Too many number may may cause row fraud in runtime.
# Setting this to > 1 may improve throughput but lead to out-of-order stream processing.
n_source_worker_threads = 1
[memory]
# How much memory is allowed to be used in SpringQL streaming runtime.
upper_limit_bytes = 10_000_000
# Percentage over `upper_limit_bytes` to transit from Moderate state to Severe.
# In Severe state, internal scheduler is changed to exhibit memory-resilience.
moderate_to_severe_percent = 60
# Percentage over `upper_limit_bytes` to transit from Severe state to Critical.
# In Critical state, all intermediate rows are purged to release memory.
severe_to_critical_percent = 95
critical_to_severe_percent = 80
severe_to_moderate_percent = 40
# Interval for MemoryStateMachineWorker to publish TransitPerformanceMetricsSummary event.
memory_state_transition_interval_msec = 10
# Interval for PerformanceMonitorWorker to publish ReportMetricsSummary event.
performance_metrics_summary_report_interval_msec = 10
[web_console]
# Whether to enable POST API request to web console.
enable_report_post = false
report_interval_msec = 3_000
host = "127.0.0.1"
port = 8050
timeout_msec = 3_000
[source_reader]
net_connect_timeout_msec = 1_000
net_read_timeout_msec = 100
can_read_timeout_msec = 100
[sink_writer]
net_connect_timeout_msec = 1_000
net_write_timeout_msec = 100
http_connect_timeout_msec = 1_000
http_timeout_msec = 100
"#;
/// Top-level SpringQL configuration, deserialized from TOML.
///
/// Each field holds the contents of the TOML table with the same name
/// (`[worker]`, `[memory]`, `[web_console]`, `[source_reader]`,
/// `[sink_writer]`); see `SPRING_CONFIG_DEFAULT` for the default values.
#[allow(missing_docs)]
#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringConfig {
/// Settings from the `[worker]` table.
pub worker: SpringWorkerConfig,
/// Settings from the `[memory]` table.
pub memory: SpringMemoryConfig,
/// Settings from the `[web_console]` table.
pub web_console: SpringWebConsoleConfig,
/// Settings from the `[source_reader]` table.
pub source_reader: SpringSourceReaderConfig,
/// Settings from the `[sink_writer]` table.
pub sink_writer: SpringSinkWriterConfig,
}
impl Default for SpringConfig {
fn default() -> Self {
Self::new("").expect("default configuration must be valid")
}
}
impl SpringConfig {
    /// Builds a configuration from the built-in defaults plus the caller's TOML.
    ///
    /// `SPRING_CONFIG_DEFAULT` is loaded first; `overwrite_config_toml` is then
    /// added as a later source on top of it, and the combined result is
    /// deserialized into `SpringConfig`. Pass `""` to use the defaults only.
    ///
    /// # Errors
    ///
    /// - `SpringError::InvalidFormat` when building the combined configuration
    ///   fails; the error carries a copy of `overwrite_config_toml`.
    /// - `SpringError::InvalidConfig` when the combined configuration cannot be
    ///   deserialized into `SpringConfig`.
    ///
    /// # Panics
    ///
    /// Panics if `SPRING_CONFIG_DEFAULT` itself cannot be loaded.
    pub fn new(overwrite_config_toml: &str) -> Result<Self> {
        // Stage 1: the built-in defaults on their own. A failure here is a bug
        // in this crate, hence the panic.
        let builtin_source =
            config::File::from_str(SPRING_CONFIG_DEFAULT, config::FileFormat::Toml);
        let builtin = config::Config::builder()
            .add_source(builtin_source)
            .build()
            .expect("SPRING_CONFIG_DEFAULT is in wrong format");

        // Stage 2: defaults first, caller's TOML second.
        let overwrite_source =
            config::File::from_str(overwrite_config_toml, config::FileFormat::Toml);
        let merged = match config::Config::builder()
            .add_source(builtin)
            .add_source(overwrite_source)
            .build()
        {
            Ok(merged) => merged,
            Err(e) => {
                return Err(SpringError::InvalidFormat {
                    s: overwrite_config_toml.to_string(),
                    source: e.into(),
                });
            }
        };

        // Stage 3: map the merged key/value tree onto the typed structs.
        merged
            .try_deserialize()
            .map_err(|e| SpringError::InvalidConfig { source: e.into() })
    }

    /// Builds a configuration from a TOML string.
    ///
    /// Forwards directly to [`SpringConfig::new`], so it returns the same
    /// errors and panics under the same condition.
    pub fn from_toml(overwrite_config_toml: &str) -> Result<SpringConfig> {
        Self::new(overwrite_config_toml)
    }
}
/// Worker-thread settings, read from the `[worker]` TOML table.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringWorkerConfig {
/// Number of generic worker threads. Generic worker threads deal with
/// internal and sink tasks. Setting this to > 1 may improve throughput but
/// lead to out-of-order stream processing.
pub n_generic_worker_threads: u16,
/// Number of source worker threads. Source worker threads collect rows from
/// foreign sources. Setting this to > 1 may improve throughput but lead to
/// out-of-order stream processing.
pub n_source_worker_threads: u16,
}
/// Memory-management settings, read from the `[memory]` TOML table.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringMemoryConfig {
/// How much memory is allowed to be used in the SpringQL streaming runtime.
pub upper_limit_bytes: u64,
/// Percentage over `upper_limit_bytes` to transit from Moderate state to
/// Severe. In Severe state, the internal scheduler is changed to exhibit
/// memory-resilience.
pub moderate_to_severe_percent: u8,
/// Percentage over `upper_limit_bytes` to transit from Severe state to
/// Critical. In Critical state, all intermediate rows are purged to release
/// memory.
pub severe_to_critical_percent: u8,
/// NOTE(review): presumably the percentage of `upper_limit_bytes` at which
/// the state goes back from Critical to Severe — confirm against the memory
/// state machine.
pub critical_to_severe_percent: u8,
/// NOTE(review): presumably the percentage of `upper_limit_bytes` at which
/// the state goes back from Severe to Moderate — confirm against the memory
/// state machine.
pub severe_to_moderate_percent: u8,
/// Interval for MemoryStateMachineWorker to publish the
/// TransitPerformanceMetricsSummary event (milliseconds, per the `_msec`
/// suffix).
pub memory_state_transition_interval_msec: u32,
/// Interval for PerformanceMonitorWorker to publish the
/// ReportMetricsSummary event (milliseconds, per the `_msec` suffix).
pub performance_metrics_summary_report_interval_msec: u32,
}
/// Web-console reporting settings, read from the `[web_console]` TOML table.
///
/// Not `Copy` (unlike the other section structs) because `host` is a `String`.
#[allow(missing_docs)]
#[derive(Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringWebConsoleConfig {
/// Whether to enable POST API request to web console.
pub enable_report_post: bool,
/// NOTE(review): presumably the interval between reports to the web
/// console, in milliseconds — confirm against the reporter.
pub report_interval_msec: u32,
/// Host of the web console (`"127.0.0.1"` in `SPRING_CONFIG_DEFAULT`).
pub host: String,
/// Port of the web console (`8050` in `SPRING_CONFIG_DEFAULT`).
pub port: u16,
/// NOTE(review): presumably the timeout for requests to the web console, in
/// milliseconds — confirm against the HTTP client setup.
pub timeout_msec: u32,
}
/// Source-reader timeout settings, read from the `[source_reader]` TOML table.
///
/// NOTE(review): the per-field meanings below are inferred from the field
/// names only (`_msec` suggesting milliseconds) — confirm against the source
/// reader implementations.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringSourceReaderConfig {
/// Presumably the timeout for establishing a network connection to a source.
pub net_connect_timeout_msec: u32,
/// Presumably the timeout for a single network read from a source.
pub net_read_timeout_msec: u32,
/// Presumably the read timeout for CAN sources — TODO confirm.
pub can_read_timeout_msec: u32,
}
/// Sink-writer timeout settings, read from the `[sink_writer]` TOML table.
///
/// NOTE(review): the per-field meanings below are inferred from the field
/// names only (`_msec` suggesting milliseconds) — confirm against the sink
/// writer implementations.
#[allow(missing_docs)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Deserialize)]
pub struct SpringSinkWriterConfig {
/// Presumably the timeout for establishing a network connection to a sink.
pub net_connect_timeout_msec: u32,
/// Presumably the timeout for a single network write to a sink.
pub net_write_timeout_msec: u32,
/// Presumably the overall timeout for an HTTP request to a sink.
pub http_timeout_msec: u32,
/// Presumably the timeout for establishing an HTTP connection to a sink.
pub http_connect_timeout_msec: u32,
}