ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49    },
50    agent_error::TransportError,
51    export::Principal,
52    identity::Identity,
53    to_request_id, RequestId,
54};
55use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
56use backoff::{exponential::ExponentialBackoff, SystemClock};
57use ic_certification::{Certificate, Delegation, Label};
58use ic_transport_types::{
59    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
60    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
61};
62use serde::Serialize;
63use status::Status;
64use std::{
65    borrow::Cow,
66    collections::HashMap,
67    convert::TryFrom,
68    fmt::{self, Debug},
69    future::{Future, IntoFuture},
70    pin::Pin,
71    str::FromStr,
72    sync::{Arc, Mutex, RwLock},
73    task::{Context, Poll},
74    time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
/// Domain separator for state-root signatures: a one-byte length prefix
/// (`0x0D` = 13) followed by the 13 ASCII bytes `ic-state-root`.
// NOTE(review): not referenced in the visible part of this file — presumably consumed
// by certificate verification further down; confirm.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Hard-coded root public key (133 bytes) that every new [`Agent`] starts with.
///
/// `Agent::new` copies it into the agent's `root_key`, and `Agent::fetch_root_key` compares
/// against it to decide whether a different key has already been installed.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
/// A low level Agent to make calls to a Replica endpoint.
///
/// ```ignore
/// # // This test is ignored because it requires an ic to be running. We run these
/// # // in the ic-ref workflow.
/// use ic_agent::{Agent, export::Principal};
/// use candid::{Encode, Decode, CandidType, Nat};
/// use serde::Deserialize;
///
/// #[derive(CandidType)]
/// struct Argument {
///   amount: Option<Nat>,
/// }
///
/// #[derive(CandidType, Deserialize)]
/// struct CreateCanisterResult {
///   canister_id: Principal,
/// }
///
/// # fn create_identity() -> impl ic_agent::Identity {
/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
/// # }
/// #
/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
///   let agent = Agent::builder()
///     .with_url(url)
///     .with_identity(create_identity())
///     .build()?;
///
///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
///   // This is important as the main net public key is static and a rogue network could return
///   // a different key.
///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
///   agent.fetch_root_key().await?;
///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
///
///   // Create a call to the management canister to create a new canister ID,
///   // and wait for a result.
///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
///     .with_effective_canister_id(effective_canister_id)
///     .with_arg(Encode!(&Argument { amount: None })?)
///     .await?;
///
///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
///   Ok(canister_id)
/// }
///
/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
/// # runtime.block_on(async {
/// let canister_id = create_a_canister().await.unwrap();
/// eprintln!("{}", canister_id);
/// # });
/// ```
///
/// This agent does not understand Candid, and only acts on byte buffers.
///
/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
/// you receive from a method has already been verified and you do not need to manually verify it.
#[derive(Clone)]
pub struct Agent {
    // Generates the nonces attached to update calls and, when requested, to queries.
    nonce_factory: Arc<dyn NonceGenerator>,
    // Supplies the sender principal and is handed to `sign_envelope` for signing.
    identity: Arc<dyn Identity>,
    // Added to the current time to compute the default ingress expiry; also the maximum
    // age accepted for a query-response signature.
    ingress_expiry: Duration,
    // Root public key; starts as `IC_ROOT_KEY` and can be replaced via `set_root_key`.
    root_key: Arc<RwLock<Vec<u8>>>,
    // HTTP service; either `config.http_service` or a `Retry429Logic` wrapper built in `new`.
    client: Arc<dyn HttpService>,
    // Built in `new` from `config.route_provider`, or else from `config.url`.
    route_provider: Arc<dyn RouteProvider>,
    // Starts out as an empty `SubnetCache`.
    // NOTE(review): presumably backs `get_subnet_by_canister`; confirm further down the file.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    // Acquired before each endpoint request; sized from `config.max_concurrent_requests`.
    concurrent_requests_semaphore: Arc<Semaphore>,
    // Default for whether query responses have their node signatures verified.
    verify_query_signatures: bool,
    // Copied from the config; its use is not visible in this part of the file.
    max_response_body_size: Option<usize>,
    // Upper bound on the total time the polling retry policy may run.
    max_polling_time: Duration,
    // Copied from the config; `dead_code` is allowed because not every build reads it
    // (assumption — TODO confirm which targets use it).
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
168
169impl fmt::Debug for Agent {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171        f.debug_struct("Agent")
172            .field("ingress_expiry", &self.ingress_expiry)
173            .finish_non_exhaustive()
174    }
175}
176
177impl Agent {
178    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
179    /// using the [`AgentConfig`] and [`Agent::new()`].
180    pub fn builder() -> builder::AgentBuilder {
181        Default::default()
182    }
183
184    /// Create an instance of an [`Agent`].
185    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186        let client = config.http_service.unwrap_or_else(|| {
187            Arc::new(Retry429Logic {
188                client: config.client.unwrap_or_else(|| {
189                    #[cfg(not(target_family = "wasm"))]
190                    {
191                        Client::builder()
192                            .use_rustls_tls()
193                            .timeout(Duration::from_secs(360))
194                            .build()
195                            .expect("Could not create HTTP client.")
196                    }
197                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198                    {
199                        Client::new()
200                    }
201                }),
202            })
203        });
204        Ok(Agent {
205            nonce_factory: config.nonce_factory,
206            identity: config.identity,
207            ingress_expiry: config.ingress_expiry,
208            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209            client: client.clone(),
210            route_provider: if let Some(route_provider) = config.route_provider {
211                route_provider
212            } else if let Some(url) = config.url {
213                if config.background_dynamic_routing {
214                    assert!(
215                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217                    );
218                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219                    UrlUntilReady::new(url, async move {
220                        DynamicRouteProviderBuilder::new(
221                            LatencyRoutingSnapshot::new(),
222                            seeds,
223                            client,
224                        )
225                        .build()
226                        .await
227                    }) as Arc<dyn RouteProvider>
228                } else {
229                    Arc::new(url)
230                }
231            } else {
232                panic!("either route_provider or url must be specified");
233            },
234            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235            verify_query_signatures: config.verify_query_signatures,
236            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237            max_response_body_size: config.max_response_body_size,
238            max_tcp_error_retries: config.max_tcp_error_retries,
239            max_polling_time: config.max_polling_time,
240        })
241    }
242
243    /// Set the identity provider for signing messages.
244    ///
245    /// NOTE: if you change the identity while having update calls in
246    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
247    /// messages.
248    pub fn set_identity<I>(&mut self, identity: I)
249    where
250        I: 'static + Identity,
251    {
252        self.identity = Arc::new(identity);
253    }
254    /// Set the arc identity provider for signing messages.
255    ///
256    /// NOTE: if you change the identity while having update calls in
257    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
258    /// messages.
259    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260        self.identity = identity;
261    }
262
263    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
264    /// responses using a hard-coded public key.
265    ///
266    /// This function will instruct the agent to ask the endpoint for its public key, and use
267    /// that instead. This is required when talking to a local test instance, for example.
268    ///
269    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
270    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
271    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273            // already fetched the root key
274            return Ok(());
275        }
276        let status = self.status().await?;
277        let Some(root_key) = status.root_key else {
278            return Err(AgentError::NoRootKeyInStatus(status));
279        };
280        self.set_root_key(root_key);
281        Ok(())
282    }
283
284    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
285    /// responses using a hard-coded public key.
286    ///
287    /// Using this function you can set the root key to a known one if you know if beforehand.
288    pub fn set_root_key(&self, root_key: Vec<u8>) {
289        *self.root_key.write().unwrap() = root_key;
290    }
291
292    /// Return the root key currently in use.
293    pub fn read_root_key(&self) -> Vec<u8> {
294        self.root_key.read().unwrap().clone()
295    }
296
297    fn get_expiry_date(&self) -> u64 {
298        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300        if self.ingress_expiry.as_secs() > 90 {
301            rounded = rounded.replace_second(0).unwrap();
302        }
303        rounded.unix_timestamp_nanos().try_into().unwrap()
304    }
305
306    /// Return the principal of the identity.
307    pub fn get_principal(&self) -> Result<Principal, String> {
308        self.identity.sender()
309    }
310
311    async fn query_endpoint<A>(
312        &self,
313        effective_canister_id: Principal,
314        serialized_bytes: Vec<u8>,
315    ) -> Result<A, AgentError>
316    where
317        A: serde::de::DeserializeOwned,
318    {
319        let _permit = self.concurrent_requests_semaphore.acquire().await;
320        let bytes = self
321            .execute(
322                Method::POST,
323                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324                Some(serialized_bytes),
325            )
326            .await?
327            .1;
328        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329    }
330
331    async fn read_state_endpoint<A>(
332        &self,
333        effective_canister_id: Principal,
334        serialized_bytes: Vec<u8>,
335    ) -> Result<A, AgentError>
336    where
337        A: serde::de::DeserializeOwned,
338    {
339        let _permit = self.concurrent_requests_semaphore.acquire().await;
340        let endpoint = format!(
341            "api/v2/canister/{}/read_state",
342            effective_canister_id.to_text()
343        );
344        let bytes = self
345            .execute(Method::POST, &endpoint, Some(serialized_bytes))
346            .await?
347            .1;
348        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349    }
350
351    async fn read_subnet_state_endpoint<A>(
352        &self,
353        subnet_id: Principal,
354        serialized_bytes: Vec<u8>,
355    ) -> Result<A, AgentError>
356    where
357        A: serde::de::DeserializeOwned,
358    {
359        let _permit = self.concurrent_requests_semaphore.acquire().await;
360        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361        let bytes = self
362            .execute(Method::POST, &endpoint, Some(serialized_bytes))
363            .await?
364            .1;
365        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366    }
367
368    async fn call_endpoint(
369        &self,
370        effective_canister_id: Principal,
371        serialized_bytes: Vec<u8>,
372    ) -> Result<TransportCallResponse, AgentError> {
373        let _permit = self.concurrent_requests_semaphore.acquire().await;
374        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375        let (status_code, response_body) = self
376            .execute(Method::POST, &endpoint, Some(serialized_bytes))
377            .await?;
378
379        if status_code == StatusCode::ACCEPTED {
380            return Ok(TransportCallResponse::Accepted);
381        }
382
383        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384    }
385
386    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
387    /// The encoding is left as an exercise to the user.
388    #[allow(clippy::too_many_arguments)]
389    async fn query_raw(
390        &self,
391        canister_id: Principal,
392        effective_canister_id: Principal,
393        method_name: String,
394        arg: Vec<u8>,
395        ingress_expiry_datetime: Option<u64>,
396        use_nonce: bool,
397        explicit_verify_query_signatures: Option<bool>,
398    ) -> Result<Vec<u8>, AgentError> {
399        let operation = Operation::Call {
400            canister: canister_id,
401            method: method_name.clone(),
402        };
403        let content = self.query_content(
404            canister_id,
405            method_name,
406            arg,
407            ingress_expiry_datetime,
408            use_nonce,
409        )?;
410        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411        self.query_inner(
412            effective_canister_id,
413            serialized_bytes,
414            content.to_request_id(),
415            explicit_verify_query_signatures,
416            operation,
417        )
418        .await
419    }
420
421    /// Send the signed query to the network. Will return a byte vector.
422    /// The bytes will be checked if it is a valid query.
423    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
424    pub async fn query_signed(
425        &self,
426        effective_canister_id: Principal,
427        signed_query: Vec<u8>,
428    ) -> Result<Vec<u8>, AgentError> {
429        let envelope: Envelope =
430            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431        let EnvelopeContent::Query {
432            canister_id,
433            method_name,
434            ..
435        } = &*envelope.content
436        else {
437            return Err(AgentError::CallDataMismatch {
438                field: "request_type".to_string(),
439                value_arg: "query".to_string(),
440                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441                    "update"
442                } else {
443                    "read_state"
444                }
445                .to_string(),
446            });
447        };
448        let operation = Operation::Call {
449            canister: *canister_id,
450            method: method_name.clone(),
451        };
452        self.query_inner(
453            effective_canister_id,
454            signed_query,
455            envelope.content.to_request_id(),
456            None,
457            operation,
458        )
459        .await
460    }
461
462    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
463    ///
464    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
465    async fn query_inner(
466        &self,
467        effective_canister_id: Principal,
468        signed_query: Vec<u8>,
469        request_id: RequestId,
470        explicit_verify_query_signatures: Option<bool>,
471        operation: Operation,
472    ) -> Result<Vec<u8>, AgentError> {
473        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474            let (response, mut subnet) = futures_util::try_join!(
475                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476                self.get_subnet_by_canister(&effective_canister_id)
477            )?;
478            if response.signatures().is_empty() {
479                return Err(AgentError::MissingSignature);
480            } else if response.signatures().len() > subnet.node_keys.len() {
481                return Err(AgentError::TooManySignatures {
482                    had: response.signatures().len(),
483                    needed: subnet.node_keys.len(),
484                });
485            }
486            for signature in response.signatures() {
487                if OffsetDateTime::now_utc()
488                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489                    > self.ingress_expiry
490                {
491                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492                }
493                let signable = response.signable(request_id, signature.timestamp);
494                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495                    node_key
496                } else {
497                    subnet = self
498                        .fetch_subnet_by_canister(&effective_canister_id)
499                        .await?;
500                    subnet
501                        .node_keys
502                        .get(&signature.identity)
503                        .ok_or(AgentError::CertificateNotAuthorized())?
504                };
505                if node_key.len() != 44 {
506                    return Err(AgentError::DerKeyLengthMismatch {
507                        expected: 44,
508                        actual: node_key.len(),
509                    });
510                }
511                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512                if node_key[..12] != DER_PREFIX {
513                    return Err(AgentError::DerPrefixMismatch {
514                        expected: DER_PREFIX.to_vec(),
515                        actual: node_key[..12].to_vec(),
516                    });
517                }
518                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
519                    .map_err(|_| AgentError::MalformedPublicKey)?;
520
521                match pubkey.verify_signature(&signable, &signature.signature[..]) {
522                    Ok(()) => (),
523                    Err(SignatureError::InvalidSignature) => {
524                        return Err(AgentError::QuerySignatureVerificationFailed)
525                    }
526                    Err(SignatureError::InvalidLength) => {
527                        return Err(AgentError::MalformedSignature)
528                    }
529                    _ => unreachable!(),
530                }
531            }
532            response
533        } else {
534            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
535                .await?
536        };
537
538        match response {
539            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
540            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
541                reject,
542                operation: Some(operation),
543            }),
544        }
545    }
546
547    fn query_content(
548        &self,
549        canister_id: Principal,
550        method_name: String,
551        arg: Vec<u8>,
552        ingress_expiry_datetime: Option<u64>,
553        use_nonce: bool,
554    ) -> Result<EnvelopeContent, AgentError> {
555        Ok(EnvelopeContent::Query {
556            sender: self.identity.sender().map_err(AgentError::SigningError)?,
557            canister_id,
558            method_name,
559            arg,
560            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
561            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
562        })
563    }
564
565    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
566    async fn update_raw(
567        &self,
568        canister_id: Principal,
569        effective_canister_id: Principal,
570        method_name: String,
571        arg: Vec<u8>,
572        ingress_expiry_datetime: Option<u64>,
573    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
574        let nonce = self.nonce_factory.generate();
575        let content = self.update_content(
576            canister_id,
577            method_name.clone(),
578            arg,
579            ingress_expiry_datetime,
580            nonce,
581        )?;
582        let operation = Some(Operation::Call {
583            canister: canister_id,
584            method: method_name,
585        });
586        let request_id = to_request_id(&content)?;
587        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
588
589        let response_body = self
590            .call_endpoint(effective_canister_id, serialized_bytes)
591            .await?;
592
593        match response_body {
594            TransportCallResponse::Replied { certificate } => {
595                let certificate =
596                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
597
598                self.verify(&certificate, effective_canister_id)?;
599                let status = lookup_request_status(&certificate, &request_id)?;
600
601                match status {
602                    RequestStatusResponse::Replied(reply) => {
603                        Ok(CallResponse::Response((reply.arg, certificate)))
604                    }
605                    RequestStatusResponse::Rejected(reject_response) => {
606                        Err(AgentError::CertifiedReject {
607                            reject: reject_response,
608                            operation,
609                        })?
610                    }
611                    _ => Ok(CallResponse::Poll(request_id)),
612                }
613            }
614            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
615            TransportCallResponse::NonReplicatedRejection(reject_response) => {
616                Err(AgentError::UncertifiedReject {
617                    reject: reject_response,
618                    operation,
619                })
620            }
621        }
622    }
623
624    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
625    /// The bytes will be checked to verify that it is a valid update.
626    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
627    pub async fn update_signed(
628        &self,
629        effective_canister_id: Principal,
630        signed_update: Vec<u8>,
631    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
632        let envelope: Envelope =
633            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
634        let EnvelopeContent::Call {
635            canister_id,
636            method_name,
637            ..
638        } = &*envelope.content
639        else {
640            return Err(AgentError::CallDataMismatch {
641                field: "request_type".to_string(),
642                value_arg: "update".to_string(),
643                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
644                    "query"
645                } else {
646                    "read_state"
647                }
648                .to_string(),
649            });
650        };
651        let operation = Some(Operation::Call {
652            canister: *canister_id,
653            method: method_name.clone(),
654        });
655        let request_id = to_request_id(&envelope.content)?;
656
657        let response_body = self
658            .call_endpoint(effective_canister_id, signed_update)
659            .await?;
660
661        match response_body {
662            TransportCallResponse::Replied { certificate } => {
663                let certificate =
664                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
665
666                self.verify(&certificate, effective_canister_id)?;
667                let status = lookup_request_status(&certificate, &request_id)?;
668
669                match status {
670                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
671                    RequestStatusResponse::Rejected(reject_response) => {
672                        Err(AgentError::CertifiedReject {
673                            reject: reject_response,
674                            operation,
675                        })?
676                    }
677                    _ => Ok(CallResponse::Poll(request_id)),
678                }
679            }
680            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
681            TransportCallResponse::NonReplicatedRejection(reject_response) => {
682                Err(AgentError::UncertifiedReject {
683                    reject: reject_response,
684                    operation,
685                })
686            }
687        }
688    }
689
690    fn update_content(
691        &self,
692        canister_id: Principal,
693        method_name: String,
694        arg: Vec<u8>,
695        ingress_expiry_datetime: Option<u64>,
696        nonce: Option<Vec<u8>>,
697    ) -> Result<EnvelopeContent, AgentError> {
698        Ok(EnvelopeContent::Call {
699            canister_id,
700            method_name,
701            arg,
702            nonce,
703            sender: self.identity.sender().map_err(AgentError::SigningError)?,
704            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
705        })
706    }
707
708    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
709        ExponentialBackoffBuilder::new()
710            .with_initial_interval(Duration::from_millis(500))
711            .with_max_interval(Duration::from_secs(1))
712            .with_multiplier(1.4)
713            .with_max_elapsed_time(Some(self.max_polling_time))
714            .build()
715    }
716
717    /// Wait for `request_status` to return a Replied response and return the arg.
718    pub async fn wait_signed(
719        &self,
720        request_id: &RequestId,
721        effective_canister_id: Principal,
722        signed_request_status: Vec<u8>,
723    ) -> Result<(Vec<u8>, Certificate), AgentError> {
724        let mut retry_policy = self.get_retry_policy();
725
726        let mut request_accepted = false;
727        let (resp, cert) = self
728            .request_status_signed(
729                request_id,
730                effective_canister_id,
731                signed_request_status.clone(),
732            )
733            .await?;
734        loop {
735            match resp {
736                RequestStatusResponse::Unknown => {}
737
738                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
739                    if !request_accepted {
740                        retry_policy.reset();
741                        request_accepted = true;
742                    }
743                }
744
745                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
746                    return Ok((arg, cert))
747                }
748
749                RequestStatusResponse::Rejected(response) => {
750                    return Err(AgentError::CertifiedReject {
751                        reject: response,
752                        operation: None,
753                    })
754                }
755
756                RequestStatusResponse::Done => {
757                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
758                        *request_id,
759                    )))
760                }
761            };
762
763            match retry_policy.next_backoff() {
764                Some(duration) => crate::util::sleep(duration).await,
765
766                None => return Err(AgentError::TimeoutWaitingForResponse()),
767            }
768        }
769    }
770
771    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
772    pub async fn wait(
773        &self,
774        request_id: &RequestId,
775        effective_canister_id: Principal,
776    ) -> Result<(Vec<u8>, Certificate), AgentError> {
777        self.wait_inner(request_id, effective_canister_id, None)
778            .await
779    }
780
781    async fn wait_inner(
782        &self,
783        request_id: &RequestId,
784        effective_canister_id: Principal,
785        operation: Option<Operation>,
786    ) -> Result<(Vec<u8>, Certificate), AgentError> {
787        let mut retry_policy = self.get_retry_policy();
788
789        let mut request_accepted = false;
790        loop {
791            let (resp, cert) = self
792                .request_status_raw(request_id, effective_canister_id)
793                .await?;
794            match resp {
795                RequestStatusResponse::Unknown => {}
796
797                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
798                    if !request_accepted {
799                        // The system will return RequestStatusResponse::Unknown
800                        // until the request is accepted
801                        // and we generally cannot know how long that will take.
802                        // State transitions between Received and Processing may be
803                        // instantaneous. Therefore, once we know the request is accepted,
804                        // we should restart the backoff so the request does not time out.
805
806                        retry_policy.reset();
807                        request_accepted = true;
808                    }
809                }
810
811                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
812                    return Ok((arg, cert))
813                }
814
815                RequestStatusResponse::Rejected(response) => {
816                    return Err(AgentError::CertifiedReject {
817                        reject: response,
818                        operation,
819                    })
820                }
821
822                RequestStatusResponse::Done => {
823                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
824                        *request_id,
825                    )))
826                }
827            };
828
829            match retry_policy.next_backoff() {
830                Some(duration) => crate::util::sleep(duration).await,
831
832                None => return Err(AgentError::TimeoutWaitingForResponse()),
833            }
834        }
835    }
836
837    /// Request the raw state tree directly, under an effective canister ID.
838    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
839    pub async fn read_state_raw(
840        &self,
841        paths: Vec<Vec<Label>>,
842        effective_canister_id: Principal,
843    ) -> Result<Certificate, AgentError> {
844        let content = self.read_state_content(paths)?;
845        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
846
847        let read_state_response: ReadStateResponse = self
848            .read_state_endpoint(effective_canister_id, serialized_bytes)
849            .await?;
850        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
851            .map_err(AgentError::InvalidCborData)?;
852        self.verify(&cert, effective_canister_id)?;
853        Ok(cert)
854    }
855
856    /// Request the raw state tree directly, under a subnet ID.
857    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
858    pub async fn read_subnet_state_raw(
859        &self,
860        paths: Vec<Vec<Label>>,
861        subnet_id: Principal,
862    ) -> Result<Certificate, AgentError> {
863        let content = self.read_state_content(paths)?;
864        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
865
866        let read_state_response: ReadStateResponse = self
867            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
868            .await?;
869        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
870            .map_err(AgentError::InvalidCborData)?;
871        self.verify_for_subnet(&cert, subnet_id)?;
872        Ok(cert)
873    }
874
875    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
876        Ok(EnvelopeContent::ReadState {
877            sender: self.identity.sender().map_err(AgentError::SigningError)?,
878            paths,
879            ingress_expiry: self.get_expiry_date(),
880        })
881    }
882
883    /// Verify a certificate, checking delegation if present.
884    /// Only passes if the certificate also has authority over the canister.
885    pub fn verify(
886        &self,
887        cert: &Certificate,
888        effective_canister_id: Principal,
889    ) -> Result<(), AgentError> {
890        self.verify_cert(cert, effective_canister_id)?;
891        self.verify_cert_timestamp(cert)?;
892        Ok(())
893    }
894
895    fn verify_cert(
896        &self,
897        cert: &Certificate,
898        effective_canister_id: Principal,
899    ) -> Result<(), AgentError> {
900        let sig = &cert.signature;
901
902        let root_hash = cert.tree.digest();
903        let mut msg = vec![];
904        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
905        msg.extend_from_slice(&root_hash);
906
907        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
908        let key = extract_der(der_key)?;
909
910        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
911            .map_err(|_| AgentError::CertificateVerificationFailed())?;
912        Ok(())
913    }
914
915    /// Verify a certificate, checking delegation if present.
916    /// Only passes if the certificate is for the specified subnet.
917    pub fn verify_for_subnet(
918        &self,
919        cert: &Certificate,
920        subnet_id: Principal,
921    ) -> Result<(), AgentError> {
922        self.verify_cert_for_subnet(cert, subnet_id)?;
923        self.verify_cert_timestamp(cert)?;
924        Ok(())
925    }
926
927    fn verify_cert_for_subnet(
928        &self,
929        cert: &Certificate,
930        subnet_id: Principal,
931    ) -> Result<(), AgentError> {
932        let sig = &cert.signature;
933
934        let root_hash = cert.tree.digest();
935        let mut msg = vec![];
936        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
937        msg.extend_from_slice(&root_hash);
938
939        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
940        let key = extract_der(der_key)?;
941
942        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
943            .map_err(|_| AgentError::CertificateVerificationFailed())?;
944        Ok(())
945    }
946
947    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
948        let time = lookup_time(cert)?;
949        if (OffsetDateTime::now_utc()
950            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
951        .abs()
952            > self.ingress_expiry
953        {
954            Err(AgentError::CertificateOutdated(self.ingress_expiry))
955        } else {
956            Ok(())
957        }
958    }
959
960    fn check_delegation(
961        &self,
962        delegation: &Option<Delegation>,
963        effective_canister_id: Principal,
964    ) -> Result<Vec<u8>, AgentError> {
965        match delegation {
966            None => Ok(self.read_root_key()),
967            Some(delegation) => {
968                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
969                    .map_err(AgentError::InvalidCborData)?;
970                if cert.delegation.is_some() {
971                    return Err(AgentError::CertificateHasTooManyDelegations);
972                }
973                self.verify_cert(&cert, effective_canister_id)?;
974                let canister_range_lookup = [
975                    "subnet".as_bytes(),
976                    delegation.subnet_id.as_ref(),
977                    "canister_ranges".as_bytes(),
978                ];
979                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
980                let ranges: Vec<(Principal, Principal)> =
981                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
982                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
983                    // the certificate is not authorized to answer calls for this canister
984                    return Err(AgentError::CertificateNotAuthorized());
985                }
986
987                let public_key_path = [
988                    "subnet".as_bytes(),
989                    delegation.subnet_id.as_ref(),
990                    "public_key".as_bytes(),
991                ];
992                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
993            }
994        }
995    }
996
997    fn check_delegation_for_subnet(
998        &self,
999        delegation: &Option<Delegation>,
1000        subnet_id: Principal,
1001    ) -> Result<Vec<u8>, AgentError> {
1002        match delegation {
1003            None => Ok(self.read_root_key()),
1004            Some(delegation) => {
1005                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1006                    .map_err(AgentError::InvalidCborData)?;
1007                if cert.delegation.is_some() {
1008                    return Err(AgentError::CertificateHasTooManyDelegations);
1009                }
1010                self.verify_cert_for_subnet(&cert, subnet_id)?;
1011                let public_key_path = [
1012                    "subnet".as_bytes(),
1013                    delegation.subnet_id.as_ref(),
1014                    "public_key".as_bytes(),
1015                ];
1016                let pk = lookup_value(&cert.tree, public_key_path)
1017                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1018                    .to_vec();
1019                Ok(pk)
1020            }
1021        }
1022    }
1023
1024    /// Request information about a particular canister for a single state subkey.
1025    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1026    pub async fn read_state_canister_info(
1027        &self,
1028        canister_id: Principal,
1029        path: &str,
1030    ) -> Result<Vec<u8>, AgentError> {
1031        let paths: Vec<Vec<Label>> = vec![vec![
1032            "canister".into(),
1033            Label::from_bytes(canister_id.as_slice()),
1034            path.into(),
1035        ]];
1036
1037        let cert = self.read_state_raw(paths, canister_id).await?;
1038
1039        lookup_canister_info(cert, canister_id, path)
1040    }
1041
1042    /// Request the controller list of a given canister.
1043    pub async fn read_state_canister_controllers(
1044        &self,
1045        canister_id: Principal,
1046    ) -> Result<Vec<Principal>, AgentError> {
1047        let blob = self
1048            .read_state_canister_info(canister_id, "controllers")
1049            .await?;
1050        let controllers: Vec<Principal> =
1051            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1052        Ok(controllers)
1053    }
1054
1055    /// Request the module hash of a given canister.
1056    pub async fn read_state_canister_module_hash(
1057        &self,
1058        canister_id: Principal,
1059    ) -> Result<Vec<u8>, AgentError> {
1060        self.read_state_canister_info(canister_id, "module_hash")
1061            .await
1062    }
1063
1064    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1065    pub async fn read_state_canister_metadata(
1066        &self,
1067        canister_id: Principal,
1068        path: &str,
1069    ) -> Result<Vec<u8>, AgentError> {
1070        let paths: Vec<Vec<Label>> = vec![vec![
1071            "canister".into(),
1072            Label::from_bytes(canister_id.as_slice()),
1073            "metadata".into(),
1074            path.into(),
1075        ]];
1076
1077        let cert = self.read_state_raw(paths, canister_id).await?;
1078
1079        lookup_canister_metadata(cert, canister_id, path)
1080    }
1081
1082    /// Request a list of metrics about the subnet.
1083    pub async fn read_state_subnet_metrics(
1084        &self,
1085        subnet_id: Principal,
1086    ) -> Result<SubnetMetrics, AgentError> {
1087        let paths = vec![vec![
1088            "subnet".into(),
1089            Label::from_bytes(subnet_id.as_slice()),
1090            "metrics".into(),
1091        ]];
1092        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1093        lookup_subnet_metrics(cert, subnet_id)
1094    }
1095
1096    /// Fetches the status of a particular request by its ID.
1097    pub async fn request_status_raw(
1098        &self,
1099        request_id: &RequestId,
1100        effective_canister_id: Principal,
1101    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1102        let paths: Vec<Vec<Label>> =
1103            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1104
1105        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1106
1107        Ok((lookup_request_status(&cert, request_id)?, cert))
1108    }
1109
1110    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1111    /// The bytes will be checked to verify that it is a valid `request_status`.
1112    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1113    pub async fn request_status_signed(
1114        &self,
1115        request_id: &RequestId,
1116        effective_canister_id: Principal,
1117        signed_request_status: Vec<u8>,
1118    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1119        let _envelope: Envelope =
1120            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1121        let read_state_response: ReadStateResponse = self
1122            .read_state_endpoint(effective_canister_id, signed_request_status)
1123            .await?;
1124
1125        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1126            .map_err(AgentError::InvalidCborData)?;
1127        self.verify(&cert, effective_canister_id)?;
1128        Ok((lookup_request_status(&cert, request_id)?, cert))
1129    }
1130
1131    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1132    /// passing all arguments.
1133    pub fn update<S: Into<String>>(
1134        &self,
1135        canister_id: &Principal,
1136        method_name: S,
1137    ) -> UpdateBuilder {
1138        UpdateBuilder::new(self, *canister_id, method_name.into())
1139    }
1140
1141    /// Calls and returns the information returned by the status endpoint of a replica.
1142    pub async fn status(&self) -> Result<Status, AgentError> {
1143        let endpoint = "api/v2/status";
1144        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1145
1146        let cbor: serde_cbor::Value =
1147            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1148
1149        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1150    }
1151
1152    /// Returns a `QueryBuilder` enabling the construction of a query call without
1153    /// passing all arguments.
1154    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1155        QueryBuilder::new(self, *canister_id, method_name.into())
1156    }
1157
1158    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1159    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1160    pub fn sign_request_status(
1161        &self,
1162        effective_canister_id: Principal,
1163        request_id: RequestId,
1164    ) -> Result<SignedRequestStatus, AgentError> {
1165        let paths: Vec<Vec<Label>> =
1166            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1167        let read_state_content = self.read_state_content(paths)?;
1168        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1169        let ingress_expiry = read_state_content.ingress_expiry();
1170        let sender = *read_state_content.sender();
1171        Ok(SignedRequestStatus {
1172            ingress_expiry,
1173            sender,
1174            effective_canister_id,
1175            request_id,
1176            signed_request_status,
1177        })
1178    }
1179
1180    async fn get_subnet_by_canister(
1181        &self,
1182        canister: &Principal,
1183    ) -> Result<Arc<Subnet>, AgentError> {
1184        let subnet = self
1185            .subnet_key_cache
1186            .lock()
1187            .unwrap()
1188            .get_subnet_by_canister(canister);
1189        if let Some(subnet) = subnet {
1190            Ok(subnet)
1191        } else {
1192            self.fetch_subnet_by_canister(canister).await
1193        }
1194    }
1195
1196    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1197    pub async fn fetch_api_boundary_nodes_by_canister_id(
1198        &self,
1199        canister_id: Principal,
1200    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1201        let paths = vec![vec!["api_boundary_nodes".into()]];
1202        let certificate = self.read_state_raw(paths, canister_id).await?;
1203        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1204        Ok(api_boundary_nodes)
1205    }
1206
1207    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1208    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1209        &self,
1210        subnet_id: Principal,
1211    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1212        let paths = vec![vec!["api_boundary_nodes".into()]];
1213        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1214        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1215        Ok(api_boundary_nodes)
1216    }
1217
1218    async fn fetch_subnet_by_canister(
1219        &self,
1220        canister: &Principal,
1221    ) -> Result<Arc<Subnet>, AgentError> {
1222        let cert = self
1223            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1224            .await?;
1225
1226        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1227        let subnet = Arc::new(subnet);
1228        self.subnet_key_cache
1229            .lock()
1230            .unwrap()
1231            .insert_subnet(subnet_id, subnet.clone());
1232        Ok(subnet)
1233    }
1234
1235    async fn request(
1236        &self,
1237        method: Method,
1238        endpoint: &str,
1239        body: Option<Vec<u8>>,
1240    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1241        let body = body.map(Bytes::from);
1242
1243        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1244            let url = self.route_provider.route()?.join(endpoint)?;
1245            let uri = Uri::from_str(url.as_str())
1246                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1247            let body = body.clone().unwrap_or_default();
1248            let request = http::Request::builder()
1249                .method(method.clone())
1250                .uri(uri)
1251                .header(CONTENT_TYPE, "application/cbor")
1252                .body(body)
1253                .map_err(|e| {
1254                    AgentError::TransportError(TransportError::Generic(format!(
1255                        "unable to create request: {e:#}"
1256                    )))
1257                })?;
1258
1259            Ok(request)
1260        };
1261
1262        let response = self
1263            .client
1264            .call(
1265                &create_request_with_generated_url,
1266                self.max_tcp_error_retries,
1267                self.max_response_body_size,
1268            )
1269            .await?;
1270
1271        let (parts, body) = response.into_parts();
1272
1273        Ok((parts.status, parts.headers, body.to_vec()))
1274    }
1275
1276    async fn execute(
1277        &self,
1278        method: Method,
1279        endpoint: &str,
1280        body: Option<Vec<u8>>,
1281    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1282        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1283
1284        let status = request_result.0;
1285        let headers = request_result.1;
1286        let body = request_result.2;
1287
1288        if status.is_client_error() || status.is_server_error() {
1289            Err(AgentError::HttpError(HttpErrorPayload {
1290                status: status.into(),
1291                content_type: headers
1292                    .get(CONTENT_TYPE)
1293                    .and_then(|value| value.to_str().ok())
1294                    .map(str::to_string),
1295                content: body,
1296            }))
1297        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1298            Err(AgentError::InvalidHttpResponse(format!(
1299                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1300            )))
1301        } else {
1302            Ok((status, body))
1303        }
1304    }
1305}
1306
1307// Checks if a principal is contained within a list of principal ranges
1308// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1309fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1310    ranges
1311        .iter()
1312        .any(|r| principal >= &r.0 && principal <= &r.1)
1313}
1314
1315fn sign_envelope(
1316    content: &EnvelopeContent,
1317    identity: Arc<dyn Identity>,
1318) -> Result<Vec<u8>, AgentError> {
1319    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1320
1321    let envelope = Envelope {
1322        content: Cow::Borrowed(content),
1323        sender_pubkey: signature.public_key,
1324        sender_sig: signature.signature,
1325        sender_delegation: signature.delegations,
1326    };
1327
1328    let mut serialized_bytes = Vec::new();
1329    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1330    serializer.self_describe()?;
1331    envelope.serialize(&mut serializer)?;
1332
1333    Ok(serialized_bytes)
1334}
1335
1336/// Inspect the bytes to be sent as a query
1337/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1338pub fn signed_query_inspect(
1339    sender: Principal,
1340    canister_id: Principal,
1341    method_name: &str,
1342    arg: &[u8],
1343    ingress_expiry: u64,
1344    signed_query: Vec<u8>,
1345) -> Result<(), AgentError> {
1346    let envelope: Envelope =
1347        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1348    match envelope.content.as_ref() {
1349        EnvelopeContent::Query {
1350            ingress_expiry: ingress_expiry_cbor,
1351            sender: sender_cbor,
1352            canister_id: canister_id_cbor,
1353            method_name: method_name_cbor,
1354            arg: arg_cbor,
1355            nonce: _nonce,
1356        } => {
1357            if ingress_expiry != *ingress_expiry_cbor {
1358                return Err(AgentError::CallDataMismatch {
1359                    field: "ingress_expiry".to_string(),
1360                    value_arg: ingress_expiry.to_string(),
1361                    value_cbor: ingress_expiry_cbor.to_string(),
1362                });
1363            }
1364            if sender != *sender_cbor {
1365                return Err(AgentError::CallDataMismatch {
1366                    field: "sender".to_string(),
1367                    value_arg: sender.to_string(),
1368                    value_cbor: sender_cbor.to_string(),
1369                });
1370            }
1371            if canister_id != *canister_id_cbor {
1372                return Err(AgentError::CallDataMismatch {
1373                    field: "canister_id".to_string(),
1374                    value_arg: canister_id.to_string(),
1375                    value_cbor: canister_id_cbor.to_string(),
1376                });
1377            }
1378            if method_name != *method_name_cbor {
1379                return Err(AgentError::CallDataMismatch {
1380                    field: "method_name".to_string(),
1381                    value_arg: method_name.to_string(),
1382                    value_cbor: method_name_cbor.clone(),
1383                });
1384            }
1385            if arg != *arg_cbor {
1386                return Err(AgentError::CallDataMismatch {
1387                    field: "arg".to_string(),
1388                    value_arg: format!("{arg:?}"),
1389                    value_cbor: format!("{arg_cbor:?}"),
1390                });
1391            }
1392        }
1393        EnvelopeContent::Call { .. } => {
1394            return Err(AgentError::CallDataMismatch {
1395                field: "request_type".to_string(),
1396                value_arg: "query".to_string(),
1397                value_cbor: "call".to_string(),
1398            })
1399        }
1400        EnvelopeContent::ReadState { .. } => {
1401            return Err(AgentError::CallDataMismatch {
1402                field: "request_type".to_string(),
1403                value_arg: "query".to_string(),
1404                value_cbor: "read_state".to_string(),
1405            })
1406        }
1407    }
1408    Ok(())
1409}
1410
1411/// Inspect the bytes to be sent as an update
1412/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1413pub fn signed_update_inspect(
1414    sender: Principal,
1415    canister_id: Principal,
1416    method_name: &str,
1417    arg: &[u8],
1418    ingress_expiry: u64,
1419    signed_update: Vec<u8>,
1420) -> Result<(), AgentError> {
1421    let envelope: Envelope =
1422        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1423    match envelope.content.as_ref() {
1424        EnvelopeContent::Call {
1425            nonce: _nonce,
1426            ingress_expiry: ingress_expiry_cbor,
1427            sender: sender_cbor,
1428            canister_id: canister_id_cbor,
1429            method_name: method_name_cbor,
1430            arg: arg_cbor,
1431        } => {
1432            if ingress_expiry != *ingress_expiry_cbor {
1433                return Err(AgentError::CallDataMismatch {
1434                    field: "ingress_expiry".to_string(),
1435                    value_arg: ingress_expiry.to_string(),
1436                    value_cbor: ingress_expiry_cbor.to_string(),
1437                });
1438            }
1439            if sender != *sender_cbor {
1440                return Err(AgentError::CallDataMismatch {
1441                    field: "sender".to_string(),
1442                    value_arg: sender.to_string(),
1443                    value_cbor: sender_cbor.to_string(),
1444                });
1445            }
1446            if canister_id != *canister_id_cbor {
1447                return Err(AgentError::CallDataMismatch {
1448                    field: "canister_id".to_string(),
1449                    value_arg: canister_id.to_string(),
1450                    value_cbor: canister_id_cbor.to_string(),
1451                });
1452            }
1453            if method_name != *method_name_cbor {
1454                return Err(AgentError::CallDataMismatch {
1455                    field: "method_name".to_string(),
1456                    value_arg: method_name.to_string(),
1457                    value_cbor: method_name_cbor.clone(),
1458                });
1459            }
1460            if arg != *arg_cbor {
1461                return Err(AgentError::CallDataMismatch {
1462                    field: "arg".to_string(),
1463                    value_arg: format!("{arg:?}"),
1464                    value_cbor: format!("{arg_cbor:?}"),
1465                });
1466            }
1467        }
1468        EnvelopeContent::ReadState { .. } => {
1469            return Err(AgentError::CallDataMismatch {
1470                field: "request_type".to_string(),
1471                value_arg: "call".to_string(),
1472                value_cbor: "read_state".to_string(),
1473            })
1474        }
1475        EnvelopeContent::Query { .. } => {
1476            return Err(AgentError::CallDataMismatch {
1477                field: "request_type".to_string(),
1478                value_arg: "call".to_string(),
1479                value_cbor: "query".to_string(),
1480            })
1481        }
1482    }
1483    Ok(())
1484}
1485
1486/// Inspect the bytes to be sent as a `request_status`
1487/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1488pub fn signed_request_status_inspect(
1489    sender: Principal,
1490    request_id: &RequestId,
1491    ingress_expiry: u64,
1492    signed_request_status: Vec<u8>,
1493) -> Result<(), AgentError> {
1494    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1495    let envelope: Envelope =
1496        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1497    match envelope.content.as_ref() {
1498        EnvelopeContent::ReadState {
1499            ingress_expiry: ingress_expiry_cbor,
1500            sender: sender_cbor,
1501            paths: paths_cbor,
1502        } => {
1503            if ingress_expiry != *ingress_expiry_cbor {
1504                return Err(AgentError::CallDataMismatch {
1505                    field: "ingress_expiry".to_string(),
1506                    value_arg: ingress_expiry.to_string(),
1507                    value_cbor: ingress_expiry_cbor.to_string(),
1508                });
1509            }
1510            if sender != *sender_cbor {
1511                return Err(AgentError::CallDataMismatch {
1512                    field: "sender".to_string(),
1513                    value_arg: sender.to_string(),
1514                    value_cbor: sender_cbor.to_string(),
1515                });
1516            }
1517
1518            if paths != *paths_cbor {
1519                return Err(AgentError::CallDataMismatch {
1520                    field: "paths".to_string(),
1521                    value_arg: format!("{paths:?}"),
1522                    value_cbor: format!("{paths_cbor:?}"),
1523                });
1524            }
1525        }
1526        EnvelopeContent::Query { .. } => {
1527            return Err(AgentError::CallDataMismatch {
1528                field: "request_type".to_string(),
1529                value_arg: "read_state".to_string(),
1530                value_cbor: "query".to_string(),
1531            })
1532        }
1533        EnvelopeContent::Call { .. } => {
1534            return Err(AgentError::CallDataMismatch {
1535                field: "request_type".to_string(),
1536                value_arg: "read_state".to_string(),
1537                value_cbor: "call".to_string(),
1538            })
1539        }
1540    }
1541    Ok(())
1542}
1543
/// Cache of subnet records together with an index from canister-ID ranges to the
/// subnet they were registered for.
#[derive(Clone)]
struct SubnetCache {
    /// Subnet records keyed by subnet principal, stored in a time-limited cache.
    subnets: TimedCache<Principal, Arc<Subnet>>,
    /// Maps inclusive ranges of canister principals to the principal of the subnet
    /// whose record listed that range when it was inserted.
    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
}
1549
1550impl SubnetCache {
1551    fn new() -> Self {
1552        Self {
1553            subnets: TimedCache::with_lifespan(300),
1554            canister_index: RangeInclusiveMap::new_with_step_fns(),
1555        }
1556    }
1557
1558    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1559        self.canister_index
1560            .get(canister)
1561            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1562            .filter(|subnet| subnet.canister_ranges.contains(canister))
1563    }
1564
1565    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1566        self.subnets.cache_set(subnet_id, subnet.clone());
1567        for range in subnet.canister_ranges.iter() {
1568            self.canister_index.insert(range.clone(), subnet_id);
1569        }
1570    }
1571}
1572
/// Marker type that provides the `add_one`/`sub_one` step functions ([`StepFns`]) used by
/// the range collections keyed by [`Principal`].
#[derive(Clone, Copy)]
struct PrincipalStep;
1575
1576impl StepFns<Principal> for PrincipalStep {
1577    fn add_one(start: &Principal) -> Principal {
1578        let bytes = start.as_slice();
1579        let mut arr = [0; 29];
1580        arr[..bytes.len()].copy_from_slice(bytes);
1581        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1582            *byte = byte.wrapping_add(1);
1583            if *byte != 0 {
1584                break;
1585            }
1586        }
1587        Principal::from_slice(&arr[..bytes.len()])
1588    }
1589    fn sub_one(start: &Principal) -> Principal {
1590        let bytes = start.as_slice();
1591        let mut arr = [0; 29];
1592        arr[..bytes.len()].copy_from_slice(bytes);
1593        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1594            *byte = byte.wrapping_sub(1);
1595            if *byte != 255 {
1596                break;
1597            }
1598        }
1599        Principal::from_slice(&arr[..bytes.len()])
1600    }
1601}
1602
/// Record describing one subnet, as stored in the subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
    _key: Vec<u8>,
    /// Raw key bytes per principal (presumably the public keys of the subnet's nodes,
    /// keyed by node ID; confirm against the code that fills this map).
    node_keys: HashMap<Principal, Vec<u8>>,
    /// Inclusive ranges of canister principals associated with this subnet; consulted when
    /// resolving a canister to its subnet.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1611
