1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use things3_core::ThingsId;
4use tokio::sync::broadcast;
5use uuid::Uuid;
6
7use super::types::{Event, EventType};
8
/// Criteria that [`EventFilter::matches`] applies to decide whether an
/// [`Event`] is of interest.
///
/// Every field is optional. A field left as `None` places no restriction on
/// events, so `EventFilter::default()` accepts everything. Fields that are set
/// must all be satisfied for an event to match.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct EventFilter {
    /// Accepted event kinds. Only the enum variant is compared; the ids
    /// carried inside the listed values are ignored.
    pub event_types: Option<Vec<EventType>>,
    /// Accepted task, project or area ids. Progress events carry none of
    /// these ids and are therefore not restricted by this list.
    pub entity_ids: Option<Vec<ThingsId>>,
    /// Accepted values of the event's `source` string (exact comparison).
    pub sources: Option<Vec<String>>,
    /// Earliest accepted event timestamp; an event stamped exactly at this
    /// instant is still accepted.
    pub since: Option<DateTime<Utc>>,
}
17
18impl EventFilter {
19 #[must_use]
21 pub fn matches(&self, event: &Event) -> bool {
22 if let Some(ref types) = self.event_types {
24 if !types
25 .iter()
26 .any(|t| std::mem::discriminant(t) == std::mem::discriminant(&event.event_type))
27 {
28 return false;
29 }
30 }
31
32 if let Some(ref ids) = self.entity_ids {
34 let event_entity_id: Option<&ThingsId> = match &event.event_type {
35 EventType::TaskCreated { task_id }
36 | EventType::TaskUpdated { task_id }
37 | EventType::TaskDeleted { task_id }
38 | EventType::TaskCompleted { task_id }
39 | EventType::TaskCancelled { task_id } => Some(task_id),
40 EventType::ProjectCreated { project_id }
41 | EventType::ProjectUpdated { project_id }
42 | EventType::ProjectDeleted { project_id }
43 | EventType::ProjectCompleted { project_id } => Some(project_id),
44 EventType::AreaCreated { area_id }
45 | EventType::AreaUpdated { area_id }
46 | EventType::AreaDeleted { area_id } => Some(area_id),
47 EventType::ProgressStarted { .. }
48 | EventType::ProgressUpdated { .. }
49 | EventType::ProgressCompleted { .. }
50 | EventType::ProgressFailed { .. } => None,
51 };
52
53 if let Some(entity_id) = event_entity_id {
54 if !ids.contains(entity_id) {
55 return false;
56 }
57 }
58 }
59
60 if let Some(ref sources) = self.sources {
62 if !sources.contains(&event.source) {
63 return false;
64 }
65 }
66
67 if let Some(since) = self.since {
69 if event.timestamp < since {
70 return false;
71 }
72 }
73
74 true
75 }
76}
77
/// A subscription record: an identifier, the filter registered with it, and a
/// broadcast sender carrying [`Event`] values.
///
/// NOTE(review): the code that consults `filter` and publishes through
/// `sender` is not in this file — presumably only matching events are
/// forwarded; confirm against the dispatcher.
#[derive(Debug, Clone)]
pub struct EventSubscription {
    /// Identifier of this subscription.
    pub id: Uuid,
    /// Filter associated with this subscription.
    pub filter: EventFilter,
    /// Sending half of a Tokio broadcast channel of events.
    pub sender: broadcast::Sender<Event>,
}
85
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{Event, EventType};
    use chrono::Utc;
    use things3_core::ThingsId;
    use tokio::sync::broadcast;
    use uuid::Uuid;

    /// Builds an event with a fresh id, no payload, and the given type,
    /// source and timestamp.
    fn event_from(event_type: EventType, source: &str, timestamp: chrono::DateTime<Utc>) -> Event {
        Event {
            id: Uuid::new_v4(),
            event_type,
            timestamp,
            data: None,
            source: source.to_string(),
        }
    }

    /// Builds an event of the given type stamped "now" with source `"test"`.
    fn event_of(event_type: EventType) -> Event {
        event_from(event_type, "test", Utc::now())
    }

    /// A `TaskCreated` event type carrying a freshly generated task id.
    fn task_created() -> EventType {
        EventType::TaskCreated {
            task_id: ThingsId::new_v4(),
        }
    }

    /// A `ProjectCreated` event type carrying a freshly generated project id.
    fn project_created() -> EventType {
        EventType::ProjectCreated {
            project_id: ThingsId::new_v4(),
        }
    }

    #[test]
    fn test_event_filter_matching() {
        let event = event_of(task_created());

        // Only the variant is compared, so the differing task id is irrelevant.
        let filter = EventFilter {
            event_types: Some(vec![task_created()]),
            ..EventFilter::default()
        };
        assert!(filter.matches(&event));

        let filter_no_match = EventFilter {
            event_types: Some(vec![EventType::TaskUpdated {
                task_id: ThingsId::new_v4(),
            }]),
            ..EventFilter::default()
        };
        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_filter_entity_ids() {
        let task_id = ThingsId::new_v4();
        let event = event_of(EventType::TaskCreated {
            task_id: task_id.clone(),
        });

        let filter = EventFilter {
            entity_ids: Some(vec![task_id]),
            ..EventFilter::default()
        };
        assert!(filter.matches(&event));

        let filter_no_match = EventFilter {
            entity_ids: Some(vec![ThingsId::new_v4()]),
            ..EventFilter::default()
        };
        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_filter_sources() {
        let event = event_from(task_created(), "test_source", Utc::now());

        let filter = EventFilter {
            sources: Some(vec!["test_source".to_string()]),
            ..EventFilter::default()
        };
        assert!(filter.matches(&event));

        let filter_no_match = EventFilter {
            sources: Some(vec!["other_source".to_string()]),
            ..EventFilter::default()
        };
        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_filter_timestamp() {
        let now = Utc::now();
        let past = now - chrono::Duration::hours(1);
        let future = now + chrono::Duration::hours(1);

        let event = event_from(task_created(), "test", now);

        let filter = EventFilter {
            since: Some(past),
            ..EventFilter::default()
        };
        assert!(filter.matches(&event));

        let filter_no_match = EventFilter {
            since: Some(future),
            ..EventFilter::default()
        };
        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_filter_all_event_types() {
        let events = [
            event_of(task_created()),
            event_of(project_created()),
            event_of(EventType::AreaCreated {
                area_id: ThingsId::new_v4(),
            }),
            event_of(EventType::ProgressStarted {
                operation_id: Uuid::new_v4(),
            }),
        ];

        // An unconfigured filter must accept every kind of event.
        let filter = EventFilter::default();
        for event in &events {
            assert!(filter.matches(event));
        }
    }

    #[test]
    fn test_event_filter_entity_id_extraction() {
        let task_id = ThingsId::new_v4();
        let project_id = ThingsId::new_v4();
        let area_id = ThingsId::new_v4();
        let operation_id = Uuid::new_v4();

        // Each event type is paired with the entity id the filter is expected
        // to extract from it (`None` for progress events).
        let cases: Vec<(EventType, Option<ThingsId>)> = vec![
            (
                EventType::TaskCreated {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskUpdated {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskDeleted {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskCompleted {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskCancelled {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::ProjectCreated {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectUpdated {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectDeleted {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectCompleted {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::AreaCreated {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (
                EventType::AreaUpdated {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (
                EventType::AreaDeleted {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (EventType::ProgressStarted { operation_id }, None),
            (EventType::ProgressUpdated { operation_id }, None),
            (EventType::ProgressCompleted { operation_id }, None),
            (EventType::ProgressFailed { operation_id }, None),
        ];

        for (event_type, expected_id) in cases {
            let event = event_of(event_type);
            let carries_entity = expected_id.is_some();

            let filter = EventFilter {
                entity_ids: expected_id.map(|id| vec![id]),
                ..EventFilter::default()
            };
            assert!(filter.matches(&event));

            // Without a negative check a broken extraction that ignored the
            // id entirely would still pass the assertion above.
            if carries_entity {
                let unrelated = EventFilter {
                    entity_ids: Some(vec![ThingsId::new_v4()]),
                    ..EventFilter::default()
                };
                assert!(!unrelated.matches(&event));
            }
        }
    }

    #[test]
    fn test_event_filter_creation() {
        let filter = EventFilter {
            event_types: Some(vec![task_created()]),
            entity_ids: Some(vec![ThingsId::new_v4()]),
            sources: Some(vec!["test".to_string()]),
            since: Some(Utc::now()),
        };

        assert!(filter.event_types.is_some());
        assert!(filter.entity_ids.is_some());
        assert!(filter.sources.is_some());
        assert!(filter.since.is_some());
    }

    #[test]
    fn test_event_filter_serialization() {
        let filter = EventFilter {
            event_types: Some(vec![task_created()]),
            entity_ids: Some(vec![ThingsId::new_v4()]),
            sources: Some(vec!["test".to_string()]),
            since: Some(Utc::now()),
        };

        let json = serde_json::to_string(&filter).unwrap();
        let deserialized: EventFilter = serde_json::from_str(&json).unwrap();

        assert_eq!(filter.event_types, deserialized.event_types);
        assert_eq!(filter.entity_ids, deserialized.entity_ids);
        assert_eq!(filter.sources, deserialized.sources);
        // `since` was previously left unchecked by this test.
        assert_eq!(filter.since, deserialized.since);
    }

    // The tests below were `#[tokio::test] async fn` although nothing in them
    // awaits; plain `#[test]` avoids spinning up a runtime per test.

    #[test]
    fn test_event_filter_serialization_roundtrip() {
        let original_filter = EventFilter {
            event_types: Some(vec![task_created(), project_created()]),
            entity_ids: Some(vec![ThingsId::new_v4(), ThingsId::new_v4()]),
            sources: Some(vec![
                "test_source".to_string(),
                "another_source".to_string(),
            ]),
            since: Some(Utc::now()),
        };

        let json = serde_json::to_string(&original_filter).unwrap();
        let deserialized_filter: EventFilter = serde_json::from_str(&json).unwrap();

        assert_eq!(original_filter.event_types, deserialized_filter.event_types);
        assert_eq!(original_filter.entity_ids, deserialized_filter.entity_ids);
        assert_eq!(original_filter.sources, deserialized_filter.sources);
        assert_eq!(original_filter.since, deserialized_filter.since);
    }

    #[test]
    fn test_event_filter_matching_with_timestamp() {
        let filter = EventFilter {
            event_types: Some(vec![task_created()]),
            since: Some(Utc::now() - chrono::Duration::hours(1)),
            ..EventFilter::default()
        };

        let event = event_of(task_created());

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_with_sources() {
        let filter = EventFilter {
            sources: Some(vec!["test_source".to_string()]),
            ..EventFilter::default()
        };

        let event = event_from(task_created(), "test_source", Utc::now());

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_with_entity_ids() {
        let entity_id = ThingsId::new_v4();
        let filter = EventFilter {
            entity_ids: Some(vec![entity_id.clone()]),
            ..EventFilter::default()
        };

        let event = event_of(EventType::TaskCreated { task_id: entity_id });

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_no_match() {
        let filter = EventFilter {
            event_types: Some(vec![task_created()]),
            ..EventFilter::default()
        };

        let event = event_of(project_created());

        assert!(!filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_empty_filter() {
        let filter = EventFilter::default();

        let event = event_of(task_created());

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_subscription_creation() {
        let subscription_id = Uuid::new_v4();
        let filter = EventFilter::default();
        let (sender, _receiver) = broadcast::channel(100);

        let subscription = EventSubscription {
            id: subscription_id,
            filter,
            sender,
        };

        assert_eq!(subscription.id, subscription_id);
    }
}