1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28 RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::{RangeInclusiveMap, StepFns};
32use reqwest::{Client, Request, Response};
33use route_provider::{
34 dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35 RouteProvider, UrlUntilReady,
36};
37pub use subnet::Subnet;
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45 agent::response_authentication::{
46 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47 lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48 lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 convert::TryFrom,
68 fmt::{self, Debug},
69 future::{Future, IntoFuture},
70 pin::Pin,
71 str::FromStr,
72 sync::{Arc, Mutex, RwLock},
73 task::{Context, Poll},
74 time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
81const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89#[cfg_attr(unix, doc = " ```rust")] #[cfg_attr(not(unix), doc = " ```ignore")]
93#[derive(Clone)]
154pub struct Agent {
155 nonce_factory: Arc<dyn NonceGenerator>,
156 identity: Arc<dyn Identity>,
157 ingress_expiry: Duration,
158 root_key: Arc<RwLock<Vec<u8>>>,
159 client: Arc<dyn HttpService>,
160 route_provider: Arc<dyn RouteProvider>,
161 subnet_key_cache: Arc<Mutex<SubnetCache>>,
162 concurrent_requests_semaphore: Arc<Semaphore>,
163 verify_query_signatures: bool,
164 max_response_body_size: Option<usize>,
165 max_polling_time: Duration,
166 #[allow(dead_code)]
167 max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172 f.debug_struct("Agent")
173 .field("ingress_expiry", &self.ingress_expiry)
174 .finish_non_exhaustive()
175 }
176}
177
178impl Agent {
179 pub fn builder() -> builder::AgentBuilder {
182 Default::default()
183 }
184
185 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187 let client = config.http_service.unwrap_or_else(|| {
188 Arc::new(Retry429Logic {
189 client: config.client.unwrap_or_else(|| {
190 #[cfg(not(target_family = "wasm"))]
191 {
192 Client::builder()
193 .use_rustls_tls()
194 .timeout(Duration::from_secs(360))
195 .build()
196 .expect("Could not create HTTP client.")
197 }
198 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199 {
200 Client::new()
201 }
202 }),
203 })
204 });
205 Ok(Agent {
206 nonce_factory: config.nonce_factory,
207 identity: config.identity,
208 ingress_expiry: config.ingress_expiry,
209 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210 client: client.clone(),
211 route_provider: if let Some(route_provider) = config.route_provider {
212 route_provider
213 } else if let Some(url) = config.url {
214 if config.background_dynamic_routing {
215 assert!(
216 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218 );
219 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220 UrlUntilReady::new(url, async move {
221 let provider = DynamicRouteProviderBuilder::new(seeds, client).build();
222 provider.start().await;
223 provider
224 }) as Arc<dyn RouteProvider>
225 } else {
226 Arc::new(url)
227 }
228 } else {
229 panic!("either route_provider or url must be specified");
230 },
231 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
232 verify_query_signatures: config.verify_query_signatures,
233 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
234 max_response_body_size: config.max_response_body_size,
235 max_tcp_error_retries: config.max_tcp_error_retries,
236 max_polling_time: config.max_polling_time,
237 })
238 }
239
240 pub fn set_identity<I>(&mut self, identity: I)
246 where
247 I: 'static + Identity,
248 {
249 self.identity = Arc::new(identity);
250 }
251 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
257 self.identity = identity;
258 }
259
260 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
269 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
270 return Ok(());
272 }
273 let status = self.status().await?;
274 let Some(root_key) = status.root_key else {
275 return Err(AgentError::NoRootKeyInStatus(status));
276 };
277 self.set_root_key(root_key);
278 Ok(())
279 }
280
281 pub fn set_root_key(&self, root_key: Vec<u8>) {
286 *self.root_key.write().unwrap() = root_key;
287 }
288
289 pub fn read_root_key(&self) -> Vec<u8> {
291 self.root_key.read().unwrap().clone()
292 }
293
294 fn get_expiry_date(&self) -> u64 {
295 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
296 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
297 if self.ingress_expiry.as_secs() > 90 {
298 rounded = rounded.replace_second(0).unwrap();
299 }
300 rounded.unix_timestamp_nanos().try_into().unwrap()
301 }
302
303 pub fn get_principal(&self) -> Result<Principal, String> {
305 self.identity.sender()
306 }
307
308 async fn query_endpoint<A>(
309 &self,
310 effective_canister_id: Principal,
311 serialized_bytes: Vec<u8>,
312 ) -> Result<A, AgentError>
313 where
314 A: serde::de::DeserializeOwned,
315 {
316 let _permit = self.concurrent_requests_semaphore.acquire().await;
317 let bytes = self
318 .execute(
319 Method::POST,
320 &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
321 Some(serialized_bytes),
322 )
323 .await?
324 .1;
325 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
326 }
327
328 async fn read_state_endpoint<A>(
329 &self,
330 effective_canister_id: Principal,
331 serialized_bytes: Vec<u8>,
332 ) -> Result<A, AgentError>
333 where
334 A: serde::de::DeserializeOwned,
335 {
336 let _permit = self.concurrent_requests_semaphore.acquire().await;
337 let endpoint = format!(
338 "api/v3/canister/{}/read_state",
339 effective_canister_id.to_text()
340 );
341 let bytes = self
342 .execute(Method::POST, &endpoint, Some(serialized_bytes))
343 .await?
344 .1;
345 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
346 }
347
348 async fn read_subnet_state_endpoint<A>(
349 &self,
350 subnet_id: Principal,
351 serialized_bytes: Vec<u8>,
352 ) -> Result<A, AgentError>
353 where
354 A: serde::de::DeserializeOwned,
355 {
356 let _permit = self.concurrent_requests_semaphore.acquire().await;
357 let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
358 let bytes = self
359 .execute(Method::POST, &endpoint, Some(serialized_bytes))
360 .await?
361 .1;
362 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
363 }
364
365 async fn call_endpoint(
366 &self,
367 effective_canister_id: Principal,
368 serialized_bytes: Vec<u8>,
369 ) -> Result<TransportCallResponse, AgentError> {
370 let _permit = self.concurrent_requests_semaphore.acquire().await;
371 let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
372 let (status_code, response_body) = self
373 .execute(Method::POST, &endpoint, Some(serialized_bytes))
374 .await?;
375
376 if status_code == StatusCode::ACCEPTED {
377 return Ok(TransportCallResponse::Accepted);
378 }
379
380 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
381 }
382
383 #[allow(clippy::too_many_arguments)]
386 async fn query_raw(
387 &self,
388 canister_id: Principal,
389 effective_canister_id: Principal,
390 method_name: String,
391 arg: Vec<u8>,
392 ingress_expiry_datetime: Option<u64>,
393 use_nonce: bool,
394 explicit_verify_query_signatures: Option<bool>,
395 ) -> Result<Vec<u8>, AgentError> {
396 let operation = Operation::Call {
397 canister: canister_id,
398 method: method_name.clone(),
399 };
400 let content = self.query_content(
401 canister_id,
402 method_name,
403 arg,
404 ingress_expiry_datetime,
405 use_nonce,
406 )?;
407 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
408 self.query_inner(
409 effective_canister_id,
410 serialized_bytes,
411 content.to_request_id(),
412 explicit_verify_query_signatures,
413 operation,
414 )
415 .await
416 }
417
418 pub async fn query_signed(
422 &self,
423 effective_canister_id: Principal,
424 signed_query: Vec<u8>,
425 ) -> Result<Vec<u8>, AgentError> {
426 let envelope: Envelope =
427 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
428 let EnvelopeContent::Query {
429 canister_id,
430 method_name,
431 ..
432 } = &*envelope.content
433 else {
434 return Err(AgentError::CallDataMismatch {
435 field: "request_type".to_string(),
436 value_arg: "query".to_string(),
437 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
438 "update"
439 } else {
440 "read_state"
441 }
442 .to_string(),
443 });
444 };
445 let operation = Operation::Call {
446 canister: *canister_id,
447 method: method_name.clone(),
448 };
449 self.query_inner(
450 effective_canister_id,
451 signed_query,
452 envelope.content.to_request_id(),
453 None,
454 operation,
455 )
456 .await
457 }
458
459 async fn query_inner(
463 &self,
464 effective_canister_id: Principal,
465 signed_query: Vec<u8>,
466 request_id: RequestId,
467 explicit_verify_query_signatures: Option<bool>,
468 operation: Operation,
469 ) -> Result<Vec<u8>, AgentError> {
470 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
471 let (response, mut subnet) = futures_util::try_join!(
472 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
473 self.get_subnet_by_canister(&effective_canister_id)
474 )?;
475 if response.signatures().is_empty() {
476 return Err(AgentError::MissingSignature);
477 } else if response.signatures().len() > subnet.node_keys.len() {
478 return Err(AgentError::TooManySignatures {
479 had: response.signatures().len(),
480 needed: subnet.node_keys.len(),
481 });
482 }
483 for signature in response.signatures() {
484 if OffsetDateTime::now_utc()
485 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
486 > self.ingress_expiry
487 {
488 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
489 }
490 let signable = response.signable(request_id, signature.timestamp);
491 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
492 node_key
493 } else {
494 subnet = self
495 .fetch_subnet_by_canister(&effective_canister_id)
496 .await?;
497 subnet
498 .node_keys
499 .get(&signature.identity)
500 .ok_or(AgentError::CertificateNotAuthorized())?
501 };
502 if node_key.len() != 44 {
503 return Err(AgentError::DerKeyLengthMismatch {
504 expected: 44,
505 actual: node_key.len(),
506 });
507 }
508 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
509 if node_key[..12] != DER_PREFIX {
510 return Err(AgentError::DerPrefixMismatch {
511 expected: DER_PREFIX.to_vec(),
512 actual: node_key[..12].to_vec(),
513 });
514 }
515 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
516 .map_err(|_| AgentError::MalformedPublicKey)?;
517
518 match pubkey.verify_signature(&signable, &signature.signature[..]) {
519 Ok(()) => (),
520 Err(SignatureError::InvalidSignature) => {
521 return Err(AgentError::QuerySignatureVerificationFailed)
522 }
523 Err(SignatureError::InvalidLength) => {
524 return Err(AgentError::MalformedSignature)
525 }
526 _ => unreachable!(),
527 }
528 }
529 response
530 } else {
531 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
532 .await?
533 };
534
535 match response {
536 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
537 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
538 reject,
539 operation: Some(operation),
540 }),
541 }
542 }
543
544 fn query_content(
545 &self,
546 canister_id: Principal,
547 method_name: String,
548 arg: Vec<u8>,
549 ingress_expiry_datetime: Option<u64>,
550 use_nonce: bool,
551 ) -> Result<EnvelopeContent, AgentError> {
552 Ok(EnvelopeContent::Query {
553 sender: self.identity.sender().map_err(AgentError::SigningError)?,
554 canister_id,
555 method_name,
556 arg,
557 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
558 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
559 })
560 }
561
562 async fn update_raw(
564 &self,
565 canister_id: Principal,
566 effective_canister_id: Principal,
567 method_name: String,
568 arg: Vec<u8>,
569 ingress_expiry_datetime: Option<u64>,
570 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
571 let nonce = self.nonce_factory.generate();
572 let content = self.update_content(
573 canister_id,
574 method_name.clone(),
575 arg,
576 ingress_expiry_datetime,
577 nonce,
578 )?;
579 let operation = Some(Operation::Call {
580 canister: canister_id,
581 method: method_name,
582 });
583 let request_id = to_request_id(&content)?;
584 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
585
586 let response_body = self
587 .call_endpoint(effective_canister_id, serialized_bytes)
588 .await?;
589
590 match response_body {
591 TransportCallResponse::Replied { certificate } => {
592 let certificate =
593 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
594
595 self.verify(&certificate, effective_canister_id)?;
596 let status = lookup_request_status(&certificate, &request_id)?;
597
598 match status {
599 RequestStatusResponse::Replied(reply) => {
600 Ok(CallResponse::Response((reply.arg, certificate)))
601 }
602 RequestStatusResponse::Rejected(reject_response) => {
603 Err(AgentError::CertifiedReject {
604 reject: reject_response,
605 operation,
606 })?
607 }
608 _ => Ok(CallResponse::Poll(request_id)),
609 }
610 }
611 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
612 TransportCallResponse::NonReplicatedRejection(reject_response) => {
613 Err(AgentError::UncertifiedReject {
614 reject: reject_response,
615 operation,
616 })
617 }
618 }
619 }
620
621 pub async fn update_signed(
625 &self,
626 effective_canister_id: Principal,
627 signed_update: Vec<u8>,
628 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
629 let envelope: Envelope =
630 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
631 let EnvelopeContent::Call {
632 canister_id,
633 method_name,
634 ..
635 } = &*envelope.content
636 else {
637 return Err(AgentError::CallDataMismatch {
638 field: "request_type".to_string(),
639 value_arg: "update".to_string(),
640 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
641 "query"
642 } else {
643 "read_state"
644 }
645 .to_string(),
646 });
647 };
648 let operation = Some(Operation::Call {
649 canister: *canister_id,
650 method: method_name.clone(),
651 });
652 let request_id = to_request_id(&envelope.content)?;
653
654 let response_body = self
655 .call_endpoint(effective_canister_id, signed_update)
656 .await?;
657
658 match response_body {
659 TransportCallResponse::Replied { certificate } => {
660 let certificate =
661 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
662
663 self.verify(&certificate, effective_canister_id)?;
664 let status = lookup_request_status(&certificate, &request_id)?;
665
666 match status {
667 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
668 RequestStatusResponse::Rejected(reject_response) => {
669 Err(AgentError::CertifiedReject {
670 reject: reject_response,
671 operation,
672 })?
673 }
674 _ => Ok(CallResponse::Poll(request_id)),
675 }
676 }
677 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
678 TransportCallResponse::NonReplicatedRejection(reject_response) => {
679 Err(AgentError::UncertifiedReject {
680 reject: reject_response,
681 operation,
682 })
683 }
684 }
685 }
686
687 fn update_content(
688 &self,
689 canister_id: Principal,
690 method_name: String,
691 arg: Vec<u8>,
692 ingress_expiry_datetime: Option<u64>,
693 nonce: Option<Vec<u8>>,
694 ) -> Result<EnvelopeContent, AgentError> {
695 Ok(EnvelopeContent::Call {
696 canister_id,
697 method_name,
698 arg,
699 nonce,
700 sender: self.identity.sender().map_err(AgentError::SigningError)?,
701 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
702 })
703 }
704
705 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
706 ExponentialBackoffBuilder::new()
707 .with_initial_interval(Duration::from_millis(500))
708 .with_max_interval(Duration::from_secs(1))
709 .with_multiplier(1.4)
710 .with_max_elapsed_time(Some(self.max_polling_time))
711 .build()
712 }
713
714 pub async fn wait_signed(
716 &self,
717 request_id: &RequestId,
718 effective_canister_id: Principal,
719 signed_request_status: Vec<u8>,
720 ) -> Result<(Vec<u8>, Certificate), AgentError> {
721 let mut retry_policy = self.get_retry_policy();
722
723 let mut request_accepted = false;
724 loop {
725 let (resp, cert) = self
726 .request_status_signed(
727 request_id,
728 effective_canister_id,
729 signed_request_status.clone(),
730 )
731 .await?;
732 match resp {
733 RequestStatusResponse::Unknown => {
734 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
736 return Err(AgentError::TimeoutWaitingForResponse());
737 }
738 }
739
740 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
741 if !request_accepted {
742 retry_policy.reset();
743 request_accepted = true;
744 }
745 }
746
747 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
748 return Ok((arg, cert))
749 }
750
751 RequestStatusResponse::Rejected(response) => {
752 return Err(AgentError::CertifiedReject {
753 reject: response,
754 operation: None,
755 })
756 }
757
758 RequestStatusResponse::Done => {
759 return Err(AgentError::RequestStatusDoneNoReply(String::from(
760 *request_id,
761 )))
762 }
763 };
764
765 match retry_policy.next_backoff() {
766 Some(duration) => crate::util::sleep(duration).await,
767
768 None => return Err(AgentError::TimeoutWaitingForResponse()),
769 }
770 }
771 }
772
773 pub async fn wait(
775 &self,
776 request_id: &RequestId,
777 effective_canister_id: Principal,
778 ) -> Result<(Vec<u8>, Certificate), AgentError> {
779 self.wait_inner(request_id, effective_canister_id, None)
780 .await
781 }
782
783 async fn wait_inner(
784 &self,
785 request_id: &RequestId,
786 effective_canister_id: Principal,
787 operation: Option<Operation>,
788 ) -> Result<(Vec<u8>, Certificate), AgentError> {
789 let mut retry_policy = self.get_retry_policy();
790
791 let mut request_accepted = false;
792 loop {
793 let (resp, cert) = self
794 .request_status_raw(request_id, effective_canister_id)
795 .await?;
796 match resp {
797 RequestStatusResponse::Unknown => {
798 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
800 return Err(AgentError::TimeoutWaitingForResponse());
801 }
802 }
803
804 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
805 if !request_accepted {
806 retry_policy.reset();
814 request_accepted = true;
815 }
816 }
817
818 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
819 return Ok((arg, cert))
820 }
821
822 RequestStatusResponse::Rejected(response) => {
823 return Err(AgentError::CertifiedReject {
824 reject: response,
825 operation,
826 })
827 }
828
829 RequestStatusResponse::Done => {
830 return Err(AgentError::RequestStatusDoneNoReply(String::from(
831 *request_id,
832 )))
833 }
834 };
835
836 match retry_policy.next_backoff() {
837 Some(duration) => crate::util::sleep(duration).await,
838
839 None => return Err(AgentError::TimeoutWaitingForResponse()),
840 }
841 }
842 }
843
844 pub async fn read_state_raw(
847 &self,
848 paths: Vec<Vec<Label>>,
849 effective_canister_id: Principal,
850 ) -> Result<Certificate, AgentError> {
851 let content = self.read_state_content(paths)?;
852 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
853
854 let read_state_response: ReadStateResponse = self
855 .read_state_endpoint(effective_canister_id, serialized_bytes)
856 .await?;
857 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
858 .map_err(AgentError::InvalidCborData)?;
859 self.verify(&cert, effective_canister_id)?;
860 Ok(cert)
861 }
862
863 pub async fn read_subnet_state_raw(
866 &self,
867 paths: Vec<Vec<Label>>,
868 subnet_id: Principal,
869 ) -> Result<Certificate, AgentError> {
870 let content = self.read_state_content(paths)?;
871 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
872
873 let read_state_response: ReadStateResponse = self
874 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
875 .await?;
876 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
877 .map_err(AgentError::InvalidCborData)?;
878 self.verify_for_subnet(&cert, subnet_id)?;
879 Ok(cert)
880 }
881
882 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
883 Ok(EnvelopeContent::ReadState {
884 sender: self.identity.sender().map_err(AgentError::SigningError)?,
885 paths,
886 ingress_expiry: self.get_expiry_date(),
887 })
888 }
889
890 pub fn verify(
893 &self,
894 cert: &Certificate,
895 effective_canister_id: Principal,
896 ) -> Result<(), AgentError> {
897 self.verify_cert(cert, effective_canister_id)?;
898 self.verify_cert_timestamp(cert)?;
899 Ok(())
900 }
901
902 fn verify_cert(
903 &self,
904 cert: &Certificate,
905 effective_canister_id: Principal,
906 ) -> Result<(), AgentError> {
907 let sig = &cert.signature;
908
909 let root_hash = cert.tree.digest();
910 let mut msg = vec![];
911 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
912 msg.extend_from_slice(&root_hash);
913
914 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
915 let key = extract_der(der_key)?;
916
917 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
918 .map_err(|_| AgentError::CertificateVerificationFailed())?;
919 Ok(())
920 }
921
922 pub fn verify_for_subnet(
925 &self,
926 cert: &Certificate,
927 subnet_id: Principal,
928 ) -> Result<(), AgentError> {
929 self.verify_cert_for_subnet(cert, subnet_id)?;
930 self.verify_cert_timestamp(cert)?;
931 Ok(())
932 }
933
934 fn verify_cert_for_subnet(
935 &self,
936 cert: &Certificate,
937 subnet_id: Principal,
938 ) -> Result<(), AgentError> {
939 let sig = &cert.signature;
940
941 let root_hash = cert.tree.digest();
942 let mut msg = vec![];
943 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
944 msg.extend_from_slice(&root_hash);
945
946 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
947 let key = extract_der(der_key)?;
948
949 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
950 .map_err(|_| AgentError::CertificateVerificationFailed())?;
951 Ok(())
952 }
953
954 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
955 let time = lookup_time(cert)?;
958 if (OffsetDateTime::now_utc()
959 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
960 > self.ingress_expiry
961 {
962 Err(AgentError::CertificateOutdated(self.ingress_expiry))
963 } else {
964 Ok(())
965 }
966 }
967
968 fn check_delegation(
969 &self,
970 delegation: &Option<Delegation>,
971 effective_canister_id: Principal,
972 ) -> Result<Vec<u8>, AgentError> {
973 match delegation {
974 None => Ok(self.read_root_key()),
975 Some(delegation) => {
976 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977 .map_err(AgentError::InvalidCborData)?;
978 if cert.delegation.is_some() {
979 return Err(AgentError::CertificateHasTooManyDelegations);
980 }
981 self.verify_cert(&cert, effective_canister_id)?;
982 let canister_range_shards_lookup =
983 ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
984 let canister_range_shards = lookup_tree(&cert.tree, canister_range_shards_lookup)?;
985 let mut shard_paths = canister_range_shards
986 .list_paths() .into_iter()
988 .map(|mut x| {
989 x.pop() .ok_or_else(AgentError::CertificateVerificationFailed)
991 })
992 .collect::<Result<Vec<_>, _>>()?;
993 if shard_paths.is_empty() {
994 return Err(AgentError::CertificateNotAuthorized());
995 }
996 shard_paths.sort_unstable();
997 let shard_division = shard_paths
998 .partition_point(|shard| shard.as_bytes() <= effective_canister_id.as_slice());
999 if shard_division == 0 {
1000 return Err(AgentError::CertificateNotAuthorized());
1002 }
1003 let max_potential_shard = &shard_paths[shard_division - 1];
1004 let canister_range_lookup = [max_potential_shard.as_bytes()];
1005 let canister_range = lookup_value(&canister_range_shards, canister_range_lookup)?;
1006 let ranges: Vec<(Principal, Principal)> =
1007 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
1008 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1009 return Err(AgentError::CertificateNotAuthorized());
1011 }
1012
1013 let public_key_path = [
1014 "subnet".as_bytes(),
1015 delegation.subnet_id.as_ref(),
1016 "public_key".as_bytes(),
1017 ];
1018 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1019 }
1020 }
1021 }
1022
1023 fn check_delegation_for_subnet(
1024 &self,
1025 delegation: &Option<Delegation>,
1026 subnet_id: Principal,
1027 ) -> Result<Vec<u8>, AgentError> {
1028 match delegation {
1029 None => Ok(self.read_root_key()),
1030 Some(delegation) => {
1031 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1032 .map_err(AgentError::InvalidCborData)?;
1033 if cert.delegation.is_some() {
1034 return Err(AgentError::CertificateHasTooManyDelegations);
1035 }
1036 self.verify_cert_for_subnet(&cert, subnet_id)?;
1037 let public_key_path = [
1038 "subnet".as_bytes(),
1039 subnet_id.as_ref(),
1040 "public_key".as_bytes(),
1041 ];
1042 let pk = lookup_value(&cert.tree, public_key_path)
1043 .map_err(|_| AgentError::CertificateNotAuthorized())?
1044 .to_vec();
1045 Ok(pk)
1046 }
1047 }
1048 }
1049
1050 pub async fn read_state_canister_info(
1053 &self,
1054 canister_id: Principal,
1055 path: &str,
1056 ) -> Result<Vec<u8>, AgentError> {
1057 let paths: Vec<Vec<Label>> = vec![vec![
1058 "canister".into(),
1059 Label::from_bytes(canister_id.as_slice()),
1060 path.into(),
1061 ]];
1062
1063 let cert = self.read_state_raw(paths, canister_id).await?;
1064
1065 lookup_canister_info(cert, canister_id, path)
1066 }
1067
1068 pub async fn read_state_canister_controllers(
1070 &self,
1071 canister_id: Principal,
1072 ) -> Result<Vec<Principal>, AgentError> {
1073 let blob = self
1074 .read_state_canister_info(canister_id, "controllers")
1075 .await?;
1076 let controllers: Vec<Principal> =
1077 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1078 Ok(controllers)
1079 }
1080
1081 pub async fn read_state_canister_module_hash(
1083 &self,
1084 canister_id: Principal,
1085 ) -> Result<Vec<u8>, AgentError> {
1086 self.read_state_canister_info(canister_id, "module_hash")
1087 .await
1088 }
1089
1090 pub async fn read_state_canister_metadata(
1092 &self,
1093 canister_id: Principal,
1094 path: &str,
1095 ) -> Result<Vec<u8>, AgentError> {
1096 let paths: Vec<Vec<Label>> = vec![vec![
1097 "canister".into(),
1098 Label::from_bytes(canister_id.as_slice()),
1099 "metadata".into(),
1100 path.into(),
1101 ]];
1102
1103 let cert = self.read_state_raw(paths, canister_id).await?;
1104
1105 lookup_canister_metadata(cert, canister_id, path)
1106 }
1107
1108 pub async fn read_state_subnet_metrics(
1110 &self,
1111 subnet_id: Principal,
1112 ) -> Result<SubnetMetrics, AgentError> {
1113 let paths = vec![vec![
1114 "subnet".into(),
1115 Label::from_bytes(subnet_id.as_slice()),
1116 "metrics".into(),
1117 ]];
1118 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1119 lookup_subnet_metrics(cert, subnet_id)
1120 }
1121
1122 pub async fn read_state_subnet_canister_ranges(
1124 &self,
1125 subnet_id: Principal,
1126 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1127 let paths = vec![vec![
1128 "subnet".into(),
1129 Label::from_bytes(subnet_id.as_slice()),
1130 "canister_ranges".into(),
1131 ]];
1132 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1133 lookup_subnet_canister_ranges(&cert, subnet_id)
1134 }
1135
1136 pub async fn request_status_raw(
1138 &self,
1139 request_id: &RequestId,
1140 effective_canister_id: Principal,
1141 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1142 let paths: Vec<Vec<Label>> =
1143 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1144
1145 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1146
1147 Ok((lookup_request_status(&cert, request_id)?, cert))
1148 }
1149
1150 pub async fn request_status_signed(
1154 &self,
1155 request_id: &RequestId,
1156 effective_canister_id: Principal,
1157 signed_request_status: Vec<u8>,
1158 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1159 let _envelope: Envelope =
1160 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1161 let read_state_response: ReadStateResponse = self
1162 .read_state_endpoint(effective_canister_id, signed_request_status)
1163 .await?;
1164
1165 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1166 .map_err(AgentError::InvalidCborData)?;
1167 self.verify(&cert, effective_canister_id)?;
1168 Ok((lookup_request_status(&cert, request_id)?, cert))
1169 }
1170
1171 pub fn update<S: Into<String>>(
1174 &self,
1175 canister_id: &Principal,
1176 method_name: S,
1177 ) -> UpdateBuilder<'_> {
1178 UpdateBuilder::new(self, *canister_id, method_name.into())
1179 }
1180
1181 pub async fn status(&self) -> Result<Status, AgentError> {
1183 let endpoint = "api/v2/status";
1184 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1185
1186 let cbor: serde_cbor::Value =
1187 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1188
1189 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1190 }
1191
1192 pub fn query<S: Into<String>>(
1195 &self,
1196 canister_id: &Principal,
1197 method_name: S,
1198 ) -> QueryBuilder<'_> {
1199 QueryBuilder::new(self, *canister_id, method_name.into())
1200 }
1201
1202 pub fn sign_request_status(
1205 &self,
1206 effective_canister_id: Principal,
1207 request_id: RequestId,
1208 ) -> Result<SignedRequestStatus, AgentError> {
1209 let paths: Vec<Vec<Label>> =
1210 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1211 let read_state_content = self.read_state_content(paths)?;
1212 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1213 let ingress_expiry = read_state_content.ingress_expiry();
1214 let sender = *read_state_content.sender();
1215 Ok(SignedRequestStatus {
1216 ingress_expiry,
1217 sender,
1218 effective_canister_id,
1219 request_id,
1220 signed_request_status,
1221 })
1222 }
1223
1224 pub async fn get_subnet_by_canister(
1227 &self,
1228 canister: &Principal,
1229 ) -> Result<Arc<Subnet>, AgentError> {
1230 let subnet = self
1231 .subnet_key_cache
1232 .lock()
1233 .unwrap()
1234 .get_subnet_by_canister(canister);
1235 if let Some(subnet) = subnet {
1236 Ok(subnet)
1237 } else {
1238 self.fetch_subnet_by_canister(canister).await
1239 }
1240 }
1241
1242 pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1245 let subnet = self
1246 .subnet_key_cache
1247 .lock()
1248 .unwrap()
1249 .get_subnet_by_id(subnet_id);
1250 if let Some(subnet) = subnet {
1251 Ok(subnet)
1252 } else {
1253 self.fetch_subnet_by_id(subnet_id).await
1254 }
1255 }
1256
1257 pub async fn fetch_api_boundary_nodes_by_canister_id(
1259 &self,
1260 canister_id: Principal,
1261 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1262 let paths = vec![vec!["api_boundary_nodes".into()]];
1263 let certificate = self.read_state_raw(paths, canister_id).await?;
1264 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1265 Ok(api_boundary_nodes)
1266 }
1267
1268 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1270 &self,
1271 subnet_id: Principal,
1272 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1273 let paths = vec![vec!["api_boundary_nodes".into()]];
1274 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1275 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1276 Ok(api_boundary_nodes)
1277 }
1278
1279 pub async fn fetch_subnet_by_canister(
1284 &self,
1285 canister: &Principal,
1286 ) -> Result<Arc<Subnet>, AgentError> {
1287 let canister_cert = self
1288 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1289 .await?;
1290 let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1291 Principal::from_slice(&delegation.subnet_id)
1292 } else {
1293 Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1295 };
1296 let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1297 let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1298 let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1300 lookup_canister_ranges(&subnet_id, &delegation_cert)?
1301 } else {
1302 lookup_canister_ranges(&subnet_id, &canister_cert)?
1303 };
1304 subnet.canister_ranges = canister_ranges;
1305 if !subnet.canister_ranges.contains(canister) {
1306 return Err(AgentError::CertificateNotAuthorized());
1307 }
1308 let subnet = Arc::new(subnet);
1309 self.subnet_key_cache
1310 .lock()
1311 .unwrap()
1312 .insert_subnet(subnet_id, subnet.clone());
1313 Ok(subnet)
1314 }
1315
1316 pub async fn fetch_subnet_by_id(
1321 &self,
1322 subnet_id: &Principal,
1323 ) -> Result<Arc<Subnet>, AgentError> {
1324 let subnet_cert = self
1325 .read_subnet_state_raw(
1326 vec![
1327 vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1328 vec!["subnet".into(), subnet_id.as_slice().into()],
1329 ],
1330 *subnet_id,
1331 )
1332 .await?;
1333 let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1334 let subnet = Arc::new(subnet);
1335 self.subnet_key_cache
1336 .lock()
1337 .unwrap()
1338 .insert_subnet(*subnet_id, subnet.clone());
1339 Ok(subnet)
1340 }
1341
1342 async fn request(
1343 &self,
1344 method: Method,
1345 endpoint: &str,
1346 body: Option<Vec<u8>>,
1347 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1348 let body = body.map(Bytes::from);
1349
1350 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1351 let url = self.route_provider.route()?.join(endpoint)?;
1352 let uri = Uri::from_str(url.as_str())
1353 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1354 let body = body.clone().unwrap_or_default();
1355 let request = http::Request::builder()
1356 .method(method.clone())
1357 .uri(uri)
1358 .header(CONTENT_TYPE, "application/cbor")
1359 .body(body)
1360 .map_err(|e| {
1361 AgentError::TransportError(TransportError::Generic(format!(
1362 "unable to create request: {e:#}"
1363 )))
1364 })?;
1365
1366 Ok(request)
1367 };
1368
1369 let response = self
1370 .client
1371 .call(
1372 &create_request_with_generated_url,
1373 self.max_tcp_error_retries,
1374 self.max_response_body_size,
1375 )
1376 .await?;
1377
1378 let (parts, body) = response.into_parts();
1379
1380 Ok((parts.status, parts.headers, body.to_vec()))
1381 }
1382
1383 async fn execute(
1384 &self,
1385 method: Method,
1386 endpoint: &str,
1387 body: Option<Vec<u8>>,
1388 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1389 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1390
1391 let status = request_result.0;
1392 let headers = request_result.1;
1393 let body = request_result.2;
1394
1395 if status.is_client_error() || status.is_server_error() {
1396 Err(AgentError::HttpError(HttpErrorPayload {
1397 status: status.into(),
1398 content_type: headers
1399 .get(CONTENT_TYPE)
1400 .and_then(|value| value.to_str().ok())
1401 .map(str::to_string),
1402 content: body,
1403 }))
1404 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1405 Err(AgentError::InvalidHttpResponse(format!(
1406 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1407 )))
1408 } else {
1409 Ok((status, body))
1410 }
1411 }
1412}
1413
1414fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1417 ranges
1418 .iter()
1419 .any(|r| principal >= &r.0 && principal <= &r.1)
1420}
1421
1422fn sign_envelope(
1423 content: &EnvelopeContent,
1424 identity: Arc<dyn Identity>,
1425) -> Result<Vec<u8>, AgentError> {
1426 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1427
1428 let envelope = Envelope {
1429 content: Cow::Borrowed(content),
1430 sender_pubkey: signature.public_key,
1431 sender_sig: signature.signature,
1432 sender_delegation: signature.delegations,
1433 };
1434
1435 let mut serialized_bytes = Vec::new();
1436 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1437 serializer.self_describe()?;
1438 envelope.serialize(&mut serializer)?;
1439
1440 Ok(serialized_bytes)
1441}
1442
1443pub fn signed_query_inspect(
1446 sender: Principal,
1447 canister_id: Principal,
1448 method_name: &str,
1449 arg: &[u8],
1450 ingress_expiry: u64,
1451 signed_query: Vec<u8>,
1452) -> Result<(), AgentError> {
1453 let envelope: Envelope =
1454 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1455 match envelope.content.as_ref() {
1456 EnvelopeContent::Query {
1457 ingress_expiry: ingress_expiry_cbor,
1458 sender: sender_cbor,
1459 canister_id: canister_id_cbor,
1460 method_name: method_name_cbor,
1461 arg: arg_cbor,
1462 nonce: _nonce,
1463 } => {
1464 if ingress_expiry != *ingress_expiry_cbor {
1465 return Err(AgentError::CallDataMismatch {
1466 field: "ingress_expiry".to_string(),
1467 value_arg: ingress_expiry.to_string(),
1468 value_cbor: ingress_expiry_cbor.to_string(),
1469 });
1470 }
1471 if sender != *sender_cbor {
1472 return Err(AgentError::CallDataMismatch {
1473 field: "sender".to_string(),
1474 value_arg: sender.to_string(),
1475 value_cbor: sender_cbor.to_string(),
1476 });
1477 }
1478 if canister_id != *canister_id_cbor {
1479 return Err(AgentError::CallDataMismatch {
1480 field: "canister_id".to_string(),
1481 value_arg: canister_id.to_string(),
1482 value_cbor: canister_id_cbor.to_string(),
1483 });
1484 }
1485 if method_name != *method_name_cbor {
1486 return Err(AgentError::CallDataMismatch {
1487 field: "method_name".to_string(),
1488 value_arg: method_name.to_string(),
1489 value_cbor: method_name_cbor.clone(),
1490 });
1491 }
1492 if arg != *arg_cbor {
1493 return Err(AgentError::CallDataMismatch {
1494 field: "arg".to_string(),
1495 value_arg: format!("{arg:?}"),
1496 value_cbor: format!("{arg_cbor:?}"),
1497 });
1498 }
1499 }
1500 EnvelopeContent::Call { .. } => {
1501 return Err(AgentError::CallDataMismatch {
1502 field: "request_type".to_string(),
1503 value_arg: "query".to_string(),
1504 value_cbor: "call".to_string(),
1505 })
1506 }
1507 EnvelopeContent::ReadState { .. } => {
1508 return Err(AgentError::CallDataMismatch {
1509 field: "request_type".to_string(),
1510 value_arg: "query".to_string(),
1511 value_cbor: "read_state".to_string(),
1512 })
1513 }
1514 }
1515 Ok(())
1516}
1517
1518pub fn signed_update_inspect(
1521 sender: Principal,
1522 canister_id: Principal,
1523 method_name: &str,
1524 arg: &[u8],
1525 ingress_expiry: u64,
1526 signed_update: Vec<u8>,
1527) -> Result<(), AgentError> {
1528 let envelope: Envelope =
1529 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1530 match envelope.content.as_ref() {
1531 EnvelopeContent::Call {
1532 nonce: _nonce,
1533 ingress_expiry: ingress_expiry_cbor,
1534 sender: sender_cbor,
1535 canister_id: canister_id_cbor,
1536 method_name: method_name_cbor,
1537 arg: arg_cbor,
1538 } => {
1539 if ingress_expiry != *ingress_expiry_cbor {
1540 return Err(AgentError::CallDataMismatch {
1541 field: "ingress_expiry".to_string(),
1542 value_arg: ingress_expiry.to_string(),
1543 value_cbor: ingress_expiry_cbor.to_string(),
1544 });
1545 }
1546 if sender != *sender_cbor {
1547 return Err(AgentError::CallDataMismatch {
1548 field: "sender".to_string(),
1549 value_arg: sender.to_string(),
1550 value_cbor: sender_cbor.to_string(),
1551 });
1552 }
1553 if canister_id != *canister_id_cbor {
1554 return Err(AgentError::CallDataMismatch {
1555 field: "canister_id".to_string(),
1556 value_arg: canister_id.to_string(),
1557 value_cbor: canister_id_cbor.to_string(),
1558 });
1559 }
1560 if method_name != *method_name_cbor {
1561 return Err(AgentError::CallDataMismatch {
1562 field: "method_name".to_string(),
1563 value_arg: method_name.to_string(),
1564 value_cbor: method_name_cbor.clone(),
1565 });
1566 }
1567 if arg != *arg_cbor {
1568 return Err(AgentError::CallDataMismatch {
1569 field: "arg".to_string(),
1570 value_arg: format!("{arg:?}"),
1571 value_cbor: format!("{arg_cbor:?}"),
1572 });
1573 }
1574 }
1575 EnvelopeContent::ReadState { .. } => {
1576 return Err(AgentError::CallDataMismatch {
1577 field: "request_type".to_string(),
1578 value_arg: "call".to_string(),
1579 value_cbor: "read_state".to_string(),
1580 })
1581 }
1582 EnvelopeContent::Query { .. } => {
1583 return Err(AgentError::CallDataMismatch {
1584 field: "request_type".to_string(),
1585 value_arg: "call".to_string(),
1586 value_cbor: "query".to_string(),
1587 })
1588 }
1589 }
1590 Ok(())
1591}
1592
1593pub fn signed_request_status_inspect(
1596 sender: Principal,
1597 request_id: &RequestId,
1598 ingress_expiry: u64,
1599 signed_request_status: Vec<u8>,
1600) -> Result<(), AgentError> {
1601 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1602 let envelope: Envelope =
1603 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1604 match envelope.content.as_ref() {
1605 EnvelopeContent::ReadState {
1606 ingress_expiry: ingress_expiry_cbor,
1607 sender: sender_cbor,
1608 paths: paths_cbor,
1609 } => {
1610 if ingress_expiry != *ingress_expiry_cbor {
1611 return Err(AgentError::CallDataMismatch {
1612 field: "ingress_expiry".to_string(),
1613 value_arg: ingress_expiry.to_string(),
1614 value_cbor: ingress_expiry_cbor.to_string(),
1615 });
1616 }
1617 if sender != *sender_cbor {
1618 return Err(AgentError::CallDataMismatch {
1619 field: "sender".to_string(),
1620 value_arg: sender.to_string(),
1621 value_cbor: sender_cbor.to_string(),
1622 });
1623 }
1624
1625 if paths != *paths_cbor {
1626 return Err(AgentError::CallDataMismatch {
1627 field: "paths".to_string(),
1628 value_arg: format!("{paths:?}"),
1629 value_cbor: format!("{paths_cbor:?}"),
1630 });
1631 }
1632 }
1633 EnvelopeContent::Query { .. } => {
1634 return Err(AgentError::CallDataMismatch {
1635 field: "request_type".to_string(),
1636 value_arg: "read_state".to_string(),
1637 value_cbor: "query".to_string(),
1638 })
1639 }
1640 EnvelopeContent::Call { .. } => {
1641 return Err(AgentError::CallDataMismatch {
1642 field: "request_type".to_string(),
1643 value_arg: "read_state".to_string(),
1644 value_cbor: "call".to_string(),
1645 })
1646 }
1647 }
1648 Ok(())
1649}
1650
1651#[derive(Clone)]
1652struct SubnetCache {
1653 subnets: TimedCache<Principal, Arc<Subnet>>,
1654 canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1655}
1656
1657impl SubnetCache {
1658 fn new() -> Self {
1659 Self {
1660 subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1661 canister_index: RangeInclusiveMap::new_with_step_fns(),
1662 }
1663 }
1664
1665 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1666 self.canister_index
1667 .get(canister)
1668 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1669 .filter(|subnet| subnet.canister_ranges.contains(canister))
1670 }
1671
1672 fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1673 self.subnets.cache_get(subnet_id).cloned()
1674 }
1675
1676 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1677 self.subnets.cache_set(subnet_id, subnet.clone());
1678 for range in subnet.canister_ranges.iter() {
1679 self.canister_index.insert(range.clone(), subnet_id);
1680 }
1681 }
1682}
1683
1684#[derive(Clone, Copy)]
1685pub(crate) struct PrincipalStep;
1686
1687impl StepFns<Principal> for PrincipalStep {
1688 fn add_one(start: &Principal) -> Principal {
1689 let bytes = start.as_slice();
1690 let mut arr = [0; 29];
1691 arr[..bytes.len()].copy_from_slice(bytes);
1692 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1693 *byte = byte.wrapping_add(1);
1694 if *byte != 0 {
1695 break;
1696 }
1697 }
1698 Principal::from_slice(&arr[..bytes.len()])
1699 }
1700 fn sub_one(start: &Principal) -> Principal {
1701 let bytes = start.as_slice();
1702 let mut arr = [0; 29];
1703 arr[..bytes.len()].copy_from_slice(bytes);
1704 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1705 *byte = byte.wrapping_sub(1);
1706 if *byte != 255 {
1707 break;
1708 }
1709 }
1710 Principal::from_slice(&arr[..bytes.len()])
1711 }
1712}
1713
1714#[derive(Debug, Clone)]
1716pub struct ApiBoundaryNode {
1717 pub domain: String,
1719 pub ipv6_address: String,
1721 pub ipv4_address: Option<String>,
1723}
1724
1725#[derive(Debug, Clone)]
1729#[non_exhaustive]
1730pub struct QueryBuilder<'agent> {
1731 agent: &'agent Agent,
1732 pub effective_canister_id: Principal,
1734 pub canister_id: Principal,
1736 pub method_name: String,
1738 pub arg: Vec<u8>,
1740 pub ingress_expiry_datetime: Option<u64>,
1742 pub use_nonce: bool,
1744}
1745
1746impl<'agent> QueryBuilder<'agent> {
1747 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1749 Self {
1750 agent,
1751 effective_canister_id: canister_id,
1752 canister_id,
1753 method_name,
1754 arg: vec![],
1755 ingress_expiry_datetime: None,
1756 use_nonce: false,
1757 }
1758 }
1759
1760 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1762 self.effective_canister_id = canister_id;
1763 self
1764 }
1765
1766 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1768 self.arg = arg.into();
1769 self
1770 }
1771
1772 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1774 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1775 self
1776 }
1777
1778 pub fn expire_after(mut self, duration: Duration) -> Self {
1780 self.ingress_expiry_datetime = Some(
1781 OffsetDateTime::now_utc()
1782 .saturating_add(duration.try_into().expect("negative duration"))
1783 .unix_timestamp_nanos() as u64,
1784 );
1785 self
1786 }
1787
1788 pub fn with_nonce_generation(mut self) -> Self {
1791 self.use_nonce = true;
1792 self
1793 }
1794
1795 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1797 self.agent
1798 .query_raw(
1799 self.canister_id,
1800 self.effective_canister_id,
1801 self.method_name,
1802 self.arg,
1803 self.ingress_expiry_datetime,
1804 self.use_nonce,
1805 None,
1806 )
1807 .await
1808 }
1809
1810 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1815 self.agent
1816 .query_raw(
1817 self.canister_id,
1818 self.effective_canister_id,
1819 self.method_name,
1820 self.arg,
1821 self.ingress_expiry_datetime,
1822 self.use_nonce,
1823 Some(true),
1824 )
1825 .await
1826 }
1827
1828 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1833 self.agent
1834 .query_raw(
1835 self.canister_id,
1836 self.effective_canister_id,
1837 self.method_name,
1838 self.arg,
1839 self.ingress_expiry_datetime,
1840 self.use_nonce,
1841 Some(false),
1842 )
1843 .await
1844 }
1845
1846 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1849 let effective_canister_id = self.effective_canister_id;
1850 let identity = self.agent.identity.clone();
1851 let content = self.into_envelope()?;
1852 let signed_query = sign_envelope(&content, identity)?;
1853 let EnvelopeContent::Query {
1854 ingress_expiry,
1855 sender,
1856 canister_id,
1857 method_name,
1858 arg,
1859 nonce,
1860 } = content
1861 else {
1862 unreachable!()
1863 };
1864 Ok(SignedQuery {
1865 ingress_expiry,
1866 sender,
1867 canister_id,
1868 method_name,
1869 arg,
1870 effective_canister_id,
1871 signed_query,
1872 nonce,
1873 })
1874 }
1875
1876 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1878 self.agent.query_content(
1879 self.canister_id,
1880 self.method_name,
1881 self.arg,
1882 self.ingress_expiry_datetime,
1883 self.use_nonce,
1884 )
1885 }
1886}
1887
1888impl<'agent> IntoFuture for QueryBuilder<'agent> {
1889 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1890 type Output = Result<Vec<u8>, AgentError>;
1891 fn into_future(self) -> Self::IntoFuture {
1892 Box::pin(self.call())
1893 }
1894}
1895
1896pub struct UpdateCall<'agent> {
1898 agent: &'agent Agent,
1899 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1900 effective_canister_id: Principal,
1901 canister_id: Principal,
1902 method_name: String,
1903}
1904
1905impl fmt::Debug for UpdateCall<'_> {
1906 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1907 f.debug_struct("UpdateCall")
1908 .field("agent", &self.agent)
1909 .field("effective_canister_id", &self.effective_canister_id)
1910 .finish_non_exhaustive()
1911 }
1912}
1913
1914impl Future for UpdateCall<'_> {
1915 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1916 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1917 self.response_future.as_mut().poll(cx)
1918 }
1919}
1920
1921impl<'a> UpdateCall<'a> {
1922 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1924 let response = self.response_future.await?;
1925
1926 match response {
1927 CallResponse::Response(response) => Ok(response),
1928 CallResponse::Poll(request_id) => {
1929 self.agent
1930 .wait_inner(
1931 &request_id,
1932 self.effective_canister_id,
1933 Some(Operation::Call {
1934 canister: self.canister_id,
1935 method: self.method_name,
1936 }),
1937 )
1938 .await
1939 }
1940 }
1941 }
1942}
1943#[derive(Debug)]
1948pub struct UpdateBuilder<'agent> {
1949 agent: &'agent Agent,
1950 pub effective_canister_id: Principal,
1952 pub canister_id: Principal,
1954 pub method_name: String,
1956 pub arg: Vec<u8>,
1958 pub ingress_expiry_datetime: Option<u64>,
1960}
1961
1962impl<'agent> UpdateBuilder<'agent> {
1963 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1965 Self {
1966 agent,
1967 effective_canister_id: canister_id,
1968 canister_id,
1969 method_name,
1970 arg: vec![],
1971 ingress_expiry_datetime: None,
1972 }
1973 }
1974
1975 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1977 self.effective_canister_id = canister_id;
1978 self
1979 }
1980
1981 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1983 self.arg = arg.into();
1984 self
1985 }
1986
1987 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1989 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1990 self
1991 }
1992
1993 pub fn expire_after(mut self, duration: Duration) -> Self {
1995 self.ingress_expiry_datetime = Some(
1996 OffsetDateTime::now_utc()
1997 .saturating_add(duration.try_into().expect("negative duration"))
1998 .unix_timestamp_nanos() as u64,
1999 );
2000 self
2001 }
2002
2003 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2006 self.call().and_wait().await.map(|x| x.0)
2007 }
2008
2009 pub fn call(self) -> UpdateCall<'agent> {
2012 let method_name = self.method_name.clone();
2013 let response_future = async move {
2014 self.agent
2015 .update_raw(
2016 self.canister_id,
2017 self.effective_canister_id,
2018 self.method_name,
2019 self.arg,
2020 self.ingress_expiry_datetime,
2021 )
2022 .await
2023 };
2024 UpdateCall {
2025 agent: self.agent,
2026 response_future: Box::pin(response_future),
2027 effective_canister_id: self.effective_canister_id,
2028 canister_id: self.canister_id,
2029 method_name,
2030 }
2031 }
2032
2033 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2036 let identity = self.agent.identity.clone();
2037 let effective_canister_id = self.effective_canister_id;
2038 let content = self.into_envelope()?;
2039 let signed_update = sign_envelope(&content, identity)?;
2040 let request_id = to_request_id(&content)?;
2041 let EnvelopeContent::Call {
2042 nonce,
2043 ingress_expiry,
2044 sender,
2045 canister_id,
2046 method_name,
2047 arg,
2048 } = content
2049 else {
2050 unreachable!()
2051 };
2052 Ok(SignedUpdate {
2053 nonce,
2054 ingress_expiry,
2055 sender,
2056 canister_id,
2057 method_name,
2058 arg,
2059 effective_canister_id,
2060 signed_update,
2061 request_id,
2062 })
2063 }
2064
2065 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2067 let nonce = self.agent.nonce_factory.generate();
2068 self.agent.update_content(
2069 self.canister_id,
2070 self.method_name,
2071 self.arg,
2072 self.ingress_expiry_datetime,
2073 nonce,
2074 )
2075 }
2076}
2077
2078impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2079 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2080 type Output = Result<Vec<u8>, AgentError>;
2081 fn into_future(self) -> Self::IntoFuture {
2082 Box::pin(self.call_and_wait())
2083 }
2084}
2085
2086#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2088#[cfg_attr(not(target_family = "wasm"), async_trait)]
2089pub trait HttpService: Send + Sync + Debug {
2090 async fn call<'a>(
2092 &'a self,
2093 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2094 max_retries: usize,
2095 size_limit: Option<usize>,
2096 ) -> Result<http::Response<Bytes>, AgentError>;
2097}
2098
2099fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2101 let (parts, body) = req.into_parts();
2102 let body = reqwest::Body::from(body);
2103 let request = http::Request::from_parts(parts, body)
2106 .try_into()
2107 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2108
2109 Ok(request)
2110}
2111
2112#[cfg(not(target_family = "wasm"))]
2114async fn to_http_response(
2115 resp: Response,
2116 size_limit: Option<usize>,
2117) -> Result<http::Response<Bytes>, AgentError> {
2118 use http_body_util::{BodyExt, Limited};
2119
2120 let resp: http::Response<reqwest::Body> = resp.into();
2121 let (parts, body) = resp.into_parts();
2122 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2123 let body = body
2124 .collect()
2125 .await
2126 .map_err(|e| {
2127 AgentError::TransportError(TransportError::Generic(format!(
2128 "unable to read response body: {e:#}"
2129 )))
2130 })?
2131 .to_bytes();
2132 let resp = http::Response::from_parts(parts, body);
2133
2134 Ok(resp)
2135}
2136
2137#[cfg(target_family = "wasm")]
2141async fn to_http_response(
2142 resp: Response,
2143 size_limit: Option<usize>,
2144) -> Result<http::Response<Bytes>, AgentError> {
2145 use futures_util::StreamExt;
2146 use http_body::Frame;
2147 use http_body_util::{Limited, StreamBody};
2148
2149 let status = resp.status();
2151 let headers = resp.headers().clone();
2152
2153 let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2155 let body = StreamBody::new(stream);
2156 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2157 let body = http_body_util::BodyExt::collect(body)
2158 .await
2159 .map_err(|e| {
2160 AgentError::TransportError(TransportError::Generic(format!(
2161 "unable to read response body: {e:#}"
2162 )))
2163 })?
2164 .to_bytes();
2165
2166 let mut resp = http::Response::new(body);
2167 *resp.status_mut() = status;
2168 *resp.headers_mut() = headers;
2169
2170 Ok(resp)
2171}
2172
2173#[cfg(not(target_family = "wasm"))]
2174#[async_trait]
2175impl<T> HttpService for T
2176where
2177 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2178 for<'a> <&'a Self as Service<Request>>::Future: Send,
2179 T: Send + Sync + Debug + ?Sized,
2180{
2181 #[allow(clippy::needless_arbitrary_self_type)]
2182 async fn call<'a>(
2183 mut self: &'a Self,
2184 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2185 max_retries: usize,
2186 size_limit: Option<usize>,
2187 ) -> Result<http::Response<Bytes>, AgentError> {
2188 let mut retry_count = 0;
2189 loop {
2190 let request = from_http_request(req()?)?;
2191
2192 match Service::call(&mut self, request).await {
2193 Err(err) => {
2194 if err.is_connect() {
2196 if retry_count >= max_retries {
2197 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2198 }
2199 retry_count += 1;
2200 }
2201 else {
2203 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2204 }
2205 }
2206
2207 Ok(resp) => {
2208 let resp = to_http_response(resp, size_limit).await?;
2209 return Ok(resp);
2210 }
2211 }
2212 }
2213 }
2214}
2215
2216#[cfg(target_family = "wasm")]
2217#[async_trait(?Send)]
2218impl<T> HttpService for T
2219where
2220 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2221 T: Send + Sync + Debug + ?Sized,
2222{
2223 #[allow(clippy::needless_arbitrary_self_type)]
2224 async fn call<'a>(
2225 mut self: &'a Self,
2226 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2227 _retries: usize,
2228 _size_limit: Option<usize>,
2229 ) -> Result<http::Response<Bytes>, AgentError> {
2230 let request = from_http_request(req()?)?;
2231 let response = Service::call(&mut self, request)
2232 .await
2233 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2234
2235 to_http_response(response, _size_limit).await
2236 }
2237}
2238
2239#[derive(Debug)]
2240struct Retry429Logic {
2241 client: Client,
2242}
2243
2244#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2245#[cfg_attr(not(target_family = "wasm"), async_trait)]
2246impl HttpService for Retry429Logic {
2247 async fn call<'a>(
2248 &'a self,
2249 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2250 _max_tcp_retries: usize,
2251 _size_limit: Option<usize>,
2252 ) -> Result<http::Response<Bytes>, AgentError> {
2253 let mut retries = 0;
2254 loop {
2255 #[cfg(not(target_family = "wasm"))]
2256 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2257 #[cfg(target_family = "wasm")]
2259 let resp = {
2260 let request = from_http_request(req()?)?;
2261 let resp = self
2262 .client
2263 .execute(request)
2264 .await
2265 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2266
2267 to_http_response(resp, _size_limit).await?
2268 };
2269
2270 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2271 if retries == 6 {
2272 break Ok(resp);
2273 } else {
2274 retries += 1;
2275 crate::util::sleep(Duration::from_millis(250)).await;
2276 continue;
2277 }
2278 } else {
2279 break Ok(resp);
2280 }
2281 }
2282 }
2283}
2284
2285#[cfg(all(test, not(target_family = "wasm")))]
2286mod offline_tests {
2287 use super::*;
2288 use tokio::net::TcpListener;
2289 #[test]
2292 fn rounded_expiry() {
2293 let agent = Agent::builder()
2294 .with_url("http://not-a-real-url")
2295 .build()
2296 .unwrap();
2297 let mut prev_expiry = None;
2298 let mut num_timestamps = 0;
2299 for _ in 0..6 {
2300 let update = agent
2301 .update(&Principal::management_canister(), "not_a_method")
2302 .sign()
2303 .unwrap();
2304 if prev_expiry < Some(update.ingress_expiry) {
2305 prev_expiry = Some(update.ingress_expiry);
2306 num_timestamps += 1;
2307 }
2308 }
2309 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2311 }
2312
2313 #[tokio::test]
2314 async fn client_ratelimit() {
2315 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2316 let count = Arc::new(Mutex::new(0));
2317 let port = mock_server.local_addr().unwrap().port();
2318 tokio::spawn({
2319 let count = count.clone();
2320 async move {
2321 loop {
2322 let (mut conn, _) = mock_server.accept().await.unwrap();
2323 *count.lock().unwrap() += 1;
2324 tokio::spawn(
2325 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2327 );
2328 }
2329 }
2330 });
2331 let agent = Agent::builder()
2332 .with_http_client(Client::builder().http1_only().build().unwrap())
2333 .with_url(format!("http://127.0.0.1:{port}"))
2334 .with_max_concurrent_requests(2)
2335 .build()
2336 .unwrap();
2337 for _ in 0..3 {
2338 let agent = agent.clone();
2339 tokio::spawn(async move {
2340 agent
2341 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2342 .call()
2343 .await
2344 });
2345 }
2346 crate::util::sleep(Duration::from_millis(250)).await;
2347 assert_eq!(*count.lock().unwrap(), 2);
2348 }
2349}