tana_event_bus/
lib.rs

1//! Tana Event Bus - Rust Client Library
2//!
3//! This library provides a simple interface for dispatching events to the Tana Event Bus server.
4//!
5//! # Examples
6//!
7//! ```no_run
8//! use tana_event_bus::{dispatch_event, Event, EventLevel, configure};
9//! use serde_json::json;
10//!
11//! #[tokio::main]
12//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
13//!     // Optional: Configure once at app startup
14//!     configure(Some("http://events.tana.network:8508"), None, None)?;
15//!
16//!     // Dispatch an event
17//!     dispatch_event(Event {
18//!         service: "edge".to_string(),
19//!         level: EventLevel::Info,
20//!         category: "contract_execution".to_string(),
21//!         message: "Contract executed successfully".to_string(),
22//!         timestamp: None, // Will be set automatically
23//!         metadata: Some(json!({
24//!             "contract_id": "abc123",
25//!             "duration_ms": 45
26//!         })),
27//!     }).await?;
28//!
29//!     Ok(())
30//! }
31//! ```
32
33use once_cell::sync::Lazy;
34use serde::{Deserialize, Serialize};
35use std::sync::RwLock;
36use std::time::Duration;
37
38/// Event log levels
39#[derive(Debug, Clone, Serialize, Deserialize)]
40#[serde(rename_all = "lowercase")]
41pub enum EventLevel {
42    Debug,
43    Info,
44    Warn,
45    Error,
46}
47
48/// Core event structure
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct Event {
51    /// Service that emitted the event
52    pub service: String,
53    /// Log level
54    pub level: EventLevel,
55    /// Event category for filtering
56    pub category: String,
57    /// Human-readable event description
58    pub message: String,
59    /// Unix timestamp in milliseconds (will be set automatically if None)
60    #[serde(skip_serializing_if = "Option::is_none")]
61    pub timestamp: Option<u64>,
62    /// Optional event-specific metadata
63    #[serde(skip_serializing_if = "Option::is_none")]
64    pub metadata: Option<serde_json::Value>,
65}
66
67/// Response from the event bus server
68#[derive(Debug, Deserialize)]
69pub struct EventResponse {
70    pub success: bool,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    pub message_id: Option<String>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    pub error: Option<String>,
75}
76
/// Client configuration
#[derive(Debug, Clone)]
pub struct Config {
    /// Base URL of the event bus server; `/events` is appended when dispatching.
    pub server_url: String,
    /// Timeout applied to the HTTP client used for dispatching.
    pub timeout: Duration,
    /// Number of delivery attempts made for one event before giving up.
    pub max_retries: usize,
}

