use crate::config::FileConfig;
use crate::core::event::QuantumLogEvent;
use crate::error::{QuantumLogError, Result};
use crate::sinks::file_common::{FileCleaner, FileWriter};
use crate::sinks::traits::{ExclusiveSink, QuantumSink, SinkError};
use crate::utils::FileTools;
use async_trait::async_trait;
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::Level;
use std::sync::{Arc, Mutex};
#[derive(Debug)]
pub struct FileSink {
config: FileConfig,
sender: Arc<Mutex<Option<mpsc::Sender<SinkMessage>>>>,
processor_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
}
enum SinkMessage {
Event(Box<QuantumLogEvent>),
Shutdown(oneshot::Sender<Result<()>>),
}
struct FileSinkProcessor {
config: FileConfig,
receiver: mpsc::Receiver<SinkMessage>,
writer: FileWriter,
cleaner: Option<FileCleaner>,
level_filter: Option<Level>,
}
impl FileSink {
pub fn new(config: FileConfig) -> Self {
Self {
config,
sender: Arc::new(Mutex::new(None)),
processor_handle: Arc::new(Mutex::new(None)),
}
}
pub fn with_level_filter(self, _level: Level) -> Self {
self
}
pub async fn start(&mut self) -> Result<()> {
if self.sender.lock().unwrap().is_some() {
return Err(QuantumLogError::ConfigError(
"Sink already started".to_string(),
));
}
let buffer_size = 1000;
let (sender, receiver) = mpsc::channel(buffer_size);
let processor = FileSinkProcessor::new(self.config.clone(), receiver).await?;
let handle = tokio::spawn(async move {
if let Err(e) = processor.run().await {
tracing::error!("FileSink processor error: {}", e);
}
});
{
let mut guard = self.sender.lock().unwrap();
*guard = Some(sender);
}
{
let mut guard = self.processor_handle.lock().unwrap();
*guard = Some(handle);
}
Ok(())
}
pub async fn send_event_internal(&self, event: QuantumLogEvent) -> Result<()> {
let sender_opt = {
let guard = self.sender.lock().unwrap();
guard.as_ref().cloned()
};
if let Some(sender) = sender_opt {
let message = SinkMessage::Event(Box::new(event));
sender
.send(message)
.await
.map_err(|_| QuantumLogError::SinkError("Failed to send event to FileSink".to_string()))?;
}
Ok(())
}
pub fn try_send_event(&self, event: QuantumLogEvent) -> Result<()> {
let sender_opt = {
let guard = self.sender.lock().unwrap();
guard.as_ref().cloned()
};
if let Some(sender) = sender_opt {
let message = SinkMessage::Event(Box::new(event));
sender.try_send(message).map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => {
QuantumLogError::SinkError("FileSink buffer full".to_string())
}
mpsc::error::TrySendError::Closed(_) => {
QuantumLogError::SinkError("FileSink closed".to_string())
}
})?;
}
Ok(())
}
async fn shutdown_ref(&self) -> Result<()> {
let sender_taken = {
let mut guard = self.sender.lock().unwrap();
guard.take()
};
if let Some(sender) = sender_taken {
let (tx, rx) = oneshot::channel();
if sender.send(SinkMessage::Shutdown(tx)).await.is_err() {
return Err(QuantumLogError::SinkError(
"Failed to send shutdown signal".to_string(),
));
}
match rx.await {
Ok(result) => result?,
Err(_) => {
return Err(QuantumLogError::SinkError(
"Shutdown signal lost".to_string(),
))
}
}
}
let handle_taken = {
let mut guard = self.processor_handle.lock().unwrap();
guard.take()
};
if let Some(handle) = handle_taken {
if let Err(e) = handle.await {
tracing::error!("Error waiting for FileSink processor: {}", e);
}
}
Ok(())
}
pub async fn shutdown(self) -> Result<()> {
self.shutdown_ref().await
}
pub fn is_running(&self) -> bool {
self.sender.lock().unwrap().is_some()
}
pub fn config(&self) -> &FileConfig {
&self.config
}
}
impl FileSinkProcessor {
async fn new(config: FileConfig, receiver: mpsc::Receiver<SinkMessage>) -> Result<Self> {
FileTools::ensure_directory_exists(&config.directory)?;
if !FileTools::is_directory_writable(&config.directory) {
return Err(QuantumLogError::IoError {
source: std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
format!("目录不可写: {}", config.directory.display()),
),
});
}
let writer = FileWriter::new(config.clone()).await?;
let cleaner = if let Some(ref rotation) = config.rotation {
let base_path = config.directory.to_string_lossy().to_string();
let mut cleaner = FileCleaner::new(&base_path);
if let Some(max_files) = rotation.max_files {
cleaner = cleaner.max_files(max_files);
}
Some(cleaner)
} else {
None
};
let level_filter = if let Some(ref level_str) = config.level {
Some(level_str.parse::<Level>().map_err(|_| {
QuantumLogError::ConfigError(format!("Invalid log level: {}", level_str))
})?)
} else {
None
};
Ok(Self {
config,
receiver,
writer,
cleaner,
level_filter,
})
}
async fn run(mut self) -> Result<()> {
while let Some(message) = self.receiver.recv().await {
match message {
SinkMessage::Event(event) => {
if let Err(e) = self.handle_event(*event).await {
tracing::error!("Error handling event in FileSink: {}", e);
}
}
SinkMessage::Shutdown(response) => {
let result = self.shutdown().await;
let _ = response.send(result);
break;
}
}
}
Ok(())
}
async fn handle_event(&mut self, event: QuantumLogEvent) -> Result<()> {
if let Some(ref filter_level) = self.level_filter {
let event_level = event
.level
.parse::<Level>()
.map_err(|_| QuantumLogError::ConfigError(format!("Invalid log level: {}", event.level)))?;
if event_level < *filter_level {
return Ok(());
}
}
let formatted = self.format_event(&event)?;
self.writer.write(formatted.as_bytes()).await?;
Ok(())
}
fn format_event(&self, event: &QuantumLogEvent) -> Result<String> {
let format_type = match self.config.output_type {
crate::config::FileOutputType::Json => "json",
crate::config::FileOutputType::Text => "full",
crate::config::FileOutputType::Csv => "csv",
};
Ok(format!(
"{}\n",
event.to_formatted_string(format_type)
))
}
async fn shutdown(&mut self) -> Result<()> {
if let Err(e) = self.writer.flush().await {
tracing::error!("Error flushing file writer: {}", e);
}
if let Some(ref cleaner) = self.cleaner {
match cleaner.cleanup().await {
Ok(removed) => {
if removed > 0 {
tracing::info!("Cleaned up {} old log files", removed);
}
}
Err(e) => {
tracing::error!("Error during log file cleanup: {}", e);
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::event::ContextInfo;
use tempfile::TempDir;
use tracing::Level;
fn create_test_event(level: Level, message: &str) -> QuantumLogEvent {
static CALLSITE: tracing::callsite::DefaultCallsite =
tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
"test",
"test_target",
Level::INFO,
Some("test.rs"),
Some(42),
Some("test_module"),
tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE)),
tracing::metadata::Kind::EVENT,
));
let fields = tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE));
let metadata = tracing::Metadata::new(
"test",
"test_target",
level,
Some("test.rs"),
Some(42),
Some("test_module"),
fields,
tracing::metadata::Kind::EVENT,
);
QuantumLogEvent::new(
level,
message.to_string(),
&metadata,
std::collections::HashMap::new(),
ContextInfo::default(),
)
}
#[tokio::test]
async fn test_file_sink_creation() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let sink = FileSink::new(config);
assert!(!sink.is_running());
}
#[tokio::test]
async fn test_file_sink_start_stop() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let mut sink = FileSink::new(config);
let result = sink.start().await;
assert!(result.is_ok());
assert!(sink.is_running());
let result = sink.shutdown().await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_file_sink_send_events() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let mut sink = FileSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Test message");
sink.send_event(event).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
sink.shutdown().await.unwrap();
assert!(file_path.exists());
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
assert!(content.contains("Test message"));
}
#[tokio::test]
async fn test_file_sink_try_send() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let mut sink = FileSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Try send message");
let result = sink.try_send_event(event);
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
sink.shutdown().await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
assert!(content.contains("Try send message"));
}
#[tokio::test]
async fn test_file_sink_multiple_events() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let mut sink = FileSink::new(config);
sink.start().await.unwrap();
for i in 0..10 {
let event = create_test_event(Level::INFO, &format!("Message {}", i));
sink.send_event(event).await.unwrap();
}
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
sink.shutdown().await.unwrap();
let content = tokio::fs::read_to_string(&file_path).await.unwrap();
for i in 0..10 {
assert!(content.contains(&format!("Message {}", i)));
}
}
#[tokio::test]
async fn test_file_sink_config_access() {
let temp_dir = TempDir::new().unwrap();
let file_path = temp_dir.path().join("test.log");
let config = FileConfig {
enabled: true,
level: None,
output_type: crate::config::FileOutputType::Text,
directory: temp_dir.path().to_path_buf(),
filename_base: "test".to_string(),
extension: Some("log".to_string()),
separation_strategy: crate::config::FileSeparationStrategy::None,
write_buffer_size: 8192,
rotation: None,
writer_cache_ttl_seconds: 300,
writer_cache_capacity: 100,
};
let sink = FileSink::new(config.clone());
assert_eq!(sink.config().directory, config.directory);
assert_eq!(sink.config().filename_base, config.filename_base);
assert_eq!(sink.config().extension, config.extension);
}
}
#[async_trait]
impl QuantumSink for FileSink {
type Config = FileConfig;
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> std::result::Result<(), Self::Error> {
self.send_event_internal(event).await.map_err(|e| match e {
QuantumLogError::ChannelError(msg) => SinkError::Generic(msg),
QuantumLogError::ConfigError(msg) => SinkError::Config(msg),
QuantumLogError::IoError { source } => SinkError::Io(source),
_ => SinkError::Generic(e.to_string()),
})
}
async fn shutdown(&self) -> std::result::Result<(), Self::Error> {
self.shutdown_ref().await.map_err(|e| match e {
QuantumLogError::ChannelError(msg) => SinkError::Generic(msg),
QuantumLogError::ConfigError(msg) => SinkError::Config(msg),
QuantumLogError::IoError { source } => SinkError::Io(source),
_ => SinkError::Generic(e.to_string()),
})
}
async fn is_healthy(&self) -> bool {
self.is_running()
}
fn name(&self) -> &'static str {
"file"
}
fn stats(&self) -> String {
format!(
"FileSink: running={}, file={}/{}.{}",
self.is_running(),
self.config.directory.display(),
self.config.filename_base,
self.config.extension.as_deref().unwrap_or("log")
)
}
fn metadata(&self) -> crate::sinks::traits::SinkMetadata {
crate::sinks::traits::SinkMetadata {
name: "file".to_string(),
sink_type: crate::sinks::traits::SinkType::Exclusive,
enabled: self.is_running(),
description: Some(format!(
"File sink writing to {}/{}.{}",
self.config.directory.display(),
self.config.filename_base,
self.config.extension.as_deref().unwrap_or("log")
)),
}
}
}
impl ExclusiveSink for FileSink {}