Skip to main content

ormdb_server/pubsub/
subscription.rs

1//! Subscription tracking and filtering.
2
3use std::time::Instant;
4
5use ormdb_proto::query::Filter;
6
7/// A filter for subscriptions.
8#[derive(Debug, Clone)]
9pub struct SubscriptionFilter {
10    /// Optional filter expression.
11    pub filter: Option<Filter>,
12    /// Fields to include in change events.
13    pub fields: Option<Vec<String>>,
14    /// Whether to include related entity changes.
15    pub include_relations: bool,
16}
17
18impl SubscriptionFilter {
19    /// Create a new subscription filter.
20    pub fn new() -> Self {
21        Self {
22            filter: None,
23            fields: None,
24            include_relations: false,
25        }
26    }
27
28    /// Create a filter from a protocol subscription.
29    pub fn from_subscription(sub: &ormdb_proto::Subscription) -> Self {
30        Self {
31            filter: sub.filter.clone(),
32            fields: sub.fields.clone(),
33            include_relations: sub.include_relations,
34        }
35    }
36}
37
38impl Default for SubscriptionFilter {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44/// A subscription entry tracking an active subscription.
45#[derive(Debug, Clone)]
46pub struct SubscriptionEntry {
47    /// Unique subscription ID.
48    pub id: u64,
49    /// Client identifier.
50    pub client_id: String,
51    /// Entity type being watched.
52    pub entity: String,
53    /// Filter for this subscription.
54    pub filter: SubscriptionFilter,
55    /// When the subscription was created.
56    pub created_at: Instant,
57    /// Number of events sent to this subscription.
58    pub events_sent: u64,
59}
60
61impl SubscriptionEntry {
62    /// Create a new subscription entry.
63    pub fn new(
64        id: u64,
65        client_id: impl Into<String>,
66        entity: impl Into<String>,
67        filter: SubscriptionFilter,
68    ) -> Self {
69        Self {
70            id,
71            client_id: client_id.into(),
72            entity: entity.into(),
73            filter,
74            created_at: Instant::now(),
75            events_sent: 0,
76        }
77    }
78
79    /// Get the age of this subscription.
80    pub fn age(&self) -> std::time::Duration {
81        self.created_at.elapsed()
82    }
83
84    /// Increment the events sent counter.
85    pub fn record_event(&mut self) {
86        self.events_sent += 1;
87    }
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created entry stores its arguments unchanged and starts with
    /// a zero event counter.
    #[test]
    fn test_subscription_entry() {
        let filter = SubscriptionFilter::new();
        let entry = SubscriptionEntry::new(1, "client-123", "User", filter);

        assert_eq!(entry.id, 1);
        assert_eq!(entry.client_id, "client-123");
        assert_eq!(entry.entity, "User");
        assert_eq!(entry.events_sent, 0);

        // The filter handed to the constructor is the one stored on the entry.
        assert!(entry.filter.filter.is_none());
        assert!(entry.filter.fields.is_none());
        assert!(!entry.filter.include_relations);
    }

    /// `record_event` bumps the counter by exactly one per call.
    #[test]
    fn test_subscription_entry_record_event() {
        let mut entry = SubscriptionEntry::new(
            7,
            String::from("client-7"),
            "Order",
            SubscriptionFilter::default(),
        );

        entry.record_event();
        assert_eq!(entry.events_sent, 1);

        entry.record_event();
        entry.record_event();
        assert_eq!(entry.events_sent, 3);
    }

    /// A hand-built filter keeps its values, including across `clone`.
    #[test]
    fn test_subscription_filter() {
        let filter = SubscriptionFilter {
            filter: None,
            fields: Some(vec!["id".to_string(), "name".to_string()]),
            include_relations: true,
        };

        assert!(filter.fields.is_some());
        assert!(filter.include_relations);

        let copy = filter.clone();
        assert_eq!(
            copy.fields,
            Some(vec!["id".to_string(), "name".to_string()])
        );
        assert!(copy.filter.is_none());
        assert!(copy.include_relations);
    }

    /// `new` and `default` both yield an empty filter.
    #[test]
    fn test_subscription_filter_defaults() {
        for filter in [SubscriptionFilter::new(), SubscriptionFilter::default()] {
            assert!(filter.filter.is_none());
            assert!(filter.fields.is_none());
            assert!(!filter.include_relations);
        }
    }
}