Skip to main content

ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28    RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34    dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35    RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45    agent::response_authentication::{
46        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47        lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48        lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    convert::TryFrom,
68    fmt::{self, Debug},
69    future::{Future, IntoFuture},
70    pin::Pin,
71    str::FromStr,
72    sync::{Arc, Mutex, RwLock},
73    task::{Context, Poll},
74    time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
/// Domain separator bytes for the IC state root (per the constant's name): a single
/// length byte (`0x0D` = 13) followed by the 13 ASCII bytes of `ic-state-root`.
// NOTE(review): presumably prepended to the certified state root hash before the
// signature check; the use site is not in this part of the file -- confirm.
pub(crate) const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
/// Hard-coded root public key (133 bytes) that every new `Agent` starts out with.
///
/// `Agent::new` copies it into the agent's `root_key`, and `Agent::fetch_root_key` compares
/// the current key against it to decide whether a key has already been fetched or set.
// NOTE(review): presumably the DER-encoded public key of the main Internet Computer
// network (the bytes start with a DER SEQUENCE header) -- confirm before relying on it.
pub(crate) const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
/// Boxed, pinned future that resolves to `Result<V, AgentError>` and may borrow for `'a`.
///
/// Non-wasm variant: the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future that resolves to `Result<V, AgentError>` and may borrow for `'a`.
///
/// Wasm variant: identical to the non-wasm alias except that no `Send` bound is imposed.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89/// A low level Agent to make calls to a Replica endpoint.
90///
91#[cfg_attr(unix, doc = " ```rust")] // pocket-ic
92#[cfg_attr(not(unix), doc = " ```ignore")]
93/// use ic_agent::{Agent, export::Principal};
94/// use candid::{Encode, Decode, CandidType, Nat};
95/// use serde::Deserialize;
96///
97/// #[derive(CandidType)]
98/// struct Argument {
99///   amount: Option<Nat>,
100/// }
101///
102/// #[derive(CandidType, Deserialize)]
103/// struct CreateCanisterResult {
104///   canister_id: Principal,
105/// }
106///
107/// # fn create_identity() -> impl ic_agent::Identity {
108/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
109/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
110/// # }
111/// #
112/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
113/// # Ok(ref_tests::utils::with_pic(async move |pic| {
114/// # let url = ref_tests::utils::get_pic_url(&pic);
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID, and wait for a result.
128///   // This API only works in local instances; mainnet instances must use the cycles ledger.
129///   // See `dfx info default-effective-canister-id`.
130/// # let effective_canister_id = ref_tests::utils::get_effective_canister_id(&pic).await;
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// # }).await)
140/// }
141///
142/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
143/// # runtime.block_on(async {
144/// let canister_id = create_a_canister().await.unwrap();
145/// eprintln!("{}", canister_id);
146/// # });
147/// ```
148///
149/// This agent does not understand Candid, and only acts on byte buffers.
150///
151/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
152/// you receive from a method has already been verified and you do not need to manually verify it.
#[derive(Clone)]
pub struct Agent {
    /// Source of nonces; `generate()` is called when building query and update contents.
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that supplies the sender principal and is handed to `sign_envelope`.
    identity: Arc<dyn Identity>,
    /// Added to the current time to compute a request's expiry; also the maximum accepted
    /// age of a query response signature.
    ingress_expiry: Duration,
    /// Root key currently in use. Starts as `IC_ROOT_KEY`; replaced through `set_root_key`.
    /// Held behind `Arc<RwLock<..>>`, so clones of the agent share the same key.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service; `Agent::new` takes it from the config or builds a default one.
    client: Arc<dyn HttpService>,
    /// Route provider; `Agent::new` takes it from the config or derives it from the URL.
    route_provider: Arc<dyn RouteProvider>,
    // NOTE(review): presumably caches the subnet data (node keys) looked up through
    // `get_subnet_by_canister`; the cache accessors are not visible here -- confirm.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Semaphore from which each endpoint call acquires a permit before sending its request.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query response signatures are verified; a per-call override
    /// takes precedence in `query_inner`.
    verify_query_signatures: bool,
    // NOTE(review): presumably an upper bound (in bytes) on accepted response bodies; it is
    // not read in the visible part of this file -- confirm.
    max_response_body_size: Option<usize>,
    /// Maximum elapsed time of the polling retry policy (see `get_retry_policy`).
    max_polling_time: Duration,
    // NOTE(review): not read in the visible part of this file, hence the `dead_code`
    // allowance; presumably only consumed on some targets -- confirm.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
169
170impl fmt::Debug for Agent {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172        f.debug_struct("Agent")
173            .field("ingress_expiry", &self.ingress_expiry)
174            .finish_non_exhaustive()
175    }
176}
177
178impl Agent {
179    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
180    /// using the [`AgentConfig`] and [`Agent::new()`].
181    pub fn builder() -> builder::AgentBuilder {
182        Default::default()
183    }
184
185    /// Create an instance of an [`Agent`].
186    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187        let client = config.http_service.unwrap_or_else(|| {
188            Arc::new(Retry429Logic {
189                client: config.client.unwrap_or_else(|| {
190                    #[cfg(not(target_family = "wasm"))]
191                    {
192                        Client::builder()
193                            .use_rustls_tls()
194                            .timeout(Duration::from_secs(360))
195                            .build()
196                            .expect("Could not create HTTP client.")
197                    }
198                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199                    {
200                        Client::new()
201                    }
202                }),
203            })
204        });
205        Ok(Agent {
206            nonce_factory: config.nonce_factory,
207            identity: config.identity,
208            ingress_expiry: config.ingress_expiry,
209            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210            client: client.clone(),
211            route_provider: if let Some(route_provider) = config.route_provider {
212                route_provider
213            } else if let Some(url) = config.url {
214                if config.background_dynamic_routing {
215                    assert!(
216                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218                    );
219                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220                    UrlUntilReady::new(url, async move {
221                        let provider =
222                            DynamicRouteProviderBuilder::new(seeds, client, None).build();
223                        provider.start().await;
224                        provider
225                    }) as Arc<dyn RouteProvider>
226                } else {
227                    Arc::new(url)
228                }
229            } else {
230                panic!("either route_provider or url must be specified");
231            },
232            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233            verify_query_signatures: config.verify_query_signatures,
234            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235            max_response_body_size: config.max_response_body_size,
236            max_tcp_error_retries: config.max_tcp_error_retries,
237            max_polling_time: config.max_polling_time,
238        })
239    }
240
241    /// Set the identity provider for signing messages.
242    ///
243    /// NOTE: if you change the identity while having update calls in
244    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
245    /// messages.
246    pub fn set_identity<I>(&mut self, identity: I)
247    where
248        I: 'static + Identity,
249    {
250        self.identity = Arc::new(identity);
251    }
    /// Set the arc identity provider for signing messages.
    ///
    /// Same as [`Agent::set_identity`], but takes an identity that is already wrapped in an
    /// [`Arc`], so it is stored as-is without a new allocation.
    ///
    /// NOTE: if you change the identity while having update calls in
    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
    /// messages.
    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
        self.identity = identity;
    }
260
261    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
262    /// responses using a hard-coded public key.
263    ///
264    /// This function will instruct the agent to ask the endpoint for its public key, and use
265    /// that instead. This is required when talking to a local test instance, for example.
266    ///
267    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
268    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
269    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271            // already fetched the root key
272            return Ok(());
273        }
274        let status = self.status().await?;
275        let Some(root_key) = status.root_key else {
276            return Err(AgentError::NoRootKeyInStatus(status));
277        };
278        self.set_root_key(root_key);
279        Ok(())
280    }
281
282    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
283    /// responses using a hard-coded public key.
284    ///
285    /// Using this function you can set the root key to a known one if you know if beforehand.
286    pub fn set_root_key(&self, root_key: Vec<u8>) {
287        *self.root_key.write().unwrap() = root_key;
288    }
289
290    /// Return the root key currently in use.
291    pub fn read_root_key(&self) -> Vec<u8> {
292        self.root_key.read().unwrap().clone()
293    }
294
295    fn get_expiry_date(&self) -> u64 {
296        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298        if self.ingress_expiry.as_secs() > 90 {
299            rounded = rounded.replace_second(0).unwrap();
300        }
301        rounded.unix_timestamp_nanos().try_into().unwrap()
302    }
303
    /// Return the principal of the identity.
    ///
    /// Delegates to the configured identity's `sender()`; on failure its error message is
    /// returned unchanged as a `String`.
    pub fn get_principal(&self) -> Result<Principal, String> {
        self.identity.sender()
    }
