use std::{
    path::{Path, PathBuf},
    sync::Arc,
};

use async_trait::async_trait;
use bincode::{
    config::standard as bincode_config,
    serde::{decode_from_slice, encode_to_vec},
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use feoxdb::{FeoxError, FeoxStore};
use hitbox_backend::format::{Format, JsonFormat};
use hitbox_backend::{
    Backend, BackendError, BackendResult, CacheKeyFormat, Compressor, DeleteStatus,
    PassthroughCompressor,
};
use hitbox_core::{BackendLabel, CacheKey, CacheValue, Raw};
use serde::{Deserialize, Serialize};

use crate::FeOxDbError;

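/// Bincode-serializable mirror of `CacheValue<Raw>`: the raw payload bytes
/// plus the optional stale/expire timestamps, round-tripped through the
/// `From` impls below.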
#[derive(Serialize, Deserialize)]
struct SerializableCacheValue {
    #[serde(with = "serde_bytes")]
    data: Vec<u8>,
    stale: Option<DateTime<Utc>>,
    expire: Option<DateTime<Utc>>,
}

impl From<CacheValue<Raw>> for SerializableCacheValue {
    fn from(value: CacheValue<Raw>) -> Self {
        Self {
            data: value.data().to_vec(),
            stale: value.stale(),
            expire: value.expire(),
        }
    }
}

impl From<SerializableCacheValue> for CacheValue<Raw> {
    fn from(value: SerializableCacheValue) -> Self {
        CacheValue::new(Bytes::from(value.data), value.expire, value.stale)
    }
}

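/// Hitbox cache backend backed by an embedded [`FeoxStore`].
///
/// The store sits behind an [`Arc`], so clones are cheap and share the same
/// underlying data. Illustrative usage (a sketch; the path is hypothetical):
///
/// ```ignore
/// let backend = FeOxDbBackend::builder()
///     .path("/var/cache/app/cache.db")
///     .max_memory(64 * 1024 * 1024)
///     .build()?;
/// ```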
#[derive(Clone)]
pub struct FeOxDbBackend<S = JsonFormat, C = PassthroughCompressor>
where
    S: Format,
    C: Compressor,
{
    store: Arc<FeoxStore>,
    key_format: CacheKeyFormat,
    serializer: S,
    compressor: C,
    label: BackendLabel,
}

impl<S, C> FeOxDbBackend<S, C>
where
    S: Format,
    C: Compressor,
{
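    /// Asks the underlying store to flush buffered writes; for file-backed
    /// stores this is what makes data survive a reopen (see
    /// `test_flush_persists_data`).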
    pub fn flush(&self) {
        self.store.flush();
    }
}

impl FeOxDbBackend<JsonFormat, PassthroughCompressor> {
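    /// Returns a builder preconfigured with the defaults: JSON value format
    /// and no compression.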
    pub fn builder() -> FeOxDbBackendBuilder<JsonFormat, PassthroughCompressor> {
        FeOxDbBackendBuilder::default()
    }

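    /// Creates a purely in-memory backend (no backing file) with TTL support
    /// enabled and the default key/value formats.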
    pub fn in_memory() -> Result<Self, FeOxDbError> {
        let store = FeoxStore::builder().enable_ttl(true).build()?;

        Ok(Self {
            store: Arc::new(store),
            key_format: CacheKeyFormat::Bitcode,
            serializer: JsonFormat,
            compressor: PassthroughCompressor,
            label: BackendLabel::new_static("feoxdb"),
        })
    }
}

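/// Builder for [`FeOxDbBackend`].
///
/// All settings are optional; by default the store is in-memory, keys use
/// [`CacheKeyFormat::Bitcode`], values use [`JsonFormat`], and no compression
/// is applied.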
pub struct FeOxDbBackendBuilder<S = JsonFormat, C = PassthroughCompressor>
where
    S: Format,
    C: Compressor,
{
    path: Option<PathBuf>,
    max_file_size: Option<u64>,
    max_memory: Option<usize>,
    key_format: CacheKeyFormat,
    serializer: S,
    compressor: C,
    label: BackendLabel,
}

impl Default for FeOxDbBackendBuilder<JsonFormat, PassthroughCompressor> {
    fn default() -> Self {
        Self {
            path: None,
            max_file_size: None,
            max_memory: None,
            key_format: CacheKeyFormat::Bitcode,
            serializer: JsonFormat,
            compressor: PassthroughCompressor,
            label: BackendLabel::new_static("feoxdb"),
        }
    }
}

impl<S, C> FeOxDbBackendBuilder<S, C>
where
    S: Format,
    C: Compressor,
{
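    /// Sets the backing file for the store. If the path is an existing
    /// directory, a `cache.db` file is created inside it.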
    pub fn path(mut self, path: impl AsRef<Path>) -> Self {
        self.path = Some(path.as_ref().to_path_buf());
        self
    }

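    /// Caps the size of the backing file, in bytes; writes beyond the cap may
    /// be dropped (see `test_file_size_limit_drops_excess_writes`).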
    pub fn max_file_size(mut self, bytes: u64) -> Self {
        self.max_file_size = Some(bytes);
        self
    }

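    /// Caps the store's memory usage, in bytes. Writes that would exceed the
    /// limit fail (see `test_memory_limit_exceeded`).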
    pub fn max_memory(mut self, bytes: usize) -> Self {
        self.max_memory = Some(bytes);
        self
    }

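    /// Sets the cache-key format exposed via [`Backend::key_format`]
    /// (defaults to [`CacheKeyFormat::Bitcode`]).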
    pub fn key_format(mut self, format: CacheKeyFormat) -> Self {
        self.key_format = format;
        self
    }

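    /// Sets the label this backend reports via [`Backend::label`]
    /// (defaults to `"feoxdb"`).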
    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
        self.label = label.into();
        self
    }

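    /// Replaces the value serialization format, rebuilding the builder with
    /// the new type parameter (e.g. `BincodeFormat` instead of `JsonFormat`).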
    pub fn value_format<NewS>(self, serializer: NewS) -> FeOxDbBackendBuilder<NewS, C>
    where
        NewS: Format,
    {
        FeOxDbBackendBuilder {
            path: self.path,
            max_file_size: self.max_file_size,
            max_memory: self.max_memory,
            key_format: self.key_format,
            serializer,
            compressor: self.compressor,
            label: self.label,
        }
    }

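    /// Replaces the value compressor (defaults to [`PassthroughCompressor`],
    /// which performs no compression).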
    pub fn compressor<NewC>(self, compressor: NewC) -> FeOxDbBackendBuilder<S, NewC>
    where
        NewC: Compressor,
    {
        FeOxDbBackendBuilder {
            path: self.path,
            max_file_size: self.max_file_size,
            max_memory: self.max_memory,
            key_format: self.key_format,
            serializer: self.serializer,
            compressor,
            label: self.label,
        }
    }

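    /// Builds the backend, creating the underlying [`FeoxStore`] with TTL
    /// support enabled.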
    pub fn build(self) -> Result<FeOxDbBackend<S, C>, FeOxDbError> {
        let mut builder = FeoxStore::builder().enable_ttl(true);

        if let Some(mut path) = self.path {
            if path.is_dir() {
                path.push("cache.db");
            }
            let path_str = path.to_string_lossy().to_string();
            builder = builder.device_path(path_str);
        }

        if let Some(file_size) = self.max_file_size {
            builder = builder.file_size(file_size);
        }

        if let Some(memory) = self.max_memory {
            builder = builder.max_memory(memory);
        }

        let store = builder.build()?;

        Ok(FeOxDbBackend {
            store: Arc::new(store),
            key_format: self.key_format,
            serializer: self.serializer,
            compressor: self.compressor,
            label: self.label,
        })
    }
}

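// Cache keys are bincode-encoded before use as FeOxDB keys, and every store
// operation runs on Tokio's blocking pool via `spawn_blocking`, since the
// `FeoxStore` API is synchronous.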
#[async_trait]
impl<S, C> Backend for FeOxDbBackend<S, C>
where
    S: Format + Send + Sync,
    C: Compressor + Send + Sync,
{
    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
        let store = self.store.clone();

        let key_bytes = encode_to_vec(key, bincode_config())
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        tokio::task::spawn_blocking(move || match store.get(&key_bytes) {
            Ok(encoded) => {
                let (serializable, _): (SerializableCacheValue, _) =
                    decode_from_slice(&encoded, bincode_config())
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                let cache_value: CacheValue<Raw> = serializable.into();

                // Treat entries past their expire timestamp as absent, even
                // if the store has not evicted them yet.
                if let Some(expire_time) = cache_value.expire()
                    && expire_time < Utc::now()
                {
                    return Ok(None);
                }

                Ok(Some(cache_value))
            }
            Err(FeoxError::KeyNotFound) => Ok(None),
            Err(e) => Err(BackendError::InternalError(Box::new(e))),
        })
        .await
        .map_err(|e| BackendError::InternalError(Box::new(e)))?
    }

    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
        let store = self.store.clone();

        let key_bytes = encode_to_vec(key, bincode_config())
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        // Read the TTL before `value` is consumed by the conversion below.
        let ttl = value.ttl();

        let serializable: SerializableCacheValue = value.into();
        let value_bytes = encode_to_vec(&serializable, bincode_config())
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        tokio::task::spawn_blocking(move || {
            match ttl {
                // With a TTL, let FeOxDB evict the entry once it elapses.
                Some(ttl_duration) => {
                    store.insert_with_ttl(&key_bytes, &value_bytes, ttl_duration.as_secs())
                }
                None => store.insert(&key_bytes, &value_bytes),
            }
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;
            Ok(())
        })
        .await
        .map_err(|e| BackendError::InternalError(Box::new(e)))?
    }

    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
        let store = self.store.clone();

        let key_bytes = encode_to_vec(key, bincode_config())
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        tokio::task::spawn_blocking(move || {
            let exists = store.contains_key(&key_bytes);

            if exists {
                store
                    .delete(&key_bytes)
                    .map_err(|e| BackendError::InternalError(Box::new(e)))?;
                Ok(DeleteStatus::Deleted(1))
            } else {
                Ok(DeleteStatus::Missing)
            }
        })
        .await
        .map_err(|e| BackendError::InternalError(Box::new(e)))?
    }

    fn value_format(&self) -> &dyn Format {
        &self.serializer
    }

    fn key_format(&self) -> &CacheKeyFormat {
        &self.key_format
    }

    fn compressor(&self) -> &dyn Compressor {
        &self.compressor
    }

    fn label(&self) -> BackendLabel {
        self.label.clone()
    }
}

// Marker implementation: `CacheBackend` needs nothing beyond `Backend` here,
// so the impl body is empty.
impl<S, C> hitbox_backend::CacheBackend for FeOxDbBackend<S, C>
where
    S: Format + Send + Sync,
    C: Compressor + Send + Sync,
{
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use tempfile::TempDir;

    #[tokio::test]
    async fn test_write_and_read() {
        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();

        let key = CacheKey::from_str("test-key", "1");
        let value = CacheValue::new(
            Bytes::from(&b"test-value"[..]),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        backend.write(&key, value.clone()).await.unwrap();

        let result = backend.read(&key).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().data().as_ref(), b"test-value");
    }

    #[tokio::test]
    async fn test_delete() {
        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();

        let key = CacheKey::from_str("delete-key", "1");
        let value = CacheValue::new(
            Bytes::from(&b"test-value"[..]),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        backend.write(&key, value).await.unwrap();

        let status = backend.remove(&key).await.unwrap();
        assert_eq!(status, DeleteStatus::Deleted(1));

        let result = backend.read(&key).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_delete_missing() {
        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();

        let key = CacheKey::from_str("nonexistent", "1");
        let status = backend.remove(&key).await.unwrap();
        assert_eq!(status, DeleteStatus::Missing);
    }

    #[tokio::test]
    async fn test_read_nonexistent() {
        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();

        let key = CacheKey::from_str("nonexistent-read", "1");
        let result = backend.read(&key).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_in_memory_backend() {
        let backend = FeOxDbBackend::in_memory().unwrap();

        let key = CacheKey::from_str("memory-key", "1");
        let value = CacheValue::new(
            Bytes::from(&b"memory-value"[..]),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        backend.write(&key, value).await.unwrap();

        let result = backend.read(&key).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().data().as_ref(), b"memory-value");
    }

    #[tokio::test]
    async fn test_clone_shares_store() {
        let temp_dir = TempDir::new().unwrap();
        let backend1 = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();
        let backend2 = backend1.clone();

        let key = CacheKey::from_str("shared-key", "1");
        let value = CacheValue::new(
            Bytes::from(&b"shared-value"[..]),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        backend1.write(&key, value).await.unwrap();

        let result = backend2.read(&key).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().data().as_ref(), b"shared-value");
    }

    #[tokio::test]
    async fn test_per_key_ttl() {
        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .build()
            .unwrap();

        let now = Utc::now();
        let expire_1h = now + chrono::Duration::hours(1);
        let expire_24h = now + chrono::Duration::hours(24);

        let key1 = CacheKey::from_str("key1", "1");
        let value1 = CacheValue::new(Bytes::from(&b"value1"[..]), Some(expire_1h), None);
        backend.write(&key1, value1).await.unwrap();

        let key2 = CacheKey::from_str("key2", "1");
        let value2 = CacheValue::new(Bytes::from(&b"value2"[..]), Some(expire_24h), None);
        backend.write(&key2, value2).await.unwrap();

        let read1 = backend
            .read(&key1)
            .await
            .unwrap()
            .expect("key1 should exist");
        let read2 = backend
            .read(&key2)
            .await
            .unwrap()
            .expect("key2 should exist");

        let tolerance = chrono::Duration::seconds(1);
        assert!(
            (read1.expire().unwrap() - expire_1h).abs() < tolerance,
            "key1 expire time should be ~1 hour from now"
        );
        assert!(
            (read2.expire().unwrap() - expire_24h).abs() < tolerance,
            "key2 expire time should be ~24 hours from now"
        );
    }

    #[tokio::test]
    async fn test_expired_entry_not_returned() {
        let backend = FeOxDbBackend::in_memory().unwrap();

        let key = CacheKey::from_str("expired-key", "1");
        let expired_time = Utc::now() - chrono::Duration::seconds(10);
        let value = CacheValue::new(Bytes::from(&b"expired"[..]), Some(expired_time), None);
        backend.write(&key, value).await.unwrap();

        let result = backend.read(&key).await.unwrap();
        assert!(result.is_none(), "Expired entry should not be returned");
    }

    #[tokio::test]
    async fn test_memory_limit_exceeded() {
        let backend = FeOxDbBackend::builder()
            .max_memory(1024) // 1 KiB limit
            .build()
            .unwrap();

        let key = CacheKey::from_str("big-key", "1");
        let large_data = vec![0u8; 2048]; // 2 KiB, twice the configured limit
        let value = CacheValue::new(
            Bytes::from(large_data),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        let result = backend.write(&key, value).await;
        assert!(
            result.is_err(),
            "Write should fail when exceeding memory limit"
        );
    }

    #[tokio::test]
    async fn test_builder_with_label() {
        let backend = FeOxDbBackend::builder()
            .label("custom-label")
            .build()
            .unwrap();

        assert_eq!(backend.label().as_ref(), "custom-label");
    }

    #[tokio::test]
    async fn test_builder_with_custom_format() {
        use hitbox_backend::format::BincodeFormat;

        let temp_dir = TempDir::new().unwrap();
        let backend = FeOxDbBackend::builder()
            .path(temp_dir.path())
            .value_format(BincodeFormat)
            .build()
            .unwrap();

        let key = CacheKey::from_str("format-key", "1");
        let value = CacheValue::new(
            Bytes::from(&b"format-value"[..]),
            Some(Utc::now() + chrono::Duration::hours(1)),
            None,
        );

        backend.write(&key, value).await.unwrap();
        let result = backend.read(&key).await.unwrap();
        assert!(result.is_some());
        assert_eq!(result.unwrap().data().as_ref(), b"format-value");
    }

    #[tokio::test]
    async fn test_flush_persists_data() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("cache.db");

        {
            let backend = FeOxDbBackend::builder()
                .path(temp_dir.path())
                .build()
                .unwrap();

            let key = CacheKey::from_str("persist-key", "1");
            let value = CacheValue::new(
                Bytes::from(&b"persist-value"[..]),
                Some(Utc::now() + chrono::Duration::hours(1)),
                None,
            );
            backend.write(&key, value).await.unwrap();
            backend.flush();
        }

        let backend = FeOxDbBackend::builder().path(&db_path).build().unwrap();

        let key = CacheKey::from_str("persist-key", "1");
        let result = backend.read(&key).await.unwrap();
        assert!(
            result.is_some(),
            "Data should persist after flush and reopen"
        );
        assert_eq!(result.unwrap().data().as_ref(), b"persist-value");
    }

    #[tokio::test]
    async fn test_file_size_limit_drops_excess_writes() {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("cache.db");

        let file_size_limit = 10 * 1024 * 1024; // 10 MiB backing file
        let chunk_size = 256 * 1024; // 256 KiB per value
        let num_chunks = 60; // ~15 MiB total, well over the limit

        {
            let backend = FeOxDbBackend::builder()
                .path(temp_dir.path())
                .max_file_size(file_size_limit)
                .build()
                .unwrap();

            let chunk = vec![0u8; chunk_size];
            for i in 0..num_chunks {
                let key = CacheKey::from_str(&format!("chunk-{}", i), "1");
                let value = CacheValue::new(
                    Bytes::from(chunk.clone()),
                    Some(Utc::now() + chrono::Duration::hours(1)),
                    None,
                );
                let _ = backend.write(&key, value).await;
                // Flush periodically so data actually reaches the backing file.
                if i % 5 == 4 {
                    backend.flush();
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }
            backend.flush();
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }

        let backend = FeOxDbBackend::builder()
            .path(&db_path)
            .max_file_size(file_size_limit)
            .build()
            .unwrap();

        let mut persisted_count = 0;
        for i in 0..num_chunks {
            let key = CacheKey::from_str(&format!("chunk-{}", i), "1");
            if backend.read(&key).await.unwrap().is_some() {
                persisted_count += 1;
            }
        }

        assert!(persisted_count > 0, "At least some chunks should persist");
        assert!(
            persisted_count < num_chunks,
            "Not all chunks should persist when exceeding file size limit. \
             Persisted {}/{} chunks",
            persisted_count,
            num_chunks
        );
    }
}