/// API boundary node, which routes /api calls to IC replica nodes.
///
/// All addresses are carried as plain strings; this type performs no parsing or validation.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name
    pub domain: String,
    /// IPv6 address in the hexadecimal notation with colons.
    pub ipv6_address: String,
    /// IPv4 address in the dotted-decimal notation, or `None` when no IPv4 address is present.
    pub ipv4_address: Option<String>,
}
1622
/// A query request builder.
///
/// This makes it easier to do query calls without actually passing all arguments.
/// The builder borrows the [`Agent`] it was created from and is consumed by the methods
/// that send, sign or convert the request.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// The agent that will build, sign and send the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds since the epoch
    /// (this is the unit `expire_at` and `expire_after` store). `None` leaves the choice to the agent.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether to include a nonce with the message.
    pub use_nonce: bool,
}
1643
1644impl<'agent> QueryBuilder<'agent> {
1645    /// Creates a new query builder with an agent for a particular canister method.
1646    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1647        Self {
1648            agent,
1649            effective_canister_id: canister_id,
1650            canister_id,
1651            method_name,
1652            arg: vec![],
1653            ingress_expiry_datetime: None,
1654            use_nonce: false,
1655        }
1656    }
1657
1658    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1659    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1660        self.effective_canister_id = canister_id;
1661        self
1662    }
1663
1664    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1665    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1666        self.arg = arg.into();
1667        self
1668    }
1669
1670    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1671    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1672        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1673        self
1674    }
1675
1676    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1677    pub fn expire_after(mut self, duration: Duration) -> Self {
1678        self.ingress_expiry_datetime = Some(
1679            OffsetDateTime::now_utc()
1680                .saturating_add(duration.try_into().expect("negative duration"))
1681                .unix_timestamp_nanos() as u64,
1682        );
1683        self
1684    }
1685
1686    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1687    /// and thus may get a (briefly) cached response.
1688    pub fn with_nonce_generation(mut self) -> Self {
1689        self.use_nonce = true;
1690        self
1691    }
1692
1693    /// Make a query call. This will return a byte vector.
1694    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1695        self.agent
1696            .query_raw(
1697                self.canister_id,
1698                self.effective_canister_id,
1699                self.method_name,
1700                self.arg,
1701                self.ingress_expiry_datetime,
1702                self.use_nonce,
1703                None,
1704            )
1705            .await
1706    }
1707
1708    /// Make a query call with signature verification. This will return a byte vector.
1709    ///
1710    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1711    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1712    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1713        self.agent
1714            .query_raw(
1715                self.canister_id,
1716                self.effective_canister_id,
1717                self.method_name,
1718                self.arg,
1719                self.ingress_expiry_datetime,
1720                self.use_nonce,
1721                Some(true),
1722            )
1723            .await
1724    }
1725
1726    /// Make a query call without signature verification. This will return a byte vector.
1727    ///
1728    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1729    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1730    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1731        self.agent
1732            .query_raw(
1733                self.canister_id,
1734                self.effective_canister_id,
1735                self.method_name,
1736                self.arg,
1737                self.ingress_expiry_datetime,
1738                self.use_nonce,
1739                Some(false),
1740            )
1741            .await
1742    }
1743
1744    /// Sign a query call. This will return a [`signed::SignedQuery`]
1745    /// which contains all fields of the query and the signed query in CBOR encoding
1746    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1747        let effective_canister_id = self.effective_canister_id;
1748        let identity = self.agent.identity.clone();
1749        let content = self.into_envelope()?;
1750        let signed_query = sign_envelope(&content, identity)?;
1751        let EnvelopeContent::Query {
1752            ingress_expiry,
1753            sender,
1754            canister_id,
1755            method_name,
1756            arg,
1757            nonce,
1758        } = content
1759        else {
1760            unreachable!()
1761        };
1762        Ok(SignedQuery {
1763            ingress_expiry,
1764            sender,
1765            canister_id,
1766            method_name,
1767            arg,
1768            effective_canister_id,
1769            signed_query,
1770            nonce,
1771        })
1772    }
1773
1774    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1775    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1776        self.agent.query_content(
1777            self.canister_id,
1778            self.method_name,
1779            self.arg,
1780            self.ingress_expiry_datetime,
1781            self.use_nonce,
1782        )
1783    }
1784}
1785
1786impl<'agent> IntoFuture for QueryBuilder<'agent> {
1787    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1788    type Output = Result<Vec<u8>, AgentError>;
1789    fn into_future(self) -> Self::IntoFuture {
1790        Box::pin(self.call())
1791    }
1792}
1793
/// An in-flight canister update call. Useful primarily as a `Future`.
///
/// Polling it drives the initial submission; `and_wait` additionally polls for the
/// result when the submission only produced a request ID.
pub struct UpdateCall<'agent> {
    /// The agent used to poll for the result in `and_wait`.
    agent: &'agent Agent,
    /// The boxed future of the initial call; yields either the response or a request ID to poll.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// Effective canister ID passed along when polling for the result.
    effective_canister_id: Principal,
    /// Canister being called; reported in the `Operation::Call` context handed to the poller.
    canister_id: Principal,
    /// Method being called; reported in the `Operation::Call` context handed to the poller.
    method_name: String,
}
1802
1803impl fmt::Debug for UpdateCall<'_> {
1804    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1805        f.debug_struct("UpdateCall")
1806            .field("agent", &self.agent)
1807            .field("effective_canister_id", &self.effective_canister_id)
1808            .finish_non_exhaustive()
1809    }
1810}
1811
1812impl Future for UpdateCall<'_> {
1813    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1814    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1815        self.response_future.as_mut().poll(cx)
1816    }
1817}
1818
1819impl<'a> UpdateCall<'a> {
1820    /// Waits for the update call to be completed, polling if necessary.
1821    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1822        let response = self.response_future.await?;
1823
1824        match response {
1825            CallResponse::Response(response) => Ok(response),
1826            CallResponse::Poll(request_id) => {
1827                self.agent
1828                    .wait_inner(
1829                        &request_id,
1830                        self.effective_canister_id,
1831                        Some(Operation::Call {
1832                            canister: self.canister_id,
1833                            method: self.method_name,
1834                        }),
1835                    )
1836                    .await
1837            }
1838        }
1839    }
1840}
/// An update request Builder.
///
/// This makes it easier to do update calls without actually passing all arguments or specifying
/// if you want to wait or not.
/// The builder borrows the [`Agent`] it was created from and is consumed by the methods
/// that send, sign or convert the request.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// The agent that will build, sign and send the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds since the epoch
    /// (this is the unit `expire_at` and `expire_after` store). `None` leaves the choice to the agent.
    pub ingress_expiry_datetime: Option<u64>,
}
1859
1860impl<'agent> UpdateBuilder<'agent> {
1861    /// Creates a new update builder with an agent for a particular canister method.
1862    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1863        Self {
1864            agent,
1865            effective_canister_id: canister_id,
1866            canister_id,
1867            method_name,
1868            arg: vec![],
1869            ingress_expiry_datetime: None,
1870        }
1871    }
1872
1873    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1874    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1875        self.effective_canister_id = canister_id;
1876        self
1877    }
1878
1879    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1880    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1881        self.arg = arg.into();
1882        self
1883    }
1884
1885    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1886    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1887        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1888        self
1889    }
1890
1891    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1892    pub fn expire_after(mut self, duration: Duration) -> Self {
1893        self.ingress_expiry_datetime = Some(
1894            OffsetDateTime::now_utc()
1895                .saturating_add(duration.try_into().expect("negative duration"))
1896                .unix_timestamp_nanos() as u64,
1897        );
1898        self
1899    }
1900
1901    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1902    /// the response as a byte vector.
1903    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1904        self.call().and_wait().await.map(|x| x.0)
1905    }
1906
1907    /// Make an update call. This will return a `RequestId`.
1908    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1909    pub fn call(self) -> UpdateCall<'agent> {
1910        let method_name = self.method_name.clone();
1911        let response_future = async move {
1912            self.agent
1913                .update_raw(
1914                    self.canister_id,
1915                    self.effective_canister_id,
1916                    self.method_name,
1917                    self.arg,
1918                    self.ingress_expiry_datetime,
1919                )
1920                .await
1921        };
1922        UpdateCall {
1923            agent: self.agent,
1924            response_future: Box::pin(response_future),
1925            effective_canister_id: self.effective_canister_id,
1926            canister_id: self.canister_id,
1927            method_name,
1928        }
1929    }
1930
1931    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1932    /// which contains all fields of the update and the signed update in CBOR encoding
1933    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1934        let identity = self.agent.identity.clone();
1935        let effective_canister_id = self.effective_canister_id;
1936        let content = self.into_envelope()?;
1937        let signed_update = sign_envelope(&content, identity)?;
1938        let request_id = to_request_id(&content)?;
1939        let EnvelopeContent::Call {
1940            nonce,
1941            ingress_expiry,
1942            sender,
1943            canister_id,
1944            method_name,
1945            arg,
1946        } = content
1947        else {
1948            unreachable!()
1949        };
1950        Ok(SignedUpdate {
1951            nonce,
1952            ingress_expiry,
1953            sender,
1954            canister_id,
1955            method_name,
1956            arg,
1957            effective_canister_id,
1958            signed_update,
1959            request_id,
1960        })
1961    }
1962
1963    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1964    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1965        let nonce = self.agent.nonce_factory.generate();
1966        self.agent.update_content(
1967            self.canister_id,
1968            self.method_name,
1969            self.arg,
1970            self.ingress_expiry_datetime,
1971            nonce,
1972        )
1973    }
1974}
1975
1976impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1977    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1978    type Output = Result<Vec<u8>, AgentError>;
1979    fn into_future(self) -> Self::IntoFuture {
1980        Box::pin(self.call_and_wait())
1981    }
1982}
1983
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
    ///
    /// * `req` - factory that produces the request to send; it may be invoked once per attempt
    ///   and may itself fail with an [`AgentError`].
    /// * `max_retries` - upper bound on retries the implementation may perform; the blanket
    ///   implementation in this module applies it to connection failures.
    /// * `size_limit` - optional cap, in bytes, on the response body that will be read;
    ///   `None` means no cap.
    ///
    /// Returns the response with its body fully read into memory.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
