use super::data::InsightData;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
/// Errors that an insight export sink can report.
#[derive(Debug, thiserror::Error)]
pub enum ExportError {
/// An I/O operation failed; converted from `std::io::Error` via the generated `From` impl.
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// JSON serialization failed; converted from `serde_json::Error` via the generated `From` impl.
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
/// An HTTP-level failure described by a message.
// NOTE(review): nothing in the visible part of this file constructs this variant — confirm where it is used.
#[error("HTTP error: {0}")]
Http(String),
/// The sink cannot accept data, e.g. because a mutex is poisoned or a worker channel is closed.
#[error("Export sink unavailable: {0}")]
Unavailable(String),
}
/// Shorthand for a `Result` whose error type is [`ExportError`].
pub type ExportResult<T> = Result<T, ExportError>;
/// A destination ("sink") that insights can be exported to.
///
/// Implementors must be shareable across threads (`Send + Sync + 'static`).
pub trait InsightExporter: Send + Sync + 'static {
    /// Exports a single insight.
    ///
    /// # Errors
    /// Returns an [`ExportError`] when the sink rejects or cannot accept the insight.
    fn export(&self, insight: &InsightData) -> ExportResult<()>;

    /// Exports every insight in `insights`, in order.
    ///
    /// The default implementation forwards each item to [`InsightExporter::export`]
    /// and stops at the first error, which is returned to the caller.
    fn export_batch(&self, insights: &[InsightData]) -> ExportResult<()> {
        insights.iter().try_for_each(|item| self.export(item))
    }

    /// Pushes any pending data out of the exporter. The default does nothing.
    fn flush(&self) -> ExportResult<()> {
        Ok(())
    }

    /// Shuts the exporter down. The default simply delegates to [`InsightExporter::flush`].
    fn close(&self) -> ExportResult<()> {
        self.flush()
    }

    /// Returns a boxed copy of this exporter, allowing trait objects to be cloned.
    fn clone_exporter(&self) -> Box<dyn InsightExporter>;
}
/// Exporter that writes each insight as one JSON line to a file.
pub struct FileExporter {
/// Path the file was opened with.
path: PathBuf,
/// Buffered handle to the open file; the `Arc` lets clones share the same writer.
writer: Arc<Mutex<BufWriter<File>>>,
}
impl FileExporter {
    /// Opens `path` in append mode, creating the file if it does not exist,
    /// and wraps it in a buffered writer.
    ///
    /// # Errors
    /// Returns [`ExportError::Io`] when the file cannot be opened.
    pub fn new(path: impl Into<PathBuf>) -> ExportResult<Self> {
        let path: PathBuf = path.into();

        let mut options = OpenOptions::new();
        options.create(true).append(true);
        let handle = options.open(&path)?;

        let writer = Arc::new(Mutex::new(BufWriter::new(handle)));
        Ok(Self { path, writer })
    }

    /// Returns the path this exporter writes to.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }
}
impl Clone for FileExporter {
    /// Produces a handle that shares the same underlying buffered writer
    /// (only the `Arc` reference count is bumped; the file is not reopened).
    fn clone(&self) -> Self {
        let writer = Arc::clone(&self.writer);
        let path = self.path.clone();
        Self { path, writer }
    }
}
impl InsightExporter for FileExporter {
    /// Serializes `insight` to JSON and appends it, followed by a newline,
    /// to the buffered writer.
    ///
    /// # Errors
    /// * [`ExportError::Unavailable`] if the writer mutex is poisoned.
    /// * [`ExportError::Serialization`] if the insight cannot be encoded.
    /// * [`ExportError::Io`] if the write fails.
    fn export(&self, insight: &InsightData) -> ExportResult<()> {
        let mut out = match self.writer.lock() {
            Ok(guard) => guard,
            Err(poisoned) => return Err(ExportError::Unavailable(poisoned.to_string())),
        };
        let line = serde_json::to_string(insight)?;
        out.write_all(line.as_bytes())?;
        out.write_all(b"\n")?;
        Ok(())
    }

