ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    collections::HashMap,
68    convert::TryFrom,
69    fmt::{self, Debug},
70    future::{Future, IntoFuture},
71    pin::Pin,
72    str::FromStr,
73    sync::{Arc, Mutex, RwLock},
74    task::{Context, Poll},
75    time::Duration,
76};
77
78use crate::agent::response_authentication::lookup_api_boundary_nodes;
79
/// The byte `0x0D` (decimal 13, the length of the text that follows) plus ASCII `ic-state-root`.
/// NOTE(review): presumably prepended to a certificate's root hash before the signature is
/// checked; the use site is outside this part of the file, so confirm there.
80const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
81
/// Built-in 133-byte root public key. `Agent::new` installs it as the initial root key, and
/// `Agent::fetch_root_key` treats any other stored value as "root key already fetched".
/// NOTE(review): looks DER-encoded (leading `0x30` tag); confirm against the key parser.
82const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
83
/// Boxed, pinned future resolving to `Result<V, AgentError>`. Outside wasm targets the future
/// is additionally required to be `Send`.
84#[cfg(not(target_family = "wasm"))]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
86
/// Boxed, pinned future resolving to `Result<V, AgentError>`. On wasm targets there is no
/// `Send` bound.
87#[cfg(target_family = "wasm")]
88type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
89
90/// A low level Agent to make calls to a Replica endpoint.
91///
92/// ```ignore
93/// # // This test is ignored because it requires an ic to be running. We run these
94/// # // in the ic-ref workflow.
95/// use ic_agent::{Agent, export::Principal};
96/// use candid::{Encode, Decode, CandidType, Nat};
97/// use serde::Deserialize;
98///
99/// #[derive(CandidType)]
100/// struct Argument {
101///   amount: Option<Nat>,
102/// }
103///
104/// #[derive(CandidType, Deserialize)]
105/// struct CreateCanisterResult {
106///   canister_id: Principal,
107/// }
108///
109/// # fn create_identity() -> impl ic_agent::Identity {
110/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
111/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
112/// # }
113/// #
114/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
115/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
116///   let agent = Agent::builder()
117///     .with_url(url)
118///     .with_identity(create_identity())
119///     .build()?;
120///
121///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
122///   // This is important as the main net public key is static and a rogue network could return
123///   // a different key.
124///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
125///   agent.fetch_root_key().await?;
126///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
127///
128///   // Create a call to the management canister to create a new canister ID,
129///   // and wait for a result.
130///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
131///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
132///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
133///     .with_effective_canister_id(effective_canister_id)
134///     .with_arg(Encode!(&Argument { amount: None })?)
135///     .await?;
136///
137///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
138///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
139///   Ok(canister_id)
140/// }
141///
142/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
143/// # runtime.block_on(async {
144/// let canister_id = create_a_canister().await.unwrap();
145/// eprintln!("{}", canister_id);
146/// # });
147/// ```
148///
149/// This agent does not understand Candid, and only acts on byte buffers.
150///
151/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
152/// you receive from a method has already been verified and you do not need to manually verify it.
// Every shared component sits behind an `Arc`, so the derived `Clone` yields agents that share
// the same root key, HTTP service, route provider, subnet cache and semaphore.
153#[derive(Clone)]
154pub struct Agent {
    /// Source of nonces: always consulted for update calls, and for queries when requested.
155    nonce_factory: Arc<dyn NonceGenerator>,
    /// Supplies the sender principal and is handed to `sign_envelope` to sign requests.
156    identity: Arc<dyn Identity>,
    /// Added to the current time to compute a request's ingress expiry; also the maximum age
    /// accepted for query-signature timestamps.
157    ingress_expiry: Duration,
    /// Root public key in use. Starts as `IC_ROOT_KEY`; replaced through `set_root_key`.
158    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service used to talk to the network (chosen in `Agent::new`).
159    client: Arc<dyn HttpService>,
    /// Route provider chosen in `Agent::new`.
    /// NOTE(review): presumably yields the base URL for each request; confirm in `execute`.
160    route_provider: Arc<dyn RouteProvider>,
    /// NOTE(review): presumably caches subnet data (node keys) per canister for query
    /// signature verification; the accessors are outside this part of the file.
161    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Acquired by every endpoint helper before sending; sized from
    /// `config.max_concurrent_requests`.
162    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified; individual
    /// calls may override it.
163    verify_query_signatures: bool,
    /// NOTE(review): not read in this part of the file; presumably an upper bound on response
    /// body size, confirm where the body is read.
164    max_response_body_size: Option<usize>,
    /// Maximum total elapsed time of the polling retry policy (see `get_retry_policy`).
165    max_polling_time: Duration,
    // NOTE(review): not read in this part of the file, hence the lint allowance; presumably only
    // used on some targets or configurations, confirm.
166    #[allow(dead_code)]
167    max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172        f.debug_struct("Agent")
173            .field("ingress_expiry", &self.ingress_expiry)
174            .finish_non_exhaustive()
175    }
176}
177
178impl Agent {
179    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
180    /// using the [`AgentConfig`] and [`Agent::new()`].
181    pub fn builder() -> builder::AgentBuilder {
182        Default::default()
183    }
184
185    /// Create an instance of an [`Agent`].
186    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187        let client = config.http_service.unwrap_or_else(|| {
188            Arc::new(Retry429Logic {
189                client: config.client.unwrap_or_else(|| {
190                    #[cfg(not(target_family = "wasm"))]
191                    {
192                        Client::builder()
193                            .use_rustls_tls()
194                            .timeout(Duration::from_secs(360))
195                            .build()
196                            .expect("Could not create HTTP client.")
197                    }
198                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199                    {
200                        Client::new()
201                    }
202                }),
203            })
204        });
205        Ok(Agent {
206            nonce_factory: config.nonce_factory,
207            identity: config.identity,
208            ingress_expiry: config.ingress_expiry,
209            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210            client: client.clone(),
211            route_provider: if let Some(route_provider) = config.route_provider {
212                route_provider
213            } else if let Some(url) = config.url {
214                if config.background_dynamic_routing {
215                    assert!(
216                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218                    );
219                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220                    UrlUntilReady::new(url, async move {
221                        DynamicRouteProviderBuilder::new(
222                            LatencyRoutingSnapshot::new(),
223                            seeds,
224                            client,
225                        )
226                        .build()
227                        .await
228                    }) as Arc<dyn RouteProvider>
229                } else {
230                    Arc::new(url)
231                }
232            } else {
233                panic!("either route_provider or url must be specified");
234            },
235            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
236            verify_query_signatures: config.verify_query_signatures,
237            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
238            max_response_body_size: config.max_response_body_size,
239            max_tcp_error_retries: config.max_tcp_error_retries,
240            max_polling_time: config.max_polling_time,
241        })
242    }
243
244    /// Set the identity provider for signing messages.
245    ///
246    /// NOTE: if you change the identity while having update calls in
247    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
248    /// messages.
249    pub fn set_identity<I>(&mut self, identity: I)
250    where
251        I: 'static + Identity,
252    {
253        self.identity = Arc::new(identity);
254    }
255    /// Set the arc identity provider for signing messages.
256    ///
257    /// NOTE: if you change the identity while having update calls in
258    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
259    /// messages.
260    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
261        self.identity = identity;
262    }
263
264    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
265    /// responses using a hard-coded public key.
266    ///
267    /// This function will instruct the agent to ask the endpoint for its public key, and use
268    /// that instead. This is required when talking to a local test instance, for example.
269    ///
270    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
271    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
272    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
273        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
274            // already fetched the root key
275            return Ok(());
276        }
277        let status = self.status().await?;
278        let Some(root_key) = status.root_key else {
279            return Err(AgentError::NoRootKeyInStatus(status));
280        };
281        self.set_root_key(root_key);
282        Ok(())
283    }
284
285    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
286    /// responses using a hard-coded public key.
287    ///
288    /// Using this function you can set the root key to a known one if you know if beforehand.
289    pub fn set_root_key(&self, root_key: Vec<u8>) {
290        *self.root_key.write().unwrap() = root_key;
291    }
292
293    /// Return the root key currently in use.
294    pub fn read_root_key(&self) -> Vec<u8> {
295        self.root_key.read().unwrap().clone()
296    }
297
298    fn get_expiry_date(&self) -> u64 {
299        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
300        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
301        if self.ingress_expiry.as_secs() > 90 {
302            rounded = rounded.replace_second(0).unwrap();
303        }
304        rounded.unix_timestamp_nanos().try_into().unwrap()
305    }
306
307    /// Return the principal of the identity.
308    pub fn get_principal(&self) -> Result<Principal, String> {
309        self.identity.sender()
310    }
311
312    async fn query_endpoint<A>(
313        &self,
314        effective_canister_id: Principal,
315        serialized_bytes: Vec<u8>,
316    ) -> Result<A, AgentError>
317    where
318        A: serde::de::DeserializeOwned,
319    {
320        let _permit = self.concurrent_requests_semaphore.acquire().await;
321        let bytes = self
322            .execute(
323                Method::POST,
324                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
325                Some(serialized_bytes),
326            )
327            .await?
328            .1;
329        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
330    }
331
332    async fn read_state_endpoint<A>(
333        &self,
334        effective_canister_id: Principal,
335        serialized_bytes: Vec<u8>,
336    ) -> Result<A, AgentError>
337    where
338        A: serde::de::DeserializeOwned,
339    {
340        let _permit = self.concurrent_requests_semaphore.acquire().await;
341        let endpoint = format!(
342            "api/v2/canister/{}/read_state",
343            effective_canister_id.to_text()
344        );
345        let bytes = self
346            .execute(Method::POST, &endpoint, Some(serialized_bytes))
347            .await?
348            .1;
349        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
350    }
351
352    async fn read_subnet_state_endpoint<A>(
353        &self,
354        subnet_id: Principal,
355        serialized_bytes: Vec<u8>,
356    ) -> Result<A, AgentError>
357    where
358        A: serde::de::DeserializeOwned,
359    {
360        let _permit = self.concurrent_requests_semaphore.acquire().await;
361        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
362        let bytes = self
363            .execute(Method::POST, &endpoint, Some(serialized_bytes))
364            .await?
365            .1;
366        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
367    }
368
369    async fn call_endpoint(
370        &self,
371        effective_canister_id: Principal,
372        serialized_bytes: Vec<u8>,
373    ) -> Result<TransportCallResponse, AgentError> {
374        let _permit = self.concurrent_requests_semaphore.acquire().await;
375        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
376        let (status_code, response_body) = self
377            .execute(Method::POST, &endpoint, Some(serialized_bytes))
378            .await?;
379
380        if status_code == StatusCode::ACCEPTED {
381            return Ok(TransportCallResponse::Accepted);
382        }
383
384        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
385    }
386
387    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
388    /// The encoding is left as an exercise to the user.
389    #[allow(clippy::too_many_arguments)]
390    async fn query_raw(
391        &self,
392        canister_id: Principal,
393        effective_canister_id: Principal,
394        method_name: String,
395        arg: Vec<u8>,
396        ingress_expiry_datetime: Option<u64>,
397        use_nonce: bool,
398        explicit_verify_query_signatures: Option<bool>,
399    ) -> Result<Vec<u8>, AgentError> {
400        let operation = Operation::Call {
401            canister: canister_id,
402            method: method_name.clone(),
403        };
404        let content = self.query_content(
405            canister_id,
406            method_name,
407            arg,
408            ingress_expiry_datetime,
409            use_nonce,
410        )?;
411        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
412        self.query_inner(
413            effective_canister_id,
414            serialized_bytes,
415            content.to_request_id(),
416            explicit_verify_query_signatures,
417            operation,
418        )
419        .await
420    }
421
422    /// Send the signed query to the network. Will return a byte vector.
423    /// The bytes will be checked if it is a valid query.
424    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
425    pub async fn query_signed(
426        &self,
427        effective_canister_id: Principal,
428        signed_query: Vec<u8>,
429    ) -> Result<Vec<u8>, AgentError> {
430        let envelope: Envelope =
431            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
432        let EnvelopeContent::Query {
433            canister_id,
434            method_name,
435            ..
436        } = &*envelope.content
437        else {
438            return Err(AgentError::CallDataMismatch {
439                field: "request_type".to_string(),
440                value_arg: "query".to_string(),
441                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
442                    "update"
443                } else {
444                    "read_state"
445                }
446                .to_string(),
447            });
448        };
449        let operation = Operation::Call {
450            canister: *canister_id,
451            method: method_name.clone(),
452        };
453        self.query_inner(
454            effective_canister_id,
455            signed_query,
456            envelope.content.to_request_id(),
457            None,
458            operation,
459        )
460        .await
461    }
462
463    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
464    ///
465    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
    ///
    /// `explicit_verify_query_signatures`, when `Some`, overrides the agent-wide
    /// `verify_query_signatures` setting for this call. `request_id` is the id of the query in
    /// `signed_query` and is part of what the nodes signed. `operation` is only used to label
    /// a reject error.
    ///
    /// Returns the reply argument bytes on success. A rejected query is returned as
    /// `AgentError::UncertifiedReject`.