308
309    async fn query_endpoint<A>(
310        &self,
311        effective_canister_id: Principal,
312        serialized_bytes: Vec<u8>,
313    ) -> Result<A, AgentError>
314    where
315        A: serde::de::DeserializeOwned,
316    {
317        let _permit = self.concurrent_requests_semaphore.acquire().await;
318        let bytes = self
319            .execute(
320                Method::POST,
321                &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
322                Some(serialized_bytes),
323            )
324            .await?
325            .1;
326        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327    }
328
329    async fn read_state_endpoint<A>(
330        &self,
331        effective_canister_id: Principal,
332        serialized_bytes: Vec<u8>,
333    ) -> Result<A, AgentError>
334    where
335        A: serde::de::DeserializeOwned,
336    {
337        let _permit = self.concurrent_requests_semaphore.acquire().await;
338        let endpoint = format!(
339            "api/v3/canister/{}/read_state",
340            effective_canister_id.to_text()
341        );
342        let bytes = self
343            .execute(Method::POST, &endpoint, Some(serialized_bytes))
344            .await?
345            .1;
346        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347    }
348
349    async fn read_subnet_state_endpoint<A>(
350        &self,
351        subnet_id: Principal,
352        serialized_bytes: Vec<u8>,
353    ) -> Result<A, AgentError>
354    where
355        A: serde::de::DeserializeOwned,
356    {
357        let _permit = self.concurrent_requests_semaphore.acquire().await;
358        let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
359        let bytes = self
360            .execute(Method::POST, &endpoint, Some(serialized_bytes))
361            .await?
362            .1;
363        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364    }
365
366    async fn call_endpoint(
367        &self,
368        effective_canister_id: Principal,
369        serialized_bytes: Vec<u8>,
370    ) -> Result<TransportCallResponse, AgentError> {
371        let _permit = self.concurrent_requests_semaphore.acquire().await;
372        let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
373        let (status_code, response_body) = self
374            .execute(Method::POST, &endpoint, Some(serialized_bytes))
375            .await?;
376
377        if status_code == StatusCode::ACCEPTED {
378            return Ok(TransportCallResponse::Accepted);
379        }
380
381        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382    }
383
384    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
385    /// The encoding is left as an exercise to the user.
386    #[allow(clippy::too_many_arguments)]
387    async fn query_raw(
388        &self,
389        canister_id: Principal,
390        effective_canister_id: Principal,
391        method_name: String,
392        arg: Vec<u8>,
393        ingress_expiry_datetime: Option<u64>,
394        use_nonce: bool,
395        explicit_verify_query_signatures: Option<bool>,
396    ) -> Result<Vec<u8>, AgentError> {
397        let operation = Operation::Call {
398            canister: canister_id,
399            method: method_name.clone(),
400        };
401        let content = self.query_content(
402            canister_id,
403            method_name,
404            arg,
405            ingress_expiry_datetime,
406            use_nonce,
407        )?;
408        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409        self.query_inner(
410            effective_canister_id,
411            serialized_bytes,
412            content.to_request_id(),
413            explicit_verify_query_signatures,
414            operation,
415        )
416        .await
417    }
418
419    /// Send the signed query to the network. Will return a byte vector.
420    /// The bytes will be checked if it is a valid query.
421    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
422    pub async fn query_signed(
423        &self,
424        effective_canister_id: Principal,
425        signed_query: Vec<u8>,
426    ) -> Result<Vec<u8>, AgentError> {
427        let envelope: Envelope =
428            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429        let EnvelopeContent::Query {
430            canister_id,
431            method_name,
432            ..
433        } = &*envelope.content
434        else {
435            return Err(AgentError::CallDataMismatch {
436                field: "request_type".to_string(),
437                value_arg: "query".to_string(),
438                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439                    "update"
440                } else {
441                    "read_state"
442                }
443                .to_string(),
444            });
445        };
446        let operation = Operation::Call {
447            canister: *canister_id,
448            method: method_name.clone(),
449        };
450        self.query_inner(
451            effective_canister_id,
452            signed_query,
453            envelope.content.to_request_id(),
454            None,
455            operation,
456        )
457        .await
458    }
459
460    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
461    ///
462    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
463    async fn query_inner(
464        &self,
465        effective_canister_id: Principal,
466        signed_query: Vec<u8>,
467        request_id: RequestId,
468        explicit_verify_query_signatures: Option<bool>,
469        operation: Operation,
470    ) -> Result<Vec<u8>, AgentError> {
471        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472            let (response, mut subnet) = futures_util::try_join!(
473                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474                self.get_subnet_by_canister(&effective_canister_id)
475            )?;
476            if response.signatures().is_empty() {
477                return Err(AgentError::MissingSignature);
478            } else if response.signatures().len() > subnet.node_keys.len() {
479                return Err(AgentError::TooManySignatures {
480                    had: response.signatures().len(),
481                    needed: subnet.node_keys.len(),
482                });
483            }
484            for signature in response.signatures() {
485                if OffsetDateTime::now_utc()
486                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487                    > self.ingress_expiry
488                {
489                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490                }
491                let signable = response.signable(request_id, signature.timestamp);
492                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493                    node_key
494                } else {
495                    subnet = self
496                        .fetch_subnet_by_canister(&effective_canister_id)
497                        .await?;
498                    subnet
499                        .node_keys
500                        .get(&signature.identity)
501                        .ok_or(AgentError::CertificateNotAuthorized())?
502                };
503                if node_key.len() != 44 {
504                    return Err(AgentError::DerKeyLengthMismatch {
505                        expected: 44,
506                        actual: node_key.len(),
507                    });
508                }
509                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510                if node_key[..12] != DER_PREFIX {
511                    return Err(AgentError::DerPrefixMismatch {
512                        expected: DER_PREFIX.to_vec(),
513                        actual: node_key[..12].to_vec(),
514                    });
515                }
516                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517                    .map_err(|_| AgentError::MalformedPublicKey)?;
518
519                match pubkey.verify_signature(&signable, &signature.signature[..]) {
520                    Ok(()) => (),
521                    Err(SignatureError::InvalidSignature) => {
522                        return Err(AgentError::QuerySignatureVerificationFailed)
523                    }
524                    Err(SignatureError::InvalidLength) => {
525                        return Err(AgentError::MalformedSignature)
526                    }
527                    _ => unreachable!(),
528                }
529            }
530            response
531        } else {
532            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533                .await?
534        };
535
536        match response {
537            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539                reject,
540                operation: Some(operation),
541            }),
542        }
543    }
544
545    fn query_content(
546        &self,
547        canister_id: Principal,
548        method_name: String,
549        arg: Vec<u8>,
550        ingress_expiry_datetime: Option<u64>,
551        use_nonce: bool,
552    ) -> Result<EnvelopeContent, AgentError> {
553        Ok(EnvelopeContent::Query {
554            sender: self.identity.sender().map_err(AgentError::SigningError)?,
555            canister_id,
556            method_name,
557            arg,
558            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560            sender_info: self.identity.sender_info(),
561        })
562    }
563
564    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
565    async fn update_raw(
566        &self,
567        canister_id: Principal,
568        effective_canister_id: Principal,
569        method_name: String,
570        arg: Vec<u8>,
571        ingress_expiry_datetime: Option<u64>,
572    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
573        let nonce = self.nonce_factory.generate();
574        let content = self.update_content(
575            canister_id,
576            method_name.clone(),
577            arg,
578            ingress_expiry_datetime,
579            nonce,
580        )?;
581        let operation = Some(Operation::Call {
582            canister: canister_id,
583            method: method_name,
584        });
585        let request_id = to_request_id(&content)?;
586        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
587
588        let response_body = self
589            .call_endpoint(effective_canister_id, serialized_bytes)
590            .await?;
591
592        match response_body {
593            TransportCallResponse::Replied { certificate } => {
594                let certificate =
595                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
596
597                self.verify(&certificate, effective_canister_id)?;
598                let status = lookup_request_status(&certificate, &request_id)?;
599
600                match status {
601                    RequestStatusResponse::Replied(reply) => {
602                        Ok(CallResponse::Response((reply.arg, certificate)))
603                    }
604                    RequestStatusResponse::Rejected(reject_response) => {
605                        Err(AgentError::CertifiedReject {
606                            reject: reject_response,
607                            operation,
608                        })?
609                    }
610                    _ => Ok(CallResponse::Poll(request_id)),
611                }
612            }
613            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
614            TransportCallResponse::NonReplicatedRejection(reject_response) => {
615                Err(AgentError::UncertifiedReject {
616                    reject: reject_response,
617                    operation,
618                })
619            }
620        }
621    }
622
623    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
624    /// The bytes will be checked to verify that it is a valid update.
625    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
626    pub async fn update_signed(
627        &self,
628        effective_canister_id: Principal,
629        signed_update: Vec<u8>,
630    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
631        let envelope: Envelope =
632            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
633        let EnvelopeContent::Call {
634            canister_id,
635            method_name,
636            ..
637        } = &*envelope.content
638        else {
639            return Err(AgentError::CallDataMismatch {
640                field: "request_type".to_string(),
641                value_arg: "update".to_string(),
642                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
643                    "query"
644                } else {
645                    "read_state"
646                }
647                .to_string(),
648            });
649        };
650        let operation = Some(Operation::Call {
651            canister: *canister_id,
652            method: method_name.clone(),
653        });
654        let request_id = to_request_id(&envelope.content)?;
655
656        let response_body = self
657            .call_endpoint(effective_canister_id, signed_update)
658            .await?;
659
660        match response_body {
661            TransportCallResponse::Replied { certificate } => {
662                let certificate =
663                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
664
665                self.verify(&certificate, effective_canister_id)?;
666                let status = lookup_request_status(&certificate, &request_id)?;
667
668                match status {
669                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
670                    RequestStatusResponse::Rejected(reject_response) => {
671                        Err(AgentError::CertifiedReject {
672                            reject: reject_response,
673                            operation,
674                        })?
675                    }
676                    _ => Ok(CallResponse::Poll(request_id)),
677                }
678            }
679            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
680            TransportCallResponse::NonReplicatedRejection(reject_response) => {
681                Err(AgentError::UncertifiedReject {
682                    reject: reject_response,
683                    operation,
684                })
685            }
686        }
687    }
688
689    fn update_content(
690        &self,
691        canister_id: Principal,
692        method_name: String,
693        arg: Vec<u8>,
694        ingress_expiry_datetime: Option<u64>,
695        nonce: Option<Vec<u8>>,
696    ) -> Result<EnvelopeContent, AgentError> {
697        Ok(EnvelopeContent::Call {
698            canister_id,
699            method_name,
700            arg,
701            nonce,
702            sender: self.identity.sender().map_err(AgentError::SigningError)?,
703            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
704            sender_info: self.identity.sender_info(),
705        })
706    }
707
708    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
709        ExponentialBackoffBuilder::new()
710            .with_initial_interval(Duration::from_millis(500))
711            .with_max_interval(Duration::from_secs(1))
712            .with_multiplier(1.4)
713            .with_max_elapsed_time(Some(self.max_polling_time))
714            .build()
715    }
716
717    /// Wait for `request_status` to return a Replied response and return the arg.
718    pub async fn wait_signed(
719        &self,
720        request_id: &RequestId,
721        effective_canister_id: Principal,
722        signed_request_status: Vec<u8>,
723    ) -> Result<(Vec<u8>, Certificate), AgentError> {
724        let mut retry_policy = self.get_retry_policy();
725
726        let mut request_accepted = false;
727        loop {
728            let (resp, cert) = self
729                .request_status_signed(
730                    request_id,
731                    effective_canister_id,
732                    signed_request_status.clone(),
733                )
734                .await?;
735            match resp {
736                RequestStatusResponse::Unknown => {
737                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
738                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
739                        return Err(AgentError::TimeoutWaitingForResponse());
740                    }
741                }
742
743                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
744                    if !request_accepted {
745                        retry_policy.reset();
746                        request_accepted = true;
747                    }
748                }
749
750                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
751                    return Ok((arg, cert))
752                }
753
754                RequestStatusResponse::Rejected(response) => {
755                    return Err(AgentError::CertifiedReject {
756                        reject: response,
757                        operation: None,
758                    })
759                }
760
761                RequestStatusResponse::Done => {
762                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
763                        *request_id,
764                    )))
765                }
766            };
767
768            match retry_policy.next_backoff() {
769                Some(duration) => crate::util::sleep(duration).await,
770
771                None => return Err(AgentError::TimeoutWaitingForResponse()),
772            }
773        }
774    }
775
776    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
777    pub async fn wait(
778        &self,
779        request_id: &RequestId,
780        effective_canister_id: Principal,
781    ) -> Result<(Vec<u8>, Certificate), AgentError> {
782        self.wait_inner(request_id, effective_canister_id, None)
783            .await
784    }
785
786    async fn wait_inner(
787        &self,
788        request_id: &RequestId,
789        effective_canister_id: Principal,
790        operation: Option<Operation>,
791    ) -> Result<(Vec<u8>, Certificate), AgentError> {
792        let mut retry_policy = self.get_retry_policy();
793
794        let mut request_accepted = false;
795        loop {
796            let (resp, cert) = self
797                .request_status_raw(request_id, effective_canister_id)
798                .await?;
799            match resp {
800                RequestStatusResponse::Unknown => {
801                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
802                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
803                        return Err(AgentError::TimeoutWaitingForResponse());
804                    }
805                }
806
807                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
808                    if !request_accepted {
809                        // The system will return RequestStatusResponse::Unknown
810                        // until the request is accepted
811                        // and we generally cannot know how long that will take.
812                        // State transitions between Received and Processing may be
813                        // instantaneous. Therefore, once we know the request is accepted,
814                        // we should restart the backoff so the request does not time out.
815
816                        retry_policy.reset();
817                        request_accepted = true;
818                    }
819                }
820
821                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
822                    return Ok((arg, cert))
823                }
824
825                RequestStatusResponse::Rejected(response) => {
826                    return Err(AgentError::CertifiedReject {
827                        reject: response,
828                        operation,
829                    })
830                }
831
832                RequestStatusResponse::Done => {
833                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
834                        *request_id,
835                    )))
836                }
837            };
838
839            match retry_policy.next_backoff() {
840                Some(duration) => crate::util::sleep(duration).await,
841
842                None => return Err(AgentError::TimeoutWaitingForResponse()),
843            }
844        }
845    }
846
847    /// Request the raw state tree directly, under an effective canister ID.
848    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
849    pub async fn read_state_raw(
850        &self,
851        paths: Vec<Vec<Label>>,
852        effective_canister_id: Principal,
853    ) -> Result<Certificate, AgentError> {
854        let content = self.read_state_content(paths)?;
855        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
856
857        let read_state_response: ReadStateResponse = self
858            .read_state_endpoint(effective_canister_id, serialized_bytes)
859            .await?;
860        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
861            .map_err(AgentError::InvalidCborData)?;
862        self.verify(&cert, effective_canister_id)?;
863        Ok(cert)
864    }
865
866    /// Request the raw state tree directly, under a subnet ID.
867    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
868    pub async fn read_subnet_state_raw(
869        &self,
870        paths: Vec<Vec<Label>>,
871        subnet_id: Principal,
872    ) -> Result<Certificate, AgentError> {
873        let content = self.read_state_content(paths)?;
874        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
875
876        let read_state_response: ReadStateResponse = self
877            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
878            .await?;
879        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
880            .map_err(AgentError::InvalidCborData)?;
881        self.verify_for_subnet(&cert, subnet_id)?;
882        Ok(cert)
883    }
884
885    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
886        Ok(EnvelopeContent::ReadState {
887            sender: self.identity.sender().map_err(AgentError::SigningError)?,
888            paths,
889            ingress_expiry: self.get_expiry_date(),
890        })
891    }
892
893    /// Verify a certificate, checking delegation if present.
894    /// Only passes if the certificate also has authority over the canister.
895    pub fn verify(
896        &self,
897        cert: &Certificate,
898        effective_canister_id: Principal,
899    ) -> Result<(), AgentError> {
900        self.verify_cert(cert, effective_canister_id)?;
901        self.verify_cert_timestamp(cert)?;
902        Ok(())
903    }
904
905    fn verify_cert(
906        &self,
907        cert: &Certificate,
908        effective_canister_id: Principal,
909    ) -> Result<(), AgentError> {
910        let sig = &cert.signature;
911
912        let root_hash = cert.tree.digest();
913        let mut msg = vec![];
914        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
915        msg.extend_from_slice(&root_hash);
916
917        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
918        let key = extract_der(der_key)?;
919
920        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
921            .map_err(|_| AgentError::CertificateVerificationFailed())?;
922        Ok(())
923    }
924
925    /// Verify a certificate, checking delegation if present.
926    /// Only passes if the certificate is for the specified subnet.
927    pub fn verify_for_subnet(
928        &self,
929        cert: &Certificate,
930        subnet_id: Principal,
931    ) -> Result<(), AgentError> {
932        self.verify_cert_for_subnet(cert, subnet_id)?;
933        self.verify_cert_timestamp(cert)?;
934        Ok(())
935    }
936
937    fn verify_cert_for_subnet(
938        &self,
939        cert: &Certificate,
940        subnet_id: Principal,
941    ) -> Result<(), AgentError> {
942        let sig = &cert.signature;
943
944        let root_hash = cert.tree.digest();
945        let mut msg = vec![];
946        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
947        msg.extend_from_slice(&root_hash);
948
949        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
950        let key = extract_der(der_key)?;
951
952        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
953            .map_err(|_| AgentError::CertificateVerificationFailed())?;
954        Ok(())
955    }
956
957    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
958        // Verify that the certificate is not older than ingress expiry
959        // Certificates with timestamps in the future are allowed
960        let time = lookup_time(cert)?;
961        if (OffsetDateTime::now_utc()
962            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
963            > self.ingress_expiry
964        {
965            Err(AgentError::CertificateOutdated(self.ingress_expiry))
966        } else {
967            Ok(())
968        }
969    }
970
    /// Resolve the DER-encoded public key that a certificate's signature must be checked
    /// against, validating the delegation (if any) for `effective_canister_id`.
    ///
    /// * Without a delegation, the agent's root key is returned.
    /// * With a delegation, the delegation certificate is decoded, required to carry no further
    ///   delegation, and verified via `verify_cert`. The canister ranges it certifies for the
    ///   delegated subnet must contain `effective_canister_id`; the subnet's `public_key` entry
    ///   from the delegation certificate is then returned.
    ///
    /// # Errors
    ///
    /// * [`AgentError::InvalidCborData`] if the delegation certificate or the ranges fail to decode.
    /// * [`AgentError::CertificateHasTooManyDelegations`] if the delegation is itself delegated.
    /// * [`AgentError::CertificateNotAuthorized`] if no certified range covers the canister.
    /// * Any error from `verify_cert` or from the tree lookups.
    fn check_delegation(
        &self,
        delegation: &Option<Delegation>,
        effective_canister_id: Principal,
    ) -> Result<Vec<u8>, AgentError> {
        match delegation {
            None => Ok(self.read_root_key()),
            Some(delegation) => {
                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
                    .map_err(AgentError::InvalidCborData)?;
                // Only a single level of delegation is accepted.
                if cert.delegation.is_some() {
                    return Err(AgentError::CertificateHasTooManyDelegations);
                }
                // `cert` has no delegation of its own, so this check resolves to the root key.
                self.verify_cert(&cert, effective_canister_id)?;
                // Preferred location of the ranges: the sharded /canister_ranges/<subnet_id> subtree.
                let canister_range_shards_lookup =
                    ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
                let ranges: Vec<(Principal, Principal)> =
                    match lookup_tree(&cert.tree, canister_range_shards_lookup) {
                        Ok(canister_range_shards) => {
                            let mut shard_paths = canister_range_shards
                                .list_paths() // /canister_ranges/<subnet_id>/<shard>
                                .into_iter()
                                .map(|mut x| {
                                    x.pop() // flatten [label] to label
                                        .ok_or_else(AgentError::CertificateVerificationFailed)
                                })
                                .collect::<Result<Vec<_>, _>>()?;
                            if shard_paths.is_empty() {
                                return Err(AgentError::CertificateNotAuthorized());
                            }
                            // Sort so that `partition_point` can be used: it returns the number
                            // of shard labels that compare <= the canister ID bytes.
                            shard_paths.sort_unstable();
                            let shard_division = shard_paths.partition_point(|shard| {
                                shard.as_bytes() <= effective_canister_id.as_slice()
                            });
                            if shard_division == 0 {
                                // the certificate is not authorized to answer calls for this canister
                                return Err(AgentError::CertificateNotAuthorized());
                            }
                            // Greatest shard label that does not exceed the canister ID.
                            // NOTE(review): presumably each shard label is the lower bound of
                            // the ranges stored in it - confirm against the interface spec.
                            let max_potential_shard = &shard_paths[shard_division - 1];
                            let canister_range = lookup_value(
                                &canister_range_shards,
                                [max_potential_shard.as_bytes()],
                            )?;
                            serde_cbor::from_slice(canister_range)
                                .map_err(AgentError::InvalidCborData)?
                        }
                        // Fall back to /subnet/<subnet_id>/canister_ranges (non-sharded)
                        Err(AgentError::LookupPathAbsent(_) | AgentError::LookupPathUnknown(_)) => {
                            let subnet_ranges_path = [
                                "subnet".as_bytes(),
                                delegation.subnet_id.as_ref(),
                                "canister_ranges".as_bytes(),
                            ];
                            let canister_range = lookup_value(&cert.tree, subnet_ranges_path)?;
                            serde_cbor::from_slice(canister_range)
                                .map_err(AgentError::InvalidCborData)?
                        }
                        // Any other lookup failure is propagated unchanged.
                        Err(e) => return Err(e),
                    };
                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
                    // the certificate is not authorized to answer calls for this canister
                    return Err(AgentError::CertificateNotAuthorized());
                }

                // The delegated subnet's key is what the outer certificate must be signed with.
                let public_key_path = [
                    "subnet".as_bytes(),
                    delegation.subnet_id.as_ref(),
                    "public_key".as_bytes(),
                ];
                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
            }
        }
    }
