use super::destinations::LogDestination;
use super::types::LogEntry;
use crate::utils::error::gateway_error::{GatewayError, Result};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, error};
/// Fan-out log buffer: entries accumulate in a shared in-memory buffer and
/// are flushed (eagerly on size, or periodically by a background task) to
/// every configured destination.
pub struct LogAggregator {
    // Sinks that each flushed batch is forwarded to.
    destinations: Vec<LogDestination>,
    // Shared pending-entry buffer. `Arc` lets clones of the aggregator
    // (e.g. the background flush task) observe the same buffer.
    pub(crate) buffer: Arc<RwLock<Vec<LogEntry>>>,
    // Period of the background flush loop (10s as set in `new()`).
    flush_interval: Duration,
}
impl Default for LogAggregator {
fn default() -> Self {
Self::new()
}
}
impl LogAggregator {
    /// Buffered-entry count that triggers an eager flush from `log()`.
    const FLUSH_THRESHOLD: usize = 100;

    /// Creates an aggregator with no destinations, an empty buffer, and a
    /// 10-second background flush interval.
    pub fn new() -> Self {
        Self {
            destinations: vec![],
            buffer: Arc::new(RwLock::new(Vec::new())),
            flush_interval: Duration::from_secs(10),
        }
    }

    /// Builder-style method: appends a destination and returns the aggregator.
    pub fn add_destination(mut self, destination: LogDestination) -> Self {
        self.destinations.push(destination);
        self
    }

    /// Buffers a single entry, flushing eagerly once the buffer reaches
    /// [`Self::FLUSH_THRESHOLD`] entries.
    pub async fn log(&self, entry: LogEntry) {
        let mut buffer = self.buffer.write().await;
        buffer.push(entry);
        if buffer.len() >= Self::FLUSH_THRESHOLD {
            // Release the write lock before flushing: flush_buffer()
            // re-acquires it and would otherwise deadlock.
            drop(buffer);
            self.flush_buffer().await;
        }
    }

    /// Drains all buffered entries and forwards the batch to every
    /// destination. Send failures are logged and the entries are NOT
    /// re-queued, so delivery is at-most-once per destination.
    pub async fn flush_buffer(&self) {
        let mut buffer = self.buffer.write().await;
        if buffer.is_empty() {
            return;
        }
        // drain(..) keeps the Vec's capacity so the next batch reuses it.
        let entries = buffer.drain(..).collect::<Vec<_>>();
        // Do not hold the buffer lock across network I/O below.
        drop(buffer);
        for destination in &self.destinations {
            if let Err(e) = self.send_to_destination(destination, &entries).await {
                error!("Failed to send logs to destination: {}", e);
            }
        }
    }

    /// Sends one batch to a single destination.
    ///
    /// Only the `Webhook` variant performs real I/O; the other variants are
    /// currently stubs that just log the batch size at debug level.
    ///
    /// # Errors
    /// Returns [`GatewayError::Network`] if the webhook HTTP request fails.
    async fn send_to_destination(
        &self,
        destination: &LogDestination,
        entries: &[LogEntry],
    ) -> Result<()> {
        match destination {
            LogDestination::Elasticsearch {
                url: _,
                index: _,
                auth: _,
            } => {
                debug!("Sending {} logs to Elasticsearch", entries.len());
            }
            LogDestination::Splunk {
                url: _,
                token: _,
                index: _,
            } => {
                debug!("Sending {} logs to Splunk", entries.len());
            }
            LogDestination::DatadogLogs {
                api_key: _,
                site: _,
            } => {
                debug!("Sending {} logs to Datadog", entries.len());
            }
            LogDestination::Webhook { url, headers } => {
                // Reuse a single process-wide Client: reqwest::Client holds a
                // connection pool internally, so building a fresh one per
                // flush (as this code previously did) defeated keep-alive and
                // paid TLS setup on every batch.
                static CLIENT: std::sync::OnceLock<reqwest::Client> = std::sync::OnceLock::new();
                let client = CLIENT.get_or_init(reqwest::Client::new);
                let mut request = client.post(url).json(entries);
                for (key, value) in headers {
                    request = request.header(key, value);
                }
                request
                    .send()
                    .await
                    .map_err(|e| GatewayError::Network(e.to_string()))?;
            }
            _ => {
                debug!("Sending {} logs to destination", entries.len());
            }
        }
        Ok(())
    }

    /// Spawns a detached background task that flushes the buffer every
    /// `flush_interval`. The task runs for the life of the runtime (its
    /// JoinHandle is dropped). Note: tokio's `interval` completes its first
    /// tick immediately, so an initial flush happens right after spawn.
    pub async fn start_background_flush(&self) {
        // Clone shares the same Arc'd buffer, so the task drains the
        // buffer that callers of log() are filling.
        let aggregator = self.clone();
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(aggregator.flush_interval);
            loop {
                interval.tick().await;
                aggregator.flush_buffer().await;
            }
        });
    }
}
impl Clone for LogAggregator {
fn clone(&self) -> Self {
Self {
destinations: self.destinations.clone(),
buffer: self.buffer.clone(),
flush_interval: self.flush_interval,
}
}
}