oxcache 0.1.4

A high-performance multi-level cache library for Rust with L1 (memory) and L2 (Redis) caching.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
// Copyright (c) 2025-2026, Kirky.X
//
// MIT License
//
// Memory leak tests

#![allow(unexpected_cfgs)]

#[path = "common/mod.rs"]
mod common;

use common::is_redis_available;
use oxcache::backend::l1::L1Backend;
use oxcache::backend::l2::L2Backend;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

// Memory-leak test suite: uses reference cycles and high-volume cache
// operations to surface potential memory leaks.
// (Plain `//` comments: the original `///` doc comments would attach to the
// first test function below rather than describe the whole file.)

/// Stresses the in-memory L1 backend with many set/get cycles over a small
/// rotating key space so that leaked allocations would accumulate visibly.
#[tokio::test]
async fn test_l1_cache_memory_leak() {
    let cache = Arc::new(L1Backend::new(1000));

    for i in 0..10000 {
        // Rotate through 100 keys so entries are overwritten repeatedly.
        let key = format!("key_{}", i % 100);
        let payload = vec![i as u8; 100];

        cache
            .set_bytes(&key, payload.clone(), Some(60))
            .await
            .unwrap();
        cache.get_bytes(&key).await.unwrap();

        // Periodically purge the whole key space to mimic production churn.
        // L1Backend exposes no `clear`, so keys are removed one by one.
        if i % 1000 == 0 {
            for j in 0..100 {
                let _ = cache.delete(&format!("key_{}", j)).await;
            }
            sleep(Duration::from_millis(10)).await;
        }
    }

    // Final sweep: remove every key that may still be resident.
    for j in 0..100 {
        let _ = cache.delete(&format!("key_{}", j)).await;
    }

    // Drop the cache explicitly so all backing memory is released.
    drop(cache);

    // Allow any asynchronous teardown a moment to complete.
    sleep(Duration::from_millis(100)).await;
}

/// Stresses the Redis-backed L2 backend with 1 KiB payloads over a 50-key
/// rotation, skipping gracefully when no local Redis is reachable.
#[tokio::test]
async fn test_l2_cache_memory_leak() {
    if !is_redis_available().await {
        println!("跳过test_l2_cache_memory_leak:Redis不可用");
        return;
    }

    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;

    // Standalone Redis on DB 15, dedicated to this test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/15".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };

    let l2_backend = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");

    for i in 0..5000 {
        // Rotate through 50 keys; every write overwrites an earlier entry.
        let key = format!("l2_leak_test_{}", i % 50);
        let payload = vec![i as u8; 1024]; // 1 KiB per value

        l2_backend
            .set_with_version(&key, payload.clone(), Some(300))
            .await
            .unwrap();
        l2_backend.get_bytes(&key).await.unwrap();

        // Drop the current key periodically so Redis memory stays bounded.
        if i % 500 == 0 {
            l2_backend.delete(&key).await.unwrap();
            sleep(Duration::from_millis(50)).await;
        }
    }

    // Remove every test key before finishing.
    for i in 0..50 {
        let key = format!("l2_leak_test_{}", i);
        l2_backend.delete(&key).await.unwrap();
    }

    drop(l2_backend);
}

/// Exercises the L1 and L2 backends independently with high-volume churn to
/// detect leaks in either tier (no `TwoLevelClient` is constructed — the two
/// backends are driven directly, one after the other).
#[tokio::test]
async fn test_two_level_cache_memory_leak() {
    if !is_redis_available().await {
        println!("跳过test_two_level_cache_memory_leak:Redis不可用");
        return;
    }

    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;

    let l1 = Arc::new(L1Backend::new(100));

    // Standalone Redis on DB 14, dedicated to this test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/14".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };

    let l2 = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");

    // --- L1 churn: write/read over a 100-key rotation ---
    for i in 0..1500 {
        let key = format!("two_level_l1_{}", i % 100);
        let value = format!("value_{}", i).into_bytes();

        // Write, then read back.
        l1.set_bytes(&key, value.clone(), Some(120)).await.unwrap();
        let _ = l1.get_bytes(&key).await;

        // Periodic cleanup of the whole L1 key space.
        if i % 150 == 0 {
            for j in 0..100 {
                let key = format!("two_level_l1_{}", j);
                let _ = l1.delete(&key).await;
            }
            sleep(Duration::from_millis(20)).await;
        }
    }

    // Final L1 cleanup.
    for j in 0..100 {
        let key = format!("two_level_l1_{}", j);
        let _ = l1.delete(&key).await;
    }

    // --- L2 churn: same pattern against Redis ---
    for i in 0..1500 {
        let key = format!("two_level_l2_{}", i % 100);
        let value = format!("value_{}", i).into_bytes();

        // Write, then read back.
        l2.set_with_version(&key, value.clone(), Some(120))
            .await
            .unwrap();
        let _ = l2.get_bytes(&key).await;

        // Periodic cleanup. Consistency fix: like the L1 path above, ignore
        // delete errors during cleanup — a transient Redis hiccup while
        // deleting a key should not fail a memory-leak test. (The original
        // `unwrap()` here was inconsistent with the tolerant L1 handling.)
        if i % 150 == 0 {
            for j in 0..100 {
                let key = format!("two_level_l2_{}", j);
                let _ = l2.delete(&key).await;
            }
            sleep(Duration::from_millis(20)).await;
        }
    }

    // Final L2 cleanup, also error-tolerant for the same reason.
    for j in 0..100 {
        let key = format!("two_level_l2_{}", j);
        let _ = l2.delete(&key).await;
    }

    drop(l1);
    drop(l2);
    sleep(Duration::from_millis(100)).await;
}

