Skip to main content

ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28    RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34    dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35    RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45    agent::response_authentication::{
46        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47        lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48        lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    convert::TryFrom,
68    fmt::{self, Debug},
69    future::{Future, IntoFuture},
70    pin::Pin,
71    str::FromStr,
72    sync::{Arc, Mutex, RwLock},
73    task::{Context, Poll},
74    time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79/// The effective ID that routes a request to a particular endpoint shape.
80///
81/// Most calls target a canister and are routed via an
82/// [effective canister id](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id).
83/// A small set of management-canister calls (currently `create_canister`,
84/// `provisional_create_canister_with_cycles`, and `list_canisters`) are instead
85/// routed via an [effective subnet id](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-subnet-id),
86/// targeting the subnet-scoped HTTP endpoints (`/api/v4/subnet/<id>/call`,
87/// `/api/v3/subnet/<id>/read_state`, `/api/v3/subnet/<id>/query`).
88///
89/// `Principal` converts into `EffectiveId::Canister(_)`, so passing a bare
90/// `Principal` to APIs that accept `impl Into<EffectiveId>` preserves the
91/// canister-scoped behavior of earlier `ic-agent` releases.
92#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
93pub enum EffectiveId {
94    /// An effective canister id. The request is routed to the canister-scoped HTTP endpoints.
95    Canister(Principal),
96    /// An effective subnet id. The request is routed to the subnet-scoped HTTP endpoints.
97    Subnet(Principal),
98}
99
100impl EffectiveId {
101    /// Returns the underlying principal regardless of variant.
102    pub fn as_principal(&self) -> Principal {
103        match self {
104            EffectiveId::Canister(p) | EffectiveId::Subnet(p) => *p,
105        }
106    }
107}
108
109impl From<Principal> for EffectiveId {
110    fn from(p: Principal) -> Self {
111        EffectiveId::Canister(p)
112    }
113}
114
115pub(crate) const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
116
117pub(crate) const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
118
119#[cfg(not(target_family = "wasm"))]
120type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
121
122#[cfg(target_family = "wasm")]
123type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
124
125/// A low level Agent to make calls to a Replica endpoint.
126///
127#[cfg_attr(unix, doc = " ```rust")] // pocket-ic
128#[cfg_attr(not(unix), doc = " ```ignore")]
129/// use ic_agent::{Agent, export::Principal};
130/// use candid::{Encode, Decode, CandidType, Nat};
131/// use serde::Deserialize;
132///
133/// #[derive(CandidType)]
134/// struct Argument {
135///   amount: Option<Nat>,
136/// }
137///
138/// #[derive(CandidType, Deserialize)]
139/// struct CreateCanisterResult {
140///   canister_id: Principal,
141/// }
142///
143/// # fn create_identity() -> impl ic_agent::Identity {
144/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
145/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
146/// # }
147/// #
148/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
149/// # Ok(ref_tests::utils::with_pic(async move |pic| {
150/// # let url = ref_tests::utils::get_pic_url(&pic);
151///   let agent = Agent::builder()
152///     .with_url(url)
153///     .with_identity(create_identity())
154///     .build()?;
155///
156///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
157///   // This is important as the main net public key is static and a rogue network could return
158///   // a different key.
159///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
160///   agent.fetch_root_key().await?;
161///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
162///
163///   // Create a call to the management canister to create a new canister ID, and wait for a result.
164///   // This API only works in local instances; mainnet instances must use the cycles ledger.
165///   // See `dfx info default-effective-canister-id`.
166/// # let effective_canister_id = ref_tests::utils::get_effective_canister_id(&pic).await;
167///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
168///     .with_effective_canister_id(effective_canister_id)
169///     .with_arg(Encode!(&Argument { amount: None })?)
170///     .await?;
171///
172///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
173///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
174///   Ok(canister_id)
175/// # }).await)
176/// }
177///
178/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
179/// # runtime.block_on(async {
180/// let canister_id = create_a_canister().await.unwrap();
181/// eprintln!("{}", canister_id);
182/// # });
183/// ```
184///
185/// This agent does not understand Candid, and only acts on byte buffers.
186///
187/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
188/// you receive from a method has already been verified and you do not need to manually verify it.
189#[derive(Clone)]
190pub struct Agent {
191    nonce_factory: Arc<dyn NonceGenerator>,
192    identity: Arc<dyn Identity>,
193    ingress_expiry: Duration,
194    root_key: Arc<RwLock<Vec<u8>>>,
195    client: Arc<dyn HttpService>,
196    route_provider: Arc<dyn RouteProvider>,
197    subnet_key_cache: Arc<Mutex<SubnetCache>>,
198    concurrent_requests_semaphore: Arc<Semaphore>,
199    verify_query_signatures: bool,
200    max_response_body_size: Option<usize>,
201    max_polling_time: Duration,
202    #[allow(dead_code)]
203    max_tcp_error_retries: usize,
204}
205
206impl fmt::Debug for Agent {
207    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
208        f.debug_struct("Agent")
209            .field("ingress_expiry", &self.ingress_expiry)
210            .finish_non_exhaustive()
211    }
212}
213
214impl Agent {
215    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
216    /// using the [`AgentConfig`] and [`Agent::new()`].
217    pub fn builder() -> builder::AgentBuilder {
218        Default::default()
219    }
220
221    /// Create an instance of an [`Agent`].
222    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
223        let client = config.http_service.unwrap_or_else(|| {
224            Arc::new(Retry429Logic {
225                client: config.client.unwrap_or_else(|| {
226                    #[cfg(not(target_family = "wasm"))]
227                    {
228                        Client::builder()
229                            .use_rustls_tls()
230                            .timeout(Duration::from_secs(360))
231                            .build()
232                            .expect("Could not create HTTP client.")
233                    }
234                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
235                    {
236                        Client::new()
237                    }
238                }),
239            })
240        });
241        Ok(Agent {
242            nonce_factory: config.nonce_factory,
243            identity: config.identity,
244            ingress_expiry: config.ingress_expiry,
245            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
246            client: client.clone(),
247            route_provider: if let Some(route_provider) = config.route_provider {
248                route_provider
249            } else if let Some(url) = config.url {
250                if config.background_dynamic_routing {
251                    assert!(
252                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
253                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
254                    );
255                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
256                    UrlUntilReady::new(url, async move {
257                        let provider =
258                            DynamicRouteProviderBuilder::new(seeds, client, None).build();
259                        provider.start().await;
260                        provider
261                    }) as Arc<dyn RouteProvider>
262                } else {
263                    Arc::new(url)
264                }
265            } else {
266                panic!("either route_provider or url must be specified");
267            },
268            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
269            verify_query_signatures: config.verify_query_signatures,
270            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
271            max_response_body_size: config.max_response_body_size,
272            max_tcp_error_retries: config.max_tcp_error_retries,
273            max_polling_time: config.max_polling_time,
274        })
275    }
276
277    /// Set the identity provider for signing messages.
278    ///
279    /// NOTE: if you change the identity while having update calls in
280    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
281    /// messages.
282    pub fn set_identity<I>(&mut self, identity: I)
283    where
284        I: 'static + Identity,
285    {
286        self.identity = Arc::new(identity);
287    }
288    /// Set the arc identity provider for signing messages.
289    ///
290    /// NOTE: if you change the identity while having update calls in
291    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
292    /// messages.
293    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
294        self.identity = identity;
295    }
296
297    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
298    /// responses using a hard-coded public key.
299    ///
300    /// This function will instruct the agent to ask the endpoint for its public key, and use
301    /// that instead. This is required when talking to a local test instance, for example.
302    ///
303    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
304    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
305    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
306        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
307            // already fetched the root key
308            return Ok(());
309        }
310        let status = self.status().await?;
311        let Some(root_key) = status.root_key else {
312            return Err(AgentError::NoRootKeyInStatus(status));
313        };
314        self.set_root_key(root_key);
315        Ok(())
316    }
317
318    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
319    /// responses using a hard-coded public key.
320    ///
321    /// Using this function you can set the root key to a known one if you know if beforehand.
322    pub fn set_root_key(&self, root_key: Vec<u8>) {
323        *self.root_key.write().unwrap() = root_key;
324    }
325
326    /// Return the root key currently in use.
327    pub fn read_root_key(&self) -> Vec<u8> {
328        self.root_key.read().unwrap().clone()
329    }
330
331    fn get_expiry_date(&self) -> u64 {
332        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
333        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
334        if self.ingress_expiry.as_secs() > 90 {
335            rounded = rounded.replace_second(0).unwrap();
336        }
337        rounded.unix_timestamp_nanos().try_into().unwrap()
338    }
339
340    /// Return the principal of the identity.
341    pub fn get_principal(&self) -> Result<Principal, String> {
342        self.identity.sender()
343    }
344
345    async fn query_endpoint<A>(
346        &self,
347        effective_canister_id: Principal,
348        serialized_bytes: Vec<u8>,
349    ) -> Result<A, AgentError>
350    where
351        A: serde::de::DeserializeOwned,
352    {
353        let _permit = self.concurrent_requests_semaphore.acquire().await;
354        let bytes = self
355            .execute(
356                Method::POST,
357                &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
358                Some(serialized_bytes),
359            )
360            .await?
361            .1;
362        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
363    }
364
365    async fn read_state_endpoint<A>(
366        &self,
367        effective_canister_id: Principal,
368        serialized_bytes: Vec<u8>,
369    ) -> Result<A, AgentError>
370    where
371        A: serde::de::DeserializeOwned,
372    {
373        let _permit = self.concurrent_requests_semaphore.acquire().await;
374        let endpoint = format!(
375            "api/v3/canister/{}/read_state",
376            effective_canister_id.to_text()
377        );
378        let bytes = self
379            .execute(Method::POST, &endpoint, Some(serialized_bytes))
380            .await?
381            .1;
382        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
383    }
384
385    async fn read_subnet_state_endpoint<A>(
386        &self,
387        subnet_id: Principal,
388        serialized_bytes: Vec<u8>,
389    ) -> Result<A, AgentError>
390    where
391        A: serde::de::DeserializeOwned,
392    {
393        let _permit = self.concurrent_requests_semaphore.acquire().await;
394        let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
395        let bytes = self
396            .execute(Method::POST, &endpoint, Some(serialized_bytes))
397            .await?
398            .1;
399        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
400    }
401
402    async fn query_subnet_endpoint<A>(
403        &self,
404        subnet_id: Principal,
405        serialized_bytes: Vec<u8>,
406    ) -> Result<A, AgentError>
407    where
408        A: serde::de::DeserializeOwned,
409    {
410        let _permit = self.concurrent_requests_semaphore.acquire().await;
411        let endpoint = format!("api/v3/subnet/{}/query", subnet_id.to_text());
412        let bytes = self
413            .execute(Method::POST, &endpoint, Some(serialized_bytes))
414            .await?
415            .1;
416        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
417    }
418
419    async fn query_endpoint_for<A>(
420        &self,
421        effective_id: EffectiveId,
422        serialized_bytes: Vec<u8>,
423    ) -> Result<A, AgentError>
424    where
425        A: serde::de::DeserializeOwned,
426    {
427        match effective_id {
428            EffectiveId::Canister(p) => self.query_endpoint(p, serialized_bytes).await,
429            EffectiveId::Subnet(p) => self.query_subnet_endpoint(p, serialized_bytes).await,
430        }
431    }
432
433    async fn read_state_endpoint_for<A>(
434        &self,
435        effective_id: EffectiveId,
436        serialized_bytes: Vec<u8>,
437    ) -> Result<A, AgentError>
438    where
439        A: serde::de::DeserializeOwned,
440    {
441        match effective_id {
442            EffectiveId::Canister(p) => self.read_state_endpoint(p, serialized_bytes).await,
443            EffectiveId::Subnet(p) => self.read_subnet_state_endpoint(p, serialized_bytes).await,
444        }
445    }
446
447    async fn call_endpoint_for(
448        &self,
449        effective_id: EffectiveId,
450        serialized_bytes: Vec<u8>,
451    ) -> Result<TransportCallResponse, AgentError> {
452        match effective_id {
453            EffectiveId::Canister(p) => self.call_endpoint(p, serialized_bytes).await,
454            EffectiveId::Subnet(p) => self.call_subnet_endpoint(p, serialized_bytes).await,
455        }
456    }
457
458    async fn get_subnet_for(&self, effective_id: EffectiveId) -> Result<Arc<Subnet>, AgentError> {
459        match effective_id {
460            EffectiveId::Canister(p) => self.get_subnet_by_canister(&p).await,
461            EffectiveId::Subnet(p) => self.get_subnet_by_id(&p).await,
462        }
463    }
464
465    async fn fetch_subnet_for(&self, effective_id: EffectiveId) -> Result<Arc<Subnet>, AgentError> {
466        match effective_id {
467            EffectiveId::Canister(p) => self.fetch_subnet_by_canister(&p).await,
468            EffectiveId::Subnet(p) => self.fetch_subnet_by_id(&p).await,
469        }
470    }
471
472    async fn call_endpoint(
473        &self,
474        effective_canister_id: Principal,
475        serialized_bytes: Vec<u8>,
476    ) -> Result<TransportCallResponse, AgentError> {
477        let _permit = self.concurrent_requests_semaphore.acquire().await;
478        let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
479        let (status_code, response_body) = self
480            .execute(Method::POST, &endpoint, Some(serialized_bytes))
481            .await?;
482
483        if status_code == StatusCode::ACCEPTED {
484            return Ok(TransportCallResponse::Accepted);
485        }
486
487        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
488    }
489
490    async fn call_subnet_endpoint(
491        &self,
492        subnet_id: Principal,
493        serialized_bytes: Vec<u8>,
494    ) -> Result<TransportCallResponse, AgentError> {
495        let _permit = self.concurrent_requests_semaphore.acquire().await;
496        let endpoint = format!("api/v4/subnet/{}/call", subnet_id.to_text());
497        let (status_code, response_body) = self
498            .execute(Method::POST, &endpoint, Some(serialized_bytes))
499            .await?;
500
501        if status_code == StatusCode::ACCEPTED {
502            return Ok(TransportCallResponse::Accepted);
503        }
504
505        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
506    }
507
508    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
509    /// The encoding is left as an exercise to the user.
510    #[allow(clippy::too_many_arguments)]
511    async fn query_raw(
512        &self,
513        canister_id: Principal,
514        effective_canister_id: Principal,
515        method_name: String,
516        arg: Vec<u8>,
517        ingress_expiry_datetime: Option<u64>,
518        use_nonce: bool,
519        explicit_verify_query_signatures: Option<bool>,
520    ) -> Result<Vec<u8>, AgentError> {
521        let operation = Operation::Call {
522            canister: canister_id,
523            method: method_name.clone(),
524        };
525        let content = self.query_content(
526            canister_id,
527            method_name,
528            arg,
529            ingress_expiry_datetime,
530            use_nonce,
531        )?;
532        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
533        self.query_inner(
534            EffectiveId::Canister(effective_canister_id),
535            serialized_bytes,
536            content.to_request_id(),
537            explicit_verify_query_signatures,
538            operation,
539        )
540        .await
541    }
542
543    /// Send the signed query to the network. Will return a byte vector.
544    /// The bytes will be checked if it is a valid query.
545    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
546    ///
547    /// `effective_id` may be either an effective canister id (the common case) or
548    /// an effective subnet id; in the latter case the request is routed to the
549    /// subnet-scoped `/api/v3/subnet/<id>/query` endpoint. Per the IC interface
550    /// spec, subnet-scoped queries are currently only valid for the
551    /// `list_canisters` method of the Management Canister.
552    pub async fn query_signed(
553        &self,
554        effective_id: impl Into<EffectiveId>,
555        signed_query: Vec<u8>,
556    ) -> Result<Vec<u8>, AgentError> {
557        let envelope: Envelope =
558            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
559        let EnvelopeContent::Query {
560            canister_id,
561            method_name,
562            ..
563        } = &*envelope.content
564        else {
565            return Err(AgentError::CallDataMismatch {
566                field: "request_type".to_string(),
567                value_arg: "query".to_string(),
568                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
569                    "update"
570                } else {
571                    "read_state"
572                }
573                .to_string(),
574            });
575        };
576        let operation = Operation::Call {
577            canister: *canister_id,
578            method: method_name.clone(),
579        };
580        self.query_inner(
581            effective_id.into(),
582            signed_query,
583            envelope.content.to_request_id(),
584            None,
585            operation,
586        )
587        .await
588    }
589
590    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
591    ///
592    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
593    async fn query_inner(
594        &self,
595        effective_id: EffectiveId,
596        signed_query: Vec<u8>,
597        request_id: RequestId,
598        explicit_verify_query_signatures: Option<bool>,
599        operation: Operation,
600    ) -> Result<Vec<u8>, AgentError> {
601        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
602            let (response, mut subnet) = futures_util::try_join!(
603                self.query_endpoint_for::<QueryResponse>(effective_id, signed_query),
604                self.get_subnet_for(effective_id)
605            )?;
606            if response.signatures().is_empty() {
607                return Err(AgentError::MissingSignature);
608            } else if response.signatures().len() > subnet.node_keys.len() {
609                return Err(AgentError::TooManySignatures {
610                    had: response.signatures().len(),
611                    needed: subnet.node_keys.len(),
612                });
613            }
614            for signature in response.signatures() {
615                if OffsetDateTime::now_utc()
616                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
617                    > self.ingress_expiry
618                {
619                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
620                }
621                let signable = response.signable(request_id, signature.timestamp);
622                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
623                    node_key
624                } else {
625                    subnet = self.fetch_subnet_for(effective_id).await?;
626                    subnet
627                        .node_keys
628                        .get(&signature.identity)
629                        .ok_or(AgentError::CertificateNotAuthorized())?
630                };
631                if node_key.len() != 44 {
632                    return Err(AgentError::DerKeyLengthMismatch {
633                        expected: 44,
634                        actual: node_key.len(),
635                    });
636                }
637                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
638                if node_key[..12] != DER_PREFIX {
639                    return Err(AgentError::DerPrefixMismatch {
640                        expected: DER_PREFIX.to_vec(),
641                        actual: node_key[..12].to_vec(),
642                    });
643                }
644                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
645                    .map_err(|_| AgentError::MalformedPublicKey)?;
646
647                match pubkey.verify_signature(&signable, &signature.signature[..]) {
648                    Ok(()) => (),
649                    Err(SignatureError::InvalidSignature) => {
650                        return Err(AgentError::QuerySignatureVerificationFailed)
651                    }
652                    Err(SignatureError::InvalidLength) => {
653                        return Err(AgentError::MalformedSignature)
654                    }
655                    _ => unreachable!(),
656                }
657            }
658            response
659        } else {
660            self.query_endpoint_for::<QueryResponse>(effective_id, signed_query)
661                .await?
662        };
663
664        match response {
665            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
666            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
667                reject,
668                operation: Some(operation),
669            }),
670        }
671    }
672
673    fn query_content(
674        &self,
675        canister_id: Principal,
676        method_name: String,
677        arg: Vec<u8>,
678        ingress_expiry_datetime: Option<u64>,
679        use_nonce: bool,
680    ) -> Result<EnvelopeContent, AgentError> {
681        Ok(EnvelopeContent::Query {
682            sender: self.identity.sender().map_err(AgentError::SigningError)?,
683            canister_id,
684            method_name,
685            arg,
686            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
687            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
688            sender_info: self.identity.sender_info(),
689        })
690    }
691
692    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
693    async fn update_raw(
694        &self,
695        canister_id: Principal,
696        effective_canister_id: Principal,
697        method_name: String,
698        arg: Vec<u8>,
699        ingress_expiry_datetime: Option<u64>,
700    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
701        let nonce = self.nonce_factory.generate();
702        let content = self.update_content(
703            canister_id,
704            method_name.clone(),
705            arg,
706            ingress_expiry_datetime,
707            nonce,
708        )?;
709        let operation = Some(Operation::Call {
710            canister: canister_id,
711            method: method_name,
712        });
713        let request_id = to_request_id(&content)?;
714        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
715
716        let response_body = self
717            .call_endpoint(effective_canister_id, serialized_bytes)
718            .await?;
719
720        match response_body {
721            TransportCallResponse::Replied { certificate } => {
722                let certificate =
723                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
724
725                self.verify(&certificate, effective_canister_id)?;
726                let status = lookup_request_status(&certificate, &request_id)?;
727
728                match status {
729                    RequestStatusResponse::Replied(reply) => {
730                        Ok(CallResponse::Response((reply.arg, certificate)))
731                    }
732                    RequestStatusResponse::Rejected(reject_response) => {
733                        Err(AgentError::CertifiedReject {
734                            reject: reject_response,
735                            operation,
736                        })?
737                    }
738                    _ => Ok(CallResponse::Poll(request_id)),
739                }
740            }
741            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
742            TransportCallResponse::NonReplicatedRejection(reject_response) => {
743                Err(AgentError::UncertifiedReject {
744                    reject: reject_response,
745                    operation,
746                })
747            }
748        }
749    }
750
751    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
752    /// The bytes will be checked to verify that it is a valid update.
753    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
754    ///
755    /// `effective_id` may be either an effective canister id (the common case) or
756    /// an effective subnet id; in the latter case the request is routed to the
757    /// subnet-scoped `/api/v4/subnet/<id>/call` endpoint. Per the IC interface
758    /// spec, subnet-scoped update calls are currently only valid for canister
759    /// creation calls to the Management Canister.
760    pub async fn update_signed(
761        &self,
762        effective_id: impl Into<EffectiveId>,
763        signed_update: Vec<u8>,
764    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
765        let effective_id = effective_id.into();
766        let envelope: Envelope =
767            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
768        let EnvelopeContent::Call {
769            canister_id,
770            method_name,
771            ..
772        } = &*envelope.content
773        else {
774            return Err(AgentError::CallDataMismatch {
775                field: "request_type".to_string(),
776                value_arg: "update".to_string(),
777                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
778                    "query"
779                } else {
780                    "read_state"
781                }
782                .to_string(),
783            });
784        };
785        let operation = Some(Operation::Call {
786            canister: *canister_id,
787            method: method_name.clone(),
788        });
789        let request_id = to_request_id(&envelope.content)?;
790
791        let response_body = self.call_endpoint_for(effective_id, signed_update).await?;
792
793        match response_body {
794            TransportCallResponse::Replied { certificate } => {
795                let certificate =
796                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
797
798                self.verify(&certificate, effective_id)?;
799                let status = lookup_request_status(&certificate, &request_id)?;
800
801                match status {
802                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
803                    RequestStatusResponse::Rejected(reject_response) => {
804                        Err(AgentError::CertifiedReject {
805                            reject: reject_response,
806                            operation,
807                        })?
808                    }
809                    _ => Ok(CallResponse::Poll(request_id)),
810                }
811            }
812            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
813            TransportCallResponse::NonReplicatedRejection(reject_response) => {
814                Err(AgentError::UncertifiedReject {
815                    reject: reject_response,
816                    operation,
817                })
818            }
819        }
820    }
821
822    fn update_content(
823        &self,
824        canister_id: Principal,
825        method_name: String,
826        arg: Vec<u8>,
827        ingress_expiry_datetime: Option<u64>,
828        nonce: Option<Vec<u8>>,
829    ) -> Result<EnvelopeContent, AgentError> {
830        Ok(EnvelopeContent::Call {
831            canister_id,
832            method_name,
833            arg,
834            nonce,
835            sender: self.identity.sender().map_err(AgentError::SigningError)?,
836            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
837            sender_info: self.identity.sender_info(),
838        })
839    }
840
841    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
842        ExponentialBackoffBuilder::new()
843            .with_initial_interval(Duration::from_millis(500))
844            .with_max_interval(Duration::from_secs(1))
845            .with_multiplier(1.4)
846            .with_max_elapsed_time(Some(self.max_polling_time))
847            .build()
848    }
849
850    /// Wait for `request_status` to return a Replied response and return the arg.
851    ///
852    /// `effective_id` may be either an effective canister id or an effective
853    /// subnet id; in the latter case the request is routed to the subnet-scoped
854    /// `/api/v3/subnet/<id>/read_state` endpoint.
855    pub async fn wait_signed(
856        &self,
857        request_id: &RequestId,
858        effective_id: impl Into<EffectiveId>,
859        signed_request_status: Vec<u8>,
860    ) -> Result<(Vec<u8>, Certificate), AgentError> {
861        let effective_id = effective_id.into();
862        let mut retry_policy = self.get_retry_policy();
863
864        let mut request_accepted = false;
865        loop {
866            let (resp, cert) = self
867                .request_status_signed(request_id, effective_id, signed_request_status.clone())
868                .await?;
869            match resp {
870                RequestStatusResponse::Unknown => {
871                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
872                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
873                        return Err(AgentError::TimeoutWaitingForResponse());
874                    }
875                }
876
877                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
878                    if !request_accepted {
879                        retry_policy.reset();
880                        request_accepted = true;
881                    }
882                }
883
884                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
885                    return Ok((arg, cert))
886                }
887
888                RequestStatusResponse::Rejected(response) => {
889                    return Err(AgentError::CertifiedReject {
890                        reject: response,
891                        operation: None,
892                    })
893                }
894
895                RequestStatusResponse::Done => {
896                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
897                        *request_id,
898                    )))
899                }
900            };
901
902            match retry_policy.next_backoff() {
903                Some(duration) => crate::util::sleep(duration).await,
904
905                None => return Err(AgentError::TimeoutWaitingForResponse()),
906            }
907        }
908    }
909
910    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
911    ///
912    /// `effective_id` may be either an effective canister id or an effective
913    /// subnet id; in the latter case state is polled from the subnet-scoped
914    /// `/api/v3/subnet/<id>/read_state` endpoint.
915    pub async fn wait(
916        &self,
917        request_id: &RequestId,
918        effective_id: impl Into<EffectiveId>,
919    ) -> Result<(Vec<u8>, Certificate), AgentError> {
920        self.wait_inner(request_id, effective_id.into(), None).await
921    }
922
923    async fn wait_inner(
924        &self,
925        request_id: &RequestId,
926        effective_id: EffectiveId,
927        operation: Option<Operation>,
928    ) -> Result<(Vec<u8>, Certificate), AgentError> {
929        let mut retry_policy = self.get_retry_policy();
930
931        let mut request_accepted = false;
932        loop {
933            let (resp, cert) = self.request_status_raw(request_id, effective_id).await?;
934            match resp {
935                RequestStatusResponse::Unknown => {
936                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
937                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
938                        return Err(AgentError::TimeoutWaitingForResponse());
939                    }
940                }
941
942                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
943                    if !request_accepted {
944                        // The system will return RequestStatusResponse::Unknown
945                        // until the request is accepted
946                        // and we generally cannot know how long that will take.
947                        // State transitions between Received and Processing may be
948                        // instantaneous. Therefore, once we know the request is accepted,
949                        // we should restart the backoff so the request does not time out.
950
951                        retry_policy.reset();
952                        request_accepted = true;
953                    }
954                }
955
956                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
957                    return Ok((arg, cert))
958                }
959
960                RequestStatusResponse::Rejected(response) => {
961                    return Err(AgentError::CertifiedReject {
962                        reject: response,
963                        operation,
964                    })
965                }
966
967                RequestStatusResponse::Done => {
968                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
969                        *request_id,
970                    )))
971                }
972            };
973
974            match retry_policy.next_backoff() {
975                Some(duration) => crate::util::sleep(duration).await,
976
977                None => return Err(AgentError::TimeoutWaitingForResponse()),
978            }
979        }
980    }
981
982    /// Request the raw state tree directly.
983    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
984    ///
985    /// `effective_id` may be either an effective canister id or an effective
986    /// subnet id; in the latter case the request is read from the subnet-scoped
987    /// `/api/v3/subnet/<id>/read_state` endpoint.
988    pub async fn read_state_raw(
989        &self,
990        paths: Vec<Vec<Label>>,
991        effective_id: impl Into<EffectiveId>,
992    ) -> Result<Certificate, AgentError> {
993        let effective_id = effective_id.into();
994        let content = self.read_state_content(paths)?;
995        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
996
997        let read_state_response: ReadStateResponse = self
998            .read_state_endpoint_for(effective_id, serialized_bytes)
999            .await?;
1000        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1001            .map_err(AgentError::InvalidCborData)?;
1002        self.verify(&cert, effective_id)?;
1003        Ok(cert)
1004    }
1005
1006    /// Request the raw state tree directly, under a subnet ID.
1007    ///
1008    /// Equivalent to calling [`read_state_raw`](Self::read_state_raw) with
1009    /// [`EffectiveId::Subnet`]; retained for backward compatibility.
1010    pub async fn read_subnet_state_raw(
1011        &self,
1012        paths: Vec<Vec<Label>>,
1013        subnet_id: Principal,
1014    ) -> Result<Certificate, AgentError> {
1015        self.read_state_raw(paths, EffectiveId::Subnet(subnet_id))
1016            .await
1017    }
1018
1019    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
1020        Ok(EnvelopeContent::ReadState {
1021            sender: self.identity.sender().map_err(AgentError::SigningError)?,
1022            paths,
1023            ingress_expiry: self.get_expiry_date(),
1024        })
1025    }
1026
1027    /// Verify a certificate, checking delegation if present.
1028    ///
1029    /// For [`EffectiveId::Canister`] the certificate must have authority over
1030    /// the canister (its canister-id ranges must include the principal). For
1031    /// [`EffectiveId::Subnet`] the certificate must instead be authoritative
1032    /// for the specified subnet.
1033    pub fn verify(
1034        &self,
1035        cert: &Certificate,
1036        effective_id: impl Into<EffectiveId>,
1037    ) -> Result<(), AgentError> {
1038        match effective_id.into() {
1039            EffectiveId::Canister(p) => self.verify_cert(cert, p)?,
1040            EffectiveId::Subnet(p) => self.verify_cert_for_subnet(cert, p)?,
1041        }
1042        self.verify_cert_timestamp(cert)?;
1043        Ok(())
1044    }
1045
1046    fn verify_cert(
1047        &self,
1048        cert: &Certificate,
1049        effective_canister_id: Principal,
1050    ) -> Result<(), AgentError> {
1051        let sig = &cert.signature;
1052
1053        let root_hash = cert.tree.digest();
1054        let mut msg = vec![];
1055        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
1056        msg.extend_from_slice(&root_hash);
1057
1058        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
1059        let key = extract_der(der_key)?;
1060
1061        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
1062            .map_err(|_| AgentError::CertificateVerificationFailed())?;
1063        Ok(())
1064    }
1065
1066    /// Verify a certificate, checking delegation if present.
1067    /// Only passes if the certificate is for the specified subnet.
1068    ///
1069    /// Equivalent to calling [`verify`](Self::verify) with
1070    /// [`EffectiveId::Subnet`]; retained for backward compatibility.
1071    pub fn verify_for_subnet(
1072        &self,
1073        cert: &Certificate,
1074        subnet_id: Principal,
1075    ) -> Result<(), AgentError> {
1076        self.verify(cert, EffectiveId::Subnet(subnet_id))
1077    }
1078
1079    fn verify_cert_for_subnet(
1080        &self,
1081        cert: &Certificate,
1082        subnet_id: Principal,
1083    ) -> Result<(), AgentError> {
1084        let sig = &cert.signature;
1085
1086        let root_hash = cert.tree.digest();
1087        let mut msg = vec![];
1088        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
1089        msg.extend_from_slice(&root_hash);
1090
1091        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
1092        let key = extract_der(der_key)?;
1093
1094        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
1095            .map_err(|_| AgentError::CertificateVerificationFailed())?;
1096        Ok(())
1097    }
1098
1099    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
1100        // Verify that the certificate is not older than ingress expiry
1101        // Certificates with timestamps in the future are allowed
1102        let time = lookup_time(cert)?;
1103        if (OffsetDateTime::now_utc()
1104            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
1105            > self.ingress_expiry
1106        {
1107            Err(AgentError::CertificateOutdated(self.ingress_expiry))
1108        } else {
1109            Ok(())
1110        }
1111    }
1112
1113    fn check_delegation(
1114        &self,
1115        delegation: &Option<Delegation>,
1116        effective_canister_id: Principal,
1117    ) -> Result<Vec<u8>, AgentError> {
1118        match delegation {
1119            None => Ok(self.read_root_key()),
1120            Some(delegation) => {
1121                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1122                    .map_err(AgentError::InvalidCborData)?;
1123                if cert.delegation.is_some() {
1124                    return Err(AgentError::CertificateHasTooManyDelegations);
1125                }
1126                self.verify_cert(&cert, effective_canister_id)?;
1127                let canister_range_shards_lookup =
1128                    ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
1129                let ranges: Vec<(Principal, Principal)> =
1130                    match lookup_tree(&cert.tree, canister_range_shards_lookup) {
1131                        Ok(canister_range_shards) => {
1132                            let mut shard_paths = canister_range_shards
1133                                .list_paths() // /canister_ranges/<subnet_id>/<shard>
1134                                .into_iter()
1135                                .map(|mut x| {
1136                                    x.pop() // flatten [label] to label
1137                                        .ok_or_else(AgentError::CertificateVerificationFailed)
1138                                })
1139                                .collect::<Result<Vec<_>, _>>()?;
1140                            if shard_paths.is_empty() {
1141                                return Err(AgentError::CertificateNotAuthorized());
1142                            }
1143                            shard_paths.sort_unstable();
1144                            let shard_division = shard_paths.partition_point(|shard| {
1145                                shard.as_bytes() <= effective_canister_id.as_slice()
1146                            });
1147                            if shard_division == 0 {
1148                                // the certificate is not authorized to answer calls for this canister
1149                                return Err(AgentError::CertificateNotAuthorized());
1150                            }
1151                            let max_potential_shard = &shard_paths[shard_division - 1];
1152                            let canister_range = lookup_value(
1153                                &canister_range_shards,
1154                                [max_potential_shard.as_bytes()],
1155                            )?;
1156                            serde_cbor::from_slice(canister_range)
1157                                .map_err(AgentError::InvalidCborData)?
1158                        }
1159                        // Fall back to /subnet/<subnet_id>/canister_ranges (non-sharded)
1160                        Err(AgentError::LookupPathAbsent(_) | AgentError::LookupPathUnknown(_)) => {
1161                            let subnet_ranges_path = [
1162                                "subnet".as_bytes(),
1163                                delegation.subnet_id.as_ref(),
1164                                "canister_ranges".as_bytes(),
1165                            ];
1166                            let canister_range = lookup_value(&cert.tree, subnet_ranges_path)?;
1167                            serde_cbor::from_slice(canister_range)
1168                                .map_err(AgentError::InvalidCborData)?
1169                        }
1170                        Err(e) => return Err(e),
1171                    };
1172                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1173                    // the certificate is not authorized to answer calls for this canister
1174                    return Err(AgentError::CertificateNotAuthorized());
1175                }
1176
1177                let public_key_path = [
1178                    "subnet".as_bytes(),
1179                    delegation.subnet_id.as_ref(),
1180                    "public_key".as_bytes(),
1181                ];
1182                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1183            }
1184        }
1185    }
1186
1187    fn check_delegation_for_subnet(
1188        &self,
1189        delegation: &Option<Delegation>,
1190        subnet_id: Principal,
1191    ) -> Result<Vec<u8>, AgentError> {
1192        match delegation {
1193            None => Ok(self.read_root_key()),
1194            Some(delegation) => {
1195                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1196                    .map_err(AgentError::InvalidCborData)?;
1197                if cert.delegation.is_some() {
1198                    return Err(AgentError::CertificateHasTooManyDelegations);
1199                }
1200                self.verify_cert_for_subnet(&cert, subnet_id)?;
1201                let public_key_path = [
1202                    "subnet".as_bytes(),
1203                    subnet_id.as_ref(),
1204                    "public_key".as_bytes(),
1205                ];
1206                let pk = lookup_value(&cert.tree, public_key_path)
1207                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1208                    .to_vec();
1209                Ok(pk)
1210            }
1211        }
1212    }
1213
1214    /// Request information about a particular canister for a single state subkey.
1215    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1216    pub async fn read_state_canister_info(
1217        &self,
1218        canister_id: Principal,
1219        path: &str,
1220    ) -> Result<Vec<u8>, AgentError> {
1221        let paths: Vec<Vec<Label>> = vec![vec![
1222            "canister".into(),
1223            Label::from_bytes(canister_id.as_slice()),
1224            path.into(),
1225        ]];
1226
1227        let cert = self.read_state_raw(paths, canister_id).await?;
1228
1229        lookup_canister_info(cert, canister_id, path)
1230    }
1231
1232    /// Request the controller list of a given canister.
1233    pub async fn read_state_canister_controllers(
1234        &self,
1235        canister_id: Principal,
1236    ) -> Result<Vec<Principal>, AgentError> {
1237        let blob = self
1238            .read_state_canister_info(canister_id, "controllers")
1239            .await?;
1240        let controllers: Vec<Principal> =
1241            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1242        Ok(controllers)
1243    }
1244
1245    /// Request the module hash of a given canister.
1246    pub async fn read_state_canister_module_hash(
1247        &self,
1248        canister_id: Principal,
1249    ) -> Result<Vec<u8>, AgentError> {
1250        self.read_state_canister_info(canister_id, "module_hash")
1251            .await
1252    }
1253
1254    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1255    pub async fn read_state_canister_metadata(
1256        &self,
1257        canister_id: Principal,
1258        path: &str,
1259    ) -> Result<Vec<u8>, AgentError> {
1260        let paths: Vec<Vec<Label>> = vec![vec![
1261            "canister".into(),
1262            Label::from_bytes(canister_id.as_slice()),
1263            "metadata".into(),
1264            path.into(),
1265        ]];
1266
1267        let cert = self.read_state_raw(paths, canister_id).await?;
1268
1269        lookup_canister_metadata(cert, canister_id, path)
1270    }
1271
1272    /// Request a list of metrics about the subnet.
1273    pub async fn read_state_subnet_metrics(
1274        &self,
1275        subnet_id: Principal,
1276    ) -> Result<SubnetMetrics, AgentError> {
1277        let paths = vec![vec![
1278            "subnet".into(),
1279            Label::from_bytes(subnet_id.as_slice()),
1280            "metrics".into(),
1281        ]];
1282        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1283        lookup_subnet_metrics(cert, subnet_id)
1284    }
1285
1286    /// Request a list of metrics about the subnet.
1287    pub async fn read_state_subnet_canister_ranges(
1288        &self,
1289        subnet_id: Principal,
1290    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1291        let paths = vec![vec![
1292            "subnet".into(),
1293            Label::from_bytes(subnet_id.as_slice()),
1294            "canister_ranges".into(),
1295        ]];
1296        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1297        lookup_subnet_canister_ranges(&cert, subnet_id)
1298    }
1299
1300    /// Fetches the status of a particular request by its ID.
1301    ///
1302    /// `effective_id` may be either an effective canister id or an effective
1303    /// subnet id; in the latter case the request is read from the subnet-scoped
1304    /// `/api/v3/subnet/<id>/read_state` endpoint.
1305    pub async fn request_status_raw(
1306        &self,
1307        request_id: &RequestId,
1308        effective_id: impl Into<EffectiveId>,
1309    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1310        let paths: Vec<Vec<Label>> =
1311            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1312
1313        let cert = self.read_state_raw(paths, effective_id).await?;
1314
1315        Ok((lookup_request_status(&cert, request_id)?, cert))
1316    }
1317
1318    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1319    /// The bytes will be checked to verify that it is a valid `request_status`.
1320    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1321    ///
1322    /// `effective_id` may be either an effective canister id or an effective
1323    /// subnet id; in the latter case the request is read from the subnet-scoped
1324    /// `/api/v3/subnet/<id>/read_state` endpoint.
1325    pub async fn request_status_signed(
1326        &self,
1327        request_id: &RequestId,
1328        effective_id: impl Into<EffectiveId>,
1329        signed_request_status: Vec<u8>,
1330    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1331        let effective_id = effective_id.into();
1332        let _envelope: Envelope =
1333            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1334        let read_state_response: ReadStateResponse = self
1335            .read_state_endpoint_for(effective_id, signed_request_status)
1336            .await?;
1337
1338        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1339            .map_err(AgentError::InvalidCborData)?;
1340        self.verify(&cert, effective_id)?;
1341        Ok((lookup_request_status(&cert, request_id)?, cert))
1342    }
1343
1344    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1345    /// passing all arguments.
1346    pub fn update<S: Into<String>>(
1347        &self,
1348        canister_id: &Principal,
1349        method_name: S,
1350    ) -> UpdateBuilder<'_> {
1351        UpdateBuilder::new(self, *canister_id, method_name.into())
1352    }
1353
1354    /// Calls and returns the information returned by the status endpoint of a replica.
1355    pub async fn status(&self) -> Result<Status, AgentError> {
1356        let endpoint = "api/v2/status";
1357        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1358
1359        let cbor: serde_cbor::Value =
1360            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1361
1362        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1363    }
1364
1365    /// Returns a `QueryBuilder` enabling the construction of a query call without
1366    /// passing all arguments.
1367    pub fn query<S: Into<String>>(
1368        &self,
1369        canister_id: &Principal,
1370        method_name: S,
1371    ) -> QueryBuilder<'_> {
1372        QueryBuilder::new(self, *canister_id, method_name.into())
1373    }
1374
1375    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1376    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding.
1377    ///
1378    /// `effective_id` may be either an effective canister ID or an effective subnet ID,
1379    /// depending on which endpoint the caller will submit the signed message to.
1380    pub fn sign_request_status(
1381        &self,
1382        effective_id: impl Into<EffectiveId>,
1383        request_id: RequestId,
1384    ) -> Result<SignedRequestStatus, AgentError> {
1385        let effective_id = effective_id.into();
1386        let paths: Vec<Vec<Label>> =
1387            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1388        let read_state_content = self.read_state_content(paths)?;
1389        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1390        let ingress_expiry = read_state_content.ingress_expiry();
1391        let sender = *read_state_content.sender();
1392        Ok(SignedRequestStatus {
1393            ingress_expiry,
1394            sender,
1395            effective_canister_id: effective_id.as_principal(),
1396            request_id,
1397            signed_request_status,
1398        })
1399    }
1400
1401    /// Retrieve subnet information for a canister. This uses an internal five-minute cache, fresh data can
1402    /// be fetched with [`fetch_subnet_by_canister`](Self::fetch_subnet_by_canister).
1403    pub async fn get_subnet_by_canister(
1404        &self,
1405        canister: &Principal,
1406    ) -> Result<Arc<Subnet>, AgentError> {
1407        let subnet = self
1408            .subnet_key_cache
1409            .lock()
1410            .unwrap()
1411            .get_subnet_by_canister(canister);
1412        if let Some(subnet) = subnet {
1413            Ok(subnet)
1414        } else {
1415            self.fetch_subnet_by_canister(canister).await
1416        }
1417    }
1418
1419    /// Retrieve subnet information for a subnet ID. This uses an internal five-minute cache, fresh data can
1420    /// be fetched with [`fetch_subnet_by_id`](Self::fetch_subnet_by_id).
1421    pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1422        let subnet = self
1423            .subnet_key_cache
1424            .lock()
1425            .unwrap()
1426            .get_subnet_by_id(subnet_id);
1427        if let Some(subnet) = subnet {
1428            Ok(subnet)
1429        } else {
1430            self.fetch_subnet_by_id(subnet_id).await
1431        }
1432    }
1433
1434    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/canister/<effective_canister_id>/read_state`
1435    pub async fn fetch_api_boundary_nodes_by_canister_id(
1436        &self,
1437        canister_id: Principal,
1438    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1439        let paths = vec![vec!["api_boundary_nodes".into()]];
1440        let certificate = self.read_state_raw(paths, canister_id).await?;
1441        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1442        Ok(api_boundary_nodes)
1443    }
1444
1445    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/subnet/<subnet_id>/read_state`
1446    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1447        &self,
1448        subnet_id: Principal,
1449    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1450        let paths = vec![vec!["api_boundary_nodes".into()]];
1451        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1452        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1453        Ok(api_boundary_nodes)
1454    }
1455
1456    /// Fetches and caches the subnet information for a canister.
1457    ///
1458    /// This function does not read from the cache; most users want
1459    /// [`get_subnet_by_canister`](Self::get_subnet_by_canister) instead.
1460    pub async fn fetch_subnet_by_canister(
1461        &self,
1462        canister: &Principal,
1463    ) -> Result<Arc<Subnet>, AgentError> {
1464        let canister_cert = self
1465            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1466            .await?;
1467        let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1468            Principal::from_slice(&delegation.subnet_id)
1469        } else {
1470            // if no delegation, it comes from the root subnet
1471            Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1472        };
1473        let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1474        let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1475            // non-root subnets will not serve /subnet/<>/canister_ranges when looked up by canister, but their delegation will contain /canister_ranges
1476            let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1477            lookup_canister_ranges(&subnet_id, &delegation_cert)?
1478        } else {
1479            lookup_canister_ranges(&subnet_id, &canister_cert)?
1480        };
1481        subnet.canister_ranges = canister_ranges;
1482        if !subnet.canister_ranges.contains(canister) {
1483            return Err(AgentError::CertificateNotAuthorized());
1484        }
1485        let subnet = Arc::new(subnet);
1486        self.subnet_key_cache
1487            .lock()
1488            .unwrap()
1489            .insert_subnet(subnet_id, subnet.clone());
1490        Ok(subnet)
1491    }
1492
1493    /// Fetches and caches the subnet information for a subnet ID.
1494    ///
1495    /// This function does not read from the cache; most users want
1496    /// [`get_subnet_by_id`](Self::get_subnet_by_id) instead.
1497    pub async fn fetch_subnet_by_id(
1498        &self,
1499        subnet_id: &Principal,
1500    ) -> Result<Arc<Subnet>, AgentError> {
1501        let subnet_cert = self
1502            .read_subnet_state_raw(
1503                vec![
1504                    vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1505                    vec!["subnet".into(), subnet_id.as_slice().into()],
1506                ],
1507                *subnet_id,
1508            )
1509            .await?;
1510        let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1511        let subnet = Arc::new(subnet);
1512        self.subnet_key_cache
1513            .lock()
1514            .unwrap()
1515            .insert_subnet(*subnet_id, subnet.clone());
1516        Ok(subnet)
1517    }
1518
1519    async fn request(
1520        &self,
1521        method: Method,
1522        endpoint: &str,
1523        body: Option<Vec<u8>>,
1524    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1525        let body = body.map(Bytes::from);
1526
1527        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1528            let url = self.route_provider.route()?.join(endpoint)?;
1529            let uri = Uri::from_str(url.as_str())
1530                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1531            let body = body.clone().unwrap_or_default();
1532            let request = http::Request::builder()
1533                .method(method.clone())
1534                .uri(uri)
1535                .header(CONTENT_TYPE, "application/cbor")
1536                .body(body)
1537                .map_err(|e| {
1538                    AgentError::TransportError(TransportError::Generic(format!(
1539                        "unable to create request: {e:#}"
1540                    )))
1541                })?;
1542
1543            Ok(request)
1544        };
1545
1546        let response = self
1547            .client
1548            .call(
1549                &create_request_with_generated_url,
1550                self.max_tcp_error_retries,
1551                self.max_response_body_size,
1552            )
1553            .await?;
1554
1555        let (parts, body) = response.into_parts();
1556
1557        Ok((parts.status, parts.headers, body.to_vec()))
1558    }
1559
1560    async fn execute(
1561        &self,
1562        method: Method,
1563        endpoint: &str,
1564        body: Option<Vec<u8>>,
1565    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1566        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1567
1568        let status = request_result.0;
1569        let headers = request_result.1;
1570        let body = request_result.2;
1571
1572        if status.is_client_error() || status.is_server_error() {
1573            Err(AgentError::HttpError(HttpErrorPayload {
1574                status: status.into(),
1575                content_type: headers
1576                    .get(CONTENT_TYPE)
1577                    .and_then(|value| value.to_str().ok())
1578                    .map(str::to_string),
1579                content: body,
1580            }))
1581        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1582            Err(AgentError::InvalidHttpResponse(format!(
1583                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1584            )))
1585        } else {
1586            Ok((status, body))
1587        }
1588    }
1589}
1590
1591// Checks if a principal is contained within a list of principal ranges
1592// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1593fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1594    ranges
1595        .iter()
1596        .any(|r| principal >= &r.0 && principal <= &r.1)
1597}
1598
1599fn sign_envelope(
1600    content: &EnvelopeContent,
1601    identity: Arc<dyn Identity>,
1602) -> Result<Vec<u8>, AgentError> {
1603    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1604
1605    let envelope = Envelope {
1606        content: Cow::Borrowed(content),
1607        sender_pubkey: signature.public_key,
1608        sender_sig: signature.signature,
1609        sender_delegation: signature.delegations,
1610    };
1611
1612    let mut serialized_bytes = Vec::new();
1613    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1614    serializer.self_describe()?;
1615    envelope.serialize(&mut serializer)?;
1616
1617    Ok(serialized_bytes)
1618}
1619
1620/// Inspect the bytes to be sent as a query
1621/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1622pub fn signed_query_inspect(
1623    sender: Principal,
1624    canister_id: Principal,
1625    method_name: &str,
1626    arg: &[u8],
1627    ingress_expiry: u64,
1628    signed_query: Vec<u8>,
1629) -> Result<(), AgentError> {
1630    let envelope: Envelope =
1631        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1632    match envelope.content.as_ref() {
1633        EnvelopeContent::Query {
1634            ingress_expiry: ingress_expiry_cbor,
1635            sender: sender_cbor,
1636            canister_id: canister_id_cbor,
1637            method_name: method_name_cbor,
1638            arg: arg_cbor,
1639            nonce: _nonce,
1640            sender_info: _,
1641        } => {
1642            if ingress_expiry != *ingress_expiry_cbor {
1643                return Err(AgentError::CallDataMismatch {
1644                    field: "ingress_expiry".to_string(),
1645                    value_arg: ingress_expiry.to_string(),
1646                    value_cbor: ingress_expiry_cbor.to_string(),
1647                });
1648            }
1649            if sender != *sender_cbor {
1650                return Err(AgentError::CallDataMismatch {
1651                    field: "sender".to_string(),
1652                    value_arg: sender.to_string(),
1653                    value_cbor: sender_cbor.to_string(),
1654                });
1655            }
1656            if canister_id != *canister_id_cbor {
1657                return Err(AgentError::CallDataMismatch {
1658                    field: "canister_id".to_string(),
1659                    value_arg: canister_id.to_string(),
1660                    value_cbor: canister_id_cbor.to_string(),
1661                });
1662            }
1663            if method_name != *method_name_cbor {
1664                return Err(AgentError::CallDataMismatch {
1665                    field: "method_name".to_string(),
1666                    value_arg: method_name.to_string(),
1667                    value_cbor: method_name_cbor.clone(),
1668                });
1669            }
1670            if arg != *arg_cbor {
1671                return Err(AgentError::CallDataMismatch {
1672                    field: "arg".to_string(),
1673                    value_arg: format!("{arg:?}"),
1674                    value_cbor: format!("{arg_cbor:?}"),
1675                });
1676            }
1677        }
1678        EnvelopeContent::Call { .. } => {
1679            return Err(AgentError::CallDataMismatch {
1680                field: "request_type".to_string(),
1681                value_arg: "query".to_string(),
1682                value_cbor: "call".to_string(),
1683            })
1684        }
1685        EnvelopeContent::ReadState { .. } => {
1686            return Err(AgentError::CallDataMismatch {
1687                field: "request_type".to_string(),
1688                value_arg: "query".to_string(),
1689                value_cbor: "read_state".to_string(),
1690            })
1691        }
1692    }
1693    Ok(())
1694}
1695
1696/// Inspect the bytes to be sent as an update
1697/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1698pub fn signed_update_inspect(
1699    sender: Principal,
1700    canister_id: Principal,
1701    method_name: &str,
1702    arg: &[u8],
1703    ingress_expiry: u64,
1704    signed_update: Vec<u8>,
1705) -> Result<(), AgentError> {
1706    let envelope: Envelope =
1707        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1708    match envelope.content.as_ref() {
1709        EnvelopeContent::Call {
1710            nonce: _nonce,
1711            ingress_expiry: ingress_expiry_cbor,
1712            sender: sender_cbor,
1713            canister_id: canister_id_cbor,
1714            method_name: method_name_cbor,
1715            arg: arg_cbor,
1716            sender_info: _,
1717        } => {
1718            if ingress_expiry != *ingress_expiry_cbor {
1719                return Err(AgentError::CallDataMismatch {
1720                    field: "ingress_expiry".to_string(),
1721                    value_arg: ingress_expiry.to_string(),
1722                    value_cbor: ingress_expiry_cbor.to_string(),
1723                });
1724            }
1725            if sender != *sender_cbor {
1726                return Err(AgentError::CallDataMismatch {
1727                    field: "sender".to_string(),
1728                    value_arg: sender.to_string(),
1729                    value_cbor: sender_cbor.to_string(),
1730                });
1731            }
1732            if canister_id != *canister_id_cbor {
1733                return Err(AgentError::CallDataMismatch {
1734                    field: "canister_id".to_string(),
1735                    value_arg: canister_id.to_string(),
1736                    value_cbor: canister_id_cbor.to_string(),
1737                });
1738            }
1739            if method_name != *method_name_cbor {
1740                return Err(AgentError::CallDataMismatch {
1741                    field: "method_name".to_string(),
1742                    value_arg: method_name.to_string(),
1743                    value_cbor: method_name_cbor.clone(),
1744                });
1745            }
1746            if arg != *arg_cbor {
1747                return Err(AgentError::CallDataMismatch {
1748                    field: "arg".to_string(),
1749                    value_arg: format!("{arg:?}"),
1750                    value_cbor: format!("{arg_cbor:?}"),
1751                });
1752            }
1753        }
1754        EnvelopeContent::ReadState { .. } => {
1755            return Err(AgentError::CallDataMismatch {
1756                field: "request_type".to_string(),
1757                value_arg: "call".to_string(),
1758                value_cbor: "read_state".to_string(),
1759            })
1760        }
1761        EnvelopeContent::Query { .. } => {
1762            return Err(AgentError::CallDataMismatch {
1763                field: "request_type".to_string(),
1764                value_arg: "call".to_string(),
1765                value_cbor: "query".to_string(),
1766            })
1767        }
1768    }
1769    Ok(())
1770}
1771
1772/// Inspect the bytes to be sent as a `request_status`
1773/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1774pub fn signed_request_status_inspect(
1775    sender: Principal,
1776    request_id: &RequestId,
1777    ingress_expiry: u64,
1778    signed_request_status: Vec<u8>,
1779) -> Result<(), AgentError> {
1780    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1781    let envelope: Envelope =
1782        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1783    match envelope.content.as_ref() {
1784        EnvelopeContent::ReadState {
1785            ingress_expiry: ingress_expiry_cbor,
1786            sender: sender_cbor,
1787            paths: paths_cbor,
1788        } => {
1789            if ingress_expiry != *ingress_expiry_cbor {
1790                return Err(AgentError::CallDataMismatch {
1791                    field: "ingress_expiry".to_string(),
1792                    value_arg: ingress_expiry.to_string(),
1793                    value_cbor: ingress_expiry_cbor.to_string(),
1794                });
1795            }
1796            if sender != *sender_cbor {
1797                return Err(AgentError::CallDataMismatch {
1798                    field: "sender".to_string(),
1799                    value_arg: sender.to_string(),
1800                    value_cbor: sender_cbor.to_string(),
1801                });
1802            }
1803
1804            if paths != *paths_cbor {
1805                return Err(AgentError::CallDataMismatch {
1806                    field: "paths".to_string(),
1807                    value_arg: format!("{paths:?}"),
1808                    value_cbor: format!("{paths_cbor:?}"),
1809                });
1810            }
1811        }
1812        EnvelopeContent::Query { .. } => {
1813            return Err(AgentError::CallDataMismatch {
1814                field: "request_type".to_string(),
1815                value_arg: "read_state".to_string(),
1816                value_cbor: "query".to_string(),
1817            })
1818        }
1819        EnvelopeContent::Call { .. } => {
1820            return Err(AgentError::CallDataMismatch {
1821                field: "request_type".to_string(),
1822                value_arg: "read_state".to_string(),
1823                value_cbor: "call".to_string(),
1824            })
1825        }
1826    }
1827    Ok(())
1828}
1829
1830#[derive(Clone)]
1831struct SubnetCache {
1832    subnets: TimedCache<Principal, Arc<Subnet>>,
1833    canister_index: RangeInclusiveMap<Principal, Principal>,
1834}
1835
1836impl SubnetCache {
1837    fn new() -> Self {
1838        Self {
1839            subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1840            canister_index: RangeInclusiveMap::new(),
1841        }
1842    }
1843
1844    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1845        self.canister_index
1846            .get(canister)
1847            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1848            .filter(|subnet| subnet.canister_ranges.contains(canister))
1849    }
1850
1851    fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1852        self.subnets.cache_get(subnet_id).cloned()
1853    }
1854
1855    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1856        self.subnets.cache_set(subnet_id, subnet.clone());
1857        for range in subnet.canister_ranges.iter() {
1858            self.canister_index.insert(range.clone(), subnet_id);
1859        }
1860    }
1861}
1862
1863/// API boundary node, which routes /api calls to IC replica nodes.
1864#[derive(Debug, Clone)]
1865pub struct ApiBoundaryNode {
1866    /// Domain name
1867    pub domain: String,
1868    /// IPv6 address in the hexadecimal notation with colons.
1869    pub ipv6_address: String,
1870    /// IPv4 address in the dotted-decimal notation.
1871    pub ipv4_address: Option<String>,
1872}
1873
1874/// A query request builder.
1875///
1876/// This makes it easier to do query calls without actually passing all arguments.
1877#[derive(Debug, Clone)]
1878#[non_exhaustive]
1879pub struct QueryBuilder<'agent> {
1880    agent: &'agent Agent,
1881    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1882    pub effective_canister_id: Principal,
1883    /// The principal ID of the canister being called.
1884    pub canister_id: Principal,
1885    /// The name of the canister method being called.
1886    pub method_name: String,
1887    /// The argument blob to be passed to the method.
1888    pub arg: Vec<u8>,
1889    /// The Unix timestamp that the request will expire at.
1890    pub ingress_expiry_datetime: Option<u64>,
1891    /// Whether to include a nonce with the message.
1892    pub use_nonce: bool,
1893}
1894
1895impl<'agent> QueryBuilder<'agent> {
1896    /// Creates a new query builder with an agent for a particular canister method.
1897    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1898        Self {
1899            agent,
1900            effective_canister_id: canister_id,
1901            canister_id,
1902            method_name,
1903            arg: vec![],
1904            ingress_expiry_datetime: None,
1905            use_nonce: false,
1906        }
1907    }
1908
1909    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1910    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1911        self.effective_canister_id = canister_id;
1912        self
1913    }
1914
1915    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1916    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1917        self.arg = arg.into();
1918        self
1919    }
1920
1921    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1922    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1923        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1924        self
1925    }
1926
1927    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1928    pub fn expire_after(mut self, duration: Duration) -> Self {
1929        self.ingress_expiry_datetime = Some(
1930            OffsetDateTime::now_utc()
1931                .saturating_add(duration.try_into().expect("negative duration"))
1932                .unix_timestamp_nanos() as u64,
1933        );
1934        self
1935    }
1936
1937    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1938    /// and thus may get a (briefly) cached response.
1939    pub fn with_nonce_generation(mut self) -> Self {
1940        self.use_nonce = true;
1941        self
1942    }
1943
1944    /// Make a query call. This will return a byte vector.
1945    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1946        self.agent
1947            .query_raw(
1948                self.canister_id,
1949                self.effective_canister_id,
1950                self.method_name,
1951                self.arg,
1952                self.ingress_expiry_datetime,
1953                self.use_nonce,
1954                None,
1955            )
1956            .await
1957    }
1958
1959    /// Make a query call with signature verification. This will return a byte vector.
1960    ///
1961    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1962    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1963    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1964        self.agent
1965            .query_raw(
1966                self.canister_id,
1967                self.effective_canister_id,
1968                self.method_name,
1969                self.arg,
1970                self.ingress_expiry_datetime,
1971                self.use_nonce,
1972                Some(true),
1973            )
1974            .await
1975    }
1976
1977    /// Make a query call without signature verification. This will return a byte vector.
1978    ///
1979    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1980    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1981    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1982        self.agent
1983            .query_raw(
1984                self.canister_id,
1985                self.effective_canister_id,
1986                self.method_name,
1987                self.arg,
1988                self.ingress_expiry_datetime,
1989                self.use_nonce,
1990                Some(false),
1991            )
1992            .await
1993    }
1994
1995    /// Sign a query call. This will return a [`signed::SignedQuery`]
1996    /// which contains all fields of the query and the signed query in CBOR encoding
1997    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1998        let effective_canister_id = self.effective_canister_id;
1999        let identity = self.agent.identity.clone();
2000        let content = self.into_envelope()?;
2001        let signed_query = sign_envelope(&content, identity)?;
2002        let EnvelopeContent::Query {
2003            ingress_expiry,
2004            sender,
2005            canister_id,
2006            method_name,
2007            arg,
2008            nonce,
2009            sender_info,
2010        } = content
2011        else {
2012            unreachable!()
2013        };
2014        Ok(SignedQuery {
2015            ingress_expiry,
2016            sender,
2017            canister_id,
2018            method_name,
2019            arg,
2020            effective_canister_id,
2021            signed_query,
2022            nonce,
2023            sender_info,
2024        })
2025    }
2026
2027    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
2028    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2029        self.agent.query_content(
2030            self.canister_id,
2031            self.method_name,
2032            self.arg,
2033            self.ingress_expiry_datetime,
2034            self.use_nonce,
2035        )
2036    }
2037}
2038
2039impl<'agent> IntoFuture for QueryBuilder<'agent> {
2040    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2041    type Output = Result<Vec<u8>, AgentError>;
2042    fn into_future(self) -> Self::IntoFuture {
2043        Box::pin(self.call())
2044    }
2045}
2046
2047/// An in-flight canister update call. Useful primarily as a `Future`.
2048pub struct UpdateCall<'agent> {
2049    agent: &'agent Agent,
2050    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
2051    effective_canister_id: Principal,
2052    canister_id: Principal,
2053    method_name: String,
2054}
2055
2056impl fmt::Debug for UpdateCall<'_> {
2057    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2058        f.debug_struct("UpdateCall")
2059            .field("agent", &self.agent)
2060            .field("effective_canister_id", &self.effective_canister_id)
2061            .finish_non_exhaustive()
2062    }
2063}
2064
2065impl Future for UpdateCall<'_> {
2066    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
2067    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2068        self.response_future.as_mut().poll(cx)
2069    }
2070}
2071
2072impl<'a> UpdateCall<'a> {
2073    /// Waits for the update call to be completed, polling if necessary.
2074    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
2075        let response = self.response_future.await?;
2076
2077        match response {
2078            CallResponse::Response(response) => Ok(response),
2079            CallResponse::Poll(request_id) => {
2080                self.agent
2081                    .wait_inner(
2082                        &request_id,
2083                        EffectiveId::Canister(self.effective_canister_id),
2084                        Some(Operation::Call {
2085                            canister: self.canister_id,
2086                            method: self.method_name,
2087                        }),
2088                    )
2089                    .await
2090            }
2091        }
2092    }
2093}
2094/// An update request Builder.
2095///
2096/// This makes it easier to do update calls without actually passing all arguments or specifying
2097/// if you want to wait or not.
2098#[derive(Debug)]
2099pub struct UpdateBuilder<'agent> {
2100    agent: &'agent Agent,
2101    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
2102    pub effective_canister_id: Principal,
2103    /// The principal ID of the canister being called.
2104    pub canister_id: Principal,
2105    /// The name of the canister method being called.
2106    pub method_name: String,
2107    /// The argument blob to be passed to the method.
2108    pub arg: Vec<u8>,
2109    /// The Unix timestamp that the request will expire at.
2110    pub ingress_expiry_datetime: Option<u64>,
2111}
2112
2113impl<'agent> UpdateBuilder<'agent> {
2114    /// Creates a new update builder with an agent for a particular canister method.
2115    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
2116        Self {
2117            agent,
2118            effective_canister_id: canister_id,
2119            canister_id,
2120            method_name,
2121            arg: vec![],
2122            ingress_expiry_datetime: None,
2123        }
2124    }
2125
2126    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
2127    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
2128        self.effective_canister_id = canister_id;
2129        self
2130    }
2131
2132    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
2133    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
2134        self.arg = arg.into();
2135        self
2136    }
2137
2138    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
2139    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
2140        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
2141        self
2142    }
2143
2144    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
2145    pub fn expire_after(mut self, duration: Duration) -> Self {
2146        self.ingress_expiry_datetime = Some(
2147            OffsetDateTime::now_utc()
2148                .saturating_add(duration.try_into().expect("negative duration"))
2149                .unix_timestamp_nanos() as u64,
2150        );
2151        self
2152    }
2153
2154    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
2155    /// the response as a byte vector.
2156    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2157        self.call().and_wait().await.map(|x| x.0)
2158    }
2159
2160    /// Make an update call. This will return a `RequestId`.
2161    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
2162    pub fn call(self) -> UpdateCall<'agent> {
2163        let method_name = self.method_name.clone();
2164        let response_future = async move {
2165            self.agent
2166                .update_raw(
2167                    self.canister_id,
2168                    self.effective_canister_id,
2169                    self.method_name,
2170                    self.arg,
2171                    self.ingress_expiry_datetime,
2172                )
2173                .await
2174        };
2175        UpdateCall {
2176            agent: self.agent,
2177            response_future: Box::pin(response_future),
2178            effective_canister_id: self.effective_canister_id,
2179            canister_id: self.canister_id,
2180            method_name,
2181        }
2182    }
2183
2184    /// Sign a update call. This will return a [`signed::SignedUpdate`]
2185    /// which contains all fields of the update and the signed update in CBOR encoding
2186    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2187        let identity = self.agent.identity.clone();
2188        let effective_canister_id = self.effective_canister_id;
2189        let content = self.into_envelope()?;
2190        let signed_update = sign_envelope(&content, identity)?;
2191        let request_id = to_request_id(&content)?;
2192        let EnvelopeContent::Call {
2193            nonce,
2194            ingress_expiry,
2195            sender,
2196            canister_id,
2197            method_name,
2198            arg,
2199            sender_info,
2200        } = content
2201        else {
2202            unreachable!()
2203        };
2204        Ok(SignedUpdate {
2205            nonce,
2206            ingress_expiry,
2207            sender,
2208            canister_id,
2209            method_name,
2210            arg,
2211            effective_canister_id,
2212            signed_update,
2213            request_id,
2214            sender_info,
2215        })
2216    }
2217
2218    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
2219    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2220        let nonce = self.agent.nonce_factory.generate();
2221        self.agent.update_content(
2222            self.canister_id,
2223            self.method_name,
2224            self.arg,
2225            self.ingress_expiry_datetime,
2226            nonce,
2227        )
2228    }
2229}
2230
2231impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2232    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2233    type Output = Result<Vec<u8>, AgentError>;
2234    fn into_future(self) -> Self::IntoFuture {
2235        Box::pin(self.call_and_wait())
2236    }
2237}
2238
2239/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
2240#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2241#[cfg_attr(not(target_family = "wasm"), async_trait)]
2242pub trait HttpService: Send + Sync + Debug {
2243    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
2244    async fn call<'a>(
2245        &'a self,
2246        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2247        max_retries: usize,
2248        size_limit: Option<usize>,
2249    ) -> Result<http::Response<Bytes>, AgentError>;
2250}
2251
2252/// Convert from http Request to reqwest's one
2253fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2254    let (parts, body) = req.into_parts();
2255    let body = reqwest::Body::from(body);
2256    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2257    // but just to be safe let's handle it.
2258    let request = http::Request::from_parts(parts, body)
2259        .try_into()
2260        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2261
2262    Ok(request)
2263}
2264
2265/// Convert from reqwests's Response to http one
2266#[cfg(not(target_family = "wasm"))]
2267async fn to_http_response(
2268    resp: Response,
2269    size_limit: Option<usize>,
2270) -> Result<http::Response<Bytes>, AgentError> {
2271    use http_body_util::{BodyExt, Limited};
2272
2273    let resp: http::Response<reqwest::Body> = resp.into();
2274    let (parts, body) = resp.into_parts();
2275    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2276    let body = body
2277        .collect()
2278        .await
2279        .map_err(|e| {
2280            AgentError::TransportError(TransportError::Generic(format!(
2281                "unable to read response body: {e:#}"
2282            )))
2283        })?
2284        .to_bytes();
2285    let resp = http::Response::from_parts(parts, body);
2286
2287    Ok(resp)
2288}
2289
2290/// Convert from reqwests's Response to http one.
2291/// WASM Response in reqwest doesn't have direct conversion for http::Response,
2292/// so we have to hack around using streams.
2293#[cfg(target_family = "wasm")]
2294async fn to_http_response(
2295    resp: Response,
2296    size_limit: Option<usize>,
2297) -> Result<http::Response<Bytes>, AgentError> {
2298    use futures_util::StreamExt;
2299    use http_body::Frame;
2300    use http_body_util::{Limited, StreamBody};
2301
2302    // Save headers
2303    let status = resp.status();
2304    let headers = resp.headers().clone();
2305
2306    // Convert body
2307    let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2308    let body = StreamBody::new(stream);
2309    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2310    let body = http_body_util::BodyExt::collect(body)
2311        .await
2312        .map_err(|e| {
2313            AgentError::TransportError(TransportError::Generic(format!(
2314                "unable to read response body: {e:#}"
2315            )))
2316        })?
2317        .to_bytes();
2318
2319    let mut resp = http::Response::new(body);
2320    *resp.status_mut() = status;
2321    *resp.headers_mut() = headers;
2322
2323    Ok(resp)
2324}
2325
2326#[cfg(not(target_family = "wasm"))]
2327#[async_trait]
2328impl<T> HttpService for T
2329where
2330    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2331    for<'a> <&'a Self as Service<Request>>::Future: Send,
2332    T: Send + Sync + Debug + ?Sized,
2333{
2334    #[allow(clippy::needless_arbitrary_self_type)]
2335    async fn call<'a>(
2336        mut self: &'a Self,
2337        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2338        max_retries: usize,
2339        size_limit: Option<usize>,
2340    ) -> Result<http::Response<Bytes>, AgentError> {
2341        let mut retry_count = 0;
2342        loop {
2343            let request = from_http_request(req()?)?;
2344
2345            match Service::call(&mut self, request).await {
2346                Err(err) => {
2347                    // Network-related errors can be retried.
2348                    if err.is_connect() {
2349                        if retry_count >= max_retries {
2350                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2351                        }
2352                        retry_count += 1;
2353                    }
2354                    // All other errors return immediately.
2355                    else {
2356                        return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2357                    }
2358                }
2359
2360                Ok(resp) => {
2361                    let resp = to_http_response(resp, size_limit).await?;
2362                    return Ok(resp);
2363                }
2364            }
2365        }
2366    }
2367}
2368
2369#[cfg(target_family = "wasm")]
2370#[async_trait(?Send)]
2371impl<T> HttpService for T
2372where
2373    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2374    T: Send + Sync + Debug + ?Sized,
2375{
2376    #[allow(clippy::needless_arbitrary_self_type)]
2377    async fn call<'a>(
2378        mut self: &'a Self,
2379        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2380        _retries: usize,
2381        _size_limit: Option<usize>,
2382    ) -> Result<http::Response<Bytes>, AgentError> {
2383        let request = from_http_request(req()?)?;
2384        let response = Service::call(&mut self, request)
2385            .await
2386            .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2387
2388        to_http_response(response, _size_limit).await
2389    }
2390}
2391
2392#[derive(Debug)]
2393struct Retry429Logic {
2394    client: Client,
2395}
2396
2397#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2398#[cfg_attr(not(target_family = "wasm"), async_trait)]
2399impl HttpService for Retry429Logic {
2400    async fn call<'a>(
2401        &'a self,
2402        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2403        _max_tcp_retries: usize,
2404        _size_limit: Option<usize>,
2405    ) -> Result<http::Response<Bytes>, AgentError> {
2406        let mut retries = 0;
2407        loop {
2408            #[cfg(not(target_family = "wasm"))]
2409            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2410            // Client inconveniently does not implement Service on wasm
2411            #[cfg(target_family = "wasm")]
2412            let resp = {
2413                let request = from_http_request(req()?)?;
2414                let resp = self
2415                    .client
2416                    .execute(request)
2417                    .await
2418                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2419
2420                to_http_response(resp, _size_limit).await?
2421            };
2422
2423            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2424                if retries == 6 {
2425                    break Ok(resp);
2426                } else {
2427                    retries += 1;
2428                    crate::util::sleep(Duration::from_millis(250)).await;
2429                    continue;
2430                }
2431            } else {
2432                break Ok(resp);
2433            }
2434        }
2435    }
2436}
2437
2438#[cfg(all(test, not(target_family = "wasm")))]
2439mod offline_tests {
2440    use super::*;
2441    use tokio::net::TcpListener;
2442    // Any tests that involve the network should go in agent_test, not here.
2443
2444    #[test]
2445    fn rounded_expiry() {
2446        let agent = Agent::builder()
2447            .with_url("http://not-a-real-url")
2448            .build()
2449            .unwrap();
2450        let mut prev_expiry = None;
2451        let mut num_timestamps = 0;
2452        for _ in 0..6 {
2453            let update = agent
2454                .update(&Principal::management_canister(), "not_a_method")
2455                .sign()
2456                .unwrap();
2457            if prev_expiry < Some(update.ingress_expiry) {
2458                prev_expiry = Some(update.ingress_expiry);
2459                num_timestamps += 1;
2460            }
2461        }
2462        // in six requests, there should be no more than two timestamps
2463        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2464    }
2465
2466    #[tokio::test]
2467    async fn client_ratelimit() {
2468        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2469        let count = Arc::new(Mutex::new(0));
2470        let port = mock_server.local_addr().unwrap().port();
2471        tokio::spawn({
2472            let count = count.clone();
2473            async move {
2474                loop {
2475                    let (mut conn, _) = mock_server.accept().await.unwrap();
2476                    *count.lock().unwrap() += 1;
2477                    tokio::spawn(
2478                        // read all data, never reply
2479                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2480                    );
2481                }
2482            }
2483        });
2484        let agent = Agent::builder()
2485            .with_http_client(Client::builder().http1_only().build().unwrap())
2486            .with_url(format!("http://127.0.0.1:{port}"))
2487            .with_max_concurrent_requests(2)
2488            .build()
2489            .unwrap();
2490        for _ in 0..3 {
2491            let agent = agent.clone();
2492            tokio::spawn(async move {
2493                agent
2494                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2495                    .call()
2496                    .await
2497            });
2498        }
2499        crate::util::sleep(Duration::from_millis(250)).await;
2500        assert_eq!(*count.lock().unwrap(), 2);
2501    }
2502}