use std::collections::HashMap;
use std::fmt;
/// Lifecycle state of a [`Subscription`].
///
/// Only subscriptions in the `Active` state receive events from
/// `SubscriptionManager::publish_event`; every other state is skipped.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubscriptionStatus {
/// Eligible to receive published events; the state every new subscription starts in.
Active,
/// Set by `SubscriptionManager::pause`; no events are delivered while paused.
Paused,
/// Set by `SubscriptionManager::complete`; removed by `SubscriptionManager::cleanup_completed`.
Completed,
/// Set by `SubscriptionManager::set_error`; carries the caller-supplied message.
Error(String),
}
/// Predicate deciding whether a published event is delivered to a subscription.
///
/// Both criteria are optional: a filter built with [`EventFilter::new`] (or `Default`)
/// has no constraints and therefore accepts every event.
#[derive(Debug, Clone, Default)]
pub struct EventFilter {
    /// When `Some`, the event's operation name must equal this value exactly.
    pub operation_name: Option<String>,
    /// Variables the event must carry, each with exactly the given value.
    pub variables_match: HashMap<String, String>,
}

impl EventFilter {
    /// Creates a filter without any constraints.
    pub fn new() -> Self {
        Self {
            operation_name: None,
            variables_match: HashMap::new(),
        }
    }

    /// Creates a filter that only accepts events whose operation name equals `op_name`.
    pub fn with_operation(op_name: impl Into<String>) -> Self {
        Self {
            operation_name: Some(op_name.into()),
            ..Self::default()
        }
    }

    /// Builder step: additionally requires the event variable `key` to equal `value`.
    ///
    /// Requiring the same key twice keeps only the most recent value.
    pub fn require_variable(self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let Self {
            operation_name,
            mut variables_match,
        } = self;
        variables_match.insert(key.into(), value.into());
        Self {
            operation_name,
            variables_match,
        }
    }

    /// Returns `true` when an event with the given operation name and variables satisfies
    /// every constraint of this filter.
    ///
    /// Variables present in `vars` but not required by the filter are ignored.
    pub fn matches(&self, op_name: &str, vars: &HashMap<String, String>) -> bool {
        let operation_ok = match self.operation_name.as_deref() {
            Some(required) => required == op_name,
            None => true,
        };

        operation_ok
            && self
                .variables_match
                .iter()
                .all(|(key, expected)| vars.get(key) == Some(expected))
    }
}
/// A registered subscription together with its delivery bookkeeping.
///
/// NOTE: the `*_ms` fields are filled from `SubscriptionManager`'s internal tick counter
/// (advanced by one per `subscribe` / `publish_event` call), not from a wall clock.
#[derive(Debug, Clone)]
pub struct Subscription {
/// Unique identifier; also the key under which the manager stores this subscription.
pub id: String,
/// Operation text supplied at subscribe time; stored as given.
pub operation: String,
/// Variables supplied at subscribe time; stored as given.
pub variables: HashMap<String, String>,
/// Decides which published events are delivered to this subscription.
pub filter: EventFilter,
/// Current lifecycle state; only `Active` subscriptions receive events.
pub status: SubscriptionStatus,
/// Manager tick at which the subscription was created.
pub created_at_ms: u64,
/// Manager tick of the most recently delivered event; 0 until the first delivery.
pub last_event_ms: u64,
/// Number of events delivered to this subscription so far.
pub event_count: u64,
/// Sequence number the next delivered event will carry; starts at 1.
pub(crate) next_sequence: u64,
}
/// One delivery of a published payload to a single subscription.
///
/// Implements `PartialEq`/`Eq` so that callers and tests can compare delivered events
/// by value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionEvent {
    /// Id of the subscription this event was delivered to.
    pub subscription_id: String,
    /// Per-subscription sequence number; the first delivered event carries 1.
    pub sequence: u64,
    /// The published payload, copied verbatim.
    pub payload: String,
    /// Manager tick at which the event was published (a logical counter, not wall-clock time).
    pub timestamp_ms: u64,
}
/// Errors returned by `SubscriptionManager` operations.
///
/// Each variant carries the subscription id the failed operation referred to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SubError {
    /// A subscription with this id is already registered.
    AlreadyExists(String),
    /// No subscription with this id is registered.
    NotFound(String),
}

impl fmt::Display for SubError {
    /// Renders a human-readable message that includes the offending id.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::NotFound(id) => write!(f, "Subscription not found: {id}"),
            Self::AlreadyExists(id) => write!(f, "Subscription already exists: {id}"),
        }
    }
}