impl Default for Config {
    /// Builds the default configuration.
    ///
    /// The server URL is taken from the `EVENT_BUS_URL` environment variable.
    /// If the variable is unset, not valid Unicode, or blank, the URL falls
    /// back to `http://localhost:8508`. The timeout is 5 seconds and
    /// `max_retries` is 3.
    fn default() -> Self {
        const FALLBACK_URL: &str = "http://localhost:8508";

        // A variable that is set but empty would otherwise yield an unusable
        // URL, so it is treated the same as an unset one.
        let server_url = std::env::var("EVENT_BUS_URL")
            .ok()
            .filter(|url| !url.trim().is_empty())
            .unwrap_or_else(|| FALLBACK_URL.to_string());

        Self {
            server_url,
            timeout: Duration::from_secs(5),
            max_retries: 3,
        }
    }
}
95
96/// Global configuration
97static CONFIG: Lazy<RwLock<Config>> = Lazy::new(|| RwLock::new(Config::default()));
98
99/// Configure the event bus client
100///
101/// # Arguments
102///
103/// * `server_url` - Optional server URL (defaults to EVENT_BUS_URL env or http://localhost:8508)
104/// * `timeout_secs` - Optional timeout in seconds (defaults to 5)
105/// * `max_retries` - Optional max retry attempts (defaults to 3)
106///
107/// # Examples
108///
109/// ```no_run
110/// use tana_event_bus::configure;
111///
112/// // Configure with all options
113/// configure(
114///     Some("http://events.tana.network:8508"),
115///     Some(10),
116///     Some(5)
117/// ).unwrap();
118/// ```
119pub fn configure(
120    server_url: Option<&str>,
121    timeout_secs: Option<u64>,
122    max_retries: Option<usize>,
123) -> Result<(), String> {
124    let mut config = CONFIG
125        .write()
126        .map_err(|e| format!("Failed to acquire config lock: {}", e))?;
127
128    if let Some(url) = server_url {
129        config.server_url = url.to_string();
130    }
131
132    if let Some(timeout) = timeout_secs {
133        config.timeout = Duration::from_secs(timeout);
134    }
135
136    if let Some(retries) = max_retries {
137        config.max_retries = retries;
138    }
139
140    log::debug!("Event bus configured: {:?}", *config);
141
142    Ok(())
143}
144
145/// Get current configuration
146pub fn get_config() -> Result<Config, String> {
147    CONFIG
148        .read()
149        .map(|c| c.clone())
150        .map_err(|e| format!("Failed to read config: {}", e))
151}
152
153/// Dispatch an event to the event bus server
154///
155/// # Arguments
156///
157/// * `event` - The event to dispatch
158///
159/// # Returns
160///
161/// Returns `Ok(EventResponse)` on success, or an error string on failure.
162///
163/// # Examples
164///
165/// ```no_run
166/// use tana_event_bus::{dispatch_event, Event, EventLevel};
167/// use serde_json::json;
168///
169/// #[tokio::main]
170/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
171///     let response = dispatch_event(Event {
172///         service: "ledger".to_string(),
173///         level: EventLevel::Info,
174///         category: "block_production".to_string(),
175///         message: "Block finalized".to_string(),
176///         timestamp: None,
177///         metadata: Some(json!({
178///             "height": 123,
179///             "tx_count": 5
180///         })),
181///     }).await?;
182///
183///     println!("Event dispatched: {:?}", response.message_id);
184///     Ok(())
185/// }
186/// ```
187pub async fn dispatch_event(mut event: Event) -> Result<EventResponse, String> {
188    let config = get_config()?;
189
190    // Set timestamp if not provided
191    if event.timestamp.is_none() {
192        event.timestamp = Some(
193            std::time::SystemTime::now()
194                .duration_since(std::time::UNIX_EPOCH)
195                .map_err(|e| format!("Failed to get timestamp: {}", e))?
196                .as_millis() as u64,
197        );
198    }
199
200    let client = reqwest::Client::builder()
201        .timeout(config.timeout)
202        .build()
203        .map_err(|e| format!("Failed to create HTTP client: {}", e))?;
204
205    let url = format!("{}/events", config.server_url);
206
207    let mut last_error = String::new();
208
209    // Retry loop
210    for attempt in 1..=config.max_retries {
211        log::debug!(
212            "Dispatching event (attempt {}/{}): service={} level={:?} category={}",
213            attempt,
214            config.max_retries,
215            event.service,
216            event.level,
217            event.category
218        );
219
220        match client.post(&url).json(&event).send().await {
221            Ok(response) => {
222                if response.status().is_success() {
223                    match response.json::<EventResponse>().await {
224                        Ok(event_response) => {
225                            log::debug!("Event dispatched successfully: {:?}", event_response.message_id);
226                            return Ok(event_response);
227                        }
228                        Err(e) => {
229                            last_error = format!("Failed to parse response: {}", e);
230                            log::warn!("Attempt {}: {}", attempt, last_error);
231                        }
232                    }
233                } else {
234                    last_error = format!("HTTP {}: {}", response.status(), response.text().await.unwrap_or_default());
235                    log::warn!("Attempt {}: {}", attempt, last_error);
236                }
237            }
238            Err(e) => {
239                last_error = format!("Request failed: {}", e);
240                log::warn!("Attempt {}: {}", attempt, last_error);
241            }
242        }
243
244        // Don't sleep after the last attempt
245        if attempt < config.max_retries {
246            let backoff = Duration::from_millis(2_u64.pow((attempt - 1) as u32) * 1000);
247            log::debug!("Retrying in {:?}...", backoff);
248            tokio::time::sleep(backoff).await;
249        }
250    }
251
252    Err(format!("All {} attempts failed. Last error: {}", config.max_retries, last_error))
253}
254
255/// Fire-and-forget event dispatch (spawns a tokio task, does not wait for response)
256///
257/// Useful for high-frequency events where you don't care about confirmation.
258///
259/// # Examples
260///
261/// ```no_run
262/// use tana_event_bus::{dispatch_event_async, Event, EventLevel};
263///
264/// fn some_function() {
265///     dispatch_event_async(Event {
266///         service: "edge".to_string(),
267///         level: EventLevel::Debug,
268///         category: "performance".to_string(),
269///         message: "Contract execution completed".to_string(),
270///         timestamp: None,
271///         metadata: None,
272///     });
273/// }
274/// ```
275pub fn dispatch_event_async(event: Event) {
276    tokio::spawn(async move {
277        if let Err(e) = dispatch_event(event).await {
278            log::error!("Async event dispatch failed: {}", e);
279        }
280    });
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults for timeout and attempt count are fixed; the URL depends on
    /// the `EVENT_BUS_URL` environment variable of the test process.
    #[test]
    fn test_config_default() {
        let config = Config::default();

        match std::env::var("EVENT_BUS_URL") {
            // A usable override must be taken verbatim.
            Ok(url) if !url.trim().is_empty() => assert_eq!(config.server_url, url),
            // A blank override is not pinned by this test.
            Ok(_) => {}
            // Unset (or not valid Unicode): the built-in default applies.
            Err(_) => assert_eq!(config.server_url, "http://localhost:8508"),
        }

        assert_eq!(config.timeout, Duration::from_secs(5));
        assert_eq!(config.max_retries, 3);
    }

    /// `configure` must store every supplied override in the global config.
    #[test]
    fn test_configure() {
        let result = configure(
            Some("http://test.example.com:8508"),
            Some(10),
            Some(5),
        );
        assert!(result.is_ok());

        let config = get_config().unwrap();
        assert_eq!(config.server_url, "http://test.example.com:8508");
        assert_eq!(config.timeout, Duration::from_secs(10));
        assert_eq!(config.max_retries, 5);
    }

    /// A fully populated event serializes to exactly the expected JSON object.
    #[test]
    fn test_event_serialization() {
        let event = Event {
            service: "test".to_string(),
            level: EventLevel::Info,
            category: "test_category".to_string(),
            message: "Test message".to_string(),
            timestamp: Some(1234567890),
            metadata: Some(serde_json::json!({"key": "value"})),
        };

        let value = serde_json::to_value(&event).unwrap();
        assert_eq!(
            value,
            serde_json::json!({
                "service": "test",
                "level": "info",
                "category": "test_category",
                "message": "Test message",
                "timestamp": 1234567890u64,
                "metadata": {"key": "value"}
            })
        );
    }

    /// Optional fields that are `None` must not appear in the JSON at all.
    #[test]
    fn test_event_serialization_omits_unset_optionals() {
        let event = Event {
            service: "test".to_string(),
            level: EventLevel::Warn,
            category: "test_category".to_string(),
            message: "Test message".to_string(),
            timestamp: None,
            metadata: None,
        };

        let value = serde_json::to_value(&event).unwrap();
        let object = value.as_object().unwrap();
        assert!(!object.contains_key("timestamp"));
        assert!(!object.contains_key("metadata"));
        assert_eq!(object["level"], "warn");
    }

    /// Levels are written as lowercase strings.
    #[test]
    fn test_event_level_names() {
        assert_eq!(serde_json::to_string(&EventLevel::Debug).unwrap(), "\"debug\"");
        assert_eq!(serde_json::to_string(&EventLevel::Info).unwrap(), "\"info\"");
        assert_eq!(serde_json::to_string(&EventLevel::Warn).unwrap(), "\"warn\"");
        assert_eq!(serde_json::to_string(&EventLevel::Error).unwrap(), "\"error\"");
    }

    /// A response without the optional fields still deserializes.
    #[test]
    fn test_event_response_optional_fields() {
        let minimal: EventResponse = serde_json::from_str(r#"{"success":true}"#).unwrap();
        assert!(minimal.success);
        assert!(minimal.message_id.is_none());
        assert!(minimal.error.is_none());

        let full: EventResponse =
            serde_json::from_str(r#"{"success":false,"message_id":"m1","error":"boom"}"#)
                .unwrap();
        assert!(!full.success);
        assert_eq!(full.message_id.as_deref(), Some("m1"));
        assert_eq!(full.error.as_deref(), Some("boom"));
    }
}
326}