// sqlitegraph 2.2.2
//
// Embedded graph database with full ACID transactions, HNSW vector search,
// dual backend support, and a comprehensive graph algorithms library.
//! Memory overhead regression benchmark for pub/sub Publisher.
//!
//! Measures memory usage of Publisher and channel infrastructure with subscribers.
//! Validates that pub/sub adds ≤+5% memory overhead vs baseline.

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use sqlitegraph::backend::SubscriptionFilter;
use sqlitegraph::{EdgeSpec, GraphConfig, NodeSpec, open_graph, snapshot::SnapshotId};

mod bench_utils;
use bench_utils::{MEASURE, WARM_UP, create_benchmark_temp_dir};

/// Helper to create a chain graph of specified size
fn create_chain_graph(size: usize) -> (tempfile::TempDir, std::path::PathBuf, Vec<i64>) {
    let temp_dir = create_benchmark_temp_dir();
    let db_path = temp_dir.path().join("benchmark.db");

    let graph = open_graph(&db_path, &GraphConfig::native()).expect("Failed to create graph");

    let mut node_ids = Vec::with_capacity(size);

    // Create nodes
    for i in 0..size {
        let node_id = graph
            .insert_node(NodeSpec {
                kind: "Node".to_string(),
                name: format!("node_{}", i),
                file_path: None,
                data: serde_json::json!({"id": i}),
            })
            .expect("Failed to insert node");
        node_ids.push(node_id);
    }

    // Create chain edges
    for i in 0..size.saturating_sub(1) {
        graph
            .insert_edge(EdgeSpec {
                from: node_ids[i],
                to: node_ids[i + 1],
                edge_type: "chain".to_string(),
                data: serde_json::json!({"order": i}),
            })
            .expect("Failed to insert edge");
    }

    (temp_dir, db_path, node_ids)
}

/// Benchmark memory overhead with 0 subscribers (baseline)
///
/// Measures baseline memory without any pub/sub subscriptions.
/// Benchmark memory overhead with 0 subscribers (baseline)
///
/// Measures baseline memory without any pub/sub subscriptions.
fn bench_memory_baseline(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_baseline");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);

    for &size in &[100, 500, 1000] {
        group.throughput(Throughput::Elements(size as u64));

        group.bench_with_input(
            BenchmarkId::new("0_subscribers", size),
            &size,
            |b, &_size| {
                // Return the TempDir from the routine: `iter_with_large_drop`
                // drops returned values outside the timing loop, so cleanup cost
                // is excluded from the measurement WITHOUT leaking a temp
                // directory per iteration (the previous `std::mem::forget`
                // never cleaned them up, accumulating DB files on disk).
                b.iter_with_large_drop(|| {
                    let (temp_dir, db_path, node_ids) = create_chain_graph(size);

                    let graph =
                        open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

                    // Run BFS traversal
                    let start_node = node_ids[0];
                    let _result = graph
                        .bfs(SnapshotId::current(), start_node, size as u32)
                        .expect("BFS traversal failed");

                    // Keep the temp dir alive through the timed section.
                    temp_dir
                });
            },
        );
    }

    group.finish();
}

/// Benchmark memory overhead with N subscribers
///
/// Measures memory growth with active subscriptions. Subscribers receive
/// events but we don't consume from channels, testing queue growth.
/// Benchmark memory overhead with N subscribers
///
/// Measures memory growth with active subscriptions. Subscribers receive
/// events but we don't consume from channels, testing queue growth.
fn bench_memory_with_subscribers(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_with_subs");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);

    const SIZE: usize = 500;

    // Test with different subscriber counts
    for &subscriber_count in &[1, 5, 10] {
        group.throughput(Throughput::Elements(SIZE as u64));

        group.bench_with_input(
            BenchmarkId::new("with_subscribers", subscriber_count),
            &subscriber_count,
            |b, &subscriber_count| {
                // `iter_with_large_drop` drops the returned TempDir outside the
                // timing loop: cleanup stays out of the measurement and the
                // directory is actually removed (previously `std::mem::forget`
                // leaked one temp dir per iteration).
                b.iter_with_large_drop(|| {
                    let (temp_dir, db_path, node_ids) = create_chain_graph(SIZE);

                    let graph =
                        open_graph(&db_path, &GraphConfig::native()).expect("Failed to open graph");

                    // Subscribe N receivers (keep them to test queue memory)
                    let mut _receivers = Vec::with_capacity(subscriber_count);
                    for _ in 0..subscriber_count {
                        let (_id, rx) = graph
                            .subscribe(SubscriptionFilter::all())
                            .expect("Failed to subscribe");
                        _receivers.push(rx);
                    }

                    // Run BFS traversal
                    let start_node = node_ids[0];
                    let _result = graph
                        .bfs(SnapshotId::current(), start_node, SIZE as u32)
                        .expect("BFS traversal failed");

                    // Keep the temp dir alive through the timed section.
                    temp_dir
                });
            },
        );
    }

    group.finish();
}

