objectiveai_api/ensemble/fetcher/caching_fetcher.rs
1//! Caching wrapper for ensemble fetchers.
2
3use crate::ctx;
4use futures::FutureExt;
5use std::sync::Arc;
6
/// Wraps an ensemble fetcher with per-request deduplication caching.
///
/// When several parts of one request need the same ensemble, only a single
/// fetch is issued to the wrapped fetcher; later lookups for the same
/// ensemble ID within that request context share its result.
///
/// `Clone` and `Debug` are implemented by hand rather than derived: a derive
/// would demand `CTXEXT: Clone + Debug` and `FENS: Clone`, even though
/// `CTXEXT` is only a marker and `inner` is shared through an [`Arc`].
pub struct CachingFetcher<CTXEXT, FENS> {
    /// The underlying fetcher to delegate to on cache miss.
    pub inner: Arc<FENS>,
    /// Ties the wrapper to a context extension type without storing one.
    _marker: std::marker::PhantomData<CTXEXT>,
}

impl<CTXEXT, FENS> Clone for CachingFetcher<CTXEXT, FENS> {
    /// Clones the handle; the wrapped fetcher is shared, not copied.
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            _marker: std::marker::PhantomData,
        }
    }
}

impl<CTXEXT, FENS> std::fmt::Debug for CachingFetcher<CTXEXT, FENS>
where
    FENS: std::fmt::Debug,
{
    /// Formats the wrapper with the same field layout a derived `Debug` uses.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CachingFetcher")
            .field("inner", &self.inner)
            .field("_marker", &self._marker)
            .finish()
    }
}
18
19impl<CTXEXT, FENS> CachingFetcher<CTXEXT, FENS> {
20 /// Creates a new caching fetcher wrapping the given inner fetcher.
21 pub fn new(inner: Arc<FENS>) -> Self {
22 Self {
23 inner,
24 _marker: std::marker::PhantomData,
25 }
26 }
27}
28
29impl<CTXEXT, FENS> CachingFetcher<CTXEXT, FENS>
30where
31 CTXEXT: Send + Sync + 'static,
32 FENS: super::Fetcher<CTXEXT> + Send + Sync + 'static,
33{
34 /// Fetches an ensemble, using the request-scoped cache for deduplication.
35 ///
36 /// If another fetch for the same ID is already in progress within this
37 /// request context, waits for and shares that result instead of fetching again.
38 pub async fn fetch(
39 &self,
40 ctx: ctx::Context<CTXEXT>,
41 id: &str,
42 ) -> Result<
43 Option<(objectiveai::ensemble::Ensemble, u64)>,
44 objectiveai::error::ResponseError,
45 > {
46 // Clone the shared future while holding the lock, then release the lock before awaiting.
47 // This prevents deadlocks when multiple concurrent fetches hash to the same DashMap shard.
48 let shared = ctx
49 .ensemble_cache
50 .entry(id.to_owned())
51 .or_insert_with(|| {
52 let (tx, rx) = tokio::sync::oneshot::channel();
53 let inner = self.inner.clone();
54 let id = id.to_owned();
55 let ctx = ctx.clone();
56 tokio::spawn(async move {
57 let result = inner.fetch(ctx, &id).await;
58 let _ = tx.send(result);
59 });
60 rx.shared()
61 })
62 .clone();
63 // Lock is now released, safe to await
64 shared.await.unwrap()
65 }
66}