    /// Writes all `insights` as JSON lines while holding the writer lock once.
    ///
    /// Processing stops at the first failure; lines written before the failure
    /// remain in the buffer.
    fn export_batch(&self, insights: &[InsightData]) -> ExportResult<()> {
        let mut out = match self.writer.lock() {
            Ok(guard) => guard,
            Err(poisoned) => return Err(ExportError::Unavailable(poisoned.to_string())),
        };
        for item in insights.iter() {
            let line = serde_json::to_string(item)?;
            out.write_all(line.as_bytes())?;
            out.write_all(b"\n")?;
        }
        Ok(())
    }

    /// Flushes the buffered writer so pending lines reach the file.
    fn flush(&self) -> ExportResult<()> {
        let mut out = match self.writer.lock() {
            Ok(guard) => guard,
            Err(poisoned) => return Err(ExportError::Unavailable(poisoned.to_string())),
        };
        out.flush().map_err(ExportError::Io)
    }

    /// Boxes a clone of this exporter; the clone shares the same writer.
    fn clone_exporter(&self) -> Box<dyn InsightExporter> {
        let copy: FileExporter = self.clone();
        Box::new(copy)
    }
}
/// Settings for delivering insights to an HTTP webhook.
#[derive(Clone, Debug)]
pub struct WebhookConfig {
    /// Endpoint that batches are POSTed to.
    pub url: String,
    /// Optional value sent in the `Authorization` header.
    pub auth_header: Option<String>,
    /// Extra `(name, value)` header pairs added to every request.
    pub headers: Vec<(String, String)>,
    /// Number of buffered insights at which a batch is sent.
    pub batch_size: usize,
    /// Request timeout, in seconds.
    pub timeout_secs: u64,
}

impl WebhookConfig {
    /// Creates a configuration for `url` with no auth, no extra headers,
    /// a batch size of 100 and a 30 second timeout.
    pub fn new(url: impl Into<String>) -> Self {
        let url = url.into();
        Self {
            url,
            auth_header: None,
            headers: Vec::new(),
            batch_size: 100,
            timeout_secs: 30,
        }
    }

    /// Sets the `Authorization` header value, replacing any previous one.
    pub fn auth(self, value: impl Into<String>) -> Self {
        Self {
            auth_header: Some(value.into()),
            ..self
        }
    }

    /// Appends one extra header; earlier headers are kept.
    pub fn header(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        let entry = (name.into(), value.into());
        self.headers.push(entry);
        self
    }

    /// Sets the batch size. The value is stored as given.
    pub fn batch_size(self, size: usize) -> Self {
        Self {
            batch_size: size,
            ..self
        }
    }

