Skip to main content

awsim_core/
request_event.rs

1use serde::Serialize;
2use tokio::sync::broadcast;
3
4#[derive(Debug, Clone, Serialize)]
5pub struct RequestEvent {
6    pub id: String,
7    pub ts: f64,
8    pub method: String,
9    pub path: String,
10    pub service: String,
11    pub operation: Option<String>,
12    pub account_id: String,
13    pub region: String,
14    pub principal_arn: Option<String>,
15    pub status_code: u16,
16    pub duration_ms: f64,
17    pub request_size: u64,
18    pub response_size: u64,
19    pub error_code: Option<String>,
20    /// Lambda-style memory size in MB, populated when the responding
21    /// service sets the `X-Awsim-Memory-MB` header on its response.
22    /// Used by the billing meter for accurate GB-second compute cost
23    /// (otherwise it falls back to the 128 MB minimum).
24    pub memory_mb: Option<u32>,
25    /// Number of state transitions executed by the responding service
26    /// for this request. Step Functions emits this so the meter can
27    /// charge per-transition (the actual AWS billing unit) instead of
28    /// per-StartExecution call. None for non-stateful services.
29    pub state_transitions: Option<u32>,
30    /// Number of characters in the request payload. Polly /
31    /// Comprehend / Translate emit this so the meter can charge
32    /// per-character (the AWS billing unit for these services). None
33    /// for services that don't bill per character.
34    pub character_count: Option<u64>,
35}
36
37#[derive(Clone, Debug)]
38pub struct RequestEventBus {
39    sender: broadcast::Sender<RequestEvent>,
40}
41
42impl RequestEventBus {
43    pub fn new() -> Self {
44        let (sender, _) = broadcast::channel(256);
45        Self { sender }
46    }
47
48    pub fn publish(&self, event: RequestEvent) {
49        let _ = self.sender.send(event);
50    }
51
52    pub fn subscribe(&self) -> broadcast::Receiver<RequestEvent> {
53        self.sender.subscribe()
54    }
55
56    pub fn sender(&self) -> &broadcast::Sender<RequestEvent> {
57        &self.sender
58    }
59}
60
61impl Default for RequestEventBus {
62    fn default() -> Self {
63        Self::new()
64    }
65}
66
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a representative S3 `PutObject` event with every optional
    /// metering field left unset.
    fn sample_event() -> RequestEvent {
        RequestEvent {
            id: String::from("req-1"),
            ts: 1735041600.123,
            method: String::from("POST"),
            path: String::from("/"),
            service: String::from("s3"),
            operation: Some(String::from("PutObject")),
            account_id: String::from("000000000000"),
            region: String::from("us-east-1"),
            principal_arn: None,
            status_code: 200,
            duration_ms: 12.5,
            request_size: 1024,
            response_size: 256,
            error_code: None,
            memory_mb: None,
            state_transitions: None,
            character_count: None,
        }
    }

    /// An event published after subscribing is delivered to that subscriber.
    #[tokio::test]
    async fn broadcast_round_trip() {
        let bus = RequestEventBus::new();
        let mut subscriber = bus.subscribe();

        let sent = sample_event();
        let expected_id = sent.id.clone();
        bus.publish(sent);

        let delivered = subscriber.recv().await.expect("receive event");
        assert_eq!(delivered.id, expected_id);
        assert_eq!(delivered.service, "s3");
        assert_eq!(delivered.operation.as_deref(), Some("PutObject"));
        assert_eq!(delivered.status_code, 200);
    }
}