1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use bytes::Bytes;
21use cached::{Cached, TimedCache};
22use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
23use ic_ed25519::{PublicKey, SignatureError};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49 },
50 agent_error::TransportError,
51 export::Principal,
52 identity::Identity,
53 to_request_id, RequestId,
54};
55use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
56use backoff::{exponential::ExponentialBackoff, SystemClock};
57use ic_certification::{Certificate, Delegation, Label};
58use ic_transport_types::{
59 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
60 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
61};
62use serde::Serialize;
63use status::Status;
64use std::{
65 borrow::Cow,
66 collections::HashMap,
67 convert::TryFrom,
68 fmt::{self, Debug},
69 future::{Future, IntoFuture},
70 pin::Pin,
71 str::FromStr,
72 sync::{Arc, Mutex, RwLock},
73 task::{Context, Poll},
74 time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
81const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89#[derive(Clone)]
153pub struct Agent {
154 nonce_factory: Arc<dyn NonceGenerator>,
155 identity: Arc<dyn Identity>,
156 ingress_expiry: Duration,
157 root_key: Arc<RwLock<Vec<u8>>>,
158 client: Arc<dyn HttpService>,
159 route_provider: Arc<dyn RouteProvider>,
160 subnet_key_cache: Arc<Mutex<SubnetCache>>,
161 concurrent_requests_semaphore: Arc<Semaphore>,
162 verify_query_signatures: bool,
163 max_response_body_size: Option<usize>,
164 max_polling_time: Duration,
165 #[allow(dead_code)]
166 max_tcp_error_retries: usize,
167}
168
169impl fmt::Debug for Agent {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171 f.debug_struct("Agent")
172 .field("ingress_expiry", &self.ingress_expiry)
173 .finish_non_exhaustive()
174 }
175}
176
177impl Agent {
178 pub fn builder() -> builder::AgentBuilder {
181 Default::default()
182 }
183
184 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186 let client = config.http_service.unwrap_or_else(|| {
187 Arc::new(Retry429Logic {
188 client: config.client.unwrap_or_else(|| {
189 #[cfg(not(target_family = "wasm"))]
190 {
191 Client::builder()
192 .use_rustls_tls()
193 .timeout(Duration::from_secs(360))
194 .build()
195 .expect("Could not create HTTP client.")
196 }
197 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198 {
199 Client::new()
200 }
201 }),
202 })
203 });
204 Ok(Agent {
205 nonce_factory: config.nonce_factory,
206 identity: config.identity,
207 ingress_expiry: config.ingress_expiry,
208 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209 client: client.clone(),
210 route_provider: if let Some(route_provider) = config.route_provider {
211 route_provider
212 } else if let Some(url) = config.url {
213 if config.background_dynamic_routing {
214 assert!(
215 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217 );
218 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219 UrlUntilReady::new(url, async move {
220 DynamicRouteProviderBuilder::new(
221 LatencyRoutingSnapshot::new(),
222 seeds,
223 client,
224 )
225 .build()
226 .await
227 }) as Arc<dyn RouteProvider>
228 } else {
229 Arc::new(url)
230 }
231 } else {
232 panic!("either route_provider or url must be specified");
233 },
234 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235 verify_query_signatures: config.verify_query_signatures,
236 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237 max_response_body_size: config.max_response_body_size,
238 max_tcp_error_retries: config.max_tcp_error_retries,
239 max_polling_time: config.max_polling_time,
240 })
241 }
242
243 pub fn set_identity<I>(&mut self, identity: I)
249 where
250 I: 'static + Identity,
251 {
252 self.identity = Arc::new(identity);
253 }
254 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260 self.identity = identity;
261 }
262
263 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273 return Ok(());
275 }
276 let status = self.status().await?;
277 let Some(root_key) = status.root_key else {
278 return Err(AgentError::NoRootKeyInStatus(status));
279 };
280 self.set_root_key(root_key);
281 Ok(())
282 }
283
284 pub fn set_root_key(&self, root_key: Vec<u8>) {
289 *self.root_key.write().unwrap() = root_key;
290 }
291
292 pub fn read_root_key(&self) -> Vec<u8> {
294 self.root_key.read().unwrap().clone()
295 }
296
297 fn get_expiry_date(&self) -> u64 {
298 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300 if self.ingress_expiry.as_secs() > 90 {
301 rounded = rounded.replace_second(0).unwrap();
302 }
303 rounded.unix_timestamp_nanos().try_into().unwrap()
304 }
305
306 pub fn get_principal(&self) -> Result<Principal, String> {
308 self.identity.sender()
309 }
310
311 async fn query_endpoint<A>(
312 &self,
313 effective_canister_id: Principal,
314 serialized_bytes: Vec<u8>,
315 ) -> Result<A, AgentError>
316 where
317 A: serde::de::DeserializeOwned,
318 {
319 let _permit = self.concurrent_requests_semaphore.acquire().await;
320 let bytes = self
321 .execute(
322 Method::POST,
323 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324 Some(serialized_bytes),
325 )
326 .await?
327 .1;
328 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329 }
330
331 async fn read_state_endpoint<A>(
332 &self,
333 effective_canister_id: Principal,
334 serialized_bytes: Vec<u8>,
335 ) -> Result<A, AgentError>
336 where
337 A: serde::de::DeserializeOwned,
338 {
339 let _permit = self.concurrent_requests_semaphore.acquire().await;
340 let endpoint = format!(
341 "api/v2/canister/{}/read_state",
342 effective_canister_id.to_text()
343 );
344 let bytes = self
345 .execute(Method::POST, &endpoint, Some(serialized_bytes))
346 .await?
347 .1;
348 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349 }
350
351 async fn read_subnet_state_endpoint<A>(
352 &self,
353 subnet_id: Principal,
354 serialized_bytes: Vec<u8>,
355 ) -> Result<A, AgentError>
356 where
357 A: serde::de::DeserializeOwned,
358 {
359 let _permit = self.concurrent_requests_semaphore.acquire().await;
360 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361 let bytes = self
362 .execute(Method::POST, &endpoint, Some(serialized_bytes))
363 .await?
364 .1;
365 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366 }
367
368 async fn call_endpoint(
369 &self,
370 effective_canister_id: Principal,
371 serialized_bytes: Vec<u8>,
372 ) -> Result<TransportCallResponse, AgentError> {
373 let _permit = self.concurrent_requests_semaphore.acquire().await;
374 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375 let (status_code, response_body) = self
376 .execute(Method::POST, &endpoint, Some(serialized_bytes))
377 .await?;
378
379 if status_code == StatusCode::ACCEPTED {
380 return Ok(TransportCallResponse::Accepted);
381 }
382
383 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384 }
385
386 #[allow(clippy::too_many_arguments)]
389 async fn query_raw(
390 &self,
391 canister_id: Principal,
392 effective_canister_id: Principal,
393 method_name: String,
394 arg: Vec<u8>,
395 ingress_expiry_datetime: Option<u64>,
396 use_nonce: bool,
397 explicit_verify_query_signatures: Option<bool>,
398 ) -> Result<Vec<u8>, AgentError> {
399 let operation = Operation::Call {
400 canister: canister_id,
401 method: method_name.clone(),
402 };
403 let content = self.query_content(
404 canister_id,
405 method_name,
406 arg,
407 ingress_expiry_datetime,
408 use_nonce,
409 )?;
410 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411 self.query_inner(
412 effective_canister_id,
413 serialized_bytes,
414 content.to_request_id(),
415 explicit_verify_query_signatures,
416 operation,
417 )
418 .await
419 }
420
421 pub async fn query_signed(
425 &self,
426 effective_canister_id: Principal,
427 signed_query: Vec<u8>,
428 ) -> Result<Vec<u8>, AgentError> {
429 let envelope: Envelope =
430 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431 let EnvelopeContent::Query {
432 canister_id,
433 method_name,
434 ..
435 } = &*envelope.content
436 else {
437 return Err(AgentError::CallDataMismatch {
438 field: "request_type".to_string(),
439 value_arg: "query".to_string(),
440 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441 "update"
442 } else {
443 "read_state"
444 }
445 .to_string(),
446 });
447 };
448 let operation = Operation::Call {
449 canister: *canister_id,
450 method: method_name.clone(),
451 };
452 self.query_inner(
453 effective_canister_id,
454 signed_query,
455 envelope.content.to_request_id(),
456 None,
457 operation,
458 )
459 .await
460 }
461
462 async fn query_inner(
466 &self,
467 effective_canister_id: Principal,
468 signed_query: Vec<u8>,
469 request_id: RequestId,
470 explicit_verify_query_signatures: Option<bool>,
471 operation: Operation,
472 ) -> Result<Vec<u8>, AgentError> {
473 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474 let (response, mut subnet) = futures_util::try_join!(
475 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476 self.get_subnet_by_canister(&effective_canister_id)
477 )?;
478 if response.signatures().is_empty() {
479 return Err(AgentError::MissingSignature);
480 } else if response.signatures().len() > subnet.node_keys.len() {
481 return Err(AgentError::TooManySignatures {
482 had: response.signatures().len(),
483 needed: subnet.node_keys.len(),
484 });
485 }
486 for signature in response.signatures() {
487 if OffsetDateTime::now_utc()
488 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489 > self.ingress_expiry
490 {
491 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492 }
493 let signable = response.signable(request_id, signature.timestamp);
494 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495 node_key
496 } else {
497 subnet = self
498 .fetch_subnet_by_canister(&effective_canister_id)
499 .await?;
500 subnet
501 .node_keys
502 .get(&signature.identity)
503 .ok_or(AgentError::CertificateNotAuthorized())?
504 };
505 if node_key.len() != 44 {
506 return Err(AgentError::DerKeyLengthMismatch {
507 expected: 44,
508 actual: node_key.len(),
509 });
510 }
511 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512 if node_key[..12] != DER_PREFIX {
513 return Err(AgentError::DerPrefixMismatch {
514 expected: DER_PREFIX.to_vec(),
515 actual: node_key[..12].to_vec(),
516 });
517 }
518 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
519 .map_err(|_| AgentError::MalformedPublicKey)?;
520
521 match pubkey.verify_signature(&signable, &signature.signature[..]) {
522 Ok(()) => (),
523 Err(SignatureError::InvalidSignature) => {
524 return Err(AgentError::QuerySignatureVerificationFailed)
525 }
526 Err(SignatureError::InvalidLength) => {
527 return Err(AgentError::MalformedSignature)
528 }
529 _ => unreachable!(),
530 }
531 }
532 response
533 } else {
534 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
535 .await?
536 };
537
538 match response {
539 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
540 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
541 reject,
542 operation: Some(operation),
543 }),
544 }
545 }
546
547 fn query_content(
548 &self,
549 canister_id: Principal,
550 method_name: String,
551 arg: Vec<u8>,
552 ingress_expiry_datetime: Option<u64>,
553 use_nonce: bool,
554 ) -> Result<EnvelopeContent, AgentError> {
555 Ok(EnvelopeContent::Query {
556 sender: self.identity.sender().map_err(AgentError::SigningError)?,
557 canister_id,
558 method_name,
559 arg,
560 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
561 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
562 })
563 }
564
565 async fn update_raw(
567 &self,
568 canister_id: Principal,
569 effective_canister_id: Principal,
570 method_name: String,
571 arg: Vec<u8>,
572 ingress_expiry_datetime: Option<u64>,
573 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
574 let nonce = self.nonce_factory.generate();
575 let content = self.update_content(
576 canister_id,
577 method_name.clone(),
578 arg,
579 ingress_expiry_datetime,
580 nonce,
581 )?;
582 let operation = Some(Operation::Call {
583 canister: canister_id,
584 method: method_name,
585 });
586 let request_id = to_request_id(&content)?;
587 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
588
589 let response_body = self
590 .call_endpoint(effective_canister_id, serialized_bytes)
591 .await?;
592
593 match response_body {
594 TransportCallResponse::Replied { certificate } => {
595 let certificate =
596 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
597
598 self.verify(&certificate, effective_canister_id)?;
599 let status = lookup_request_status(&certificate, &request_id)?;
600
601 match status {
602 RequestStatusResponse::Replied(reply) => {
603 Ok(CallResponse::Response((reply.arg, certificate)))
604 }
605 RequestStatusResponse::Rejected(reject_response) => {
606 Err(AgentError::CertifiedReject {
607 reject: reject_response,
608 operation,
609 })?
610 }
611 _ => Ok(CallResponse::Poll(request_id)),
612 }
613 }
614 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
615 TransportCallResponse::NonReplicatedRejection(reject_response) => {
616 Err(AgentError::UncertifiedReject {
617 reject: reject_response,
618 operation,
619 })
620 }
621 }
622 }
623
624 pub async fn update_signed(
628 &self,
629 effective_canister_id: Principal,
630 signed_update: Vec<u8>,
631 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
632 let envelope: Envelope =
633 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
634 let EnvelopeContent::Call {
635 canister_id,
636 method_name,
637 ..
638 } = &*envelope.content
639 else {
640 return Err(AgentError::CallDataMismatch {
641 field: "request_type".to_string(),
642 value_arg: "update".to_string(),
643 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
644 "query"
645 } else {
646 "read_state"
647 }
648 .to_string(),
649 });
650 };
651 let operation = Some(Operation::Call {
652 canister: *canister_id,
653 method: method_name.clone(),
654 });
655 let request_id = to_request_id(&envelope.content)?;
656
657 let response_body = self
658 .call_endpoint(effective_canister_id, signed_update)
659 .await?;
660
661 match response_body {
662 TransportCallResponse::Replied { certificate } => {
663 let certificate =
664 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
665
666 self.verify(&certificate, effective_canister_id)?;
667 let status = lookup_request_status(&certificate, &request_id)?;
668
669 match status {
670 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
671 RequestStatusResponse::Rejected(reject_response) => {
672 Err(AgentError::CertifiedReject {
673 reject: reject_response,
674 operation,
675 })?
676 }
677 _ => Ok(CallResponse::Poll(request_id)),
678 }
679 }
680 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
681 TransportCallResponse::NonReplicatedRejection(reject_response) => {
682 Err(AgentError::UncertifiedReject {
683 reject: reject_response,
684 operation,
685 })
686 }
687 }
688 }
689
690 fn update_content(
691 &self,
692 canister_id: Principal,
693 method_name: String,
694 arg: Vec<u8>,
695 ingress_expiry_datetime: Option<u64>,
696 nonce: Option<Vec<u8>>,
697 ) -> Result<EnvelopeContent, AgentError> {
698 Ok(EnvelopeContent::Call {
699 canister_id,
700 method_name,
701 arg,
702 nonce,
703 sender: self.identity.sender().map_err(AgentError::SigningError)?,
704 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
705 })
706 }
707
708 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
709 ExponentialBackoffBuilder::new()
710 .with_initial_interval(Duration::from_millis(500))
711 .with_max_interval(Duration::from_secs(1))
712 .with_multiplier(1.4)
713 .with_max_elapsed_time(Some(self.max_polling_time))
714 .build()
715 }
716
717 pub async fn wait_signed(
719 &self,
720 request_id: &RequestId,
721 effective_canister_id: Principal,
722 signed_request_status: Vec<u8>,
723 ) -> Result<(Vec<u8>, Certificate), AgentError> {
724 let mut retry_policy = self.get_retry_policy();
725
726 let mut request_accepted = false;
727 let (resp, cert) = self
728 .request_status_signed(
729 request_id,
730 effective_canister_id,
731 signed_request_status.clone(),
732 )
733 .await?;
734 loop {
735 match resp {
736 RequestStatusResponse::Unknown => {}
737
738 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
739 if !request_accepted {
740 retry_policy.reset();
741 request_accepted = true;
742 }
743 }
744
745 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
746 return Ok((arg, cert))
747 }
748
749 RequestStatusResponse::Rejected(response) => {
750 return Err(AgentError::CertifiedReject {
751 reject: response,
752 operation: None,
753 })
754 }
755
756 RequestStatusResponse::Done => {
757 return Err(AgentError::RequestStatusDoneNoReply(String::from(
758 *request_id,
759 )))
760 }
761 };
762
763 match retry_policy.next_backoff() {
764 Some(duration) => crate::util::sleep(duration).await,
765
766 None => return Err(AgentError::TimeoutWaitingForResponse()),
767 }
768 }
769 }
770
771 pub async fn wait(
773 &self,
774 request_id: &RequestId,
775 effective_canister_id: Principal,
776 ) -> Result<(Vec<u8>, Certificate), AgentError> {
777 self.wait_inner(request_id, effective_canister_id, None)
778 .await
779 }
780
781 async fn wait_inner(
782 &self,
783 request_id: &RequestId,
784 effective_canister_id: Principal,
785 operation: Option<Operation>,
786 ) -> Result<(Vec<u8>, Certificate), AgentError> {
787 let mut retry_policy = self.get_retry_policy();
788
789 let mut request_accepted = false;
790 loop {
791 let (resp, cert) = self
792 .request_status_raw(request_id, effective_canister_id)
793 .await?;
794 match resp {
795 RequestStatusResponse::Unknown => {}
796
797 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
798 if !request_accepted {
799 retry_policy.reset();
807 request_accepted = true;
808 }
809 }
810
811 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
812 return Ok((arg, cert))
813 }
814
815 RequestStatusResponse::Rejected(response) => {
816 return Err(AgentError::CertifiedReject {
817 reject: response,
818 operation,
819 })
820 }
821
822 RequestStatusResponse::Done => {
823 return Err(AgentError::RequestStatusDoneNoReply(String::from(
824 *request_id,
825 )))
826 }
827 };
828
829 match retry_policy.next_backoff() {
830 Some(duration) => crate::util::sleep(duration).await,
831
832 None => return Err(AgentError::TimeoutWaitingForResponse()),
833 }
834 }
835 }
836
837 pub async fn read_state_raw(
840 &self,
841 paths: Vec<Vec<Label>>,
842 effective_canister_id: Principal,
843 ) -> Result<Certificate, AgentError> {
844 let content = self.read_state_content(paths)?;
845 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
846
847 let read_state_response: ReadStateResponse = self
848 .read_state_endpoint(effective_canister_id, serialized_bytes)
849 .await?;
850 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
851 .map_err(AgentError::InvalidCborData)?;
852 self.verify(&cert, effective_canister_id)?;
853 Ok(cert)
854 }
855
856 pub async fn read_subnet_state_raw(
859 &self,
860 paths: Vec<Vec<Label>>,
861 subnet_id: Principal,
862 ) -> Result<Certificate, AgentError> {
863 let content = self.read_state_content(paths)?;
864 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
865
866 let read_state_response: ReadStateResponse = self
867 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
868 .await?;
869 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
870 .map_err(AgentError::InvalidCborData)?;
871 self.verify_for_subnet(&cert, subnet_id)?;
872 Ok(cert)
873 }
874
875 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
876 Ok(EnvelopeContent::ReadState {
877 sender: self.identity.sender().map_err(AgentError::SigningError)?,
878 paths,
879 ingress_expiry: self.get_expiry_date(),
880 })
881 }
882
883 pub fn verify(
886 &self,
887 cert: &Certificate,
888 effective_canister_id: Principal,
889 ) -> Result<(), AgentError> {
890 self.verify_cert(cert, effective_canister_id)?;
891 self.verify_cert_timestamp(cert)?;
892 Ok(())
893 }
894
895 fn verify_cert(
896 &self,
897 cert: &Certificate,
898 effective_canister_id: Principal,
899 ) -> Result<(), AgentError> {
900 let sig = &cert.signature;
901
902 let root_hash = cert.tree.digest();
903 let mut msg = vec![];
904 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
905 msg.extend_from_slice(&root_hash);
906
907 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
908 let key = extract_der(der_key)?;
909
910 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
911 .map_err(|_| AgentError::CertificateVerificationFailed())?;
912 Ok(())
913 }
914
915 pub fn verify_for_subnet(
918 &self,
919 cert: &Certificate,
920 subnet_id: Principal,
921 ) -> Result<(), AgentError> {
922 self.verify_cert_for_subnet(cert, subnet_id)?;
923 self.verify_cert_timestamp(cert)?;
924 Ok(())
925 }
926
927 fn verify_cert_for_subnet(
928 &self,
929 cert: &Certificate,
930 subnet_id: Principal,
931 ) -> Result<(), AgentError> {
932 let sig = &cert.signature;
933
934 let root_hash = cert.tree.digest();
935 let mut msg = vec![];
936 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
937 msg.extend_from_slice(&root_hash);
938
939 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
940 let key = extract_der(der_key)?;
941
942 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
943 .map_err(|_| AgentError::CertificateVerificationFailed())?;
944 Ok(())
945 }
946
947 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
948 let time = lookup_time(cert)?;
949 if (OffsetDateTime::now_utc()
950 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
951 .abs()
952 > self.ingress_expiry
953 {
954 Err(AgentError::CertificateOutdated(self.ingress_expiry))
955 } else {
956 Ok(())
957 }
958 }
959
960 fn check_delegation(
961 &self,
962 delegation: &Option<Delegation>,
963 effective_canister_id: Principal,
964 ) -> Result<Vec<u8>, AgentError> {
965 match delegation {
966 None => Ok(self.read_root_key()),
967 Some(delegation) => {
968 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
969 .map_err(AgentError::InvalidCborData)?;
970 if cert.delegation.is_some() {
971 return Err(AgentError::CertificateHasTooManyDelegations);
972 }
973 self.verify_cert(&cert, effective_canister_id)?;
974 let canister_range_lookup = [
975 "subnet".as_bytes(),
976 delegation.subnet_id.as_ref(),
977 "canister_ranges".as_bytes(),
978 ];
979 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
980 let ranges: Vec<(Principal, Principal)> =
981 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
982 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
983 return Err(AgentError::CertificateNotAuthorized());
985 }
986
987 let public_key_path = [
988 "subnet".as_bytes(),
989 delegation.subnet_id.as_ref(),
990 "public_key".as_bytes(),
991 ];
992 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
993 }
994 }
995 }
996
997 fn check_delegation_for_subnet(
998 &self,
999 delegation: &Option<Delegation>,
1000 subnet_id: Principal,
1001 ) -> Result<Vec<u8>, AgentError> {
1002 match delegation {
1003 None => Ok(self.read_root_key()),
1004 Some(delegation) => {
1005 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1006 .map_err(AgentError::InvalidCborData)?;
1007 if cert.delegation.is_some() {
1008 return Err(AgentError::CertificateHasTooManyDelegations);
1009 }
1010 self.verify_cert_for_subnet(&cert, subnet_id)?;
1011 let public_key_path = [
1012 "subnet".as_bytes(),
1013 delegation.subnet_id.as_ref(),
1014 "public_key".as_bytes(),
1015 ];
1016 let pk = lookup_value(&cert.tree, public_key_path)
1017 .map_err(|_| AgentError::CertificateNotAuthorized())?
1018 .to_vec();
1019 Ok(pk)
1020 }
1021 }
1022 }
1023
1024 pub async fn read_state_canister_info(
1027 &self,
1028 canister_id: Principal,
1029 path: &str,
1030 ) -> Result<Vec<u8>, AgentError> {
1031 let paths: Vec<Vec<Label>> = vec![vec![
1032 "canister".into(),
1033 Label::from_bytes(canister_id.as_slice()),
1034 path.into(),
1035 ]];
1036
1037 let cert = self.read_state_raw(paths, canister_id).await?;
1038
1039 lookup_canister_info(cert, canister_id, path)
1040 }
1041
1042 pub async fn read_state_canister_controllers(
1044 &self,
1045 canister_id: Principal,
1046 ) -> Result<Vec<Principal>, AgentError> {
1047 let blob = self
1048 .read_state_canister_info(canister_id, "controllers")
1049 .await?;
1050 let controllers: Vec<Principal> =
1051 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1052 Ok(controllers)
1053 }
1054
1055 pub async fn read_state_canister_module_hash(
1057 &self,
1058 canister_id: Principal,
1059 ) -> Result<Vec<u8>, AgentError> {
1060 self.read_state_canister_info(canister_id, "module_hash")
1061 .await
1062 }
1063
1064 pub async fn read_state_canister_metadata(
1066 &self,
1067 canister_id: Principal,
1068 path: &str,
1069 ) -> Result<Vec<u8>, AgentError> {
1070 let paths: Vec<Vec<Label>> = vec![vec![
1071 "canister".into(),
1072 Label::from_bytes(canister_id.as_slice()),
1073 "metadata".into(),
1074 path.into(),
1075 ]];
1076
1077 let cert = self.read_state_raw(paths, canister_id).await?;
1078
1079 lookup_canister_metadata(cert, canister_id, path)
1080 }
1081
1082 pub async fn read_state_subnet_metrics(
1084 &self,
1085 subnet_id: Principal,
1086 ) -> Result<SubnetMetrics, AgentError> {
1087 let paths = vec![vec![
1088 "subnet".into(),
1089 Label::from_bytes(subnet_id.as_slice()),
1090 "metrics".into(),
1091 ]];
1092 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1093 lookup_subnet_metrics(cert, subnet_id)
1094 }
1095
1096 pub async fn request_status_raw(
1098 &self,
1099 request_id: &RequestId,
1100 effective_canister_id: Principal,
1101 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1102 let paths: Vec<Vec<Label>> =
1103 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1104
1105 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1106
1107 Ok((lookup_request_status(&cert, request_id)?, cert))
1108 }
1109
1110 pub async fn request_status_signed(
1114 &self,
1115 request_id: &RequestId,
1116 effective_canister_id: Principal,
1117 signed_request_status: Vec<u8>,
1118 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1119 let _envelope: Envelope =
1120 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1121 let read_state_response: ReadStateResponse = self
1122 .read_state_endpoint(effective_canister_id, signed_request_status)
1123 .await?;
1124
1125 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1126 .map_err(AgentError::InvalidCborData)?;
1127 self.verify(&cert, effective_canister_id)?;
1128 Ok((lookup_request_status(&cert, request_id)?, cert))
1129 }
1130
1131 pub fn update<S: Into<String>>(
1134 &self,
1135 canister_id: &Principal,
1136 method_name: S,
1137 ) -> UpdateBuilder {
1138 UpdateBuilder::new(self, *canister_id, method_name.into())
1139 }
1140
1141 pub async fn status(&self) -> Result<Status, AgentError> {
1143 let endpoint = "api/v2/status";
1144 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1145
1146 let cbor: serde_cbor::Value =
1147 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1148
1149 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1150 }
1151
1152 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1155 QueryBuilder::new(self, *canister_id, method_name.into())
1156 }
1157
1158 pub fn sign_request_status(
1161 &self,
1162 effective_canister_id: Principal,
1163 request_id: RequestId,
1164 ) -> Result<SignedRequestStatus, AgentError> {
1165 let paths: Vec<Vec<Label>> =
1166 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1167 let read_state_content = self.read_state_content(paths)?;
1168 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1169 let ingress_expiry = read_state_content.ingress_expiry();
1170 let sender = *read_state_content.sender();
1171 Ok(SignedRequestStatus {
1172 ingress_expiry,
1173 sender,
1174 effective_canister_id,
1175 request_id,
1176 signed_request_status,
1177 })
1178 }
1179
1180 async fn get_subnet_by_canister(
1181 &self,
1182 canister: &Principal,
1183 ) -> Result<Arc<Subnet>, AgentError> {
1184 let subnet = self
1185 .subnet_key_cache
1186 .lock()
1187 .unwrap()
1188 .get_subnet_by_canister(canister);
1189 if let Some(subnet) = subnet {
1190 Ok(subnet)
1191 } else {
1192 self.fetch_subnet_by_canister(canister).await
1193 }
1194 }
1195
1196 pub async fn fetch_api_boundary_nodes_by_canister_id(
1198 &self,
1199 canister_id: Principal,
1200 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1201 let paths = vec![vec!["api_boundary_nodes".into()]];
1202 let certificate = self.read_state_raw(paths, canister_id).await?;
1203 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1204 Ok(api_boundary_nodes)
1205 }
1206
1207 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1209 &self,
1210 subnet_id: Principal,
1211 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1212 let paths = vec![vec!["api_boundary_nodes".into()]];
1213 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1214 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1215 Ok(api_boundary_nodes)
1216 }
1217
1218 async fn fetch_subnet_by_canister(
1219 &self,
1220 canister: &Principal,
1221 ) -> Result<Arc<Subnet>, AgentError> {
1222 let cert = self
1223 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1224 .await?;
1225
1226 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1227 let subnet = Arc::new(subnet);
1228 self.subnet_key_cache
1229 .lock()
1230 .unwrap()
1231 .insert_subnet(subnet_id, subnet.clone());
1232 Ok(subnet)
1233 }
1234
1235 async fn request(
1236 &self,
1237 method: Method,
1238 endpoint: &str,
1239 body: Option<Vec<u8>>,
1240 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1241 let body = body.map(Bytes::from);
1242
1243 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1244 let url = self.route_provider.route()?.join(endpoint)?;
1245 let uri = Uri::from_str(url.as_str())
1246 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1247 let body = body.clone().unwrap_or_default();
1248 let request = http::Request::builder()
1249 .method(method.clone())
1250 .uri(uri)
1251 .header(CONTENT_TYPE, "application/cbor")
1252 .body(body)
1253 .map_err(|e| {
1254 AgentError::TransportError(TransportError::Generic(format!(
1255 "unable to create request: {e:#}"
1256 )))
1257 })?;
1258
1259 Ok(request)
1260 };
1261
1262 let response = self
1263 .client
1264 .call(
1265 &create_request_with_generated_url,
1266 self.max_tcp_error_retries,
1267 self.max_response_body_size,
1268 )
1269 .await?;
1270
1271 let (parts, body) = response.into_parts();
1272
1273 Ok((parts.status, parts.headers, body.to_vec()))
1274 }
1275
1276 async fn execute(
1277 &self,
1278 method: Method,
1279 endpoint: &str,
1280 body: Option<Vec<u8>>,
1281 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1282 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1283
1284 let status = request_result.0;
1285 let headers = request_result.1;
1286 let body = request_result.2;
1287
1288 if status.is_client_error() || status.is_server_error() {
1289 Err(AgentError::HttpError(HttpErrorPayload {
1290 status: status.into(),
1291 content_type: headers
1292 .get(CONTENT_TYPE)
1293 .and_then(|value| value.to_str().ok())
1294 .map(str::to_string),
1295 content: body,
1296 }))
1297 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1298 Err(AgentError::InvalidHttpResponse(format!(
1299 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1300 )))
1301 } else {
1302 Ok((status, body))
1303 }
1304 }
1305}
1306
1307fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1310 ranges
1311 .iter()
1312 .any(|r| principal >= &r.0 && principal <= &r.1)
1313}
1314
1315fn sign_envelope(
1316 content: &EnvelopeContent,
1317 identity: Arc<dyn Identity>,
1318) -> Result<Vec<u8>, AgentError> {
1319 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1320
1321 let envelope = Envelope {
1322 content: Cow::Borrowed(content),
1323 sender_pubkey: signature.public_key,
1324 sender_sig: signature.signature,
1325 sender_delegation: signature.delegations,
1326 };
1327
1328 let mut serialized_bytes = Vec::new();
1329 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1330 serializer.self_describe()?;
1331 envelope.serialize(&mut serializer)?;
1332
1333 Ok(serialized_bytes)
1334}
1335
1336pub fn signed_query_inspect(
1339 sender: Principal,
1340 canister_id: Principal,
1341 method_name: &str,
1342 arg: &[u8],
1343 ingress_expiry: u64,
1344 signed_query: Vec<u8>,
1345) -> Result<(), AgentError> {
1346 let envelope: Envelope =
1347 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1348 match envelope.content.as_ref() {
1349 EnvelopeContent::Query {
1350 ingress_expiry: ingress_expiry_cbor,
1351 sender: sender_cbor,
1352 canister_id: canister_id_cbor,
1353 method_name: method_name_cbor,
1354 arg: arg_cbor,
1355 nonce: _nonce,
1356 } => {
1357 if ingress_expiry != *ingress_expiry_cbor {
1358 return Err(AgentError::CallDataMismatch {
1359 field: "ingress_expiry".to_string(),
1360 value_arg: ingress_expiry.to_string(),
1361 value_cbor: ingress_expiry_cbor.to_string(),
1362 });
1363 }
1364 if sender != *sender_cbor {
1365 return Err(AgentError::CallDataMismatch {
1366 field: "sender".to_string(),
1367 value_arg: sender.to_string(),
1368 value_cbor: sender_cbor.to_string(),
1369 });
1370 }
1371 if canister_id != *canister_id_cbor {
1372 return Err(AgentError::CallDataMismatch {
1373 field: "canister_id".to_string(),
1374 value_arg: canister_id.to_string(),
1375 value_cbor: canister_id_cbor.to_string(),
1376 });
1377 }
1378 if method_name != *method_name_cbor {
1379 return Err(AgentError::CallDataMismatch {
1380 field: "method_name".to_string(),
1381 value_arg: method_name.to_string(),
1382 value_cbor: method_name_cbor.clone(),
1383 });
1384 }
1385 if arg != *arg_cbor {
1386 return Err(AgentError::CallDataMismatch {
1387 field: "arg".to_string(),
1388 value_arg: format!("{arg:?}"),
1389 value_cbor: format!("{arg_cbor:?}"),
1390 });
1391 }
1392 }
1393 EnvelopeContent::Call { .. } => {
1394 return Err(AgentError::CallDataMismatch {
1395 field: "request_type".to_string(),
1396 value_arg: "query".to_string(),
1397 value_cbor: "call".to_string(),
1398 })
1399 }
1400 EnvelopeContent::ReadState { .. } => {
1401 return Err(AgentError::CallDataMismatch {
1402 field: "request_type".to_string(),
1403 value_arg: "query".to_string(),
1404 value_cbor: "read_state".to_string(),
1405 })
1406 }
1407 }
1408 Ok(())
1409}
1410
1411pub fn signed_update_inspect(
1414 sender: Principal,
1415 canister_id: Principal,
1416 method_name: &str,
1417 arg: &[u8],
1418 ingress_expiry: u64,
1419 signed_update: Vec<u8>,
1420) -> Result<(), AgentError> {
1421 let envelope: Envelope =
1422 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1423 match envelope.content.as_ref() {
1424 EnvelopeContent::Call {
1425 nonce: _nonce,
1426 ingress_expiry: ingress_expiry_cbor,
1427 sender: sender_cbor,
1428 canister_id: canister_id_cbor,
1429 method_name: method_name_cbor,
1430 arg: arg_cbor,
1431 } => {
1432 if ingress_expiry != *ingress_expiry_cbor {
1433 return Err(AgentError::CallDataMismatch {
1434 field: "ingress_expiry".to_string(),
1435 value_arg: ingress_expiry.to_string(),
1436 value_cbor: ingress_expiry_cbor.to_string(),
1437 });
1438 }
1439 if sender != *sender_cbor {
1440 return Err(AgentError::CallDataMismatch {
1441 field: "sender".to_string(),
1442 value_arg: sender.to_string(),
1443 value_cbor: sender_cbor.to_string(),
1444 });
1445 }
1446 if canister_id != *canister_id_cbor {
1447 return Err(AgentError::CallDataMismatch {
1448 field: "canister_id".to_string(),
1449 value_arg: canister_id.to_string(),
1450 value_cbor: canister_id_cbor.to_string(),
1451 });
1452 }
1453 if method_name != *method_name_cbor {
1454 return Err(AgentError::CallDataMismatch {
1455 field: "method_name".to_string(),
1456 value_arg: method_name.to_string(),
1457 value_cbor: method_name_cbor.clone(),
1458 });
1459 }
1460 if arg != *arg_cbor {
1461 return Err(AgentError::CallDataMismatch {
1462 field: "arg".to_string(),
1463 value_arg: format!("{arg:?}"),
1464 value_cbor: format!("{arg_cbor:?}"),
1465 });
1466 }
1467 }
1468 EnvelopeContent::ReadState { .. } => {
1469 return Err(AgentError::CallDataMismatch {
1470 field: "request_type".to_string(),
1471 value_arg: "call".to_string(),
1472 value_cbor: "read_state".to_string(),
1473 })
1474 }
1475 EnvelopeContent::Query { .. } => {
1476 return Err(AgentError::CallDataMismatch {
1477 field: "request_type".to_string(),
1478 value_arg: "call".to_string(),
1479 value_cbor: "query".to_string(),
1480 })
1481 }
1482 }
1483 Ok(())
1484}
1485
1486pub fn signed_request_status_inspect(
1489 sender: Principal,
1490 request_id: &RequestId,
1491 ingress_expiry: u64,
1492 signed_request_status: Vec<u8>,
1493) -> Result<(), AgentError> {
1494 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1495 let envelope: Envelope =
1496 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1497 match envelope.content.as_ref() {
1498 EnvelopeContent::ReadState {
1499 ingress_expiry: ingress_expiry_cbor,
1500 sender: sender_cbor,
1501 paths: paths_cbor,
1502 } => {
1503 if ingress_expiry != *ingress_expiry_cbor {
1504 return Err(AgentError::CallDataMismatch {
1505 field: "ingress_expiry".to_string(),
1506 value_arg: ingress_expiry.to_string(),
1507 value_cbor: ingress_expiry_cbor.to_string(),
1508 });
1509 }
1510 if sender != *sender_cbor {
1511 return Err(AgentError::CallDataMismatch {
1512 field: "sender".to_string(),
1513 value_arg: sender.to_string(),
1514 value_cbor: sender_cbor.to_string(),
1515 });
1516 }
1517
1518 if paths != *paths_cbor {
1519 return Err(AgentError::CallDataMismatch {
1520 field: "paths".to_string(),
1521 value_arg: format!("{paths:?}"),
1522 value_cbor: format!("{paths_cbor:?}"),
1523 });
1524 }
1525 }
1526 EnvelopeContent::Query { .. } => {
1527 return Err(AgentError::CallDataMismatch {
1528 field: "request_type".to_string(),
1529 value_arg: "read_state".to_string(),
1530 value_cbor: "query".to_string(),
1531 })
1532 }
1533 EnvelopeContent::Call { .. } => {
1534 return Err(AgentError::CallDataMismatch {
1535 field: "request_type".to_string(),
1536 value_arg: "read_state".to_string(),
1537 value_cbor: "call".to_string(),
1538 })
1539 }
1540 }
1541 Ok(())
1542}
1543
1544#[derive(Clone)]
1545struct SubnetCache {
1546 subnets: TimedCache<Principal, Arc<Subnet>>,
1547 canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1548}
1549
1550impl SubnetCache {
1551 fn new() -> Self {
1552 Self {
1553 subnets: TimedCache::with_lifespan(300),
1554 canister_index: RangeInclusiveMap::new_with_step_fns(),
1555 }
1556 }
1557
1558 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1559 self.canister_index
1560 .get(canister)
1561 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1562 .filter(|subnet| subnet.canister_ranges.contains(canister))
1563 }
1564
1565 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1566 self.subnets.cache_set(subnet_id, subnet.clone());
1567 for range in subnet.canister_ranges.iter() {
1568 self.canister_index.insert(range.clone(), subnet_id);
1569 }
1570 }
1571}
1572
1573#[derive(Clone, Copy)]
1574struct PrincipalStep;
1575
1576impl StepFns<Principal> for PrincipalStep {
1577 fn add_one(start: &Principal) -> Principal {
1578 let bytes = start.as_slice();
1579 let mut arr = [0; 29];
1580 arr[..bytes.len()].copy_from_slice(bytes);
1581 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1582 *byte = byte.wrapping_add(1);
1583 if *byte != 0 {
1584 break;
1585 }
1586 }
1587 Principal::from_slice(&arr[..bytes.len()])
1588 }
1589 fn sub_one(start: &Principal) -> Principal {
1590 let bytes = start.as_slice();
1591 let mut arr = [0; 29];
1592 arr[..bytes.len()].copy_from_slice(bytes);
1593 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1594 *byte = byte.wrapping_sub(1);
1595 if *byte != 255 {
1596 break;
1597 }
1598 }
1599 Principal::from_slice(&arr[..bytes.len()])
1600 }
1601}
1602
1603#[derive(Clone)]
1604pub(crate) struct Subnet {
1605 _key: Vec<u8>,
1608 node_keys: HashMap<Principal, Vec<u8>>,
1609 canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1610}
1611
1612#[derive(Debug, Clone)]
1614pub struct ApiBoundaryNode {
1615 pub domain: String,
1617 pub ipv6_address: String,
1619 pub ipv4_address: Option<String>,
1621}
1622
1623#[derive(Debug, Clone)]
1627#[non_exhaustive]
1628pub struct QueryBuilder<'agent> {
1629 agent: &'agent Agent,
1630 pub effective_canister_id: Principal,
1632 pub canister_id: Principal,
1634 pub method_name: String,
1636 pub arg: Vec<u8>,
1638 pub ingress_expiry_datetime: Option<u64>,
1640 pub use_nonce: bool,
1642}
1643
1644impl<'agent> QueryBuilder<'agent> {
1645 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1647 Self {
1648 agent,
1649 effective_canister_id: canister_id,
1650 canister_id,
1651 method_name,
1652 arg: vec![],
1653 ingress_expiry_datetime: None,
1654 use_nonce: false,
1655 }
1656 }
1657
1658 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1660 self.effective_canister_id = canister_id;
1661 self
1662 }
1663
1664 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1666 self.arg = arg.into();
1667 self
1668 }
1669
1670 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1672 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1673 self
1674 }
1675
1676 pub fn expire_after(mut self, duration: Duration) -> Self {
1678 self.ingress_expiry_datetime = Some(
1679 OffsetDateTime::now_utc()
1680 .saturating_add(duration.try_into().expect("negative duration"))
1681 .unix_timestamp_nanos() as u64,
1682 );
1683 self
1684 }
1685
1686 pub fn with_nonce_generation(mut self) -> Self {
1689 self.use_nonce = true;
1690 self
1691 }
1692
1693 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1695 self.agent
1696 .query_raw(
1697 self.canister_id,
1698 self.effective_canister_id,
1699 self.method_name,
1700 self.arg,
1701 self.ingress_expiry_datetime,
1702 self.use_nonce,
1703 None,
1704 )
1705 .await
1706 }
1707
1708 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1713 self.agent
1714 .query_raw(
1715 self.canister_id,
1716 self.effective_canister_id,
1717 self.method_name,
1718 self.arg,
1719 self.ingress_expiry_datetime,
1720 self.use_nonce,
1721 Some(true),
1722 )
1723 .await
1724 }
1725
1726 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1731 self.agent
1732 .query_raw(
1733 self.canister_id,
1734 self.effective_canister_id,
1735 self.method_name,
1736 self.arg,
1737 self.ingress_expiry_datetime,
1738 self.use_nonce,
1739 Some(false),
1740 )
1741 .await
1742 }
1743
1744 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1747 let effective_canister_id = self.effective_canister_id;
1748 let identity = self.agent.identity.clone();
1749 let content = self.into_envelope()?;
1750 let signed_query = sign_envelope(&content, identity)?;
1751 let EnvelopeContent::Query {
1752 ingress_expiry,
1753 sender,
1754 canister_id,
1755 method_name,
1756 arg,
1757 nonce,
1758 } = content
1759 else {
1760 unreachable!()
1761 };
1762 Ok(SignedQuery {
1763 ingress_expiry,
1764 sender,
1765 canister_id,
1766 method_name,
1767 arg,
1768 effective_canister_id,
1769 signed_query,
1770 nonce,
1771 })
1772 }
1773
1774 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1776 self.agent.query_content(
1777 self.canister_id,
1778 self.method_name,
1779 self.arg,
1780 self.ingress_expiry_datetime,
1781 self.use_nonce,
1782 )
1783 }
1784}
1785
1786impl<'agent> IntoFuture for QueryBuilder<'agent> {
1787 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1788 type Output = Result<Vec<u8>, AgentError>;
1789 fn into_future(self) -> Self::IntoFuture {
1790 Box::pin(self.call())
1791 }
1792}
1793
1794pub struct UpdateCall<'agent> {
1796 agent: &'agent Agent,
1797 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1798 effective_canister_id: Principal,
1799 canister_id: Principal,
1800 method_name: String,
1801}
1802
1803impl fmt::Debug for UpdateCall<'_> {
1804 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1805 f.debug_struct("UpdateCall")
1806 .field("agent", &self.agent)
1807 .field("effective_canister_id", &self.effective_canister_id)
1808 .finish_non_exhaustive()
1809 }
1810}
1811
1812impl Future for UpdateCall<'_> {
1813 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1814 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1815 self.response_future.as_mut().poll(cx)
1816 }
1817}
1818
1819impl<'a> UpdateCall<'a> {
1820 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1822 let response = self.response_future.await?;
1823
1824 match response {
1825 CallResponse::Response(response) => Ok(response),
1826 CallResponse::Poll(request_id) => {
1827 self.agent
1828 .wait_inner(
1829 &request_id,
1830 self.effective_canister_id,
1831 Some(Operation::Call {
1832 canister: self.canister_id,
1833 method: self.method_name,
1834 }),
1835 )
1836 .await
1837 }
1838 }
1839 }
1840}
1841#[derive(Debug)]
1846pub struct UpdateBuilder<'agent> {
1847 agent: &'agent Agent,
1848 pub effective_canister_id: Principal,
1850 pub canister_id: Principal,
1852 pub method_name: String,
1854 pub arg: Vec<u8>,
1856 pub ingress_expiry_datetime: Option<u64>,
1858}
1859
1860impl<'agent> UpdateBuilder<'agent> {
1861 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1863 Self {
1864 agent,
1865 effective_canister_id: canister_id,
1866 canister_id,
1867 method_name,
1868 arg: vec![],
1869 ingress_expiry_datetime: None,
1870 }
1871 }
1872
1873 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1875 self.effective_canister_id = canister_id;
1876 self
1877 }
1878
1879 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1881 self.arg = arg.into();
1882 self
1883 }
1884
1885 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1887 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1888 self
1889 }
1890
1891 pub fn expire_after(mut self, duration: Duration) -> Self {
1893 self.ingress_expiry_datetime = Some(
1894 OffsetDateTime::now_utc()
1895 .saturating_add(duration.try_into().expect("negative duration"))
1896 .unix_timestamp_nanos() as u64,
1897 );
1898 self
1899 }
1900
1901 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1904 self.call().and_wait().await.map(|x| x.0)
1905 }
1906
1907 pub fn call(self) -> UpdateCall<'agent> {
1910 let method_name = self.method_name.clone();
1911 let response_future = async move {
1912 self.agent
1913 .update_raw(
1914 self.canister_id,
1915 self.effective_canister_id,
1916 self.method_name,
1917 self.arg,
1918 self.ingress_expiry_datetime,
1919 )
1920 .await
1921 };
1922 UpdateCall {
1923 agent: self.agent,
1924 response_future: Box::pin(response_future),
1925 effective_canister_id: self.effective_canister_id,
1926 canister_id: self.canister_id,
1927 method_name,
1928 }
1929 }
1930
1931 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1934 let identity = self.agent.identity.clone();
1935 let effective_canister_id = self.effective_canister_id;
1936 let content = self.into_envelope()?;
1937 let signed_update = sign_envelope(&content, identity)?;
1938 let request_id = to_request_id(&content)?;
1939 let EnvelopeContent::Call {
1940 nonce,
1941 ingress_expiry,
1942 sender,
1943 canister_id,
1944 method_name,
1945 arg,
1946 } = content
1947 else {
1948 unreachable!()
1949 };
1950 Ok(SignedUpdate {
1951 nonce,
1952 ingress_expiry,
1953 sender,
1954 canister_id,
1955 method_name,
1956 arg,
1957 effective_canister_id,
1958 signed_update,
1959 request_id,
1960 })
1961 }
1962
1963 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1965 let nonce = self.agent.nonce_factory.generate();
1966 self.agent.update_content(
1967 self.canister_id,
1968 self.method_name,
1969 self.arg,
1970 self.ingress_expiry_datetime,
1971 nonce,
1972 )
1973 }
1974}
1975
1976impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1977 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1978 type Output = Result<Vec<u8>, AgentError>;
1979 fn into_future(self) -> Self::IntoFuture {
1980 Box::pin(self.call_and_wait())
1981 }
1982}
1983
1984#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1986#[cfg_attr(not(target_family = "wasm"), async_trait)]
1987pub trait HttpService: Send + Sync + Debug {
1988 async fn call<'a>(
1990 &'a self,
1991 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
1992 max_retries: usize,
1993 size_limit: Option<usize>,
1994 ) -> Result<http::Response<Bytes>, AgentError>;
1995}
1996
1997fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
1999 let (parts, body) = req.into_parts();
2000 let body = reqwest::Body::from(body);
2001 let request = http::Request::from_parts(parts, body)
2004 .try_into()
2005 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2006
2007 Ok(request)
2008}
2009
2010#[cfg(not(target_family = "wasm"))]
2012async fn to_http_response(
2013 resp: Response,
2014 size_limit: Option<usize>,
2015) -> Result<http::Response<Bytes>, AgentError> {
2016 use http_body_util::{BodyExt, Limited};
2017
2018 let resp: http::Response<reqwest::Body> = resp.into();
2019 let (parts, body) = resp.into_parts();
2020 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2021 let body = body
2022 .collect()
2023 .await
2024 .map_err(|e| {
2025 AgentError::TransportError(TransportError::Generic(format!(
2026 "unable to read response body: {e:#}"
2027 )))
2028 })?
2029 .to_bytes();
2030 let resp = http::Response::from_parts(parts, body);
2031
2032 Ok(resp)
2033}
2034
2035#[cfg(target_family = "wasm")]
2039async fn to_http_response(
2040 resp: Response,
2041 size_limit: Option<usize>,
2042) -> Result<http::Response<Bytes>, AgentError> {
2043 use futures_util::StreamExt;
2044 use http_body::Frame;
2045 use http_body_util::{Limited, StreamBody};
2046
2047 let status = resp.status();
2049 let headers = resp.headers().clone();
2050
2051 let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2053 let body = StreamBody::new(stream);
2054 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2055 let body = http_body_util::BodyExt::collect(body)
2056 .await
2057 .map_err(|e| {
2058 AgentError::TransportError(TransportError::Generic(format!(
2059 "unable to read response body: {e:#}"
2060 )))
2061 })?
2062 .to_bytes();
2063
2064 let mut resp = http::Response::new(body);
2065 *resp.status_mut() = status;
2066 *resp.headers_mut() = headers;
2067
2068 Ok(resp)
2069}
2070
2071#[cfg(not(target_family = "wasm"))]
2072#[async_trait]
2073impl<T> HttpService for T
2074where
2075 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2076 for<'a> <&'a Self as Service<Request>>::Future: Send,
2077 T: Send + Sync + Debug + ?Sized,
2078{
2079 #[allow(clippy::needless_arbitrary_self_type)]
2080 async fn call<'a>(
2081 mut self: &'a Self,
2082 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2083 max_retries: usize,
2084 size_limit: Option<usize>,
2085 ) -> Result<http::Response<Bytes>, AgentError> {
2086 let mut retry_count = 0;
2087 loop {
2088 let request = from_http_request(req()?)?;
2089
2090 match Service::call(&mut self, request).await {
2091 Err(err) => {
2092 if err.is_connect() {
2094 if retry_count >= max_retries {
2095 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2096 }
2097 retry_count += 1;
2098 }
2099 }
2100
2101 Ok(resp) => {
2102 let resp = to_http_response(resp, size_limit).await?;
2103 return Ok(resp);
2104 }
2105 }
2106 }
2107 }
2108}
2109
2110#[cfg(target_family = "wasm")]
2111#[async_trait(?Send)]
2112impl<T> HttpService for T
2113where
2114 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2115 T: Send + Sync + Debug + ?Sized,
2116{
2117 #[allow(clippy::needless_arbitrary_self_type)]
2118 async fn call<'a>(
2119 mut self: &'a Self,
2120 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2121 _retries: usize,
2122 _size_limit: Option<usize>,
2123 ) -> Result<http::Response<Bytes>, AgentError> {
2124 let request = from_http_request(req()?)?;
2125 let response = Service::call(&mut self, request)
2126 .await
2127 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2128
2129 to_http_response(response, _size_limit).await
2130 }
2131}
2132
2133#[derive(Debug)]
2134struct Retry429Logic {
2135 client: Client,
2136}
2137
2138#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2139#[cfg_attr(not(target_family = "wasm"), async_trait)]
2140impl HttpService for Retry429Logic {
2141 async fn call<'a>(
2142 &'a self,
2143 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2144 _max_tcp_retries: usize,
2145 _size_limit: Option<usize>,
2146 ) -> Result<http::Response<Bytes>, AgentError> {
2147 let mut retries = 0;
2148 loop {
2149 #[cfg(not(target_family = "wasm"))]
2150 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2151 #[cfg(target_family = "wasm")]
2153 let resp = {
2154 let request = from_http_request(req()?)?;
2155 let resp = self
2156 .client
2157 .execute(request)
2158 .await
2159 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2160
2161 to_http_response(resp, _size_limit).await?
2162 };
2163
2164 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2165 if retries == 6 {
2166 break Ok(resp);
2167 } else {
2168 retries += 1;
2169 crate::util::sleep(Duration::from_millis(250)).await;
2170 continue;
2171 }
2172 } else {
2173 break Ok(resp);
2174 }
2175 }
2176 }
2177}
2178
2179#[cfg(all(test, not(target_family = "wasm")))]
2180mod offline_tests {
2181 use super::*;
2182 use tokio::net::TcpListener;
2183 #[test]
2186 fn rounded_expiry() {
2187 let agent = Agent::builder()
2188 .with_url("http://not-a-real-url")
2189 .build()
2190 .unwrap();
2191 let mut prev_expiry = None;
2192 let mut num_timestamps = 0;
2193 for _ in 0..6 {
2194 let update = agent
2195 .update(&Principal::management_canister(), "not_a_method")
2196 .sign()
2197 .unwrap();
2198 if prev_expiry < Some(update.ingress_expiry) {
2199 prev_expiry = Some(update.ingress_expiry);
2200 num_timestamps += 1;
2201 }
2202 }
2203 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2205 }
2206
2207 #[tokio::test]
2208 async fn client_ratelimit() {
2209 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2210 let count = Arc::new(Mutex::new(0));
2211 let port = mock_server.local_addr().unwrap().port();
2212 tokio::spawn({
2213 let count = count.clone();
2214 async move {
2215 loop {
2216 let (mut conn, _) = mock_server.accept().await.unwrap();
2217 *count.lock().unwrap() += 1;
2218 tokio::spawn(
2219 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2221 );
2222 }
2223 }
2224 });
2225 let agent = Agent::builder()
2226 .with_http_client(Client::builder().http1_only().build().unwrap())
2227 .with_url(format!("http://127.0.0.1:{port}"))
2228 .with_max_concurrent_requests(2)
2229 .build()
2230 .unwrap();
2231 for _ in 0..3 {
2232 let agent = agent.clone();
2233 tokio::spawn(async move {
2234 agent
2235 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2236 .call()
2237 .await
2238 });
2239 }
2240 crate::util::sleep(Duration::from_millis(250)).await;
2241 assert_eq!(*count.lock().unwrap(), 2);
2242 }
2243}