ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27    RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33    dynamic_routing::{
34        dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35        snapshot::latency_based_routing::LatencyRoutingSnapshot,
36    },
37    RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46    agent::response_authentication::{
47        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48        lookup_subnet, lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    collections::HashMap,
68    convert::TryFrom,
69    fmt::{self, Debug},
70    future::{Future, IntoFuture},
71    pin::Pin,
72    str::FromStr,
73    sync::{Arc, Mutex, RwLock},
74    task::{Context, Poll},
75    time::Duration,
76};
77
78use crate::agent::response_authentication::lookup_api_boundary_nodes;
79
/// Length-prefixed domain separator: the byte `0x0D` (13) followed by the 13 ASCII bytes of
/// `ic-state-root`.
// NOTE(review): presumably prepended to a state-tree root hash before a certificate signature
// is checked; the use site is outside this view — confirm.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Hard-coded root public key that every new `Agent` starts with (see `Agent::new`).
///
/// `Agent::fetch_root_key` compares the current key against this value to decide whether a
/// custom key has already been installed.
// NOTE(review): presumably the DER-encoded key of the main Internet Computer — confirm.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
83
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
89
90/// A low level Agent to make calls to a Replica endpoint.
91///
92/// ```ignore
93/// # // This test is ignored because it requires an ic to be running. We run these
94/// # // in the ic-ref workflow.
95/// use ic_agent::{Agent, export::Principal};
96/// use candid::{Encode, Decode, CandidType, Nat};
97/// use serde::Deserialize;
98///
99/// #[derive(CandidType)]
100/// struct Argument {
101///   amount: Option<Nat>,
102/// }
103///
104/// #[derive(CandidType, Deserialize)]
105/// struct CreateCanisterResult {
106///   canister_id: Principal,
107/// }
108///
109/// # fn create_identity() -> impl ic_agent::Identity {
110/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
111/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
112/// # }
113/// #
114/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
115/// # let url = format!("http://localhost:{}", option_env!("IC_REF_PORT").unwrap_or("4943"));
116///   let agent = Agent::builder()
117///     .with_url(url)
118///     .with_identity(create_identity())
119///     .build()?;
120///
121///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
122///   // This is important as the main net public key is static and a rogue network could return
123///   // a different key.
124///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
125///   agent.fetch_root_key().await?;
126///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
127///
128///   // Create a call to the management canister to create a new canister ID,
129///   // and wait for a result.
130///   // The effective canister id must belong to the canister ranges of the subnet at which the canister is created.
131///   let effective_canister_id = Principal::from_text("rwlgt-iiaaa-aaaaa-aaaaa-cai").unwrap();
132///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
133///     .with_effective_canister_id(effective_canister_id)
134///     .with_arg(Encode!(&Argument { amount: None })?)
135///     .await?;
136///
137///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
138///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
139///   Ok(canister_id)
140/// }
141///
142/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
143/// # runtime.block_on(async {
144/// let canister_id = create_a_canister().await.unwrap();
145/// eprintln!("{}", canister_id);
146/// # });
147/// ```
148///
149/// This agent does not understand Candid, and only acts on byte buffers.
150///
151/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
152/// you receive from a method has already been verified and you do not need to manually verify it.
#[derive(Clone)]
pub struct Agent {
    /// Source of the nonces attached to update calls and, when requested, to query calls.
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that provides the sender principal and is handed to `sign_envelope`.
    identity: Arc<dyn Identity>,
    /// Added to the current time to compute a request's ingress expiry; also the maximum
    /// accepted age of a query-response signature.
    ingress_expiry: Duration,
    /// Root public key currently in use; starts as `IC_ROOT_KEY` and can be replaced through
    /// `set_root_key` / `fetch_root_key`. Shared between clones of the agent.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service used to send requests.
    client: Arc<dyn HttpService>,
    /// Provider of the URL(s) requests are routed to.
    route_provider: Arc<dyn RouteProvider>,
    // NOTE(review): presumably caches subnet node keys consulted during query signature
    // verification; its use is outside this view — confirm.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Semaphore acquired by every endpoint call before the HTTP request is executed.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified; individual
    /// queries may override it.
    verify_query_signatures: bool,
    // NOTE(review): presumably an upper bound on accepted response bodies (`None` = no limit);
    // enforcement is outside this view — confirm.
    max_response_body_size: Option<usize>,
    /// Maximum total time the polling retry policy keeps retrying before giving up.
    max_polling_time: Duration,
    // NOTE(review): copied from the config but marked dead code; presumably only read on some
    // targets — confirm.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
169
170impl fmt::Debug for Agent {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172        f.debug_struct("Agent")
173            .field("ingress_expiry", &self.ingress_expiry)
174            .finish_non_exhaustive()
175    }
176}
177
178impl Agent {
179    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
180    /// using the [`AgentConfig`] and [`Agent::new()`].
181    pub fn builder() -> builder::AgentBuilder {
182        Default::default()
183    }
184
    /// Create an instance of an [`Agent`] from an [`AgentConfig`].
    ///
    /// The HTTP service is `config.http_service` when present; otherwise a `Retry429Logic`
    /// wrapper is built around `config.client`, or around a freshly created `reqwest` client
    /// when that is absent too. The root key always starts out as the hard-coded default key.
    ///
    /// Routing is chosen in this order: `config.route_provider`, then `config.url` (wrapped in
    /// a dynamic route provider when `config.background_dynamic_routing` is set).
    ///
    /// # Panics
    ///
    /// - If neither `config.route_provider` nor `config.url` is set.
    /// - If dynamic routing is requested and the URL is not exactly of the form
    ///   `https://domain` (no path, port, IP address, or other scheme).
    /// - If the seed `Node` cannot be created from the URL's domain.
    /// - On non-wasm targets, if the default HTTP client cannot be built.
    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
        // Prefer a caller-supplied HTTP service; otherwise wrap a reqwest client.
        let client = config.http_service.unwrap_or_else(|| {
            Arc::new(Retry429Logic {
                client: config.client.unwrap_or_else(|| {
                    // Native default: rustls TLS with a 360 second request timeout.
                    #[cfg(not(target_family = "wasm"))]
                    {
                        Client::builder()
                            .use_rustls_tls()
                            .timeout(Duration::from_secs(360))
                            .build()
                            .expect("Could not create HTTP client.")
                    }
                    // wasm default: plain client with default settings.
                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
                    {
                        Client::new()
                    }
                }),
            })
        });
        Ok(Agent {
            nonce_factory: config.nonce_factory,
            identity: config.identity,
            ingress_expiry: config.ingress_expiry,
            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
            // Cloned because `client` itself may be moved into the dynamic-routing future below.
            client: client.clone(),
            // An explicit route provider wins; the URL is only consulted without one.
            route_provider: if let Some(route_provider) = config.route_provider {
                route_provider
            } else if let Some(url) = config.url {
                if config.background_dynamic_routing {
                    assert!(
                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
                    );
                    // The domain is known to be present thanks to the assertion above.
                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
                    // The plain URL and the future that builds the dynamic provider are both
                    // handed to `UrlUntilReady`.
                    UrlUntilReady::new(url, async move {
                        DynamicRouteProviderBuilder::new(
                            LatencyRoutingSnapshot::new(),
                            seeds,
                            client,
                        )
                        .build()
                        .await
                    }) as Arc<dyn RouteProvider>
                } else {
                    Arc::new(url)
                }
            } else {
                panic!("either route_provider or url must be specified");
            },
            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
            verify_query_signatures: config.verify_query_signatures,
            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
            max_response_body_size: config.max_response_body_size,
            max_tcp_error_retries: config.max_tcp_error_retries,
            max_polling_time: config.max_polling_time,
        })
    }