1996
1997/// Convert from http Request to reqwest's one
1998fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
1999    let (parts, body) = req.into_parts();
2000    let body = reqwest::Body::from(body);
2001    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2002    // but just to be safe let's handle it.
2003    let request = http::Request::from_parts(parts, body)
2004        .try_into()
2005        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2006
2007    Ok(request)
2008}
2009
2010/// Convert from reqwests's Response to http one
2011#[cfg(not(target_family = "wasm"))]
2012async fn to_http_response(
2013    resp: Response,
2014    size_limit: Option<usize>,
2015) -> Result<http::Response<Bytes>, AgentError> {
2016    use http_body_util::{BodyExt, Limited};
2017
2018    let resp: http::Response<reqwest::Body> = resp.into();
2019    let (parts, body) = resp.into_parts();
2020    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2021    let body = body
2022        .collect()
2023        .await
2024        .map_err(|e| {
2025            AgentError::TransportError(TransportError::Generic(format!(
2026                "unable to read response body: {e:#}"
2027            )))
2028        })?
2029        .to_bytes();
2030    let resp = http::Response::from_parts(parts, body);
2031
2032    Ok(resp)
2033}
2034
/// Turns a `reqwest::Response` into an `http::Response` whose body has been read into memory.
///
/// On WASM `reqwest` offers no direct conversion to `http::Response`, so the status and
/// headers are copied by hand and the body is rebuilt from the byte stream.
/// At most `size_limit` bytes of body are accepted (`None` means no limit).
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Capture the metadata first: reading the body stream consumes the response.
    let status = resp.status();
    let headers = resp.headers().clone();

    // Wrap every chunk as a data frame so the stream can be treated as a size-limited body.
    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let limited = Limited::new(StreamBody::new(frames), size_limit.unwrap_or(usize::MAX));
    let payload = match http_body_util::BodyExt::collect(limited).await {
        Ok(collected) => collected.to_bytes(),
        Err(e) => {
            return Err(AgentError::TransportError(TransportError::Generic(
                format!("unable to read response body: {e:#}"),
            )));
        }
    };

    let mut converted = http::Response::new(payload);
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2070
2071#[cfg(not(target_family = "wasm"))]
2072#[async_trait]
2073impl<T> HttpService for T
2074where
2075    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2076    for<'a> <&'a Self as Service<Request>>::Future: Send,
2077    T: Send + Sync + Debug + ?Sized,
2078{
2079    #[allow(clippy::needless_arbitrary_self_type)]
2080    async fn call<'a>(
2081        mut self: &'a Self,
2082        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2083        max_retries: usize,
2084        size_limit: Option<usize>,
2085    ) -> Result<http::Response<Bytes>, AgentError> {
2086        let mut retry_count = 0;
2087        loop {
2088            let request = from_http_request(req()?)?;
2089
2090            match Service::call(&mut self, request).await {
2091                Err(err) => {
2092                    // Network-related errors can be retried.
2093                    if err.is_connect() {
2094                        if retry_count >= max_retries {
2095                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2096                        }
2097                        retry_count += 1;
2098                    }
2099                }
2100
2101                Ok(resp) => {
2102                    let resp = to_http_response(resp, size_limit).await?;
2103                    return Ok(resp);
2104                }
2105            }
2106        }
2107    }
2108}
2109
/// Blanket [`HttpService`] implementation for WASM targets: the request is sent exactly once,
/// so the retry count is ignored, while the size limit is still applied to the response body.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        _size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;
        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, _size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2132
