Skip to main content

aegis_streaming/
subscriber.rs

1//! Aegis Streaming Subscribers
2//!
3//! Subscriber management for event subscriptions.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::channel::ChannelId;
9use crate::event::EventFilter;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14// =============================================================================
15// Subscriber ID
16// =============================================================================
17
18/// Unique identifier for a subscriber.
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct SubscriberId(pub String);
21
22impl SubscriberId {
23    pub fn new(id: impl Into<String>) -> Self {
24        Self(id.into())
25    }
26
27    pub fn generate() -> Self {
28        let timestamp = SystemTime::now()
29            .duration_since(UNIX_EPOCH)
30            .unwrap_or_default()
31            .as_nanos();
32        Self(format!("sub_{:032x}", timestamp))
33    }
34
35    pub fn as_str(&self) -> &str {
36        &self.0
37    }
38}
39
40impl std::fmt::Display for SubscriberId {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.0)
43    }
44}
45
46impl From<String> for SubscriberId {
47    fn from(s: String) -> Self {
48        Self(s)
49    }
50}
51
52impl From<&str> for SubscriberId {
53    fn from(s: &str) -> Self {
54        Self(s.to_string())
55    }
56}
57
58// =============================================================================
59// Subscription
60// =============================================================================
61
62/// A subscription to one or more channels.
63#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct Subscription {
65    pub id: SubscriberId,
66    pub channels: HashSet<ChannelId>,
67    pub filter: Option<EventFilter>,
68    pub created_at: u64,
69    pub active: bool,
70    pub metadata: SubscriptionMetadata,
71}
72
73impl Subscription {
74    /// Create a new subscription.
75    pub fn new(id: impl Into<SubscriberId>) -> Self {
76        Self {
77            id: id.into(),
78            channels: HashSet::new(),
79            filter: None,
80            created_at: current_timestamp(),
81            active: true,
82            metadata: SubscriptionMetadata::default(),
83        }
84    }
85
86    /// Add a channel to the subscription.
87    pub fn add_channel(&mut self, channel: impl Into<ChannelId>) {
88        self.channels.insert(channel.into());
89    }
90
91    /// Remove a channel from the subscription.
92    pub fn remove_channel(&mut self, channel: &ChannelId) {
93        self.channels.remove(channel);
94    }
95
96    /// Set the event filter.
97    pub fn with_filter(mut self, filter: EventFilter) -> Self {
98        self.filter = Some(filter);
99        self
100    }
101
102    /// Check if subscribed to a channel.
103    pub fn is_subscribed_to(&self, channel: &ChannelId) -> bool {
104        self.channels.contains(channel)
105    }
106
107    /// Deactivate the subscription.
108    pub fn deactivate(&mut self) {
109        self.active = false;
110    }
111
112    /// Reactivate the subscription.
113    pub fn activate(&mut self) {
114        self.active = true;
115    }
116}
117
118// =============================================================================
119// Subscription Metadata
120// =============================================================================
121
122/// Metadata for a subscription.
123#[derive(Debug, Clone, Default, Serialize, Deserialize)]
124pub struct SubscriptionMetadata {
125    pub name: Option<String>,
126    pub description: Option<String>,
127    pub tags: Vec<String>,
128    pub delivery_mode: DeliveryMode,
129    pub ack_mode: AckMode,
130}
131
132impl SubscriptionMetadata {
133    pub fn with_name(mut self, name: impl Into<String>) -> Self {
134        self.name = Some(name.into());
135        self
136    }
137
138    pub fn with_description(mut self, description: impl Into<String>) -> Self {
139        self.description = Some(description.into());
140        self
141    }
142
143    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
144        self.tags.push(tag.into());
145        self
146    }
147
148    pub fn with_delivery_mode(mut self, mode: DeliveryMode) -> Self {
149        self.delivery_mode = mode;
150        self
151    }
152
153    pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
154        self.ack_mode = mode;
155        self
156    }
157}
158
159// =============================================================================
160// Delivery Mode
161// =============================================================================
162
163/// Mode for event delivery.
164#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
165pub enum DeliveryMode {
166    /// Events are delivered at most once.
167    AtMostOnce,
168    /// Events are delivered at least once.
169    #[default]
170    AtLeastOnce,
171    /// Events are delivered exactly once.
172    ExactlyOnce,
173}
174
175// =============================================================================
176// Acknowledgment Mode
177// =============================================================================
178
179/// Mode for event acknowledgment.
180#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
181pub enum AckMode {
182    /// Automatic acknowledgment on receive.
183    #[default]
184    Auto,
185    /// Manual acknowledgment required.
186    Manual,
187    /// No acknowledgment needed.
188    None,
189}
190
191// =============================================================================
192// Subscriber
193// =============================================================================
194
195/// A subscriber that receives events.
196#[derive(Debug, Clone)]
197pub struct Subscriber {
198    pub id: SubscriberId,
199    pub subscriptions: Vec<Subscription>,
200    pub created_at: u64,
201    pub last_active: u64,
202    pub events_received: u64,
203    pub events_acknowledged: u64,
204}
205
206impl Subscriber {
207    /// Create a new subscriber.
208    pub fn new(id: impl Into<SubscriberId>) -> Self {
209        let now = current_timestamp();
210        Self {
211            id: id.into(),
212            subscriptions: Vec::new(),
213            created_at: now,
214            last_active: now,
215            events_received: 0,
216            events_acknowledged: 0,
217        }
218    }
219
220    /// Add a subscription.
221    pub fn add_subscription(&mut self, subscription: Subscription) {
222        self.subscriptions.push(subscription);
223    }
224
225    /// Remove a subscription.
226    pub fn remove_subscription(&mut self, subscription_id: &SubscriberId) {
227        self.subscriptions.retain(|s| &s.id != subscription_id);
228    }
229
230    /// Get active subscriptions.
231    pub fn active_subscriptions(&self) -> Vec<&Subscription> {
232        self.subscriptions.iter().filter(|s| s.active).collect()
233    }
234
235    /// Record an event received.
236    pub fn record_received(&mut self) {
237        self.events_received += 1;
238        self.last_active = current_timestamp();
239    }
240
241    /// Record an event acknowledged.
242    pub fn record_acknowledged(&mut self) {
243        self.events_acknowledged += 1;
244    }
245
246    /// Check if the subscriber is active.
247    pub fn is_active(&self) -> bool {
248        !self.subscriptions.is_empty() && self.subscriptions.iter().any(|s| s.active)
249    }
250}
251
252fn current_timestamp() -> u64 {
253    SystemTime::now()
254        .duration_since(UNIX_EPOCH)
255        .map(|d| d.as_millis() as u64)
256        .unwrap_or(0)
257}
258
259// =============================================================================
260// Consumer Group
261// =============================================================================
262
263/// A consumer group that coordinates message consumption across multiple subscribers.
264///
265/// Each member is assigned a set of channels, and the group tracks committed offsets
266/// per channel so that consumers can resume from where they left off.
267#[derive(Debug, Clone)]
268pub struct ConsumerGroup {
269    /// Unique identifier for this consumer group.
270    pub group_id: String,
271    /// Map of subscriber_id -> set of assigned channel names.
272    pub members: HashMap<SubscriberId, HashSet<String>>,
273    /// Map of channel_name -> committed offset.
274    pub committed_offsets: HashMap<String, u64>,
275    /// Timestamp when the group was created.
276    pub created_at: u64,
277}
278
279impl ConsumerGroup {
280    /// Create a new consumer group with the given ID.
281    pub fn new(group_id: impl Into<String>) -> Self {
282        Self {
283            group_id: group_id.into(),
284            members: HashMap::new(),
285            committed_offsets: HashMap::new(),
286            created_at: current_timestamp(),
287        }
288    }
289
290    /// Add a member to the consumer group with an initial set of assigned channels.
291    pub fn add_member(&mut self, subscriber_id: SubscriberId, channels: HashSet<String>) {
292        self.members.insert(subscriber_id, channels);
293    }
294
295    /// Remove a member from the consumer group. Returns the channels that were assigned
296    /// to the removed member, or None if the member was not found.
297    pub fn remove_member(&mut self, subscriber_id: &SubscriberId) -> Option<HashSet<String>> {
298        self.members.remove(subscriber_id)
299    }
300
301    /// Commit an offset for a channel. This records the position up to which
302    /// messages have been successfully processed.
303    pub fn commit_offset(&mut self, channel_name: impl Into<String>, offset: u64) {
304        self.committed_offsets.insert(channel_name.into(), offset);
305    }
306
307    /// Get the committed offset for a channel. Returns None if no offset
308    /// has been committed for that channel.
309    pub fn get_offset(&self, channel_name: &str) -> Option<u64> {
310        self.committed_offsets.get(channel_name).copied()
311    }
312
313    /// Get the number of members in the group.
314    pub fn member_count(&self) -> usize {
315        self.members.len()
316    }
317
318    /// Check if a subscriber is a member of this group.
319    pub fn is_member(&self, subscriber_id: &SubscriberId) -> bool {
320        self.members.contains_key(subscriber_id)
321    }
322
323    /// Get the channels assigned to a specific member.
324    pub fn get_member_channels(&self, subscriber_id: &SubscriberId) -> Option<&HashSet<String>> {
325        self.members.get(subscriber_id)
326    }
327}
328
329// =============================================================================
330// Tests
331// =============================================================================
332
333#[cfg(test)]
334mod tests {
335    use super::*;
336
337    #[test]
338    fn test_subscriber_id() {
339        let id1 = SubscriberId::generate();
340        let id2 = SubscriberId::generate();
341        assert_ne!(id1, id2);
342        assert!(id1.as_str().starts_with("sub_"));
343    }
344
345    #[test]
346    fn test_subscription() {
347        let mut subscription = Subscription::new("sub1");
348        subscription.add_channel("channel1");
349        subscription.add_channel("channel2");
350
351        assert!(subscription.is_subscribed_to(&ChannelId::new("channel1")));
352        assert!(!subscription.is_subscribed_to(&ChannelId::new("channel3")));
353        assert!(subscription.active);
354
355        subscription.deactivate();
356        assert!(!subscription.active);
357    }
358
359    #[test]
360    fn test_subscriber() {
361        let mut subscriber = Subscriber::new("user1");
362
363        let mut sub1 = Subscription::new("sub1");
364        sub1.add_channel("events");
365        subscriber.add_subscription(sub1);
366
367        assert!(subscriber.is_active());
368        assert_eq!(subscriber.active_subscriptions().len(), 1);
369
370        subscriber.record_received();
371        assert_eq!(subscriber.events_received, 1);
372    }
373
374    #[test]
375    fn test_subscription_metadata() {
376        let metadata = SubscriptionMetadata::default()
377            .with_name("Test Subscription")
378            .with_description("A test subscription")
379            .with_tag("test")
380            .with_delivery_mode(DeliveryMode::ExactlyOnce);
381
382        assert_eq!(metadata.name, Some("Test Subscription".to_string()));
383        assert_eq!(metadata.delivery_mode, DeliveryMode::ExactlyOnce);
384    }
385
386    #[test]
387    fn test_consumer_group_creation() {
388        let group = ConsumerGroup::new("group1");
389        assert_eq!(group.group_id, "group1");
390        assert_eq!(group.member_count(), 0);
391        assert!(group.committed_offsets.is_empty());
392        assert!(group.created_at > 0);
393    }
394
395    #[test]
396    fn test_consumer_group_add_remove_members() {
397        let mut group = ConsumerGroup::new("group1");
398
399        let sub1 = SubscriberId::new("sub1");
400        let sub2 = SubscriberId::new("sub2");
401
402        let mut channels1 = HashSet::new();
403        channels1.insert("events".to_string());
404        channels1.insert("logs".to_string());
405
406        let mut channels2 = HashSet::new();
407        channels2.insert("metrics".to_string());
408
409        group.add_member(sub1.clone(), channels1);
410        group.add_member(sub2.clone(), channels2);
411
412        assert_eq!(group.member_count(), 2);
413        assert!(group.is_member(&sub1));
414        assert!(group.is_member(&sub2));
415
416        let member_channels = group.get_member_channels(&sub1).unwrap();
417        assert!(member_channels.contains("events"));
418        assert!(member_channels.contains("logs"));
419
420        let removed = group.remove_member(&sub1);
421        assert!(removed.is_some());
422        assert_eq!(group.member_count(), 1);
423        assert!(!group.is_member(&sub1));
424
425        // Removing a non-existent member returns None
426        let removed = group.remove_member(&SubscriberId::new("nonexistent"));
427        assert!(removed.is_none());
428    }
429
430    #[test]
431    fn test_consumer_group_offset_tracking() {
432        let mut group = ConsumerGroup::new("group1");
433
434        // No offset committed yet
435        assert_eq!(group.get_offset("events"), None);
436
437        group.commit_offset("events", 42);
438        assert_eq!(group.get_offset("events"), Some(42));
439
440        // Update offset
441        group.commit_offset("events", 100);
442        assert_eq!(group.get_offset("events"), Some(100));
443
444        // Different channel
445        group.commit_offset("logs", 5);
446        assert_eq!(group.get_offset("logs"), Some(5));
447        assert_eq!(group.get_offset("events"), Some(100));
448    }
449}