net/adapter/net/behavior/aggregator/
query_client.rs1use std::borrow::Cow;
13use std::collections::HashMap;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use parking_lot::RwLock;
18
19use super::query_service::{
20 FoldQueryError, FoldQueryOp, FoldQueryRequest, FoldQueryResponse, FOLD_QUERY_SERVICE,
21};
22use super::summarizer::SummaryAnnouncement;
23use crate::adapter::net::mesh_rpc::{typed_call, RpcError, TypedCallError};
24use crate::adapter::net::MeshNode;
25
/// Cache lifetime a freshly constructed `FoldQueryClient` starts with (5 seconds).
///
/// A cached answer older than this is ignored on lookup and pruned on the
/// next successful fetch.
pub const DEFAULT_QUERY_CACHE_TTL: Duration = Duration::from_millis(5_000);

/// Per-call deadline a freshly constructed `FoldQueryClient` starts with (3 seconds).
///
/// The value is handed unchanged to the RPC layer for every outgoing query.
pub const DEFAULT_QUERY_DEADLINE: Duration = Duration::from_millis(3_000);
34
35#[derive(Debug, thiserror::Error)]
39pub enum FoldQueryClientError {
40 #[error("transport: {0}")]
43 Transport(RpcError),
44 #[error("codec: {0}")]
46 Codec(String),
47 #[error("server: {0:?}")]
51 Server(FoldQueryError),
52}
53
54impl From<RpcError> for FoldQueryClientError {
55 fn from(e: RpcError) -> Self {
56 Self::Transport(e)
57 }
58}
59
60impl From<TypedCallError> for FoldQueryClientError {
61 fn from(e: TypedCallError) -> Self {
62 match e {
63 TypedCallError::Transport(t) => Self::Transport(t),
64 TypedCallError::Codec(c) => Self::Codec(c),
65 }
66 }
67}
68
/// Identity of one cached answer: which node was asked, on which service,
/// for which fold kind.
///
/// Equality and hashing of `service` go by string content, so a borrowed and
/// an owned `Cow` holding the same name address the same cache slot.
#[derive(Clone, PartialEq, Eq, Hash)]
struct CacheKey {
    /// Id of the node the query was sent to.
    target: u64,
    /// Name of the service the query was sent on. A `Cow` so that a
    /// `'static` service name can be keyed without allocating.
    service: Cow<'static, str>,
    /// Fold kind id carried in the request.
    kind: u16,
}
79
80struct CacheEntry {
81 summaries: Vec<SummaryAnnouncement>,
82 fetched_at: Instant,
83}
84
85#[derive(Clone)]
89pub struct FoldQueryClient {
90 mesh: Arc<MeshNode>,
91 cache: Arc<RwLock<HashMap<CacheKey, CacheEntry>>>,
92 ttl: Duration,
93 deadline: Duration,
94}
95
96impl FoldQueryClient {
97 pub fn new(mesh: Arc<MeshNode>) -> Self {
101 Self {
102 mesh,
103 cache: Arc::new(RwLock::new(HashMap::new())),
104 ttl: DEFAULT_QUERY_CACHE_TTL,
105 deadline: DEFAULT_QUERY_DEADLINE,
106 }
107 }
108
109 pub fn with_ttl(mut self, ttl: Duration) -> Self {
112 self.ttl = ttl;
113 self
114 }
115
116 pub fn with_deadline(mut self, deadline: Duration) -> Self {
118 self.deadline = deadline;
119 self
120 }
121
122 pub fn set_ttl_mut(&mut self, ttl: Duration) {
128 self.ttl = ttl;
129 }
130
131 pub fn set_deadline_mut(&mut self, deadline: Duration) {
135 self.deadline = deadline;
136 }
137
138 pub async fn query_latest(
147 &self,
148 target_node_id: u64,
149 kind: u16,
150 ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
151 self.do_query(target_node_id, Cow::Borrowed(FOLD_QUERY_SERVICE), kind)
152 .await
153 }
154
155 pub async fn query_with_service(
159 &self,
160 target_node_id: u64,
161 service: &str,
162 kind: u16,
163 ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
164 self.do_query(target_node_id, Cow::Owned(service.to_string()), kind)
165 .await
166 }
167
168 async fn do_query(
169 &self,
170 target_node_id: u64,
171 service: Cow<'static, str>,
172 kind: u16,
173 ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
174 let key = CacheKey {
175 target: target_node_id,
176 service,
177 kind,
178 };
179 if !self.ttl.is_zero() {
180 if let Some(entry) = self.cache.read().get(&key) {
181 if entry.fetched_at.elapsed() < self.ttl {
182 return Ok(entry.summaries.clone());
183 }
184 }
185 }
186 let summaries = self
187 .issue_call(
188 target_node_id,
189 &key.service,
190 kind,
191 FoldQueryOp::LatestSummary,
192 )
193 .await?;
194 if !self.ttl.is_zero() {
195 let mut cache = self.cache.write();
196 let ttl = self.ttl;
197 cache.retain(|_, e| e.fetched_at.elapsed() < ttl);
203 cache.insert(
204 key,
205 CacheEntry {
206 summaries: summaries.clone(),
207 fetched_at: Instant::now(),
208 },
209 );
210 }
211 Ok(summaries)
212 }
213
214 pub async fn query_summarize_now(
218 &self,
219 target_node_id: u64,
220 kind: u16,
221 ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
222 self.issue_call(
223 target_node_id,
224 FOLD_QUERY_SERVICE,
225 kind,
226 FoldQueryOp::SummarizeNow,
227 )
228 .await
229 }
230
231 pub fn invalidate_cache(&self) {
235 self.cache.write().clear();
236 }
237
238 pub fn invalidate_target(&self, target_node_id: u64) {
242 let mut cache = self.cache.write();
243 cache.retain(|k, _| k.target != target_node_id);
244 }
245
246 async fn issue_call(
247 &self,
248 target_node_id: u64,
249 service: &str,
250 kind: u16,
251 op: FoldQueryOp,
252 ) -> Result<Vec<SummaryAnnouncement>, FoldQueryClientError> {
253 let request = FoldQueryRequest { kind, op };
254 let response: FoldQueryResponse =
255 typed_call(&self.mesh, target_node_id, service, &request, self.deadline).await?;
256 match response {
257 FoldQueryResponse::Summaries { summaries, .. } => Ok(summaries),
258 FoldQueryResponse::Error(e) => Err(FoldQueryClientError::Server(e)),
259 }
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::fold::capability::CapabilityFold;
    use crate::adapter::net::behavior::fold::FoldKind;
    use crate::adapter::net::identity::EntityKeypair;
    use crate::adapter::net::{MeshNodeConfig, SubnetId};
    use std::net::SocketAddr;

    /// Builds a mesh node configured with `127.0.0.1:0` and a freshly
    /// generated keypair. The tests only need it to construct a client.
    async fn build_mesh() -> Arc<MeshNode> {
        let bind: SocketAddr = "127.0.0.1:0".parse().unwrap();
        let config = MeshNodeConfig::new(bind, [0x17u8; 32]);
        let node = MeshNode::new(EntityKeypair::generate(), config)
            .await
            .expect("MeshNode::new");
        Arc::new(node)
    }

    /// Cache key for `target` on the default service and the capability fold.
    fn capability_key(target: u64) -> CacheKey {
        CacheKey {
            target,
            service: Cow::Borrowed(FOLD_QUERY_SERVICE),
            kind: CapabilityFold::KIND_ID,
        }
    }

    /// Writes one entry straight into the client's cache, bypassing the wire.
    fn seed(
        client: &FoldQueryClient,
        key: CacheKey,
        summaries: Vec<SummaryAnnouncement>,
        fetched_at: Instant,
    ) {
        client.cache.write().insert(
            key,
            CacheEntry {
                summaries,
                fetched_at,
            },
        );
    }

    #[tokio::test]
    async fn new_carries_default_ttl_and_deadline() {
        let client = FoldQueryClient::new(build_mesh().await);
        assert_eq!(client.ttl, DEFAULT_QUERY_CACHE_TTL);
        assert_eq!(client.deadline, DEFAULT_QUERY_DEADLINE);
    }

    #[tokio::test]
    async fn with_ttl_zero_disables_cache() {
        let client = FoldQueryClient::new(build_mesh().await).with_ttl(Duration::ZERO);
        assert_eq!(client.ttl, Duration::ZERO);
    }

    #[tokio::test]
    async fn invalidate_cache_clears_every_entry() {
        let client = FoldQueryClient::new(build_mesh().await);
        let announcement = SummaryAnnouncement {
            source_subnet: SubnetId::GLOBAL,
            fold_kind: CapabilityFold::KIND_ID,
            generation: 1,
            buckets: vec![("idle".to_string(), 1)],
        };
        seed(
            &client,
            capability_key(0xAAAA),
            vec![announcement],
            Instant::now(),
        );
        assert_eq!(client.cache.read().len(), 1);

        client.invalidate_cache();
        assert_eq!(client.cache.read().len(), 0);
    }

    #[tokio::test]
    async fn invalidate_target_drops_only_matching_entries() {
        let client = FoldQueryClient::new(build_mesh().await);
        let stamp = Instant::now();
        for target in [0xAAAA_u64, 0xBBBB, 0xCCCC] {
            seed(&client, capability_key(target), Vec::new(), stamp);
        }
        assert_eq!(client.cache.read().len(), 3);

        client.invalidate_target(0xBBBB);

        // Keys are unique, so comparing the sorted survivors pins both
        // membership and count in one assertion.
        let mut survivors: Vec<u64> = client.cache.read().keys().map(|k| k.target).collect();
        survivors.sort_unstable();
        assert_eq!(survivors, vec![0xAAAA_u64, 0xCCCC]);
    }

    #[tokio::test]
    async fn cache_hit_returns_without_hitting_wire() {
        let mesh = build_mesh().await;
        let client = FoldQueryClient::new(mesh.clone()).with_ttl(Duration::from_secs(60));
        let target = 0xDEAD_u64;
        let kind = CapabilityFold::KIND_ID;
        let cached = SummaryAnnouncement {
            source_subnet: SubnetId::new(&[3]),
            fold_kind: kind,
            generation: 7,
            buckets: vec![("idle".to_string(), 4)],
        };
        seed(
            &client,
            capability_key(target),
            vec![cached.clone()],
            Instant::now(),
        );

        // A fresh entry under the same key must be served as-is.
        let served = client.query_latest(target, kind).await.expect("cache hit");
        assert_eq!(served, vec![cached]);
    }

    #[tokio::test]
    async fn opportunistic_eviction_drops_expired_entries_on_next_miss() {
        let client = FoldQueryClient::new(build_mesh().await).with_ttl(Duration::from_millis(20));
        // Back-date the entry by a full second so it is well past the 20 ms TTL.
        seed(
            &client,
            capability_key(0xAAAA),
            Vec::new(),
            Instant::now() - Duration::from_secs(1),
        );
        assert_eq!(client.cache.read().len(), 1);

        // Apply the same pruning predicate the client uses when storing a result.
        let ttl = client.ttl;
        client
            .cache
            .write()
            .retain(|_, entry| entry.fetched_at.elapsed() < ttl);
        assert_eq!(client.cache.read().len(), 0, "expired entry must be pruned");
    }
}