1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 collections::HashMap,
68 convert::TryFrom,
69 fmt::{self, Debug},
70 future::{Future, IntoFuture},
71 pin::Pin,
72 str::FromStr,
73 sync::{Arc, Mutex, RwLock},
74 task::{Context, Poll},
75 time::Duration,
76};
77
78use crate::agent::response_authentication::lookup_api_boundary_nodes;
79
/// Domain separator prepended to a certificate tree's root hash before its
/// signature is checked: a length byte (`0x0D` = 13) followed by the 13-byte
/// ASCII string `ic-state-root`.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Root key every freshly constructed `Agent` starts with.
///
/// `Agent::fetch_root_key` compares the current key against this value to decide
/// whether the key has already been replaced.
// NOTE(review): presumably the DER-encoded public key of the IC mainnet — confirm.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
83
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
///
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
89
/// Client handle used to sign requests, send them to a replica over HTTP, and
/// verify the certified responses that come back.
///
/// Cloning is cheap for the shared parts: the root key, HTTP service, route
/// provider, subnet cache and semaphore all live behind `Arc`s, so clones share them.
#[derive(Clone)]
pub struct Agent {
    /// Source of nonces attached to update calls (and to queries when requested).
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that provides the sender principal and signs request envelopes.
    identity: Arc<dyn Identity>,
    /// How far in the future a request's `ingress_expiry` is set; also the maximum
    /// age tolerated for certificates and query signatures.
    ingress_expiry: Duration,
    /// Root key used to verify certificates that carry no delegation.
    /// Starts as `IC_ROOT_KEY`; replaceable through `set_root_key`.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service that executes the requests.
    client: Arc<dyn HttpService>,
    /// Provider of the URL(s) requests are routed to.
    route_provider: Arc<dyn RouteProvider>,
    /// Cache of subnet information (node keys, canister ranges) used when
    /// verifying query signatures.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Acquired around each endpoint call (`query`, `read_state`, `call`) to bound
    /// how many are in flight at once.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified.
    verify_query_signatures: bool,
    // NOTE(review): not read in the visible part of the file; presumably caps the
    // size of HTTP response bodies — confirm against `request`/`execute`.
    max_response_body_size: Option<usize>,
    /// Upper bound on the total time the polling retry policy keeps retrying.
    max_polling_time: Duration,
    // NOTE(review): `dead_code` is allowed here, presumably because the field is
    // only read under some target configurations — confirm.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
169
170impl fmt::Debug for Agent {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172 f.debug_struct("Agent")
173 .field("ingress_expiry", &self.ingress_expiry)
174 .finish_non_exhaustive()
175 }
176}
177
178impl Agent {
179 pub fn builder() -> builder::AgentBuilder {
182 Default::default()
183 }
184
185 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187 let client = config.http_service.unwrap_or_else(|| {
188 Arc::new(Retry429Logic {
189 client: config.client.unwrap_or_else(|| {
190 #[cfg(not(target_family = "wasm"))]
191 {
192 Client::builder()
193 .use_rustls_tls()
194 .timeout(Duration::from_secs(360))
195 .build()
196 .expect("Could not create HTTP client.")
197 }
198 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199 {
200 Client::new()
201 }
202 }),
203 })
204 });
205 Ok(Agent {
206 nonce_factory: config.nonce_factory,
207 identity: config.identity,
208 ingress_expiry: config.ingress_expiry,
209 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210 client: client.clone(),
211 route_provider: if let Some(route_provider) = config.route_provider {
212 route_provider
213 } else if let Some(url) = config.url {
214 if config.background_dynamic_routing {
215 assert!(
216 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218 );
219 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220 UrlUntilReady::new(url, async move {
221 DynamicRouteProviderBuilder::new(
222 LatencyRoutingSnapshot::new(),
223 seeds,
224 client,
225 )
226 .build()
227 .await
228 }) as Arc<dyn RouteProvider>
229 } else {
230 Arc::new(url)
231 }
232 } else {
233 panic!("either route_provider or url must be specified");
234 },
235 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
236 verify_query_signatures: config.verify_query_signatures,
237 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
238 max_response_body_size: config.max_response_body_size,
239 max_tcp_error_retries: config.max_tcp_error_retries,
240 max_polling_time: config.max_polling_time,
241 })
242 }
243
244 pub fn set_identity<I>(&mut self, identity: I)
250 where
251 I: 'static + Identity,
252 {
253 self.identity = Arc::new(identity);
254 }
255 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
261 self.identity = identity;
262 }
263
264 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
273 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
274 return Ok(());
276 }
277 let status = self.status().await?;
278 let Some(root_key) = status.root_key else {
279 return Err(AgentError::NoRootKeyInStatus(status));
280 };
281 self.set_root_key(root_key);
282 Ok(())
283 }
284
285 pub fn set_root_key(&self, root_key: Vec<u8>) {
290 *self.root_key.write().unwrap() = root_key;
291 }
292
293 pub fn read_root_key(&self) -> Vec<u8> {
295 self.root_key.read().unwrap().clone()
296 }
297
298 fn get_expiry_date(&self) -> u64 {
299 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
300 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
301 if self.ingress_expiry.as_secs() > 90 {
302 rounded = rounded.replace_second(0).unwrap();
303 }
304 rounded.unix_timestamp_nanos().try_into().unwrap()
305 }
306
307 pub fn get_principal(&self) -> Result<Principal, String> {
309 self.identity.sender()
310 }
311
312 async fn query_endpoint<A>(
313 &self,
314 effective_canister_id: Principal,
315 serialized_bytes: Vec<u8>,
316 ) -> Result<A, AgentError>
317 where
318 A: serde::de::DeserializeOwned,
319 {
320 let _permit = self.concurrent_requests_semaphore.acquire().await;
321 let bytes = self
322 .execute(
323 Method::POST,
324 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
325 Some(serialized_bytes),
326 )
327 .await?
328 .1;
329 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
330 }
331
332 async fn read_state_endpoint<A>(
333 &self,
334 effective_canister_id: Principal,
335 serialized_bytes: Vec<u8>,
336 ) -> Result<A, AgentError>
337 where
338 A: serde::de::DeserializeOwned,
339 {
340 let _permit = self.concurrent_requests_semaphore.acquire().await;
341 let endpoint = format!(
342 "api/v2/canister/{}/read_state",
343 effective_canister_id.to_text()
344 );
345 let bytes = self
346 .execute(Method::POST, &endpoint, Some(serialized_bytes))
347 .await?
348 .1;
349 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
350 }
351
352 async fn read_subnet_state_endpoint<A>(
353 &self,
354 subnet_id: Principal,
355 serialized_bytes: Vec<u8>,
356 ) -> Result<A, AgentError>
357 where
358 A: serde::de::DeserializeOwned,
359 {
360 let _permit = self.concurrent_requests_semaphore.acquire().await;
361 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
362 let bytes = self
363 .execute(Method::POST, &endpoint, Some(serialized_bytes))
364 .await?
365 .1;
366 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
367 }
368
369 async fn call_endpoint(
370 &self,
371 effective_canister_id: Principal,
372 serialized_bytes: Vec<u8>,
373 ) -> Result<TransportCallResponse, AgentError> {
374 let _permit = self.concurrent_requests_semaphore.acquire().await;
375 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
376 let (status_code, response_body) = self
377 .execute(Method::POST, &endpoint, Some(serialized_bytes))
378 .await?;
379
380 if status_code == StatusCode::ACCEPTED {
381 return Ok(TransportCallResponse::Accepted);
382 }
383
384 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
385 }
386
387 #[allow(clippy::too_many_arguments)]
390 async fn query_raw(
391 &self,
392 canister_id: Principal,
393 effective_canister_id: Principal,
394 method_name: String,
395 arg: Vec<u8>,
396 ingress_expiry_datetime: Option<u64>,
397 use_nonce: bool,
398 explicit_verify_query_signatures: Option<bool>,
399 ) -> Result<Vec<u8>, AgentError> {
400 let operation = Operation::Call {
401 canister: canister_id,
402 method: method_name.clone(),
403 };
404 let content = self.query_content(
405 canister_id,
406 method_name,
407 arg,
408 ingress_expiry_datetime,
409 use_nonce,
410 )?;
411 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
412 self.query_inner(
413 effective_canister_id,
414 serialized_bytes,
415 content.to_request_id(),
416 explicit_verify_query_signatures,
417 operation,
418 )
419 .await
420 }
421
422 pub async fn query_signed(
426 &self,
427 effective_canister_id: Principal,
428 signed_query: Vec<u8>,
429 ) -> Result<Vec<u8>, AgentError> {
430 let envelope: Envelope =
431 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
432 let EnvelopeContent::Query {
433 canister_id,
434 method_name,
435 ..
436 } = &*envelope.content
437 else {
438 return Err(AgentError::CallDataMismatch {
439 field: "request_type".to_string(),
440 value_arg: "query".to_string(),
441 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
442 "update"
443 } else {
444 "read_state"
445 }
446 .to_string(),
447 });
448 };
449 let operation = Operation::Call {
450 canister: *canister_id,
451 method: method_name.clone(),
452 };
453 self.query_inner(
454 effective_canister_id,
455 signed_query,
456 envelope.content.to_request_id(),
457 None,
458 operation,
459 )
460 .await
461 }
462
463 async fn query_inner(
467 &self,
468 effective_canister_id: Principal,
469 signed_query: Vec<u8>,
470 request_id: RequestId,
471 explicit_verify_query_signatures: Option<bool>,
472 operation: Operation,
473 ) -> Result<Vec<u8>, AgentError> {
474 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
475 let (response, mut subnet) = futures_util::try_join!(
476 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
477 self.get_subnet_by_canister(&effective_canister_id)
478 )?;
479 if response.signatures().is_empty() {
480 return Err(AgentError::MissingSignature);
481 } else if response.signatures().len() > subnet.node_keys.len() {
482 return Err(AgentError::TooManySignatures {
483 had: response.signatures().len(),
484 needed: subnet.node_keys.len(),
485 });
486 }
487 for signature in response.signatures() {
488 if OffsetDateTime::now_utc()
489 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
490 > self.ingress_expiry
491 {
492 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
493 }
494 let signable = response.signable(request_id, signature.timestamp);
495 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
496 node_key
497 } else {
498 subnet = self
499 .fetch_subnet_by_canister(&effective_canister_id)
500 .await?;
501 subnet
502 .node_keys
503 .get(&signature.identity)
504 .ok_or(AgentError::CertificateNotAuthorized())?
505 };
506 if node_key.len() != 44 {
507 return Err(AgentError::DerKeyLengthMismatch {
508 expected: 44,
509 actual: node_key.len(),
510 });
511 }
512 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
513 if node_key[..12] != DER_PREFIX {
514 return Err(AgentError::DerPrefixMismatch {
515 expected: DER_PREFIX.to_vec(),
516 actual: node_key[..12].to_vec(),
517 });
518 }
519 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
520 .map_err(|_| AgentError::MalformedPublicKey)?;
521
522 match pubkey.verify_signature(&signable, &signature.signature[..]) {
523 Ok(()) => (),
524 Err(SignatureError::InvalidSignature) => {
525 return Err(AgentError::QuerySignatureVerificationFailed)
526 }
527 Err(SignatureError::InvalidLength) => {
528 return Err(AgentError::MalformedSignature)
529 }
530 _ => unreachable!(),
531 }
532 }
533 response
534 } else {
535 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
536 .await?
537 };
538
539 match response {
540 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
541 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
542 reject,
543 operation: Some(operation),
544 }),
545 }
546 }
547
548 fn query_content(
549 &self,
550 canister_id: Principal,
551 method_name: String,
552 arg: Vec<u8>,
553 ingress_expiry_datetime: Option<u64>,
554 use_nonce: bool,
555 ) -> Result<EnvelopeContent, AgentError> {
556 Ok(EnvelopeContent::Query {
557 sender: self.identity.sender().map_err(AgentError::SigningError)?,
558 canister_id,
559 method_name,
560 arg,
561 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
562 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
563 })
564 }
565
566 async fn update_raw(
568 &self,
569 canister_id: Principal,
570 effective_canister_id: Principal,
571 method_name: String,
572 arg: Vec<u8>,
573 ingress_expiry_datetime: Option<u64>,
574 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
575 let nonce = self.nonce_factory.generate();
576 let content = self.update_content(
577 canister_id,
578 method_name.clone(),
579 arg,
580 ingress_expiry_datetime,
581 nonce,
582 )?;
583 let operation = Some(Operation::Call {
584 canister: canister_id,
585 method: method_name,
586 });
587 let request_id = to_request_id(&content)?;
588 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
589
590 let response_body = self
591 .call_endpoint(effective_canister_id, serialized_bytes)
592 .await?;
593
594 match response_body {
595 TransportCallResponse::Replied { certificate } => {
596 let certificate =
597 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
598
599 self.verify(&certificate, effective_canister_id)?;
600 let status = lookup_request_status(&certificate, &request_id)?;
601
602 match status {
603 RequestStatusResponse::Replied(reply) => {
604 Ok(CallResponse::Response((reply.arg, certificate)))
605 }
606 RequestStatusResponse::Rejected(reject_response) => {
607 Err(AgentError::CertifiedReject {
608 reject: reject_response,
609 operation,
610 })?
611 }
612 _ => Ok(CallResponse::Poll(request_id)),
613 }
614 }
615 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
616 TransportCallResponse::NonReplicatedRejection(reject_response) => {
617 Err(AgentError::UncertifiedReject {
618 reject: reject_response,
619 operation,
620 })
621 }
622 }
623 }
624
625 pub async fn update_signed(
629 &self,
630 effective_canister_id: Principal,
631 signed_update: Vec<u8>,
632 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
633 let envelope: Envelope =
634 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
635 let EnvelopeContent::Call {
636 canister_id,
637 method_name,
638 ..
639 } = &*envelope.content
640 else {
641 return Err(AgentError::CallDataMismatch {
642 field: "request_type".to_string(),
643 value_arg: "update".to_string(),
644 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
645 "query"
646 } else {
647 "read_state"
648 }
649 .to_string(),
650 });
651 };
652 let operation = Some(Operation::Call {
653 canister: *canister_id,
654 method: method_name.clone(),
655 });
656 let request_id = to_request_id(&envelope.content)?;
657
658 let response_body = self
659 .call_endpoint(effective_canister_id, signed_update)
660 .await?;
661
662 match response_body {
663 TransportCallResponse::Replied { certificate } => {
664 let certificate =
665 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
666
667 self.verify(&certificate, effective_canister_id)?;
668 let status = lookup_request_status(&certificate, &request_id)?;
669
670 match status {
671 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
672 RequestStatusResponse::Rejected(reject_response) => {
673 Err(AgentError::CertifiedReject {
674 reject: reject_response,
675 operation,
676 })?
677 }
678 _ => Ok(CallResponse::Poll(request_id)),
679 }
680 }
681 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
682 TransportCallResponse::NonReplicatedRejection(reject_response) => {
683 Err(AgentError::UncertifiedReject {
684 reject: reject_response,
685 operation,
686 })
687 }
688 }
689 }
690
691 fn update_content(
692 &self,
693 canister_id: Principal,
694 method_name: String,
695 arg: Vec<u8>,
696 ingress_expiry_datetime: Option<u64>,
697 nonce: Option<Vec<u8>>,
698 ) -> Result<EnvelopeContent, AgentError> {
699 Ok(EnvelopeContent::Call {
700 canister_id,
701 method_name,
702 arg,
703 nonce,
704 sender: self.identity.sender().map_err(AgentError::SigningError)?,
705 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
706 })
707 }
708
709 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
710 ExponentialBackoffBuilder::new()
711 .with_initial_interval(Duration::from_millis(500))
712 .with_max_interval(Duration::from_secs(1))
713 .with_multiplier(1.4)
714 .with_max_elapsed_time(Some(self.max_polling_time))
715 .build()
716 }
717
718 pub async fn wait_signed(
720 &self,
721 request_id: &RequestId,
722 effective_canister_id: Principal,
723 signed_request_status: Vec<u8>,
724 ) -> Result<(Vec<u8>, Certificate), AgentError> {
725 let mut retry_policy = self.get_retry_policy();
726
727 let mut request_accepted = false;
728 loop {
729 let (resp, cert) = self
730 .request_status_signed(
731 request_id,
732 effective_canister_id,
733 signed_request_status.clone(),
734 )
735 .await?;
736 match resp {
737 RequestStatusResponse::Unknown => {}
738
739 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
740 if !request_accepted {
741 retry_policy.reset();
742 request_accepted = true;
743 }
744 }
745
746 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
747 return Ok((arg, cert))
748 }
749
750 RequestStatusResponse::Rejected(response) => {
751 return Err(AgentError::CertifiedReject {
752 reject: response,
753 operation: None,
754 })
755 }
756
757 RequestStatusResponse::Done => {
758 return Err(AgentError::RequestStatusDoneNoReply(String::from(
759 *request_id,
760 )))
761 }
762 };
763
764 match retry_policy.next_backoff() {
765 Some(duration) => crate::util::sleep(duration).await,
766
767 None => return Err(AgentError::TimeoutWaitingForResponse()),
768 }
769 }
770 }
771
772 pub async fn wait(
774 &self,
775 request_id: &RequestId,
776 effective_canister_id: Principal,
777 ) -> Result<(Vec<u8>, Certificate), AgentError> {
778 self.wait_inner(request_id, effective_canister_id, None)
779 .await
780 }
781
782 async fn wait_inner(
783 &self,
784 request_id: &RequestId,
785 effective_canister_id: Principal,
786 operation: Option<Operation>,
787 ) -> Result<(Vec<u8>, Certificate), AgentError> {
788 let mut retry_policy = self.get_retry_policy();
789
790 let mut request_accepted = false;
791 loop {
792 let (resp, cert) = self
793 .request_status_raw(request_id, effective_canister_id)
794 .await?;
795 match resp {
796 RequestStatusResponse::Unknown => {}
797
798 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
799 if !request_accepted {
800 retry_policy.reset();
808 request_accepted = true;
809 }
810 }
811
812 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
813 return Ok((arg, cert))
814 }
815
816 RequestStatusResponse::Rejected(response) => {
817 return Err(AgentError::CertifiedReject {
818 reject: response,
819 operation,
820 })
821 }
822
823 RequestStatusResponse::Done => {
824 return Err(AgentError::RequestStatusDoneNoReply(String::from(
825 *request_id,
826 )))
827 }
828 };
829
830 match retry_policy.next_backoff() {
831 Some(duration) => crate::util::sleep(duration).await,
832
833 None => return Err(AgentError::TimeoutWaitingForResponse()),
834 }
835 }
836 }
837
838 pub async fn read_state_raw(
841 &self,
842 paths: Vec<Vec<Label>>,
843 effective_canister_id: Principal,
844 ) -> Result<Certificate, AgentError> {
845 let content = self.read_state_content(paths)?;
846 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
847
848 let read_state_response: ReadStateResponse = self
849 .read_state_endpoint(effective_canister_id, serialized_bytes)
850 .await?;
851 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
852 .map_err(AgentError::InvalidCborData)?;
853 self.verify(&cert, effective_canister_id)?;
854 Ok(cert)
855 }
856
857 pub async fn read_subnet_state_raw(
860 &self,
861 paths: Vec<Vec<Label>>,
862 subnet_id: Principal,
863 ) -> Result<Certificate, AgentError> {
864 let content = self.read_state_content(paths)?;
865 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
866
867 let read_state_response: ReadStateResponse = self
868 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
869 .await?;
870 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
871 .map_err(AgentError::InvalidCborData)?;
872 self.verify_for_subnet(&cert, subnet_id)?;
873 Ok(cert)
874 }
875
876 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
877 Ok(EnvelopeContent::ReadState {
878 sender: self.identity.sender().map_err(AgentError::SigningError)?,
879 paths,
880 ingress_expiry: self.get_expiry_date(),
881 })
882 }
883
884 pub fn verify(
887 &self,
888 cert: &Certificate,
889 effective_canister_id: Principal,
890 ) -> Result<(), AgentError> {
891 self.verify_cert(cert, effective_canister_id)?;
892 self.verify_cert_timestamp(cert)?;
893 Ok(())
894 }
895
896 fn verify_cert(
897 &self,
898 cert: &Certificate,
899 effective_canister_id: Principal,
900 ) -> Result<(), AgentError> {
901 let sig = &cert.signature;
902
903 let root_hash = cert.tree.digest();
904 let mut msg = vec![];
905 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
906 msg.extend_from_slice(&root_hash);
907
908 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
909 let key = extract_der(der_key)?;
910
911 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
912 .map_err(|_| AgentError::CertificateVerificationFailed())?;
913 Ok(())
914 }
915
916 pub fn verify_for_subnet(
919 &self,
920 cert: &Certificate,
921 subnet_id: Principal,
922 ) -> Result<(), AgentError> {
923 self.verify_cert_for_subnet(cert, subnet_id)?;
924 self.verify_cert_timestamp(cert)?;
925 Ok(())
926 }
927
928 fn verify_cert_for_subnet(
929 &self,
930 cert: &Certificate,
931 subnet_id: Principal,
932 ) -> Result<(), AgentError> {
933 let sig = &cert.signature;
934
935 let root_hash = cert.tree.digest();
936 let mut msg = vec![];
937 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
938 msg.extend_from_slice(&root_hash);
939
940 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
941 let key = extract_der(der_key)?;
942
943 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
944 .map_err(|_| AgentError::CertificateVerificationFailed())?;
945 Ok(())
946 }
947
948 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
949 let time = lookup_time(cert)?;
950 if (OffsetDateTime::now_utc()
951 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
952 .abs()
953 > self.ingress_expiry
954 {
955 Err(AgentError::CertificateOutdated(self.ingress_expiry))
956 } else {
957 Ok(())
958 }
959 }
960
961 fn check_delegation(
962 &self,
963 delegation: &Option<Delegation>,
964 effective_canister_id: Principal,
965 ) -> Result<Vec<u8>, AgentError> {
966 match delegation {
967 None => Ok(self.read_root_key()),
968 Some(delegation) => {
969 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
970 .map_err(AgentError::InvalidCborData)?;
971 if cert.delegation.is_some() {
972 return Err(AgentError::CertificateHasTooManyDelegations);
973 }
974 self.verify_cert(&cert, effective_canister_id)?;
975 let canister_range_lookup = [
976 "subnet".as_bytes(),
977 delegation.subnet_id.as_ref(),
978 "canister_ranges".as_bytes(),
979 ];
980 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
981 let ranges: Vec<(Principal, Principal)> =
982 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
983 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
984 return Err(AgentError::CertificateNotAuthorized());
986 }
987
988 let public_key_path = [
989 "subnet".as_bytes(),
990 delegation.subnet_id.as_ref(),
991 "public_key".as_bytes(),
992 ];
993 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
994 }
995 }
996 }
997
998 fn check_delegation_for_subnet(
999 &self,
1000 delegation: &Option<Delegation>,
1001 subnet_id: Principal,
1002 ) -> Result<Vec<u8>, AgentError> {
1003 match delegation {
1004 None => Ok(self.read_root_key()),
1005 Some(delegation) => {
1006 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1007 .map_err(AgentError::InvalidCborData)?;
1008 if cert.delegation.is_some() {
1009 return Err(AgentError::CertificateHasTooManyDelegations);
1010 }
1011 self.verify_cert_for_subnet(&cert, subnet_id)?;
1012 let public_key_path = [
1013 "subnet".as_bytes(),
1014 delegation.subnet_id.as_ref(),
1015 "public_key".as_bytes(),
1016 ];
1017 let pk = lookup_value(&cert.tree, public_key_path)
1018 .map_err(|_| AgentError::CertificateNotAuthorized())?
1019 .to_vec();
1020 Ok(pk)
1021 }
1022 }
1023 }
1024
1025 pub async fn read_state_canister_info(
1028 &self,
1029 canister_id: Principal,
1030 path: &str,
1031 ) -> Result<Vec<u8>, AgentError> {
1032 let paths: Vec<Vec<Label>> = vec![vec![
1033 "canister".into(),
1034 Label::from_bytes(canister_id.as_slice()),
1035 path.into(),
1036 ]];
1037
1038 let cert = self.read_state_raw(paths, canister_id).await?;
1039
1040 lookup_canister_info(cert, canister_id, path)
1041 }
1042
1043 pub async fn read_state_canister_controllers(
1045 &self,
1046 canister_id: Principal,
1047 ) -> Result<Vec<Principal>, AgentError> {
1048 let blob = self
1049 .read_state_canister_info(canister_id, "controllers")
1050 .await?;
1051 let controllers: Vec<Principal> =
1052 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1053 Ok(controllers)
1054 }
1055
1056 pub async fn read_state_canister_module_hash(
1058 &self,
1059 canister_id: Principal,
1060 ) -> Result<Vec<u8>, AgentError> {
1061 self.read_state_canister_info(canister_id, "module_hash")
1062 .await
1063 }
1064
1065 pub async fn read_state_canister_metadata(
1067 &self,
1068 canister_id: Principal,
1069 path: &str,
1070 ) -> Result<Vec<u8>, AgentError> {
1071 let paths: Vec<Vec<Label>> = vec![vec![
1072 "canister".into(),
1073 Label::from_bytes(canister_id.as_slice()),
1074 "metadata".into(),
1075 path.into(),
1076 ]];
1077
1078 let cert = self.read_state_raw(paths, canister_id).await?;
1079
1080 lookup_canister_metadata(cert, canister_id, path)
1081 }
1082
1083 pub async fn read_state_subnet_metrics(
1085 &self,
1086 subnet_id: Principal,
1087 ) -> Result<SubnetMetrics, AgentError> {
1088 let paths = vec![vec![
1089 "subnet".into(),
1090 Label::from_bytes(subnet_id.as_slice()),
1091 "metrics".into(),
1092 ]];
1093 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1094 lookup_subnet_metrics(cert, subnet_id)
1095 }
1096
1097 pub async fn read_state_subnet_canister_ranges(
1099 &self,
1100 subnet_id: Principal,
1101 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1102 let paths = vec![vec![
1103 "subnet".into(),
1104 Label::from_bytes(subnet_id.as_slice()),
1105 "canister_ranges".into(),
1106 ]];
1107 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1108 lookup_subnet_canister_ranges(cert, subnet_id)
1109 }
1110
1111 pub async fn request_status_raw(
1113 &self,
1114 request_id: &RequestId,
1115 effective_canister_id: Principal,
1116 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117 let paths: Vec<Vec<Label>> =
1118 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1119
1120 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1121
1122 Ok((lookup_request_status(&cert, request_id)?, cert))
1123 }
1124
1125 pub async fn request_status_signed(
1129 &self,
1130 request_id: &RequestId,
1131 effective_canister_id: Principal,
1132 signed_request_status: Vec<u8>,
1133 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1134 let _envelope: Envelope =
1135 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1136 let read_state_response: ReadStateResponse = self
1137 .read_state_endpoint(effective_canister_id, signed_request_status)
1138 .await?;
1139
1140 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1141 .map_err(AgentError::InvalidCborData)?;
1142 self.verify(&cert, effective_canister_id)?;
1143 Ok((lookup_request_status(&cert, request_id)?, cert))
1144 }
1145
1146 pub fn update<S: Into<String>>(
1149 &self,
1150 canister_id: &Principal,
1151 method_name: S,
1152 ) -> UpdateBuilder {
1153 UpdateBuilder::new(self, *canister_id, method_name.into())
1154 }
1155
1156 pub async fn status(&self) -> Result<Status, AgentError> {
1158 let endpoint = "api/v2/status";
1159 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1160
1161 let cbor: serde_cbor::Value =
1162 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1163
1164 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1165 }
1166
1167 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1170 QueryBuilder::new(self, *canister_id, method_name.into())
1171 }
1172
1173 pub fn sign_request_status(
1176 &self,
1177 effective_canister_id: Principal,
1178 request_id: RequestId,
1179 ) -> Result<SignedRequestStatus, AgentError> {
1180 let paths: Vec<Vec<Label>> =
1181 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1182 let read_state_content = self.read_state_content(paths)?;
1183 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1184 let ingress_expiry = read_state_content.ingress_expiry();
1185 let sender = *read_state_content.sender();
1186 Ok(SignedRequestStatus {
1187 ingress_expiry,
1188 sender,
1189 effective_canister_id,
1190 request_id,
1191 signed_request_status,
1192 })
1193 }
1194
1195 async fn get_subnet_by_canister(
1196 &self,
1197 canister: &Principal,
1198 ) -> Result<Arc<Subnet>, AgentError> {
1199 let subnet = self
1200 .subnet_key_cache
1201 .lock()
1202 .unwrap()
1203 .get_subnet_by_canister(canister);
1204 if let Some(subnet) = subnet {
1205 Ok(subnet)
1206 } else {
1207 self.fetch_subnet_by_canister(canister).await
1208 }
1209 }
1210
1211 pub async fn fetch_api_boundary_nodes_by_canister_id(
1213 &self,
1214 canister_id: Principal,
1215 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1216 let paths = vec![vec!["api_boundary_nodes".into()]];
1217 let certificate = self.read_state_raw(paths, canister_id).await?;
1218 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1219 Ok(api_boundary_nodes)
1220 }
1221
1222 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1224 &self,
1225 subnet_id: Principal,
1226 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1227 let paths = vec![vec!["api_boundary_nodes".into()]];
1228 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1229 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1230 Ok(api_boundary_nodes)
1231 }
1232
1233 async fn fetch_subnet_by_canister(
1234 &self,
1235 canister: &Principal,
1236 ) -> Result<Arc<Subnet>, AgentError> {
1237 let cert = self
1238 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1239 .await?;
1240
1241 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1242 if !subnet.canister_ranges.contains(canister) {
1243 return Err(AgentError::CertificateNotAuthorized());
1244 }
1245 let subnet = Arc::new(subnet);
1246 self.subnet_key_cache
1247 .lock()
1248 .unwrap()
1249 .insert_subnet(subnet_id, subnet.clone());
1250 Ok(subnet)
1251 }
1252
1253 async fn request(
1254 &self,
1255 method: Method,
1256 endpoint: &str,
1257 body: Option<Vec<u8>>,
1258 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1259 let body = body.map(Bytes::from);
1260
1261 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1262 let url = self.route_provider.route()?.join(endpoint)?;
1263 let uri = Uri::from_str(url.as_str())
1264 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1265 let body = body.clone().unwrap_or_default();
1266 let request = http::Request::builder()
1267 .method(method.clone())
1268 .uri(uri)
1269 .header(CONTENT_TYPE, "application/cbor")
1270 .body(body)
1271 .map_err(|e| {
1272 AgentError::TransportError(TransportError::Generic(format!(
1273 "unable to create request: {e:#}"
1274 )))
1275 })?;
1276
1277 Ok(request)
1278 };
1279
1280 let response = self
1281 .client
1282 .call(
1283 &create_request_with_generated_url,
1284 self.max_tcp_error_retries,
1285 self.max_response_body_size,
1286 )
1287 .await?;
1288
1289 let (parts, body) = response.into_parts();
1290
1291 Ok((parts.status, parts.headers, body.to_vec()))
1292 }
1293
1294 async fn execute(
1295 &self,
1296 method: Method,
1297 endpoint: &str,
1298 body: Option<Vec<u8>>,
1299 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1300 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1301
1302 let status = request_result.0;
1303 let headers = request_result.1;
1304 let body = request_result.2;
1305
1306 if status.is_client_error() || status.is_server_error() {
1307 Err(AgentError::HttpError(HttpErrorPayload {
1308 status: status.into(),
1309 content_type: headers
1310 .get(CONTENT_TYPE)
1311 .and_then(|value| value.to_str().ok())
1312 .map(str::to_string),
1313 content: body,
1314 }))
1315 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1316 Err(AgentError::InvalidHttpResponse(format!(
1317 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1318 )))
1319 } else {
1320 Ok((status, body))
1321 }
1322 }
1323}
1324
1325fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1328 ranges
1329 .iter()
1330 .any(|r| principal >= &r.0 && principal <= &r.1)
1331}
1332
1333fn sign_envelope(
1334 content: &EnvelopeContent,
1335 identity: Arc<dyn Identity>,
1336) -> Result<Vec<u8>, AgentError> {
1337 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1338
1339 let envelope = Envelope {
1340 content: Cow::Borrowed(content),
1341 sender_pubkey: signature.public_key,
1342 sender_sig: signature.signature,
1343 sender_delegation: signature.delegations,
1344 };
1345
1346 let mut serialized_bytes = Vec::new();
1347 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1348 serializer.self_describe()?;
1349 envelope.serialize(&mut serializer)?;
1350
1351 Ok(serialized_bytes)
1352}
1353
1354pub fn signed_query_inspect(
1357 sender: Principal,
1358 canister_id: Principal,
1359 method_name: &str,
1360 arg: &[u8],
1361 ingress_expiry: u64,
1362 signed_query: Vec<u8>,
1363) -> Result<(), AgentError> {
1364 let envelope: Envelope =
1365 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1366 match envelope.content.as_ref() {
1367 EnvelopeContent::Query {
1368 ingress_expiry: ingress_expiry_cbor,
1369 sender: sender_cbor,
1370 canister_id: canister_id_cbor,
1371 method_name: method_name_cbor,
1372 arg: arg_cbor,
1373 nonce: _nonce,
1374 } => {
1375 if ingress_expiry != *ingress_expiry_cbor {
1376 return Err(AgentError::CallDataMismatch {
1377 field: "ingress_expiry".to_string(),
1378 value_arg: ingress_expiry.to_string(),
1379 value_cbor: ingress_expiry_cbor.to_string(),
1380 });
1381 }
1382 if sender != *sender_cbor {
1383 return Err(AgentError::CallDataMismatch {
1384 field: "sender".to_string(),
1385 value_arg: sender.to_string(),
1386 value_cbor: sender_cbor.to_string(),
1387 });
1388 }
1389 if canister_id != *canister_id_cbor {
1390 return Err(AgentError::CallDataMismatch {
1391 field: "canister_id".to_string(),
1392 value_arg: canister_id.to_string(),
1393 value_cbor: canister_id_cbor.to_string(),
1394 });
1395 }
1396 if method_name != *method_name_cbor {
1397 return Err(AgentError::CallDataMismatch {
1398 field: "method_name".to_string(),
1399 value_arg: method_name.to_string(),
1400 value_cbor: method_name_cbor.clone(),
1401 });
1402 }
1403 if arg != *arg_cbor {
1404 return Err(AgentError::CallDataMismatch {
1405 field: "arg".to_string(),
1406 value_arg: format!("{arg:?}"),
1407 value_cbor: format!("{arg_cbor:?}"),
1408 });
1409 }
1410 }
1411 EnvelopeContent::Call { .. } => {
1412 return Err(AgentError::CallDataMismatch {
1413 field: "request_type".to_string(),
1414 value_arg: "query".to_string(),
1415 value_cbor: "call".to_string(),
1416 })
1417 }
1418 EnvelopeContent::ReadState { .. } => {
1419 return Err(AgentError::CallDataMismatch {
1420 field: "request_type".to_string(),
1421 value_arg: "query".to_string(),
1422 value_cbor: "read_state".to_string(),
1423 })
1424 }
1425 }
1426 Ok(())
1427}
1428
1429pub fn signed_update_inspect(
1432 sender: Principal,
1433 canister_id: Principal,
1434 method_name: &str,
1435 arg: &[u8],
1436 ingress_expiry: u64,
1437 signed_update: Vec<u8>,
1438) -> Result<(), AgentError> {
1439 let envelope: Envelope =
1440 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1441 match envelope.content.as_ref() {
1442 EnvelopeContent::Call {
1443 nonce: _nonce,
1444 ingress_expiry: ingress_expiry_cbor,
1445 sender: sender_cbor,
1446 canister_id: canister_id_cbor,
1447 method_name: method_name_cbor,
1448 arg: arg_cbor,
1449 } => {
1450 if ingress_expiry != *ingress_expiry_cbor {
1451 return Err(AgentError::CallDataMismatch {
1452 field: "ingress_expiry".to_string(),
1453 value_arg: ingress_expiry.to_string(),
1454 value_cbor: ingress_expiry_cbor.to_string(),
1455 });
1456 }
1457 if sender != *sender_cbor {
1458 return Err(AgentError::CallDataMismatch {
1459 field: "sender".to_string(),
1460 value_arg: sender.to_string(),
1461 value_cbor: sender_cbor.to_string(),
1462 });
1463 }
1464 if canister_id != *canister_id_cbor {
1465 return Err(AgentError::CallDataMismatch {
1466 field: "canister_id".to_string(),
1467 value_arg: canister_id.to_string(),
1468 value_cbor: canister_id_cbor.to_string(),
1469 });
1470 }
1471 if method_name != *method_name_cbor {
1472 return Err(AgentError::CallDataMismatch {
1473 field: "method_name".to_string(),
1474 value_arg: method_name.to_string(),
1475 value_cbor: method_name_cbor.clone(),
1476 });
1477 }
1478 if arg != *arg_cbor {
1479 return Err(AgentError::CallDataMismatch {
1480 field: "arg".to_string(),
1481 value_arg: format!("{arg:?}"),
1482 value_cbor: format!("{arg_cbor:?}"),
1483 });
1484 }
1485 }
1486 EnvelopeContent::ReadState { .. } => {
1487 return Err(AgentError::CallDataMismatch {
1488 field: "request_type".to_string(),
1489 value_arg: "call".to_string(),
1490 value_cbor: "read_state".to_string(),
1491 })
1492 }
1493 EnvelopeContent::Query { .. } => {
1494 return Err(AgentError::CallDataMismatch {
1495 field: "request_type".to_string(),
1496 value_arg: "call".to_string(),
1497 value_cbor: "query".to_string(),
1498 })
1499 }
1500 }
1501 Ok(())
1502}
1503
1504pub fn signed_request_status_inspect(
1507 sender: Principal,
1508 request_id: &RequestId,
1509 ingress_expiry: u64,
1510 signed_request_status: Vec<u8>,
1511) -> Result<(), AgentError> {
1512 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1513 let envelope: Envelope =
1514 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1515 match envelope.content.as_ref() {
1516 EnvelopeContent::ReadState {
1517 ingress_expiry: ingress_expiry_cbor,
1518 sender: sender_cbor,
1519 paths: paths_cbor,
1520 } => {
1521 if ingress_expiry != *ingress_expiry_cbor {
1522 return Err(AgentError::CallDataMismatch {
1523 field: "ingress_expiry".to_string(),
1524 value_arg: ingress_expiry.to_string(),
1525 value_cbor: ingress_expiry_cbor.to_string(),
1526 });
1527 }
1528 if sender != *sender_cbor {
1529 return Err(AgentError::CallDataMismatch {
1530 field: "sender".to_string(),
1531 value_arg: sender.to_string(),
1532 value_cbor: sender_cbor.to_string(),
1533 });
1534 }
1535
1536 if paths != *paths_cbor {
1537 return Err(AgentError::CallDataMismatch {
1538 field: "paths".to_string(),
1539 value_arg: format!("{paths:?}"),
1540 value_cbor: format!("{paths_cbor:?}"),
1541 });
1542 }
1543 }
1544 EnvelopeContent::Query { .. } => {
1545 return Err(AgentError::CallDataMismatch {
1546 field: "request_type".to_string(),
1547 value_arg: "read_state".to_string(),
1548 value_cbor: "query".to_string(),
1549 })
1550 }
1551 EnvelopeContent::Call { .. } => {
1552 return Err(AgentError::CallDataMismatch {
1553 field: "request_type".to_string(),
1554 value_arg: "read_state".to_string(),
1555 value_cbor: "call".to_string(),
1556 })
1557 }
1558 }
1559 Ok(())
1560}
1561
/// In-memory cache of subnets, searchable by the canister IDs they host.
#[derive(Clone)]
struct SubnetCache {
    /// Cached subnets keyed by subnet ID; entries expire after the cache's lifespan.
    subnets: TimedCache<Principal, Arc<Subnet>>,
    /// Maps inclusive canister-ID ranges to the ID of the subnet that was inserted with
    /// them.
    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
}
1567
1568impl SubnetCache {
1569 fn new() -> Self {
1570 Self {
1571 subnets: TimedCache::with_lifespan(300),
1572 canister_index: RangeInclusiveMap::new_with_step_fns(),
1573 }
1574 }
1575
1576 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1577 self.canister_index
1578 .get(canister)
1579 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1580 .filter(|subnet| subnet.canister_ranges.contains(canister))
1581 }
1582
1583 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1584 self.subnets.cache_set(subnet_id, subnet.clone());
1585 for range in subnet.canister_ranges.iter() {
1586 self.canister_index.insert(range.clone(), subnet_id);
1587 }
1588 }
1589}
1590
/// Marker type supplying the [`StepFns`] successor/predecessor functions for
/// [`Principal`], so principals can be used as keys in the range collections.
#[derive(Clone, Copy)]
struct PrincipalStep;
1593
impl StepFns<Principal> for PrincipalStep {
    /// Returns a principal of the same length as `start` whose bytes, excluding the
    /// final one, have been incremented by one as a big-endian number (wrapping on
    /// overflow of the leading byte).
    fn add_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Fixed-size scratch buffer; only the first `bytes.len()` bytes are used.
        // NOTE(review): 29 is presumably the maximum principal length — confirm.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // Walk from the second-to-last byte towards the first, carrying while a byte
        // wraps around to 0.
        // NOTE(review): the last byte is deliberately skipped (`len - 1`), presumably
        // because it is a fixed suffix rather than part of the counter — confirm.
        // NOTE(review): `bytes.len() - 1` underflows for an empty principal — confirm
        // this is never called with one.
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_add(1);
            if *byte != 0 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
    /// Returns a principal of the same length as `start` whose bytes, excluding the
    /// final one, have been decremented by one as a big-endian number (wrapping on
    /// underflow of the leading byte).
    fn sub_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Fixed-size scratch buffer; only the first `bytes.len()` bytes are used.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // Walk from the second-to-last byte towards the first, borrowing while a byte
        // wraps around to 255.
        // NOTE(review): same last-byte exclusion and empty-principal caveat as `add_one`.
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_sub(1);
            if *byte != 255 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
}
1620
/// Information about a subnet as held in the agent's subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    /// Key bytes associated with the subnet; not read anywhere in this part of the file.
    // NOTE(review): presumably the subnet's public key — confirm.
    _key: Vec<u8>,
    /// Key bytes per node, indexed by the node's principal.
    // NOTE(review): presumably node public keys used for signature checks — confirm.
    node_keys: HashMap<Principal, Vec<u8>>,
    /// Inclusive ranges of canister IDs; used to decide whether a canister belongs to
    /// this subnet.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1629