/// Runs 50 rounds of batch set/get/delete against both L1 and L2 to detect
/// leaks in batch-style usage. Each round allocates fresh key/value vectors
/// so per-round temporaries must be fully reclaimed.
#[tokio::test]
async fn test_batch_operation_memory_leak() {
    if !is_redis_available().await {
        println!("跳过test_batch_operation_memory_leak:Redis不可用");
        return;
    }

    let l1 = Arc::new(L1Backend::new(500));

    use oxcache::config::L2Config;
    use oxcache::config::RedisMode;

    // Standalone Redis on DB 13, dedicated to this test.
    let config = L2Config {
        mode: RedisMode::Standalone,
        connection_string: secrecy::SecretString::from("redis://127.0.0.1:6379/13".to_string()),
        connection_timeout_ms: 5000,
        command_timeout_ms: 1000,
        password: None,
        enable_tls: false,
        sentinel: None,
        cluster: None,
        default_ttl: Some(3600),
        ..Default::default()
    };

    let l2 = L2Backend::new(&config)
        .await
        .expect("Failed to connect to Redis");

    // Batch-operation leak test: exercise L1 and L2 separately each round.
    for batch_id in 0..50 {
        let mut batch = Vec::new();

        for i in 0..50 {
            let key = format!("batch_l1_{}_{}", batch_id, i);
            let value = vec![batch_id as u8; 256];
            batch.push((key, value));
        }

        // L1 batch set
        for (key, value) in &batch {
            l1.set_bytes(key, value.clone(), Some(60)).await.unwrap();
        }

        // L1 batch get
        for (key, _) in &batch {
            let _ = l1.get_bytes(key).await;
        }

        // L1 batch delete
        for (key, _) in &batch {
            l1.delete(key).await.unwrap();
        }

        // L2 batch operations
        let mut l2_batch = Vec::new();
        for i in 0..50 {
            let key = format!("batch_l2_{}_{}", batch_id, i);
            let value = vec![batch_id as u8; 256];
            l2_batch.push((key, value));
        }

        // L2 batch set
        for (key, value) in &l2_batch {
            l2.set_with_version(key, value.clone(), Some(60))
                .await
                .unwrap();
        }

        // L2 batch get
        for (key, _) in &l2_batch {
            let _ = l2.get_bytes(key).await;
        }

        // L2 batch delete
        for (key, _) in &l2_batch {
            l2.delete(key).await.unwrap();
        }

        sleep(Duration::from_millis(10)).await;
    }

    // BUG FIX: the previous final sweep deleted `batch_l1_0_{0..100}`, but the
    // keys actually written are `batch_l1_{batch_id}_{0..50}` — indices 50..99
    // never existed and batches other than 0 were not covered. Every batch
    // already deletes its own keys above, so this is purely a defensive sweep
    // over the real key space in case a round aborted mid-way.
    for batch_id in 0..50 {
        for i in 0..50 {
            let key = format!("batch_l1_{}_{}", batch_id, i);
            let _ = l1.delete(&key).await;
        }
    }

    drop(l1);
    drop(l2);
}

