Skip to main content

aegis_streaming/
subscriber.rs

1//! Aegis Streaming Subscribers
2//!
3//! Subscriber management for event subscriptions.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::channel::ChannelId;
9use crate::event::EventFilter;
10use serde::{Deserialize, Serialize};
11use std::collections::HashSet;
12use std::time::{SystemTime, UNIX_EPOCH};
13
14// =============================================================================
15// Subscriber ID
16// =============================================================================
17
18/// Unique identifier for a subscriber.
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct SubscriberId(pub String);
21
22impl SubscriberId {
23    pub fn new(id: impl Into<String>) -> Self {
24        Self(id.into())
25    }
26
27    pub fn generate() -> Self {
28        let timestamp = SystemTime::now()
29            .duration_since(UNIX_EPOCH)
30            .unwrap_or_default()
31            .as_nanos();
32        Self(format!("sub_{:032x}", timestamp))
33    }
34
35    pub fn as_str(&self) -> &str {
36        &self.0
37    }
38}
39
40impl std::fmt::Display for SubscriberId {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.0)
43    }
44}
45
46impl From<String> for SubscriberId {
47    fn from(s: String) -> Self {
48        Self(s)
49    }
50}
51
52impl From<&str> for SubscriberId {
53    fn from(s: &str) -> Self {
54        Self(s.to_string())
55    }
56}
57
58// =============================================================================
59// Subscription
60// =============================================================================
61
62/// A subscription to one or more channels.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct Subscription {
65    pub id: SubscriberId,
66    pub channels: HashSet<ChannelId>,
67    pub filter: Option<EventFilter>,
68    pub created_at: u64,
69    pub active: bool,
70    pub metadata: SubscriptionMetadata,
71}
72
73impl Subscription {
74    /// Create a new subscription.
75    pub fn new(id: impl Into<SubscriberId>) -> Self {
76        Self {
77            id: id.into(),
78            channels: HashSet::new(),
79            filter: None,
80            created_at: current_timestamp(),
81            active: true,
82            metadata: SubscriptionMetadata::default(),
83        }
84    }
85
86    /// Add a channel to the subscription.
87    pub fn add_channel(&mut self, channel: impl Into<ChannelId>) {
88        self.channels.insert(channel.into());
89    }
90
91    /// Remove a channel from the subscription.
92    pub fn remove_channel(&mut self, channel: &ChannelId) {
93        self.channels.remove(channel);
94    }
95
96    /// Set the event filter.
97    pub fn with_filter(mut self, filter: EventFilter) -> Self {
98        self.filter = Some(filter);
99        self
100    }
101
102    /// Check if subscribed to a channel.
103    pub fn is_subscribed_to(&self, channel: &ChannelId) -> bool {
104        self.channels.contains(channel)
105    }
106
107    /// Deactivate the subscription.
108    pub fn deactivate(&mut self) {
109        self.active = false;
110    }
111
112    /// Reactivate the subscription.
113    pub fn activate(&mut self) {
114        self.active = true;
115    }
116}
117
118// =============================================================================
119// Subscription Metadata
120// =============================================================================
121
122/// Metadata for a subscription.
123#[derive(Debug, Clone, Default, Serialize, Deserialize)]
124pub struct SubscriptionMetadata {
125    pub name: Option<String>,
126    pub description: Option<String>,
127    pub tags: Vec<String>,
128    pub delivery_mode: DeliveryMode,
129    pub ack_mode: AckMode,
130}
131
132impl SubscriptionMetadata {
133    pub fn with_name(mut self, name: impl Into<String>) -> Self {
134        self.name = Some(name.into());
135        self
136    }
137
138    pub fn with_description(mut self, description: impl Into<String>) -> Self {
139        self.description = Some(description.into());
140        self
141    }
142
143    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
144        self.tags.push(tag.into());
145        self
146    }
147
148    pub fn with_delivery_mode(mut self, mode: DeliveryMode) -> Self {
149        self.delivery_mode = mode;
150        self
151    }
152
153    pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
154        self.ack_mode = mode;
155        self
156    }
157}
158
159// =============================================================================
160// Delivery Mode
161// =============================================================================
162
163/// Mode for event delivery.
164#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
165pub enum DeliveryMode {
166    /// Events are delivered at most once.
167    AtMostOnce,
168    /// Events are delivered at least once.
169    #[default]
170    AtLeastOnce,
171    /// Events are delivered exactly once.
172    ExactlyOnce,
173}
174
175// =============================================================================
176// Acknowledgment Mode
177// =============================================================================
178
179/// Mode for event acknowledgment.
180#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
181pub enum AckMode {
182    /// Automatic acknowledgment on receive.
183    #[default]
184    Auto,
185    /// Manual acknowledgment required.
186    Manual,
187    /// No acknowledgment needed.
188    None,
189}
190
191// =============================================================================
192// Subscriber
193// =============================================================================
194
195/// A subscriber that receives events.
196#[derive(Debug, Clone)]
197pub struct Subscriber {
198    pub id: SubscriberId,
199    pub subscriptions: Vec<Subscription>,
200    pub created_at: u64,
201    pub last_active: u64,
202    pub events_received: u64,
203    pub events_acknowledged: u64,
204}
205
206impl Subscriber {
207    /// Create a new subscriber.
208    pub fn new(id: impl Into<SubscriberId>) -> Self {
209        let now = current_timestamp();
210        Self {
211            id: id.into(),
212            subscriptions: Vec::new(),
213            created_at: now,
214            last_active: now,
215            events_received: 0,
216            events_acknowledged: 0,
217        }
218    }
219
220    /// Add a subscription.
221    pub fn add_subscription(&mut self, subscription: Subscription) {
222        self.subscriptions.push(subscription);
223    }
224
225    /// Remove a subscription.
226    pub fn remove_subscription(&mut self, subscription_id: &SubscriberId) {
227        self.subscriptions.retain(|s| &s.id != subscription_id);
228    }
229
230    /// Get active subscriptions.
231    pub fn active_subscriptions(&self) -> Vec<&Subscription> {
232        self.subscriptions.iter().filter(|s| s.active).collect()
233    }
234
235    /// Record an event received.
236    pub fn record_received(&mut self) {
237        self.events_received += 1;
238        self.last_active = current_timestamp();
239    }
240
241    /// Record an event acknowledged.
242    pub fn record_acknowledged(&mut self) {
243        self.events_acknowledged += 1;
244    }
245
246    /// Check if the subscriber is active.
247    pub fn is_active(&self) -> bool {
248        !self.subscriptions.is_empty()
249            && self.subscriptions.iter().any(|s| s.active)
250    }
251}
252
/// Returns the current wall-clock time as whole milliseconds since the Unix
/// epoch, or `0` when the system clock reports a time before the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
259
260// =============================================================================
261// Tests
262// =============================================================================
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Two generated ids differ and carry the `sub_` prefix.
    #[test]
    fn test_subscriber_id() {
        let first = SubscriberId::generate();
        let second = SubscriberId::generate();

        assert_ne!(first, second);
        assert!(first.as_str().starts_with("sub_"));
    }

    /// Channel membership and the active flag behave as expected.
    #[test]
    fn test_subscription() {
        let mut subscription = Subscription::new("sub1");
        for channel in ["channel1", "channel2"] {
            subscription.add_channel(channel);
        }

        let subscribed = ChannelId::new("channel1");
        let unknown = ChannelId::new("channel3");
        assert!(subscription.is_subscribed_to(&subscribed));
        assert!(!subscription.is_subscribed_to(&unknown));
        assert!(subscription.active);

        subscription.deactivate();
        assert!(!subscription.active);
    }

    /// A subscriber with one active subscription is active and counts
    /// received events.
    #[test]
    fn test_subscriber() {
        let mut events_subscription = Subscription::new("sub1");
        events_subscription.add_channel("events");

        let mut subscriber = Subscriber::new("user1");
        subscriber.add_subscription(events_subscription);

        assert!(subscriber.is_active());
        assert_eq!(subscriber.active_subscriptions().len(), 1);

        subscriber.record_received();
        assert_eq!(subscriber.events_received, 1);
    }

    /// The metadata builder methods set the requested fields.
    #[test]
    fn test_subscription_metadata() {
        let base = SubscriptionMetadata::default();
        let metadata = base
            .with_name("Test Subscription")
            .with_description("A test subscription")
            .with_tag("test")
            .with_delivery_mode(DeliveryMode::ExactlyOnce);

        assert_eq!(metadata.name, Some("Test Subscription".to_string()));
        assert_eq!(metadata.delivery_mode, DeliveryMode::ExactlyOnce);
    }
}