/// An API boundary node as returned by the `api_boundary_nodes` state lookups.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name of the node.
    pub domain: String,
    /// IPv6 address of the node, as a string.
    pub ipv6_address: String,
    /// IPv4 address of the node as a string, if it has one.
    pub ipv4_address: Option<String>,
}
1640
/// Builder for a query call, created by `Agent::query`.
///
/// Configure it with the `with_*` / `expire_*` methods, then execute it with one of the
/// `call*` methods or `.await` it directly, or use `sign` to produce a signed envelope
/// without sending it.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// Agent used to build, sign and send the query.
    agent: &'agent Agent,
    /// Effective canister ID passed along with the call; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister whose method is called.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Optional explicit expiry in nanoseconds since the Unix epoch, as set by
    /// `expire_at` / `expire_after`.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether nonce generation was requested via `with_nonce_generation`.
    pub use_nonce: bool,
}
1661
1662impl<'agent> QueryBuilder<'agent> {
1663 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1665 Self {
1666 agent,
1667 effective_canister_id: canister_id,
1668 canister_id,
1669 method_name,
1670 arg: vec![],
1671 ingress_expiry_datetime: None,
1672 use_nonce: false,
1673 }
1674 }
1675
1676 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1678 self.effective_canister_id = canister_id;
1679 self
1680 }
1681
1682 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1684 self.arg = arg.into();
1685 self
1686 }
1687
1688 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1690 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1691 self
1692 }
1693
1694 pub fn expire_after(mut self, duration: Duration) -> Self {
1696 self.ingress_expiry_datetime = Some(
1697 OffsetDateTime::now_utc()
1698 .saturating_add(duration.try_into().expect("negative duration"))
1699 .unix_timestamp_nanos() as u64,
1700 );
1701 self
1702 }
1703
1704 pub fn with_nonce_generation(mut self) -> Self {
1707 self.use_nonce = true;
1708 self
1709 }
1710
1711 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1713 self.agent
1714 .query_raw(
1715 self.canister_id,
1716 self.effective_canister_id,
1717 self.method_name,
1718 self.arg,
1719 self.ingress_expiry_datetime,
1720 self.use_nonce,
1721 None,
1722 )
1723 .await
1724 }
1725
1726 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1731 self.agent
1732 .query_raw(
1733 self.canister_id,
1734 self.effective_canister_id,
1735 self.method_name,
1736 self.arg,
1737 self.ingress_expiry_datetime,
1738 self.use_nonce,
1739 Some(true),
1740 )
1741 .await
1742 }
1743
1744 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1749 self.agent
1750 .query_raw(
1751 self.canister_id,
1752 self.effective_canister_id,
1753 self.method_name,
1754 self.arg,
1755 self.ingress_expiry_datetime,
1756 self.use_nonce,
1757 Some(false),
1758 )
1759 .await
1760 }
1761
1762 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1765 let effective_canister_id = self.effective_canister_id;
1766 let identity = self.agent.identity.clone();
1767 let content = self.into_envelope()?;
1768 let signed_query = sign_envelope(&content, identity)?;
1769 let EnvelopeContent::Query {
1770 ingress_expiry,
1771 sender,
1772 canister_id,
1773 method_name,
1774 arg,
1775 nonce,
1776 } = content
1777 else {
1778 unreachable!()
1779 };
1780 Ok(SignedQuery {
1781 ingress_expiry,
1782 sender,
1783 canister_id,
1784 method_name,
1785 arg,
1786 effective_canister_id,
1787 signed_query,
1788 nonce,
1789 })
1790 }
1791
1792 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1794 self.agent.query_content(
1795 self.canister_id,
1796 self.method_name,
1797 self.arg,
1798 self.ingress_expiry_datetime,
1799 self.use_nonce,
1800 )
1801 }
1802}
1803
1804impl<'agent> IntoFuture for QueryBuilder<'agent> {
1805 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1806 type Output = Result<Vec<u8>, AgentError>;
1807 fn into_future(self) -> Self::IntoFuture {
1808 Box::pin(self.call())
1809 }
1810}
1811
/// An update call in flight, returned by `UpdateBuilder::call`.
///
/// It is itself a future resolving to the immediate [`CallResponse`]; use `and_wait` to
/// additionally wait for the final reply when the response is a request ID to poll.
pub struct UpdateCall<'agent> {
    /// Agent used to wait for the result when polling is needed.
    agent: &'agent Agent,
    /// Boxed future that performs the actual update request.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// Effective canister ID of the call, reused when waiting for the result.
    effective_canister_id: Principal,
    /// Canister being called; reported as part of the operation when waiting.
    canister_id: Principal,
    /// Method being called; reported as part of the operation when waiting.
    method_name: String,
}
1820
1821impl fmt::Debug for UpdateCall<'_> {
1822 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1823 f.debug_struct("UpdateCall")
1824 .field("agent", &self.agent)
1825 .field("effective_canister_id", &self.effective_canister_id)
1826 .finish_non_exhaustive()
1827 }
1828}
1829
1830impl Future for UpdateCall<'_> {
1831 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1832 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1833 self.response_future.as_mut().poll(cx)
1834 }
1835}
1836
1837impl<'a> UpdateCall<'a> {
1838 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1840 let response = self.response_future.await?;
1841
1842 match response {
1843 CallResponse::Response(response) => Ok(response),
1844 CallResponse::Poll(request_id) => {
1845 self.agent
1846 .wait_inner(
1847 &request_id,
1848 self.effective_canister_id,
1849 Some(Operation::Call {
1850 canister: self.canister_id,
1851 method: self.method_name,
1852 }),
1853 )
1854 .await
1855 }
1856 }
1857 }
1858}
/// Builder for an update call, created by `Agent::update`.
///
/// Configure it with the `with_*` / `expire_*` methods, then execute it with `call`,
/// `call_and_wait` or by `.await`ing it, or use `sign` to produce a signed envelope
/// without sending it.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// Agent used to build, sign and send the update.
    agent: &'agent Agent,
    /// Effective canister ID passed along with the call; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister whose method is called.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Optional explicit expiry in nanoseconds since the Unix epoch, as set by
    /// `expire_at` / `expire_after`.
    pub ingress_expiry_datetime: Option<u64>,
}
1877
1878impl<'agent> UpdateBuilder<'agent> {
1879 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1881 Self {
1882 agent,
1883 effective_canister_id: canister_id,
1884 canister_id,
1885 method_name,
1886 arg: vec![],
1887 ingress_expiry_datetime: None,
1888 }
1889 }
1890
1891 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1893 self.effective_canister_id = canister_id;
1894 self
1895 }
1896
1897 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1899 self.arg = arg.into();
1900 self
1901 }
1902
1903 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1905 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1906 self
1907 }
1908
1909 pub fn expire_after(mut self, duration: Duration) -> Self {
1911 self.ingress_expiry_datetime = Some(
1912 OffsetDateTime::now_utc()
1913 .saturating_add(duration.try_into().expect("negative duration"))
1914 .unix_timestamp_nanos() as u64,
1915 );
1916 self
1917 }
1918
1919 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1922 self.call().and_wait().await.map(|x| x.0)
1923 }
1924
1925 pub fn call(self) -> UpdateCall<'agent> {
1928 let method_name = self.method_name.clone();
1929 let response_future = async move {
1930 self.agent
1931 .update_raw(
1932 self.canister_id,
1933 self.effective_canister_id,
1934 self.method_name,
1935 self.arg,
1936 self.ingress_expiry_datetime,
1937 )
1938 .await
1939 };
1940 UpdateCall {
1941 agent: self.agent,
1942 response_future: Box::pin(response_future),
1943 effective_canister_id: self.effective_canister_id,
1944 canister_id: self.canister_id,
1945 method_name,
1946 }
1947 }
1948
1949 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1952 let identity = self.agent.identity.clone();
1953 let effective_canister_id = self.effective_canister_id;
1954 let content = self.into_envelope()?;
1955 let signed_update = sign_envelope(&content, identity)?;
1956 let request_id = to_request_id(&content)?;
1957 let EnvelopeContent::Call {
1958 nonce,
1959 ingress_expiry,
1960 sender,
1961 canister_id,
1962 method_name,
1963 arg,
1964 } = content
1965 else {
1966 unreachable!()
1967 };
1968 Ok(SignedUpdate {
1969 nonce,
1970 ingress_expiry,
1971 sender,
1972 canister_id,
1973 method_name,
1974 arg,
1975 effective_canister_id,
1976 signed_update,
1977 request_id,
1978 })
1979 }
1980
1981 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1983 let nonce = self.agent.nonce_factory.generate();
1984 self.agent.update_content(
1985 self.canister_id,
1986 self.method_name,
1987 self.arg,
1988 self.ingress_expiry_datetime,
1989 nonce,
1990 )
1991 }
1992}
1993
1994impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1995 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1996 type Output = Result<Vec<u8>, AgentError>;
1997 fn into_future(self) -> Self::IntoFuture {
1998 Box::pin(self.call_and_wait())
1999 }
2000}
2001
/// Abstraction over the HTTP client used by the agent to send requests.
///
/// On wasm targets the generated futures are not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Sends a request and returns the response with its body fully collected.
    ///
    /// * `req` — factory producing the request to send; implementations call it once per
    ///   attempt, so it may yield a different request each time.
    /// * `max_retries` — retry budget passed in by the caller; how it is honoured is up
    ///   to the implementation.
    /// * `size_limit` — optional upper bound on the response body size, in bytes.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2014