    /// Sets the request timeout in seconds.
    pub fn timeout(self, secs: u64) -> Self {
        Self {
            timeout_secs: secs,
            ..self
        }
    }
}
/// Exporter that buffers insights and hands them off in batches for webhook delivery.
///
/// Cloning shares the buffer (and, with the `webhook` feature, the channel sender).
#[derive(Clone)]
pub struct WebhookExporter {
/// Delivery settings (URL, headers, batch size, timeout).
config: WebhookConfig,
/// Insights accumulated by `export` until a batch is sent.
buffer: Arc<Mutex<Vec<InsightData>>>,
/// Sending half of the channel read by the worker thread spawned in `new`.
#[cfg(feature = "webhook")]
sender: tokio::sync::mpsc::Sender<Vec<InsightData>>,
/// Placeholder field used when the `webhook` feature is disabled.
#[cfg(not(feature = "webhook"))]
_marker: std::marker::PhantomData<()>,
}
impl WebhookExporter {
    /// Creates a webhook exporter from `config`.
    ///
    /// With the `webhook` feature enabled this spawns a dedicated OS thread that
    /// runs a current-thread Tokio runtime. The thread receives batches over a
    /// bounded channel (capacity 100) and POSTs each one as JSON to `config.url`,
    /// adding the configured `Authorization` and extra headers. Transport errors
    /// and non-success statuses are logged; the batch is not retried.
    ///
    /// If the runtime or HTTP client cannot be built, the worker logs the error
    /// and exits instead of panicking. The channel is then closed, so later
    /// sends return [`ExportError::Unavailable`].
    ///
    /// Without the `webhook` feature no thread is started and batches are only
    /// logged.
    pub fn new(config: WebhookConfig) -> Self {
        #[cfg(feature = "webhook")]
        {
            let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<InsightData>>(100);
            let config_clone = config.clone();
            std::thread::spawn(move || {
                // Building the runtime can fail (e.g. resource exhaustion); report
                // it rather than unwinding or aborting the process.
                let rt = match tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    Ok(rt) => rt,
                    Err(e) => {
                        tracing::error!("Failed to build webhook runtime: {}", e);
                        return;
                    }
                };
                let client = match reqwest::Client::builder()
                    .timeout(std::time::Duration::from_secs(config_clone.timeout_secs))
                    .build()
                {
                    Ok(client) => client,
                    Err(e) => {
                        tracing::error!("Failed to build HTTP client: {}", e);
                        return;
                    }
                };
                rt.block_on(async move {
                    // The loop ends once every sender has been dropped.
                    while let Some(insights) = rx.recv().await {
                        let mut request = client.post(&config_clone.url).json(&insights);
                        if let Some(ref auth_value) = config_clone.auth_header {
                            request = request.header("Authorization", auth_value);
                        }
                        for (k, v) in &config_clone.headers {
                            request = request.header(k, v);
                        }
                        match request.send().await {
                            Ok(response) => {
                                if !response.status().is_success() {
                                    tracing::error!(
                                        "Webhook returned status {}",
                                        response.status()
                                    );
                                }
                            }
                            Err(e) => {
                                tracing::error!("Webhook error: {}", e);
                            }
                        }
                    }
                });
            });
            Self {
                config,
                buffer: Arc::new(Mutex::new(Vec::new())),
                sender: tx,
            }
        }
        #[cfg(not(feature = "webhook"))]
        Self {
            config,
            buffer: Arc::new(Mutex::new(Vec::new())),
            _marker: std::marker::PhantomData,
        }
    }

    /// Queues a copy of `insights` for the worker thread without blocking.
    ///
    /// A full channel is treated as best-effort: the batch is dropped with a
    /// warning and `Ok(())` is returned.
    ///
    /// # Errors
    /// Returns [`ExportError::Unavailable`] when the worker's channel is closed.
    #[cfg(feature = "webhook")]
    fn send_insights(&self, insights: &[InsightData]) -> ExportResult<()> {
        match self.sender.try_send(insights.to_vec()) {
            Ok(_) => Ok(()),
            Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
                tracing::warn!("Webhook exporter channel full, dropping batch");
                Ok(())
            }
            Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => Err(
                ExportError::Unavailable("Webhook worker channel closed".to_string()),
            ),
        }
    }

    /// Feature-disabled stand-in: serializes the batch and logs what would have
    /// been sent, without performing any HTTP request.
    ///
    /// # Errors
    /// Returns [`ExportError::Serialization`] if the batch cannot be encoded.
    #[cfg(not(feature = "webhook"))]
    fn send_insights(&self, insights: &[InsightData]) -> ExportResult<()> {
        let json = serde_json::to_string(insights)?;
        tracing::debug!(
            url = %self.config.url,
            count = insights.len(),
            size = json.len(),
            "Would send insights to webhook (enable 'webhook' feature for actual HTTP)"
        );
        Ok(())
    }
}
impl InsightExporter for WebhookExporter {
    /// Buffers a clone of `insight`; once the buffer holds at least
    /// `config.batch_size` items, the whole buffer is sent as one batch.
    ///
    /// # Errors
    /// * [`ExportError::Unavailable`] if the buffer mutex is poisoned.
    /// * Any error reported while sending the batch.
    fn export(&self, insight: &InsightData) -> ExportResult<()> {
        // Decide under the lock, send after releasing it.
        let ready = {
            let mut buffer = self
                .buffer
                .lock()
                .map_err(|e| ExportError::Unavailable(e.to_string()))?;
            buffer.push(insight.clone());
            if buffer.len() >= self.config.batch_size {
                // Move the buffered items out without copying them.
                Some(std::mem::take(&mut *buffer))
            } else {
                None
            }
        };
        match ready {
            Some(batch) => self.send_insights(&batch),
            None => Ok(()),
        }
    }

