use quantum_log::{
config::*,
core::event::{ContextInfo, QuantumLogEvent},
sinks::{
default_stdout::{DefaultStdoutConfig, DefaultStdoutSink, StdoutTarget},
traits::{ExclusiveSink, QuantumSink, SinkError, SinkMetadata, SinkType, StackableSink},
ErrorStrategy, Pipeline, PipelineConfig, PipelineBuilder,
},
};
use std::collections::HashMap;
use async_trait::async_trait;
use tokio::time::{sleep, Duration};
use tracing::Level;
#[derive(Debug, Clone)]
pub struct CustomStackableSink {
name: String,
prefix: String,
}
impl CustomStackableSink {
pub fn new(name: String, prefix: String) -> Self {
Self { name, prefix }
}
}
#[async_trait]
impl QuantumSink for CustomStackableSink {
type Config = ();
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> Result<(), Self::Error> {
println!(
"[{}] {}: {} - {}",
self.prefix, event.level, event.target, event.message
);
Ok(())
}
async fn shutdown(&self) -> Result<(), Self::Error> {
println!("[{}] Shutting down custom sink: {}", self.prefix, self.name);
Ok(())
}
async fn is_healthy(&self) -> bool {
true
}
fn name(&self) -> &'static str {
"custom_stackable"
}
fn stats(&self) -> String {
format!(
"CustomStackableSink: name={}, prefix={}",
self.name, self.prefix
)
}
fn metadata(&self) -> SinkMetadata {
SinkMetadata::new(self.name.clone(), SinkType::Stackable).with_description(format!(
"Custom stackable sink with prefix: {}",
self.prefix
))
}
}
impl StackableSink for CustomStackableSink {}
#[derive(Debug, Clone)]
pub struct CustomExclusiveSink {
name: String,
buffer: Vec<String>,
}
impl CustomExclusiveSink {
pub fn new(name: String) -> Self {
Self {
name,
buffer: Vec::new(),
}
}
pub fn get_logs(&self) -> &[String] {
&self.buffer
}
}
#[async_trait]
impl QuantumSink for CustomExclusiveSink {
type Config = ();
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> Result<(), Self::Error> {
println!(
"[EXCLUSIVE] {} - {} - {}",
event.level, event.target, event.message
);
Ok(())
}
async fn shutdown(&self) -> Result<(), Self::Error> {
println!(
"[EXCLUSIVE] Shutting down custom exclusive sink: {}",
self.name
);
Ok(())
}
async fn is_healthy(&self) -> bool {
true
}
fn name(&self) -> &'static str {
"custom_exclusive"
}
fn stats(&self) -> String {
format!(
"CustomExclusiveSink: name={}, buffer_size={}",
self.name,
self.buffer.len()
)
}
fn metadata(&self) -> SinkMetadata {
SinkMetadata::new(self.name.clone(), SinkType::Exclusive)
.with_description("Custom exclusive sink for testing".to_string())
}
}
impl ExclusiveSink for CustomExclusiveSink {}
fn create_test_event(level: Level, message: &str) -> QuantumLogEvent {
QuantumLogEvent {
timestamp: chrono::Utc::now(),
level: level.to_string(),
target: "example".to_string(),
message: message.to_string(),
module_path: Some("sink_trait_usage".to_string()),
file: Some("sink_trait_usage.rs".to_string()),
line: Some(42),
thread_name: Some("main".to_string()),
thread_id: format!("{:?}", std::thread::current().id()),
fields: HashMap::new(),
context: ContextInfo {
pid: std::process::id(),
tid: 0, username: None,
hostname: None,
mpi_rank: None,
custom_fields: HashMap::new(),
},
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== QuantumLog 0.2.0 统一 Sink Trait 使用示例 ===");
println!("\n1. 使用默认标准输出 sink");
let default_config = DefaultStdoutConfig {
format: OutputFormat::Text,
colored: true,
target: StdoutTarget::Stdout,
..Default::default()
};
let default_stdout = DefaultStdoutSink::new(default_config).await?;
let json_config = DefaultStdoutConfig {
format: OutputFormat::Json,
colored: false,
target: StdoutTarget::Stdout,
..Default::default()
};
let json_stdout = DefaultStdoutSink::new(json_config).await?;
let event1 = create_test_event(Level::INFO, "使用默认标准输出 sink");
default_stdout.send_event(event1).await?;
let event2 = create_test_event(Level::WARN, "使用 JSON 格式标准输出 sink");
json_stdout.send_event(event2).await?;
println!("\n2. 创建自定义 sink");
let custom_stackable1 = CustomStackableSink::new("Logger1".to_string(), "CUSTOM1".to_string());
let custom_stackable2 = CustomStackableSink::new("Logger2".to_string(), "CUSTOM2".to_string());
let custom_exclusive = CustomExclusiveSink::new("ExclusiveLogger".to_string());
let event3 = create_test_event(Level::ERROR, "自定义 sink 测试消息");
custom_stackable1.send_event(event3.clone()).await?;
custom_stackable2.send_event(event3.clone()).await?;
custom_exclusive.send_event(event3).await?;
println!("\n3. 使用 Pipeline 管理多个 sink");
let pipeline_config = PipelineConfig {
name: "example_pipeline".to_string(),
parallel_processing: true,
max_retries: 3,
error_strategy: ErrorStrategy::LogAndContinue,
};
let pipeline = Pipeline::new(pipeline_config);
pipeline
.add_stackable_sink(custom_stackable1.clone(), custom_stackable1.metadata())
.await?;
pipeline
.add_stackable_sink(custom_stackable2.clone(), custom_stackable2.metadata())
.await?;
let events = vec![
create_test_event(Level::INFO, "Pipeline 测试消息 1"),
create_test_event(Level::WARN, "Pipeline 测试消息 2"),
create_test_event(Level::ERROR, "Pipeline 测试消息 3"),
];
for event in events {
pipeline.send_event(event).await?;
}
sleep(Duration::from_millis(100)).await;
println!("\n4. Pipeline 统计信息");
let stats = pipeline.get_stats().await;
println!("Pipeline 统计: {:?}", stats);
println!("\n5. 健康检查");
let health = pipeline.health_check().await;
println!("Pipeline 健康状态: {:?}", health);
println!("\n6. 优雅关闭 Pipeline");
pipeline.shutdown().await?;
println!("\n7. 关闭其他 sink");
json_stdout.shutdown().await?;
println!("\n=== 示例完成 ===");
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_custom_stackable_sink() {
let sink = CustomStackableSink::new("test".to_string(), "TEST".to_string());
let event = create_test_event(Level::INFO, "测试消息");
assert!(sink.send_event(event).await.is_ok());
assert!(sink.is_healthy().await);
assert_eq!(sink.name(), "custom_stackable");
assert!(sink.shutdown().await.is_ok());
}
#[tokio::test]
async fn test_custom_exclusive_sink() {
let sink = CustomExclusiveSink::new("test".to_string());
let event = create_test_event(Level::ERROR, "测试错误消息");
assert!(sink.send_event(event).await.is_ok());
assert!(sink.is_healthy().await);
assert_eq!(sink.name(), "custom_exclusive");
assert!(sink.shutdown().await.is_ok());
}
#[tokio::test]
async fn test_pipeline_with_custom_sinks() {
let config = PipelineConfig {
name: "test_pipeline".to_string(),
parallel_processing: false,
max_retries: 2,
error_strategy: ErrorStrategy::LogAndContinue,
};
let pipeline = Pipeline::new(config);
let stackable = CustomStackableSink::new("test_stackable".to_string(), "TEST".to_string());
pipeline
.add_stackable_sink(stackable.clone(), stackable.metadata())
.await
.unwrap();
let event = create_test_event(Level::INFO, "Pipeline 测试");
assert!(pipeline.send_event(event).await.is_ok());
sleep(Duration::from_millis(50)).await;
let health = pipeline.health_check().await;
assert!(health.overall_healthy);
assert!(pipeline.shutdown().await.is_ok());
}
#[tokio::test]
async fn test_pipeline_add_exclusive_sink() {
let pipeline = Pipeline::new(PipelineConfig::default());
let exclusive = CustomExclusiveSink::new("exclusive_one".to_string());
pipeline
.add_exclusive_sink(exclusive.clone(), exclusive.metadata())
.await
.unwrap();
let event = create_test_event(Level::INFO, "独占型 sink 事件");
assert!(pipeline.send_event(event).await.is_ok());
let stats = pipeline.get_stats().await;
assert_eq!(stats.exclusive_sink_count, 1);
assert_eq!(stats.enabled_sink_count, 1);
pipeline.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_pipeline_builder_set_exclusive_sink() {
let exclusive = CustomExclusiveSink::new("builder_exclusive".to_string());
let pipeline = PipelineBuilder::new()
.with_name("builder_pipeline".to_string())
.with_parallel_processing(true)
.with_max_retries(3)
.with_error_strategy(ErrorStrategy::LogAndContinue)
.set_exclusive_sink(exclusive.clone(), exclusive.metadata())
.build();
let event = create_test_event(Level::WARN, "通过构建器设置独占型 sink");
assert!(pipeline.send_event(event).await.is_ok());
let stats = pipeline.get_stats().await;
assert_eq!(stats.name, "builder_pipeline");
assert_eq!(stats.exclusive_sink_count, 1);
assert_eq!(stats.enabled_sink_count, 1);
pipeline.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_pipeline_with_exclusive_and_stackable_sinks() {
let pipeline = Pipeline::new(PipelineConfig::default());
let exclusive = CustomExclusiveSink::new("exclusive".to_string());
let stackable = CustomStackableSink::new("stackable".to_string(), "S".to_string());
pipeline
.add_exclusive_sink(exclusive.clone(), exclusive.metadata())
.await
.unwrap();
let res = pipeline
.add_stackable_sink(stackable.clone(), stackable.metadata())
.await;
assert!(
matches!(res, Err(SinkError::Config(ref msg)) if msg.contains("Cannot add stackable sink when exclusive sink is present")),
"expected Config error when adding stackable with exclusive present, got: {:?}",
res
);
for i in 0..3 {
let event = create_test_event(Level::INFO, &format!("事件 {}", i));
assert!(pipeline.send_event(event).await.is_ok());
}
let stats = pipeline.get_stats().await;
assert_eq!(stats.exclusive_sink_count, 1);
assert_eq!(stats.stackable_sink_count, 0);
assert_eq!(stats.enabled_sink_count, 1);
pipeline.shutdown().await.unwrap();
}
}