use crate::sharding::Change;
#[derive(Debug, Clone)]
pub struct ConsumerGroupMessage {
pub offset: usize,
pub change: Change,
}
#[derive(Debug, Clone)]
pub struct ConsumerGroup {
pub name: String,
pub last_offset: usize,
pub pending: Vec<usize>,
}
#[cfg(test)]
mod specs {
use super::*;
use crate::sharding::PubSub;
#[test]
fn spec_create_consumer_group() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("test-group".to_string())
.unwrap();
let groups = pubsub.list_consumer_groups().unwrap();
assert_eq!(groups.len(), 1);
assert!(groups.contains(&"test-group".to_string()));
}
#[test]
fn spec_consumer_group_subscription() {
let pubsub = PubSub::new();
pubsub.create_consumer_group("agents".to_string()).unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));
pubsub
.subscribe_group("agents", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("agents", 10).unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].change.node_id, Some(100));
}
#[test]
fn spec_consumer_group_offset_tracking() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("workflow".to_string())
.unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));
pubsub
.subscribe_group("workflow", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("workflow", 1).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].offset, 0);
}
#[test]
fn spec_consumer_group_ack() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("reliable".to_string())
.unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));
pubsub
.subscribe_group("reliable", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("reliable", 10).unwrap();
assert_eq!(messages.len(), 2);
pubsub.ack("reliable", messages[0].offset).unwrap();
let messages = pubsub.fetch_messages("reliable", 10).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].change.node_id, Some(200));
}
#[test]
fn spec_consumer_group_topic_filtering() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("selective".to_string())
.unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub.publish(Change::edge_inserted(100, 200, "graph.edge".to_string()));
pubsub
.subscribe_group(
"selective",
vec!["graph.node".to_string()], )
.unwrap();
let messages = pubsub.fetch_messages("selective", 10).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(
messages[0].change.change_type,
crate::sharding::ChangeType::NodeInserted
);
}
#[test]
fn spec_multiple_consumer_groups() {
let pubsub = PubSub::new();
pubsub.create_consumer_group("group-a".to_string()).unwrap();
pubsub.create_consumer_group("group-b".to_string()).unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub
.subscribe_group("group-a", vec!["graph.node".to_string()])
.unwrap();
pubsub
.subscribe_group("group-b", vec!["graph.node".to_string()])
.unwrap();
let messages_a = pubsub.fetch_messages("group-a", 10).unwrap();
let messages_b = pubsub.fetch_messages("group-b", 10).unwrap();
assert_eq!(messages_a.len(), 1);
assert_eq!(messages_b.len(), 1);
}
#[test]
fn spec_consumer_group_new_messages() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("incremental".to_string())
.unwrap();
pubsub
.subscribe_group("incremental", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("incremental", 10).unwrap();
assert_eq!(messages.len(), 0);
pubsub.publish(Change::node_inserted(300, "graph.node".to_string()));
let messages = pubsub.fetch_messages("incremental", 10).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].change.node_id, Some(300));
}
#[test]
fn spec_consumer_group_fetch_limit() {
let pubsub = PubSub::new();
pubsub.create_consumer_group("batch".to_string()).unwrap();
for i in 0..10 {
pubsub.publish(Change::node_inserted(i, "graph.node".to_string()));
}
pubsub
.subscribe_group("batch", vec!["graph.node".to_string()])
.unwrap();
let batch1 = pubsub.fetch_messages("batch", 3).unwrap();
assert_eq!(batch1.len(), 3);
let batch2 = pubsub.fetch_messages("batch", 3).unwrap();
assert_eq!(batch2.len(), 3);
let total = batch1.len() + batch2.len();
assert!(total <= 10);
}
#[test]
fn spec_consumer_group_pending_tracking() {
let pubsub = PubSub::new();
pubsub.create_consumer_group("tracked".to_string()).unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));
pubsub
.subscribe_group("tracked", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("tracked", 10).unwrap();
assert_eq!(messages.len(), 2);
let group = pubsub.get_consumer_group("tracked").unwrap();
assert_eq!(group.pending.len(), 2);
}
#[test]
fn spec_consumer_group_pending_cleared() {
let pubsub = PubSub::new();
pubsub.create_consumer_group("tracked".to_string()).unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub
.subscribe_group("tracked", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("tracked", 10).unwrap();
pubsub.ack("tracked", messages[0].offset).unwrap();
let group = pubsub.get_consumer_group("tracked").unwrap();
assert!(group.pending.is_empty());
}
#[test]
fn spec_consumer_group_persistence() {
let pubsub = PubSub::new();
pubsub
.create_consumer_group("persistent".to_string())
.unwrap();
pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
pubsub
.subscribe_group("persistent", vec!["graph.node".to_string()])
.unwrap();
let messages = pubsub.fetch_messages("persistent", 10).unwrap();
pubsub.ack("persistent", messages[0].offset).unwrap();
let group = pubsub.get_consumer_group("persistent").unwrap();
assert_eq!(group.last_offset, 1); }
}