1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 collections::HashMap,
68 convert::TryFrom,
69 fmt::{self, Debug},
70 future::{Future, IntoFuture},
71 pin::Pin,
72 str::FromStr,
73 sync::{Arc, Mutex, RwLock},
74 task::{Context, Poll},
75 time::Duration,
76};
77
78use crate::agent::response_authentication::lookup_api_boundary_nodes;
79
/// Domain separator that is prepended to a state tree's root hash before the
/// BLS signature over it is checked (see `Agent::verify_cert`).
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Root key every new [`Agent`] starts with (see `Agent::new`). It is handed to
/// `extract_der` during certificate verification, so it is stored DER-encoded.
/// `Agent::fetch_root_key` only replaces the key while it still equals this value.
// NOTE(review): presumably the Internet Computer mainnet root key — confirm.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
83
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
89
/// Client-side handle used to build, sign, send and verify requests.
///
/// Cloning is cheap: the heavyweight state sits behind `Arc`s, so clones share
/// the root key, the subnet cache and the request semaphore.
#[derive(Clone)]
pub struct Agent {
    /// Source of nonces for queries (optional) and update calls.
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that provides the sender principal and signs envelopes.
    identity: Arc<dyn Identity>,
    /// Added to "now" to compute the expiry of outgoing requests; also the
    /// maximum tolerated age/skew of certificate and query-signature timestamps.
    ingress_expiry: Duration,
    /// Key against which certificates without a delegation are verified.
    /// Read through `read_root_key`, replaced through `set_root_key`.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service chosen in `Agent::new` (custom service, or a reqwest client
    /// wrapped in `Retry429Logic`).
    client: Arc<dyn HttpService>,
    /// Route provider chosen in `Agent::new` (explicit provider, static URL, or
    /// dynamic routing seeded from the URL).
    route_provider: Arc<dyn RouteProvider>,
    /// Cache of subnet information, consulted by `get_subnet_by_canister` and
    /// filled by `fetch_subnet_by_canister`.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Permit pool acquired by every endpoint helper before it sends a request.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified.
    verify_query_signatures: bool,
    /// Taken verbatim from the configuration; not consumed in the visible part
    /// of this file (presumably enforced when reading response bodies — confirm).
    max_response_body_size: Option<usize>,
    /// Upper bound on the total time spent polling for a request's status.
    max_polling_time: Duration,
    /// Taken verbatim from the configuration; may be unused on some targets,
    /// hence the lint allowance.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
169
170impl fmt::Debug for Agent {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172 f.debug_struct("Agent")
173 .field("ingress_expiry", &self.ingress_expiry)
174 .finish_non_exhaustive()
175 }
176}
177
178impl Agent {
179 pub fn builder() -> builder::AgentBuilder {
182 Default::default()
183 }
184
/// Builds an [`Agent`] from a configuration.
///
/// The HTTP service is `config.http_service` when given; otherwise a
/// `Retry429Logic` wrapper around `config.client`, or around a freshly built
/// reqwest client (rustls TLS and a 360 s timeout on non-wasm targets).
///
/// The route provider is, in order of preference: `config.route_provider`, a
/// dynamic provider seeded from `config.url` (when
/// `config.background_dynamic_routing` is set), or `config.url` itself.
///
/// The root key always starts out as the built-in `IC_ROOT_KEY`.
///
/// # Errors
///
/// No error path is visible in this constructor; it returns `Ok` whenever it
/// does not panic.
///
/// # Panics
///
/// * if the default HTTP client cannot be built;
/// * if dynamic routing is requested and the URL is not exactly
///   `https://domain` (no path, port or IP), or a seed node cannot be created
///   from its domain;
/// * if neither `route_provider` nor `url` is configured.
pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
    let client = config.http_service.unwrap_or_else(|| {
        Arc::new(Retry429Logic {
            client: config.client.unwrap_or_else(|| {
                #[cfg(not(target_family = "wasm"))]
                {
                    Client::builder()
                        .use_rustls_tls()
                        .timeout(Duration::from_secs(360))
                        .build()
                        .expect("Could not create HTTP client.")
                }
                #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
                {
                    Client::new()
                }
            }),
        })
    });
    Ok(Agent {
        nonce_factory: config.nonce_factory,
        identity: config.identity,
        ingress_expiry: config.ingress_expiry,
        root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
        // Cloned because the dynamic route provider below takes its own handle.
        client: client.clone(),
        route_provider: if let Some(route_provider) = config.route_provider {
            route_provider
        } else if let Some(url) = config.url {
            if config.background_dynamic_routing {
                assert!(
                    url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
                    "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
                );
                // `domain()` is `Some` here: the assertion above checked it.
                let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
                // The dynamic provider is built by the future passed alongside `url`.
                UrlUntilReady::new(url, async move {
                    DynamicRouteProviderBuilder::new(
                        LatencyRoutingSnapshot::new(),
                        seeds,
                        client,
                    )
                    .build()
                    .await
                }) as Arc<dyn RouteProvider>
            } else {
                Arc::new(url)
            }
        } else {
            panic!("either route_provider or url must be specified");
        },
        subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
        verify_query_signatures: config.verify_query_signatures,
        concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
        max_response_body_size: config.max_response_body_size,
        max_tcp_error_retries: config.max_tcp_error_retries,
        max_polling_time: config.max_polling_time,
    })
}
243
244 pub fn set_identity<I>(&mut self, identity: I)
250 where
251 I: 'static + Identity,
252 {
253 self.identity = Arc::new(identity);
254 }
255 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
261 self.identity = identity;
262 }
263
264 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
273 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
274 return Ok(());
276 }
277 let status = self.status().await?;
278 let Some(root_key) = status.root_key else {
279 return Err(AgentError::NoRootKeyInStatus(status));
280 };
281 self.set_root_key(root_key);
282 Ok(())
283 }
284
285 pub fn set_root_key(&self, root_key: Vec<u8>) {
290 *self.root_key.write().unwrap() = root_key;
291 }
292
293 pub fn read_root_key(&self) -> Vec<u8> {
295 self.root_key.read().unwrap().clone()
296 }
297
298 fn get_expiry_date(&self) -> u64 {
299 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
300 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
301 if self.ingress_expiry.as_secs() > 90 {
302 rounded = rounded.replace_second(0).unwrap();
303 }
304 rounded.unix_timestamp_nanos().try_into().unwrap()
305 }
306
307 pub fn get_principal(&self) -> Result<Principal, String> {
309 self.identity.sender()
310 }
311
312 async fn query_endpoint<A>(
313 &self,
314 effective_canister_id: Principal,
315 serialized_bytes: Vec<u8>,
316 ) -> Result<A, AgentError>
317 where
318 A: serde::de::DeserializeOwned,
319 {
320 let _permit = self.concurrent_requests_semaphore.acquire().await;
321 let bytes = self
322 .execute(
323 Method::POST,
324 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
325 Some(serialized_bytes),
326 )
327 .await?
328 .1;
329 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
330 }
331
332 async fn read_state_endpoint<A>(
333 &self,
334 effective_canister_id: Principal,
335 serialized_bytes: Vec<u8>,
336 ) -> Result<A, AgentError>
337 where
338 A: serde::de::DeserializeOwned,
339 {
340 let _permit = self.concurrent_requests_semaphore.acquire().await;
341 let endpoint = format!(
342 "api/v2/canister/{}/read_state",
343 effective_canister_id.to_text()
344 );
345 let bytes = self
346 .execute(Method::POST, &endpoint, Some(serialized_bytes))
347 .await?
348 .1;
349 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
350 }
351
352 async fn read_subnet_state_endpoint<A>(
353 &self,
354 subnet_id: Principal,
355 serialized_bytes: Vec<u8>,
356 ) -> Result<A, AgentError>
357 where
358 A: serde::de::DeserializeOwned,
359 {
360 let _permit = self.concurrent_requests_semaphore.acquire().await;
361 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
362 let bytes = self
363 .execute(Method::POST, &endpoint, Some(serialized_bytes))
364 .await?
365 .1;
366 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
367 }
368
369 async fn call_endpoint(
370 &self,
371 effective_canister_id: Principal,
372 serialized_bytes: Vec<u8>,
373 ) -> Result<TransportCallResponse, AgentError> {
374 let _permit = self.concurrent_requests_semaphore.acquire().await;
375 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
376 let (status_code, response_body) = self
377 .execute(Method::POST, &endpoint, Some(serialized_bytes))
378 .await?;
379
380 if status_code == StatusCode::ACCEPTED {
381 return Ok(TransportCallResponse::Accepted);
382 }
383
384 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
385 }
386
387 #[allow(clippy::too_many_arguments)]
390 async fn query_raw(
391 &self,
392 canister_id: Principal,
393 effective_canister_id: Principal,
394 method_name: String,
395 arg: Vec<u8>,
396 ingress_expiry_datetime: Option<u64>,
397 use_nonce: bool,
398 explicit_verify_query_signatures: Option<bool>,
399 ) -> Result<Vec<u8>, AgentError> {
400 let operation = Operation::Call {
401 canister: canister_id,
402 method: method_name.clone(),
403 };
404 let content = self.query_content(
405 canister_id,
406 method_name,
407 arg,
408 ingress_expiry_datetime,
409 use_nonce,
410 )?;
411 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
412 self.query_inner(
413 effective_canister_id,
414 serialized_bytes,
415 content.to_request_id(),
416 explicit_verify_query_signatures,
417 operation,
418 )
419 .await
420 }
421
422 pub async fn query_signed(
426 &self,
427 effective_canister_id: Principal,
428 signed_query: Vec<u8>,
429 ) -> Result<Vec<u8>, AgentError> {
430 let envelope: Envelope =
431 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
432 let EnvelopeContent::Query {
433 canister_id,
434 method_name,
435 ..
436 } = &*envelope.content
437 else {
438 return Err(AgentError::CallDataMismatch {
439 field: "request_type".to_string(),
440 value_arg: "query".to_string(),
441 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
442 "update"
443 } else {
444 "read_state"
445 }
446 .to_string(),
447 });
448 };
449 let operation = Operation::Call {
450 canister: *canister_id,
451 method: method_name.clone(),
452 };
453 self.query_inner(
454 effective_canister_id,
455 signed_query,
456 envelope.content.to_request_id(),
457 None,
458 operation,
459 )
460 .await
461 }
462
/// Sends a signed query and, when requested, verifies the node signatures on
/// the response before handing back the reply bytes.
///
/// * `explicit_verify_query_signatures` overrides the agent-wide
///   `verify_query_signatures` flag when `Some`.
/// * `operation` is attached to the error produced for a rejected query.
///
/// # Errors
///
/// * transport / CBOR errors from the query endpoint or the subnet lookup;
/// * `MissingSignature`, `TooManySignatures`, `CertificateOutdated`,
///   `CertificateNotAuthorized`, `DerKeyLengthMismatch`, `DerPrefixMismatch`,
///   `MalformedPublicKey`, `QuerySignatureVerificationFailed` or
///   `MalformedSignature` when signature verification fails;
/// * `UncertifiedReject` when the canister rejected the query.
async fn query_inner(
    &self,
    effective_canister_id: Principal,
    signed_query: Vec<u8>,
    request_id: RequestId,
    explicit_verify_query_signatures: Option<bool>,
    operation: Operation,
) -> Result<Vec<u8>, AgentError> {
    let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
        // Run the query and the subnet lookup concurrently; either failure aborts.
        let (response, mut subnet) = futures_util::try_join!(
            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
            self.get_subnet_by_canister(&effective_canister_id)
        )?;
        if response.signatures().is_empty() {
            return Err(AgentError::MissingSignature);
        } else if response.signatures().len() > subnet.node_keys.len() {
            return Err(AgentError::TooManySignatures {
                had: response.signatures().len(),
                needed: subnet.node_keys.len(),
            });
        }
        for signature in response.signatures() {
            // Reject signatures whose timestamp is older than the ingress expiry.
            if OffsetDateTime::now_utc()
                - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
                > self.ingress_expiry
            {
                return Err(AgentError::CertificateOutdated(self.ingress_expiry));
            }
            let signable = response.signable(request_id, signature.timestamp);
            let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
                node_key
            } else {
                // Unknown signer: the subnet info in hand may be stale, so
                // refetch it once before giving up.
                subnet = self
                    .fetch_subnet_by_canister(&effective_canister_id)
                    .await?;
                subnet
                    .node_keys
                    .get(&signature.identity)
                    .ok_or(AgentError::CertificateNotAuthorized())?
            };
            // A node key must be exactly 44 bytes: a 12-byte prefix plus the
            // 32 bytes passed to `deserialize_raw` below.
            if node_key.len() != 44 {
                return Err(AgentError::DerKeyLengthMismatch {
                    expected: 44,
                    actual: node_key.len(),
                });
            }
            // NOTE(review): presumably the DER header of an Ed25519 public key — confirm.
            const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
            if node_key[..12] != DER_PREFIX {
                return Err(AgentError::DerPrefixMismatch {
                    expected: DER_PREFIX.to_vec(),
                    actual: node_key[..12].to_vec(),
                });
            }
            let pubkey = PublicKey::deserialize_raw(&node_key[12..])
                .map_err(|_| AgentError::MalformedPublicKey)?;

            match pubkey.verify_signature(&signable, &signature.signature[..]) {
                Ok(()) => (),
                Err(SignatureError::InvalidSignature) => {
                    return Err(AgentError::QuerySignatureVerificationFailed)
                }
                Err(SignatureError::InvalidLength) => {
                    return Err(AgentError::MalformedSignature)
                }
                // Any other error variant is treated as impossible here.
                _ => unreachable!(),
            }
        }
        response
    } else {
        self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
            .await?
    };

    match response {
        QueryResponse::Replied { reply, .. } => Ok(reply.arg),
        QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
            reject,
            operation: Some(operation),
        }),
    }
}
547
548 fn query_content(
549 &self,
550 canister_id: Principal,
551 method_name: String,
552 arg: Vec<u8>,
553 ingress_expiry_datetime: Option<u64>,
554 use_nonce: bool,
555 ) -> Result<EnvelopeContent, AgentError> {
556 Ok(EnvelopeContent::Query {
557 sender: self.identity.sender().map_err(AgentError::SigningError)?,
558 canister_id,
559 method_name,
560 arg,
561 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
562 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
563 })
564 }
565
566 async fn update_raw(
568 &self,
569 canister_id: Principal,
570 effective_canister_id: Principal,
571 method_name: String,
572 arg: Vec<u8>,
573 ingress_expiry_datetime: Option<u64>,
574 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
575 let nonce = self.nonce_factory.generate();
576 let content = self.update_content(
577 canister_id,
578 method_name.clone(),
579 arg,
580 ingress_expiry_datetime,
581 nonce,
582 )?;
583 let operation = Some(Operation::Call {
584 canister: canister_id,
585 method: method_name,
586 });
587 let request_id = to_request_id(&content)?;
588 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
589
590 let response_body = self
591 .call_endpoint(effective_canister_id, serialized_bytes)
592 .await?;
593
594 match response_body {
595 TransportCallResponse::Replied { certificate } => {
596 let certificate =
597 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
598
599 self.verify(&certificate, effective_canister_id)?;
600 let status = lookup_request_status(&certificate, &request_id)?;
601
602 match status {
603 RequestStatusResponse::Replied(reply) => {
604 Ok(CallResponse::Response((reply.arg, certificate)))
605 }
606 RequestStatusResponse::Rejected(reject_response) => {
607 Err(AgentError::CertifiedReject {
608 reject: reject_response,
609 operation,
610 })?
611 }
612 _ => Ok(CallResponse::Poll(request_id)),
613 }
614 }
615 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
616 TransportCallResponse::NonReplicatedRejection(reject_response) => {
617 Err(AgentError::UncertifiedReject {
618 reject: reject_response,
619 operation,
620 })
621 }
622 }
623 }
624
625 pub async fn update_signed(
629 &self,
630 effective_canister_id: Principal,
631 signed_update: Vec<u8>,
632 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
633 let envelope: Envelope =
634 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
635 let EnvelopeContent::Call {
636 canister_id,
637 method_name,
638 ..
639 } = &*envelope.content
640 else {
641 return Err(AgentError::CallDataMismatch {
642 field: "request_type".to_string(),
643 value_arg: "update".to_string(),
644 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
645 "query"
646 } else {
647 "read_state"
648 }
649 .to_string(),
650 });
651 };
652 let operation = Some(Operation::Call {
653 canister: *canister_id,
654 method: method_name.clone(),
655 });
656 let request_id = to_request_id(&envelope.content)?;
657
658 let response_body = self
659 .call_endpoint(effective_canister_id, signed_update)
660 .await?;
661
662 match response_body {
663 TransportCallResponse::Replied { certificate } => {
664 let certificate =
665 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
666
667 self.verify(&certificate, effective_canister_id)?;
668 let status = lookup_request_status(&certificate, &request_id)?;
669
670 match status {
671 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
672 RequestStatusResponse::Rejected(reject_response) => {
673 Err(AgentError::CertifiedReject {
674 reject: reject_response,
675 operation,
676 })?
677 }
678 _ => Ok(CallResponse::Poll(request_id)),
679 }
680 }
681 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
682 TransportCallResponse::NonReplicatedRejection(reject_response) => {
683 Err(AgentError::UncertifiedReject {
684 reject: reject_response,
685 operation,
686 })
687 }
688 }
689 }
690
691 fn update_content(
692 &self,
693 canister_id: Principal,
694 method_name: String,
695 arg: Vec<u8>,
696 ingress_expiry_datetime: Option<u64>,
697 nonce: Option<Vec<u8>>,
698 ) -> Result<EnvelopeContent, AgentError> {
699 Ok(EnvelopeContent::Call {
700 canister_id,
701 method_name,
702 arg,
703 nonce,
704 sender: self.identity.sender().map_err(AgentError::SigningError)?,
705 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
706 })
707 }
708
709 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
710 ExponentialBackoffBuilder::new()
711 .with_initial_interval(Duration::from_millis(500))
712 .with_max_interval(Duration::from_secs(1))
713 .with_multiplier(1.4)
714 .with_max_elapsed_time(Some(self.max_polling_time))
715 .build()
716 }
717
718 pub async fn wait_signed(
720 &self,
721 request_id: &RequestId,
722 effective_canister_id: Principal,
723 signed_request_status: Vec<u8>,
724 ) -> Result<(Vec<u8>, Certificate), AgentError> {
725 let mut retry_policy = self.get_retry_policy();
726
727 let mut request_accepted = false;
728 let (resp, cert) = self
729 .request_status_signed(
730 request_id,
731 effective_canister_id,
732 signed_request_status.clone(),
733 )
734 .await?;
735 loop {
736 match resp {
737 RequestStatusResponse::Unknown => {}
738
739 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
740 if !request_accepted {
741 retry_policy.reset();
742 request_accepted = true;
743 }
744 }
745
746 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
747 return Ok((arg, cert))
748 }
749
750 RequestStatusResponse::Rejected(response) => {
751 return Err(AgentError::CertifiedReject {
752 reject: response,
753 operation: None,
754 })
755 }
756
757 RequestStatusResponse::Done => {
758 return Err(AgentError::RequestStatusDoneNoReply(String::from(
759 *request_id,
760 )))
761 }
762 };
763
764 match retry_policy.next_backoff() {
765 Some(duration) => crate::util::sleep(duration).await,
766
767 None => return Err(AgentError::TimeoutWaitingForResponse()),
768 }
769 }
770 }
771
772 pub async fn wait(
774 &self,
775 request_id: &RequestId,
776 effective_canister_id: Principal,
777 ) -> Result<(Vec<u8>, Certificate), AgentError> {
778 self.wait_inner(request_id, effective_canister_id, None)
779 .await
780 }
781
782 async fn wait_inner(
783 &self,
784 request_id: &RequestId,
785 effective_canister_id: Principal,
786 operation: Option<Operation>,
787 ) -> Result<(Vec<u8>, Certificate), AgentError> {
788 let mut retry_policy = self.get_retry_policy();
789
790 let mut request_accepted = false;
791 loop {
792 let (resp, cert) = self
793 .request_status_raw(request_id, effective_canister_id)
794 .await?;
795 match resp {
796 RequestStatusResponse::Unknown => {}
797
798 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
799 if !request_accepted {
800 retry_policy.reset();
808 request_accepted = true;
809 }
810 }
811
812 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
813 return Ok((arg, cert))
814 }
815
816 RequestStatusResponse::Rejected(response) => {
817 return Err(AgentError::CertifiedReject {
818 reject: response,
819 operation,
820 })
821 }
822
823 RequestStatusResponse::Done => {
824 return Err(AgentError::RequestStatusDoneNoReply(String::from(
825 *request_id,
826 )))
827 }
828 };
829
830 match retry_policy.next_backoff() {
831 Some(duration) => crate::util::sleep(duration).await,
832
833 None => return Err(AgentError::TimeoutWaitingForResponse()),
834 }
835 }
836 }
837
838 pub async fn read_state_raw(
841 &self,
842 paths: Vec<Vec<Label>>,
843 effective_canister_id: Principal,
844 ) -> Result<Certificate, AgentError> {
845 let content = self.read_state_content(paths)?;
846 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
847
848 let read_state_response: ReadStateResponse = self
849 .read_state_endpoint(effective_canister_id, serialized_bytes)
850 .await?;
851 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
852 .map_err(AgentError::InvalidCborData)?;
853 self.verify(&cert, effective_canister_id)?;
854 Ok(cert)
855 }
856
857 pub async fn read_subnet_state_raw(
860 &self,
861 paths: Vec<Vec<Label>>,
862 subnet_id: Principal,
863 ) -> Result<Certificate, AgentError> {
864 let content = self.read_state_content(paths)?;
865 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
866
867 let read_state_response: ReadStateResponse = self
868 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
869 .await?;
870 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
871 .map_err(AgentError::InvalidCborData)?;
872 self.verify_for_subnet(&cert, subnet_id)?;
873 Ok(cert)
874 }
875
876 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
877 Ok(EnvelopeContent::ReadState {
878 sender: self.identity.sender().map_err(AgentError::SigningError)?,
879 paths,
880 ingress_expiry: self.get_expiry_date(),
881 })
882 }
883
884 pub fn verify(
887 &self,
888 cert: &Certificate,
889 effective_canister_id: Principal,
890 ) -> Result<(), AgentError> {
891 self.verify_cert(cert, effective_canister_id)?;
892 self.verify_cert_timestamp(cert)?;
893 Ok(())
894 }
895
896 fn verify_cert(
897 &self,
898 cert: &Certificate,
899 effective_canister_id: Principal,
900 ) -> Result<(), AgentError> {
901 let sig = &cert.signature;
902
903 let root_hash = cert.tree.digest();
904 let mut msg = vec![];
905 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
906 msg.extend_from_slice(&root_hash);
907
908 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
909 let key = extract_der(der_key)?;
910
911 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
912 .map_err(|_| AgentError::CertificateVerificationFailed())?;
913 Ok(())
914 }
915
916 pub fn verify_for_subnet(
919 &self,
920 cert: &Certificate,
921 subnet_id: Principal,
922 ) -> Result<(), AgentError> {
923 self.verify_cert_for_subnet(cert, subnet_id)?;
924 self.verify_cert_timestamp(cert)?;
925 Ok(())
926 }
927
928 fn verify_cert_for_subnet(
929 &self,
930 cert: &Certificate,
931 subnet_id: Principal,
932 ) -> Result<(), AgentError> {
933 let sig = &cert.signature;
934
935 let root_hash = cert.tree.digest();
936 let mut msg = vec![];
937 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
938 msg.extend_from_slice(&root_hash);
939
940 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
941 let key = extract_der(der_key)?;
942
943 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
944 .map_err(|_| AgentError::CertificateVerificationFailed())?;
945 Ok(())
946 }
947
948 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
949 let time = lookup_time(cert)?;
950 if (OffsetDateTime::now_utc()
951 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
952 .abs()
953 > self.ingress_expiry
954 {
955 Err(AgentError::CertificateOutdated(self.ingress_expiry))
956 } else {
957 Ok(())
958 }
959 }
960
961 fn check_delegation(
962 &self,
963 delegation: &Option<Delegation>,
964 effective_canister_id: Principal,
965 ) -> Result<Vec<u8>, AgentError> {
966 match delegation {
967 None => Ok(self.read_root_key()),
968 Some(delegation) => {
969 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
970 .map_err(AgentError::InvalidCborData)?;
971 if cert.delegation.is_some() {
972 return Err(AgentError::CertificateHasTooManyDelegations);
973 }
974 self.verify_cert(&cert, effective_canister_id)?;
975 let canister_range_lookup = [
976 "subnet".as_bytes(),
977 delegation.subnet_id.as_ref(),
978 "canister_ranges".as_bytes(),
979 ];
980 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
981 let ranges: Vec<(Principal, Principal)> =
982 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
983 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
984 return Err(AgentError::CertificateNotAuthorized());
986 }
987
988 let public_key_path = [
989 "subnet".as_bytes(),
990 delegation.subnet_id.as_ref(),
991 "public_key".as_bytes(),
992 ];
993 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
994 }
995 }
996 }
997
998 fn check_delegation_for_subnet(
999 &self,
1000 delegation: &Option<Delegation>,
1001 subnet_id: Principal,
1002 ) -> Result<Vec<u8>, AgentError> {
1003 match delegation {
1004 None => Ok(self.read_root_key()),
1005 Some(delegation) => {
1006 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1007 .map_err(AgentError::InvalidCborData)?;
1008 if cert.delegation.is_some() {
1009 return Err(AgentError::CertificateHasTooManyDelegations);
1010 }
1011 self.verify_cert_for_subnet(&cert, subnet_id)?;
1012 let public_key_path = [
1013 "subnet".as_bytes(),
1014 delegation.subnet_id.as_ref(),
1015 "public_key".as_bytes(),
1016 ];
1017 let pk = lookup_value(&cert.tree, public_key_path)
1018 .map_err(|_| AgentError::CertificateNotAuthorized())?
1019 .to_vec();
1020 Ok(pk)
1021 }
1022 }
1023 }
1024
1025 pub async fn read_state_canister_info(
1028 &self,
1029 canister_id: Principal,
1030 path: &str,
1031 ) -> Result<Vec<u8>, AgentError> {
1032 let paths: Vec<Vec<Label>> = vec![vec![
1033 "canister".into(),
1034 Label::from_bytes(canister_id.as_slice()),
1035 path.into(),
1036 ]];
1037
1038 let cert = self.read_state_raw(paths, canister_id).await?;
1039
1040 lookup_canister_info(cert, canister_id, path)
1041 }
1042
1043 pub async fn read_state_canister_controllers(
1045 &self,
1046 canister_id: Principal,
1047 ) -> Result<Vec<Principal>, AgentError> {
1048 let blob = self
1049 .read_state_canister_info(canister_id, "controllers")
1050 .await?;
1051 let controllers: Vec<Principal> =
1052 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1053 Ok(controllers)
1054 }
1055
1056 pub async fn read_state_canister_module_hash(
1058 &self,
1059 canister_id: Principal,
1060 ) -> Result<Vec<u8>, AgentError> {
1061 self.read_state_canister_info(canister_id, "module_hash")
1062 .await
1063 }
1064
1065 pub async fn read_state_canister_metadata(
1067 &self,
1068 canister_id: Principal,
1069 path: &str,
1070 ) -> Result<Vec<u8>, AgentError> {
1071 let paths: Vec<Vec<Label>> = vec![vec![
1072 "canister".into(),
1073 Label::from_bytes(canister_id.as_slice()),
1074 "metadata".into(),
1075 path.into(),
1076 ]];
1077
1078 let cert = self.read_state_raw(paths, canister_id).await?;
1079
1080 lookup_canister_metadata(cert, canister_id, path)
1081 }
1082
1083 pub async fn read_state_subnet_metrics(
1085 &self,
1086 subnet_id: Principal,
1087 ) -> Result<SubnetMetrics, AgentError> {
1088 let paths = vec![vec![
1089 "subnet".into(),
1090 Label::from_bytes(subnet_id.as_slice()),
1091 "metrics".into(),
1092 ]];
1093 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1094 lookup_subnet_metrics(cert, subnet_id)
1095 }
1096
1097 pub async fn read_state_subnet_canister_ranges(
1099 &self,
1100 subnet_id: Principal,
1101 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1102 let paths = vec![vec![
1103 "subnet".into(),
1104 Label::from_bytes(subnet_id.as_slice()),
1105 "canister_ranges".into(),
1106 ]];
1107 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1108 lookup_subnet_canister_ranges(cert, subnet_id)
1109 }
1110
1111 pub async fn request_status_raw(
1113 &self,
1114 request_id: &RequestId,
1115 effective_canister_id: Principal,
1116 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1117 let paths: Vec<Vec<Label>> =
1118 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1119
1120 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1121
1122 Ok((lookup_request_status(&cert, request_id)?, cert))
1123 }
1124
1125 pub async fn request_status_signed(
1129 &self,
1130 request_id: &RequestId,
1131 effective_canister_id: Principal,
1132 signed_request_status: Vec<u8>,
1133 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1134 let _envelope: Envelope =
1135 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1136 let read_state_response: ReadStateResponse = self
1137 .read_state_endpoint(effective_canister_id, signed_request_status)
1138 .await?;
1139
1140 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1141 .map_err(AgentError::InvalidCborData)?;
1142 self.verify(&cert, effective_canister_id)?;
1143 Ok((lookup_request_status(&cert, request_id)?, cert))
1144 }
1145
1146 pub fn update<S: Into<String>>(
1149 &self,
1150 canister_id: &Principal,
1151 method_name: S,
1152 ) -> UpdateBuilder {
1153 UpdateBuilder::new(self, *canister_id, method_name.into())
1154 }
1155
1156 pub async fn status(&self) -> Result<Status, AgentError> {
1158 let endpoint = "api/v2/status";
1159 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1160
1161 let cbor: serde_cbor::Value =
1162 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1163
1164 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1165 }
1166
1167 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1170 QueryBuilder::new(self, *canister_id, method_name.into())
1171 }
1172
1173 pub fn sign_request_status(
1176 &self,
1177 effective_canister_id: Principal,
1178 request_id: RequestId,
1179 ) -> Result<SignedRequestStatus, AgentError> {
1180 let paths: Vec<Vec<Label>> =
1181 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1182 let read_state_content = self.read_state_content(paths)?;
1183 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1184 let ingress_expiry = read_state_content.ingress_expiry();
1185 let sender = *read_state_content.sender();
1186 Ok(SignedRequestStatus {
1187 ingress_expiry,
1188 sender,
1189 effective_canister_id,
1190 request_id,
1191 signed_request_status,
1192 })
1193 }
1194
1195 async fn get_subnet_by_canister(
1196 &self,
1197 canister: &Principal,
1198 ) -> Result<Arc<Subnet>, AgentError> {
1199 let subnet = self
1200 .subnet_key_cache
1201 .lock()
1202 .unwrap()
1203 .get_subnet_by_canister(canister);
1204 if let Some(subnet) = subnet {
1205 Ok(subnet)
1206 } else {
1207 self.fetch_subnet_by_canister(canister).await
1208 }
1209 }
1210
1211 pub async fn fetch_api_boundary_nodes_by_canister_id(
1213 &self,
1214 canister_id: Principal,
1215 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1216 let paths = vec![vec!["api_boundary_nodes".into()]];
1217 let certificate = self.read_state_raw(paths, canister_id).await?;
1218 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1219 Ok(api_boundary_nodes)
1220 }
1221
1222 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1224 &self,
1225 subnet_id: Principal,
1226 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1227 let paths = vec![vec!["api_boundary_nodes".into()]];
1228 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1229 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1230 Ok(api_boundary_nodes)
1231 }
1232
1233 async fn fetch_subnet_by_canister(
1234 &self,
1235 canister: &Principal,
1236 ) -> Result<Arc<Subnet>, AgentError> {
1237 let cert = self
1238 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1239 .await?;
1240
1241 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1242 let subnet = Arc::new(subnet);
1243 self.subnet_key_cache
1244 .lock()
1245 .unwrap()
1246 .insert_subnet(subnet_id, subnet.clone());
1247 Ok(subnet)
1248 }
1249
1250 async fn request(
1251 &self,
1252 method: Method,
1253 endpoint: &str,
1254 body: Option<Vec<u8>>,
1255 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1256 let body = body.map(Bytes::from);
1257
1258 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1259 let url = self.route_provider.route()?.join(endpoint)?;
1260 let uri = Uri::from_str(url.as_str())
1261 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1262 let body = body.clone().unwrap_or_default();
1263 let request = http::Request::builder()
1264 .method(method.clone())
1265 .uri(uri)
1266 .header(CONTENT_TYPE, "application/cbor")
1267 .body(body)
1268 .map_err(|e| {
1269 AgentError::TransportError(TransportError::Generic(format!(
1270 "unable to create request: {e:#}"
1271 )))
1272 })?;
1273
1274 Ok(request)
1275 };
1276
1277 let response = self
1278 .client
1279 .call(
1280 &create_request_with_generated_url,
1281 self.max_tcp_error_retries,
1282 self.max_response_body_size,
1283 )
1284 .await?;
1285
1286 let (parts, body) = response.into_parts();
1287
1288 Ok((parts.status, parts.headers, body.to_vec()))
1289 }
1290
1291 async fn execute(
1292 &self,
1293 method: Method,
1294 endpoint: &str,
1295 body: Option<Vec<u8>>,
1296 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1297 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1298
1299 let status = request_result.0;
1300 let headers = request_result.1;
1301 let body = request_result.2;
1302
1303 if status.is_client_error() || status.is_server_error() {
1304 Err(AgentError::HttpError(HttpErrorPayload {
1305 status: status.into(),
1306 content_type: headers
1307 .get(CONTENT_TYPE)
1308 .and_then(|value| value.to_str().ok())
1309 .map(str::to_string),
1310 content: body,
1311 }))
1312 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1313 Err(AgentError::InvalidHttpResponse(format!(
1314 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1315 )))
1316 } else {
1317 Ok((status, body))
1318 }
1319 }
1320}
1321
1322fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1325 ranges
1326 .iter()
1327 .any(|r| principal >= &r.0 && principal <= &r.1)
1328}
1329
1330fn sign_envelope(
1331 content: &EnvelopeContent,
1332 identity: Arc<dyn Identity>,
1333) -> Result<Vec<u8>, AgentError> {
1334 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1335
1336 let envelope = Envelope {
1337 content: Cow::Borrowed(content),
1338 sender_pubkey: signature.public_key,
1339 sender_sig: signature.signature,
1340 sender_delegation: signature.delegations,
1341 };
1342
1343 let mut serialized_bytes = Vec::new();
1344 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1345 serializer.self_describe()?;
1346 envelope.serialize(&mut serializer)?;
1347
1348 Ok(serialized_bytes)
1349}
1350
1351pub fn signed_query_inspect(
1354 sender: Principal,
1355 canister_id: Principal,
1356 method_name: &str,
1357 arg: &[u8],
1358 ingress_expiry: u64,
1359 signed_query: Vec<u8>,
1360) -> Result<(), AgentError> {
1361 let envelope: Envelope =
1362 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1363 match envelope.content.as_ref() {
1364 EnvelopeContent::Query {
1365 ingress_expiry: ingress_expiry_cbor,
1366 sender: sender_cbor,
1367 canister_id: canister_id_cbor,
1368 method_name: method_name_cbor,
1369 arg: arg_cbor,
1370 nonce: _nonce,
1371 } => {
1372 if ingress_expiry != *ingress_expiry_cbor {
1373 return Err(AgentError::CallDataMismatch {
1374 field: "ingress_expiry".to_string(),
1375 value_arg: ingress_expiry.to_string(),
1376 value_cbor: ingress_expiry_cbor.to_string(),
1377 });
1378 }
1379 if sender != *sender_cbor {
1380 return Err(AgentError::CallDataMismatch {
1381 field: "sender".to_string(),
1382 value_arg: sender.to_string(),
1383 value_cbor: sender_cbor.to_string(),
1384 });
1385 }
1386 if canister_id != *canister_id_cbor {
1387 return Err(AgentError::CallDataMismatch {
1388 field: "canister_id".to_string(),
1389 value_arg: canister_id.to_string(),
1390 value_cbor: canister_id_cbor.to_string(),
1391 });
1392 }
1393 if method_name != *method_name_cbor {
1394 return Err(AgentError::CallDataMismatch {
1395 field: "method_name".to_string(),
1396 value_arg: method_name.to_string(),
1397 value_cbor: method_name_cbor.clone(),
1398 });
1399 }
1400 if arg != *arg_cbor {
1401 return Err(AgentError::CallDataMismatch {
1402 field: "arg".to_string(),
1403 value_arg: format!("{arg:?}"),
1404 value_cbor: format!("{arg_cbor:?}"),
1405 });
1406 }
1407 }
1408 EnvelopeContent::Call { .. } => {
1409 return Err(AgentError::CallDataMismatch {
1410 field: "request_type".to_string(),
1411 value_arg: "query".to_string(),
1412 value_cbor: "call".to_string(),
1413 })
1414 }
1415 EnvelopeContent::ReadState { .. } => {
1416 return Err(AgentError::CallDataMismatch {
1417 field: "request_type".to_string(),
1418 value_arg: "query".to_string(),
1419 value_cbor: "read_state".to_string(),
1420 })
1421 }
1422 }
1423 Ok(())
1424}
1425
1426pub fn signed_update_inspect(
1429 sender: Principal,
1430 canister_id: Principal,
1431 method_name: &str,
1432 arg: &[u8],
1433 ingress_expiry: u64,
1434 signed_update: Vec<u8>,
1435) -> Result<(), AgentError> {
1436 let envelope: Envelope =
1437 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1438 match envelope.content.as_ref() {
1439 EnvelopeContent::Call {
1440 nonce: _nonce,
1441 ingress_expiry: ingress_expiry_cbor,
1442 sender: sender_cbor,
1443 canister_id: canister_id_cbor,
1444 method_name: method_name_cbor,
1445 arg: arg_cbor,
1446 } => {
1447 if ingress_expiry != *ingress_expiry_cbor {
1448 return Err(AgentError::CallDataMismatch {
1449 field: "ingress_expiry".to_string(),
1450 value_arg: ingress_expiry.to_string(),
1451 value_cbor: ingress_expiry_cbor.to_string(),
1452 });
1453 }
1454 if sender != *sender_cbor {
1455 return Err(AgentError::CallDataMismatch {
1456 field: "sender".to_string(),
1457 value_arg: sender.to_string(),
1458 value_cbor: sender_cbor.to_string(),
1459 });
1460 }
1461 if canister_id != *canister_id_cbor {
1462 return Err(AgentError::CallDataMismatch {
1463 field: "canister_id".to_string(),
1464 value_arg: canister_id.to_string(),
1465 value_cbor: canister_id_cbor.to_string(),
1466 });
1467 }
1468 if method_name != *method_name_cbor {
1469 return Err(AgentError::CallDataMismatch {
1470 field: "method_name".to_string(),
1471 value_arg: method_name.to_string(),
1472 value_cbor: method_name_cbor.clone(),
1473 });
1474 }
1475 if arg != *arg_cbor {
1476 return Err(AgentError::CallDataMismatch {
1477 field: "arg".to_string(),
1478 value_arg: format!("{arg:?}"),
1479 value_cbor: format!("{arg_cbor:?}"),
1480 });
1481 }
1482 }
1483 EnvelopeContent::ReadState { .. } => {
1484 return Err(AgentError::CallDataMismatch {
1485 field: "request_type".to_string(),
1486 value_arg: "call".to_string(),
1487 value_cbor: "read_state".to_string(),
1488 })
1489 }
1490 EnvelopeContent::Query { .. } => {
1491 return Err(AgentError::CallDataMismatch {
1492 field: "request_type".to_string(),
1493 value_arg: "call".to_string(),
1494 value_cbor: "query".to_string(),
1495 })
1496 }
1497 }
1498 Ok(())
1499}
1500
1501pub fn signed_request_status_inspect(
1504 sender: Principal,
1505 request_id: &RequestId,
1506 ingress_expiry: u64,
1507 signed_request_status: Vec<u8>,
1508) -> Result<(), AgentError> {
1509 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1510 let envelope: Envelope =
1511 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1512 match envelope.content.as_ref() {
1513 EnvelopeContent::ReadState {
1514 ingress_expiry: ingress_expiry_cbor,
1515 sender: sender_cbor,
1516 paths: paths_cbor,
1517 } => {
1518 if ingress_expiry != *ingress_expiry_cbor {
1519 return Err(AgentError::CallDataMismatch {
1520 field: "ingress_expiry".to_string(),
1521 value_arg: ingress_expiry.to_string(),
1522 value_cbor: ingress_expiry_cbor.to_string(),
1523 });
1524 }
1525 if sender != *sender_cbor {
1526 return Err(AgentError::CallDataMismatch {
1527 field: "sender".to_string(),
1528 value_arg: sender.to_string(),
1529 value_cbor: sender_cbor.to_string(),
1530 });
1531 }
1532
1533 if paths != *paths_cbor {
1534 return Err(AgentError::CallDataMismatch {
1535 field: "paths".to_string(),
1536 value_arg: format!("{paths:?}"),
1537 value_cbor: format!("{paths_cbor:?}"),
1538 });
1539 }
1540 }
1541 EnvelopeContent::Query { .. } => {
1542 return Err(AgentError::CallDataMismatch {
1543 field: "request_type".to_string(),
1544 value_arg: "read_state".to_string(),
1545 value_cbor: "query".to_string(),
1546 })
1547 }
1548 EnvelopeContent::Call { .. } => {
1549 return Err(AgentError::CallDataMismatch {
1550 field: "request_type".to_string(),
1551 value_arg: "read_state".to_string(),
1552 value_cbor: "call".to_string(),
1553 })
1554 }
1555 }
1556 Ok(())
1557}
1558
/// In-memory cache of subnet information, with an index from canister ID to subnet.
#[derive(Clone)]
struct SubnetCache {
    /// Cached subnets keyed by subnet ID; entries are held in a time-limited cache.
    subnets: TimedCache<Principal, Arc<Subnet>>,
    /// Maps inclusive canister-ID ranges to the ID of the subnet they were inserted with.
    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
}
1564
1565impl SubnetCache {
1566 fn new() -> Self {
1567 Self {
1568 subnets: TimedCache::with_lifespan(300),
1569 canister_index: RangeInclusiveMap::new_with_step_fns(),
1570 }
1571 }
1572
1573 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1574 self.canister_index
1575 .get(canister)
1576 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1577 .filter(|subnet| subnet.canister_ranges.contains(canister))
1578 }
1579
1580 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1581 self.subnets.cache_set(subnet_id, subnet.clone());
1582 for range in subnet.canister_ranges.iter() {
1583 self.canister_index.insert(range.clone(), subnet_id);
1584 }
1585 }
1586}
1587
/// Marker type providing the [`StepFns`] implementation that lets [`Principal`] be used as
/// the key of the range map/set types in this file.
#[derive(Clone, Copy)]
struct PrincipalStep;
1590
// Successor/predecessor functions over the raw bytes of a `Principal`, treating the bytes as
// a big-endian number (carry/borrow propagates from the end towards the start).
//
// NOTE(review): both loops iterate over `arr[..bytes.len() - 1]`, so the final byte of the
// principal is never modified; presumably intentional — confirm against the expected byte
// layout of the principals stored in the ranges.
// NOTE(review): `bytes.len() - 1` underflows if `as_slice()` is ever empty — confirm callers
// never step an empty principal.
impl StepFns<Principal> for PrincipalStep {
    /// Returns the principal obtained by incrementing `start`, keeping the same byte length.
    fn add_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Work on a fixed 29-byte scratch buffer; only the first `bytes.len()` bytes are used.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // Walk from the end towards the start, stopping at the first byte that does not
        // wrap around to 0 (i.e. no further carry).
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_add(1);
            if *byte != 0 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
    /// Returns the principal obtained by decrementing `start`, keeping the same byte length.
    fn sub_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Work on a fixed 29-byte scratch buffer; only the first `bytes.len()` bytes are used.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // Walk from the end towards the start, stopping at the first byte that does not
        // wrap around to 255 (i.e. no further borrow).
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_sub(1);
            if *byte != 255 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
}
1617
/// Information about a subnet as held in the agent's subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    // Presumably the subnet's public key bytes; the leading underscore marks it as not read
    // in this part of the file — confirm.
    _key: Vec<u8>,
    // Presumably node public keys keyed by node principal — confirm against `lookup_subnet`.
    node_keys: HashMap<Principal, Vec<u8>>,
    /// Inclusive canister-ID ranges belonging to this subnet; used by the subnet cache to
    /// index and validate canister lookups.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1626
