pub mod compose;
pub mod policy;

mod context;
mod envelope;
mod format;

pub use compose::Compose;
pub use policy::CompositionPolicy;

pub(crate) use context::{CompositionContext, CompositionLayer};
pub(crate) use format::CompositionFormat;

use crate::format::Format;
use crate::metrics::Timer;
use crate::{
    Backend, BackendError, BackendResult, CacheBackend, CacheKeyFormat, Compressor, DeleteStatus,
    PassthroughCompressor,
};
use async_trait::async_trait;
use envelope::CompositionEnvelope;
use hitbox_core::{
    BackendLabel, BoxContext, CacheContext, CacheKey, CacheStatus, CacheValue, Cacheable,
    CacheableResponse, Offload, Raw, ResponseSource,
};
use policy::{
    CompositionReadPolicy, CompositionWritePolicy, OptimisticParallelWritePolicy, ReadResult,
    RefillPolicy, SequentialReadPolicy,
};
use smol_str::SmolStr;
use std::sync::Arc;
use thiserror::Error;

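/// Error raised by the composition backend when an operation fails on both
/// layers; single-layer failures are logged and tolerated instead.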
#[derive(Debug, Error)]
pub enum CompositionError {
    #[error("Both cache layers failed - L1: {l1}, L2: {l2}")]
    BothLayersFailed {
        l1: BackendError,
        l2: BackendError,
    },
}

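/// Two-layer cache backend that composes an `L1` and an `L2` backend behind a
/// single `Backend`/`CacheBackend` implementation, with pluggable read, write,
/// and refill policies.
///
/// A minimal construction sketch; `l1`, `l2`, and `offload` are placeholders
/// for any types implementing `Backend` and `Offload<'static>`:
///
/// ```ignore
/// let backend = CompositionBackend::new(l1, l2, offload)
///     .label("cache")
///     .refill(RefillPolicy::Always);
/// ```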
pub struct CompositionBackend<
    L1,
    L2,
    O,
    R = SequentialReadPolicy,
    W = OptimisticParallelWritePolicy,
> where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    l1: L1,
    l2: L2,
    format: CompositionFormat,
    offload: O,
    read_policy: R,
    write_policy: W,
    refill_policy: RefillPolicy,
    label: BackendLabel,
    l1_label: SmolStr,
    l2_label: SmolStr,
}

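/// Joins a parent label with a child backend label using a dot, producing
/// hierarchical metric labels such as `composition.moka` or `cache.redis`.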
#[inline]
fn compose_label(prefix: &str, suffix: &str) -> SmolStr {
    SmolStr::from(format!("{}.{}", prefix, suffix))
}

impl<L1, L2, O> CompositionBackend<L1, L2, O, SequentialReadPolicy, OptimisticParallelWritePolicy>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
{
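    /// Builds a composition backend with the default `SequentialReadPolicy` and
    /// `OptimisticParallelWritePolicy`, capturing each layer's value format and
    /// compressor into the combined `CompositionFormat`.
    ///
    /// A usage sketch; `l1`, `l2`, and `offload` stand in for concrete backend
    /// and offload implementations:
    ///
    /// ```ignore
    /// let backend = CompositionBackend::new(l1, l2, offload);
    /// ```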
    pub fn new(l1: L1, l2: L2, offload: O) -> Self {
        let label = BackendLabel::new_static("composition");
        let l1_label = compose_label(label.as_str(), l1.label().as_str());
        let l2_label = compose_label(label.as_str(), l2.label().as_str());
        let format = CompositionFormat::new(
            Arc::new(l1.value_format().clone_box()),
            Arc::new(l2.value_format().clone_box()),
            Arc::new(l1.compressor().clone_box()),
            Arc::new(l2.compressor().clone_box()),
            l1_label.clone(),
            l2_label.clone(),
        );
        Self {
            l1,
            l2,
            format,
            offload,
            read_policy: SequentialReadPolicy::new(),
            write_policy: OptimisticParallelWritePolicy::new(),
            refill_policy: RefillPolicy::default(),
            label,
            l1_label,
            l2_label,
        }
    }
}

impl<L1, L2, O, R, W> CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    pub fn read_policy(&self) -> &R {
        &self.read_policy
    }

    pub fn write_policy(&self) -> &W {
        &self.write_policy
    }

    pub fn refill_policy(&self) -> &RefillPolicy {
        &self.refill_policy
    }

    pub fn offload(&self) -> &O {
        &self.offload
    }

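    /// Overrides the backend label and recomputes the derived per-layer labels
    /// (`<label>.<l1>` / `<label>.<l2>`) used for metrics and response sources.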
    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
        self.label = label.into();
        self.l1_label = compose_label(self.label.as_str(), self.l1.label().as_str());
        self.l2_label = compose_label(self.label.as_str(), self.l2.label().as_str());
        self.format
            .set_labels(self.l1_label.clone(), self.l2_label.clone());
        self
    }

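    /// Replaces the read, write, and refill policies in one step from a
    /// `CompositionPolicy`, changing the policy type parameters while keeping
    /// the layers, format, labels, and offload untouched.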
    pub fn with_policy<NewR, NewW>(
        self,
        policy: CompositionPolicy<NewR, NewW>,
    ) -> CompositionBackend<L1, L2, O, NewR, NewW>
    where
        NewR: CompositionReadPolicy,
        NewW: CompositionWritePolicy,
    {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: policy.read,
            write_policy: policy.write,
            refill_policy: policy.refill,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

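    /// Swaps in a different read policy, leaving the write and refill policies
    /// unchanged.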
    pub fn read<NewR: CompositionReadPolicy>(
        self,
        read_policy: NewR,
    ) -> CompositionBackend<L1, L2, O, NewR, W> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy,
            write_policy: self.write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

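    /// Swaps in a different write policy, leaving the read and refill policies
    /// unchanged.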
    pub fn write<NewW: CompositionWritePolicy>(
        self,
        write_policy: NewW,
    ) -> CompositionBackend<L1, L2, O, R, NewW> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: self.read_policy,
            write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

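    /// Sets the refill policy. With `RefillPolicy::Always`, a hit served from
    /// the L2 layer is flagged for refill so the value is written back into L1
    /// on the following `set`; with `RefillPolicy::Never`, L1 is left untouched.
    ///
    /// A usage sketch on an already constructed backend:
    ///
    /// ```ignore
    /// let backend = backend.refill(RefillPolicy::Never);
    /// ```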
    pub fn refill(mut self, refill_policy: RefillPolicy) -> Self {
        self.refill_policy = refill_policy;
        self
    }
}

impl<L1, L2, O, R, W> Clone for CompositionBackend<L1, L2, O, R, W>
where
    L1: Clone + Backend,
    L2: Clone + Backend,
    O: Offload<'static>,
    R: Clone + CompositionReadPolicy,
    W: Clone + CompositionWritePolicy,
{
    fn clone(&self) -> Self {
        Self {
            l1: self.l1.clone(),
            l2: self.l2.clone(),
            format: self.format.clone(),
            offload: self.offload.clone(),
            read_policy: self.read_policy.clone(),
            write_policy: self.write_policy.clone(),
            refill_policy: self.refill_policy,
            label: self.label.clone(),
            l1_label: self.l1_label.clone(),
            l2_label: self.l2_label.clone(),
        }
    }
}

impl<L1, L2, O, R, W> std::fmt::Debug for CompositionBackend<L1, L2, O, R, W>
where
    L1: std::fmt::Debug + Backend,
    L2: std::fmt::Debug + Backend,
    O: std::fmt::Debug + Offload<'static>,
    R: std::fmt::Debug + CompositionReadPolicy,
    W: std::fmt::Debug + CompositionWritePolicy,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompositionBackend")
            .field("label", &self.label)
            .field("l1", &self.l1)
            .field("l2", &self.l2)
            .field("format", &self.format)
            .field("offload", &self.offload)
            .field("read_policy", &self.read_policy)
            .field("write_policy", &self.write_policy)
            .field("refill_policy", &self.refill_policy)
            .finish()
    }
}

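// Raw `Backend` implementation. Layer values read here are wrapped in a
// `CompositionEnvelope` (`L1`, `L2`, or `Both`) so that a later `write` can
// route each payload back to the layer it came from; `remove` fans out to both
// layers and tolerates a single-layer failure.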
#[async_trait]
impl<L1, L2, O, R, W> Backend for CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend + Clone + Send + Sync + 'static,
    L2: Backend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self), level = "trace")]
    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();
        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let read_l1_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l1_value)) => {
                    crate::metrics::record_read_bytes(&l1_label, l1_value.data().len());
                    let (expire, stale) = (l1_value.expire(), l1_value.stale());
                    let envelope = CompositionEnvelope::L1(l1_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        let read_l2_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l2_value)) => {
                    crate::metrics::record_read_bytes(&l2_label, l2_value.data().len());
                    let (expire, stale) = (l2_value.expire(), l2_value.stale());
                    let envelope = CompositionEnvelope::L2(l2_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        let ReadResult { value, .. } = self
            .read_policy
            .execute_with(
                key.clone(),
                read_l1_with_envelope,
                read_l2_with_envelope,
                &self.offload,
            )
            .await?;

        Ok(value)
    }

    #[tracing::instrument(skip(self, value), level = "trace")]
    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
        let composition = CompositionEnvelope::deserialize(value.data())?;

        match composition {
            CompositionEnvelope::Both { l1, l2 } => {
                let l1_backend = self.l1.clone();
                let l2_backend = self.l2.clone();
                let l1_label = self.l1_label.clone();
                let l2_label = self.l2_label.clone();
                let l1_len = l1.data().len();
                let l2_len = l2.data().len();

                let write_l1 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l1_backend.write(&k, l1).await;
                    crate::metrics::record_write(&l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&l1_label),
                    }
                    result
                };
                let write_l2 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l2_backend.write(&k, l2).await;
                    crate::metrics::record_write(&l2_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                        Err(_) => crate::metrics::record_write_error(&l2_label),
                    }
                    result
                };

                self.write_policy
                    .execute_with(key.clone(), write_l1, write_l2, &self.offload)
                    .await
            }
            CompositionEnvelope::L1(l1) => {
                let l1_len = l1.data().len();
                let timer = Timer::new();
                let result = self.l1.write(key, l1).await;
                crate::metrics::record_write(&self.l1_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                    Err(_) => crate::metrics::record_write_error(&self.l1_label),
                }
                result
            }
            CompositionEnvelope::L2(l2) => {
                let l2_len = l2.data().len();
                let timer = Timer::new();
                let result = self.l2.write(key, l2).await;
                crate::metrics::record_write(&self.l2_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l2_label, l2_len),
                    Err(_) => crate::metrics::record_write_error(&self.l2_label),
                }
                result
            }
        }
    }

    #[tracing::instrument(skip(self), level = "trace")]
    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
        let (l1_result, l2_result) = futures::join!(self.l1.remove(key), self.l2.remove(key));

        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => Ok(DeleteStatus::Missing),
        }
    }

    fn label(&self) -> BackendLabel {
        self.label.clone()
    }

    fn value_format(&self) -> &dyn Format {
        &self.format
    }

    fn key_format(&self) -> &CacheKeyFormat {
        &CacheKeyFormat::Bitcode
    }

    fn compressor(&self) -> &dyn Compressor {
        &PassthroughCompressor
    }
}

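// Typed `CacheBackend` implementation. `get` deserializes each layer through
// the shared `CompositionFormat`, records the hit source label, and (under
// `RefillPolicy::Always`) flags an L2 hit for refill; `set` serializes one
// payload per layer and fans the writes out through the write policy, handling
// refill and nested-composition contexts as special cases.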
impl<L1, L2, O, R, W> CacheBackend for CompositionBackend<L1, L2, O, R, W>
where
    L1: CacheBackend + Clone + Send + Sync + 'static,
    L2: CacheBackend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn get<T>(
        &self,
        key: &CacheKey,
        ctx: &mut BoxContext,
    ) -> BackendResult<Option<CacheValue<T::Cached>>>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let l1_name = l1.label();
        let l2_name = l2.label();

        let format_for_l1 = self.format.clone();
        let format_for_l2 = self.format.clone();

        let l1_ctx = ctx.clone_box();
        let l2_ctx = ctx.clone_box();

        let read_l1 = |k: CacheKey| async move {
            let mut internal_ctx = l1_ctx;

            let read_timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l1_label, raw_data.len());

                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l1.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L1,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                internal_ctx.set_status(CacheStatus::Hit);

                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    l1_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(CacheValue::new(deserialized, meta.expire, meta.stale)))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        let read_l2 = |k: CacheKey| async move {
            let mut internal_ctx = l2_ctx;

            let read_timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l2_label, raw_data.len());

                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l2.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L2,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                let cache_value =
                                    CacheValue::new(deserialized, meta.expire, meta.stale);

                                internal_ctx.set_status(CacheStatus::Hit);

                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    l2_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(cache_value))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        let ReadResult {
            value,
            source,
            context: inner_ctx,
        } = self
            .read_policy
            .execute_with(key.clone(), read_l1, read_l2, &self.offload)
            .await?;

        if let Some(ref _cache_value) = value {
            ctx.merge_from(&*inner_ctx, &self.label);

            if source == CompositionLayer::L2 && self.refill_policy == RefillPolicy::Always {
                ctx.set_read_mode(hitbox_core::ReadMode::Refill);
            }
        }

        Ok(value)
    }

    #[tracing::instrument(skip(self, value, ctx), level = "trace")]
    async fn set<T>(
        &self,
        key: &CacheKey,
        value: &CacheValue<T::Cached>,
        ctx: &mut BoxContext,
    ) -> BackendResult<()>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        use hitbox_core::ReadMode;

        if ctx.read_mode() == ReadMode::Refill {
            match self.refill_policy {
                RefillPolicy::Always => {
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    return self.l2.set::<T>(key, value, ctx).await;
                }
                RefillPolicy::Never => {
                    return Ok(());
                }
            }
        }

        if let Some(comp_ctx) = ctx.as_any().downcast_ref::<CompositionContext>()
            && comp_ctx.layer == CompositionLayer::L2
        {
            match self.refill_policy {
                RefillPolicy::Always => {
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
                RefillPolicy::Never => {
                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
            }
        }

        let (l1_bytes, l2_bytes) = self
            .format
            .serialize_parts(
                &mut |serializer| {
                    serializer.serialize(value.data())?;
                    Ok(())
                },
                &**ctx,
            )
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        let l1_len = l1_bytes.len();
        let l2_len = l2_bytes.len();

        let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
        let l2_value = CacheValue::new(l2_bytes, value.expire(), value.stale());

        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let write_l1 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l1.write(&k, l1_value).await;
            crate::metrics::record_write(&l1_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                Err(_) => crate::metrics::record_write_error(&l1_label),
            }
            result
        };

        let write_l2 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l2.write(&k, l2_value).await;
            crate::metrics::record_write(&l2_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                Err(_) => crate::metrics::record_write_error(&l2_label),
            }
            result
        };

        self.write_policy
            .execute_with(key.clone(), write_l1, write_l2, &self.offload)
            .await
    }

    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn delete(&self, key: &CacheKey, ctx: &mut BoxContext) -> BackendResult<DeleteStatus> {
        let mut l1_ctx = ctx.clone_box();
        let mut l2_ctx = ctx.clone_box();
        let (l1_result, l2_result) = futures::join!(
            self.l1.delete(key, &mut l1_ctx),
            self.l2.delete(key, &mut l2_ctx)
        );

        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                tracing::trace!("Deleted from both L1 and L2");
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                tracing::trace!("Deleted from one layer");
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => {
                tracing::trace!("Key missing from both layers");
                Ok(DeleteStatus::Missing)
            }
        }
    }
}

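// The tests below exercise the composition through in-memory `TestBackend`
// layers and a Tokio-based `TestOffload`, covering hit-source labels, refill
// behaviour, fan-out writes/deletes, and nested compositions.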
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::{Format, JsonFormat};
    use crate::{Backend, CacheKeyFormat, Compressor, PassthroughCompressor};
    use async_trait::async_trait;
    use chrono::Utc;
    use hitbox_core::{
        BoxContext, CacheContext, CachePolicy, CacheStatus, CacheValue, CacheableResponse,
        EntityPolicyConfig, Predicate, Raw, ResponseSource,
    };
    use serde::{Deserialize, Serialize};
    use smol_str::SmolStr;
    use std::collections::HashMap;
    use std::future::Future;
    use std::sync::{Arc, Mutex};

    #[cfg(feature = "rkyv_format")]
    use rkyv::{Archive, Serialize as RkyvSerialize};

    #[derive(Clone, Debug)]
    struct TestOffload;

    impl Offload<'static> for TestOffload {
        fn spawn<F>(&self, _kind: impl Into<SmolStr>, future: F)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            tokio::spawn(future);
        }
    }

    #[derive(Clone, Debug)]
    struct TestBackend {
        store: Arc<Mutex<HashMap<CacheKey, CacheValue<Raw>>>>,
        backend_label: &'static str,
    }

    impl TestBackend {
        fn new() -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: "test",
            }
        }

        fn with_label(label: &'static str) -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: label,
            }
        }
    }

    #[async_trait]
    impl Backend for TestBackend {
        async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
            Ok(self.store.lock().unwrap().get(key).cloned())
        }

        async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
            self.store.lock().unwrap().insert(key.clone(), value);
            Ok(())
        }

        async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
            match self.store.lock().unwrap().remove(key) {
                Some(_) => Ok(DeleteStatus::Deleted(1)),
                None => Ok(DeleteStatus::Missing),
            }
        }

        fn label(&self) -> BackendLabel {
            BackendLabel::new(self.backend_label)
        }

        fn value_format(&self) -> &dyn Format {
            &JsonFormat
        }

        fn key_format(&self) -> &CacheKeyFormat {
            &CacheKeyFormat::Bitcode
        }

        fn compressor(&self) -> &dyn Compressor {
            &PassthroughCompressor
        }
    }

    impl CacheBackend for TestBackend {}

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    #[cfg_attr(
        feature = "rkyv_format",
        derive(Archive, RkyvSerialize, rkyv::Deserialize)
    )]
    struct CachedData {
        value: String,
    }

    struct MockResponse;

    impl CacheableResponse for MockResponse {
        type Cached = CachedData;
        type Subject = MockResponse;
        type IntoCachedFuture = std::future::Ready<CachePolicy<Self::Cached, Self>>;
        type FromCachedFuture = std::future::Ready<Self>;

        async fn cache_policy<P: Predicate<Subject = Self::Subject> + Send + Sync>(
            self,
            _predicate: P,
            _config: &EntityPolicyConfig,
        ) -> CachePolicy<CacheValue<Self::Cached>, Self> {
            unimplemented!("Not used in these tests")
        }

        fn into_cached(self) -> Self::IntoCachedFuture {
            unimplemented!("Not used in these tests")
        }

        fn from_cached(_cached: Self::Cached) -> Self::FromCachedFuture {
            unimplemented!("Not used in these tests")
        }
    }

    #[tokio::test]
    async fn test_l1_hit() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_l2_hit_sets_refill_mode() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));

        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(
            l1_result.is_none(),
            "L1 should not be populated directly by get()"
        );
    }

    #[tokio::test]
    async fn test_miss_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();
        let backend = CompositionBackend::new(l1, l2, TestOffload);

        let key = CacheKey::from_str("test", "nonexistent");

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_write_to_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(l1_result.unwrap().data().value, "value1");

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(l2_result.unwrap().data().value, "value1");
    }

    #[tokio::test]
    async fn test_delete_from_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let status = backend.delete(&key, &mut ctx).await.unwrap();
        assert_eq!(status, DeleteStatus::Deleted(2));

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(l1_result.is_none());

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(l2_result.is_none());
    }

    #[tokio::test]
    async fn test_clone() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();
        let backend = CompositionBackend::new(l1, l2, TestOffload);

        let cloned = backend.clone();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = cloned.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");
    }

    #[tokio::test]
    async fn test_nested_composition_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested");
        let value = CacheValue::new(
            CachedData {
                value: "nested_value".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested_value");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_l2_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_redis".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l2.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_redis");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.redis".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_outer_l2_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "outer_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_disk".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l3.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_disk");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("outer.disk".into()));
    }

    #[tokio::test]
    async fn test_l1_hit_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "metrics1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_l2_hit_with_refill_via_set() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "metrics2");
        let value = CacheValue::new(
            CachedData {
                value: "from_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        let cached_value = result.unwrap();
        assert_eq!(cached_value.data().value, "from_l2");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        backend
            .set::<MockResponse>(&key, &cached_value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_nested_composition_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_metrics");
        let value = CacheValue::new(
            CachedData {
                value: "nested".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }
}