243
244    /// Set the identity provider for signing messages.
245    ///
246    /// NOTE: if you change the identity while having update calls in
247    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
248    /// messages.
249    pub fn set_identity<I>(&mut self, identity: I)
250    where
251        I: 'static + Identity,
252    {
253        self.identity = Arc::new(identity);
254    }
255    /// Set the arc identity provider for signing messages.
256    ///
257    /// NOTE: if you change the identity while having update calls in
258    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
259    /// messages.
260    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
261        self.identity = identity;
262    }
263
264    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
265    /// responses using a hard-coded public key.
266    ///
267    /// This function will instruct the agent to ask the endpoint for its public key, and use
268    /// that instead. This is required when talking to a local test instance, for example.
269    ///
270    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
271    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
272    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
273        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
274            // already fetched the root key
275            return Ok(());
276        }
277        let status = self.status().await?;
278        let Some(root_key) = status.root_key else {
279            return Err(AgentError::NoRootKeyInStatus(status));
280        };
281        self.set_root_key(root_key);
282        Ok(())
283    }
284
285    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
286    /// responses using a hard-coded public key.
287    ///
288    /// Using this function you can set the root key to a known one if you know if beforehand.
289    pub fn set_root_key(&self, root_key: Vec<u8>) {
290        *self.root_key.write().unwrap() = root_key;
291    }
292
293    /// Return the root key currently in use.
294    pub fn read_root_key(&self) -> Vec<u8> {
295        self.root_key.read().unwrap().clone()
296    }
297
298    fn get_expiry_date(&self) -> u64 {
299        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
300        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
301        if self.ingress_expiry.as_secs() > 90 {
302            rounded = rounded.replace_second(0).unwrap();
303        }
304        rounded.unix_timestamp_nanos().try_into().unwrap()
305    }
306
307    /// Return the principal of the identity.
308    pub fn get_principal(&self) -> Result<Principal, String> {
309        self.identity.sender()
310    }
311
312    async fn query_endpoint<A>(
313        &self,
314        effective_canister_id: Principal,
315        serialized_bytes: Vec<u8>,
316    ) -> Result<A, AgentError>
317    where
318        A: serde::de::DeserializeOwned,
319    {
320        let _permit = self.concurrent_requests_semaphore.acquire().await;
321        let bytes = self
322            .execute(
323                Method::POST,
324                &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
325                Some(serialized_bytes),
326            )
327            .await?
328            .1;
329        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
330    }
331
332    async fn read_state_endpoint<A>(
333        &self,
334        effective_canister_id: Principal,
335        serialized_bytes: Vec<u8>,
336    ) -> Result<A, AgentError>
337    where
338        A: serde::de::DeserializeOwned,
339    {
340        let _permit = self.concurrent_requests_semaphore.acquire().await;
341        let endpoint = format!(
342            "api/v2/canister/{}/read_state",
343            effective_canister_id.to_text()
344        );
345        let bytes = self
346            .execute(Method::POST, &endpoint, Some(serialized_bytes))
347            .await?
348            .1;
349        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
350    }
351
352    async fn read_subnet_state_endpoint<A>(
353        &self,
354        subnet_id: Principal,
355        serialized_bytes: Vec<u8>,
356    ) -> Result<A, AgentError>
357    where
358        A: serde::de::DeserializeOwned,
359    {
360        let _permit = self.concurrent_requests_semaphore.acquire().await;
361        let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
362        let bytes = self
363            .execute(Method::POST, &endpoint, Some(serialized_bytes))
364            .await?
365            .1;
366        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
367    }
368
369    async fn call_endpoint(
370        &self,
371        effective_canister_id: Principal,
372        serialized_bytes: Vec<u8>,
373    ) -> Result<TransportCallResponse, AgentError> {
374        let _permit = self.concurrent_requests_semaphore.acquire().await;
375        let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
376        let (status_code, response_body) = self
377            .execute(Method::POST, &endpoint, Some(serialized_bytes))
378            .await?;
379
380        if status_code == StatusCode::ACCEPTED {
381            return Ok(TransportCallResponse::Accepted);
382        }
383
384        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
385    }
386
387    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
388    /// The encoding is left as an exercise to the user.
389    #[allow(clippy::too_many_arguments)]
390    async fn query_raw(
391        &self,
392        canister_id: Principal,
393        effective_canister_id: Principal,
394        method_name: String,
395        arg: Vec<u8>,
396        ingress_expiry_datetime: Option<u64>,
397        use_nonce: bool,
398        explicit_verify_query_signatures: Option<bool>,
399    ) -> Result<Vec<u8>, AgentError> {
400        let operation = Operation::Call {
401            canister: canister_id,
402            method: method_name.clone(),
403        };
404        let content = self.query_content(
405            canister_id,
406            method_name,
407            arg,
408            ingress_expiry_datetime,
409            use_nonce,
410        )?;
411        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
412        self.query_inner(
413            effective_canister_id,
414            serialized_bytes,
415            content.to_request_id(),
416            explicit_verify_query_signatures,
417            operation,
418        )
419        .await
420    }
421
422    /// Send the signed query to the network. Will return a byte vector.
423    /// The bytes will be checked if it is a valid query.
424    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
425    pub async fn query_signed(
426        &self,
427        effective_canister_id: Principal,
428        signed_query: Vec<u8>,
429    ) -> Result<Vec<u8>, AgentError> {
430        let envelope: Envelope =
431            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
432        let EnvelopeContent::Query {
433            canister_id,
434            method_name,
435            ..
436        } = &*envelope.content
437        else {
438            return Err(AgentError::CallDataMismatch {
439                field: "request_type".to_string(),
440                value_arg: "query".to_string(),
441                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
442                    "update"
443                } else {
444                    "read_state"
445                }
446                .to_string(),
447            });
448        };
449        let operation = Operation::Call {
450            canister: *canister_id,
451            method: method_name.clone(),
452        };
453        self.query_inner(
454            effective_canister_id,
455            signed_query,
456            envelope.content.to_request_id(),
457            None,
458            operation,
459        )
460        .await
461    }
462
    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
    ///
    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
    ///
    /// Signature verification runs when `explicit_verify_query_signatures` is `Some(true)`, or
    /// when it is `None` and the agent-wide `verify_query_signatures` flag is set. In that case
    /// every signature in the response must be recent enough, belong to a known node key of
    /// the subnet, and verify against the response's signable bytes.
    ///
    /// # Errors
    ///
    /// Besides transport and decoding errors: `MissingSignature`, `TooManySignatures`,
    /// `CertificateOutdated`, `CertificateNotAuthorized`, `DerKeyLengthMismatch`,
    /// `DerPrefixMismatch`, `MalformedPublicKey`, `QuerySignatureVerificationFailed` and
    /// `MalformedSignature` from verification, and `UncertifiedReject` (carrying `operation`)
    /// when the canister rejected the query.
    async fn query_inner(
        &self,
        effective_canister_id: Principal,
        signed_query: Vec<u8>,
        request_id: RequestId,
        explicit_verify_query_signatures: Option<bool>,
        operation: Operation,
    ) -> Result<Vec<u8>, AgentError> {
        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
            // Run the query and the subnet lookup concurrently; either failure aborts both.
            let (response, mut subnet) = futures_util::try_join!(
                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
                self.get_subnet_by_canister(&effective_canister_id)
            )?;
            // At least one signature is required, and never more than the subnet has node keys.
            if response.signatures().is_empty() {
                return Err(AgentError::MissingSignature);
            } else if response.signatures().len() > subnet.node_keys.len() {
                return Err(AgentError::TooManySignatures {
                    had: response.signatures().len(),
                    needed: subnet.node_keys.len(),
                });
            }
            for signature in response.signatures() {
                // Reject signatures older than the ingress expiry window.
                if OffsetDateTime::now_utc()
                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
                    > self.ingress_expiry
                {
                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
                }
                let signable = response.signable(request_id, signature.timestamp);
                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
                    node_key
                } else {
                    // Unknown signer: fetch the subnet again and retry the lookup once.
                    // NOTE(review): presumably this bypasses a cached entry that may be stale — confirm.
                    subnet = self
                        .fetch_subnet_by_canister(&effective_canister_id)
                        .await?;
                    subnet
                        .node_keys
                        .get(&signature.identity)
                        .ok_or(AgentError::CertificateNotAuthorized())?
                };
                // Expected layout: 12-byte DER prefix followed by a 32-byte raw public key.
                if node_key.len() != 44 {
                    return Err(AgentError::DerKeyLengthMismatch {
                        expected: 44,
                        actual: node_key.len(),
                    });
                }
                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
                if node_key[..12] != DER_PREFIX {
                    return Err(AgentError::DerPrefixMismatch {
                        expected: DER_PREFIX.to_vec(),
                        actual: node_key[..12].to_vec(),
                    });
                }
                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
                    .map_err(|_| AgentError::MalformedPublicKey)?;

                match pubkey.verify_signature(&signable, &signature.signature[..]) {
                    Ok(()) => (),
                    Err(SignatureError::InvalidSignature) => {
                        return Err(AgentError::QuerySignatureVerificationFailed)
                    }
                    Err(SignatureError::InvalidLength) => {
                        return Err(AgentError::MalformedSignature)
                    }
                    // Any other error variant is treated as impossible here and panics.
                    _ => unreachable!(),
                }
            }
            response
        } else {
            // Verification disabled: take the response as-is.
            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
                .await?
        };

        match response {
            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
                reject,
                operation: Some(operation),
            }),
        }
    }