/// An API boundary node entry, as returned by the `fetch_api_boundary_nodes_*` methods.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain of the node.
    pub domain: String,
    /// IPv6 address of the node, in textual form.
    pub ipv6_address: String,
    /// IPv4 address of the node in textual form, if it has one.
    pub ipv4_address: Option<String>,
}
1637
/// Builder for a query call, created by `Agent::query` or [`QueryBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either send it (`call`,
/// `call_with_verification`, `call_without_verification`, or `.await`) or turn it into a
/// signed request with `sign`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    // The agent used to build, sign, and send the request.
    agent: &'agent Agent,
    /// Effective canister ID passed along when the query is sent; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// The canister the query is addressed to.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit ingress expiry as a Unix timestamp in nanoseconds, or `None` if no expiry
    /// was set on the builder.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether nonce generation was requested for this query; `false` by default.
    pub use_nonce: bool,
}
1658
1659impl<'agent> QueryBuilder<'agent> {
1660 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1662 Self {
1663 agent,
1664 effective_canister_id: canister_id,
1665 canister_id,
1666 method_name,
1667 arg: vec![],
1668 ingress_expiry_datetime: None,
1669 use_nonce: false,
1670 }
1671 }
1672
1673 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1675 self.effective_canister_id = canister_id;
1676 self
1677 }
1678
1679 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1681 self.arg = arg.into();
1682 self
1683 }
1684
1685 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1687 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1688 self
1689 }
1690
1691 pub fn expire_after(mut self, duration: Duration) -> Self {
1693 self.ingress_expiry_datetime = Some(
1694 OffsetDateTime::now_utc()
1695 .saturating_add(duration.try_into().expect("negative duration"))
1696 .unix_timestamp_nanos() as u64,
1697 );
1698 self
1699 }
1700
1701 pub fn with_nonce_generation(mut self) -> Self {
1704 self.use_nonce = true;
1705 self
1706 }
1707
1708 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1710 self.agent
1711 .query_raw(
1712 self.canister_id,
1713 self.effective_canister_id,
1714 self.method_name,
1715 self.arg,
1716 self.ingress_expiry_datetime,
1717 self.use_nonce,
1718 None,
1719 )
1720 .await
1721 }
1722
1723 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1728 self.agent
1729 .query_raw(
1730 self.canister_id,
1731 self.effective_canister_id,
1732 self.method_name,
1733 self.arg,
1734 self.ingress_expiry_datetime,
1735 self.use_nonce,
1736 Some(true),
1737 )
1738 .await
1739 }
1740
1741 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1746 self.agent
1747 .query_raw(
1748 self.canister_id,
1749 self.effective_canister_id,
1750 self.method_name,
1751 self.arg,
1752 self.ingress_expiry_datetime,
1753 self.use_nonce,
1754 Some(false),
1755 )
1756 .await
1757 }
1758
1759 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1762 let effective_canister_id = self.effective_canister_id;
1763 let identity = self.agent.identity.clone();
1764 let content = self.into_envelope()?;
1765 let signed_query = sign_envelope(&content, identity)?;
1766 let EnvelopeContent::Query {
1767 ingress_expiry,
1768 sender,
1769 canister_id,
1770 method_name,
1771 arg,
1772 nonce,
1773 } = content
1774 else {
1775 unreachable!()
1776 };
1777 Ok(SignedQuery {
1778 ingress_expiry,
1779 sender,
1780 canister_id,
1781 method_name,
1782 arg,
1783 effective_canister_id,
1784 signed_query,
1785 nonce,
1786 })
1787 }
1788
1789 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1791 self.agent.query_content(
1792 self.canister_id,
1793 self.method_name,
1794 self.arg,
1795 self.ingress_expiry_datetime,
1796 self.use_nonce,
1797 )
1798 }
1799}
1800
1801impl<'agent> IntoFuture for QueryBuilder<'agent> {
1802 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1803 type Output = Result<Vec<u8>, AgentError>;
1804 fn into_future(self) -> Self::IntoFuture {
1805 Box::pin(self.call())
1806 }
1807}
1808
/// An in-flight update call, returned by [`UpdateBuilder::call`].
///
/// Awaiting it yields the immediate [`CallResponse`]; use `and_wait` to also poll until the
/// final result is available.
pub struct UpdateCall<'agent> {
    // Agent used to wait for the result when the call has to be polled.
    agent: &'agent Agent,
    // The pending submission of the update call.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    // Effective canister ID used when waiting for the request.
    effective_canister_id: Principal,
    // Target canister; reported in the `Operation::Call` passed along when waiting.
    canister_id: Principal,
    // Method name; reported in the `Operation::Call` passed along when waiting.
    method_name: String,
}
1817
1818impl fmt::Debug for UpdateCall<'_> {
1819 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1820 f.debug_struct("UpdateCall")
1821 .field("agent", &self.agent)
1822 .field("effective_canister_id", &self.effective_canister_id)
1823 .finish_non_exhaustive()
1824 }
1825}
1826
1827impl Future for UpdateCall<'_> {
1828 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1829 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1830 self.response_future.as_mut().poll(cx)
1831 }
1832}
1833
1834impl<'a> UpdateCall<'a> {
1835 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1837 let response = self.response_future.await?;
1838
1839 match response {
1840 CallResponse::Response(response) => Ok(response),
1841 CallResponse::Poll(request_id) => {
1842 self.agent
1843 .wait_inner(
1844 &request_id,
1845 self.effective_canister_id,
1846 Some(Operation::Call {
1847 canister: self.canister_id,
1848 method: self.method_name,
1849 }),
1850 )
1851 .await
1852 }
1853 }
1854 }
1855}
/// Builder for an update call, created by `Agent::update` or [`UpdateBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either send it (`call`,
/// `call_and_wait`, or `.await`) or turn it into a signed request with `sign`.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    // The agent used to build, sign, and send the request.
    agent: &'agent Agent,
    /// Effective canister ID passed along when the update is sent; defaults to
    /// `canister_id`.
    pub effective_canister_id: Principal,
    /// The canister the update is addressed to.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit ingress expiry as a Unix timestamp in nanoseconds, or `None` if no expiry
    /// was set on the builder.
    pub ingress_expiry_datetime: Option<u64>,
}
1874
1875impl<'agent> UpdateBuilder<'agent> {
1876 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1878 Self {
1879 agent,
1880 effective_canister_id: canister_id,
1881 canister_id,
1882 method_name,
1883 arg: vec![],
1884 ingress_expiry_datetime: None,
1885 }
1886 }
1887
1888 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1890 self.effective_canister_id = canister_id;
1891 self
1892 }
1893
1894 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1896 self.arg = arg.into();
1897 self
1898 }
1899
1900 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1902 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1903 self
1904 }
1905
1906 pub fn expire_after(mut self, duration: Duration) -> Self {
1908 self.ingress_expiry_datetime = Some(
1909 OffsetDateTime::now_utc()
1910 .saturating_add(duration.try_into().expect("negative duration"))
1911 .unix_timestamp_nanos() as u64,
1912 );
1913 self
1914 }
1915
1916 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1919 self.call().and_wait().await.map(|x| x.0)
1920 }
1921
1922 pub fn call(self) -> UpdateCall<'agent> {
1925 let method_name = self.method_name.clone();
1926 let response_future = async move {
1927 self.agent
1928 .update_raw(
1929 self.canister_id,
1930 self.effective_canister_id,
1931 self.method_name,
1932 self.arg,
1933 self.ingress_expiry_datetime,
1934 )
1935 .await
1936 };
1937 UpdateCall {
1938 agent: self.agent,
1939 response_future: Box::pin(response_future),
1940 effective_canister_id: self.effective_canister_id,
1941 canister_id: self.canister_id,
1942 method_name,
1943 }
1944 }
1945
1946 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1949 let identity = self.agent.identity.clone();
1950 let effective_canister_id = self.effective_canister_id;
1951 let content = self.into_envelope()?;
1952 let signed_update = sign_envelope(&content, identity)?;
1953 let request_id = to_request_id(&content)?;
1954 let EnvelopeContent::Call {
1955 nonce,
1956 ingress_expiry,
1957 sender,
1958 canister_id,
1959 method_name,
1960 arg,
1961 } = content
1962 else {
1963 unreachable!()
1964 };
1965 Ok(SignedUpdate {
1966 nonce,
1967 ingress_expiry,
1968 sender,
1969 canister_id,
1970 method_name,
1971 arg,
1972 effective_canister_id,
1973 signed_update,
1974 request_id,
1975 })
1976 }
1977
1978 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1980 let nonce = self.agent.nonce_factory.generate();
1981 self.agent.update_content(
1982 self.canister_id,
1983 self.method_name,
1984 self.arg,
1985 self.ingress_expiry_datetime,
1986 nonce,
1987 )
1988 }
1989}
1990
1991impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1992 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1993 type Output = Result<Vec<u8>, AgentError>;
1994 fn into_future(self) -> Self::IntoFuture {
1995 Box::pin(self.call_and_wait())
1996 }
1997}
1998
/// Abstraction over the HTTP client the agent sends its requests through.
///
/// On wasm targets the generated futures are not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Performs an HTTP request and returns the response with its body fully read.
    ///
    /// * `req` — factory producing the request; implementations in this file invoke it once
    ///   per attempt, so it may be called several times.
    /// * `max_retries` — retry budget; the native blanket implementation in this file uses
    ///   it to cap retries of connection errors, the wasm one ignores it.
    /// * `size_limit` — optional cap on the response body size in bytes; implementations in
    ///   this file enforce it while reading the body.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2011