466    async fn query_inner(
467        &self,
468        effective_canister_id: Principal,
469        signed_query: Vec<u8>,
470        request_id: RequestId,
471        explicit_verify_query_signatures: Option<bool>,
472        operation: Operation,
473    ) -> Result<Vec<u8>, AgentError> {
474        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
            // Run the query and the subnet lookup concurrently; the first error aborts both.
475            let (response, mut subnet) = futures_util::try_join!(
476                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
477                self.get_subnet_by_canister(&effective_canister_id)
478            )?;
            // There must be at least one signature, and no more than there are known node keys.
479            if response.signatures().is_empty() {
480                return Err(AgentError::MissingSignature);
481            } else if response.signatures().len() > subnet.node_keys.len() {
482                return Err(AgentError::TooManySignatures {
483                    had: response.signatures().len(),
484                    needed: subnet.node_keys.len(),
485                });
486            }
487            for signature in response.signatures() {
                // Reject signatures whose timestamp is older than `ingress_expiry`.
488                if OffsetDateTime::now_utc()
489                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
490                    > self.ingress_expiry
491                {
492                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
493                }
                // The signed payload is derived from the response, the request id and the timestamp.
494                let signable = response.signable(request_id, signature.timestamp);
                // Look up the signer's key. If it is unknown, refetch the subnet once and retry;
                // a signer that is still unknown is not authorized.
495                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
496                    node_key
497                } else {
498                    subnet = self
499                        .fetch_subnet_by_canister(&effective_canister_id)
500                        .await?;
501                    subnet
502                        .node_keys
503                        .get(&signature.identity)
504                        .ok_or(AgentError::CertificateNotAuthorized())?
505                };
                // A node key must be exactly 44 bytes: the fixed 12-byte DER prefix below
                // followed by the 32 raw key bytes handed to `PublicKey::deserialize_raw`.
506                if node_key.len() != 44 {
507                    return Err(AgentError::DerKeyLengthMismatch {
508                        expected: 44,
509                        actual: node_key.len(),
510                    });
511                }
512                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
513                if node_key[..12] != DER_PREFIX {
514                    return Err(AgentError::DerPrefixMismatch {
515                        expected: DER_PREFIX.to_vec(),
516                        actual: node_key[..12].to_vec(),
517                    });
518                }
519                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
520                    .map_err(|_| AgentError::MalformedPublicKey)?;
521
                // Every signature must verify; a single failure fails the whole query.
522                match pubkey.verify_signature(&signable, &signature.signature[..]) {
523                    Ok(()) => (),
524                    Err(SignatureError::InvalidSignature) => {
525                        return Err(AgentError::QuerySignatureVerificationFailed)
526                    }
527                    Err(SignatureError::InvalidLength) => {
528                        return Err(AgentError::MalformedSignature)
529                    }
                    // NOTE(review): assumes no other `SignatureError` variant can come out of
                    // single-signature verification; any other variant panics here. Confirm.
530                    _ => unreachable!(),
531                }
532            }
533            response
534        } else {
            // Verification disabled: take the response as returned by the endpoint.
535            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
536                .await?
537        };
538
539        match response {
540            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
541            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
542                reject,
543                operation: Some(operation),
544            }),
545        }
546    }
547
548    fn query_content(
549        &self,
550        canister_id: Principal,
551        method_name: String,
552        arg: Vec<u8>,
553        ingress_expiry_datetime: Option<u64>,
554        use_nonce: bool,
555    ) -> Result<EnvelopeContent, AgentError> {
556        Ok(EnvelopeContent::Query {
557            sender: self.identity.sender().map_err(AgentError::SigningError)?,
558            canister_id,
559            method_name,
560            arg,
561            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
562            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
563        })
564    }
565
566    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
567    async fn update_raw(
568        &self,
569        canister_id: Principal,
570        effective_canister_id: Principal,
571        method_name: String,
572        arg: Vec<u8>,
573        ingress_expiry_datetime: Option<u64>,
574    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
575        let nonce = self.nonce_factory.generate();
576        let content = self.update_content(
577            canister_id,
578            method_name.clone(),
579            arg,
580            ingress_expiry_datetime,
581            nonce,
582        )?;
583        let operation = Some(Operation::Call {
584            canister: canister_id,
585            method: method_name,
586        });
587        let request_id = to_request_id(&content)?;
588        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
589
590        let response_body = self
591            .call_endpoint(effective_canister_id, serialized_bytes)
592            .await?;
593
594        match response_body {
595            TransportCallResponse::Replied { certificate } => {
596                let certificate =
597                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
598
599                self.verify(&certificate, effective_canister_id)?;
600                let status = lookup_request_status(&certificate, &request_id)?;
601
602                match status {
603                    RequestStatusResponse::Replied(reply) => {
604                        Ok(CallResponse::Response((reply.arg, certificate)))
605                    }
606                    RequestStatusResponse::Rejected(reject_response) => {
607                        Err(AgentError::CertifiedReject {
608                            reject: reject_response,
609                            operation,
610                        })?
611                    }
612                    _ => Ok(CallResponse::Poll(request_id)),
613                }
614            }
615            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
616            TransportCallResponse::NonReplicatedRejection(reject_response) => {
617                Err(AgentError::UncertifiedReject {
618                    reject: reject_response,
619                    operation,
620                })
621            }
622        }
623    }
624
625    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
626    /// The bytes will be checked to verify that it is a valid update.
627    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
628    pub async fn update_signed(
629        &self,
630        effective_canister_id: Principal,
631        signed_update: Vec<u8>,
632    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
633        let envelope: Envelope =
634            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
635        let EnvelopeContent::Call {
636            canister_id,
637            method_name,
638            ..
639        } = &*envelope.content
640        else {
641            return Err(AgentError::CallDataMismatch {
642                field: "request_type".to_string(),
643                value_arg: "update".to_string(),
644                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
645                    "query"
646                } else {
647                    "read_state"
648                }
649                .to_string(),
650            });
651        };
652        let operation = Some(Operation::Call {
653            canister: *canister_id,
654            method: method_name.clone(),
655        });
656        let request_id = to_request_id(&envelope.content)?;
657
658        let response_body = self
659            .call_endpoint(effective_canister_id, signed_update)
660            .await?;
661
662        match response_body {
663            TransportCallResponse::Replied { certificate } => {
664                let certificate =
665                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
666
667                self.verify(&certificate, effective_canister_id)?;
668                let status = lookup_request_status(&certificate, &request_id)?;
669
670                match status {
671                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
672                    RequestStatusResponse::Rejected(reject_response) => {
673                        Err(AgentError::CertifiedReject {
674                            reject: reject_response,
675                            operation,
676                        })?
677                    }
678                    _ => Ok(CallResponse::Poll(request_id)),
679                }
680            }
681            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
682            TransportCallResponse::NonReplicatedRejection(reject_response) => {
683                Err(AgentError::UncertifiedReject {
684                    reject: reject_response,
685                    operation,
686                })
687            }
688        }
689    }
690
691    fn update_content(
692        &self,
693        canister_id: Principal,
694        method_name: String,
695        arg: Vec<u8>,
696        ingress_expiry_datetime: Option<u64>,
697        nonce: Option<Vec<u8>>,
698    ) -> Result<EnvelopeContent, AgentError> {
699        Ok(EnvelopeContent::Call {
700            canister_id,
701            method_name,
702            arg,
703            nonce,
704            sender: self.identity.sender().map_err(AgentError::SigningError)?,
705            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
706        })
707    }
708
709    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
710        ExponentialBackoffBuilder::new()
711            .with_initial_interval(Duration::from_millis(500))
712            .with_max_interval(Duration::from_secs(1))
713            .with_multiplier(1.4)
714            .with_max_elapsed_time(Some(self.max_polling_time))
715            .build()
716    }
717
718    /// Wait for `request_status` to return a Replied response and return the arg.
719    pub async fn wait_signed(
720        &self,
721        request_id: &RequestId,
722        effective_canister_id: Principal,
723        signed_request_status: Vec<u8>,
724    ) -> Result<(Vec<u8>, Certificate), AgentError> {
725        let mut retry_policy = self.get_retry_policy();
726
727        let mut request_accepted = false;
728        loop {
729            let (resp, cert) = self
730                .request_status_signed(
731                    request_id,
732                    effective_canister_id,
733                    signed_request_status.clone(),
734                )
735                .await?;
736            match resp {
737                RequestStatusResponse::Unknown => {}
738
739                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
740                    if !request_accepted {
741                        retry_policy.reset();
742                        request_accepted = true;
743                    }
744                }
745
746                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
747                    return Ok((arg, cert))
748                }
749
750                RequestStatusResponse::Rejected(response) => {
751                    return Err(AgentError::CertifiedReject {
752                        reject: response,
753                        operation: None,
754                    })
755                }
756
757                RequestStatusResponse::Done => {
758                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
759                        *request_id,
760                    )))
761                }
762            };
763
764            match retry_policy.next_backoff() {
765                Some(duration) => crate::util::sleep(duration).await,
766
767                None => return Err(AgentError::TimeoutWaitingForResponse()),
768            }
769        }
770    }
771
772    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
773    pub async fn wait(
774        &self,
775        request_id: &RequestId,
776        effective_canister_id: Principal,
777    ) -> Result<(Vec<u8>, Certificate), AgentError> {
778        self.wait_inner(request_id, effective_canister_id, None)
779            .await
780    }
781
782    async fn wait_inner(
783        &self,
784        request_id: &RequestId,
785        effective_canister_id: Principal,
786        operation: Option<Operation>,
787    ) -> Result<(Vec<u8>, Certificate), AgentError> {
788        let mut retry_policy = self.get_retry_policy();
789
790        let mut request_accepted = false;
791        loop {
792            let (resp, cert) = self
793                .request_status_raw(request_id, effective_canister_id)
794                .await?;
795            match resp {
796                RequestStatusResponse::Unknown => {}
797
798                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
799                    if !request_accepted {
800                        // The system will return RequestStatusResponse::Unknown
801                        // until the request is accepted
802                        // and we generally cannot know how long that will take.
803                        // State transitions between Received and Processing may be
804                        // instantaneous. Therefore, once we know the request is accepted,
805                        // we should restart the backoff so the request does not time out.
806
807                        retry_policy.reset();
808                        request_accepted = true;
809                    }
810                }
811
812                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
813                    return Ok((arg, cert))
814                }
815
816                RequestStatusResponse::Rejected(response) => {
817                    return Err(AgentError::CertifiedReject {
818                        reject: response,
819                        operation,
820                    })
821                }
822
823                RequestStatusResponse::Done => {
824                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
825                        *request_id,
826                    )))
827                }
828            };
829
830            match retry_policy.next_backoff() {
831                Some(duration) => crate::util::sleep(duration).await,
832
833                None => return Err(AgentError::TimeoutWaitingForResponse()),
834            }
835        }
836    }
837
838    /// Request the raw state tree directly, under an effective canister ID.
839    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
840    pub async fn read_state_raw(
841        &self,
842        paths: Vec<Vec<Label>>,
843        effective_canister_id: Principal,
844    ) -> Result<Certificate, AgentError> {
845        let content = self.read_state_content(paths)?;
846        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
847
848        let read_state_response: ReadStateResponse = self
849            .read_state_endpoint(effective_canister_id, serialized_bytes)
850            .await?;
851        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
852            .map_err(AgentError::InvalidCborData)?;
853        self.verify(&cert, effective_canister_id)?;
854        Ok(cert)
855    }
856
857    /// Request the raw state tree directly, under a subnet ID.
858    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
859    pub async fn read_subnet_state_raw(
860        &self,
861        paths: Vec<Vec<Label>>,
862        subnet_id: Principal,
863    ) -> Result<Certificate, AgentError> {
864        let content = self.read_state_content(paths)?;
865        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
866
867        let read_state_response: ReadStateResponse = self
868            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
869            .await?;
870        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
871            .map_err(AgentError::InvalidCborData)?;
872        self.verify_for_subnet(&cert, subnet_id)?;
873        Ok(cert)
874    }
875
876    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
877        Ok(EnvelopeContent::ReadState {
878            sender: self.identity.sender().map_err(AgentError::SigningError)?,
879            paths,
880            ingress_expiry: self.get_expiry_date(),
881        })
882    }
883
884    /// Verify a certificate, checking delegation if present.
885    /// Only passes if the certificate also has authority over the canister.
886    pub fn verify(
887        &self,
888        cert: &Certificate,
889        effective_canister_id: Principal,
890    ) -> Result<(), AgentError> {
891        self.verify_cert(cert, effective_canister_id)?;
892        self.verify_cert_timestamp(cert)?;
893        Ok(())
894    }
895
896    fn verify_cert(
897        &self,
898        cert: &Certificate,
899        effective_canister_id: Principal,
900    ) -> Result<(), AgentError> {
901        let sig = &cert.signature;
902
903        let root_hash = cert.tree.digest();
904        let mut msg = vec![];
905        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
906        msg.extend_from_slice(&root_hash);
907
908        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
909        let key = extract_der(der_key)?;
910
911        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
912            .map_err(|_| AgentError::CertificateVerificationFailed())?;
913        Ok(())
914    }
915
916    /// Verify a certificate, checking delegation if present.
917    /// Only passes if the certificate is for the specified subnet.
918    pub fn verify_for_subnet(
919        &self,
920        cert: &Certificate,
921        subnet_id: Principal,
922    ) -> Result<(), AgentError> {
923        self.verify_cert_for_subnet(cert, subnet_id)?;
924        self.verify_cert_timestamp(cert)?;
925        Ok(())
926    }
927
928    fn verify_cert_for_subnet(
929        &self,
930        cert: &Certificate,
931        subnet_id: Principal,
932    ) -> Result<(), AgentError> {
933        let sig = &cert.signature;
934
935        let root_hash = cert.tree.digest();
936        let mut msg = vec![];
937        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
938        msg.extend_from_slice(&root_hash);
939
940        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
941        let key = extract_der(der_key)?;
942
943        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
944            .map_err(|_| AgentError::CertificateVerificationFailed())?;
945        Ok(())
946    }
947
948    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
949        let time = lookup_time(cert)?;
950        if (OffsetDateTime::now_utc()
951            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
952        .abs()
953            > self.ingress_expiry
954        {
955            Err(AgentError::CertificateOutdated(self.ingress_expiry))
956        } else {
957            Ok(())
958        }
959    }
960
961    fn check_delegation(
962        &self,
963        delegation: &Option<Delegation>,
964        effective_canister_id: Principal,
965    ) -> Result<Vec<u8>, AgentError> {
966        match delegation {
967            None => Ok(self.read_root_key()),
968            Some(delegation) => {
969                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
970                    .map_err(AgentError::InvalidCborData)?;
971                if cert.delegation.is_some() {
972                    return Err(AgentError::CertificateHasTooManyDelegations);
973                }
974                self.verify_cert(&cert, effective_canister_id)?;
975                let canister_range_lookup = [
976                    "subnet".as_bytes(),
977                    delegation.subnet_id.as_ref(),
978                    "canister_ranges".as_bytes(),
979                ];
980                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
981                let ranges: Vec<(Principal, Principal)> =
982                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
983                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
984                    // the certificate is not authorized to answer calls for this canister
985                    return Err(AgentError::CertificateNotAuthorized());
986                }
987
988                let public_key_path = [
989                    "subnet".as_bytes(),
990                    delegation.subnet_id.as_ref(),
991                    "public_key".as_bytes(),
992                ];
993                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
994            }
995        }
996    }
997
998    fn check_delegation_for_subnet(
999        &self,
1000        delegation: &Option<Delegation>,
1001        subnet_id: Principal,
1002    ) -> Result<Vec<u8>, AgentError> {
1003        match delegation {
1004            None => Ok(self.read_root_key()),
1005            Some(delegation) => {
1006                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1007                    .map_err(AgentError::InvalidCborData)?;
1008                if cert.delegation.is_some() {
1009                    return Err(AgentError::CertificateHasTooManyDelegations);
1010                }
1011                self.verify_cert_for_subnet(&cert, subnet_id)?;
1012                let public_key_path = [
1013                    "subnet".as_bytes(),
1014                    delegation.subnet_id.as_ref(),
1015                    "public_key".as_bytes(),
1016                ];
1017                let pk = lookup_value(&cert.tree, public_key_path)
1018                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1019                    .to_vec();
1020                Ok(pk)
1021            }
1022        }
1023    }
1024
1025    /// Request information about a particular canister for a single state subkey.
1026    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1027    pub async fn read_state_canister_info(
1028        &self,
1029        canister_id: Principal,
1030        path: &str,
1031    ) -> Result<Vec<u8>, AgentError> {
1032        let paths: Vec<Vec<Label>> = vec![vec![
1033            "canister".into(),
1034            Label::from_bytes(canister_id.as_slice()),
1035            path.into(),
1036        ]];
1037
1038        let cert = self.read_state_raw(paths, canister_id).await?;
1039
1040        lookup_canister_info(cert, canister_id, path)
1041    }
1042
1043    /// Request the controller list of a given canister.
1044    pub async fn read_state_canister_controllers(
1045        &self,
1046        canister_id: Principal,
1047    ) -> Result<Vec<Principal>, AgentError> {
1048        let blob = self
1049            .read_state_canister_info(canister_id, "controllers")
1050            .await?;
1051        let controllers: Vec<Principal> =
1052            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1053        Ok(controllers)
1054    }
1055
1056    /// Request the module hash of a given canister.
1057    pub async fn read_state_canister_module_hash(
1058        &self,
1059        canister_id: Principal,
1060    ) -> Result<Vec<u8>, AgentError> {
1061        self.read_state_canister_info(canister_id, "module_hash")
1062            .await
1063    }
1064
1065    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1066    pub async fn read_state_canister_metadata(
1067        &self,
1068        canister_id: Principal,
1069        path: &str,
1070    ) -> Result<Vec<u8>, AgentError> {
1071        let paths: Vec<Vec<Label>> = vec![vec![
1072            "canister".into(),
1073            Label::from_bytes(canister_id.as_slice()),
1074            "metadata".into(),
1075            path.into(),
1076        ]];
1077
1078        let cert = self.read_state_raw(paths, canister_id).await?;
1079
1080        lookup_canister_metadata(cert, canister_id, path)
1081    }
1082
1083    /// Request a list of metrics about the subnet.
1084    pub async fn read_state_subnet_metrics(
1085        &self,
1086        subnet_id: Principal,
1087    ) -> Result<SubnetMetrics, AgentError> {
1088        let paths = vec![vec![
1089            "subnet".into(),
1090            Label::from_bytes(subnet_id.as_slice()),
1091            "metrics".into(),
1092        ]];
1093        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1094        lookup_subnet_metrics(cert, subnet_id)
1095    }
1096
1097    /// Request a list of metrics about the subnet.
1098    pub async fn read_state_subnet_canister_ranges(
1099        &self,
1100        subnet_id: Principal,
1101    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1102        let paths = vec![vec![
1103            "subnet".into(),
1104            Label::from_bytes(subnet_id.as_slice()),
1105            "canister_ranges".into(),
1106        ]];
1107        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1108        lookup_subnet_canister_ranges(cert, subnet_id)
1109    }
1110
1111    /// Fetches the status of a particular request by its ID.
1112    pub async fn request_status_raw(
1113        &self,
1114        request_id: &RequestId,
1115        effective_canister_id: Principal,
1116    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117        let paths: Vec<Vec<Label>> =
1118            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1119
1120        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1121
1122        Ok((lookup_request_status(&cert, request_id)?, cert))
1123    }
1124
1125    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1126    /// The bytes will be checked to verify that it is a valid `request_status`.
1127    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1128    pub async fn request_status_signed(
1129        &self,
1130        request_id: &RequestId,
1131        effective_canister_id: Principal,
1132        signed_request_status: Vec<u8>,
1133    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1134        let _envelope: Envelope =
1135            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1136        let read_state_response: ReadStateResponse = self
1137            .read_state_endpoint(effective_canister_id, signed_request_status)
1138            .await?;
1139
1140        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1141            .map_err(AgentError::InvalidCborData)?;
1142        self.verify(&cert, effective_canister_id)?;
1143        Ok((lookup_request_status(&cert, request_id)?, cert))
1144    }
1145
1146    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1147    /// passing all arguments.
1148    pub fn update<S: Into<String>>(
1149        &self,
1150        canister_id: &Principal,
1151        method_name: S,
1152    ) -> UpdateBuilder {
1153        UpdateBuilder::new(self, *canister_id, method_name.into())
1154    }
1155
1156    /// Calls and returns the information returned by the status endpoint of a replica.
1157    pub async fn status(&self) -> Result<Status, AgentError> {
1158        let endpoint = "api/v2/status";
1159        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1160
1161        let cbor: serde_cbor::Value =
1162            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1163
1164        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1165    }
1166
1167    /// Returns a `QueryBuilder` enabling the construction of a query call without
1168    /// passing all arguments.
1169    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1170        QueryBuilder::new(self, *canister_id, method_name.into())
1171    }
1172
1173    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1174    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1175    pub fn sign_request_status(
1176        &self,
1177        effective_canister_id: Principal,
1178        request_id: RequestId,
1179    ) -> Result<SignedRequestStatus, AgentError> {
1180        let paths: Vec<Vec<Label>> =
1181            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1182        let read_state_content = self.read_state_content(paths)?;
1183        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1184        let ingress_expiry = read_state_content.ingress_expiry();
1185        let sender = *read_state_content.sender();
1186        Ok(SignedRequestStatus {
1187            ingress_expiry,
1188            sender,
1189            effective_canister_id,
1190            request_id,
1191            signed_request_status,
1192        })
1193    }
1194
1195    async fn get_subnet_by_canister(
1196        &self,
1197        canister: &Principal,
1198    ) -> Result<Arc<Subnet>, AgentError> {
1199        let subnet = self
1200            .subnet_key_cache
1201            .lock()
1202            .unwrap()
1203            .get_subnet_by_canister(canister);
1204        if let Some(subnet) = subnet {
1205            Ok(subnet)
1206        } else {
1207            self.fetch_subnet_by_canister(canister).await
1208        }
1209    }
1210
1211    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1212    pub async fn fetch_api_boundary_nodes_by_canister_id(
1213        &self,
1214        canister_id: Principal,
1215    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1216        let paths = vec![vec!["api_boundary_nodes".into()]];
1217        let certificate = self.read_state_raw(paths, canister_id).await?;
1218        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1219        Ok(api_boundary_nodes)
1220    }
1221
1222    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1223    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1224        &self,
1225        subnet_id: Principal,
1226    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1227        let paths = vec![vec!["api_boundary_nodes".into()]];
1228        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1229        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1230        Ok(api_boundary_nodes)
1231    }
1232
1233    async fn fetch_subnet_by_canister(
1234        &self,
1235        canister: &Principal,
1236    ) -> Result<Arc<Subnet>, AgentError> {
1237        let cert = self
1238            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1239            .await?;
1240
1241        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1242        if !subnet.canister_ranges.contains(canister) {
1243            return Err(AgentError::CertificateNotAuthorized());
1244        }
1245        let subnet = Arc::new(subnet);
1246        self.subnet_key_cache
1247            .lock()
1248            .unwrap()
1249            .insert_subnet(subnet_id, subnet.clone());
1250        Ok(subnet)
1251    }
1252
1253    async fn request(
1254        &self,
1255        method: Method,
1256        endpoint: &str,
1257        body: Option<Vec<u8>>,
1258    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1259        let body = body.map(Bytes::from);
1260
1261        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1262            let url = self.route_provider.route()?.join(endpoint)?;
1263            let uri = Uri::from_str(url.as_str())
1264                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1265            let body = body.clone().unwrap_or_default();
1266            let request = http::Request::builder()
1267                .method(method.clone())
1268                .uri(uri)
1269                .header(CONTENT_TYPE, "application/cbor")
1270                .body(body)
1271                .map_err(|e| {
1272                    AgentError::TransportError(TransportError::Generic(format!(
1273                        "unable to create request: {e:#}"
1274                    )))
1275                })?;
1276
1277            Ok(request)
1278        };
1279
1280        let response = self
1281            .client
1282            .call(
1283                &create_request_with_generated_url,
1284                self.max_tcp_error_retries,
1285                self.max_response_body_size,
1286            )
1287            .await?;
1288
1289        let (parts, body) = response.into_parts();
1290
1291        Ok((parts.status, parts.headers, body.to_vec()))
1292    }
1293
1294    async fn execute(
1295        &self,
1296        method: Method,
1297        endpoint: &str,
1298        body: Option<Vec<u8>>,
1299    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1300        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1301
1302        let status = request_result.0;
1303        let headers = request_result.1;
1304        let body = request_result.2;
1305
1306        if status.is_client_error() || status.is_server_error() {
1307            Err(AgentError::HttpError(HttpErrorPayload {
1308                status: status.into(),
1309                content_type: headers
1310                    .get(CONTENT_TYPE)
1311                    .and_then(|value| value.to_str().ok())
1312                    .map(str::to_string),
1313                content: body,
1314            }))
1315        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1316            Err(AgentError::InvalidHttpResponse(format!(
1317                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1318            )))
1319        } else {
1320            Ok((status, body))
1321        }
1322    }
1323}
1324
1325// Checks if a principal is contained within a list of principal ranges
1326// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1327fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1328    ranges
1329        .iter()
1330        .any(|r| principal >= &r.0 && principal <= &r.1)
1331}
1332
1333fn sign_envelope(
1334    content: &EnvelopeContent,
1335    identity: Arc<dyn Identity>,
1336) -> Result<Vec<u8>, AgentError> {
1337    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1338
1339    let envelope = Envelope {
1340        content: Cow::Borrowed(content),
1341        sender_pubkey: signature.public_key,
1342        sender_sig: signature.signature,
1343        sender_delegation: signature.delegations,
1344    };
1345
1346    let mut serialized_bytes = Vec::new();
1347    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1348    serializer.self_describe()?;
1349    envelope.serialize(&mut serializer)?;
1350
1351    Ok(serialized_bytes)
1352}
1353
1354/// Inspect the bytes to be sent as a query
1355/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1356pub fn signed_query_inspect(
1357    sender: Principal,
1358    canister_id: Principal,
1359    method_name: &str,
1360    arg: &[u8],
1361    ingress_expiry: u64,
1362    signed_query: Vec<u8>,
1363) -> Result<(), AgentError> {
1364    let envelope: Envelope =
1365        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1366    match envelope.content.as_ref() {
1367        EnvelopeContent::Query {
1368            ingress_expiry: ingress_expiry_cbor,
1369            sender: sender_cbor,
1370            canister_id: canister_id_cbor,
1371            method_name: method_name_cbor,
1372            arg: arg_cbor,
1373            nonce: _nonce,
1374        } => {
1375            if ingress_expiry != *ingress_expiry_cbor {
1376                return Err(AgentError::CallDataMismatch {
1377                    field: "ingress_expiry".to_string(),
1378                    value_arg: ingress_expiry.to_string(),
1379                    value_cbor: ingress_expiry_cbor.to_string(),
1380                });
1381            }
1382            if sender != *sender_cbor {
1383                return Err(AgentError::CallDataMismatch {
1384                    field: "sender".to_string(),
1385                    value_arg: sender.to_string(),
1386                    value_cbor: sender_cbor.to_string(),
1387                });
1388            }
1389            if canister_id != *canister_id_cbor {
1390                return Err(AgentError::CallDataMismatch {
1391                    field: "canister_id".to_string(),
1392                    value_arg: canister_id.to_string(),
1393                    value_cbor: canister_id_cbor.to_string(),
1394                });
1395            }
1396            if method_name != *method_name_cbor {
1397                return Err(AgentError::CallDataMismatch {
1398                    field: "method_name".to_string(),
1399                    value_arg: method_name.to_string(),
1400                    value_cbor: method_name_cbor.clone(),
1401                });
1402            }
1403            if arg != *arg_cbor {
1404                return Err(AgentError::CallDataMismatch {
1405                    field: "arg".to_string(),
1406                    value_arg: format!("{arg:?}"),
1407                    value_cbor: format!("{arg_cbor:?}"),
1408                });
1409            }
1410        }
1411        EnvelopeContent::Call { .. } => {
1412            return Err(AgentError::CallDataMismatch {
1413                field: "request_type".to_string(),
1414                value_arg: "query".to_string(),
1415                value_cbor: "call".to_string(),
1416            })
1417        }
1418        EnvelopeContent::ReadState { .. } => {
1419            return Err(AgentError::CallDataMismatch {
1420                field: "request_type".to_string(),
1421                value_arg: "query".to_string(),
1422                value_cbor: "read_state".to_string(),
1423            })
1424        }
1425    }
1426    Ok(())
1427}
1428
1429/// Inspect the bytes to be sent as an update
1430/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1431pub fn signed_update_inspect(
1432    sender: Principal,
1433    canister_id: Principal,
1434    method_name: &str,
1435    arg: &[u8],
1436    ingress_expiry: u64,
1437    signed_update: Vec<u8>,
1438) -> Result<(), AgentError> {
1439    let envelope: Envelope =
1440        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1441    match envelope.content.as_ref() {
1442        EnvelopeContent::Call {
1443            nonce: _nonce,
1444            ingress_expiry: ingress_expiry_cbor,
1445            sender: sender_cbor,
1446            canister_id: canister_id_cbor,
1447            method_name: method_name_cbor,
1448            arg: arg_cbor,
1449        } => {
1450            if ingress_expiry != *ingress_expiry_cbor {
1451                return Err(AgentError::CallDataMismatch {
1452                    field: "ingress_expiry".to_string(),
1453                    value_arg: ingress_expiry.to_string(),
1454                    value_cbor: ingress_expiry_cbor.to_string(),
1455                });
1456            }
1457            if sender != *sender_cbor {
1458                return Err(AgentError::CallDataMismatch {
1459                    field: "sender".to_string(),
1460                    value_arg: sender.to_string(),
1461                    value_cbor: sender_cbor.to_string(),
1462                });
1463            }
1464            if canister_id != *canister_id_cbor {
1465                return Err(AgentError::CallDataMismatch {
1466                    field: "canister_id".to_string(),
1467                    value_arg: canister_id.to_string(),
1468                    value_cbor: canister_id_cbor.to_string(),
1469                });
1470            }
1471            if method_name != *method_name_cbor {
1472                return Err(AgentError::CallDataMismatch {
1473                    field: "method_name".to_string(),
1474                    value_arg: method_name.to_string(),
1475                    value_cbor: method_name_cbor.clone(),
1476                });
1477            }
1478            if arg != *arg_cbor {
1479                return Err(AgentError::CallDataMismatch {
1480                    field: "arg".to_string(),
1481                    value_arg: format!("{arg:?}"),
1482                    value_cbor: format!("{arg_cbor:?}"),
1483                });
1484            }
1485        }
1486        EnvelopeContent::ReadState { .. } => {
1487            return Err(AgentError::CallDataMismatch {
1488                field: "request_type".to_string(),
1489                value_arg: "call".to_string(),
1490                value_cbor: "read_state".to_string(),
1491            })
1492        }
1493        EnvelopeContent::Query { .. } => {
1494            return Err(AgentError::CallDataMismatch {
1495                field: "request_type".to_string(),
1496                value_arg: "call".to_string(),
1497                value_cbor: "query".to_string(),
1498            })
1499        }
1500    }
1501    Ok(())
1502}
1503
1504/// Inspect the bytes to be sent as a `request_status`
1505/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1506pub fn signed_request_status_inspect(
1507    sender: Principal,
1508    request_id: &RequestId,
1509    ingress_expiry: u64,
1510    signed_request_status: Vec<u8>,
1511) -> Result<(), AgentError> {
1512    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1513    let envelope: Envelope =
1514        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1515    match envelope.content.as_ref() {
1516        EnvelopeContent::ReadState {
1517            ingress_expiry: ingress_expiry_cbor,
1518            sender: sender_cbor,
1519            paths: paths_cbor,
1520        } => {
1521            if ingress_expiry != *ingress_expiry_cbor {
1522                return Err(AgentError::CallDataMismatch {
1523                    field: "ingress_expiry".to_string(),
1524                    value_arg: ingress_expiry.to_string(),
1525                    value_cbor: ingress_expiry_cbor.to_string(),
1526                });
1527            }
1528            if sender != *sender_cbor {
1529                return Err(AgentError::CallDataMismatch {
1530                    field: "sender".to_string(),
1531                    value_arg: sender.to_string(),
1532                    value_cbor: sender_cbor.to_string(),
1533                });
1534            }
1535
1536            if paths != *paths_cbor {
1537                return Err(AgentError::CallDataMismatch {
1538                    field: "paths".to_string(),
1539                    value_arg: format!("{paths:?}"),
1540                    value_cbor: format!("{paths_cbor:?}"),
1541                });
1542            }
1543        }
1544        EnvelopeContent::Query { .. } => {
1545            return Err(AgentError::CallDataMismatch {
1546                field: "request_type".to_string(),
1547                value_arg: "read_state".to_string(),
1548                value_cbor: "query".to_string(),
1549            })
1550        }
1551        EnvelopeContent::Call { .. } => {
1552            return Err(AgentError::CallDataMismatch {
1553                field: "request_type".to_string(),
1554                value_arg: "read_state".to_string(),
1555                value_cbor: "call".to_string(),
1556            })
1557        }
1558    }
1559    Ok(())
1560}
1561
1562#[derive(Clone)]
1563struct SubnetCache {
1564    subnets: TimedCache<Principal, Arc<Subnet>>,
1565    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1566}
1567
1568impl SubnetCache {
1569    fn new() -> Self {
1570        Self {
1571            subnets: TimedCache::with_lifespan(300),
1572            canister_index: RangeInclusiveMap::new_with_step_fns(),
1573        }
1574    }
1575
1576    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1577        self.canister_index
1578            .get(canister)
1579            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1580            .filter(|subnet| subnet.canister_ranges.contains(canister))
1581    }
1582
1583    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1584        self.subnets.cache_set(subnet_id, subnet.clone());
1585        for range in subnet.canister_ranges.iter() {
1586            self.canister_index.insert(range.clone(), subnet_id);
1587        }
1588    }
1589}
1590
1591#[derive(Clone, Copy)]
1592struct PrincipalStep;
1593
1594impl StepFns<Principal> for PrincipalStep {
1595    fn add_one(start: &Principal) -> Principal {
1596        let bytes = start.as_slice();
1597        let mut arr = [0; 29];
1598        arr[..bytes.len()].copy_from_slice(bytes);
1599        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1600            *byte = byte.wrapping_add(1);
1601            if *byte != 0 {
1602                break;
1603            }
1604        }
1605        Principal::from_slice(&arr[..bytes.len()])
1606    }
1607    fn sub_one(start: &Principal) -> Principal {
1608        let bytes = start.as_slice();
1609        let mut arr = [0; 29];
1610        arr[..bytes.len()].copy_from_slice(bytes);
1611        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1612            *byte = byte.wrapping_sub(1);
1613            if *byte != 255 {
1614                break;
1615            }
1616        }
1617        Principal::from_slice(&arr[..bytes.len()])
1618    }
1619}
1620
/// Information about a single subnet, as stored in the agent's subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    // This key is only fetched for completeness. Do not actually use this value: it is not authoritative in the case of a rogue subnet.
    // If a future agent needs to know the subnet key, it should fetch /subnet from the *root* subnet.
    _key: Vec<u8>,
    // Byte blobs keyed by principal; presumably each node's public key indexed by node ID — confirm where the struct is populated.
    node_keys: HashMap<Principal, Vec<u8>>,
    // Inclusive ranges of canister IDs recorded for this subnet; the subnet cache checks a canister against them on lookup.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1629
