// things3_cli/events/filter.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use things3_core::ThingsId;
4use tokio::sync::broadcast;
5use uuid::Uuid;
6
7use super::types::{Event, EventType};
8
9/// Event filter for subscriptions
10#[derive(Debug, Clone, Serialize, Deserialize, Default)]
11pub struct EventFilter {
12    pub event_types: Option<Vec<EventType>>,
13    pub entity_ids: Option<Vec<ThingsId>>,
14    pub sources: Option<Vec<String>>,
15    pub since: Option<DateTime<Utc>>,
16}
17
18impl EventFilter {
19    /// Check if an event matches this filter
20    #[must_use]
21    pub fn matches(&self, event: &Event) -> bool {
22        // Check event types
23        if let Some(ref types) = self.event_types {
24            if !types
25                .iter()
26                .any(|t| std::mem::discriminant(t) == std::mem::discriminant(&event.event_type))
27            {
28                return false;
29            }
30        }
31
32        // Check entity IDs (applies to task/project/area events; progress events have no entity ID)
33        if let Some(ref ids) = self.entity_ids {
34            let event_entity_id: Option<&ThingsId> = match &event.event_type {
35                EventType::TaskCreated { task_id }
36                | EventType::TaskUpdated { task_id }
37                | EventType::TaskDeleted { task_id }
38                | EventType::TaskCompleted { task_id }
39                | EventType::TaskCancelled { task_id } => Some(task_id),
40                EventType::ProjectCreated { project_id }
41                | EventType::ProjectUpdated { project_id }
42                | EventType::ProjectDeleted { project_id }
43                | EventType::ProjectCompleted { project_id } => Some(project_id),
44                EventType::AreaCreated { area_id }
45                | EventType::AreaUpdated { area_id }
46                | EventType::AreaDeleted { area_id } => Some(area_id),
47                EventType::ProgressStarted { .. }
48                | EventType::ProgressUpdated { .. }
49                | EventType::ProgressCompleted { .. }
50                | EventType::ProgressFailed { .. } => None,
51            };
52
53            if let Some(entity_id) = event_entity_id {
54                if !ids.contains(entity_id) {
55                    return false;
56                }
57            }
58        }
59
60        // Check sources
61        if let Some(ref sources) = self.sources {
62            if !sources.contains(&event.source) {
63                return false;
64            }
65        }
66
67        // Check timestamp
68        if let Some(since) = self.since {
69            if event.timestamp < since {
70                return false;
71            }
72        }
73
74        true
75    }
76}
77
78/// Event subscription
79#[derive(Debug, Clone)]
80pub struct EventSubscription {
81    pub id: Uuid,
82    pub filter: EventFilter,
83    pub sender: broadcast::Sender<Event>,
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{Event, EventType};
    use chrono::Utc;
    use things3_core::ThingsId;
    use tokio::sync::broadcast;
    use uuid::Uuid;

    /// Build an event of `event_type` coming from `source`, stamped with the
    /// current time and carrying no payload.
    fn event_from(event_type: EventType, source: &str) -> Event {
        Event {
            id: Uuid::new_v4(),
            event_type,
            timestamp: Utc::now(),
            data: None,
            source: source.to_string(),
        }
    }

    /// A `TaskCreated` event type for a freshly generated task ID.
    fn new_task_created() -> EventType {
        EventType::TaskCreated {
            task_id: ThingsId::new_v4(),
        }
    }

    /// A filter that constrains only the event type.
    fn type_filter(event_type: EventType) -> EventFilter {
        EventFilter {
            event_types: Some(vec![event_type]),
            ..EventFilter::default()
        }
    }

    /// A filter that constrains only the entity ID.
    fn entity_filter(id: ThingsId) -> EventFilter {
        EventFilter {
            entity_ids: Some(vec![id]),
            ..EventFilter::default()
        }
    }

    /// A filter that constrains only the source.
    fn source_filter(source: &str) -> EventFilter {
        EventFilter {
            sources: Some(vec![source.to_string()]),
            ..EventFilter::default()
        }
    }

    /// A filter with every criterion populated, used by the (de)serialization tests.
    fn fully_populated_filter() -> EventFilter {
        EventFilter {
            event_types: Some(vec![new_task_created()]),
            entity_ids: Some(vec![ThingsId::new_v4()]),
            sources: Some(vec!["test".to_string()]),
            since: Some(Utc::now()),
        }
    }

    #[test]
    fn test_event_filter_matching() {
        let event = event_from(new_task_created(), "test");

        // Same variant matches even though the task IDs differ.
        assert!(type_filter(new_task_created()).matches(&event));

        // A different variant must not match.
        let other_variant = EventType::TaskUpdated {
            task_id: ThingsId::new_v4(),
        };
        assert!(!type_filter(other_variant).matches(&event));
    }

    #[test]
    fn test_event_filter_entity_ids() {
        let task_id = ThingsId::new_v4();
        let event = event_from(
            EventType::TaskCreated {
                task_id: task_id.clone(),
            },
            "test",
        );

        assert!(entity_filter(task_id).matches(&event));
        assert!(!entity_filter(ThingsId::new_v4()).matches(&event));
    }

    #[test]
    fn test_event_filter_sources() {
        let event = event_from(new_task_created(), "test_source");

        assert!(source_filter("test_source").matches(&event));
        assert!(!source_filter("other_source").matches(&event));
    }

    #[test]
    fn test_event_filter_timestamp() {
        let now = Utc::now();
        let past = now - chrono::Duration::hours(1);
        let future = now + chrono::Duration::hours(1);

        let event = Event {
            timestamp: now,
            ..event_from(new_task_created(), "test")
        };

        let filter = EventFilter {
            since: Some(past),
            ..EventFilter::default()
        };
        assert!(filter.matches(&event));

        let filter_no_match = EventFilter {
            since: Some(future),
            ..EventFilter::default()
        };
        assert!(!filter_no_match.matches(&event));
    }

    #[test]
    fn test_event_filter_all_event_types() {
        let event_types = vec![
            new_task_created(),
            EventType::ProjectCreated {
                project_id: ThingsId::new_v4(),
            },
            EventType::AreaCreated {
                area_id: ThingsId::new_v4(),
            },
            EventType::ProgressStarted {
                operation_id: Uuid::new_v4(),
            },
        ];

        // An unconstrained filter accepts every kind of event.
        let filter = EventFilter::default();
        for event_type in event_types {
            assert!(filter.matches(&event_from(event_type, "test")));
        }
    }

    #[test]
    fn test_event_filter_entity_id_extraction() {
        let task_id = ThingsId::new_v4();
        let project_id = ThingsId::new_v4();
        let area_id = ThingsId::new_v4();
        let operation_id = Uuid::new_v4();

        // Each event type paired with the entity ID it is expected to expose.
        let cases: Vec<(EventType, Option<ThingsId>)> = vec![
            (
                EventType::TaskCreated {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskUpdated {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskDeleted {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskCompleted {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::TaskCancelled {
                    task_id: task_id.clone(),
                },
                Some(task_id.clone()),
            ),
            (
                EventType::ProjectCreated {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectUpdated {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectDeleted {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::ProjectCompleted {
                    project_id: project_id.clone(),
                },
                Some(project_id.clone()),
            ),
            (
                EventType::AreaCreated {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (
                EventType::AreaUpdated {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (
                EventType::AreaDeleted {
                    area_id: area_id.clone(),
                },
                Some(area_id.clone()),
            ),
            (EventType::ProgressStarted { operation_id }, None),
            (EventType::ProgressUpdated { operation_id }, None),
            (EventType::ProgressCompleted { operation_id }, None),
            (EventType::ProgressFailed { operation_id }, None),
        ];

        for (event_type, expected_id) in cases {
            let event = event_from(event_type, "test");

            // Filtering on the event's own ID (or not filtering at all) matches.
            let filter = EventFilter {
                entity_ids: expected_id.clone().map(|id| vec![id]),
                ..EventFilter::default()
            };
            assert!(filter.matches(&event));

            // A filter listing only an unrelated ID rejects entity-bearing
            // events, while progress events (no entity ID) still pass.
            let unrelated = entity_filter(ThingsId::new_v4());
            assert_eq!(unrelated.matches(&event), expected_id.is_none());
        }
    }

    #[test]
    fn test_event_filter_creation() {
        let filter = fully_populated_filter();

        assert!(filter.event_types.is_some());
        assert!(filter.entity_ids.is_some());
        assert!(filter.sources.is_some());
        assert!(filter.since.is_some());
    }

    #[test]
    fn test_event_filter_serialization() {
        let filter = fully_populated_filter();

        let json = serde_json::to_string(&filter).unwrap();
        let deserialized: EventFilter = serde_json::from_str(&json).unwrap();

        assert_eq!(filter.event_types, deserialized.event_types);
        assert_eq!(filter.entity_ids, deserialized.entity_ids);
        assert_eq!(filter.sources, deserialized.sources);
        assert_eq!(filter.since, deserialized.since);
    }

    #[test]
    fn test_event_filter_serialization_roundtrip() {
        let original_filter = EventFilter {
            event_types: Some(vec![
                new_task_created(),
                EventType::ProjectCreated {
                    project_id: ThingsId::new_v4(),
                },
            ]),
            entity_ids: Some(vec![ThingsId::new_v4(), ThingsId::new_v4()]),
            sources: Some(vec![
                "test_source".to_string(),
                "another_source".to_string(),
            ]),
            since: Some(Utc::now()),
        };

        // Serialize to JSON and back again.
        let json = serde_json::to_string(&original_filter).unwrap();
        let deserialized_filter: EventFilter = serde_json::from_str(&json).unwrap();

        assert_eq!(original_filter.event_types, deserialized_filter.event_types);
        assert_eq!(original_filter.entity_ids, deserialized_filter.entity_ids);
        assert_eq!(original_filter.sources, deserialized_filter.sources);
        assert_eq!(original_filter.since, deserialized_filter.since);
    }

    #[test]
    fn test_event_filter_matching_with_timestamp() {
        let filter = EventFilter {
            since: Some(Utc::now() - chrono::Duration::hours(1)),
            ..type_filter(new_task_created())
        };

        let event = event_from(new_task_created(), "test");

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_with_sources() {
        let event = event_from(new_task_created(), "test_source");

        assert!(source_filter("test_source").matches(&event));
    }

    #[test]
    fn test_event_filter_matching_with_entity_ids() {
        let entity_id = ThingsId::new_v4();
        let filter = entity_filter(entity_id.clone());

        let event = event_from(EventType::TaskCreated { task_id: entity_id }, "test");

        assert!(filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_no_match() {
        let filter = type_filter(new_task_created());

        let event = event_from(
            EventType::ProjectCreated {
                project_id: ThingsId::new_v4(),
            },
            "test",
        );

        assert!(!filter.matches(&event));
    }

    #[test]
    fn test_event_filter_matching_empty_filter() {
        let event = event_from(new_task_created(), "test");

        // Empty filter should match all events.
        assert!(EventFilter::default().matches(&event));
    }

    #[test]
    fn test_event_subscription_creation() {
        let subscription_id = Uuid::new_v4();
        let (sender, _receiver) = broadcast::channel(100);

        let subscription = EventSubscription {
            id: subscription_id,
            filter: EventFilter::default(),
            sender,
        };

        assert_eq!(subscription.id, subscription_id);
    }
}