2012fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2014 let (parts, body) = req.into_parts();
2015 let body = reqwest::Body::from(body);
2016 let request = http::Request::from_parts(parts, body)
2019 .try_into()
2020 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2021
2022 Ok(request)
2023}
2024
2025#[cfg(not(target_family = "wasm"))]
2027async fn to_http_response(
2028 resp: Response,
2029 size_limit: Option<usize>,
2030) -> Result<http::Response<Bytes>, AgentError> {
2031 use http_body_util::{BodyExt, Limited};
2032
2033 let resp: http::Response<reqwest::Body> = resp.into();
2034 let (parts, body) = resp.into_parts();
2035 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2036 let body = body
2037 .collect()
2038 .await
2039 .map_err(|e| {
2040 AgentError::TransportError(TransportError::Generic(format!(
2041 "unable to read response body: {e:#}"
2042 )))
2043 })?
2044 .to_bytes();
2045 let resp = http::Response::from_parts(parts, body);
2046
2047 Ok(resp)
2048}
2049
/// Converts a `reqwest` [`Response`] into an `http::Response<Bytes>` on wasm targets,
/// reading the whole body.
///
/// Status and headers are copied out first, then the body is consumed as a byte stream
/// through a size limiter set to `size_limit`, or `usize::MAX` when no limit is given.
///
/// # Errors
///
/// Returns a generic [`TransportError`] if the body cannot be read (including when the
/// limiter rejects it).
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    // Capture the metadata before `bytes_stream` consumes the response.
    let status = resp.status();
    let headers = resp.headers().clone();

    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let limited = Limited::new(StreamBody::new(frames), size_limit.unwrap_or(usize::MAX));
    let bytes = http_body_util::BodyExt::collect(limited)
        .await
        .map_err(|e| {
            AgentError::TransportError(TransportError::Generic(format!(
                "unable to read response body: {e:#}"
            )))
        })?
        .to_bytes();

    let mut converted = http::Response::new(bytes);
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2085
2086#[cfg(not(target_family = "wasm"))]
2087#[async_trait]
2088impl<T> HttpService for T
2089where
2090 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2091 for<'a> <&'a Self as Service<Request>>::Future: Send,
2092 T: Send + Sync + Debug + ?Sized,
2093{
2094 #[allow(clippy::needless_arbitrary_self_type)]
2095 async fn call<'a>(
2096 mut self: &'a Self,
2097 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2098 max_retries: usize,
2099 size_limit: Option<usize>,
2100 ) -> Result<http::Response<Bytes>, AgentError> {
2101 let mut retry_count = 0;
2102 loop {
2103 let request = from_http_request(req()?)?;
2104
2105 match Service::call(&mut self, request).await {
2106 Err(err) => {
2107 if err.is_connect() {
2109 if retry_count >= max_retries {
2110 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2111 }
2112 retry_count += 1;
2113 }
2114 }
2115
2116 Ok(resp) => {
2117 let resp = to_http_response(resp, size_limit).await?;
2118 return Ok(resp);
2119 }
2120 }
2121 }
2122 }
2123}
2124
/// Blanket [`HttpService`] implementation for any type whose shared reference is a
/// `tower` [`Service`] over `reqwest` requests and responses (wasm targets).
///
/// The request is sent exactly once; the retry count is ignored, while the size limit is
/// still applied when the response body is read.
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    // `mut self: &'a Self` is spelled out because `Service::call` needs `&mut &T`.
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        _size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;

        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, _size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2147