impl std::error::Error for SubError {}
/// Snapshot of how many subscriptions are in each lifecycle state.
///
/// `Default` yields all-zero counters (the stats of an empty manager); `PartialEq`/`Eq`
/// allow whole snapshots to be compared in one assertion.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SubStats {
    /// Total number of registered subscriptions, regardless of state.
    pub total: usize,
    /// Subscriptions currently `Active`.
    pub active: usize,
    /// Subscriptions currently `Paused`.
    pub paused: usize,
    /// Subscriptions currently `Completed`.
    pub completed: usize,
    /// Subscriptions currently in the `Error` state.
    pub error: usize,
}
pub struct SubscriptionManager {
subscriptions: HashMap<String, Subscription>,
clock: u64,
}
impl SubscriptionManager {
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
clock: 0,
}
}
fn tick(&mut self) -> u64 {
self.clock += 1;
self.clock
}
pub fn subscribe(
&mut self,
id: impl Into<String>,
operation: impl Into<String>,
variables: HashMap<String, String>,
filter: EventFilter,
) -> Result<(), SubError> {
let id = id.into();
if self.subscriptions.contains_key(&id) {
return Err(SubError::AlreadyExists(id));
}
let ts = self.tick();
let sub = Subscription {
id: id.clone(),
operation: operation.into(),
variables,
filter,
status: SubscriptionStatus::Active,
created_at_ms: ts,
last_event_ms: 0,
event_count: 0,
next_sequence: 1,
};
self.subscriptions.insert(id, sub);
Ok(())
}
pub fn unsubscribe(&mut self, id: &str) -> Result<(), SubError> {
self.subscriptions
.remove(id)
.ok_or_else(|| SubError::NotFound(id.to_string()))?;
Ok(())
}
pub fn pause(&mut self, id: &str) -> Result<(), SubError> {
let sub = self
.subscriptions
.get_mut(id)
.ok_or_else(|| SubError::NotFound(id.to_string()))?;
sub.status = SubscriptionStatus::Paused;
Ok(())
}
pub fn resume(&mut self, id: &str) -> Result<(), SubError> {
let sub = self
.subscriptions
.get_mut(id)
.ok_or_else(|| SubError::NotFound(id.to_string()))?;
sub.status = SubscriptionStatus::Active;
Ok(())
}
pub fn complete(&mut self, id: &str) -> Result<(), SubError> {
let sub = self
.subscriptions
.get_mut(id)
.ok_or_else(|| SubError::NotFound(id.to_string()))?;
sub.status = SubscriptionStatus::Completed;
Ok(())
}
pub fn set_error(&mut self, id: &str, message: impl Into<String>) -> Result<(), SubError> {
let sub = self
.subscriptions
.get_mut(id)
.ok_or_else(|| SubError::NotFound(id.to_string()))?;
sub.status = SubscriptionStatus::Error(message.into());
Ok(())
}
pub fn publish_event(
&mut self,
payload: &str,
op_name: &str,
vars: &HashMap<String, String>,
) -> Vec<SubscriptionEvent> {
let ts = self.tick();
let mut events = Vec::new();
for sub in self.subscriptions.values_mut() {
if sub.status != SubscriptionStatus::Active {
continue;
}
if !sub.filter.matches(op_name, vars) {
continue;
}
let seq = sub.next_sequence;
sub.next_sequence += 1;
sub.event_count += 1;
sub.last_event_ms = ts;
events.push(SubscriptionEvent {
subscription_id: sub.id.clone(),
sequence: seq,
payload: payload.to_string(),
timestamp_ms: ts,
});
}
events
}
pub fn get(&self, id: &str) -> Option<&Subscription> {
self.subscriptions.get(id)
}
pub fn active_subscriptions(&self) -> Vec<&Subscription> {
self.subscriptions
.values()
.filter(|s| s.status == SubscriptionStatus::Active)
.collect()
}
pub fn cleanup_completed(&mut self) {
self.subscriptions
.retain(|_, s| s.status != SubscriptionStatus::Completed);
}
pub fn stats(&self) -> SubStats {
let mut active = 0;
let mut paused = 0;
let mut completed = 0;
let mut error = 0;
for sub in self.subscriptions.values() {
match &sub.status {
SubscriptionStatus::Active => active += 1,
SubscriptionStatus::Paused => paused += 1,
SubscriptionStatus::Completed => completed += 1,
SubscriptionStatus::Error(_) => error += 1,
}
}
SubStats {
total: self.subscriptions.len(),
active,
paused,
completed,
error,
}
}
pub fn len(&self) -> usize {
self.subscriptions.len()
}
pub fn is_empty(&self) -> bool {
self.subscriptions.is_empty()
}
}
impl Default for SubscriptionManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    //! Unit tests for the subscription manager, its event filter and its error type.
    use super::*;

    /// Message attached to every `expect` on a call the test requires to succeed.
    const OK: &str = "should succeed";

    /// Returns an empty variable map.
    fn no_vars() -> HashMap<String, String> {
        HashMap::new()
    }

    /// Builds a variable map from `(key, value)` pairs.
    fn var_map(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(pairs.len());
        for &(key, value) in pairs {
            map.insert(key.to_owned(), value.to_owned());
        }
        map
    }

    /// Registers subscription `id` for operation `"op"` with no variables and the given filter.
    fn add(manager: &mut SubscriptionManager, id: &str, filter: EventFilter) {
        manager.subscribe(id, "op", no_vars(), filter).expect(OK);
    }

    /// Builds a manager holding one unfiltered subscription per id.
    fn manager_with(ids: &[&str]) -> SubscriptionManager {
        let mut manager = SubscriptionManager::new();
        for &id in ids {
            add(&mut manager, id, EventFilter::new());
        }
        manager
    }

    /// Returns the status of subscription `id`, which must exist.
    fn status_of<'a>(manager: &'a SubscriptionManager, id: &str) -> &'a SubscriptionStatus {
        &manager.get(id).expect(OK).status
    }

    #[test]
    fn test_subscribe_and_get() {
        let mut manager = SubscriptionManager::new();
        manager
            .subscribe("s1", "subscription Foo {}", no_vars(), EventFilter::new())
            .expect(OK);
        let sub = manager.get("s1").expect(OK);
        assert_eq!(sub.id, "s1");
        assert_eq!(sub.status, SubscriptionStatus::Active);
        assert_eq!(sub.event_count, 0);
    }

    #[test]
    fn test_subscribe_duplicate_error() {
        let mut manager = manager_with(&["s1"]);
        let second = manager.subscribe("s1", "op", no_vars(), EventFilter::new());
        assert_eq!(second, Err(SubError::AlreadyExists("s1".to_string())));
    }

    #[test]
    fn test_get_nonexistent() {
        let manager = SubscriptionManager::new();
        assert!(manager.get("nope").is_none());
    }

    #[test]
    fn test_unsubscribe() {
        let mut manager = manager_with(&["s1"]);
        manager.unsubscribe("s1").expect(OK);
        assert!(manager.get("s1").is_none());
    }

    #[test]
    fn test_unsubscribe_not_found() {
        let mut manager = SubscriptionManager::new();
        let outcome = manager.unsubscribe("ghost");
        assert_eq!(outcome, Err(SubError::NotFound("ghost".to_string())));
    }

    #[test]
    fn test_pause_and_resume() {
        let mut manager = manager_with(&["s1"]);
        manager.pause("s1").expect(OK);
        assert_eq!(status_of(&manager, "s1"), &SubscriptionStatus::Paused);
        manager.resume("s1").expect(OK);
        assert_eq!(status_of(&manager, "s1"), &SubscriptionStatus::Active);
    }

    #[test]
    fn test_pause_not_found() {
        let mut manager = SubscriptionManager::new();
        assert_eq!(manager.pause("x"), Err(SubError::NotFound("x".to_string())));
    }

    #[test]
    fn test_resume_not_found() {
        let mut manager = SubscriptionManager::new();
        assert_eq!(manager.resume("x"), Err(SubError::NotFound("x".to_string())));
    }

    #[test]
    fn test_complete() {
        let mut manager = manager_with(&["s1"]);
        manager.complete("s1").expect(OK);
        assert_eq!(status_of(&manager, "s1"), &SubscriptionStatus::Completed);
    }

    #[test]
    fn test_complete_not_found() {
        let mut manager = SubscriptionManager::new();
        assert_eq!(
            manager.complete("x"),
            Err(SubError::NotFound("x".to_string()))
        );
    }

    #[test]
    fn test_publish_delivers_to_active() {
        let mut manager = manager_with(&["s1"]);
        let delivered = manager.publish_event("{\"data\":1}", "op", &no_vars());
        assert_eq!(delivered.len(), 1);
        let event = &delivered[0];
        assert_eq!(event.subscription_id, "s1");
        assert_eq!(event.payload, "{\"data\":1}");
        assert_eq!(event.sequence, 1);
    }

    #[test]
    fn test_publish_does_not_deliver_to_paused() {
        let mut manager = manager_with(&["s1"]);
        manager.pause("s1").expect(OK);
        let delivered = manager.publish_event("payload", "op", &no_vars());
        assert!(delivered.is_empty());
    }

    #[test]
    fn test_publish_does_not_deliver_to_completed() {
        let mut manager = manager_with(&["s1"]);
        manager.complete("s1").expect(OK);
        let delivered = manager.publish_event("payload", "op", &no_vars());
        assert!(delivered.is_empty());
    }

    #[test]
    fn test_publish_increments_event_count() {
        let mut manager = manager_with(&["s1"]);
        for payload in ["e1", "e2"] {
            manager.publish_event(payload, "op", &no_vars());
        }
        assert_eq!(manager.get("s1").expect(OK).event_count, 2);
    }

    #[test]
    fn test_publish_sequence_increments() {
        let mut manager = manager_with(&["s1"]);
        let first = manager.publish_event("e1", "op", &no_vars());
        let second = manager.publish_event("e2", "op", &no_vars());
        assert_eq!(first[0].sequence, 1);
        assert_eq!(second[0].sequence, 2);
    }

    #[test]
    fn test_publish_multiple_subscribers() {
        let mut manager = manager_with(&["s1", "s2"]);
        let delivered = manager.publish_event("payload", "op", &no_vars());
        assert_eq!(delivered.len(), 2);
    }

    #[test]
    fn test_filter_by_operation() {
        let mut manager = SubscriptionManager::new();
        add(&mut manager, "s1", EventFilter::with_operation("targetOp"));

        let rejected = manager.publish_event("p", "otherOp", &no_vars());
        assert!(rejected.is_empty());

        let accepted = manager.publish_event("p", "targetOp", &no_vars());
        assert_eq!(accepted.len(), 1);
    }

    #[test]
    fn test_filter_by_variable() {
        let mut manager = SubscriptionManager::new();
        add(
            &mut manager,
            "s1",
            EventFilter::new().require_variable("userId", "42"),
        );

        let missing = manager.publish_event("p", "op", &no_vars());
        assert!(missing.is_empty());

        let wrong_value = manager.publish_event("p", "op", &var_map(&[("userId", "99")]));
        assert!(wrong_value.is_empty());

        let matching = manager.publish_event("p", "op", &var_map(&[("userId", "42")]));
        assert_eq!(matching.len(), 1);
    }

    #[test]
    fn test_filter_matches_any_op_when_none_set() {
        let unconstrained = EventFilter::new();
        assert!(unconstrained.matches("anything", &no_vars()));
    }

    #[test]
    fn test_active_subscriptions() {
        let mut manager = manager_with(&["s1", "s2", "s3"]);
        manager.pause("s2").expect(OK);
        manager.complete("s3").expect(OK);
        let active = manager.active_subscriptions();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].id, "s1");
    }

    #[test]
    fn test_cleanup_completed() {
        let mut manager = manager_with(&["s1", "s2"]);
        manager.complete("s1").expect(OK);
        manager.cleanup_completed();
        assert!(manager.get("s1").is_none());
        assert!(manager.get("s2").is_some());
    }

    #[test]
    fn test_stats_initial() {
        let snapshot = SubscriptionManager::new().stats();
        assert_eq!(snapshot.total, 0);
        assert_eq!(snapshot.active, 0);
    }

    #[test]
    fn test_stats_mixed() {
        let mut manager = manager_with(&["s1", "s2", "s3", "s4"]);
        manager.pause("s2").expect(OK);
        manager.complete("s3").expect(OK);
        manager.set_error("s4", "boom").expect(OK);

        let snapshot = manager.stats();
        assert_eq!(snapshot.total, 4);
        assert_eq!(snapshot.active, 1);
        assert_eq!(snapshot.paused, 1);
        assert_eq!(snapshot.completed, 1);
        assert_eq!(snapshot.error, 1);
    }

    #[test]
    fn test_sub_error_display() {
        let exists = SubError::AlreadyExists("s1".to_string());
        assert!(exists.to_string().contains("s1"));
        let missing = SubError::NotFound("s2".to_string());
        assert!(missing.to_string().contains("s2"));
    }

    #[test]
    fn test_subscription_status_clone_eq() {
        let original = SubscriptionStatus::Error("msg".to_string());
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_default() {
        let manager = SubscriptionManager::default();
        assert!(manager.is_empty());
    }
}