1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28 RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34 dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35 RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45 agent::response_authentication::{
46 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47 lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48 lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 convert::TryFrom,
68 fmt::{self, Debug},
69 future::{Future, IntoFuture},
70 pin::Pin,
71 str::FromStr,
72 sync::{Arc, Mutex, RwLock},
73 task::{Context, Poll},
74 time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub enum EffectiveId {
94 Canister(Principal),
96 Subnet(Principal),
98}
99
100impl EffectiveId {
101 pub fn as_principal(&self) -> Principal {
103 match self {
104 EffectiveId::Canister(p) | EffectiveId::Subnet(p) => *p,
105 }
106 }
107}
108
109impl From<Principal> for EffectiveId {
110 fn from(p: Principal) -> Self {
111 EffectiveId::Canister(p)
112 }
113}
114
115pub(crate) const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
116
117pub(crate) const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
118
119#[cfg(not(target_family = "wasm"))]
120type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
121
122#[cfg(target_family = "wasm")]
123type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
124
125#[cfg_attr(unix, doc = " ```rust")] #[cfg_attr(not(unix), doc = " ```ignore")]
129#[derive(Clone)]
190pub struct Agent {
191 nonce_factory: Arc<dyn NonceGenerator>,
192 identity: Arc<dyn Identity>,
193 ingress_expiry: Duration,
194 root_key: Arc<RwLock<Vec<u8>>>,
195 client: Arc<dyn HttpService>,
196 route_provider: Arc<dyn RouteProvider>,
197 subnet_key_cache: Arc<Mutex<SubnetCache>>,
198 concurrent_requests_semaphore: Arc<Semaphore>,
199 verify_query_signatures: bool,
200 max_response_body_size: Option<usize>,
201 max_polling_time: Duration,
202 #[allow(dead_code)]
203 max_tcp_error_retries: usize,
204}
205
206impl fmt::Debug for Agent {
207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
208 f.debug_struct("Agent")
209 .field("ingress_expiry", &self.ingress_expiry)
210 .finish_non_exhaustive()
211 }
212}
213
214impl Agent {
215 pub fn builder() -> builder::AgentBuilder {
218 Default::default()
219 }
220
221 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
223 let client = config.http_service.unwrap_or_else(|| {
224 Arc::new(Retry429Logic {
225 client: config.client.unwrap_or_else(|| {
226 #[cfg(not(target_family = "wasm"))]
227 {
228 Client::builder()
229 .use_rustls_tls()
230 .timeout(Duration::from_secs(360))
231 .build()
232 .expect("Could not create HTTP client.")
233 }
234 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
235 {
236 Client::new()
237 }
238 }),
239 })
240 });
241 Ok(Agent {
242 nonce_factory: config.nonce_factory,
243 identity: config.identity,
244 ingress_expiry: config.ingress_expiry,
245 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
246 client: client.clone(),
247 route_provider: if let Some(route_provider) = config.route_provider {
248 route_provider
249 } else if let Some(url) = config.url {
250 if config.background_dynamic_routing {
251 assert!(
252 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
253 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
254 );
255 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
256 UrlUntilReady::new(url, async move {
257 let provider =
258 DynamicRouteProviderBuilder::new(seeds, client, None).build();
259 provider.start().await;
260 provider
261 }) as Arc<dyn RouteProvider>
262 } else {
263 Arc::new(url)
264 }
265 } else {
266 panic!("either route_provider or url must be specified");
267 },
268 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
269 verify_query_signatures: config.verify_query_signatures,
270 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
271 max_response_body_size: config.max_response_body_size,
272 max_tcp_error_retries: config.max_tcp_error_retries,
273 max_polling_time: config.max_polling_time,
274 })
275 }
276
277 pub fn set_identity<I>(&mut self, identity: I)
283 where
284 I: 'static + Identity,
285 {
286 self.identity = Arc::new(identity);
287 }
288 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
294 self.identity = identity;
295 }
296
297 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
306 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
307 return Ok(());
309 }
310 let status = self.status().await?;
311 let Some(root_key) = status.root_key else {
312 return Err(AgentError::NoRootKeyInStatus(status));
313 };
314 self.set_root_key(root_key);
315 Ok(())
316 }
317
318 pub fn set_root_key(&self, root_key: Vec<u8>) {
323 *self.root_key.write().unwrap() = root_key;
324 }
325
326 pub fn read_root_key(&self) -> Vec<u8> {
328 self.root_key.read().unwrap().clone()
329 }
330
331 fn get_expiry_date(&self) -> u64 {
332 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
333 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
334 if self.ingress_expiry.as_secs() > 90 {
335 rounded = rounded.replace_second(0).unwrap();
336 }
337 rounded.unix_timestamp_nanos().try_into().unwrap()
338 }
339
340 pub fn get_principal(&self) -> Result<Principal, String> {
342 self.identity.sender()
343 }
344
345 async fn query_endpoint<A>(
346 &self,
347 effective_canister_id: Principal,
348 serialized_bytes: Vec<u8>,
349 ) -> Result<A, AgentError>
350 where
351 A: serde::de::DeserializeOwned,
352 {
353 let _permit = self.concurrent_requests_semaphore.acquire().await;
354 let bytes = self
355 .execute(
356 Method::POST,
357 &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
358 Some(serialized_bytes),
359 )
360 .await?
361 .1;
362 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
363 }
364
365 async fn read_state_endpoint<A>(
366 &self,
367 effective_canister_id: Principal,
368 serialized_bytes: Vec<u8>,
369 ) -> Result<A, AgentError>
370 where
371 A: serde::de::DeserializeOwned,
372 {
373 let _permit = self.concurrent_requests_semaphore.acquire().await;
374 let endpoint = format!(
375 "api/v3/canister/{}/read_state",
376 effective_canister_id.to_text()
377 );
378 let bytes = self
379 .execute(Method::POST, &endpoint, Some(serialized_bytes))
380 .await?
381 .1;
382 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
383 }
384
385 async fn read_subnet_state_endpoint<A>(
386 &self,
387 subnet_id: Principal,
388 serialized_bytes: Vec<u8>,
389 ) -> Result<A, AgentError>
390 where
391 A: serde::de::DeserializeOwned,
392 {
393 let _permit = self.concurrent_requests_semaphore.acquire().await;
394 let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
395 let bytes = self
396 .execute(Method::POST, &endpoint, Some(serialized_bytes))
397 .await?
398 .1;
399 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
400 }
401
402 async fn query_subnet_endpoint<A>(
403 &self,
404 subnet_id: Principal,
405 serialized_bytes: Vec<u8>,
406 ) -> Result<A, AgentError>
407 where
408 A: serde::de::DeserializeOwned,
409 {
410 let _permit = self.concurrent_requests_semaphore.acquire().await;
411 let endpoint = format!("api/v3/subnet/{}/query", subnet_id.to_text());
412 let bytes = self
413 .execute(Method::POST, &endpoint, Some(serialized_bytes))
414 .await?
415 .1;
416 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
417 }
418
419 async fn query_endpoint_for<A>(
420 &self,
421 effective_id: EffectiveId,
422 serialized_bytes: Vec<u8>,
423 ) -> Result<A, AgentError>
424 where
425 A: serde::de::DeserializeOwned,
426 {
427 match effective_id {
428 EffectiveId::Canister(p) => self.query_endpoint(p, serialized_bytes).await,
429 EffectiveId::Subnet(p) => self.query_subnet_endpoint(p, serialized_bytes).await,
430 }
431 }
432
433 async fn read_state_endpoint_for<A>(
434 &self,
435 effective_id: EffectiveId,
436 serialized_bytes: Vec<u8>,
437 ) -> Result<A, AgentError>
438 where
439 A: serde::de::DeserializeOwned,
440 {
441 match effective_id {
442 EffectiveId::Canister(p) => self.read_state_endpoint(p, serialized_bytes).await,
443 EffectiveId::Subnet(p) => self.read_subnet_state_endpoint(p, serialized_bytes).await,
444 }
445 }
446
447 async fn call_endpoint_for(
448 &self,
449 effective_id: EffectiveId,
450 serialized_bytes: Vec<u8>,
451 ) -> Result<TransportCallResponse, AgentError> {
452 match effective_id {
453 EffectiveId::Canister(p) => self.call_endpoint(p, serialized_bytes).await,
454 EffectiveId::Subnet(p) => self.call_subnet_endpoint(p, serialized_bytes).await,
455 }
456 }
457
458 async fn get_subnet_for(&self, effective_id: EffectiveId) -> Result<Arc<Subnet>, AgentError> {
459 match effective_id {
460 EffectiveId::Canister(p) => self.get_subnet_by_canister(&p).await,
461 EffectiveId::Subnet(p) => self.get_subnet_by_id(&p).await,
462 }
463 }
464
465 async fn fetch_subnet_for(&self, effective_id: EffectiveId) -> Result<Arc<Subnet>, AgentError> {
466 match effective_id {
467 EffectiveId::Canister(p) => self.fetch_subnet_by_canister(&p).await,
468 EffectiveId::Subnet(p) => self.fetch_subnet_by_id(&p).await,
469 }
470 }
471
472 async fn call_endpoint(
473 &self,
474 effective_canister_id: Principal,
475 serialized_bytes: Vec<u8>,
476 ) -> Result<TransportCallResponse, AgentError> {
477 let _permit = self.concurrent_requests_semaphore.acquire().await;
478 let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
479 let (status_code, response_body) = self
480 .execute(Method::POST, &endpoint, Some(serialized_bytes))
481 .await?;
482
483 if status_code == StatusCode::ACCEPTED {
484 return Ok(TransportCallResponse::Accepted);
485 }
486
487 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
488 }
489
490 async fn call_subnet_endpoint(
491 &self,
492 subnet_id: Principal,
493 serialized_bytes: Vec<u8>,
494 ) -> Result<TransportCallResponse, AgentError> {
495 let _permit = self.concurrent_requests_semaphore.acquire().await;
496 let endpoint = format!("api/v4/subnet/{}/call", subnet_id.to_text());
497 let (status_code, response_body) = self
498 .execute(Method::POST, &endpoint, Some(serialized_bytes))
499 .await?;
500
501 if status_code == StatusCode::ACCEPTED {
502 return Ok(TransportCallResponse::Accepted);
503 }
504
505 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
506 }
507
508 #[allow(clippy::too_many_arguments)]
511 async fn query_raw(
512 &self,
513 canister_id: Principal,
514 effective_canister_id: Principal,
515 method_name: String,
516 arg: Vec<u8>,
517 ingress_expiry_datetime: Option<u64>,
518 use_nonce: bool,
519 explicit_verify_query_signatures: Option<bool>,
520 ) -> Result<Vec<u8>, AgentError> {
521 let operation = Operation::Call {
522 canister: canister_id,
523 method: method_name.clone(),
524 };
525 let content = self.query_content(
526 canister_id,
527 method_name,
528 arg,
529 ingress_expiry_datetime,
530 use_nonce,
531 )?;
532 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
533 self.query_inner(
534 EffectiveId::Canister(effective_canister_id),
535 serialized_bytes,
536 content.to_request_id(),
537 explicit_verify_query_signatures,
538 operation,
539 )
540 .await
541 }
542
543 pub async fn query_signed(
553 &self,
554 effective_id: impl Into<EffectiveId>,
555 signed_query: Vec<u8>,
556 ) -> Result<Vec<u8>, AgentError> {
557 let envelope: Envelope =
558 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
559 let EnvelopeContent::Query {
560 canister_id,
561 method_name,
562 ..
563 } = &*envelope.content
564 else {
565 return Err(AgentError::CallDataMismatch {
566 field: "request_type".to_string(),
567 value_arg: "query".to_string(),
568 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
569 "update"
570 } else {
571 "read_state"
572 }
573 .to_string(),
574 });
575 };
576 let operation = Operation::Call {
577 canister: *canister_id,
578 method: method_name.clone(),
579 };
580 self.query_inner(
581 effective_id.into(),
582 signed_query,
583 envelope.content.to_request_id(),
584 None,
585 operation,
586 )
587 .await
588 }
589
590 async fn query_inner(
594 &self,
595 effective_id: EffectiveId,
596 signed_query: Vec<u8>,
597 request_id: RequestId,
598 explicit_verify_query_signatures: Option<bool>,
599 operation: Operation,
600 ) -> Result<Vec<u8>, AgentError> {
601 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
602 let (response, mut subnet) = futures_util::try_join!(
603 self.query_endpoint_for::<QueryResponse>(effective_id, signed_query),
604 self.get_subnet_for(effective_id)
605 )?;
606 if response.signatures().is_empty() {
607 return Err(AgentError::MissingSignature);
608 } else if response.signatures().len() > subnet.node_keys.len() {
609 return Err(AgentError::TooManySignatures {
610 had: response.signatures().len(),
611 needed: subnet.node_keys.len(),
612 });
613 }
614 for signature in response.signatures() {
615 if OffsetDateTime::now_utc()
616 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
617 > self.ingress_expiry
618 {
619 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
620 }
621 let signable = response.signable(request_id, signature.timestamp);
622 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
623 node_key
624 } else {
625 subnet = self.fetch_subnet_for(effective_id).await?;
626 subnet
627 .node_keys
628 .get(&signature.identity)
629 .ok_or(AgentError::CertificateNotAuthorized())?
630 };
631 if node_key.len() != 44 {
632 return Err(AgentError::DerKeyLengthMismatch {
633 expected: 44,
634 actual: node_key.len(),
635 });
636 }
637 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
638 if node_key[..12] != DER_PREFIX {
639 return Err(AgentError::DerPrefixMismatch {
640 expected: DER_PREFIX.to_vec(),
641 actual: node_key[..12].to_vec(),
642 });
643 }
644 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
645 .map_err(|_| AgentError::MalformedPublicKey)?;
646
647 match pubkey.verify_signature(&signable, &signature.signature[..]) {
648 Ok(()) => (),
649 Err(SignatureError::InvalidSignature) => {
650 return Err(AgentError::QuerySignatureVerificationFailed)
651 }
652 Err(SignatureError::InvalidLength) => {
653 return Err(AgentError::MalformedSignature)
654 }
655 _ => unreachable!(),
656 }
657 }
658 response
659 } else {
660 self.query_endpoint_for::<QueryResponse>(effective_id, signed_query)
661 .await?
662 };
663
664 match response {
665 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
666 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
667 reject,
668 operation: Some(operation),
669 }),
670 }
671 }
672
673 fn query_content(
674 &self,
675 canister_id: Principal,
676 method_name: String,
677 arg: Vec<u8>,
678 ingress_expiry_datetime: Option<u64>,
679 use_nonce: bool,
680 ) -> Result<EnvelopeContent, AgentError> {
681 Ok(EnvelopeContent::Query {
682 sender: self.identity.sender().map_err(AgentError::SigningError)?,
683 canister_id,
684 method_name,
685 arg,
686 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
687 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
688 sender_info: self.identity.sender_info(),
689 })
690 }
691
692 async fn update_raw(
694 &self,
695 canister_id: Principal,
696 effective_canister_id: Principal,
697 method_name: String,
698 arg: Vec<u8>,
699 ingress_expiry_datetime: Option<u64>,
700 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
701 let nonce = self.nonce_factory.generate();
702 let content = self.update_content(
703 canister_id,
704 method_name.clone(),
705 arg,
706 ingress_expiry_datetime,
707 nonce,
708 )?;
709 let operation = Some(Operation::Call {
710 canister: canister_id,
711 method: method_name,
712 });
713 let request_id = to_request_id(&content)?;
714 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
715
716 let response_body = self
717 .call_endpoint(effective_canister_id, serialized_bytes)
718 .await?;
719
720 match response_body {
721 TransportCallResponse::Replied { certificate } => {
722 let certificate =
723 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
724
725 self.verify(&certificate, effective_canister_id)?;
726 let status = lookup_request_status(&certificate, &request_id)?;
727
728 match status {
729 RequestStatusResponse::Replied(reply) => {
730 Ok(CallResponse::Response((reply.arg, certificate)))
731 }
732 RequestStatusResponse::Rejected(reject_response) => {
733 Err(AgentError::CertifiedReject {
734 reject: reject_response,
735 operation,
736 })?
737 }
738 _ => Ok(CallResponse::Poll(request_id)),
739 }
740 }
741 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
742 TransportCallResponse::NonReplicatedRejection(reject_response) => {
743 Err(AgentError::UncertifiedReject {
744 reject: reject_response,
745 operation,
746 })
747 }
748 }
749 }
750
751 pub async fn update_signed(
761 &self,
762 effective_id: impl Into<EffectiveId>,
763 signed_update: Vec<u8>,
764 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
765 let effective_id = effective_id.into();
766 let envelope: Envelope =
767 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
768 let EnvelopeContent::Call {
769 canister_id,
770 method_name,
771 ..
772 } = &*envelope.content
773 else {
774 return Err(AgentError::CallDataMismatch {
775 field: "request_type".to_string(),
776 value_arg: "update".to_string(),
777 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
778 "query"
779 } else {
780 "read_state"
781 }
782 .to_string(),
783 });
784 };
785 let operation = Some(Operation::Call {
786 canister: *canister_id,
787 method: method_name.clone(),
788 });
789 let request_id = to_request_id(&envelope.content)?;
790
791 let response_body = self.call_endpoint_for(effective_id, signed_update).await?;
792
793 match response_body {
794 TransportCallResponse::Replied { certificate } => {
795 let certificate =
796 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
797
798 self.verify(&certificate, effective_id)?;
799 let status = lookup_request_status(&certificate, &request_id)?;
800
801 match status {
802 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
803 RequestStatusResponse::Rejected(reject_response) => {
804 Err(AgentError::CertifiedReject {
805 reject: reject_response,
806 operation,
807 })?
808 }
809 _ => Ok(CallResponse::Poll(request_id)),
810 }
811 }
812 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
813 TransportCallResponse::NonReplicatedRejection(reject_response) => {
814 Err(AgentError::UncertifiedReject {
815 reject: reject_response,
816 operation,
817 })
818 }
819 }
820 }
821
822 fn update_content(
823 &self,
824 canister_id: Principal,
825 method_name: String,
826 arg: Vec<u8>,
827 ingress_expiry_datetime: Option<u64>,
828 nonce: Option<Vec<u8>>,
829 ) -> Result<EnvelopeContent, AgentError> {
830 Ok(EnvelopeContent::Call {
831 canister_id,
832 method_name,
833 arg,
834 nonce,
835 sender: self.identity.sender().map_err(AgentError::SigningError)?,
836 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
837 sender_info: self.identity.sender_info(),
838 })
839 }
840
841 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
842 ExponentialBackoffBuilder::new()
843 .with_initial_interval(Duration::from_millis(500))
844 .with_max_interval(Duration::from_secs(1))
845 .with_multiplier(1.4)
846 .with_max_elapsed_time(Some(self.max_polling_time))
847 .build()
848 }
849
850 pub async fn wait_signed(
856 &self,
857 request_id: &RequestId,
858 effective_id: impl Into<EffectiveId>,
859 signed_request_status: Vec<u8>,
860 ) -> Result<(Vec<u8>, Certificate), AgentError> {
861 let effective_id = effective_id.into();
862 let mut retry_policy = self.get_retry_policy();
863
864 let mut request_accepted = false;
865 loop {
866 let (resp, cert) = self
867 .request_status_signed(request_id, effective_id, signed_request_status.clone())
868 .await?;
869 match resp {
870 RequestStatusResponse::Unknown => {
871 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
873 return Err(AgentError::TimeoutWaitingForResponse());
874 }
875 }
876
877 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
878 if !request_accepted {
879 retry_policy.reset();
880 request_accepted = true;
881 }
882 }
883
884 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
885 return Ok((arg, cert))
886 }
887
888 RequestStatusResponse::Rejected(response) => {
889 return Err(AgentError::CertifiedReject {
890 reject: response,
891 operation: None,
892 })
893 }
894
895 RequestStatusResponse::Done => {
896 return Err(AgentError::RequestStatusDoneNoReply(String::from(
897 *request_id,
898 )))
899 }
900 };
901
902 match retry_policy.next_backoff() {
903 Some(duration) => crate::util::sleep(duration).await,
904
905 None => return Err(AgentError::TimeoutWaitingForResponse()),
906 }
907 }
908 }
909
910 pub async fn wait(
916 &self,
917 request_id: &RequestId,
918 effective_id: impl Into<EffectiveId>,
919 ) -> Result<(Vec<u8>, Certificate), AgentError> {
920 self.wait_inner(request_id, effective_id.into(), None).await
921 }
922
923 async fn wait_inner(
924 &self,
925 request_id: &RequestId,
926 effective_id: EffectiveId,
927 operation: Option<Operation>,
928 ) -> Result<(Vec<u8>, Certificate), AgentError> {
929 let mut retry_policy = self.get_retry_policy();
930
931 let mut request_accepted = false;
932 loop {
933 let (resp, cert) = self.request_status_raw(request_id, effective_id).await?;
934 match resp {
935 RequestStatusResponse::Unknown => {
936 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
938 return Err(AgentError::TimeoutWaitingForResponse());
939 }
940 }
941
942 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
943 if !request_accepted {
944 retry_policy.reset();
952 request_accepted = true;
953 }
954 }
955
956 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
957 return Ok((arg, cert))
958 }
959
960 RequestStatusResponse::Rejected(response) => {
961 return Err(AgentError::CertifiedReject {
962 reject: response,
963 operation,
964 })
965 }
966
967 RequestStatusResponse::Done => {
968 return Err(AgentError::RequestStatusDoneNoReply(String::from(
969 *request_id,
970 )))
971 }
972 };
973
974 match retry_policy.next_backoff() {
975 Some(duration) => crate::util::sleep(duration).await,
976
977 None => return Err(AgentError::TimeoutWaitingForResponse()),
978 }
979 }
980 }
981
982 pub async fn read_state_raw(
989 &self,
990 paths: Vec<Vec<Label>>,
991 effective_id: impl Into<EffectiveId>,
992 ) -> Result<Certificate, AgentError> {
993 let effective_id = effective_id.into();
994 let content = self.read_state_content(paths)?;
995 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
996
997 let read_state_response: ReadStateResponse = self
998 .read_state_endpoint_for(effective_id, serialized_bytes)
999 .await?;
1000 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1001 .map_err(AgentError::InvalidCborData)?;
1002 self.verify(&cert, effective_id)?;
1003 Ok(cert)
1004 }
1005
1006 pub async fn read_subnet_state_raw(
1011 &self,
1012 paths: Vec<Vec<Label>>,
1013 subnet_id: Principal,
1014 ) -> Result<Certificate, AgentError> {
1015 self.read_state_raw(paths, EffectiveId::Subnet(subnet_id))
1016 .await
1017 }
1018
1019 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
1020 Ok(EnvelopeContent::ReadState {
1021 sender: self.identity.sender().map_err(AgentError::SigningError)?,
1022 paths,
1023 ingress_expiry: self.get_expiry_date(),
1024 })
1025 }
1026
1027 pub fn verify(
1034 &self,
1035 cert: &Certificate,
1036 effective_id: impl Into<EffectiveId>,
1037 ) -> Result<(), AgentError> {
1038 match effective_id.into() {
1039 EffectiveId::Canister(p) => self.verify_cert(cert, p)?,
1040 EffectiveId::Subnet(p) => self.verify_cert_for_subnet(cert, p)?,
1041 }
1042 self.verify_cert_timestamp(cert)?;
1043 Ok(())
1044 }
1045
1046 fn verify_cert(
1047 &self,
1048 cert: &Certificate,
1049 effective_canister_id: Principal,
1050 ) -> Result<(), AgentError> {
1051 let sig = &cert.signature;
1052
1053 let root_hash = cert.tree.digest();
1054 let mut msg = vec![];
1055 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
1056 msg.extend_from_slice(&root_hash);
1057
1058 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
1059 let key = extract_der(der_key)?;
1060
1061 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
1062 .map_err(|_| AgentError::CertificateVerificationFailed())?;
1063 Ok(())
1064 }
1065
1066 pub fn verify_for_subnet(
1072 &self,
1073 cert: &Certificate,
1074 subnet_id: Principal,
1075 ) -> Result<(), AgentError> {
1076 self.verify(cert, EffectiveId::Subnet(subnet_id))
1077 }
1078
1079 fn verify_cert_for_subnet(
1080 &self,
1081 cert: &Certificate,
1082 subnet_id: Principal,
1083 ) -> Result<(), AgentError> {
1084 let sig = &cert.signature;
1085
1086 let root_hash = cert.tree.digest();
1087 let mut msg = vec![];
1088 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
1089 msg.extend_from_slice(&root_hash);
1090
1091 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
1092 let key = extract_der(der_key)?;
1093
1094 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
1095 .map_err(|_| AgentError::CertificateVerificationFailed())?;
1096 Ok(())
1097 }
1098
1099 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
1100 let time = lookup_time(cert)?;
1103 if (OffsetDateTime::now_utc()
1104 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
1105 > self.ingress_expiry
1106 {
1107 Err(AgentError::CertificateOutdated(self.ingress_expiry))
1108 } else {
1109 Ok(())
1110 }
1111 }
1112
1113 fn check_delegation(
1114 &self,
1115 delegation: &Option<Delegation>,
1116 effective_canister_id: Principal,
1117 ) -> Result<Vec<u8>, AgentError> {
1118 match delegation {
1119 None => Ok(self.read_root_key()),
1120 Some(delegation) => {
1121 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1122 .map_err(AgentError::InvalidCborData)?;
1123 if cert.delegation.is_some() {
1124 return Err(AgentError::CertificateHasTooManyDelegations);
1125 }
1126 self.verify_cert(&cert, effective_canister_id)?;
1127 let canister_range_shards_lookup =
1128 ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
1129 let ranges: Vec<(Principal, Principal)> =
1130 match lookup_tree(&cert.tree, canister_range_shards_lookup) {
1131 Ok(canister_range_shards) => {
1132 let mut shard_paths = canister_range_shards
1133 .list_paths() .into_iter()
1135 .map(|mut x| {
1136 x.pop() .ok_or_else(AgentError::CertificateVerificationFailed)
1138 })
1139 .collect::<Result<Vec<_>, _>>()?;
1140 if shard_paths.is_empty() {
1141 return Err(AgentError::CertificateNotAuthorized());
1142 }
1143 shard_paths.sort_unstable();
1144 let shard_division = shard_paths.partition_point(|shard| {
1145 shard.as_bytes() <= effective_canister_id.as_slice()
1146 });
1147 if shard_division == 0 {
1148 return Err(AgentError::CertificateNotAuthorized());
1150 }
1151 let max_potential_shard = &shard_paths[shard_division - 1];
1152 let canister_range = lookup_value(
1153 &canister_range_shards,
1154 [max_potential_shard.as_bytes()],
1155 )?;
1156 serde_cbor::from_slice(canister_range)
1157 .map_err(AgentError::InvalidCborData)?
1158 }
1159 Err(AgentError::LookupPathAbsent(_) | AgentError::LookupPathUnknown(_)) => {
1161 let subnet_ranges_path = [
1162 "subnet".as_bytes(),
1163 delegation.subnet_id.as_ref(),
1164 "canister_ranges".as_bytes(),
1165 ];
1166 let canister_range = lookup_value(&cert.tree, subnet_ranges_path)?;
1167 serde_cbor::from_slice(canister_range)
1168 .map_err(AgentError::InvalidCborData)?
1169 }
1170 Err(e) => return Err(e),
1171 };
1172 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1173 return Err(AgentError::CertificateNotAuthorized());
1175 }
1176
1177 let public_key_path = [
1178 "subnet".as_bytes(),
1179 delegation.subnet_id.as_ref(),
1180 "public_key".as_bytes(),
1181 ];
1182 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1183 }
1184 }
1185 }
1186
1187 fn check_delegation_for_subnet(
1188 &self,
1189 delegation: &Option<Delegation>,
1190 subnet_id: Principal,
1191 ) -> Result<Vec<u8>, AgentError> {
1192 match delegation {
1193 None => Ok(self.read_root_key()),
1194 Some(delegation) => {
1195 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1196 .map_err(AgentError::InvalidCborData)?;
1197 if cert.delegation.is_some() {
1198 return Err(AgentError::CertificateHasTooManyDelegations);
1199 }
1200 self.verify_cert_for_subnet(&cert, subnet_id)?;
1201 let public_key_path = [
1202 "subnet".as_bytes(),
1203 subnet_id.as_ref(),
1204 "public_key".as_bytes(),
1205 ];
1206 let pk = lookup_value(&cert.tree, public_key_path)
1207 .map_err(|_| AgentError::CertificateNotAuthorized())?
1208 .to_vec();
1209 Ok(pk)
1210 }
1211 }
1212 }
1213
1214 pub async fn read_state_canister_info(
1217 &self,
1218 canister_id: Principal,
1219 path: &str,
1220 ) -> Result<Vec<u8>, AgentError> {
1221 let paths: Vec<Vec<Label>> = vec![vec![
1222 "canister".into(),
1223 Label::from_bytes(canister_id.as_slice()),
1224 path.into(),
1225 ]];
1226
1227 let cert = self.read_state_raw(paths, canister_id).await?;
1228
1229 lookup_canister_info(cert, canister_id, path)
1230 }
1231
1232 pub async fn read_state_canister_controllers(
1234 &self,
1235 canister_id: Principal,
1236 ) -> Result<Vec<Principal>, AgentError> {
1237 let blob = self
1238 .read_state_canister_info(canister_id, "controllers")
1239 .await?;
1240 let controllers: Vec<Principal> =
1241 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1242 Ok(controllers)
1243 }
1244
1245 pub async fn read_state_canister_module_hash(
1247 &self,
1248 canister_id: Principal,
1249 ) -> Result<Vec<u8>, AgentError> {
1250 self.read_state_canister_info(canister_id, "module_hash")
1251 .await
1252 }
1253
1254 pub async fn read_state_canister_metadata(
1256 &self,
1257 canister_id: Principal,
1258 path: &str,
1259 ) -> Result<Vec<u8>, AgentError> {
1260 let paths: Vec<Vec<Label>> = vec![vec![
1261 "canister".into(),
1262 Label::from_bytes(canister_id.as_slice()),
1263 "metadata".into(),
1264 path.into(),
1265 ]];
1266
1267 let cert = self.read_state_raw(paths, canister_id).await?;
1268
1269 lookup_canister_metadata(cert, canister_id, path)
1270 }
1271
1272 pub async fn read_state_subnet_metrics(
1274 &self,
1275 subnet_id: Principal,
1276 ) -> Result<SubnetMetrics, AgentError> {
1277 let paths = vec![vec![
1278 "subnet".into(),
1279 Label::from_bytes(subnet_id.as_slice()),
1280 "metrics".into(),
1281 ]];
1282 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1283 lookup_subnet_metrics(cert, subnet_id)
1284 }
1285
1286 pub async fn read_state_subnet_canister_ranges(
1288 &self,
1289 subnet_id: Principal,
1290 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1291 let paths = vec![vec![
1292 "subnet".into(),
1293 Label::from_bytes(subnet_id.as_slice()),
1294 "canister_ranges".into(),
1295 ]];
1296 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1297 lookup_subnet_canister_ranges(&cert, subnet_id)
1298 }
1299
1300 pub async fn request_status_raw(
1306 &self,
1307 request_id: &RequestId,
1308 effective_id: impl Into<EffectiveId>,
1309 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1310 let paths: Vec<Vec<Label>> =
1311 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1312
1313 let cert = self.read_state_raw(paths, effective_id).await?;
1314
1315 Ok((lookup_request_status(&cert, request_id)?, cert))
1316 }
1317
1318 pub async fn request_status_signed(
1326 &self,
1327 request_id: &RequestId,
1328 effective_id: impl Into<EffectiveId>,
1329 signed_request_status: Vec<u8>,
1330 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1331 let effective_id = effective_id.into();
1332 let _envelope: Envelope =
1333 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1334 let read_state_response: ReadStateResponse = self
1335 .read_state_endpoint_for(effective_id, signed_request_status)
1336 .await?;
1337
1338 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1339 .map_err(AgentError::InvalidCborData)?;
1340 self.verify(&cert, effective_id)?;
1341 Ok((lookup_request_status(&cert, request_id)?, cert))
1342 }
1343
1344 pub fn update<S: Into<String>>(
1347 &self,
1348 canister_id: &Principal,
1349 method_name: S,
1350 ) -> UpdateBuilder<'_> {
1351 UpdateBuilder::new(self, *canister_id, method_name.into())
1352 }
1353
1354 pub async fn status(&self) -> Result<Status, AgentError> {
1356 let endpoint = "api/v2/status";
1357 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1358
1359 let cbor: serde_cbor::Value =
1360 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1361
1362 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1363 }
1364
1365 pub fn query<S: Into<String>>(
1368 &self,
1369 canister_id: &Principal,
1370 method_name: S,
1371 ) -> QueryBuilder<'_> {
1372 QueryBuilder::new(self, *canister_id, method_name.into())
1373 }
1374
1375 pub fn sign_request_status(
1381 &self,
1382 effective_id: impl Into<EffectiveId>,
1383 request_id: RequestId,
1384 ) -> Result<SignedRequestStatus, AgentError> {
1385 let effective_id = effective_id.into();
1386 let paths: Vec<Vec<Label>> =
1387 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1388 let read_state_content = self.read_state_content(paths)?;
1389 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1390 let ingress_expiry = read_state_content.ingress_expiry();
1391 let sender = *read_state_content.sender();
1392 Ok(SignedRequestStatus {
1393 ingress_expiry,
1394 sender,
1395 effective_canister_id: effective_id.as_principal(),
1396 request_id,
1397 signed_request_status,
1398 })
1399 }
1400
1401 pub async fn get_subnet_by_canister(
1404 &self,
1405 canister: &Principal,
1406 ) -> Result<Arc<Subnet>, AgentError> {
1407 let subnet = self
1408 .subnet_key_cache
1409 .lock()
1410 .unwrap()
1411 .get_subnet_by_canister(canister);
1412 if let Some(subnet) = subnet {
1413 Ok(subnet)
1414 } else {
1415 self.fetch_subnet_by_canister(canister).await
1416 }
1417 }
1418
1419 pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1422 let subnet = self
1423 .subnet_key_cache
1424 .lock()
1425 .unwrap()
1426 .get_subnet_by_id(subnet_id);
1427 if let Some(subnet) = subnet {
1428 Ok(subnet)
1429 } else {
1430 self.fetch_subnet_by_id(subnet_id).await
1431 }
1432 }
1433
1434 pub async fn fetch_api_boundary_nodes_by_canister_id(
1436 &self,
1437 canister_id: Principal,
1438 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1439 let paths = vec![vec!["api_boundary_nodes".into()]];
1440 let certificate = self.read_state_raw(paths, canister_id).await?;
1441 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1442 Ok(api_boundary_nodes)
1443 }
1444
1445 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1447 &self,
1448 subnet_id: Principal,
1449 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1450 let paths = vec![vec!["api_boundary_nodes".into()]];
1451 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1452 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1453 Ok(api_boundary_nodes)
1454 }
1455
1456 pub async fn fetch_subnet_by_canister(
1461 &self,
1462 canister: &Principal,
1463 ) -> Result<Arc<Subnet>, AgentError> {
1464 let canister_cert = self
1465 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1466 .await?;
1467 let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1468 Principal::from_slice(&delegation.subnet_id)
1469 } else {
1470 Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1472 };
1473 let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1474 let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1475 let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1477 lookup_canister_ranges(&subnet_id, &delegation_cert)?
1478 } else {
1479 lookup_canister_ranges(&subnet_id, &canister_cert)?
1480 };
1481 subnet.canister_ranges = canister_ranges;
1482 if !subnet.canister_ranges.contains(canister) {
1483 return Err(AgentError::CertificateNotAuthorized());
1484 }
1485 let subnet = Arc::new(subnet);
1486 self.subnet_key_cache
1487 .lock()
1488 .unwrap()
1489 .insert_subnet(subnet_id, subnet.clone());
1490 Ok(subnet)
1491 }
1492
1493 pub async fn fetch_subnet_by_id(
1498 &self,
1499 subnet_id: &Principal,
1500 ) -> Result<Arc<Subnet>, AgentError> {
1501 let subnet_cert = self
1502 .read_subnet_state_raw(
1503 vec![
1504 vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1505 vec!["subnet".into(), subnet_id.as_slice().into()],
1506 ],
1507 *subnet_id,
1508 )
1509 .await?;
1510 let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1511 let subnet = Arc::new(subnet);
1512 self.subnet_key_cache
1513 .lock()
1514 .unwrap()
1515 .insert_subnet(*subnet_id, subnet.clone());
1516 Ok(subnet)
1517 }
1518
1519 async fn request(
1520 &self,
1521 method: Method,
1522 endpoint: &str,
1523 body: Option<Vec<u8>>,
1524 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1525 let body = body.map(Bytes::from);
1526
1527 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1528 let url = self.route_provider.route()?.join(endpoint)?;
1529 let uri = Uri::from_str(url.as_str())
1530 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1531 let body = body.clone().unwrap_or_default();
1532 let request = http::Request::builder()
1533 .method(method.clone())
1534 .uri(uri)
1535 .header(CONTENT_TYPE, "application/cbor")
1536 .body(body)
1537 .map_err(|e| {
1538 AgentError::TransportError(TransportError::Generic(format!(
1539 "unable to create request: {e:#}"
1540 )))
1541 })?;
1542
1543 Ok(request)
1544 };
1545
1546 let response = self
1547 .client
1548 .call(
1549 &create_request_with_generated_url,
1550 self.max_tcp_error_retries,
1551 self.max_response_body_size,
1552 )
1553 .await?;
1554
1555 let (parts, body) = response.into_parts();
1556
1557 Ok((parts.status, parts.headers, body.to_vec()))
1558 }
1559
1560 async fn execute(
1561 &self,
1562 method: Method,
1563 endpoint: &str,
1564 body: Option<Vec<u8>>,
1565 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1566 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1567
1568 let status = request_result.0;
1569 let headers = request_result.1;
1570 let body = request_result.2;
1571
1572 if status.is_client_error() || status.is_server_error() {
1573 Err(AgentError::HttpError(HttpErrorPayload {
1574 status: status.into(),
1575 content_type: headers
1576 .get(CONTENT_TYPE)
1577 .and_then(|value| value.to_str().ok())
1578 .map(str::to_string),
1579 content: body,
1580 }))
1581 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1582 Err(AgentError::InvalidHttpResponse(format!(
1583 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1584 )))
1585 } else {
1586 Ok((status, body))
1587 }
1588 }
1589}
1590
1591fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1594 ranges
1595 .iter()
1596 .any(|r| principal >= &r.0 && principal <= &r.1)
1597}
1598
1599fn sign_envelope(
1600 content: &EnvelopeContent,
1601 identity: Arc<dyn Identity>,
1602) -> Result<Vec<u8>, AgentError> {
1603 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1604
1605 let envelope = Envelope {
1606 content: Cow::Borrowed(content),
1607 sender_pubkey: signature.public_key,
1608 sender_sig: signature.signature,
1609 sender_delegation: signature.delegations,
1610 };
1611
1612 let mut serialized_bytes = Vec::new();
1613 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1614 serializer.self_describe()?;
1615 envelope.serialize(&mut serializer)?;
1616
1617 Ok(serialized_bytes)
1618}
1619
1620pub fn signed_query_inspect(
1623 sender: Principal,
1624 canister_id: Principal,
1625 method_name: &str,
1626 arg: &[u8],
1627 ingress_expiry: u64,
1628 signed_query: Vec<u8>,
1629) -> Result<(), AgentError> {
1630 let envelope: Envelope =
1631 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1632 match envelope.content.as_ref() {
1633 EnvelopeContent::Query {
1634 ingress_expiry: ingress_expiry_cbor,
1635 sender: sender_cbor,
1636 canister_id: canister_id_cbor,
1637 method_name: method_name_cbor,
1638 arg: arg_cbor,
1639 nonce: _nonce,
1640 sender_info: _,
1641 } => {
1642 if ingress_expiry != *ingress_expiry_cbor {
1643 return Err(AgentError::CallDataMismatch {
1644 field: "ingress_expiry".to_string(),
1645 value_arg: ingress_expiry.to_string(),
1646 value_cbor: ingress_expiry_cbor.to_string(),
1647 });
1648 }
1649 if sender != *sender_cbor {
1650 return Err(AgentError::CallDataMismatch {
1651 field: "sender".to_string(),
1652 value_arg: sender.to_string(),
1653 value_cbor: sender_cbor.to_string(),
1654 });
1655 }
1656 if canister_id != *canister_id_cbor {
1657 return Err(AgentError::CallDataMismatch {
1658 field: "canister_id".to_string(),
1659 value_arg: canister_id.to_string(),
1660 value_cbor: canister_id_cbor.to_string(),
1661 });
1662 }
1663 if method_name != *method_name_cbor {
1664 return Err(AgentError::CallDataMismatch {
1665 field: "method_name".to_string(),
1666 value_arg: method_name.to_string(),
1667 value_cbor: method_name_cbor.clone(),
1668 });
1669 }
1670 if arg != *arg_cbor {
1671 return Err(AgentError::CallDataMismatch {
1672 field: "arg".to_string(),
1673 value_arg: format!("{arg:?}"),
1674 value_cbor: format!("{arg_cbor:?}"),
1675 });
1676 }
1677 }
1678 EnvelopeContent::Call { .. } => {
1679 return Err(AgentError::CallDataMismatch {
1680 field: "request_type".to_string(),
1681 value_arg: "query".to_string(),
1682 value_cbor: "call".to_string(),
1683 })
1684 }
1685 EnvelopeContent::ReadState { .. } => {
1686 return Err(AgentError::CallDataMismatch {
1687 field: "request_type".to_string(),
1688 value_arg: "query".to_string(),
1689 value_cbor: "read_state".to_string(),
1690 })
1691 }
1692 }
1693 Ok(())
1694}
1695
1696pub fn signed_update_inspect(
1699 sender: Principal,
1700 canister_id: Principal,
1701 method_name: &str,
1702 arg: &[u8],
1703 ingress_expiry: u64,
1704 signed_update: Vec<u8>,
1705) -> Result<(), AgentError> {
1706 let envelope: Envelope =
1707 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1708 match envelope.content.as_ref() {
1709 EnvelopeContent::Call {
1710 nonce: _nonce,
1711 ingress_expiry: ingress_expiry_cbor,
1712 sender: sender_cbor,
1713 canister_id: canister_id_cbor,
1714 method_name: method_name_cbor,
1715 arg: arg_cbor,
1716 sender_info: _,
1717 } => {
1718 if ingress_expiry != *ingress_expiry_cbor {
1719 return Err(AgentError::CallDataMismatch {
1720 field: "ingress_expiry".to_string(),
1721 value_arg: ingress_expiry.to_string(),
1722 value_cbor: ingress_expiry_cbor.to_string(),
1723 });
1724 }
1725 if sender != *sender_cbor {
1726 return Err(AgentError::CallDataMismatch {
1727 field: "sender".to_string(),
1728 value_arg: sender.to_string(),
1729 value_cbor: sender_cbor.to_string(),
1730 });
1731 }
1732 if canister_id != *canister_id_cbor {
1733 return Err(AgentError::CallDataMismatch {
1734 field: "canister_id".to_string(),
1735 value_arg: canister_id.to_string(),
1736 value_cbor: canister_id_cbor.to_string(),
1737 });
1738 }
1739 if method_name != *method_name_cbor {
1740 return Err(AgentError::CallDataMismatch {
1741 field: "method_name".to_string(),
1742 value_arg: method_name.to_string(),
1743 value_cbor: method_name_cbor.clone(),
1744 });
1745 }
1746 if arg != *arg_cbor {
1747 return Err(AgentError::CallDataMismatch {
1748 field: "arg".to_string(),
1749 value_arg: format!("{arg:?}"),
1750 value_cbor: format!("{arg_cbor:?}"),
1751 });
1752 }
1753 }
1754 EnvelopeContent::ReadState { .. } => {
1755 return Err(AgentError::CallDataMismatch {
1756 field: "request_type".to_string(),
1757 value_arg: "call".to_string(),
1758 value_cbor: "read_state".to_string(),
1759 })
1760 }
1761 EnvelopeContent::Query { .. } => {
1762 return Err(AgentError::CallDataMismatch {
1763 field: "request_type".to_string(),
1764 value_arg: "call".to_string(),
1765 value_cbor: "query".to_string(),
1766 })
1767 }
1768 }
1769 Ok(())
1770}
1771
1772pub fn signed_request_status_inspect(
1775 sender: Principal,
1776 request_id: &RequestId,
1777 ingress_expiry: u64,
1778 signed_request_status: Vec<u8>,
1779) -> Result<(), AgentError> {
1780 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1781 let envelope: Envelope =
1782 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1783 match envelope.content.as_ref() {
1784 EnvelopeContent::ReadState {
1785 ingress_expiry: ingress_expiry_cbor,
1786 sender: sender_cbor,
1787 paths: paths_cbor,
1788 } => {
1789 if ingress_expiry != *ingress_expiry_cbor {
1790 return Err(AgentError::CallDataMismatch {
1791 field: "ingress_expiry".to_string(),
1792 value_arg: ingress_expiry.to_string(),
1793 value_cbor: ingress_expiry_cbor.to_string(),
1794 });
1795 }
1796 if sender != *sender_cbor {
1797 return Err(AgentError::CallDataMismatch {
1798 field: "sender".to_string(),
1799 value_arg: sender.to_string(),
1800 value_cbor: sender_cbor.to_string(),
1801 });
1802 }
1803
1804 if paths != *paths_cbor {
1805 return Err(AgentError::CallDataMismatch {
1806 field: "paths".to_string(),
1807 value_arg: format!("{paths:?}"),
1808 value_cbor: format!("{paths_cbor:?}"),
1809 });
1810 }
1811 }
1812 EnvelopeContent::Query { .. } => {
1813 return Err(AgentError::CallDataMismatch {
1814 field: "request_type".to_string(),
1815 value_arg: "read_state".to_string(),
1816 value_cbor: "query".to_string(),
1817 })
1818 }
1819 EnvelopeContent::Call { .. } => {
1820 return Err(AgentError::CallDataMismatch {
1821 field: "request_type".to_string(),
1822 value_arg: "read_state".to_string(),
1823 value_cbor: "call".to_string(),
1824 })
1825 }
1826 }
1827 Ok(())
1828}
1829
1830#[derive(Clone)]
1831struct SubnetCache {
1832 subnets: TimedCache<Principal, Arc<Subnet>>,
1833 canister_index: RangeInclusiveMap<Principal, Principal>,
1834}
1835
1836impl SubnetCache {
1837 fn new() -> Self {
1838 Self {
1839 subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1840 canister_index: RangeInclusiveMap::new(),
1841 }
1842 }
1843
1844 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1845 self.canister_index
1846 .get(canister)
1847 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1848 .filter(|subnet| subnet.canister_ranges.contains(canister))
1849 }
1850
1851 fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1852 self.subnets.cache_get(subnet_id).cloned()
1853 }
1854
1855 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1856 self.subnets.cache_set(subnet_id, subnet.clone());
1857 for range in subnet.canister_ranges.iter() {
1858 self.canister_index.insert(range.clone(), subnet_id);
1859 }
1860 }
1861}
1862
1863#[derive(Debug, Clone)]
1865pub struct ApiBoundaryNode {
1866 pub domain: String,
1868 pub ipv6_address: String,
1870 pub ipv4_address: Option<String>,
1872}
1873
1874#[derive(Debug, Clone)]
1878#[non_exhaustive]
1879pub struct QueryBuilder<'agent> {
1880 agent: &'agent Agent,
1881 pub effective_canister_id: Principal,
1883 pub canister_id: Principal,
1885 pub method_name: String,
1887 pub arg: Vec<u8>,
1889 pub ingress_expiry_datetime: Option<u64>,
1891 pub use_nonce: bool,
1893}
1894
1895impl<'agent> QueryBuilder<'agent> {
1896 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1898 Self {
1899 agent,
1900 effective_canister_id: canister_id,
1901 canister_id,
1902 method_name,
1903 arg: vec![],
1904 ingress_expiry_datetime: None,
1905 use_nonce: false,
1906 }
1907 }
1908
1909 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1911 self.effective_canister_id = canister_id;
1912 self
1913 }
1914
1915 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1917 self.arg = arg.into();
1918 self
1919 }
1920
1921 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1923 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1924 self
1925 }
1926
1927 pub fn expire_after(mut self, duration: Duration) -> Self {
1929 self.ingress_expiry_datetime = Some(
1930 OffsetDateTime::now_utc()
1931 .saturating_add(duration.try_into().expect("negative duration"))
1932 .unix_timestamp_nanos() as u64,
1933 );
1934 self
1935 }
1936
1937 pub fn with_nonce_generation(mut self) -> Self {
1940 self.use_nonce = true;
1941 self
1942 }
1943
1944 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1946 self.agent
1947 .query_raw(
1948 self.canister_id,
1949 self.effective_canister_id,
1950 self.method_name,
1951 self.arg,
1952 self.ingress_expiry_datetime,
1953 self.use_nonce,
1954 None,
1955 )
1956 .await
1957 }
1958
1959 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1964 self.agent
1965 .query_raw(
1966 self.canister_id,
1967 self.effective_canister_id,
1968 self.method_name,
1969 self.arg,
1970 self.ingress_expiry_datetime,
1971 self.use_nonce,
1972 Some(true),
1973 )
1974 .await
1975 }
1976
1977 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1982 self.agent
1983 .query_raw(
1984 self.canister_id,
1985 self.effective_canister_id,
1986 self.method_name,
1987 self.arg,
1988 self.ingress_expiry_datetime,
1989 self.use_nonce,
1990 Some(false),
1991 )
1992 .await
1993 }
1994
1995 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1998 let effective_canister_id = self.effective_canister_id;
1999 let identity = self.agent.identity.clone();
2000 let content = self.into_envelope()?;
2001 let signed_query = sign_envelope(&content, identity)?;
2002 let EnvelopeContent::Query {
2003 ingress_expiry,
2004 sender,
2005 canister_id,
2006 method_name,
2007 arg,
2008 nonce,
2009 sender_info,
2010 } = content
2011 else {
2012 unreachable!()
2013 };
2014 Ok(SignedQuery {
2015 ingress_expiry,
2016 sender,
2017 canister_id,
2018 method_name,
2019 arg,
2020 effective_canister_id,
2021 signed_query,
2022 nonce,
2023 sender_info,
2024 })
2025 }
2026
2027 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2029 self.agent.query_content(
2030 self.canister_id,
2031 self.method_name,
2032 self.arg,
2033 self.ingress_expiry_datetime,
2034 self.use_nonce,
2035 )
2036 }
2037}
2038
2039impl<'agent> IntoFuture for QueryBuilder<'agent> {
2040 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2041 type Output = Result<Vec<u8>, AgentError>;
2042 fn into_future(self) -> Self::IntoFuture {
2043 Box::pin(self.call())
2044 }
2045}
2046
2047pub struct UpdateCall<'agent> {
2049 agent: &'agent Agent,
2050 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
2051 effective_canister_id: Principal,
2052 canister_id: Principal,
2053 method_name: String,
2054}
2055
2056impl fmt::Debug for UpdateCall<'_> {
2057 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2058 f.debug_struct("UpdateCall")
2059 .field("agent", &self.agent)
2060 .field("effective_canister_id", &self.effective_canister_id)
2061 .finish_non_exhaustive()
2062 }
2063}
2064
2065impl Future for UpdateCall<'_> {
2066 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
2067 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2068 self.response_future.as_mut().poll(cx)
2069 }
2070}
2071
2072impl<'a> UpdateCall<'a> {
2073 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
2075 let response = self.response_future.await?;
2076
2077 match response {
2078 CallResponse::Response(response) => Ok(response),
2079 CallResponse::Poll(request_id) => {
2080 self.agent
2081 .wait_inner(
2082 &request_id,
2083 EffectiveId::Canister(self.effective_canister_id),
2084 Some(Operation::Call {
2085 canister: self.canister_id,
2086 method: self.method_name,
2087 }),
2088 )
2089 .await
2090 }
2091 }
2092 }
2093}
2094#[derive(Debug)]
2099pub struct UpdateBuilder<'agent> {
2100 agent: &'agent Agent,
2101 pub effective_canister_id: Principal,
2103 pub canister_id: Principal,
2105 pub method_name: String,
2107 pub arg: Vec<u8>,
2109 pub ingress_expiry_datetime: Option<u64>,
2111}
2112
2113impl<'agent> UpdateBuilder<'agent> {
2114 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
2116 Self {
2117 agent,
2118 effective_canister_id: canister_id,
2119 canister_id,
2120 method_name,
2121 arg: vec![],
2122 ingress_expiry_datetime: None,
2123 }
2124 }
2125
2126 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
2128 self.effective_canister_id = canister_id;
2129 self
2130 }
2131
2132 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
2134 self.arg = arg.into();
2135 self
2136 }
2137
2138 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
2140 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
2141 self
2142 }
2143
2144 pub fn expire_after(mut self, duration: Duration) -> Self {
2146 self.ingress_expiry_datetime = Some(
2147 OffsetDateTime::now_utc()
2148 .saturating_add(duration.try_into().expect("negative duration"))
2149 .unix_timestamp_nanos() as u64,
2150 );
2151 self
2152 }
2153
2154 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2157 self.call().and_wait().await.map(|x| x.0)
2158 }
2159
2160 pub fn call(self) -> UpdateCall<'agent> {
2163 let method_name = self.method_name.clone();
2164 let response_future = async move {
2165 self.agent
2166 .update_raw(
2167 self.canister_id,
2168 self.effective_canister_id,
2169 self.method_name,
2170 self.arg,
2171 self.ingress_expiry_datetime,
2172 )
2173 .await
2174 };
2175 UpdateCall {
2176 agent: self.agent,
2177 response_future: Box::pin(response_future),
2178 effective_canister_id: self.effective_canister_id,
2179 canister_id: self.canister_id,
2180 method_name,
2181 }
2182 }
2183
2184 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2187 let identity = self.agent.identity.clone();
2188 let effective_canister_id = self.effective_canister_id;
2189 let content = self.into_envelope()?;
2190 let signed_update = sign_envelope(&content, identity)?;
2191 let request_id = to_request_id(&content)?;
2192 let EnvelopeContent::Call {
2193 nonce,
2194 ingress_expiry,
2195 sender,
2196 canister_id,
2197 method_name,
2198 arg,
2199 sender_info,
2200 } = content
2201 else {
2202 unreachable!()
2203 };
2204 Ok(SignedUpdate {
2205 nonce,
2206 ingress_expiry,
2207 sender,
2208 canister_id,
2209 method_name,
2210 arg,
2211 effective_canister_id,
2212 signed_update,
2213 request_id,
2214 sender_info,
2215 })
2216 }
2217
2218 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2220 let nonce = self.agent.nonce_factory.generate();
2221 self.agent.update_content(
2222 self.canister_id,
2223 self.method_name,
2224 self.arg,
2225 self.ingress_expiry_datetime,
2226 nonce,
2227 )
2228 }
2229}
2230
2231impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2232 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2233 type Output = Result<Vec<u8>, AgentError>;
2234 fn into_future(self) -> Self::IntoFuture {
2235 Box::pin(self.call_and_wait())
2236 }
2237}
2238
2239#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2241#[cfg_attr(not(target_family = "wasm"), async_trait)]
2242pub trait HttpService: Send + Sync + Debug {
2243 async fn call<'a>(
2245 &'a self,
2246 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2247 max_retries: usize,
2248 size_limit: Option<usize>,
2249 ) -> Result<http::Response<Bytes>, AgentError>;
2250}
2251
2252fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2254 let (parts, body) = req.into_parts();
2255 let body = reqwest::Body::from(body);
2256 let request = http::Request::from_parts(parts, body)
2259 .try_into()
2260 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2261
2262 Ok(request)
2263}
2264
2265#[cfg(not(target_family = "wasm"))]
2267async fn to_http_response(
2268 resp: Response,
2269 size_limit: Option<usize>,
2270) -> Result<http::Response<Bytes>, AgentError> {
2271 use http_body_util::{BodyExt, Limited};
2272
2273 let resp: http::Response<reqwest::Body> = resp.into();
2274 let (parts, body) = resp.into_parts();
2275 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2276 let body = body
2277 .collect()
2278 .await
2279 .map_err(|e| {
2280 AgentError::TransportError(TransportError::Generic(format!(
2281 "unable to read response body: {e:#}"
2282 )))
2283 })?
2284 .to_bytes();
2285 let resp = http::Response::from_parts(parts, body);
2286
2287 Ok(resp)
2288}
2289
2290#[cfg(target_family = "wasm")]
2294async fn to_http_response(
2295 resp: Response,
2296 size_limit: Option<usize>,
2297) -> Result<http::Response<Bytes>, AgentError> {
2298 use futures_util::StreamExt;
2299 use http_body::Frame;
2300 use http_body_util::{Limited, StreamBody};
2301
2302 let status = resp.status();
2304 let headers = resp.headers().clone();
2305
2306 let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2308 let body = StreamBody::new(stream);
2309 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2310 let body = http_body_util::BodyExt::collect(body)
2311 .await
2312 .map_err(|e| {
2313 AgentError::TransportError(TransportError::Generic(format!(
2314 "unable to read response body: {e:#}"
2315 )))
2316 })?
2317 .to_bytes();
2318
2319 let mut resp = http::Response::new(body);
2320 *resp.status_mut() = status;
2321 *resp.headers_mut() = headers;
2322
2323 Ok(resp)
2324}
2325
2326#[cfg(not(target_family = "wasm"))]
2327#[async_trait]
2328impl<T> HttpService for T
2329where
2330 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2331 for<'a> <&'a Self as Service<Request>>::Future: Send,
2332 T: Send + Sync + Debug + ?Sized,
2333{
2334 #[allow(clippy::needless_arbitrary_self_type)]
2335 async fn call<'a>(
2336 mut self: &'a Self,
2337 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2338 max_retries: usize,
2339 size_limit: Option<usize>,
2340 ) -> Result<http::Response<Bytes>, AgentError> {
2341 let mut retry_count = 0;
2342 loop {
2343 let request = from_http_request(req()?)?;
2344
2345 match Service::call(&mut self, request).await {
2346 Err(err) => {
2347 if err.is_connect() {
2349 if retry_count >= max_retries {
2350 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2351 }
2352 retry_count += 1;
2353 }
2354 else {
2356 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2357 }
2358 }
2359
2360 Ok(resp) => {
2361 let resp = to_http_response(resp, size_limit).await?;
2362 return Ok(resp);
2363 }
2364 }
2365 }
2366 }
2367}
2368
2369#[cfg(target_family = "wasm")]
2370#[async_trait(?Send)]
2371impl<T> HttpService for T
2372where
2373 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2374 T: Send + Sync + Debug + ?Sized,
2375{
2376 #[allow(clippy::needless_arbitrary_self_type)]
2377 async fn call<'a>(
2378 mut self: &'a Self,
2379 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2380 _retries: usize,
2381 _size_limit: Option<usize>,
2382 ) -> Result<http::Response<Bytes>, AgentError> {
2383 let request = from_http_request(req()?)?;
2384 let response = Service::call(&mut self, request)
2385 .await
2386 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2387
2388 to_http_response(response, _size_limit).await
2389 }
2390}
2391
2392#[derive(Debug)]
2393struct Retry429Logic {
2394 client: Client,
2395}
2396
2397#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2398#[cfg_attr(not(target_family = "wasm"), async_trait)]
2399impl HttpService for Retry429Logic {
2400 async fn call<'a>(
2401 &'a self,
2402 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2403 _max_tcp_retries: usize,
2404 _size_limit: Option<usize>,
2405 ) -> Result<http::Response<Bytes>, AgentError> {
2406 let mut retries = 0;
2407 loop {
2408 #[cfg(not(target_family = "wasm"))]
2409 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2410 #[cfg(target_family = "wasm")]
2412 let resp = {
2413 let request = from_http_request(req()?)?;
2414 let resp = self
2415 .client
2416 .execute(request)
2417 .await
2418 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2419
2420 to_http_response(resp, _size_limit).await?
2421 };
2422
2423 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2424 if retries == 6 {
2425 break Ok(resp);
2426 } else {
2427 retries += 1;
2428 crate::util::sleep(Duration::from_millis(250)).await;
2429 continue;
2430 }
2431 } else {
2432 break Ok(resp);
2433 }
2434 }
2435 }
2436}
2437
2438#[cfg(all(test, not(target_family = "wasm")))]
2439mod offline_tests {
2440 use super::*;
2441 use tokio::net::TcpListener;
2442 #[test]
2445 fn rounded_expiry() {
2446 let agent = Agent::builder()
2447 .with_url("http://not-a-real-url")
2448 .build()
2449 .unwrap();
2450 let mut prev_expiry = None;
2451 let mut num_timestamps = 0;
2452 for _ in 0..6 {
2453 let update = agent
2454 .update(&Principal::management_canister(), "not_a_method")
2455 .sign()
2456 .unwrap();
2457 if prev_expiry < Some(update.ingress_expiry) {
2458 prev_expiry = Some(update.ingress_expiry);
2459 num_timestamps += 1;
2460 }
2461 }
2462 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2464 }
2465
2466 #[tokio::test]
2467 async fn client_ratelimit() {
2468 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2469 let count = Arc::new(Mutex::new(0));
2470 let port = mock_server.local_addr().unwrap().port();
2471 tokio::spawn({
2472 let count = count.clone();
2473 async move {
2474 loop {
2475 let (mut conn, _) = mock_server.accept().await.unwrap();
2476 *count.lock().unwrap() += 1;
2477 tokio::spawn(
2478 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2480 );
2481 }
2482 }
2483 });
2484 let agent = Agent::builder()
2485 .with_http_client(Client::builder().http1_only().build().unwrap())
2486 .with_url(format!("http://127.0.0.1:{port}"))
2487 .with_max_concurrent_requests(2)
2488 .build()
2489 .unwrap();
2490 for _ in 0..3 {
2491 let agent = agent.clone();
2492 tokio::spawn(async move {
2493 agent
2494 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2495 .call()
2496 .await
2497 });
2498 }
2499 crate::util::sleep(Duration::from_millis(250)).await;
2500 assert_eq!(*count.lock().unwrap(), 2);
2501 }
2502}