use crate::core::event::QuantumLogEvent;
use async_trait::async_trait;
use std::error::Error as StdError;
use std::fmt::Debug;
use std::result::Result;
#[async_trait]
pub trait QuantumSink: Send + Sync + Debug {
type Config: Send + Sync + Debug + Clone;
type Error: StdError + Send + Sync + 'static;
async fn send_event(&self, event: QuantumLogEvent) -> Result<(), Self::Error>;
async fn shutdown(&self) -> Result<(), Self::Error>;
async fn is_healthy(&self) -> bool {
true
}
fn name(&self) -> &'static str;
fn stats(&self) -> String {
format!("Sink: {}, Status: Healthy", self.name())
}
fn metadata(&self) -> SinkMetadata;
}
pub trait ExclusiveSink: QuantumSink {}
#[async_trait]
pub trait StackableSink: QuantumSink {
async fn send_event_internal(
&self,
event: &QuantumLogEvent,
strategy: crate::config::BackpressureStrategy,
) -> SinkResult<()> {
let _ = strategy; self.send_event(event.clone())
.await
.map_err(|e| SinkError::Generic(e.to_string()))
}
}
#[async_trait]
pub trait SinkFactory<T: QuantumSink> {
async fn create_sink(config: T::Config) -> Result<T, T::Error>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SinkType {
Exclusive,
Stackable,
}
#[derive(Debug, Clone)]
pub struct SinkMetadata {
pub name: String,
pub sink_type: SinkType,
pub enabled: bool,
pub description: Option<String>,
}
impl SinkMetadata {
pub fn new(name: String, sink_type: SinkType) -> Self {
Self {
name,
sink_type,
enabled: true,
description: None,
}
}
pub fn with_description(mut self, description: String) -> Self {
self.description = Some(description);
self
}
pub fn with_enabled(mut self, enabled: bool) -> Self {
self.enabled = enabled;
self
}
}
#[derive(Debug, thiserror::Error)]
pub enum SinkError {
#[error("Configuration error: {0}")]
Config(String),
#[error("I/O error: {0}")]
Io(#[from] std::io::Error),
#[error("Serialization error: {0}")]
Serialization(String),
#[error("Network error: {0}")]
Network(String),
#[cfg(feature = "database")]
#[error("Database error: {0}")]
Database(String),
#[error("Generic error: {0}")]
Generic(String),
#[error("Sink is closed")]
Closed,
#[error("Backpressure limit exceeded")]
Backpressure,
}
pub type SinkResult<T> = Result<T, SinkError>;
#[async_trait]
pub trait QuantumSinkDyn: Send + Sync + Debug {
async fn send_event_dyn(&self, event: QuantumLogEvent) -> SinkResult<()>;
async fn shutdown_dyn(&self) -> SinkResult<()>;
async fn is_healthy_dyn(&self) -> bool;
fn name_dyn(&self) -> &'static str;
fn stats_dyn(&self) -> String;
fn metadata_dyn(&self) -> SinkMetadata;
}
#[async_trait]
impl<T> QuantumSinkDyn for T
where
T: QuantumSink<Error = SinkError> + Send + Sync + Debug,
{
async fn send_event_dyn(&self, event: QuantumLogEvent) -> SinkResult<()> {
self.send_event(event).await
}
async fn shutdown_dyn(&self) -> SinkResult<()> {
self.shutdown().await
}
async fn is_healthy_dyn(&self) -> bool {
self.is_healthy().await
}
fn name_dyn(&self) -> &'static str {
self.name()
}
fn stats_dyn(&self) -> String {
self.stats()
}
fn metadata_dyn(&self) -> SinkMetadata {
self.metadata()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[derive(Debug)]
struct MockSink {
name: &'static str,
event_count: Arc<AtomicU64>,
should_fail: bool,
}
impl MockSink {
fn new(name: &'static str) -> Self {
Self {
name,
event_count: Arc::new(AtomicU64::new(0)),
should_fail: false,
}
}
fn with_failure(mut self) -> Self {
self.should_fail = true;
self
}
fn event_count(&self) -> u64 {
self.event_count.load(Ordering::Relaxed)
}
}
#[async_trait]
impl QuantumSink for MockSink {
type Config = ();
type Error = SinkError;
async fn send_event(&self, _event: QuantumLogEvent) -> Result<(), Self::Error> {
if self.should_fail {
return Err(SinkError::Generic("Mock failure".to_string()));
}
self.event_count.fetch_add(1, Ordering::Relaxed);
Ok(())
}
async fn shutdown(&self) -> Result<(), Self::Error> {
Ok(())
}
async fn is_healthy(&self) -> bool {
!self.should_fail
}
fn name(&self) -> &'static str {
self.name
}
fn stats(&self) -> String {
format!("MockSink[{}]: {} events", self.name, self.event_count())
}
fn metadata(&self) -> SinkMetadata {
SinkMetadata {
name: self.name.to_string(),
sink_type: SinkType::Stackable,
enabled: true,
description: Some("Mock sink for testing".to_string()),
}
}
}
#[async_trait]
impl StackableSink for MockSink {
async fn send_event_internal(
&self,
event: &QuantumLogEvent,
_strategy: crate::config::BackpressureStrategy,
) -> SinkResult<()> {
self.send_event(event.clone()).await
}
}
#[derive(Debug)]
struct MockExclusiveSink {
inner: MockSink,
}
impl MockExclusiveSink {
fn new(name: &'static str) -> Self {
Self {
inner: MockSink::new(name),
}
}
}
#[async_trait]
impl QuantumSink for MockExclusiveSink {
type Config = ();
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> Result<(), Self::Error> {
self.inner.send_event(event).await
}
async fn shutdown(&self) -> Result<(), Self::Error> {
self.inner.shutdown().await
}
async fn is_healthy(&self) -> bool {
self.inner.is_healthy().await
}
fn name(&self) -> &'static str {
self.inner.name()
}
fn stats(&self) -> String {
self.inner.stats()
}
fn metadata(&self) -> SinkMetadata {
self.inner.metadata()
}
}
impl ExclusiveSink for MockExclusiveSink {}
fn create_test_event() -> QuantumLogEvent {
use crate::core::event::ContextInfo;
use std::collections::HashMap;
QuantumLogEvent {
timestamp: chrono::Utc::now(),
level: "INFO".to_string(),
target: "test".to_string(),
message: "Test message".to_string(),
module_path: Some("test::module".to_string()),
file: Some("test.rs".to_string()),
line: Some(42),
thread_name: Some("test-thread".to_string()),
thread_id: "test-thread-id".to_string(),
fields: HashMap::new(),
context: ContextInfo {
pid: std::process::id(),
tid: 12345,
username: None,
hostname: None,
mpi_rank: None,
custom_fields: HashMap::new(),
},
}
}
#[tokio::test]
async fn test_stackable_sink_basic_functionality() {
let sink = MockSink::new("test_stackable");
let event = create_test_event();
assert!(sink.send_event(event).await.is_ok());
assert_eq!(sink.event_count(), 1);
assert!(sink.is_healthy().await);
let stats = sink.stats();
assert!(stats.contains("test_stackable"));
assert!(stats.contains("1 events"));
assert!(sink.shutdown().await.is_ok());
}
#[tokio::test]
async fn test_exclusive_sink_basic_functionality() {
let sink = MockExclusiveSink::new("test_exclusive");
let event = create_test_event();
assert!(sink.send_event(event).await.is_ok());
assert!(sink.is_healthy().await);
let metadata = sink.metadata();
assert_eq!(metadata.name, "test_exclusive");
assert_eq!(metadata.sink_type, SinkType::Stackable);
assert!(sink.shutdown().await.is_ok());
}
#[tokio::test]
async fn test_sink_error_handling() {
let sink = MockSink::new("failing_sink").with_failure();
let event = create_test_event();
let result = sink.send_event(event).await;
assert!(result.is_err());
if let Err(SinkError::Generic(msg)) = result {
assert_eq!(msg, "Mock failure");
} else {
panic!("Expected Generic error");
}
assert!(!sink.is_healthy().await);
}
#[tokio::test]
async fn test_sink_error_types() {
let io_error = SinkError::Io(std::io::Error::new(
std::io::ErrorKind::PermissionDenied,
"Permission denied",
));
assert!(io_error.to_string().contains("I/O error"));
let config_error = SinkError::Config("Invalid config".to_string());
assert!(config_error.to_string().contains("Configuration error"));
let network_error = SinkError::Network("Connection failed".to_string());
assert!(network_error.to_string().contains("Network error"));
let closed_error = SinkError::Closed;
assert!(closed_error.to_string().contains("Sink is closed"));
let backpressure_error = SinkError::Backpressure;
assert!(backpressure_error.to_string().contains("Backpressure"));
}
#[tokio::test]
async fn test_sink_metadata() {
let sink = MockSink::new("metadata_test");
let metadata = sink.metadata();
assert_eq!(metadata.name, "metadata_test");
assert_eq!(metadata.sink_type, SinkType::Stackable);
assert!(metadata.enabled);
assert!(metadata.description.as_ref().unwrap().contains("Mock sink"));
}
#[tokio::test]
async fn test_concurrent_sink_operations() {
let sink = Arc::new(MockSink::new("concurrent_test"));
let mut handles = vec![];
for i in 0..10 {
let sink_clone = Arc::clone(&sink);
let handle = tokio::spawn(async move {
let mut event = create_test_event();
event.message = format!("Message {}", i);
sink_clone.send_event(event).await
});
handles.push(handle);
}
for handle in handles {
assert!(handle.await.unwrap().is_ok());
}
assert_eq!(sink.event_count(), 10);
}
}