2015fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2017 let (parts, body) = req.into_parts();
2018 let body = reqwest::Body::from(body);
2019 let request = http::Request::from_parts(parts, body)
2022 .try_into()
2023 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2024
2025 Ok(request)
2026}
2027
2028#[cfg(not(target_family = "wasm"))]
2030async fn to_http_response(
2031 resp: Response,
2032 size_limit: Option<usize>,
2033) -> Result<http::Response<Bytes>, AgentError> {
2034 use http_body_util::{BodyExt, Limited};
2035
2036 let resp: http::Response<reqwest::Body> = resp.into();
2037 let (parts, body) = resp.into_parts();
2038 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2039 let body = body
2040 .collect()
2041 .await
2042 .map_err(|e| {
2043 AgentError::TransportError(TransportError::Generic(format!(
2044 "unable to read response body: {e:#}"
2045 )))
2046 })?
2047 .to_bytes();
2048 let resp = http::Response::from_parts(parts, body);
2049
2050 Ok(resp)
2051}
2052
/// Converts a `reqwest::Response` into an `http::Response<Bytes>`, reading the whole
/// body (wasm targets).
///
/// Status and headers are copied out first; the body is then consumed as a byte stream
/// through a length limiter set to `size_limit`, or `usize::MAX` when no limit is given.
///
/// # Errors
///
/// Any failure while collecting the body (including exceeding the limit) is reported as
/// a generic [`TransportError`].
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Capture the head before `bytes_stream` consumes the response.
    let status = resp.status();
    let headers = resp.headers().clone();

    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let max_len = size_limit.unwrap_or(usize::MAX);
    let limited = Limited::new(StreamBody::new(frames), max_len);

    let payload = http_body_util::BodyExt::collect(limited)
        .await
        .map_err(|e| {
            AgentError::TransportError(TransportError::Generic(format!(
                "unable to read response body: {e:#}"
            )))
        })?
        .to_bytes();

    let mut converted = http::Response::new(payload);
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2088
2089#[cfg(not(target_family = "wasm"))]
2090#[async_trait]
2091impl<T> HttpService for T
2092where
2093 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2094 for<'a> <&'a Self as Service<Request>>::Future: Send,
2095 T: Send + Sync + Debug + ?Sized,
2096{
2097 #[allow(clippy::needless_arbitrary_self_type)]
2098 async fn call<'a>(
2099 mut self: &'a Self,
2100 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2101 max_retries: usize,
2102 size_limit: Option<usize>,
2103 ) -> Result<http::Response<Bytes>, AgentError> {
2104 let mut retry_count = 0;
2105 loop {
2106 let request = from_http_request(req()?)?;
2107
2108 match Service::call(&mut self, request).await {
2109 Err(err) => {
2110 if err.is_connect() {
2112 if retry_count >= max_retries {
2113 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2114 }
2115 retry_count += 1;
2116 }
2117 else {
2119 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2120 }
2121 }
2122
2123 Ok(resp) => {
2124 let resp = to_http_response(resp, size_limit).await?;
2125 return Ok(resp);
2126 }
2127 }
2128 }
2129 }
2130}
2131
/// Blanket [`HttpService`] implementation for any type whose shared reference is a
/// `tower` `Service` over `reqwest` requests and responses (wasm targets).
///
/// The request is sent exactly once; the retry budget is ignored on this target.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;

        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2154
