ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49    },
50    export::Principal,
51    identity::Identity,
52    to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64    borrow::Cow,
65    collections::HashMap,
66    convert::TryFrom,
67    fmt::{self, Debug},
68    future::{Future, IntoFuture},
69    pin::Pin,
70    sync::{Arc, Mutex, RwLock},
71    task::{Context, Poll},
72    time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87/// A low level Agent to make calls to a Replica endpoint.
88///
89/// ```ignore
90/// # // This test is ignored because it requires an ic to be running. We run these
91/// # // in the ic-ref workflow.
92/// use ic_agent::{Agent, export::Principal};
93/// use candid::{Encode, Decode, CandidType, Nat};
94/// use serde::Deserialize;
95///
96/// #[derive(CandidType)]
97/// struct Argument {
98///   amount: Option<Nat>,
99/// }
100///
101/// #[derive(CandidType, Deserialize)]
102/// struct CreateCanisterResult {
103///   canister_id: Principal,
104/// }
105///
106/// # fn create_identity() -> impl ic_agent::Identity {
107/// #
108/// #     ic_agent::identity::BasicIdentity::from_signing_key(
109/// #         ed25519_consensus::SigningKey::new(rand::thread_rng())
110/// #     )
111/// # }
112/// #
113/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
114/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID,
128///   // and wait for a result.
129///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
130///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// }
140///
141/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
142/// # runtime.block_on(async {
143/// let canister_id = create_a_canister().await.unwrap();
144/// eprintln!("{}", canister_id);
145/// # });
146/// ```
147///
148/// This agent does not understand Candid, and only acts on byte buffers.
149///
150/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
151/// you receive from a method has already been verified and you do not need to manually verify it.
152#[derive(Clone)]
153pub struct Agent {
154    nonce_factory: Arc<dyn NonceGenerator>,
155    identity: Arc<dyn Identity>,
156    ingress_expiry: Duration,
157    root_key: Arc<RwLock<Vec<u8>>>,
158    client: Arc<dyn HttpService>,
159    route_provider: Arc<dyn RouteProvider>,
160    subnet_key_cache: Arc<Mutex<SubnetCache>>,
161    concurrent_requests_semaphore: Arc<Semaphore>,
162    verify_query_signatures: bool,
163    max_response_body_size: Option<usize>,
164    max_polling_time: Duration,
165    #[allow(dead_code)]
166    max_tcp_error_retries: usize,
167}
168
169impl fmt::Debug for Agent {
170    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171        f.debug_struct("Agent")
172            .field("ingress_expiry", &self.ingress_expiry)
173            .finish_non_exhaustive()
174    }
175}
176
177impl Agent {
178    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
179    /// using the [`AgentConfig`] and [`Agent::new()`].
180    pub fn builder() -> builder::AgentBuilder {
181        Default::default()
182    }
183
184    /// Create an instance of an [`Agent`].
185    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186        let client = config.http_service.unwrap_or_else(|| {
187            Arc::new(Retry429Logic {
188                client: config.client.unwrap_or_else(|| {
189                    #[cfg(not(target_family = "wasm"))]
190                    {
191                        Client::builder()
192                            .use_rustls_tls()
193                            .timeout(Duration::from_secs(360))
194                            .build()
195                            .expect("Could not create HTTP client.")
196                    }
197                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198                    {
199                        Client::new()
200                    }
201                }),
202            })
203        });
204        Ok(Agent {
205            nonce_factory: config.nonce_factory,
206            identity: config.identity,
207            ingress_expiry: config.ingress_expiry,
208            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209            client: client.clone(),
210            route_provider: if let Some(route_provider) = config.route_provider {
211                route_provider
212            } else if let Some(url) = config.url {
213                if config.background_dynamic_routing {
214                    assert!(
215                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217                    );
218                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219                    UrlUntilReady::new(url, async move {
220                        DynamicRouteProviderBuilder::new(
221                            LatencyRoutingSnapshot::new(),
222                            seeds,
223                            client,
224                        )
225                        .build()
226                        .await
227                    }) as Arc<dyn RouteProvider>
228                } else {
229                    Arc::new(url)
230                }
231            } else {
232                panic!("either route_provider or url must be specified");
233            },
234            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235            verify_query_signatures: config.verify_query_signatures,
236            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237            max_response_body_size: config.max_response_body_size,
238            max_tcp_error_retries: config.max_tcp_error_retries,
239            max_polling_time: config.max_polling_time,
240        })
241    }
242
243    /// Set the identity provider for signing messages.
244    ///
245    /// NOTE: if you change the identity while having update calls in
246    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
247    /// messages.
248    pub fn set_identity<I>(&mut self, identity: I)
249    where
250        I: 'static + Identity,
251    {
252        self.identity = Arc::new(identity);
253    }
254    /// Set the arc identity provider for signing messages.
255    ///
256    /// NOTE: if you change the identity while having update calls in
257    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
258    /// messages.
259    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260        self.identity = identity;
261    }
262
263    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
264    /// responses using a hard-coded public key.
265    ///
266    /// This function will instruct the agent to ask the endpoint for its public key, and use
267    /// that instead. This is required when talking to a local test instance, for example.
268    ///
269    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
270    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
271    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273            // already fetched the root key
274            return Ok(());
275        }
276        let status = self.status().await?;
277        let Some(root_key) = status.root_key else {
278            return Err(AgentError::NoRootKeyInStatus(status));
279        };
280        self.set_root_key(root_key);
281        Ok(())
282    }
283
284    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
285    /// responses using a hard-coded public key.
286    ///
287    /// Using this function you can set the root key to a known one if you know if beforehand.
288    pub fn set_root_key(&self, root_key: Vec<u8>) {
289        *self.root_key.write().unwrap() = root_key;
290    }
291
292    /// Return the root key currently in use.
293    pub fn read_root_key(&self) -> Vec<u8> {
294        self.root_key.read().unwrap().clone()
295    }
296
297    fn get_expiry_date(&self) -> u64 {
298        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300        if self.ingress_expiry.as_secs() > 90 {
301            rounded = rounded.replace_second(0).unwrap();
302        }
303        rounded.unix_timestamp_nanos().try_into().unwrap()
304    }
305
306    /// Return the principal of the identity.
307    pub fn get_principal(&self) -> Result<Principal, String> {
308        self.identity.sender()
309    }
310
311    async fn query_endpoint<A>(
312        &self,
313        effective_canister_id: Principal,
314        serialized_bytes: Vec<u8>,
315    ) -> Result<A, AgentError>
316    where
317        A: serde::de::DeserializeOwned,
318    {
319        let _permit = self.concurrent_requests_semaphore.acquire().await;
320        let bytes = self
321            .execute(
322                Method::POST,
323                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324                Some(serialized_bytes),
325            )
326            .await?
327            .1;
328        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329    }
330
331    async fn read_state_endpoint<A>(
332        &self,
333        effective_canister_id: Principal,
334        serialized_bytes: Vec<u8>,
335    ) -> Result<A, AgentError>
336    where
337        A: serde::de::DeserializeOwned,
338    {
339        let _permit = self.concurrent_requests_semaphore.acquire().await;
340        let endpoint = format!(
341            "api/v2/canister/{}/read_state",
342            effective_canister_id.to_text()
343        );
344        let bytes = self
345            .execute(Method::POST, &endpoint, Some(serialized_bytes))
346            .await?
347            .1;
348        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349    }
350
351    async fn read_subnet_state_endpoint<A>(
352        &self,
353        subnet_id: Principal,
354        serialized_bytes: Vec<u8>,
355    ) -> Result<A, AgentError>
356    where
357        A: serde::de::DeserializeOwned,
358    {
359        let _permit = self.concurrent_requests_semaphore.acquire().await;
360        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361        let bytes = self
362            .execute(Method::POST, &endpoint, Some(serialized_bytes))
363            .await?
364            .1;
365        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366    }
367
368    async fn call_endpoint(
369        &self,
370        effective_canister_id: Principal,
371        serialized_bytes: Vec<u8>,
372    ) -> Result<TransportCallResponse, AgentError> {
373        let _permit = self.concurrent_requests_semaphore.acquire().await;
374        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375        let (status_code, response_body) = self
376            .execute(Method::POST, &endpoint, Some(serialized_bytes))
377            .await?;
378
379        if status_code == StatusCode::ACCEPTED {
380            return Ok(TransportCallResponse::Accepted);
381        }
382
383        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384    }
385
386    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
387    /// The encoding is left as an exercise to the user.
388    #[allow(clippy::too_many_arguments)]
389    async fn query_raw(
390        &self,
391        canister_id: Principal,
392        effective_canister_id: Principal,
393        method_name: String,
394        arg: Vec<u8>,
395        ingress_expiry_datetime: Option<u64>,
396        use_nonce: bool,
397        explicit_verify_query_signatures: Option<bool>,
398    ) -> Result<Vec<u8>, AgentError> {
399        let operation = Operation::Call {
400            canister: canister_id,
401            method: method_name.clone(),
402        };
403        let content = self.query_content(
404            canister_id,
405            method_name,
406            arg,
407            ingress_expiry_datetime,
408            use_nonce,
409        )?;
410        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411        self.query_inner(
412            effective_canister_id,
413            serialized_bytes,
414            content.to_request_id(),
415            explicit_verify_query_signatures,
416            operation,
417        )
418        .await
419    }
420
421    /// Send the signed query to the network. Will return a byte vector.
422    /// The bytes will be checked if it is a valid query.
423    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
424    pub async fn query_signed(
425        &self,
426        effective_canister_id: Principal,
427        signed_query: Vec<u8>,
428    ) -> Result<Vec<u8>, AgentError> {
429        let envelope: Envelope =
430            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431        let EnvelopeContent::Query {
432            canister_id,
433            method_name,
434            ..
435        } = &*envelope.content
436        else {
437            return Err(AgentError::CallDataMismatch {
438                field: "request_type".to_string(),
439                value_arg: "query".to_string(),
440                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441                    "update"
442                } else {
443                    "read_state"
444                }
445                .to_string(),
446            });
447        };
448        let operation = Operation::Call {
449            canister: *canister_id,
450            method: method_name.clone(),
451        };
452        self.query_inner(
453            effective_canister_id,
454            signed_query,
455            envelope.content.to_request_id(),
456            None,
457            operation,
458        )
459        .await
460    }
461
462    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
463    ///
464    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
465    async fn query_inner(
466        &self,
467        effective_canister_id: Principal,
468        signed_query: Vec<u8>,
469        request_id: RequestId,
470        explicit_verify_query_signatures: Option<bool>,
471        operation: Operation,
472    ) -> Result<Vec<u8>, AgentError> {
473        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474            let (response, mut subnet) = futures_util::try_join!(
475                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476                self.get_subnet_by_canister(&effective_canister_id)
477            )?;
478            if response.signatures().is_empty() {
479                return Err(AgentError::MissingSignature);
480            } else if response.signatures().len() > subnet.node_keys.len() {
481                return Err(AgentError::TooManySignatures {
482                    had: response.signatures().len(),
483                    needed: subnet.node_keys.len(),
484                });
485            }
486            for signature in response.signatures() {
487                if OffsetDateTime::now_utc()
488                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489                    > self.ingress_expiry
490                {
491                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492                }
493                let signable = response.signable(request_id, signature.timestamp);
494                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495                    node_key
496                } else {
497                    subnet = self
498                        .fetch_subnet_by_canister(&effective_canister_id)
499                        .await?;
500                    subnet
501                        .node_keys
502                        .get(&signature.identity)
503                        .ok_or(AgentError::CertificateNotAuthorized())?
504                };
505                if node_key.len() != 44 {
506                    return Err(AgentError::DerKeyLengthMismatch {
507                        expected: 44,
508                        actual: node_key.len(),
509                    });
510                }
511                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512                if node_key[..12] != DER_PREFIX {
513                    return Err(AgentError::DerPrefixMismatch {
514                        expected: DER_PREFIX.to_vec(),
515                        actual: node_key[..12].to_vec(),
516                    });
517                }
518                let pubkey =
519                    VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
520                        .map_err(|_| AgentError::MalformedPublicKey)?;
521                let sig = Signature::from(
522                    <[u8; 64]>::try_from(&signature.signature[..])
523                        .map_err(|_| AgentError::MalformedSignature)?,
524                );
525
526                match pubkey.verify(&sig, &signable) {
527                    Err(Ed25519Error::InvalidSignature) => {
528                        return Err(AgentError::QuerySignatureVerificationFailed)
529                    }
530                    Err(Ed25519Error::InvalidSliceLength) => {
531                        return Err(AgentError::MalformedSignature)
532                    }
533                    Err(Ed25519Error::MalformedPublicKey) => {
534                        return Err(AgentError::MalformedPublicKey)
535                    }
536                    Ok(()) => (),
537                    _ => unreachable!(),
538                }
539            }
540            response
541        } else {
542            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
543                .await?
544        };
545
546        match response {
547            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
548            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
549                reject,
550                operation: Some(operation),
551            }),
552        }
553    }
554
555    fn query_content(
556        &self,
557        canister_id: Principal,
558        method_name: String,
559        arg: Vec<u8>,
560        ingress_expiry_datetime: Option<u64>,
561        use_nonce: bool,
562    ) -> Result<EnvelopeContent, AgentError> {
563        Ok(EnvelopeContent::Query {
564            sender: self.identity.sender().map_err(AgentError::SigningError)?,
565            canister_id,
566            method_name,
567            arg,
568            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
569            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
570        })
571    }
572
573    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
574    async fn update_raw(
575        &self,
576        canister_id: Principal,
577        effective_canister_id: Principal,
578        method_name: String,
579        arg: Vec<u8>,
580        ingress_expiry_datetime: Option<u64>,
581    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
582        let nonce = self.nonce_factory.generate();
583        let content = self.update_content(
584            canister_id,
585            method_name.clone(),
586            arg,
587            ingress_expiry_datetime,
588            nonce,
589        )?;
590        let operation = Some(Operation::Call {
591            canister: canister_id,
592            method: method_name,
593        });
594        let request_id = to_request_id(&content)?;
595        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
596
597        let response_body = self
598            .call_endpoint(effective_canister_id, serialized_bytes)
599            .await?;
600
601        match response_body {
602            TransportCallResponse::Replied { certificate } => {
603                let certificate =
604                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
605
606                self.verify(&certificate, effective_canister_id)?;
607                let status = lookup_request_status(&certificate, &request_id)?;
608
609                match status {
610                    RequestStatusResponse::Replied(reply) => {
611                        Ok(CallResponse::Response((reply.arg, certificate)))
612                    }
613                    RequestStatusResponse::Rejected(reject_response) => {
614                        Err(AgentError::CertifiedReject {
615                            reject: reject_response,
616                            operation,
617                        })?
618                    }
619                    _ => Ok(CallResponse::Poll(request_id)),
620                }
621            }
622            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
623            TransportCallResponse::NonReplicatedRejection(reject_response) => {
624                Err(AgentError::UncertifiedReject {
625                    reject: reject_response,
626                    operation,
627                })
628            }
629        }
630    }
631
632    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
633    /// The bytes will be checked to verify that it is a valid update.
634    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
635    pub async fn update_signed(
636        &self,
637        effective_canister_id: Principal,
638        signed_update: Vec<u8>,
639    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
640        let envelope: Envelope =
641            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
642        let EnvelopeContent::Call {
643            canister_id,
644            method_name,
645            ..
646        } = &*envelope.content
647        else {
648            return Err(AgentError::CallDataMismatch {
649                field: "request_type".to_string(),
650                value_arg: "update".to_string(),
651                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
652                    "query"
653                } else {
654                    "read_state"
655                }
656                .to_string(),
657            });
658        };
659        let operation = Some(Operation::Call {
660            canister: *canister_id,
661            method: method_name.clone(),
662        });
663        let request_id = to_request_id(&envelope.content)?;
664
665        let response_body = self
666            .call_endpoint(effective_canister_id, signed_update)
667            .await?;
668
669        match response_body {
670            TransportCallResponse::Replied { certificate } => {
671                let certificate =
672                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
673
674                self.verify(&certificate, effective_canister_id)?;
675                let status = lookup_request_status(&certificate, &request_id)?;
676
677                match status {
678                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
679                    RequestStatusResponse::Rejected(reject_response) => {
680                        Err(AgentError::CertifiedReject {
681                            reject: reject_response,
682                            operation,
683                        })?
684                    }
685                    _ => Ok(CallResponse::Poll(request_id)),
686                }
687            }
688            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
689            TransportCallResponse::NonReplicatedRejection(reject_response) => {
690                Err(AgentError::UncertifiedReject {
691                    reject: reject_response,
692                    operation,
693                })
694            }
695        }
696    }
697
698    fn update_content(
699        &self,
700        canister_id: Principal,
701        method_name: String,
702        arg: Vec<u8>,
703        ingress_expiry_datetime: Option<u64>,
704        nonce: Option<Vec<u8>>,
705    ) -> Result<EnvelopeContent, AgentError> {
706        Ok(EnvelopeContent::Call {
707            canister_id,
708            method_name,
709            arg,
710            nonce,
711            sender: self.identity.sender().map_err(AgentError::SigningError)?,
712            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
713        })
714    }
715
716    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
717        ExponentialBackoffBuilder::new()
718            .with_initial_interval(Duration::from_millis(500))
719            .with_max_interval(Duration::from_secs(1))
720            .with_multiplier(1.4)
721            .with_max_elapsed_time(Some(self.max_polling_time))
722            .build()
723    }
724
725    /// Wait for `request_status` to return a Replied response and return the arg.
726    pub async fn wait_signed(
727        &self,
728        request_id: &RequestId,
729        effective_canister_id: Principal,
730        signed_request_status: Vec<u8>,
731    ) -> Result<(Vec<u8>, Certificate), AgentError> {
732        let mut retry_policy = self.get_retry_policy();
733
734        let mut request_accepted = false;
735        let (resp, cert) = self
736            .request_status_signed(
737                request_id,
738                effective_canister_id,
739                signed_request_status.clone(),
740            )
741            .await?;
742        loop {
743            match resp {
744                RequestStatusResponse::Unknown => {}
745
746                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
747                    if !request_accepted {
748                        retry_policy.reset();
749                        request_accepted = true;
750                    }
751                }
752
753                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
754                    return Ok((arg, cert))
755                }
756
757                RequestStatusResponse::Rejected(response) => {
758                    return Err(AgentError::CertifiedReject {
759                        reject: response,
760                        operation: None,
761                    })
762                }
763
764                RequestStatusResponse::Done => {
765                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
766                        *request_id,
767                    )))
768                }
769            };
770
771            match retry_policy.next_backoff() {
772                Some(duration) => crate::util::sleep(duration).await,
773
774                None => return Err(AgentError::TimeoutWaitingForResponse()),
775            }
776        }
777    }
778
779    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
780    pub async fn wait(
781        &self,
782        request_id: &RequestId,
783        effective_canister_id: Principal,
784    ) -> Result<(Vec<u8>, Certificate), AgentError> {
785        self.wait_inner(request_id, effective_canister_id, None)
786            .await
787    }
788
789    async fn wait_inner(
790        &self,
791        request_id: &RequestId,
792        effective_canister_id: Principal,
793        operation: Option<Operation>,
794    ) -> Result<(Vec<u8>, Certificate), AgentError> {
795        let mut retry_policy = self.get_retry_policy();
796
797        let mut request_accepted = false;
798        loop {
799            let (resp, cert) = self
800                .request_status_raw(request_id, effective_canister_id)
801                .await?;
802            match resp {
803                RequestStatusResponse::Unknown => {}
804
805                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806                    if !request_accepted {
807                        // The system will return RequestStatusResponse::Unknown
808                        // until the request is accepted
809                        // and we generally cannot know how long that will take.
810                        // State transitions between Received and Processing may be
811                        // instantaneous. Therefore, once we know the request is accepted,
812                        // we should restart the backoff so the request does not time out.
813
814                        retry_policy.reset();
815                        request_accepted = true;
816                    }
817                }
818
819                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820                    return Ok((arg, cert))
821                }
822
823                RequestStatusResponse::Rejected(response) => {
824                    return Err(AgentError::CertifiedReject {
825                        reject: response,
826                        operation,
827                    })
828                }
829
830                RequestStatusResponse::Done => {
831                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
832                        *request_id,
833                    )))
834                }
835            };
836
837            match retry_policy.next_backoff() {
838                Some(duration) => crate::util::sleep(duration).await,
839
840                None => return Err(AgentError::TimeoutWaitingForResponse()),
841            }
842        }
843    }
844
845    /// Request the raw state tree directly, under an effective canister ID.
846    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
847    pub async fn read_state_raw(
848        &self,
849        paths: Vec<Vec<Label>>,
850        effective_canister_id: Principal,
851    ) -> Result<Certificate, AgentError> {
852        let content = self.read_state_content(paths)?;
853        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855        let read_state_response: ReadStateResponse = self
856            .read_state_endpoint(effective_canister_id, serialized_bytes)
857            .await?;
858        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859            .map_err(AgentError::InvalidCborData)?;
860        self.verify(&cert, effective_canister_id)?;
861        Ok(cert)
862    }
863
864    /// Request the raw state tree directly, under a subnet ID.
865    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
866    pub async fn read_subnet_state_raw(
867        &self,
868        paths: Vec<Vec<Label>>,
869        subnet_id: Principal,
870    ) -> Result<Certificate, AgentError> {
871        let content = self.read_state_content(paths)?;
872        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874        let read_state_response: ReadStateResponse = self
875            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876            .await?;
877        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878            .map_err(AgentError::InvalidCborData)?;
879        self.verify_for_subnet(&cert, subnet_id)?;
880        Ok(cert)
881    }
882
883    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884        Ok(EnvelopeContent::ReadState {
885            sender: self.identity.sender().map_err(AgentError::SigningError)?,
886            paths,
887            ingress_expiry: self.get_expiry_date(),
888        })
889    }
890
891    /// Verify a certificate, checking delegation if present.
892    /// Only passes if the certificate also has authority over the canister.
893    pub fn verify(
894        &self,
895        cert: &Certificate,
896        effective_canister_id: Principal,
897    ) -> Result<(), AgentError> {
898        self.verify_cert(cert, effective_canister_id)?;
899        self.verify_cert_timestamp(cert)?;
900        Ok(())
901    }
902
903    fn verify_cert(
904        &self,
905        cert: &Certificate,
906        effective_canister_id: Principal,
907    ) -> Result<(), AgentError> {
908        let sig = &cert.signature;
909
910        let root_hash = cert.tree.digest();
911        let mut msg = vec![];
912        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913        msg.extend_from_slice(&root_hash);
914
915        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916        let key = extract_der(der_key)?;
917
918        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919            .map_err(|_| AgentError::CertificateVerificationFailed())?;
920        Ok(())
921    }
922
923    /// Verify a certificate, checking delegation if present.
924    /// Only passes if the certificate is for the specified subnet.
925    pub fn verify_for_subnet(
926        &self,
927        cert: &Certificate,
928        subnet_id: Principal,
929    ) -> Result<(), AgentError> {
930        self.verify_cert_for_subnet(cert, subnet_id)?;
931        self.verify_cert_timestamp(cert)?;
932        Ok(())
933    }
934
935    fn verify_cert_for_subnet(
936        &self,
937        cert: &Certificate,
938        subnet_id: Principal,
939    ) -> Result<(), AgentError> {
940        let sig = &cert.signature;
941
942        let root_hash = cert.tree.digest();
943        let mut msg = vec![];
944        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945        msg.extend_from_slice(&root_hash);
946
947        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948        let key = extract_der(der_key)?;
949
950        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951            .map_err(|_| AgentError::CertificateVerificationFailed())?;
952        Ok(())
953    }
954
955    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956        let time = lookup_time(cert)?;
957        if (OffsetDateTime::now_utc()
958            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
959        .abs()
960            > self.ingress_expiry
961        {
962            Err(AgentError::CertificateOutdated(self.ingress_expiry))
963        } else {
964            Ok(())
965        }
966    }
967
968    fn check_delegation(
969        &self,
970        delegation: &Option<Delegation>,
971        effective_canister_id: Principal,
972    ) -> Result<Vec<u8>, AgentError> {
973        match delegation {
974            None => Ok(self.read_root_key()),
975            Some(delegation) => {
976                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977                    .map_err(AgentError::InvalidCborData)?;
978                if cert.delegation.is_some() {
979                    return Err(AgentError::CertificateHasTooManyDelegations);
980                }
981                self.verify_cert(&cert, effective_canister_id)?;
982                let canister_range_lookup = [
983                    "subnet".as_bytes(),
984                    delegation.subnet_id.as_ref(),
985                    "canister_ranges".as_bytes(),
986                ];
987                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
988                let ranges: Vec<(Principal, Principal)> =
989                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
990                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
991                    // the certificate is not authorized to answer calls for this canister
992                    return Err(AgentError::CertificateNotAuthorized());
993                }
994
995                let public_key_path = [
996                    "subnet".as_bytes(),
997                    delegation.subnet_id.as_ref(),
998                    "public_key".as_bytes(),
999                ];
1000                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1001            }
1002        }
1003    }
1004
1005    fn check_delegation_for_subnet(
1006        &self,
1007        delegation: &Option<Delegation>,
1008        subnet_id: Principal,
1009    ) -> Result<Vec<u8>, AgentError> {
1010        match delegation {
1011            None => Ok(self.read_root_key()),
1012            Some(delegation) => {
1013                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1014                    .map_err(AgentError::InvalidCborData)?;
1015                if cert.delegation.is_some() {
1016                    return Err(AgentError::CertificateHasTooManyDelegations);
1017                }
1018                self.verify_cert_for_subnet(&cert, subnet_id)?;
1019                let public_key_path = [
1020                    "subnet".as_bytes(),
1021                    delegation.subnet_id.as_ref(),
1022                    "public_key".as_bytes(),
1023                ];
1024                let pk = lookup_value(&cert.tree, public_key_path)
1025                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1026                    .to_vec();
1027                Ok(pk)
1028            }
1029        }
1030    }
1031
1032    /// Request information about a particular canister for a single state subkey.
1033    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1034    pub async fn read_state_canister_info(
1035        &self,
1036        canister_id: Principal,
1037        path: &str,
1038    ) -> Result<Vec<u8>, AgentError> {
1039        let paths: Vec<Vec<Label>> = vec![vec![
1040            "canister".into(),
1041            Label::from_bytes(canister_id.as_slice()),
1042            path.into(),
1043        ]];
1044
1045        let cert = self.read_state_raw(paths, canister_id).await?;
1046
1047        lookup_canister_info(cert, canister_id, path)
1048    }
1049
1050    /// Request the controller list of a given canister.
1051    pub async fn read_state_canister_controllers(
1052        &self,
1053        canister_id: Principal,
1054    ) -> Result<Vec<Principal>, AgentError> {
1055        let blob = self
1056            .read_state_canister_info(canister_id, "controllers")
1057            .await?;
1058        let controllers: Vec<Principal> =
1059            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1060        Ok(controllers)
1061    }
1062
1063    /// Request the module hash of a given canister.
1064    pub async fn read_state_canister_module_hash(
1065        &self,
1066        canister_id: Principal,
1067    ) -> Result<Vec<u8>, AgentError> {
1068        self.read_state_canister_info(canister_id, "module_hash")
1069            .await
1070    }
1071
1072    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1073    pub async fn read_state_canister_metadata(
1074        &self,
1075        canister_id: Principal,
1076        path: &str,
1077    ) -> Result<Vec<u8>, AgentError> {
1078        let paths: Vec<Vec<Label>> = vec![vec![
1079            "canister".into(),
1080            Label::from_bytes(canister_id.as_slice()),
1081            "metadata".into(),
1082            path.into(),
1083        ]];
1084
1085        let cert = self.read_state_raw(paths, canister_id).await?;
1086
1087        lookup_canister_metadata(cert, canister_id, path)
1088    }
1089
1090    /// Request a list of metrics about the subnet.
1091    pub async fn read_state_subnet_metrics(
1092        &self,
1093        subnet_id: Principal,
1094    ) -> Result<SubnetMetrics, AgentError> {
1095        let paths = vec![vec![
1096            "subnet".into(),
1097            Label::from_bytes(subnet_id.as_slice()),
1098            "metrics".into(),
1099        ]];
1100        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1101        lookup_subnet_metrics(cert, subnet_id)
1102    }
1103
1104    /// Fetches the status of a particular request by its ID.
1105    pub async fn request_status_raw(
1106        &self,
1107        request_id: &RequestId,
1108        effective_canister_id: Principal,
1109    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1110        let paths: Vec<Vec<Label>> =
1111            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1112
1113        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1114
1115        Ok((lookup_request_status(&cert, request_id)?, cert))
1116    }
1117
1118    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1119    /// The bytes will be checked to verify that it is a valid `request_status`.
1120    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1121    pub async fn request_status_signed(
1122        &self,
1123        request_id: &RequestId,
1124        effective_canister_id: Principal,
1125        signed_request_status: Vec<u8>,
1126    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1127        let _envelope: Envelope =
1128            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1129        let read_state_response: ReadStateResponse = self
1130            .read_state_endpoint(effective_canister_id, signed_request_status)
1131            .await?;
1132
1133        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1134            .map_err(AgentError::InvalidCborData)?;
1135        self.verify(&cert, effective_canister_id)?;
1136        Ok((lookup_request_status(&cert, request_id)?, cert))
1137    }
1138
1139    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1140    /// passing all arguments.
1141    pub fn update<S: Into<String>>(
1142        &self,
1143        canister_id: &Principal,
1144        method_name: S,
1145    ) -> UpdateBuilder {
1146        UpdateBuilder::new(self, *canister_id, method_name.into())
1147    }
1148
1149    /// Calls and returns the information returned by the status endpoint of a replica.
1150    pub async fn status(&self) -> Result<Status, AgentError> {
1151        let endpoint = "api/v2/status";
1152        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1153
1154        let cbor: serde_cbor::Value =
1155            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1156
1157        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1158    }
1159
1160    /// Returns a `QueryBuilder` enabling the construction of a query call without
1161    /// passing all arguments.
1162    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1163        QueryBuilder::new(self, *canister_id, method_name.into())
1164    }
1165
1166    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1167    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1168    pub fn sign_request_status(
1169        &self,
1170        effective_canister_id: Principal,
1171        request_id: RequestId,
1172    ) -> Result<SignedRequestStatus, AgentError> {
1173        let paths: Vec<Vec<Label>> =
1174            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1175        let read_state_content = self.read_state_content(paths)?;
1176        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1177        let ingress_expiry = read_state_content.ingress_expiry();
1178        let sender = *read_state_content.sender();
1179        Ok(SignedRequestStatus {
1180            ingress_expiry,
1181            sender,
1182            effective_canister_id,
1183            request_id,
1184            signed_request_status,
1185        })
1186    }
1187
1188    async fn get_subnet_by_canister(
1189        &self,
1190        canister: &Principal,
1191    ) -> Result<Arc<Subnet>, AgentError> {
1192        let subnet = self
1193            .subnet_key_cache
1194            .lock()
1195            .unwrap()
1196            .get_subnet_by_canister(canister);
1197        if let Some(subnet) = subnet {
1198            Ok(subnet)
1199        } else {
1200            self.fetch_subnet_by_canister(canister).await
1201        }
1202    }
1203
1204    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1205    pub async fn fetch_api_boundary_nodes_by_canister_id(
1206        &self,
1207        canister_id: Principal,
1208    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1209        let paths = vec![vec!["api_boundary_nodes".into()]];
1210        let certificate = self.read_state_raw(paths, canister_id).await?;
1211        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1212        Ok(api_boundary_nodes)
1213    }
1214
1215    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1216    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1217        &self,
1218        subnet_id: Principal,
1219    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1220        let paths = vec![vec!["api_boundary_nodes".into()]];
1221        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1222        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1223        Ok(api_boundary_nodes)
1224    }
1225
1226    async fn fetch_subnet_by_canister(
1227        &self,
1228        canister: &Principal,
1229    ) -> Result<Arc<Subnet>, AgentError> {
1230        let cert = self
1231            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1232            .await?;
1233
1234        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1235        let subnet = Arc::new(subnet);
1236        self.subnet_key_cache
1237            .lock()
1238            .unwrap()
1239            .insert_subnet(subnet_id, subnet.clone());
1240        Ok(subnet)
1241    }
1242
1243    async fn request(
1244        &self,
1245        method: Method,
1246        endpoint: &str,
1247        body: Option<Vec<u8>>,
1248    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1249        let create_request_with_generated_url = || -> Result<Request, AgentError> {
1250            let url = self.route_provider.route()?.join(endpoint)?;
1251            let mut http_request = Request::new(method.clone(), url);
1252            http_request
1253                .headers_mut()
1254                .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1255            *http_request.body_mut() = body.clone().map(Body::from);
1256            Ok(http_request)
1257        };
1258
1259        let response = self
1260            .client
1261            .call(
1262                &create_request_with_generated_url,
1263                self.max_tcp_error_retries,
1264            )
1265            .await?;
1266
1267        let http_status = response.status();
1268        let response_headers = response.headers().clone();
1269
1270        // Size Check (Content-Length)
1271        if matches!(self
1272            .max_response_body_size
1273            .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1274        {
1275            return Err(AgentError::ResponseSizeExceededLimit());
1276        }
1277
1278        let mut body: Vec<u8> = response
1279            .content_length()
1280            .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1281
1282        let mut stream = response.bytes_stream();
1283
1284        while let Some(chunk) = stream.next().await {
1285            let chunk = chunk?;
1286
1287            // Size Check (Body Size)
1288            if matches!(self
1289                .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1290            {
1291                return Err(AgentError::ResponseSizeExceededLimit());
1292            }
1293
1294            body.extend_from_slice(chunk.as_ref());
1295        }
1296
1297        Ok((http_status, response_headers, body))
1298    }
1299
1300    async fn execute(
1301        &self,
1302        method: Method,
1303        endpoint: &str,
1304        body: Option<Vec<u8>>,
1305    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1306        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1307
1308        let status = request_result.0;
1309        let headers = request_result.1;
1310        let body = request_result.2;
1311
1312        if status.is_client_error() || status.is_server_error() {
1313            Err(AgentError::HttpError(HttpErrorPayload {
1314                status: status.into(),
1315                content_type: headers
1316                    .get(CONTENT_TYPE)
1317                    .and_then(|value| value.to_str().ok())
1318                    .map(str::to_string),
1319                content: body,
1320            }))
1321        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1322            Err(AgentError::InvalidHttpResponse(format!(
1323                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1324            )))
1325        } else {
1326            Ok((status, body))
1327        }
1328    }
1329}
1330
1331// Checks if a principal is contained within a list of principal ranges
1332// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1333fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1334    ranges
1335        .iter()
1336        .any(|r| principal >= &r.0 && principal <= &r.1)
1337}
1338
1339fn sign_envelope(
1340    content: &EnvelopeContent,
1341    identity: Arc<dyn Identity>,
1342) -> Result<Vec<u8>, AgentError> {
1343    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1344
1345    let envelope = Envelope {
1346        content: Cow::Borrowed(content),
1347        sender_pubkey: signature.public_key,
1348        sender_sig: signature.signature,
1349        sender_delegation: signature.delegations,
1350    };
1351
1352    let mut serialized_bytes = Vec::new();
1353    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1354    serializer.self_describe()?;
1355    envelope.serialize(&mut serializer)?;
1356
1357    Ok(serialized_bytes)
1358}
1359
1360/// Inspect the bytes to be sent as a query
1361/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1362pub fn signed_query_inspect(
1363    sender: Principal,
1364    canister_id: Principal,
1365    method_name: &str,
1366    arg: &[u8],
1367    ingress_expiry: u64,
1368    signed_query: Vec<u8>,
1369) -> Result<(), AgentError> {
1370    let envelope: Envelope =
1371        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1372    match envelope.content.as_ref() {
1373        EnvelopeContent::Query {
1374            ingress_expiry: ingress_expiry_cbor,
1375            sender: sender_cbor,
1376            canister_id: canister_id_cbor,
1377            method_name: method_name_cbor,
1378            arg: arg_cbor,
1379            nonce: _nonce,
1380        } => {
1381            if ingress_expiry != *ingress_expiry_cbor {
1382                return Err(AgentError::CallDataMismatch {
1383                    field: "ingress_expiry".to_string(),
1384                    value_arg: ingress_expiry.to_string(),
1385                    value_cbor: ingress_expiry_cbor.to_string(),
1386                });
1387            }
1388            if sender != *sender_cbor {
1389                return Err(AgentError::CallDataMismatch {
1390                    field: "sender".to_string(),
1391                    value_arg: sender.to_string(),
1392                    value_cbor: sender_cbor.to_string(),
1393                });
1394            }
1395            if canister_id != *canister_id_cbor {
1396                return Err(AgentError::CallDataMismatch {
1397                    field: "canister_id".to_string(),
1398                    value_arg: canister_id.to_string(),
1399                    value_cbor: canister_id_cbor.to_string(),
1400                });
1401            }
1402            if method_name != *method_name_cbor {
1403                return Err(AgentError::CallDataMismatch {
1404                    field: "method_name".to_string(),
1405                    value_arg: method_name.to_string(),
1406                    value_cbor: method_name_cbor.clone(),
1407                });
1408            }
1409            if arg != *arg_cbor {
1410                return Err(AgentError::CallDataMismatch {
1411                    field: "arg".to_string(),
1412                    value_arg: format!("{arg:?}"),
1413                    value_cbor: format!("{arg_cbor:?}"),
1414                });
1415            }
1416        }
1417        EnvelopeContent::Call { .. } => {
1418            return Err(AgentError::CallDataMismatch {
1419                field: "request_type".to_string(),
1420                value_arg: "query".to_string(),
1421                value_cbor: "call".to_string(),
1422            })
1423        }
1424        EnvelopeContent::ReadState { .. } => {
1425            return Err(AgentError::CallDataMismatch {
1426                field: "request_type".to_string(),
1427                value_arg: "query".to_string(),
1428                value_cbor: "read_state".to_string(),
1429            })
1430        }
1431    }
1432    Ok(())
1433}
1434
1435/// Inspect the bytes to be sent as an update
1436/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1437pub fn signed_update_inspect(
1438    sender: Principal,
1439    canister_id: Principal,
1440    method_name: &str,
1441    arg: &[u8],
1442    ingress_expiry: u64,
1443    signed_update: Vec<u8>,
1444) -> Result<(), AgentError> {
1445    let envelope: Envelope =
1446        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1447    match envelope.content.as_ref() {
1448        EnvelopeContent::Call {
1449            nonce: _nonce,
1450            ingress_expiry: ingress_expiry_cbor,
1451            sender: sender_cbor,
1452            canister_id: canister_id_cbor,
1453            method_name: method_name_cbor,
1454            arg: arg_cbor,
1455        } => {
1456            if ingress_expiry != *ingress_expiry_cbor {
1457                return Err(AgentError::CallDataMismatch {
1458                    field: "ingress_expiry".to_string(),
1459                    value_arg: ingress_expiry.to_string(),
1460                    value_cbor: ingress_expiry_cbor.to_string(),
1461                });
1462            }
1463            if sender != *sender_cbor {
1464                return Err(AgentError::CallDataMismatch {
1465                    field: "sender".to_string(),
1466                    value_arg: sender.to_string(),
1467                    value_cbor: sender_cbor.to_string(),
1468                });
1469            }
1470            if canister_id != *canister_id_cbor {
1471                return Err(AgentError::CallDataMismatch {
1472                    field: "canister_id".to_string(),
1473                    value_arg: canister_id.to_string(),
1474                    value_cbor: canister_id_cbor.to_string(),
1475                });
1476            }
1477            if method_name != *method_name_cbor {
1478                return Err(AgentError::CallDataMismatch {
1479                    field: "method_name".to_string(),
1480                    value_arg: method_name.to_string(),
1481                    value_cbor: method_name_cbor.clone(),
1482                });
1483            }
1484            if arg != *arg_cbor {
1485                return Err(AgentError::CallDataMismatch {
1486                    field: "arg".to_string(),
1487                    value_arg: format!("{arg:?}"),
1488                    value_cbor: format!("{arg_cbor:?}"),
1489                });
1490            }
1491        }
1492        EnvelopeContent::ReadState { .. } => {
1493            return Err(AgentError::CallDataMismatch {
1494                field: "request_type".to_string(),
1495                value_arg: "call".to_string(),
1496                value_cbor: "read_state".to_string(),
1497            })
1498        }
1499        EnvelopeContent::Query { .. } => {
1500            return Err(AgentError::CallDataMismatch {
1501                field: "request_type".to_string(),
1502                value_arg: "call".to_string(),
1503                value_cbor: "query".to_string(),
1504            })
1505        }
1506    }
1507    Ok(())
1508}
1509
1510/// Inspect the bytes to be sent as a `request_status`
1511/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1512pub fn signed_request_status_inspect(
1513    sender: Principal,
1514    request_id: &RequestId,
1515    ingress_expiry: u64,
1516    signed_request_status: Vec<u8>,
1517) -> Result<(), AgentError> {
1518    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1519    let envelope: Envelope =
1520        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1521    match envelope.content.as_ref() {
1522        EnvelopeContent::ReadState {
1523            ingress_expiry: ingress_expiry_cbor,
1524            sender: sender_cbor,
1525            paths: paths_cbor,
1526        } => {
1527            if ingress_expiry != *ingress_expiry_cbor {
1528                return Err(AgentError::CallDataMismatch {
1529                    field: "ingress_expiry".to_string(),
1530                    value_arg: ingress_expiry.to_string(),
1531                    value_cbor: ingress_expiry_cbor.to_string(),
1532                });
1533            }
1534            if sender != *sender_cbor {
1535                return Err(AgentError::CallDataMismatch {
1536                    field: "sender".to_string(),
1537                    value_arg: sender.to_string(),
1538                    value_cbor: sender_cbor.to_string(),
1539                });
1540            }
1541
1542            if paths != *paths_cbor {
1543                return Err(AgentError::CallDataMismatch {
1544                    field: "paths".to_string(),
1545                    value_arg: format!("{paths:?}"),
1546                    value_cbor: format!("{paths_cbor:?}"),
1547                });
1548            }
1549        }
1550        EnvelopeContent::Query { .. } => {
1551            return Err(AgentError::CallDataMismatch {
1552                field: "request_type".to_string(),
1553                value_arg: "read_state".to_string(),
1554                value_cbor: "query".to_string(),
1555            })
1556        }
1557        EnvelopeContent::Call { .. } => {
1558            return Err(AgentError::CallDataMismatch {
1559                field: "request_type".to_string(),
1560                value_arg: "read_state".to_string(),
1561                value_cbor: "call".to_string(),
1562            })
1563        }
1564    }
1565    Ok(())
1566}
1567
1568#[derive(Clone)]
1569struct SubnetCache {
1570    subnets: TimedCache<Principal, Arc<Subnet>>,
1571    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1572}
1573
1574impl SubnetCache {
1575    fn new() -> Self {
1576        Self {
1577            subnets: TimedCache::with_lifespan(300),
1578            canister_index: RangeInclusiveMap::new_with_step_fns(),
1579        }
1580    }
1581
1582    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1583        self.canister_index
1584            .get(canister)
1585            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1586            .filter(|subnet| subnet.canister_ranges.contains(canister))
1587    }
1588
1589    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1590        self.subnets.cache_set(subnet_id, subnet.clone());
1591        for range in subnet.canister_ranges.iter() {
1592            self.canister_index.insert(range.clone(), subnet_id);
1593        }
1594    }
1595}
1596
1597#[derive(Clone, Copy)]
1598struct PrincipalStep;
1599
1600impl StepFns<Principal> for PrincipalStep {
1601    fn add_one(start: &Principal) -> Principal {
1602        let bytes = start.as_slice();
1603        let mut arr = [0; 29];
1604        arr[..bytes.len()].copy_from_slice(bytes);
1605        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1606            *byte = byte.wrapping_add(1);
1607            if *byte != 0 {
1608                break;
1609            }
1610        }
1611        Principal::from_slice(&arr[..bytes.len()])
1612    }
1613    fn sub_one(start: &Principal) -> Principal {
1614        let bytes = start.as_slice();
1615        let mut arr = [0; 29];
1616        arr[..bytes.len()].copy_from_slice(bytes);
1617        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1618            *byte = byte.wrapping_sub(1);
1619            if *byte != 255 {
1620                break;
1621            }
1622        }
1623        Principal::from_slice(&arr[..bytes.len()])
1624    }
1625}
1626
1627#[derive(Clone)]
1628pub(crate) struct Subnet {
1629    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
1630    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
1631    _key: Vec<u8>,
1632    node_keys: HashMap<Principal, Vec<u8>>,
1633    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1634}
1635
1636/// API boundary node, which routes /api calls to IC replica nodes.
1637#[derive(Debug, Clone)]
1638pub struct ApiBoundaryNode {
1639    /// Domain name
1640    pub domain: String,
1641    /// IPv6 address in the hexadecimal notation with colons.
1642    pub ipv6_address: String,
1643    /// IPv4 address in the dotted-decimal notation.
1644    pub ipv4_address: Option<String>,
1645}
1646
1647/// A query request builder.
1648///
1649/// This makes it easier to do query calls without actually passing all arguments.
1650#[derive(Debug, Clone)]
1651#[non_exhaustive]
1652pub struct QueryBuilder<'agent> {
1653    agent: &'agent Agent,
1654    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1655    pub effective_canister_id: Principal,
1656    /// The principal ID of the canister being called.
1657    pub canister_id: Principal,
1658    /// The name of the canister method being called.
1659    pub method_name: String,
1660    /// The argument blob to be passed to the method.
1661    pub arg: Vec<u8>,
1662    /// The Unix timestamp that the request will expire at.
1663    pub ingress_expiry_datetime: Option<u64>,
1664    /// Whether to include a nonce with the message.
1665    pub use_nonce: bool,
1666}
1667
1668impl<'agent> QueryBuilder<'agent> {
1669    /// Creates a new query builder with an agent for a particular canister method.
1670    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1671        Self {
1672            agent,
1673            effective_canister_id: canister_id,
1674            canister_id,
1675            method_name,
1676            arg: vec![],
1677            ingress_expiry_datetime: None,
1678            use_nonce: false,
1679        }
1680    }
1681
1682    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1683    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1684        self.effective_canister_id = canister_id;
1685        self
1686    }
1687
1688    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1689    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1690        self.arg = arg.into();
1691        self
1692    }
1693
1694    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1695    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1696        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1697        self
1698    }
1699
1700    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1701    pub fn expire_after(mut self, duration: Duration) -> Self {
1702        self.ingress_expiry_datetime = Some(
1703            OffsetDateTime::now_utc()
1704                .saturating_add(duration.try_into().expect("negative duration"))
1705                .unix_timestamp_nanos() as u64,
1706        );
1707        self
1708    }
1709
1710    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1711    /// and thus may get a (briefly) cached response.
1712    pub fn with_nonce_generation(mut self) -> Self {
1713        self.use_nonce = true;
1714        self
1715    }
1716
1717    /// Make a query call. This will return a byte vector.
1718    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1719        self.agent
1720            .query_raw(
1721                self.canister_id,
1722                self.effective_canister_id,
1723                self.method_name,
1724                self.arg,
1725                self.ingress_expiry_datetime,
1726                self.use_nonce,
1727                None,
1728            )
1729            .await
1730    }
1731
1732    /// Make a query call with signature verification. This will return a byte vector.
1733    ///
1734    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1735    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1736    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1737        self.agent
1738            .query_raw(
1739                self.canister_id,
1740                self.effective_canister_id,
1741                self.method_name,
1742                self.arg,
1743                self.ingress_expiry_datetime,
1744                self.use_nonce,
1745                Some(true),
1746            )
1747            .await
1748    }
1749
1750    /// Make a query call without signature verification. This will return a byte vector.
1751    ///
1752    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1753    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1754    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1755        self.agent
1756            .query_raw(
1757                self.canister_id,
1758                self.effective_canister_id,
1759                self.method_name,
1760                self.arg,
1761                self.ingress_expiry_datetime,
1762                self.use_nonce,
1763                Some(false),
1764            )
1765            .await
1766    }
1767
1768    /// Sign a query call. This will return a [`signed::SignedQuery`]
1769    /// which contains all fields of the query and the signed query in CBOR encoding
1770    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1771        let effective_canister_id = self.effective_canister_id;
1772        let identity = self.agent.identity.clone();
1773        let content = self.into_envelope()?;
1774        let signed_query = sign_envelope(&content, identity)?;
1775        let EnvelopeContent::Query {
1776            ingress_expiry,
1777            sender,
1778            canister_id,
1779            method_name,
1780            arg,
1781            nonce,
1782        } = content
1783        else {
1784            unreachable!()
1785        };
1786        Ok(SignedQuery {
1787            ingress_expiry,
1788            sender,
1789            canister_id,
1790            method_name,
1791            arg,
1792            effective_canister_id,
1793            signed_query,
1794            nonce,
1795        })
1796    }
1797
1798    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1799    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1800        self.agent.query_content(
1801            self.canister_id,
1802            self.method_name,
1803            self.arg,
1804            self.ingress_expiry_datetime,
1805            self.use_nonce,
1806        )
1807    }
1808}
1809
1810impl<'agent> IntoFuture for QueryBuilder<'agent> {
1811    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1812    type Output = Result<Vec<u8>, AgentError>;
1813    fn into_future(self) -> Self::IntoFuture {
1814        Box::pin(self.call())
1815    }
1816}
1817
1818/// An in-flight canister update call. Useful primarily as a `Future`.
1819pub struct UpdateCall<'agent> {
1820    agent: &'agent Agent,
1821    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1822    effective_canister_id: Principal,
1823    canister_id: Principal,
1824    method_name: String,
1825}
1826
1827impl fmt::Debug for UpdateCall<'_> {
1828    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1829        f.debug_struct("UpdateCall")
1830            .field("agent", &self.agent)
1831            .field("effective_canister_id", &self.effective_canister_id)
1832            .finish_non_exhaustive()
1833    }
1834}
1835
1836impl Future for UpdateCall<'_> {
1837    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1838    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1839        self.response_future.as_mut().poll(cx)
1840    }
1841}
1842
1843impl<'a> UpdateCall<'a> {
1844    /// Waits for the update call to be completed, polling if necessary.
1845    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1846        let response = self.response_future.await?;
1847
1848        match response {
1849            CallResponse::Response(response) => Ok(response),
1850            CallResponse::Poll(request_id) => {
1851                self.agent
1852                    .wait_inner(
1853                        &request_id,
1854                        self.effective_canister_id,
1855                        Some(Operation::Call {
1856                            canister: self.canister_id,
1857                            method: self.method_name,
1858                        }),
1859                    )
1860                    .await
1861            }
1862        }
1863    }
1864}
1865/// An update request Builder.
1866///
1867/// This makes it easier to do update calls without actually passing all arguments or specifying
1868/// if you want to wait or not.
1869#[derive(Debug)]
1870pub struct UpdateBuilder<'agent> {
1871    agent: &'agent Agent,
1872    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1873    pub effective_canister_id: Principal,
1874    /// The principal ID of the canister being called.
1875    pub canister_id: Principal,
1876    /// The name of the canister method being called.
1877    pub method_name: String,
1878    /// The argument blob to be passed to the method.
1879    pub arg: Vec<u8>,
1880    /// The Unix timestamp that the request will expire at.
1881    pub ingress_expiry_datetime: Option<u64>,
1882}
1883
1884impl<'agent> UpdateBuilder<'agent> {
1885    /// Creates a new update builder with an agent for a particular canister method.
1886    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1887        Self {
1888            agent,
1889            effective_canister_id: canister_id,
1890            canister_id,
1891            method_name,
1892            arg: vec![],
1893            ingress_expiry_datetime: None,
1894        }
1895    }
1896
1897    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1898    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1899        self.effective_canister_id = canister_id;
1900        self
1901    }
1902
1903    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1904    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1905        self.arg = arg.into();
1906        self
1907    }
1908
1909    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1910    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1911        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1912        self
1913    }
1914
1915    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1916    pub fn expire_after(mut self, duration: Duration) -> Self {
1917        self.ingress_expiry_datetime = Some(
1918            OffsetDateTime::now_utc()
1919                .saturating_add(duration.try_into().expect("negative duration"))
1920                .unix_timestamp_nanos() as u64,
1921        );
1922        self
1923    }
1924
1925    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1926    /// the response as a byte vector.
1927    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1928        self.call().and_wait().await.map(|x| x.0)
1929    }
1930
1931    /// Make an update call. This will return a `RequestId`.
1932    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1933    pub fn call(self) -> UpdateCall<'agent> {
1934        let method_name = self.method_name.clone();
1935        let response_future = async move {
1936            self.agent
1937                .update_raw(
1938                    self.canister_id,
1939                    self.effective_canister_id,
1940                    self.method_name,
1941                    self.arg,
1942                    self.ingress_expiry_datetime,
1943                )
1944                .await
1945        };
1946        UpdateCall {
1947            agent: self.agent,
1948            response_future: Box::pin(response_future),
1949            effective_canister_id: self.effective_canister_id,
1950            canister_id: self.canister_id,
1951            method_name,
1952        }
1953    }
1954
1955    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1956    /// which contains all fields of the update and the signed update in CBOR encoding
1957    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1958        let identity = self.agent.identity.clone();
1959        let effective_canister_id = self.effective_canister_id;
1960        let content = self.into_envelope()?;
1961        let signed_update = sign_envelope(&content, identity)?;
1962        let request_id = to_request_id(&content)?;
1963        let EnvelopeContent::Call {
1964            nonce,
1965            ingress_expiry,
1966            sender,
1967            canister_id,
1968            method_name,
1969            arg,
1970        } = content
1971        else {
1972            unreachable!()
1973        };
1974        Ok(SignedUpdate {
1975            nonce,
1976            ingress_expiry,
1977            sender,
1978            canister_id,
1979            method_name,
1980            arg,
1981            effective_canister_id,
1982            signed_update,
1983            request_id,
1984        })
1985    }
1986
1987    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1988    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1989        let nonce = self.agent.nonce_factory.generate();
1990        self.agent.update_content(
1991            self.canister_id,
1992            self.method_name,
1993            self.arg,
1994            self.ingress_expiry_datetime,
1995            nonce,
1996        )
1997    }
1998}
1999
2000impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2001    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2002    type Output = Result<Vec<u8>, AgentError>;
2003    fn into_future(self) -> Self::IntoFuture {
2004        Box::pin(self.call_and_wait())
2005    }
2006}
2007
2008/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
2009#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2010#[cfg_attr(not(target_family = "wasm"), async_trait)]
2011pub trait HttpService: Send + Sync + Debug {
2012    /// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
2013    async fn call<'a>(
2014        &'a self,
2015        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2016        max_retries: usize,
2017    ) -> Result<Response, AgentError>;
2018}
2019#[cfg(not(target_family = "wasm"))]
2020#[async_trait]
2021impl<T> HttpService for T
2022where
2023    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2024    for<'a> <&'a Self as Service<Request>>::Future: Send,
2025    T: Send + Sync + Debug + ?Sized,
2026{
2027    #[allow(clippy::needless_arbitrary_self_type)]
2028    async fn call<'a>(
2029        mut self: &'a Self,
2030        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2031        max_retries: usize,
2032    ) -> Result<Response, AgentError> {
2033        let mut retry_count = 0;
2034        loop {
2035            match Service::call(&mut self, req()?).await {
2036                Err(err) => {
2037                    // Network-related errors can be retried.
2038                    if err.is_connect() {
2039                        if retry_count >= max_retries {
2040                            return Err(AgentError::TransportError(err));
2041                        }
2042                        retry_count += 1;
2043                    }
2044                }
2045                Ok(resp) => return Ok(resp),
2046            }
2047        }
2048    }
2049}
2050
2051#[cfg(target_family = "wasm")]
2052#[async_trait(?Send)]
2053impl<T> HttpService for T
2054where
2055    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2056    T: Send + Sync + Debug + ?Sized,
2057{
2058    #[allow(clippy::needless_arbitrary_self_type)]
2059    async fn call<'a>(
2060        mut self: &'a Self,
2061        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2062        _: usize,
2063    ) -> Result<Response, AgentError> {
2064        Ok(Service::call(&mut self, req()?).await?)
2065    }
2066}
2067
2068#[derive(Debug)]
2069struct Retry429Logic {
2070    client: Client,
2071}
2072
2073#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2074#[cfg_attr(not(target_family = "wasm"), async_trait)]
2075impl HttpService for Retry429Logic {
2076    async fn call<'a>(
2077        &'a self,
2078        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2079        _max_tcp_retries: usize,
2080    ) -> Result<Response, AgentError> {
2081        let mut retries = 0;
2082        loop {
2083            #[cfg(not(target_family = "wasm"))]
2084            let resp = self.client.call(req, _max_tcp_retries).await?;
2085            // Client inconveniently does not implement Service on wasm
2086            #[cfg(target_family = "wasm")]
2087            let resp = self.client.execute(req()?).await?;
2088            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2089                if retries == 6 {
2090                    break Ok(resp);
2091                } else {
2092                    retries += 1;
2093                    crate::util::sleep(Duration::from_millis(250)).await;
2094                    continue;
2095                }
2096            } else {
2097                break Ok(resp);
2098            }
2099        }
2100    }
2101}
2102
2103#[cfg(all(test, not(target_family = "wasm")))]
2104mod offline_tests {
2105    use super::*;
2106    use tokio::net::TcpListener;
2107    // Any tests that involve the network should go in agent_test, not here.
2108
2109    #[test]
2110    fn rounded_expiry() {
2111        let agent = Agent::builder()
2112            .with_url("http://not-a-real-url")
2113            .build()
2114            .unwrap();
2115        let mut prev_expiry = None;
2116        let mut num_timestamps = 0;
2117        for _ in 0..6 {
2118            let update = agent
2119                .update(&Principal::management_canister(), "not_a_method")
2120                .sign()
2121                .unwrap();
2122            if prev_expiry < Some(update.ingress_expiry) {
2123                prev_expiry = Some(update.ingress_expiry);
2124                num_timestamps += 1;
2125            }
2126        }
2127        // in six requests, there should be no more than two timestamps
2128        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2129    }
2130
2131    #[tokio::test]
2132    async fn client_ratelimit() {
2133        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2134        let count = Arc::new(Mutex::new(0));
2135        let port = mock_server.local_addr().unwrap().port();
2136        tokio::spawn({
2137            let count = count.clone();
2138            async move {
2139                loop {
2140                    let (mut conn, _) = mock_server.accept().await.unwrap();
2141                    *count.lock().unwrap() += 1;
2142                    tokio::spawn(
2143                        // read all data, never reply
2144                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2145                    );
2146                }
2147            }
2148        });
2149        let agent = Agent::builder()
2150            .with_http_client(Client::builder().http1_only().build().unwrap())
2151            .with_url(format!("http://127.0.0.1:{port}"))
2152            .with_max_concurrent_requests(2)
2153            .build()
2154            .unwrap();
2155        for _ in 0..3 {
2156            let agent = agent.clone();
2157            tokio::spawn(async move {
2158                agent
2159                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2160                    .call()
2161                    .await
2162            });
2163        }
2164        crate::util::sleep(Duration::from_millis(250)).await;
2165        assert_eq!(*count.lock().unwrap(), 2);
2166    }
2167}