1044
1045    fn check_delegation_for_subnet(
1046        &self,
1047        delegation: &Option<Delegation>,
1048        subnet_id: Principal,
1049    ) -> Result<Vec<u8>, AgentError> {
1050        match delegation {
1051            None => Ok(self.read_root_key()),
1052            Some(delegation) => {
1053                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1054                    .map_err(AgentError::InvalidCborData)?;
1055                if cert.delegation.is_some() {
1056                    return Err(AgentError::CertificateHasTooManyDelegations);
1057                }
1058                self.verify_cert_for_subnet(&cert, subnet_id)?;
1059                let public_key_path = [
1060                    "subnet".as_bytes(),
1061                    subnet_id.as_ref(),
1062                    "public_key".as_bytes(),
1063                ];
1064                let pk = lookup_value(&cert.tree, public_key_path)
1065                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1066                    .to_vec();
1067                Ok(pk)
1068            }
1069        }
1070    }
1071
1072    /// Request information about a particular canister for a single state subkey.
1073    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1074    pub async fn read_state_canister_info(
1075        &self,
1076        canister_id: Principal,
1077        path: &str,
1078    ) -> Result<Vec<u8>, AgentError> {
1079        let paths: Vec<Vec<Label>> = vec![vec![
1080            "canister".into(),
1081            Label::from_bytes(canister_id.as_slice()),
1082            path.into(),
1083        ]];
1084
1085        let cert = self.read_state_raw(paths, canister_id).await?;
1086
1087        lookup_canister_info(cert, canister_id, path)
1088    }
1089
1090    /// Request the controller list of a given canister.
1091    pub async fn read_state_canister_controllers(
1092        &self,
1093        canister_id: Principal,
1094    ) -> Result<Vec<Principal>, AgentError> {
1095        let blob = self
1096            .read_state_canister_info(canister_id, "controllers")
1097            .await?;
1098        let controllers: Vec<Principal> =
1099            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1100        Ok(controllers)
1101    }
1102
1103    /// Request the module hash of a given canister.
1104    pub async fn read_state_canister_module_hash(
1105        &self,
1106        canister_id: Principal,
1107    ) -> Result<Vec<u8>, AgentError> {
1108        self.read_state_canister_info(canister_id, "module_hash")
1109            .await
1110    }
1111
1112    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1113    pub async fn read_state_canister_metadata(
1114        &self,
1115        canister_id: Principal,
1116        path: &str,
1117    ) -> Result<Vec<u8>, AgentError> {
1118        let paths: Vec<Vec<Label>> = vec![vec![
1119            "canister".into(),
1120            Label::from_bytes(canister_id.as_slice()),
1121            "metadata".into(),
1122            path.into(),
1123        ]];
1124
1125        let cert = self.read_state_raw(paths, canister_id).await?;
1126
1127        lookup_canister_metadata(cert, canister_id, path)
1128    }
1129
1130    /// Request a list of metrics about the subnet.
1131    pub async fn read_state_subnet_metrics(
1132        &self,
1133        subnet_id: Principal,
1134    ) -> Result<SubnetMetrics, AgentError> {
1135        let paths = vec![vec![
1136            "subnet".into(),
1137            Label::from_bytes(subnet_id.as_slice()),
1138            "metrics".into(),
1139        ]];
1140        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1141        lookup_subnet_metrics(cert, subnet_id)
1142    }
1143
1144    /// Request a list of metrics about the subnet.
1145    pub async fn read_state_subnet_canister_ranges(
1146        &self,
1147        subnet_id: Principal,
1148    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1149        let paths = vec![vec![
1150            "subnet".into(),
1151            Label::from_bytes(subnet_id.as_slice()),
1152            "canister_ranges".into(),
1153        ]];
1154        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1155        lookup_subnet_canister_ranges(&cert, subnet_id)
1156    }
1157
1158    /// Fetches the status of a particular request by its ID.
1159    pub async fn request_status_raw(
1160        &self,
1161        request_id: &RequestId,
1162        effective_canister_id: Principal,
1163    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1164        let paths: Vec<Vec<Label>> =
1165            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1166
1167        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1168
1169        Ok((lookup_request_status(&cert, request_id)?, cert))
1170    }
1171
1172    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1173    /// The bytes will be checked to verify that it is a valid `request_status`.
1174    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1175    pub async fn request_status_signed(
1176        &self,
1177        request_id: &RequestId,
1178        effective_canister_id: Principal,
1179        signed_request_status: Vec<u8>,
1180    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1181        let _envelope: Envelope =
1182            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1183        let read_state_response: ReadStateResponse = self
1184            .read_state_endpoint(effective_canister_id, signed_request_status)
1185            .await?;
1186
1187        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1188            .map_err(AgentError::InvalidCborData)?;
1189        self.verify(&cert, effective_canister_id)?;
1190        Ok((lookup_request_status(&cert, request_id)?, cert))
1191    }
1192
1193    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1194    /// passing all arguments.
1195    pub fn update<S: Into<String>>(
1196        &self,
1197        canister_id: &Principal,
1198        method_name: S,
1199    ) -> UpdateBuilder<'_> {
1200        UpdateBuilder::new(self, *canister_id, method_name.into())
1201    }
1202
1203    /// Calls and returns the information returned by the status endpoint of a replica.
1204    pub async fn status(&self) -> Result<Status, AgentError> {
1205        let endpoint = "api/v2/status";
1206        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1207
1208        let cbor: serde_cbor::Value =
1209            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1210
1211        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1212    }
1213
1214    /// Returns a `QueryBuilder` enabling the construction of a query call without
1215    /// passing all arguments.
1216    pub fn query<S: Into<String>>(
1217        &self,
1218        canister_id: &Principal,
1219        method_name: S,
1220    ) -> QueryBuilder<'_> {
1221        QueryBuilder::new(self, *canister_id, method_name.into())
1222    }
1223
1224    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1225    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1226    pub fn sign_request_status(
1227        &self,
1228        effective_canister_id: Principal,
1229        request_id: RequestId,
1230    ) -> Result<SignedRequestStatus, AgentError> {
1231        let paths: Vec<Vec<Label>> =
1232            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1233        let read_state_content = self.read_state_content(paths)?;
1234        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1235        let ingress_expiry = read_state_content.ingress_expiry();
1236        let sender = *read_state_content.sender();
1237        Ok(SignedRequestStatus {
1238            ingress_expiry,
1239            sender,
1240            effective_canister_id,
1241            request_id,
1242            signed_request_status,
1243        })
1244    }
1245
1246    /// Retrieve subnet information for a canister. This uses an internal five-minute cache, fresh data can
1247    /// be fetched with [`fetch_subnet_by_canister`](Self::fetch_subnet_by_canister).
1248    pub async fn get_subnet_by_canister(
1249        &self,
1250        canister: &Principal,
1251    ) -> Result<Arc<Subnet>, AgentError> {
1252        let subnet = self
1253            .subnet_key_cache
1254            .lock()
1255            .unwrap()
1256            .get_subnet_by_canister(canister);
1257        if let Some(subnet) = subnet {
1258            Ok(subnet)
1259        } else {
1260            self.fetch_subnet_by_canister(canister).await
1261        }
1262    }
1263
1264    /// Retrieve subnet information for a subnet ID. This uses an internal five-minute cache, fresh data can
1265    /// be fetched with [`fetch_subnet_by_id`](Self::fetch_subnet_by_id).
1266    pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1267        let subnet = self
1268            .subnet_key_cache
1269            .lock()
1270            .unwrap()
1271            .get_subnet_by_id(subnet_id);
1272        if let Some(subnet) = subnet {
1273            Ok(subnet)
1274        } else {
1275            self.fetch_subnet_by_id(subnet_id).await
1276        }
1277    }
1278
1279    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/canister/<effective_canister_id>/read_state`
1280    pub async fn fetch_api_boundary_nodes_by_canister_id(
1281        &self,
1282        canister_id: Principal,
1283    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1284        let paths = vec![vec!["api_boundary_nodes".into()]];
1285        let certificate = self.read_state_raw(paths, canister_id).await?;
1286        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1287        Ok(api_boundary_nodes)
1288    }
1289
1290    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/subnet/<subnet_id>/read_state`
1291    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1292        &self,
1293        subnet_id: Principal,
1294    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1295        let paths = vec![vec!["api_boundary_nodes".into()]];
1296        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1297        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1298        Ok(api_boundary_nodes)
1299    }
1300
1301    /// Fetches and caches the subnet information for a canister.
1302    ///
1303    /// This function does not read from the cache; most users want
1304    /// [`get_subnet_by_canister`](Self::get_subnet_by_canister) instead.
1305    pub async fn fetch_subnet_by_canister(
1306        &self,
1307        canister: &Principal,
1308    ) -> Result<Arc<Subnet>, AgentError> {
1309        let canister_cert = self
1310            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1311            .await?;
1312        let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1313            Principal::from_slice(&delegation.subnet_id)
1314        } else {
1315            // if no delegation, it comes from the root subnet
1316            Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1317        };
1318        let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1319        let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1320            // non-root subnets will not serve /subnet/<>/canister_ranges when looked up by canister, but their delegation will contain /canister_ranges
1321            let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1322            lookup_canister_ranges(&subnet_id, &delegation_cert)?
1323        } else {
1324            lookup_canister_ranges(&subnet_id, &canister_cert)?
1325        };
1326        subnet.canister_ranges = canister_ranges;
1327        if !subnet.canister_ranges.contains(canister) {
1328            return Err(AgentError::CertificateNotAuthorized());
1329        }
1330        let subnet = Arc::new(subnet);
1331        self.subnet_key_cache
1332            .lock()
1333            .unwrap()
1334            .insert_subnet(subnet_id, subnet.clone());
1335        Ok(subnet)
1336    }
1337
1338    /// Fetches and caches the subnet information for a subnet ID.
1339    ///
1340    /// This function does not read from the cache; most users want
1341    /// [`get_subnet_by_id`](Self::get_subnet_by_id) instead.
1342    pub async fn fetch_subnet_by_id(
1343        &self,
1344        subnet_id: &Principal,
1345    ) -> Result<Arc<Subnet>, AgentError> {
1346        let subnet_cert = self
1347            .read_subnet_state_raw(
1348                vec![
1349                    vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1350                    vec!["subnet".into(), subnet_id.as_slice().into()],
1351                ],
1352                *subnet_id,
1353            )
1354            .await?;
1355        let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1356        let subnet = Arc::new(subnet);
1357        self.subnet_key_cache
1358            .lock()
1359            .unwrap()
1360            .insert_subnet(*subnet_id, subnet.clone());
1361        Ok(subnet)
1362    }
1363
1364    async fn request(
1365        &self,
1366        method: Method,
1367        endpoint: &str,
1368        body: Option<Vec<u8>>,
1369    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1370        let body = body.map(Bytes::from);
1371
1372        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1373            let url = self.route_provider.route()?.join(endpoint)?;
1374            let uri = Uri::from_str(url.as_str())
1375                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1376            let body = body.clone().unwrap_or_default();
1377            let request = http::Request::builder()
1378                .method(method.clone())
1379                .uri(uri)
1380                .header(CONTENT_TYPE, "application/cbor")
1381                .body(body)
1382                .map_err(|e| {
1383                    AgentError::TransportError(TransportError::Generic(format!(
1384                        "unable to create request: {e:#}"
1385                    )))
1386                })?;
1387
1388            Ok(request)
1389        };
1390
1391        let response = self
1392            .client
1393            .call(
1394                &create_request_with_generated_url,
1395                self.max_tcp_error_retries,
1396                self.max_response_body_size,
1397            )
1398            .await?;
1399
1400        let (parts, body) = response.into_parts();
1401
1402        Ok((parts.status, parts.headers, body.to_vec()))
1403    }
1404
1405    async fn execute(
1406        &self,
1407        method: Method,
1408        endpoint: &str,
1409        body: Option<Vec<u8>>,
1410    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1411        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1412
1413        let status = request_result.0;
1414        let headers = request_result.1;
1415        let body = request_result.2;
1416
1417        if status.is_client_error() || status.is_server_error() {
1418            Err(AgentError::HttpError(HttpErrorPayload {
1419                status: status.into(),
1420                content_type: headers
1421                    .get(CONTENT_TYPE)
1422                    .and_then(|value| value.to_str().ok())
1423                    .map(str::to_string),
1424                content: body,
1425            }))
1426        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1427            Err(AgentError::InvalidHttpResponse(format!(
1428                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1429            )))
1430        } else {
1431            Ok((status, body))
1432        }
1433    }
1434}
1435
1436// Checks if a principal is contained within a list of principal ranges
1437// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1438fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1439    ranges
1440        .iter()
1441        .any(|r| principal >= &r.0 && principal <= &r.1)
1442}
1443
1444fn sign_envelope(
1445    content: &EnvelopeContent,
1446    identity: Arc<dyn Identity>,
1447) -> Result<Vec<u8>, AgentError> {
1448    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1449
1450    let envelope = Envelope {
1451        content: Cow::Borrowed(content),
1452        sender_pubkey: signature.public_key,
1453        sender_sig: signature.signature,
1454        sender_delegation: signature.delegations,
1455    };
1456
1457    let mut serialized_bytes = Vec::new();
1458    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1459    serializer.self_describe()?;
1460    envelope.serialize(&mut serializer)?;
1461
1462    Ok(serialized_bytes)
1463}
1464
1465/// Inspect the bytes to be sent as a query
1466/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1467pub fn signed_query_inspect(
1468    sender: Principal,
1469    canister_id: Principal,
1470    method_name: &str,
1471    arg: &[u8],
1472    ingress_expiry: u64,
1473    signed_query: Vec<u8>,
1474) -> Result<(), AgentError> {
1475    let envelope: Envelope =
1476        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1477    match envelope.content.as_ref() {
1478        EnvelopeContent::Query {
1479            ingress_expiry: ingress_expiry_cbor,
1480            sender: sender_cbor,
1481            canister_id: canister_id_cbor,
1482            method_name: method_name_cbor,
1483            arg: arg_cbor,
1484            nonce: _nonce,
1485            sender_info: _,
1486        } => {
1487            if ingress_expiry != *ingress_expiry_cbor {
1488                return Err(AgentError::CallDataMismatch {
1489                    field: "ingress_expiry".to_string(),
1490                    value_arg: ingress_expiry.to_string(),
1491                    value_cbor: ingress_expiry_cbor.to_string(),
1492                });
1493            }
1494            if sender != *sender_cbor {
1495                return Err(AgentError::CallDataMismatch {
1496                    field: "sender".to_string(),
1497                    value_arg: sender.to_string(),
1498                    value_cbor: sender_cbor.to_string(),
1499                });
1500            }
1501            if canister_id != *canister_id_cbor {
1502                return Err(AgentError::CallDataMismatch {
1503                    field: "canister_id".to_string(),
1504                    value_arg: canister_id.to_string(),
1505                    value_cbor: canister_id_cbor.to_string(),
1506                });
1507            }
1508            if method_name != *method_name_cbor {
1509                return Err(AgentError::CallDataMismatch {
1510                    field: "method_name".to_string(),
1511                    value_arg: method_name.to_string(),
1512                    value_cbor: method_name_cbor.clone(),
1513                });
1514            }
1515            if arg != *arg_cbor {
1516                return Err(AgentError::CallDataMismatch {
1517                    field: "arg".to_string(),
1518                    value_arg: format!("{arg:?}"),
1519                    value_cbor: format!("{arg_cbor:?}"),
1520                });
1521            }
1522        }
1523        EnvelopeContent::Call { .. } => {
1524            return Err(AgentError::CallDataMismatch {
1525                field: "request_type".to_string(),
1526                value_arg: "query".to_string(),
1527                value_cbor: "call".to_string(),
1528            })
1529        }
1530        EnvelopeContent::ReadState { .. } => {
1531            return Err(AgentError::CallDataMismatch {
1532                field: "request_type".to_string(),
1533                value_arg: "query".to_string(),
1534                value_cbor: "read_state".to_string(),
1535            })
1536        }
1537    }
1538    Ok(())
1539}
1540
1541/// Inspect the bytes to be sent as an update
1542/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1543pub fn signed_update_inspect(
1544    sender: Principal,
1545    canister_id: Principal,
1546    method_name: &str,
1547    arg: &[u8],
1548    ingress_expiry: u64,
1549    signed_update: Vec<u8>,
1550) -> Result<(), AgentError> {
1551    let envelope: Envelope =
1552        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1553    match envelope.content.as_ref() {
1554        EnvelopeContent::Call {
1555            nonce: _nonce,
1556            ingress_expiry: ingress_expiry_cbor,
1557            sender: sender_cbor,
1558            canister_id: canister_id_cbor,
1559            method_name: method_name_cbor,
1560            arg: arg_cbor,
1561            sender_info: _,
1562        } => {
1563            if ingress_expiry != *ingress_expiry_cbor {
1564                return Err(AgentError::CallDataMismatch {
1565                    field: "ingress_expiry".to_string(),
1566                    value_arg: ingress_expiry.to_string(),
1567                    value_cbor: ingress_expiry_cbor.to_string(),
1568                });
1569            }
1570            if sender != *sender_cbor {
1571                return Err(AgentError::CallDataMismatch {
1572                    field: "sender".to_string(),
1573                    value_arg: sender.to_string(),
1574                    value_cbor: sender_cbor.to_string(),
1575                });
1576            }
1577            if canister_id != *canister_id_cbor {
1578                return Err(AgentError::CallDataMismatch {
1579                    field: "canister_id".to_string(),
1580                    value_arg: canister_id.to_string(),
1581                    value_cbor: canister_id_cbor.to_string(),
1582                });
1583            }
1584            if method_name != *method_name_cbor {
1585                return Err(AgentError::CallDataMismatch {
1586                    field: "method_name".to_string(),
1587                    value_arg: method_name.to_string(),
1588                    value_cbor: method_name_cbor.clone(),
1589                });
1590            }
1591            if arg != *arg_cbor {
1592                return Err(AgentError::CallDataMismatch {
1593                    field: "arg".to_string(),
1594                    value_arg: format!("{arg:?}"),
1595                    value_cbor: format!("{arg_cbor:?}"),
1596                });
1597            }
1598        }
1599        EnvelopeContent::ReadState { .. } => {
1600            return Err(AgentError::CallDataMismatch {
1601                field: "request_type".to_string(),
1602                value_arg: "call".to_string(),
1603                value_cbor: "read_state".to_string(),
1604            })
1605        }
1606        EnvelopeContent::Query { .. } => {
1607            return Err(AgentError::CallDataMismatch {
1608                field: "request_type".to_string(),
1609                value_arg: "call".to_string(),
1610                value_cbor: "query".to_string(),
1611            })
1612        }
1613    }
1614    Ok(())
1615}
1616
1617/// Inspect the bytes to be sent as a `request_status`
1618/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1619pub fn signed_request_status_inspect(
1620    sender: Principal,
1621    request_id: &RequestId,
1622    ingress_expiry: u64,
1623    signed_request_status: Vec<u8>,
1624) -> Result<(), AgentError> {
1625    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1626    let envelope: Envelope =
1627        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1628    match envelope.content.as_ref() {
1629        EnvelopeContent::ReadState {
1630            ingress_expiry: ingress_expiry_cbor,
1631            sender: sender_cbor,
1632            paths: paths_cbor,
1633        } => {
1634            if ingress_expiry != *ingress_expiry_cbor {
1635                return Err(AgentError::CallDataMismatch {
1636                    field: "ingress_expiry".to_string(),
1637                    value_arg: ingress_expiry.to_string(),
1638                    value_cbor: ingress_expiry_cbor.to_string(),
1639                });
1640            }
1641            if sender != *sender_cbor {
1642                return Err(AgentError::CallDataMismatch {
1643                    field: "sender".to_string(),
1644                    value_arg: sender.to_string(),
1645                    value_cbor: sender_cbor.to_string(),
1646                });
1647            }
1648
1649            if paths != *paths_cbor {
1650                return Err(AgentError::CallDataMismatch {
1651                    field: "paths".to_string(),
1652                    value_arg: format!("{paths:?}"),
1653                    value_cbor: format!("{paths_cbor:?}"),
1654                });
1655            }
1656        }
1657        EnvelopeContent::Query { .. } => {
1658            return Err(AgentError::CallDataMismatch {
1659                field: "request_type".to_string(),
1660                value_arg: "read_state".to_string(),
1661                value_cbor: "query".to_string(),
1662            })
1663        }
1664        EnvelopeContent::Call { .. } => {
1665            return Err(AgentError::CallDataMismatch {
1666                field: "request_type".to_string(),
1667                value_arg: "read_state".to_string(),
1668                value_cbor: "call".to_string(),
1669            })
1670        }
1671    }
1672    Ok(())
1673}
1674
1675#[derive(Clone)]
1676struct SubnetCache {
1677    subnets: TimedCache<Principal, Arc<Subnet>>,
1678    canister_index: RangeInclusiveMap<Principal, Principal>,
1679}
1680
1681impl SubnetCache {
1682    fn new() -> Self {
1683        Self {
1684            subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1685            canister_index: RangeInclusiveMap::new(),
1686        }
1687    }
1688
1689    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1690        self.canister_index
1691            .get(canister)
1692            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1693            .filter(|subnet| subnet.canister_ranges.contains(canister))
1694    }
1695
1696    fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1697        self.subnets.cache_get(subnet_id).cloned()
1698    }
1699
1700    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1701        self.subnets.cache_set(subnet_id, subnet.clone());
1702        for range in subnet.canister_ranges.iter() {
1703            self.canister_index.insert(range.clone(), subnet_id);
1704        }
1705    }
1706}
1707
/// API boundary node, which routes /api calls to IC replica nodes.
///
/// This is a plain data record: every address is stored as text.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name
    pub domain: String,
    /// IPv6 address in the hexadecimal notation with colons.
    pub ipv6_address: String,
    /// IPv4 address in the dotted-decimal notation, if the node has one.
    pub ipv4_address: Option<String>,
}
1718
/// A query request builder.
///
/// This makes it easier to do query calls without actually passing all arguments.
/// Create one with [`QueryBuilder::new`], adjust it with the chained setters, then either send it
/// (`call` and its variants, or by awaiting the builder) or `sign` it for later submission.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// The agent used to send or sign the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds. `None` by default.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether to include a nonce with the message. `false` by default.
    pub use_nonce: bool,
}
1739
1740impl<'agent> QueryBuilder<'agent> {
1741    /// Creates a new query builder with an agent for a particular canister method.
1742    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1743        Self {
1744            agent,
1745            effective_canister_id: canister_id,
1746            canister_id,
1747            method_name,
1748            arg: vec![],
1749            ingress_expiry_datetime: None,
1750            use_nonce: false,
1751        }
1752    }
1753
1754    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1755    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1756        self.effective_canister_id = canister_id;
1757        self
1758    }
1759
1760    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1761    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1762        self.arg = arg.into();
1763        self
1764    }
1765
1766    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1767    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1768        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1769        self
1770    }
1771
1772    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1773    pub fn expire_after(mut self, duration: Duration) -> Self {
1774        self.ingress_expiry_datetime = Some(
1775            OffsetDateTime::now_utc()
1776                .saturating_add(duration.try_into().expect("negative duration"))
1777                .unix_timestamp_nanos() as u64,
1778        );
1779        self
1780    }
1781
1782    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1783    /// and thus may get a (briefly) cached response.
1784    pub fn with_nonce_generation(mut self) -> Self {
1785        self.use_nonce = true;
1786        self
1787    }
1788
1789    /// Make a query call. This will return a byte vector.
1790    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1791        self.agent
1792            .query_raw(
1793                self.canister_id,
1794                self.effective_canister_id,
1795                self.method_name,
1796                self.arg,
1797                self.ingress_expiry_datetime,
1798                self.use_nonce,
1799                None,
1800            )
1801            .await
1802    }
1803
1804    /// Make a query call with signature verification. This will return a byte vector.
1805    ///
1806    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1807    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1808    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1809        self.agent
1810            .query_raw(
1811                self.canister_id,
1812                self.effective_canister_id,
1813                self.method_name,
1814                self.arg,
1815                self.ingress_expiry_datetime,
1816                self.use_nonce,
1817                Some(true),
1818            )
1819            .await
1820    }
1821
1822    /// Make a query call without signature verification. This will return a byte vector.
1823    ///
1824    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1825    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1826    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1827        self.agent
1828            .query_raw(
1829                self.canister_id,
1830                self.effective_canister_id,
1831                self.method_name,
1832                self.arg,
1833                self.ingress_expiry_datetime,
1834                self.use_nonce,
1835                Some(false),
1836            )
1837            .await
1838    }
1839
1840    /// Sign a query call. This will return a [`signed::SignedQuery`]
1841    /// which contains all fields of the query and the signed query in CBOR encoding
1842    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1843        let effective_canister_id = self.effective_canister_id;
1844        let identity = self.agent.identity.clone();
1845        let content = self.into_envelope()?;
1846        let signed_query = sign_envelope(&content, identity)?;
1847        let EnvelopeContent::Query {
1848            ingress_expiry,
1849            sender,
1850            canister_id,
1851            method_name,
1852            arg,
1853            nonce,
1854            sender_info,
1855        } = content
1856        else {
1857            unreachable!()
1858        };
1859        Ok(SignedQuery {
1860            ingress_expiry,
1861            sender,
1862            canister_id,
1863            method_name,
1864            arg,
1865            effective_canister_id,
1866            signed_query,
1867            nonce,
1868            sender_info,
1869        })
1870    }
1871
1872    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1873    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1874        self.agent.query_content(
1875            self.canister_id,
1876            self.method_name,
1877            self.arg,
1878            self.ingress_expiry_datetime,
1879            self.use_nonce,
1880        )
1881    }
1882}
1883
1884impl<'agent> IntoFuture for QueryBuilder<'agent> {
1885    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1886    type Output = Result<Vec<u8>, AgentError>;
1887    fn into_future(self) -> Self::IntoFuture {
1888        Box::pin(self.call())
1889    }
1890}
1891
/// An in-flight canister update call. Useful primarily as a `Future`.
///
/// Awaiting it yields the [`CallResponse`] of the submission; use `and_wait` to also poll for the
/// final result when the response is only a request ID.
pub struct UpdateCall<'agent> {
    /// The agent that submitted the call; also used by `and_wait` for polling.
    agent: &'agent Agent,
    /// The pending submission of the update call.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// The effective canister ID, passed along when polling for the result.
    effective_canister_id: Principal,
    /// The canister being called; used to describe the operation when polling.
    canister_id: Principal,
    /// The method being called; used to describe the operation when polling.
    method_name: String,
}
1900
1901impl fmt::Debug for UpdateCall<'_> {
1902    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1903        f.debug_struct("UpdateCall")
1904            .field("agent", &self.agent)
1905            .field("effective_canister_id", &self.effective_canister_id)
1906            .finish_non_exhaustive()
1907    }
1908}
1909
1910impl Future for UpdateCall<'_> {
1911    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1912    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1913        self.response_future.as_mut().poll(cx)
1914    }
1915}
1916
1917impl<'a> UpdateCall<'a> {
1918    /// Waits for the update call to be completed, polling if necessary.
1919    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1920        let response = self.response_future.await?;
1921
1922        match response {
1923            CallResponse::Response(response) => Ok(response),
1924            CallResponse::Poll(request_id) => {
1925                self.agent
1926                    .wait_inner(
1927                        &request_id,
1928                        self.effective_canister_id,
1929                        Some(Operation::Call {
1930                            canister: self.canister_id,
1931                            method: self.method_name,
1932                        }),
1933                    )
1934                    .await
1935            }
1936        }
1937    }
1938}
/// An update request Builder.
///
/// This makes it easier to do update calls without actually passing all arguments or specifying
/// if you want to wait or not.
/// Create one with [`UpdateBuilder::new`], adjust it with the chained setters, then either send it
/// (`call`, `call_and_wait`, or by awaiting the builder) or `sign` it for later submission.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// The agent used to send or sign the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds. `None` by default.
    pub ingress_expiry_datetime: Option<u64>,
}
1957
1958impl<'agent> UpdateBuilder<'agent> {
1959    /// Creates a new update builder with an agent for a particular canister method.
1960    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1961        Self {
1962            agent,
1963            effective_canister_id: canister_id,
1964            canister_id,
1965            method_name,
1966            arg: vec![],
1967            ingress_expiry_datetime: None,
1968        }
1969    }
1970
1971    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1972    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1973        self.effective_canister_id = canister_id;
1974        self
1975    }
1976
1977    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1978    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1979        self.arg = arg.into();
1980        self
1981    }
1982
1983    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1984    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1985        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1986        self
1987    }
1988
1989    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1990    pub fn expire_after(mut self, duration: Duration) -> Self {
1991        self.ingress_expiry_datetime = Some(
1992            OffsetDateTime::now_utc()
1993                .saturating_add(duration.try_into().expect("negative duration"))
1994                .unix_timestamp_nanos() as u64,
1995        );
1996        self
1997    }
1998
1999    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
2000    /// the response as a byte vector.
2001    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2002        self.call().and_wait().await.map(|x| x.0)
2003    }
2004
2005    /// Make an update call. This will return a `RequestId`.
2006    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
2007    pub fn call(self) -> UpdateCall<'agent> {
2008        let method_name = self.method_name.clone();
2009        let response_future = async move {
2010            self.agent
2011                .update_raw(
2012                    self.canister_id,
2013                    self.effective_canister_id,
2014                    self.method_name,
2015                    self.arg,
2016                    self.ingress_expiry_datetime,
2017                )
2018                .await
2019        };
2020        UpdateCall {
2021            agent: self.agent,
2022            response_future: Box::pin(response_future),
2023            effective_canister_id: self.effective_canister_id,
2024            canister_id: self.canister_id,
2025            method_name,
2026        }
2027    }
2028
2029    /// Sign a update call. This will return a [`signed::SignedUpdate`]
2030    /// which contains all fields of the update and the signed update in CBOR encoding
2031    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2032        let identity = self.agent.identity.clone();
2033        let effective_canister_id = self.effective_canister_id;
2034        let content = self.into_envelope()?;
2035        let signed_update = sign_envelope(&content, identity)?;
2036        let request_id = to_request_id(&content)?;
2037        let EnvelopeContent::Call {
2038            nonce,
2039            ingress_expiry,
2040            sender,
2041            canister_id,
2042            method_name,
2043            arg,
2044            sender_info,
2045        } = content
2046        else {
2047            unreachable!()
2048        };
2049        Ok(SignedUpdate {
2050            nonce,
2051            ingress_expiry,
2052            sender,
2053            canister_id,
2054            method_name,
2055            arg,
2056            effective_canister_id,
2057            signed_update,
2058            request_id,
2059            sender_info,
2060        })
2061    }
2062
2063    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
2064    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2065        let nonce = self.agent.nonce_factory.generate();
2066        self.agent.update_content(
2067            self.canister_id,
2068            self.method_name,
2069            self.arg,
2070            self.ingress_expiry_datetime,
2071            nonce,
2072        )
2073    }
2074}
2075
2076impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2077    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2078    type Output = Result<Vec<u8>, AgentError>;
2079    fn into_future(self) -> Self::IntoFuture {
2080        Box::pin(self.call_and_wait())
2081    }
2082}
2083
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
///
/// The `async_trait` expansion drops the `Send` requirement on wasm targets (`?Send`).
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
    ///
    /// * `req` - factory producing the request to send; it may be invoked once per attempt.
    /// * `max_retries` - retry budget; the non-wasm blanket implementation in this module spends
    ///   it on connection errors only, and the wasm one ignores it.
    /// * `size_limit` - optional cap, in bytes, on the response body; the implementations in
    ///   this module fail with a transport error when the body exceeds it.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2096
