use crate::backend::native::pattern::glob_matches;
use crate::backend::native::v2::pubsub::event::{PubSubEvent, PubSubEventType};
use std::sync::atomic::{AtomicU64, Ordering};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(u64);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct NodeMetadata {
pub kind: String,
pub name: String,
}
impl NodeMetadata {
pub fn new(kind: String, name: String) -> Self {
Self { kind, name }
}
}
impl SubscriberId {
pub fn new() -> Self {
static COUNTER: AtomicU64 = AtomicU64::new(1);
Self(COUNTER.fetch_add(1, Ordering::SeqCst))
}
pub fn value(&self) -> u64 {
self.0
}
pub fn from_raw(id: u64) -> Self {
Self(id)
}
pub fn as_u64(&self) -> u64 {
self.0
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionFilter {
pub event_types: Option<Vec<PubSubEventType>>,
pub node_ids: Option<Vec<i64>>,
pub edge_ids: Option<Vec<i64>>,
pub key_hashes: Option<Vec<u64>>,
pub kind_patterns: Option<Vec<String>>,
pub name_patterns: Option<Vec<String>>,
}
impl SubscriptionFilter {
pub fn all() -> Self {
Self {
event_types: None,
node_ids: None,
edge_ids: None,
key_hashes: None,
kind_patterns: None,
name_patterns: None,
}
}
pub fn nodes(ids: Vec<i64>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::Node]),
node_ids: Some(ids),
edge_ids: None,
key_hashes: None,
kind_patterns: None,
name_patterns: None,
}
}
pub fn edges(ids: Vec<i64>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::Edge]),
node_ids: None,
edge_ids: Some(ids),
key_hashes: None,
kind_patterns: None,
name_patterns: None,
}
}
pub fn keys(hashes: Vec<u64>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::KV]),
node_ids: None,
edge_ids: None,
key_hashes: Some(hashes),
kind_patterns: None,
name_patterns: None,
}
}
pub fn event_types(types: Vec<PubSubEventType>) -> Self {
Self {
event_types: Some(types),
node_ids: None,
edge_ids: None,
key_hashes: None,
kind_patterns: None,
name_patterns: None,
}
}
pub fn kind_patterns(patterns: Vec<String>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::Node]),
node_ids: None,
edge_ids: None,
key_hashes: None,
kind_patterns: Some(patterns),
name_patterns: None,
}
}
pub fn name_patterns(patterns: Vec<String>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::Node]),
node_ids: None,
edge_ids: None,
key_hashes: None,
kind_patterns: None,
name_patterns: Some(patterns),
}
}
pub fn node_patterns(kind_patterns: Vec<String>, name_patterns: Vec<String>) -> Self {
Self {
event_types: Some(vec![PubSubEventType::Node]),
node_ids: None,
edge_ids: None,
key_hashes: None,
kind_patterns: Some(kind_patterns),
name_patterns: Some(name_patterns),
}
}
pub fn has_patterns(&self) -> bool {
self.kind_patterns.is_some() || self.name_patterns.is_some()
}
pub fn matches(&self, event: &PubSubEvent, node_metadata: Option<&NodeMetadata>) -> bool {
if let Some(ref types) = self.event_types {
let event_type = event.event_type();
if !types.iter().any(|t| matches_type(t, event_type)) {
return false;
}
}
match event {
PubSubEvent::NodeChanged { node_id, .. } => {
let id_match = if let Some(ref ids) = self.node_ids {
ids.contains(node_id)
} else {
true
};
if !id_match {
return false;
}
if self.has_patterns() {
if let Some(metadata) = node_metadata {
if let Some(ref kind_patterns) = self.kind_patterns {
let kind_match = kind_patterns
.iter()
.any(|pattern| glob_matches(pattern, &metadata.kind));
if !kind_match {
return false;
}
}
if let Some(ref name_patterns) = self.name_patterns {
let name_match = name_patterns
.iter()
.any(|pattern| glob_matches(pattern, &metadata.name));
if !name_match {
return false;
}
}
true
} else {
false
}
} else {
true
}
}
PubSubEvent::EdgeChanged { edge_id, .. } => {
if let Some(ref ids) = self.edge_ids {
ids.contains(edge_id)
} else {
true
}
}
PubSubEvent::KVChanged { key_hash, .. } => {
if let Some(ref hashes) = self.key_hashes {
hashes.contains(key_hash)
} else {
true
}
}
PubSubEvent::SnapshotCommitted { .. } => {
true
}
}
}
pub fn matches_simple(&self, event: &PubSubEvent) -> bool {
self.matches(event, None)
}
}
fn matches_type(filter_type: &PubSubEventType, event_type: PubSubEventType) -> bool {
matches!(filter_type, PubSubEventType::All) || *filter_type == event_type
}
#[derive(Debug)]
pub struct Subscriber {
id: SubscriberId,
filter: SubscriptionFilter,
}
impl Subscriber {
pub fn new(filter: SubscriptionFilter) -> Self {
Self {
id: SubscriberId::new(),
filter,
}
}
pub fn id(&self) -> SubscriberId {
self.id
}
pub fn filter(&self) -> &SubscriptionFilter {
&self.filter
}
pub fn accepts(&self, event: &PubSubEvent) -> bool {
self.filter.matches_simple(event)
}
pub fn accepts_with_metadata(&self, event: &PubSubEvent, metadata: Option<&NodeMetadata>) -> bool {
self.filter.matches(event, metadata)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_subscriber_id_unique() {
let id1 = SubscriberId::new();
let id2 = SubscriberId::new();
let id3 = SubscriberId::new();
assert_ne!(id1, id2);
assert_ne!(id2, id3);
assert_ne!(id1, id3);
assert!(id2.value() > id1.value());
assert!(id3.value() > id2.value());
}
#[test]
fn test_filter_all_matches_all() {
let filter = SubscriptionFilter::all();
let node_event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 2,
snapshot_id: 100,
};
assert!(filter.matches_simple(&edge_event));
let kv_event = PubSubEvent::KVChanged {
key_hash: 999,
snapshot_id: 100,
};
assert!(filter.matches_simple(&kv_event));
let commit_event = PubSubEvent::SnapshotCommitted { snapshot_id: 100 };
assert!(filter.matches_simple(&commit_event));
}
#[test]
fn test_filter_nodes_only() {
let filter = SubscriptionFilter::nodes(vec![1, 2, 3]);
let node_event_1 = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event_1));
let node_event_2 = PubSubEvent::NodeChanged {
node_id: 2,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event_2));
let node_event_4 = PubSubEvent::NodeChanged {
node_id: 4,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&node_event_4));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 999,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&edge_event));
let kv_event = PubSubEvent::KVChanged {
key_hash: 999,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&kv_event));
}
#[test]
fn test_filter_edges_only() {
let filter = SubscriptionFilter::edges(vec![10, 20, 30]);
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 10,
snapshot_id: 100,
};
assert!(filter.matches_simple(&edge_event));
let edge_event_wrong = PubSubEvent::EdgeChanged {
edge_id: 99,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&edge_event_wrong));
let node_event = PubSubEvent::NodeChanged {
node_id: 10,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&node_event));
}
#[test]
fn test_filter_key_hashes() {
let filter = SubscriptionFilter::keys(vec![100, 200, 300]);
let kv_event = PubSubEvent::KVChanged {
key_hash: 100,
snapshot_id: 100,
};
assert!(filter.matches_simple(&kv_event));
let kv_event_wrong = PubSubEvent::KVChanged {
key_hash: 999,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&kv_event_wrong));
let node_event = PubSubEvent::NodeChanged {
node_id: 100,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&node_event));
}
#[test]
fn test_filter_event_types() {
let filter =
SubscriptionFilter::event_types(vec![PubSubEventType::Node, PubSubEventType::Edge]);
let node_event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 2,
snapshot_id: 100,
};
assert!(filter.matches_simple(&edge_event));
let kv_event = PubSubEvent::KVChanged {
key_hash: 999,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&kv_event));
let commit_event = PubSubEvent::SnapshotCommitted { snapshot_id: 100 };
assert!(!filter.matches_simple(&commit_event));
}
#[test]
fn test_filter_event_types_all_wildcard() {
let filter = SubscriptionFilter::event_types(vec![PubSubEventType::All]);
let node_event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 2,
snapshot_id: 100,
};
assert!(filter.matches_simple(&edge_event));
let kv_event = PubSubEvent::KVChanged {
key_hash: 999,
snapshot_id: 100,
};
assert!(filter.matches_simple(&kv_event));
let commit_event = PubSubEvent::SnapshotCommitted { snapshot_id: 100 };
assert!(filter.matches_simple(&commit_event));
}
#[test]
fn test_subscriber_creation() {
let filter = SubscriptionFilter::nodes(vec![1, 2, 3]);
let subscriber = Subscriber::new(filter.clone());
assert_eq!(subscriber.filter(), &filter);
let node_event = PubSubEvent::NodeChanged {
node_id: 2,
snapshot_id: 100,
};
assert!(subscriber.accepts(&node_event));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 999,
snapshot_id: 100,
};
assert!(!subscriber.accepts(&edge_event));
}
#[test]
fn test_multiple_subscribers_unique_ids() {
let filter1 = SubscriptionFilter::all();
let filter2 = SubscriptionFilter::all();
let sub1 = Subscriber::new(filter1);
let sub2 = Subscriber::new(filter2);
assert_ne!(sub1.id(), sub2.id());
}
#[test]
fn test_filter_specific_node() {
let filter = SubscriptionFilter::nodes(vec![42]);
let node_event_42 = PubSubEvent::NodeChanged {
node_id: 42,
snapshot_id: 100,
};
assert!(filter.matches_simple(&node_event_42));
let node_event_43 = PubSubEvent::NodeChanged {
node_id: 43,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&node_event_43));
let edge_event = PubSubEvent::EdgeChanged {
edge_id: 42,
snapshot_id: 100,
};
assert!(!filter.matches_simple(&edge_event));
}
#[test]
fn test_filter_kind_patterns_wildcard() {
let filter = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(!filter.matches(&event, None));
let metadata = NodeMetadata::new("agent:worker".to_string(), "agent-123".to_string());
assert!(filter.matches(&event, Some(&metadata)));
let wrong_metadata = NodeMetadata::new("user:admin".to_string(), "user-1".to_string());
assert!(!filter.matches(&event, Some(&wrong_metadata)));
}
#[test]
fn test_filter_name_patterns_wildcard() {
let filter = SubscriptionFilter::name_patterns(vec!["node-*".to_string()]);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(!filter.matches(&event, None));
let metadata = NodeMetadata::new("any:kind".to_string(), "node-123".to_string());
assert!(filter.matches(&event, Some(&metadata)));
let wrong_metadata = NodeMetadata::new("any:kind".to_string(), "entity-123".to_string());
assert!(!filter.matches(&event, Some(&wrong_metadata)));
}
#[test]
fn test_filter_kind_patterns_multiple() {
let filter = SubscriptionFilter::kind_patterns(vec![
"agent:*".to_string(),
"user:*".to_string(),
]);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
let agent_metadata = NodeMetadata::new("agent:worker".to_string(), "agent-1".to_string());
assert!(filter.matches(&event, Some(&agent_metadata)));
let user_metadata = NodeMetadata::new("user:admin".to_string(), "user-1".to_string());
assert!(filter.matches(&event, Some(&user_metadata)));
let system_metadata = NodeMetadata::new("system:process".to_string(), "sys-1".to_string());
assert!(!filter.matches(&event, Some(&system_metadata)));
}
#[test]
fn test_filter_name_patterns_question_mark() {
let filter = SubscriptionFilter::name_patterns(vec!["entity-???".to_string()]);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
let metadata_match = NodeMetadata::new("any:kind".to_string(), "entity-123".to_string());
assert!(filter.matches(&event, Some(&metadata_match)));
let metadata_short = NodeMetadata::new("any:kind".to_string(), "entity-12".to_string());
assert!(!filter.matches(&event, Some(&metadata_short)));
let metadata_long = NodeMetadata::new("any:kind".to_string(), "entity-1234".to_string());
assert!(!filter.matches(&event, Some(&metadata_long)));
}
#[test]
fn test_filter_node_patterns_both() {
let filter = SubscriptionFilter::node_patterns(
vec!["agent:*".to_string()],
vec!["agent-*".to_string()],
);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
let metadata_match = NodeMetadata::new("agent:worker".to_string(), "agent-123".to_string());
assert!(filter.matches(&event, Some(&metadata_match)));
let metadata_wrong_kind = NodeMetadata::new("user:admin".to_string(), "agent-123".to_string());
assert!(!filter.matches(&event, Some(&metadata_wrong_kind)));
let metadata_wrong_name = NodeMetadata::new("agent:worker".to_string(), "user-123".to_string());
assert!(!filter.matches(&event, Some(&metadata_wrong_name)));
}
#[test]
fn test_filter_has_patterns() {
let filter_no_patterns = SubscriptionFilter::nodes(vec![1, 2, 3]);
assert!(!filter_no_patterns.has_patterns());
let filter_kind_patterns = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
assert!(filter_kind_patterns.has_patterns());
let filter_name_patterns = SubscriptionFilter::name_patterns(vec!["node-*".to_string()]);
assert!(filter_name_patterns.has_patterns());
let filter_both = SubscriptionFilter::node_patterns(
vec!["agent:*".to_string()],
vec!["agent-*".to_string()],
);
assert!(filter_both.has_patterns());
}
#[test]
fn test_pattern_filter_no_metadata_conservative() {
let filter = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(!filter.matches(&event, None));
}
#[test]
fn test_subscriber_with_pattern_filter() {
let filter = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
let subscriber = Subscriber::new(filter.clone());
let event = PubSubEvent::NodeChanged {
node_id: 1,
snapshot_id: 100,
};
assert!(!subscriber.accepts(&event));
let metadata = NodeMetadata::new("agent:worker".to_string(), "agent-1".to_string());
assert!(subscriber.accepts_with_metadata(&event, Some(&metadata)));
}
#[test]
fn test_pattern_filter_edge_event_no_match() {
let filter = SubscriptionFilter::kind_patterns(vec!["agent:*".to_string()]);
let event = PubSubEvent::EdgeChanged {
edge_id: 1,
snapshot_id: 100,
};
let metadata = NodeMetadata::new("agent:worker".to_string(), "agent-1".to_string());
assert!(!filter.matches(&event, Some(&metadata)));
}
#[test]
fn test_node_metadata_new() {
let metadata = NodeMetadata::new("agent:worker".to_string(), "agent-123".to_string());
assert_eq!(metadata.kind, "agent:worker");
assert_eq!(metadata.name, "agent-123");
}
}