ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49    },
50    export::Principal,
51    identity::Identity,
52    to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64    borrow::Cow,
65    collections::HashMap,
66    convert::TryFrom,
67    fmt::{self, Debug},
68    future::{Future, IntoFuture},
69    pin::Pin,
70    sync::{Arc, Mutex, RwLock},
71    task::{Context, Poll},
72    time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
// Byte string consisting of a length byte (0x0D = 13) followed by the 13 ASCII bytes
// `ic-state-root`, 14 bytes in total.
// NOTE(review): presumably prepended to a state-tree root hash before its signature is checked;
// the usage is outside this part of the file — confirm.
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
// Hard-coded 133-byte root public key. `Agent::new` installs it as the initial root key, and
// `Agent::fetch_root_key` compares against it to decide whether a key has already been replaced.
// NOTE(review): the leading bytes look like a DER header — confirm the exact encoding.
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
// Boxed, pinned future resolving to `Result<V, AgentError>` and borrowing for `'a`.
// On non-wasm targets the future is additionally required to be `Send`.
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
// Same alias for wasm targets, where the `Send` bound is dropped.
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87/// A low level Agent to make calls to a Replica endpoint.
88///
89/// ```ignore
90/// # // This test is ignored because it requires an ic to be running. We run these
91/// # // in the ic-ref workflow.
92/// use ic_agent::{Agent, export::Principal};
93/// use candid::{Encode, Decode, CandidType, Nat};
94/// use serde::Deserialize;
95///
96/// #[derive(CandidType)]
97/// struct Argument {
98///   amount: Option<Nat>,
99/// }
100///
101/// #[derive(CandidType, Deserialize)]
102/// struct CreateCanisterResult {
103///   canister_id: Principal,
104/// }
105///
106/// # fn create_identity() -> impl ic_agent::Identity {
107/// #
108/// #     ic_agent::identity::BasicIdentity::from_signing_key(
109/// #         ed25519_consensus::SigningKey::new(rand::thread_rng())
110/// #     )
111/// # }
112/// #
113/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
114/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID,
128///   // and wait for a result.
129///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
130///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// }
140///
141/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
142/// # runtime.block_on(async {
143/// let canister_id = create_a_canister().await.unwrap();
144/// eprintln!("{}", canister_id);
145/// # });
146/// ```
147///
148/// This agent does not understand Candid, and only acts on byte buffers.
149///
150/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
151/// you receive from a method has already been verified and you do not need to manually verify it.
152#[derive(Clone)]
153pub struct Agent {
    /// Produces the optional nonce attached to outgoing query and update envelopes.
154    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that supplies the sender principal and signs envelopes.
155    identity: Arc<dyn Identity>,
    /// Added to the current time to compute a request's ingress expiry; also the maximum age
    /// accepted for a query-response signature.
156    ingress_expiry: Duration,
    /// Root public key in use. Starts as `IC_ROOT_KEY`; replaced through `set_root_key`.
157    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service built in `Agent::new` (either the configured one or a `Retry429Logic` wrapper).
158    client: Arc<dyn HttpService>,
    /// Route provider chosen in `Agent::new` from the configured provider or URL.
159    route_provider: Arc<dyn RouteProvider>,
    /// NOTE(review): presumably caches subnet data such as node keys; usage is outside this view.
160    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Acquired by every endpoint helper before issuing its HTTP request; sized from
    /// `max_concurrent_requests` in the config.
161    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their signatures verified, unless a call
    /// overrides it explicitly.
162    verify_query_signatures: bool,
    /// NOTE(review): optional response-size limit; where it is enforced is not visible here.
163    max_response_body_size: Option<usize>,
    /// Maximum elapsed time given to the polling backoff in `get_retry_policy`.
164    max_polling_time: Duration,
    /// NOTE(review): copied from the config but marked `dead_code`; no reader is visible here.
165    #[allow(dead_code)]
166    max_tcp_error_retries: usize,
167}
168
169impl fmt::Debug for Agent {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171        f.debug_struct("Agent")
172            .field("ingress_expiry", &self.ingress_expiry)
173            .finish_non_exhaustive()
174    }
175}
176
177impl Agent {
178    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
179    /// using the [`AgentConfig`] and [`Agent::new()`].
180    pub fn builder() -> builder::AgentBuilder {
181        Default::default()
182    }
183
184    /// Create an instance of an [`Agent`].
185    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186        let client = config.http_service.unwrap_or_else(|| {
187            Arc::new(Retry429Logic {
188                client: config.client.unwrap_or_else(|| {
189                    #[cfg(not(target_family = "wasm"))]
190                    {
191                        Client::builder()
192                            .use_rustls_tls()
193                            .timeout(Duration::from_secs(360))
194                            .build()
195                            .expect("Could not create HTTP client.")
196                    }
197                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198                    {
199                        Client::new()
200                    }
201                }),
202            })
203        });
204        Ok(Agent {
205            nonce_factory: config.nonce_factory,
206            identity: config.identity,
207            ingress_expiry: config.ingress_expiry,
208            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209            client: client.clone(),
210            route_provider: if let Some(route_provider) = config.route_provider {
211                route_provider
212            } else if let Some(url) = config.url {
213                if config.background_dynamic_routing {
214                    assert!(
215                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217                    );
218                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219                    UrlUntilReady::new(url, async move {
220                        DynamicRouteProviderBuilder::new(
221                            LatencyRoutingSnapshot::new(),
222                            seeds,
223                            client,
224                        )
225                        .build()
226                        .await
227                    }) as Arc<dyn RouteProvider>
228                } else {
229                    Arc::new(url)
230                }
231            } else {
232                panic!("either route_provider or url must be specified");
233            },
234            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235            verify_query_signatures: config.verify_query_signatures,
236            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237            max_response_body_size: config.max_response_body_size,
238            max_tcp_error_retries: config.max_tcp_error_retries,
239            max_polling_time: config.max_polling_time,
240        })
241    }
242
243    /// Set the identity provider for signing messages.
244    ///
245    /// NOTE: if you change the identity while having update calls in
246    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
247    /// messages.
248    pub fn set_identity<I>(&mut self, identity: I)
249    where
250        I: 'static + Identity,
251    {
252        self.identity = Arc::new(identity);
253    }
254    /// Set the arc identity provider for signing messages.
255    ///
256    /// NOTE: if you change the identity while having update calls in
257    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
258    /// messages.
259    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260        self.identity = identity;
261    }
262
263    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
264    /// responses using a hard-coded public key.
265    ///
266    /// This function will instruct the agent to ask the endpoint for its public key, and use
267    /// that instead. This is required when talking to a local test instance, for example.
268    ///
269    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
270    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
271    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273            // already fetched the root key
274            return Ok(());
275        }
276        let status = self.status().await?;
277        let Some(root_key) = status.root_key else {
278            return Err(AgentError::NoRootKeyInStatus(status));
279        };
280        self.set_root_key(root_key);
281        Ok(())
282    }
283
284    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
285    /// responses using a hard-coded public key.
286    ///
287    /// Using this function you can set the root key to a known one if you know if beforehand.
288    pub fn set_root_key(&self, root_key: Vec<u8>) {
289        *self.root_key.write().unwrap() = root_key;
290    }
291
292    /// Return the root key currently in use.
293    pub fn read_root_key(&self) -> Vec<u8> {
294        self.root_key.read().unwrap().clone()
295    }
296
297    fn get_expiry_date(&self) -> u64 {
298        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300        if self.ingress_expiry.as_secs() > 90 {
301            rounded = rounded.replace_second(0).unwrap();
302        }
303        rounded.unix_timestamp_nanos().try_into().unwrap()
304    }
305
306    /// Return the principal of the identity.
307    pub fn get_principal(&self) -> Result<Principal, String> {
308        self.identity.sender()
309    }
310
311    async fn query_endpoint<A>(
312        &self,
313        effective_canister_id: Principal,
314        serialized_bytes: Vec<u8>,
315    ) -> Result<A, AgentError>
316    where
317        A: serde::de::DeserializeOwned,
318    {
319        let _permit = self.concurrent_requests_semaphore.acquire().await;
320        let bytes = self
321            .execute(
322                Method::POST,
323                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324                Some(serialized_bytes),
325            )
326            .await?
327            .1;
328        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329    }
330
331    async fn read_state_endpoint<A>(
332        &self,
333        effective_canister_id: Principal,
334        serialized_bytes: Vec<u8>,
335    ) -> Result<A, AgentError>
336    where
337        A: serde::de::DeserializeOwned,
338    {
339        let _permit = self.concurrent_requests_semaphore.acquire().await;
340        let endpoint = format!(
341            "api/v2/canister/{}/read_state",
342            effective_canister_id.to_text()
343        );
344        let bytes = self
345            .execute(Method::POST, &endpoint, Some(serialized_bytes))
346            .await?
347            .1;
348        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349    }
350
351    async fn read_subnet_state_endpoint<A>(
352        &self,
353        subnet_id: Principal,
354        serialized_bytes: Vec<u8>,
355    ) -> Result<A, AgentError>
356    where
357        A: serde::de::DeserializeOwned,
358    {
359        let _permit = self.concurrent_requests_semaphore.acquire().await;
360        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361        let bytes = self
362            .execute(Method::POST, &endpoint, Some(serialized_bytes))
363            .await?
364            .1;
365        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366    }
367
368    async fn call_endpoint(
369        &self,
370        effective_canister_id: Principal,
371        serialized_bytes: Vec<u8>,
372    ) -> Result<TransportCallResponse, AgentError> {
373        let _permit = self.concurrent_requests_semaphore.acquire().await;
374        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375        let (status_code, response_body) = self
376            .execute(Method::POST, &endpoint, Some(serialized_bytes))
377            .await?;
378
379        if status_code == StatusCode::ACCEPTED {
380            return Ok(TransportCallResponse::Accepted);
381        }
382
383        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384    }
385
386    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
387    /// The encoding is left as an exercise to the user.
388    #[allow(clippy::too_many_arguments)]
389    async fn query_raw(
390        &self,
391        canister_id: Principal,
392        effective_canister_id: Principal,
393        method_name: String,
394        arg: Vec<u8>,
395        ingress_expiry_datetime: Option<u64>,
396        use_nonce: bool,
397        explicit_verify_query_signatures: Option<bool>,
398    ) -> Result<Vec<u8>, AgentError> {
399        let operation = Operation::Call {
400            canister: canister_id,
401            method: method_name.clone(),
402        };
403        let content = self.query_content(
404            canister_id,
405            method_name,
406            arg,
407            ingress_expiry_datetime,
408            use_nonce,
409        )?;
410        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411        self.query_inner(
412            effective_canister_id,
413            serialized_bytes,
414            content.to_request_id(),
415            explicit_verify_query_signatures,
416            operation,
417        )
418        .await
419    }
420
421    /// Send the signed query to the network. Will return a byte vector.
422    /// The bytes will be checked if it is a valid query.
423    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
424    pub async fn query_signed(
425        &self,
426        effective_canister_id: Principal,
427        signed_query: Vec<u8>,
428    ) -> Result<Vec<u8>, AgentError> {
429        let envelope: Envelope =
430            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431        let EnvelopeContent::Query {
432            canister_id,
433            method_name,
434            ..
435        } = &*envelope.content
436        else {
437            return Err(AgentError::CallDataMismatch {
438                field: "request_type".to_string(),
439                value_arg: "query".to_string(),
440                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441                    "update"
442                } else {
443                    "read_state"
444                }
445                .to_string(),
446            });
447        };
448        let operation = Operation::Call {
449            canister: *canister_id,
450            method: method_name.clone(),
451        };
452        self.query_inner(
453            effective_canister_id,
454            signed_query,
455            envelope.content.to_request_id(),
456            None,
457            operation,
458        )
459        .await
460    }
461
462    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
463    ///
464    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
465    async fn query_inner(
466        &self,
467        effective_canister_id: Principal,
468        signed_query: Vec<u8>,
469        request_id: RequestId,
470        explicit_verify_query_signatures: Option<bool>,
471        operation: Operation,
472    ) -> Result<Vec<u8>, AgentError> {
473        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474            let (response, mut subnet) = futures_util::try_join!(
475                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476                self.get_subnet_by_canister(&effective_canister_id)
477            )?;
478            if response.signatures().is_empty() {
479                return Err(AgentError::MissingSignature);
480            } else if response.signatures().len() > subnet.node_keys.len() {
481                return Err(AgentError::TooManySignatures {
482                    had: response.signatures().len(),
483                    needed: subnet.node_keys.len(),
484                });
485            }
486            for signature in response.signatures() {
487                if OffsetDateTime::now_utc()
488                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489                    > self.ingress_expiry
490                {
491                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492                }
493                let signable = response.signable(request_id, signature.timestamp);
494                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495                    node_key
496                } else {
497                    subnet = self
498                        .fetch_subnet_by_canister(&effective_canister_id)
499                        .await?;
500                    subnet
501                        .node_keys
502                        .get(&signature.identity)
503                        .ok_or(AgentError::CertificateNotAuthorized())?
504                };
505                if node_key.len() != 44 {
506                    return Err(AgentError::DerKeyLengthMismatch {
507                        expected: 44,
508                        actual: node_key.len(),
509                    });
510                }
511                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512                if node_key[..12] != DER_PREFIX {
513                    return Err(AgentError::DerPrefixMismatch {
514                        expected: DER_PREFIX.to_vec(),
515                        actual: node_key[..12].to_vec(),
516                    });
517                }
518                let pubkey =
519                    VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
520                        .map_err(|_| AgentError::MalformedPublicKey)?;
521                let sig = Signature::from(
522                    <[u8; 64]>::try_from(&signature.signature[..])
523                        .map_err(|_| AgentError::MalformedSignature)?,
524                );
525
526                match pubkey.verify(&sig, &signable) {
527                    Err(Ed25519Error::InvalidSignature) => {
528                        return Err(AgentError::QuerySignatureVerificationFailed)
529                    }
530                    Err(Ed25519Error::InvalidSliceLength) => {
531                        return Err(AgentError::MalformedSignature)
532                    }
533                    Err(Ed25519Error::MalformedPublicKey) => {
534                        return Err(AgentError::MalformedPublicKey)
535                    }
536                    Ok(()) => (),
537                    _ => unreachable!(),
538                }
539            }
540            response
541        } else {
542            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
543                .await?
544        };
545
546        match response {
547            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
548            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
549                reject,
550                operation: Some(operation),
551            }),
552        }
553    }
554
555    fn query_content(
556        &self,
557        canister_id: Principal,
558        method_name: String,
559        arg: Vec<u8>,
560        ingress_expiry_datetime: Option<u64>,
561        use_nonce: bool,
562    ) -> Result<EnvelopeContent, AgentError> {
563        Ok(EnvelopeContent::Query {
564            sender: self.identity.sender().map_err(AgentError::SigningError)?,
565            canister_id,
566            method_name,
567            arg,
568            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
569            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
570        })
571    }
572
573    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
574    async fn update_raw(
575        &self,
576        canister_id: Principal,
577        effective_canister_id: Principal,
578        method_name: String,
579        arg: Vec<u8>,
580        ingress_expiry_datetime: Option<u64>,
581    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
582        let nonce = self.nonce_factory.generate();
583        let content = self.update_content(
584            canister_id,
585            method_name.clone(),
586            arg,
587            ingress_expiry_datetime,
588            nonce,
589        )?;
590        let operation = Some(Operation::Call {
591            canister: canister_id,
592            method: method_name,
593        });
594        let request_id = to_request_id(&content)?;
595        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
596
597        let response_body = self
598            .call_endpoint(effective_canister_id, serialized_bytes)
599            .await?;
600
601        match response_body {
602            TransportCallResponse::Replied { certificate } => {
603                let certificate =
604                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
605
606                self.verify(&certificate, effective_canister_id)?;
607                let status = lookup_request_status(&certificate, &request_id)?;
608
609                match status {
610                    RequestStatusResponse::Replied(reply) => {
611                        Ok(CallResponse::Response((reply.arg, certificate)))
612                    }
613                    RequestStatusResponse::Rejected(reject_response) => {
614                        Err(AgentError::CertifiedReject {
615                            reject: reject_response,
616                            operation,
617                        })?
618                    }
619                    _ => Ok(CallResponse::Poll(request_id)),
620                }
621            }
622            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
623            TransportCallResponse::NonReplicatedRejection(reject_response) => {
624                Err(AgentError::UncertifiedReject {
625                    reject: reject_response,
626                    operation,
627                })
628            }
629        }
630    }
631
632    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
633    /// The bytes will be checked to verify that it is a valid update.
634    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
635    pub async fn update_signed(
636        &self,
637        effective_canister_id: Principal,
638        signed_update: Vec<u8>,
639    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
640        let envelope: Envelope =
641            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
642        let EnvelopeContent::Call {
643            canister_id,
644            method_name,
645            ..
646        } = &*envelope.content
647        else {
648            return Err(AgentError::CallDataMismatch {
649                field: "request_type".to_string(),
650                value_arg: "update".to_string(),
651                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
652                    "query"
653                } else {
654                    "read_state"
655                }
656                .to_string(),
657            });
658        };
659        let operation = Some(Operation::Call {
660            canister: *canister_id,
661            method: method_name.clone(),
662        });
663        let request_id = to_request_id(&envelope.content)?;
664
665        let response_body = self
666            .call_endpoint(effective_canister_id, signed_update)
667            .await?;
668
669        match response_body {
670            TransportCallResponse::Replied { certificate } => {
671                let certificate =
672                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
673
674                self.verify(&certificate, effective_canister_id)?;
675                let status = lookup_request_status(&certificate, &request_id)?;
676
677                match status {
678                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
679                    RequestStatusResponse::Rejected(reject_response) => {
680                        Err(AgentError::CertifiedReject {
681                            reject: reject_response,
682                            operation,
683                        })?
684                    }
685                    _ => Ok(CallResponse::Poll(request_id)),
686                }
687            }
688            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
689            TransportCallResponse::NonReplicatedRejection(reject_response) => {
690                Err(AgentError::UncertifiedReject {
691                    reject: reject_response,
692                    operation,
693                })
694            }
695        }
696    }
697
698    fn update_content(
699        &self,
700        canister_id: Principal,
701        method_name: String,
702        arg: Vec<u8>,
703        ingress_expiry_datetime: Option<u64>,
704        nonce: Option<Vec<u8>>,
705    ) -> Result<EnvelopeContent, AgentError> {
706        Ok(EnvelopeContent::Call {
707            canister_id,
708            method_name,
709            arg,
710            nonce,
711            sender: self.identity.sender().map_err(AgentError::SigningError)?,
712            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
713        })
714    }
715
716    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
717        ExponentialBackoffBuilder::new()
718            .with_initial_interval(Duration::from_millis(500))
719            .with_max_interval(Duration::from_secs(1))
720            .with_multiplier(1.4)
721            .with_max_elapsed_time(Some(self.max_polling_time))
722            .build()
723    }
724
725    /// Wait for `request_status` to return a Replied response and return the arg.
726    pub async fn wait_signed(
727        &self,
728        request_id: &RequestId,
729        effective_canister_id: Principal,
730        signed_request_status: Vec<u8>,
731    ) -> Result<(Vec<u8>, Certificate), AgentError> {
732        let mut retry_policy = self.get_retry_policy();
733
734        let mut request_accepted = false;
735        let (resp, cert) = self
736            .request_status_signed(
737                request_id,
738                effective_canister_id,
739                signed_request_status.clone(),
740            )
741            .await?;
742        loop {
743            match resp {
744                RequestStatusResponse::Unknown => {}
745
746                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
747                    if !request_accepted {
748                        retry_policy.reset();
749                        request_accepted = true;
750                    }
751                }
752
753                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
754                    return Ok((arg, cert))
755                }
756
757                RequestStatusResponse::Rejected(response) => {
758                    return Err(AgentError::CertifiedReject {
759                        reject: response,
760                        operation: None,
761                    })
762                }
763
764                RequestStatusResponse::Done => {
765                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
766                        *request_id,
767                    )))
768                }
769            };
770
771            match retry_policy.next_backoff() {
772                Some(duration) => crate::util::sleep(duration).await,
773
774                None => return Err(AgentError::TimeoutWaitingForResponse()),
775            }
776        }
777    }
778
779    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
780    pub async fn wait(
781        &self,
782        request_id: &RequestId,
783        effective_canister_id: Principal,
784    ) -> Result<(Vec<u8>, Certificate), AgentError> {
785        self.wait_inner(request_id, effective_canister_id, None)
786            .await
787    }
788
789    async fn wait_inner(
790        &self,
791        request_id: &RequestId,
792        effective_canister_id: Principal,
793        operation: Option<Operation>,
794    ) -> Result<(Vec<u8>, Certificate), AgentError> {
795        let mut retry_policy = self.get_retry_policy();
796
797        let mut request_accepted = false;
798        loop {
799            let (resp, cert) = self
800                .request_status_raw(request_id, effective_canister_id)
801                .await?;
802            match resp {
803                RequestStatusResponse::Unknown => {}
804
805                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806                    if !request_accepted {
807                        // The system will return RequestStatusResponse::Unknown
808                        // until the request is accepted
809                        // and we generally cannot know how long that will take.
810                        // State transitions between Received and Processing may be
811                        // instantaneous. Therefore, once we know the request is accepted,
812                        // we should restart the backoff so the request does not time out.
813
814                        retry_policy.reset();
815                        request_accepted = true;
816                    }
817                }
818
819                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820                    return Ok((arg, cert))
821                }
822
823                RequestStatusResponse::Rejected(response) => {
824                    return Err(AgentError::CertifiedReject {
825                        reject: response,
826                        operation,
827                    })
828                }
829
830                RequestStatusResponse::Done => {
831                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
832                        *request_id,
833                    )))
834                }
835            };
836
837            match retry_policy.next_backoff() {
838                Some(duration) => crate::util::sleep(duration).await,
839
840                None => return Err(AgentError::TimeoutWaitingForResponse()),
841            }
842        }
843    }
844
845    /// Request the raw state tree directly, under an effective canister ID.
846    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
847    pub async fn read_state_raw(
848        &self,
849        paths: Vec<Vec<Label>>,
850        effective_canister_id: Principal,
851    ) -> Result<Certificate, AgentError> {
852        let content = self.read_state_content(paths)?;
853        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855        let read_state_response: ReadStateResponse = self
856            .read_state_endpoint(effective_canister_id, serialized_bytes)
857            .await?;
858        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859            .map_err(AgentError::InvalidCborData)?;
860        self.verify(&cert, effective_canister_id)?;
861        Ok(cert)
862    }
863
864    /// Request the raw state tree directly, under a subnet ID.
865    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
866    pub async fn read_subnet_state_raw(
867        &self,
868        paths: Vec<Vec<Label>>,
869        subnet_id: Principal,
870    ) -> Result<Certificate, AgentError> {
871        let content = self.read_state_content(paths)?;
872        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874        let read_state_response: ReadStateResponse = self
875            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876            .await?;
877        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878            .map_err(AgentError::InvalidCborData)?;
879        self.verify_for_subnet(&cert, subnet_id)?;
880        Ok(cert)
881    }
882
883    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884        Ok(EnvelopeContent::ReadState {
885            sender: self.identity.sender().map_err(AgentError::SigningError)?,
886            paths,
887            ingress_expiry: self.get_expiry_date(),
888        })
889    }
890
891    /// Verify a certificate, checking delegation if present.
892    /// Only passes if the certificate also has authority over the canister.
893    pub fn verify(
894        &self,
895        cert: &Certificate,
896        effective_canister_id: Principal,
897    ) -> Result<(), AgentError> {
898        self.verify_cert(cert, effective_canister_id)?;
899        self.verify_cert_timestamp(cert)?;
900        Ok(())
901    }
902
903    fn verify_cert(
904        &self,
905        cert: &Certificate,
906        effective_canister_id: Principal,
907    ) -> Result<(), AgentError> {
908        let sig = &cert.signature;
909
910        let root_hash = cert.tree.digest();
911        let mut msg = vec![];
912        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913        msg.extend_from_slice(&root_hash);
914
915        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916        let key = extract_der(der_key)?;
917
918        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919            .map_err(|_| AgentError::CertificateVerificationFailed())?;
920        Ok(())
921    }
922
923    /// Verify a certificate, checking delegation if present.
924    /// Only passes if the certificate is for the specified subnet.
925    pub fn verify_for_subnet(
926        &self,
927        cert: &Certificate,
928        subnet_id: Principal,
929    ) -> Result<(), AgentError> {
930        self.verify_cert_for_subnet(cert, subnet_id)?;
931        self.verify_cert_timestamp(cert)?;
932        Ok(())
933    }
934
935    fn verify_cert_for_subnet(
936        &self,
937        cert: &Certificate,
938        subnet_id: Principal,
939    ) -> Result<(), AgentError> {
940        let sig = &cert.signature;
941
942        let root_hash = cert.tree.digest();
943        let mut msg = vec![];
944        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945        msg.extend_from_slice(&root_hash);
946
947        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948        let key = extract_der(der_key)?;
949
950        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951            .map_err(|_| AgentError::CertificateVerificationFailed())?;
952        Ok(())
953    }
954
955    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956        let time = lookup_time(cert)?;
957        if (OffsetDateTime::now_utc()
958            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
959        .abs()
960            > self.ingress_expiry
961        {
962            Err(AgentError::CertificateOutdated(self.ingress_expiry))
963        } else {
964            Ok(())
965        }
966    }
967
968    fn check_delegation(
969        &self,
970        delegation: &Option<Delegation>,
971        effective_canister_id: Principal,
972    ) -> Result<Vec<u8>, AgentError> {
973        match delegation {
974            None => Ok(self.read_root_key()),
975            Some(delegation) => {
976                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977                    .map_err(AgentError::InvalidCborData)?;
978                if cert.delegation.is_some() {
979                    return Err(AgentError::CertificateHasTooManyDelegations);
980                }
981                self.verify_cert(&cert, effective_canister_id)?;
982                let canister_range_lookup = [
983                    "subnet".as_bytes(),
984                    delegation.subnet_id.as_ref(),
985                    "canister_ranges".as_bytes(),
986                ];
987                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
988                let ranges: Vec<(Principal, Principal)> =
989                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
990                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
991                    // the certificate is not authorized to answer calls for this canister
992                    return Err(AgentError::CertificateNotAuthorized());
993                }
994
995                let public_key_path = [
996                    "subnet".as_bytes(),
997                    delegation.subnet_id.as_ref(),
998                    "public_key".as_bytes(),
999                ];
1000                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1001            }
1002        }
1003    }
1004
1005    fn check_delegation_for_subnet(
1006        &self,
1007        delegation: &Option<Delegation>,
1008        subnet_id: Principal,
1009    ) -> Result<Vec<u8>, AgentError> {
1010        match delegation {
1011            None => Ok(self.read_root_key()),
1012            Some(delegation) => {
1013                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1014                    .map_err(AgentError::InvalidCborData)?;
1015                if cert.delegation.is_some() {
1016                    return Err(AgentError::CertificateHasTooManyDelegations);
1017                }
1018                self.verify_cert_for_subnet(&cert, subnet_id)?;
1019                let public_key_path = [
1020                    "subnet".as_bytes(),
1021                    delegation.subnet_id.as_ref(),
1022                    "public_key".as_bytes(),
1023                ];
1024                let pk = lookup_value(&cert.tree, public_key_path)
1025                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1026                    .to_vec();
1027                Ok(pk)
1028            }
1029        }
1030    }
1031
1032    /// Request information about a particular canister for a single state subkey.
1033    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1034    pub async fn read_state_canister_info(
1035        &self,
1036        canister_id: Principal,
1037        path: &str,
1038    ) -> Result<Vec<u8>, AgentError> {
1039        let paths: Vec<Vec<Label>> = vec![vec![
1040            "canister".into(),
1041            Label::from_bytes(canister_id.as_slice()),
1042            path.into(),
1043        ]];
1044
1045        let cert = self.read_state_raw(paths, canister_id).await?;
1046
1047        lookup_canister_info(cert, canister_id, path)
1048    }
1049
1050    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1051    pub async fn read_state_canister_metadata(
1052        &self,
1053        canister_id: Principal,
1054        path: &str,
1055    ) -> Result<Vec<u8>, AgentError> {
1056        let paths: Vec<Vec<Label>> = vec![vec![
1057            "canister".into(),
1058            Label::from_bytes(canister_id.as_slice()),
1059            "metadata".into(),
1060            path.into(),
1061        ]];
1062
1063        let cert = self.read_state_raw(paths, canister_id).await?;
1064
1065        lookup_canister_metadata(cert, canister_id, path)
1066    }
1067
1068    /// Request a list of metrics about the subnet.
1069    pub async fn read_state_subnet_metrics(
1070        &self,
1071        subnet_id: Principal,
1072    ) -> Result<SubnetMetrics, AgentError> {
1073        let paths = vec![vec![
1074            "subnet".into(),
1075            Label::from_bytes(subnet_id.as_slice()),
1076            "metrics".into(),
1077        ]];
1078        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1079        lookup_subnet_metrics(cert, subnet_id)
1080    }
1081
1082    /// Fetches the status of a particular request by its ID.
1083    pub async fn request_status_raw(
1084        &self,
1085        request_id: &RequestId,
1086        effective_canister_id: Principal,
1087    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1088        let paths: Vec<Vec<Label>> =
1089            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1090
1091        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1092
1093        Ok((lookup_request_status(&cert, request_id)?, cert))
1094    }
1095
1096    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1097    /// The bytes will be checked to verify that it is a valid `request_status`.
1098    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1099    pub async fn request_status_signed(
1100        &self,
1101        request_id: &RequestId,
1102        effective_canister_id: Principal,
1103        signed_request_status: Vec<u8>,
1104    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1105        let _envelope: Envelope =
1106            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1107        let read_state_response: ReadStateResponse = self
1108            .read_state_endpoint(effective_canister_id, signed_request_status)
1109            .await?;
1110
1111        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1112            .map_err(AgentError::InvalidCborData)?;
1113        self.verify(&cert, effective_canister_id)?;
1114        Ok((lookup_request_status(&cert, request_id)?, cert))
1115    }
1116
1117    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1118    /// passing all arguments.
1119    pub fn update<S: Into<String>>(
1120        &self,
1121        canister_id: &Principal,
1122        method_name: S,
1123    ) -> UpdateBuilder {
1124        UpdateBuilder::new(self, *canister_id, method_name.into())
1125    }
1126
1127    /// Calls and returns the information returned by the status endpoint of a replica.
1128    pub async fn status(&self) -> Result<Status, AgentError> {
1129        let endpoint = "api/v2/status";
1130        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1131
1132        let cbor: serde_cbor::Value =
1133            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1134
1135        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1136    }
1137
1138    /// Returns a `QueryBuilder` enabling the construction of a query call without
1139    /// passing all arguments.
1140    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1141        QueryBuilder::new(self, *canister_id, method_name.into())
1142    }
1143
1144    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1145    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1146    pub fn sign_request_status(
1147        &self,
1148        effective_canister_id: Principal,
1149        request_id: RequestId,
1150    ) -> Result<SignedRequestStatus, AgentError> {
1151        let paths: Vec<Vec<Label>> =
1152            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1153        let read_state_content = self.read_state_content(paths)?;
1154        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1155        let ingress_expiry = read_state_content.ingress_expiry();
1156        let sender = *read_state_content.sender();
1157        Ok(SignedRequestStatus {
1158            ingress_expiry,
1159            sender,
1160            effective_canister_id,
1161            request_id,
1162            signed_request_status,
1163        })
1164    }
1165
1166    async fn get_subnet_by_canister(
1167        &self,
1168        canister: &Principal,
1169    ) -> Result<Arc<Subnet>, AgentError> {
1170        let subnet = self
1171            .subnet_key_cache
1172            .lock()
1173            .unwrap()
1174            .get_subnet_by_canister(canister);
1175        if let Some(subnet) = subnet {
1176            Ok(subnet)
1177        } else {
1178            self.fetch_subnet_by_canister(canister).await
1179        }
1180    }
1181
1182    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1183    pub async fn fetch_api_boundary_nodes_by_canister_id(
1184        &self,
1185        canister_id: Principal,
1186    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1187        let paths = vec![vec!["api_boundary_nodes".into()]];
1188        let certificate = self.read_state_raw(paths, canister_id).await?;
1189        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1190        Ok(api_boundary_nodes)
1191    }
1192
1193    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1194    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1195        &self,
1196        subnet_id: Principal,
1197    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1198        let paths = vec![vec!["api_boundary_nodes".into()]];
1199        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1200        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1201        Ok(api_boundary_nodes)
1202    }
1203
1204    async fn fetch_subnet_by_canister(
1205        &self,
1206        canister: &Principal,
1207    ) -> Result<Arc<Subnet>, AgentError> {
1208        let cert = self
1209            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1210            .await?;
1211
1212        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1213        let subnet = Arc::new(subnet);
1214        self.subnet_key_cache
1215            .lock()
1216            .unwrap()
1217            .insert_subnet(subnet_id, subnet.clone());
1218        Ok(subnet)
1219    }
1220
1221    async fn request(
1222        &self,
1223        method: Method,
1224        endpoint: &str,
1225        body: Option<Vec<u8>>,
1226    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1227        let create_request_with_generated_url = || -> Result<Request, AgentError> {
1228            let url = self.route_provider.route()?.join(endpoint)?;
1229            let mut http_request = Request::new(method.clone(), url);
1230            http_request
1231                .headers_mut()
1232                .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1233            *http_request.body_mut() = body.clone().map(Body::from);
1234            Ok(http_request)
1235        };
1236
1237        let response = self
1238            .client
1239            .call(
1240                &create_request_with_generated_url,
1241                self.max_tcp_error_retries,
1242            )
1243            .await?;
1244
1245        let http_status = response.status();
1246        let response_headers = response.headers().clone();
1247
1248        // Size Check (Content-Length)
1249        if matches!(self
1250            .max_response_body_size
1251            .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1252        {
1253            return Err(AgentError::ResponseSizeExceededLimit());
1254        }
1255
1256        let mut body: Vec<u8> = response
1257            .content_length()
1258            .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1259
1260        let mut stream = response.bytes_stream();
1261
1262        while let Some(chunk) = stream.next().await {
1263            let chunk = chunk?;
1264
1265            // Size Check (Body Size)
1266            if matches!(self
1267                .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1268            {
1269                return Err(AgentError::ResponseSizeExceededLimit());
1270            }
1271
1272            body.extend_from_slice(chunk.as_ref());
1273        }
1274
1275        Ok((http_status, response_headers, body))
1276    }
1277
1278    async fn execute(
1279        &self,
1280        method: Method,
1281        endpoint: &str,
1282        body: Option<Vec<u8>>,
1283    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1284        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1285
1286        let status = request_result.0;
1287        let headers = request_result.1;
1288        let body = request_result.2;
1289
1290        if status.is_client_error() || status.is_server_error() {
1291            Err(AgentError::HttpError(HttpErrorPayload {
1292                status: status.into(),
1293                content_type: headers
1294                    .get(CONTENT_TYPE)
1295                    .and_then(|value| value.to_str().ok())
1296                    .map(str::to_string),
1297                content: body,
1298            }))
1299        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1300            Err(AgentError::InvalidHttpResponse(format!(
1301                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1302            )))
1303        } else {
1304            Ok((status, body))
1305        }
1306    }
1307}
1308
1309// Checks if a principal is contained within a list of principal ranges
1310// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1311fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1312    ranges
1313        .iter()
1314        .any(|r| principal >= &r.0 && principal <= &r.1)
1315}
1316
1317fn sign_envelope(
1318    content: &EnvelopeContent,
1319    identity: Arc<dyn Identity>,
1320) -> Result<Vec<u8>, AgentError> {
1321    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1322
1323    let envelope = Envelope {
1324        content: Cow::Borrowed(content),
1325        sender_pubkey: signature.public_key,
1326        sender_sig: signature.signature,
1327        sender_delegation: signature.delegations,
1328    };
1329
1330    let mut serialized_bytes = Vec::new();
1331    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1332    serializer.self_describe()?;
1333    envelope.serialize(&mut serializer)?;
1334
1335    Ok(serialized_bytes)
1336}
1337
1338/// Inspect the bytes to be sent as a query
1339/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1340pub fn signed_query_inspect(
1341    sender: Principal,
1342    canister_id: Principal,
1343    method_name: &str,
1344    arg: &[u8],
1345    ingress_expiry: u64,
1346    signed_query: Vec<u8>,
1347) -> Result<(), AgentError> {
1348    let envelope: Envelope =
1349        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1350    match envelope.content.as_ref() {
1351        EnvelopeContent::Query {
1352            ingress_expiry: ingress_expiry_cbor,
1353            sender: sender_cbor,
1354            canister_id: canister_id_cbor,
1355            method_name: method_name_cbor,
1356            arg: arg_cbor,
1357            nonce: _nonce,
1358        } => {
1359            if ingress_expiry != *ingress_expiry_cbor {
1360                return Err(AgentError::CallDataMismatch {
1361                    field: "ingress_expiry".to_string(),
1362                    value_arg: ingress_expiry.to_string(),
1363                    value_cbor: ingress_expiry_cbor.to_string(),
1364                });
1365            }
1366            if sender != *sender_cbor {
1367                return Err(AgentError::CallDataMismatch {
1368                    field: "sender".to_string(),
1369                    value_arg: sender.to_string(),
1370                    value_cbor: sender_cbor.to_string(),
1371                });
1372            }
1373            if canister_id != *canister_id_cbor {
1374                return Err(AgentError::CallDataMismatch {
1375                    field: "canister_id".to_string(),
1376                    value_arg: canister_id.to_string(),
1377                    value_cbor: canister_id_cbor.to_string(),
1378                });
1379            }
1380            if method_name != *method_name_cbor {
1381                return Err(AgentError::CallDataMismatch {
1382                    field: "method_name".to_string(),
1383                    value_arg: method_name.to_string(),
1384                    value_cbor: method_name_cbor.clone(),
1385                });
1386            }
1387            if arg != *arg_cbor {
1388                return Err(AgentError::CallDataMismatch {
1389                    field: "arg".to_string(),
1390                    value_arg: format!("{arg:?}"),
1391                    value_cbor: format!("{arg_cbor:?}"),
1392                });
1393            }
1394        }
1395        EnvelopeContent::Call { .. } => {
1396            return Err(AgentError::CallDataMismatch {
1397                field: "request_type".to_string(),
1398                value_arg: "query".to_string(),
1399                value_cbor: "call".to_string(),
1400            })
1401        }
1402        EnvelopeContent::ReadState { .. } => {
1403            return Err(AgentError::CallDataMismatch {
1404                field: "request_type".to_string(),
1405                value_arg: "query".to_string(),
1406                value_cbor: "read_state".to_string(),
1407            })
1408        }
1409    }
1410    Ok(())
1411}
1412
1413/// Inspect the bytes to be sent as an update
1414/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1415pub fn signed_update_inspect(
1416    sender: Principal,
1417    canister_id: Principal,
1418    method_name: &str,
1419    arg: &[u8],
1420    ingress_expiry: u64,
1421    signed_update: Vec<u8>,
1422) -> Result<(), AgentError> {
1423    let envelope: Envelope =
1424        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1425    match envelope.content.as_ref() {
1426        EnvelopeContent::Call {
1427            nonce: _nonce,
1428            ingress_expiry: ingress_expiry_cbor,
1429            sender: sender_cbor,
1430            canister_id: canister_id_cbor,
1431            method_name: method_name_cbor,
1432            arg: arg_cbor,
1433        } => {
1434            if ingress_expiry != *ingress_expiry_cbor {
1435                return Err(AgentError::CallDataMismatch {
1436                    field: "ingress_expiry".to_string(),
1437                    value_arg: ingress_expiry.to_string(),
1438                    value_cbor: ingress_expiry_cbor.to_string(),
1439                });
1440            }
1441            if sender != *sender_cbor {
1442                return Err(AgentError::CallDataMismatch {
1443                    field: "sender".to_string(),
1444                    value_arg: sender.to_string(),
1445                    value_cbor: sender_cbor.to_string(),
1446                });
1447            }
1448            if canister_id != *canister_id_cbor {
1449                return Err(AgentError::CallDataMismatch {
1450                    field: "canister_id".to_string(),
1451                    value_arg: canister_id.to_string(),
1452                    value_cbor: canister_id_cbor.to_string(),
1453                });
1454            }
1455            if method_name != *method_name_cbor {
1456                return Err(AgentError::CallDataMismatch {
1457                    field: "method_name".to_string(),
1458                    value_arg: method_name.to_string(),
1459                    value_cbor: method_name_cbor.clone(),
1460                });
1461            }
1462            if arg != *arg_cbor {
1463                return Err(AgentError::CallDataMismatch {
1464                    field: "arg".to_string(),
1465                    value_arg: format!("{arg:?}"),
1466                    value_cbor: format!("{arg_cbor:?}"),
1467                });
1468            }
1469        }
1470        EnvelopeContent::ReadState { .. } => {
1471            return Err(AgentError::CallDataMismatch {
1472                field: "request_type".to_string(),
1473                value_arg: "call".to_string(),
1474                value_cbor: "read_state".to_string(),
1475            })
1476        }
1477        EnvelopeContent::Query { .. } => {
1478            return Err(AgentError::CallDataMismatch {
1479                field: "request_type".to_string(),
1480                value_arg: "call".to_string(),
1481                value_cbor: "query".to_string(),
1482            })
1483        }
1484    }
1485    Ok(())
1486}
1487
1488/// Inspect the bytes to be sent as a `request_status`
1489/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1490pub fn signed_request_status_inspect(
1491    sender: Principal,
1492    request_id: &RequestId,
1493    ingress_expiry: u64,
1494    signed_request_status: Vec<u8>,
1495) -> Result<(), AgentError> {
1496    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1497    let envelope: Envelope =
1498        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1499    match envelope.content.as_ref() {
1500        EnvelopeContent::ReadState {
1501            ingress_expiry: ingress_expiry_cbor,
1502            sender: sender_cbor,
1503            paths: paths_cbor,
1504        } => {
1505            if ingress_expiry != *ingress_expiry_cbor {
1506                return Err(AgentError::CallDataMismatch {
1507                    field: "ingress_expiry".to_string(),
1508                    value_arg: ingress_expiry.to_string(),
1509                    value_cbor: ingress_expiry_cbor.to_string(),
1510                });
1511            }
1512            if sender != *sender_cbor {
1513                return Err(AgentError::CallDataMismatch {
1514                    field: "sender".to_string(),
1515                    value_arg: sender.to_string(),
1516                    value_cbor: sender_cbor.to_string(),
1517                });
1518            }
1519
1520            if paths != *paths_cbor {
1521                return Err(AgentError::CallDataMismatch {
1522                    field: "paths".to_string(),
1523                    value_arg: format!("{paths:?}"),
1524                    value_cbor: format!("{paths_cbor:?}"),
1525                });
1526            }
1527        }
1528        EnvelopeContent::Query { .. } => {
1529            return Err(AgentError::CallDataMismatch {
1530                field: "request_type".to_string(),
1531                value_arg: "read_state".to_string(),
1532                value_cbor: "query".to_string(),
1533            })
1534        }
1535        EnvelopeContent::Call { .. } => {
1536            return Err(AgentError::CallDataMismatch {
1537                field: "request_type".to_string(),
1538                value_arg: "read_state".to_string(),
1539                value_cbor: "call".to_string(),
1540            })
1541        }
1542    }
1543    Ok(())
1544}
1545
1546#[derive(Clone)]
1547struct SubnetCache {
1548    subnets: TimedCache<Principal, Arc<Subnet>>,
1549    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1550}
1551
1552impl SubnetCache {
1553    fn new() -> Self {
1554        Self {
1555            subnets: TimedCache::with_lifespan(300),
1556            canister_index: RangeInclusiveMap::new_with_step_fns(),
1557        }
1558    }
1559
1560    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1561        self.canister_index
1562            .get(canister)
1563            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1564            .filter(|subnet| subnet.canister_ranges.contains(canister))
1565    }
1566
1567    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1568        self.subnets.cache_set(subnet_id, subnet.clone());
1569        for range in subnet.canister_ranges.iter() {
1570            self.canister_index.insert(range.clone(), subnet_id);
1571        }
1572    }
1573}
1574
1575#[derive(Clone, Copy)]
1576struct PrincipalStep;
1577
1578impl StepFns<Principal> for PrincipalStep {
1579    fn add_one(start: &Principal) -> Principal {
1580        let bytes = start.as_slice();
1581        let mut arr = [0; 29];
1582        arr[..bytes.len()].copy_from_slice(bytes);
1583        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1584            *byte = byte.wrapping_add(1);
1585            if *byte != 0 {
1586                break;
1587            }
1588        }
1589        Principal::from_slice(&arr[..bytes.len()])
1590    }
1591    fn sub_one(start: &Principal) -> Principal {
1592        let bytes = start.as_slice();
1593        let mut arr = [0; 29];
1594        arr[..bytes.len()].copy_from_slice(bytes);
1595        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1596            *byte = byte.wrapping_sub(1);
1597            if *byte != 255 {
1598                break;
1599            }
1600        }
1601        Principal::from_slice(&arr[..bytes.len()])
1602    }
1603}
1604
/// Data the agent keeps about a single subnet; shared behind an `Arc` by `SubnetCache`.
#[derive(Clone)]
pub(crate) struct Subnet {
    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
    _key: Vec<u8>,
    // Byte blobs keyed by principal.
    // NOTE(review): presumably node ID -> node public key; confirm where this map is populated.
    node_keys: HashMap<Principal, Vec<u8>>,
    // Canister ID ranges recorded for this subnet. `SubnetCache` indexes these ranges and
    // re-checks them on lookup.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1613
/// API boundary node, which routes /api calls to IC replica nodes.
///
/// All addresses are carried as plain strings; this type does not parse or validate them.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name
    pub domain: String,
    /// IPv6 address in the hexadecimal notation with colons.
    pub ipv6_address: String,
    /// Optional IPv4 address in the dotted-decimal notation.
    pub ipv4_address: Option<String>,
}
1624
/// A query request builder.
///
/// This makes it easier to do query calls without actually passing all arguments.
/// A builder can be awaited directly, run through one of the `call*` methods, or turned
/// into a signed or unsigned envelope.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    // The agent that will build, sign and send the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp, in nanoseconds, that the request will expire at.
    /// `None` means no explicit expiry has been set on this builder.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether to include a nonce with the message.
    pub use_nonce: bool,
}
1645
1646impl<'agent> QueryBuilder<'agent> {
1647    /// Creates a new query builder with an agent for a particular canister method.
1648    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1649        Self {
1650            agent,
1651            effective_canister_id: canister_id,
1652            canister_id,
1653            method_name,
1654            arg: vec![],
1655            ingress_expiry_datetime: None,
1656            use_nonce: false,
1657        }
1658    }
1659
1660    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1661    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1662        self.effective_canister_id = canister_id;
1663        self
1664    }
1665
1666    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1667    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1668        self.arg = arg.into();
1669        self
1670    }
1671
1672    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1673    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1674        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1675        self
1676    }
1677
1678    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1679    pub fn expire_after(mut self, duration: Duration) -> Self {
1680        self.ingress_expiry_datetime = Some(
1681            OffsetDateTime::now_utc()
1682                .saturating_add(duration.try_into().expect("negative duration"))
1683                .unix_timestamp_nanos() as u64,
1684        );
1685        self
1686    }
1687
1688    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1689    /// and thus may get a (briefly) cached response.
1690    pub fn with_nonce_generation(mut self) -> Self {
1691        self.use_nonce = true;
1692        self
1693    }
1694
1695    /// Make a query call. This will return a byte vector.
1696    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1697        self.agent
1698            .query_raw(
1699                self.canister_id,
1700                self.effective_canister_id,
1701                self.method_name,
1702                self.arg,
1703                self.ingress_expiry_datetime,
1704                self.use_nonce,
1705                None,
1706            )
1707            .await
1708    }
1709
1710    /// Make a query call with signature verification. This will return a byte vector.
1711    ///
1712    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1713    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1714    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1715        self.agent
1716            .query_raw(
1717                self.canister_id,
1718                self.effective_canister_id,
1719                self.method_name,
1720                self.arg,
1721                self.ingress_expiry_datetime,
1722                self.use_nonce,
1723                Some(true),
1724            )
1725            .await
1726    }
1727
1728    /// Make a query call without signature verification. This will return a byte vector.
1729    ///
1730    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1731    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1732    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1733        self.agent
1734            .query_raw(
1735                self.canister_id,
1736                self.effective_canister_id,
1737                self.method_name,
1738                self.arg,
1739                self.ingress_expiry_datetime,
1740                self.use_nonce,
1741                Some(false),
1742            )
1743            .await
1744    }
1745
1746    /// Sign a query call. This will return a [`signed::SignedQuery`]
1747    /// which contains all fields of the query and the signed query in CBOR encoding
1748    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1749        let effective_canister_id = self.effective_canister_id;
1750        let identity = self.agent.identity.clone();
1751        let content = self.into_envelope()?;
1752        let signed_query = sign_envelope(&content, identity)?;
1753        let EnvelopeContent::Query {
1754            ingress_expiry,
1755            sender,
1756            canister_id,
1757            method_name,
1758            arg,
1759            nonce,
1760        } = content
1761        else {
1762            unreachable!()
1763        };
1764        Ok(SignedQuery {
1765            ingress_expiry,
1766            sender,
1767            canister_id,
1768            method_name,
1769            arg,
1770            effective_canister_id,
1771            signed_query,
1772            nonce,
1773        })
1774    }
1775
1776    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1777    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1778        self.agent.query_content(
1779            self.canister_id,
1780            self.method_name,
1781            self.arg,
1782            self.ingress_expiry_datetime,
1783            self.use_nonce,
1784        )
1785    }
1786}
1787
1788impl<'agent> IntoFuture for QueryBuilder<'agent> {
1789    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1790    type Output = Result<Vec<u8>, AgentError>;
1791    fn into_future(self) -> Self::IntoFuture {
1792        Box::pin(self.call())
1793    }
1794}
1795
/// An in-flight canister update call. Useful primarily as a `Future`.
///
/// Awaiting it yields the raw [`CallResponse`]; use [`UpdateCall::and_wait`] to also poll
/// for the result when the response is not immediately available.
pub struct UpdateCall<'agent> {
    // Agent used to poll for the result in `and_wait`.
    agent: &'agent Agent,
    // The pending call itself.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    // Effective canister ID handed to the agent when polling.
    effective_canister_id: Principal,
    // Target canister and method; `and_wait` passes them along as the `Operation` being waited on.
    canister_id: Principal,
    method_name: String,
}
1804
1805impl fmt::Debug for UpdateCall<'_> {
1806    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1807        f.debug_struct("UpdateCall")
1808            .field("agent", &self.agent)
1809            .field("effective_canister_id", &self.effective_canister_id)
1810            .finish_non_exhaustive()
1811    }
1812}
1813
1814impl Future for UpdateCall<'_> {
1815    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1816    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1817        self.response_future.as_mut().poll(cx)
1818    }
1819}
1820
1821impl<'a> UpdateCall<'a> {
1822    /// Waits for the update call to be completed, polling if necessary.
1823    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1824        let response = self.response_future.await?;
1825
1826        match response {
1827            CallResponse::Response(response) => Ok(response),
1828            CallResponse::Poll(request_id) => {
1829                self.agent
1830                    .wait_inner(
1831                        &request_id,
1832                        self.effective_canister_id,
1833                        Some(Operation::Call {
1834                            canister: self.canister_id,
1835                            method: self.method_name,
1836                        }),
1837                    )
1838                    .await
1839            }
1840        }
1841    }
1842}
/// An update request Builder.
///
/// This makes it easier to do update calls without actually passing all arguments or specifying
/// if you want to wait or not.
/// A builder can be awaited directly, started with `call`, or turned into a signed or
/// unsigned envelope.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    // The agent that will build, sign and send the request.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp, in nanoseconds, that the request will expire at.
    /// `None` means no explicit expiry has been set on this builder.
    pub ingress_expiry_datetime: Option<u64>,
}
1861
1862impl<'agent> UpdateBuilder<'agent> {
1863    /// Creates a new update builder with an agent for a particular canister method.
1864    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1865        Self {
1866            agent,
1867            effective_canister_id: canister_id,
1868            canister_id,
1869            method_name,
1870            arg: vec![],
1871            ingress_expiry_datetime: None,
1872        }
1873    }
1874
1875    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1876    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1877        self.effective_canister_id = canister_id;
1878        self
1879    }
1880
1881    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1882    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1883        self.arg = arg.into();
1884        self
1885    }
1886
1887    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1888    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1889        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1890        self
1891    }
1892
1893    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1894    pub fn expire_after(mut self, duration: Duration) -> Self {
1895        self.ingress_expiry_datetime = Some(
1896            OffsetDateTime::now_utc()
1897                .saturating_add(duration.try_into().expect("negative duration"))
1898                .unix_timestamp_nanos() as u64,
1899        );
1900        self
1901    }
1902
1903    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1904    /// the response as a byte vector.
1905    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1906        self.call().and_wait().await.map(|x| x.0)
1907    }
1908
1909    /// Make an update call. This will return a `RequestId`.
1910    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1911    pub fn call(self) -> UpdateCall<'agent> {
1912        let method_name = self.method_name.clone();
1913        let response_future = async move {
1914            self.agent
1915                .update_raw(
1916                    self.canister_id,
1917                    self.effective_canister_id,
1918                    self.method_name,
1919                    self.arg,
1920                    self.ingress_expiry_datetime,
1921                )
1922                .await
1923        };
1924        UpdateCall {
1925            agent: self.agent,
1926            response_future: Box::pin(response_future),
1927            effective_canister_id: self.effective_canister_id,
1928            canister_id: self.canister_id,
1929            method_name,
1930        }
1931    }
1932
1933    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1934    /// which contains all fields of the update and the signed update in CBOR encoding
1935    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1936        let identity = self.agent.identity.clone();
1937        let effective_canister_id = self.effective_canister_id;
1938        let content = self.into_envelope()?;
1939        let signed_update = sign_envelope(&content, identity)?;
1940        let request_id = to_request_id(&content)?;
1941        let EnvelopeContent::Call {
1942            nonce,
1943            ingress_expiry,
1944            sender,
1945            canister_id,
1946            method_name,
1947            arg,
1948        } = content
1949        else {
1950            unreachable!()
1951        };
1952        Ok(SignedUpdate {
1953            nonce,
1954            ingress_expiry,
1955            sender,
1956            canister_id,
1957            method_name,
1958            arg,
1959            effective_canister_id,
1960            signed_update,
1961            request_id,
1962        })
1963    }
1964
1965    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1966    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1967        let nonce = self.agent.nonce_factory.generate();
1968        self.agent.update_content(
1969            self.canister_id,
1970            self.method_name,
1971            self.arg,
1972            self.ingress_expiry_datetime,
1973            nonce,
1974        )
1975    }
1976}
1977
1978impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1979    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1980    type Output = Result<Vec<u8>, AgentError>;
1981    fn into_future(self) -> Self::IntoFuture {
1982        Box::pin(self.call_and_wait())
1983    }
1984}
1985
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
///
/// On wasm targets the trait is declared with `async_trait(?Send)`, so the returned future
/// is not required to be `Send` there.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
    ///
    /// * `req` builds a fresh [`Request`] each time it is called and may itself fail.
    /// * `max_retries` is the retry budget handed to the implementation; how (and whether)
    ///   it is used is up to each implementation.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        max_retries: usize,
    ) -> Result<Response, AgentError>;
}
1997#[cfg(not(target_family = "wasm"))]
1998#[async_trait]
1999impl<T> HttpService for T
2000where
2001    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2002    for<'a> <&'a Self as Service<Request>>::Future: Send,
2003    T: Send + Sync + Debug + ?Sized,
2004{
2005    #[allow(clippy::needless_arbitrary_self_type)]
2006    async fn call<'a>(
2007        mut self: &'a Self,
2008        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2009        max_retries: usize,
2010    ) -> Result<Response, AgentError> {
2011        let mut retry_count = 0;
2012        loop {
2013            match Service::call(&mut self, req()?).await {
2014                Err(err) => {
2015                    // Network-related errors can be retried.
2016                    if err.is_connect() {
2017                        if retry_count >= max_retries {
2018                            return Err(AgentError::TransportError(err));
2019                        }
2020                        retry_count += 1;
2021                    }
2022                }
2023                Ok(resp) => return Ok(resp),
2024            }
2025        }
2026    }
2027}
2028
/// Blanket implementation for any by-reference `Service<Request>` that speaks `reqwest`
/// types (wasm targets). The request is sent exactly once; the retry budget is ignored.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        _: usize,
    ) -> Result<Response, AgentError> {
        let request = req()?;
        let response = Service::call(&mut self, request).await?;
        Ok(response)
    }
}
2045
2046#[derive(Debug)]
2047struct Retry429Logic {
2048    client: Client,
2049}
2050
2051#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2052#[cfg_attr(not(target_family = "wasm"), async_trait)]
2053impl HttpService for Retry429Logic {
2054    async fn call<'a>(
2055        &'a self,
2056        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2057        _max_tcp_retries: usize,
2058    ) -> Result<Response, AgentError> {
2059        let mut retries = 0;
2060        loop {
2061            #[cfg(not(target_family = "wasm"))]
2062            let resp = self.client.call(req, _max_tcp_retries).await?;
2063            // Client inconveniently does not implement Service on wasm
2064            #[cfg(target_family = "wasm")]
2065            let resp = self.client.execute(req()?).await?;
2066            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2067                if retries == 6 {
2068                    break Ok(resp);
2069                } else {
2070                    retries += 1;
2071                    crate::util::sleep(Duration::from_millis(250)).await;
2072                    continue;
2073                }
2074            } else {
2075                break Ok(resp);
2076            }
2077        }
2078    }
2079}
2080
// Tests that need no real network access (non-wasm targets only).
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Any tests that involve the network should go in agent_test, not here.

    // Signs six updates back to back and counts how often the ingress expiry increases
    // from one signed update to the next.
    #[test]
    fn rounded_expiry() {
        // The URL is never contacted: signing happens locally.
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // `None < Some(_)`, so the first expiry always counts as a new timestamp.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        // in six requests, there should be no more than two timestamps
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    // Checks that `with_max_concurrent_requests(2)` keeps a third request from reaching
    // the server while two are still pending.
    #[tokio::test]
    async fn client_ratelimit() {
        // Local mock server on an OS-assigned port; it counts accepted connections.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        // read all data, never reply
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        // NOTE(review): `http1_only` presumably forces one connection per in-flight request
        // so that connections can be counted -- confirm.
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Three queries are started, none of which will ever get a reply.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned tasks time to connect, then expect exactly two connections.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}