use crate::config::{NetworkConfig, NetworkProtocol};
use crate::core::event::QuantumLogEvent;
use crate::error::{QuantumLogError, Result};
use crate::sinks::traits::{ExclusiveSink, QuantumSink, SinkError};
use async_trait::async_trait;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{timeout, Duration};
use tracing::Level;
#[derive(Debug)]
pub struct NetworkSink {
config: NetworkConfig,
sender: Option<mpsc::Sender<SinkMessage>>,
processor_handle: Option<JoinHandle<()>>,
}
enum SinkMessage {
Event(Box<QuantumLogEvent>),
Shutdown(oneshot::Sender<Result<()>>),
}
enum NetworkConnection {
Tcp(Arc<Mutex<BufWriter<TcpStream>>>),
Udp(Arc<UdpSocket>, SocketAddr),
}
struct NetworkSinkProcessor {
config: NetworkConfig,
receiver: mpsc::Receiver<SinkMessage>,
connection: Option<NetworkConnection>,
reconnect_count: usize,
level_filter: Option<Level>,
}
impl NetworkSink {
pub fn new(config: NetworkConfig) -> Self {
Self {
config,
sender: None,
processor_handle: None,
}
}
pub fn with_level_filter(self, _level: Level) -> Self {
self
}
pub async fn start(&mut self) -> Result<()> {
if self.sender.is_some() {
return Err(QuantumLogError::ConfigError(
"Sink already started".to_string(),
));
}
let buffer_size = 1000;
let (sender, receiver) = mpsc::channel(buffer_size);
let processor = NetworkSinkProcessor::new(self.config.clone(), receiver).await?;
let handle = tokio::spawn(async move {
if let Err(e) = processor.run().await {
tracing::error!("NetworkSink processor error: {}", e);
}
});
self.sender = Some(sender);
self.processor_handle = Some(handle);
Ok(())
}
pub async fn send_event_internal(&self, event: QuantumLogEvent) -> Result<()> {
if let Some(sender) = &self.sender {
let message = SinkMessage::Event(Box::new(event));
sender.send(message).await.map_err(|_| {
QuantumLogError::SinkError("Failed to send event to NetworkSink".to_string())
})?;
} else {
return Err(QuantumLogError::SinkError(
"NetworkSink not started".to_string(),
));
}
Ok(())
}
pub async fn shutdown(mut self) -> Result<()> {
if let Some(sender) = self.sender.take() {
let (tx, rx) = oneshot::channel();
if sender.send(SinkMessage::Shutdown(tx)).await.is_err() {
return Err(QuantumLogError::SinkError(
"Failed to send shutdown signal".to_string(),
));
}
match rx.await {
Ok(result) => result?,
Err(_) => {
return Err(QuantumLogError::SinkError(
"Shutdown signal lost".to_string(),
))
}
}
}
if let Some(handle) = self.processor_handle.take() {
if let Err(e) = handle.await {
tracing::error!("Error waiting for NetworkSink processor: {}", e);
}
}
Ok(())
}
pub fn is_running(&self) -> bool {
self.sender.is_some()
}
pub fn config(&self) -> &NetworkConfig {
&self.config
}
}
impl NetworkSinkProcessor {
async fn new(config: NetworkConfig, receiver: mpsc::Receiver<SinkMessage>) -> Result<Self> {
let mut processor = Self {
config,
receiver,
connection: None,
reconnect_count: 0,
level_filter: None,
};
if let Err(e) = processor.connect().await {
tracing::warn!("Failed to establish initial network connection: {}", e);
}
Ok(processor)
}
async fn connect(&mut self) -> Result<()> {
let address = format!("{}:{}", self.config.host, self.config.port);
let socket_addr: SocketAddr = address.parse().map_err(|e| {
QuantumLogError::NetworkError(format!("Invalid address {}: {}", address, e))
})?;
match self.config.protocol {
NetworkProtocol::Tcp => {
let stream = timeout(
Duration::from_millis(self.config.timeout_ms.unwrap_or(30000)),
TcpStream::connect(socket_addr),
)
.await
.map_err(|_| QuantumLogError::NetworkError("Connection timeout".to_string()))?
.map_err(|e| {
QuantumLogError::NetworkError(format!("TCP connection failed: {}", e))
})?;
let writer = BufWriter::new(stream);
self.connection = Some(NetworkConnection::Tcp(Arc::new(Mutex::new(writer))));
tracing::info!("TCP connection established to {}", address);
}
NetworkProtocol::Udp => {
let socket = UdpSocket::bind("0.0.0.0:0").await.map_err(|e| {
QuantumLogError::NetworkError(format!("UDP socket bind failed: {}", e))
})?;
self.connection = Some(NetworkConnection::Udp(Arc::new(socket), socket_addr));
tracing::info!("UDP socket created for {}", address);
}
NetworkProtocol::Http => {
todo!("HTTP protocol support not yet implemented")
}
}
self.reconnect_count = 0;
Ok(())
}
async fn reconnect(&mut self) -> Result<()> {
if self.reconnect_count >= 5 {
return Err(QuantumLogError::NetworkError(
"Max reconnection attempts (5) exceeded".to_string(),
));
}
self.reconnect_count += 1;
tokio::time::sleep(Duration::from_secs(5)).await;
tracing::info!("Attempting reconnection #{}", self.reconnect_count);
self.connect().await
}
async fn run(mut self) -> Result<()> {
while let Some(message) = self.receiver.recv().await {
match message {
SinkMessage::Event(event) => {
if let Err(e) = self.handle_event(*event).await {
tracing::error!("Error handling event in NetworkSink: {}", e);
if matches!(e, QuantumLogError::NetworkError(_)) {
if let Err(reconnect_err) = self.reconnect().await {
tracing::error!("Reconnection failed: {}", reconnect_err);
}
}
}
}
SinkMessage::Shutdown(response) => {
let result = self.shutdown().await;
let _ = response.send(result);
break;
}
}
}
Ok(())
}
async fn handle_event(&mut self, event: QuantumLogEvent) -> Result<()> {
if let Some(ref filter_level) = self.level_filter {
let event_level = event.level.parse::<Level>().map_err(|_| {
QuantumLogError::ConfigError(format!("Invalid log level: {}", event.level))
})?;
if event_level < *filter_level {
return Ok(());
}
}
if self.connection.is_none() {
self.connect().await?;
}
let formatted = self.format_event(&event)?;
let data = formatted.as_bytes();
match &self.connection {
Some(NetworkConnection::Tcp(writer_arc)) => {
let mut writer = writer_arc.lock().await;
writer.write_all(data).await.map_err(|e| {
QuantumLogError::NetworkError(format!("TCP write failed: {}", e))
})?;
writer.write_all(b"\n").await.map_err(|e| {
QuantumLogError::NetworkError(format!("TCP write failed: {}", e))
})?;
writer.flush().await.map_err(|e| {
QuantumLogError::NetworkError(format!("TCP flush failed: {}", e))
})?;
}
Some(NetworkConnection::Udp(socket, addr)) => {
socket.send_to(data, addr).await.map_err(|e| {
QuantumLogError::NetworkError(format!("UDP send failed: {}", e))
})?;
}
None => {
return Err(QuantumLogError::NetworkError(
"No active connection".to_string(),
));
}
}
Ok(())
}
fn format_event(&self, event: &QuantumLogEvent) -> Result<String> {
match self.config.format {
crate::config::OutputFormat::Text => Ok(event.to_formatted_string("full")),
crate::config::OutputFormat::Json => event
.to_json()
.map_err(|e| QuantumLogError::SerializationError { source: e }),
crate::config::OutputFormat::Csv => {
let csv_row = event.to_csv_row();
Ok(csv_row.join(","))
}
}
}
async fn shutdown(&mut self) -> Result<()> {
if let Some(NetworkConnection::Tcp(writer_arc)) = &self.connection {
let mut writer = writer_arc.lock().await;
if let Err(e) = writer.flush().await {
tracing::error!("Error flushing TCP connection: {}", e);
}
}
self.connection = None;
tracing::info!("NetworkSink shutdown completed");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::config::OutputFormat;
use crate::core::event::ContextInfo;
use tokio::net::{TcpListener, UdpSocket};
use tracing::Level;
fn create_test_event(level: Level, message: &str) -> QuantumLogEvent {
static CALLSITE: tracing::callsite::DefaultCallsite =
tracing::callsite::DefaultCallsite::new(&tracing::Metadata::new(
"test",
"quantum_log::network::test",
Level::INFO,
Some(file!()),
Some(line!()),
Some(module_path!()),
tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE)),
tracing::metadata::Kind::EVENT,
));
let metadata = tracing::Metadata::new(
"test",
"test_target",
level,
Some("test.rs"),
Some(42),
Some("test_module"),
tracing::field::FieldSet::new(&[], tracing::callsite::Identifier(&CALLSITE)),
tracing::metadata::Kind::EVENT,
);
QuantumLogEvent::new(
level,
message.to_string(),
&metadata,
std::collections::HashMap::new(),
ContextInfo::default(),
)
}
#[tokio::test]
async fn test_network_sink_creation() {
let config = NetworkConfig {
enabled: true,
level: None,
host: "127.0.0.1".to_string(),
port: 8080,
protocol: NetworkProtocol::Tcp,
format: OutputFormat::Text,
buffer_size: 8192,
timeout_ms: Some(30000),
};
let sink = NetworkSink::new(config);
assert!(!sink.is_running());
}
#[tokio::test]
async fn test_tcp_network_sink() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server_handle = tokio::spawn(async move {
if let Ok((stream, _)) = listener.accept().await {
let mut buffer = [0; 1024];
let _ = stream.readable().await;
let _ = stream.try_read(&mut buffer);
}
});
let config = NetworkConfig {
enabled: true,
level: None,
host: addr.ip().to_string(),
port: addr.port(),
protocol: NetworkProtocol::Tcp,
format: OutputFormat::Text,
buffer_size: 8192,
timeout_ms: Some(5000),
};
let mut sink = NetworkSink::new(config);
let result = sink.start().await;
assert!(result.is_ok());
assert!(sink.is_running());
let event = create_test_event(Level::INFO, "Test TCP message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let result = sink.shutdown().await;
assert!(result.is_ok());
let _ = server_handle.await;
}
#[tokio::test]
async fn test_udp_network_sink() {
let server_socket = UdpSocket::bind("127.0.0.1:0").await.unwrap();
let addr = server_socket.local_addr().unwrap();
let server_handle = tokio::spawn(async move {
let mut buffer = [0; 1024];
let _ = server_socket.recv(&mut buffer).await;
});
let config = NetworkConfig {
enabled: true,
level: None,
host: addr.ip().to_string(),
port: addr.port(),
protocol: NetworkProtocol::Udp,
format: OutputFormat::Text,
buffer_size: 8192,
timeout_ms: Some(5000),
};
let mut sink = NetworkSink::new(config);
let result = sink.start().await;
assert!(result.is_ok());
assert!(sink.is_running());
let event = create_test_event(Level::INFO, "Test UDP message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let result = sink.shutdown().await;
assert!(result.is_ok());
let _ = server_handle.await;
}
#[tokio::test]
async fn test_network_sink_backpressure_drop() {
let config = NetworkConfig {
enabled: true,
level: None,
host: "127.0.0.1".to_string(),
port: 9999, protocol: NetworkProtocol::Tcp,
format: OutputFormat::Text,
buffer_size: 8192,
timeout_ms: Some(1000),
};
let mut sink = NetworkSink::new(config);
sink.start().await.unwrap();
let event = create_test_event(Level::INFO, "Test message");
let result = sink.send_event(event).await;
assert!(result.is_ok());
sink.shutdown().await.unwrap();
}
#[tokio::test]
async fn test_network_sink_json_format() {
let config = NetworkConfig {
enabled: true,
level: None,
host: "127.0.0.1".to_string(),
port: 8080,
protocol: NetworkProtocol::Tcp,
format: OutputFormat::Json,
buffer_size: 8192,
timeout_ms: Some(30000),
};
let sink = NetworkSink::new(config);
assert_eq!(sink.config().format, OutputFormat::Json);
assert_eq!(sink.config().protocol, NetworkProtocol::Tcp);
}
}
#[async_trait]
impl QuantumSink for NetworkSink {
type Config = NetworkConfig;
type Error = SinkError;
async fn send_event(&self, event: QuantumLogEvent) -> std::result::Result<(), Self::Error> {
self.send_event_internal(event).await.map_err(|e| match e {
QuantumLogError::ChannelError(msg) => SinkError::Generic(msg),
QuantumLogError::ConfigError(msg) => SinkError::Config(msg),
QuantumLogError::IoError { source } => SinkError::Io(source),
QuantumLogError::NetworkError(msg) => SinkError::Network(msg),
_ => SinkError::Generic(e.to_string()),
})
}
async fn shutdown(&self) -> std::result::Result<(), Self::Error> {
Err(SinkError::Generic(
"NetworkSink shutdown requires mutable reference".to_string(),
))
}
async fn is_healthy(&self) -> bool {
self.is_running()
}
fn name(&self) -> &'static str {
"network"
}
fn stats(&self) -> String {
format!(
"NetworkSink: running={}, protocol={:?}, target={}:{}",
self.is_running(),
self.config.protocol,
self.config.host,
self.config.port
)
}
fn metadata(&self) -> crate::sinks::traits::SinkMetadata {
crate::sinks::traits::SinkMetadata {
name: "network".to_string(),
sink_type: crate::sinks::traits::SinkType::Exclusive,
enabled: self.is_running(),
description: Some(format!(
"Network sink using {:?} protocol to {}:{}",
self.config.protocol, self.config.host, self.config.port
)),
}
}
}
impl ExclusiveSink for NetworkSink {}