melange_db 0.2.8

基于 sled 架构深度优化的下一代高性能嵌入式数据库,支持 ARM64 NEON SIMD 优化、多级缓存和布隆过滤器
Documentation
use melange_db::{Db, Config, platform_utils, atomic_operations_manager::AtomicOperationsManager};
use std::sync::Arc;
use std::thread;
use std::io;

fn main() -> io::Result<()> {
    println!("🚀 原子操作统一管理器测试");
    println!("==========================");

    // 创建临时数据库
    let db_path = platform_utils::setup_example_db("atomic_operations_test");
    platform_utils::cleanup_db_directory(&db_path);

    let config = Config::new().path(&db_path);
    let db: Db<1024> = config.open()?;
    let db = Arc::new(db);

    println!("\n📋 测试1: 创建AtomicOperationsManager");
    println!("------------------------------------");

    // 创建原子操作统一管理器
    let atomic_manager = Arc::new(AtomicOperationsManager::new(db.clone()));
    println!("✅ AtomicOperationsManager创建成功");

    println!("\n📋 测试2: 基本原子递增");
    println!("--------------------");

    // 测试基本递增功能
    let val1 = atomic_manager.increment("test_counter".to_string(), 1)?;
    println!("  第1次递增: {}", val1);

    let val2 = atomic_manager.increment("test_counter".to_string(), 1)?;
    println!("  第2次递增: {}", val2);

    let val3 = atomic_manager.increment("test_counter".to_string(), 5)?;
    println!("  步长5递增: {}", val3);

    let current = atomic_manager.get("test_counter".to_string())?;
    println!("  当前计数器值: {:?}", current);

    if current == Some(7) {
        println!("  ✅ 基本递增测试通过");
    } else {
        println!("  ❌ 基本递增测试失败: 预期7,实际{:?}", current);
    }

    println!("\n📋 测试3: 新原子操作测试");
    println!("----------------------");

    // 测试原子递减
    let dec_val = atomic_manager.decrement("test_counter".to_string(), 3)?;
    println!("  递减3: {} (预期 4)", dec_val);

    // 测试原子乘法
    let mul_val = atomic_manager.multiply("test_counter".to_string(), 2)?;
    println!("  乘以2: {} (预期 8)", mul_val);

    // 测试原子除法
    let div_val = atomic_manager.divide("test_counter".to_string(), 2)?;
    println!("  除以2: {} (预期 4)", div_val);

    // 测试原子百分比
    let pct_val = atomic_manager.percentage("test_counter".to_string(), 50)?;
    println!("  50%: {} (预期 2)", pct_val);

    let final_value = atomic_manager.get("test_counter".to_string())?;
    println!("  最终值: {:?}", final_value);

    if final_value == Some(2) {
        println!("  ✅ 新原子操作测试通过");
    } else {
        println!("  ❌ 新原子操作测试失败: 预期2,实际{:?}", final_value);
    }

    println!("\n📋 测试4: 原子比较和交换(CAS)");
    println!("---------------------------");

    // 测试成功的CAS操作
    let cas_success = atomic_manager.compare_and_swap("cas_counter".to_string(), 0, 100)?;
    println!("  CAS(0->100): {} (预期 true,因为默认值是0)", cas_success);

    let cas_value = atomic_manager.get("cas_counter".to_string())?;
    println!("  CAS后的值: {:?} (预期 Some(100))", cas_value);

    // 测试失败的CAS操作
    let cas_fail = atomic_manager.compare_and_swap("cas_counter".to_string(), 0, 200)?;
    println!("  CAS(0->200): {} (预期 false,因为当前值是100)", cas_fail);

    let cas_value2 = atomic_manager.get("cas_counter".to_string())?;
    println!("  失败CAS后的值: {:?} (预期 Some(100))", cas_value2);

    if cas_success && !cas_fail && cas_value == Some(100) && cas_value2 == Some(100) {
        println!("  ✅ 原子比较和交换测试通过");
    } else {
        println!("  ❌ 原子比较和交换测试失败");
    }

    println!("\n📋 测试5: 多线程高压力并发测试");
    println!("----------------------------");

    let mut handles = vec![];

    // 线程1-3:高频原子递增操作
    for thread_id in 1..=3 {
        let manager_clone = Arc::clone(&atomic_manager);
        let handle = thread::spawn(move || {
            for i in 0..30 {
                match manager_clone.increment("high_freq_counter".to_string(), 1) {
                    Ok(value) => {
                        if i % 10 == 0 {
                            println!("  线程{}(高频): 计数器 = {}", thread_id, value);
                        }
                    }
                    Err(e) => eprintln!("  线程{}高频操作失败: {:?}", thread_id, e),
                }
            }
        });
        handles.push(handle);
    }

    // 线程4-5:混合原子操作测试
    for thread_id in 4..=5 {
        let manager_clone = Arc::clone(&atomic_manager);
        let handle = thread::spawn(move || {
            for i in 0..20 {
                // 使用不同的原子操作类型
                let result: Result<(), std::io::Error> = match i % 6 {
                    0 => manager_clone.increment("mixed_ops_counter".to_string(), 5).map(|_| ()),
                    1 => manager_clone.decrement("mixed_ops_counter".to_string(), 1).map(|_| ()),
                    2 => manager_clone.multiply("mixed_ops_counter".to_string(), 2).map(|_| ()),
                    3 => manager_clone.divide("mixed_ops_counter".to_string(), 3).map(|_| ()),
                    4 => manager_clone.percentage("mixed_ops_counter".to_string(), 50).map(|_| ()),
                    _ => {
                        // CAS操作:尝试将当前值翻倍
                        if let Ok(Some(current)) = manager_clone.get("mixed_ops_counter".to_string()) {
                            manager_clone.compare_and_swap("mixed_ops_counter".to_string(), current, current * 2).map(|_| ())
                        } else {
                            Ok(())
                        }
                    }
                };

                if let Err(e) = result {
                    eprintln!("  线程{}混合操作失败: {:?}", thread_id, e);
                }

                if i % 5 == 0 {
                    if let Ok(Some(value)) = manager_clone.get("mixed_ops_counter".to_string()) {
                        println!("  线程{}(混合): 计数器 = {}", thread_id, value);
                    }
                }
            }
        });
        handles.push(handle);
    }

    // 线程6:ID分配器测试
    let manager_clone6 = Arc::clone(&atomic_manager);
    let handle6 = thread::spawn(move || {
        for i in 0..25 {
            match manager_clone6.increment("user_id_allocator".to_string(), 1) {
                Ok(user_id) => {
                    // 模拟创建用户数据
                    let user_key = format!("user:{}:profile", user_id);
                    let user_value = format!("用户{}", i);

                    if let Err(e) = manager_clone6.insert(user_key.as_bytes(), user_value.as_bytes()) {
                        eprintln!("  线程6用户数据写入失败: {:?}", e);
                    }

                    if i % 8 == 0 {
                        println!("  线程6(ID分配): 创建用户{} ID:{}", i, user_id);
                    }
                }
                Err(e) => eprintln!("  线程6用户ID分配失败: {:?}", e),
            }
        }
    });
    handles.push(handle6);

    // 线程7:读取统计线程
    let manager_clone7 = Arc::clone(&atomic_manager);
    let handle7 = thread::spawn(move || {
        for i in 0..15 {
            // 读取各种计数器
            let high_freq = manager_clone7.get("high_freq_counter".to_string()).unwrap_or(Some(0)).unwrap_or(0);
            let user_ids = manager_clone7.get("user_id_allocator".to_string()).unwrap_or(Some(0)).unwrap_or(0);
            let mixed_ops = manager_clone7.get("mixed_ops_counter".to_string()).unwrap_or(Some(0)).unwrap_or(0);

            // 读取数据库记录
            let user_count = manager_clone7.scan_prefix(b"user:").unwrap_or_default().len();

            if i % 3 == 0 {
                println!("  线程7(统计): 高频:{} 用户ID:{} 混合:{} 用户记录:{}",
                         high_freq, user_ids, mixed_ops, user_count);
            }

            thread::sleep(std::time::Duration::from_millis(15));
        }
    });
    handles.push(handle7);

    println!("  启动7个并发线程...");

    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }

    println!("\n📋 测试6: 并发结果验证");
    println!("---------------------");

    // 验证各种计数器
    let high_freq_counter = atomic_manager.get("high_freq_counter".to_string())?;
    let user_id_counter = atomic_manager.get("user_id_allocator".to_string())?;
    let mixed_ops_counter = atomic_manager.get("mixed_ops_counter".to_string())?;
    let user_records = atomic_manager.scan_prefix(b"user:")?;

    println!("  并发测试结果:");
    println!("    high_freq_counter: {:?}", high_freq_counter);
    println!("    user_id_allocator: {:?}", user_id_counter);
    println!("    mixed_ops_counter: {:?}", mixed_ops_counter);
    println!("    user记录数: {}", user_records.len());

    // 验证预期值
    let expected_high_freq = 3 * 30; // 3个线程 * 30次 = 90
    let expected_user_ids = 25; // 1个线程 * 25次 = 25

    let high_freq_ok = high_freq_counter == Some(expected_high_freq);
    let user_ids_ok = user_id_counter == Some(expected_user_ids);
    let user_records_ok = user_records.len() == expected_user_ids as usize;

    println!("  一致性检查:");
    println!("    high_freq_counter: {} (预期: {})", high_freq_ok, expected_high_freq);
    println!("    user_id_allocator: {} (预期: {})", user_ids_ok, expected_user_ids);
    println!("    user记录数: {} (预期: {})", user_records_ok, expected_user_ids);

    if high_freq_ok && user_ids_ok && user_records_ok {
        println!("  ✅ 7线程高压力并发测试通过");
    } else {
        println!("  ❌ 并发测试失败");
    }

    println!("\n📋 测试7: 重置计数器");
    println!("------------------");

    atomic_manager.reset("test_counter".to_string(), 100)?;
    let reset_value = atomic_manager.get("test_counter".to_string())?;
    println!("  重置后的值: {:?}", reset_value);

    if reset_value == Some(100) {
        println!("  ✅ 重置计数器测试通过");
    } else {
        println!("  ❌ 重置计数器测试失败: 预期100,实际{:?}", reset_value);
    }

    println!("\n📋 测试8: 持久化验证");
    println!("------------------");

    // 等待所有持久化操作完成
    thread::sleep(std::time::Duration::from_millis(200));

    // 创建新的AtomicOperationsManager实例来测试持久化
    let atomic_manager2 = AtomicOperationsManager::new(db.clone());

    // 预热计数器
    let loaded_count = atomic_manager2.preload_counters()?;
    println!("  预热加载了 {} 个计数器", loaded_count);

    let persisted_value = atomic_manager2.get("test_counter".to_string())?;
    println!("  持久化后的test_counter值: {:?}", persisted_value);

    // 验证并发测试的计数器也持久化了
    let persisted_high_freq = atomic_manager2.get("high_freq_counter".to_string())?;
    let persisted_user_ids = atomic_manager2.get("user_id_allocator".to_string())?;

    println!("  持久化验证:");
    println!("    test_counter: {:?} (原: {:?})", persisted_value, reset_value);
    println!("    high_freq_counter: {:?} (原: {:?})", persisted_high_freq, high_freq_counter);
    println!("    user_id_allocator: {:?} (原: {:?})", persisted_user_ids, user_id_counter);

    let persistence_ok = persisted_value == Some(100) &&
                        persisted_high_freq == high_freq_counter &&
                        persisted_user_ids == user_id_counter;

    if persistence_ok {
        println!("  ✅ 持久化验证通过");
    } else {
        println!("  ❌ 持久化验证失败");
    }

    println!("\n📋 测试9: 数据库操作集成测试");
    println!("----------------------------");

    // 测试通过统一管理器进行数据库操作
    atomic_manager.insert(b"test_key", b"test_value")?;
    let retrieved_value = atomic_manager.get_data(b"test_key")?;
    println!("  插入并获取数据: {:?}", retrieved_value);

    let scan_results = atomic_manager.scan_prefix(b"test")?;
    println!("  扫描前缀'test'的结果: {} 个键值对", scan_results.len());

    if retrieved_value.is_some() && !scan_results.is_empty() {
        println!("  ✅ 数据库操作集成测试通过");
    } else {
        println!("  ❌ 数据库操作集成测试失败");
    }

    println!("\n🎉 所有原子操作统一管理器测试完成!");
    println!("✅ 架构安全性:所有操作都通过AtomicOperationsManager统一入口");
    println!("✅ 功能完整性:支持递增、递减、乘法、除法、百分比、CAS等操作");
    println!("✅ 并发安全性:多线程环境下的原子性保证");
    println!("✅ 持久化一致性:原子操作结果正确持久化到数据库");

    // 清理测试数据库
    platform_utils::cleanup_db_directory(&db_path);

    Ok(())
}