547
548    fn query_content(
549        &self,
550        canister_id: Principal,
551        method_name: String,
552        arg: Vec<u8>,
553        ingress_expiry_datetime: Option<u64>,
554        use_nonce: bool,
555    ) -> Result<EnvelopeContent, AgentError> {
556        Ok(EnvelopeContent::Query {
557            sender: self.identity.sender().map_err(AgentError::SigningError)?,
558            canister_id,
559            method_name,
560            arg,
561            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
562            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
563        })
564    }
565
566    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
567    async fn update_raw(
568        &self,
569        canister_id: Principal,
570        effective_canister_id: Principal,
571        method_name: String,
572        arg: Vec<u8>,
573        ingress_expiry_datetime: Option<u64>,
574    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
575        let nonce = self.nonce_factory.generate();
576        let content = self.update_content(
577            canister_id,
578            method_name.clone(),
579            arg,
580            ingress_expiry_datetime,
581            nonce,
582        )?;
583        let operation = Some(Operation::Call {
584            canister: canister_id,
585            method: method_name,
586        });
587        let request_id = to_request_id(&content)?;
588        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
589
590        let response_body = self
591            .call_endpoint(effective_canister_id, serialized_bytes)
592            .await?;
593
594        match response_body {
595            TransportCallResponse::Replied { certificate } => {
596                let certificate =
597                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
598
599                self.verify(&certificate, effective_canister_id)?;
600                let status = lookup_request_status(&certificate, &request_id)?;
601
602                match status {
603                    RequestStatusResponse::Replied(reply) => {
604                        Ok(CallResponse::Response((reply.arg, certificate)))
605                    }
606                    RequestStatusResponse::Rejected(reject_response) => {
607                        Err(AgentError::CertifiedReject {
608                            reject: reject_response,
609                            operation,
610                        })?
611                    }
612                    _ => Ok(CallResponse::Poll(request_id)),
613                }
614            }
615            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
616            TransportCallResponse::NonReplicatedRejection(reject_response) => {
617                Err(AgentError::UncertifiedReject {
618                    reject: reject_response,
619                    operation,
620                })
621            }
622        }
623    }
624
625    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
626    /// The bytes will be checked to verify that it is a valid update.
627    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
628    pub async fn update_signed(
629        &self,
630        effective_canister_id: Principal,
631        signed_update: Vec<u8>,
632    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
633        let envelope: Envelope =
634            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
635        let EnvelopeContent::Call {
636            canister_id,
637            method_name,
638            ..
639        } = &*envelope.content
640        else {
641            return Err(AgentError::CallDataMismatch {
642                field: "request_type".to_string(),
643                value_arg: "update".to_string(),
644                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
645                    "query"
646                } else {
647                    "read_state"
648                }
649                .to_string(),
650            });
651        };
652        let operation = Some(Operation::Call {
653            canister: *canister_id,
654            method: method_name.clone(),
655        });
656        let request_id = to_request_id(&envelope.content)?;
657
658        let response_body = self
659            .call_endpoint(effective_canister_id, signed_update)
660            .await?;
661
662        match response_body {
663            TransportCallResponse::Replied { certificate } => {
664                let certificate =
665                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
666
667                self.verify(&certificate, effective_canister_id)?;
668                let status = lookup_request_status(&certificate, &request_id)?;
669
670                match status {
671                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
672                    RequestStatusResponse::Rejected(reject_response) => {
673                        Err(AgentError::CertifiedReject {
674                            reject: reject_response,
675                            operation,
676                        })?
677                    }
678                    _ => Ok(CallResponse::Poll(request_id)),
679                }
680            }
681            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
682            TransportCallResponse::NonReplicatedRejection(reject_response) => {
683                Err(AgentError::UncertifiedReject {
684                    reject: reject_response,
685                    operation,
686                })
687            }
688        }
689    }
690
691    fn update_content(
692        &self,
693        canister_id: Principal,
694        method_name: String,
695        arg: Vec<u8>,
696        ingress_expiry_datetime: Option<u64>,
697        nonce: Option<Vec<u8>>,
698    ) -> Result<EnvelopeContent, AgentError> {
699        Ok(EnvelopeContent::Call {
700            canister_id,
701            method_name,
702            arg,
703            nonce,
704            sender: self.identity.sender().map_err(AgentError::SigningError)?,
705            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
706        })
707    }
708
709    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
710        ExponentialBackoffBuilder::new()
711            .with_initial_interval(Duration::from_millis(500))
712            .with_max_interval(Duration::from_secs(1))
713            .with_multiplier(1.4)
714            .with_max_elapsed_time(Some(self.max_polling_time))
715            .build()
716    }
717
718    /// Wait for `request_status` to return a Replied response and return the arg.
719    pub async fn wait_signed(
720        &self,
721        request_id: &RequestId,
722        effective_canister_id: Principal,
723        signed_request_status: Vec<u8>,
724    ) -> Result<(Vec<u8>, Certificate), AgentError> {
725        let mut retry_policy = self.get_retry_policy();
726
727        let mut request_accepted = false;
728        let (resp, cert) = self
729            .request_status_signed(
730                request_id,
731                effective_canister_id,
732                signed_request_status.clone(),
733            )
734            .await?;
735        loop {
736            match resp {
737                RequestStatusResponse::Unknown => {}
738
739                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
740                    if !request_accepted {
741                        retry_policy.reset();
742                        request_accepted = true;
743                    }
744                }
745
746                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
747                    return Ok((arg, cert))
748                }
749
750                RequestStatusResponse::Rejected(response) => {
751                    return Err(AgentError::CertifiedReject {
752                        reject: response,
753                        operation: None,
754                    })
755                }
756
757                RequestStatusResponse::Done => {
758                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
759                        *request_id,
760                    )))
761                }
762            };
763
764            match retry_policy.next_backoff() {
765                Some(duration) => crate::util::sleep(duration).await,
766
767                None => return Err(AgentError::TimeoutWaitingForResponse()),
768            }
769        }
770    }
771
772    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
773    pub async fn wait(
774        &self,
775        request_id: &RequestId,
776        effective_canister_id: Principal,
777    ) -> Result<(Vec<u8>, Certificate), AgentError> {
778        self.wait_inner(request_id, effective_canister_id, None)
779            .await
780    }
781
782    async fn wait_inner(
783        &self,
784        request_id: &RequestId,
785        effective_canister_id: Principal,
786        operation: Option<Operation>,
787    ) -> Result<(Vec<u8>, Certificate), AgentError> {
788        let mut retry_policy = self.get_retry_policy();
789
790        let mut request_accepted = false;
791        loop {
792            let (resp, cert) = self
793                .request_status_raw(request_id, effective_canister_id)
794                .await?;
795            match resp {
796                RequestStatusResponse::Unknown => {}
797
798                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
799                    if !request_accepted {
800                        // The system will return RequestStatusResponse::Unknown
801                        // until the request is accepted
802                        // and we generally cannot know how long that will take.
803                        // State transitions between Received and Processing may be
804                        // instantaneous. Therefore, once we know the request is accepted,
805                        // we should restart the backoff so the request does not time out.
806
807                        retry_policy.reset();
808                        request_accepted = true;
809                    }
810                }
811
812                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
813                    return Ok((arg, cert))
814                }
815
816                RequestStatusResponse::Rejected(response) => {
817                    return Err(AgentError::CertifiedReject {
818                        reject: response,
819                        operation,
820                    })
821                }
822
823                RequestStatusResponse::Done => {
824                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
825                        *request_id,
826                    )))
827                }
828            };
829
830            match retry_policy.next_backoff() {
831                Some(duration) => crate::util::sleep(duration).await,
832
833                None => return Err(AgentError::TimeoutWaitingForResponse()),
834            }
835        }
836    }
837
838    /// Request the raw state tree directly, under an effective canister ID.
839    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
840    pub async fn read_state_raw(
841        &self,
842        paths: Vec<Vec<Label>>,
843        effective_canister_id: Principal,
844    ) -> Result<Certificate, AgentError> {
845        let content = self.read_state_content(paths)?;
846        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
847
848        let read_state_response: ReadStateResponse = self
849            .read_state_endpoint(effective_canister_id, serialized_bytes)
850            .await?;
851        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
852            .map_err(AgentError::InvalidCborData)?;
853        self.verify(&cert, effective_canister_id)?;
854        Ok(cert)
855    }
856
857    /// Request the raw state tree directly, under a subnet ID.
858    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
859    pub async fn read_subnet_state_raw(
860        &self,
861        paths: Vec<Vec<Label>>,
862        subnet_id: Principal,
863    ) -> Result<Certificate, AgentError> {
864        let content = self.read_state_content(paths)?;
865        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
866
867        let read_state_response: ReadStateResponse = self
868            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
869            .await?;
870        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
871            .map_err(AgentError::InvalidCborData)?;
872        self.verify_for_subnet(&cert, subnet_id)?;
873        Ok(cert)
874    }
875
876    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
877        Ok(EnvelopeContent::ReadState {
878            sender: self.identity.sender().map_err(AgentError::SigningError)?,
879            paths,
880            ingress_expiry: self.get_expiry_date(),
881        })
882    }
883
884    /// Verify a certificate, checking delegation if present.
885    /// Only passes if the certificate also has authority over the canister.
886    pub fn verify(
887        &self,
888        cert: &Certificate,
889        effective_canister_id: Principal,
890    ) -> Result<(), AgentError> {
891        self.verify_cert(cert, effective_canister_id)?;
892        self.verify_cert_timestamp(cert)?;
893        Ok(())
894    }
895
896    fn verify_cert(
897        &self,
898        cert: &Certificate,
899        effective_canister_id: Principal,
900    ) -> Result<(), AgentError> {
901        let sig = &cert.signature;
902
903        let root_hash = cert.tree.digest();
904        let mut msg = vec![];
905        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
906        msg.extend_from_slice(&root_hash);
907
908        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
909        let key = extract_der(der_key)?;
910
911        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
912            .map_err(|_| AgentError::CertificateVerificationFailed())?;
913        Ok(())
914    }
915
916    /// Verify a certificate, checking delegation if present.
917    /// Only passes if the certificate is for the specified subnet.
918    pub fn verify_for_subnet(
919        &self,
920        cert: &Certificate,
921        subnet_id: Principal,
922    ) -> Result<(), AgentError> {
923        self.verify_cert_for_subnet(cert, subnet_id)?;
924        self.verify_cert_timestamp(cert)?;
925        Ok(())
926    }
927
928    fn verify_cert_for_subnet(
929        &self,
930        cert: &Certificate,
931        subnet_id: Principal,
932    ) -> Result<(), AgentError> {
933        let sig = &cert.signature;
934
935        let root_hash = cert.tree.digest();
936        let mut msg = vec![];
937        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
938        msg.extend_from_slice(&root_hash);
939
940        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
941        let key = extract_der(der_key)?;
942
943        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
944            .map_err(|_| AgentError::CertificateVerificationFailed())?;
945        Ok(())
946    }
947
948    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
949        let time = lookup_time(cert)?;
950        if (OffsetDateTime::now_utc()
951            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
952        .abs()
953            > self.ingress_expiry
954        {
955            Err(AgentError::CertificateOutdated(self.ingress_expiry))
956        } else {
957            Ok(())
958        }
959    }
960
961    fn check_delegation(
962        &self,
963        delegation: &Option<Delegation>,
964        effective_canister_id: Principal,
965    ) -> Result<Vec<u8>, AgentError> {
966        match delegation {
967            None => Ok(self.read_root_key()),
968            Some(delegation) => {
969                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
970                    .map_err(AgentError::InvalidCborData)?;
971                if cert.delegation.is_some() {
972                    return Err(AgentError::CertificateHasTooManyDelegations);
973                }
974                self.verify_cert(&cert, effective_canister_id)?;
975                let canister_range_lookup = [
976                    "subnet".as_bytes(),
977                    delegation.subnet_id.as_ref(),
978                    "canister_ranges".as_bytes(),
979                ];
980                let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
981                let ranges: Vec<(Principal, Principal)> =
982                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
983                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
984                    // the certificate is not authorized to answer calls for this canister
985                    return Err(AgentError::CertificateNotAuthorized());
986                }
987
988                let public_key_path = [
989                    "subnet".as_bytes(),
990                    delegation.subnet_id.as_ref(),
991                    "public_key".as_bytes(),
992                ];
993                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
994            }
995        }
996    }
997
998    fn check_delegation_for_subnet(
999        &self,
1000        delegation: &Option<Delegation>,
1001        subnet_id: Principal,
1002    ) -> Result<Vec<u8>, AgentError> {
1003        match delegation {
1004            None => Ok(self.read_root_key()),
1005            Some(delegation) => {
1006                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1007                    .map_err(AgentError::InvalidCborData)?;
1008                if cert.delegation.is_some() {
1009                    return Err(AgentError::CertificateHasTooManyDelegations);
1010                }
1011                self.verify_cert_for_subnet(&cert, subnet_id)?;
1012                let public_key_path = [
1013                    "subnet".as_bytes(),
1014                    delegation.subnet_id.as_ref(),
1015                    "public_key".as_bytes(),
1016                ];
1017                let pk = lookup_value(&cert.tree, public_key_path)
1018                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1019                    .to_vec();
1020                Ok(pk)
1021            }
1022        }
1023    }
1024
1025    /// Request information about a particular canister for a single state subkey.
1026    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1027    pub async fn read_state_canister_info(
1028        &self,
1029        canister_id: Principal,
1030        path: &str,
1031    ) -> Result<Vec<u8>, AgentError> {
1032        let paths: Vec<Vec<Label>> = vec![vec![
1033            "canister".into(),
1034            Label::from_bytes(canister_id.as_slice()),
1035            path.into(),
1036        ]];
1037
1038        let cert = self.read_state_raw(paths, canister_id).await?;
1039
1040        lookup_canister_info(cert, canister_id, path)
1041    }
1042
1043    /// Request the controller list of a given canister.
1044    pub async fn read_state_canister_controllers(
1045        &self,
1046        canister_id: Principal,
1047    ) -> Result<Vec<Principal>, AgentError> {
1048        let blob = self
1049            .read_state_canister_info(canister_id, "controllers")
1050            .await?;
1051        let controllers: Vec<Principal> =
1052            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1053        Ok(controllers)
1054    }
1055
1056    /// Request the module hash of a given canister.
1057    pub async fn read_state_canister_module_hash(
1058        &self,
1059        canister_id: Principal,
1060    ) -> Result<Vec<u8>, AgentError> {
1061        self.read_state_canister_info(canister_id, "module_hash")
1062            .await
1063    }
1064
1065    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1066    pub async fn read_state_canister_metadata(
1067        &self,
1068        canister_id: Principal,
1069        path: &str,
1070    ) -> Result<Vec<u8>, AgentError> {
1071        let paths: Vec<Vec<Label>> = vec![vec![
1072            "canister".into(),
1073            Label::from_bytes(canister_id.as_slice()),
1074            "metadata".into(),
1075            path.into(),
1076        ]];
1077
1078        let cert = self.read_state_raw(paths, canister_id).await?;
1079
1080        lookup_canister_metadata(cert, canister_id, path)
1081    }
1082
1083    /// Request a list of metrics about the subnet.
1084    pub async fn read_state_subnet_metrics(
1085        &self,
1086        subnet_id: Principal,
1087    ) -> Result<SubnetMetrics, AgentError> {
1088        let paths = vec![vec![
1089            "subnet".into(),
1090            Label::from_bytes(subnet_id.as_slice()),
1091            "metrics".into(),
1092        ]];
1093        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1094        lookup_subnet_metrics(cert, subnet_id)
1095    }
1096
1097    /// Request a list of metrics about the subnet.
1098    pub async fn read_state_subnet_canister_ranges(
1099        &self,
1100        subnet_id: Principal,
1101    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1102        let paths = vec![vec![
1103            "subnet".into(),
1104            Label::from_bytes(subnet_id.as_slice()),
1105            "canister_ranges".into(),
1106        ]];
1107        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1108        lookup_subnet_canister_ranges(cert, subnet_id)
1109    }
1110
1111    /// Fetches the status of a particular request by its ID.
1112    pub async fn request_status_raw(
1113        &self,
1114        request_id: &RequestId,
1115        effective_canister_id: Principal,
1116    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117        let paths: Vec<Vec<Label>> =
1118            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1119
1120        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1121
1122        Ok((lookup_request_status(&cert, request_id)?, cert))
1123    }
1124
1125    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1126    /// The bytes will be checked to verify that it is a valid `request_status`.
1127    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1128    pub async fn request_status_signed(
1129        &self,
1130        request_id: &RequestId,
1131        effective_canister_id: Principal,
1132        signed_request_status: Vec<u8>,
1133    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1134        let _envelope: Envelope =
1135            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1136        let read_state_response: ReadStateResponse = self
1137            .read_state_endpoint(effective_canister_id, signed_request_status)
1138            .await?;
1139
1140        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1141            .map_err(AgentError::InvalidCborData)?;
1142        self.verify(&cert, effective_canister_id)?;
1143        Ok((lookup_request_status(&cert, request_id)?, cert))
1144    }
1145
1146    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1147    /// passing all arguments.
1148    pub fn update<S: Into<String>>(
1149        &self,
1150        canister_id: &Principal,
1151        method_name: S,
1152    ) -> UpdateBuilder {
1153        UpdateBuilder::new(self, *canister_id, method_name.into())
1154    }
1155
1156    /// Calls and returns the information returned by the status endpoint of a replica.
1157    pub async fn status(&self) -> Result<Status, AgentError> {
1158        let endpoint = "api/v2/status";
1159        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1160
1161        let cbor: serde_cbor::Value =
1162            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1163
1164        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1165    }
1166
1167    /// Returns a `QueryBuilder` enabling the construction of a query call without
1168    /// passing all arguments.
1169    pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1170        QueryBuilder::new(self, *canister_id, method_name.into())
1171    }
1172
1173    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1174    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1175    pub fn sign_request_status(
1176        &self,
1177        effective_canister_id: Principal,
1178        request_id: RequestId,
1179    ) -> Result<SignedRequestStatus, AgentError> {
1180        let paths: Vec<Vec<Label>> =
1181            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1182        let read_state_content = self.read_state_content(paths)?;
1183        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1184        let ingress_expiry = read_state_content.ingress_expiry();
1185        let sender = *read_state_content.sender();
1186        Ok(SignedRequestStatus {
1187            ingress_expiry,
1188            sender,
1189            effective_canister_id,
1190            request_id,
1191            signed_request_status,
1192        })
1193    }
1194
1195    async fn get_subnet_by_canister(
1196        &self,
1197        canister: &Principal,
1198    ) -> Result<Arc<Subnet>, AgentError> {
1199        let subnet = self
1200            .subnet_key_cache
1201            .lock()
1202            .unwrap()
1203            .get_subnet_by_canister(canister);
1204        if let Some(subnet) = subnet {
1205            Ok(subnet)
1206        } else {
1207            self.fetch_subnet_by_canister(canister).await
1208        }
1209    }
1210
1211    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/canister/<effective_canister_id>/read_state`
1212    pub async fn fetch_api_boundary_nodes_by_canister_id(
1213        &self,
1214        canister_id: Principal,
1215    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1216        let paths = vec![vec!["api_boundary_nodes".into()]];
1217        let certificate = self.read_state_raw(paths, canister_id).await?;
1218        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1219        Ok(api_boundary_nodes)
1220    }
1221
1222    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v2/subnet/<subnet_id>/read_state`
1223    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1224        &self,
1225        subnet_id: Principal,
1226    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1227        let paths = vec![vec!["api_boundary_nodes".into()]];
1228        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1229        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1230        Ok(api_boundary_nodes)
1231    }
1232
1233    async fn fetch_subnet_by_canister(
1234        &self,
1235        canister: &Principal,
1236    ) -> Result<Arc<Subnet>, AgentError> {
1237        let cert = self
1238            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1239            .await?;
1240
1241        let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1242        let subnet = Arc::new(subnet);
1243        self.subnet_key_cache
1244            .lock()
1245            .unwrap()
1246            .insert_subnet(subnet_id, subnet.clone());
1247        Ok(subnet)
1248    }
1249
1250    async fn request(
1251        &self,
1252        method: Method,
1253        endpoint: &str,
1254        body: Option<Vec<u8>>,
1255    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1256        let body = body.map(Bytes::from);
1257
1258        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1259            let url = self.route_provider.route()?.join(endpoint)?;
1260            let uri = Uri::from_str(url.as_str())
1261                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1262            let body = body.clone().unwrap_or_default();
1263            let request = http::Request::builder()
1264                .method(method.clone())
1265                .uri(uri)
1266                .header(CONTENT_TYPE, "application/cbor")
1267                .body(body)
1268                .map_err(|e| {
1269                    AgentError::TransportError(TransportError::Generic(format!(
1270                        "unable to create request: {e:#}"
1271                    )))
1272                })?;
1273
1274            Ok(request)
1275        };
1276
1277        let response = self
1278            .client
1279            .call(
1280                &create_request_with_generated_url,
1281                self.max_tcp_error_retries,
1282                self.max_response_body_size,
1283            )
1284            .await?;
1285
1286        let (parts, body) = response.into_parts();
1287
1288        Ok((parts.status, parts.headers, body.to_vec()))
1289    }
1290
1291    async fn execute(
1292        &self,
1293        method: Method,
1294        endpoint: &str,
1295        body: Option<Vec<u8>>,
1296    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1297        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1298
1299        let status = request_result.0;
1300        let headers = request_result.1;
1301        let body = request_result.2;
1302
1303        if status.is_client_error() || status.is_server_error() {
1304            Err(AgentError::HttpError(HttpErrorPayload {
1305                status: status.into(),
1306                content_type: headers
1307                    .get(CONTENT_TYPE)
1308                    .and_then(|value| value.to_str().ok())
1309                    .map(str::to_string),
1310                content: body,
1311            }))
1312        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1313            Err(AgentError::InvalidHttpResponse(format!(
1314                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1315            )))
1316        } else {
1317            Ok((status, body))
1318        }
1319    }
1320}
1321
1322// Checks if a principal is contained within a list of principal ranges
1323// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1324fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1325    ranges
1326        .iter()
1327        .any(|r| principal >= &r.0 && principal <= &r.1)
1328}
1329
1330fn sign_envelope(
1331    content: &EnvelopeContent,
1332    identity: Arc<dyn Identity>,
1333) -> Result<Vec<u8>, AgentError> {
1334    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1335
1336    let envelope = Envelope {
1337        content: Cow::Borrowed(content),
1338        sender_pubkey: signature.public_key,
1339        sender_sig: signature.signature,
1340        sender_delegation: signature.delegations,
1341    };
1342
1343    let mut serialized_bytes = Vec::new();
1344    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1345    serializer.self_describe()?;
1346    envelope.serialize(&mut serializer)?;
1347
1348    Ok(serialized_bytes)
1349}
1350
1351/// Inspect the bytes to be sent as a query
1352/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1353pub fn signed_query_inspect(
1354    sender: Principal,
1355    canister_id: Principal,
1356    method_name: &str,
1357    arg: &[u8],
1358    ingress_expiry: u64,
1359    signed_query: Vec<u8>,
1360) -> Result<(), AgentError> {
1361    let envelope: Envelope =
1362        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1363    match envelope.content.as_ref() {
1364        EnvelopeContent::Query {
1365            ingress_expiry: ingress_expiry_cbor,
1366            sender: sender_cbor,
1367            canister_id: canister_id_cbor,
1368            method_name: method_name_cbor,
1369            arg: arg_cbor,
1370            nonce: _nonce,
1371        } => {
1372            if ingress_expiry != *ingress_expiry_cbor {
1373                return Err(AgentError::CallDataMismatch {
1374                    field: "ingress_expiry".to_string(),
1375                    value_arg: ingress_expiry.to_string(),
1376                    value_cbor: ingress_expiry_cbor.to_string(),
1377                });
1378            }
1379            if sender != *sender_cbor {
1380                return Err(AgentError::CallDataMismatch {
1381                    field: "sender".to_string(),
1382                    value_arg: sender.to_string(),
1383                    value_cbor: sender_cbor.to_string(),
1384                });
1385            }
1386            if canister_id != *canister_id_cbor {
1387                return Err(AgentError::CallDataMismatch {
1388                    field: "canister_id".to_string(),
1389                    value_arg: canister_id.to_string(),
1390                    value_cbor: canister_id_cbor.to_string(),
1391                });
1392            }
1393            if method_name != *method_name_cbor {
1394                return Err(AgentError::CallDataMismatch {
1395                    field: "method_name".to_string(),
1396                    value_arg: method_name.to_string(),
1397                    value_cbor: method_name_cbor.clone(),
1398                });
1399            }
1400            if arg != *arg_cbor {
1401                return Err(AgentError::CallDataMismatch {
1402                    field: "arg".to_string(),
1403                    value_arg: format!("{arg:?}"),
1404                    value_cbor: format!("{arg_cbor:?}"),
1405                });
1406            }
1407        }
1408        EnvelopeContent::Call { .. } => {
1409            return Err(AgentError::CallDataMismatch {
1410                field: "request_type".to_string(),
1411                value_arg: "query".to_string(),
1412                value_cbor: "call".to_string(),
1413            })
1414        }
1415        EnvelopeContent::ReadState { .. } => {
1416            return Err(AgentError::CallDataMismatch {
1417                field: "request_type".to_string(),
1418                value_arg: "query".to_string(),
1419                value_cbor: "read_state".to_string(),
1420            })
1421        }
1422    }
1423    Ok(())
1424}
1425
1426/// Inspect the bytes to be sent as an update
1427/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1428pub fn signed_update_inspect(
1429    sender: Principal,
1430    canister_id: Principal,
1431    method_name: &str,
1432    arg: &[u8],
1433    ingress_expiry: u64,
1434    signed_update: Vec<u8>,
1435) -> Result<(), AgentError> {
1436    let envelope: Envelope =
1437        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1438    match envelope.content.as_ref() {
1439        EnvelopeContent::Call {
1440            nonce: _nonce,
1441            ingress_expiry: ingress_expiry_cbor,
1442            sender: sender_cbor,
1443            canister_id: canister_id_cbor,
1444            method_name: method_name_cbor,
1445            arg: arg_cbor,
1446        } => {
1447            if ingress_expiry != *ingress_expiry_cbor {
1448                return Err(AgentError::CallDataMismatch {
1449                    field: "ingress_expiry".to_string(),
1450                    value_arg: ingress_expiry.to_string(),
1451                    value_cbor: ingress_expiry_cbor.to_string(),
1452                });
1453            }
1454            if sender != *sender_cbor {
1455                return Err(AgentError::CallDataMismatch {
1456                    field: "sender".to_string(),
1457                    value_arg: sender.to_string(),
1458                    value_cbor: sender_cbor.to_string(),
1459                });
1460            }
1461            if canister_id != *canister_id_cbor {
1462                return Err(AgentError::CallDataMismatch {
1463                    field: "canister_id".to_string(),
1464                    value_arg: canister_id.to_string(),
1465                    value_cbor: canister_id_cbor.to_string(),
1466                });
1467            }
1468            if method_name != *method_name_cbor {
1469                return Err(AgentError::CallDataMismatch {
1470                    field: "method_name".to_string(),
1471                    value_arg: method_name.to_string(),
1472                    value_cbor: method_name_cbor.clone(),
1473                });
1474            }
1475            if arg != *arg_cbor {
1476                return Err(AgentError::CallDataMismatch {
1477                    field: "arg".to_string(),
1478                    value_arg: format!("{arg:?}"),
1479                    value_cbor: format!("{arg_cbor:?}"),
1480                });
1481            }
1482        }
1483        EnvelopeContent::ReadState { .. } => {
1484            return Err(AgentError::CallDataMismatch {
1485                field: "request_type".to_string(),
1486                value_arg: "call".to_string(),
1487                value_cbor: "read_state".to_string(),
1488            })
1489        }
1490        EnvelopeContent::Query { .. } => {
1491            return Err(AgentError::CallDataMismatch {
1492                field: "request_type".to_string(),
1493                value_arg: "call".to_string(),
1494                value_cbor: "query".to_string(),
1495            })
1496        }
1497    }
1498    Ok(())
1499}
1500
1501/// Inspect the bytes to be sent as a `request_status`
1502/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1503pub fn signed_request_status_inspect(
1504    sender: Principal,
1505    request_id: &RequestId,
1506    ingress_expiry: u64,
1507    signed_request_status: Vec<u8>,
1508) -> Result<(), AgentError> {
1509    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1510    let envelope: Envelope =
1511        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1512    match envelope.content.as_ref() {
1513        EnvelopeContent::ReadState {
1514            ingress_expiry: ingress_expiry_cbor,
1515            sender: sender_cbor,
1516            paths: paths_cbor,
1517        } => {
1518            if ingress_expiry != *ingress_expiry_cbor {
1519                return Err(AgentError::CallDataMismatch {
1520                    field: "ingress_expiry".to_string(),
1521                    value_arg: ingress_expiry.to_string(),
1522                    value_cbor: ingress_expiry_cbor.to_string(),
1523                });
1524            }
1525            if sender != *sender_cbor {
1526                return Err(AgentError::CallDataMismatch {
1527                    field: "sender".to_string(),
1528                    value_arg: sender.to_string(),
1529                    value_cbor: sender_cbor.to_string(),
1530                });
1531            }
1532
1533            if paths != *paths_cbor {
1534                return Err(AgentError::CallDataMismatch {
1535                    field: "paths".to_string(),
1536                    value_arg: format!("{paths:?}"),
1537                    value_cbor: format!("{paths_cbor:?}"),
1538                });
1539            }
1540        }
1541        EnvelopeContent::Query { .. } => {
1542            return Err(AgentError::CallDataMismatch {
1543                field: "request_type".to_string(),
1544                value_arg: "read_state".to_string(),
1545                value_cbor: "query".to_string(),
1546            })
1547        }
1548        EnvelopeContent::Call { .. } => {
1549            return Err(AgentError::CallDataMismatch {
1550                field: "request_type".to_string(),
1551                value_arg: "read_state".to_string(),
1552                value_cbor: "call".to_string(),
1553            })
1554        }
1555    }
1556    Ok(())
1557}
1558
1559#[derive(Clone)]
1560struct SubnetCache {
1561    subnets: TimedCache<Principal, Arc<Subnet>>,
1562    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1563}
1564
1565impl SubnetCache {
1566    fn new() -> Self {
1567        Self {
1568            subnets: TimedCache::with_lifespan(300),
1569            canister_index: RangeInclusiveMap::new_with_step_fns(),
1570        }
1571    }
1572
1573    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1574        self.canister_index
1575            .get(canister)
1576            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1577            .filter(|subnet| subnet.canister_ranges.contains(canister))
1578    }
1579
1580    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1581        self.subnets.cache_set(subnet_id, subnet.clone());
1582        for range in subnet.canister_ranges.iter() {
1583            self.canister_index.insert(range.clone(), subnet_id);
1584        }
1585    }
1586}
1587
1588#[derive(Clone, Copy)]
1589struct PrincipalStep;
1590
1591impl StepFns<Principal> for PrincipalStep {
1592    fn add_one(start: &Principal) -> Principal {
1593        let bytes = start.as_slice();
1594        let mut arr = [0; 29];
1595        arr[..bytes.len()].copy_from_slice(bytes);
1596        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1597            *byte = byte.wrapping_add(1);
1598            if *byte != 0 {
1599                break;
1600            }
1601        }
1602        Principal::from_slice(&arr[..bytes.len()])
1603    }
1604    fn sub_one(start: &Principal) -> Principal {
1605        let bytes = start.as_slice();
1606        let mut arr = [0; 29];
1607        arr[..bytes.len()].copy_from_slice(bytes);
1608        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1609            *byte = byte.wrapping_sub(1);
1610            if *byte != 255 {
1611                break;
1612            }
1613        }
1614        Principal::from_slice(&arr[..bytes.len()])
1615    }
1616}
1617
/// Information about a single subnet as held in the agent's subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    // This key is just fetched for completeness. Do not actually use this value as it is not authoritative in case of a rogue subnet.
    // If a future agent needs to know the subnet key then it should fetch /subnet from the *root* subnet.
    _key: Vec<u8>,
    /// Key bytes per principal — presumably node ID -> node public key; confirm against the code
    /// that populates this map.
    node_keys: HashMap<Principal, Vec<u8>>,
    /// Inclusive ranges of principals associated with this subnet; `SubnetCache` checks canister
    /// IDs against this set and indexes the subnet by these ranges.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1626
