ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use futures_util::StreamExt;
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49    },
50    export::Principal,
51    identity::Identity,
52    to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64    borrow::Cow,
65    collections::HashMap,
66    convert::TryFrom,
67    fmt::{self, Debug},
68    future::{Future, IntoFuture},
69    pin::Pin,
70    sync::{Arc, Mutex, RwLock},
71    task::{Context, Poll},
72    time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87/// A low level Agent to make calls to a Replica endpoint.
88///
89/// ```ignore
90/// # // This test is ignored because it requires an ic to be running. We run these
91/// # // in the ic-ref workflow.
92/// use ic_agent::{Agent, export::Principal};
93/// use candid::{Encode, Decode, CandidType, Nat};
94/// use serde::Deserialize;
95///
96/// #[derive(CandidType)]
97/// struct Argument {
98///   amount: Option<Nat>,
99/// }
100///
101/// #[derive(CandidType, Deserialize)]
102/// struct CreateCanisterResult {
103///   canister_id: Principal,
104/// }
105///
106/// # fn create_identity() -> impl ic_agent::Identity {
107/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
108/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
109/// # }
110/// #
111/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
112/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
113///   let agent = Agent::builder()
114///     .with_url(url)
115///     .with_identity(create_identity())
116///     .build()?;
117///
118///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
119///   // This is important as the main net public key is static and a rogue network could return
120///   // a different key.
121///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
122///   agent.fetch_root_key().await?;
123///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
124///
125///   // Create a call to the management canister to create a new canister ID,
126///   // and wait for a result.
127///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
128///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
129///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
130///     .with_effective_canister_id(effective_canister_id)
131///     .with_arg(Encode!(&Argument { amount: None })?)
132///     .await?;
133///
134///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
135///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
136///   Ok(canister_id)
137/// }
138///
139/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
140/// # runtime.block_on(async {
141/// let canister_id = create_a_canister().await.unwrap();
142/// eprintln!("{}", canister_id);
143/// # });
144/// ```
145///
146/// This agent does not understand Candid, and only acts on byte buffers.
147///
148/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
149/// you receive from a method has already been verified and you do not need to manually verify it.
150#[derive(Clone)]
151pub struct Agent {
152    nonce_factory: Arc<dyn NonceGenerator>,
153    identity: Arc<dyn Identity>,
154    ingress_expiry: Duration,
155    root_key: Arc<RwLock<Vec<u8>>>,
156    client: Arc<dyn HttpService>,
157    route_provider: Arc<dyn RouteProvider>,
158    subnet_key_cache: Arc<Mutex<SubnetCache>>,
159    concurrent_requests_semaphore: Arc<Semaphore>,
160    verify_query_signatures: bool,
161    max_response_body_size: Option<usize>,
162    max_polling_time: Duration,
163    #[allow(dead_code)]
164    max_tcp_error_retries: usize,
165}
166
167impl fmt::Debug for Agent {
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
169        f.debug_struct("Agent")
170            .field("ingress_expiry", &self.ingress_expiry)
171            .finish_non_exhaustive()
172    }
173}
174
175impl Agent {
176    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
177    /// using the [`AgentConfig`] and [`Agent::new()`].
178    pub fn builder() -> builder::AgentBuilder {
179        Default::default()
180    }
181
182    /// Create an instance of an [`Agent`].
183    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
184        let client = config.http_service.unwrap_or_else(|| {
185            Arc::new(Retry429Logic {
186                client: config.client.unwrap_or_else(|| {
187                    #[cfg(not(target_family = "wasm"))]
188                    {
189                        Client::builder()
190                            .use_rustls_tls()
191                            .timeout(Duration::from_secs(360))
192                            .build()
193                            .expect("Could not create HTTP client.")
194                    }
195                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
196                    {
197                        Client::new()
198                    }
199                }),
200            })
201        });
202        Ok(Agent {
203            nonce_factory: config.nonce_factory,
204            identity: config.identity,
205            ingress_expiry: config.ingress_expiry,
206            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
207            client: client.clone(),
208            route_provider: if let Some(route_provider) = config.route_provider {
209                route_provider
210            } else if let Some(url) = config.url {
211                if config.background_dynamic_routing {
212                    assert!(
213                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
214                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
215                    );
216                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
217                    UrlUntilReady::new(url, async move {
218                        DynamicRouteProviderBuilder::new(
219                            LatencyRoutingSnapshot::new(),
220                            seeds,
221                            client,
222                        )
223                        .build()
224                        .await
225                    }) as Arc<dyn RouteProvider>
226                } else {
227                    Arc::new(url)
228                }
229            } else {
230                panic!("either route_provider or url must be specified");
231            },
232            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233            verify_query_signatures: config.verify_query_signatures,
234            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235            max_response_body_size: config.max_response_body_size,
236            max_tcp_error_retries: config.max_tcp_error_retries,
237            max_polling_time: config.max_polling_time,
238        })
239    }
240
241    /// Set the identity provider for signing messages.
242    ///
243    /// NOTE: if you change the identity while having update calls in
244    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
245    /// messages.
246    pub fn set_identity<I>(&mut self, identity: I)
247    where
248        I: 'static + Identity,
249    {
250        self.identity = Arc::new(identity);
251    }
252    /// Set the arc identity provider for signing messages.
253    ///
254    /// NOTE: if you change the identity while having update calls in
255    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
256    /// messages.
257    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
258        self.identity = identity;
259    }
260
261    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
262    /// responses using a hard-coded public key.
263    ///
264    /// This function will instruct the agent to ask the endpoint for its public key, and use
265    /// that instead. This is required when talking to a local test instance, for example.
266    ///
267    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
268    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
269    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271            // already fetched the root key
272            return Ok(());
273        }
274        let status = self.status().await?;
275        let Some(root_key) = status.root_key else {
276            return Err(AgentError::NoRootKeyInStatus(status));
277        };
278        self.set_root_key(root_key);
279        Ok(())
280    }
281
282    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
283    /// responses using a hard-coded public key.
284    ///
285    /// Using this function you can set the root key to a known one if you know if beforehand.
286    pub fn set_root_key(&self, root_key: Vec<u8>) {
287        *self.root_key.write().unwrap() = root_key;
288    }
289
290    /// Return the root key currently in use.
291    pub fn read_root_key(&self) -> Vec<u8> {
292        self.root_key.read().unwrap().clone()
293    }
294
295    fn get_expiry_date(&self) -> u64 {
296        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298        if self.ingress_expiry.as_secs() > 90 {
299            rounded = rounded.replace_second(0).unwrap();
300        }
301        rounded.unix_timestamp_nanos().try_into().unwrap()
302    }
303
304    /// Return the principal of the identity.
305    pub fn get_principal(&self) -> Result<Principal, String> {
306        self.identity.sender()
307    }
308
309    async fn query_endpoint<A>(
310        &self,
311        effective_canister_id: Principal,
312        serialized_bytes: Vec<u8>,
313    ) -> Result<A, AgentError>
314    where
315        A: serde::de::DeserializeOwned,
316    {
317        let _permit = self.concurrent_requests_semaphore.acquire().await;
318        let bytes = self
319            .execute(
320                Method::POST,
321                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
322                Some(serialized_bytes),
323            )
324            .await?
325            .1;
326        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327    }
328
329    async fn read_state_endpoint<A>(
330        &self,
331        effective_canister_id: Principal,
332        serialized_bytes: Vec<u8>,
333    ) -> Result<A, AgentError>
334    where
335        A: serde::de::DeserializeOwned,
336    {
337        let _permit = self.concurrent_requests_semaphore.acquire().await;
338        let endpoint = format!(
339            "api/v2/canister/{}/read_state",
340            effective_canister_id.to_text()
341        );
342        let bytes = self
343            .execute(Method::POST, &endpoint, Some(serialized_bytes))
344            .await?
345            .1;
346        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347    }
348
349    async fn read_subnet_state_endpoint<A>(
350        &self,
351        subnet_id: Principal,
352        serialized_bytes: Vec<u8>,
353    ) -> Result<A, AgentError>
354    where
355        A: serde::de::DeserializeOwned,
356    {
357        let _permit = self.concurrent_requests_semaphore.acquire().await;
358        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
359        let bytes = self
360            .execute(Method::POST, &endpoint, Some(serialized_bytes))
361            .await?
362            .1;
363        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364    }
365
366    async fn call_endpoint(
367        &self,
368        effective_canister_id: Principal,
369        serialized_bytes: Vec<u8>,
370    ) -> Result<TransportCallResponse, AgentError> {
371        let _permit = self.concurrent_requests_semaphore.acquire().await;
372        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
373        let (status_code, response_body) = self
374            .execute(Method::POST, &endpoint, Some(serialized_bytes))
375            .await?;
376
377        if status_code == StatusCode::ACCEPTED {
378            return Ok(TransportCallResponse::Accepted);
379        }
380
381        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382    }
383
384    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
385    /// The encoding is left as an exercise to the user.
386    #[allow(clippy::too_many_arguments)]
387    async fn query_raw(
388        &self,
389        canister_id: Principal,
390        effective_canister_id: Principal,
391        method_name: String,
392        arg: Vec<u8>,
393        ingress_expiry_datetime: Option<u64>,
394        use_nonce: bool,
395        explicit_verify_query_signatures: Option<bool>,
396    ) -> Result<Vec<u8>, AgentError> {
397        let operation = Operation::Call {
398            canister: canister_id,
399            method: method_name.clone(),
400        };
401        let content = self.query_content(
402            canister_id,
403            method_name,
404            arg,
405            ingress_expiry_datetime,
406            use_nonce,
407        )?;
408        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409        self.query_inner(
410            effective_canister_id,
411            serialized_bytes,
412            content.to_request_id(),
413            explicit_verify_query_signatures,
414            operation,
415        )
416        .await
417    }
418
419    /// Send the signed query to the network. Will return a byte vector.
420    /// The bytes will be checked if it is a valid query.
421    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
422    pub async fn query_signed(
423        &self,
424        effective_canister_id: Principal,
425        signed_query: Vec<u8>,
426    ) -> Result<Vec<u8>, AgentError> {
427        let envelope: Envelope =
428            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429        let EnvelopeContent::Query {
430            canister_id,
431            method_name,
432            ..
433        } = &*envelope.content
434        else {
435            return Err(AgentError::CallDataMismatch {
436                field: "request_type".to_string(),
437                value_arg: "query".to_string(),
438                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439                    "update"
440                } else {
441                    "read_state"
442                }
443                .to_string(),
444            });
445        };
446        let operation = Operation::Call {
447            canister: *canister_id,
448            method: method_name.clone(),
449        };
450        self.query_inner(
451            effective_canister_id,
452            signed_query,
453            envelope.content.to_request_id(),
454            None,
455            operation,
456        )
457        .await
458    }
459
460    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
461    ///
462    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
463    async fn query_inner(
464        &self,
465        effective_canister_id: Principal,
466        signed_query: Vec<u8>,
467        request_id: RequestId,
468        explicit_verify_query_signatures: Option<bool>,
469        operation: Operation,
470    ) -> Result<Vec<u8>, AgentError> {
471        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472            let (response, mut subnet) = futures_util::try_join!(
473                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474                self.get_subnet_by_canister(&effective_canister_id)
475            )?;
476            if response.signatures().is_empty() {
477                return Err(AgentError::MissingSignature);
478            } else if response.signatures().len() > subnet.node_keys.len() {
479                return Err(AgentError::TooManySignatures {
480                    had: response.signatures().len(),
481                    needed: subnet.node_keys.len(),
482                });
483            }
484            for signature in response.signatures() {
485                if OffsetDateTime::now_utc()
486                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487                    > self.ingress_expiry
488                {
489                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490                }
491                let signable = response.signable(request_id, signature.timestamp);
492                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493                    node_key
494                } else {
495                    subnet = self
496                        .fetch_subnet_by_canister(&effective_canister_id)
497                        .await?;
498                    subnet
499                        .node_keys
500                        .get(&signature.identity)
501                        .ok_or(AgentError::CertificateNotAuthorized())?
502                };
503                if node_key.len() != 44 {
504                    return Err(AgentError::DerKeyLengthMismatch {
505                        expected: 44,
506                        actual: node_key.len(),
507                    });
508                }
509                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510                if node_key[..12] != DER_PREFIX {
511                    return Err(AgentError::DerPrefixMismatch {
512                        expected: DER_PREFIX.to_vec(),
513                        actual: node_key[..12].to_vec(),
514                    });
515                }
516                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517                    .map_err(|_| AgentError::MalformedPublicKey)?;
518
519                match pubkey.verify_signature(&signable, &signature.signature[..]) {
520                    Ok(()) => (),
521                    Err(SignatureError::InvalidSignature) => {
522                        return Err(AgentError::QuerySignatureVerificationFailed)
523                    }
524                    Err(SignatureError::InvalidLength) => {
525                        return Err(AgentError::MalformedSignature)
526                    }
527                    _ => unreachable!(),
528                }
529            }
530            response
531        } else {
532            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533                .await?
534        };
535
536        match response {
537            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539                reject,
540                operation: Some(operation),
541            }),
542        }
543    }
544
545    fn query_content(
546        &self,
547        canister_id: Principal,
548        method_name: String,
549        arg: Vec<u8>,
550        ingress_expiry_datetime: Option<u64>,
551        use_nonce: bool,
552    ) -> Result<EnvelopeContent, AgentError> {
553        Ok(EnvelopeContent::Query {
554            sender: self.identity.sender().map_err(AgentError::SigningError)?,
555            canister_id,
556            method_name,
557            arg,
558            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560        })
561    }
562
563    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
564    async fn update_raw(
565        &self,
566        canister_id: Principal,
567        effective_canister_id: Principal,
568        method_name: String,
569        arg: Vec<u8>,
570        ingress_expiry_datetime: Option<u64>,
571    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
572        let nonce = self.nonce_factory.generate();
573        let content = self.update_content(
574            canister_id,
575            method_name.clone(),
576            arg,
577            ingress_expiry_datetime,
578            nonce,
579        )?;
580        let operation = Some(Operation::Call {
581            canister: canister_id,
582            method: method_name,
583        });
584        let request_id = to_request_id(&content)?;
585        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
586
587        let response_body = self
588            .call_endpoint(effective_canister_id, serialized_bytes)
589            .await?;
590
591        match response_body {
592            TransportCallResponse::Replied { certificate } => {
593                let certificate =
594                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
595
596                self.verify(&certificate, effective_canister_id)?;
597                let status = lookup_request_status(&certificate, &request_id)?;
598
599                match status {
600                    RequestStatusResponse::Replied(reply) => {
601                        Ok(CallResponse::Response((reply.arg, certificate)))
602                    }
603                    RequestStatusResponse::Rejected(reject_response) => {
604                        Err(AgentError::CertifiedReject {
605                            reject: reject_response,
606                            operation,
607                        })?
608                    }
609                    _ => Ok(CallResponse::Poll(request_id)),
610                }
611            }
612            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
613            TransportCallResponse::NonReplicatedRejection(reject_response) => {
614                Err(AgentError::UncertifiedReject {
615                    reject: reject_response,
616                    operation,
617                })
618            }
619        }
620    }
621
622    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
623    /// The bytes will be checked to verify that it is a valid update.
624    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
625    pub async fn update_signed(
626        &self,
627        effective_canister_id: Principal,
628        signed_update: Vec<u8>,
629    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
630        let envelope: Envelope =
631            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
632        let EnvelopeContent::Call {
633            canister_id,
634            method_name,
635            ..
636        } = &*envelope.content
637        else {
638            return Err(AgentError::CallDataMismatch {
639                field: "request_type".to_string(),
640                value_arg: "update".to_string(),
641                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
642                    "query"
643                } else {
644                    "read_state"
645                }
646                .to_string(),
647            });
648        };
649        let operation = Some(Operation::Call {
650            canister: *canister_id,
651            method: method_name.clone(),
652        });
653        let request_id = to_request_id(&envelope.content)?;
654
655        let response_body = self
656            .call_endpoint(effective_canister_id, signed_update)
657            .await?;
658
659        match response_body {
660            TransportCallResponse::Replied { certificate } => {
661                let certificate =
662                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
663
664                self.verify(&certificate, effective_canister_id)?;
665                let status = lookup_request_status(&certificate, &request_id)?;
666
667                match status {
668                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
669                    RequestStatusResponse::Rejected(reject_response) => {
670                        Err(AgentError::CertifiedReject {
671                            reject: reject_response,
672                            operation,
673                        })?
674                    }
675                    _ => Ok(CallResponse::Poll(request_id)),
676                }
677            }
678            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
679            TransportCallResponse::NonReplicatedRejection(reject_response) => {
680                Err(AgentError::UncertifiedReject {
681                    reject: reject_response,
682                    operation,
683                })
684            }
685        }
686    }
687
688    fn update_content(
689        &self,
690        canister_id: Principal,
691        method_name: String,
692        arg: Vec<u8>,
693        ingress_expiry_datetime: Option<u64>,
694        nonce: Option<Vec<u8>>,
695    ) -> Result<EnvelopeContent, AgentError> {
696        Ok(EnvelopeContent::Call {
697            canister_id,
698            method_name,
699            arg,
700            nonce,
701            sender: self.identity.sender().map_err(AgentError::SigningError)?,
702            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
703        })
704    }
705
706    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
707        ExponentialBackoffBuilder::new()
708            .with_initial_interval(Duration::from_millis(500))
709            .with_max_interval(Duration::from_secs(1))
710            .with_multiplier(1.4)
711            .with_max_elapsed_time(Some(self.max_polling_time))
712            .build()
713    }
714
715    /// Wait for `request_status` to return a Replied response and return the arg.
716    pub async fn wait_signed(
717        &self,
718        request_id: &RequestId,
719        effective_canister_id: Principal,
720        signed_request_status: Vec<u8>,
721    ) -> Result<(Vec<u8>, Certificate), AgentError> {
722        let mut retry_policy = self.get_retry_policy();
723
724        let mut request_accepted = false;
725        let (resp, cert) = self
726            .request_status_signed(
727                request_id,
728                effective_canister_id,
729                signed_request_status.clone(),
730            )
731            .await?;
732        loop {
733            match resp {
734                RequestStatusResponse::Unknown => {}
735
736                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
737                    if !request_accepted {
738                        retry_policy.reset();
739                        request_accepted = true;
740                    }
741                }
742
743                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
744                    return Ok((arg, cert))
745                }
746
747                RequestStatusResponse::Rejected(response) => {
748                    return Err(AgentError::CertifiedReject {
749                        reject: response,
750                        operation: None,
751                    })
752                }
753
754                RequestStatusResponse::Done => {
755                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
756                        *request_id,
757                    )))
758                }
759            };
760
761            match retry_policy.next_backoff() {
762                Some(duration) => crate::util::sleep(duration).await,
763
764                None => return Err(AgentError::TimeoutWaitingForResponse()),
765            }
766        }
767    }
768
769    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
770    pub async fn wait(
771        &self,
772        request_id: &RequestId,
773        effective_canister_id: Principal,
774    ) -> Result<(Vec<u8>, Certificate), AgentError> {
775        self.wait_inner(request_id, effective_canister_id, None)
776            .await
777    }
778
779    async fn wait_inner(
780        &self,
781        request_id: &RequestId,
782        effective_canister_id: Principal,
783        operation: Option<Operation>,
784    ) -> Result<(Vec<u8>, Certificate), AgentError> {
785        let mut retry_policy = self.get_retry_policy();
786
787        let mut request_accepted = false;
788        loop {
789            let (resp, cert) = self
790                .request_status_raw(request_id, effective_canister_id)
791                .await?;
792            match resp {
793                RequestStatusResponse::Unknown => {}
794
795                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
796                    if !request_accepted {
797                        // The system will return RequestStatusResponse::Unknown
798                        // until the request is accepted
799                        // and we generally cannot know how long that will take.
800                        // State transitions between Received and Processing may be
801                        // instantaneous. Therefore, once we know the request is accepted,
802                        // we should restart the backoff so the request does not time out.
803
804                        retry_policy.reset();
805                        request_accepted = true;
806                    }
807                }
808
809                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
810                    return Ok((arg, cert))
811                }
812
813                RequestStatusResponse::Rejected(response) => {
814                    return Err(AgentError::CertifiedReject {
815                        reject: response,
816                        operation,
817                    })
818                }
819
820                RequestStatusResponse::Done => {
821                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
822                        *request_id,
823                    )))
824                }
825            };
826
827            match retry_policy.next_backoff() {
828                Some(duration) => crate::util::sleep(duration).await,
829
830                None => return Err(AgentError::TimeoutWaitingForResponse()),
831            }
832        }
833    }
834
835    /// Request the raw state tree directly, under an effective canister ID.
836    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
837    pub async fn read_state_raw(
838        &self,
839        paths: Vec<Vec<Label>>,
840        effective_canister_id: Principal,
841    ) -> Result<Certificate, AgentError> {
842        let content = self.read_state_content(paths)?;
843        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
844
845        let read_state_response: ReadStateResponse = self
846            .read_state_endpoint(effective_canister_id, serialized_bytes)
847            .await?;
848        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
849            .map_err(AgentError::InvalidCborData)?;
850        self.verify(&cert, effective_canister_id)?;
851        Ok(cert)
852    }
853
854    /// Request the raw state tree directly, under a subnet ID.
855    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
856    pub async fn read_subnet_state_raw(
857        &self,
858        paths: Vec<Vec<Label>>,
859        subnet_id: Principal,
860    ) -> Result<Certificate, AgentError> {
861        let content = self.read_state_content(paths)?;
862        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
863
864        let read_state_response: ReadStateResponse = self
865            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
866            .await?;
867        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
868            .map_err(AgentError::InvalidCborData)?;
869        self.verify_for_subnet(&cert, subnet_id)?;
870        Ok(cert)
871    }
872
873    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
874        Ok(EnvelopeContent::ReadState {
875            sender: self.identity.sender().map_err(AgentError::SigningError)?,
876            paths,
877            ingress_expiry: self.get_expiry_date(),
878        })
879    }
880
881    /// Verify a certificate, checking delegation if present.
882    /// Only passes if the certificate also has authority over the canister.
883    pub fn verify(
884        &self,
885        cert: &Certificate,
886        effective_canister_id: Principal,
887    ) -> Result<(), AgentError> {
888        self.verify_cert(cert, effective_canister_id)?;
889        self.verify_cert_timestamp(cert)?;
890        Ok(())
891    }
892
893    fn verify_cert(
894        &self,
895        cert: &Certificate,
896        effective_canister_id: Principal,
897    ) -> Result<(), AgentError> {
898        let sig = &cert.signature;
899
900        let root_hash = cert.tree.digest();
901        let mut msg = vec![];
902        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
903        msg.extend_from_slice(&root_hash);
904
905        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
906        let key = extract_der(der_key)?;
907
908        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
909            .map_err(|_| AgentError::CertificateVerificationFailed())?;
910        Ok(())
911    }
912
913    /// Verify a certificate, checking delegation if present.
914    /// Only passes if the certificate is for the specified subnet.
915    pub fn verify_for_subnet(
916        &self,
917        cert: &Certificate,
918        subnet_id: Principal,
919    ) -> Result<(), AgentError> {
920        self.verify_cert_for_subnet(cert, subnet_id)?;
921        self.verify_cert_timestamp(cert)?;
922        Ok(())
923    }
924
925    fn verify_cert_for_subnet(
926        &self,
927        cert: &Certificate,
928        subnet_id: Principal,
929    ) -> Result<(), AgentError> {
930        let sig = &cert.signature;
931
932        let root_hash = cert.tree.digest();
933        let mut msg = vec![];
934        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
935        msg.extend_from_slice(&root_hash);
936
937        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
938        let key = extract_der(der_key)?;
939
940        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
941            .map_err(|_| AgentError::CertificateVerificationFailed())?;
942        Ok(())
943    }
944
945    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
946        let time = lookup_time(cert)?;
947        if (OffsetDateTime::now_utc()
948            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
949        .abs()
950            > self.ingress_expiry
951        {
952            Err(AgentError::CertificateOutdated(self.ingress_expiry))
953        } else {
954            Ok(())
955        }
956    }
957
958    fn check_delegation(
959        &self,
960        delegation: &Option<Delegation>,
961        effective_canister_id: Principal,
962    ) -> Result<Vec<u8>, AgentError> {
963        match delegation {
964            None => Ok(self.read_root_key()),
965            Some(delegation) => {
966                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
967                    .map_err(AgentError::InvalidCborData)?;
968                if cert.delegation.is_some() {
969                    return Err(AgentError::CertificateHasTooManyDelegations);
970                }
971                self.verify_cert(&cert, effective_canister_id)?;
972                let canister_range_lookup = [
973                    "subnet".as_bytes(),
974                    delegation.subnet_id.as_ref(),
975                    "canister_ranges".as_bytes(),
976                ];
977                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
978                let ranges: Vec<(Principal, Principal)> =
979                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
980                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
981                    // the certificate is not authorized to answer calls for this canister
982                    return Err(AgentError::CertificateNotAuthorized());
983                }
984
985                let public_key_path = [
986                    "subnet".as_bytes(),
987                    delegation.subnet_id.as_ref(),
988                    "public_key".as_bytes(),
989                ];
990                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
991            }
992        }
993    }
994
995    fn check_delegation_for_subnet(
996        &self,
997        delegation: &Option<Delegation>,
998        subnet_id: Principal,
999    ) -> Result<Vec<u8>, AgentError> {
1000        match delegation {
1001            None => Ok(self.read_root_key()),
1002            Some(delegation) => {
1003                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1004                    .map_err(AgentError::InvalidCborData)?;
1005                if cert.delegation.is_some() {
1006                    return Err(AgentError::CertificateHasTooManyDelegations);
1007                }
1008                self.verify_cert_for_subnet(&cert, subnet_id)?;
1009                let public_key_path = [
1010                    "subnet".as_bytes(),
1011                    delegation.subnet_id.as_ref(),
1012                    "public_key".as_bytes(),
1013                ];
1014                let pk = lookup_value(&cert.tree, public_key_path)
1015                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1016                    .to_vec();
1017                Ok(pk)
1018            }
1019        }
1020    }
1021
1022    /// Request information about a particular canister for a single state subkey.
1023    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1024    pub async fn read_state_canister_info(
1025        &self,
1026        canister_id: Principal,
1027        path: &str,
1028    ) -> Result<Vec<u8>, AgentError> {
1029        let paths: Vec<Vec<Label>> = vec![vec![
1030            "canister".into(),
1031            Label::from_bytes(canister_id.as_slice()),
1032            path.into(),
1033        ]];
1034
1035        let cert = self.read_state_raw(paths, canister_id).await?;
1036
1037        lookup_canister_info(cert, canister_id, path)
1038    }
1039
1040    /// Request the controller list of a given canister.
1041    pub async fn read_state_canister_controllers(
1042        &self,
1043        canister_id: Principal,
1044    ) -> Result<Vec<Principal>, AgentError> {
1045        let blob = self
1046            .read_state_canister_info(canister_id, "controllers")
1047            .await?;
1048        let controllers: Vec<Principal> =
1049            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1050        Ok(controllers)
1051    }
1052
1053    /// Request the module hash of a given canister.
1054    pub async fn read_state_canister_module_hash(
1055        &self,
1056        canister_id: Principal,
1057    ) -> Result<Vec<u8>, AgentError> {
1058        self.read_state_canister_info(canister_id, "module_hash")
1059            .await
1060    }
1061
1062    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1063    pub async fn read_state_canister_metadata(
1064        &self,
1065        canister_id: Principal,
1066        path: &str,
1067    ) -> Result<Vec<u8>, AgentError> {
1068        let paths: Vec<Vec<Label>> = vec![vec![
1069            "canister".into(),
1070            Label::from_bytes(canister_id.as_slice()),
1071            "metadata".into(),
1072            path.into(),
1073        ]];
1074
1075        let cert = self.read_state_raw(paths, canister_id).await?;
1076
1077        lookup_canister_metadata(cert, canister_id, path)
1078    }
1079
1080    /// Request a list of metrics about the subnet.
1081    pub async fn read_state_subnet_metrics(
1082        &self,
1083        subnet_id: Principal,
1084    ) -> Result<SubnetMetrics, AgentError> {
1085        let paths = vec![vec![
1086            "subnet".into(),
1087            Label::from_bytes(subnet_id.as_slice()),
1088            "metrics".into(),
1089        ]];
1090        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1091        lookup_subnet_metrics(cert, subnet_id)
1092    }
1093
1094    /// Fetches the status of a particular request by its ID.
1095    pub async fn request_status_raw(
1096        &self,
1097        request_id: &RequestId,
1098        effective_canister_id: Principal,
1099    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1100        let paths: Vec<Vec<Label>> =
1101            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1102
1103        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1104
1105        Ok((lookup_request_status(&cert, request_id)?, cert))
1106    }
1107
1108    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1109    /// The bytes will be checked to verify that it is a valid `request_status`.
1110    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1111    pub async fn request_status_signed(
1112        &self,
1113        request_id: &RequestId,
1114        effective_canister_id: Principal,
1115        signed_request_status: Vec<u8>,
1116    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117        let _envelope: Envelope =
1118            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1119        let read_state_response: ReadStateResponse = self
1120            .read_state_endpoint(effective_canister_id, signed_request_status)
1121            .await?;
1122
1123        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1124            .map_err(AgentError::InvalidCborData)?;
1125        self.verify(&cert, effective_canister_id)?;
1126        Ok((lookup_request_status(&cert, request_id)?, cert))
1127    }
1128
1129    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1130    /// passing all arguments.
1131    pub fn update<S: Into<String>>(
1132        &self,
1133        canister_id: &Principal,
1134        method_name: S,
1135    ) -> UpdateBuilder {
1136        UpdateBuilder::new(self, *canister_id, method_name.into())
1137    }
1138
1139    /// Calls and returns the information returned by the status endpoint of a replica.
1140    pub async fn status(&self) -> Result<Status, AgentError> {
1141        let endpoint = "api/v2/status";
1142        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1143
1144        let cbor: serde_cbor::Value =
1145            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1146
1147        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1148    }
1149
1150    /// Returns a `QueryBuilder` enabling the construction of a query call without
1151    /// passing all arguments.
1152    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1153        QueryBuilder::new(self, *canister_id, method_name.into())
1154    }
1155
1156    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1157    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1158    pub fn sign_request_status(
1159        &self,
1160        effective_canister_id: Principal,
1161        request_id: RequestId,
1162    ) -> Result<SignedRequestStatus, AgentError> {
1163        let paths: Vec<Vec<Label>> =
1164            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1165        let read_state_content = self.read_state_content(paths)?;
1166        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1167        let ingress_expiry = read_state_content.ingress_expiry();
1168        let sender = *read_state_content.sender();
1169        Ok(SignedRequestStatus {
1170            ingress_expiry,
1171            sender,
1172            effective_canister_id,
1173            request_id,
1174            signed_request_status,
1175        })
1176    }
1177
1178    async fn get_subnet_by_canister(
1179        &self,
1180        canister: &Principal,
1181    ) -> Result<Arc<Subnet>, AgentError> {
1182        let subnet = self
1183            .subnet_key_cache
1184            .lock()
1185            .unwrap()
1186            .get_subnet_by_canister(canister);
1187        if let Some(subnet) = subnet {
1188            Ok(subnet)
1189        } else {
1190            self.fetch_subnet_by_canister(canister).await
1191        }
1192    }
1193
1194    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1195    pub async fn fetch_api_boundary_nodes_by_canister_id(
1196        &self,
1197        canister_id: Principal,
1198    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1199        let paths = vec![vec!["api_boundary_nodes".into()]];
1200        let certificate = self.read_state_raw(paths, canister_id).await?;
1201        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1202        Ok(api_boundary_nodes)
1203    }
1204
1205    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1206    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1207        &self,
1208        subnet_id: Principal,
1209    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1210        let paths = vec![vec!["api_boundary_nodes".into()]];
1211        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1212        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1213        Ok(api_boundary_nodes)
1214    }
1215
1216    async fn fetch_subnet_by_canister(
1217        &self,
1218        canister: &Principal,
1219    ) -> Result<Arc<Subnet>, AgentError> {
1220        let cert = self
1221            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1222            .await?;
1223
1224        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1225        let subnet = Arc::new(subnet);
1226        self.subnet_key_cache
1227            .lock()
1228            .unwrap()
1229            .insert_subnet(subnet_id, subnet.clone());
1230        Ok(subnet)
1231    }
1232
1233    async fn request(
1234        &self,
1235        method: Method,
1236        endpoint: &str,
1237        body: Option<Vec<u8>>,
1238    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1239        let create_request_with_generated_url = || -> Result<Request, AgentError> {
1240            let url = self.route_provider.route()?.join(endpoint)?;
1241            let mut http_request = Request::new(method.clone(), url);
1242            http_request
1243                .headers_mut()
1244                .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1245            *http_request.body_mut() = body.clone().map(Body::from);
1246            Ok(http_request)
1247        };
1248
1249        let response = self
1250            .client
1251            .call(
1252                &create_request_with_generated_url,
1253                self.max_tcp_error_retries,
1254            )
1255            .await?;
1256
1257        let http_status = response.status();
1258        let response_headers = response.headers().clone();
1259
1260        // Size Check (Content-Length)
1261        if matches!(self
1262            .max_response_body_size
1263            .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1264        {
1265            return Err(AgentError::ResponseSizeExceededLimit());
1266        }
1267
1268        let mut body: Vec<u8> = response
1269            .content_length()
1270            .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1271
1272        let mut stream = response.bytes_stream();
1273
1274        while let Some(chunk) = stream.next().await {
1275            let chunk = chunk?;
1276
1277            // Size Check (Body Size)
1278            if matches!(self
1279                .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1280            {
1281                return Err(AgentError::ResponseSizeExceededLimit());
1282            }
1283
1284            body.extend_from_slice(chunk.as_ref());
1285        }
1286
1287        Ok((http_status, response_headers, body))
1288    }
1289
1290    async fn execute(
1291        &self,
1292        method: Method,
1293        endpoint: &str,
1294        body: Option<Vec<u8>>,
1295    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1296        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1297
1298        let status = request_result.0;
1299        let headers = request_result.1;
1300        let body = request_result.2;
1301
1302        if status.is_client_error() || status.is_server_error() {
1303            Err(AgentError::HttpError(HttpErrorPayload {
1304                status: status.into(),
1305                content_type: headers
1306                    .get(CONTENT_TYPE)
1307                    .and_then(|value| value.to_str().ok())
1308                    .map(str::to_string),
1309                content: body,
1310            }))
1311        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1312            Err(AgentError::InvalidHttpResponse(format!(
1313                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1314            )))
1315        } else {
1316            Ok((status, body))
1317        }
1318    }
1319}
1320
1321// Checks if a principal is contained within a list of principal ranges
1322// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1323fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1324    ranges
1325        .iter()
1326        .any(|r| principal >= &r.0 && principal <= &r.1)
1327}
1328
1329fn sign_envelope(
1330    content: &EnvelopeContent,
1331    identity: Arc<dyn Identity>,
1332) -> Result<Vec<u8>, AgentError> {
1333    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1334
1335    let envelope = Envelope {
1336        content: Cow::Borrowed(content),
1337        sender_pubkey: signature.public_key,
1338        sender_sig: signature.signature,
1339        sender_delegation: signature.delegations,
1340    };
1341
1342    let mut serialized_bytes = Vec::new();
1343    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1344    serializer.self_describe()?;
1345    envelope.serialize(&mut serializer)?;
1346
1347    Ok(serialized_bytes)
1348}
1349
1350/// Inspect the bytes to be sent as a query
1351/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1352pub fn signed_query_inspect(
1353    sender: Principal,
1354    canister_id: Principal,
1355    method_name: &str,
1356    arg: &[u8],
1357    ingress_expiry: u64,
1358    signed_query: Vec<u8>,
1359) -> Result<(), AgentError> {
1360    let envelope: Envelope =
1361        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1362    match envelope.content.as_ref() {
1363        EnvelopeContent::Query {
1364            ingress_expiry: ingress_expiry_cbor,
1365            sender: sender_cbor,
1366            canister_id: canister_id_cbor,
1367            method_name: method_name_cbor,
1368            arg: arg_cbor,
1369            nonce: _nonce,
1370        } => {
1371            if ingress_expiry != *ingress_expiry_cbor {
1372                return Err(AgentError::CallDataMismatch {
1373                    field: "ingress_expiry".to_string(),
1374                    value_arg: ingress_expiry.to_string(),
1375                    value_cbor: ingress_expiry_cbor.to_string(),
1376                });
1377            }
1378            if sender != *sender_cbor {
1379                return Err(AgentError::CallDataMismatch {
1380                    field: "sender".to_string(),
1381                    value_arg: sender.to_string(),
1382                    value_cbor: sender_cbor.to_string(),
1383                });
1384            }
1385            if canister_id != *canister_id_cbor {
1386                return Err(AgentError::CallDataMismatch {
1387                    field: "canister_id".to_string(),
1388                    value_arg: canister_id.to_string(),
1389                    value_cbor: canister_id_cbor.to_string(),
1390                });
1391            }
1392            if method_name != *method_name_cbor {
1393                return Err(AgentError::CallDataMismatch {
1394                    field: "method_name".to_string(),
1395                    value_arg: method_name.to_string(),
1396                    value_cbor: method_name_cbor.clone(),
1397                });
1398            }
1399            if arg != *arg_cbor {
1400                return Err(AgentError::CallDataMismatch {
1401                    field: "arg".to_string(),
1402                    value_arg: format!("{arg:?}"),
1403                    value_cbor: format!("{arg_cbor:?}"),
1404                });
1405            }
1406        }
1407        EnvelopeContent::Call { .. } => {
1408            return Err(AgentError::CallDataMismatch {
1409                field: "request_type".to_string(),
1410                value_arg: "query".to_string(),
1411                value_cbor: "call".to_string(),
1412            })
1413        }
1414        EnvelopeContent::ReadState { .. } => {
1415            return Err(AgentError::CallDataMismatch {
1416                field: "request_type".to_string(),
1417                value_arg: "query".to_string(),
1418                value_cbor: "read_state".to_string(),
1419            })
1420        }
1421    }
1422    Ok(())
1423}
1424
1425/// Inspect the bytes to be sent as an update
1426/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1427pub fn signed_update_inspect(
1428    sender: Principal,
1429    canister_id: Principal,
1430    method_name: &str,
1431    arg: &[u8],
1432    ingress_expiry: u64,
1433    signed_update: Vec<u8>,
1434) -> Result<(), AgentError> {
1435    let envelope: Envelope =
1436        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1437    match envelope.content.as_ref() {
1438        EnvelopeContent::Call {
1439            nonce: _nonce,
1440            ingress_expiry: ingress_expiry_cbor,
1441            sender: sender_cbor,
1442            canister_id: canister_id_cbor,
1443            method_name: method_name_cbor,
1444            arg: arg_cbor,
1445        } => {
1446            if ingress_expiry != *ingress_expiry_cbor {
1447                return Err(AgentError::CallDataMismatch {
1448                    field: "ingress_expiry".to_string(),
1449                    value_arg: ingress_expiry.to_string(),
1450                    value_cbor: ingress_expiry_cbor.to_string(),
1451                });
1452            }
1453            if sender != *sender_cbor {
1454                return Err(AgentError::CallDataMismatch {
1455                    field: "sender".to_string(),
1456                    value_arg: sender.to_string(),
1457                    value_cbor: sender_cbor.to_string(),
1458                });
1459            }
1460            if canister_id != *canister_id_cbor {
1461                return Err(AgentError::CallDataMismatch {
1462                    field: "canister_id".to_string(),
1463                    value_arg: canister_id.to_string(),
1464                    value_cbor: canister_id_cbor.to_string(),
1465                });
1466            }
1467            if method_name != *method_name_cbor {
1468                return Err(AgentError::CallDataMismatch {
1469                    field: "method_name".to_string(),
1470                    value_arg: method_name.to_string(),
1471                    value_cbor: method_name_cbor.clone(),
1472                });
1473            }
1474            if arg != *arg_cbor {
1475                return Err(AgentError::CallDataMismatch {
1476                    field: "arg".to_string(),
1477                    value_arg: format!("{arg:?}"),
1478                    value_cbor: format!("{arg_cbor:?}"),
1479                });
1480            }
1481        }
1482        EnvelopeContent::ReadState { .. } => {
1483            return Err(AgentError::CallDataMismatch {
1484                field: "request_type".to_string(),
1485                value_arg: "call".to_string(),
1486                value_cbor: "read_state".to_string(),
1487            })
1488        }
1489        EnvelopeContent::Query { .. } => {
1490            return Err(AgentError::CallDataMismatch {
1491                field: "request_type".to_string(),
1492                value_arg: "call".to_string(),
1493                value_cbor: "query".to_string(),
1494            })
1495        }
1496    }
1497    Ok(())
1498}
1499
1500/// Inspect the bytes to be sent as a `request_status`
1501/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1502pub fn signed_request_status_inspect(
1503    sender: Principal,
1504    request_id: &RequestId,
1505    ingress_expiry: u64,
1506    signed_request_status: Vec<u8>,
1507) -> Result<(), AgentError> {
1508    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1509    let envelope: Envelope =
1510        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1511    match envelope.content.as_ref() {
1512        EnvelopeContent::ReadState {
1513            ingress_expiry: ingress_expiry_cbor,
1514            sender: sender_cbor,
1515            paths: paths_cbor,
1516        } => {
1517            if ingress_expiry != *ingress_expiry_cbor {
1518                return Err(AgentError::CallDataMismatch {
1519                    field: "ingress_expiry".to_string(),
1520                    value_arg: ingress_expiry.to_string(),
1521                    value_cbor: ingress_expiry_cbor.to_string(),
1522                });
1523            }
1524            if sender != *sender_cbor {
1525                return Err(AgentError::CallDataMismatch {
1526                    field: "sender".to_string(),
1527                    value_arg: sender.to_string(),
1528                    value_cbor: sender_cbor.to_string(),
1529                });
1530            }
1531
1532            if paths != *paths_cbor {
1533                return Err(AgentError::CallDataMismatch {
1534                    field: "paths".to_string(),
1535                    value_arg: format!("{paths:?}"),
1536                    value_cbor: format!("{paths_cbor:?}"),
1537                });
1538            }
1539        }
1540        EnvelopeContent::Query { .. } => {
1541            return Err(AgentError::CallDataMismatch {
1542                field: "request_type".to_string(),
1543                value_arg: "read_state".to_string(),
1544                value_cbor: "query".to_string(),
1545            })
1546        }
1547        EnvelopeContent::Call { .. } => {
1548            return Err(AgentError::CallDataMismatch {
1549                field: "request_type".to_string(),
1550                value_arg: "read_state".to_string(),
1551                value_cbor: "call".to_string(),
1552            })
1553        }
1554    }
1555    Ok(())
1556}
1557
1558#[derive(Clone)]
1559struct SubnetCache {
1560    subnets: TimedCache<Principal, Arc<Subnet>>,
1561    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1562}
1563
1564impl SubnetCache {
1565    fn new() -> Self {
1566        Self {
1567            subnets: TimedCache::with_lifespan(300),
1568            canister_index: RangeInclusiveMap::new_with_step_fns(),
1569        }
1570    }
1571
1572    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1573        self.canister_index
1574            .get(canister)
1575            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1576            .filter(|subnet| subnet.canister_ranges.contains(canister))
1577    }
1578
1579    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1580        self.subnets.cache_set(subnet_id, subnet.clone());
1581        for range in subnet.canister_ranges.iter() {
1582            self.canister_index.insert(range.clone(), subnet_id);
1583        }
1584    }
1585}
1586
1587#[derive(Clone, Copy)]
1588struct PrincipalStep;
1589
1590impl StepFns<Principal> for PrincipalStep {
1591    fn add_one(start: &Principal) -> Principal {
1592        let bytes = start.as_slice();
1593        let mut arr = [0; 29];
1594        arr[..bytes.len()].copy_from_slice(bytes);
1595        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1596            *byte = byte.wrapping_add(1);
1597            if *byte != 0 {
1598                break;
1599            }
1600        }
1601        Principal::from_slice(&arr[..bytes.len()])
1602    }
1603    fn sub_one(start: &Principal) -> Principal {
1604        let bytes = start.as_slice();
1605        let mut arr = [0; 29];
1606        arr[..bytes.len()].copy_from_slice(bytes);
1607        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1608            *byte = byte.wrapping_sub(1);
1609            if *byte != 255 {
1610                break;
1611            }
1612        }
1613        Principal::from_slice(&arr[..bytes.len()])
1614    }
1615}
1616
1617#[derive(Clone)]
1618pub(crate) struct Subnet {
1619    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
1620    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
1621    _key: Vec<u8>,
1622    node_keys: HashMap<Principal, Vec<u8>>,
1623    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1624}
1625
1626/// API boundary node, which routes /api calls to IC replica nodes.
1627#[derive(Debug, Clone)]
1628pub struct ApiBoundaryNode {
1629    /// Domain name
1630    pub domain: String,
1631    /// IPv6 address in the hexadecimal notation with colons.
1632    pub ipv6_address: String,
1633    /// IPv4 address in the dotted-decimal notation.
1634    pub ipv4_address: Option<String>,
1635}
1636
1637/// A query request builder.
1638///
1639/// This makes it easier to do query calls without actually passing all arguments.
1640#[derive(Debug, Clone)]
1641#[non_exhaustive]
1642pub struct QueryBuilder<'agent> {
1643    agent: &'agent Agent,
1644    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1645    pub effective_canister_id: Principal,
1646    /// The principal ID of the canister being called.
1647    pub canister_id: Principal,
1648    /// The name of the canister method being called.
1649    pub method_name: String,
1650    /// The argument blob to be passed to the method.
1651    pub arg: Vec<u8>,
1652    /// The Unix timestamp that the request will expire at.
1653    pub ingress_expiry_datetime: Option<u64>,
1654    /// Whether to include a nonce with the message.
1655    pub use_nonce: bool,
1656}
1657
1658impl<'agent> QueryBuilder<'agent> {
1659    /// Creates a new query builder with an agent for a particular canister method.
1660    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1661        Self {
1662            agent,
1663            effective_canister_id: canister_id,
1664            canister_id,
1665            method_name,
1666            arg: vec![],
1667            ingress_expiry_datetime: None,
1668            use_nonce: false,
1669        }
1670    }
1671
1672    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1673    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1674        self.effective_canister_id = canister_id;
1675        self
1676    }
1677
1678    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1679    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1680        self.arg = arg.into();
1681        self
1682    }
1683
1684    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1685    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1686        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1687        self
1688    }
1689
1690    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1691    pub fn expire_after(mut self, duration: Duration) -> Self {
1692        self.ingress_expiry_datetime = Some(
1693            OffsetDateTime::now_utc()
1694                .saturating_add(duration.try_into().expect("negative duration"))
1695                .unix_timestamp_nanos() as u64,
1696        );
1697        self
1698    }
1699
1700    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1701    /// and thus may get a (briefly) cached response.
1702    pub fn with_nonce_generation(mut self) -> Self {
1703        self.use_nonce = true;
1704        self
1705    }
1706
1707    /// Make a query call. This will return a byte vector.
1708    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1709        self.agent
1710            .query_raw(
1711                self.canister_id,
1712                self.effective_canister_id,
1713                self.method_name,
1714                self.arg,
1715                self.ingress_expiry_datetime,
1716                self.use_nonce,
1717                None,
1718            )
1719            .await
1720    }
1721
1722    /// Make a query call with signature verification. This will return a byte vector.
1723    ///
1724    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1725    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1726    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1727        self.agent
1728            .query_raw(
1729                self.canister_id,
1730                self.effective_canister_id,
1731                self.method_name,
1732                self.arg,
1733                self.ingress_expiry_datetime,
1734                self.use_nonce,
1735                Some(true),
1736            )
1737            .await
1738    }
1739
1740    /// Make a query call without signature verification. This will return a byte vector.
1741    ///
1742    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1743    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1744    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1745        self.agent
1746            .query_raw(
1747                self.canister_id,
1748                self.effective_canister_id,
1749                self.method_name,
1750                self.arg,
1751                self.ingress_expiry_datetime,
1752                self.use_nonce,
1753                Some(false),
1754            )
1755            .await
1756    }
1757
1758    /// Sign a query call. This will return a [`signed::SignedQuery`]
1759    /// which contains all fields of the query and the signed query in CBOR encoding
1760    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1761        let effective_canister_id = self.effective_canister_id;
1762        let identity = self.agent.identity.clone();
1763        let content = self.into_envelope()?;
1764        let signed_query = sign_envelope(&content, identity)?;
1765        let EnvelopeContent::Query {
1766            ingress_expiry,
1767            sender,
1768            canister_id,
1769            method_name,
1770            arg,
1771            nonce,
1772        } = content
1773        else {
1774            unreachable!()
1775        };
1776        Ok(SignedQuery {
1777            ingress_expiry,
1778            sender,
1779            canister_id,
1780            method_name,
1781            arg,
1782            effective_canister_id,
1783            signed_query,
1784            nonce,
1785        })
1786    }
1787
1788    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1789    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1790        self.agent.query_content(
1791            self.canister_id,
1792            self.method_name,
1793            self.arg,
1794            self.ingress_expiry_datetime,
1795            self.use_nonce,
1796        )
1797    }
1798}
1799
1800impl<'agent> IntoFuture for QueryBuilder<'agent> {
1801    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1802    type Output = Result<Vec<u8>, AgentError>;
1803    fn into_future(self) -> Self::IntoFuture {
1804        Box::pin(self.call())
1805    }
1806}
1807
1808/// An in-flight canister update call. Useful primarily as a `Future`.
1809pub struct UpdateCall<'agent> {
1810    agent: &'agent Agent,
1811    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1812    effective_canister_id: Principal,
1813    canister_id: Principal,
1814    method_name: String,
1815}
1816
1817impl fmt::Debug for UpdateCall<'_> {
1818    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1819        f.debug_struct("UpdateCall")
1820            .field("agent", &self.agent)
1821            .field("effective_canister_id", &self.effective_canister_id)
1822            .finish_non_exhaustive()
1823    }
1824}
1825
1826impl Future for UpdateCall<'_> {
1827    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1828    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1829        self.response_future.as_mut().poll(cx)
1830    }
1831}
1832
1833impl<'a> UpdateCall<'a> {
1834    /// Waits for the update call to be completed, polling if necessary.
1835    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1836        let response = self.response_future.await?;
1837
1838        match response {
1839            CallResponse::Response(response) => Ok(response),
1840            CallResponse::Poll(request_id) => {
1841                self.agent
1842                    .wait_inner(
1843                        &request_id,
1844                        self.effective_canister_id,
1845                        Some(Operation::Call {
1846                            canister: self.canister_id,
1847                            method: self.method_name,
1848                        }),
1849                    )
1850                    .await
1851            }
1852        }
1853    }
1854}
1855/// An update request Builder.
1856///
1857/// This makes it easier to do update calls without actually passing all arguments or specifying
1858/// if you want to wait or not.
1859#[derive(Debug)]
1860pub struct UpdateBuilder<'agent> {
1861    agent: &'agent Agent,
1862    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1863    pub effective_canister_id: Principal,
1864    /// The principal ID of the canister being called.
1865    pub canister_id: Principal,
1866    /// The name of the canister method being called.
1867    pub method_name: String,
1868    /// The argument blob to be passed to the method.
1869    pub arg: Vec<u8>,
1870    /// The Unix timestamp that the request will expire at.
1871    pub ingress_expiry_datetime: Option<u64>,
1872}
1873
1874impl<'agent> UpdateBuilder<'agent> {
1875    /// Creates a new update builder with an agent for a particular canister method.
1876    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1877        Self {
1878            agent,
1879            effective_canister_id: canister_id,
1880            canister_id,
1881            method_name,
1882            arg: vec![],
1883            ingress_expiry_datetime: None,
1884        }
1885    }
1886
1887    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1888    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1889        self.effective_canister_id = canister_id;
1890        self
1891    }
1892
1893    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1894    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1895        self.arg = arg.into();
1896        self
1897    }
1898
1899    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1900    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1901        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1902        self
1903    }
1904
1905    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1906    pub fn expire_after(mut self, duration: Duration) -> Self {
1907        self.ingress_expiry_datetime = Some(
1908            OffsetDateTime::now_utc()
1909                .saturating_add(duration.try_into().expect("negative duration"))
1910                .unix_timestamp_nanos() as u64,
1911        );
1912        self
1913    }
1914
1915    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1916    /// the response as a byte vector.
1917    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1918        self.call().and_wait().await.map(|x| x.0)
1919    }
1920
1921    /// Make an update call. This will return a `RequestId`.
1922    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1923    pub fn call(self) -> UpdateCall<'agent> {
1924        let method_name = self.method_name.clone();
1925        let response_future = async move {
1926            self.agent
1927                .update_raw(
1928                    self.canister_id,
1929                    self.effective_canister_id,
1930                    self.method_name,
1931                    self.arg,
1932                    self.ingress_expiry_datetime,
1933                )
1934                .await
1935        };
1936        UpdateCall {
1937            agent: self.agent,
1938            response_future: Box::pin(response_future),
1939            effective_canister_id: self.effective_canister_id,
1940            canister_id: self.canister_id,
1941            method_name,
1942        }
1943    }
1944
1945    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1946    /// which contains all fields of the update and the signed update in CBOR encoding
1947    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1948        let identity = self.agent.identity.clone();
1949        let effective_canister_id = self.effective_canister_id;
1950        let content = self.into_envelope()?;
1951        let signed_update = sign_envelope(&content, identity)?;
1952        let request_id = to_request_id(&content)?;
1953        let EnvelopeContent::Call {
1954            nonce,
1955            ingress_expiry,
1956            sender,
1957            canister_id,
1958            method_name,
1959            arg,
1960        } = content
1961        else {
1962            unreachable!()
1963        };
1964        Ok(SignedUpdate {
1965            nonce,
1966            ingress_expiry,
1967            sender,
1968            canister_id,
1969            method_name,
1970            arg,
1971            effective_canister_id,
1972            signed_update,
1973            request_id,
1974        })
1975    }
1976
1977    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1978    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1979        let nonce = self.agent.nonce_factory.generate();
1980        self.agent.update_content(
1981            self.canister_id,
1982            self.method_name,
1983            self.arg,
1984            self.ingress_expiry_datetime,
1985            nonce,
1986        )
1987    }
1988}
1989
1990impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1991    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1992    type Output = Result<Vec<u8>, AgentError>;
1993    fn into_future(self) -> Self::IntoFuture {
1994        Box::pin(self.call_and_wait())
1995    }
1996}
1997
1998/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
1999#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2000#[cfg_attr(not(target_family = "wasm"), async_trait)]
2001pub trait HttpService: Send + Sync + Debug {
2002    /// Perform a HTTP request. Any retry logic should call `req` again, instead of `Request::try_clone`.
2003    async fn call<'a>(
2004        &'a self,
2005        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2006        max_retries: usize,
2007    ) -> Result<Response, AgentError>;
2008}
2009#[cfg(not(target_family = "wasm"))]
2010#[async_trait]
2011impl<T> HttpService for T
2012where
2013    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2014    for<'a> <&'a Self as Service<Request>>::Future: Send,
2015    T: Send + Sync + Debug + ?Sized,
2016{
2017    #[allow(clippy::needless_arbitrary_self_type)]
2018    async fn call<'a>(
2019        mut self: &'a Self,
2020        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2021        max_retries: usize,
2022    ) -> Result<Response, AgentError> {
2023        let mut retry_count = 0;
2024        loop {
2025            match Service::call(&mut self, req()?).await {
2026                Err(err) => {
2027                    // Network-related errors can be retried.
2028                    if err.is_connect() {
2029                        if retry_count >= max_retries {
2030                            return Err(AgentError::TransportError(err));
2031                        }
2032                        retry_count += 1;
2033                    }
2034                }
2035                Ok(resp) => return Ok(resp),
2036            }
2037        }
2038    }
2039}
2040
2041#[cfg(target_family = "wasm")]
2042#[async_trait(?Send)]
2043impl<T> HttpService for T
2044where
2045    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2046    T: Send + Sync + Debug + ?Sized,
2047{
2048    #[allow(clippy::needless_arbitrary_self_type)]
2049    async fn call<'a>(
2050        mut self: &'a Self,
2051        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2052        _: usize,
2053    ) -> Result<Response, AgentError> {
2054        Ok(Service::call(&mut self, req()?).await?)
2055    }
2056}
2057
2058#[derive(Debug)]
2059struct Retry429Logic {
2060    client: Client,
2061}
2062
2063#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2064#[cfg_attr(not(target_family = "wasm"), async_trait)]
2065impl HttpService for Retry429Logic {
2066    async fn call<'a>(
2067        &'a self,
2068        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2069        _max_tcp_retries: usize,
2070    ) -> Result<Response, AgentError> {
2071        let mut retries = 0;
2072        loop {
2073            #[cfg(not(target_family = "wasm"))]
2074            let resp = self.client.call(req, _max_tcp_retries).await?;
2075            // Client inconveniently does not implement Service on wasm
2076            #[cfg(target_family = "wasm")]
2077            let resp = self.client.execute(req()?).await?;
2078            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2079                if retries == 6 {
2080                    break Ok(resp);
2081                } else {
2082                    retries += 1;
2083                    crate::util::sleep(Duration::from_millis(250)).await;
2084                    continue;
2085                }
2086            } else {
2087                break Ok(resp);
2088            }
2089        }
2090    }
2091}
2092
2093#[cfg(all(test, not(target_family = "wasm")))]
2094mod offline_tests {
2095    use super::*;
2096    use tokio::net::TcpListener;
2097    // Any tests that involve the network should go in agent_test, not here.
2098
2099    #[test]
2100    fn rounded_expiry() {
2101        let agent = Agent::builder()
2102            .with_url("http://not-a-real-url")
2103            .build()
2104            .unwrap();
2105        let mut prev_expiry = None;
2106        let mut num_timestamps = 0;
2107        for _ in 0..6 {
2108            let update = agent
2109                .update(&Principal::management_canister(), "not_a_method")
2110                .sign()
2111                .unwrap();
2112            if prev_expiry < Some(update.ingress_expiry) {
2113                prev_expiry = Some(update.ingress_expiry);
2114                num_timestamps += 1;
2115            }
2116        }
2117        // in six requests, there should be no more than two timestamps
2118        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2119    }
2120
2121    #[tokio::test]
2122    async fn client_ratelimit() {
2123        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2124        let count = Arc::new(Mutex::new(0));
2125        let port = mock_server.local_addr().unwrap().port();
2126        tokio::spawn({
2127            let count = count.clone();
2128            async move {
2129                loop {
2130                    let (mut conn, _) = mock_server.accept().await.unwrap();
2131                    *count.lock().unwrap() += 1;
2132                    tokio::spawn(
2133                        // read all data, never reply
2134                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2135                    );
2136                }
2137            }
2138        });
2139        let agent = Agent::builder()
2140            .with_http_client(Client::builder().http1_only().build().unwrap())
2141            .with_url(format!("http://127.0.0.1:{port}"))
2142            .with_max_concurrent_requests(2)
2143            .build()
2144            .unwrap();
2145        for _ in 0..3 {
2146            let agent = agent.clone();
2147            tokio::spawn(async move {
2148                agent
2149                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2150                    .call()
2151                    .await
2152            });
2153        }
2154        crate::util::sleep(Duration::from_millis(250)).await;
2155        assert_eq!(*count.lock().unwrap(), 2);
2156    }
2157}