/// An API boundary node, which routes `/api` calls to IC replica nodes.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name of the node.
    pub domain: String,
    /// IPv6 address, in hexadecimal notation with colons.
    pub ipv6_address: String,
    /// IPv4 address, in dotted-decimal notation, if any.
    pub ipv4_address: Option<String>,
}
1640
/// A query request builder.
///
/// This makes it easier to do query calls without actually passing all arguments.
/// A builder starts with an empty argument, no explicit expiry, and nonce generation disabled.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    // The agent that performs (or signs) the query.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    /// Defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp, in nanoseconds, that the request will expire at.
    /// `None` when no explicit expiry has been set on the builder.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether to include a nonce with the message.
    pub use_nonce: bool,
}
1661
1662impl<'agent> QueryBuilder<'agent> {
1663    /// Creates a new query builder with an agent for a particular canister method.
1664    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1665        Self {
1666            agent,
1667            effective_canister_id: canister_id,
1668            canister_id,
1669            method_name,
1670            arg: vec![],
1671            ingress_expiry_datetime: None,
1672            use_nonce: false,
1673        }
1674    }
1675
1676    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1677    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1678        self.effective_canister_id = canister_id;
1679        self
1680    }
1681
1682    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1683    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1684        self.arg = arg.into();
1685        self
1686    }
1687
1688    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1689    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1690        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1691        self
1692    }
1693
1694    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1695    pub fn expire_after(mut self, duration: Duration) -> Self {
1696        self.ingress_expiry_datetime = Some(
1697            OffsetDateTime::now_utc()
1698                .saturating_add(duration.try_into().expect("negative duration"))
1699                .unix_timestamp_nanos() as u64,
1700        );
1701        self
1702    }
1703
1704    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1705    /// and thus may get a (briefly) cached response.
1706    pub fn with_nonce_generation(mut self) -> Self {
1707        self.use_nonce = true;
1708        self
1709    }
1710
1711    /// Make a query call. This will return a byte vector.
1712    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1713        self.agent
1714            .query_raw(
1715                self.canister_id,
1716                self.effective_canister_id,
1717                self.method_name,
1718                self.arg,
1719                self.ingress_expiry_datetime,
1720                self.use_nonce,
1721                None,
1722            )
1723            .await
1724    }
1725
1726    /// Make a query call with signature verification. This will return a byte vector.
1727    ///
1728    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1729    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1730    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1731        self.agent
1732            .query_raw(
1733                self.canister_id,
1734                self.effective_canister_id,
1735                self.method_name,
1736                self.arg,
1737                self.ingress_expiry_datetime,
1738                self.use_nonce,
1739                Some(true),
1740            )
1741            .await
1742    }
1743
1744    /// Make a query call without signature verification. This will return a byte vector.
1745    ///
1746    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1747    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1748    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1749        self.agent
1750            .query_raw(
1751                self.canister_id,
1752                self.effective_canister_id,
1753                self.method_name,
1754                self.arg,
1755                self.ingress_expiry_datetime,
1756                self.use_nonce,
1757                Some(false),
1758            )
1759            .await
1760    }
1761
1762    /// Sign a query call. This will return a [`signed::SignedQuery`]
1763    /// which contains all fields of the query and the signed query in CBOR encoding
1764    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1765        let effective_canister_id = self.effective_canister_id;
1766        let identity = self.agent.identity.clone();
1767        let content = self.into_envelope()?;
1768        let signed_query = sign_envelope(&content, identity)?;
1769        let EnvelopeContent::Query {
1770            ingress_expiry,
1771            sender,
1772            canister_id,
1773            method_name,
1774            arg,
1775            nonce,
1776        } = content
1777        else {
1778            unreachable!()
1779        };
1780        Ok(SignedQuery {
1781            ingress_expiry,
1782            sender,
1783            canister_id,
1784            method_name,
1785            arg,
1786            effective_canister_id,
1787            signed_query,
1788            nonce,
1789        })
1790    }
1791
1792    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1793    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1794        self.agent.query_content(
1795            self.canister_id,
1796            self.method_name,
1797            self.arg,
1798            self.ingress_expiry_datetime,
1799            self.use_nonce,
1800        )
1801    }
1802}
1803
1804impl<'agent> IntoFuture for QueryBuilder<'agent> {
1805    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1806    type Output = Result<Vec<u8>, AgentError>;
1807    fn into_future(self) -> Self::IntoFuture {
1808        Box::pin(self.call())
1809    }
1810}
1811
/// An in-flight canister update call. Useful primarily as a `Future`.
///
/// Awaiting it yields the immediate [`CallResponse`]; use `and_wait` to also poll until the
/// call has completed.
pub struct UpdateCall<'agent> {
    // Agent used by `and_wait` to poll for the result when the call did not complete immediately.
    agent: &'agent Agent,
    // Future resolving to the immediate outcome: either the response itself or a request ID to poll.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    // Effective canister ID handed to the agent when polling in `and_wait`.
    effective_canister_id: Principal,
    // Canister and method of the call; `and_wait` attaches them to the polling operation.
    canister_id: Principal,
    method_name: String,
}
1820
1821impl fmt::Debug for UpdateCall<'_> {
1822    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1823        f.debug_struct("UpdateCall")
1824            .field("agent", &self.agent)
1825            .field("effective_canister_id", &self.effective_canister_id)
1826            .finish_non_exhaustive()
1827    }
1828}
1829
1830impl Future for UpdateCall<'_> {
1831    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1832    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1833        self.response_future.as_mut().poll(cx)
1834    }
1835}
1836
1837impl<'a> UpdateCall<'a> {
1838    /// Waits for the update call to be completed, polling if necessary.
1839    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1840        let response = self.response_future.await?;
1841
1842        match response {
1843            CallResponse::Response(response) => Ok(response),
1844            CallResponse::Poll(request_id) => {
1845                self.agent
1846                    .wait_inner(
1847                        &request_id,
1848                        self.effective_canister_id,
1849                        Some(Operation::Call {
1850                            canister: self.canister_id,
1851                            method: self.method_name,
1852                        }),
1853                    )
1854                    .await
1855            }
1856        }
1857    }
1858}
/// An update request Builder.
///
/// This makes it easier to do update calls without actually passing all arguments or specifying
/// if you want to wait or not.
/// A builder starts with an empty argument and no explicit expiry.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    // The agent that performs (or signs) the update call.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    /// Defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp, in nanoseconds, that the request will expire at.
    /// `None` when no explicit expiry has been set on the builder.
    pub ingress_expiry_datetime: Option<u64>,
}
1877
1878impl<'agent> UpdateBuilder<'agent> {
1879    /// Creates a new update builder with an agent for a particular canister method.
1880    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1881        Self {
1882            agent,
1883            effective_canister_id: canister_id,
1884            canister_id,
1885            method_name,
1886            arg: vec![],
1887            ingress_expiry_datetime: None,
1888        }
1889    }
1890
1891    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1892    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1893        self.effective_canister_id = canister_id;
1894        self
1895    }
1896
1897    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1898    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1899        self.arg = arg.into();
1900        self
1901    }
1902
1903    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1904    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1905        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1906        self
1907    }
1908
1909    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1910    pub fn expire_after(mut self, duration: Duration) -> Self {
1911        self.ingress_expiry_datetime = Some(
1912            OffsetDateTime::now_utc()
1913                .saturating_add(duration.try_into().expect("negative duration"))
1914                .unix_timestamp_nanos() as u64,
1915        );
1916        self
1917    }
1918
1919    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1920    /// the response as a byte vector.
1921    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1922        self.call().and_wait().await.map(|x| x.0)
1923    }
1924
1925    /// Make an update call. This will return a `RequestId`.
1926    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1927    pub fn call(self) -> UpdateCall<'agent> {
1928        let method_name = self.method_name.clone();
1929        let response_future = async move {
1930            self.agent
1931                .update_raw(
1932                    self.canister_id,
1933                    self.effective_canister_id,
1934                    self.method_name,
1935                    self.arg,
1936                    self.ingress_expiry_datetime,
1937                )
1938                .await
1939        };
1940        UpdateCall {
1941            agent: self.agent,
1942            response_future: Box::pin(response_future),
1943            effective_canister_id: self.effective_canister_id,
1944            canister_id: self.canister_id,
1945            method_name,
1946        }
1947    }
1948
1949    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1950    /// which contains all fields of the update and the signed update in CBOR encoding
1951    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1952        let identity = self.agent.identity.clone();
1953        let effective_canister_id = self.effective_canister_id;
1954        let content = self.into_envelope()?;
1955        let signed_update = sign_envelope(&content, identity)?;
1956        let request_id = to_request_id(&content)?;
1957        let EnvelopeContent::Call {
1958            nonce,
1959            ingress_expiry,
1960            sender,
1961            canister_id,
1962            method_name,
1963            arg,
1964        } = content
1965        else {
1966            unreachable!()
1967        };
1968        Ok(SignedUpdate {
1969            nonce,
1970            ingress_expiry,
1971            sender,
1972            canister_id,
1973            method_name,
1974            arg,
1975            effective_canister_id,
1976            signed_update,
1977            request_id,
1978        })
1979    }
1980
1981    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1982    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1983        let nonce = self.agent.nonce_factory.generate();
1984        self.agent.update_content(
1985            self.canister_id,
1986            self.method_name,
1987            self.arg,
1988            self.ingress_expiry_datetime,
1989            nonce,
1990        )
1991    }
1992}
1993
1994impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1995    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1996    type Output = Result<Vec<u8>, AgentError>;
1997    fn into_future(self) -> Self::IntoFuture {
1998        Box::pin(self.call_and_wait())
1999    }
2000}
2001
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
    ///
    /// * `req` - factory producing the request to send; it may itself fail with an [`AgentError`].
    /// * `max_retries` - retry budget; the automatic non-wasm implementation uses it to bound
    ///   retries after connection errors, other implementations may ignore it.
    /// * `size_limit` - optional cap, in bytes, on the response body; the automatic
    ///   implementations treat `None` as no cap.
    ///
    /// Returns the response with its body fully read into memory.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2014