/// API boundary node, which routes /api calls to IC replica nodes.
///
/// All addresses are stored as plain strings; this type performs no parsing or validation.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name of the node.
    pub domain: String,
    /// IPv6 address in the hexadecimal notation with colons.
    pub ipv6_address: String,
    /// IPv4 address in the dotted-decimal notation, if one is present.
    pub ipv4_address: Option<String>,
}
1637
/// A query request builder.
///
/// This makes it easier to do query calls without actually passing all arguments.
/// Configure the request with the `with_*`/`expire_*` methods, then finish with one of the
/// `call*` methods, `sign`, or `into_envelope`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// The agent the query is issued through.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds (this is the unit the
    /// builder's `expire_at`/`expire_after` methods store). `None` leaves the choice of expiry to
    /// the agent.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether to include a nonce with the message.
    pub use_nonce: bool,
}
1658
1659impl<'agent> QueryBuilder<'agent> {
1660    /// Creates a new query builder with an agent for a particular canister method.
1661    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1662        Self {
1663            agent,
1664            effective_canister_id: canister_id,
1665            canister_id,
1666            method_name,
1667            arg: vec![],
1668            ingress_expiry_datetime: None,
1669            use_nonce: false,
1670        }
1671    }
1672
1673    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1674    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1675        self.effective_canister_id = canister_id;
1676        self
1677    }
1678
1679    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1680    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1681        self.arg = arg.into();
1682        self
1683    }
1684
1685    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1686    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1687        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1688        self
1689    }
1690
1691    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1692    pub fn expire_after(mut self, duration: Duration) -> Self {
1693        self.ingress_expiry_datetime = Some(
1694            OffsetDateTime::now_utc()
1695                .saturating_add(duration.try_into().expect("negative duration"))
1696                .unix_timestamp_nanos() as u64,
1697        );
1698        self
1699    }
1700
1701    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1702    /// and thus may get a (briefly) cached response.
1703    pub fn with_nonce_generation(mut self) -> Self {
1704        self.use_nonce = true;
1705        self
1706    }
1707
1708    /// Make a query call. This will return a byte vector.
1709    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1710        self.agent
1711            .query_raw(
1712                self.canister_id,
1713                self.effective_canister_id,
1714                self.method_name,
1715                self.arg,
1716                self.ingress_expiry_datetime,
1717                self.use_nonce,
1718                None,
1719            )
1720            .await
1721    }
1722
1723    /// Make a query call with signature verification. This will return a byte vector.
1724    ///
1725    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1726    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1727    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1728        self.agent
1729            .query_raw(
1730                self.canister_id,
1731                self.effective_canister_id,
1732                self.method_name,
1733                self.arg,
1734                self.ingress_expiry_datetime,
1735                self.use_nonce,
1736                Some(true),
1737            )
1738            .await
1739    }
1740
1741    /// Make a query call without signature verification. This will return a byte vector.
1742    ///
1743    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1744    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1745    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1746        self.agent
1747            .query_raw(
1748                self.canister_id,
1749                self.effective_canister_id,
1750                self.method_name,
1751                self.arg,
1752                self.ingress_expiry_datetime,
1753                self.use_nonce,
1754                Some(false),
1755            )
1756            .await
1757    }
1758
1759    /// Sign a query call. This will return a [`signed::SignedQuery`]
1760    /// which contains all fields of the query and the signed query in CBOR encoding
1761    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1762        let effective_canister_id = self.effective_canister_id;
1763        let identity = self.agent.identity.clone();
1764        let content = self.into_envelope()?;
1765        let signed_query = sign_envelope(&content, identity)?;
1766        let EnvelopeContent::Query {
1767            ingress_expiry,
1768            sender,
1769            canister_id,
1770            method_name,
1771            arg,
1772            nonce,
1773        } = content
1774        else {
1775            unreachable!()
1776        };
1777        Ok(SignedQuery {
1778            ingress_expiry,
1779            sender,
1780            canister_id,
1781            method_name,
1782            arg,
1783            effective_canister_id,
1784            signed_query,
1785            nonce,
1786        })
1787    }
1788
1789    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1790    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1791        self.agent.query_content(
1792            self.canister_id,
1793            self.method_name,
1794            self.arg,
1795            self.ingress_expiry_datetime,
1796            self.use_nonce,
1797        )
1798    }
1799}
1800
1801impl<'agent> IntoFuture for QueryBuilder<'agent> {
1802    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1803    type Output = Result<Vec<u8>, AgentError>;
1804    fn into_future(self) -> Self::IntoFuture {
1805        Box::pin(self.call())
1806    }
1807}
1808
/// An in-flight canister update call. Useful primarily as a `Future`.
///
/// Polling it drives the submission of the call; use `and_wait` to additionally poll for the
/// final result when the submission only yields a request ID.
pub struct UpdateCall<'agent> {
    /// The agent the call was issued through; also used by `and_wait` for polling.
    agent: &'agent Agent,
    /// The future performing the submission of the update call.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// Effective canister ID, passed along when `and_wait` polls for the result.
    effective_canister_id: Principal,
    /// Canister being called; reported in the `Operation::Call` handed to the polling routine.
    canister_id: Principal,
    /// Method being called; reported in the `Operation::Call` handed to the polling routine.
    method_name: String,
}
1817
1818impl fmt::Debug for UpdateCall<'_> {
1819    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1820        f.debug_struct("UpdateCall")
1821            .field("agent", &self.agent)
1822            .field("effective_canister_id", &self.effective_canister_id)
1823            .finish_non_exhaustive()
1824    }
1825}
1826
1827impl Future for UpdateCall<'_> {
1828    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1829    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1830        self.response_future.as_mut().poll(cx)
1831    }
1832}
1833
1834impl<'a> UpdateCall<'a> {
1835    /// Waits for the update call to be completed, polling if necessary.
1836    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1837        let response = self.response_future.await?;
1838
1839        match response {
1840            CallResponse::Response(response) => Ok(response),
1841            CallResponse::Poll(request_id) => {
1842                self.agent
1843                    .wait_inner(
1844                        &request_id,
1845                        self.effective_canister_id,
1846                        Some(Operation::Call {
1847                            canister: self.canister_id,
1848                            method: self.method_name,
1849                        }),
1850                    )
1851                    .await
1852            }
1853        }
1854    }
1855}
/// An update request Builder.
///
/// This makes it easier to do update calls without actually passing all arguments or specifying
/// if you want to wait or not.
/// Configure the request with the `with_*`/`expire_*` methods, then finish with `call`,
/// `call_and_wait`, `sign`, or `into_envelope`.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// The agent the update is issued through.
    agent: &'agent Agent,
    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
    pub effective_canister_id: Principal,
    /// The principal ID of the canister being called.
    pub canister_id: Principal,
    /// The name of the canister method being called.
    pub method_name: String,
    /// The argument blob to be passed to the method.
    pub arg: Vec<u8>,
    /// The Unix timestamp that the request will expire at, in nanoseconds (this is the unit the
    /// builder's `expire_at`/`expire_after` methods store). `None` leaves the choice of expiry to
    /// the agent.
    pub ingress_expiry_datetime: Option<u64>,
}
1874
1875impl<'agent> UpdateBuilder<'agent> {
1876    /// Creates a new update builder with an agent for a particular canister method.
1877    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1878        Self {
1879            agent,
1880            effective_canister_id: canister_id,
1881            canister_id,
1882            method_name,
1883            arg: vec![],
1884            ingress_expiry_datetime: None,
1885        }
1886    }
1887
1888    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1889    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1890        self.effective_canister_id = canister_id;
1891        self
1892    }
1893
1894    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1895    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1896        self.arg = arg.into();
1897        self
1898    }
1899
1900    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1901    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1902        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1903        self
1904    }
1905
1906    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1907    pub fn expire_after(mut self, duration: Duration) -> Self {
1908        self.ingress_expiry_datetime = Some(
1909            OffsetDateTime::now_utc()
1910                .saturating_add(duration.try_into().expect("negative duration"))
1911                .unix_timestamp_nanos() as u64,
1912        );
1913        self
1914    }
1915
1916    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1917    /// the response as a byte vector.
1918    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1919        self.call().and_wait().await.map(|x| x.0)
1920    }
1921
1922    /// Make an update call. This will return a `RequestId`.
1923    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1924    pub fn call(self) -> UpdateCall<'agent> {
1925        let method_name = self.method_name.clone();
1926        let response_future = async move {
1927            self.agent
1928                .update_raw(
1929                    self.canister_id,
1930                    self.effective_canister_id,
1931                    self.method_name,
1932                    self.arg,
1933                    self.ingress_expiry_datetime,
1934                )
1935                .await
1936        };
1937        UpdateCall {
1938            agent: self.agent,
1939            response_future: Box::pin(response_future),
1940            effective_canister_id: self.effective_canister_id,
1941            canister_id: self.canister_id,
1942            method_name,
1943        }
1944    }
1945
1946    /// Sign a update call. This will return a [`signed::SignedUpdate`]
1947    /// which contains all fields of the update and the signed update in CBOR encoding
1948    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1949        let identity = self.agent.identity.clone();
1950        let effective_canister_id = self.effective_canister_id;
1951        let content = self.into_envelope()?;
1952        let signed_update = sign_envelope(&content, identity)?;
1953        let request_id = to_request_id(&content)?;
1954        let EnvelopeContent::Call {
1955            nonce,
1956            ingress_expiry,
1957            sender,
1958            canister_id,
1959            method_name,
1960            arg,
1961        } = content
1962        else {
1963            unreachable!()
1964        };
1965        Ok(SignedUpdate {
1966            nonce,
1967            ingress_expiry,
1968            sender,
1969            canister_id,
1970            method_name,
1971            arg,
1972            effective_canister_id,
1973            signed_update,
1974            request_id,
1975        })
1976    }
1977
1978    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
1979    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1980        let nonce = self.agent.nonce_factory.generate();
1981        self.agent.update_content(
1982            self.canister_id,
1983            self.method_name,
1984            self.arg,
1985            self.ingress_expiry_datetime,
1986            nonce,
1987        )
1988    }
1989}
1990
1991impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1992    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1993    type Output = Result<Vec<u8>, AgentError>;
1994    fn into_future(self) -> Self::IntoFuture {
1995        Box::pin(self.call_and_wait())
1996    }
1997}
1998
/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
///
/// On wasm targets the generated futures are not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
    ///
    /// - `req`: factory producing the request to send; it may fail with an [`AgentError`].
    /// - `max_retries`: retry budget; the blanket non-wasm implementation in this module applies
    ///   it to connection errors, other implementations may ignore it.
    /// - `size_limit`: optional cap, in bytes, on the response body that is read; the
    ///   implementations in this module treat `None` as unlimited.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2011
