Skip to main content

ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28    RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::{RangeInclusiveMap, StepFns};
32use reqwest::{Client, Request, Response};
33use route_provider::{
34    dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35    RouteProvider, UrlUntilReady,
36};
37pub use subnet::Subnet;
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45    agent::response_authentication::{
46        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47        lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48        lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    convert::TryFrom,
68    fmt::{self, Debug},
69    future::{Future, IntoFuture},
70    pin::Pin,
71    str::FromStr,
72    sync::{Arc, Mutex, RwLock},
73    task::{Context, Poll},
74    time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
81const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89/// A low level Agent to make calls to a Replica endpoint.
90///
91#[cfg_attr(unix, doc = " ```rust")] // pocket-ic
92#[cfg_attr(not(unix), doc = " ```ignore")]
93/// use ic_agent::{Agent, export::Principal};
94/// use candid::{Encode, Decode, CandidType, Nat};
95/// use serde::Deserialize;
96///
97/// #[derive(CandidType)]
98/// struct Argument {
99///   amount: Option<Nat>,
100/// }
101///
102/// #[derive(CandidType, Deserialize)]
103/// struct CreateCanisterResult {
104///   canister_id: Principal,
105/// }
106///
107/// # fn create_identity() -> impl ic_agent::Identity {
108/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
109/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
110/// # }
111/// #
112/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
113/// # Ok(ref_tests::utils::with_pic(async move |pic| {
114/// # let url = ref_tests::utils::get_pic_url(&pic);
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID, and wait for a result.
128///   // This API only works in local instances; mainnet instances must use the cycles ledger.
129///   // See `dfx info default-effective-canister-id`.
130/// # let effective_canister_id = ref_tests::utils::get_effective_canister_id(&pic).await;
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// # }).await)
140/// }
141///
142/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
143/// # runtime.block_on(async {
144/// let canister_id = create_a_canister().await.unwrap();
145/// eprintln!("{}", canister_id);
146/// # });
147/// ```
148///
149/// This agent does not understand Candid, and only acts on byte buffers.
150///
151/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
152/// you receive from a method has already been verified and you do not need to manually verify it.
153#[derive(Clone)]
154pub struct Agent {
155    nonce_factory: Arc<dyn NonceGenerator>,
156    identity: Arc<dyn Identity>,
157    ingress_expiry: Duration,
158    root_key: Arc<RwLock<Vec<u8>>>,
159    client: Arc<dyn HttpService>,
160    route_provider: Arc<dyn RouteProvider>,
161    subnet_key_cache: Arc<Mutex<SubnetCache>>,
162    concurrent_requests_semaphore: Arc<Semaphore>,
163    verify_query_signatures: bool,
164    max_response_body_size: Option<usize>,
165    max_polling_time: Duration,
166    #[allow(dead_code)]
167    max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172        f.debug_struct("Agent")
173            .field("ingress_expiry", &self.ingress_expiry)
174            .finish_non_exhaustive()
175    }
176}
177
178impl Agent {
179    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
180    /// using the [`AgentConfig`] and [`Agent::new()`].
181    pub fn builder() -> builder::AgentBuilder {
182        Default::default()
183    }
184
185    /// Create an instance of an [`Agent`].
186    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187        let client = config.http_service.unwrap_or_else(|| {
188            Arc::new(Retry429Logic {
189                client: config.client.unwrap_or_else(|| {
190                    #[cfg(not(target_family = "wasm"))]
191                    {
192                        Client::builder()
193                            .use_rustls_tls()
194                            .timeout(Duration::from_secs(360))
195                            .build()
196                            .expect("Could not create HTTP client.")
197                    }
198                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199                    {
200                        Client::new()
201                    }
202                }),
203            })
204        });
205        Ok(Agent {
206            nonce_factory: config.nonce_factory,
207            identity: config.identity,
208            ingress_expiry: config.ingress_expiry,
209            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210            client: client.clone(),
211            route_provider: if let Some(route_provider) = config.route_provider {
212                route_provider
213            } else if let Some(url) = config.url {
214                if config.background_dynamic_routing {
215                    assert!(
216                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218                    );
219                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220                    UrlUntilReady::new(url, async move {
221                        let provider = DynamicRouteProviderBuilder::new(seeds, client).build();
222                        provider.start().await;
223                        provider
224                    }) as Arc<dyn RouteProvider>
225                } else {
226                    Arc::new(url)
227                }
228            } else {
229                panic!("either route_provider or url must be specified");
230            },
231            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
232            verify_query_signatures: config.verify_query_signatures,
233            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
234            max_response_body_size: config.max_response_body_size,
235            max_tcp_error_retries: config.max_tcp_error_retries,
236            max_polling_time: config.max_polling_time,
237        })
238    }
239
240    /// Set the identity provider for signing messages.
241    ///
242    /// NOTE: if you change the identity while having update calls in
243    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
244    /// messages.
245    pub fn set_identity<I>(&mut self, identity: I)
246    where
247        I: 'static + Identity,
248    {
249        self.identity = Arc::new(identity);
250    }
251    /// Set the arc identity provider for signing messages.
252    ///
253    /// NOTE: if you change the identity while having update calls in
254    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
255    /// messages.
256    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
257        self.identity = identity;
258    }
259
260    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
261    /// responses using a hard-coded public key.
262    ///
263    /// This function will instruct the agent to ask the endpoint for its public key, and use
264    /// that instead. This is required when talking to a local test instance, for example.
265    ///
266    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
267    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
268    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
269        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
270            // already fetched the root key
271            return Ok(());
272        }
273        let status = self.status().await?;
274        let Some(root_key) = status.root_key else {
275            return Err(AgentError::NoRootKeyInStatus(status));
276        };
277        self.set_root_key(root_key);
278        Ok(())
279    }
280
281    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
282    /// responses using a hard-coded public key.
283    ///
284    /// Using this function you can set the root key to a known one if you know if beforehand.
285    pub fn set_root_key(&self, root_key: Vec<u8>) {
286        *self.root_key.write().unwrap() = root_key;
287    }
288
289    /// Return the root key currently in use.
290    pub fn read_root_key(&self) -> Vec<u8> {
291        self.root_key.read().unwrap().clone()
292    }
293
294    fn get_expiry_date(&self) -> u64 {
295        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
296        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
297        if self.ingress_expiry.as_secs() > 90 {
298            rounded = rounded.replace_second(0).unwrap();
299        }
300        rounded.unix_timestamp_nanos().try_into().unwrap()
301    }
302
303    /// Return the principal of the identity.
304    pub fn get_principal(&self) -> Result<Principal, String> {
305        self.identity.sender()
306    }
307
308    async fn query_endpoint<A>(
309        &self,
310        effective_canister_id: Principal,
311        serialized_bytes: Vec<u8>,
312    ) -> Result<A, AgentError>
313    where
314        A: serde::de::DeserializeOwned,
315    {
316        let _permit = self.concurrent_requests_semaphore.acquire().await;
317        let bytes = self
318            .execute(
319                Method::POST,
320                &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
321                Some(serialized_bytes),
322            )
323            .await?
324            .1;
325        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
326    }
327
328    async fn read_state_endpoint<A>(
329        &self,
330        effective_canister_id: Principal,
331        serialized_bytes: Vec<u8>,
332    ) -> Result<A, AgentError>
333    where
334        A: serde::de::DeserializeOwned,
335    {
336        let _permit = self.concurrent_requests_semaphore.acquire().await;
337        let endpoint = format!(
338            "api/v3/canister/{}/read_state",
339            effective_canister_id.to_text()
340        );
341        let bytes = self
342            .execute(Method::POST, &endpoint, Some(serialized_bytes))
343            .await?
344            .1;
345        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
346    }
347
348    async fn read_subnet_state_endpoint<A>(
349        &self,
350        subnet_id: Principal,
351        serialized_bytes: Vec<u8>,
352    ) -> Result<A, AgentError>
353    where
354        A: serde::de::DeserializeOwned,
355    {
356        let _permit = self.concurrent_requests_semaphore.acquire().await;
357        let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
358        let bytes = self
359            .execute(Method::POST, &endpoint, Some(serialized_bytes))
360            .await?
361            .1;
362        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
363    }
364
365    async fn call_endpoint(
366        &self,
367        effective_canister_id: Principal,
368        serialized_bytes: Vec<u8>,
369    ) -> Result<TransportCallResponse, AgentError> {
370        let _permit = self.concurrent_requests_semaphore.acquire().await;
371        let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
372        let (status_code, response_body) = self
373            .execute(Method::POST, &endpoint, Some(serialized_bytes))
374            .await?;
375
376        if status_code == StatusCode::ACCEPTED {
377            return Ok(TransportCallResponse::Accepted);
378        }
379
380        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
381    }
382
383    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
384    /// The encoding is left as an exercise to the user.
385    #[allow(clippy::too_many_arguments)]
386    async fn query_raw(
387        &self,
388        canister_id: Principal,
389        effective_canister_id: Principal,
390        method_name: String,
391        arg: Vec<u8>,
392        ingress_expiry_datetime: Option<u64>,
393        use_nonce: bool,
394        explicit_verify_query_signatures: Option<bool>,
395    ) -> Result<Vec<u8>, AgentError> {
396        let operation = Operation::Call {
397            canister: canister_id,
398            method: method_name.clone(),
399        };
400        let content = self.query_content(
401            canister_id,
402            method_name,
403            arg,
404            ingress_expiry_datetime,
405            use_nonce,
406        )?;
407        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
408        self.query_inner(
409            effective_canister_id,
410            serialized_bytes,
411            content.to_request_id(),
412            explicit_verify_query_signatures,
413            operation,
414        )
415        .await
416    }
417
418    /// Send the signed query to the network. Will return a byte vector.
419    /// The bytes will be checked if it is a valid query.
420    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
421    pub async fn query_signed(
422        &self,
423        effective_canister_id: Principal,
424        signed_query: Vec<u8>,
425    ) -> Result<Vec<u8>, AgentError> {
426        let envelope: Envelope =
427            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
428        let EnvelopeContent::Query {
429            canister_id,
430            method_name,
431            ..
432        } = &*envelope.content
433        else {
434            return Err(AgentError::CallDataMismatch {
435                field: "request_type".to_string(),
436                value_arg: "query".to_string(),
437                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
438                    "update"
439                } else {
440                    "read_state"
441                }
442                .to_string(),
443            });
444        };
445        let operation = Operation::Call {
446            canister: *canister_id,
447            method: method_name.clone(),
448        };
449        self.query_inner(
450            effective_canister_id,
451            signed_query,
452            envelope.content.to_request_id(),
453            None,
454            operation,
455        )
456        .await
457    }
458
459    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
460    ///
461    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
462    async fn query_inner(
463        &self,
464        effective_canister_id: Principal,
465        signed_query: Vec<u8>,
466        request_id: RequestId,
467        explicit_verify_query_signatures: Option<bool>,
468        operation: Operation,
469    ) -> Result<Vec<u8>, AgentError> {
470        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
471            let (response, mut subnet) = futures_util::try_join!(
472                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
473                self.get_subnet_by_canister(&effective_canister_id)
474            )?;
475            if response.signatures().is_empty() {
476                return Err(AgentError::MissingSignature);
477            } else if response.signatures().len() > subnet.node_keys.len() {
478                return Err(AgentError::TooManySignatures {
479                    had: response.signatures().len(),
480                    needed: subnet.node_keys.len(),
481                });
482            }
483            for signature in response.signatures() {
484                if OffsetDateTime::now_utc()
485                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
486                    > self.ingress_expiry
487                {
488                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
489                }
490                let signable = response.signable(request_id, signature.timestamp);
491                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
492                    node_key
493                } else {
494                    subnet = self
495                        .fetch_subnet_by_canister(&effective_canister_id)
496                        .await?;
497                    subnet
498                        .node_keys
499                        .get(&signature.identity)
500                        .ok_or(AgentError::CertificateNotAuthorized())?
501                };
502                if node_key.len() != 44 {
503                    return Err(AgentError::DerKeyLengthMismatch {
504                        expected: 44,
505                        actual: node_key.len(),
506                    });
507                }
508                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
509                if node_key[..12] != DER_PREFIX {
510                    return Err(AgentError::DerPrefixMismatch {
511                        expected: DER_PREFIX.to_vec(),
512                        actual: node_key[..12].to_vec(),
513                    });
514                }
515                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
516                    .map_err(|_| AgentError::MalformedPublicKey)?;
517
518                match pubkey.verify_signature(&signable, &signature.signature[..]) {
519                    Ok(()) => (),
520                    Err(SignatureError::InvalidSignature) => {
521                        return Err(AgentError::QuerySignatureVerificationFailed)
522                    }
523                    Err(SignatureError::InvalidLength) => {
524                        return Err(AgentError::MalformedSignature)
525                    }
526                    _ => unreachable!(),
527                }
528            }
529            response
530        } else {
531            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
532                .await?
533        };
534
535        match response {
536            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
537            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
538                reject,
539                operation: Some(operation),
540            }),
541        }
542    }
543
544    fn query_content(
545        &self,
546        canister_id: Principal,
547        method_name: String,
548        arg: Vec<u8>,
549        ingress_expiry_datetime: Option<u64>,
550        use_nonce: bool,
551    ) -> Result<EnvelopeContent, AgentError> {
552        Ok(EnvelopeContent::Query {
553            sender: self.identity.sender().map_err(AgentError::SigningError)?,
554            canister_id,
555            method_name,
556            arg,
557            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
558            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
559        })
560    }
561
562    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
563    async fn update_raw(
564        &self,
565        canister_id: Principal,
566        effective_canister_id: Principal,
567        method_name: String,
568        arg: Vec<u8>,
569        ingress_expiry_datetime: Option<u64>,
570    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
571        let nonce = self.nonce_factory.generate();
572        let content = self.update_content(
573            canister_id,
574            method_name.clone(),
575            arg,
576            ingress_expiry_datetime,
577            nonce,
578        )?;
579        let operation = Some(Operation::Call {
580            canister: canister_id,
581            method: method_name,
582        });
583        let request_id = to_request_id(&content)?;
584        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
585
586        let response_body = self
587            .call_endpoint(effective_canister_id, serialized_bytes)
588            .await?;
589
590        match response_body {
591            TransportCallResponse::Replied { certificate } => {
592                let certificate =
593                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
594
595                self.verify(&certificate, effective_canister_id)?;
596                let status = lookup_request_status(&certificate, &request_id)?;
597
598                match status {
599                    RequestStatusResponse::Replied(reply) => {
600                        Ok(CallResponse::Response((reply.arg, certificate)))
601                    }
602                    RequestStatusResponse::Rejected(reject_response) => {
603                        Err(AgentError::CertifiedReject {
604                            reject: reject_response,
605                            operation,
606                        })?
607                    }
608                    _ => Ok(CallResponse::Poll(request_id)),
609                }
610            }
611            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
612            TransportCallResponse::NonReplicatedRejection(reject_response) => {
613                Err(AgentError::UncertifiedReject {
614                    reject: reject_response,
615                    operation,
616                })
617            }
618        }
619    }
620
621    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
622    /// The bytes will be checked to verify that it is a valid update.
623    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
624    pub async fn update_signed(
625        &self,
626        effective_canister_id: Principal,
627        signed_update: Vec<u8>,
628    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
629        let envelope: Envelope =
630            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
631        let EnvelopeContent::Call {
632            canister_id,
633            method_name,
634            ..
635        } = &*envelope.content
636        else {
637            return Err(AgentError::CallDataMismatch {
638                field: "request_type".to_string(),
639                value_arg: "update".to_string(),
640                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
641                    "query"
642                } else {
643                    "read_state"
644                }
645                .to_string(),
646            });
647        };
648        let operation = Some(Operation::Call {
649            canister: *canister_id,
650            method: method_name.clone(),
651        });
652        let request_id = to_request_id(&envelope.content)?;
653
654        let response_body = self
655            .call_endpoint(effective_canister_id, signed_update)
656            .await?;
657
658        match response_body {
659            TransportCallResponse::Replied { certificate } => {
660                let certificate =
661                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
662
663                self.verify(&certificate, effective_canister_id)?;
664                let status = lookup_request_status(&certificate, &request_id)?;
665
666                match status {
667                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
668                    RequestStatusResponse::Rejected(reject_response) => {
669                        Err(AgentError::CertifiedReject {
670                            reject: reject_response,
671                            operation,
672                        })?
673                    }
674                    _ => Ok(CallResponse::Poll(request_id)),
675                }
676            }
677            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
678            TransportCallResponse::NonReplicatedRejection(reject_response) => {
679                Err(AgentError::UncertifiedReject {
680                    reject: reject_response,
681                    operation,
682                })
683            }
684        }
685    }
686
687    fn update_content(
688        &self,
689        canister_id: Principal,
690        method_name: String,
691        arg: Vec<u8>,
692        ingress_expiry_datetime: Option<u64>,
693        nonce: Option<Vec<u8>>,
694    ) -> Result<EnvelopeContent, AgentError> {
695        Ok(EnvelopeContent::Call {
696            canister_id,
697            method_name,
698            arg,
699            nonce,
700            sender: self.identity.sender().map_err(AgentError::SigningError)?,
701            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
702        })
703    }
704
705    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
706        ExponentialBackoffBuilder::new()
707            .with_initial_interval(Duration::from_millis(500))
708            .with_max_interval(Duration::from_secs(1))
709            .with_multiplier(1.4)
710            .with_max_elapsed_time(Some(self.max_polling_time))
711            .build()
712    }
713
714    /// Wait for `request_status` to return a Replied response and return the arg.
715    pub async fn wait_signed(
716        &self,
717        request_id: &RequestId,
718        effective_canister_id: Principal,
719        signed_request_status: Vec<u8>,
720    ) -> Result<(Vec<u8>, Certificate), AgentError> {
721        let mut retry_policy = self.get_retry_policy();
722
723        let mut request_accepted = false;
724        loop {
725            let (resp, cert) = self
726                .request_status_signed(
727                    request_id,
728                    effective_canister_id,
729                    signed_request_status.clone(),
730                )
731                .await?;
732            match resp {
733                RequestStatusResponse::Unknown => {
734                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
735                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
736                        return Err(AgentError::TimeoutWaitingForResponse());
737                    }
738                }
739
740                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
741                    if !request_accepted {
742                        retry_policy.reset();
743                        request_accepted = true;
744                    }
745                }
746
747                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
748                    return Ok((arg, cert))
749                }
750
751                RequestStatusResponse::Rejected(response) => {
752                    return Err(AgentError::CertifiedReject {
753                        reject: response,
754                        operation: None,
755                    })
756                }
757
758                RequestStatusResponse::Done => {
759                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
760                        *request_id,
761                    )))
762                }
763            };
764
765            match retry_policy.next_backoff() {
766                Some(duration) => crate::util::sleep(duration).await,
767
768                None => return Err(AgentError::TimeoutWaitingForResponse()),
769            }
770        }
771    }
772
773    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
774    pub async fn wait(
775        &self,
776        request_id: &RequestId,
777        effective_canister_id: Principal,
778    ) -> Result<(Vec<u8>, Certificate), AgentError> {
779        self.wait_inner(request_id, effective_canister_id, None)
780            .await
781    }
782
783    async fn wait_inner(
784        &self,
785        request_id: &RequestId,
786        effective_canister_id: Principal,
787        operation: Option<Operation>,
788    ) -> Result<(Vec<u8>, Certificate), AgentError> {
789        let mut retry_policy = self.get_retry_policy();
790
791        let mut request_accepted = false;
792        loop {
793            let (resp, cert) = self
794                .request_status_raw(request_id, effective_canister_id)
795                .await?;
796            match resp {
797                RequestStatusResponse::Unknown => {
798                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
799                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
800                        return Err(AgentError::TimeoutWaitingForResponse());
801                    }
802                }
803
804                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
805                    if !request_accepted {
806                        // The system will return RequestStatusResponse::Unknown
807                        // until the request is accepted
808                        // and we generally cannot know how long that will take.
809                        // State transitions between Received and Processing may be
810                        // instantaneous. Therefore, once we know the request is accepted,
811                        // we should restart the backoff so the request does not time out.
812
813                        retry_policy.reset();
814                        request_accepted = true;
815                    }
816                }
817
818                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
819                    return Ok((arg, cert))
820                }
821
822                RequestStatusResponse::Rejected(response) => {
823                    return Err(AgentError::CertifiedReject {
824                        reject: response,
825                        operation,
826                    })
827                }
828
829                RequestStatusResponse::Done => {
830                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
831                        *request_id,
832                    )))
833                }
834            };
835
836            match retry_policy.next_backoff() {
837                Some(duration) => crate::util::sleep(duration).await,
838
839                None => return Err(AgentError::TimeoutWaitingForResponse()),
840            }
841        }
842    }
843
844    /// Request the raw state tree directly, under an effective canister ID.
845    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
846    pub async fn read_state_raw(
847        &self,
848        paths: Vec<Vec<Label>>,
849        effective_canister_id: Principal,
850    ) -> Result<Certificate, AgentError> {
851        let content = self.read_state_content(paths)?;
852        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
853
854        let read_state_response: ReadStateResponse = self
855            .read_state_endpoint(effective_canister_id, serialized_bytes)
856            .await?;
857        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
858            .map_err(AgentError::InvalidCborData)?;
859        self.verify(&cert, effective_canister_id)?;
860        Ok(cert)
861    }
862
863    /// Request the raw state tree directly, under a subnet ID.
864    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
865    pub async fn read_subnet_state_raw(
866        &self,
867        paths: Vec<Vec<Label>>,
868        subnet_id: Principal,
869    ) -> Result<Certificate, AgentError> {
870        let content = self.read_state_content(paths)?;
871        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
872
873        let read_state_response: ReadStateResponse = self
874            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
875            .await?;
876        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
877            .map_err(AgentError::InvalidCborData)?;
878        self.verify_for_subnet(&cert, subnet_id)?;
879        Ok(cert)
880    }
881
882    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
883        Ok(EnvelopeContent::ReadState {
884            sender: self.identity.sender().map_err(AgentError::SigningError)?,
885            paths,
886            ingress_expiry: self.get_expiry_date(),
887        })
888    }
889
890    /// Verify a certificate, checking delegation if present.
891    /// Only passes if the certificate also has authority over the canister.
892    pub fn verify(
893        &self,
894        cert: &Certificate,
895        effective_canister_id: Principal,
896    ) -> Result<(), AgentError> {
897        self.verify_cert(cert, effective_canister_id)?;
898        self.verify_cert_timestamp(cert)?;
899        Ok(())
900    }
901
902    fn verify_cert(
903        &self,
904        cert: &Certificate,
905        effective_canister_id: Principal,
906    ) -> Result<(), AgentError> {
907        let sig = &cert.signature;
908
909        let root_hash = cert.tree.digest();
910        let mut msg = vec![];
911        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
912        msg.extend_from_slice(&root_hash);
913
914        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
915        let key = extract_der(der_key)?;
916
917        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
918            .map_err(|_| AgentError::CertificateVerificationFailed())?;
919        Ok(())
920    }
921
922    /// Verify a certificate, checking delegation if present.
923    /// Only passes if the certificate is for the specified subnet.
924    pub fn verify_for_subnet(
925        &self,
926        cert: &Certificate,
927        subnet_id: Principal,
928    ) -> Result<(), AgentError> {
929        self.verify_cert_for_subnet(cert, subnet_id)?;
930        self.verify_cert_timestamp(cert)?;
931        Ok(())
932    }
933
934    fn verify_cert_for_subnet(
935        &self,
936        cert: &Certificate,
937        subnet_id: Principal,
938    ) -> Result<(), AgentError> {
939        let sig = &cert.signature;
940
941        let root_hash = cert.tree.digest();
942        let mut msg = vec![];
943        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
944        msg.extend_from_slice(&root_hash);
945
946        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
947        let key = extract_der(der_key)?;
948
949        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
950            .map_err(|_| AgentError::CertificateVerificationFailed())?;
951        Ok(())
952    }
953
954    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
955        // Verify that the certificate is not older than ingress expiry
956        // Certificates with timestamps in the future are allowed
957        let time = lookup_time(cert)?;
958        if (OffsetDateTime::now_utc()
959            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
960            > self.ingress_expiry
961        {
962            Err(AgentError::CertificateOutdated(self.ingress_expiry))
963        } else {
964            Ok(())
965        }
966    }
967
968    fn check_delegation(
969        &self,
970        delegation: &Option<Delegation>,
971        effective_canister_id: Principal,
972    ) -> Result<Vec<u8>, AgentError> {
973        match delegation {
974            None => Ok(self.read_root_key()),
975            Some(delegation) => {
976                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977                    .map_err(AgentError::InvalidCborData)?;
978                if cert.delegation.is_some() {
979                    return Err(AgentError::CertificateHasTooManyDelegations);
980                }
981                self.verify_cert(&cert, effective_canister_id)?;
982                let canister_range_shards_lookup =
983                    ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
984                let canister_range_shards = lookup_tree(&cert.tree, canister_range_shards_lookup)?;
985                let mut shard_paths = canister_range_shards
986                    .list_paths() // /canister_ranges/<subnet_id>/<shard>
987                    .into_iter()
988                    .map(|mut x| {
989                        x.pop() // flatten [label] to label
990                            .ok_or_else(AgentError::CertificateVerificationFailed)
991                    })
992                    .collect::<Result<Vec<_>, _>>()?;
993                if shard_paths.is_empty() {
994                    return Err(AgentError::CertificateNotAuthorized());
995                }
996                shard_paths.sort_unstable();
997                let shard_division = shard_paths
998                    .partition_point(|shard| shard.as_bytes() <= effective_canister_id.as_slice());
999                if shard_division == 0 {
1000                    // the certificate is not authorized to answer calls for this canister
1001                    return Err(AgentError::CertificateNotAuthorized());
1002                }
1003                let max_potential_shard = &shard_paths[shard_division - 1];
1004                let canister_range_lookup = [max_potential_shard.as_bytes()];
1005                let canister_range = lookup_value(&canister_range_shards, canister_range_lookup)?;
1006                let ranges: Vec<(Principal, Principal)> =
1007                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
1008                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1009                    // the certificate is not authorized to answer calls for this canister
1010                    return Err(AgentError::CertificateNotAuthorized());
1011                }
1012
1013                let public_key_path = [
1014                    "subnet".as_bytes(),
1015                    delegation.subnet_id.as_ref(),
1016                    "public_key".as_bytes(),
1017                ];
1018                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1019            }
1020        }
1021    }
1022
1023    fn check_delegation_for_subnet(
1024        &self,
1025        delegation: &Option<Delegation>,
1026        subnet_id: Principal,
1027    ) -> Result<Vec<u8>, AgentError> {
1028        match delegation {
1029            None => Ok(self.read_root_key()),
1030            Some(delegation) => {
1031                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1032                    .map_err(AgentError::InvalidCborData)?;
1033                if cert.delegation.is_some() {
1034                    return Err(AgentError::CertificateHasTooManyDelegations);
1035                }
1036                self.verify_cert_for_subnet(&cert, subnet_id)?;
1037                let public_key_path = [
1038                    "subnet".as_bytes(),
1039                    subnet_id.as_ref(),
1040                    "public_key".as_bytes(),
1041                ];
1042                let pk = lookup_value(&cert.tree, public_key_path)
1043                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1044                    .to_vec();
1045                Ok(pk)
1046            }
1047        }
1048    }
1049
1050    /// Request information about a particular canister for a single state subkey.
1051    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1052    pub async fn read_state_canister_info(
1053        &self,
1054        canister_id: Principal,
1055        path: &str,
1056    ) -> Result<Vec<u8>, AgentError> {
1057        let paths: Vec<Vec<Label>> = vec![vec![
1058            "canister".into(),
1059            Label::from_bytes(canister_id.as_slice()),
1060            path.into(),
1061        ]];
1062
1063        let cert = self.read_state_raw(paths, canister_id).await?;
1064
1065        lookup_canister_info(cert, canister_id, path)
1066    }
1067
1068    /// Request the controller list of a given canister.
1069    pub async fn read_state_canister_controllers(
1070        &self,
1071        canister_id: Principal,
1072    ) -> Result<Vec<Principal>, AgentError> {
1073        let blob = self
1074            .read_state_canister_info(canister_id, "controllers")
1075            .await?;
1076        let controllers: Vec<Principal> =
1077            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1078        Ok(controllers)
1079    }
1080
1081    /// Request the module hash of a given canister.
1082    pub async fn read_state_canister_module_hash(
1083        &self,
1084        canister_id: Principal,
1085    ) -> Result<Vec<u8>, AgentError> {
1086        self.read_state_canister_info(canister_id, "module_hash")
1087            .await
1088    }
1089
1090    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1091    pub async fn read_state_canister_metadata(
1092        &self,
1093        canister_id: Principal,
1094        path: &str,
1095    ) -> Result<Vec<u8>, AgentError> {
1096        let paths: Vec<Vec<Label>> = vec![vec![
1097            "canister".into(),
1098            Label::from_bytes(canister_id.as_slice()),
1099            "metadata".into(),
1100            path.into(),
1101        ]];
1102
1103        let cert = self.read_state_raw(paths, canister_id).await?;
1104
1105        lookup_canister_metadata(cert, canister_id, path)
1106    }
1107
1108    /// Request a list of metrics about the subnet.
1109    pub async fn read_state_subnet_metrics(
1110        &self,
1111        subnet_id: Principal,
1112    ) -> Result<SubnetMetrics, AgentError> {
1113        let paths = vec![vec![
1114            "subnet".into(),
1115            Label::from_bytes(subnet_id.as_slice()),
1116            "metrics".into(),
1117        ]];
1118        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1119        lookup_subnet_metrics(cert, subnet_id)
1120    }
1121
1122    /// Request a list of metrics about the subnet.
1123    pub async fn read_state_subnet_canister_ranges(
1124        &self,
1125        subnet_id: Principal,
1126    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1127        let paths = vec![vec![
1128            "subnet".into(),
1129            Label::from_bytes(subnet_id.as_slice()),
1130            "canister_ranges".into(),
1131        ]];
1132        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1133        lookup_subnet_canister_ranges(&cert, subnet_id)
1134    }
1135
1136    /// Fetches the status of a particular request by its ID.
1137    pub async fn request_status_raw(
1138        &self,
1139        request_id: &RequestId,
1140        effective_canister_id: Principal,
1141    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1142        let paths: Vec<Vec<Label>> =
1143            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1144
1145        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1146
1147        Ok((lookup_request_status(&cert, request_id)?, cert))
1148    }
1149
1150    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1151    /// The bytes will be checked to verify that it is a valid `request_status`.
1152    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1153    pub async fn request_status_signed(
1154        &self,
1155        request_id: &RequestId,
1156        effective_canister_id: Principal,
1157        signed_request_status: Vec<u8>,
1158    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1159        let _envelope: Envelope =
1160            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1161        let read_state_response: ReadStateResponse = self
1162            .read_state_endpoint(effective_canister_id, signed_request_status)
1163            .await?;
1164
1165        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1166            .map_err(AgentError::InvalidCborData)?;
1167        self.verify(&cert, effective_canister_id)?;
1168        Ok((lookup_request_status(&cert, request_id)?, cert))
1169    }
1170
1171    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1172    /// passing all arguments.
1173    pub fn update<S: Into<String>>(
1174        &self,
1175        canister_id: &Principal,
1176        method_name: S,
1177    ) -> UpdateBuilder<'_> {
1178        UpdateBuilder::new(self, *canister_id, method_name.into())
1179    }
1180
1181    /// Calls and returns the information returned by the status endpoint of a replica.
1182    pub async fn status(&self) -> Result<Status, AgentError> {
1183        let endpoint = "api/v2/status";
1184        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1185
1186        let cbor: serde_cbor::Value =
1187            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1188
1189        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1190    }
1191
1192    /// Returns a `QueryBuilder` enabling the construction of a query call without
1193    /// passing all arguments.
1194    pub fn query<S: Into<String>>(
1195        &self,
1196        canister_id: &Principal,
1197        method_name: S,
1198    ) -> QueryBuilder<'_> {
1199        QueryBuilder::new(self, *canister_id, method_name.into())
1200    }
1201
1202    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1203    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1204    pub fn sign_request_status(
1205        &self,
1206        effective_canister_id: Principal,
1207        request_id: RequestId,
1208    ) -> Result<SignedRequestStatus, AgentError> {
1209        let paths: Vec<Vec<Label>> =
1210            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1211        let read_state_content = self.read_state_content(paths)?;
1212        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1213        let ingress_expiry = read_state_content.ingress_expiry();
1214        let sender = *read_state_content.sender();
1215        Ok(SignedRequestStatus {
1216            ingress_expiry,
1217            sender,
1218            effective_canister_id,
1219            request_id,
1220            signed_request_status,
1221        })
1222    }
1223
1224    /// Retrieve subnet information for a canister. This uses an internal five-minute cache, fresh data can
1225    /// be fetched with [`fetch_subnet_by_canister`](Self::fetch_subnet_by_canister).
1226    pub async fn get_subnet_by_canister(
1227        &self,
1228        canister: &Principal,
1229    ) -> Result<Arc<Subnet>, AgentError> {
1230        let subnet = self
1231            .subnet_key_cache
1232            .lock()
1233            .unwrap()
1234            .get_subnet_by_canister(canister);
1235        if let Some(subnet) = subnet {
1236            Ok(subnet)
1237        } else {
1238            self.fetch_subnet_by_canister(canister).await
1239        }
1240    }
1241
1242    /// Retrieve subnet information for a subnet ID. This uses an internal five-minute cache, fresh data can
1243    /// be fetched with [`fetch_subnet_by_id`](Self::fetch_subnet_by_id).
1244    pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1245        let subnet = self
1246            .subnet_key_cache
1247            .lock()
1248            .unwrap()
1249            .get_subnet_by_id(subnet_id);
1250        if let Some(subnet) = subnet {
1251            Ok(subnet)
1252        } else {
1253            self.fetch_subnet_by_id(subnet_id).await
1254        }
1255    }
1256
1257    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/canister/<effective_canister_id>/read_state`
1258    pub async fn fetch_api_boundary_nodes_by_canister_id(
1259        &self,
1260        canister_id: Principal,
1261    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1262        let paths = vec![vec!["api_boundary_nodes".into()]];
1263        let certificate = self.read_state_raw(paths, canister_id).await?;
1264        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1265        Ok(api_boundary_nodes)
1266    }
1267
1268    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/subnet/<subnet_id>/read_state`
1269    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1270        &self,
1271        subnet_id: Principal,
1272    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1273        let paths = vec![vec!["api_boundary_nodes".into()]];
1274        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1275        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1276        Ok(api_boundary_nodes)
1277    }
1278
1279    /// Fetches and caches the subnet information for a canister.
1280    ///
1281    /// This function does not read from the cache; most users want
1282    /// [`get_subnet_by_canister`](Self::get_subnet_by_canister) instead.
1283    pub async fn fetch_subnet_by_canister(
1284        &self,
1285        canister: &Principal,
1286    ) -> Result<Arc<Subnet>, AgentError> {
1287        let canister_cert = self
1288            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1289            .await?;
1290        let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1291            Principal::from_slice(&delegation.subnet_id)
1292        } else {
1293            // if no delegation, it comes from the root subnet
1294            Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1295        };
1296        let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1297        let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1298            // non-root subnets will not serve /subnet/<>/canister_ranges when looked up by canister, but their delegation will contain /canister_ranges
1299            let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1300            lookup_canister_ranges(&subnet_id, &delegation_cert)?
1301        } else {
1302            lookup_canister_ranges(&subnet_id, &canister_cert)?
1303        };
1304        subnet.canister_ranges = canister_ranges;
1305        if !subnet.canister_ranges.contains(canister) {
1306            return Err(AgentError::CertificateNotAuthorized());
1307        }
1308        let subnet = Arc::new(subnet);
1309        self.subnet_key_cache
1310            .lock()
1311            .unwrap()
1312            .insert_subnet(subnet_id, subnet.clone());
1313        Ok(subnet)
1314    }
1315
1316    /// Fetches and caches the subnet information for a subnet ID.
1317    ///
1318    /// This function does not read from the cache; most users want
1319    /// [`get_subnet_by_id`](Self::get_subnet_by_id) instead.
1320    pub async fn fetch_subnet_by_id(
1321        &self,
1322        subnet_id: &Principal,
1323    ) -> Result<Arc<Subnet>, AgentError> {
1324        let subnet_cert = self
1325            .read_subnet_state_raw(
1326                vec![
1327                    vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1328                    vec!["subnet".into(), subnet_id.as_slice().into()],
1329                ],
1330                *subnet_id,
1331            )
1332            .await?;
1333        let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1334        let subnet = Arc::new(subnet);
1335        self.subnet_key_cache
1336            .lock()
1337            .unwrap()
1338            .insert_subnet(*subnet_id, subnet.clone());
1339        Ok(subnet)
1340    }
1341
1342    async fn request(
1343        &self,
1344        method: Method,
1345        endpoint: &str,
1346        body: Option<Vec<u8>>,
1347    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1348        let body = body.map(Bytes::from);
1349
1350        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1351            let url = self.route_provider.route()?.join(endpoint)?;
1352            let uri = Uri::from_str(url.as_str())
1353                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1354            let body = body.clone().unwrap_or_default();
1355            let request = http::Request::builder()
1356                .method(method.clone())
1357                .uri(uri)
1358                .header(CONTENT_TYPE, "application/cbor")
1359                .body(body)
1360                .map_err(|e| {
1361                    AgentError::TransportError(TransportError::Generic(format!(
1362                        "unable to create request: {e:#}"
1363                    )))
1364                })?;
1365
1366            Ok(request)
1367        };
1368
1369        let response = self
1370            .client
1371            .call(
1372                &create_request_with_generated_url,
1373                self.max_tcp_error_retries,
1374                self.max_response_body_size,
1375            )
1376            .await?;
1377
1378        let (parts, body) = response.into_parts();
1379
1380        Ok((parts.status, parts.headers, body.to_vec()))
1381    }
1382
1383    async fn execute(
1384        &self,
1385        method: Method,
1386        endpoint: &str,
1387        body: Option<Vec<u8>>,
1388    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1389        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1390
1391        let status = request_result.0;
1392        let headers = request_result.1;
1393        let body = request_result.2;
1394
1395        if status.is_client_error() || status.is_server_error() {
1396            Err(AgentError::HttpError(HttpErrorPayload {
1397                status: status.into(),
1398                content_type: headers
1399                    .get(CONTENT_TYPE)
1400                    .and_then(|value| value.to_str().ok())
1401                    .map(str::to_string),
1402                content: body,
1403            }))
1404        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1405            Err(AgentError::InvalidHttpResponse(format!(
1406                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1407            )))
1408        } else {
1409            Ok((status, body))
1410        }
1411    }
1412}
1413
1414// Checks if a principal is contained within a list of principal ranges
1415// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1416fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1417    ranges
1418        .iter()
1419        .any(|r| principal >= &r.0 && principal <= &r.1)
1420}
1421
1422fn sign_envelope(
1423    content: &EnvelopeContent,
1424    identity: Arc<dyn Identity>,
1425) -> Result<Vec<u8>, AgentError> {
1426    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1427
1428    let envelope = Envelope {
1429        content: Cow::Borrowed(content),
1430        sender_pubkey: signature.public_key,
1431        sender_sig: signature.signature,
1432        sender_delegation: signature.delegations,
1433    };
1434
1435    let mut serialized_bytes = Vec::new();
1436    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1437    serializer.self_describe()?;
1438    envelope.serialize(&mut serializer)?;
1439
1440    Ok(serialized_bytes)
1441}
1442
1443/// Inspect the bytes to be sent as a query
1444/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1445pub fn signed_query_inspect(
1446    sender: Principal,
1447    canister_id: Principal,
1448    method_name: &str,
1449    arg: &[u8],
1450    ingress_expiry: u64,
1451    signed_query: Vec<u8>,
1452) -> Result<(), AgentError> {
1453    let envelope: Envelope =
1454        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1455    match envelope.content.as_ref() {
1456        EnvelopeContent::Query {
1457            ingress_expiry: ingress_expiry_cbor,
1458            sender: sender_cbor,
1459            canister_id: canister_id_cbor,
1460            method_name: method_name_cbor,
1461            arg: arg_cbor,
1462            nonce: _nonce,
1463        } => {
1464            if ingress_expiry != *ingress_expiry_cbor {
1465                return Err(AgentError::CallDataMismatch {
1466                    field: "ingress_expiry".to_string(),
1467                    value_arg: ingress_expiry.to_string(),
1468                    value_cbor: ingress_expiry_cbor.to_string(),
1469                });
1470            }
1471            if sender != *sender_cbor {
1472                return Err(AgentError::CallDataMismatch {
1473                    field: "sender".to_string(),
1474                    value_arg: sender.to_string(),
1475                    value_cbor: sender_cbor.to_string(),
1476                });
1477            }
1478            if canister_id != *canister_id_cbor {
1479                return Err(AgentError::CallDataMismatch {
1480                    field: "canister_id".to_string(),
1481                    value_arg: canister_id.to_string(),
1482                    value_cbor: canister_id_cbor.to_string(),
1483                });
1484            }
1485            if method_name != *method_name_cbor {
1486                return Err(AgentError::CallDataMismatch {
1487                    field: "method_name".to_string(),
1488                    value_arg: method_name.to_string(),
1489                    value_cbor: method_name_cbor.clone(),
1490                });
1491            }
1492            if arg != *arg_cbor {
1493                return Err(AgentError::CallDataMismatch {
1494                    field: "arg".to_string(),
1495                    value_arg: format!("{arg:?}"),
1496                    value_cbor: format!("{arg_cbor:?}"),
1497                });
1498            }
1499        }
1500        EnvelopeContent::Call { .. } => {
1501            return Err(AgentError::CallDataMismatch {
1502                field: "request_type".to_string(),
1503                value_arg: "query".to_string(),
1504                value_cbor: "call".to_string(),
1505            })
1506        }
1507        EnvelopeContent::ReadState { .. } => {
1508            return Err(AgentError::CallDataMismatch {
1509                field: "request_type".to_string(),
1510                value_arg: "query".to_string(),
1511                value_cbor: "read_state".to_string(),
1512            })
1513        }
1514    }
1515    Ok(())
1516}
1517
1518/// Inspect the bytes to be sent as an update
1519/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1520pub fn signed_update_inspect(
1521    sender: Principal,
1522    canister_id: Principal,
1523    method_name: &str,
1524    arg: &[u8],
1525    ingress_expiry: u64,
1526    signed_update: Vec<u8>,
1527) -> Result<(), AgentError> {
1528    let envelope: Envelope =
1529        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1530    match envelope.content.as_ref() {
1531        EnvelopeContent::Call {
1532            nonce: _nonce,
1533            ingress_expiry: ingress_expiry_cbor,
1534            sender: sender_cbor,
1535            canister_id: canister_id_cbor,
1536            method_name: method_name_cbor,
1537            arg: arg_cbor,
1538        } => {
1539            if ingress_expiry != *ingress_expiry_cbor {
1540                return Err(AgentError::CallDataMismatch {
1541                    field: "ingress_expiry".to_string(),
1542                    value_arg: ingress_expiry.to_string(),
1543                    value_cbor: ingress_expiry_cbor.to_string(),
1544                });
1545            }
1546            if sender != *sender_cbor {
1547                return Err(AgentError::CallDataMismatch {
1548                    field: "sender".to_string(),
1549                    value_arg: sender.to_string(),
1550                    value_cbor: sender_cbor.to_string(),
1551                });
1552            }
1553            if canister_id != *canister_id_cbor {
1554                return Err(AgentError::CallDataMismatch {
1555                    field: "canister_id".to_string(),
1556                    value_arg: canister_id.to_string(),
1557                    value_cbor: canister_id_cbor.to_string(),
1558                });
1559            }
1560            if method_name != *method_name_cbor {
1561                return Err(AgentError::CallDataMismatch {
1562                    field: "method_name".to_string(),
1563                    value_arg: method_name.to_string(),
1564                    value_cbor: method_name_cbor.clone(),
1565                });
1566            }
1567            if arg != *arg_cbor {
1568                return Err(AgentError::CallDataMismatch {
1569                    field: "arg".to_string(),
1570                    value_arg: format!("{arg:?}"),
1571                    value_cbor: format!("{arg_cbor:?}"),
1572                });
1573            }
1574        }
1575        EnvelopeContent::ReadState { .. } => {
1576            return Err(AgentError::CallDataMismatch {
1577                field: "request_type".to_string(),
1578                value_arg: "call".to_string(),
1579                value_cbor: "read_state".to_string(),
1580            })
1581        }
1582        EnvelopeContent::Query { .. } => {
1583            return Err(AgentError::CallDataMismatch {
1584                field: "request_type".to_string(),
1585                value_arg: "call".to_string(),
1586                value_cbor: "query".to_string(),
1587            })
1588        }
1589    }
1590    Ok(())
1591}
1592
1593/// Inspect the bytes to be sent as a `request_status`
1594/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1595pub fn signed_request_status_inspect(
1596    sender: Principal,
1597    request_id: &RequestId,
1598    ingress_expiry: u64,
1599    signed_request_status: Vec<u8>,
1600) -> Result<(), AgentError> {
1601    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1602    let envelope: Envelope =
1603        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1604    match envelope.content.as_ref() {
1605        EnvelopeContent::ReadState {
1606            ingress_expiry: ingress_expiry_cbor,
1607            sender: sender_cbor,
1608            paths: paths_cbor,
1609        } => {
1610            if ingress_expiry != *ingress_expiry_cbor {
1611                return Err(AgentError::CallDataMismatch {
1612                    field: "ingress_expiry".to_string(),
1613                    value_arg: ingress_expiry.to_string(),
1614                    value_cbor: ingress_expiry_cbor.to_string(),
1615                });
1616            }
1617            if sender != *sender_cbor {
1618                return Err(AgentError::CallDataMismatch {
1619                    field: "sender".to_string(),
1620                    value_arg: sender.to_string(),
1621                    value_cbor: sender_cbor.to_string(),
1622                });
1623            }
1624
1625            if paths != *paths_cbor {
1626                return Err(AgentError::CallDataMismatch {
1627                    field: "paths".to_string(),
1628                    value_arg: format!("{paths:?}"),
1629                    value_cbor: format!("{paths_cbor:?}"),
1630                });
1631            }
1632        }
1633        EnvelopeContent::Query { .. } => {
1634            return Err(AgentError::CallDataMismatch {
1635                field: "request_type".to_string(),
1636                value_arg: "read_state".to_string(),
1637                value_cbor: "query".to_string(),
1638            })
1639        }
1640        EnvelopeContent::Call { .. } => {
1641            return Err(AgentError::CallDataMismatch {
1642                field: "request_type".to_string(),
1643                value_arg: "read_state".to_string(),
1644                value_cbor: "call".to_string(),
1645            })
1646        }
1647    }
1648    Ok(())
1649}
1650
1651#[derive(Clone)]
1652struct SubnetCache {
1653    subnets: TimedCache<Principal, Arc<Subnet>>,
1654    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1655}
1656
1657impl SubnetCache {
1658    fn new() -> Self {
1659        Self {
1660            subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1661            canister_index: RangeInclusiveMap::new_with_step_fns(),
1662        }
1663    }
1664
1665    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1666        self.canister_index
1667            .get(canister)
1668            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1669            .filter(|subnet| subnet.canister_ranges.contains(canister))
1670    }
1671
1672    fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1673        self.subnets.cache_get(subnet_id).cloned()
1674    }
1675
1676    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1677        self.subnets.cache_set(subnet_id, subnet.clone());
1678        for range in subnet.canister_ranges.iter() {
1679            self.canister_index.insert(range.clone(), subnet_id);
1680        }
1681    }
1682}
1683
1684#[derive(Clone, Copy)]
1685pub(crate) struct PrincipalStep;
1686
1687impl StepFns<Principal> for PrincipalStep {
1688    fn add_one(start: &Principal) -> Principal {
1689        let bytes = start.as_slice();
1690        let mut arr = [0; 29];
1691        arr[..bytes.len()].copy_from_slice(bytes);
1692        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1693            *byte = byte.wrapping_add(1);
1694            if *byte != 0 {
1695                break;
1696            }
1697        }
1698        Principal::from_slice(&arr[..bytes.len()])
1699    }
1700    fn sub_one(start: &Principal) -> Principal {
1701        let bytes = start.as_slice();
1702        let mut arr = [0; 29];
1703        arr[..bytes.len()].copy_from_slice(bytes);
1704        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1705            *byte = byte.wrapping_sub(1);
1706            if *byte != 255 {
1707                break;
1708            }
1709        }
1710        Principal::from_slice(&arr[..bytes.len()])
1711    }
1712}
1713
1714/// API boundary node, which routes /api calls to IC replica nodes.
1715#[derive(Debug, Clone)]
1716pub struct ApiBoundaryNode {
1717    /// Domain name
1718    pub domain: String,
1719    /// IPv6 address in the hexadecimal notation with colons.
1720    pub ipv6_address: String,
1721    /// IPv4 address in the dotted-decimal notation.
1722    pub ipv4_address: Option<String>,
1723}
1724
1725/// A query request builder.
1726///
1727/// This makes it easier to do query calls without actually passing all arguments.
1728#[derive(Debug, Clone)]
1729#[non_exhaustive]
1730pub struct QueryBuilder<'agent> {
1731    agent: &'agent Agent,
1732    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1733    pub effective_canister_id: Principal,
1734    /// The principal ID of the canister being called.
1735    pub canister_id: Principal,
1736    /// The name of the canister method being called.
1737    pub method_name: String,
1738    /// The argument blob to be passed to the method.
1739    pub arg: Vec<u8>,
1740    /// The Unix timestamp that the request will expire at.
1741    pub ingress_expiry_datetime: Option<u64>,
1742    /// Whether to include a nonce with the message.
1743    pub use_nonce: bool,
1744}
1745
1746impl<'agent> QueryBuilder<'agent> {
1747    /// Creates a new query builder with an agent for a particular canister method.
1748    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1749        Self {
1750            agent,
1751            effective_canister_id: canister_id,
1752            canister_id,
1753            method_name,
1754            arg: vec![],
1755            ingress_expiry_datetime: None,
1756            use_nonce: false,
1757        }
1758    }
1759
1760    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1761    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1762        self.effective_canister_id = canister_id;
1763        self
1764    }
1765
1766    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1767    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1768        self.arg = arg.into();
1769        self
1770    }
1771
1772    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1773    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1774        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1775        self
1776    }
1777
1778    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1779    pub fn expire_after(mut self, duration: Duration) -> Self {
1780        self.ingress_expiry_datetime = Some(
1781            OffsetDateTime::now_utc()
1782                .saturating_add(duration.try_into().expect("negative duration"))
1783                .unix_timestamp_nanos() as u64,
1784        );
1785        self
1786    }
1787
1788    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1789    /// and thus may get a (briefly) cached response.
1790    pub fn with_nonce_generation(mut self) -> Self {
1791        self.use_nonce = true;
1792        self
1793    }
1794
1795    /// Make a query call. This will return a byte vector.
1796    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1797        self.agent
1798            .query_raw(
1799                self.canister_id,
1800                self.effective_canister_id,
1801                self.method_name,
1802                self.arg,
1803                self.ingress_expiry_datetime,
1804                self.use_nonce,
1805                None,
1806            )
1807            .await
1808    }
1809
1810    /// Make a query call with signature verification. This will return a byte vector.
1811    ///
1812    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1813    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1814    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1815        self.agent
1816            .query_raw(
1817                self.canister_id,
1818                self.effective_canister_id,
1819                self.method_name,
1820                self.arg,
1821                self.ingress_expiry_datetime,
1822                self.use_nonce,
1823                Some(true),
1824            )
1825            .await
1826    }
1827
1828    /// Make a query call without signature verification. This will return a byte vector.
1829    ///
1830    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1831    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1832    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1833        self.agent
1834            .query_raw(
1835                self.canister_id,
1836                self.effective_canister_id,
1837                self.method_name,
1838                self.arg,
1839                self.ingress_expiry_datetime,
1840                self.use_nonce,
1841                Some(false),
1842            )
1843            .await
1844    }
1845
1846    /// Sign a query call. This will return a [`signed::SignedQuery`]
1847    /// which contains all fields of the query and the signed query in CBOR encoding
1848    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1849        let effective_canister_id = self.effective_canister_id;
1850        let identity = self.agent.identity.clone();
1851        let content = self.into_envelope()?;
1852        let signed_query = sign_envelope(&content, identity)?;
1853        let EnvelopeContent::Query {
1854            ingress_expiry,
1855            sender,
1856            canister_id,
1857            method_name,
1858            arg,
1859            nonce,
1860        } = content
1861        else {
1862            unreachable!()
1863        };
1864        Ok(SignedQuery {
1865            ingress_expiry,
1866            sender,
1867            canister_id,
1868            method_name,
1869            arg,
1870            effective_canister_id,
1871            signed_query,
1872            nonce,
1873        })
1874    }
1875
1876    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1877    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1878        self.agent.query_content(
1879            self.canister_id,
1880            self.method_name,
1881            self.arg,
1882            self.ingress_expiry_datetime,
1883            self.use_nonce,
1884        )
1885    }
1886}
1887
1888impl<'agent> IntoFuture for QueryBuilder<'agent> {
1889    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1890    type Output = Result<Vec<u8>, AgentError>;
1891    fn into_future(self) -> Self::IntoFuture {
1892        Box::pin(self.call())
1893    }
1894}
1895
1896/// An in-flight canister update call. Useful primarily as a `Future`.
1897pub struct UpdateCall<'agent> {
1898    agent: &'agent Agent,
1899    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1900    effective_canister_id: Principal,
1901    canister_id: Principal,
1902    method_name: String,
1903}
1904
1905impl fmt::Debug for UpdateCall<'_> {
1906    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1907        f.debug_struct("UpdateCall")
1908            .field("agent", &self.agent)
1909            .field("effective_canister_id", &self.effective_canister_id)
1910            .finish_non_exhaustive()
1911    }
1912}
1913
1914impl Future for UpdateCall<'_> {
1915    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1916    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1917        self.response_future.as_mut().poll(cx)
1918    }
1919}
1920
1921impl<'a> UpdateCall<'a> {
1922    /// Waits for the update call to be completed, polling if necessary.
1923    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1924        let response = self.response_future.await?;
1925
1926        match response {
1927            CallResponse::Response(response) => Ok(response),
1928            CallResponse::Poll(request_id) => {
1929                self.agent
1930                    .wait_inner(
1931                        &request_id,
1932                        self.effective_canister_id,
1933                        Some(Operation::Call {
1934                            canister: self.canister_id,
1935                            method: self.method_name,
1936                        }),
1937                    )
1938                    .await
1939            }
1940        }
1941    }
1942}
1943/// An update request Builder.
1944///
1945/// This makes it easier to do update calls without actually passing all arguments or specifying
1946/// if you want to wait or not.
1947#[derive(Debug)]
1948pub struct UpdateBuilder<'agent> {
1949    agent: &'agent Agent,
1950    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1951    pub effective_canister_id: Principal,
1952    /// The principal ID of the canister being called.
1953    pub canister_id: Principal,
1954    /// The name of the canister method being called.
1955    pub method_name: String,
1956    /// The argument blob to be passed to the method.
1957    pub arg: Vec<u8>,
1958    /// The Unix timestamp that the request will expire at.
1959    pub ingress_expiry_datetime: Option<u64>,
1960}
1961
1962impl<'agent> UpdateBuilder<'agent> {
1963    /// Creates a new update builder with an agent for a particular canister method.
1964    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1965        Self {
1966            agent,
1967            effective_canister_id: canister_id,
1968            canister_id,
1969            method_name,
1970            arg: vec![],
1971            ingress_expiry_datetime: None,
1972        }
1973    }
1974
1975    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1976    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1977        self.effective_canister_id = canister_id;
1978        self
1979    }
1980
1981    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1982    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1983        self.arg = arg.into();
1984        self
1985    }
1986
1987    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1988    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1989        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1990        self
1991    }
1992
1993    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1994    pub fn expire_after(mut self, duration: Duration) -> Self {
1995        self.ingress_expiry_datetime = Some(
1996            OffsetDateTime::now_utc()
1997                .saturating_add(duration.try_into().expect("negative duration"))
1998                .unix_timestamp_nanos() as u64,
1999        );
2000        self
2001    }
2002
2003    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
2004    /// the response as a byte vector.
2005    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2006        self.call().and_wait().await.map(|x| x.0)
2007    }
2008
2009    /// Make an update call. This will return a `RequestId`.
2010    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
2011    pub fn call(self) -> UpdateCall<'agent> {
2012        let method_name = self.method_name.clone();
2013        let response_future = async move {
2014            self.agent
2015                .update_raw(
2016                    self.canister_id,
2017                    self.effective_canister_id,
2018                    self.method_name,
2019                    self.arg,
2020                    self.ingress_expiry_datetime,
2021                )
2022                .await
2023        };
2024        UpdateCall {
2025            agent: self.agent,
2026            response_future: Box::pin(response_future),
2027            effective_canister_id: self.effective_canister_id,
2028            canister_id: self.canister_id,
2029            method_name,
2030        }
2031    }
2032
2033    /// Sign a update call. This will return a [`signed::SignedUpdate`]
2034    /// which contains all fields of the update and the signed update in CBOR encoding
2035    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2036        let identity = self.agent.identity.clone();
2037        let effective_canister_id = self.effective_canister_id;
2038        let content = self.into_envelope()?;
2039        let signed_update = sign_envelope(&content, identity)?;
2040        let request_id = to_request_id(&content)?;
2041        let EnvelopeContent::Call {
2042            nonce,
2043            ingress_expiry,
2044            sender,
2045            canister_id,
2046            method_name,
2047            arg,
2048        } = content
2049        else {
2050            unreachable!()
2051        };
2052        Ok(SignedUpdate {
2053            nonce,
2054            ingress_expiry,
2055            sender,
2056            canister_id,
2057            method_name,
2058            arg,
2059            effective_canister_id,
2060            signed_update,
2061            request_id,
2062        })
2063    }
2064
2065    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
2066    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2067        let nonce = self.agent.nonce_factory.generate();
2068        self.agent.update_content(
2069            self.canister_id,
2070            self.method_name,
2071            self.arg,
2072            self.ingress_expiry_datetime,
2073            nonce,
2074        )
2075    }
2076}
2077
2078impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2079    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2080    type Output = Result<Vec<u8>, AgentError>;
2081    fn into_future(self) -> Self::IntoFuture {
2082        Box::pin(self.call_and_wait())
2083    }
2084}
2085
2086/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
2087#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2088#[cfg_attr(not(target_family = "wasm"), async_trait)]
2089pub trait HttpService: Send + Sync + Debug {
2090    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
2091    async fn call<'a>(
2092        &'a self,
2093        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2094        max_retries: usize,
2095        size_limit: Option<usize>,
2096    ) -> Result<http::Response<Bytes>, AgentError>;
2097}
2098
2099/// Convert from http Request to reqwest's one
2100fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2101    let (parts, body) = req.into_parts();
2102    let body = reqwest::Body::from(body);
2103    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2104    // but just to be safe let's handle it.
2105    let request = http::Request::from_parts(parts, body)
2106        .try_into()
2107        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2108
2109    Ok(request)
2110}
2111
2112/// Convert from reqwests's Response to http one
2113#[cfg(not(target_family = "wasm"))]
2114async fn to_http_response(
2115    resp: Response,
2116    size_limit: Option<usize>,
2117) -> Result<http::Response<Bytes>, AgentError> {
2118    use http_body_util::{BodyExt, Limited};
2119
2120    let resp: http::Response<reqwest::Body> = resp.into();
2121    let (parts, body) = resp.into_parts();
2122    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2123    let body = body
2124        .collect()
2125        .await
2126        .map_err(|e| {
2127            AgentError::TransportError(TransportError::Generic(format!(
2128                "unable to read response body: {e:#}"
2129            )))
2130        })?
2131        .to_bytes();
2132    let resp = http::Response::from_parts(parts, body);
2133
2134    Ok(resp)
2135}
2136
2137/// Convert from reqwests's Response to http one.
2138/// WASM Response in reqwest doesn't have direct conversion for http::Response,
2139/// so we have to hack around using streams.
2140#[cfg(target_family = "wasm")]
2141async fn to_http_response(
2142    resp: Response,
2143    size_limit: Option<usize>,
2144) -> Result<http::Response<Bytes>, AgentError> {
2145    use futures_util::StreamExt;
2146    use http_body::Frame;
2147    use http_body_util::{Limited, StreamBody};
2148
2149    // Save headers
2150    let status = resp.status();
2151    let headers = resp.headers().clone();
2152
2153    // Convert body
2154    let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2155    let body = StreamBody::new(stream);
2156    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2157    let body = http_body_util::BodyExt::collect(body)
2158        .await
2159        .map_err(|e| {
2160            AgentError::TransportError(TransportError::Generic(format!(
2161                "unable to read response body: {e:#}"
2162            )))
2163        })?
2164        .to_bytes();
2165
2166    let mut resp = http::Response::new(body);
2167    *resp.status_mut() = status;
2168    *resp.headers_mut() = headers;
2169
2170    Ok(resp)
2171}
2172
2173#[cfg(not(target_family = "wasm"))]
2174#[async_trait]
2175impl<T> HttpService for T
2176where
2177    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2178    for<'a> <&'a Self as Service<Request>>::Future: Send,
2179    T: Send + Sync + Debug + ?Sized,
2180{
2181    #[allow(clippy::needless_arbitrary_self_type)]
2182    async fn call<'a>(
2183        mut self: &'a Self,
2184        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2185        max_retries: usize,
2186        size_limit: Option<usize>,
2187    ) -> Result<http::Response<Bytes>, AgentError> {
2188        let mut retry_count = 0;
2189        loop {
2190            let request = from_http_request(req()?)?;
2191
2192            match Service::call(&mut self, request).await {
2193                Err(err) => {
2194                    // Network-related errors can be retried.
2195                    if err.is_connect() {
2196                        if retry_count >= max_retries {
2197                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2198                        }
2199                        retry_count += 1;
2200                    }
2201                    // All other errors return immediately.
2202                    else {
2203                        return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2204                    }
2205                }
2206
2207                Ok(resp) => {
2208                    let resp = to_http_response(resp, size_limit).await?;
2209                    return Ok(resp);
2210                }
2211            }
2212        }
2213    }
2214}
2215
2216#[cfg(target_family = "wasm")]
2217#[async_trait(?Send)]
2218impl<T> HttpService for T
2219where
2220    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2221    T: Send + Sync + Debug + ?Sized,
2222{
2223    #[allow(clippy::needless_arbitrary_self_type)]
2224    async fn call<'a>(
2225        mut self: &'a Self,
2226        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2227        _retries: usize,
2228        _size_limit: Option<usize>,
2229    ) -> Result<http::Response<Bytes>, AgentError> {
2230        let request = from_http_request(req()?)?;
2231        let response = Service::call(&mut self, request)
2232            .await
2233            .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2234
2235        to_http_response(response, _size_limit).await
2236    }
2237}
2238
2239#[derive(Debug)]
2240struct Retry429Logic {
2241    client: Client,
2242}
2243
2244#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2245#[cfg_attr(not(target_family = "wasm"), async_trait)]
2246impl HttpService for Retry429Logic {
2247    async fn call<'a>(
2248        &'a self,
2249        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2250        _max_tcp_retries: usize,
2251        _size_limit: Option<usize>,
2252    ) -> Result<http::Response<Bytes>, AgentError> {
2253        let mut retries = 0;
2254        loop {
2255            #[cfg(not(target_family = "wasm"))]
2256            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2257            // Client inconveniently does not implement Service on wasm
2258            #[cfg(target_family = "wasm")]
2259            let resp = {
2260                let request = from_http_request(req()?)?;
2261                let resp = self
2262                    .client
2263                    .execute(request)
2264                    .await
2265                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2266
2267                to_http_response(resp, _size_limit).await?
2268            };
2269
2270            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2271                if retries == 6 {
2272                    break Ok(resp);
2273                } else {
2274                    retries += 1;
2275                    crate::util::sleep(Duration::from_millis(250)).await;
2276                    continue;
2277                }
2278            } else {
2279                break Ok(resp);
2280            }
2281        }
2282    }
2283}
2284
2285#[cfg(all(test, not(target_family = "wasm")))]
2286mod offline_tests {
2287    use super::*;
2288    use tokio::net::TcpListener;
2289    // Any tests that involve the network should go in agent_test, not here.
2290
2291    #[test]
2292    fn rounded_expiry() {
2293        let agent = Agent::builder()
2294            .with_url("http://not-a-real-url")
2295            .build()
2296            .unwrap();
2297        let mut prev_expiry = None;
2298        let mut num_timestamps = 0;
2299        for _ in 0..6 {
2300            let update = agent
2301                .update(&Principal::management_canister(), "not_a_method")
2302                .sign()
2303                .unwrap();
2304            if prev_expiry < Some(update.ingress_expiry) {
2305                prev_expiry = Some(update.ingress_expiry);
2306                num_timestamps += 1;
2307            }
2308        }
2309        // in six requests, there should be no more than two timestamps
2310        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2311    }
2312
2313    #[tokio::test]
2314    async fn client_ratelimit() {
2315        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2316        let count = Arc::new(Mutex::new(0));
2317        let port = mock_server.local_addr().unwrap().port();
2318        tokio::spawn({
2319            let count = count.clone();
2320            async move {
2321                loop {
2322                    let (mut conn, _) = mock_server.accept().await.unwrap();
2323                    *count.lock().unwrap() += 1;
2324                    tokio::spawn(
2325                        // read all data, never reply
2326                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2327                    );
2328                }
2329            }
2330        });
2331        let agent = Agent::builder()
2332            .with_http_client(Client::builder().http1_only().build().unwrap())
2333            .with_url(format!("http://127.0.0.1:{port}"))
2334            .with_max_concurrent_requests(2)
2335            .build()
2336            .unwrap();
2337        for _ in 0..3 {
2338            let agent = agent.clone();
2339            tokio::spawn(async move {
2340                agent
2341                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2342                    .call()
2343                    .await
2344            });
2345        }
2346        crate::util::sleep(Duration::from_millis(250)).await;
2347        assert_eq!(*count.lock().unwrap(), 2);
2348    }
2349}