/// [`HttpService`] wrapper around a `reqwest` [`Client`] that re-sends a request when the
/// server answers with HTTP 429 (Too Many Requests).
#[derive(Debug)]
struct Retry429Logic {
    /// The underlying HTTP client used for every attempt.
    client: Client,
}
2137
2138#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2139#[cfg_attr(not(target_family = "wasm"), async_trait)]
2140impl HttpService for Retry429Logic {
2141    async fn call<'a>(
2142        &'a self,
2143        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2144        _max_tcp_retries: usize,
2145        _size_limit: Option<usize>,
2146    ) -> Result<http::Response<Bytes>, AgentError> {
2147        let mut retries = 0;
2148        loop {
2149            #[cfg(not(target_family = "wasm"))]
2150            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2151            // Client inconveniently does not implement Service on wasm
2152            #[cfg(target_family = "wasm")]
2153            let resp = {
2154                let request = from_http_request(req()?)?;
2155                let resp = self
2156                    .client
2157                    .execute(request)
2158                    .await
2159                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2160
2161                to_http_response(resp, _size_limit).await?
2162            };
2163
2164            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2165                if retries == 6 {
2166                    break Ok(resp);
2167                } else {
2168                    retries += 1;
2169                    crate::util::sleep(Duration::from_millis(250)).await;
2170                    continue;
2171                }
2172            } else {
2173                break Ok(resp);
2174            }
2175        }
2176    }
2177}
2178
// Tests that need no external network; the only networking is a listener on 127.0.0.1.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Any tests that involve the network should go in agent_test, not here.

    // Signs six updates back to back and counts how often the ingress expiry strictly
    // increases from one to the next.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // `None < Some(_)` holds, so the first expiry always counts as a new timestamp.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        // in six requests, there should be no more than two timestamps
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    // Starts three queries against a server that never answers, with the agent limited to
    // two concurrent requests, and checks that only two connections were opened.
    #[tokio::test]
    async fn client_ratelimit() {
        // Port 0 asks the OS for any free port.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        // Number of connections the mock server has accepted so far.
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        // read all data, never reply
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned queries time to connect before counting.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}