/// Benchmark memory overhead with event accumulation
///
/// Creates subscribers and performs commits to fill event queues.
/// Tests memory growth as events accumulate in channels.
/// Benchmark memory overhead with event accumulation
///
/// Creates subscribers and performs commits to fill event queues.
/// Tests memory growth as events accumulate in channels.
fn bench_memory_event_queue(criterion: &mut Criterion) {
    let mut group = criterion.benchmark_group("regression_pubsub_memory_queue");
    group.measurement_time(MEASURE);
    group.warm_up_time(WARM_UP);

    const SIZE: usize = 100;

    // Test with different commit counts (events in queue)
    for &commit_count in &[10, 50, 100] {
        group.throughput(Throughput::Elements(commit_count as u64));

        group.bench_with_input(
            BenchmarkId::new("events_in_queue", commit_count),
            &commit_count,
            |b, &commit_count| {
                // Returning the TempDir lets `iter_with_large_drop` dispose of
                // it outside the timing loop — measurement excludes cleanup, and
                // the temp directory is no longer leaked per iteration (the
                // previous `std::mem::forget` left DB files behind on disk).
                b.iter_with_large_drop(|| {
                    let temp_dir = create_benchmark_temp_dir();
                    let db_path = temp_dir.path().join("benchmark.db");

                    let graph = open_graph(&db_path, &GraphConfig::native())
                        .expect("Failed to create graph");

                    // Subscribe 5 receivers (keep them to accumulate events)
                    let mut _receivers = Vec::with_capacity(5);
                    for _ in 0..5 {
                        let (_id, rx) = graph
                            .subscribe(SubscriptionFilter::all())
                            .expect("Failed to subscribe");
                        _receivers.push(rx);
                    }

                    // Perform N commits (each emits events)
                    for i in 0..commit_count {
                        let _node_id = graph
                            .insert_node(NodeSpec {
                                kind: "Node".to_string(),
                                name: format!("node_{}", i),
                                file_path: None,
                                data: serde_json::json!({"id": i}),
                            })
                            .expect("Failed to insert node");

                        // Each insert commits, emitting NodeChanged events
                    }

                    // Keep the temp dir alive through the timed section.
                    temp_dir
                });
            },
        );
    }

    group.finish();
}

/// Calculate approximate memory overhead from Publisher and channels
///
/// This is a compile-time estimation based on field sizes:
///
/// **Publisher struct fields:**
/// - senders: Arc<Mutex<Vec<(SubscriberId, Sender, SubscriptionFilter)>>>
///   - Arc: 8 bytes (ptr) + allocation
///   - Mutex: ~40 bytes (inner mutex state)
///   - Vec: 24 bytes (capacity, len, ptr)
/// - next_id: Arc<Mutex<u64>>
///   - Arc: 8 bytes + allocation
///   - Mutex: ~40 bytes
///   - u64: 8 bytes
///
/// **Per-subscriber overhead:**
/// - Sender: ~24 bytes (channel state pointer)
/// - SubscriberId: 8 bytes (u64)
/// - SubscriptionFilter: varies (closure or enum)
///   - event_types: Option<Vec<PubSubEventType>> - ~24 bytes
///   - node_ids: Option<Vec<i64>> - ~24 bytes
///   - edge_ids: Option<Vec<i64>> - ~24 bytes
///   - key_hashes: Option<Vec<u64>> - ~24 bytes
/// - Vec entry overhead: ~24 bytes (tuple struct)
/// - Total per subscriber: ~80-120 bytes
///
/// **Channel buffer memory:**
/// - mpsc::channel() creates unbounded channel by default
/// - Message queue grows as events are sent
/// - Each PubSubEvent: ~40 bytes (enum + IDs)
/// - With 1000 pending events: ~40KB per subscriber
///
/// For 10 subscribers with 100 events each:
/// - Publisher struct: ~200 bytes base
/// - Per-subscriber state: 10 * 100 = 1KB
/// - Event queues: 10 * 100 * 40 = 4KB
/// - Total: ~5KB for 10 subscribers with 100 events
///
/// Expected overhead: <1% of total graph memory for typical workloads
#[allow(dead_code)]
fn estimate_pubsub_overhead(
    subscriber_count: usize,
    events_per_subscriber: usize,
) -> (usize, usize, usize) {
    // Fixed cost of the Publisher struct itself (Arc/Mutex/Vec headers), bytes.
    const PUBLISHER_BASE: usize = 200;
    // Bookkeeping held per subscriber (sender, id, filter, tuple overhead), bytes.
    const PER_SUBSCRIBER_STATE: usize = 100;
    // Approximate size of one queued PubSubEvent, bytes.
    const BYTES_PER_EVENT: usize = 40;

    // Each subscriber pays its fixed state plus its pending-event queue.
    let per_subscriber_total = PER_SUBSCRIBER_STATE + events_per_subscriber * BYTES_PER_EVENT;
    // Overall overhead: shared publisher base plus all subscribers.
    let total_overhead = PUBLISHER_BASE + subscriber_count * per_subscriber_total;

    (PUBLISHER_BASE, per_subscriber_total, total_overhead)
}

// Register the three memory-overhead benchmarks and emit the criterion
// harness entry point (`main`).
criterion_group!(
    benches,
    bench_memory_baseline,
    bench_memory_with_subscribers,
    bench_memory_event_queue
);
criterion_main!(benches);