// objectiveai_api/ensemble_llm/fetcher/caching_fetcher.rs
//! Caching wrapper for Ensemble LLM fetchers.

use crate::ctx;
use futures::{future::Shared, FutureExt};
use std::sync::Arc;
6
/// Wraps an Ensemble LLM fetcher with per-request deduplication caching.
///
/// When multiple parts of a request need the same Ensemble LLM, this fetcher
/// ensures only one actual fetch is performed. Subsequent requests for the
/// same Ensemble LLM ID within the same request context share the result.
#[derive(Debug)]
pub struct CachingFetcher<CTXEXT, FENSLLM> {
    /// The underlying fetcher to delegate to on cache miss.
    pub inner: Arc<FENSLLM>,
    // Zero-sized marker tying this fetcher to its context-extension type.
    _marker: std::marker::PhantomData<CTXEXT>,
}

/// Manual `Clone` impl instead of `#[derive(Clone)]`: the derive would
/// spuriously require `CTXEXT: Clone` and `FENSLLM: Clone`, even though
/// cloning only bumps the `Arc` refcount and copies a `PhantomData`.
impl<CTXEXT, FENSLLM> Clone for CachingFetcher<CTXEXT, FENSLLM> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            _marker: std::marker::PhantomData,
        }
    }
}
18
19impl<CTXEXT, FENSLLM> CachingFetcher<CTXEXT, FENSLLM> {
20 /// Creates a new caching fetcher wrapping the given inner fetcher.
21 pub fn new(inner: Arc<FENSLLM>) -> Self {
22 Self {
23 inner,
24 _marker: std::marker::PhantomData,
25 }
26 }
27}
28
29impl<CTXEXT, FENSLLM> CachingFetcher<CTXEXT, FENSLLM>
30where
31 CTXEXT: Send + Sync + 'static,
32 FENSLLM: super::Fetcher<CTXEXT> + Send + Sync + 'static,
33{
34 /// Spawns concurrent fetch tasks for multiple Ensemble LLM IDs.
35 ///
36 /// This allows pre-warming the cache when the set of required IDs is known
37 /// ahead of time, reducing latency by parallelizing the fetches.
38 pub fn spawn_fetches<'id>(
39 &self,
40 ctx: ctx::Context<CTXEXT>,
41 ids: impl Iterator<Item = &'id str>,
42 ) {
43 for id in ids {
44 ctx.ensemble_llm_cache
45 .entry(id.to_owned())
46 .or_insert_with(|| {
47 let (tx, rx) = tokio::sync::oneshot::channel();
48 let inner = self.inner.clone();
49 let id = id.to_owned();
50 let ctx = ctx.clone();
51 tokio::spawn(async move {
52 let result = inner.fetch(ctx, &id).await;
53 let _ = tx.send(result);
54 });
55 rx.shared()
56 });
57 }
58 }
59
60 /// Fetches an Ensemble LLM, using the request-scoped cache for deduplication.
61 ///
62 /// If another fetch for the same ID is already in progress within this
63 /// request context, waits for and shares that result instead of fetching again.
64 pub async fn fetch(
65 &self,
66 ctx: ctx::Context<CTXEXT>,
67 id: &str,
68 ) -> Result<
69 Option<(objectiveai::ensemble_llm::EnsembleLlm, u64)>,
70 objectiveai::error::ResponseError,
71 > {
72 // Clone the shared future while holding the lock, then release the lock before awaiting.
73 // This prevents deadlocks when multiple concurrent fetches hash to the same DashMap shard.
74 let shared = ctx
75 .ensemble_llm_cache
76 .entry(id.to_owned())
77 .or_insert_with(|| {
78 let (tx, rx) = tokio::sync::oneshot::channel();
79 let inner = self.inner.clone();
80 let id = id.to_owned();
81 let ctx = ctx.clone();
82 tokio::spawn(async move {
83 let result = inner.fetch(ctx, &id).await;
84 let _ = tx.send(result);
85 });
86 rx.shared()
87 })
88 .clone();
89 // Lock is now released, safe to await
90 shared.await.unwrap()
91 }
92}