2015/// Convert from http Request to reqwest's one
2016fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2017    let (parts, body) = req.into_parts();
2018    let body = reqwest::Body::from(body);
2019    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2020    // but just to be safe let's handle it.
2021    let request = http::Request::from_parts(parts, body)
2022        .try_into()
2023        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2024
2025    Ok(request)
2026}
2027
2028/// Convert from reqwests's Response to http one
2029#[cfg(not(target_family = "wasm"))]
2030async fn to_http_response(
2031    resp: Response,
2032    size_limit: Option<usize>,
2033) -> Result<http::Response<Bytes>, AgentError> {
2034    use http_body_util::{BodyExt, Limited};
2035
2036    let resp: http::Response<reqwest::Body> = resp.into();
2037    let (parts, body) = resp.into_parts();
2038    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2039    let body = body
2040        .collect()
2041        .await
2042        .map_err(|e| {
2043            AgentError::TransportError(TransportError::Generic(format!(
2044                "unable to read response body: {e:#}"
2045            )))
2046        })?
2047        .to_bytes();
2048    let resp = http::Response::from_parts(parts, body);
2049
2050    Ok(resp)
2051}
2052
/// Converts a `reqwest` response into an `http` response with its body fully buffered.
///
/// The WASM `Response` in reqwest has no direct conversion into `http::Response`, so the
/// status and headers are copied over by hand and the body is rebuilt from the byte stream.
/// The body is read through a length limiter set to `size_limit`, or `usize::MAX` when no
/// limit is given.
///
/// # Errors
///
/// Returns a generic [`TransportError`] if the body cannot be read within the limit.
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Status and headers must be captured before the response is consumed for its body.
    let status = resp.status();
    let headers = resp.headers().clone();

    // Turn the byte stream into a body of data frames and collect it under the size limit.
    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let max_len = size_limit.unwrap_or(usize::MAX);
    let limited = Limited::new(StreamBody::new(frames), max_len);
    let collected = http_body_util::BodyExt::collect(limited)
        .await
        .map_err(|e| {
            AgentError::TransportError(TransportError::Generic(format!(
                "unable to read response body: {e:#}"
            )))
        })?;

    let mut converted = http::Response::new(collected.to_bytes());
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2088
2089#[cfg(not(target_family = "wasm"))]
2090#[async_trait]
2091impl<T> HttpService for T
2092where
2093    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2094    for<'a> <&'a Self as Service<Request>>::Future: Send,
2095    T: Send + Sync + Debug + ?Sized,
2096{
2097    #[allow(clippy::needless_arbitrary_self_type)]
2098    async fn call<'a>(
2099        mut self: &'a Self,
2100        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2101        max_retries: usize,
2102        size_limit: Option<usize>,
2103    ) -> Result<http::Response<Bytes>, AgentError> {
2104        let mut retry_count = 0;
2105        loop {
2106            let request = from_http_request(req()?)?;
2107
2108            match Service::call(&mut self, request).await {
2109                Err(err) => {
2110                    // Network-related errors can be retried.
2111                    if err.is_connect() {
2112                        if retry_count >= max_retries {
2113                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2114                        }
2115                        retry_count += 1;
2116                    }
2117                    // All other errors return immediately.
2118                    else {
2119                        return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2120                    }
2121                }
2122
2123                Ok(resp) => {
2124                    let resp = to_http_response(resp, size_limit).await?;
2125                    return Ok(resp);
2126                }
2127            }
2128        }
2129    }
2130}
2131
/// Blanket implementation for by-ref `reqwest`-compatible services on WASM targets.
///
/// A single attempt is made; the retry count is ignored, while the size limit is still
/// applied when reading the response body.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        _size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;
        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, _size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2154
