1use std::str::FromStr;
4use std::sync::Arc;
5
6use serde_json::to_value;
7use tracing::error;
8use uuid::Uuid;
9
10use ironflow_store::audit_log_store::AuditLogStore;
11use ironflow_store::entities::{EventKind, NewAuditLogEntry};
12
13use super::{Event, EventSubscriber, SubscriberFuture};
14
15pub struct AuditLogSubscriber {
35 store: Arc<dyn AuditLogStore>,
36}
37
38impl AuditLogSubscriber {
39 pub fn new(store: Arc<dyn AuditLogStore>) -> Self {
52 Self { store }
53 }
54}
55
56fn extract_run_id(event: &Event) -> Option<Uuid> {
57 match event {
58 Event::RunCreated { run_id, .. }
59 | Event::RunStatusChanged { run_id, .. }
60 | Event::RunFailed { run_id, .. }
61 | Event::StepCompleted { run_id, .. }
62 | Event::StepFailed { run_id, .. }
63 | Event::ApprovalRequested { run_id, .. }
64 | Event::ApprovalGranted { run_id, .. }
65 | Event::ApprovalRejected { run_id, .. }
66 | Event::LogLine { run_id, .. } => Some(*run_id),
67 Event::UserSignedIn { .. } | Event::UserSignedUp { .. } | Event::UserSignedOut { .. } => {
68 None
69 }
70 }
71}
72
73fn extract_step_id(event: &Event) -> Option<Uuid> {
74 match event {
75 Event::StepCompleted { step_id, .. }
76 | Event::StepFailed { step_id, .. }
77 | Event::ApprovalRequested { step_id, .. } => Some(*step_id),
78 _ => None,
79 }
80}
81
82fn extract_user_id(event: &Event) -> Option<Uuid> {
83 match event {
84 Event::UserSignedIn { user_id, .. }
85 | Event::UserSignedUp { user_id, .. }
86 | Event::UserSignedOut { user_id, .. } => Some(*user_id),
87 _ => None,
88 }
89}
90
91impl EventSubscriber for AuditLogSubscriber {
92 fn name(&self) -> &str {
93 "audit_log"
94 }
95
96 fn handle<'a>(&'a self, event: &'a Event) -> SubscriberFuture<'a> {
97 Box::pin(async move {
98 let event_kind = match EventKind::from_str(event.event_type()) {
99 Ok(k) => k,
100 Err(e) => {
101 error!(error = %e, event_type = event.event_type(), "unknown event kind for audit log");
102 return;
103 }
104 };
105
106 let payload = match to_value(event) {
107 Ok(v) => v,
108 Err(e) => {
109 error!(error = %e, event_type = event.event_type(), "failed to serialize event for audit log");
110 return;
111 }
112 };
113
114 let entry = NewAuditLogEntry {
115 event_type: event_kind,
116 payload,
117 run_id: extract_run_id(event),
118 step_id: extract_step_id(event),
119 user_id: extract_user_id(event),
120 };
121
122 if let Err(e) = self.store.append_audit_log(entry).await {
123 error!(error = %e, event_type = event.event_type(), "failed to persist audit log entry");
124 }
125 })
126 }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use chrono::Utc;
    use rust_decimal::Decimal;
    use uuid::Uuid;

    use ironflow_store::audit_log_store::AuditLogStore;
    use ironflow_store::entities::{AuditLogFilter, EventKind};
    use ironflow_store::memory::InMemoryStore;
    use ironflow_store::models::RunStatus;

    use super::*;
    use crate::notify::{EventPublisher, EventSubscriber};

    // Fixtures take explicit ids so tests can assert on the exact value that
    // is extracted, not merely on its presence.

    /// Builds a `RunStatusChanged` event (Running -> Completed) for `run_id`.
    fn run_status_changed(run_id: Uuid) -> Event {
        Event::RunStatusChanged {
            run_id,
            workflow_name: "deploy".to_string(),
            from: RunStatus::Running,
            to: RunStatus::Completed,
            error: None,
            cost_usd: Decimal::new(42, 2),
            duration_ms: 5000,
            at: Utc::now(),
        }
    }

    /// Builds a `UserSignedIn` event for `user_id`.
    fn user_signed_in(user_id: Uuid) -> Event {
        Event::UserSignedIn {
            user_id,
            username: "alice".to_string(),
            at: Utc::now(),
        }
    }

    /// Builds a `StepFailed` event for the given run and step.
    fn step_failed(run_id: Uuid, step_id: Uuid) -> Event {
        Event::StepFailed {
            run_id,
            step_id,
            step_name: "build".to_string(),
            kind: ironflow_store::models::StepKind::Shell,
            error: "exit code 1".to_string(),
            at: Utc::now(),
        }
    }

    /// `RunStatusChanged` event with a freshly generated run id.
    fn sample_run_status_changed() -> Event {
        run_status_changed(Uuid::now_v7())
    }

    /// `UserSignedIn` event with a freshly generated user id.
    fn sample_user_signed_in() -> Event {
        user_signed_in(Uuid::now_v7())
    }

    /// `StepFailed` event with freshly generated run and step ids.
    fn sample_step_failed() -> Event {
        step_failed(Uuid::now_v7(), Uuid::now_v7())
    }

    #[test]
    fn name_is_audit_log() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store);
        assert_eq!(subscriber.name(), "audit_log");
    }

    #[test]
    fn extract_run_id_from_run_event() {
        let run_id = Uuid::now_v7();
        let event = run_status_changed(run_id);
        assert_eq!(extract_run_id(&event), Some(run_id));
    }

    #[test]
    fn extract_run_id_from_user_event_is_none() {
        let event = sample_user_signed_in();
        assert_eq!(extract_run_id(&event), None);
    }

    #[test]
    fn extract_step_id_from_step_event() {
        // Distinct ids: a helper that mixed up run and step ids would fail here.
        let run_id = Uuid::now_v7();
        let step_id = Uuid::now_v7();
        let event = step_failed(run_id, step_id);
        assert_eq!(extract_step_id(&event), Some(step_id));
        assert_eq!(extract_run_id(&event), Some(run_id));
    }

    #[test]
    fn extract_step_id_from_run_event_is_none() {
        let event = sample_run_status_changed();
        assert_eq!(extract_step_id(&event), None);
    }

    #[test]
    fn extract_user_id_from_user_event() {
        let user_id = Uuid::now_v7();
        let event = user_signed_in(user_id);
        assert_eq!(extract_user_id(&event), Some(user_id));
    }

    #[test]
    fn extract_user_id_from_run_event_is_none() {
        let event = sample_run_status_changed();
        assert_eq!(extract_user_id(&event), None);
    }

    #[tokio::test]
    async fn handle_persists_event() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_run_status_changed();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::RunStatusChanged);
        assert!(page.items[0].run_id.is_some());
        assert!(page.items[0].step_id.is_none());
        assert!(page.items[0].user_id.is_none());
    }

    #[tokio::test]
    async fn handle_persists_step_event_with_ids() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_step_failed();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::StepFailed);
        assert!(page.items[0].run_id.is_some());
        assert!(page.items[0].step_id.is_some());
        assert!(page.items[0].user_id.is_none());
    }

    #[tokio::test]
    async fn handle_persists_user_event_with_user_id() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let event = sample_user_signed_in();
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 1);
        assert_eq!(page.items[0].event_type, EventKind::UserSignedIn);
        assert!(page.items[0].user_id.is_some());
        assert!(page.items[0].run_id.is_none());
        assert!(page.items[0].step_id.is_none());
    }

    #[tokio::test]
    async fn publisher_dispatches_to_audit_log_subscriber() {
        let store = Arc::new(InMemoryStore::new());
        let mut publisher = EventPublisher::new();

        publisher.subscribe(AuditLogSubscriber::new(store.clone()), Event::ALL);

        publisher.publish(sample_run_status_changed());
        publisher.publish(sample_user_signed_in());
        publisher.publish(sample_step_failed());

        // Pause before inspecting the store so the published events can be
        // delivered to the subscriber.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        assert_eq!(page.items.len(), 3);
    }

    #[tokio::test]
    async fn full_event_payload_is_preserved() {
        let store = Arc::new(InMemoryStore::new());
        let subscriber = AuditLogSubscriber::new(store.clone());

        let run_id = Uuid::now_v7();
        let event = Event::RunFailed {
            run_id,
            workflow_name: "deploy".to_string(),
            error: Some("step crashed".to_string()),
            cost_usd: Decimal::new(10, 2),
            duration_ms: 3000,
            at: Utc::now(),
        };
        subscriber.handle(&event).await;

        let page = store
            .list_audit_logs(AuditLogFilter::default(), 1, 20)
            .await
            .unwrap();

        // Guard the index below with a clear failure instead of a panic.
        assert_eq!(page.items.len(), 1);

        let payload = &page.items[0].payload;
        assert_eq!(payload["type"], "run_failed");
        assert_eq!(payload["workflow_name"], "deploy");
        assert_eq!(payload["error"], "step crashed");
    }
}