/// [`HttpService`] wrapper around a `reqwest` [`Client`] that re-sends requests answered
/// with HTTP 429 (Too Many Requests).
#[derive(Debug)]
struct Retry429Logic {
    // The underlying HTTP client used to send each attempt.
    client: Client,
}
2152
2153#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2154#[cfg_attr(not(target_family = "wasm"), async_trait)]
2155impl HttpService for Retry429Logic {
2156 async fn call<'a>(
2157 &'a self,
2158 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2159 _max_tcp_retries: usize,
2160 _size_limit: Option<usize>,
2161 ) -> Result<http::Response<Bytes>, AgentError> {
2162 let mut retries = 0;
2163 loop {
2164 #[cfg(not(target_family = "wasm"))]
2165 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2166 #[cfg(target_family = "wasm")]
2168 let resp = {
2169 let request = from_http_request(req()?)?;
2170 let resp = self
2171 .client
2172 .execute(request)
2173 .await
2174 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2175
2176 to_http_response(resp, _size_limit).await?
2177 };
2178
2179 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2180 if retries == 6 {
2181 break Ok(resp);
2182 } else {
2183 retries += 1;
2184 crate::util::sleep(Duration::from_millis(250)).await;
2185 continue;
2186 }
2187 } else {
2188 break Ok(resp);
2189 }
2190 }
2191 }
2192}
2193
// Tests that do not need a running replica; native targets only.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Signs six updates in a row and counts how many times the ingress expiry increases.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        // Across the six signed requests, at most two distinct expiry values are expected.
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    // Checks that an agent limited to two concurrent requests opens only two connections
    // when three queries are started at once.
    #[tokio::test]
    async fn client_ratelimit() {
        // Mock server: accepts connections, counts them, and never responds.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        // Drain whatever the client sends so the connection stays open.
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Start three queries; none of them can complete because the server never answers.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned tasks time to connect, then expect exactly two connections.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}