    /// Sends `insights` directly in chunks of at most `config.batch_size`,
    /// without going through the internal buffer.
    ///
    /// `batch_size` is a public field and may be 0; `slice::chunks` panics on a
    /// zero chunk size, so it is treated as 1 here. That matches `export`,
    /// where a batch size of 0 sends every insight immediately.
    fn export_batch(&self, insights: &[InsightData]) -> ExportResult<()> {
        let chunk_len = self.config.batch_size.max(1);
        for chunk in insights.chunks(chunk_len) {
            self.send_insights(chunk)?;
        }
        Ok(())
    }

    /// Sends whatever is currently buffered, if anything.
    ///
    /// # Errors
    /// * [`ExportError::Unavailable`] if the buffer mutex is poisoned.
    /// * Any error reported while sending the batch.
    fn flush(&self) -> ExportResult<()> {
        let pending = {
            let mut buffer = self
                .buffer
                .lock()
                .map_err(|e| ExportError::Unavailable(e.to_string()))?;
            std::mem::take(&mut *buffer)
        };
        if pending.is_empty() {
            return Ok(());
        }
        self.send_insights(&pending)
    }

    /// Boxes a clone of this exporter; the clone shares the same buffer.
    fn clone_exporter(&self) -> Box<dyn InsightExporter> {
        Box::new(self.clone())
    }
}
/// Exporter that fans every call out to a list of other exporters.
#[derive(Default)]
pub struct CompositeExporter {
/// The wrapped sinks, visited in insertion order.
exporters: Vec<Box<dyn InsightExporter>>,
}
impl Clone for CompositeExporter {
    /// Clones the composite by asking each wrapped sink for a boxed copy of itself.
    fn clone(&self) -> Self {
        let mut exporters = Vec::with_capacity(self.exporters.len());
        for sink in &self.exporters {
            exporters.push(sink.clone_exporter());
        }
        Self { exporters }
    }
}
impl CompositeExporter {
    /// Creates a composite with no sinks.
    pub fn new() -> Self {
        Self::default()
    }

    /// Adds `exporter` as a sink and returns the composite for chaining.
    pub fn with_exporter<E: InsightExporter>(self, exporter: E) -> Self {
        self.with_boxed_exporter(Box::new(exporter))
    }

    /// Adds an already boxed sink and returns the composite for chaining.
    pub fn with_boxed_exporter(self, exporter: Box<dyn InsightExporter>) -> Self {
        let mut exporters = self.exporters;
        exporters.push(exporter);
        Self { exporters }
    }
}
impl InsightExporter for CompositeExporter {
    /// Forwards `insight` to every sink. A failing sink is logged at `warn`
    /// level and does not stop the others; the call always returns `Ok(())`.
    fn export(&self, insight: &InsightData) -> ExportResult<()> {
        self.exporters
            .iter()
            .filter_map(|sink| sink.export(insight).err())
            .for_each(|e| {
                tracing::warn!(error = %e, "Export failed for one sink");
            });
        Ok(())
    }

    /// Forwards the whole batch to every sink; failures are logged, never returned.
    fn export_batch(&self, insights: &[InsightData]) -> ExportResult<()> {
        self.exporters
            .iter()
            .filter_map(|sink| sink.export_batch(insights).err())
            .for_each(|e| {
                tracing::warn!(error = %e, "Batch export failed for one sink");
            });
        Ok(())
    }

    /// Flushes every sink; failures are logged, never returned.
    fn flush(&self) -> ExportResult<()> {
        self.exporters
            .iter()
            .filter_map(|sink| sink.flush().err())
            .for_each(|e| {
                tracing::warn!(error = %e, "Flush failed for one sink");
            });
        Ok(())
    }

    /// Closes every sink; failures are logged, never returned.
    fn close(&self) -> ExportResult<()> {
        self.exporters
            .iter()
            .filter_map(|sink| sink.close().err())
            .for_each(|e| {
                tracing::warn!(error = %e, "Close failed for one sink");
            });
        Ok(())
    }

