1use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use std::sync::RwLock;
36use std::time::Duration;
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
40#[serde(rename_all = "lowercase")]
41pub enum EventLevel {
42 Debug,
43 Info,
44 Warn,
45 Error,
46}
47
/// An event payload that is serialized to JSON and posted to the event bus.
///
/// Optional fields that are `None` are left out of the serialized output
/// entirely rather than being emitted as `null`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Free-form service identifier (presumably the emitting service's name —
    /// confirm against the event bus schema).
    pub service: String,
    /// Severity of the event.
    pub level: EventLevel,
    /// Free-form category string; its vocabulary is not constrained here.
    pub category: String,
    /// Human-readable event message.
    pub message: String,
    /// Event time. When `None`, `dispatch_event` fills it in with the current
    /// Unix time in milliseconds before sending.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<u64>,
    /// Arbitrary JSON value attached to the event; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub metadata: Option<serde_json::Value>,
}
66
67#[derive(Debug, Deserialize)]
69pub struct EventResponse {
70 pub success: bool,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 pub message_id: Option<String>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 pub error: Option<String>,
75}
76
/// Settings used by `dispatch_event` when talking to the event bus.
#[derive(Debug, Clone)]
pub struct Config {
    /// Base URL of the event bus; events are posted to `{server_url}/events`.
    pub server_url: String,
    /// Timeout handed to the HTTP client builder for each dispatch.
    pub timeout: Duration,
    /// Upper bound on send attempts made by `dispatch_event`. Despite the
    /// name, this counts the first attempt as well, not only the retries.
    pub max_retries: usize,
}
84
85impl Default for Config {
86 fn default() -> Self {
87 Self {
88 server_url: std::env::var("EVENT_BUS_URL")
89 .unwrap_or_else(|_| "http://localhost:8508".to_string()),
90 timeout: Duration::from_secs(5),
91 max_retries: 3,
92 }
93 }
94}
95
96static CONFIG: Lazy<RwLock<Config>> = Lazy::new(|| RwLock::new(Config::default()));
98
99pub fn configure(
120 server_url: Option<&str>,
121 timeout_secs: Option<u64>,
122 max_retries: Option<usize>,
123) -> Result<(), String> {
124 let mut config = CONFIG
125 .write()
126 .map_err(|e| format!("Failed to acquire config lock: {}", e))?;
127
128 if let Some(url) = server_url {
129 config.server_url = url.to_string();
130 }
131
132 if let Some(timeout) = timeout_secs {
133 config.timeout = Duration::from_secs(timeout);
134 }
135
136 if let Some(retries) = max_retries {
137 config.max_retries = retries;
138 }
139
140 log::debug!("Event bus configured: {:?}", *config);
141
142 Ok(())
143}
144
145pub fn get_config() -> Result<Config, String> {
147 CONFIG
148 .read()
149 .map(|c| c.clone())
150 .map_err(|e| format!("Failed to read config: {}", e))
151}
152
153pub async fn dispatch_event(mut event: Event) -> Result<EventResponse, String> {
188 let config = get_config()?;
189
190 if event.timestamp.is_none() {
192 event.timestamp = Some(
193 std::time::SystemTime::now()
194 .duration_since(std::time::UNIX_EPOCH)
195 .map_err(|e| format!("Failed to get timestamp: {}", e))?
196 .as_millis() as u64,
197 );
198 }
199
200 let client = reqwest::Client::builder()
201 .timeout(config.timeout)
202 .build()
203 .map_err(|e| format!("Failed to create HTTP client: {}", e))?;
204
205 let url = format!("{}/events", config.server_url);
206
207 let mut last_error = String::new();
208
209 for attempt in 1..=config.max_retries {
211 log::debug!(
212 "Dispatching event (attempt {}/{}): service={} level={:?} category={}",
213 attempt,
214 config.max_retries,
215 event.service,
216 event.level,
217 event.category
218 );
219
220 match client.post(&url).json(&event).send().await {
221 Ok(response) => {
222 if response.status().is_success() {
223 match response.json::<EventResponse>().await {
224 Ok(event_response) => {
225 log::debug!("Event dispatched successfully: {:?}", event_response.message_id);
226 return Ok(event_response);
227 }
228 Err(e) => {
229 last_error = format!("Failed to parse response: {}", e);
230 log::warn!("Attempt {}: {}", attempt, last_error);
231 }
232 }
233 } else {
234 last_error = format!("HTTP {}: {}", response.status(), response.text().await.unwrap_or_default());
235 log::warn!("Attempt {}: {}", attempt, last_error);
236 }
237 }
238 Err(e) => {
239 last_error = format!("Request failed: {}", e);
240 log::warn!("Attempt {}: {}", attempt, last_error);
241 }
242 }
243
244 if attempt < config.max_retries {
246 let backoff = Duration::from_millis(2_u64.pow((attempt - 1) as u32) * 1000);
247 log::debug!("Retrying in {:?}...", backoff);
248 tokio::time::sleep(backoff).await;
249 }
250 }
251
252 Err(format!("All {} attempts failed. Last error: {}", config.max_retries, last_error))
253}
254
255pub fn dispatch_event_async(event: Event) {
276 tokio::spawn(async move {
277 if let Err(e) = dispatch_event(event).await {
278 log::error!("Async event dispatch failed: {}", e);
279 }
280 });
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an event with fixed required fields and the given optional ones.
    fn sample_event(timestamp: Option<u64>, metadata: Option<serde_json::Value>) -> Event {
        Event {
            service: "test".to_string(),
            level: EventLevel::Info,
            category: "test_category".to_string(),
            message: "Test message".to_string(),
            timestamp,
            metadata,
        }
    }

    #[test]
    fn test_config_default() {
        let config = Config::default();

        // The default URL honours EVENT_BUS_URL, so derive the expectation from
        // the environment instead of failing whenever the variable is set.
        let expected_url = std::env::var("EVENT_BUS_URL")
            .unwrap_or_else(|_| "http://localhost:8508".to_string());

        assert_eq!(config.server_url, expected_url);
        assert_eq!(config.timeout, Duration::from_secs(5));
        assert_eq!(config.max_retries, 3);
    }

    // This is the only test that touches the global configuration, so it is
    // safe under the parallel test runner.
    #[test]
    fn test_configure() {
        let result = configure(Some("http://test.example.com:8508"), Some(10), Some(5));
        assert!(result.is_ok());

        let config = get_config().unwrap();
        assert_eq!(config.server_url, "http://test.example.com:8508");
        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 5);

        // `None` arguments must leave the corresponding settings untouched.
        assert!(configure(None, Some(2), None).is_ok());

        let config = get_config().unwrap();
        assert_eq!(config.server_url, "http://test.example.com:8508");
        assert_eq!(config.timeout, Duration::from_secs(2));
        assert_eq!(config.max_retries, 5);
    }

    #[test]
    fn test_event_serialization() {
        let event = sample_event(
            Some(1234567890),
            Some(serde_json::json!({"key": "value"})),
        );

        // Compare individual fields; substring checks on the raw JSON would
        // pass vacuously (e.g. "test" also matches "test_category").
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["service"], "test");
        assert_eq!(json["level"], "info");
        assert_eq!(json["category"], "test_category");
        assert_eq!(json["message"], "Test message");
        assert_eq!(json["timestamp"], 1234567890u64);
        assert_eq!(json["metadata"]["key"], "value");
    }

    #[test]
    fn test_event_serialization_skips_none_fields() {
        let json = serde_json::to_value(&sample_event(None, None)).unwrap();
        let object = json.as_object().unwrap();

        assert!(!object.contains_key("timestamp"));
        assert!(!object.contains_key("metadata"));
    }

    #[test]
    fn test_event_response_optional_fields() {
        let minimal: EventResponse = serde_json::from_str(r#"{"success":true}"#).unwrap();
        assert!(minimal.success);
        assert!(minimal.message_id.is_none());
        assert!(minimal.error.is_none());

        let full: EventResponse =
            serde_json::from_str(r#"{"success":false,"message_id":"m-1","error":"boom"}"#)
                .unwrap();
        assert!(!full.success);
        assert_eq!(full.message_id.as_deref(), Some("m-1"));
        assert_eq!(full.error.as_deref(), Some("boom"));
    }
}