use azeventhubs::{
producer::{EventHubProducerClient, EventHubProducerClientOptions, SendEventOptions},
BasicRetryPolicy,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tracing::{debug, error};
use crate::pipeline::{PipelineSender, SendResult};
#[derive(Clone)]
pub struct EventHubConfig {
pub connection_string: String,
pub hub_name: String,
}
impl EventHubConfig {
pub fn from_env() -> Result<Self, String> {
Ok(Self {
connection_string: std::env::var("EVENTHUB_CONNECTION_STRING")
.map_err(|_| "EVENTHUB_CONNECTION_STRING not set")?,
hub_name: std::env::var("EVENTHUB_NAME")
.unwrap_or_else(|_| "otlp-ingestion".to_string()),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct EventEnvelope {
signal_type: String,
table: String,
payload: Value,
}
impl EventEnvelope {
fn new(table: &str, payload: Value) -> Self {
let signal_type = match table {
"logs" => "logs",
"traces" => "traces",
"gauge" => "metrics_gauge",
"sum" => "metrics_sum",
_ => table,
};
Self {
signal_type: signal_type.to_string(),
table: table.to_string(),
payload,
}
}
}
pub struct EventHubSender {
producer: Mutex<EventHubProducerClient<BasicRetryPolicy>>,
}
impl EventHubSender {
pub async fn new(config: EventHubConfig) -> Result<Self, String> {
let producer = EventHubProducerClient::new_from_connection_string(
&config.connection_string,
config.hub_name,
EventHubProducerClientOptions::default(),
)
.await
.map_err(|e| format!("Failed to create Event Hub producer: {:?}", e))?;
Ok(Self {
producer: Mutex::new(producer),
})
}
async fn send_batch(&self, table: &str, records: Vec<Value>) -> Result<usize, String> {
let count = records.len();
let mut producer = self.producer.lock().await;
for record in records {
let envelope = EventEnvelope::new(table, record);
let json = serde_json::to_vec(&envelope)
.map_err(|e| format!("JSON serialization failed: {}", e))?;
producer
.send_event(json, SendEventOptions::default())
.await
.map_err(|e| format!("Event Hub send failed: {:?}", e))?;
}
debug!(table = table, count = count, "Sent events to Event Hub");
Ok(count)
}
}
#[async_trait::async_trait]
impl PipelineSender for EventHubSender {
async fn send_all(&self, grouped: HashMap<String, Vec<Value>>) -> SendResult {
let mut result = SendResult::default();
for (table, records) in grouped {
match self.send_batch(&table, records).await {
Ok(count) => {
result.succeeded.insert(table, count);
}
Err(e) => {
error!(table = %table, error = %e, "Failed to send to Event Hub");
result.failed.insert(table, e);
}
}
}
result
}
}