Skip to main content

net/adapter/net/behavior/aggregator/
query_client.rs

1//! `FoldQueryClient` — client-side helper that wraps
2//! [`MeshNode::call`] with typed `fold.query` serialization and
3//! a per-`(target, service, kind)` TTL cache.
4//!
5//! Phase C slice 2 of `SCALING_SUBNET_SPEC.md`. The caching layer
6//! is the operator-facing contract the plan calls out:
7//!
8//! > **Caching:** the RPC client caches recent query results with
9//! > a short TTL (configurable, default 5s). Repeated queries for
10//! > the same data don't re-hit the aggregator.
11
12use std::borrow::Cow;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use parking_lot::RwLock;
18
19use super::query_service::{
20    FoldQueryError, FoldQueryOp, FoldQueryRequest, FoldQueryResponse, FOLD_QUERY_SERVICE,
21};
22use super::summarizer::SummaryAnnouncement;
23use crate::adapter::net::mesh_rpc::{typed_call, RpcError, TypedCallError};
24use crate::adapter::net::MeshNode;
25
/// Cache TTL a client starts with when the caller does not override it:
/// five seconds, the value the plan locks in.
pub const DEFAULT_QUERY_CACHE_TTL: Duration = Duration::from_secs(5);

/// RPC deadline a client starts with when the caller does not override
/// it: three seconds. Chosen as an operator-tooling default — long enough
/// to absorb cross-subnet latency, short enough that a wedged aggregator
/// surfaces quickly.
pub const DEFAULT_QUERY_DEADLINE: Duration = Duration::from_secs(3);
34
35/// Client-side errors the typed surface produces. Distinct from
36/// `RpcError` (transport) and `FoldQueryError` (handler-level)
37/// so the caller can match on the failure shape they care about.
38#[derive(Debug, thiserror::Error)]
39pub enum FoldQueryClientError {
40    /// Transport-level failure — no route, timeout, server
41    /// returned a non-Ok status before invoking the handler.
42    #[error("transport: {0}")]
43    Transport(RpcError),
44    /// Request serialization or response deserialization failed.
45    #[error("codec: {0}")]
46    Codec(String),
47    /// Aggregator handler rejected the request (e.g. unknown
48    /// fold kind). Forwarded from
49    /// [`super::FoldQueryResponse::Error`].
50    #[error("server: {0:?}")]
51    Server(FoldQueryError),
52}
53
54impl From<RpcError> for FoldQueryClientError {
55    fn from(e: RpcError) -> Self {
56        Self::Transport(e)
57    }
58}
59
60impl From<TypedCallError> for FoldQueryClientError {
61    fn from(e: TypedCallError) -> Self {
62        match e {
63            TypedCallError::Transport(t) => Self::Transport(t),
64            TypedCallError::Codec(c) => Self::Codec(c),
65        }
66    }
67}
68
/// Identity of one cached query result: which node was asked, under
/// which service name, for which fold kind.
#[derive(Clone, Eq, PartialEq, Hash)]
struct CacheKey {
    /// Node id of the aggregator replica that was queried.
    target: u64,
    /// Service name the call was addressed to. Stored as a `Cow` so the
    /// default-service path ([`FoldQueryClient::query_latest`]) can
    /// borrow the static name and skip an allocation per lookup;
    /// [`FoldQueryClient::query_with_service`] builds an owned copy of
    /// the caller's name on each call instead.
    service: Cow<'static, str>,
    /// Fold kind id the query asked about.
    kind: u16,
}
79
80struct CacheEntry {
81    summaries: Vec<SummaryAnnouncement>,
82    fetched_at: Instant,
83}
84
85/// Typed `fold.query` client. Cheap to clone (just clones the
86/// `Arc`s); operator tooling typically constructs one per
87/// process and shares it.
88#[derive(Clone)]
89pub struct FoldQueryClient {
90    mesh: Arc<MeshNode>,
91    cache: Arc<RwLock<HashMap<CacheKey, CacheEntry>>>,
92    ttl: Duration,
93    deadline: Duration,
94}
95
96impl FoldQueryClient {
97    /// Build a client backed by `mesh` with the default TTL +
98    /// deadline. Callers wanting non-defaults use
99    /// [`Self::with_ttl`] / [`Self::with_deadline`].
100    pub fn new(mesh: Arc<MeshNode>) -> Self {
101        Self {
102            mesh,
103            cache: Arc::new(RwLock::new(HashMap::new())),
104            ttl: DEFAULT_QUERY_CACHE_TTL,
105            deadline: DEFAULT_QUERY_DEADLINE,
106        }
107    }
108
109    /// Override the cache TTL. `Duration::ZERO` disables the
110    /// cache entirely (every call hits the wire).
111    pub fn with_ttl(mut self, ttl: Duration) -> Self {
112        self.ttl = ttl;
113        self
114    }
115
116    /// Override the per-call deadline.
117    pub fn with_deadline(mut self, deadline: Duration) -> Self {
118        self.deadline = deadline;
119        self
120    }
121
122    /// Override the cache TTL in place. Used by the FFI wrapper
123    /// so adjusting TTL doesn't drop the warmed cache (a fresh
124    /// `with_ttl(self)` would clone all `Arc`s except the cache
125    /// is held by `&mut self` — the in-place mutation preserves
126    /// the inner `Arc<RwLock<HashMap<...>>>`).
127    pub fn set_ttl_mut(&mut self, ttl: Duration) {
128        self.ttl = ttl;
129    }
130
131    /// Override the per-call deadline in place. Same rationale as
132    /// [`Self::set_ttl_mut`] — preserves the cache state across
133    /// FFI-side deadline adjustments.
134    pub fn set_deadline_mut(&mut self, deadline: Duration) {
135        self.deadline = deadline;
136    }
137
138    /// Query the aggregator for its latest cached summaries.
139    /// Cache hit → return immediately; miss → issue RPC, cache
140    /// the result, return.
141    ///
142    /// `target_node_id` is the aggregator replica to query;
143    /// operator tooling typically finds it via the capability
144    /// index (`role:aggregator` tag) or the existing
145    /// `MeshNode::find_*` helpers.
146    pub async fn query_latest(
147        &self,
148        target_node_id: u64,
149        kind: u16,
150    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
151        self.do_query(target_node_id, Cow::Borrowed(FOLD_QUERY_SERVICE), kind)
152            .await
153    }
154
155    /// Same as [`Self::query_latest`] but with a caller-supplied
156    /// service name. Useful when a node runs multiple
157    /// aggregators registered under distinct service names.
158    pub async fn query_with_service(
159        &self,
160        target_node_id: u64,
161        service: &str,
162        kind: u16,
163    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
164        self.do_query(target_node_id, Cow::Owned(service.to_string()), kind)
165            .await
166    }
167
168    async fn do_query(
169        &self,
170        target_node_id: u64,
171        service: Cow<'static, str>,
172        kind: u16,
173    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
174        let key = CacheKey {
175            target: target_node_id,
176            service,
177            kind,
178        };
179        if !self.ttl.is_zero() {
180            if let Some(entry) = self.cache.read().get(&key) {
181                if entry.fetched_at.elapsed() < self.ttl {
182                    return Ok(entry.summaries.clone());
183                }
184            }
185        }
186        let summaries = self
187            .issue_call(
188                target_node_id,
189                &key.service,
190                kind,
191                FoldQueryOp::LatestSummary,
192            )
193            .await?;
194        if !self.ttl.is_zero() {
195            let mut cache = self.cache.write();
196            let ttl = self.ttl;
197            // Opportunistic eviction: every cache miss is already
198            // paying for a wire round-trip, so an O(n) sweep of
199            // expired entries here is cheap relative to the work
200            // we're about to do — and it bounds the cache size
201            // for long-running operator tooling.
202            cache.retain(|_, e| e.fetched_at.elapsed() < ttl);
203            cache.insert(
204                key,
205                CacheEntry {
206                    summaries: summaries.clone(),
207                    fetched_at: Instant::now(),
208                },
209            );
210        }
211        Ok(summaries)
212    }
213
214    /// Issue a `SummarizeNow` query — never cached; always
215    /// hits the wire. Use when the staleness tolerance is
216    /// tighter than `summary_interval`.
217    pub async fn query_summarize_now(
218        &self,
219        target_node_id: u64,
220        kind: u16,
221    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
222        self.issue_call(
223            target_node_id,
224            FOLD_QUERY_SERVICE,
225            kind,
226            FoldQueryOp::SummarizeNow,
227        )
228        .await
229    }
230
231    /// Drop every cached entry. Operator tooling calls this after
232    /// a topology change (e.g. a placement migration) so the next
233    /// query re-resolves against the new aggregator replica.
234    pub fn invalidate_cache(&self) {
235        self.cache.write().clear();
236    }
237
238    /// Drop just the entries matching `target_node_id`. Used when
239    /// a single replica is known stale but the rest of the cache
240    /// is still warm.
241    pub fn invalidate_target(&self, target_node_id: u64) {
242        let mut cache = self.cache.write();
243        cache.retain(|k, _| k.target != target_node_id);
244    }
245
246    async fn issue_call(
247        &self,
248        target_node_id: u64,
249        service: &str,
250        kind: u16,
251        op: FoldQueryOp,
252    ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
253        let request = FoldQueryRequest { kind, op };
254        let response: FoldQueryResponse =
255            typed_call(&self.mesh, target_node_id, service, &request, self.deadline).await?;
256        match response {
257            FoldQueryResponse::Summaries { summaries, .. } => Ok(summaries),
258            FoldQueryResponse::Error(e) => Err(FoldQueryClientError::Server(e)),
259        }
260    }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::fold::capability::CapabilityFold;
    use crate::adapter::net::behavior::fold::FoldKind;
    use crate::adapter::net::identity::EntityKeypair;
    use crate::adapter::net::{MeshNodeConfig, SubnetId};
    use std::net::SocketAddr;

    /// Builds a mesh node on an ephemeral loopback port with a fresh
    /// keypair. No test here sends traffic through it.
    async fn build_mesh() -> Arc<MeshNode> {
        let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let config = MeshNodeConfig::new(bind, [0x17u8; 32]);
        let node = MeshNode::new(EntityKeypair::generate(), config)
            .await
            .expect("MeshNode::new");
        Arc::new(node)
    }

    /// Cache key for `target` under the default service name and the
    /// capability fold kind — the shape every test below primes.
    fn default_key(target: u64) -> CacheKey {
        CacheKey {
            target,
            service: Cow::Borrowed(FOLD_QUERY_SERVICE),
            kind: CapabilityFold::KIND_ID,
        }
    }

    #[tokio::test]
    async fn new_carries_default_ttl_and_deadline() {
        let client = FoldQueryClient::new(build_mesh().await);
        assert_eq!(client.ttl, DEFAULT_QUERY_CACHE_TTL);
        assert_eq!(client.deadline, DEFAULT_QUERY_DEADLINE);
    }

    #[tokio::test]
    async fn with_ttl_zero_disables_cache() {
        let client = FoldQueryClient::new(build_mesh().await).with_ttl(Duration::ZERO);
        assert_eq!(client.ttl, Duration::ZERO);
    }

    #[tokio::test]
    async fn invalidate_cache_clears_every_entry() {
        let client = FoldQueryClient::new(build_mesh().await);
        // Seed the map by hand; transport is not under test here.
        let seeded = CacheEntry {
            summaries: vec![SummaryAnnouncement {
                source_subnet: SubnetId::GLOBAL,
                fold_kind: CapabilityFold::KIND_ID,
                generation: 1,
                buckets: vec![("idle".to_string(), 1)],
            }],
            fetched_at: Instant::now(),
        };
        client.cache.write().insert(default_key(0xAAAA), seeded);
        assert_eq!(client.cache.read().len(), 1);

        client.invalidate_cache();
        assert_eq!(client.cache.read().len(), 0);
    }

    #[tokio::test]
    async fn invalidate_target_drops_only_matching_entries() {
        let client = FoldQueryClient::new(build_mesh().await);
        let stamp = Instant::now();
        {
            // Scoped so the write guard is gone before the reads below.
            let mut cache = client.cache.write();
            for target in [0xAAAA_u64, 0xBBBB, 0xCCCC] {
                cache.insert(
                    default_key(target),
                    CacheEntry {
                        summaries: Vec::new(),
                        fetched_at: stamp,
                    },
                );
            }
        }
        assert_eq!(client.cache.read().len(), 3);

        client.invalidate_target(0xBBBB);

        let mut survivors: Vec<u64> = client.cache.read().keys().map(|k| k.target).collect();
        survivors.sort_unstable();
        assert_eq!(survivors, vec![0xAAAA_u64, 0xCCCC]);
    }

    #[tokio::test]
    async fn cache_hit_returns_without_hitting_wire() {
        // Pins the fast path: with an entry primed for the same
        // `(target, service, kind)`, `query_latest` must hand back that
        // entry. No peer exists for this target, so a wire attempt
        // would come back as an error and trip the `expect` below.
        let client = FoldQueryClient::new(build_mesh().await).with_ttl(Duration::from_secs(60));
        let target = 0xDEAD_u64;
        let kind = CapabilityFold::KIND_ID;
        let primed = SummaryAnnouncement {
            source_subnet: SubnetId::new(&[3]),
            fold_kind: kind,
            generation: 7,
            buckets: vec![("idle".to_string(), 4)],
        };
        client.cache.write().insert(
            default_key(target),
            CacheEntry {
                summaries: vec![primed.clone()],
                fetched_at: Instant::now(),
            },
        );

        let got = client.query_latest(target, kind).await.expect("cache hit");
        assert_eq!(got, vec![primed]);
    }

    #[tokio::test]
    async fn opportunistic_eviction_drops_expired_entries_on_next_miss() {
        // With a short TTL, an entry stamped one second in the past is
        // already expired and must not survive the eviction sweep, so
        // long-running tooling doesn't accumulate dead entries.
        let client = FoldQueryClient::new(build_mesh().await).with_ttl(Duration::from_millis(20));
        client.cache.write().insert(
            default_key(0xAAAA),
            CacheEntry {
                summaries: Vec::new(),
                fetched_at: Instant::now() - Duration::from_secs(1),
            },
        );
        assert_eq!(client.cache.read().len(), 1);

        // The client only sweeps after a successful wire call, and no
        // peer is available here, so this test does not issue a query:
        // it applies the same retain predicate to the map directly.
        let ttl = client.ttl;
        {
            let mut cache = client.cache.write();
            cache.retain(|_, entry| entry.fetched_at.elapsed() < ttl);
        }
        assert_eq!(client.cache.read().len(), 0, "expired entry must be pruned");
    }
}
416}