use crate::config::StdoutConfig;
use crate::core::event::QuantumLogEvent;
use crate::error::{QuantumLogError, Result};
use crate::sinks::traits::{QuantumSink, SinkError, StackableSink};
use async_trait::async_trait;
use std::io::{self, Write};
use std::sync::{Arc, Mutex};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::Level;
#[derive(Debug)]
enum SinkMessage {
Event(Box<QuantumLogEvent>),
Shutdown(oneshot::Sender<Result<()>>),
}
#[derive(Debug)]
pub struct ConsoleSink {
config: StdoutConfig,
sender: Arc<Mutex<Option<mpsc::UnboundedSender<SinkMessage>>>>,
handle: Arc<Mutex<Option<JoinHandle<Result<()>>>>>,
}
impl ConsoleSink {
pub fn new(config: StdoutConfig) -> Self {
Self {
config,
sender: Arc::new(Mutex::new(None)),
handle: Arc::new(Mutex::new(None)),
}
}
pub fn with_level_filter(mut self, level: Level) -> Self {
self.config.level = Some(level.to_string().to_lowercase());
self
}
pub async fn start(&mut self) -> Result<()> {
if self.is_running() {
return Err(QuantumLogError::ConfigError(
"ConsoleSink is already running".to_string(),
));
}
let (sender, receiver) = mpsc::unbounded_channel();
let processor = ConsoleSinkProcessor::new(self.config.clone(), receiver)?;
let handle = tokio::spawn(async move { processor.run().await });
{
let mut sender_guard = self.sender.lock().unwrap();
*sender_guard = Some(sender);
}
{
let mut handle_guard = self.handle.lock().unwrap();
*handle_guard = Some(handle);
}
tracing::info!("ConsoleSink started");
Ok(())
}
pub async fn send_event_internal(&self, event: QuantumLogEvent) -> Result<()> {
let sender_opt = {
let sender_guard = self.sender.lock().unwrap();
sender_guard.as_ref().cloned()
};
if let Some(sender) = sender_opt {
sender
.send(SinkMessage::Event(Box::new(event)))
.map_err(|_| {
QuantumLogError::ChannelError("Failed to send event to ConsoleSink".to_string())
})?;
Ok(())
} else {
Err(QuantumLogError::ConfigError(
"ConsoleSink is not running".to_string(),
))
}
}
pub fn try_send_event(&self, event: QuantumLogEvent) -> Result<()> {
let sender_opt = {
let sender_guard = self.sender.lock().unwrap();
sender_guard.as_ref().cloned()
};
if let Some(sender) = sender_opt {
sender
.send(SinkMessage::Event(Box::new(event)))
.map_err(|_| {
QuantumLogError::ChannelError("Failed to send event to ConsoleSink".to_string())
})?;
Ok(())
} else {
Err(QuantumLogError::ConfigError(
"ConsoleSink is not running".to_string(),
))
}
}
pub async fn shutdown(&self) -> Result<()> {
let sender = {
let mut sender_guard = self.sender.lock().unwrap();
sender_guard.take()
};
if let Some(sender) = sender {
let (response_sender, response_receiver) = oneshot::channel();
sender
.send(SinkMessage::Shutdown(response_sender))
.map_err(|_| {
QuantumLogError::ChannelError("Failed to send shutdown signal".to_string())
})?;
let result = response_receiver.await.map_err(|_| {
QuantumLogError::ChannelError("Failed to receive shutdown response".to_string())
})?;
let handle = {
let mut handle_guard = self.handle.lock().unwrap();
handle_guard.take()
};
if let Some(handle) = handle {
let _ = handle.await;
}
tracing::info!("ConsoleSink shutdown completed");
result
} else {
Ok(())
}
}
pub fn is_running(&self) -> bool {
let sender_guard = self.sender.lock().unwrap();
let handle_guard = self.handle.lock().unwrap();
sender_guard.is_some() && handle_guard.is_some()
}
pub fn config(&self) -> &StdoutConfig {
&self.config
}
}
#[derive(Debug)]
struct ConsoleSinkProcessor {
config: StdoutConfig,
receiver: mpsc::UnboundedReceiver<SinkMessage>,
level_filter: Option<Level>,
}
impl ConsoleSinkProcessor {
fn new(config: StdoutConfig, receiver: mpsc::UnboundedReceiver<SinkMessage>) -> Result<Self> {
let level_filter = if let Some(ref level_str) = config.level {
Some(level_str.parse::<Level>().map_err(|_| {
QuantumLogError::ConfigError(format!("Invalid log level: {}", level_str))
})?)
} else {
None
};
Ok(Self {
config,
receiver,
level_filter,
})
}
async fn run(mut self) -> Result<()> {
while let Some(message) = self.receiver.recv().await {
match message {
SinkMessage::Event(event) => {
if let Err(e) = self.handle_event(*event) {
tracing::error!("Error handling event in ConsoleSink: {}", e);
}
}
SinkMessage::Shutdown(response) => {
let result = self.shutdown();
let _ = response.send(result);
break;
}
}
}
Ok(())
}
fn handle_event(&self, event: QuantumLogEvent) -> Result<()> {
if let Some(ref filter_level) = self.level_filter {
let event_level = event.level.parse::<Level>().map_err(|_| {
QuantumLogError::ConfigError(format!("Invalid log level: {}", event.level))
})?;
if event_level < *filter_level {
return Ok(());
}
}
let formatted = self.format_event(&event)?;
if self.should_use_stderr(&event) {
let mut stderr = io::stderr();
stderr
.write_all(formatted.as_bytes())
.map_err(|e| QuantumLogError::IoError { source: e })?;
stderr
.write_all(b"\n")
.map_err(|e| QuantumLogError::IoError { source: e })?;
stderr
.flush()
.map_err(|e| QuantumLogError::IoError { source: e })?;
} else {
let mut stdout = io::stdout();
stdout
.write_all(formatted.as_bytes())
.map_err(|e| QuantumLogError::IoError { source: e })?;
stdout
.write_all(b"\n")
.map_err(|e| QuantumLogError::IoError { source: e })?;
stdout
.flush()
.map_err(|e| QuantumLogError::IoError { source: e })?;
}
Ok(())
}
fn should_use_stderr(&self, event: &QuantumLogEvent) -> bool {
matches!(event.level.as_str(), "error" | "warn")
}
fn format_event(&self, event: &QuantumLogEvent) -> Result<String> {
match &self.config.format {
crate::config::OutputFormat::Text => {
if self.config.color_enabled.unwrap_or(true) {
Ok(self.format_colored_text(event))
} else {
Ok(event.to_formatted_string("full"))
}
}
crate::config::OutputFormat::Json => event
.to_json()
.map_err(|e| QuantumLogError::SerializationError { source: e }),
crate::config::OutputFormat::Csv => {
let csv_row = event.to_csv_row();
Ok(csv_row.join(","))
}
}
}
fn format_colored_text(&self, event: &QuantumLogEvent) -> String {
let level_color = match event.level.as_str() {
"error" => "\x1b[31m", "warn" => "\x1b[33m", "info" => "\x1b[32m", "debug" => "\x1b[36m", "trace" => "\x1b[37m", _ => "\x1b[0m", };
let reset = "\x1b[0m";
format!(
"{}{:>5}{} {} [{}] {}",
level_color,
event.level.to_uppercase(),
reset,
event.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
event.target,
event.message
)
}
fn shutdown(&self) -> Result<()> {
let _ = io::stdout().flush();
let _ = io::stderr().flush();
tracing::info!("ConsoleSink shutdown completed");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::OutputFormat;
use crate::core::event::ContextInfo;
use tracing::Level;
fn create_test_event(level: Level, message: &str) -> QuantumLogEvent {
static CALLSITE: tracing::callsite::DefaultCallsite =
tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
"test",
"quantum_log::console::test",
Level::INFO,
Some(file!()),
Some(line!()),
Some(module_path!()),
tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE)),
tracing::metadata::Kind::EVENT,
));
let metadata = tracing::Metadata::new(
"test",
"test_target",
level,
Some("test.rs"),
Some(42),
Some("test_module"),
tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE)),
tracing::metadata::Kind::EVENT,
);
QuantumLogEvent::new(
level,
message.to_string(),
&metadata,
std::collections::HashMap::new(),
ContextInfo::default(),
)
}
#[tokio::test]
async fn test_console_sink_creation() {
let config = StdoutConfig {
enabled: true,
level: None,
colored: true,
format: OutputFormat::Text,
color_enabled: Some(true),
level_colors: None,
};
let sink = ConsoleSink::new(config);
assert!(!sink.is_running());
}
#[tokio::test]
async fn test_console_sink_start_stop() {
let config = StdoutConfig {
enabled: true,
level: None,
format: OutputFormat::Text,
color_enabled: Some(false),
level_colors: None,
colored: false,
};
let mut sink = ConsoleSink::new(config);
let result = sink.start().await;
assert!(result.is_ok());
assert!(sink.is_running());
let result = sink.shutdown().await;
assert!(result.is_ok());
assert!(!sink.is_running());
}
#[tokio::test]
async fn test_console_sink_send_event() {
let config = StdoutConfig {
enabled: true,
level: None,
format: OutputFormat::Text,
color_enabled: Some(false),
level_colors: None,
colored: false,
};
let mut sink = ConsoleSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Test console message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
sink.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_console_sink_try_send_event() {
let config = StdoutConfig {
enabled: true,
level: None,
format: OutputFormat::Text,
color_enabled: Some(false),
level_colors: None,
colored: false,
};
let mut sink = ConsoleSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Test console message");
let result = sink.try_send_event(event);
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
sink.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_console_sink_level_filter() {
let config = StdoutConfig {
enabled: true,
level: Some("warn".to_string()),
colored: false,
format: OutputFormat::Text,
color_enabled: Some(false),
level_colors: None,
};
let mut sink = ConsoleSink::new(config);
sink.start().await.unwrap();
let debug_event = create_test_event(Level::DEBUG, "Debug message");
let result = sink.send_event(debug_event).await;
assert!(result.is_ok());
let error_event = create_test_event(Level::ERROR, "Error message");
let result = sink.send_event(error_event).await;
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
sink.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_console_sink_colored_output() {
let config = StdoutConfig {
enabled: true,
level: None,
colored: true,
format: OutputFormat::Text,
color_enabled: Some(true),
level_colors: None,
};
let mut sink = ConsoleSink::new(config);
sink.start().await.unwrap();
let events = vec![
create_test_event(Level::ERROR, "Error message"),
create_test_event(Level::WARN, "Warning message"),
create_test_event(Level::INFO, "Info message"),
create_test_event(Level::DEBUG, "Debug message"),
create_test_event(Level::TRACE, "Trace message"),
];
for event in events {
let result = sink.send_event(event).await;
assert!(result.is_ok());
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
sink.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_console_sink_json_format() {
let config = StdoutConfig {
enabled: true,
level: None,
colored: false,
format: OutputFormat::Json,
color_enabled: Some(false),
level_colors: None,
};
let mut sink = ConsoleSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Test JSON message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
sink.shutdown().await.unwrap();
}
#[test]
fn test_console_sink_config() {
let config = StdoutConfig {
enabled: true,
level: Some("info".to_string()),
colored: true,
format: OutputFormat::Json,
color_enabled: Some(true),
level_colors: None,
};
let sink = ConsoleSink::new(config.clone());
assert_eq!(sink.config().enabled, config.enabled);
assert_eq!(sink.config().level, config.level);
assert_eq!(sink.config().colored, config.colored);
assert_eq!(sink.config().format, config.format);
assert_eq!(sink.config().color_enabled, config.color_enabled);
}
}
#[async_trait]
impl QuantumSink for ConsoleSink {
type Config = StdoutConfig;
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> std::result::Result<(), Self::Error> {
<ConsoleSink as StackableSink>::send_event_internal(
self,
&event,
crate::config::BackpressureStrategy::Block,
)
.await
}
async fn shutdown(&self) -> std::result::Result<(), Self::Error> {
ConsoleSink::shutdown(self).await.map_err(|e| match e {
QuantumLogError::ChannelError(msg) => SinkError::Generic(msg),
QuantumLogError::ConfigError(msg) => SinkError::Config(msg),
QuantumLogError::IoError { source } => SinkError::Io(source),
_ => SinkError::Generic(e.to_string()),
})
}
async fn is_healthy(&self) -> bool {
self.is_running()
}
fn name(&self) -> &'static str {
"console"
}
fn stats(&self) -> String {
format!(
"ConsoleSink: running={}, config={:?}",
self.is_running(),
self.config
)
}
fn metadata(&self) -> crate::sinks::traits::SinkMetadata {
crate::sinks::traits::SinkMetadata {
name: "console".to_string(),
sink_type: crate::sinks::traits::SinkType::Stackable,
enabled: self.is_running(),
description: Some(format!("Console sink with config: {:?}", self.config)),
}
}
}
#[async_trait]
impl StackableSink for ConsoleSink {
async fn send_event_internal(
&self,
event: &QuantumLogEvent,
strategy: crate::config::BackpressureStrategy,
) -> crate::sinks::traits::SinkResult<()> {
use crate::diagnostics::get_diagnostics_instance;
use tokio::time::{timeout, Duration};
let diagnostics = get_diagnostics_instance();
let internal_event = event.clone();
let send_fut = self.send_event_internal(internal_event);
let result = match strategy {
crate::config::BackpressureStrategy::Block => send_fut.await,
crate::config::BackpressureStrategy::Drop => {
match timeout(Duration::from_millis(100), send_fut).await {
Ok(res) => res,
Err(_) => return Err(crate::sinks::traits::SinkError::Backpressure),
}
}
};
match result {
Ok(()) => {
if let Some(d) = diagnostics {
d.increment_events_processed();
}
Ok(())
}
Err(e) => Err(crate::sinks::traits::SinkError::Generic(e.to_string())),
}
}
}