/// [`HttpService`] wrapper around a `reqwest` client that retries requests answered with
/// HTTP 429 (Too Many Requests).
#[derive(Debug)]
struct Retry429Logic {
    /// Underlying HTTP client used to send each attempt.
    client: Client,
}
2159
2160#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2161#[cfg_attr(not(target_family = "wasm"), async_trait)]
2162impl HttpService for Retry429Logic {
2163 async fn call<'a>(
2164 &'a self,
2165 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2166 _max_tcp_retries: usize,
2167 _size_limit: Option<usize>,
2168 ) -> Result<http::Response<Bytes>, AgentError> {
2169 let mut retries = 0;
2170 loop {
2171 #[cfg(not(target_family = "wasm"))]
2172 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2173 #[cfg(target_family = "wasm")]
2175 let resp = {
2176 let request = from_http_request(req()?)?;
2177 let resp = self
2178 .client
2179 .execute(request)
2180 .await
2181 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2182
2183 to_http_response(resp, _size_limit).await?
2184 };
2185
2186 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2187 if retries == 6 {
2188 break Ok(resp);
2189 } else {
2190 retries += 1;
2191 crate::util::sleep(Duration::from_millis(250)).await;
2192 continue;
2193 }
2194 } else {
2195 break Ok(resp);
2196 }
2197 }
2198 }
2199}
2200
/// Tests that need no replica: they use either an unreachable URL or a local mock TCP
/// listener. Not built for wasm targets.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    /// Signs six updates back to back and checks that the ingress expiry takes at most
    /// two distinct (increasing) values across them.
    #[test]
    fn rounded_expiry() {
        // Signing never touches the network, so the URL does not need to resolve.
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // Count each time the expiry moves forward to a new value.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    /// Checks that `with_max_concurrent_requests(2)` lets only two of three concurrent
    /// queries open a connection while the server never responds.
    #[tokio::test]
    async fn client_ratelimit() {
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        // Number of connections accepted by the mock server.
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    // Swallow whatever the client sends and never reply, keeping the
                    // request in flight.
                    tokio::spawn(
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned queries time to connect before counting.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}