Skip to main content

ic_agent/agent/
mod.rs

1//! The main Agent module. Contains the [Agent] type and all associated structures.
2pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5// delete this module after 0.40
6#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27    signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28    RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34    dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35    RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45    agent::response_authentication::{
46        extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47        lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48        lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49        lookup_value,
50    },
51    agent_error::TransportError,
52    export::Principal,
53    identity::Identity,
54    to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60    signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61    QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66    borrow::Cow,
67    convert::TryFrom,
68    fmt::{self, Debug},
69    future::{Future, IntoFuture},
70    pin::Pin,
71    str::FromStr,
72    sync::{Arc, Mutex, RwLock},
73    task::{Context, Poll},
74    time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
81const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89/// A low level Agent to make calls to a Replica endpoint.
90///
91#[cfg_attr(unix, doc = " ```rust")] // pocket-ic
92#[cfg_attr(not(unix), doc = " ```ignore")]
93/// use ic_agent::{Agent, export::Principal};
94/// use candid::{Encode, Decode, CandidType, Nat};
95/// use serde::Deserialize;
96///
97/// #[derive(CandidType)]
98/// struct Argument {
99///   amount: Option<Nat>,
100/// }
101///
102/// #[derive(CandidType, Deserialize)]
103/// struct CreateCanisterResult {
104///   canister_id: Principal,
105/// }
106///
107/// # fn create_identity() -> impl ic_agent::Identity {
108/// #     // In real code, the raw key should be either read from a pem file or generated with randomness.
109/// #     ic_agent::identity::BasicIdentity::from_raw_key(&[0u8;32])
110/// # }
111/// #
112/// async fn create_a_canister() -> Result<Principal, Box<dyn std::error::Error>> {
113/// # Ok(ref_tests::utils::with_pic(async move |pic| {
114/// # let url = ref_tests::utils::get_pic_url(&pic);
115///   let agent = Agent::builder()
116///     .with_url(url)
117///     .with_identity(create_identity())
118///     .build()?;
119///
120///   // Only do the following call when not contacting the IC main net (e.g. a local emulator).
121///   // This is important as the main net public key is static and a rogue network could return
122///   // a different key.
123///   // If you know the root key ahead of time, you can use `agent.set_root_key(root_key);`.
124///   agent.fetch_root_key().await?;
125///   let management_canister_id = Principal::from_text("aaaaa-aa")?;
126///
127///   // Create a call to the management canister to create a new canister ID, and wait for a result.
128///   // This API only works in local instances; mainnet instances must use the cycles ledger.
129///   // See `dfx info default-effective-canister-id`.
130/// # let effective_canister_id = ref_tests::utils::get_effective_canister_id(&pic).await;
131///   let response = agent.update(&management_canister_id, "provisional_create_canister_with_cycles")
132///     .with_effective_canister_id(effective_canister_id)
133///     .with_arg(Encode!(&Argument { amount: None })?)
134///     .await?;
135///
136///   let result = Decode!(response.as_slice(), CreateCanisterResult)?;
137///   let canister_id: Principal = Principal::from_text(&result.canister_id.to_text())?;
138///   Ok(canister_id)
139/// # }).await)
140/// }
141///
142/// # let mut runtime = tokio::runtime::Runtime::new().unwrap();
143/// # runtime.block_on(async {
144/// let canister_id = create_a_canister().await.unwrap();
145/// eprintln!("{}", canister_id);
146/// # });
147/// ```
148///
149/// This agent does not understand Candid, and only acts on byte buffers.
150///
151/// Some methods return certificates. While there is a `verify_certificate` method, any certificate
152/// you receive from a method has already been verified and you do not need to manually verify it.
153#[derive(Clone)]
154pub struct Agent {
155    nonce_factory: Arc<dyn NonceGenerator>,
156    identity: Arc<dyn Identity>,
157    ingress_expiry: Duration,
158    root_key: Arc<RwLock<Vec<u8>>>,
159    client: Arc<dyn HttpService>,
160    route_provider: Arc<dyn RouteProvider>,
161    subnet_key_cache: Arc<Mutex<SubnetCache>>,
162    concurrent_requests_semaphore: Arc<Semaphore>,
163    verify_query_signatures: bool,
164    max_response_body_size: Option<usize>,
165    max_polling_time: Duration,
166    #[allow(dead_code)]
167    max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172        f.debug_struct("Agent")
173            .field("ingress_expiry", &self.ingress_expiry)
174            .finish_non_exhaustive()
175    }
176}
177
178impl Agent {
179    /// Create an instance of an [`AgentBuilder`] for building an [`Agent`]. This is simpler than
180    /// using the [`AgentConfig`] and [`Agent::new()`].
181    pub fn builder() -> builder::AgentBuilder {
182        Default::default()
183    }
184
185    /// Create an instance of an [`Agent`].
186    pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187        let client = config.http_service.unwrap_or_else(|| {
188            Arc::new(Retry429Logic {
189                client: config.client.unwrap_or_else(|| {
190                    #[cfg(not(target_family = "wasm"))]
191                    {
192                        Client::builder()
193                            .use_rustls_tls()
194                            .timeout(Duration::from_secs(360))
195                            .build()
196                            .expect("Could not create HTTP client.")
197                    }
198                    #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199                    {
200                        Client::new()
201                    }
202                }),
203            })
204        });
205        Ok(Agent {
206            nonce_factory: config.nonce_factory,
207            identity: config.identity,
208            ingress_expiry: config.ingress_expiry,
209            root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210            client: client.clone(),
211            route_provider: if let Some(route_provider) = config.route_provider {
212                route_provider
213            } else if let Some(url) = config.url {
214                if config.background_dynamic_routing {
215                    assert!(
216                        url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217                        "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218                    );
219                    let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220                    UrlUntilReady::new(url, async move {
221                        let provider =
222                            DynamicRouteProviderBuilder::new(seeds, client, None).build();
223                        provider.start().await;
224                        provider
225                    }) as Arc<dyn RouteProvider>
226                } else {
227                    Arc::new(url)
228                }
229            } else {
230                panic!("either route_provider or url must be specified");
231            },
232            subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233            verify_query_signatures: config.verify_query_signatures,
234            concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235            max_response_body_size: config.max_response_body_size,
236            max_tcp_error_retries: config.max_tcp_error_retries,
237            max_polling_time: config.max_polling_time,
238        })
239    }
240
241    /// Set the identity provider for signing messages.
242    ///
243    /// NOTE: if you change the identity while having update calls in
244    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
245    /// messages.
246    pub fn set_identity<I>(&mut self, identity: I)
247    where
248        I: 'static + Identity,
249    {
250        self.identity = Arc::new(identity);
251    }
252    /// Set the arc identity provider for signing messages.
253    ///
254    /// NOTE: if you change the identity while having update calls in
255    /// flight, you will not be able to [`Agent::request_status_raw`] the status of these
256    /// messages.
257    pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
258        self.identity = identity;
259    }
260
261    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
262    /// responses using a hard-coded public key.
263    ///
264    /// This function will instruct the agent to ask the endpoint for its public key, and use
265    /// that instead. This is required when talking to a local test instance, for example.
266    ///
267    /// *Only use this when you are  _not_ talking to the main Internet Computer, otherwise
268    /// you are prone to man-in-the-middle attacks! Do not call this function by default.*
269    pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270        if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271            // already fetched the root key
272            return Ok(());
273        }
274        let status = self.status().await?;
275        let Some(root_key) = status.root_key else {
276            return Err(AgentError::NoRootKeyInStatus(status));
277        };
278        self.set_root_key(root_key);
279        Ok(())
280    }
281
282    /// By default, the agent is configured to talk to the main Internet Computer, and verifies
283    /// responses using a hard-coded public key.
284    ///
285    /// Using this function you can set the root key to a known one if you know if beforehand.
286    pub fn set_root_key(&self, root_key: Vec<u8>) {
287        *self.root_key.write().unwrap() = root_key;
288    }
289
290    /// Return the root key currently in use.
291    pub fn read_root_key(&self) -> Vec<u8> {
292        self.root_key.read().unwrap().clone()
293    }
294
295    fn get_expiry_date(&self) -> u64 {
296        let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297        let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298        if self.ingress_expiry.as_secs() > 90 {
299            rounded = rounded.replace_second(0).unwrap();
300        }
301        rounded.unix_timestamp_nanos().try_into().unwrap()
302    }
303
304    /// Return the principal of the identity.
305    pub fn get_principal(&self) -> Result<Principal, String> {
306        self.identity.sender()
307    }
308
309    async fn query_endpoint<A>(
310        &self,
311        effective_canister_id: Principal,
312        serialized_bytes: Vec<u8>,
313    ) -> Result<A, AgentError>
314    where
315        A: serde::de::DeserializeOwned,
316    {
317        let _permit = self.concurrent_requests_semaphore.acquire().await;
318        let bytes = self
319            .execute(
320                Method::POST,
321                &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
322                Some(serialized_bytes),
323            )
324            .await?
325            .1;
326        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327    }
328
329    async fn read_state_endpoint<A>(
330        &self,
331        effective_canister_id: Principal,
332        serialized_bytes: Vec<u8>,
333    ) -> Result<A, AgentError>
334    where
335        A: serde::de::DeserializeOwned,
336    {
337        let _permit = self.concurrent_requests_semaphore.acquire().await;
338        let endpoint = format!(
339            "api/v3/canister/{}/read_state",
340            effective_canister_id.to_text()
341        );
342        let bytes = self
343            .execute(Method::POST, &endpoint, Some(serialized_bytes))
344            .await?
345            .1;
346        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347    }
348
349    async fn read_subnet_state_endpoint<A>(
350        &self,
351        subnet_id: Principal,
352        serialized_bytes: Vec<u8>,
353    ) -> Result<A, AgentError>
354    where
355        A: serde::de::DeserializeOwned,
356    {
357        let _permit = self.concurrent_requests_semaphore.acquire().await;
358        let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
359        let bytes = self
360            .execute(Method::POST, &endpoint, Some(serialized_bytes))
361            .await?
362            .1;
363        serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364    }
365
366    async fn call_endpoint(
367        &self,
368        effective_canister_id: Principal,
369        serialized_bytes: Vec<u8>,
370    ) -> Result<TransportCallResponse, AgentError> {
371        let _permit = self.concurrent_requests_semaphore.acquire().await;
372        let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
373        let (status_code, response_body) = self
374            .execute(Method::POST, &endpoint, Some(serialized_bytes))
375            .await?;
376
377        if status_code == StatusCode::ACCEPTED {
378            return Ok(TransportCallResponse::Accepted);
379        }
380
381        serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382    }
383
384    /// The simplest way to do a query call; sends a byte array and will return a byte vector.
385    /// The encoding is left as an exercise to the user.
386    #[allow(clippy::too_many_arguments)]
387    async fn query_raw(
388        &self,
389        canister_id: Principal,
390        effective_canister_id: Principal,
391        method_name: String,
392        arg: Vec<u8>,
393        ingress_expiry_datetime: Option<u64>,
394        use_nonce: bool,
395        explicit_verify_query_signatures: Option<bool>,
396    ) -> Result<Vec<u8>, AgentError> {
397        let operation = Operation::Call {
398            canister: canister_id,
399            method: method_name.clone(),
400        };
401        let content = self.query_content(
402            canister_id,
403            method_name,
404            arg,
405            ingress_expiry_datetime,
406            use_nonce,
407        )?;
408        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409        self.query_inner(
410            effective_canister_id,
411            serialized_bytes,
412            content.to_request_id(),
413            explicit_verify_query_signatures,
414            operation,
415        )
416        .await
417    }
418
419    /// Send the signed query to the network. Will return a byte vector.
420    /// The bytes will be checked if it is a valid query.
421    /// If you want to inspect the fields of the query call, use [`signed_query_inspect`] before calling this method.
422    pub async fn query_signed(
423        &self,
424        effective_canister_id: Principal,
425        signed_query: Vec<u8>,
426    ) -> Result<Vec<u8>, AgentError> {
427        let envelope: Envelope =
428            serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429        let EnvelopeContent::Query {
430            canister_id,
431            method_name,
432            ..
433        } = &*envelope.content
434        else {
435            return Err(AgentError::CallDataMismatch {
436                field: "request_type".to_string(),
437                value_arg: "query".to_string(),
438                value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439                    "update"
440                } else {
441                    "read_state"
442                }
443                .to_string(),
444            });
445        };
446        let operation = Operation::Call {
447            canister: *canister_id,
448            method: method_name.clone(),
449        };
450        self.query_inner(
451            effective_canister_id,
452            signed_query,
453            envelope.content.to_request_id(),
454            None,
455            operation,
456        )
457        .await
458    }
459
460    /// Helper function for performing both the query call and possibly a `read_state` to check the subnet node keys.
461    ///
462    /// This should be used instead of `query_endpoint`. No validation is performed on `signed_query`.
463    async fn query_inner(
464        &self,
465        effective_canister_id: Principal,
466        signed_query: Vec<u8>,
467        request_id: RequestId,
468        explicit_verify_query_signatures: Option<bool>,
469        operation: Operation,
470    ) -> Result<Vec<u8>, AgentError> {
471        let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472            let (response, mut subnet) = futures_util::try_join!(
473                self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474                self.get_subnet_by_canister(&effective_canister_id)
475            )?;
476            if response.signatures().is_empty() {
477                return Err(AgentError::MissingSignature);
478            } else if response.signatures().len() > subnet.node_keys.len() {
479                return Err(AgentError::TooManySignatures {
480                    had: response.signatures().len(),
481                    needed: subnet.node_keys.len(),
482                });
483            }
484            for signature in response.signatures() {
485                if OffsetDateTime::now_utc()
486                    - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487                    > self.ingress_expiry
488                {
489                    return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490                }
491                let signable = response.signable(request_id, signature.timestamp);
492                let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493                    node_key
494                } else {
495                    subnet = self
496                        .fetch_subnet_by_canister(&effective_canister_id)
497                        .await?;
498                    subnet
499                        .node_keys
500                        .get(&signature.identity)
501                        .ok_or(AgentError::CertificateNotAuthorized())?
502                };
503                if node_key.len() != 44 {
504                    return Err(AgentError::DerKeyLengthMismatch {
505                        expected: 44,
506                        actual: node_key.len(),
507                    });
508                }
509                const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510                if node_key[..12] != DER_PREFIX {
511                    return Err(AgentError::DerPrefixMismatch {
512                        expected: DER_PREFIX.to_vec(),
513                        actual: node_key[..12].to_vec(),
514                    });
515                }
516                let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517                    .map_err(|_| AgentError::MalformedPublicKey)?;
518
519                match pubkey.verify_signature(&signable, &signature.signature[..]) {
520                    Ok(()) => (),
521                    Err(SignatureError::InvalidSignature) => {
522                        return Err(AgentError::QuerySignatureVerificationFailed)
523                    }
524                    Err(SignatureError::InvalidLength) => {
525                        return Err(AgentError::MalformedSignature)
526                    }
527                    _ => unreachable!(),
528                }
529            }
530            response
531        } else {
532            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533                .await?
534        };
535
536        match response {
537            QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538            QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539                reject,
540                operation: Some(operation),
541            }),
542        }
543    }
544
545    fn query_content(
546        &self,
547        canister_id: Principal,
548        method_name: String,
549        arg: Vec<u8>,
550        ingress_expiry_datetime: Option<u64>,
551        use_nonce: bool,
552    ) -> Result<EnvelopeContent, AgentError> {
553        Ok(EnvelopeContent::Query {
554            sender: self.identity.sender().map_err(AgentError::SigningError)?,
555            canister_id,
556            method_name,
557            arg,
558            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559            nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560        })
561    }
562
563    /// The simplest way to do an update call; sends a byte array and will return a response, [`CallResponse`], from the replica.
564    async fn update_raw(
565        &self,
566        canister_id: Principal,
567        effective_canister_id: Principal,
568        method_name: String,
569        arg: Vec<u8>,
570        ingress_expiry_datetime: Option<u64>,
571    ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
572        let nonce = self.nonce_factory.generate();
573        let content = self.update_content(
574            canister_id,
575            method_name.clone(),
576            arg,
577            ingress_expiry_datetime,
578            nonce,
579        )?;
580        let operation = Some(Operation::Call {
581            canister: canister_id,
582            method: method_name,
583        });
584        let request_id = to_request_id(&content)?;
585        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
586
587        let response_body = self
588            .call_endpoint(effective_canister_id, serialized_bytes)
589            .await?;
590
591        match response_body {
592            TransportCallResponse::Replied { certificate } => {
593                let certificate =
594                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
595
596                self.verify(&certificate, effective_canister_id)?;
597                let status = lookup_request_status(&certificate, &request_id)?;
598
599                match status {
600                    RequestStatusResponse::Replied(reply) => {
601                        Ok(CallResponse::Response((reply.arg, certificate)))
602                    }
603                    RequestStatusResponse::Rejected(reject_response) => {
604                        Err(AgentError::CertifiedReject {
605                            reject: reject_response,
606                            operation,
607                        })?
608                    }
609                    _ => Ok(CallResponse::Poll(request_id)),
610                }
611            }
612            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
613            TransportCallResponse::NonReplicatedRejection(reject_response) => {
614                Err(AgentError::UncertifiedReject {
615                    reject: reject_response,
616                    operation,
617                })
618            }
619        }
620    }
621
622    /// Send the signed update to the network. Will return a [`CallResponse<Vec<u8>>`].
623    /// The bytes will be checked to verify that it is a valid update.
624    /// If you want to inspect the fields of the update, use [`signed_update_inspect`] before calling this method.
625    pub async fn update_signed(
626        &self,
627        effective_canister_id: Principal,
628        signed_update: Vec<u8>,
629    ) -> Result<CallResponse<Vec<u8>>, AgentError> {
630        let envelope: Envelope =
631            serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
632        let EnvelopeContent::Call {
633            canister_id,
634            method_name,
635            ..
636        } = &*envelope.content
637        else {
638            return Err(AgentError::CallDataMismatch {
639                field: "request_type".to_string(),
640                value_arg: "update".to_string(),
641                value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
642                    "query"
643                } else {
644                    "read_state"
645                }
646                .to_string(),
647            });
648        };
649        let operation = Some(Operation::Call {
650            canister: *canister_id,
651            method: method_name.clone(),
652        });
653        let request_id = to_request_id(&envelope.content)?;
654
655        let response_body = self
656            .call_endpoint(effective_canister_id, signed_update)
657            .await?;
658
659        match response_body {
660            TransportCallResponse::Replied { certificate } => {
661                let certificate =
662                    serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
663
664                self.verify(&certificate, effective_canister_id)?;
665                let status = lookup_request_status(&certificate, &request_id)?;
666
667                match status {
668                    RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
669                    RequestStatusResponse::Rejected(reject_response) => {
670                        Err(AgentError::CertifiedReject {
671                            reject: reject_response,
672                            operation,
673                        })?
674                    }
675                    _ => Ok(CallResponse::Poll(request_id)),
676                }
677            }
678            TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
679            TransportCallResponse::NonReplicatedRejection(reject_response) => {
680                Err(AgentError::UncertifiedReject {
681                    reject: reject_response,
682                    operation,
683                })
684            }
685        }
686    }
687
688    fn update_content(
689        &self,
690        canister_id: Principal,
691        method_name: String,
692        arg: Vec<u8>,
693        ingress_expiry_datetime: Option<u64>,
694        nonce: Option<Vec<u8>>,
695    ) -> Result<EnvelopeContent, AgentError> {
696        Ok(EnvelopeContent::Call {
697            canister_id,
698            method_name,
699            arg,
700            nonce,
701            sender: self.identity.sender().map_err(AgentError::SigningError)?,
702            ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
703        })
704    }
705
706    fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
707        ExponentialBackoffBuilder::new()
708            .with_initial_interval(Duration::from_millis(500))
709            .with_max_interval(Duration::from_secs(1))
710            .with_multiplier(1.4)
711            .with_max_elapsed_time(Some(self.max_polling_time))
712            .build()
713    }
714
715    /// Wait for `request_status` to return a Replied response and return the arg.
716    pub async fn wait_signed(
717        &self,
718        request_id: &RequestId,
719        effective_canister_id: Principal,
720        signed_request_status: Vec<u8>,
721    ) -> Result<(Vec<u8>, Certificate), AgentError> {
722        let mut retry_policy = self.get_retry_policy();
723
724        let mut request_accepted = false;
725        loop {
726            let (resp, cert) = self
727                .request_status_signed(
728                    request_id,
729                    effective_canister_id,
730                    signed_request_status.clone(),
731                )
732                .await?;
733            match resp {
734                RequestStatusResponse::Unknown => {
735                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
736                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
737                        return Err(AgentError::TimeoutWaitingForResponse());
738                    }
739                }
740
741                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
742                    if !request_accepted {
743                        retry_policy.reset();
744                        request_accepted = true;
745                    }
746                }
747
748                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
749                    return Ok((arg, cert))
750                }
751
752                RequestStatusResponse::Rejected(response) => {
753                    return Err(AgentError::CertifiedReject {
754                        reject: response,
755                        operation: None,
756                    })
757                }
758
759                RequestStatusResponse::Done => {
760                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
761                        *request_id,
762                    )))
763                }
764            };
765
766            match retry_policy.next_backoff() {
767                Some(duration) => crate::util::sleep(duration).await,
768
769                None => return Err(AgentError::TimeoutWaitingForResponse()),
770            }
771        }
772    }
773
774    /// Call `request_status` on the `RequestId` in a loop and return the response as a byte vector.
775    pub async fn wait(
776        &self,
777        request_id: &RequestId,
778        effective_canister_id: Principal,
779    ) -> Result<(Vec<u8>, Certificate), AgentError> {
780        self.wait_inner(request_id, effective_canister_id, None)
781            .await
782    }
783
784    async fn wait_inner(
785        &self,
786        request_id: &RequestId,
787        effective_canister_id: Principal,
788        operation: Option<Operation>,
789    ) -> Result<(Vec<u8>, Certificate), AgentError> {
790        let mut retry_policy = self.get_retry_policy();
791
792        let mut request_accepted = false;
793        loop {
794            let (resp, cert) = self
795                .request_status_raw(request_id, effective_canister_id)
796                .await?;
797            match resp {
798                RequestStatusResponse::Unknown => {
799                    // If status is still `Unknown` after 5 minutes, the ingress message is lost.
800                    if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
801                        return Err(AgentError::TimeoutWaitingForResponse());
802                    }
803                }
804
805                RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806                    if !request_accepted {
807                        // The system will return RequestStatusResponse::Unknown
808                        // until the request is accepted
809                        // and we generally cannot know how long that will take.
810                        // State transitions between Received and Processing may be
811                        // instantaneous. Therefore, once we know the request is accepted,
812                        // we should restart the backoff so the request does not time out.
813
814                        retry_policy.reset();
815                        request_accepted = true;
816                    }
817                }
818
819                RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820                    return Ok((arg, cert))
821                }
822
823                RequestStatusResponse::Rejected(response) => {
824                    return Err(AgentError::CertifiedReject {
825                        reject: response,
826                        operation,
827                    })
828                }
829
830                RequestStatusResponse::Done => {
831                    return Err(AgentError::RequestStatusDoneNoReply(String::from(
832                        *request_id,
833                    )))
834                }
835            };
836
837            match retry_policy.next_backoff() {
838                Some(duration) => crate::util::sleep(duration).await,
839
840                None => return Err(AgentError::TimeoutWaitingForResponse()),
841            }
842        }
843    }
844
845    /// Request the raw state tree directly, under an effective canister ID.
846    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
847    pub async fn read_state_raw(
848        &self,
849        paths: Vec<Vec<Label>>,
850        effective_canister_id: Principal,
851    ) -> Result<Certificate, AgentError> {
852        let content = self.read_state_content(paths)?;
853        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855        let read_state_response: ReadStateResponse = self
856            .read_state_endpoint(effective_canister_id, serialized_bytes)
857            .await?;
858        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859            .map_err(AgentError::InvalidCborData)?;
860        self.verify(&cert, effective_canister_id)?;
861        Ok(cert)
862    }
863
864    /// Request the raw state tree directly, under a subnet ID.
865    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-read-state) for more information.
866    pub async fn read_subnet_state_raw(
867        &self,
868        paths: Vec<Vec<Label>>,
869        subnet_id: Principal,
870    ) -> Result<Certificate, AgentError> {
871        let content = self.read_state_content(paths)?;
872        let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874        let read_state_response: ReadStateResponse = self
875            .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876            .await?;
877        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878            .map_err(AgentError::InvalidCborData)?;
879        self.verify_for_subnet(&cert, subnet_id)?;
880        Ok(cert)
881    }
882
883    fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884        Ok(EnvelopeContent::ReadState {
885            sender: self.identity.sender().map_err(AgentError::SigningError)?,
886            paths,
887            ingress_expiry: self.get_expiry_date(),
888        })
889    }
890
891    /// Verify a certificate, checking delegation if present.
892    /// Only passes if the certificate also has authority over the canister.
893    pub fn verify(
894        &self,
895        cert: &Certificate,
896        effective_canister_id: Principal,
897    ) -> Result<(), AgentError> {
898        self.verify_cert(cert, effective_canister_id)?;
899        self.verify_cert_timestamp(cert)?;
900        Ok(())
901    }
902
903    fn verify_cert(
904        &self,
905        cert: &Certificate,
906        effective_canister_id: Principal,
907    ) -> Result<(), AgentError> {
908        let sig = &cert.signature;
909
910        let root_hash = cert.tree.digest();
911        let mut msg = vec![];
912        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913        msg.extend_from_slice(&root_hash);
914
915        let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916        let key = extract_der(der_key)?;
917
918        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919            .map_err(|_| AgentError::CertificateVerificationFailed())?;
920        Ok(())
921    }
922
923    /// Verify a certificate, checking delegation if present.
924    /// Only passes if the certificate is for the specified subnet.
925    pub fn verify_for_subnet(
926        &self,
927        cert: &Certificate,
928        subnet_id: Principal,
929    ) -> Result<(), AgentError> {
930        self.verify_cert_for_subnet(cert, subnet_id)?;
931        self.verify_cert_timestamp(cert)?;
932        Ok(())
933    }
934
935    fn verify_cert_for_subnet(
936        &self,
937        cert: &Certificate,
938        subnet_id: Principal,
939    ) -> Result<(), AgentError> {
940        let sig = &cert.signature;
941
942        let root_hash = cert.tree.digest();
943        let mut msg = vec![];
944        msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945        msg.extend_from_slice(&root_hash);
946
947        let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948        let key = extract_der(der_key)?;
949
950        ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951            .map_err(|_| AgentError::CertificateVerificationFailed())?;
952        Ok(())
953    }
954
955    fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956        // Verify that the certificate is not older than ingress expiry
957        // Certificates with timestamps in the future are allowed
958        let time = lookup_time(cert)?;
959        if (OffsetDateTime::now_utc()
960            - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
961            > self.ingress_expiry
962        {
963            Err(AgentError::CertificateOutdated(self.ingress_expiry))
964        } else {
965            Ok(())
966        }
967    }
968
969    fn check_delegation(
970        &self,
971        delegation: &Option<Delegation>,
972        effective_canister_id: Principal,
973    ) -> Result<Vec<u8>, AgentError> {
974        match delegation {
975            None => Ok(self.read_root_key()),
976            Some(delegation) => {
977                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
978                    .map_err(AgentError::InvalidCborData)?;
979                if cert.delegation.is_some() {
980                    return Err(AgentError::CertificateHasTooManyDelegations);
981                }
982                self.verify_cert(&cert, effective_canister_id)?;
983                let canister_range_shards_lookup =
984                    ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
985                let canister_range_shards = lookup_tree(&cert.tree, canister_range_shards_lookup)?;
986                let mut shard_paths = canister_range_shards
987                    .list_paths() // /canister_ranges/<subnet_id>/<shard>
988                    .into_iter()
989                    .map(|mut x| {
990                        x.pop() // flatten [label] to label
991                            .ok_or_else(AgentError::CertificateVerificationFailed)
992                    })
993                    .collect::<Result<Vec<_>, _>>()?;
994                if shard_paths.is_empty() {
995                    return Err(AgentError::CertificateNotAuthorized());
996                }
997                shard_paths.sort_unstable();
998                let shard_division = shard_paths
999                    .partition_point(|shard| shard.as_bytes() <= effective_canister_id.as_slice());
1000                if shard_division == 0 {
1001                    // the certificate is not authorized to answer calls for this canister
1002                    return Err(AgentError::CertificateNotAuthorized());
1003                }
1004                let max_potential_shard = &shard_paths[shard_division - 1];
1005                let canister_range_lookup = [max_potential_shard.as_bytes()];
1006                let canister_range = lookup_value(&canister_range_shards, canister_range_lookup)?;
1007                let ranges: Vec<(Principal, Principal)> =
1008                    serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
1009                if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1010                    // the certificate is not authorized to answer calls for this canister
1011                    return Err(AgentError::CertificateNotAuthorized());
1012                }
1013
1014                let public_key_path = [
1015                    "subnet".as_bytes(),
1016                    delegation.subnet_id.as_ref(),
1017                    "public_key".as_bytes(),
1018                ];
1019                lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1020            }
1021        }
1022    }
1023
1024    fn check_delegation_for_subnet(
1025        &self,
1026        delegation: &Option<Delegation>,
1027        subnet_id: Principal,
1028    ) -> Result<Vec<u8>, AgentError> {
1029        match delegation {
1030            None => Ok(self.read_root_key()),
1031            Some(delegation) => {
1032                let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1033                    .map_err(AgentError::InvalidCborData)?;
1034                if cert.delegation.is_some() {
1035                    return Err(AgentError::CertificateHasTooManyDelegations);
1036                }
1037                self.verify_cert_for_subnet(&cert, subnet_id)?;
1038                let public_key_path = [
1039                    "subnet".as_bytes(),
1040                    subnet_id.as_ref(),
1041                    "public_key".as_bytes(),
1042                ];
1043                let pk = lookup_value(&cert.tree, public_key_path)
1044                    .map_err(|_| AgentError::CertificateNotAuthorized())?
1045                    .to_vec();
1046                Ok(pk)
1047            }
1048        }
1049    }
1050
1051    /// Request information about a particular canister for a single state subkey.
1052    /// See [the protocol docs](https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-canister-information) for more information.
1053    pub async fn read_state_canister_info(
1054        &self,
1055        canister_id: Principal,
1056        path: &str,
1057    ) -> Result<Vec<u8>, AgentError> {
1058        let paths: Vec<Vec<Label>> = vec![vec![
1059            "canister".into(),
1060            Label::from_bytes(canister_id.as_slice()),
1061            path.into(),
1062        ]];
1063
1064        let cert = self.read_state_raw(paths, canister_id).await?;
1065
1066        lookup_canister_info(cert, canister_id, path)
1067    }
1068
1069    /// Request the controller list of a given canister.
1070    pub async fn read_state_canister_controllers(
1071        &self,
1072        canister_id: Principal,
1073    ) -> Result<Vec<Principal>, AgentError> {
1074        let blob = self
1075            .read_state_canister_info(canister_id, "controllers")
1076            .await?;
1077        let controllers: Vec<Principal> =
1078            serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1079        Ok(controllers)
1080    }
1081
1082    /// Request the module hash of a given canister.
1083    pub async fn read_state_canister_module_hash(
1084        &self,
1085        canister_id: Principal,
1086    ) -> Result<Vec<u8>, AgentError> {
1087        self.read_state_canister_info(canister_id, "module_hash")
1088            .await
1089    }
1090
1091    /// Request the bytes of the canister's custom section `icp:public <path>` or `icp:private <path>`.
1092    pub async fn read_state_canister_metadata(
1093        &self,
1094        canister_id: Principal,
1095        path: &str,
1096    ) -> Result<Vec<u8>, AgentError> {
1097        let paths: Vec<Vec<Label>> = vec![vec![
1098            "canister".into(),
1099            Label::from_bytes(canister_id.as_slice()),
1100            "metadata".into(),
1101            path.into(),
1102        ]];
1103
1104        let cert = self.read_state_raw(paths, canister_id).await?;
1105
1106        lookup_canister_metadata(cert, canister_id, path)
1107    }
1108
1109    /// Request a list of metrics about the subnet.
1110    pub async fn read_state_subnet_metrics(
1111        &self,
1112        subnet_id: Principal,
1113    ) -> Result<SubnetMetrics, AgentError> {
1114        let paths = vec![vec![
1115            "subnet".into(),
1116            Label::from_bytes(subnet_id.as_slice()),
1117            "metrics".into(),
1118        ]];
1119        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1120        lookup_subnet_metrics(cert, subnet_id)
1121    }
1122
1123    /// Request a list of metrics about the subnet.
1124    pub async fn read_state_subnet_canister_ranges(
1125        &self,
1126        subnet_id: Principal,
1127    ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1128        let paths = vec![vec![
1129            "subnet".into(),
1130            Label::from_bytes(subnet_id.as_slice()),
1131            "canister_ranges".into(),
1132        ]];
1133        let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1134        lookup_subnet_canister_ranges(&cert, subnet_id)
1135    }
1136
1137    /// Fetches the status of a particular request by its ID.
1138    pub async fn request_status_raw(
1139        &self,
1140        request_id: &RequestId,
1141        effective_canister_id: Principal,
1142    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1143        let paths: Vec<Vec<Label>> =
1144            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1145
1146        let cert = self.read_state_raw(paths, effective_canister_id).await?;
1147
1148        Ok((lookup_request_status(&cert, request_id)?, cert))
1149    }
1150
1151    /// Send the signed `request_status` to the network. Will return [`RequestStatusResponse`].
1152    /// The bytes will be checked to verify that it is a valid `request_status`.
1153    /// If you want to inspect the fields of the `request_status`, use [`signed_request_status_inspect`] before calling this method.
1154    pub async fn request_status_signed(
1155        &self,
1156        request_id: &RequestId,
1157        effective_canister_id: Principal,
1158        signed_request_status: Vec<u8>,
1159    ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1160        let _envelope: Envelope =
1161            serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1162        let read_state_response: ReadStateResponse = self
1163            .read_state_endpoint(effective_canister_id, signed_request_status)
1164            .await?;
1165
1166        let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1167            .map_err(AgentError::InvalidCborData)?;
1168        self.verify(&cert, effective_canister_id)?;
1169        Ok((lookup_request_status(&cert, request_id)?, cert))
1170    }
1171
1172    /// Returns an `UpdateBuilder` enabling the construction of an update call without
1173    /// passing all arguments.
1174    pub fn update<S: Into<String>>(
1175        &self,
1176        canister_id: &Principal,
1177        method_name: S,
1178    ) -> UpdateBuilder<'_> {
1179        UpdateBuilder::new(self, *canister_id, method_name.into())
1180    }
1181
1182    /// Calls and returns the information returned by the status endpoint of a replica.
1183    pub async fn status(&self) -> Result<Status, AgentError> {
1184        let endpoint = "api/v2/status";
1185        let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1186
1187        let cbor: serde_cbor::Value =
1188            serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1189
1190        Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1191    }
1192
1193    /// Returns a `QueryBuilder` enabling the construction of a query call without
1194    /// passing all arguments.
1195    pub fn query<S: Into<String>>(
1196        &self,
1197        canister_id: &Principal,
1198        method_name: S,
1199    ) -> QueryBuilder<'_> {
1200        QueryBuilder::new(self, *canister_id, method_name.into())
1201    }
1202
1203    /// Sign a `request_status` call. This will return a [`signed::SignedRequestStatus`]
1204    /// which contains all fields of the `request_status` and the signed `request_status` in CBOR encoding
1205    pub fn sign_request_status(
1206        &self,
1207        effective_canister_id: Principal,
1208        request_id: RequestId,
1209    ) -> Result<SignedRequestStatus, AgentError> {
1210        let paths: Vec<Vec<Label>> =
1211            vec![vec!["request_status".into(), request_id.to_vec().into()]];
1212        let read_state_content = self.read_state_content(paths)?;
1213        let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1214        let ingress_expiry = read_state_content.ingress_expiry();
1215        let sender = *read_state_content.sender();
1216        Ok(SignedRequestStatus {
1217            ingress_expiry,
1218            sender,
1219            effective_canister_id,
1220            request_id,
1221            signed_request_status,
1222        })
1223    }
1224
1225    /// Retrieve subnet information for a canister. This uses an internal five-minute cache, fresh data can
1226    /// be fetched with [`fetch_subnet_by_canister`](Self::fetch_subnet_by_canister).
1227    pub async fn get_subnet_by_canister(
1228        &self,
1229        canister: &Principal,
1230    ) -> Result<Arc<Subnet>, AgentError> {
1231        let subnet = self
1232            .subnet_key_cache
1233            .lock()
1234            .unwrap()
1235            .get_subnet_by_canister(canister);
1236        if let Some(subnet) = subnet {
1237            Ok(subnet)
1238        } else {
1239            self.fetch_subnet_by_canister(canister).await
1240        }
1241    }
1242
1243    /// Retrieve subnet information for a subnet ID. This uses an internal five-minute cache, fresh data can
1244    /// be fetched with [`fetch_subnet_by_id`](Self::fetch_subnet_by_id).
1245    pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1246        let subnet = self
1247            .subnet_key_cache
1248            .lock()
1249            .unwrap()
1250            .get_subnet_by_id(subnet_id);
1251        if let Some(subnet) = subnet {
1252            Ok(subnet)
1253        } else {
1254            self.fetch_subnet_by_id(subnet_id).await
1255        }
1256    }
1257
1258    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/canister/<effective_canister_id>/read_state`
1259    pub async fn fetch_api_boundary_nodes_by_canister_id(
1260        &self,
1261        canister_id: Principal,
1262    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1263        let paths = vec![vec!["api_boundary_nodes".into()]];
1264        let certificate = self.read_state_raw(paths, canister_id).await?;
1265        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1266        Ok(api_boundary_nodes)
1267    }
1268
1269    /// Retrieve all existing API boundary nodes from the state tree via endpoint `/api/v3/subnet/<subnet_id>/read_state`
1270    pub async fn fetch_api_boundary_nodes_by_subnet_id(
1271        &self,
1272        subnet_id: Principal,
1273    ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1274        let paths = vec![vec!["api_boundary_nodes".into()]];
1275        let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1276        let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1277        Ok(api_boundary_nodes)
1278    }
1279
1280    /// Fetches and caches the subnet information for a canister.
1281    ///
1282    /// This function does not read from the cache; most users want
1283    /// [`get_subnet_by_canister`](Self::get_subnet_by_canister) instead.
1284    pub async fn fetch_subnet_by_canister(
1285        &self,
1286        canister: &Principal,
1287    ) -> Result<Arc<Subnet>, AgentError> {
1288        let canister_cert = self
1289            .read_state_raw(vec![vec!["subnet".into()]], *canister)
1290            .await?;
1291        let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1292            Principal::from_slice(&delegation.subnet_id)
1293        } else {
1294            // if no delegation, it comes from the root subnet
1295            Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1296        };
1297        let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1298        let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1299            // non-root subnets will not serve /subnet/<>/canister_ranges when looked up by canister, but their delegation will contain /canister_ranges
1300            let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1301            lookup_canister_ranges(&subnet_id, &delegation_cert)?
1302        } else {
1303            lookup_canister_ranges(&subnet_id, &canister_cert)?
1304        };
1305        subnet.canister_ranges = canister_ranges;
1306        if !subnet.canister_ranges.contains(canister) {
1307            return Err(AgentError::CertificateNotAuthorized());
1308        }
1309        let subnet = Arc::new(subnet);
1310        self.subnet_key_cache
1311            .lock()
1312            .unwrap()
1313            .insert_subnet(subnet_id, subnet.clone());
1314        Ok(subnet)
1315    }
1316
1317    /// Fetches and caches the subnet information for a subnet ID.
1318    ///
1319    /// This function does not read from the cache; most users want
1320    /// [`get_subnet_by_id`](Self::get_subnet_by_id) instead.
1321    pub async fn fetch_subnet_by_id(
1322        &self,
1323        subnet_id: &Principal,
1324    ) -> Result<Arc<Subnet>, AgentError> {
1325        let subnet_cert = self
1326            .read_subnet_state_raw(
1327                vec![
1328                    vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1329                    vec!["subnet".into(), subnet_id.as_slice().into()],
1330                ],
1331                *subnet_id,
1332            )
1333            .await?;
1334        let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1335        let subnet = Arc::new(subnet);
1336        self.subnet_key_cache
1337            .lock()
1338            .unwrap()
1339            .insert_subnet(*subnet_id, subnet.clone());
1340        Ok(subnet)
1341    }
1342
1343    async fn request(
1344        &self,
1345        method: Method,
1346        endpoint: &str,
1347        body: Option<Vec<u8>>,
1348    ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1349        let body = body.map(Bytes::from);
1350
1351        let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1352            let url = self.route_provider.route()?.join(endpoint)?;
1353            let uri = Uri::from_str(url.as_str())
1354                .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1355            let body = body.clone().unwrap_or_default();
1356            let request = http::Request::builder()
1357                .method(method.clone())
1358                .uri(uri)
1359                .header(CONTENT_TYPE, "application/cbor")
1360                .body(body)
1361                .map_err(|e| {
1362                    AgentError::TransportError(TransportError::Generic(format!(
1363                        "unable to create request: {e:#}"
1364                    )))
1365                })?;
1366
1367            Ok(request)
1368        };
1369
1370        let response = self
1371            .client
1372            .call(
1373                &create_request_with_generated_url,
1374                self.max_tcp_error_retries,
1375                self.max_response_body_size,
1376            )
1377            .await?;
1378
1379        let (parts, body) = response.into_parts();
1380
1381        Ok((parts.status, parts.headers, body.to_vec()))
1382    }
1383
1384    async fn execute(
1385        &self,
1386        method: Method,
1387        endpoint: &str,
1388        body: Option<Vec<u8>>,
1389    ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1390        let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1391
1392        let status = request_result.0;
1393        let headers = request_result.1;
1394        let body = request_result.2;
1395
1396        if status.is_client_error() || status.is_server_error() {
1397            Err(AgentError::HttpError(HttpErrorPayload {
1398                status: status.into(),
1399                content_type: headers
1400                    .get(CONTENT_TYPE)
1401                    .and_then(|value| value.to_str().ok())
1402                    .map(str::to_string),
1403                content: body,
1404            }))
1405        } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1406            Err(AgentError::InvalidHttpResponse(format!(
1407                "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1408            )))
1409        } else {
1410            Ok((status, body))
1411        }
1412    }
1413}
1414
1415// Checks if a principal is contained within a list of principal ranges
1416// A range is a tuple: (low: Principal, high: Principal), as described here: https://internetcomputer.org/docs/current/references/ic-interface-spec#state-tree-subnet
1417fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1418    ranges
1419        .iter()
1420        .any(|r| principal >= &r.0 && principal <= &r.1)
1421}
1422
1423fn sign_envelope(
1424    content: &EnvelopeContent,
1425    identity: Arc<dyn Identity>,
1426) -> Result<Vec<u8>, AgentError> {
1427    let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1428
1429    let envelope = Envelope {
1430        content: Cow::Borrowed(content),
1431        sender_pubkey: signature.public_key,
1432        sender_sig: signature.signature,
1433        sender_delegation: signature.delegations,
1434    };
1435
1436    let mut serialized_bytes = Vec::new();
1437    let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1438    serializer.self_describe()?;
1439    envelope.serialize(&mut serializer)?;
1440
1441    Ok(serialized_bytes)
1442}
1443
1444/// Inspect the bytes to be sent as a query
1445/// Return Ok only when the bytes can be deserialized as a query and all fields match with the arguments
1446pub fn signed_query_inspect(
1447    sender: Principal,
1448    canister_id: Principal,
1449    method_name: &str,
1450    arg: &[u8],
1451    ingress_expiry: u64,
1452    signed_query: Vec<u8>,
1453) -> Result<(), AgentError> {
1454    let envelope: Envelope =
1455        serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1456    match envelope.content.as_ref() {
1457        EnvelopeContent::Query {
1458            ingress_expiry: ingress_expiry_cbor,
1459            sender: sender_cbor,
1460            canister_id: canister_id_cbor,
1461            method_name: method_name_cbor,
1462            arg: arg_cbor,
1463            nonce: _nonce,
1464        } => {
1465            if ingress_expiry != *ingress_expiry_cbor {
1466                return Err(AgentError::CallDataMismatch {
1467                    field: "ingress_expiry".to_string(),
1468                    value_arg: ingress_expiry.to_string(),
1469                    value_cbor: ingress_expiry_cbor.to_string(),
1470                });
1471            }
1472            if sender != *sender_cbor {
1473                return Err(AgentError::CallDataMismatch {
1474                    field: "sender".to_string(),
1475                    value_arg: sender.to_string(),
1476                    value_cbor: sender_cbor.to_string(),
1477                });
1478            }
1479            if canister_id != *canister_id_cbor {
1480                return Err(AgentError::CallDataMismatch {
1481                    field: "canister_id".to_string(),
1482                    value_arg: canister_id.to_string(),
1483                    value_cbor: canister_id_cbor.to_string(),
1484                });
1485            }
1486            if method_name != *method_name_cbor {
1487                return Err(AgentError::CallDataMismatch {
1488                    field: "method_name".to_string(),
1489                    value_arg: method_name.to_string(),
1490                    value_cbor: method_name_cbor.clone(),
1491                });
1492            }
1493            if arg != *arg_cbor {
1494                return Err(AgentError::CallDataMismatch {
1495                    field: "arg".to_string(),
1496                    value_arg: format!("{arg:?}"),
1497                    value_cbor: format!("{arg_cbor:?}"),
1498                });
1499            }
1500        }
1501        EnvelopeContent::Call { .. } => {
1502            return Err(AgentError::CallDataMismatch {
1503                field: "request_type".to_string(),
1504                value_arg: "query".to_string(),
1505                value_cbor: "call".to_string(),
1506            })
1507        }
1508        EnvelopeContent::ReadState { .. } => {
1509            return Err(AgentError::CallDataMismatch {
1510                field: "request_type".to_string(),
1511                value_arg: "query".to_string(),
1512                value_cbor: "read_state".to_string(),
1513            })
1514        }
1515    }
1516    Ok(())
1517}
1518
1519/// Inspect the bytes to be sent as an update
1520/// Return Ok only when the bytes can be deserialized as an update and all fields match with the arguments
1521pub fn signed_update_inspect(
1522    sender: Principal,
1523    canister_id: Principal,
1524    method_name: &str,
1525    arg: &[u8],
1526    ingress_expiry: u64,
1527    signed_update: Vec<u8>,
1528) -> Result<(), AgentError> {
1529    let envelope: Envelope =
1530        serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1531    match envelope.content.as_ref() {
1532        EnvelopeContent::Call {
1533            nonce: _nonce,
1534            ingress_expiry: ingress_expiry_cbor,
1535            sender: sender_cbor,
1536            canister_id: canister_id_cbor,
1537            method_name: method_name_cbor,
1538            arg: arg_cbor,
1539        } => {
1540            if ingress_expiry != *ingress_expiry_cbor {
1541                return Err(AgentError::CallDataMismatch {
1542                    field: "ingress_expiry".to_string(),
1543                    value_arg: ingress_expiry.to_string(),
1544                    value_cbor: ingress_expiry_cbor.to_string(),
1545                });
1546            }
1547            if sender != *sender_cbor {
1548                return Err(AgentError::CallDataMismatch {
1549                    field: "sender".to_string(),
1550                    value_arg: sender.to_string(),
1551                    value_cbor: sender_cbor.to_string(),
1552                });
1553            }
1554            if canister_id != *canister_id_cbor {
1555                return Err(AgentError::CallDataMismatch {
1556                    field: "canister_id".to_string(),
1557                    value_arg: canister_id.to_string(),
1558                    value_cbor: canister_id_cbor.to_string(),
1559                });
1560            }
1561            if method_name != *method_name_cbor {
1562                return Err(AgentError::CallDataMismatch {
1563                    field: "method_name".to_string(),
1564                    value_arg: method_name.to_string(),
1565                    value_cbor: method_name_cbor.clone(),
1566                });
1567            }
1568            if arg != *arg_cbor {
1569                return Err(AgentError::CallDataMismatch {
1570                    field: "arg".to_string(),
1571                    value_arg: format!("{arg:?}"),
1572                    value_cbor: format!("{arg_cbor:?}"),
1573                });
1574            }
1575        }
1576        EnvelopeContent::ReadState { .. } => {
1577            return Err(AgentError::CallDataMismatch {
1578                field: "request_type".to_string(),
1579                value_arg: "call".to_string(),
1580                value_cbor: "read_state".to_string(),
1581            })
1582        }
1583        EnvelopeContent::Query { .. } => {
1584            return Err(AgentError::CallDataMismatch {
1585                field: "request_type".to_string(),
1586                value_arg: "call".to_string(),
1587                value_cbor: "query".to_string(),
1588            })
1589        }
1590    }
1591    Ok(())
1592}
1593
1594/// Inspect the bytes to be sent as a `request_status`
1595/// Return Ok only when the bytes can be deserialized as a `request_status` and all fields match with the arguments
1596pub fn signed_request_status_inspect(
1597    sender: Principal,
1598    request_id: &RequestId,
1599    ingress_expiry: u64,
1600    signed_request_status: Vec<u8>,
1601) -> Result<(), AgentError> {
1602    let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1603    let envelope: Envelope =
1604        serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1605    match envelope.content.as_ref() {
1606        EnvelopeContent::ReadState {
1607            ingress_expiry: ingress_expiry_cbor,
1608            sender: sender_cbor,
1609            paths: paths_cbor,
1610        } => {
1611            if ingress_expiry != *ingress_expiry_cbor {
1612                return Err(AgentError::CallDataMismatch {
1613                    field: "ingress_expiry".to_string(),
1614                    value_arg: ingress_expiry.to_string(),
1615                    value_cbor: ingress_expiry_cbor.to_string(),
1616                });
1617            }
1618            if sender != *sender_cbor {
1619                return Err(AgentError::CallDataMismatch {
1620                    field: "sender".to_string(),
1621                    value_arg: sender.to_string(),
1622                    value_cbor: sender_cbor.to_string(),
1623                });
1624            }
1625
1626            if paths != *paths_cbor {
1627                return Err(AgentError::CallDataMismatch {
1628                    field: "paths".to_string(),
1629                    value_arg: format!("{paths:?}"),
1630                    value_cbor: format!("{paths_cbor:?}"),
1631                });
1632            }
1633        }
1634        EnvelopeContent::Query { .. } => {
1635            return Err(AgentError::CallDataMismatch {
1636                field: "request_type".to_string(),
1637                value_arg: "read_state".to_string(),
1638                value_cbor: "query".to_string(),
1639            })
1640        }
1641        EnvelopeContent::Call { .. } => {
1642            return Err(AgentError::CallDataMismatch {
1643                field: "request_type".to_string(),
1644                value_arg: "read_state".to_string(),
1645                value_cbor: "call".to_string(),
1646            })
1647        }
1648    }
1649    Ok(())
1650}
1651
1652#[derive(Clone)]
1653struct SubnetCache {
1654    subnets: TimedCache<Principal, Arc<Subnet>>,
1655    canister_index: RangeInclusiveMap<Principal, Principal>,
1656}
1657
1658impl SubnetCache {
1659    fn new() -> Self {
1660        Self {
1661            subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1662            canister_index: RangeInclusiveMap::new(),
1663        }
1664    }
1665
1666    fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1667        self.canister_index
1668            .get(canister)
1669            .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1670            .filter(|subnet| subnet.canister_ranges.contains(canister))
1671    }
1672
1673    fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1674        self.subnets.cache_get(subnet_id).cloned()
1675    }
1676
1677    fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1678        self.subnets.cache_set(subnet_id, subnet.clone());
1679        for range in subnet.canister_ranges.iter() {
1680            self.canister_index.insert(range.clone(), subnet_id);
1681        }
1682    }
1683}
1684
1685/// API boundary node, which routes /api calls to IC replica nodes.
1686#[derive(Debug, Clone)]
1687pub struct ApiBoundaryNode {
1688    /// Domain name
1689    pub domain: String,
1690    /// IPv6 address in the hexadecimal notation with colons.
1691    pub ipv6_address: String,
1692    /// IPv4 address in the dotted-decimal notation.
1693    pub ipv4_address: Option<String>,
1694}
1695
1696/// A query request builder.
1697///
1698/// This makes it easier to do query calls without actually passing all arguments.
1699#[derive(Debug, Clone)]
1700#[non_exhaustive]
1701pub struct QueryBuilder<'agent> {
1702    agent: &'agent Agent,
1703    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1704    pub effective_canister_id: Principal,
1705    /// The principal ID of the canister being called.
1706    pub canister_id: Principal,
1707    /// The name of the canister method being called.
1708    pub method_name: String,
1709    /// The argument blob to be passed to the method.
1710    pub arg: Vec<u8>,
1711    /// The Unix timestamp that the request will expire at.
1712    pub ingress_expiry_datetime: Option<u64>,
1713    /// Whether to include a nonce with the message.
1714    pub use_nonce: bool,
1715}
1716
1717impl<'agent> QueryBuilder<'agent> {
1718    /// Creates a new query builder with an agent for a particular canister method.
1719    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1720        Self {
1721            agent,
1722            effective_canister_id: canister_id,
1723            canister_id,
1724            method_name,
1725            arg: vec![],
1726            ingress_expiry_datetime: None,
1727            use_nonce: false,
1728        }
1729    }
1730
1731    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1732    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1733        self.effective_canister_id = canister_id;
1734        self
1735    }
1736
1737    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1738    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1739        self.arg = arg.into();
1740        self
1741    }
1742
1743    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1744    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1745        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1746        self
1747    }
1748
1749    /// Sets `ingress_expiry_datetime` to `max(now, 4min)`.
1750    pub fn expire_after(mut self, duration: Duration) -> Self {
1751        self.ingress_expiry_datetime = Some(
1752            OffsetDateTime::now_utc()
1753                .saturating_add(duration.try_into().expect("negative duration"))
1754                .unix_timestamp_nanos() as u64,
1755        );
1756        self
1757    }
1758
1759    /// Uses a nonce generated with the agent's configured nonce factory. By default queries do not use nonces,
1760    /// and thus may get a (briefly) cached response.
1761    pub fn with_nonce_generation(mut self) -> Self {
1762        self.use_nonce = true;
1763        self
1764    }
1765
1766    /// Make a query call. This will return a byte vector.
1767    pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1768        self.agent
1769            .query_raw(
1770                self.canister_id,
1771                self.effective_canister_id,
1772                self.method_name,
1773                self.arg,
1774                self.ingress_expiry_datetime,
1775                self.use_nonce,
1776                None,
1777            )
1778            .await
1779    }
1780
1781    /// Make a query call with signature verification. This will return a byte vector.
1782    ///
1783    /// Compared with [call][Self::call], this method will **always** verify the signature of the query response
1784    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1785    pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1786        self.agent
1787            .query_raw(
1788                self.canister_id,
1789                self.effective_canister_id,
1790                self.method_name,
1791                self.arg,
1792                self.ingress_expiry_datetime,
1793                self.use_nonce,
1794                Some(true),
1795            )
1796            .await
1797    }
1798
1799    /// Make a query call without signature verification. This will return a byte vector.
1800    ///
1801    /// Compared with [call][Self::call], this method will **never** verify the signature of the query response
1802    /// regardless the Agent level configuration from [`AgentBuilder::with_verify_query_signatures`].
1803    pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1804        self.agent
1805            .query_raw(
1806                self.canister_id,
1807                self.effective_canister_id,
1808                self.method_name,
1809                self.arg,
1810                self.ingress_expiry_datetime,
1811                self.use_nonce,
1812                Some(false),
1813            )
1814            .await
1815    }
1816
1817    /// Sign a query call. This will return a [`signed::SignedQuery`]
1818    /// which contains all fields of the query and the signed query in CBOR encoding
1819    pub fn sign(self) -> Result<SignedQuery, AgentError> {
1820        let effective_canister_id = self.effective_canister_id;
1821        let identity = self.agent.identity.clone();
1822        let content = self.into_envelope()?;
1823        let signed_query = sign_envelope(&content, identity)?;
1824        let EnvelopeContent::Query {
1825            ingress_expiry,
1826            sender,
1827            canister_id,
1828            method_name,
1829            arg,
1830            nonce,
1831        } = content
1832        else {
1833            unreachable!()
1834        };
1835        Ok(SignedQuery {
1836            ingress_expiry,
1837            sender,
1838            canister_id,
1839            method_name,
1840            arg,
1841            effective_canister_id,
1842            signed_query,
1843            nonce,
1844        })
1845    }
1846
1847    /// Converts the query builder into [`EnvelopeContent`] for external signing or storage.
1848    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1849        self.agent.query_content(
1850            self.canister_id,
1851            self.method_name,
1852            self.arg,
1853            self.ingress_expiry_datetime,
1854            self.use_nonce,
1855        )
1856    }
1857}
1858
1859impl<'agent> IntoFuture for QueryBuilder<'agent> {
1860    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1861    type Output = Result<Vec<u8>, AgentError>;
1862    fn into_future(self) -> Self::IntoFuture {
1863        Box::pin(self.call())
1864    }
1865}
1866
1867/// An in-flight canister update call. Useful primarily as a `Future`.
1868pub struct UpdateCall<'agent> {
1869    agent: &'agent Agent,
1870    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1871    effective_canister_id: Principal,
1872    canister_id: Principal,
1873    method_name: String,
1874}
1875
1876impl fmt::Debug for UpdateCall<'_> {
1877    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1878        f.debug_struct("UpdateCall")
1879            .field("agent", &self.agent)
1880            .field("effective_canister_id", &self.effective_canister_id)
1881            .finish_non_exhaustive()
1882    }
1883}
1884
1885impl Future for UpdateCall<'_> {
1886    type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1887    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1888        self.response_future.as_mut().poll(cx)
1889    }
1890}
1891
1892impl<'a> UpdateCall<'a> {
1893    /// Waits for the update call to be completed, polling if necessary.
1894    pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1895        let response = self.response_future.await?;
1896
1897        match response {
1898            CallResponse::Response(response) => Ok(response),
1899            CallResponse::Poll(request_id) => {
1900                self.agent
1901                    .wait_inner(
1902                        &request_id,
1903                        self.effective_canister_id,
1904                        Some(Operation::Call {
1905                            canister: self.canister_id,
1906                            method: self.method_name,
1907                        }),
1908                    )
1909                    .await
1910            }
1911        }
1912    }
1913}
1914/// An update request Builder.
1915///
1916/// This makes it easier to do update calls without actually passing all arguments or specifying
1917/// if you want to wait or not.
1918#[derive(Debug)]
1919pub struct UpdateBuilder<'agent> {
1920    agent: &'agent Agent,
1921    /// The [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1922    pub effective_canister_id: Principal,
1923    /// The principal ID of the canister being called.
1924    pub canister_id: Principal,
1925    /// The name of the canister method being called.
1926    pub method_name: String,
1927    /// The argument blob to be passed to the method.
1928    pub arg: Vec<u8>,
1929    /// The Unix timestamp that the request will expire at.
1930    pub ingress_expiry_datetime: Option<u64>,
1931}
1932
1933impl<'agent> UpdateBuilder<'agent> {
1934    /// Creates a new update builder with an agent for a particular canister method.
1935    pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1936        Self {
1937            agent,
1938            effective_canister_id: canister_id,
1939            canister_id,
1940            method_name,
1941            arg: vec![],
1942            ingress_expiry_datetime: None,
1943        }
1944    }
1945
1946    /// Sets the [effective canister ID](https://internetcomputer.org/docs/current/references/ic-interface-spec#http-effective-canister-id) of the destination.
1947    pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1948        self.effective_canister_id = canister_id;
1949        self
1950    }
1951
1952    /// Sets the argument blob to pass to the canister. For most canisters this should be a Candid-serialized tuple.
1953    pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1954        self.arg = arg.into();
1955        self
1956    }
1957
1958    /// Sets `ingress_expiry_datetime` to the provided timestamp, at nanosecond precision.
1959    pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1960        self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1961        self
1962    }
1963
1964    /// Sets `ingress_expiry_datetime` to `min(now, 4min)`.
1965    pub fn expire_after(mut self, duration: Duration) -> Self {
1966        self.ingress_expiry_datetime = Some(
1967            OffsetDateTime::now_utc()
1968                .saturating_add(duration.try_into().expect("negative duration"))
1969                .unix_timestamp_nanos() as u64,
1970        );
1971        self
1972    }
1973
1974    /// Make an update call. This will call `request_status` on the `RequestId` in a loop and return
1975    /// the response as a byte vector.
1976    pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1977        self.call().and_wait().await.map(|x| x.0)
1978    }
1979
1980    /// Make an update call. This will return a `RequestId`.
1981    /// The `RequestId` should then be used for `request_status` (most likely in a loop).
1982    pub fn call(self) -> UpdateCall<'agent> {
1983        let method_name = self.method_name.clone();
1984        let response_future = async move {
1985            self.agent
1986                .update_raw(
1987                    self.canister_id,
1988                    self.effective_canister_id,
1989                    self.method_name,
1990                    self.arg,
1991                    self.ingress_expiry_datetime,
1992                )
1993                .await
1994        };
1995        UpdateCall {
1996            agent: self.agent,
1997            response_future: Box::pin(response_future),
1998            effective_canister_id: self.effective_canister_id,
1999            canister_id: self.canister_id,
2000            method_name,
2001        }
2002    }
2003
2004    /// Sign a update call. This will return a [`signed::SignedUpdate`]
2005    /// which contains all fields of the update and the signed update in CBOR encoding
2006    pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2007        let identity = self.agent.identity.clone();
2008        let effective_canister_id = self.effective_canister_id;
2009        let content = self.into_envelope()?;
2010        let signed_update = sign_envelope(&content, identity)?;
2011        let request_id = to_request_id(&content)?;
2012        let EnvelopeContent::Call {
2013            nonce,
2014            ingress_expiry,
2015            sender,
2016            canister_id,
2017            method_name,
2018            arg,
2019        } = content
2020        else {
2021            unreachable!()
2022        };
2023        Ok(SignedUpdate {
2024            nonce,
2025            ingress_expiry,
2026            sender,
2027            canister_id,
2028            method_name,
2029            arg,
2030            effective_canister_id,
2031            signed_update,
2032            request_id,
2033        })
2034    }
2035
2036    /// Converts the update builder into an [`EnvelopeContent`] for external signing or storage.
2037    pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2038        let nonce = self.agent.nonce_factory.generate();
2039        self.agent.update_content(
2040            self.canister_id,
2041            self.method_name,
2042            self.arg,
2043            self.ingress_expiry_datetime,
2044            nonce,
2045        )
2046    }
2047}
2048
2049impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2050    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2051    type Output = Result<Vec<u8>, AgentError>;
2052    fn into_future(self) -> Self::IntoFuture {
2053        Box::pin(self.call_and_wait())
2054    }
2055}
2056
2057/// HTTP client middleware. Implemented automatically for `reqwest`-compatible by-ref `tower::Service`, such as `reqwest_middleware`.
2058#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2059#[cfg_attr(not(target_family = "wasm"), async_trait)]
2060pub trait HttpService: Send + Sync + Debug {
2061    /// Perform a HTTP request. Any retry logic should call `req` again to get a new request.
2062    async fn call<'a>(
2063        &'a self,
2064        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2065        max_retries: usize,
2066        size_limit: Option<usize>,
2067    ) -> Result<http::Response<Bytes>, AgentError>;
2068}
2069
2070/// Convert from http Request to reqwest's one
2071fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2072    let (parts, body) = req.into_parts();
2073    let body = reqwest::Body::from(body);
2074    // I think it can never fail since it converts from `Url` to `Uri` and `Url` is a subset of `Uri`,
2075    // but just to be safe let's handle it.
2076    let request = http::Request::from_parts(parts, body)
2077        .try_into()
2078        .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2079
2080    Ok(request)
2081}
2082
2083/// Convert from reqwests's Response to http one
2084#[cfg(not(target_family = "wasm"))]
2085async fn to_http_response(
2086    resp: Response,
2087    size_limit: Option<usize>,
2088) -> Result<http::Response<Bytes>, AgentError> {
2089    use http_body_util::{BodyExt, Limited};
2090
2091    let resp: http::Response<reqwest::Body> = resp.into();
2092    let (parts, body) = resp.into_parts();
2093    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2094    let body = body
2095        .collect()
2096        .await
2097        .map_err(|e| {
2098            AgentError::TransportError(TransportError::Generic(format!(
2099                "unable to read response body: {e:#}"
2100            )))
2101        })?
2102        .to_bytes();
2103    let resp = http::Response::from_parts(parts, body);
2104
2105    Ok(resp)
2106}
2107
2108/// Convert from reqwests's Response to http one.
2109/// WASM Response in reqwest doesn't have direct conversion for http::Response,
2110/// so we have to hack around using streams.
2111#[cfg(target_family = "wasm")]
2112async fn to_http_response(
2113    resp: Response,
2114    size_limit: Option<usize>,
2115) -> Result<http::Response<Bytes>, AgentError> {
2116    use futures_util::StreamExt;
2117    use http_body::Frame;
2118    use http_body_util::{Limited, StreamBody};
2119
2120    // Save headers
2121    let status = resp.status();
2122    let headers = resp.headers().clone();
2123
2124    // Convert body
2125    let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2126    let body = StreamBody::new(stream);
2127    let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2128    let body = http_body_util::BodyExt::collect(body)
2129        .await
2130        .map_err(|e| {
2131            AgentError::TransportError(TransportError::Generic(format!(
2132                "unable to read response body: {e:#}"
2133            )))
2134        })?
2135        .to_bytes();
2136
2137    let mut resp = http::Response::new(body);
2138    *resp.status_mut() = status;
2139    *resp.headers_mut() = headers;
2140
2141    Ok(resp)
2142}
2143
2144#[cfg(not(target_family = "wasm"))]
2145#[async_trait]
2146impl<T> HttpService for T
2147where
2148    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2149    for<'a> <&'a Self as Service<Request>>::Future: Send,
2150    T: Send + Sync + Debug + ?Sized,
2151{
2152    #[allow(clippy::needless_arbitrary_self_type)]
2153    async fn call<'a>(
2154        mut self: &'a Self,
2155        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2156        max_retries: usize,
2157        size_limit: Option<usize>,
2158    ) -> Result<http::Response<Bytes>, AgentError> {
2159        let mut retry_count = 0;
2160        loop {
2161            let request = from_http_request(req()?)?;
2162
2163            match Service::call(&mut self, request).await {
2164                Err(err) => {
2165                    // Network-related errors can be retried.
2166                    if err.is_connect() {
2167                        if retry_count >= max_retries {
2168                            return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2169                        }
2170                        retry_count += 1;
2171                    }
2172                    // All other errors return immediately.
2173                    else {
2174                        return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2175                    }
2176                }
2177
2178                Ok(resp) => {
2179                    let resp = to_http_response(resp, size_limit).await?;
2180                    return Ok(resp);
2181                }
2182            }
2183        }
2184    }
2185}
2186
2187#[cfg(target_family = "wasm")]
2188#[async_trait(?Send)]
2189impl<T> HttpService for T
2190where
2191    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2192    T: Send + Sync + Debug + ?Sized,
2193{
2194    #[allow(clippy::needless_arbitrary_self_type)]
2195    async fn call<'a>(
2196        mut self: &'a Self,
2197        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2198        _retries: usize,
2199        _size_limit: Option<usize>,
2200    ) -> Result<http::Response<Bytes>, AgentError> {
2201        let request = from_http_request(req()?)?;
2202        let response = Service::call(&mut self, request)
2203            .await
2204            .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2205
2206        to_http_response(response, _size_limit).await
2207    }
2208}
2209
2210#[derive(Debug)]
2211struct Retry429Logic {
2212    client: Client,
2213}
2214
2215#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2216#[cfg_attr(not(target_family = "wasm"), async_trait)]
2217impl HttpService for Retry429Logic {
2218    async fn call<'a>(
2219        &'a self,
2220        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2221        _max_tcp_retries: usize,
2222        _size_limit: Option<usize>,
2223    ) -> Result<http::Response<Bytes>, AgentError> {
2224        let mut retries = 0;
2225        loop {
2226            #[cfg(not(target_family = "wasm"))]
2227            let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2228            // Client inconveniently does not implement Service on wasm
2229            #[cfg(target_family = "wasm")]
2230            let resp = {
2231                let request = from_http_request(req()?)?;
2232                let resp = self
2233                    .client
2234                    .execute(request)
2235                    .await
2236                    .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2237
2238                to_http_response(resp, _size_limit).await?
2239            };
2240
2241            if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2242                if retries == 6 {
2243                    break Ok(resp);
2244                } else {
2245                    retries += 1;
2246                    crate::util::sleep(Duration::from_millis(250)).await;
2247                    continue;
2248                }
2249            } else {
2250                break Ok(resp);
2251            }
2252        }
2253    }
2254}
2255
2256#[cfg(all(test, not(target_family = "wasm")))]
2257mod offline_tests {
2258    use super::*;
2259    use tokio::net::TcpListener;
2260    // Any tests that involve the network should go in agent_test, not here.
2261
2262    #[test]
2263    fn rounded_expiry() {
2264        let agent = Agent::builder()
2265            .with_url("http://not-a-real-url")
2266            .build()
2267            .unwrap();
2268        let mut prev_expiry = None;
2269        let mut num_timestamps = 0;
2270        for _ in 0..6 {
2271            let update = agent
2272                .update(&Principal::management_canister(), "not_a_method")
2273                .sign()
2274                .unwrap();
2275            if prev_expiry < Some(update.ingress_expiry) {
2276                prev_expiry = Some(update.ingress_expiry);
2277                num_timestamps += 1;
2278            }
2279        }
2280        // in six requests, there should be no more than two timestamps
2281        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2282    }
2283
2284    #[tokio::test]
2285    async fn client_ratelimit() {
2286        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2287        let count = Arc::new(Mutex::new(0));
2288        let port = mock_server.local_addr().unwrap().port();
2289        tokio::spawn({
2290            let count = count.clone();
2291            async move {
2292                loop {
2293                    let (mut conn, _) = mock_server.accept().await.unwrap();
2294                    *count.lock().unwrap() += 1;
2295                    tokio::spawn(
2296                        // read all data, never reply
2297                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2298                    );
2299                }
2300            }
2301        });
2302        let agent = Agent::builder()
2303            .with_http_client(Client::builder().http1_only().build().unwrap())
2304            .with_url(format!("http://127.0.0.1:{port}"))
2305            .with_max_concurrent_requests(2)
2306            .build()
2307            .unwrap();
2308        for _ in 0..3 {
2309            let agent = agent.clone();
2310            tokio::spawn(async move {
2311                agent
2312                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2313                    .call()
2314                    .await
2315            });
2316        }
2317        crate::util::sleep(Duration::from_millis(250)).await;
2318        assert_eq!(*count.lock().unwrap(), 2);
2319    }
2320}