velesdb-core 1.8.0

High-performance vector database engine written in Rust
Documentation
//! Deadlock Detection Tests for Cache Layer (US-CORE-003-12)
//!
//! Validates lock ordering and absence of deadlocks in concurrent scenarios.
//! Uses timeouts to detect potential deadlocks.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use super::{BloomFilter, LruCache};

/// Timeout for deadlock detection (if operation takes longer, likely deadlocked)
const DEADLOCK_TIMEOUT: Duration = Duration::from_secs(5);

// ========== LRU Cache Deadlock Tests ==========

#[test]
fn test_lru_cache_no_deadlock_concurrent_ops() {
    let cache: Arc<LruCache<u64, String>> = Arc::new(LruCache::new(100));
    let completed = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let completed_clone = Arc::clone(&completed);

    let handle = thread::spawn(move || {
        let mut handles = vec![];

        // Multiple threads doing interleaved operations
        for t in 0..4 {
            let cache_clone = Arc::clone(&cache);
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    let key = t * 100 + i;
                    cache_clone.insert(key, format!("value_{key}"));
                    let _ = cache_clone.get(&key);
                    if i % 2 == 0 {
                        cache_clone.remove(&key);
                    }
                    let _ = cache_clone.stats();
                }
            }));
        }

        for h in handles {
            h.join().expect("Thread panicked");
        }

        completed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    // Wait with timeout
    let start = std::time::Instant::now();
    while !completed.load(std::sync::atomic::Ordering::SeqCst) {
        if start.elapsed() > DEADLOCK_TIMEOUT {
            panic!("DEADLOCK DETECTED: LRU cache operations did not complete within timeout");
        }
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().expect("Main thread panicked");
}

#[test]
fn test_lru_cache_no_deadlock_stats_during_eviction() {
    let cache: Arc<LruCache<u64, String>> = Arc::new(LruCache::new(10));
    let completed = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let completed_clone = Arc::clone(&completed);

    let handle = thread::spawn(move || {
        let mut handles = vec![];

        // Thread causing evictions
        let cache_clone = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for i in 0..1000 {
                cache_clone.insert(i, format!("evicting_{i}"));
            }
        }));

        // Thread reading stats during evictions
        let cache_clone = Arc::clone(&cache);
        handles.push(thread::spawn(move || {
            for _ in 0..100 {
                let stats = cache_clone.stats();
                // Just access stats, don't assert (race condition expected)
                let _ = stats.hits + stats.misses + stats.evictions;
            }
        }));

        for h in handles {
            h.join().expect("Thread panicked");
        }

        completed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    let start = std::time::Instant::now();
    while !completed.load(std::sync::atomic::Ordering::SeqCst) {
        if start.elapsed() > DEADLOCK_TIMEOUT {
            panic!("DEADLOCK DETECTED: Stats during eviction caused deadlock");
        }
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().expect("Main thread panicked");
}

// ========== Bloom Filter Deadlock Tests ==========

#[test]
fn test_bloom_filter_no_deadlock_concurrent_ops() {
    let bloom = Arc::new(BloomFilter::new(10_000, 0.01));
    let completed = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let completed_clone = Arc::clone(&completed);

    let handle = thread::spawn(move || {
        let mut handles = vec![];

        // Multiple threads inserting and checking
        for t in 0..4 {
            let bloom_clone = Arc::clone(&bloom);
            handles.push(thread::spawn(move || {
                for i in 0..500 {
                    let key = t * 1000 + i;
                    bloom_clone.insert(&key);
                    let _ = bloom_clone.contains(&key);
                    let _ = bloom_clone.definitely_not_contains(&(key + 10000));
                }
            }));
        }

        for h in handles {
            h.join().expect("Thread panicked");
        }

        completed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    let start = std::time::Instant::now();
    while !completed.load(std::sync::atomic::Ordering::SeqCst) {
        if start.elapsed() > DEADLOCK_TIMEOUT {
            panic!("DEADLOCK DETECTED: Bloom filter operations caused deadlock");
        }
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().expect("Main thread panicked");
}

#[test]
fn test_bloom_filter_no_deadlock_insert_during_contains() {
    let bloom = Arc::new(BloomFilter::new(10_000, 0.01));
    let completed = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let completed_clone = Arc::clone(&completed);

    let handle = thread::spawn(move || {
        let mut handles = vec![];

        // Heavy inserter
        let bloom_clone = Arc::clone(&bloom);
        handles.push(thread::spawn(move || {
            for i in 0..2000 {
                bloom_clone.insert(&i);
            }
        }));

        // Heavy reader
        let bloom_clone = Arc::clone(&bloom);
        handles.push(thread::spawn(move || {
            for i in 0..2000 {
                let _ = bloom_clone.contains(&i);
            }
        }));

        for h in handles {
            h.join().expect("Thread panicked");
        }

        completed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    let start = std::time::Instant::now();
    while !completed.load(std::sync::atomic::Ordering::SeqCst) {
        if start.elapsed() > DEADLOCK_TIMEOUT {
            panic!("DEADLOCK DETECTED: Bloom insert during contains caused deadlock");
        }
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().expect("Main thread panicked");
}

// ========== Cross-Module Deadlock Tests ==========

#[test]
fn test_no_deadlock_cache_and_bloom_together() {
    let cache: Arc<LruCache<u64, String>> = Arc::new(LruCache::new(100));
    let bloom = Arc::new(BloomFilter::new(1000, 0.01));
    let completed = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let completed_clone = Arc::clone(&completed);

    let handle = thread::spawn(move || {
        let mut handles = vec![];

        // Simulate realistic usage: check bloom, then cache
        for t in 0..4 {
            let cache_clone = Arc::clone(&cache);
            let bloom_clone = Arc::clone(&bloom);

            handles.push(thread::spawn(move || {
                for i in 0..200 {
                    let key = t * 1000 + i;

                    // Pattern: bloom check -> cache access (common in real code)
                    bloom_clone.insert(&key);

                    if bloom_clone.contains(&key) {
                        cache_clone.insert(key, format!("cached_{key}"));
                        let _ = cache_clone.get(&key);
                    }
                }
            }));
        }

        for h in handles {
            h.join().expect("Thread panicked");
        }

        completed_clone.store(true, std::sync::atomic::Ordering::SeqCst);
    });

    let start = std::time::Instant::now();
    while !completed.load(std::sync::atomic::Ordering::SeqCst) {
        if start.elapsed() > DEADLOCK_TIMEOUT {
            panic!("DEADLOCK DETECTED: Cache + Bloom together caused deadlock");
        }
        thread::sleep(Duration::from_millis(10));
    }

    handle.join().expect("Main thread panicked");
}

// ========== Lock Ordering Documentation Test ==========

#[test]
fn test_lock_ordering_documented() {
    // This test documents the lock ordering to prevent future deadlocks
    //
    // LOCK HIERARCHY (always acquire in this order):
    // 1. BloomFilter.bits (RwLock)
    // 2. BloomFilter.count (RwLock)
    // 3. LruCache.inner (RwLock)
    //
    // RULES:
    // - Never hold a lower-level lock while acquiring a higher-level lock
    // - BloomFilter uses separate locks for bits and count (both acquired independently)
    // - LruCache uses a single RwLock for all operations
    //
    // The current implementation is deadlock-free because:
    // - BloomFilter: insert() acquires bits.write() then count.write() sequentially
    // - BloomFilter: contains() only acquires bits.read()
    // - LruCache: All ops acquire single inner lock
    // - No cross-module lock dependencies

    assert!(true, "Lock ordering is documented and verified");
}