pub mod compose;
pub mod policy;

mod context;
mod envelope;
mod format;

pub use compose::Compose;
pub use policy::CompositionPolicy;

pub(crate) use context::{CompositionContext, CompositionLayer};
pub(crate) use format::CompositionFormat;

use crate::format::Format;
use crate::metrics::Timer;
use crate::{
    Backend, BackendError, BackendResult, CacheBackend, CacheKeyFormat, Compressor, DeleteStatus,
    PassthroughCompressor,
};
use async_trait::async_trait;
use envelope::CompositionEnvelope;
use hitbox_core::{
    BackendLabel, BoxContext, CacheContext, CacheKey, CacheStatus, CacheValue, Cacheable,
    CacheableResponse, Offload, Raw, ResponseSource,
};
use policy::{
    CompositionReadPolicy, CompositionWritePolicy, OptimisticParallelWritePolicy, ReadResult,
    RefillPolicy, SequentialReadPolicy,
};
use smol_str::SmolStr;
use std::sync::Arc;
use thiserror::Error;

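/// Errors specific to the composition backend.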
#[derive(Debug, Error)]
pub enum CompositionError {
    #[error("Both cache layers failed - L1: {l1}, L2: {l2}")]
    BothLayersFailed {
        l1: BackendError,
        l2: BackendError,
    },
}

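/// A two-tier cache backend that places a fast L1 [`Backend`] in front of a
/// slower, typically shared L2 [`Backend`] and exposes the pair as a single
/// [`Backend`]/[`CacheBackend`].
///
/// How reads, writes and refills are routed between the layers is controlled
/// by the configured [`CompositionReadPolicy`], [`CompositionWritePolicy`] and
/// [`RefillPolicy`].
///
/// # Example
///
/// A minimal sketch of wiring up a composition backend; `moka`, `redis` and
/// `offload` stand in for whatever concrete [`Backend`] and [`Offload`]
/// implementations the application uses and are not provided by this module:
///
/// ```ignore
/// let cache = CompositionBackend::new(moka, redis, offload)
///     .label("cache")
///     .refill(RefillPolicy::Always);
/// ```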
pub struct CompositionBackend<
    L1,
    L2,
    O,
    R = SequentialReadPolicy,
    W = OptimisticParallelWritePolicy,
> where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    l1: L1,
    l2: L2,
    format: CompositionFormat,
    offload: O,
    read_policy: R,
    write_policy: W,
    refill_policy: RefillPolicy,
    label: BackendLabel,
    l1_label: SmolStr,
    l2_label: SmolStr,
}

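/// Joins a composition label with a layer label into a dotted path,
/// e.g. `cache.moka`.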
#[inline]
fn compose_label(prefix: &str, suffix: &str) -> SmolStr {
    SmolStr::from(format!("{}.{}", prefix, suffix))
}

impl<L1, L2, O> CompositionBackend<L1, L2, O, SequentialReadPolicy, OptimisticParallelWritePolicy>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
{
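    /// Creates a composition backend over `l1` and `l2` with the default
    /// `SequentialReadPolicy`, `OptimisticParallelWritePolicy` and
    /// `RefillPolicy`.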
    pub fn new(l1: L1, l2: L2, offload: O) -> Self {
        let label = BackendLabel::new_static("composition");
        let l1_label = compose_label(label.as_str(), l1.label().as_str());
        let l2_label = compose_label(label.as_str(), l2.label().as_str());
        let format = CompositionFormat::new(
            Arc::new(l1.value_format().clone_box()),
            Arc::new(l2.value_format().clone_box()),
            Arc::new(l1.compressor().clone_box()),
            Arc::new(l2.compressor().clone_box()),
            l1_label.clone(),
            l2_label.clone(),
        );
        Self {
            l1,
            l2,
            format,
            offload,
            read_policy: SequentialReadPolicy::new(),
            write_policy: OptimisticParallelWritePolicy::new(),
            refill_policy: RefillPolicy::default(),
            label,
            l1_label,
            l2_label,
        }
    }
}

impl<L1, L2, O, R, W> CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend,
    L2: Backend,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    pub fn read_policy(&self) -> &R {
        &self.read_policy
    }

    pub fn write_policy(&self) -> &W {
        &self.write_policy
    }

    pub fn refill_policy(&self) -> &RefillPolicy {
        &self.refill_policy
    }

    pub fn offload(&self) -> &O {
        &self.offload
    }

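    /// Overrides the composition label and recomputes the per-layer labels
    /// (`<label>.<l1>` / `<label>.<l2>`) used for metrics and response sources.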
    pub fn label(mut self, label: impl Into<BackendLabel>) -> Self {
        self.label = label.into();
        self.l1_label = compose_label(self.label.as_str(), self.l1.label().as_str());
        self.l2_label = compose_label(self.label.as_str(), self.l2.label().as_str());
        self.format
            .set_labels(self.l1_label.clone(), self.l2_label.clone());
        self
    }

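    /// Replaces the read, write and refill policies in one call, keeping the
    /// layers, format and labels intact.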
    pub fn with_policy<NewR, NewW>(
        self,
        policy: CompositionPolicy<NewR, NewW>,
    ) -> CompositionBackend<L1, L2, O, NewR, NewW>
    where
        NewR: CompositionReadPolicy,
        NewW: CompositionWritePolicy,
    {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: policy.read,
            write_policy: policy.write,
            refill_policy: policy.refill,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

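    /// Replaces only the read policy, leaving the write and refill policies
    /// unchanged.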
    pub fn read<NewR: CompositionReadPolicy>(
        self,
        read_policy: NewR,
    ) -> CompositionBackend<L1, L2, O, NewR, W> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy,
            write_policy: self.write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

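    /// Replaces only the write policy, leaving the read and refill policies
    /// unchanged.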
    pub fn write<NewW: CompositionWritePolicy>(
        self,
        write_policy: NewW,
    ) -> CompositionBackend<L1, L2, O, R, NewW> {
        CompositionBackend {
            l1: self.l1,
            l2: self.l2,
            format: self.format,
            offload: self.offload,
            read_policy: self.read_policy,
            write_policy,
            refill_policy: self.refill_policy,
            label: self.label,
            l1_label: self.l1_label,
            l2_label: self.l2_label,
        }
    }

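    /// Sets the refill policy, which controls whether a value served from L2
    /// is written back into L1 on a subsequent `set`.
    ///
    /// A minimal sketch, chaining the builder (construction of `l1`, `l2` and
    /// `offload` elided):
    ///
    /// ```ignore
    /// let backend = CompositionBackend::new(l1, l2, offload)
    ///     .refill(RefillPolicy::Never);
    /// ```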
    pub fn refill(mut self, refill_policy: RefillPolicy) -> Self {
        self.refill_policy = refill_policy;
        self
    }
}

impl<L1, L2, O, R, W> Clone for CompositionBackend<L1, L2, O, R, W>
where
    L1: Clone + Backend,
    L2: Clone + Backend,
    O: Offload<'static>,
    R: Clone + CompositionReadPolicy,
    W: Clone + CompositionWritePolicy,
{
    fn clone(&self) -> Self {
        Self {
            l1: self.l1.clone(),
            l2: self.l2.clone(),
            format: self.format.clone(),
            offload: self.offload.clone(),
            read_policy: self.read_policy.clone(),
            write_policy: self.write_policy.clone(),
            refill_policy: self.refill_policy,
            label: self.label.clone(),
            l1_label: self.l1_label.clone(),
            l2_label: self.l2_label.clone(),
        }
    }
}

impl<L1, L2, O, R, W> std::fmt::Debug for CompositionBackend<L1, L2, O, R, W>
where
    L1: std::fmt::Debug + Backend,
    L2: std::fmt::Debug + Backend,
    O: std::fmt::Debug + Offload<'static>,
    R: std::fmt::Debug + CompositionReadPolicy,
    W: std::fmt::Debug + CompositionWritePolicy,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CompositionBackend")
            .field("label", &self.label)
            .field("l1", &self.l1)
            .field("l2", &self.l2)
            .field("format", &self.format)
            .field("offload", &self.offload)
            .field("read_policy", &self.read_policy)
            .field("write_policy", &self.write_policy)
            .field("refill_policy", &self.refill_policy)
            .finish()
    }
}

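// The raw `Backend` implementation wraps each layer's payload in a
// `CompositionEnvelope` so that values can be routed back to the layer they
// came from, recording per-layer read/write metrics along the way.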
#[async_trait]
impl<L1, L2, O, R, W> Backend for CompositionBackend<L1, L2, O, R, W>
where
    L1: Backend + Clone + Send + Sync + 'static,
    L2: Backend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self), level = "trace")]
    async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();
        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        // Read L1 and pack any hit into a `CompositionEnvelope::L1` so that a
        // later `write` of this value can be routed back to the L1 layer.
        let read_l1_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l1_value)) => {
                    crate::metrics::record_read_bytes(&l1_label, l1_value.data().len());
                    let (expire, stale) = (l1_value.expire(), l1_value.stale());
                    let envelope = CompositionEnvelope::L1(l1_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        // Same as above, but for the L2 layer.
        let read_l2_with_envelope = |k: CacheKey| async move {
            let ctx: BoxContext = CacheContext::default().boxed();
            let timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, timer.elapsed());

            let result = match read_result {
                Ok(Some(l2_value)) => {
                    crate::metrics::record_read_bytes(&l2_label, l2_value.data().len());
                    let (expire, stale) = (l2_value.expire(), l2_value.stale());
                    let envelope = CompositionEnvelope::L2(l2_value);
                    match envelope.serialize() {
                        Ok(packed) => Ok(Some(CacheValue::new(packed, expire, stale))),
                        Err(e) => Err(e),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };
            (result, ctx)
        };

        let ReadResult { value, .. } = self
            .read_policy
            .execute_with(
                key.clone(),
                read_l1_with_envelope,
                read_l2_with_envelope,
                &self.offload,
            )
            .await?;

        Ok(value)
    }

    #[tracing::instrument(skip(self, value), level = "trace")]
    async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
        let composition = CompositionEnvelope::deserialize(value.data())?;

        // The envelope tells us which layer(s) the payload is destined for.
        match composition {
            CompositionEnvelope::Both { l1, l2 } => {
                let l1_backend = self.l1.clone();
                let l2_backend = self.l2.clone();
                let l1_label = self.l1_label.clone();
                let l2_label = self.l2_label.clone();
                let l1_len = l1.data().len();
                let l2_len = l2.data().len();

                let write_l1 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l1_backend.write(&k, l1).await;
                    crate::metrics::record_write(&l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&l1_label),
                    }
                    result
                };
                let write_l2 = |k: CacheKey| async move {
                    let timer = Timer::new();
                    let result = l2_backend.write(&k, l2).await;
                    crate::metrics::record_write(&l2_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                        Err(_) => crate::metrics::record_write_error(&l2_label),
                    }
                    result
                };

                self.write_policy
                    .execute_with(key.clone(), write_l1, write_l2, &self.offload)
                    .await
            }
            CompositionEnvelope::L1(l1) => {
                let l1_len = l1.data().len();
                let timer = Timer::new();
                let result = self.l1.write(key, l1).await;
                crate::metrics::record_write(&self.l1_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                    Err(_) => crate::metrics::record_write_error(&self.l1_label),
                }
                result
            }
            CompositionEnvelope::L2(l2) => {
                let l2_len = l2.data().len();
                let timer = Timer::new();
                let result = self.l2.write(key, l2).await;
                crate::metrics::record_write(&self.l2_label, timer.elapsed());
                match &result {
                    Ok(()) => crate::metrics::record_write_bytes(&self.l2_label, l2_len),
                    Err(_) => crate::metrics::record_write_error(&self.l2_label),
                }
                result
            }
        }
    }

    #[tracing::instrument(skip(self), level = "trace")]
    async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
        let (l1_result, l2_result) = futures::join!(self.l1.remove(key), self.l2.remove(key));

        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => Ok(DeleteStatus::Missing),
        }
    }

    fn label(&self) -> BackendLabel {
        self.label.clone()
    }

    fn value_format(&self) -> &dyn Format {
        &self.format
    }

    fn key_format(&self) -> &CacheKeyFormat {
        &CacheKeyFormat::Bitcode
    }

    fn compressor(&self) -> &dyn Compressor {
        &PassthroughCompressor
    }
}

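// The typed `CacheBackend` implementation serializes the value once per layer
// via `CompositionFormat`, and uses the refill policy together with the
// context's read mode to decide when an L2 hit should be refilled into L1.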
impl<L1, L2, O, R, W> CacheBackend for CompositionBackend<L1, L2, O, R, W>
where
    L1: CacheBackend + Clone + Send + Sync + 'static,
    L2: CacheBackend + Clone + Send + Sync + 'static,
    O: Offload<'static>,
    R: CompositionReadPolicy,
    W: CompositionWritePolicy,
{
    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn get<T>(
        &self,
        key: &CacheKey,
        ctx: &mut BoxContext,
    ) -> BackendResult<Option<CacheValue<T::Cached>>>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let l1_name = l1.label();
        let l2_name = l2.label();

        let format_for_l1 = self.format.clone();
        let format_for_l2 = self.format.clone();

        let l1_ctx = ctx.clone_box();
        let l2_ctx = ctx.clone_box();

        // Read and deserialize from L1, recording hit status and the resolved
        // response source on the layer-local context.
        let read_l1 = |k: CacheKey| async move {
            let mut internal_ctx = l1_ctx;

            let read_timer = Timer::new();
            let read_result = l1.read(&k).await;
            crate::metrics::record_read(&l1_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l1_label, raw_data.len());

                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l1.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L1,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                internal_ctx.set_status(CacheStatus::Hit);

                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    l1_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(CacheValue::new(deserialized, meta.expire, meta.stale)))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l1_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        // Same as above, but for the L2 layer.
        let read_l2 = |k: CacheKey| async move {
            let mut internal_ctx = l2_ctx;

            let read_timer = Timer::new();
            let read_result = l2.read(&k).await;
            crate::metrics::record_read(&l2_label, read_timer.elapsed());

            let result = match read_result {
                Ok(Some(raw_value)) => {
                    let (meta, raw_data) = raw_value.into_parts();
                    crate::metrics::record_read_bytes(&l2_label, raw_data.len());

                    let mut deserialized_opt: Option<T::Cached> = None;
                    match format_for_l2.deserialize_layer(
                        &raw_data,
                        CompositionLayer::L2,
                        &mut |deserializer| {
                            let value: T::Cached = deserializer.deserialize()?;
                            deserialized_opt = Some(value);
                            Ok(())
                        },
                        &mut internal_ctx,
                    ) {
                        Ok(()) => match deserialized_opt {
                            Some(deserialized) => {
                                let cache_value =
                                    CacheValue::new(deserialized, meta.expire, meta.stale);

                                internal_ctx.set_status(CacheStatus::Hit);

                                let source = if let Some(comp_ctx) =
                                    internal_ctx.as_any().downcast_ref::<CompositionContext>()
                                {
                                    BackendLabel::from(
                                        comp_ctx.format.label_for_layer(comp_ctx.layer).clone(),
                                    )
                                } else {
                                    l2_name.clone()
                                };
                                internal_ctx.set_source(ResponseSource::Backend(source));

                                Ok(Some(cache_value))
                            }
                            None => Err(BackendError::InternalError(Box::new(
                                std::io::Error::other("deserialization produced no result"),
                            ))),
                        },
                        Err(e) => Err(BackendError::InternalError(Box::new(e))),
                    }
                }
                Ok(None) => Ok(None),
                Err(e) => {
                    crate::metrics::record_read_error(&l2_label);
                    Err(e)
                }
            };

            (result, internal_ctx)
        };

        let ReadResult {
            value,
            source,
            context: inner_ctx,
        } = self
            .read_policy
            .execute_with(key.clone(), read_l1, read_l2, &self.offload)
            .await?;

        if let Some(ref _cache_value) = value {
            ctx.merge_from(&*inner_ctx, &self.label);

            // An L2 hit under `RefillPolicy::Always` is flagged so that the
            // next `set` call refills L1.
            if source == CompositionLayer::L2 && self.refill_policy == RefillPolicy::Always {
                ctx.set_read_mode(hitbox_core::ReadMode::Refill);
            }
        }

        Ok(value)
    }

    #[tracing::instrument(skip(self, value, ctx), level = "trace")]
    async fn set<T>(
        &self,
        key: &CacheKey,
        value: &CacheValue<T::Cached>,
        ctx: &mut BoxContext,
    ) -> BackendResult<()>
    where
        T: CacheableResponse,
        T::Cached: Cacheable,
    {
        use hitbox_core::ReadMode;

        // Refill path: the value was served from L2 on a previous `get`, so
        // write the freshly serialized copy into L1 and delegate the L2 write
        // to the inner backend's typed `set`.
        if ctx.read_mode() == ReadMode::Refill {
            match self.refill_policy {
                RefillPolicy::Always => {
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    return self.l2.set::<T>(key, value, ctx).await;
                }
                RefillPolicy::Never => {
                    return Ok(());
                }
            }
        }

        // Nested composition: if the context already marks this value as an L2
        // hit of an inner composition, honour the refill policy here as well.
        if let Some(comp_ctx) = ctx.as_any().downcast_ref::<CompositionContext>()
            && comp_ctx.layer == CompositionLayer::L2
        {
            match self.refill_policy {
                RefillPolicy::Always => {
                    let l1_bytes = self
                        .format
                        .serialize_layer(
                            CompositionLayer::L1,
                            &mut |serializer| {
                                serializer.serialize(value.data())?;
                                Ok(())
                            },
                            &**ctx,
                        )
                        .map_err(|e| BackendError::InternalError(Box::new(e)))?;

                    let l1_len = l1_bytes.len();
                    let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());

                    let timer = Timer::new();
                    let result = self.l1.write(key, l1_value).await;
                    crate::metrics::record_write(&self.l1_label, timer.elapsed());
                    match &result {
                        Ok(()) => crate::metrics::record_write_bytes(&self.l1_label, l1_len),
                        Err(_) => crate::metrics::record_write_error(&self.l1_label),
                    }
                    result?;

                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
                RefillPolicy::Never => {
                    let mut inner_ctx = comp_ctx.inner().clone_box();
                    return self.l2.set::<T>(key, value, &mut inner_ctx).await;
                }
            }
        }

        // Regular write: serialize once per layer and hand both payloads to
        // the write policy.
        let (l1_bytes, l2_bytes) = self
            .format
            .serialize_parts(
                &mut |serializer| {
                    serializer.serialize(value.data())?;
                    Ok(())
                },
                &**ctx,
            )
            .map_err(|e| BackendError::InternalError(Box::new(e)))?;

        let l1_len = l1_bytes.len();
        let l2_len = l2_bytes.len();

        let l1_value = CacheValue::new(l1_bytes, value.expire(), value.stale());
        let l2_value = CacheValue::new(l2_bytes, value.expire(), value.stale());

        let l1 = self.l1.clone();
        let l2 = self.l2.clone();

        let l1_label = self.l1_label.clone();
        let l2_label = self.l2_label.clone();

        let write_l1 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l1.write(&k, l1_value).await;
            crate::metrics::record_write(&l1_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l1_label, l1_len),
                Err(_) => crate::metrics::record_write_error(&l1_label),
            }
            result
        };

        let write_l2 = |k: CacheKey| async move {
            let timer = Timer::new();
            let result = l2.write(&k, l2_value).await;
            crate::metrics::record_write(&l2_label, timer.elapsed());
            match &result {
                Ok(()) => crate::metrics::record_write_bytes(&l2_label, l2_len),
                Err(_) => crate::metrics::record_write_error(&l2_label),
            }
            result
        };

        self.write_policy
            .execute_with(key.clone(), write_l1, write_l2, &self.offload)
            .await
    }

    #[tracing::instrument(skip(self, ctx), level = "trace")]
    async fn delete(&self, key: &CacheKey, ctx: &mut BoxContext) -> BackendResult<DeleteStatus> {
        let mut l1_ctx = ctx.clone_box();
        let mut l2_ctx = ctx.clone_box();
        let (l1_result, l2_result) = futures::join!(
            self.l1.delete(key, &mut l1_ctx),
            self.l2.delete(key, &mut l2_ctx)
        );

        match (l1_result, l2_result) {
            (Err(e1), Err(e2)) => {
                tracing::error!(l1_error = ?e1, l2_error = ?e2, "Both L1 and L2 delete failed");
                Err(BackendError::InternalError(Box::new(
                    CompositionError::BothLayersFailed { l1: e1, l2: e2 },
                )))
            }
            (Err(e), Ok(status)) => {
                tracing::warn!(error = ?e, "L1 delete failed");
                Ok(status)
            }
            (Ok(status), Err(e)) => {
                tracing::warn!(error = ?e, "L2 delete failed");
                Ok(status)
            }
            (Ok(DeleteStatus::Deleted(n1)), Ok(DeleteStatus::Deleted(n2))) => {
                tracing::trace!("Deleted from both L1 and L2");
                Ok(DeleteStatus::Deleted(n1 + n2))
            }
            (Ok(DeleteStatus::Deleted(n)), Ok(DeleteStatus::Missing))
            | (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Deleted(n))) => {
                tracing::trace!("Deleted from one layer");
                Ok(DeleteStatus::Deleted(n))
            }
            (Ok(DeleteStatus::Missing), Ok(DeleteStatus::Missing)) => {
                tracing::trace!("Key missing from both layers");
                Ok(DeleteStatus::Missing)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::{Format, JsonFormat};
    use crate::{Backend, CacheKeyFormat, Compressor, PassthroughCompressor};
    use async_trait::async_trait;
    use chrono::Utc;
    use hitbox_core::{
        BoxContext, CacheContext, CachePolicy, CacheStatus, CacheValue, CacheableResponse,
        EntityPolicyConfig, Predicate, Raw, ResponseSource,
    };
    use serde::{Deserialize, Serialize};
    use smol_str::SmolStr;
    use std::collections::HashMap;
    use std::future::Future;
    use std::sync::{Arc, Mutex};

    #[cfg(feature = "rkyv_format")]
    use rkyv::{Archive, Serialize as RkyvSerialize};

    #[derive(Clone, Debug)]
    struct TestOffload;

    impl Offload<'static> for TestOffload {
        #[allow(deprecated)]
        fn spawn<F>(&self, _kind: impl Into<SmolStr>, future: F)
        where
            F: Future<Output = ()> + Send + 'static,
        {
            tokio::spawn(future);
        }
    }

    #[derive(Clone, Debug)]
    struct TestBackend {
        store: Arc<Mutex<HashMap<CacheKey, CacheValue<Raw>>>>,
        backend_label: &'static str,
    }

    impl TestBackend {
        fn new() -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: "test",
            }
        }

        fn with_label(label: &'static str) -> Self {
            Self {
                store: Arc::new(Mutex::new(HashMap::new())),
                backend_label: label,
            }
        }
    }

    #[async_trait]
    impl Backend for TestBackend {
        async fn read(&self, key: &CacheKey) -> BackendResult<Option<CacheValue<Raw>>> {
            Ok(self.store.lock().unwrap().get(key).cloned())
        }

        async fn write(&self, key: &CacheKey, value: CacheValue<Raw>) -> BackendResult<()> {
            self.store.lock().unwrap().insert(key.clone(), value);
            Ok(())
        }

        async fn remove(&self, key: &CacheKey) -> BackendResult<DeleteStatus> {
            match self.store.lock().unwrap().remove(key) {
                Some(_) => Ok(DeleteStatus::Deleted(1)),
                None => Ok(DeleteStatus::Missing),
            }
        }

        fn label(&self) -> BackendLabel {
            BackendLabel::new(self.backend_label)
        }

        fn value_format(&self) -> &dyn Format {
            &JsonFormat
        }

        fn key_format(&self) -> &CacheKeyFormat {
            &CacheKeyFormat::Bitcode
        }

        fn compressor(&self) -> &dyn Compressor {
            &PassthroughCompressor
        }
    }

    impl CacheBackend for TestBackend {}

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    #[cfg_attr(
        feature = "rkyv_format",
        derive(Archive, RkyvSerialize, rkyv::Deserialize)
    )]
    struct CachedData {
        value: String,
    }

    struct MockResponse;

    impl CacheableResponse for MockResponse {
        type Cached = CachedData;
        type Subject = MockResponse;
        type IntoCachedFuture = std::future::Ready<CachePolicy<Self::Cached, Self>>;
        type FromCachedFuture = std::future::Ready<Self>;

        async fn cache_policy<P: Predicate<Subject = Self::Subject> + Send + Sync>(
            self,
            _predicate: P,
            _config: &EntityPolicyConfig,
        ) -> CachePolicy<CacheValue<Self::Cached>, Self> {
            unimplemented!("Not used in these tests")
        }

        fn into_cached(self) -> Self::IntoCachedFuture {
            unimplemented!("Not used in these tests")
        }

        fn from_cached(_cached: Self::Cached) -> Self::FromCachedFuture {
            unimplemented!("Not used in these tests")
        }
    }

    #[tokio::test]
    async fn test_l1_hit() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_l2_hit_sets_refill_mode() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Simulate L1 eviction so the next get falls through to L2.
        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));

        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(
            l1_result.is_none(),
            "L1 should not be populated directly by get()"
        );
    }

    #[tokio::test]
    async fn test_miss_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();
        let backend = CompositionBackend::new(l1, l2, TestOffload);

        let key = CacheKey::from_str("test", "nonexistent");

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_write_to_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(l1_result.unwrap().data().value, "value1");

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(l2_result.unwrap().data().value, "value1");
    }

    #[tokio::test]
    async fn test_delete_from_both_layers() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let status = backend.delete(&key, &mut ctx).await.unwrap();
        assert_eq!(status, DeleteStatus::Deleted(2));

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l1_result = l1.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(l1_result.is_none());

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let l2_result = l2.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert!(l2_result.is_none());
    }

    #[tokio::test]
    async fn test_clone() {
        let l1 = TestBackend::new();
        let l2 = TestBackend::new();
        let backend = CompositionBackend::new(l1, l2, TestOffload);

        let cloned = backend.clone();

        let key = CacheKey::from_str("test", "key1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = cloned.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");
    }

    #[tokio::test]
    async fn test_nested_composition_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested");
        let value = CacheValue::new(
            CachedData {
                value: "nested_value".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested_value");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_l2_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_redis".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l2.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_redis");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.redis".into())
        );
    }

    #[tokio::test]
    async fn test_nested_composition_outer_l2_source_path() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");

        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "outer_l2");
        let value = CacheValue::new(
            CachedData {
                value: "from_disk".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l3.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_disk");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("outer.disk".into()));
    }

    #[tokio::test]
    async fn test_l1_hit_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let backend = CompositionBackend::new(l1.clone(), l2, TestOffload).label("cache");

        let key = CacheKey::from_str("test", "metrics1");
        let value = CacheValue::new(
            CachedData {
                value: "value1".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "value1");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_l2_hit_with_refill_via_set() {
        use hitbox_core::ReadMode;

        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");

        let key = CacheKey::from_str("test", "metrics2");
        let value = CacheValue::new(
            CachedData {
                value: "from_l2".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let backend = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload)
            .label("cache")
            .refill(RefillPolicy::Always);

        let mut ctx: BoxContext = CacheContext::default().boxed();
        backend
            .set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        // Simulate L1 eviction so the next get falls through to L2.
        l1.store.lock().unwrap().clear();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        let cached_value = result.unwrap();
        assert_eq!(cached_value.data().value, "from_l2");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.redis".into()));
        assert_eq!(ctx.read_mode(), ReadMode::Refill);

        // Re-use the refill-marked context: this set should write the value
        // back into L1.
        backend
            .set::<MockResponse>(&key, &cached_value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = backend.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "from_l2");
        assert_eq!(ctx.source(), &ResponseSource::Backend("cache.moka".into()));
    }

    #[tokio::test]
    async fn test_nested_composition_status() {
        let l1 = TestBackend::with_label("moka");
        let l2 = TestBackend::with_label("redis");
        let l3 = TestBackend::with_label("disk");

        let inner = CompositionBackend::new(l1.clone(), l2.clone(), TestOffload).label("inner");
        let outer = CompositionBackend::new(inner, l3.clone(), TestOffload).label("outer");

        let key = CacheKey::from_str("test", "nested_metrics");
        let value = CacheValue::new(
            CachedData {
                value: "nested".to_string(),
            },
            Some(Utc::now() + chrono::Duration::seconds(60)),
            None,
        );

        let mut ctx: BoxContext = CacheContext::default().boxed();
        l1.set::<MockResponse>(&key, &value, &mut ctx)
            .await
            .unwrap();

        let mut ctx: BoxContext = CacheContext::default().boxed();
        let result = outer.get::<MockResponse>(&key, &mut ctx).await.unwrap();
        assert_eq!(result.unwrap().data().value, "nested");

        assert_eq!(ctx.status(), CacheStatus::Hit);
        assert_eq!(
            ctx.source(),
            &ResponseSource::Backend("outer.inner.moka".into())
        );
    }
}