1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use futures_util::StreamExt;
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49 },
50 export::Principal,
51 identity::Identity,
52 to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64 borrow::Cow,
65 collections::HashMap,
66 convert::TryFrom,
67 fmt::{self, Debug},
68 future::{Future, IntoFuture},
69 pin::Pin,
70 sync::{Arc, Mutex, RwLock},
71 task::{Context, Poll},
72 time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87#[derive(Clone)]
151pub struct Agent {
152 nonce_factory: Arc<dyn NonceGenerator>,
153 identity: Arc<dyn Identity>,
154 ingress_expiry: Duration,
155 root_key: Arc<RwLock<Vec<u8>>>,
156 client: Arc<dyn HttpService>,
157 route_provider: Arc<dyn RouteProvider>,
158 subnet_key_cache: Arc<Mutex<SubnetCache>>,
159 concurrent_requests_semaphore: Arc<Semaphore>,
160 verify_query_signatures: bool,
161 max_response_body_size: Option<usize>,
162 max_polling_time: Duration,
163 #[allow(dead_code)]
164 max_tcp_error_retries: usize,
165}
166
167impl fmt::Debug for Agent {
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
169 f.debug_struct("Agent")
170 .field("ingress_expiry", &self.ingress_expiry)
171 .finish_non_exhaustive()
172 }
173}
174
175impl Agent {
176 pub fn builder() -> builder::AgentBuilder {
179 Default::default()
180 }
181
182 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
184 let client = config.http_service.unwrap_or_else(|| {
185 Arc::new(Retry429Logic {
186 client: config.client.unwrap_or_else(|| {
187 #[cfg(not(target_family = "wasm"))]
188 {
189 Client::builder()
190 .use_rustls_tls()
191 .timeout(Duration::from_secs(360))
192 .build()
193 .expect("Could not create HTTP client.")
194 }
195 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
196 {
197 Client::new()
198 }
199 }),
200 })
201 });
202 Ok(Agent {
203 nonce_factory: config.nonce_factory,
204 identity: config.identity,
205 ingress_expiry: config.ingress_expiry,
206 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
207 client: client.clone(),
208 route_provider: if let Some(route_provider) = config.route_provider {
209 route_provider
210 } else if let Some(url) = config.url {
211 if config.background_dynamic_routing {
212 assert!(
213 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
214 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
215 );
216 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
217 UrlUntilReady::new(url, async move {
218 DynamicRouteProviderBuilder::new(
219 LatencyRoutingSnapshot::new(),
220 seeds,
221 client,
222 )
223 .build()
224 .await
225 }) as Arc<dyn RouteProvider>
226 } else {
227 Arc::new(url)
228 }
229 } else {
230 panic!("either route_provider or url must be specified");
231 },
232 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233 verify_query_signatures: config.verify_query_signatures,
234 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235 max_response_body_size: config.max_response_body_size,
236 max_tcp_error_retries: config.max_tcp_error_retries,
237 max_polling_time: config.max_polling_time,
238 })
239 }
240
241 pub fn set_identity<I>(&mut self, identity: I)
247 where
248 I: 'static + Identity,
249 {
250 self.identity = Arc::new(identity);
251 }
252 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
258 self.identity = identity;
259 }
260
261 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271 return Ok(());
273 }
274 let status = self.status().await?;
275 let Some(root_key) = status.root_key else {
276 return Err(AgentError::NoRootKeyInStatus(status));
277 };
278 self.set_root_key(root_key);
279 Ok(())
280 }
281
282 pub fn set_root_key(&self, root_key: Vec<u8>) {
287 *self.root_key.write().unwrap() = root_key;
288 }
289
290 pub fn read_root_key(&self) -> Vec<u8> {
292 self.root_key.read().unwrap().clone()
293 }
294
295 fn get_expiry_date(&self) -> u64 {
296 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298 if self.ingress_expiry.as_secs() > 90 {
299 rounded = rounded.replace_second(0).unwrap();
300 }
301 rounded.unix_timestamp_nanos().try_into().unwrap()
302 }
303
304 pub fn get_principal(&self) -> Result<Principal, String> {
306 self.identity.sender()
307 }
308
309 async fn query_endpoint<A>(
310 &self,
311 effective_canister_id: Principal,
312 serialized_bytes: Vec<u8>,
313 ) -> Result<A, AgentError>
314 where
315 A: serde::de::DeserializeOwned,
316 {
317 let _permit = self.concurrent_requests_semaphore.acquire().await;
318 let bytes = self
319 .execute(
320 Method::POST,
321 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
322 Some(serialized_bytes),
323 )
324 .await?
325 .1;
326 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327 }
328
329 async fn read_state_endpoint<A>(
330 &self,
331 effective_canister_id: Principal,
332 serialized_bytes: Vec<u8>,
333 ) -> Result<A, AgentError>
334 where
335 A: serde::de::DeserializeOwned,
336 {
337 let _permit = self.concurrent_requests_semaphore.acquire().await;
338 let endpoint = format!(
339 "api/v2/canister/{}/read_state",
340 effective_canister_id.to_text()
341 );
342 let bytes = self
343 .execute(Method::POST, &endpoint, Some(serialized_bytes))
344 .await?
345 .1;
346 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347 }
348
349 async fn read_subnet_state_endpoint<A>(
350 &self,
351 subnet_id: Principal,
352 serialized_bytes: Vec<u8>,
353 ) -> Result<A, AgentError>
354 where
355 A: serde::de::DeserializeOwned,
356 {
357 let _permit = self.concurrent_requests_semaphore.acquire().await;
358 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
359 let bytes = self
360 .execute(Method::POST, &endpoint, Some(serialized_bytes))
361 .await?
362 .1;
363 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364 }
365
366 async fn call_endpoint(
367 &self,
368 effective_canister_id: Principal,
369 serialized_bytes: Vec<u8>,
370 ) -> Result<TransportCallResponse, AgentError> {
371 let _permit = self.concurrent_requests_semaphore.acquire().await;
372 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
373 let (status_code, response_body) = self
374 .execute(Method::POST, &endpoint, Some(serialized_bytes))
375 .await?;
376
377 if status_code == StatusCode::ACCEPTED {
378 return Ok(TransportCallResponse::Accepted);
379 }
380
381 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382 }
383
384 #[allow(clippy::too_many_arguments)]
387 async fn query_raw(
388 &self,
389 canister_id: Principal,
390 effective_canister_id: Principal,
391 method_name: String,
392 arg: Vec<u8>,
393 ingress_expiry_datetime: Option<u64>,
394 use_nonce: bool,
395 explicit_verify_query_signatures: Option<bool>,
396 ) -> Result<Vec<u8>, AgentError> {
397 let operation = Operation::Call {
398 canister: canister_id,
399 method: method_name.clone(),
400 };
401 let content = self.query_content(
402 canister_id,
403 method_name,
404 arg,
405 ingress_expiry_datetime,
406 use_nonce,
407 )?;
408 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409 self.query_inner(
410 effective_canister_id,
411 serialized_bytes,
412 content.to_request_id(),
413 explicit_verify_query_signatures,
414 operation,
415 )
416 .await
417 }
418
419 pub async fn query_signed(
423 &self,
424 effective_canister_id: Principal,
425 signed_query: Vec<u8>,
426 ) -> Result<Vec<u8>, AgentError> {
427 let envelope: Envelope =
428 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429 let EnvelopeContent::Query {
430 canister_id,
431 method_name,
432 ..
433 } = &*envelope.content
434 else {
435 return Err(AgentError::CallDataMismatch {
436 field: "request_type".to_string(),
437 value_arg: "query".to_string(),
438 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439 "update"
440 } else {
441 "read_state"
442 }
443 .to_string(),
444 });
445 };
446 let operation = Operation::Call {
447 canister: *canister_id,
448 method: method_name.clone(),
449 };
450 self.query_inner(
451 effective_canister_id,
452 signed_query,
453 envelope.content.to_request_id(),
454 None,
455 operation,
456 )
457 .await
458 }
459
460 async fn query_inner(
464 &self,
465 effective_canister_id: Principal,
466 signed_query: Vec<u8>,
467 request_id: RequestId,
468 explicit_verify_query_signatures: Option<bool>,
469 operation: Operation,
470 ) -> Result<Vec<u8>, AgentError> {
471 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472 let (response, mut subnet) = futures_util::try_join!(
473 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474 self.get_subnet_by_canister(&effective_canister_id)
475 )?;
476 if response.signatures().is_empty() {
477 return Err(AgentError::MissingSignature);
478 } else if response.signatures().len() > subnet.node_keys.len() {
479 return Err(AgentError::TooManySignatures {
480 had: response.signatures().len(),
481 needed: subnet.node_keys.len(),
482 });
483 }
484 for signature in response.signatures() {
485 if OffsetDateTime::now_utc()
486 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487 > self.ingress_expiry
488 {
489 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490 }
491 let signable = response.signable(request_id, signature.timestamp);
492 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493 node_key
494 } else {
495 subnet = self
496 .fetch_subnet_by_canister(&effective_canister_id)
497 .await?;
498 subnet
499 .node_keys
500 .get(&signature.identity)
501 .ok_or(AgentError::CertificateNotAuthorized())?
502 };
503 if node_key.len() != 44 {
504 return Err(AgentError::DerKeyLengthMismatch {
505 expected: 44,
506 actual: node_key.len(),
507 });
508 }
509 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510 if node_key[..12] != DER_PREFIX {
511 return Err(AgentError::DerPrefixMismatch {
512 expected: DER_PREFIX.to_vec(),
513 actual: node_key[..12].to_vec(),
514 });
515 }
516 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517 .map_err(|_| AgentError::MalformedPublicKey)?;
518
519 match pubkey.verify_signature(&signable, &signature.signature[..]) {
520 Ok(()) => (),
521 Err(SignatureError::InvalidSignature) => {
522 return Err(AgentError::QuerySignatureVerificationFailed)
523 }
524 Err(SignatureError::InvalidLength) => {
525 return Err(AgentError::MalformedSignature)
526 }
527 _ => unreachable!(),
528 }
529 }
530 response
531 } else {
532 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533 .await?
534 };
535
536 match response {
537 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539 reject,
540 operation: Some(operation),
541 }),
542 }
543 }
544
545 fn query_content(
546 &self,
547 canister_id: Principal,
548 method_name: String,
549 arg: Vec<u8>,
550 ingress_expiry_datetime: Option<u64>,
551 use_nonce: bool,
552 ) -> Result<EnvelopeContent, AgentError> {
553 Ok(EnvelopeContent::Query {
554 sender: self.identity.sender().map_err(AgentError::SigningError)?,
555 canister_id,
556 method_name,
557 arg,
558 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560 })
561 }
562
563 async fn update_raw(
565 &self,
566 canister_id: Principal,
567 effective_canister_id: Principal,
568 method_name: String,
569 arg: Vec<u8>,
570 ingress_expiry_datetime: Option<u64>,
571 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
572 let nonce = self.nonce_factory.generate();
573 let content = self.update_content(
574 canister_id,
575 method_name.clone(),
576 arg,
577 ingress_expiry_datetime,
578 nonce,
579 )?;
580 let operation = Some(Operation::Call {
581 canister: canister_id,
582 method: method_name,
583 });
584 let request_id = to_request_id(&content)?;
585 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
586
587 let response_body = self
588 .call_endpoint(effective_canister_id, serialized_bytes)
589 .await?;
590
591 match response_body {
592 TransportCallResponse::Replied { certificate } => {
593 let certificate =
594 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
595
596 self.verify(&certificate, effective_canister_id)?;
597 let status = lookup_request_status(&certificate, &request_id)?;
598
599 match status {
600 RequestStatusResponse::Replied(reply) => {
601 Ok(CallResponse::Response((reply.arg, certificate)))
602 }
603 RequestStatusResponse::Rejected(reject_response) => {
604 Err(AgentError::CertifiedReject {
605 reject: reject_response,
606 operation,
607 })?
608 }
609 _ => Ok(CallResponse::Poll(request_id)),
610 }
611 }
612 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
613 TransportCallResponse::NonReplicatedRejection(reject_response) => {
614 Err(AgentError::UncertifiedReject {
615 reject: reject_response,
616 operation,
617 })
618 }
619 }
620 }
621
622 pub async fn update_signed(
626 &self,
627 effective_canister_id: Principal,
628 signed_update: Vec<u8>,
629 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
630 let envelope: Envelope =
631 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
632 let EnvelopeContent::Call {
633 canister_id,
634 method_name,
635 ..
636 } = &*envelope.content
637 else {
638 return Err(AgentError::CallDataMismatch {
639 field: "request_type".to_string(),
640 value_arg: "update".to_string(),
641 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
642 "query"
643 } else {
644 "read_state"
645 }
646 .to_string(),
647 });
648 };
649 let operation = Some(Operation::Call {
650 canister: *canister_id,
651 method: method_name.clone(),
652 });
653 let request_id = to_request_id(&envelope.content)?;
654
655 let response_body = self
656 .call_endpoint(effective_canister_id, signed_update)
657 .await?;
658
659 match response_body {
660 TransportCallResponse::Replied { certificate } => {
661 let certificate =
662 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
663
664 self.verify(&certificate, effective_canister_id)?;
665 let status = lookup_request_status(&certificate, &request_id)?;
666
667 match status {
668 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
669 RequestStatusResponse::Rejected(reject_response) => {
670 Err(AgentError::CertifiedReject {
671 reject: reject_response,
672 operation,
673 })?
674 }
675 _ => Ok(CallResponse::Poll(request_id)),
676 }
677 }
678 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
679 TransportCallResponse::NonReplicatedRejection(reject_response) => {
680 Err(AgentError::UncertifiedReject {
681 reject: reject_response,
682 operation,
683 })
684 }
685 }
686 }
687
688 fn update_content(
689 &self,
690 canister_id: Principal,
691 method_name: String,
692 arg: Vec<u8>,
693 ingress_expiry_datetime: Option<u64>,
694 nonce: Option<Vec<u8>>,
695 ) -> Result<EnvelopeContent, AgentError> {
696 Ok(EnvelopeContent::Call {
697 canister_id,
698 method_name,
699 arg,
700 nonce,
701 sender: self.identity.sender().map_err(AgentError::SigningError)?,
702 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
703 })
704 }
705
706 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
707 ExponentialBackoffBuilder::new()
708 .with_initial_interval(Duration::from_millis(500))
709 .with_max_interval(Duration::from_secs(1))
710 .with_multiplier(1.4)
711 .with_max_elapsed_time(Some(self.max_polling_time))
712 .build()
713 }
714
715 pub async fn wait_signed(
717 &self,
718 request_id: &RequestId,
719 effective_canister_id: Principal,
720 signed_request_status: Vec<u8>,
721 ) -> Result<(Vec<u8>, Certificate), AgentError> {
722 let mut retry_policy = self.get_retry_policy();
723
724 let mut request_accepted = false;
725 let (resp, cert) = self
726 .request_status_signed(
727 request_id,
728 effective_canister_id,
729 signed_request_status.clone(),
730 )
731 .await?;
732 loop {
733 match resp {
734 RequestStatusResponse::Unknown => {}
735
736 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
737 if !request_accepted {
738 retry_policy.reset();
739 request_accepted = true;
740 }
741 }
742
743 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
744 return Ok((arg, cert))
745 }
746
747 RequestStatusResponse::Rejected(response) => {
748 return Err(AgentError::CertifiedReject {
749 reject: response,
750 operation: None,
751 })
752 }
753
754 RequestStatusResponse::Done => {
755 return Err(AgentError::RequestStatusDoneNoReply(String::from(
756 *request_id,
757 )))
758 }
759 };
760
761 match retry_policy.next_backoff() {
762 Some(duration) => crate::util::sleep(duration).await,
763
764 None => return Err(AgentError::TimeoutWaitingForResponse()),
765 }
766 }
767 }
768
769 pub async fn wait(
771 &self,
772 request_id: &RequestId,
773 effective_canister_id: Principal,
774 ) -> Result<(Vec<u8>, Certificate), AgentError> {
775 self.wait_inner(request_id, effective_canister_id, None)
776 .await
777 }
778
779 async fn wait_inner(
780 &self,
781 request_id: &RequestId,
782 effective_canister_id: Principal,
783 operation: Option<Operation>,
784 ) -> Result<(Vec<u8>, Certificate), AgentError> {
785 let mut retry_policy = self.get_retry_policy();
786
787 let mut request_accepted = false;
788 loop {
789 let (resp, cert) = self
790 .request_status_raw(request_id, effective_canister_id)
791 .await?;
792 match resp {
793 RequestStatusResponse::Unknown => {}
794
795 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
796 if !request_accepted {
797 retry_policy.reset();
805 request_accepted = true;
806 }
807 }
808
809 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
810 return Ok((arg, cert))
811 }
812
813 RequestStatusResponse::Rejected(response) => {
814 return Err(AgentError::CertifiedReject {
815 reject: response,
816 operation,
817 })
818 }
819
820 RequestStatusResponse::Done => {
821 return Err(AgentError::RequestStatusDoneNoReply(String::from(
822 *request_id,
823 )))
824 }
825 };
826
827 match retry_policy.next_backoff() {
828 Some(duration) => crate::util::sleep(duration).await,
829
830 None => return Err(AgentError::TimeoutWaitingForResponse()),
831 }
832 }
833 }
834
835 pub async fn read_state_raw(
838 &self,
839 paths: Vec<Vec<Label>>,
840 effective_canister_id: Principal,
841 ) -> Result<Certificate, AgentError> {
842 let content = self.read_state_content(paths)?;
843 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
844
845 let read_state_response: ReadStateResponse = self
846 .read_state_endpoint(effective_canister_id, serialized_bytes)
847 .await?;
848 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
849 .map_err(AgentError::InvalidCborData)?;
850 self.verify(&cert, effective_canister_id)?;
851 Ok(cert)
852 }
853
854 pub async fn read_subnet_state_raw(
857 &self,
858 paths: Vec<Vec<Label>>,
859 subnet_id: Principal,
860 ) -> Result<Certificate, AgentError> {
861 let content = self.read_state_content(paths)?;
862 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
863
864 let read_state_response: ReadStateResponse = self
865 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
866 .await?;
867 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
868 .map_err(AgentError::InvalidCborData)?;
869 self.verify_for_subnet(&cert, subnet_id)?;
870 Ok(cert)
871 }
872
873 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
874 Ok(EnvelopeContent::ReadState {
875 sender: self.identity.sender().map_err(AgentError::SigningError)?,
876 paths,
877 ingress_expiry: self.get_expiry_date(),
878 })
879 }
880
881 pub fn verify(
884 &self,
885 cert: &Certificate,
886 effective_canister_id: Principal,
887 ) -> Result<(), AgentError> {
888 self.verify_cert(cert, effective_canister_id)?;
889 self.verify_cert_timestamp(cert)?;
890 Ok(())
891 }
892
893 fn verify_cert(
894 &self,
895 cert: &Certificate,
896 effective_canister_id: Principal,
897 ) -> Result<(), AgentError> {
898 let sig = &cert.signature;
899
900 let root_hash = cert.tree.digest();
901 let mut msg = vec![];
902 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
903 msg.extend_from_slice(&root_hash);
904
905 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
906 let key = extract_der(der_key)?;
907
908 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
909 .map_err(|_| AgentError::CertificateVerificationFailed())?;
910 Ok(())
911 }
912
913 pub fn verify_for_subnet(
916 &self,
917 cert: &Certificate,
918 subnet_id: Principal,
919 ) -> Result<(), AgentError> {
920 self.verify_cert_for_subnet(cert, subnet_id)?;
921 self.verify_cert_timestamp(cert)?;
922 Ok(())
923 }
924
925 fn verify_cert_for_subnet(
926 &self,
927 cert: &Certificate,
928 subnet_id: Principal,
929 ) -> Result<(), AgentError> {
930 let sig = &cert.signature;
931
932 let root_hash = cert.tree.digest();
933 let mut msg = vec![];
934 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
935 msg.extend_from_slice(&root_hash);
936
937 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
938 let key = extract_der(der_key)?;
939
940 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
941 .map_err(|_| AgentError::CertificateVerificationFailed())?;
942 Ok(())
943 }
944
945 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
946 let time = lookup_time(cert)?;
947 if (OffsetDateTime::now_utc()
948 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
949 .abs()
950 > self.ingress_expiry
951 {
952 Err(AgentError::CertificateOutdated(self.ingress_expiry))
953 } else {
954 Ok(())
955 }
956 }
957
958 fn check_delegation(
959 &self,
960 delegation: &Option<Delegation>,
961 effective_canister_id: Principal,
962 ) -> Result<Vec<u8>, AgentError> {
963 match delegation {
964 None => Ok(self.read_root_key()),
965 Some(delegation) => {
966 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
967 .map_err(AgentError::InvalidCborData)?;
968 if cert.delegation.is_some() {
969 return Err(AgentError::CertificateHasTooManyDelegations);
970 }
971 self.verify_cert(&cert, effective_canister_id)?;
972 let canister_range_lookup = [
973 "subnet".as_bytes(),
974 delegation.subnet_id.as_ref(),
975 "canister_ranges".as_bytes(),
976 ];
977 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
978 let ranges: Vec<(Principal, Principal)> =
979 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
980 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
981 return Err(AgentError::CertificateNotAuthorized());
983 }
984
985 let public_key_path = [
986 "subnet".as_bytes(),
987 delegation.subnet_id.as_ref(),
988 "public_key".as_bytes(),
989 ];
990 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
991 }
992 }
993 }
994
995 fn check_delegation_for_subnet(
996 &self,
997 delegation: &Option<Delegation>,
998 subnet_id: Principal,
999 ) -> Result<Vec<u8>, AgentError> {
1000 match delegation {
1001 None => Ok(self.read_root_key()),
1002 Some(delegation) => {
1003 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1004 .map_err(AgentError::InvalidCborData)?;
1005 if cert.delegation.is_some() {
1006 return Err(AgentError::CertificateHasTooManyDelegations);
1007 }
1008 self.verify_cert_for_subnet(&cert, subnet_id)?;
1009 let public_key_path = [
1010 "subnet".as_bytes(),
1011 delegation.subnet_id.as_ref(),
1012 "public_key".as_bytes(),
1013 ];
1014 let pk = lookup_value(&cert.tree, public_key_path)
1015 .map_err(|_| AgentError::CertificateNotAuthorized())?
1016 .to_vec();
1017 Ok(pk)
1018 }
1019 }
1020 }
1021
1022 pub async fn read_state_canister_info(
1025 &self,
1026 canister_id: Principal,
1027 path: &str,
1028 ) -> Result<Vec<u8>, AgentError> {
1029 let paths: Vec<Vec<Label>> = vec![vec![
1030 "canister".into(),
1031 Label::from_bytes(canister_id.as_slice()),
1032 path.into(),
1033 ]];
1034
1035 let cert = self.read_state_raw(paths, canister_id).await?;
1036
1037 lookup_canister_info(cert, canister_id, path)
1038 }
1039
1040 pub async fn read_state_canister_controllers(
1042 &self,
1043 canister_id: Principal,
1044 ) -> Result<Vec<Principal>, AgentError> {
1045 let blob = self
1046 .read_state_canister_info(canister_id, "controllers")
1047 .await?;
1048 let controllers: Vec<Principal> =
1049 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1050 Ok(controllers)
1051 }
1052
1053 pub async fn read_state_canister_module_hash(
1055 &self,
1056 canister_id: Principal,
1057 ) -> Result<Vec<u8>, AgentError> {
1058 self.read_state_canister_info(canister_id, "module_hash")
1059 .await
1060 }
1061
1062 pub async fn read_state_canister_metadata(
1064 &self,
1065 canister_id: Principal,
1066 path: &str,
1067 ) -> Result<Vec<u8>, AgentError> {
1068 let paths: Vec<Vec<Label>> = vec![vec![
1069 "canister".into(),
1070 Label::from_bytes(canister_id.as_slice()),
1071 "metadata".into(),
1072 path.into(),
1073 ]];
1074
1075 let cert = self.read_state_raw(paths, canister_id).await?;
1076
1077 lookup_canister_metadata(cert, canister_id, path)
1078 }
1079
1080 pub async fn read_state_subnet_metrics(
1082 &self,
1083 subnet_id: Principal,
1084 ) -> Result<SubnetMetrics, AgentError> {
1085 let paths = vec![vec![
1086 "subnet".into(),
1087 Label::from_bytes(subnet_id.as_slice()),
1088 "metrics".into(),
1089 ]];
1090 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1091 lookup_subnet_metrics(cert, subnet_id)
1092 }
1093
1094 pub async fn request_status_raw(
1096 &self,
1097 request_id: &RequestId,
1098 effective_canister_id: Principal,
1099 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1100 let paths: Vec<Vec<Label>> =
1101 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1102
1103 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1104
1105 Ok((lookup_request_status(&cert, request_id)?, cert))
1106 }
1107
1108 pub async fn request_status_signed(
1112 &self,
1113 request_id: &RequestId,
1114 effective_canister_id: Principal,
1115 signed_request_status: Vec<u8>,
1116 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117 let _envelope: Envelope =
1118 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1119 let read_state_response: ReadStateResponse = self
1120 .read_state_endpoint(effective_canister_id, signed_request_status)
1121 .await?;
1122
1123 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1124 .map_err(AgentError::InvalidCborData)?;
1125 self.verify(&cert, effective_canister_id)?;
1126 Ok((lookup_request_status(&cert, request_id)?, cert))
1127 }
1128
1129 pub fn update<S: Into<String>>(
1132 &self,
1133 canister_id: &Principal,
1134 method_name: S,
1135 ) -> UpdateBuilder {
1136 UpdateBuilder::new(self, *canister_id, method_name.into())
1137 }
1138
1139 pub async fn status(&self) -> Result<Status, AgentError> {
1141 let endpoint = "api/v2/status";
1142 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1143
1144 let cbor: serde_cbor::Value =
1145 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1146
1147 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1148 }
1149
1150 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1153 QueryBuilder::new(self, *canister_id, method_name.into())
1154 }
1155
1156 pub fn sign_request_status(
1159 &self,
1160 effective_canister_id: Principal,
1161 request_id: RequestId,
1162 ) -> Result<SignedRequestStatus, AgentError> {
1163 let paths: Vec<Vec<Label>> =
1164 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1165 let read_state_content = self.read_state_content(paths)?;
1166 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1167 let ingress_expiry = read_state_content.ingress_expiry();
1168 let sender = *read_state_content.sender();
1169 Ok(SignedRequestStatus {
1170 ingress_expiry,
1171 sender,
1172 effective_canister_id,
1173 request_id,
1174 signed_request_status,
1175 })
1176 }
1177
1178 async fn get_subnet_by_canister(
1179 &self,
1180 canister: &Principal,
1181 ) -> Result<Arc<Subnet>, AgentError> {
1182 let subnet = self
1183 .subnet_key_cache
1184 .lock()
1185 .unwrap()
1186 .get_subnet_by_canister(canister);
1187 if let Some(subnet) = subnet {
1188 Ok(subnet)
1189 } else {
1190 self.fetch_subnet_by_canister(canister).await
1191 }
1192 }
1193
1194 pub async fn fetch_api_boundary_nodes_by_canister_id(
1196 &self,
1197 canister_id: Principal,
1198 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1199 let paths = vec![vec!["api_boundary_nodes".into()]];
1200 let certificate = self.read_state_raw(paths, canister_id).await?;
1201 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1202 Ok(api_boundary_nodes)
1203 }
1204
1205 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1207 &self,
1208 subnet_id: Principal,
1209 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1210 let paths = vec![vec!["api_boundary_nodes".into()]];
1211 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1212 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1213 Ok(api_boundary_nodes)
1214 }
1215
1216 async fn fetch_subnet_by_canister(
1217 &self,
1218 canister: &Principal,
1219 ) -> Result<Arc<Subnet>, AgentError> {
1220 let cert = self
1221 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1222 .await?;
1223
1224 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1225 let subnet = Arc::new(subnet);
1226 self.subnet_key_cache
1227 .lock()
1228 .unwrap()
1229 .insert_subnet(subnet_id, subnet.clone());
1230 Ok(subnet)
1231 }
1232
1233 async fn request(
1234 &self,
1235 method: Method,
1236 endpoint: &str,
1237 body: Option<Vec<u8>>,
1238 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1239 let create_request_with_generated_url = || -> Result<Request, AgentError> {
1240 let url = self.route_provider.route()?.join(endpoint)?;
1241 let mut http_request = Request::new(method.clone(), url);
1242 http_request
1243 .headers_mut()
1244 .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1245 *http_request.body_mut() = body.clone().map(Body::from);
1246 Ok(http_request)
1247 };
1248
1249 let response = self
1250 .client
1251 .call(
1252 &create_request_with_generated_url,
1253 self.max_tcp_error_retries,
1254 )
1255 .await?;
1256
1257 let http_status = response.status();
1258 let response_headers = response.headers().clone();
1259
1260 if matches!(self
1262 .max_response_body_size
1263 .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1264 {
1265 return Err(AgentError::ResponseSizeExceededLimit());
1266 }
1267
1268 let mut body: Vec<u8> = response
1269 .content_length()
1270 .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1271
1272 let mut stream = response.bytes_stream();
1273
1274 while let Some(chunk) = stream.next().await {
1275 let chunk = chunk?;
1276
1277 if matches!(self
1279 .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1280 {
1281 return Err(AgentError::ResponseSizeExceededLimit());
1282 }
1283
1284 body.extend_from_slice(chunk.as_ref());
1285 }
1286
1287 Ok((http_status, response_headers, body))
1288 }
1289
1290 async fn execute(
1291 &self,
1292 method: Method,
1293 endpoint: &str,
1294 body: Option<Vec<u8>>,
1295 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1296 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1297
1298 let status = request_result.0;
1299 let headers = request_result.1;
1300 let body = request_result.2;
1301
1302 if status.is_client_error() || status.is_server_error() {
1303 Err(AgentError::HttpError(HttpErrorPayload {
1304 status: status.into(),
1305 content_type: headers
1306 .get(CONTENT_TYPE)
1307 .and_then(|value| value.to_str().ok())
1308 .map(str::to_string),
1309 content: body,
1310 }))
1311 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1312 Err(AgentError::InvalidHttpResponse(format!(
1313 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1314 )))
1315 } else {
1316 Ok((status, body))
1317 }
1318 }
1319}
1320
1321fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1324 ranges
1325 .iter()
1326 .any(|r| principal >= &r.0 && principal <= &r.1)
1327}
1328
1329fn sign_envelope(
1330 content: &EnvelopeContent,
1331 identity: Arc<dyn Identity>,
1332) -> Result<Vec<u8>, AgentError> {
1333 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1334
1335 let envelope = Envelope {
1336 content: Cow::Borrowed(content),
1337 sender_pubkey: signature.public_key,
1338 sender_sig: signature.signature,
1339 sender_delegation: signature.delegations,
1340 };
1341
1342 let mut serialized_bytes = Vec::new();
1343 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1344 serializer.self_describe()?;
1345 envelope.serialize(&mut serializer)?;
1346
1347 Ok(serialized_bytes)
1348}
1349
1350pub fn signed_query_inspect(
1353 sender: Principal,
1354 canister_id: Principal,
1355 method_name: &str,
1356 arg: &[u8],
1357 ingress_expiry: u64,
1358 signed_query: Vec<u8>,
1359) -> Result<(), AgentError> {
1360 let envelope: Envelope =
1361 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1362 match envelope.content.as_ref() {
1363 EnvelopeContent::Query {
1364 ingress_expiry: ingress_expiry_cbor,
1365 sender: sender_cbor,
1366 canister_id: canister_id_cbor,
1367 method_name: method_name_cbor,
1368 arg: arg_cbor,
1369 nonce: _nonce,
1370 } => {
1371 if ingress_expiry != *ingress_expiry_cbor {
1372 return Err(AgentError::CallDataMismatch {
1373 field: "ingress_expiry".to_string(),
1374 value_arg: ingress_expiry.to_string(),
1375 value_cbor: ingress_expiry_cbor.to_string(),
1376 });
1377 }
1378 if sender != *sender_cbor {
1379 return Err(AgentError::CallDataMismatch {
1380 field: "sender".to_string(),
1381 value_arg: sender.to_string(),
1382 value_cbor: sender_cbor.to_string(),
1383 });
1384 }
1385 if canister_id != *canister_id_cbor {
1386 return Err(AgentError::CallDataMismatch {
1387 field: "canister_id".to_string(),
1388 value_arg: canister_id.to_string(),
1389 value_cbor: canister_id_cbor.to_string(),
1390 });
1391 }
1392 if method_name != *method_name_cbor {
1393 return Err(AgentError::CallDataMismatch {
1394 field: "method_name".to_string(),
1395 value_arg: method_name.to_string(),
1396 value_cbor: method_name_cbor.clone(),
1397 });
1398 }
1399 if arg != *arg_cbor {
1400 return Err(AgentError::CallDataMismatch {
1401 field: "arg".to_string(),
1402 value_arg: format!("{arg:?}"),
1403 value_cbor: format!("{arg_cbor:?}"),
1404 });
1405 }
1406 }
1407 EnvelopeContent::Call { .. } => {
1408 return Err(AgentError::CallDataMismatch {
1409 field: "request_type".to_string(),
1410 value_arg: "query".to_string(),
1411 value_cbor: "call".to_string(),
1412 })
1413 }
1414 EnvelopeContent::ReadState { .. } => {
1415 return Err(AgentError::CallDataMismatch {
1416 field: "request_type".to_string(),
1417 value_arg: "query".to_string(),
1418 value_cbor: "read_state".to_string(),
1419 })
1420 }
1421 }
1422 Ok(())
1423}
1424
1425pub fn signed_update_inspect(
1428 sender: Principal,
1429 canister_id: Principal,
1430 method_name: &str,
1431 arg: &[u8],
1432 ingress_expiry: u64,
1433 signed_update: Vec<u8>,
1434) -> Result<(), AgentError> {
1435 let envelope: Envelope =
1436 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1437 match envelope.content.as_ref() {
1438 EnvelopeContent::Call {
1439 nonce: _nonce,
1440 ingress_expiry: ingress_expiry_cbor,
1441 sender: sender_cbor,
1442 canister_id: canister_id_cbor,
1443 method_name: method_name_cbor,
1444 arg: arg_cbor,
1445 } => {
1446 if ingress_expiry != *ingress_expiry_cbor {
1447 return Err(AgentError::CallDataMismatch {
1448 field: "ingress_expiry".to_string(),
1449 value_arg: ingress_expiry.to_string(),
1450 value_cbor: ingress_expiry_cbor.to_string(),
1451 });
1452 }
1453 if sender != *sender_cbor {
1454 return Err(AgentError::CallDataMismatch {
1455 field: "sender".to_string(),
1456 value_arg: sender.to_string(),
1457 value_cbor: sender_cbor.to_string(),
1458 });
1459 }
1460 if canister_id != *canister_id_cbor {
1461 return Err(AgentError::CallDataMismatch {
1462 field: "canister_id".to_string(),
1463 value_arg: canister_id.to_string(),
1464 value_cbor: canister_id_cbor.to_string(),
1465 });
1466 }
1467 if method_name != *method_name_cbor {
1468 return Err(AgentError::CallDataMismatch {
1469 field: "method_name".to_string(),
1470 value_arg: method_name.to_string(),
1471 value_cbor: method_name_cbor.clone(),
1472 });
1473 }
1474 if arg != *arg_cbor {
1475 return Err(AgentError::CallDataMismatch {
1476 field: "arg".to_string(),
1477 value_arg: format!("{arg:?}"),
1478 value_cbor: format!("{arg_cbor:?}"),
1479 });
1480 }
1481 }
1482 EnvelopeContent::ReadState { .. } => {
1483 return Err(AgentError::CallDataMismatch {
1484 field: "request_type".to_string(),
1485 value_arg: "call".to_string(),
1486 value_cbor: "read_state".to_string(),
1487 })
1488 }
1489 EnvelopeContent::Query { .. } => {
1490 return Err(AgentError::CallDataMismatch {
1491 field: "request_type".to_string(),
1492 value_arg: "call".to_string(),
1493 value_cbor: "query".to_string(),
1494 })
1495 }
1496 }
1497 Ok(())
1498}
1499
1500pub fn signed_request_status_inspect(
1503 sender: Principal,
1504 request_id: &RequestId,
1505 ingress_expiry: u64,
1506 signed_request_status: Vec<u8>,
1507) -> Result<(), AgentError> {
1508 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1509 let envelope: Envelope =
1510 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1511 match envelope.content.as_ref() {
1512 EnvelopeContent::ReadState {
1513 ingress_expiry: ingress_expiry_cbor,
1514 sender: sender_cbor,
1515 paths: paths_cbor,
1516 } => {
1517 if ingress_expiry != *ingress_expiry_cbor {
1518 return Err(AgentError::CallDataMismatch {
1519 field: "ingress_expiry".to_string(),
1520 value_arg: ingress_expiry.to_string(),
1521 value_cbor: ingress_expiry_cbor.to_string(),
1522 });
1523 }
1524 if sender != *sender_cbor {
1525 return Err(AgentError::CallDataMismatch {
1526 field: "sender".to_string(),
1527 value_arg: sender.to_string(),
1528 value_cbor: sender_cbor.to_string(),
1529 });
1530 }
1531
1532 if paths != *paths_cbor {
1533 return Err(AgentError::CallDataMismatch {
1534 field: "paths".to_string(),
1535 value_arg: format!("{paths:?}"),
1536 value_cbor: format!("{paths_cbor:?}"),
1537 });
1538 }
1539 }
1540 EnvelopeContent::Query { .. } => {
1541 return Err(AgentError::CallDataMismatch {
1542 field: "request_type".to_string(),
1543 value_arg: "read_state".to_string(),
1544 value_cbor: "query".to_string(),
1545 })
1546 }
1547 EnvelopeContent::Call { .. } => {
1548 return Err(AgentError::CallDataMismatch {
1549 field: "request_type".to_string(),
1550 value_arg: "read_state".to_string(),
1551 value_cbor: "call".to_string(),
1552 })
1553 }
1554 }
1555 Ok(())
1556}
1557
1558#[derive(Clone)]
1559struct SubnetCache {
1560 subnets: TimedCache<Principal, Arc<Subnet>>,
1561 canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1562}
1563
1564impl SubnetCache {
1565 fn new() -> Self {
1566 Self {
1567 subnets: TimedCache::with_lifespan(300),
1568 canister_index: RangeInclusiveMap::new_with_step_fns(),
1569 }
1570 }
1571
1572 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1573 self.canister_index
1574 .get(canister)
1575 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1576 .filter(|subnet| subnet.canister_ranges.contains(canister))
1577 }
1578
1579 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1580 self.subnets.cache_set(subnet_id, subnet.clone());
1581 for range in subnet.canister_ranges.iter() {
1582 self.canister_index.insert(range.clone(), subnet_id);
1583 }
1584 }
1585}
1586
1587#[derive(Clone, Copy)]
1588struct PrincipalStep;
1589
1590impl StepFns<Principal> for PrincipalStep {
1591 fn add_one(start: &Principal) -> Principal {
1592 let bytes = start.as_slice();
1593 let mut arr = [0; 29];
1594 arr[..bytes.len()].copy_from_slice(bytes);
1595 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1596 *byte = byte.wrapping_add(1);
1597 if *byte != 0 {
1598 break;
1599 }
1600 }
1601 Principal::from_slice(&arr[..bytes.len()])
1602 }
1603 fn sub_one(start: &Principal) -> Principal {
1604 let bytes = start.as_slice();
1605 let mut arr = [0; 29];
1606 arr[..bytes.len()].copy_from_slice(bytes);
1607 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1608 *byte = byte.wrapping_sub(1);
1609 if *byte != 255 {
1610 break;
1611 }
1612 }
1613 Principal::from_slice(&arr[..bytes.len()])
1614 }
1615}
1616
1617#[derive(Clone)]
1618pub(crate) struct Subnet {
1619 _key: Vec<u8>,
1622 node_keys: HashMap<Principal, Vec<u8>>,
1623 canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1624}
1625
1626#[derive(Debug, Clone)]
1628pub struct ApiBoundaryNode {
1629 pub domain: String,
1631 pub ipv6_address: String,
1633 pub ipv4_address: Option<String>,
1635}
1636
1637#[derive(Debug, Clone)]
1641#[non_exhaustive]
1642pub struct QueryBuilder<'agent> {
1643 agent: &'agent Agent,
1644 pub effective_canister_id: Principal,
1646 pub canister_id: Principal,
1648 pub method_name: String,
1650 pub arg: Vec<u8>,
1652 pub ingress_expiry_datetime: Option<u64>,
1654 pub use_nonce: bool,
1656}
1657
1658impl<'agent> QueryBuilder<'agent> {
1659 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1661 Self {
1662 agent,
1663 effective_canister_id: canister_id,
1664 canister_id,
1665 method_name,
1666 arg: vec![],
1667 ingress_expiry_datetime: None,
1668 use_nonce: false,
1669 }
1670 }
1671
1672 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1674 self.effective_canister_id = canister_id;
1675 self
1676 }
1677
1678 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1680 self.arg = arg.into();
1681 self
1682 }
1683
1684 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1686 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1687 self
1688 }
1689
1690 pub fn expire_after(mut self, duration: Duration) -> Self {
1692 self.ingress_expiry_datetime = Some(
1693 OffsetDateTime::now_utc()
1694 .saturating_add(duration.try_into().expect("negative duration"))
1695 .unix_timestamp_nanos() as u64,
1696 );
1697 self
1698 }
1699
1700 pub fn with_nonce_generation(mut self) -> Self {
1703 self.use_nonce = true;
1704 self
1705 }
1706
1707 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1709 self.agent
1710 .query_raw(
1711 self.canister_id,
1712 self.effective_canister_id,
1713 self.method_name,
1714 self.arg,
1715 self.ingress_expiry_datetime,
1716 self.use_nonce,
1717 None,
1718 )
1719 .await
1720 }
1721
1722 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1727 self.agent
1728 .query_raw(
1729 self.canister_id,
1730 self.effective_canister_id,
1731 self.method_name,
1732 self.arg,
1733 self.ingress_expiry_datetime,
1734 self.use_nonce,
1735 Some(true),
1736 )
1737 .await
1738 }
1739
1740 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1745 self.agent
1746 .query_raw(
1747 self.canister_id,
1748 self.effective_canister_id,
1749 self.method_name,
1750 self.arg,
1751 self.ingress_expiry_datetime,
1752 self.use_nonce,
1753 Some(false),
1754 )
1755 .await
1756 }
1757
1758 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1761 let effective_canister_id = self.effective_canister_id;
1762 let identity = self.agent.identity.clone();
1763 let content = self.into_envelope()?;
1764 let signed_query = sign_envelope(&content, identity)?;
1765 let EnvelopeContent::Query {
1766 ingress_expiry,
1767 sender,
1768 canister_id,
1769 method_name,
1770 arg,
1771 nonce,
1772 } = content
1773 else {
1774 unreachable!()
1775 };
1776 Ok(SignedQuery {
1777 ingress_expiry,
1778 sender,
1779 canister_id,
1780 method_name,
1781 arg,
1782 effective_canister_id,
1783 signed_query,
1784 nonce,
1785 })
1786 }
1787
1788 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1790 self.agent.query_content(
1791 self.canister_id,
1792 self.method_name,
1793 self.arg,
1794 self.ingress_expiry_datetime,
1795 self.use_nonce,
1796 )
1797 }
1798}
1799
1800impl<'agent> IntoFuture for QueryBuilder<'agent> {
1801 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1802 type Output = Result<Vec<u8>, AgentError>;
1803 fn into_future(self) -> Self::IntoFuture {
1804 Box::pin(self.call())
1805 }
1806}
1807
1808pub struct UpdateCall<'agent> {
1810 agent: &'agent Agent,
1811 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1812 effective_canister_id: Principal,
1813 canister_id: Principal,
1814 method_name: String,
1815}
1816
1817impl fmt::Debug for UpdateCall<'_> {
1818 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1819 f.debug_struct("UpdateCall")
1820 .field("agent", &self.agent)
1821 .field("effective_canister_id", &self.effective_canister_id)
1822 .finish_non_exhaustive()
1823 }
1824}
1825
1826impl Future for UpdateCall<'_> {
1827 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1828 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1829 self.response_future.as_mut().poll(cx)
1830 }
1831}
1832
1833impl<'a> UpdateCall<'a> {
1834 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1836 let response = self.response_future.await?;
1837
1838 match response {
1839 CallResponse::Response(response) => Ok(response),
1840 CallResponse::Poll(request_id) => {
1841 self.agent
1842 .wait_inner(
1843 &request_id,
1844 self.effective_canister_id,
1845 Some(Operation::Call {
1846 canister: self.canister_id,
1847 method: self.method_name,
1848 }),
1849 )
1850 .await
1851 }
1852 }
1853 }
1854}
1855#[derive(Debug)]
1860pub struct UpdateBuilder<'agent> {
1861 agent: &'agent Agent,
1862 pub effective_canister_id: Principal,
1864 pub canister_id: Principal,
1866 pub method_name: String,
1868 pub arg: Vec<u8>,
1870 pub ingress_expiry_datetime: Option<u64>,
1872}
1873
1874impl<'agent> UpdateBuilder<'agent> {
1875 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1877 Self {
1878 agent,
1879 effective_canister_id: canister_id,
1880 canister_id,
1881 method_name,
1882 arg: vec![],
1883 ingress_expiry_datetime: None,
1884 }
1885 }
1886
1887 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1889 self.effective_canister_id = canister_id;
1890 self
1891 }
1892
1893 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1895 self.arg = arg.into();
1896 self
1897 }
1898
1899 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1901 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1902 self
1903 }
1904
1905 pub fn expire_after(mut self, duration: Duration) -> Self {
1907 self.ingress_expiry_datetime = Some(
1908 OffsetDateTime::now_utc()
1909 .saturating_add(duration.try_into().expect("negative duration"))
1910 .unix_timestamp_nanos() as u64,
1911 );
1912 self
1913 }
1914
1915 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1918 self.call().and_wait().await.map(|x| x.0)
1919 }
1920
1921 pub fn call(self) -> UpdateCall<'agent> {
1924 let method_name = self.method_name.clone();
1925 let response_future = async move {
1926 self.agent
1927 .update_raw(
1928 self.canister_id,
1929 self.effective_canister_id,
1930 self.method_name,
1931 self.arg,
1932 self.ingress_expiry_datetime,
1933 )
1934 .await
1935 };
1936 UpdateCall {
1937 agent: self.agent,
1938 response_future: Box::pin(response_future),
1939 effective_canister_id: self.effective_canister_id,
1940 canister_id: self.canister_id,
1941 method_name,
1942 }
1943 }
1944
1945 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1948 let identity = self.agent.identity.clone();
1949 let effective_canister_id = self.effective_canister_id;
1950 let content = self.into_envelope()?;
1951 let signed_update = sign_envelope(&content, identity)?;
1952 let request_id = to_request_id(&content)?;
1953 let EnvelopeContent::Call {
1954 nonce,
1955 ingress_expiry,
1956 sender,
1957 canister_id,
1958 method_name,
1959 arg,
1960 } = content
1961 else {
1962 unreachable!()
1963 };
1964 Ok(SignedUpdate {
1965 nonce,
1966 ingress_expiry,
1967 sender,
1968 canister_id,
1969 method_name,
1970 arg,
1971 effective_canister_id,
1972 signed_update,
1973 request_id,
1974 })
1975 }
1976
1977 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1979 let nonce = self.agent.nonce_factory.generate();
1980 self.agent.update_content(
1981 self.canister_id,
1982 self.method_name,
1983 self.arg,
1984 self.ingress_expiry_datetime,
1985 nonce,
1986 )
1987 }
1988}
1989
1990impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1991 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1992 type Output = Result<Vec<u8>, AgentError>;
1993 fn into_future(self) -> Self::IntoFuture {
1994 Box::pin(self.call_and_wait())
1995 }
1996}
1997
1998#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2000#[cfg_attr(not(target_family = "wasm"), async_trait)]
2001pub trait HttpService: Send + Sync + Debug {
2002 async fn call<'a>(
2004 &'a self,
2005 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2006 max_retries: usize,
2007 ) -> Result<Response, AgentError>;
2008}
2009#[cfg(not(target_family = "wasm"))]
2010#[async_trait]
2011impl<T> HttpService for T
2012where
2013 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2014 for<'a> <&'a Self as Service<Request>>::Future: Send,
2015 T: Send + Sync + Debug + ?Sized,
2016{
2017 #[allow(clippy::needless_arbitrary_self_type)]
2018 async fn call<'a>(
2019 mut self: &'a Self,
2020 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2021 max_retries: usize,
2022 ) -> Result<Response, AgentError> {
2023 let mut retry_count = 0;
2024 loop {
2025 match Service::call(&mut self, req()?).await {
2026 Err(err) => {
2027 if err.is_connect() {
2029 if retry_count >= max_retries {
2030 return Err(AgentError::TransportError(err));
2031 }
2032 retry_count += 1;
2033 }
2034 }
2035 Ok(resp) => return Ok(resp),
2036 }
2037 }
2038 }
2039}
2040
2041#[cfg(target_family = "wasm")]
2042#[async_trait(?Send)]
2043impl<T> HttpService for T
2044where
2045 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2046 T: Send + Sync + Debug + ?Sized,
2047{
2048 #[allow(clippy::needless_arbitrary_self_type)]
2049 async fn call<'a>(
2050 mut self: &'a Self,
2051 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2052 _: usize,
2053 ) -> Result<Response, AgentError> {
2054 Ok(Service::call(&mut self, req()?).await?)
2055 }
2056}
2057
2058#[derive(Debug)]
2059struct Retry429Logic {
2060 client: Client,
2061}
2062
2063#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2064#[cfg_attr(not(target_family = "wasm"), async_trait)]
2065impl HttpService for Retry429Logic {
2066 async fn call<'a>(
2067 &'a self,
2068 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2069 _max_tcp_retries: usize,
2070 ) -> Result<Response, AgentError> {
2071 let mut retries = 0;
2072 loop {
2073 #[cfg(not(target_family = "wasm"))]
2074 let resp = self.client.call(req, _max_tcp_retries).await?;
2075 #[cfg(target_family = "wasm")]
2077 let resp = self.client.execute(req()?).await?;
2078 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2079 if retries == 6 {
2080 break Ok(resp);
2081 } else {
2082 retries += 1;
2083 crate::util::sleep(Duration::from_millis(250)).await;
2084 continue;
2085 }
2086 } else {
2087 break Ok(resp);
2088 }
2089 }
2090 }
2091}
2092
2093#[cfg(all(test, not(target_family = "wasm")))]
2094mod offline_tests {
2095 use super::*;
2096 use tokio::net::TcpListener;
2097 #[test]
2100 fn rounded_expiry() {
2101 let agent = Agent::builder()
2102 .with_url("http://not-a-real-url")
2103 .build()
2104 .unwrap();
2105 let mut prev_expiry = None;
2106 let mut num_timestamps = 0;
2107 for _ in 0..6 {
2108 let update = agent
2109 .update(&Principal::management_canister(), "not_a_method")
2110 .sign()
2111 .unwrap();
2112 if prev_expiry < Some(update.ingress_expiry) {
2113 prev_expiry = Some(update.ingress_expiry);
2114 num_timestamps += 1;
2115 }
2116 }
2117 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2119 }
2120
2121 #[tokio::test]
2122 async fn client_ratelimit() {
2123 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2124 let count = Arc::new(Mutex::new(0));
2125 let port = mock_server.local_addr().unwrap().port();
2126 tokio::spawn({
2127 let count = count.clone();
2128 async move {
2129 loop {
2130 let (mut conn, _) = mock_server.accept().await.unwrap();
2131 *count.lock().unwrap() += 1;
2132 tokio::spawn(
2133 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2135 );
2136 }
2137 }
2138 });
2139 let agent = Agent::builder()
2140 .with_http_client(Client::builder().http1_only().build().unwrap())
2141 .with_url(format!("http://127.0.0.1:{port}"))
2142 .with_max_concurrent_requests(2)
2143 .build()
2144 .unwrap();
2145 for _ in 0..3 {
2146 let agent = agent.clone();
2147 tokio::spawn(async move {
2148 agent
2149 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2150 .call()
2151 .await
2152 });
2153 }
2154 crate::util::sleep(Duration::from_millis(250)).await;
2155 assert_eq!(*count.lock().unwrap(), 2);
2156 }
2157}