    /// Boxes a copy of the composite in which every sink has been cloned
    /// through its own `clone_exporter`.
    fn clone_exporter(&self) -> Box<dyn InsightExporter> {
        Box::new(self.clone())
    }
}
/// Exporter that passes each insight to a user-supplied closure.
pub struct CallbackExporter<F>
where
F: Fn(&InsightData) + Send + Sync + 'static,
{
/// The closure to invoke; held in an `Arc` so clones share it.
callback: Arc<F>,
}
impl<F> CallbackExporter<F>
where
    F: Fn(&InsightData) + Send + Sync + 'static,
{
    /// Wraps `callback` so that it is invoked once for every exported insight.
    pub fn new(callback: F) -> Self {
        let callback = Arc::new(callback);
        Self { callback }
    }
}
impl<F> Clone for CallbackExporter<F>
where
F: Fn(&InsightData) + Send + Sync + 'static,
{
fn clone(&self) -> Self {
Self {
callback: self.callback.clone(),
}
}
}
impl<F> InsightExporter for CallbackExporter<F>
where
    F: Fn(&InsightData) + Send + Sync + 'static,
{
    /// Invokes the stored closure with `insight`. Always returns `Ok(())`.
    fn export(&self, insight: &InsightData) -> ExportResult<()> {
        let callback: &F = &self.callback;
        callback(insight);
        Ok(())
    }

    /// Boxes a clone of this exporter; the clone shares the same closure.
    fn clone_exporter(&self) -> Box<dyn InsightExporter> {
        let copy: Self = self.clone();
        Box::new(copy)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;
    use tempfile::tempdir;

    /// Builds the insight shared by the tests below.
    fn create_test_insight() -> InsightData {
        InsightData::new("test-123", "GET", "/users")
            .with_status(200)
            .with_duration(Duration::from_millis(42))
    }

    /// A single export followed by a flush must land in the file.
    #[test]
    fn test_file_exporter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.jsonl");
        let exporter = FileExporter::new(&path).unwrap();
        exporter.export(&create_test_insight()).unwrap();
        exporter.flush().unwrap();
        let content = std::fs::read_to_string(&path).unwrap();
        assert!(content.contains("test-123"));
        assert!(content.contains("GET"));
        assert!(content.contains("/users"));
    }

    /// A batch export writes exactly one line per insight.
    #[test]
    fn test_file_exporter_batch() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch.jsonl");
        let exporter = FileExporter::new(&path).unwrap();
        let insights: Vec<_> = (0..5)
            .map(|i| InsightData::new(format!("req-{}", i), "GET", "/test"))
            .collect();
        exporter.export_batch(&insights).unwrap();
        exporter.flush().unwrap();
        let content = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<_> = content.lines().collect();
        assert_eq!(lines.len(), 5);
    }

    /// The callback runs once per exported insight.
    #[test]
    fn test_callback_exporter() {
        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();
        let exporter = CallbackExporter::new(move |_insight| {
            count_clone.fetch_add(1, Ordering::SeqCst);
        });
        exporter.export(&create_test_insight()).unwrap();
        exporter.export(&create_test_insight()).unwrap();
        assert_eq!(count.load(Ordering::SeqCst), 2);
    }

    /// A composite delivers the same insight to every wrapped sink.
    #[test]
    fn test_composite_exporter() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("composite.jsonl");
        let count = Arc::new(AtomicUsize::new(0));
        let count_clone = count.clone();
        let composite = CompositeExporter::new()
            .with_exporter(FileExporter::new(&path).unwrap())
            .with_exporter(CallbackExporter::new(move |_| {
                count_clone.fetch_add(1, Ordering::SeqCst);
            }));
        composite.export(&create_test_insight()).unwrap();
        composite.flush().unwrap();
        assert_eq!(count.load(Ordering::SeqCst), 1);
        assert!(std::fs::read_to_string(&path).unwrap().contains("test-123"));
    }

    /// Every builder method must store its value on the config.
    #[test]
    fn test_webhook_config() {
        let config = WebhookConfig::new("https://example.com/insights")
            .auth("Bearer token")
            .header("X-Custom", "value")
            .batch_size(50)
            .timeout(60);
        assert_eq!(config.url, "https://example.com/insights");
        assert_eq!(config.auth_header, Some("Bearer token".to_string()));
        // The custom header was previously set but never checked.
        assert_eq!(
            config.headers,
            vec![("X-Custom".to_string(), "value".to_string())]
        );
        assert_eq!(config.batch_size, 50);
        assert_eq!(config.timeout_secs, 60);
    }
}