2097/// Convert from http Request to reqwest's one
2098fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2099    let (parts, body) = req.into_parts();
2100    let body = reqwest::Body::from(body);
2101    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2102    // but just to be safe let's handle it.
2103    let request = http::Request::from_parts(parts, body)
2104        .try_into()
2105        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2106
2107    Ok(request)
2108}
2109
2110/// Convert from reqwests's Response to http one
2111#[cfg(not(target_family = "wasm"))]
2112async fn to_http_response(
2113    resp: Response,
2114    size_limit: Option<usize>,
2115) -> Result<http::Response<Bytes>, AgentError> {
2116    use http_body_util::{BodyExt, Limited};
2117
2118    let resp: http::Response<reqwest::Body> = resp.into();
2119    let (parts, body) = resp.into_parts();
2120    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2121    let body = body
2122        .collect()
2123        .await
2124        .map_err(|e| {
2125            AgentError::TransportError(TransportError::Generic(format!(
2126                "unable to read response body: {e:#}"
2127            )))
2128        })?
2129        .to_bytes();
2130    let resp = http::Response::from_parts(parts, body);
2131
2132    Ok(resp)
2133}
2134
/// Converts a `reqwest` [`Response`] into an `http::Response` with a fully buffered body (wasm).
///
/// On wasm the `reqwest` response has no direct conversion into `http::Response`, so the status
/// and headers are copied by hand and the body is rebuilt from the byte stream. When
/// `size_limit` is set, reading fails once the body grows past that many bytes.
///
/// # Errors
///
/// Returns a generic [`TransportError`] if the body cannot be read or exceeds the limit.
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Capture the metadata first: turning the response into a stream consumes it.
    let status = resp.status();
    let headers = resp.headers().clone();

    // Wrap every chunk in a data frame so the stream can act as an HTTP body.
    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let limited = Limited::new(StreamBody::new(frames), size_limit.unwrap_or(usize::MAX));
    let payload = match http_body_util::BodyExt::collect(limited).await {
        Ok(collected) => collected.to_bytes(),
        Err(e) => {
            return Err(AgentError::TransportError(TransportError::Generic(
                format!("unable to read response body: {e:#}"),
            )))
        }
    };

    let mut converted = http::Response::new(payload);
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2170
2171#[cfg(not(target_family = "wasm"))]
2172#[async_trait]
2173impl<T> HttpService for T
2174where
2175    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2176    for<'a> <&'a Self as Service<Request>>::Future: Send,
2177    T: Send + Sync + Debug + ?Sized,
2178{
2179    #[allow(clippy::needless_arbitrary_self_type)]
2180    async fn call<'a>(
2181        mut self: &'a Self,
2182        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2183        max_retries: usize,
2184        size_limit: Option<usize>,
2185    ) -> Result<http::Response<Bytes>, AgentError> {
2186        let mut retry_count = 0;
2187        loop {
2188            let request = from_http_request(req()?)?;
2189
2190            match Service::call(&mut self, request).await {
2191                Err(err) => {
2192                    // Network-related errors can be retried.
2193                    if err.is_connect() {
2194                        if retry_count >= max_retries {
2195                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2196                        }
2197                        retry_count += 1;
2198                    }
2199                    // All other errors return immediately.
2200                    else {
2201                        return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2202                    }
2203                }
2204
2205                Ok(resp) => {
2206                    let resp = to_http_response(resp, size_limit).await?;
2207                    return Ok(resp);
2208                }
2209            }
2210        }
2211    }
2212}
2213
/// Blanket [`HttpService`] implementation for by-reference `reqwest`-style services on wasm.
///
/// The request is sent exactly once: the retry budget is ignored on this target, while the
/// size limit is still applied when the response body is read.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        _size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;

        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, _size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2236
