// objectiveai_api/ensemble_llm/fetcher/caching_fetcher.rs

//! Caching wrapper for Ensemble LLM fetchers.
use crate::ctx;
use futures::FutureExt;
use std::sync::Arc;
/// Wraps an Ensemble LLM fetcher with per-request deduplication caching.
///
/// When multiple parts of a request need the same Ensemble LLM, this fetcher
/// ensures only one actual fetch is performed. Subsequent requests for the
/// same Ensemble LLM ID within the same request context share the result.
///
/// `CTXEXT` is the context-extension type carried by [`ctx::Context`];
/// `FENSLLM` is the wrapped fetcher implementation.
#[derive(Debug, Clone)]
pub struct CachingFetcher<CTXEXT, FENSLLM> {
    /// The underlying fetcher to delegate to on cache miss.
    pub inner: Arc<FENSLLM>,
    /// Zero-sized marker tying this struct to its `CTXEXT` type parameter
    /// without storing a value of that type.
    _marker: std::marker::PhantomData<CTXEXT>,
}
19impl<CTXEXT, FENSLLM> CachingFetcher<CTXEXT, FENSLLM> {
20    /// Creates a new caching fetcher wrapping the given inner fetcher.
21    pub fn new(inner: Arc<FENSLLM>) -> Self {
22        Self {
23            inner,
24            _marker: std::marker::PhantomData,
25        }
26    }
27}
29impl<CTXEXT, FENSLLM> CachingFetcher<CTXEXT, FENSLLM>
30where
31    CTXEXT: Send + Sync + 'static,
32    FENSLLM: super::Fetcher<CTXEXT> + Send + Sync + 'static,
33{
34    /// Spawns concurrent fetch tasks for multiple Ensemble LLM IDs.
35    ///
36    /// This allows pre-warming the cache when the set of required IDs is known
37    /// ahead of time, reducing latency by parallelizing the fetches.
38    pub fn spawn_fetches<'id>(
39        &self,
40        ctx: ctx::Context<CTXEXT>,
41        ids: impl Iterator<Item = &'id str>,
42    ) {
43        for id in ids {
44            ctx.ensemble_llm_cache
45                .entry(id.to_owned())
46                .or_insert_with(|| {
47                    let (tx, rx) = tokio::sync::oneshot::channel();
48                    let inner = self.inner.clone();
49                    let id = id.to_owned();
50                    let ctx = ctx.clone();
51                    tokio::spawn(async move {
52                        let result = inner.fetch(ctx, &id).await;
53                        let _ = tx.send(result);
54                    });
55                    rx.shared()
56                });
57        }
58    }
59
60    /// Fetches an Ensemble LLM, using the request-scoped cache for deduplication.
61    ///
62    /// If another fetch for the same ID is already in progress within this
63    /// request context, waits for and shares that result instead of fetching again.
64    pub async fn fetch(
65        &self,
66        ctx: ctx::Context<CTXEXT>,
67        id: &str,
68    ) -> Result<
69        Option<(objectiveai::ensemble_llm::EnsembleLlm, u64)>,
70        objectiveai::error::ResponseError,
71    > {
72        // Clone the shared future while holding the lock, then release the lock before awaiting.
73        // This prevents deadlocks when multiple concurrent fetches hash to the same DashMap shard.
74        let shared = ctx
75            .ensemble_llm_cache
76            .entry(id.to_owned())
77            .or_insert_with(|| {
78                let (tx, rx) = tokio::sync::oneshot::channel();
79                let inner = self.inner.clone();
80                let id = id.to_owned();
81                let ctx = ctx.clone();
82                tokio::spawn(async move {
83                    let result = inner.fetch(ctx, &id).await;
84                    let _ = tx.send(result);
85                });
86                rx.shared()
87            })
88            .clone();
89        // Lock is now released, safe to await
90        shared.await.unwrap()
91    }
92}