use crate::config::OutputFormat;
use crate::core::event::QuantumLogEvent;
use crate::sinks::traits::{QuantumSink, SinkError, SinkResult, StackableSink};
use async_trait::async_trait;
use serde_json;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::Level;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StdoutTarget {
Stdout,
Stderr,
}
impl Default for StdoutTarget {
fn default() -> Self {
Self::Stdout
}
}
#[derive(Debug, Clone)]
pub struct DefaultStdoutConfig {
pub format: OutputFormat,
pub colored: bool,
pub target: StdoutTarget,
pub min_level: Option<Level>,
pub include_timestamp: bool,
pub include_thread: bool,
pub include_module: bool,
pub prefix: Option<String>,
}
impl Default for DefaultStdoutConfig {
fn default() -> Self {
Self {
format: OutputFormat::Text,
colored: true,
target: StdoutTarget::Stdout,
min_level: None,
include_timestamp: true,
include_thread: false,
include_module: true,
prefix: None,
}
}
}
#[derive(Debug)]
pub struct DefaultStdoutSink {
config: DefaultStdoutConfig,
output_lock: Arc<Mutex<()>>,
is_closed: AtomicBool,
event_count: AtomicU64,
error_count: AtomicU64,
}
impl DefaultStdoutSink {
pub async fn new(config: DefaultStdoutConfig) -> SinkResult<Self> {
Ok(Self {
config,
output_lock: Arc::new(Mutex::new(())),
is_closed: AtomicBool::new(false),
event_count: AtomicU64::new(0),
error_count: AtomicU64::new(0),
})
}
pub async fn with_default_config() -> SinkResult<Self> {
Self::new(DefaultStdoutConfig::default()).await
}
fn should_filter_event(&self, event: &QuantumLogEvent) -> bool {
if let Some(min_level) = &self.config.min_level {
match (event.level.as_str(), min_level) {
("ERROR", _) => false,
("WARN", &Level::ERROR) => true,
("WARN", _) => false,
("INFO", &Level::ERROR | &Level::WARN) => true,
("INFO", _) => false,
("DEBUG", &Level::ERROR | &Level::WARN | &Level::INFO) => true,
("DEBUG", _) => false,
("TRACE", &Level::TRACE) => false,
("TRACE", _) => true,
_ => false,
}
} else {
false
}
}
fn format_as_text(&self, event: &QuantumLogEvent) -> String {
let mut output = String::new();
if let Some(prefix) = &self.config.prefix {
output.push_str(prefix);
output.push(' ');
}
if self.config.include_timestamp {
output.push_str(&format!(
"[{}] ",
event.timestamp.format("%Y-%m-%d %H:%M:%S%.3f")
));
}
if self.config.colored {
let colored_level = match event.level.as_str() {
"ERROR" => "\x1b[31mERROR\x1b[0m", "WARN" => "\x1b[33mWARN\x1b[0m", "INFO" => "\x1b[32mINFO\x1b[0m", "DEBUG" => "\x1b[36mDEBUG\x1b[0m", "TRACE" => "\x1b[37mTRACE\x1b[0m", _ => &event.level,
};
output.push_str(&format!("[{}] ", colored_level));
} else {
output.push_str(&format!("[{}] ", event.level));
}
if self.config.include_module {
if let Some(module) = &event.module_path {
output.push_str(&format!("[{}] ", module));
}
}
if self.config.include_thread {
if let Some(thread_name) = &event.thread_name {
output.push_str(&format!("[{}] ", thread_name));
} else {
output.push_str(&format!("[{}] ", event.context.tid));
}
}
output.push_str(&event.message);
if !event.fields.is_empty() {
output.push_str(" {");
for (i, (key, value)) in event.fields.iter().enumerate() {
if i > 0 {
output.push_str(", ");
}
let value_str = match value {
serde_json::Value::String(s) => s.clone(),
_ => value.to_string(),
};
output.push_str(&format!("{}: {}", key, value_str));
}
output.push('}');
}
output
}
fn format_as_json(&self, event: &QuantumLogEvent) -> SinkResult<String> {
let mut json_event = serde_json::Map::new();
json_event.insert(
"timestamp".to_string(),
serde_json::Value::String(event.timestamp.to_rfc3339()),
);
json_event.insert(
"level".to_string(),
serde_json::Value::String(event.level.clone()),
);
json_event.insert(
"message".to_string(),
serde_json::Value::String(event.message.clone()),
);
if let Some(module) = &event.module_path {
json_event.insert(
"module".to_string(),
serde_json::Value::String(module.clone()),
);
}
if let Some(file) = &event.file {
json_event.insert("file".to_string(), serde_json::Value::String(file.clone()));
}
if let Some(line) = event.line {
json_event.insert(
"line".to_string(),
serde_json::Value::Number(serde_json::Number::from(line)),
);
}
json_event.insert(
"thread_id".to_string(),
serde_json::Value::String(event.context.tid.to_string()),
);
if let Some(prefix) = &self.config.prefix {
json_event.insert(
"prefix".to_string(),
serde_json::Value::String(prefix.clone()),
);
}
if !event.fields.is_empty() {
let mut fields_map = serde_json::Map::new();
for (key, value) in &event.fields {
fields_map.insert(key.clone(), value.clone());
}
json_event.insert("fields".to_string(), serde_json::Value::Object(fields_map));
}
serde_json::to_string(&json_event).map_err(|e| SinkError::Serialization(e.to_string()))
}
fn format_as_csv(&self, event: &QuantumLogEvent) -> SinkResult<String> {
let mut csv_fields = vec![
self.escape_csv_field(&event.timestamp.to_rfc3339()),
self.escape_csv_field(&event.level),
self.escape_csv_field(&event.message),
self.escape_csv_field(event.module_path.as_deref().unwrap_or("")),
self.escape_csv_field(event.file.as_deref().unwrap_or("")),
self.escape_csv_field(event.line.map(|l| l.to_string()).as_deref().unwrap_or("")),
self.escape_csv_field(&event.context.tid.to_string()),
self.escape_csv_field(""), self.escape_csv_field(self.config.prefix.as_deref().unwrap_or("")),
];
let fields_json = if event.fields.is_empty() {
String::new()
} else {
serde_json::to_string(&event.fields)
.map_err(|e| SinkError::Serialization(e.to_string()))?
};
csv_fields.push(self.escape_csv_field(&fields_json));
Ok(csv_fields.join(","))
}
fn escape_csv_field(&self, field: &str) -> String {
if field.contains(',') || field.contains('"') || field.contains('\n') {
format!("\"{}\"", field.replace('"', "\"\""))
} else {
field.to_string()
}
}
async fn write_output(&self, content: &str) -> SinkResult<()> {
let _lock = self.output_lock.lock().await;
match self.config.target {
StdoutTarget::Stdout => {
print!("{}", content);
io::stdout().flush().map_err(SinkError::Io)?
}
StdoutTarget::Stderr => {
eprint!("{}", content);
io::stderr().flush().map_err(SinkError::Io)?
}
}
Ok(())
}
}
#[async_trait]
impl QuantumSink for DefaultStdoutSink {
type Config = DefaultStdoutConfig;
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> Result<(), Self::Error> {
if self.is_closed.load(Ordering::Relaxed) {
return Err(SinkError::Closed);
}
if self.should_filter_event(&event) {
return Ok(());
}
let formatted = match self.config.format {
OutputFormat::Text => {
format!("{}\n", self.format_as_text(&event))
}
OutputFormat::Json => {
format!("{}\n", self.format_as_json(&event)?)
}
OutputFormat::Csv => {
format!("{}\n", self.format_as_csv(&event)?)
}
};
match self.write_output(&formatted).await {
Ok(()) => {
self.event_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
Err(e) => {
self.error_count.fetch_add(1, Ordering::Relaxed);
Err(e)
}
}
}
async fn shutdown(&self) -> Result<(), Self::Error> {
self.is_closed.store(true, Ordering::Relaxed);
let _lock = self.output_lock.lock().await;
match self.config.target {
StdoutTarget::Stdout => io::stdout().flush().map_err(SinkError::Io)?,
StdoutTarget::Stderr => io::stderr().flush().map_err(SinkError::Io)?,
}
Ok(())
}
async fn is_healthy(&self) -> bool {
!self.is_closed.load(Ordering::Relaxed)
}
fn name(&self) -> &'static str {
"default_stdout"
}
fn stats(&self) -> String {
format!(
"DefaultStdoutSink: events={}, errors={}, target={:?}, format={:?}",
self.event_count.load(Ordering::Relaxed),
self.error_count.load(Ordering::Relaxed),
self.config.target,
self.config.format
)
}
fn metadata(&self) -> crate::sinks::traits::SinkMetadata {
crate::sinks::traits::SinkMetadata {
name: "default_stdout".to_string(),
sink_type: crate::sinks::traits::SinkType::Stackable,
enabled: !self.is_closed.load(std::sync::atomic::Ordering::Relaxed),
description: Some(format!(
"Default stdout sink targeting {:?} with {:?} format",
self.config.target, self.config.format
)),
}
}
}
#[async_trait]
impl StackableSink for DefaultStdoutSink {
async fn send_event_internal(
&self,
event: &QuantumLogEvent,
strategy: crate::config::BackpressureStrategy,
) -> SinkResult<()> {
use crate::diagnostics::get_diagnostics_instance;
use tokio::time::{timeout, Duration};
if self.should_filter_event(event) {
return Ok(());
}
let diagnostics = get_diagnostics_instance();
let content = match self.config.format {
OutputFormat::Text => self.format_as_text(event),
OutputFormat::Json => self.format_as_json(event)?,
OutputFormat::Csv => self.format_as_csv(event)?,
};
let write_fut = self.write_output(&content);
let result = match strategy {
crate::config::BackpressureStrategy::Block => write_fut.await,
crate::config::BackpressureStrategy::Drop => {
match timeout(Duration::from_millis(100), write_fut).await {
Ok(res) => res,
Err(_) => return Err(SinkError::Backpressure),
}
}
};
match result {
Ok(()) => {
if let Some(d) = diagnostics {
d.increment_stdout_writes();
}
Ok(())
}
Err(e) => Err(e),
}
}
}
pub async fn create_default_stdout_sink() -> SinkResult<DefaultStdoutSink> {
DefaultStdoutSink::with_default_config().await
}
pub async fn create_json_stdout_sink() -> SinkResult<DefaultStdoutSink> {
let config = DefaultStdoutConfig {
format: OutputFormat::Json,
..Default::default()
};
DefaultStdoutSink::new(config).await
}
pub async fn create_plain_stdout_sink() -> SinkResult<DefaultStdoutSink> {
let config = DefaultStdoutConfig {
colored: false,
..Default::default()
};
DefaultStdoutSink::new(config).await
}
pub async fn create_stderr_sink() -> SinkResult<DefaultStdoutSink> {
let config = DefaultStdoutConfig {
target: StdoutTarget::Stderr,
..Default::default()
};
DefaultStdoutSink::new(config).await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::event::QuantumLogEvent;
use chrono::Utc;
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
use tracing::Level;
fn create_test_event(level: &str, message: &str) -> QuantumLogEvent {
QuantumLogEvent {
timestamp: Utc::now(),
level: level.to_string(),
target: "test".to_string(),
message: message.to_string(),
module_path: Some("test::module".to_string()),
file: Some("test.rs".to_string()),
line: Some(42),
thread_name: Some("test-thread".to_string()),
thread_id: "test-thread-id".to_string(),
fields: HashMap::new(),
context: crate::core::event::ContextInfo {
pid: std::process::id(),
tid: 12345,
username: None,
hostname: None,
mpi_rank: None,
custom_fields: std::collections::HashMap::new(),
},
}
}
#[tokio::test]
async fn test_default_stdout_sink_creation() {
let sink = DefaultStdoutSink::with_default_config().await;
assert!(sink.is_ok());
let sink = sink.unwrap();
assert!(matches!(sink.config.format, OutputFormat::Text));
assert!(sink.config.colored);
assert!(matches!(sink.config.target, StdoutTarget::Stdout));
}
#[tokio::test]
async fn test_custom_config_creation() {
let config = DefaultStdoutConfig {
format: OutputFormat::Json,
colored: false,
target: StdoutTarget::Stderr,
min_level: Some(Level::WARN),
include_timestamp: false,
include_thread: true,
include_module: false,
prefix: Some("TEST".to_string()),
};
let sink = DefaultStdoutSink::new(config).await;
assert!(sink.is_ok());
let sink = sink.unwrap();
assert!(matches!(sink.config.format, OutputFormat::Json));
assert!(!sink.config.colored);
assert!(matches!(sink.config.target, StdoutTarget::Stderr));
assert!(sink.config.min_level.is_some());
}
#[tokio::test]
async fn test_send_event_basic() {
let sink = DefaultStdoutSink::with_default_config().await.unwrap();
let event = create_test_event("INFO", "Test message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
assert_eq!(sink.event_count.load(Ordering::Relaxed), 1);
assert_eq!(sink.error_count.load(Ordering::Relaxed), 0);
}
#[tokio::test]
async fn test_level_filtering() {
let config = DefaultStdoutConfig {
min_level: Some(Level::WARN),
..Default::default()
};
let sink = DefaultStdoutSink::new(config).await.unwrap();
let info_event = create_test_event("INFO", "Info message");
let result = sink.send_event(info_event).await;
assert!(result.is_ok());
assert_eq!(sink.event_count.load(Ordering::Relaxed), 0);
let error_event = create_test_event("ERROR", "Error message");
let result = sink.send_event(error_event).await;
assert!(result.is_ok());
assert_eq!(sink.event_count.load(Ordering::Relaxed), 1);
}
#[tokio::test]
async fn test_format_as_text() {
let config = DefaultStdoutConfig {
colored: false,
include_timestamp: true,
include_module: true,
include_thread: true,
prefix: Some("TEST".to_string()),
..Default::default()
};
let sink = DefaultStdoutSink::new(config).await.unwrap();
let mut event = create_test_event("INFO", "Test message");
event.fields.insert(
"key1".to_string(),
serde_json::Value::String("value1".to_string()),
);
event.fields.insert(
"key2".to_string(),
serde_json::Value::String("value2".to_string()),
);
let formatted = sink.format_as_text(&event);
assert!(formatted.contains("TEST"));
assert!(formatted.contains("[INFO]"));
assert!(formatted.contains("Test message"));
assert!(formatted.contains("test::module"));
assert!(formatted.contains("test-thread"));
assert!(formatted.contains("key1: value1"));
assert!(formatted.contains("key2: value2"));
}
#[tokio::test]
async fn test_format_as_json() {
let config = DefaultStdoutConfig {
format: OutputFormat::Json,
prefix: Some("TEST".to_string()),
..Default::default()
};
let sink = DefaultStdoutSink::new(config).await.unwrap();
let mut event = create_test_event("INFO", "Test message");
event.fields.insert(
"key1".to_string(),
serde_json::Value::String("value1".to_string()),
);
let formatted = sink.format_as_json(&event);
assert!(formatted.is_ok());
let json_str = formatted.unwrap();
assert!(json_str.contains("\"level\":\"INFO\""));
assert!(json_str.contains("\"message\":\"Test message\""));
assert!(json_str.contains("\"module\":\"test::module\""));
assert!(json_str.contains("\"prefix\":\"TEST\""));
assert!(json_str.contains("\"fields\""));
assert!(json_str.contains("\"key1\":\"value1\""));
}
#[tokio::test]
async fn test_colored_output() {
let config = DefaultStdoutConfig {
colored: true,
..Default::default()
};
let sink = DefaultStdoutSink::new(config).await.unwrap();
let levels = vec!["ERROR", "WARN", "INFO", "DEBUG", "TRACE"];
for level in levels {
let event = create_test_event(level, "Test message");
let formatted = sink.format_as_text(&event);
assert!(formatted.contains("\x1b["));
assert!(formatted.contains("\x1b[0m"));
}
}
#[tokio::test]
async fn test_shutdown() {
let sink = DefaultStdoutSink::with_default_config().await.unwrap();
assert!(sink.is_healthy().await);
let result = sink.shutdown().await;
assert!(result.is_ok());
assert!(!sink.is_healthy().await);
let event = create_test_event("INFO", "Test message");
let result = sink.send_event(event).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), SinkError::Closed));
}
#[tokio::test]
async fn test_quantum_sink_trait() {
let sink = DefaultStdoutSink::with_default_config().await.unwrap();
assert_eq!(sink.name(), "default_stdout");
assert!(sink.is_healthy().await);
let stats = sink.stats();
assert!(stats.contains("DefaultStdoutSink"));
assert!(stats.contains("events=0"));
assert!(stats.contains("errors=0"));
}
#[tokio::test]
async fn test_stackable_sink_trait() {
let sink = DefaultStdoutSink::with_default_config().await.unwrap();
let _metadata = sink.metadata();
assert!(_metadata.enabled);
}
#[tokio::test]
async fn test_concurrent_access() {
let sink = std::sync::Arc::new(DefaultStdoutSink::with_default_config().await.unwrap());
let mut handles = vec![];
for i in 0..10 {
let sink_clone = std::sync::Arc::clone(&sink);
let handle = tokio::spawn(async move {
let event = create_test_event("INFO", &format!("Message {}", i));
sink_clone.send_event(event).await
});
handles.push(handle);
}
for handle in handles {
let result = handle.await.unwrap();
assert!(result.is_ok());
}
assert_eq!(sink.event_count.load(Ordering::Relaxed), 10);
}
#[tokio::test]
async fn test_convenience_functions() {
let default_sink = create_default_stdout_sink().await;
assert!(default_sink.is_ok());
let json_sink = create_json_stdout_sink().await;
assert!(json_sink.is_ok());
let json_sink = json_sink.unwrap();
assert!(matches!(json_sink.config.format, OutputFormat::Json));
let plain_sink = create_plain_stdout_sink().await;
assert!(plain_sink.is_ok());
let plain_sink = plain_sink.unwrap();
assert!(!plain_sink.config.colored);
let stderr_sink = create_stderr_sink().await;
assert!(stderr_sink.is_ok());
let stderr_sink = stderr_sink.unwrap();
assert!(matches!(stderr_sink.config.target, StdoutTarget::Stderr));
}
#[tokio::test]
async fn test_should_filter_event() {
let config = DefaultStdoutConfig {
min_level: Some(Level::WARN),
..Default::default()
};
let sink = DefaultStdoutSink::new(config).await.unwrap();
let trace_event = create_test_event("TRACE", "Trace message");
let debug_event = create_test_event("DEBUG", "Debug message");
let info_event = create_test_event("INFO", "Info message");
let warn_event = create_test_event("WARN", "Warn message");
let error_event = create_test_event("ERROR", "Error message");
assert!(sink.should_filter_event(&trace_event));
assert!(sink.should_filter_event(&debug_event));
assert!(sink.should_filter_event(&info_event));
assert!(!sink.should_filter_event(&warn_event));
assert!(!sink.should_filter_event(&error_event));
}
#[tokio::test]
async fn test_stdout_target_default() {
let default_target = StdoutTarget::default();
assert!(matches!(default_target, StdoutTarget::Stdout));
}
#[tokio::test]
async fn test_config_default() {
let default_config = DefaultStdoutConfig::default();
assert!(matches!(default_config.format, OutputFormat::Text));
assert!(default_config.colored);
assert!(matches!(default_config.target, StdoutTarget::Stdout));
assert!(default_config.min_level.is_none());
assert!(default_config.include_timestamp);
assert!(!default_config.include_thread);
assert!(default_config.include_module);
assert!(default_config.prefix.is_none());
}
}