ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::HttpErrorPayload;
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49    },
50    export::Principal,
51    identity::Identity,
52    to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64    borrow::Cow,
65    collections::HashMap,
66    convert::TryFrom,
67    fmt::{self, Debug},
68    future::{Future, IntoFuture},
69    pin::Pin,
70    sync::{Arc, Mutex, RwLock},
71    task::{Context, Poll},
72    time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87/// A low level Agent to make calls to a Replica endpoint.
88///
89/// ```ignore
90/// # // This test is ignored because it requires an ic to be running. We run these
91/// # // in the ic-ref workflow.
92/// use ic_agent::{Agent, export::Principal};
93/// use candid::{Encode, Decode, CandidType, Nat};
94/// use serde::Deserialize;
95///
96/// #[derive(CandidType)]
97/// struct Argument {
98///   amount: Option<Nat>,
99/// }
100///
101/// #[derive(CandidType, Deserialize)]
102/// struct CreateCanisterResult {
103///   canister_id: Principal,
104/// }
105///
106/// # fn create_identity() -> impl ic_agent::Identity {
107/// #
108/// #     ic_agent::identity::BasicIdentity::from_signing_key(
109/// #         ed25519_consensus::SigningKey::new(rand::thread_rng())
110/// #     )
111/// # }
112/// #
113/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
114/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID,
128///   // and wait for a result.
129///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
130///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// }
140///
141/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
142/// # runtime.block_on(async {
143/// let canister_id = create_a_canister().await.unwrap();
144/// eprintln!("{}", canister_id);
145/// # });
146/// ```
147///
148/// This agent does not understand Candid, and only acts on byte buffers.
149///
150/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
151/// you receive from a method has already been verified and you do not need to manually verify it.
152#[derive(Clone)]
153pub struct Agent {
154    nonce_factory: Arc<dyn NonceGenerator>,
155    identity: Arc<dyn Identity>,
156    ingress_expiry: Duration,
157    root_key: Arc<RwLock<Vec<u8>>>,
158    client: Arc<dyn HttpService>,
159    route_provider: Arc<dyn RouteProvider>,
160    subnet_key_cache: Arc<Mutex<SubnetCache>>,
161    concurrent_requests_semaphore: Arc<Semaphore>,
162    verify_query_signatures: bool,
163    max_response_body_size: Option<usize>,
164    max_polling_time: Duration,
165    #[allow(dead_code)]
166    max_tcp_error_retries: usize,
167}
168
169impl fmt::Debug for Agent {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171        f.debug_struct("Agent")
172            .field("ingress_expiry", &self.ingress_expiry)
173            .finish_non_exhaustive()
174    }
175}
176
177impl Agent {
178    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
179    /// using the [`AgentConfig`] and [`Agent::new()`].
180    pub fn builder() -> builder::AgentBuilder {
181        Default::default()
182    }
183
184    /// Create an instance of an [`Agent`].
185    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186        let client = config.http_service.unwrap_or_else(|| {
187            Arc::new(Retry429Logic {
188                client: config.client.unwrap_or_else(|| {
189                    #[cfg(not(target_family = "wasm"))]
190                    {
191                        Client::builder()
192                            .use_rustls_tls()
193                            .timeout(Duration::from_secs(360))
194                            .build()
195                            .expect("Could not create HTTP client.")
196                    }
197                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198                    {
199                        Client::new()
200                    }
201                }),
202            })
203        });
204        Ok(Agent {
205            nonce_factory: config.nonce_factory,
206            identity: config.identity,
207            ingress_expiry: config.ingress_expiry,
208            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209            client: client.clone(),
210            route_provider: if let Some(route_provider) = config.route_provider {
211                route_provider
212            } else if let Some(url) = config.url {
213                if config.background_dynamic_routing {
214                    assert!(
215                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217                    );
218                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219                    UrlUntilReady::new(url, async move {
220                        DynamicRouteProviderBuilder::new(
221                            LatencyRoutingSnapshot::new(),
222                            seeds,
223                            client,
224                        )
225                        .build()
226                        .await
227                    }) as Arc<dyn RouteProvider>
228                } else {
229                    Arc::new(url)
230                }
231            } else {
232                panic!("either route_provider or url must be specified");
233            },
234            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235            verify_query_signatures: config.verify_query_signatures,
236            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237            max_response_body_size: config.max_response_body_size,
238            max_tcp_error_retries: config.max_tcp_error_retries,
239            max_polling_time: config.max_polling_time,
240        })
241    }
242
243    /// Set the identity provider for signing messages.
244    ///
245    /// NOTE: if you change the identity while having update calls in
246    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
247    /// messages.
248    pub fn set_identity<I>(&mut self, identity: I)
249    where
250        I: 'static + Identity,
251    {
252        self.identity = Arc::new(identity);
253    }
254    /// Set the arc identity provider for signing messages.
255    ///
256    /// NOTE: if you change the identity while having update calls in
257    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
258    /// messages.
259    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260        self.identity = identity;
261    }
262
263    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
264    /// responses using a hard-coded public key.
265    ///
266    /// This function will instruct the agent to ask the endpoint for its public key, and use
267    /// that instead. This is required when talking to a local test instance, for example.
268    ///
269    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
270    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
271    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273            // already fetched the root key
274            return Ok(());
275        }
276        let status = self.status().await?;
277        let Some(root_key) = status.root_key else {
278            return Err(AgentError::NoRootKeyInStatus(status));
279        };
280        self.set_root_key(root_key);
281        Ok(())
282    }
283
284    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
285    /// responses using a hard-coded public key.
286    ///
287    /// Using this function you can set the root key to a known one if you know if beforehand.
288    pub fn set_root_key(&self, root_key: Vec<u8>) {
289        *self.root_key.write().unwrap() = root_key;
290    }
291
292    /// Return the root key currently in use.
293    pub fn read_root_key(&self) -> Vec<u8> {
294        self.root_key.read().unwrap().clone()
295    }
296
297    fn get_expiry_date(&self) -> u64 {
298        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300        if self.ingress_expiry.as_secs() > 90 {
301            rounded = rounded.replace_second(0).unwrap();
302        }
303        rounded.unix_timestamp_nanos().try_into().unwrap()
304    }
305
306    /// Return the principal of the identity.
307    pub fn get_principal(&self) -> Result<Principal, String> {
308        self.identity.sender()
309    }
310
311    async fn query_endpoint<A>(
312        &self,
313        effective_canister_id: Principal,
314        serialized_bytes: Vec<u8>,
315    ) -> Result<A, AgentError>
316    where
317        A: serde::de::DeserializeOwned,
318    {
319        let _permit = self.concurrent_requests_semaphore.acquire().await;
320        let bytes = self
321            .execute(
322                Method::POST,
323                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324                Some(serialized_bytes),
325            )
326            .await?
327            .1;
328        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329    }
330
331    async fn read_state_endpoint<A>(
332        &self,
333        effective_canister_id: Principal,
334        serialized_bytes: Vec<u8>,
335    ) -> Result<A, AgentError>
336    where
337        A: serde::de::DeserializeOwned,
338    {
339        let _permit = self.concurrent_requests_semaphore.acquire().await;
340        let endpoint = format!(
341            "api/v2/canister/{}/read_state",
342            effective_canister_id.to_text()
343        );
344        let bytes = self
345            .execute(Method::POST, &endpoint, Some(serialized_bytes))
346            .await?
347            .1;
348        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349    }
350
351    async fn read_subnet_state_endpoint<A>(
352        &self,
353        subnet_id: Principal,
354        serialized_bytes: Vec<u8>,
355    ) -> Result<A, AgentError>
356    where
357        A: serde::de::DeserializeOwned,
358    {
359        let _permit = self.concurrent_requests_semaphore.acquire().await;
360        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361        let bytes = self
362            .execute(Method::POST, &endpoint, Some(serialized_bytes))
363            .await?
364            .1;
365        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366    }
367
368    async fn call_endpoint(
369        &self,
370        effective_canister_id: Principal,
371        serialized_bytes: Vec<u8>,
372    ) -> Result<TransportCallResponse, AgentError> {
373        let _permit = self.concurrent_requests_semaphore.acquire().await;
374        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375        let (status_code, response_body) = self
376            .execute(Method::POST, &endpoint, Some(serialized_bytes))
377            .await?;
378
379        if status_code == StatusCode::ACCEPTED {
380            return Ok(TransportCallResponse::Accepted);
381        }
382
383        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384    }
385
386    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
387    /// The encoding is left as an exercise to the user.
388    #[allow(clippy::too_many_arguments)]
389    async fn query_raw(
390        &self,
391        canister_id: Principal,
392        effective_canister_id: Principal,
393        method_name: String,
394        arg: Vec<u8>,
395        ingress_expiry_datetime: Option<u64>,
396        use_nonce: bool,
397        explicit_verify_query_signatures: Option<bool>,
398    ) -> Result<Vec<u8>, AgentError> {
399        let content = self.query_content(
400            canister_id,
401            method_name,
402            arg,
403            ingress_expiry_datetime,
404            use_nonce,
405        )?;
406        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
407        self.query_inner(
408            effective_canister_id,
409            serialized_bytes,
410            content.to_request_id(),
411            explicit_verify_query_signatures,
412        )
413        .await
414    }
415
416    /// Send the signed query to the network. Will return a byte vector.
417    /// The bytes will be checked if it is a valid query.
418    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
419    pub async fn query_signed(
420        &self,
421        effective_canister_id: Principal,
422        signed_query: Vec<u8>,
423    ) -> Result<Vec<u8>, AgentError> {
424        let envelope: Envelope =
425            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
426        self.query_inner(
427            effective_canister_id,
428            signed_query,
429            envelope.content.to_request_id(),
430            None,
431        )
432        .await
433    }
434
435    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
436    ///
437    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
438    async fn query_inner(
439        &self,
440        effective_canister_id: Principal,
441        signed_query: Vec<u8>,
442        request_id: RequestId,
443        explicit_verify_query_signatures: Option<bool>,
444    ) -> Result<Vec<u8>, AgentError> {
445        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
446            let (response, mut subnet) = futures_util::try_join!(
447                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
448                self.get_subnet_by_canister(&effective_canister_id)
449            )?;
450            if response.signatures().is_empty() {
451                return Err(AgentError::MissingSignature);
452            } else if response.signatures().len() > subnet.node_keys.len() {
453                return Err(AgentError::TooManySignatures {
454                    had: response.signatures().len(),
455                    needed: subnet.node_keys.len(),
456                });
457            }
458            for signature in response.signatures() {
459                if OffsetDateTime::now_utc()
460                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
461                    > self.ingress_expiry
462                {
463                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
464                }
465                let signable = response.signable(request_id, signature.timestamp);
466                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
467                    node_key
468                } else {
469                    subnet = self
470                        .fetch_subnet_by_canister(&effective_canister_id)
471                        .await?;
472                    subnet
473                        .node_keys
474                        .get(&signature.identity)
475                        .ok_or(AgentError::CertificateNotAuthorized())?
476                };
477                if node_key.len() != 44 {
478                    return Err(AgentError::DerKeyLengthMismatch {
479                        expected: 44,
480                        actual: node_key.len(),
481                    });
482                }
483                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
484                if node_key[..12] != DER_PREFIX {
485                    return Err(AgentError::DerPrefixMismatch {
486                        expected: DER_PREFIX.to_vec(),
487                        actual: node_key[..12].to_vec(),
488                    });
489                }
490                let pubkey =
491                    VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
492                        .map_err(|_| AgentError::MalformedPublicKey)?;
493                let sig = Signature::from(
494                    <[u8; 64]>::try_from(&signature.signature[..])
495                        .map_err(|_| AgentError::MalformedSignature)?,
496                );
497
498                match pubkey.verify(&sig, &signable) {
499                    Err(Ed25519Error::InvalidSignature) => {
500                        return Err(AgentError::QuerySignatureVerificationFailed)
501                    }
502                    Err(Ed25519Error::InvalidSliceLength) => {
503                        return Err(AgentError::MalformedSignature)
504                    }
505                    Err(Ed25519Error::MalformedPublicKey) => {
506                        return Err(AgentError::MalformedPublicKey)
507                    }
508                    Ok(()) => (),
509                    _ => unreachable!(),
510                }
511            }
512            response
513        } else {
514            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
515                .await?
516        };
517
518        match response {
519            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
520            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject(reject)),
521        }
522    }
523
524    fn query_content(
525        &self,
526        canister_id: Principal,
527        method_name: String,
528        arg: Vec<u8>,
529        ingress_expiry_datetime: Option<u64>,
530        use_nonce: bool,
531    ) -> Result<EnvelopeContent, AgentError> {
532        Ok(EnvelopeContent::Query {
533            sender: self.identity.sender().map_err(AgentError::SigningError)?,
534            canister_id,
535            method_name,
536            arg,
537            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
538            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
539        })
540    }
541
542    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
543    async fn update_raw(
544        &self,
545        canister_id: Principal,
546        effective_canister_id: Principal,
547        method_name: String,
548        arg: Vec<u8>,
549        ingress_expiry_datetime: Option<u64>,
550    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
551        let nonce = self.nonce_factory.generate();
552        let content = self.update_content(
553            canister_id,
554            method_name,
555            arg,
556            ingress_expiry_datetime,
557            nonce,
558        )?;
559        let request_id = to_request_id(&content)?;
560        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
561
562        let response_body = self
563            .call_endpoint(effective_canister_id, serialized_bytes)
564            .await?;
565
566        match response_body {
567            TransportCallResponse::Replied { certificate } => {
568                let certificate =
569                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
570
571                self.verify(&certificate, effective_canister_id)?;
572                let status = lookup_request_status(&certificate, &request_id)?;
573
574                match status {
575                    RequestStatusResponse::Replied(reply) => {
576                        Ok(CallResponse::Response((reply.arg, certificate)))
577                    }
578                    RequestStatusResponse::Rejected(reject_response) => {
579                        Err(AgentError::CertifiedReject(reject_response))?
580                    }
581                    _ => Ok(CallResponse::Poll(request_id)),
582                }
583            }
584            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
585            TransportCallResponse::NonReplicatedRejection(reject_response) => {
586                Err(AgentError::UncertifiedReject(reject_response))
587            }
588        }
589    }
590
591    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
592    /// The bytes will be checked to verify that it is a valid update.
593    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
594    pub async fn update_signed(
595        &self,
596        effective_canister_id: Principal,
597        signed_update: Vec<u8>,
598    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
599        let envelope: Envelope =
600            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
601        let request_id = to_request_id(&envelope.content)?;
602
603        let response_body = self
604            .call_endpoint(effective_canister_id, signed_update)
605            .await?;
606
607        match response_body {
608            TransportCallResponse::Replied { certificate } => {
609                let certificate =
610                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
611
612                self.verify(&certificate, effective_canister_id)?;
613                let status = lookup_request_status(&certificate, &request_id)?;
614
615                match status {
616                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
617                    RequestStatusResponse::Rejected(reject_response) => {
618                        Err(AgentError::CertifiedReject(reject_response))?
619                    }
620                    _ => Ok(CallResponse::Poll(request_id)),
621                }
622            }
623            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
624            TransportCallResponse::NonReplicatedRejection(reject_response) => {
625                Err(AgentError::UncertifiedReject(reject_response))
626            }
627        }
628    }
629
630    fn update_content(
631        &self,
632        canister_id: Principal,
633        method_name: String,
634        arg: Vec<u8>,
635        ingress_expiry_datetime: Option<u64>,
636        nonce: Option<Vec<u8>>,
637    ) -> Result<EnvelopeContent, AgentError> {
638        Ok(EnvelopeContent::Call {
639            canister_id,
640            method_name,
641            arg,
642            nonce,
643            sender: self.identity.sender().map_err(AgentError::SigningError)?,
644            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
645        })
646    }
647
648    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
649        ExponentialBackoffBuilder::new()
650            .with_initial_interval(Duration::from_millis(500))
651            .with_max_interval(Duration::from_secs(1))
652            .with_multiplier(1.4)
653            .with_max_elapsed_time(Some(self.max_polling_time))
654            .build()
655    }
656
657    /// Wait for `request_status` to return a Replied response and return the arg.
658    pub async fn wait_signed(
659        &self,
660        request_id: &RequestId,
661        effective_canister_id: Principal,
662        signed_request_status: Vec<u8>,
663    ) -> Result<(Vec<u8>, Certificate), AgentError> {
664        let mut retry_policy = self.get_retry_policy();
665
666        let mut request_accepted = false;
667        let (resp, cert) = self
668            .request_status_signed(
669                request_id,
670                effective_canister_id,
671                signed_request_status.clone(),
672            )
673            .await?;
674        loop {
675            match resp {
676                RequestStatusResponse::Unknown => {}
677
678                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
679                    if !request_accepted {
680                        retry_policy.reset();
681                        request_accepted = true;
682                    }
683                }
684
685                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
686                    return Ok((arg, cert))
687                }
688
689                RequestStatusResponse::Rejected(response) => {
690                    return Err(AgentError::CertifiedReject(response))
691                }
692
693                RequestStatusResponse::Done => {
694                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
695                        *request_id,
696                    )))
697                }
698            };
699
700            match retry_policy.next_backoff() {
701                Some(duration) => crate::util::sleep(duration).await,
702
703                None => return Err(AgentError::TimeoutWaitingForResponse()),
704            }
705        }
706    }
707
708    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
709    pub async fn wait(
710        &self,
711        request_id: &RequestId,
712        effective_canister_id: Principal,
713    ) -> Result<(Vec<u8>, Certificate), AgentError> {
714        let mut retry_policy = self.get_retry_policy();
715
716        let mut request_accepted = false;
717        loop {
718            let (resp, cert) = self
719                .request_status_raw(request_id, effective_canister_id)
720                .await?;
721            match resp {
722                RequestStatusResponse::Unknown => {}
723
724                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
725                    if !request_accepted {
726                        // The system will return RequestStatusResponse::Unknown
727                        // until the request is accepted
728                        // and we generally cannot know how long that will take.
729                        // State transitions between Received and Processing may be
730                        // instantaneous. Therefore, once we know the request is accepted,
731                        // we should restart the backoff so the request does not time out.
732
733                        retry_policy.reset();
734                        request_accepted = true;
735                    }
736                }
737
738                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
739                    return Ok((arg, cert))
740                }
741
742                RequestStatusResponse::Rejected(response) => {
743                    return Err(AgentError::CertifiedReject(response))
744                }
745
746                RequestStatusResponse::Done => {
747                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
748                        *request_id,
749                    )))
750                }
751            };
752
753            match retry_policy.next_backoff() {
754                Some(duration) => crate::util::sleep(duration).await,
755
756                None => return Err(AgentError::TimeoutWaitingForResponse()),
757            }
758        }
759    }
760
761    /// Request the raw state tree directly, under an effective canister ID.
762    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
763    pub async fn read_state_raw(
764        &self,
765        paths: Vec<Vec<Label>>,
766        effective_canister_id: Principal,
767    ) -> Result<Certificate, AgentError> {
768        let content = self.read_state_content(paths)?;
769        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
770
771        let read_state_response: ReadStateResponse = self
772            .read_state_endpoint(effective_canister_id, serialized_bytes)
773            .await?;
774        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
775            .map_err(AgentError::InvalidCborData)?;
776        self.verify(&cert, effective_canister_id)?;
777        Ok(cert)
778    }
779
780    /// Request the raw state tree directly, under a subnet ID.
781    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
782    pub async fn read_subnet_state_raw(
783        &self,
784        paths: Vec<Vec<Label>>,
785        subnet_id: Principal,
786    ) -> Result<Certificate, AgentError> {
787        let content = self.read_state_content(paths)?;
788        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
789
790        let read_state_response: ReadStateResponse = self
791            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
792            .await?;
793        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
794            .map_err(AgentError::InvalidCborData)?;
795        self.verify_for_subnet(&cert, subnet_id)?;
796        Ok(cert)
797    }
798
799    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
800        Ok(EnvelopeContent::ReadState {
801            sender: self.identity.sender().map_err(AgentError::SigningError)?,
802            paths,
803            ingress_expiry: self.get_expiry_date(),
804        })
805    }
806
807    /// Verify a certificate, checking delegation if present.
808    /// Only passes if the certificate also has authority over the canister.
809    pub fn verify(
810        &self,
811        cert: &Certificate,
812        effective_canister_id: Principal,
813    ) -> Result<(), AgentError> {
814        self.verify_cert(cert, effective_canister_id)?;
815        self.verify_cert_timestamp(cert)?;
816        Ok(())
817    }
818
819    fn verify_cert(
820        &self,
821        cert: &Certificate,
822        effective_canister_id: Principal,
823    ) -> Result<(), AgentError> {
824        let sig = &cert.signature;
825
826        let root_hash = cert.tree.digest();
827        let mut msg = vec![];
828        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
829        msg.extend_from_slice(&root_hash);
830
831        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
832        let key = extract_der(der_key)?;
833
834        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
835            .map_err(|_| AgentError::CertificateVerificationFailed())?;
836        Ok(())
837    }
838
839    /// Verify a certificate, checking delegation if present.
840    /// Only passes if the certificate is for the specified subnet.
841    pub fn verify_for_subnet(
842        &self,
843        cert: &Certificate,
844        subnet_id: Principal,
845    ) -> Result<(), AgentError> {
846        self.verify_cert_for_subnet(cert, subnet_id)?;
847        self.verify_cert_timestamp(cert)?;
848        Ok(())
849    }
850
851    fn verify_cert_for_subnet(
852        &self,
853        cert: &Certificate,
854        subnet_id: Principal,
855    ) -> Result<(), AgentError> {
856        let sig = &cert.signature;
857
858        let root_hash = cert.tree.digest();
859        let mut msg = vec![];
860        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
861        msg.extend_from_slice(&root_hash);
862
863        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
864        let key = extract_der(der_key)?;
865
866        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
867            .map_err(|_| AgentError::CertificateVerificationFailed())?;
868        Ok(())
869    }
870
871    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
872        let time = lookup_time(cert)?;
873        if (OffsetDateTime::now_utc()
874            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
875        .abs()
876            > self.ingress_expiry
877        {
878            Err(AgentError::CertificateOutdated(self.ingress_expiry))
879        } else {
880            Ok(())
881        }
882    }
883
884    fn check_delegation(
885        &self,
886        delegation: &Option<Delegation>,
887        effective_canister_id: Principal,
888    ) -> Result<Vec<u8>, AgentError> {
889        match delegation {
890            None => Ok(self.read_root_key()),
891            Some(delegation) => {
892                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
893                    .map_err(AgentError::InvalidCborData)?;
894                if cert.delegation.is_some() {
895                    return Err(AgentError::CertificateHasTooManyDelegations);
896                }
897                self.verify_cert(&cert, effective_canister_id)?;
898                let canister_range_lookup = [
899                    "subnet".as_bytes(),
900                    delegation.subnet_id.as_ref(),
901                    "canister_ranges".as_bytes(),
902                ];
903                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
904                let ranges: Vec<(Principal, Principal)> =
905                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
906                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
907                    // the certificate is not authorized to answer calls for this canister
908                    return Err(AgentError::CertificateNotAuthorized());
909                }
910
911                let public_key_path = [
912                    "subnet".as_bytes(),
913                    delegation.subnet_id.as_ref(),
914                    "public_key".as_bytes(),
915                ];
916                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
917            }
918        }
919    }
920
921    fn check_delegation_for_subnet(
922        &self,
923        delegation: &Option<Delegation>,
924        subnet_id: Principal,
925    ) -> Result<Vec<u8>, AgentError> {
926        match delegation {
927            None => Ok(self.read_root_key()),
928            Some(delegation) => {
929                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
930                    .map_err(AgentError::InvalidCborData)?;
931                if cert.delegation.is_some() {
932                    return Err(AgentError::CertificateHasTooManyDelegations);
933                }
934                self.verify_cert_for_subnet(&cert, subnet_id)?;
935                let public_key_path = [
936                    "subnet".as_bytes(),
937                    delegation.subnet_id.as_ref(),
938                    "public_key".as_bytes(),
939                ];
940                let pk = lookup_value(&cert.tree, public_key_path)
941                    .map_err(|_| AgentError::CertificateNotAuthorized())?
942                    .to_vec();
943                Ok(pk)
944            }
945        }
946    }
947
948    /// Request information about a particular canister for a single state subkey.
949    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
950    pub async fn read_state_canister_info(
951        &self,
952        canister_id: Principal,
953        path: &str,
954    ) -> Result<Vec<u8>, AgentError> {
955        let paths: Vec<Vec<Label>> = vec![vec![
956            "canister".into(),
957            Label::from_bytes(canister_id.as_slice()),
958            path.into(),
959        ]];
960
961        let cert = self.read_state_raw(paths, canister_id).await?;
962
963        lookup_canister_info(cert, canister_id, path)
964    }
965
966    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
967    pub async fn read_state_canister_metadata(
968        &self,
969        canister_id: Principal,
970        path: &str,
971    ) -> Result<Vec<u8>, AgentError> {
972        let paths: Vec<Vec<Label>> = vec![vec![
973            "canister".into(),
974            Label::from_bytes(canister_id.as_slice()),
975            "metadata".into(),
976            path.into(),
977        ]];
978
979        let cert = self.read_state_raw(paths, canister_id).await?;
980
981        lookup_canister_metadata(cert, canister_id, path)
982    }
983
984    /// Request a list of metrics about the subnet.
985    pub async fn read_state_subnet_metrics(
986        &self,
987        subnet_id: Principal,
988    ) -> Result<SubnetMetrics, AgentError> {
989        let paths = vec![vec![
990            "subnet".into(),
991            Label::from_bytes(subnet_id.as_slice()),
992            "metrics".into(),
993        ]];
994        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
995        lookup_subnet_metrics(cert, subnet_id)
996    }
997
998    /// Fetches the status of a particular request by its ID.
999    pub async fn request_status_raw(
1000        &self,
1001        request_id: &RequestId,
1002        effective_canister_id: Principal,
1003    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1004        let paths: Vec<Vec<Label>> =
1005            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1006
1007        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1008
1009        Ok((lookup_request_status(&cert, request_id)?, cert))
1010    }
1011
1012    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1013    /// The bytes will be checked to verify that it is a valid `request_status`.
1014    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1015    pub async fn request_status_signed(
1016        &self,
1017        request_id: &RequestId,
1018        effective_canister_id: Principal,
1019        signed_request_status: Vec<u8>,
1020    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1021        let _envelope: Envelope =
1022            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1023        let read_state_response: ReadStateResponse = self
1024            .read_state_endpoint(effective_canister_id, signed_request_status)
1025            .await?;
1026
1027        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1028            .map_err(AgentError::InvalidCborData)?;
1029        self.verify(&cert, effective_canister_id)?;
1030        Ok((lookup_request_status(&cert, request_id)?, cert))
1031    }
1032
1033    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1034    /// passing all arguments.
1035    pub fn update<S: Into<String>>(
1036        &self,
1037        canister_id: &Principal,
1038        method_name: S,
1039    ) -> UpdateBuilder {
1040        UpdateBuilder::new(self, *canister_id, method_name.into())
1041    }
1042
1043    /// Calls and returns the information returned by the status endpoint of a replica.
1044    pub async fn status(&self) -> Result<Status, AgentError> {
1045        let endpoint = "api/v2/status";
1046        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1047
1048        let cbor: serde_cbor::Value =
1049            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1050
1051        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1052    }
1053
1054    /// Returns a `QueryBuilder` enabling the construction of a query call without
1055    /// passing all arguments.
1056    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1057        QueryBuilder::new(self, *canister_id, method_name.into())
1058    }
1059
1060    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1061    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1062    pub fn sign_request_status(
1063        &self,
1064        effective_canister_id: Principal,
1065        request_id: RequestId,
1066    ) -> Result<SignedRequestStatus, AgentError> {
1067        let paths: Vec<Vec<Label>> =
1068            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1069        let read_state_content = self.read_state_content(paths)?;
1070        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1071        let ingress_expiry = read_state_content.ingress_expiry();
1072        let sender = *read_state_content.sender();
1073        Ok(SignedRequestStatus {
1074            ingress_expiry,
1075            sender,
1076            effective_canister_id,
1077            request_id,
1078            signed_request_status,
1079        })
1080    }
1081
1082    async fn get_subnet_by_canister(
1083        &self,
1084        canister: &Principal,
1085    ) -> Result<Arc<Subnet>, AgentError> {
1086        let subnet = self
1087            .subnet_key_cache
1088            .lock()
1089            .unwrap()
1090            .get_subnet_by_canister(canister);
1091        if let Some(subnet) = subnet {
1092            Ok(subnet)
1093        } else {
1094            self.fetch_subnet_by_canister(canister).await
1095        }
1096    }
1097
1098    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1099    pub async fn fetch_api_boundary_nodes_by_canister_id(
1100        &self,
1101        canister_id: Principal,
1102    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1103        let paths = vec![vec!["api_boundary_nodes".into()]];
1104        let certificate = self.read_state_raw(paths, canister_id).await?;
1105        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1106        Ok(api_boundary_nodes)
1107    }
1108
1109    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1110    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1111        &self,
1112        subnet_id: Principal,
1113    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1114        let paths = vec![vec!["api_boundary_nodes".into()]];
1115        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1116        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1117        Ok(api_boundary_nodes)
1118    }
1119
1120    async fn fetch_subnet_by_canister(
1121        &self,
1122        canister: &Principal,
1123    ) -> Result<Arc<Subnet>, AgentError> {
1124        let cert = self
1125            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1126            .await?;
1127
1128        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1129        let subnet = Arc::new(subnet);
1130        self.subnet_key_cache
1131            .lock()
1132            .unwrap()
1133            .insert_subnet(subnet_id, subnet.clone());
1134        Ok(subnet)
1135    }
1136
1137    async fn request(
1138        &self,
1139        method: Method,
1140        endpoint: &str,
1141        body: Option<Vec<u8>>,
1142    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1143        let create_request_with_generated_url = || -> Result<Request, AgentError> {
1144            let url = self.route_provider.route()?.join(endpoint)?;
1145            let mut http_request = Request::new(method.clone(), url);
1146            http_request
1147                .headers_mut()
1148                .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1149            *http_request.body_mut() = body.clone().map(Body::from);
1150            Ok(http_request)
1151        };
1152
1153        let response = self
1154            .client
1155            .call(
1156                &create_request_with_generated_url,
1157                self.max_tcp_error_retries,
1158            )
1159            .await?;
1160
1161        let http_status = response.status();
1162        let response_headers = response.headers().clone();
1163
1164        // Size Check (Content-Length)
1165        if matches!(self
1166            .max_response_body_size
1167            .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1168        {
1169            return Err(AgentError::ResponseSizeExceededLimit());
1170        }
1171
1172        let mut body: Vec<u8> = response
1173            .content_length()
1174            .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1175
1176        let mut stream = response.bytes_stream();
1177
1178        while let Some(chunk) = stream.next().await {
1179            let chunk = chunk?;
1180
1181            // Size Check (Body Size)
1182            if matches!(self
1183                .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1184            {
1185                return Err(AgentError::ResponseSizeExceededLimit());
1186            }
1187
1188            body.extend_from_slice(chunk.as_ref());
1189        }
1190
1191        Ok((http_status, response_headers, body))
1192    }
1193
1194    async fn execute(
1195        &self,
1196        method: Method,
1197        endpoint: &str,
1198        body: Option<Vec<u8>>,
1199    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1200        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1201
1202        let status = request_result.0;
1203        let headers = request_result.1;
1204        let body = request_result.2;
1205
1206        if status.is_client_error() || status.is_server_error() {
1207            Err(AgentError::HttpError(HttpErrorPayload {
1208                status: status.into(),
1209                content_type: headers
1210                    .get(CONTENT_TYPE)
1211                    .and_then(|value| value.to_str().ok())
1212                    .map(str::to_string),
1213                content: body,
1214            }))
1215        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1216            Err(AgentError::InvalidHttpResponse(format!(
1217                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1218            )))
1219        } else {
1220            Ok((status, body))
1221        }
1222    }
1223}
1224
1225// Checks if a principal is contained within a list of principal ranges
1226// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1227fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1228    ranges
1229        .iter()
1230        .any(|r| principal >= &r.0 && principal <= &r.1)
1231}
1232
1233fn sign_envelope(
1234    content: &EnvelopeContent,
1235    identity: Arc<dyn Identity>,
1236) -> Result<Vec<u8>, AgentError> {
1237    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1238
1239    let envelope = Envelope {
1240        content: Cow::Borrowed(content),
1241        sender_pubkey: signature.public_key,
1242        sender_sig: signature.signature,
1243        sender_delegation: signature.delegations,
1244    };
1245
1246    let mut serialized_bytes = Vec::new();
1247    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1248    serializer.self_describe()?;
1249    envelope.serialize(&mut serializer)?;
1250
1251    Ok(serialized_bytes)
1252}
1253
1254/// Inspect the bytes to be sent as a query
1255/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1256pub fn signed_query_inspect(
1257    sender: Principal,
1258    canister_id: Principal,
1259    method_name: &str,
1260    arg: &[u8],
1261    ingress_expiry: u64,
1262    signed_query: Vec<u8>,
1263) -> Result<(), AgentError> {
1264    let envelope: Envelope =
1265        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1266    match envelope.content.as_ref() {
1267        EnvelopeContent::Query {
1268            ingress_expiry: ingress_expiry_cbor,
1269            sender: sender_cbor,
1270            canister_id: canister_id_cbor,
1271            method_name: method_name_cbor,
1272            arg: arg_cbor,
1273            nonce: _nonce,
1274        } => {
1275            if ingress_expiry != *ingress_expiry_cbor {
1276                return Err(AgentError::CallDataMismatch {
1277                    field: "ingress_expiry".to_string(),
1278                    value_arg: ingress_expiry.to_string(),
1279                    value_cbor: ingress_expiry_cbor.to_string(),
1280                });
1281            }
1282            if sender != *sender_cbor {
1283                return Err(AgentError::CallDataMismatch {
1284                    field: "sender".to_string(),
1285                    value_arg: sender.to_string(),
1286                    value_cbor: sender_cbor.to_string(),
1287                });
1288            }
1289            if canister_id != *canister_id_cbor {
1290                return Err(AgentError::CallDataMismatch {
1291                    field: "canister_id".to_string(),
1292                    value_arg: canister_id.to_string(),
1293                    value_cbor: canister_id_cbor.to_string(),
1294                });
1295            }
1296            if method_name != *method_name_cbor {
1297                return Err(AgentError::CallDataMismatch {
1298                    field: "method_name".to_string(),
1299                    value_arg: method_name.to_string(),
1300                    value_cbor: method_name_cbor.clone(),
1301                });
1302            }
1303            if arg != *arg_cbor {
1304                return Err(AgentError::CallDataMismatch {
1305                    field: "arg".to_string(),
1306                    value_arg: format!("{arg:?}"),
1307                    value_cbor: format!("{arg_cbor:?}"),
1308                });
1309            }
1310        }
1311        EnvelopeContent::Call { .. } => {
1312            return Err(AgentError::CallDataMismatch {
1313                field: "request_type".to_string(),
1314                value_arg: "query".to_string(),
1315                value_cbor: "call".to_string(),
1316            })
1317        }
1318        EnvelopeContent::ReadState { .. } => {
1319            return Err(AgentError::CallDataMismatch {
1320                field: "request_type".to_string(),
1321                value_arg: "query".to_string(),
1322                value_cbor: "read_state".to_string(),
1323            })
1324        }
1325    }
1326    Ok(())
1327}
1328
1329/// Inspect the bytes to be sent as an update
1330/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1331pub fn signed_update_inspect(
1332    sender: Principal,
1333    canister_id: Principal,
1334    method_name: &str,
1335    arg: &[u8],
1336    ingress_expiry: u64,
1337    signed_update: Vec<u8>,
1338) -> Result<(), AgentError> {
1339    let envelope: Envelope =
1340        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1341    match envelope.content.as_ref() {
1342        EnvelopeContent::Call {
1343            nonce: _nonce,
1344            ingress_expiry: ingress_expiry_cbor,
1345            sender: sender_cbor,
1346            canister_id: canister_id_cbor,
1347            method_name: method_name_cbor,
1348            arg: arg_cbor,
1349        } => {
1350            if ingress_expiry != *ingress_expiry_cbor {
1351                return Err(AgentError::CallDataMismatch {
1352                    field: "ingress_expiry".to_string(),
1353                    value_arg: ingress_expiry.to_string(),
1354                    value_cbor: ingress_expiry_cbor.to_string(),
1355                });
1356            }
1357            if sender != *sender_cbor {
1358                return Err(AgentError::CallDataMismatch {
1359                    field: "sender".to_string(),
1360                    value_arg: sender.to_string(),
1361                    value_cbor: sender_cbor.to_string(),
1362                });
1363            }
1364            if canister_id != *canister_id_cbor {
1365                return Err(AgentError::CallDataMismatch {
1366                    field: "canister_id".to_string(),
1367                    value_arg: canister_id.to_string(),
1368                    value_cbor: canister_id_cbor.to_string(),
1369                });
1370            }
1371            if method_name != *method_name_cbor {
1372                return Err(AgentError::CallDataMismatch {
1373                    field: "method_name".to_string(),
1374                    value_arg: method_name.to_string(),
1375                    value_cbor: method_name_cbor.clone(),
1376                });
1377            }
1378            if arg != *arg_cbor {
1379                return Err(AgentError::CallDataMismatch {
1380                    field: "arg".to_string(),
1381                    value_arg: format!("{arg:?}"),
1382                    value_cbor: format!("{arg_cbor:?}"),
1383                });
1384            }
1385        }
1386        EnvelopeContent::ReadState { .. } => {
1387            return Err(AgentError::CallDataMismatch {
1388                field: "request_type".to_string(),
1389                value_arg: "call".to_string(),
1390                value_cbor: "read_state".to_string(),
1391            })
1392        }
1393        EnvelopeContent::Query { .. } => {
1394            return Err(AgentError::CallDataMismatch {
1395                field: "request_type".to_string(),
1396                value_arg: "call".to_string(),
1397                value_cbor: "query".to_string(),
1398            })
1399        }
1400    }
1401    Ok(())
1402}
1403
1404/// Inspect the bytes to be sent as a `request_status`
1405/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1406pub fn signed_request_status_inspect(
1407    sender: Principal,
1408    request_id: &RequestId,
1409    ingress_expiry: u64,
1410    signed_request_status: Vec<u8>,
1411) -> Result<(), AgentError> {
1412    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1413    let envelope: Envelope =
1414        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1415    match envelope.content.as_ref() {
1416        EnvelopeContent::ReadState {
1417            ingress_expiry: ingress_expiry_cbor,
1418            sender: sender_cbor,
1419            paths: paths_cbor,
1420        } => {
1421            if ingress_expiry != *ingress_expiry_cbor {
1422                return Err(AgentError::CallDataMismatch {
1423                    field: "ingress_expiry".to_string(),
1424                    value_arg: ingress_expiry.to_string(),
1425                    value_cbor: ingress_expiry_cbor.to_string(),
1426                });
1427            }
1428            if sender != *sender_cbor {
1429                return Err(AgentError::CallDataMismatch {
1430                    field: "sender".to_string(),
1431                    value_arg: sender.to_string(),
1432                    value_cbor: sender_cbor.to_string(),
1433                });
1434            }
1435
1436            if paths != *paths_cbor {
1437                return Err(AgentError::CallDataMismatch {
1438                    field: "paths".to_string(),
1439                    value_arg: format!("{paths:?}"),
1440                    value_cbor: format!("{paths_cbor:?}"),
1441                });
1442            }
1443        }
1444        EnvelopeContent::Query { .. } => {
1445            return Err(AgentError::CallDataMismatch {
1446                field: "request_type".to_string(),
1447                value_arg: "read_state".to_string(),
1448                value_cbor: "query".to_string(),
1449            })
1450        }
1451        EnvelopeContent::Call { .. } => {
1452            return Err(AgentError::CallDataMismatch {
1453                field: "request_type".to_string(),
1454                value_arg: "read_state".to_string(),
1455                value_cbor: "call".to_string(),
1456            })
1457        }
1458    }
1459    Ok(())
1460}
1461
1462#[derive(Clone)]
1463struct SubnetCache {
1464    subnets: TimedCache<Principal, Arc<Subnet>>,
1465    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1466}
1467
1468impl SubnetCache {
1469    fn new() -> Self {
1470        Self {
1471            subnets: TimedCache::with_lifespan(300),
1472            canister_index: RangeInclusiveMap::new_with_step_fns(),
1473        }
1474    }
1475
1476    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1477        self.canister_index
1478            .get(canister)
1479            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1480            .filter(|subnet| subnet.canister_ranges.contains(canister))
1481    }
1482
1483    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1484        self.subnets.cache_set(subnet_id, subnet.clone());
1485        for range in subnet.canister_ranges.iter() {
1486            self.canister_index.insert(range.clone(), subnet_id);
1487        }
1488    }
1489}
1490
1491#[derive(Clone, Copy)]
1492struct PrincipalStep;
1493
1494impl StepFns<Principal> for PrincipalStep {
1495    fn add_one(start: &Principal) -> Principal {
1496        let bytes = start.as_slice();
1497        let mut arr = [0; 29];
1498        arr[..bytes.len()].copy_from_slice(bytes);
1499        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1500            *byte = byte.wrapping_add(1);
1501            if *byte != 0 {
1502                break;
1503            }
1504        }
1505        Principal::from_slice(&arr[..bytes.len()])
1506    }
1507    fn sub_one(start: &Principal) -> Principal {
1508        let bytes = start.as_slice();
1509        let mut arr = [0; 29];
1510        arr[..bytes.len()].copy_from_slice(bytes);
1511        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1512            *byte = byte.wrapping_sub(1);
1513            if *byte != 255 {
1514                break;
1515            }
1516        }
1517        Principal::from_slice(&arr[..bytes.len()])
1518    }
1519}
1520
1521#[derive(Clone)]
1522pub(crate) struct Subnet {
1523    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
1524    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
1525    _key: Vec<u8>,
1526    node_keys: HashMap<Principal, Vec<u8>>,
1527    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1528}
1529
1530/// API boundary node, which routes /api calls to IC replica nodes.
1531#[derive(Debug, Clone)]
1532pub struct ApiBoundaryNode {
1533    /// Domain name
1534    pub domain: String,
1535    /// IPv6 address in the hexadecimal notation with colons.
1536    pub ipv6_address: String,
1537    /// IPv4 address in the dotted-decimal notation.
1538    pub ipv4_address: Option<String>,
1539}
1540
1541/// A query request builder.
1542///
1543/// This makes it easier to do query calls without actually passing all arguments.
1544#[derive(Debug, Clone)]
1545#[non_exhaustive]
1546pub struct QueryBuilder<'agent> {
1547    agent: &'agent Agent,
1548    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1549    pub effective_canister_id: Principal,
1550    /// The principal ID of the canister being called.
1551    pub canister_id: Principal,
1552    /// The name of the canister method being called.
1553    pub method_name: String,
1554    /// The argument blob to be passed to the method.
1555    pub arg: Vec<u8>,
1556    /// The Unix timestamp that the request will expire at.
1557    pub ingress_expiry_datetime: Option<u64>,
1558    /// Whether to include a nonce with the message.
1559    pub use_nonce: bool,
1560}
1561
1562impl<'agent> QueryBuilder<'agent> {
1563    /// Creates a new query builder with an agent for a particular canister method.
1564    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1565        Self {
1566            agent,
1567            effective_canister_id: canister_id,
1568            canister_id,
1569            method_name,
1570            arg: vec![],
1571            ingress_expiry_datetime: None,
1572            use_nonce: false,
1573        }
1574    }
1575
1576    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1577    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1578        self.effective_canister_id = canister_id;
1579        self
1580    }
1581
1582    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1583    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1584        self.arg = arg.into();
1585        self
1586    }
1587
1588    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1589    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1590        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1591        self
1592    }
1593
1594    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1595    pub fn expire_after(mut self, duration: Duration) -> Self {
1596        self.ingress_expiry_datetime = Some(
1597            OffsetDateTime::now_utc()
1598                .saturating_add(duration.try_into().expect("negative duration"))
1599                .unix_timestamp_nanos() as u64,
1600        );
1601        self
1602    }
1603
1604    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1605    /// and thus may get a (briefly) cached response.
1606    pub fn with_nonce_generation(mut self) -> Self {
1607        self.use_nonce = true;
1608        self
1609    }
1610
1611    /// Make a query call. This will return a byte vector.
1612    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1613        self.agent
1614            .query_raw(
1615                self.canister_id,
1616                self.effective_canister_id,
1617                self.method_name,
1618                self.arg,
1619                self.ingress_expiry_datetime,
1620                self.use_nonce,
1621                None,
1622            )
1623            .await
1624    }
1625
1626    /// Make a query call with signature verification. This will return a byte vector.
1627    ///
1628    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1629    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1630    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1631        self.agent
1632            .query_raw(
1633                self.canister_id,
1634                self.effective_canister_id,
1635                self.method_name,
1636                self.arg,
1637                self.ingress_expiry_datetime,
1638                self.use_nonce,
1639                Some(true),
1640            )
1641            .await
1642    }
1643
1644    /// Make a query call without signature verification. This will return a byte vector.
1645    ///
1646    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1647    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1648    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1649        self.agent
1650            .query_raw(
1651                self.canister_id,
1652                self.effective_canister_id,
1653                self.method_name,
1654                self.arg,
1655                self.ingress_expiry_datetime,
1656                self.use_nonce,
1657                Some(false),
1658            )
1659            .await
1660    }
1661
1662    /// Sign a query call. This will return a [`signed::SignedQuery`]
1663    /// which contains all fields of the query and the signed query in CBOR encoding
1664    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1665        let effective_canister_id = self.effective_canister_id;
1666        let identity = self.agent.identity.clone();
1667        let content = self.into_envelope()?;
1668        let signed_query = sign_envelope(&content, identity)?;
1669        let EnvelopeContent::Query {
1670            ingress_expiry,
1671            sender,
1672            canister_id,
1673            method_name,
1674            arg,
1675            nonce,
1676        } = content
1677        else {
1678            unreachable!()
1679        };
1680        Ok(SignedQuery {
1681            ingress_expiry,
1682            sender,
1683            canister_id,
1684            method_name,
1685            arg,
1686            effective_canister_id,
1687            signed_query,
1688            nonce,
1689        })
1690    }
1691
1692    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1693    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1694        self.agent.query_content(
1695            self.canister_id,
1696            self.method_name,
1697            self.arg,
1698            self.ingress_expiry_datetime,
1699            self.use_nonce,
1700        )
1701    }
1702}
1703
1704impl<'agent> IntoFuture for QueryBuilder<'agent> {
1705    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1706    type Output = Result<Vec<u8>, AgentError>;
1707    fn into_future(self) -> Self::IntoFuture {
1708        Box::pin(self.call())
1709    }
1710}
1711
1712/// An in-flight canister update call. Useful primarily as a `Future`.
1713pub struct UpdateCall<'agent> {
1714    agent: &'agent Agent,
1715    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1716    effective_canister_id: Principal,
1717}
1718
1719impl fmt::Debug for UpdateCall<'_> {
1720    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1721        f.debug_struct("UpdateCall")
1722            .field("agent", &self.agent)
1723            .field("effective_canister_id", &self.effective_canister_id)
1724            .finish_non_exhaustive()
1725    }
1726}
1727
1728impl Future for UpdateCall<'_> {
1729    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1730    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1731        self.response_future.as_mut().poll(cx)
1732    }
1733}
1734
1735impl<'a> UpdateCall<'a> {
1736    /// Waits for the update call to be completed, polling if necessary.
1737    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1738        let response = self.response_future.await?;
1739
1740        match response {
1741            CallResponse::Response(response) => Ok(response),
1742            CallResponse::Poll(request_id) => {
1743                self.agent
1744                    .wait(&request_id, self.effective_canister_id)
1745                    .await
1746            }
1747        }
1748    }
1749}
1750/// An update request Builder.
1751///
1752/// This makes it easier to do update calls without actually passing all arguments or specifying
1753/// if you want to wait or not.
1754#[derive(Debug)]
1755pub struct UpdateBuilder<'agent> {
1756    agent: &'agent Agent,
1757    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1758    pub effective_canister_id: Principal,
1759    /// The principal ID of the canister being called.
1760    pub canister_id: Principal,
1761    /// The name of the canister method being called.
1762    pub method_name: String,
1763    /// The argument blob to be passed to the method.
1764    pub arg: Vec<u8>,
1765    /// The Unix timestamp that the request will expire at.
1766    pub ingress_expiry_datetime: Option<u64>,
1767}
1768
1769impl<'agent> UpdateBuilder<'agent> {
1770    /// Creates a new update builder with an agent for a particular canister method.
1771    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1772        Self {
1773            agent,
1774            effective_canister_id: canister_id,
1775            canister_id,
1776            method_name,
1777            arg: vec![],
1778            ingress_expiry_datetime: None,
1779        }
1780    }
1781
1782    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1783    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1784        self.effective_canister_id = canister_id;
1785        self
1786    }
1787
1788    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1789    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1790        self.arg = arg.into();
1791        self
1792    }
1793
1794    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1795    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1796        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1797        self
1798    }
1799
1800    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1801    pub fn expire_after(mut self, duration: Duration) -> Self {
1802        self.ingress_expiry_datetime = Some(
1803            OffsetDateTime::now_utc()
1804                .saturating_add(duration.try_into().expect("negative duration"))
1805                .unix_timestamp_nanos() as u64,
1806        );
1807        self
1808    }
1809
1810    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1811    /// the response as a byte vector.
1812    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1813        self.call().and_wait().await.map(|x| x.0)
1814    }
1815
1816    /// Make an update call. This will return a `RequestId`.
1817    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1818    pub fn call(self) -> UpdateCall<'agent> {
1819        let response_future = async move {
1820            self.agent
1821                .update_raw(
1822                    self.canister_id,
1823                    self.effective_canister_id,
1824                    self.method_name,
1825                    self.arg,
1826                    self.ingress_expiry_datetime,
1827                )
1828                .await
1829        };
1830        UpdateCall {
1831            agent: self.agent,
1832            response_future: Box::pin(response_future),
1833            effective_canister_id: self.effective_canister_id,
1834        }
1835    }
1836
1837    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1838    /// which contains all fields of the update and the signed update in CBOR encoding
1839    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1840        let identity = self.agent.identity.clone();
1841        let effective_canister_id = self.effective_canister_id;
1842        let content = self.into_envelope()?;
1843        let signed_update = sign_envelope(&content, identity)?;
1844        let request_id = to_request_id(&content)?;
1845        let EnvelopeContent::Call {
1846            nonce,
1847            ingress_expiry,
1848            sender,
1849            canister_id,
1850            method_name,
1851            arg,
1852        } = content
1853        else {
1854            unreachable!()
1855        };
1856        Ok(SignedUpdate {
1857            nonce,
1858            ingress_expiry,
1859            sender,
1860            canister_id,
1861            method_name,
1862            arg,
1863            effective_canister_id,
1864            signed_update,
1865            request_id,
1866        })
1867    }
1868
1869    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1870    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1871        let nonce = self.agent.nonce_factory.generate();
1872        self.agent.update_content(
1873            self.canister_id,
1874            self.method_name,
1875            self.arg,
1876            self.ingress_expiry_datetime,
1877            nonce,
1878        )
1879    }
1880}
1881
1882impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1883    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1884    type Output = Result<Vec<u8>, AgentError>;
1885    fn into_future(self) -> Self::IntoFuture {
1886        Box::pin(self.call_and_wait())
1887    }
1888}
1889
1890/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
1891#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1892#[cfg_attr(not(target_family = "wasm"), async_trait)]
1893pub trait HttpService: Send + Sync + Debug {
1894    /// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
1895    async fn call<'a>(
1896        &'a self,
1897        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1898        max_retries: usize,
1899    ) -> Result<Response, AgentError>;
1900}
1901#[cfg(not(target_family = "wasm"))]
1902#[async_trait]
1903impl<T> HttpService for T
1904where
1905    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
1906    for<'a> <&'a Self as Service<Request>>::Future: Send,
1907    T: Send + Sync + Debug + ?Sized,
1908{
1909    #[allow(clippy::needless_arbitrary_self_type)]
1910    async fn call<'a>(
1911        mut self: &'a Self,
1912        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1913        max_retries: usize,
1914    ) -> Result<Response, AgentError> {
1915        let mut retry_count = 0;
1916        loop {
1917            match Service::call(&mut self, req()?).await {
1918                Err(err) => {
1919                    // Network-related errors can be retried.
1920                    if err.is_connect() {
1921                        if retry_count >= max_retries {
1922                            return Err(AgentError::TransportError(err));
1923                        }
1924                        retry_count += 1;
1925                    }
1926                }
1927                Ok(resp) => return Ok(resp),
1928            }
1929        }
1930    }
1931}
1932
1933#[cfg(target_family = "wasm")]
1934#[async_trait(?Send)]
1935impl<T> HttpService for T
1936where
1937    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
1938    T: Send + Sync + Debug + ?Sized,
1939{
1940    #[allow(clippy::needless_arbitrary_self_type)]
1941    async fn call<'a>(
1942        mut self: &'a Self,
1943        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1944        _: usize,
1945    ) -> Result<Response, AgentError> {
1946        Ok(Service::call(&mut self, req()?).await?)
1947    }
1948}
1949
1950#[derive(Debug)]
1951struct Retry429Logic {
1952    client: Client,
1953}
1954
1955#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1956#[cfg_attr(not(target_family = "wasm"), async_trait)]
1957impl HttpService for Retry429Logic {
1958    async fn call<'a>(
1959        &'a self,
1960        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1961        _max_tcp_retries: usize,
1962    ) -> Result<Response, AgentError> {
1963        let mut retries = 0;
1964        loop {
1965            #[cfg(not(target_family = "wasm"))]
1966            let resp = self.client.call(req, _max_tcp_retries).await?;
1967            // Client inconveniently does not implement Service on wasm
1968            #[cfg(target_family = "wasm")]
1969            let resp = self.client.execute(req()?).await?;
1970            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
1971                if retries == 6 {
1972                    break Ok(resp);
1973                } else {
1974                    retries += 1;
1975                    crate::util::sleep(Duration::from_millis(250)).await;
1976                    continue;
1977                }
1978            } else {
1979                break Ok(resp);
1980            }
1981        }
1982    }
1983}
1984
1985#[cfg(all(test, not(target_family = "wasm")))]
1986mod offline_tests {
1987    use super::*;
1988    use tokio::net::TcpListener;
1989    // Any tests that involve the network should go in agent_test, not here.
1990
1991    #[test]
1992    fn rounded_expiry() {
1993        let agent = Agent::builder()
1994            .with_url("http://not-a-real-url")
1995            .build()
1996            .unwrap();
1997        let mut prev_expiry = None;
1998        let mut num_timestamps = 0;
1999        for _ in 0..6 {
2000            let update = agent
2001                .update(&Principal::management_canister(), "not_a_method")
2002                .sign()
2003                .unwrap();
2004            if prev_expiry < Some(update.ingress_expiry) {
2005                prev_expiry = Some(update.ingress_expiry);
2006                num_timestamps += 1;
2007            }
2008        }
2009        // in six requests, there should be no more than two timestamps
2010        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2011    }
2012
2013    #[tokio::test]
2014    async fn client_ratelimit() {
2015        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2016        let count = Arc::new(Mutex::new(0));
2017        let port = mock_server.local_addr().unwrap().port();
2018        tokio::spawn({
2019            let count = count.clone();
2020            async move {
2021                loop {
2022                    let (mut conn, _) = mock_server.accept().await.unwrap();
2023                    *count.lock().unwrap() += 1;
2024                    tokio::spawn(
2025                        // read all data, never reply
2026                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2027                    );
2028                }
2029            }
2030        });
2031        let agent = Agent::builder()
2032            .with_http_client(Client::builder().http1_only().build().unwrap())
2033            .with_url(format!("http://127.0.0.1:{port}"))
2034            .with_max_concurrent_requests(2)
2035            .build()
2036            .unwrap();
2037        for _ in 0..3 {
2038            let agent = agent.clone();
2039            tokio::spawn(async move {
2040                agent
2041                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2042                    .call()
2043                    .await
2044            });
2045        }
2046        crate::util::sleep(Duration::from_millis(250)).await;
2047        assert_eq!(*count.lock().unwrap(), 2);
2048    }
2049}