use crate::channel::ChannelId;
use crate::event::EventFilter;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::time::{SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SubscriberId(pub String);
impl SubscriberId {
pub fn new(id: impl Into<String>) -> Self {
Self(id.into())
}
pub fn generate() -> Self {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
Self(format!("sub_{:032x}", timestamp))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for SubscriberId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl From<String> for SubscriberId {
fn from(s: String) -> Self {
Self(s)
}
}
impl From<&str> for SubscriberId {
fn from(s: &str) -> Self {
Self(s.to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Subscription {
pub id: SubscriberId,
pub channels: HashSet<ChannelId>,
pub filter: Option<EventFilter>,
pub created_at: u64,
pub active: bool,
pub metadata: SubscriptionMetadata,
}
impl Subscription {
pub fn new(id: impl Into<SubscriberId>) -> Self {
Self {
id: id.into(),
channels: HashSet::new(),
filter: None,
created_at: current_timestamp(),
active: true,
metadata: SubscriptionMetadata::default(),
}
}
pub fn add_channel(&mut self, channel: impl Into<ChannelId>) {
self.channels.insert(channel.into());
}
pub fn remove_channel(&mut self, channel: &ChannelId) {
self.channels.remove(channel);
}
pub fn with_filter(mut self, filter: EventFilter) -> Self {
self.filter = Some(filter);
self
}
pub fn is_subscribed_to(&self, channel: &ChannelId) -> bool {
self.channels.contains(channel)
}
pub fn deactivate(&mut self) {
self.active = false;
}
pub fn activate(&mut self) {
self.active = true;
}
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SubscriptionMetadata {
pub name: Option<String>,
pub description: Option<String>,
pub tags: Vec<String>,
pub delivery_mode: DeliveryMode,
pub ack_mode: AckMode,
}
impl SubscriptionMetadata {
pub fn with_name(mut self, name: impl Into<String>) -> Self {
self.name = Some(name.into());
self
}
pub fn with_description(mut self, description: impl Into<String>) -> Self {
self.description = Some(description.into());
self
}
pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
self.tags.push(tag.into());
self
}
pub fn with_delivery_mode(mut self, mode: DeliveryMode) -> Self {
self.delivery_mode = mode;
self
}
pub fn with_ack_mode(mut self, mode: AckMode) -> Self {
self.ack_mode = mode;
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum DeliveryMode {
AtMostOnce,
#[default]
AtLeastOnce,
ExactlyOnce,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum AckMode {
#[default]
Auto,
Manual,
None,
}
#[derive(Debug, Clone)]
pub struct Subscriber {
pub id: SubscriberId,
pub subscriptions: Vec<Subscription>,
pub created_at: u64,
pub last_active: u64,
pub events_received: u64,
pub events_acknowledged: u64,
}
impl Subscriber {
pub fn new(id: impl Into<SubscriberId>) -> Self {
let now = current_timestamp();
Self {
id: id.into(),
subscriptions: Vec::new(),
created_at: now,
last_active: now,
events_received: 0,
events_acknowledged: 0,
}
}
pub fn add_subscription(&mut self, subscription: Subscription) {
self.subscriptions.push(subscription);
}
pub fn remove_subscription(&mut self, subscription_id: &SubscriberId) {
self.subscriptions.retain(|s| &s.id != subscription_id);
}
pub fn active_subscriptions(&self) -> Vec<&Subscription> {
self.subscriptions.iter().filter(|s| s.active).collect()
}
pub fn record_received(&mut self) {
self.events_received += 1;
self.last_active = current_timestamp();
}
pub fn record_acknowledged(&mut self) {
self.events_acknowledged += 1;
}
pub fn is_active(&self) -> bool {
!self.subscriptions.is_empty() && self.subscriptions.iter().any(|s| s.active)
}
}
fn current_timestamp() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
#[derive(Debug, Clone)]
pub struct ConsumerGroup {
pub group_id: String,
pub members: HashMap<SubscriberId, HashSet<String>>,
pub committed_offsets: HashMap<String, u64>,
pub created_at: u64,
}
impl ConsumerGroup {
pub fn new(group_id: impl Into<String>) -> Self {
Self {
group_id: group_id.into(),
members: HashMap::new(),
committed_offsets: HashMap::new(),
created_at: current_timestamp(),
}
}
pub fn add_member(&mut self, subscriber_id: SubscriberId, channels: HashSet<String>) {
self.members.insert(subscriber_id, channels);
}
pub fn remove_member(&mut self, subscriber_id: &SubscriberId) -> Option<HashSet<String>> {
self.members.remove(subscriber_id)
}
pub fn commit_offset(&mut self, channel_name: impl Into<String>, offset: u64) {
self.committed_offsets.insert(channel_name.into(), offset);
}
pub fn get_offset(&self, channel_name: &str) -> Option<u64> {
self.committed_offsets.get(channel_name).copied()
}
pub fn member_count(&self) -> usize {
self.members.len()
}
pub fn is_member(&self, subscriber_id: &SubscriberId) -> bool {
self.members.contains_key(subscriber_id)
}
pub fn get_member_channels(&self, subscriber_id: &SubscriberId) -> Option<&HashSet<String>> {
self.members.get(subscriber_id)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscriber_id() {
let id1 = SubscriberId::generate();
let id2 = SubscriberId::generate();
assert_ne!(id1, id2);
assert!(id1.as_str().starts_with("sub_"));
}
#[test]
fn test_subscription() {
let mut subscription = Subscription::new("sub1");
subscription.add_channel("channel1");
subscription.add_channel("channel2");
assert!(subscription.is_subscribed_to(&ChannelId::new("channel1")));
assert!(!subscription.is_subscribed_to(&ChannelId::new("channel3")));
assert!(subscription.active);
subscription.deactivate();
assert!(!subscription.active);
}
#[test]
fn test_subscriber() {
let mut subscriber = Subscriber::new("user1");
let mut sub1 = Subscription::new("sub1");
sub1.add_channel("events");
subscriber.add_subscription(sub1);
assert!(subscriber.is_active());
assert_eq!(subscriber.active_subscriptions().len(), 1);
subscriber.record_received();
assert_eq!(subscriber.events_received, 1);
}
#[test]
fn test_subscription_metadata() {
let metadata = SubscriptionMetadata::default()
.with_name("Test Subscription")
.with_description("A test subscription")
.with_tag("test")
.with_delivery_mode(DeliveryMode::ExactlyOnce);
assert_eq!(metadata.name, Some("Test Subscription".to_string()));
assert_eq!(metadata.delivery_mode, DeliveryMode::ExactlyOnce);
}
#[test]
fn test_consumer_group_creation() {
let group = ConsumerGroup::new("group1");
assert_eq!(group.group_id, "group1");
assert_eq!(group.member_count(), 0);
assert!(group.committed_offsets.is_empty());
assert!(group.created_at > 0);
}
#[test]
fn test_consumer_group_add_remove_members() {
let mut group = ConsumerGroup::new("group1");
let sub1 = SubscriberId::new("sub1");
let sub2 = SubscriberId::new("sub2");
let mut channels1 = HashSet::new();
channels1.insert("events".to_string());
channels1.insert("logs".to_string());
let mut channels2 = HashSet::new();
channels2.insert("metrics".to_string());
group.add_member(sub1.clone(), channels1);
group.add_member(sub2.clone(), channels2);
assert_eq!(group.member_count(), 2);
assert!(group.is_member(&sub1));
assert!(group.is_member(&sub2));
let member_channels = group.get_member_channels(&sub1).unwrap();
assert!(member_channels.contains("events"));
assert!(member_channels.contains("logs"));
let removed = group.remove_member(&sub1);
assert!(removed.is_some());
assert_eq!(group.member_count(), 1);
assert!(!group.is_member(&sub1));
let removed = group.remove_member(&SubscriberId::new("nonexistent"));
assert!(removed.is_none());
}
#[test]
fn test_consumer_group_offset_tracking() {
let mut group = ConsumerGroup::new("group1");
assert_eq!(group.get_offset("events"), None);
group.commit_offset("events", 42);
assert_eq!(group.get_offset("events"), Some(42));
group.commit_offset("events", 100);
assert_eq!(group.get_offset("events"), Some(100));
group.commit_offset("logs", 5);
assert_eq!(group.get_offset("logs"), Some(5));
assert_eq!(group.get_offset("events"), Some(100));
}
}