1use std::str::FromStr;
4use std::sync::Arc;
5
6use serde_json::to_value;
7use tracing::error;
8use uuid::Uuid;
9
10use ironflow_store::audit_log_store::AuditLogStore;
11use ironflow_store::entities::{EventKind, NewAuditLogEntry};
12
13use super::{Event, EventSubscriber, SubscriberFuture};
14
15pub struct AuditLogSubscriber {
35 store: Arc<dyn AuditLogStore>,
36}
37
38impl AuditLogSubscriber {
39 pub fn new(store: Arc<dyn AuditLogStore>) -> Self {
52 Self { store }
53 }
54}
55
56fn extract_run_id(event: &Event) -> Option<Uuid> {
57 match event {
58 Event::RunCreated { run_id, .. }
59 | Event::RunStatusChanged { run_id, .. }
60 | Event::RunFailed { run_id, .. }
61 | Event::StepCompleted { run_id, .. }
62 | Event::StepFailed { run_id, .. }
63 | Event::ApprovalRequested { run_id, .. }
64 | Event::ApprovalGranted { run_id, .. }
65 | Event::ApprovalRejected { run_id, .. } => Some(*run_id),
66 Event::UserSignedIn { .. } | Event::UserSignedUp { .. } | Event::UserSignedOut { .. } => {
67 None
68 }
69 }
70}
71
72fn extract_step_id(event: &Event) -> Option<Uuid> {
73 match event {
74 Event::StepCompleted { step_id, .. }
75 | Event::StepFailed { step_id, .. }
76 | Event::ApprovalRequested { step_id, .. } => Some(*step_id),
77 _ => None,
78 }
79}
80
81fn extract_user_id(event: &Event) -> Option<Uuid> {
82 match event {
83 Event::UserSignedIn { user_id, .. }
84 | Event::UserSignedUp { user_id, .. }
85 | Event::UserSignedOut { user_id, .. } => Some(*user_id),
86 _ => None,
87 }
88}
89
90impl EventSubscriber for AuditLogSubscriber {
91 fn name(&self) -> &str {
92 "audit_log"
93 }
94
95 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
96 Box::pin(async move {
97 let event_kind = match EventKind::from_str(event.event_type()) {
98 Ok(k) => k,
99 Err(e) => {
100 error!(error = %e, event_type = event.event_type(), "unknown event kind for audit log");
101 return;
102 }
103 };
104
105 let payload = match to_value(event) {
106 Ok(v) => v,
107 Err(e) => {
108 error!(error = %e, event_type = event.event_type(), "failed to serialize event for audit log");
109 return;
110 }
111 };
112
113 let entry = NewAuditLogEntry {
114 event_type: event_kind,
115 payload,
116 run_id: extract_run_id(event),
117 step_id: extract_step_id(event),
118 user_id: extract_user_id(event),
119 };
120
121 if let Err(e) = self.store.append_audit_log(entry).await {
122 error!(error = %e, event_type = event.event_type(), "failed to persist audit log entry");
123 }
124 })
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use std::sync::Arc;
131 use std::time::Duration;
132
133 use chrono::Utc;
134 use rust_decimal::Decimal;
135 use uuid::Uuid;
136
137 use ironflow_store::audit_log_store::AuditLogStore;
138 use ironflow_store::entities::{AuditLogFilter, EventKind};
139 use ironflow_store::memory::InMemoryStore;
140 use ironflow_store::models::RunStatus;
141
142 use super::*;
143 use crate::notify::{EventPublisher, EventSubscriber};
144
145 fn sample_run_status_changed() -> Event {
146 Event::RunStatusChanged {
147 run_id: Uuid::now_v7(),
148 workflow_name: "deploy".to_string(),
149 from: RunStatus::Running,
150 to: RunStatus::Completed,
151 error: None,
152 cost_usd: Decimal::new(42, 2),
153 duration_ms: 5000,
154 at: Utc::now(),
155 }
156 }
157
158 fn sample_user_signed_in() -> Event {
159 Event::UserSignedIn {
160 user_id: Uuid::now_v7(),
161 username: "alice".to_string(),
162 at: Utc::now(),
163 }
164 }
165
166 fn sample_step_failed() -> Event {
167 Event::StepFailed {
168 run_id: Uuid::now_v7(),
169 step_id: Uuid::now_v7(),
170 step_name: "build".to_string(),
171 kind: ironflow_store::models::StepKind::Shell,
172 error: "exit code 1".to_string(),
173 at: Utc::now(),
174 }
175 }
176
177 #[test]
178 fn name_is_audit_log() {
179 let store = Arc::new(InMemoryStore::new());
180 let subscriber = AuditLogSubscriber::new(store);
181 assert_eq!(subscriber.name(), "audit_log");
182 }
183
184 #[test]
185 fn extract_run_id_from_run_event() {
186 let event = sample_run_status_changed();
187 assert!(extract_run_id(&event).is_some());
188 }
189
190 #[test]
191 fn extract_run_id_from_user_event_is_none() {
192 let event = sample_user_signed_in();
193 assert!(extract_run_id(&event).is_none());
194 }
195
196 #[test]
197 fn extract_step_id_from_step_event() {
198 let event = sample_step_failed();
199 assert!(extract_step_id(&event).is_some());
200 }
201
202 #[test]
203 fn extract_step_id_from_run_event_is_none() {
204 let event = sample_run_status_changed();
205 assert!(extract_step_id(&event).is_none());
206 }
207
208 #[test]
209 fn extract_user_id_from_user_event() {
210 let event = sample_user_signed_in();
211 assert!(extract_user_id(&event).is_some());
212 }
213
214 #[test]
215 fn extract_user_id_from_run_event_is_none() {
216 let event = sample_run_status_changed();
217 assert!(extract_user_id(&event).is_none());
218 }
219
220 #[tokio::test]
221 async fn handle_persists_event() {
222 let store = Arc::new(InMemoryStore::new());
223 let subscriber = AuditLogSubscriber::new(store.clone());
224
225 let event = sample_run_status_changed();
226 subscriber.handle(&event).await;
227
228 let page = store
229 .list_audit_logs(AuditLogFilter::default(), 1, 20)
230 .await
231 .unwrap();
232
233 assert_eq!(page.items.len(), 1);
234 assert_eq!(page.items[0].event_type, EventKind::RunStatusChanged);
235 assert!(page.items[0].run_id.is_some());
236 assert!(page.items[0].step_id.is_none());
237 assert!(page.items[0].user_id.is_none());
238 }
239
240 #[tokio::test]
241 async fn handle_persists_step_event_with_ids() {
242 let store = Arc::new(InMemoryStore::new());
243 let subscriber = AuditLogSubscriber::new(store.clone());
244
245 let event = sample_step_failed();
246 subscriber.handle(&event).await;
247
248 let page = store
249 .list_audit_logs(AuditLogFilter::default(), 1, 20)
250 .await
251 .unwrap();
252
253 assert_eq!(page.items.len(), 1);
254 assert_eq!(page.items[0].event_type, EventKind::StepFailed);
255 assert!(page.items[0].run_id.is_some());
256 assert!(page.items[0].step_id.is_some());
257 }
258
259 #[tokio::test]
260 async fn handle_persists_user_event_with_user_id() {
261 let store = Arc::new(InMemoryStore::new());
262 let subscriber = AuditLogSubscriber::new(store.clone());
263
264 let event = sample_user_signed_in();
265 subscriber.handle(&event).await;
266
267 let page = store
268 .list_audit_logs(AuditLogFilter::default(), 1, 20)
269 .await
270 .unwrap();
271
272 assert_eq!(page.items.len(), 1);
273 assert_eq!(page.items[0].event_type, EventKind::UserSignedIn);
274 assert!(page.items[0].user_id.is_some());
275 assert!(page.items[0].run_id.is_none());
276 }
277
278 #[tokio::test]
279 async fn publisher_dispatches_to_audit_log_subscriber() {
280 let store = Arc::new(InMemoryStore::new());
281 let mut publisher = EventPublisher::new();
282
283 publisher.subscribe(AuditLogSubscriber::new(store.clone()), Event::ALL);
284
285 publisher.publish(sample_run_status_changed());
286 publisher.publish(sample_user_signed_in());
287 publisher.publish(sample_step_failed());
288
289 tokio::time::sleep(Duration::from_millis(100)).await;
290
291 let page = store
292 .list_audit_logs(AuditLogFilter::default(), 1, 20)
293 .await
294 .unwrap();
295
296 assert_eq!(page.items.len(), 3);
297 }
298
299 #[tokio::test]
300 async fn full_event_payload_is_preserved() {
301 let store = Arc::new(InMemoryStore::new());
302 let subscriber = AuditLogSubscriber::new(store.clone());
303
304 let run_id = Uuid::now_v7();
305 let event = Event::RunFailed {
306 run_id,
307 workflow_name: "deploy".to_string(),
308 error: Some("step crashed".to_string()),
309 cost_usd: Decimal::new(10, 2),
310 duration_ms: 3000,
311 at: Utc::now(),
312 };
313 subscriber.handle(&event).await;
314
315 let page = store
316 .list_audit_logs(AuditLogFilter::default(), 1, 20)
317 .await
318 .unwrap();
319
320 let payload = &page.items[0].payload;
321 assert_eq!(payload["type"], "run_failed");
322 assert_eq!(payload["workflow_name"], "deploy");
323 assert_eq!(payload["error"], "step crashed");
324 }
325}