Skip to main content

ironflow_engine/notify/
audit_log.rs

1//! [`AuditLogSubscriber`] -- persists every event to an [`AuditLogStore`].
2
3use std::str::FromStr;
4use std::sync::Arc;
5
6use serde_json::to_value;
7use tracing::error;
8use uuid::Uuid;
9
10use ironflow_store::audit_log_store::AuditLogStore;
11use ironflow_store::entities::{EventKind, NewAuditLogEntry};
12
13use super::{Event, EventSubscriber, SubscriberFuture};
14
15/// Subscriber that persists every received event as an audit log entry.
16///
17/// Extracts contextual IDs (run, step, user) from the event payload
18/// for efficient filtering, and stores the full event as JSON.
19///
20/// # Examples
21///
22/// ```no_run
23/// use std::sync::Arc;
24/// use ironflow_engine::notify::{AuditLogSubscriber, Event, EventPublisher};
25/// use ironflow_store::memory::InMemoryStore;
26///
27/// let store = Arc::new(InMemoryStore::new());
28/// let mut publisher = EventPublisher::new();
29/// publisher.subscribe(
30///     AuditLogSubscriber::new(store),
31///     Event::ALL,
32/// );
33/// ```
34pub struct AuditLogSubscriber {
35    store: Arc<dyn AuditLogStore>,
36}
37
38impl AuditLogSubscriber {
39    /// Create a new subscriber backed by the given store.
40    ///
41    /// # Examples
42    ///
43    /// ```
44    /// use std::sync::Arc;
45    /// use ironflow_engine::notify::AuditLogSubscriber;
46    /// use ironflow_store::memory::InMemoryStore;
47    ///
48    /// let store = Arc::new(InMemoryStore::new());
49    /// let subscriber = AuditLogSubscriber::new(store);
50    /// ```
51    pub fn new(store: Arc<dyn AuditLogStore>) -> Self {
52        Self { store }
53    }
54}
55
56fn extract_run_id(event: &Event) -> Option<Uuid> {
57    match event {
58        Event::RunCreated { run_id, .. }
59        | Event::RunStatusChanged { run_id, .. }
60        | Event::RunFailed { run_id, .. }
61        | Event::StepCompleted { run_id, .. }
62        | Event::StepFailed { run_id, .. }
63        | Event::ApprovalRequested { run_id, .. }
64        | Event::ApprovalGranted { run_id, .. }
65        | Event::ApprovalRejected { run_id, .. } => Some(*run_id),
66        Event::UserSignedIn { .. } | Event::UserSignedUp { .. } | Event::UserSignedOut { .. } => {
67            None
68        }
69    }
70}
71
72fn extract_step_id(event: &Event) -> Option<Uuid> {
73    match event {
74        Event::StepCompleted { step_id, .. }
75        | Event::StepFailed { step_id, .. }
76        | Event::ApprovalRequested { step_id, .. } => Some(*step_id),
77        _ => None,
78    }
79}
80
81fn extract_user_id(event: &Event) -> Option<Uuid> {
82    match event {
83        Event::UserSignedIn { user_id, .. }
84        | Event::UserSignedUp { user_id, .. }
85        | Event::UserSignedOut { user_id, .. } => Some(*user_id),
86        _ => None,
87    }
88}
89
90impl EventSubscriber for AuditLogSubscriber {
91    fn name(&self) -> &str {
92        "audit_log"
93    }
94
95    fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
96        Box::pin(async move {
97            let event_kind = match EventKind::from_str(event.event_type()) {
98                Ok(k) => k,
99                Err(e) => {
100                    error!(error = %e, event_type = event.event_type(), "unknown event kind for audit log");
101                    return;
102                }
103            };
104
105            let payload = match to_value(event) {
106                Ok(v) => v,
107                Err(e) => {
108                    error!(error = %e, event_type = event.event_type(), "failed to serialize event for audit log");
109                    return;
110                }
111            };
112
113            let entry = NewAuditLogEntry {
114                event_type: event_kind,
115                payload,
116                run_id: extract_run_id(event),
117                step_id: extract_step_id(event),
118                user_id: extract_user_id(event),
119            };
120
121            if let Err(e) = self.store.append_audit_log(entry).await {
122                error!(error = %e, event_type = event.event_type(), "failed to persist audit log entry");
123            }
124        })
125    }
126}
127
128#[cfg(test)]
129mod tests {
130    use std::sync::Arc;
131    use std::time::Duration;
132
133    use chrono::Utc;
134    use rust_decimal::Decimal;
135    use uuid::Uuid;
136
137    use ironflow_store::audit_log_store::AuditLogStore;
138    use ironflow_store::entities::{AuditLogFilter, EventKind};
139    use ironflow_store::memory::InMemoryStore;
140    use ironflow_store::models::RunStatus;
141
142    use super::*;
143    use crate::notify::{EventPublisher, EventSubscriber};
144
145    fn sample_run_status_changed() -> Event {
146        Event::RunStatusChanged {
147            run_id: Uuid::now_v7(),
148            workflow_name: "deploy".to_string(),
149            from: RunStatus::Running,
150            to: RunStatus::Completed,
151            error: None,
152            cost_usd: Decimal::new(42, 2),
153            duration_ms: 5000,
154            at: Utc::now(),
155        }
156    }
157
158    fn sample_user_signed_in() -> Event {
159        Event::UserSignedIn {
160            user_id: Uuid::now_v7(),
161            username: "alice".to_string(),
162            at: Utc::now(),
163        }
164    }
165
166    fn sample_step_failed() -> Event {
167        Event::StepFailed {
168            run_id: Uuid::now_v7(),
169            step_id: Uuid::now_v7(),
170            step_name: "build".to_string(),
171            kind: ironflow_store::models::StepKind::Shell,
172            error: "exit code 1".to_string(),
173            at: Utc::now(),
174        }
175    }
176
177    #[test]
178    fn name_is_audit_log() {
179        let store = Arc::new(InMemoryStore::new());
180        let subscriber = AuditLogSubscriber::new(store);
181        assert_eq!(subscriber.name(), "audit_log");
182    }
183
184    #[test]
185    fn extract_run_id_from_run_event() {
186        let event = sample_run_status_changed();
187        assert!(extract_run_id(&event).is_some());
188    }
189
190    #[test]
191    fn extract_run_id_from_user_event_is_none() {
192        let event = sample_user_signed_in();
193        assert!(extract_run_id(&event).is_none());
194    }
195
196    #[test]
197    fn extract_step_id_from_step_event() {
198        let event = sample_step_failed();
199        assert!(extract_step_id(&event).is_some());
200    }
201
202    #[test]
203    fn extract_step_id_from_run_event_is_none() {
204        let event = sample_run_status_changed();
205        assert!(extract_step_id(&event).is_none());
206    }
207
208    #[test]
209    fn extract_user_id_from_user_event() {
210        let event = sample_user_signed_in();
211        assert!(extract_user_id(&event).is_some());
212    }
213
214    #[test]
215    fn extract_user_id_from_run_event_is_none() {
216        let event = sample_run_status_changed();
217        assert!(extract_user_id(&event).is_none());
218    }
219
220    #[tokio::test]
221    async fn handle_persists_event() {
222        let store = Arc::new(InMemoryStore::new());
223        let subscriber = AuditLogSubscriber::new(store.clone());
224
225        let event = sample_run_status_changed();
226        subscriber.handle(&event).await;
227
228        let page = store
229            .list_audit_logs(AuditLogFilter::default(), 1, 20)
230            .await
231            .unwrap();
232
233        assert_eq!(page.items.len(), 1);
234        assert_eq!(page.items[0].event_type, EventKind::RunStatusChanged);
235        assert!(page.items[0].run_id.is_some());
236        assert!(page.items[0].step_id.is_none());
237        assert!(page.items[0].user_id.is_none());
238    }
239
240    #[tokio::test]
241    async fn handle_persists_step_event_with_ids() {
242        let store = Arc::new(InMemoryStore::new());
243        let subscriber = AuditLogSubscriber::new(store.clone());
244
245        let event = sample_step_failed();
246        subscriber.handle(&event).await;
247
248        let page = store
249            .list_audit_logs(AuditLogFilter::default(), 1, 20)
250            .await
251            .unwrap();
252
253        assert_eq!(page.items.len(), 1);
254        assert_eq!(page.items[0].event_type, EventKind::StepFailed);
255        assert!(page.items[0].run_id.is_some());
256        assert!(page.items[0].step_id.is_some());
257    }
258
259    #[tokio::test]
260    async fn handle_persists_user_event_with_user_id() {
261        let store = Arc::new(InMemoryStore::new());
262        let subscriber = AuditLogSubscriber::new(store.clone());
263
264        let event = sample_user_signed_in();
265        subscriber.handle(&event).await;
266
267        let page = store
268            .list_audit_logs(AuditLogFilter::default(), 1, 20)
269            .await
270            .unwrap();
271
272        assert_eq!(page.items.len(), 1);
273        assert_eq!(page.items[0].event_type, EventKind::UserSignedIn);
274        assert!(page.items[0].user_id.is_some());
275        assert!(page.items[0].run_id.is_none());
276    }
277
278    #[tokio::test]
279    async fn publisher_dispatches_to_audit_log_subscriber() {
280        let store = Arc::new(InMemoryStore::new());
281        let mut publisher = EventPublisher::new();
282
283        publisher.subscribe(AuditLogSubscriber::new(store.clone()), Event::ALL);
284
285        publisher.publish(sample_run_status_changed());
286        publisher.publish(sample_user_signed_in());
287        publisher.publish(sample_step_failed());
288
289        tokio::time::sleep(Duration::from_millis(100)).await;
290
291        let page = store
292            .list_audit_logs(AuditLogFilter::default(), 1, 20)
293            .await
294            .unwrap();
295
296        assert_eq!(page.items.len(), 3);
297    }
298
299    #[tokio::test]
300    async fn full_event_payload_is_preserved() {
301        let store = Arc::new(InMemoryStore::new());
302        let subscriber = AuditLogSubscriber::new(store.clone());
303
304        let run_id = Uuid::now_v7();
305        let event = Event::RunFailed {
306            run_id,
307            workflow_name: "deploy".to_string(),
308            error: Some("step crashed".to_string()),
309            cost_usd: Decimal::new(10, 2),
310            duration_ms: 3000,
311            at: Utc::now(),
312        };
313        subscriber.handle(&event).await;
314
315        let page = store
316            .list_audit_logs(AuditLogFilter::default(), 1, 20)
317            .await
318            .unwrap();
319
320        let payload = &page.items[0].payload;
321        assert_eq!(payload["type"], "run_failed");
322        assert_eq!(payload["workflow_name"], "deploy");
323        assert_eq!(payload["error"], "step crashed");
324    }
325}