rustfs_obs/
config.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use rustfs_config::{
16    APP_NAME, DEFAULT_LOG_DIR, DEFAULT_LOG_FILENAME, DEFAULT_LOG_KEEP_FILES, DEFAULT_LOG_LEVEL, DEFAULT_LOG_ROTATION_SIZE_MB,
17    DEFAULT_LOG_ROTATION_TIME, ENVIRONMENT, METER_INTERVAL, SAMPLE_RATIO, SERVICE_VERSION, USE_STDOUT,
18};
19use serde::{Deserialize, Serialize};
20use std::env;
21
22/// OpenTelemetry Configuration
23/// Add service name, service version, environment
24/// Add interval time for metric collection
25/// Add sample ratio for trace sampling
26/// Add endpoint for metric collection
27/// Add use_stdout for output to stdout
28/// Add logger level for log level
29/// Add local_logging_enabled for local logging enabled
30#[derive(Debug, Deserialize, Serialize, Clone)]
31pub struct OtelConfig {
32    pub endpoint: String,                    // Endpoint for metric collection
33    pub use_stdout: Option<bool>,            // Output to stdout
34    pub sample_ratio: Option<f64>,           // Trace sampling ratio
35    pub meter_interval: Option<u64>,         // Metric collection interval
36    pub service_name: Option<String>,        // Service name
37    pub service_version: Option<String>,     // Service version
38    pub environment: Option<String>,         // Environment
39    pub logger_level: Option<String>,        // Logger level
40    pub local_logging_enabled: Option<bool>, // Local logging enabled
41    // Added flexi_logger related configurations
42    pub log_directory: Option<String>,     // LOG FILE DIRECTORY
43    pub log_filename: Option<String>,      // The name of the log file
44    pub log_rotation_size_mb: Option<u64>, // Log file size cut threshold (MB)
45    pub log_rotation_time: Option<String>, // Logs are cut by time (Hour, Day,Minute, Second)
46    pub log_keep_files: Option<u16>,       // Number of log files to be retained
47}
48
49impl OtelConfig {
50    /// Helper function: Extract observable configuration from environment variables
51    pub fn extract_otel_config_from_env(endpoint: Option<String>) -> OtelConfig {
52        let endpoint = if let Some(endpoint) = endpoint {
53            if endpoint.is_empty() {
54                env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
55            } else {
56                endpoint
57            }
58        } else {
59            env::var("RUSTFS_OBS_ENDPOINT").unwrap_or_else(|_| "".to_string())
60        };
61        let mut use_stdout = env::var("RUSTFS_OBS_USE_STDOUT")
62            .ok()
63            .and_then(|v| v.parse().ok())
64            .or(Some(USE_STDOUT));
65        if endpoint.is_empty() {
66            use_stdout = Some(true);
67        }
68
69        OtelConfig {
70            endpoint,
71            use_stdout,
72            sample_ratio: env::var("RUSTFS_OBS_SAMPLE_RATIO")
73                .ok()
74                .and_then(|v| v.parse().ok())
75                .or(Some(SAMPLE_RATIO)),
76            meter_interval: env::var("RUSTFS_OBS_METER_INTERVAL")
77                .ok()
78                .and_then(|v| v.parse().ok())
79                .or(Some(METER_INTERVAL)),
80            service_name: env::var("RUSTFS_OBS_SERVICE_NAME")
81                .ok()
82                .and_then(|v| v.parse().ok())
83                .or(Some(APP_NAME.to_string())),
84            service_version: env::var("RUSTFS_OBS_SERVICE_VERSION")
85                .ok()
86                .and_then(|v| v.parse().ok())
87                .or(Some(SERVICE_VERSION.to_string())),
88            environment: env::var("RUSTFS_OBS_ENVIRONMENT")
89                .ok()
90                .and_then(|v| v.parse().ok())
91                .or(Some(ENVIRONMENT.to_string())),
92            logger_level: env::var("RUSTFS_OBS_LOGGER_LEVEL")
93                .ok()
94                .and_then(|v| v.parse().ok())
95                .or(Some(DEFAULT_LOG_LEVEL.to_string())),
96            local_logging_enabled: env::var("RUSTFS_OBS_LOCAL_LOGGING_ENABLED")
97                .ok()
98                .and_then(|v| v.parse().ok())
99                .or(Some(false)),
100            log_directory: env::var("RUSTFS_OBS_LOG_DIRECTORY")
101                .ok()
102                .and_then(|v| v.parse().ok())
103                .or(Some(DEFAULT_LOG_DIR.to_string())),
104            log_filename: env::var("RUSTFS_OBS_LOG_FILENAME")
105                .ok()
106                .and_then(|v| v.parse().ok())
107                .or(Some(DEFAULT_LOG_FILENAME.to_string())),
108            log_rotation_size_mb: env::var("RUSTFS_OBS_LOG_ROTATION_SIZE_MB")
109                .ok()
110                .and_then(|v| v.parse().ok())
111                .or(Some(DEFAULT_LOG_ROTATION_SIZE_MB)), // Default to 100 MB
112            log_rotation_time: env::var("RUSTFS_OBS_LOG_ROTATION_TIME")
113                .ok()
114                .and_then(|v| v.parse().ok())
115                .or(Some(DEFAULT_LOG_ROTATION_TIME.to_string())), // Default to "Day"
116            log_keep_files: env::var("RUSTFS_OBS_LOG_KEEP_FILES")
117                .ok()
118                .and_then(|v| v.parse().ok())
119                .or(Some(DEFAULT_LOG_KEEP_FILES)), // Default to keeping 30 log files
120        }
121    }
122
123    /// Create a new instance of OtelConfig with default values
124    ///
125    /// # Returns
126    /// A new instance of OtelConfig
127    pub fn new() -> Self {
128        Self::extract_otel_config_from_env(None)
129    }
130}
131
132impl Default for OtelConfig {
133    fn default() -> Self {
134        Self::new()
135    }
136}
137
138/// Kafka Sink Configuration - Add batch parameters
139#[derive(Debug, Deserialize, Serialize, Clone)]
140pub struct KafkaSinkConfig {
141    pub brokers: String,
142    pub topic: String,
143    pub batch_size: Option<usize>,     // Batch size, default 100
144    pub batch_timeout_ms: Option<u64>, // Batch timeout time, default 1000ms
145}
146
147impl KafkaSinkConfig {
148    pub fn new() -> Self {
149        Self::default()
150    }
151}
152
153impl Default for KafkaSinkConfig {
154    fn default() -> Self {
155        Self {
156            brokers: env::var("RUSTFS_SINKS_KAFKA_BROKERS")
157                .ok()
158                .filter(|s| !s.trim().is_empty())
159                .unwrap_or_else(|| "localhost:9092".to_string()),
160            topic: env::var("RUSTFS_SINKS_KAFKA_TOPIC")
161                .ok()
162                .filter(|s| !s.trim().is_empty())
163                .unwrap_or_else(|| "rustfs_sink".to_string()),
164            batch_size: Some(100),
165            batch_timeout_ms: Some(1000),
166        }
167    }
168}
169
170/// Webhook Sink Configuration - Add Retry Parameters
171#[derive(Debug, Deserialize, Serialize, Clone)]
172pub struct WebhookSinkConfig {
173    pub endpoint: String,
174    pub auth_token: String,
175    pub max_retries: Option<usize>,  // Maximum number of retry times, default 3
176    pub retry_delay_ms: Option<u64>, // Retry the delay cardinality, default 100ms
177}
178
179impl WebhookSinkConfig {
180    pub fn new() -> Self {
181        Self::default()
182    }
183}
184
185impl Default for WebhookSinkConfig {
186    fn default() -> Self {
187        Self {
188            endpoint: env::var("RUSTFS_SINKS_WEBHOOK_ENDPOINT")
189                .ok()
190                .filter(|s| !s.trim().is_empty())
191                .unwrap_or_else(|| "http://localhost:8080".to_string()),
192            auth_token: env::var("RUSTFS_SINKS_WEBHOOK_AUTH_TOKEN")
193                .ok()
194                .filter(|s| !s.trim().is_empty())
195                .unwrap_or_else(|| "rustfs_webhook_token".to_string()),
196            max_retries: Some(3),
197            retry_delay_ms: Some(100),
198        }
199    }
200}
201
202/// File Sink Configuration - Add buffering parameters
203#[derive(Debug, Deserialize, Serialize, Clone)]
204pub struct FileSinkConfig {
205    pub path: String,
206    pub buffer_size: Option<usize>,     // Write buffer size, default 8192
207    pub flush_interval_ms: Option<u64>, // Refresh interval time, default 1000ms
208    pub flush_threshold: Option<usize>, // Refresh threshold, default 100 logs
209}
210
211impl FileSinkConfig {
212    pub fn get_default_log_path() -> String {
213        let temp_dir = env::temp_dir().join("rustfs");
214
215        if let Err(e) = std::fs::create_dir_all(&temp_dir) {
216            eprintln!("Failed to create log directory: {e}");
217            return "rustfs/rustfs.log".to_string();
218        }
219        temp_dir
220            .join("rustfs.log")
221            .to_str()
222            .unwrap_or("rustfs/rustfs.log")
223            .to_string()
224    }
225    pub fn new() -> Self {
226        Self::default()
227    }
228}
229
230impl Default for FileSinkConfig {
231    fn default() -> Self {
232        Self {
233            path: env::var("RUSTFS_SINKS_FILE_PATH")
234                .ok()
235                .filter(|s| !s.trim().is_empty())
236                .unwrap_or_else(Self::get_default_log_path),
237            buffer_size: env::var("RUSTFS_SINKS_FILE_BUFFER_SIZE")
238                .ok()
239                .and_then(|v| v.parse().ok())
240                .or(Some(8192)),
241            flush_interval_ms: env::var("RUSTFS_SINKS_FILE_FLUSH_INTERVAL_MS")
242                .ok()
243                .and_then(|v| v.parse().ok())
244                .or(Some(1000)),
245            flush_threshold: env::var("RUSTFS_SINKS_FILE_FLUSH_THRESHOLD")
246                .ok()
247                .and_then(|v| v.parse().ok())
248                .or(Some(100)),
249        }
250    }
251}
252
253/// Sink configuration collection
254#[derive(Debug, Clone, Serialize, Deserialize)]
255#[serde(tag = "type")]
256pub enum SinkConfig {
257    File(FileSinkConfig),
258    Kafka(KafkaSinkConfig),
259    Webhook(WebhookSinkConfig),
260}
261
262impl SinkConfig {
263    pub fn new() -> Self {
264        Self::File(FileSinkConfig::new())
265    }
266}
267
268impl Default for SinkConfig {
269    fn default() -> Self {
270        Self::new()
271    }
272}
273
274///Logger Configuration
275#[derive(Debug, Deserialize, Serialize, Clone)]
276pub struct LoggerConfig {
277    pub queue_capacity: Option<usize>,
278}
279
280impl LoggerConfig {
281    pub fn new() -> Self {
282        Self {
283            queue_capacity: Some(10000),
284        }
285    }
286}
287
288impl Default for LoggerConfig {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
294/// Overall application configuration
295/// Add observability, sinks, and logger configuration
296///
297/// Observability: OpenTelemetry configuration
298/// Sinks: Kafka, Webhook, File sink configuration
299/// Logger: Logger configuration
300///
301/// # Example
302/// ```
303/// use rustfs_obs::AppConfig;
304///
305/// let config = AppConfig::new_with_endpoint(None);
306/// ```
307#[derive(Debug, Deserialize, Clone)]
308pub struct AppConfig {
309    pub observability: OtelConfig,
310    pub sinks: Vec<SinkConfig>,
311    pub logger: Option<LoggerConfig>,
312}
313
314impl AppConfig {
315    /// Create a new instance of AppConfig with default values
316    ///
317    /// # Returns
318    /// A new instance of AppConfig
319    pub fn new() -> Self {
320        Self {
321            observability: OtelConfig::default(),
322            sinks: vec![SinkConfig::default()],
323            logger: Some(LoggerConfig::default()),
324        }
325    }
326
327    pub fn new_with_endpoint(endpoint: Option<String>) -> Self {
328        Self {
329            observability: OtelConfig::extract_otel_config_from_env(endpoint),
330            sinks: vec![SinkConfig::new()],
331            logger: Some(LoggerConfig::new()),
332        }
333    }
334}
335
336// implement default for AppConfig
337impl Default for AppConfig {
338    fn default() -> Self {
339        Self::new()
340    }
341}