Skip to main content

ironflow_engine/notify/
audit_log.rs

1//! [`AuditLogSubscriber`] -- persists every event to an [`AuditLogStore`].
2
3use std::str::FromStr;
4use std::sync::Arc;
5
6use serde_json::to_value;
7use tracing::error;
8use uuid::Uuid;
9
10use ironflow_store::audit_log_store::AuditLogStore;
11use ironflow_store::entities::{EventKind, NewAuditLogEntry};
12
13use super::{Event, EventSubscriber, SubscriberFuture};
14
15/// Subscriber that persists every received event as an audit log entry.
16///
17/// Extracts contextual IDs (run, step, user) from the event payload
18/// for efficient filtering, and stores the full event as JSON.
19///
20/// # Examples
21///
22/// ```no_run
23/// use std::sync::Arc;
24/// use ironflow_engine::notify::{AuditLogSubscriber, Event, EventPublisher};
25/// use ironflow_store::memory::InMemoryStore;
26///
27/// let store = Arc::new(InMemoryStore::new());
28/// let mut publisher = EventPublisher::new();
29/// publisher.subscribe(
30///     AuditLogSubscriber::new(store),
31///     Event::ALL,
32/// );
33/// ```
pub struct AuditLogSubscriber {
    /// Backing store that `handle` appends audit log entries to.
    store: Arc<dyn AuditLogStore>,
}
37
38impl AuditLogSubscriber {
39    /// Create a new subscriber backed by the given store.
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// use std::sync::Arc;
45    /// use ironflow_engine::notify::AuditLogSubscriber;
46    /// use ironflow_store::memory::InMemoryStore;
47    ///
48    /// let store = Arc::new(InMemoryStore::new());
49    /// let subscriber = AuditLogSubscriber::new(store);
50    /// ```
51    pub fn new(store: Arc<dyn AuditLogStore>) -> Self {
52        Self { store }
53    }
54}
55
56fn extract_run_id(event: &Event) -> Option<Uuid> {
57    match event {
58        Event::RunCreated { run_id, .. }
59        | Event::RunStatusChanged { run_id, .. }
60        | Event::RunFailed { run_id, .. }
61        | Event::StepCompleted { run_id, .. }
62        | Event::StepFailed { run_id, .. }
63        | Event::ApprovalRequested { run_id, .. }
64        | Event::ApprovalGranted { run_id, .. }
65        | Event::ApprovalRejected { run_id, .. }
66        | Event::LogLine { run_id, .. } => Some(*run_id),
67        Event::UserSignedIn { .. } | Event::UserSignedUp { .. } | Event::UserSignedOut { .. } => {
68            None
69        }
70    }
71}
72
73fn extract_step_id(event: &Event) -> Option<Uuid> {
74    match event {
75        Event::StepCompleted { step_id, .. }
76        | Event::StepFailed { step_id, .. }
77        | Event::ApprovalRequested { step_id, .. } => Some(*step_id),
78        _ => None,
79    }
80}
81
82fn extract_user_id(event: &Event) -> Option<Uuid> {
83    match event {
84        Event::UserSignedIn { user_id, .. }
85        | Event::UserSignedUp { user_id, .. }
86        | Event::UserSignedOut { user_id, .. } => Some(*user_id),
87        _ => None,
88    }
89}
90
91impl EventSubscriber for AuditLogSubscriber {
92    fn name(&self) -> &str {
93        "audit_log"
94    }
95
96    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
97        Box::pin(async move {
98            let event_kind = match EventKind::from_str(event.event_type()) {
99                Ok(k) => k,
100                Err(e) => {
101                    error!(error = %e, event_type = event.event_type(), "unknown event kind for audit log");
102                    return;
103                }
104            };
105
106            let payload = match to_value(event) {
107                Ok(v) => v,
108                Err(e) => {
109                    error!(error = %e, event_type = event.event_type(), "failed to serialize event for audit log");
110                    return;
111                }
112            };
113
114            let entry = NewAuditLogEntry {
115                event_type: event_kind,
116                payload,
117                run_id: extract_run_id(event),
118                step_id: extract_step_id(event),
119                user_id: extract_user_id(event),
120            };
121
122            if let Err(e) = self.store.append_audit_log(entry).await {
123                error!(error = %e, event_type = event.event_type(), "failed to persist audit log entry");
124            }
125        })
126    }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use chrono::Utc;
    use rust_decimal::Decimal;
    use uuid::Uuid;

    use ironflow_store::audit_log_store::AuditLogStore;
    use ironflow_store::entities::{AuditLogFilter, EventKind};
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::RunStatus;

    use super::*;
    use crate::notify::{EventPublisher, EventSubscriber};

    /// A run-scoped event: has a run ID, no step ID, no user ID.
    fn sample_run_status_changed() -> Event {
        Event::RunStatusChanged {
            run_id: Uuid::now_v7(),
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    /// A user-scoped event: has a user ID, no run ID.
    fn sample_user_signed_in() -> Event {
        Event::UserSignedIn {
            user_id: Uuid::now_v7(),
            username: "alice".to_string(),
            at: Utc::now(),
        }
    }

    /// A step-scoped event: has both a run ID and a step ID.
    fn sample_step_failed() -> Event {
        Event::StepFailed {
            run_id: Uuid::now_v7(),
            step_id: Uuid::now_v7(),
            step_name: "build".to_string(),
            kind: ironflow_store::models::StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        }
    }

    #[test]
    fn name_is_audit_log() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store);
        assert_eq!(subscriber.name(), "audit_log");
    }

    #[test]
    fn extract_run_id_from_run_event() {
        let event = sample_run_status_changed();
        assert!(extract_run_id(&event).is_some());
    }

    #[test]
    fn extract_run_id_from_user_event_is_none() {
        let event = sample_user_signed_in();
        assert!(extract_run_id(&event).is_none());
    }

    #[test]
    fn extract_step_id_from_step_event() {
        let event = sample_step_failed();
        assert!(extract_step_id(&event).is_some());
    }

    #[test]
    fn extract_step_id_from_run_event_is_none() {
        let event = sample_run_status_changed();
        assert!(extract_step_id(&event).is_none());
    }

    #[test]
    fn extract_user_id_from_user_event() {
        let event = sample_user_signed_in();
        assert!(extract_user_id(&event).is_some());
    }

    #[test]
    fn extract_user_id_from_run_event_is_none() {
        let event = sample_run_status_changed();
        assert!(extract_user_id(&event).is_none());
    }

    #[tokio::test]
    async fn handle_persists_event() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_run_status_changed();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::RunStatusChanged);
        assert!(page.items[0].run_id.is_some());
        assert!(page.items[0].step_id.is_none());
        assert!(page.items[0].user_id.is_none());
    }

    #[tokio::test]
    async fn handle_persists_step_event_with_ids() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_step_failed();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::StepFailed);
        assert!(page.items[0].run_id.is_some());
        assert!(page.items[0].step_id.is_some());
    }

    #[tokio::test]
    async fn handle_persists_user_event_with_user_id() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_user_signed_in();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::UserSignedIn);
        assert!(page.items[0].user_id.is_some());
        assert!(page.items[0].run_id.is_none());
    }

    #[tokio::test]
    async fn publisher_dispatches_to_audit_log_subscriber() {
        let store = Arc::new(InMemoryStore::new());
        let mut publisher = EventPublisher::new();

        publisher.subscribe(AuditLogSubscriber::new(store.clone()), Event::ALL);

        publisher.publish(sample_run_status_changed());
        publisher.publish(sample_user_signed_in());
        publisher.publish(sample_step_failed());

        // `publish` is not awaited, so delivery presumably happens in the
        // background. Poll with a bounded budget (100 x 10 ms) instead of a
        // single fixed sleep: the test finishes as soon as the entries are
        // visible and tolerates a slow or heavily loaded machine.
        let mut persisted = 0;
        for _ in 0..100 {
            persisted = store
                .list_audit_logs(AuditLogFilter::default(), 1, 20)
                .await
                .unwrap()
                .items
                .len();
            if persisted >= 3 {
                break;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }

        assert_eq!(persisted, 3);
    }

    #[tokio::test]
    async fn full_event_payload_is_preserved() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let run_id = Uuid::now_v7();
        let event = Event::RunFailed {
            run_id,
            workflow_name: "deploy".to_string(),
            error: Some("step crashed".to_string()),
            cost_usd: Decimal::new(10, 2),
            duration_ms: 3000,
            at: Utc::now(),
        };
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        // The stored payload is the JSON form of the whole event.
        let payload = &page.items[0].payload;
        assert_eq!(payload["type"], "run_failed");
        assert_eq!(payload["workflow_name"], "deploy");
        assert_eq!(payload["error"], "step crashed");
    }
}