rustfs_obs/
logger.rs

1// Copyright 2024 RustFS Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use crate::sinks::Sink;
16use crate::{
17    AppConfig, AuditLogEntry, BaseLogEntry, ConsoleLogEntry, GlobalError, OtelConfig, ServerLogEntry, UnifiedLogEntry, sinks,
18};
19use rustfs_config::{APP_NAME, ENVIRONMENT, SERVICE_VERSION};
20use std::sync::Arc;
21use std::time::SystemTime;
22use tokio::sync::mpsc::{self, Receiver, Sender};
23use tokio::sync::{Mutex, OnceCell};
24use tracing_core::Level;
25
26// Add the global instance at the module level
27static GLOBAL_LOGGER: OnceCell<Arc<Mutex<Logger>>> = OnceCell::const_new();
28
29/// Server log processor
30#[derive(Debug)]
31pub struct Logger {
32    sender: Sender<UnifiedLogEntry>, // Log sending channel
33    queue_capacity: usize,
34}
35
36impl Logger {
37    /// Create a new Logger instance
38    /// Returns Logger and corresponding Receiver
39    pub fn new(config: &AppConfig) -> (Self, Receiver<UnifiedLogEntry>) {
40        // Get queue capacity from configuration, or use default values 10000
41        let queue_capacity = config.logger.as_ref().and_then(|l| l.queue_capacity).unwrap_or(10000);
42        let (sender, receiver) = mpsc::channel(queue_capacity);
43        (Logger { sender, queue_capacity }, receiver)
44    }
45
46    /// get the queue capacity
47    /// This function returns the queue capacity.
48    /// # Returns
49    /// The queue capacity
50    /// # Example
51    /// ```
52    /// use rustfs_obs::Logger;
53    /// async fn example(logger: &Logger) {
54    ///    let _ = logger.get_queue_capacity();
55    /// }
56    /// ```
57    pub fn get_queue_capacity(&self) -> usize {
58        self.queue_capacity
59    }
60
61    /// Log a server entry
62    #[tracing::instrument(skip(self), fields(log_source = "logger_server"))]
63    pub async fn log_server_entry(&self, entry: ServerLogEntry) -> Result<(), GlobalError> {
64        self.log_entry(UnifiedLogEntry::Server(entry)).await
65    }
66
67    /// Log an audit entry
68    #[tracing::instrument(skip(self), fields(log_source = "logger_audit"))]
69    pub async fn log_audit_entry(&self, entry: AuditLogEntry) -> Result<(), GlobalError> {
70        self.log_entry(UnifiedLogEntry::Audit(Box::new(entry))).await
71    }
72
73    /// Log a console entry
74    #[tracing::instrument(skip(self), fields(log_source = "logger_console"))]
75    pub async fn log_console_entry(&self, entry: ConsoleLogEntry) -> Result<(), GlobalError> {
76        self.log_entry(UnifiedLogEntry::Console(entry)).await
77    }
78
79    /// Asynchronous logging of unified log entries
80    #[tracing::instrument(skip(self), fields(log_source = "logger"))]
81    #[tracing::instrument(level = "error", skip_all)]
82    pub async fn log_entry(&self, entry: UnifiedLogEntry) -> Result<(), GlobalError> {
83        // Extract information for tracing based on entry type
84        match &entry {
85            UnifiedLogEntry::Server(server) => {
86                tracing::Span::current()
87                    .record("log_level", server.level.0.as_str())
88                    .record("log_message", server.base.message.as_deref().unwrap_or("log message not set"))
89                    .record("source", &server.source);
90
91                // Generate tracing event based on log level
92                match server.level.0 {
93                    Level::ERROR => {
94                        tracing::error!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
95                    }
96                    Level::WARN => {
97                        tracing::warn!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
98                    }
99                    Level::INFO => {
100                        tracing::info!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
101                    }
102                    Level::DEBUG => {
103                        tracing::debug!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
104                    }
105                    Level::TRACE => {
106                        tracing::trace!(target: "server_logs", message = %server.base.message.as_deref().unwrap_or(""));
107                    }
108                }
109            }
110            UnifiedLogEntry::Audit(audit) => {
111                tracing::info!(
112                    target: "audit_logs",
113                    event = %audit.event,
114                    api = %audit.api.name.as_deref().unwrap_or("unknown"),
115                    message = %audit.base.message.as_deref().unwrap_or("")
116                );
117            }
118            UnifiedLogEntry::Console(console) => {
119                let level_str = match console.level {
120                    crate::LogKind::Info => "INFO",
121                    crate::LogKind::Warning => "WARN",
122                    crate::LogKind::Error => "ERROR",
123                    crate::LogKind::Fatal => "FATAL",
124                };
125
126                tracing::info!(
127                    target: "console_logs",
128                    level = %level_str,
129                    node = %console.node_name,
130                    message = %console.console_msg
131                );
132            }
133        }
134
135        // Send logs to async queue with improved error handling
136        match self.sender.try_send(entry) {
137            Ok(_) => Ok(()),
138            Err(mpsc::error::TrySendError::Full(entry)) => {
139                // Processing strategy when queue is full
140                tracing::warn!("Log queue full, applying backpressure");
141                match tokio::time::timeout(std::time::Duration::from_millis(500), self.sender.send(entry)).await {
142                    Ok(Ok(_)) => Ok(()),
143                    Ok(Err(_)) => Err(GlobalError::SendFailed("Channel closed")),
144                    Err(_) => Err(GlobalError::Timeout("Queue backpressure timeout")),
145                }
146            }
147            Err(mpsc::error::TrySendError::Closed(_)) => Err(GlobalError::SendFailed("Logger channel closed")),
148        }
149    }
150
151    /// Write log with context information
152    /// This function writes log messages with context information.
153    ///
154    /// # Parameters
155    /// - `message`: Message to be logged
156    /// - `source`: Source of the log
157    /// - `request_id`: Request ID
158    /// - `user_id`: User ID
159    /// - `fields`: Additional fields
160    ///
161    /// # Returns
162    /// Result indicating whether the operation was successful
163    ///
164    /// # Example
165    /// ```
166    /// use tracing_core::Level;
167    /// use rustfs_obs::Logger;
168    ///
169    /// async fn example(logger: &Logger) {
170    ///    let _ = logger.write_with_context("This is an information message", "example",Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
171    /// }
172    pub async fn write_with_context(
173        &self,
174        message: &str,
175        source: &str,
176        level: Level,
177        request_id: Option<String>,
178        user_id: Option<String>,
179        fields: Vec<(String, String)>,
180    ) -> Result<(), GlobalError> {
181        let base = BaseLogEntry::new().message(Some(message.to_string())).request_id(request_id);
182
183        let server_entry = ServerLogEntry::new(level, source.to_string())
184            .user_id(user_id)
185            .fields(fields)
186            .with_base(base);
187
188        self.log_server_entry(server_entry).await
189    }
190
191    /// Write log
192    /// This function writes log messages.
193    /// # Parameters
194    /// - `message`: Message to be logged
195    /// - `source`: Source of the log
196    /// - `level`: Log level
197    ///
198    /// # Returns
199    /// Result indicating whether the operation was successful
200    ///
201    /// # Example
202    /// ```
203    /// use rustfs_obs::Logger;
204    /// use tracing_core::Level;
205    ///
206    /// async fn example(logger: &Logger) {
207    ///   let _ = logger.write("This is an information message", "example", Level::INFO).await;
208    /// }
209    /// ```
210    pub async fn write(&self, message: &str, source: &str, level: Level) -> Result<(), GlobalError> {
211        self.write_with_context(message, source, level, None, None, Vec::new()).await
212    }
213
214    /// Shutdown the logger
215    /// This function shuts down the logger.
216    ///
217    /// # Returns
218    /// Result indicating whether the operation was successful
219    ///
220    /// # Example
221    /// ```
222    /// use rustfs_obs::Logger;
223    ///
224    /// async fn example(logger: Logger) {
225    ///  let _ = logger.shutdown().await;
226    /// }
227    /// ```
228    pub async fn shutdown(self) -> Result<(), GlobalError> {
229        drop(self.sender); //Close the sending end so that the receiver knows that there is no new message
230        Ok(())
231    }
232}
233
234/// Start the log module
235/// This function starts the log module.
236/// It initializes the logger and starts the worker to process logs.
237/// # Parameters
238/// - `config`: Configuration information
239/// - `sinks`: A vector of Sink instances
240/// # Returns
241/// The global logger instance
242/// # Example
243/// ```no_run
244/// use rustfs_obs::{AppConfig, start_logger};
245///
246/// let config = AppConfig::default();
247/// let sinks = vec![];
248/// let logger = start_logger(&config, sinks);
249/// ```
250pub fn start_logger(config: &AppConfig, sinks: Vec<Arc<dyn Sink>>) -> Logger {
251    let (logger, receiver) = Logger::new(config);
252    tokio::spawn(crate::worker::start_worker(receiver, sinks));
253    logger
254}
255
256/// Initialize the global logger instance
257/// This function initializes the global logger instance and returns a reference to it.
258/// If the logger has been initialized before, it will return the existing logger instance.
259///
260/// # Parameters
261/// - `config`: Configuration information
262/// - `sinks`: A vector of Sink instances
263///
264/// # Returns
265/// A reference to the global logger instance
266///
267/// # Example
268/// ```
269/// use rustfs_obs::{AppConfig,init_global_logger};
270///
271/// let config = AppConfig::default();
272/// let logger = init_global_logger(&config);
273/// ```
274pub async fn init_global_logger(config: &AppConfig) -> Arc<Mutex<Logger>> {
275    let sinks = sinks::create_sinks(config).await;
276    let logger = Arc::new(Mutex::new(start_logger(config, sinks)));
277    GLOBAL_LOGGER.set(logger.clone()).expect("Logger already initialized");
278    logger
279}
280
281/// Get the global logger instance
282///
283/// This function returns a reference to the global logger instance.
284///
285/// # Returns
286/// A reference to the global logger instance
287///
288/// # Example
289/// ```no_run
290/// use rustfs_obs::get_global_logger;
291///
292/// let logger = get_global_logger();
293/// ```
294pub fn get_global_logger() -> &'static Arc<Mutex<Logger>> {
295    GLOBAL_LOGGER.get().expect("Logger not initialized")
296}
297
298/// Log information
299/// This function logs information messages.
300///
301/// # Parameters
302/// - `message`: Message to be logged
303/// - `source`: Source of the log
304///
305/// # Returns
306/// Result indicating whether the operation was successful
307///
308/// # Example
309/// ```no_run
310/// use rustfs_obs::log_info;
311///
312/// async fn example() {
313///    let _ = log_info("This is an information message", "example").await;
314/// }
315/// ```
316pub async fn log_info(message: &str, source: &str) -> Result<(), GlobalError> {
317    get_global_logger().lock().await.write(message, source, Level::INFO).await
318}
319
320/// Log error
321/// This function logs error messages.
322/// # Parameters
323/// - `message`: Message to be logged
324/// - `source`: Source of the log
325/// # Returns
326/// Result indicating whether the operation was successful
327/// # Example
328/// ```no_run
329/// use rustfs_obs::log_error;
330///
331/// async fn example() {
332///     let _ = log_error("This is an error message", "example").await;
333/// }
334pub async fn log_error(message: &str, source: &str) -> Result<(), GlobalError> {
335    get_global_logger().lock().await.write(message, source, Level::ERROR).await
336}
337
338/// Log warning
339/// This function logs warning messages.
340/// # Parameters
341/// - `message`: Message to be logged
342/// - `source`: Source of the log
343/// # Returns
344/// Result indicating whether the operation was successful
345///
346/// # Example
347/// ```no_run
348/// use rustfs_obs::log_warn;
349///
350/// async fn example() {
351///     let _ = log_warn("This is a warning message", "example").await;
352/// }
353/// ```
354pub async fn log_warn(message: &str, source: &str) -> Result<(), GlobalError> {
355    get_global_logger().lock().await.write(message, source, Level::WARN).await
356}
357
358/// Log debug
359/// This function logs debug messages.
360/// # Parameters
361/// - `message`: Message to be logged
362/// - `source`: Source of the log
363/// # Returns
364/// Result indicating whether the operation was successful
365///
366/// # Example
367/// ```no_run
368/// use rustfs_obs::log_debug;
369///
370/// async fn example() {
371///     let _ = log_debug("This is a debug message", "example").await;
372/// }
373/// ```
374pub async fn log_debug(message: &str, source: &str) -> Result<(), GlobalError> {
375    get_global_logger().lock().await.write(message, source, Level::DEBUG).await
376}
377
378/// Log trace
379/// This function logs trace messages.
380/// # Parameters
381/// - `message`: Message to be logged
382/// - `source`: Source of the log
383///
384/// # Returns
385/// Result indicating whether the operation was successful
386///
387/// # Example
388/// ```no_run
389/// use rustfs_obs::log_trace;
390///
391/// async fn example() {
392///    let _ = log_trace("This is a trace message", "example").await;
393/// }
394/// ```
395pub async fn log_trace(message: &str, source: &str) -> Result<(), GlobalError> {
396    get_global_logger().lock().await.write(message, source, Level::TRACE).await
397}
398
399/// Log with context information
400/// This function logs messages with context information.
401/// # Parameters
402/// - `message`: Message to be logged
403/// - `source`: Source of the log
404/// - `level`: Log level
405/// - `request_id`: Request ID
406/// - `user_id`: User ID
407/// - `fields`: Additional fields
408/// # Returns
409/// Result indicating whether the operation was successful
410/// # Example
411/// ```no_run
412/// use tracing_core::Level;
413/// use rustfs_obs::log_with_context;
414///
415/// async fn example() {
416///    let _ = log_with_context("This is an information message", "example", Level::INFO, Some("req-12345".to_string()), Some("user-6789".to_string()), vec![("endpoint".to_string(), "/api/v1/data".to_string())]).await;
417/// }
418/// ```
419pub async fn log_with_context(
420    message: &str,
421    source: &str,
422    level: Level,
423    request_id: Option<String>,
424    user_id: Option<String>,
425    fields: Vec<(String, String)>,
426) -> Result<(), GlobalError> {
427    get_global_logger()
428        .lock()
429        .await
430        .write_with_context(message, source, level, request_id, user_id, fields)
431        .await
432}
433
434/// Log initialization status
435#[derive(Debug)]
436pub(crate) struct InitLogStatus {
437    pub timestamp: SystemTime,
438    pub service_name: String,
439    pub version: String,
440    pub environment: String,
441}
442
443impl Default for InitLogStatus {
444    fn default() -> Self {
445        Self {
446            timestamp: SystemTime::now(),
447            service_name: String::from(APP_NAME),
448            version: SERVICE_VERSION.to_string(),
449            environment: ENVIRONMENT.to_string(),
450        }
451    }
452}
453
454impl InitLogStatus {
455    pub fn new_config(config: &OtelConfig) -> Self {
456        let config = config.clone();
457        let environment = config.environment.unwrap_or(ENVIRONMENT.to_string());
458        let version = config.service_version.unwrap_or(SERVICE_VERSION.to_string());
459        Self {
460            timestamp: SystemTime::now(),
461            service_name: String::from(APP_NAME),
462            version,
463            environment,
464        }
465    }
466
467    pub async fn init_start_log(config: &OtelConfig) -> Result<(), GlobalError> {
468        let status = Self::new_config(config);
469        log_init_state(Some(status)).await
470    }
471}
472
473/// Log initialization details during system startup
474async fn log_init_state(status: Option<InitLogStatus>) -> Result<(), GlobalError> {
475    let status = status.unwrap_or_default();
476
477    let base_entry = BaseLogEntry::new()
478        .timestamp(chrono::DateTime::from(status.timestamp))
479        .message(Some(format!(
480            "Service initialization started - {} v{} in {}",
481            status.service_name, status.version, status.environment
482        )))
483        .request_id(Some("system_init".to_string()));
484
485    let server_entry = ServerLogEntry::new(Level::INFO, "system_initialization".to_string())
486        .with_base(base_entry)
487        .user_id(Some("system".to_string()));
488
489    get_global_logger().lock().await.log_server_entry(server_entry).await?;
490    Ok(())
491}