/// Ten concurrent tasks hammer the same L1 backend to surface leaks that only
/// appear under contention; each task works in its own key namespace.
#[tokio::test]
async fn test_concurrent_memory_leak() {
    let cache = Arc::new(L1Backend::new(1000));
    let mut handles = vec![];

    for thread_id in 0..10 {
        let worker_cache = Arc::clone(&cache);

        handles.push(tokio::spawn(async move {
            for i in 0..1000 {
                let key = format!("thread_{}_key_{}", thread_id, i % 50);
                let value = format!("thread_{}_value_{}", thread_id, i).into_bytes();

                worker_cache
                    .set_bytes(&key, value.clone(), Some(60))
                    .await
                    .unwrap();
                let _ = worker_cache.get_bytes(&key).await;

                // Periodically delete only this task's own keys so the
                // other workers' concurrent traffic is left undisturbed.
                if i % 100 == 0 {
                    for j in 0..50 {
                        let _ = worker_cache
                            .delete(&format!("thread_{}_key_{}", thread_id, j))
                            .await;
                    }
                }
            }
        }));
    }

    // Join every worker, propagating any panic from inside a task.
    for handle in handles {
        handle.await.unwrap();
    }

    // Remove everything any worker may have left behind.
    for thread_id in 0..10 {
        for i in 0..50 {
            let _ = cache
                .delete(&format!("thread_{}_key_{}", thread_id, i))
                .await;
        }
    }

    drop(cache);
    sleep(Duration::from_millis(200)).await;
}

/// This test specifically targets memory leaks caused by reference cycles.
#[tokio::test]
async fn test_circular_reference_memory_leak() {
    use std::cell::RefCell;
    use std::rc::Rc;

    struct Node {
        _value: Vec<u8>,
        next: Option<Rc<RefCell<Node>>>,
    }

    // Build two nodes that will form a cycle.
    let node1 = Rc::new(RefCell::new(Node {
        _value: vec![1; 1024],
        next: None,
    }));

    let node2 = Rc::new(RefCell::new(Node {
        _value: vec![2; 1024],
        next: Some(Rc::clone(&node1)),
    }));

    // Close the cycle: node1 -> node2 -> node1.
    node1.borrow_mut().next = Some(Rc::clone(&node2));

    // Use the cache to store a marker derived from the cycle (serialized to
    // bytes — the Rc graph itself is never stored).
    let cache = Arc::new(L1Backend::new(100));

    let serialized = format!("circular_ref_data_{}", Rc::strong_count(&node1)).into_bytes();
    cache
        .set_bytes("circular_ref", serialized.clone(), Some(10))
        .await
        .unwrap();

    // Deleting the entry should release the cached bytes.
    cache.delete("circular_ref").await.unwrap();
    drop(cache);

    // BUG FIX: dropping the two handles alone cannot reclaim an Rc cycle —
    // each node keeps the other's strong count above zero, so the original
    // version of this test leaked both nodes on every run. Break the cycle
    // explicitly before dropping so the nodes are actually freed. (The
    // production-grade alternative is a `Weak` back-reference.)
    node1.borrow_mut().next = None;
    drop(node1);
    drop(node2);

    sleep(Duration::from_millis(100)).await;
}

// Memory-usage monitoring helpers (require jemalloc or another instrumented
// allocator). NOTE(review): the feature name below looks deliberately
// un-enableable, so this module is effectively dead code — confirm intent.
#[cfg(feature = "memory_profiling_never_12345")]
mod memory_profiling {
    use super::*;

    use jemalloc_ctl::{epoch, stats};
    use std::fmt;

    /// Newtype adapter wrapping `jemalloc_ctl::Error` so it can be carried
    /// inside `Box<dyn std::error::Error>` (the raw error type presumably
    /// lacks the needed trait impls — hence the manual Display/Error below).
    #[derive(Debug)]
    struct JemallocWrapper(jemalloc_ctl::Error);