2012/// Convert from http Request to reqwest's one
2013fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2014    let (parts, body) = req.into_parts();
2015    let body = reqwest::Body::from(body);
2016    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2017    // but just to be safe let's handle it.
2018    let request = http::Request::from_parts(parts, body)
2019        .try_into()
2020        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2021
2022    Ok(request)
2023}
2024
2025/// Convert from reqwests's Response to http one
2026#[cfg(not(target_family = "wasm"))]
2027async fn to_http_response(
2028    resp: Response,
2029    size_limit: Option<usize>,
2030) -> Result<http::Response<Bytes>, AgentError> {
2031    use http_body_util::{BodyExt, Limited};
2032
2033    let resp: http::Response<reqwest::Body> = resp.into();
2034    let (parts, body) = resp.into_parts();
2035    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2036    let body = body
2037        .collect()
2038        .await
2039        .map_err(|e| {
2040            AgentError::TransportError(TransportError::Generic(format!(
2041                "unable to read response body: {e:#}"
2042            )))
2043        })?
2044        .to_bytes();
2045    let resp = http::Response::from_parts(parts, body);
2046
2047    Ok(resp)
2048}
2049
/// Convert a `reqwest` [`Response`] into an `http` response whose body is fully read into
/// [`Bytes`] (wasm variant).
///
/// The wasm `Response` in reqwest has no direct conversion to `http::Response`, so the status and
/// headers are copied by hand and the body is re-assembled from its byte stream. The body is read
/// through a length limiter set to `size_limit`, or `usize::MAX` when no limit is given.
///
/// # Errors
///
/// Returns a generic [`TransportError`] if reading the body fails, which includes exceeding the
/// limit.
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Status and headers must be captured before the response is consumed for its body.
    let status = resp.status();
    let headers = resp.headers().clone();

    let max_len = size_limit.unwrap_or(usize::MAX);
    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let limited = Limited::new(StreamBody::new(frames), max_len);
    let collected = match http_body_util::BodyExt::collect(limited).await {
        Ok(collected) => collected,
        Err(e) => {
            return Err(AgentError::TransportError(TransportError::Generic(
                format!("unable to read response body: {e:#}"),
            )));
        }
    };

    let mut converted = http::Response::new(collected.to_bytes());
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2085
2086#[cfg(not(target_family = "wasm"))]
2087#[async_trait]
2088impl<T> HttpService for T
2089where
2090    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2091    for<'a> <&'a Self as Service<Request>>::Future: Send,
2092    T: Send + Sync + Debug + ?Sized,
2093{
2094    #[allow(clippy::needless_arbitrary_self_type)]
2095    async fn call<'a>(
2096        mut self: &'a Self,
2097        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2098        max_retries: usize,
2099        size_limit: Option<usize>,
2100    ) -> Result<http::Response<Bytes>, AgentError> {
2101        let mut retry_count = 0;
2102        loop {
2103            let request = from_http_request(req()?)?;
2104
2105            match Service::call(&mut self, request).await {
2106                Err(err) => {
2107                    // Network-related errors can be retried.
2108                    if err.is_connect() {
2109                        if retry_count >= max_retries {
2110                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2111                        }
2112                        retry_count += 1;
2113                    }
2114                }
2115
2116                Ok(resp) => {
2117                    let resp = to_http_response(resp, size_limit).await?;
2118                    return Ok(resp);
2119                }
2120            }
2121        }
2122    }
2123}
2124
/// Blanket [`HttpService`] implementation for any type whose shared reference is a
/// `reqwest`-compatible `tower` [`Service`] (wasm targets).
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    /// Sends the request produced by `req` exactly once; the retry count is ignored on wasm.
    /// The response body is read subject to `size_limit`.
    // `mut self: &'a Self` is required so that `&mut self` can be handed to `Service::call`.
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;
        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2147
