Skip to main content

objectiveai_api/ensemble/fetcher/
caching_fetcher.rs

1//! Caching wrapper for ensemble fetchers.
2
3use crate::ctx;
4use futures::FutureExt;
5use std::sync::Arc;
6
/// Wraps an ensemble fetcher with per-request deduplication caching.
///
/// When multiple parts of a request need the same ensemble, this fetcher
/// ensures only one actual fetch is performed. Subsequent requests for the
/// same ensemble ID within the same request context share the result.
///
/// The wrapper stores only a shared handle to the inner fetcher, so cloning it
/// is a reference-count bump and places no `Clone` requirement on either type
/// parameter.
#[derive(Debug)]
pub struct CachingFetcher<CTXEXT, FENS> {
    /// The underlying fetcher to delegate to on cache miss.
    pub inner: Arc<FENS>,
    /// Ties the wrapper to a context-extension type without storing a value of it.
    _marker: std::marker::PhantomData<CTXEXT>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add
// `CTXEXT: Clone` and `FENS: Clone` bounds, yet neither type is ever cloned —
// only the `Arc` handle is.
impl<CTXEXT, FENS> Clone for CachingFetcher<CTXEXT, FENS> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            _marker: std::marker::PhantomData,
        }
    }
}
18
19impl<CTXEXT, FENS> CachingFetcher<CTXEXT, FENS> {
20    /// Creates a new caching fetcher wrapping the given inner fetcher.
21    pub fn new(inner: Arc<FENS>) -> Self {
22        Self {
23            inner,
24            _marker: std::marker::PhantomData,
25        }
26    }
27}
28
29impl<CTXEXT, FENS> CachingFetcher<CTXEXT, FENS>
30where
31    CTXEXT: Send + Sync + 'static,
32    FENS: super::Fetcher<CTXEXT> + Send + Sync + 'static,
33{
34    /// Fetches an ensemble, using the request-scoped cache for deduplication.
35    ///
36    /// If another fetch for the same ID is already in progress within this
37    /// request context, waits for and shares that result instead of fetching again.
38    pub async fn fetch(
39        &self,
40        ctx: ctx::Context<CTXEXT>,
41        id: &str,
42    ) -> Result<
43        Option<(objectiveai::ensemble::Ensemble, u64)>,
44        objectiveai::error::ResponseError,
45    > {
46        // Clone the shared future while holding the lock, then release the lock before awaiting.
47        // This prevents deadlocks when multiple concurrent fetches hash to the same DashMap shard.
48        let shared = ctx
49            .ensemble_cache
50            .entry(id.to_owned())
51            .or_insert_with(|| {
52                let (tx, rx) = tokio::sync::oneshot::channel();
53                let inner = self.inner.clone();
54                let id = id.to_owned();
55                let ctx = ctx.clone();
56                tokio::spawn(async move {
57                    let result = inner.fetch(ctx, &id).await;
58                    let _ = tx.send(result);
59                });
60                rx.shared()
61            })
62            .clone();
63        // Lock is now released, safe to await
64        shared.await.unwrap()
65    }
66}