    impl fmt::Display for JemallocWrapper {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            write!(f, "jemalloc error: {:?}", self.0)
        }
    }

    impl std::error::Error for JemallocWrapper {}

    /// Returns `(allocated, active)` byte counts as reported by jemalloc.
    /// `epoch::advance()` refreshes jemalloc's cached statistics first, so
    /// the subsequent reads reflect the current state rather than stale data.
    pub async fn get_memory_usage() -> Result<(usize, usize), Box<dyn std::error::Error>> {
        epoch::advance().map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;

        let allocated = stats::allocated::read()
            .map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;
        let active = stats::active::read()
            .map_err(|e| Box::new(JemallocWrapper(e)) as Box<dyn std::error::Error>)?;

        Ok((allocated, active))
    }

    /// Writes 10 000 entries of 1 KiB each, then deletes them all and checks
    /// that the allocator's "allocated" counter returns near the baseline.
    #[tokio::test]
    async fn test_memory_usage_tracking() {
        let (initial_allocated, initial_active) = get_memory_usage().await.unwrap();

        let cache = Arc::new(L1Backend::new(10000));

        // Perform a large number of operations to simulate real-world usage.
        for i in 0..10000 {
            let key = format!("mem_test_{}", i);
            let value = vec![i as u8; 1024];
            cache.set_bytes(&key, value, Some(60)).await.unwrap();
        }

        let (peak_allocated, peak_active) = get_memory_usage().await.unwrap();
        println!(
            "Memory usage - Initial: {} bytes allocated, {} bytes active",
            initial_allocated, initial_active
        );
        println!(
            "Memory usage - Peak: {} bytes allocated, {} bytes active",
            peak_allocated, peak_active
        );

        // Clean up: L1Backend has no `clear` method, so delete keys one by one.
        for i in 0..10000 {
            let key = format!("mem_test_{}", i);
            let _ = cache.delete(&key).await;
        }
        drop(cache);
        sleep(Duration::from_millis(500)).await;

        let (final_allocated, final_active) = get_memory_usage().await.unwrap();
        println!(
            "Memory usage - Final: {} bytes allocated, {} bytes active",
            final_allocated, final_active
        );

        // Jemalloc retains freed memory for later reuse, which is not a true
        // leak. Verify allocations did not grow unreasonably (allow up to
        // 10 MB of jemalloc caching headroom over the initial baseline).
        let max_reasonable_allocation = initial_allocated.saturating_add(10 * 1024 * 1024);
        assert!(
            final_allocated < max_reasonable_allocation,
            "Potential memory leak: allocated {} bytes (initial: {}, max reasonable: {})",
            final_allocated,
            initial_allocated,
            max_reasonable_allocation
        );

        // Verify there is no runaway growth after cleanup. A single run cannot
        // prove a sustained trend, but this still catches gross leaks.
        assert!(
            final_allocated <= peak_allocated * 2,
            "Memory allocation increased significantly after cleanup: {} vs peak {}",
            final_allocated,
            peak_allocated
        );
    }

    /// Long-running stability test: ten work/rest rounds while sampling the
    /// allocator; memory should fluctuate within bounds, not grow steadily.
    #[tokio::test]
    async fn test_long_running_memory_stability() {
        // Capture the baseline before any cache work happens.
        let (initial_allocated, _) = get_memory_usage().await.unwrap();
        let cache = Arc::new(L1Backend::new(5000));

        // Sample memory usage periodically, starting with the baseline.
        let mut memory_samples = Vec::new();
        memory_samples.push(initial_allocated);

        // Run 10 rounds; each round performs a burst of work then rests.
        for round in 0..10 {
            println!("Running memory stability test round {}/10", round + 1);

            // Batch writes over a 500-key rotation (keys repeat within a round).
            for i in 0..2000 {
                let key = format!("longrun_{}_{}", round, i % 500);
                let value = vec![round as u8; 512];
                cache.set_bytes(&key, value, Some(120)).await.unwrap();
            }

            // Batch reads of the same keys.
            for i in 0..2000 {
                let key = format!("longrun_{}_{}", round, i % 500);
                let _ = cache.get_bytes(&key).await;
            }

            // On even rounds, purge data keyed by `(round + 1) % 2` — i.e.
            // keys from the round-0/round-1 namespaces written earlier.
            if round % 2 == 0 {
                for i in 0..500 {
                    let key = format!("longrun_{}_{}", (round + 1) % 2, i);
                    let _ = cache.delete(&key).await;
                }
            }

            // Record the current allocation level for the final analysis.
            let (current_allocated, _) = get_memory_usage().await.unwrap();
            memory_samples.push(current_allocated);
            println!(
                "  Memory usage after round {}: {} bytes",
                round + 1,
                current_allocated
            );

            // Rest between rounds.
            sleep(Duration::from_millis(200)).await;
        }

        // Remove every key written by any round.
        for round in 0..10 {
            for i in 0..500 {
                let key = format!("longrun_{}_{}", round, i);
                let _ = cache.delete(&key).await;
            }
        }

        drop(cache);
        sleep(Duration::from_millis(500)).await;

        // Evaluate stability over the collected samples.
        let max_memory = memory_samples.iter().max().unwrap();
        let min_memory = memory_samples.iter().min().unwrap();

        println!("Long running memory stability test results:");
        println!("  Minimum memory usage: {} bytes", min_memory);
        println!("  Maximum memory usage: {} bytes", max_memory);
        println!("  Memory usage range: {} bytes", max_memory - min_memory);

        // Usage should stay within a bounded range rather than grow
        // monotonically; cap the peak at 3x the initial baseline.
        assert!(
            *max_memory < initial_allocated * 3,
            "Memory usage exceeded expected limit: {} vs {}",
            max_memory,
            initial_allocated * 3
        );
    }
}