aegis_streaming/
subscriber.rs1use crate::channel::ChannelId;
9use crate::event::EventFilter;
10use serde::{Deserialize, Serialize};
11use std::collections::HashSet;
12use std::time::{SystemTime, UNIX_EPOCH};
13
/// String-backed identifier newtype.
///
/// Used as the id of a [`Subscriber`] and also as the id of an individual
/// [`Subscription`]. The wrapped `String` is public, and the type derives
/// equality and hashing, so it can be used as a map or set key.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriberId(pub String);
21
22impl SubscriberId {
23 pub fn new(id: impl Into<String>) -> Self {
24 Self(id.into())
25 }
26
27 pub fn generate() -> Self {
28 let timestamp = SystemTime::now()
29 .duration_since(UNIX_EPOCH)
30 .unwrap_or_default()
31 .as_nanos();
32 Self(format!("sub_{:032x}", timestamp))
33 }
34
35 pub fn as_str(&self) -> &str {
36 &self.0
37 }
38}
39
40impl std::fmt::Display for SubscriberId {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.0)
43 }
44}
45
46impl From<String> for SubscriberId {
47 fn from(s: String) -> Self {
48 Self(s)
49 }
50}
51
52impl From<&str> for SubscriberId {
53 fn from(s: &str) -> Self {
54 Self(s.to_string())
55 }
56}
57
/// A set of channels, plus an optional event filter, registered under one id.
///
/// Built with [`Subscription::new`], which starts with no channels, no
/// filter, `active = true` and default metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
    /// Identifier of this subscription (shares the `SubscriberId` type).
    pub id: SubscriberId,
    /// Channels this subscription covers; duplicates collapse because it is a set.
    pub channels: HashSet<ChannelId>,
    /// Optional event filter; `None` until `with_filter` is called.
    // NOTE(review): presumably restricts which events are delivered — confirm
    // against the `event` module.
    pub filter: Option<EventFilter>,
    /// Creation time as produced by `current_timestamp()` (milliseconds since
    /// the Unix epoch).
    pub created_at: u64,
    /// Toggled by `activate` / `deactivate`; `true` on creation.
    pub active: bool,
    /// Descriptive and delivery-related settings.
    pub metadata: SubscriptionMetadata,
}
72
73impl Subscription {
74 pub fn new(id: impl Into<SubscriberId>) -> Self {
76 Self {
77 id: id.into(),
78 channels: HashSet::new(),
79 filter: None,
80 created_at: current_timestamp(),
81 active: true,
82 metadata: SubscriptionMetadata::default(),
83 }
84 }
85
86 pub fn add_channel(&mut self, channel: impl Into<ChannelId>) {
88 self.channels.insert(channel.into());
89 }
90
91 pub fn remove_channel(&mut self, channel: &ChannelId) {
93 self.channels.remove(channel);
94 }
95
96 pub fn with_filter(mut self, filter: EventFilter) -> Self {
98 self.filter = Some(filter);
99 self
100 }
101
102 pub fn is_subscribed_to(&self, channel: &ChannelId) -> bool {
104 self.channels.contains(channel)
105 }
106
107 pub fn deactivate(&mut self) {
109 self.active = false;
110 }
111
112 pub fn activate(&mut self) {
114 self.active = true;
115 }
116}
117
/// Descriptive and delivery-related settings attached to a [`Subscription`].
///
/// `Default` yields no name, no description, no tags, and the default
/// variants of [`DeliveryMode`] and [`AckMode`]. The `with_*` methods offer a
/// builder-style way to fill it in.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubscriptionMetadata {
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Optional free-form description.
    pub description: Option<String>,
    /// Tags in insertion order; duplicates are not filtered out.
    pub tags: Vec<String>,
    /// Requested delivery guarantee.
    pub delivery_mode: DeliveryMode,
    /// Requested acknowledgement behaviour.
    pub ack_mode: AckMode,
}
131
132impl SubscriptionMetadata {
133 pub fn with_name(mut self, name: impl Into<String>) -> Self {
134 self.name = Some(name.into());
135 self
136 }
137
138 pub fn with_description(mut self, description: impl Into<String>) -> Self {
139 self.description = Some(description.into());
140 self
141 }
142
143 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
144 self.tags.push(tag.into());
145 self
146 }
147
148 pub fn with_delivery_mode(mut self, mode: DeliveryMode) -> Self {
149 self.delivery_mode = mode;
150 self
151 }
152
153 pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
154 self.ack_mode = mode;
155 self
156 }
157}
158
/// Delivery guarantee requested for a subscription.
///
/// Defaults to [`DeliveryMode::AtLeastOnce`]. This module only stores the
/// value (in [`SubscriptionMetadata`]); it does not enforce it.
///
/// NOTE(review): the variant descriptions follow the usual messaging
/// terminology — confirm against the code that performs delivery.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum DeliveryMode {
    /// Presumably: an event may be dropped but is not delivered twice.
    AtMostOnce,
    /// Presumably: an event is retried until delivered and may be duplicated.
    /// This is the default variant.
    #[default]
    AtLeastOnce,
    /// Presumably: an event is delivered once, with neither loss nor duplication.
    ExactlyOnce,
}
174
/// Acknowledgement behaviour requested for a subscription.
///
/// Defaults to [`AckMode::Auto`]. This module only stores the value (in
/// [`SubscriptionMetadata`]); it does not act on it.
///
/// NOTE(review): the variant descriptions are inferred from the names —
/// confirm against the code that handles acknowledgements.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum AckMode {
    /// Presumably: events are acknowledged automatically. This is the default
    /// variant.
    #[default]
    Auto,
    /// Presumably: the subscriber must acknowledge each event explicitly.
    Manual,
    /// Presumably: no acknowledgement is expected. Inside this enum's scope
    /// the name shadows `Option::None`, so refer to it as `AckMode::None`.
    None,
}
190
/// A subscriber together with its subscriptions and simple activity counters.
///
/// Unlike [`Subscription`], this type derives only `Debug` and `Clone`; it is
/// not serializable.
#[derive(Debug, Clone)]
pub struct Subscriber {
    /// Identifier of the subscriber.
    pub id: SubscriberId,
    /// Subscriptions in the order they were added; ids are not deduplicated.
    pub subscriptions: Vec<Subscription>,
    /// Creation time as produced by `current_timestamp()` (milliseconds since
    /// the Unix epoch).
    pub created_at: u64,
    /// Time of creation or of the most recent `record_received` call, in the
    /// same unit as `created_at`.
    pub last_active: u64,
    /// Number of `record_received` calls so far.
    pub events_received: u64,
    /// Number of `record_acknowledged` calls so far.
    pub events_acknowledged: u64,
}
205
206impl Subscriber {
207 pub fn new(id: impl Into<SubscriberId>) -> Self {
209 let now = current_timestamp();
210 Self {
211 id: id.into(),
212 subscriptions: Vec::new(),
213 created_at: now,
214 last_active: now,
215 events_received: 0,
216 events_acknowledged: 0,
217 }
218 }
219
220 pub fn add_subscription(&mut self, subscription: Subscription) {
222 self.subscriptions.push(subscription);
223 }
224
225 pub fn remove_subscription(&mut self, subscription_id: &SubscriberId) {
227 self.subscriptions.retain(|s| &s.id != subscription_id);
228 }
229
230 pub fn active_subscriptions(&self) -> Vec<&Subscription> {
232 self.subscriptions.iter().filter(|s| s.active).collect()
233 }
234
235 pub fn record_received(&mut self) {
237 self.events_received += 1;
238 self.last_active = current_timestamp();
239 }
240
241 pub fn record_acknowledged(&mut self) {
243 self.events_acknowledged += 1;
244 }
245
246 pub fn is_active(&self) -> bool {
248 !self.subscriptions.is_empty()
249 && self.subscriptions.iter().any(|s| s.active)
250 }
251}
252
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reads earlier than the epoch.
fn current_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` is a `u128`; it is narrowed to the `u64` used by the
        // timestamp fields.
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
259
#[cfg(test)]
mod tests {
    use super::*;

    /// Generated ids carry the `sub_` prefix and differ between calls.
    #[test]
    fn test_subscriber_id() {
        let first = SubscriberId::generate();
        let second = SubscriberId::generate();

        assert!(first.as_str().starts_with("sub_"));
        assert_ne!(first, second);
    }

    /// Channel membership and the active flag behave as expected.
    #[test]
    fn test_subscription() {
        let mut sub = Subscription::new("sub1");
        for channel in ["channel1", "channel2"] {
            sub.add_channel(channel);
        }

        let known = ChannelId::new("channel1");
        let unknown = ChannelId::new("channel3");
        assert!(sub.is_subscribed_to(&known));
        assert!(!sub.is_subscribed_to(&unknown));

        // A new subscription starts active and can be switched off.
        assert!(sub.active);
        sub.deactivate();
        assert!(!sub.active);
    }

    /// A subscriber with one active subscription is active and counts events.
    #[test]
    fn test_subscriber() {
        let mut user = Subscriber::new("user1");

        let mut events_sub = Subscription::new("sub1");
        events_sub.add_channel("events");
        user.add_subscription(events_sub);

        assert!(user.is_active());
        let active = user.active_subscriptions();
        assert_eq!(active.len(), 1);

        user.record_received();
        assert_eq!(user.events_received, 1);
    }

    /// The builder methods on the metadata set the requested fields.
    #[test]
    fn test_subscription_metadata() {
        let base = SubscriptionMetadata::default();
        let described = base
            .with_name("Test Subscription")
            .with_description("A test subscription");
        let metadata = described
            .with_tag("test")
            .with_delivery_mode(DeliveryMode::ExactlyOnce);

        assert_eq!(metadata.name.as_deref(), Some("Test Subscription"));
        assert_eq!(metadata.delivery_mode, DeliveryMode::ExactlyOnce);
    }
}