/// [`HttpService`] wrapper around a `reqwest` [`Client`] that re-sends requests answered with
/// HTTP 429 (Too Many Requests).
#[derive(Debug)]
struct Retry429Logic {
    /// The underlying client used to perform each attempt.
    client: Client,
}
2152
2153#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2154#[cfg_attr(not(target_family = "wasm"), async_trait)]
2155impl HttpService for Retry429Logic {
2156    async fn call<'a>(
2157        &'a self,
2158        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2159        _max_tcp_retries: usize,
2160        _size_limit: Option<usize>,
2161    ) -> Result<http::Response<Bytes>, AgentError> {
2162        let mut retries = 0;
2163        loop {
2164            #[cfg(not(target_family = "wasm"))]
2165            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2166            // Client inconveniently does not implement Service on wasm
2167            #[cfg(target_family = "wasm")]
2168            let resp = {
2169                let request = from_http_request(req()?)?;
2170                let resp = self
2171                    .client
2172                    .execute(request)
2173                    .await
2174                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2175
2176                to_http_response(resp, _size_limit).await?
2177            };
2178
2179            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2180                if retries == 6 {
2181                    break Ok(resp);
2182                } else {
2183                    retries += 1;
2184                    crate::util::sleep(Duration::from_millis(250)).await;
2185                    continue;
2186                }
2187            } else {
2188                break Ok(resp);
2189            }
2190        }
2191    }
2192}
2193
/// Tests that need no real replica: they either never send a request or talk only to a
/// listener bound on the loopback interface.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Any tests that involve the network should go in agent_test, not here.

    /// Signs six updates back to back and counts how often `ingress_expiry` strictly increases
    /// (the first value always counts once). Presumably guards the agent's rounding of expiry
    /// timestamps — confirm against the expiry computation.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // `None < Some(_)`, so the first iteration always records a timestamp.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        // in six requests, there should be no more than two timestamps
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    /// With `with_max_concurrent_requests(2)`, three simultaneous queries against a server that
    /// never answers must result in exactly two accepted connections after 250 ms.
    #[tokio::test]
    async fn client_ratelimit() {
        // Port 0 lets the OS pick a free port for the mock server.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        // Accept loop: count every incoming connection and keep it open without replying.
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        // read all data, never reply
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Fire three queries concurrently; none of them can complete.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned queries time to connect before inspecting the counter.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}