1use crate::channel::ChannelId;
9use crate::event::EventFilter;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, HashSet};
12use std::time::{SystemTime, UNIX_EPOCH};
13
14#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20pub struct SubscriberId(pub String);
21
22impl SubscriberId {
23 pub fn new(id: impl Into<String>) -> Self {
24 Self(id.into())
25 }
26
27 pub fn generate() -> Self {
28 let timestamp = SystemTime::now()
29 .duration_since(UNIX_EPOCH)
30 .unwrap_or_default()
31 .as_nanos();
32 Self(format!("sub_{:032x}", timestamp))
33 }
34
35 pub fn as_str(&self) -> &str {
36 &self.0
37 }
38}
39
40impl std::fmt::Display for SubscriberId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.0)
43 }
44}
45
46impl From<String> for SubscriberId {
47 fn from(s: String) -> Self {
48 Self(s)
49 }
50}
51
52impl From<&str> for SubscriberId {
53 fn from(s: &str) -> Self {
54 Self(s.to_string())
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct Subscription {
65 pub id: SubscriberId,
66 pub channels: HashSet<ChannelId>,
67 pub filter: Option<EventFilter>,
68 pub created_at: u64,
69 pub active: bool,
70 pub metadata: SubscriptionMetadata,
71}
72
73impl Subscription {
74 pub fn new(id: impl Into<SubscriberId>) -> Self {
76 Self {
77 id: id.into(),
78 channels: HashSet::new(),
79 filter: None,
80 created_at: current_timestamp(),
81 active: true,
82 metadata: SubscriptionMetadata::default(),
83 }
84 }
85
86 pub fn add_channel(&mut self, channel: impl Into<ChannelId>) {
88 self.channels.insert(channel.into());
89 }
90
91 pub fn remove_channel(&mut self, channel: &ChannelId) {
93 self.channels.remove(channel);
94 }
95
96 pub fn with_filter(mut self, filter: EventFilter) -> Self {
98 self.filter = Some(filter);
99 self
100 }
101
102 pub fn is_subscribed_to(&self, channel: &ChannelId) -> bool {
104 self.channels.contains(channel)
105 }
106
107 pub fn deactivate(&mut self) {
109 self.active = false;
110 }
111
112 pub fn activate(&mut self) {
114 self.active = true;
115 }
116}
117
118#[derive(Debug, Clone, Default, Serialize, Deserialize)]
124pub struct SubscriptionMetadata {
125 pub name: Option<String>,
126 pub description: Option<String>,
127 pub tags: Vec<String>,
128 pub delivery_mode: DeliveryMode,
129 pub ack_mode: AckMode,
130}
131
132impl SubscriptionMetadata {
133 pub fn with_name(mut self, name: impl Into<String>) -> Self {
134 self.name = Some(name.into());
135 self
136 }
137
138 pub fn with_description(mut self, description: impl Into<String>) -> Self {
139 self.description = Some(description.into());
140 self
141 }
142
143 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
144 self.tags.push(tag.into());
145 self
146 }
147
148 pub fn with_delivery_mode(mut self, mode: DeliveryMode) -> Self {
149 self.delivery_mode = mode;
150 self
151 }
152
153 pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
154 self.ack_mode = mode;
155 self
156 }
157}
158
159#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
165pub enum DeliveryMode {
166 AtMostOnce,
168 #[default]
170 AtLeastOnce,
171 ExactlyOnce,
173}
174
175#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
181pub enum AckMode {
182 #[default]
184 Auto,
185 Manual,
187 None,
189}
190
191#[derive(Debug, Clone)]
197pub struct Subscriber {
198 pub id: SubscriberId,
199 pub subscriptions: Vec<Subscription>,
200 pub created_at: u64,
201 pub last_active: u64,
202 pub events_received: u64,
203 pub events_acknowledged: u64,
204}
205
206impl Subscriber {
207 pub fn new(id: impl Into<SubscriberId>) -> Self {
209 let now = current_timestamp();
210 Self {
211 id: id.into(),
212 subscriptions: Vec::new(),
213 created_at: now,
214 last_active: now,
215 events_received: 0,
216 events_acknowledged: 0,
217 }
218 }
219
220 pub fn add_subscription(&mut self, subscription: Subscription) {
222 self.subscriptions.push(subscription);
223 }
224
225 pub fn remove_subscription(&mut self, subscription_id: &SubscriberId) {
227 self.subscriptions.retain(|s| &s.id != subscription_id);
228 }
229
230 pub fn active_subscriptions(&self) -> Vec<&Subscription> {
232 self.subscriptions.iter().filter(|s| s.active).collect()
233 }
234
235 pub fn record_received(&mut self) {
237 self.events_received += 1;
238 self.last_active = current_timestamp();
239 }
240
241 pub fn record_acknowledged(&mut self) {
243 self.events_acknowledged += 1;
244 }
245
246 pub fn is_active(&self) -> bool {
248 !self.subscriptions.is_empty() && self.subscriptions.iter().any(|s| s.active)
249 }
250}
251
252fn current_timestamp() -> u64 {
253 SystemTime::now()
254 .duration_since(UNIX_EPOCH)
255 .map(|d| d.as_millis() as u64)
256 .unwrap_or(0)
257}
258
259#[derive(Debug, Clone)]
268pub struct ConsumerGroup {
269 pub group_id: String,
271 pub members: HashMap<SubscriberId, HashSet<String>>,
273 pub committed_offsets: HashMap<String, u64>,
275 pub created_at: u64,
277}
278
279impl ConsumerGroup {
280 pub fn new(group_id: impl Into<String>) -> Self {
282 Self {
283 group_id: group_id.into(),
284 members: HashMap::new(),
285 committed_offsets: HashMap::new(),
286 created_at: current_timestamp(),
287 }
288 }
289
290 pub fn add_member(&mut self, subscriber_id: SubscriberId, channels: HashSet<String>) {
292 self.members.insert(subscriber_id, channels);
293 }
294
295 pub fn remove_member(&mut self, subscriber_id: &SubscriberId) -> Option<HashSet<String>> {
298 self.members.remove(subscriber_id)
299 }
300
301 pub fn commit_offset(&mut self, channel_name: impl Into<String>, offset: u64) {
304 self.committed_offsets.insert(channel_name.into(), offset);
305 }
306
307 pub fn get_offset(&self, channel_name: &str) -> Option<u64> {
310 self.committed_offsets.get(channel_name).copied()
311 }
312
313 pub fn member_count(&self) -> usize {
315 self.members.len()
316 }
317
318 pub fn is_member(&self, subscriber_id: &SubscriberId) -> bool {
320 self.members.contains_key(subscriber_id)
321 }
322
323 pub fn get_member_channels(&self, subscriber_id: &SubscriberId) -> Option<&HashSet<String>> {
325 self.members.get(subscriber_id)
326 }
327}
328
329#[cfg(test)]
334mod tests {
335 use super::*;
336
337 #[test]
338 fn test_subscriber_id() {
339 let id1 = SubscriberId::generate();
340 let id2 = SubscriberId::generate();
341 assert_ne!(id1, id2);
342 assert!(id1.as_str().starts_with("sub_"));
343 }
344
345 #[test]
346 fn test_subscription() {
347 let mut subscription = Subscription::new("sub1");
348 subscription.add_channel("channel1");
349 subscription.add_channel("channel2");
350
351 assert!(subscription.is_subscribed_to(&ChannelId::new("channel1")));
352 assert!(!subscription.is_subscribed_to(&ChannelId::new("channel3")));
353 assert!(subscription.active);
354
355 subscription.deactivate();
356 assert!(!subscription.active);
357 }
358
359 #[test]
360 fn test_subscriber() {
361 let mut subscriber = Subscriber::new("user1");
362
363 let mut sub1 = Subscription::new("sub1");
364 sub1.add_channel("events");
365 subscriber.add_subscription(sub1);
366
367 assert!(subscriber.is_active());
368 assert_eq!(subscriber.active_subscriptions().len(), 1);
369
370 subscriber.record_received();
371 assert_eq!(subscriber.events_received, 1);
372 }
373
374 #[test]
375 fn test_subscription_metadata() {
376 let metadata = SubscriptionMetadata::default()
377 .with_name("Test Subscription")
378 .with_description("A test subscription")
379 .with_tag("test")
380 .with_delivery_mode(DeliveryMode::ExactlyOnce);
381
382 assert_eq!(metadata.name, Some("Test Subscription".to_string()));
383 assert_eq!(metadata.delivery_mode, DeliveryMode::ExactlyOnce);
384 }
385
386 #[test]
387 fn test_consumer_group_creation() {
388 let group = ConsumerGroup::new("group1");
389 assert_eq!(group.group_id, "group1");
390 assert_eq!(group.member_count(), 0);
391 assert!(group.committed_offsets.is_empty());
392 assert!(group.created_at > 0);
393 }
394
395 #[test]
396 fn test_consumer_group_add_remove_members() {
397 let mut group = ConsumerGroup::new("group1");
398
399 let sub1 = SubscriberId::new("sub1");
400 let sub2 = SubscriberId::new("sub2");
401
402 let mut channels1 = HashSet::new();
403 channels1.insert("events".to_string());
404 channels1.insert("logs".to_string());
405
406 let mut channels2 = HashSet::new();
407 channels2.insert("metrics".to_string());
408
409 group.add_member(sub1.clone(), channels1);
410 group.add_member(sub2.clone(), channels2);
411
412 assert_eq!(group.member_count(), 2);
413 assert!(group.is_member(&sub1));
414 assert!(group.is_member(&sub2));
415
416 let member_channels = group.get_member_channels(&sub1).unwrap();
417 assert!(member_channels.contains("events"));
418 assert!(member_channels.contains("logs"));
419
420 let removed = group.remove_member(&sub1);
421 assert!(removed.is_some());
422 assert_eq!(group.member_count(), 1);
423 assert!(!group.is_member(&sub1));
424
425 let removed = group.remove_member(&SubscriberId::new("nonexistent"));
427 assert!(removed.is_none());
428 }
429
430 #[test]
431 fn test_consumer_group_offset_tracking() {
432 let mut group = ConsumerGroup::new("group1");
433
434 assert_eq!(group.get_offset("events"), None);
436
437 group.commit_offset("events", 42);
438 assert_eq!(group.get_offset("events"), Some(42));
439
440 group.commit_offset("events", 100);
442 assert_eq!(group.get_offset("events"), Some(100));
443
444 group.commit_offset("logs", 5);
446 assert_eq!(group.get_offset("logs"), Some(5));
447 assert_eq!(group.get_offset("events"), Some(100));
448 }
449}