/// [`HttpService`] wrapper around a `reqwest` client that re-sends a request while the
/// server keeps answering `429 Too Many Requests`.
#[derive(Debug)]
struct Retry429Logic {
    // The underlying client every attempt is sent through.
    client: Client,
}
2159
2160#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2161#[cfg_attr(not(target_family = "wasm"), async_trait)]
2162impl HttpService for Retry429Logic {
2163    async fn call<'a>(
2164        &'a self,
2165        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2166        _max_tcp_retries: usize,
2167        _size_limit: Option<usize>,
2168    ) -> Result<http::Response<Bytes>, AgentError> {
2169        let mut retries = 0;
2170        loop {
2171            #[cfg(not(target_family = "wasm"))]
2172            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2173            // Client inconveniently does not implement Service on wasm
2174            #[cfg(target_family = "wasm")]
2175            let resp = {
2176                let request = from_http_request(req()?)?;
2177                let resp = self
2178                    .client
2179                    .execute(request)
2180                    .await
2181                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2182
2183                to_http_response(resp, _size_limit).await?
2184            };
2185
2186            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2187                if retries == 6 {
2188                    break Ok(resp);
2189                } else {
2190                    retries += 1;
2191                    crate::util::sleep(Duration::from_millis(250)).await;
2192                    continue;
2193                }
2194            } else {
2195                break Ok(resp);
2196            }
2197        }
2198    }
2199}
2200
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Any tests that involve the network should go in agent_test, not here.

    /// Updates signed in quick succession should share their ingress expiry timestamps.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut latest_expiry = None;
        let mut distinct_timestamps = 0;
        for _ in 0..6 {
            let signed = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            let expiry = Some(signed.ingress_expiry);
            // Only a strictly later expiry counts as a new timestamp.
            if expiry > latest_expiry {
                latest_expiry = expiry;
                distinct_timestamps += 1;
            }
        }
        // in six requests, there should be no more than two timestamps
        assert!(
            distinct_timestamps <= 2,
            "num_timestamps:{distinct_timestamps} > 2"
        );
    }

    /// With a concurrency limit of two, only two of three simultaneous queries may connect.
    #[tokio::test]
    async fn client_ratelimit() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        let accepted = Arc::new(Mutex::new(0));

        // Mock server: counts every accepted connection and then swallows its input forever.
        let accept_loop = {
            let accepted = accepted.clone();
            async move {
                loop {
                    let (mut conn, _) = listener.accept().await.unwrap();
                    *accepted.lock().unwrap() += 1;
                    tokio::spawn(
                        // read all data, never reply
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        };
        tokio::spawn(accept_loop);

        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        for _ in 0..3 {
            let worker = agent.clone();
            tokio::spawn(async move {
                worker
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }

        // Give the spawned queries time to reach the server before counting connections.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*accepted.lock().unwrap(), 2);
    }
}