2237#[derive(Debug)]
2238struct Retry429Logic {
2239    client: Client,
2240}
2241
2242#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2243#[cfg_attr(not(target_family = "wasm"), async_trait)]
2244impl HttpService for Retry429Logic {
2245    async fn call<'a>(
2246        &'a self,
2247        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2248        _max_tcp_retries: usize,
2249        _size_limit: Option<usize>,
2250    ) -> Result<http::Response<Bytes>, AgentError> {
2251        let mut retries = 0;
2252        loop {
2253            #[cfg(not(target_family = "wasm"))]
2254            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2255            // Client inconveniently does not implement Service on wasm
2256            #[cfg(target_family = "wasm")]
2257            let resp = {
2258                let request = from_http_request(req()?)?;
2259                let resp = self
2260                    .client
2261                    .execute(request)
2262                    .await
2263                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2264
2265                to_http_response(resp, _size_limit).await?
2266            };
2267
2268            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2269                if retries == 6 {
2270                    break Ok(resp);
2271                } else {
2272                    retries += 1;
2273                    crate::util::sleep(Duration::from_millis(250)).await;
2274                    continue;
2275                }
2276            } else {
2277                break Ok(resp);
2278            }
2279        }
2280    }
2281}
2282
/// Tests that need no replica: they either never touch the network or only talk to a local
/// mock listener.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Any tests that involve the network should go in agent_test, not here.

    /// Signs six updates back to back and checks that their expiry timestamps take at most two
    /// distinct increasing values.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // `None` compares less than any `Some`, so the first expiry always counts.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        // in six requests, there should be no more than two timestamps
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    /// Checks that an agent limited to two concurrent requests opens only two connections when
    /// three queries are issued against a server that never answers.
    #[tokio::test]
    async fn client_ratelimit() {
        // Bind to port 0 so the OS picks a free port for the mock server.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        // Accept loop: count every incoming connection and keep it open without replying.
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        // read all data, never reply
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Issue one more query than the concurrency limit allows.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned queries time to connect before inspecting the counter.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}