sqlitegraph 3.4.0

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
//! Pub/Sub streams specification for consumer group coordination.
//!
//! Tests define expected behavior for reliable message delivery with consumer groups.
//! Implementation must pass all specs before being considered complete.

use crate::sharding::Change;

/// Consumer group message with offset tracking.
#[derive(Debug, Clone)]
pub struct ConsumerGroupMessage {
    pub offset: usize,
    pub change: Change,
}

/// Consumer group tracking message offsets.
#[derive(Debug, Clone)]
pub struct ConsumerGroup {
    pub name: String,
    pub last_offset: usize,
    pub pending: Vec<usize>,
}

#[cfg(test)]
mod specs {
    use super::*;
    use crate::sharding::PubSub;

    /// Spec: Create consumer group.
    #[test]
    fn spec_create_consumer_group() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("test-group".to_string())
            .unwrap();

        // Group should exist
        let groups = pubsub.list_consumer_groups().unwrap();
        assert_eq!(groups.len(), 1);
        assert!(groups.contains(&"test-group".to_string()));
    }

    /// Spec: Subscribe to topics as consumer group.
    #[test]
    fn spec_consumer_group_subscription() {
        let pubsub = PubSub::new();

        pubsub.create_consumer_group("agents".to_string()).unwrap();
        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
        pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));

        pubsub
            .subscribe_group("agents", vec!["graph.node".to_string()])
            .unwrap();

        // Should deliver from beginning
        let messages = pubsub.fetch_messages("agents", 10).unwrap();
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].change.node_id, Some(100));
    }

    /// Spec: Consumer group tracks last offset.
    #[test]
    fn spec_consumer_group_offset_tracking() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("workflow".to_string())
            .unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
        pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));

        pubsub
            .subscribe_group("workflow", vec!["graph.node".to_string()])
            .unwrap();

        // Fetch first message
        let messages = pubsub.fetch_messages("workflow", 1).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 0);
    }

    /// Spec: Acknowledged messages not re-delivered.
    #[test]
    fn spec_consumer_group_ack() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("reliable".to_string())
            .unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
        pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));

        pubsub
            .subscribe_group("reliable", vec!["graph.node".to_string()])
            .unwrap();

        // Fetch and ack first message
        let messages = pubsub.fetch_messages("reliable", 10).unwrap();
        assert_eq!(messages.len(), 2);
        pubsub.ack("reliable", messages[0].offset).unwrap();

        // Next fetch starts after ack
        let messages = pubsub.fetch_messages("reliable", 10).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].change.node_id, Some(200));
    }

    /// Spec: Consumer group filters by topic.
    #[test]
    fn spec_consumer_group_topic_filtering() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("selective".to_string())
            .unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
        pubsub.publish(Change::edge_inserted(100, 200, "graph.edge".to_string()));

        pubsub
            .subscribe_group(
                "selective",
                vec!["graph.node".to_string()], // Only node events
            )
            .unwrap();

        let messages = pubsub.fetch_messages("selective", 10).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(
            messages[0].change.change_type,
            crate::sharding::ChangeType::NodeInserted
        );
    }

    /// Spec: Multiple consumer groups independent.
    #[test]
    fn spec_multiple_consumer_groups() {
        let pubsub = PubSub::new();

        pubsub.create_consumer_group("group-a".to_string()).unwrap();
        pubsub.create_consumer_group("group-b".to_string()).unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));

        pubsub
            .subscribe_group("group-a", vec!["graph.node".to_string()])
            .unwrap();
        pubsub
            .subscribe_group("group-b", vec!["graph.node".to_string()])
            .unwrap();

        // Both groups receive same message
        let messages_a = pubsub.fetch_messages("group-a", 10).unwrap();
        let messages_b = pubsub.fetch_messages("group-b", 10).unwrap();

        assert_eq!(messages_a.len(), 1);
        assert_eq!(messages_b.len(), 1);
    }

    /// Spec: Consumer group sees new messages after offset.
    #[test]
    fn spec_consumer_group_new_messages() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("incremental".to_string())
            .unwrap();

        pubsub
            .subscribe_group("incremental", vec!["graph.node".to_string()])
            .unwrap();

        // Fetch empty initially
        let messages = pubsub.fetch_messages("incremental", 10).unwrap();
        assert_eq!(messages.len(), 0);

        // Publish new message
        pubsub.publish(Change::node_inserted(300, "graph.node".to_string()));

        // Next fetch delivers new message
        let messages = pubsub.fetch_messages("incremental", 10).unwrap();
        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].change.node_id, Some(300));
    }

    /// Spec: Fetch limit controls batch size.
    #[test]
    fn spec_consumer_group_fetch_limit() {
        let pubsub = PubSub::new();

        pubsub.create_consumer_group("batch".to_string()).unwrap();

        for i in 0..10 {
            pubsub.publish(Change::node_inserted(i, "graph.node".to_string()));
        }

        pubsub
            .subscribe_group("batch", vec!["graph.node".to_string()])
            .unwrap();

        // Fetch in batches of 3
        let batch1 = pubsub.fetch_messages("batch", 3).unwrap();
        assert_eq!(batch1.len(), 3);

        let batch2 = pubsub.fetch_messages("batch", 3).unwrap();
        assert_eq!(batch2.len(), 3);

        // Total across batches
        let total = batch1.len() + batch2.len();
        assert!(total <= 10);
    }

    /// Spec: Pending messages tracked for delivery guarantee.
    #[test]
    fn spec_consumer_group_pending_tracking() {
        let pubsub = PubSub::new();

        pubsub.create_consumer_group("tracked".to_string()).unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));
        pubsub.publish(Change::node_inserted(200, "graph.node".to_string()));

        pubsub
            .subscribe_group("tracked", vec!["graph.node".to_string()])
            .unwrap();

        // Fetch but don't ack
        let messages = pubsub.fetch_messages("tracked", 10).unwrap();
        assert_eq!(messages.len(), 2);

        // Check pending
        let group = pubsub.get_consumer_group("tracked").unwrap();
        assert_eq!(group.pending.len(), 2);
    }

    /// Spec: Pending cleared on ack.
    #[test]
    fn spec_consumer_group_pending_cleared() {
        let pubsub = PubSub::new();

        pubsub.create_consumer_group("tracked".to_string()).unwrap();

        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));

        pubsub
            .subscribe_group("tracked", vec!["graph.node".to_string()])
            .unwrap();
        let messages = pubsub.fetch_messages("tracked", 10).unwrap();

        pubsub.ack("tracked", messages[0].offset).unwrap();

        let group = pubsub.get_consumer_group("tracked").unwrap();
        assert!(group.pending.is_empty());
    }

    /// Spec: Consumer group persists across restarts.
    #[test]
    fn spec_consumer_group_persistence() {
        let pubsub = PubSub::new();

        pubsub
            .create_consumer_group("persistent".to_string())
            .unwrap();
        pubsub.publish(Change::node_inserted(100, "graph.node".to_string()));

        pubsub
            .subscribe_group("persistent", vec!["graph.node".to_string()])
            .unwrap();

        let messages = pubsub.fetch_messages("persistent", 10).unwrap();
        pubsub.ack("persistent", messages[0].offset).unwrap();

        // Simulate restart: offset should be remembered
        let group = pubsub.get_consumer_group("persistent").unwrap();
        assert_eq!(group.last_offset, 1); // Next offset after ack
    }
}