1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::HttpErrorPayload;
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49 },
50 export::Principal,
51 identity::Identity,
52 to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64 borrow::Cow,
65 collections::HashMap,
66 convert::TryFrom,
67 fmt::{self, Debug},
68 future::{Future, IntoFuture},
69 pin::Pin,
70 sync::{Arc, Mutex, RwLock},
71 task::{Context, Poll},
72 time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
/// Domain separator placed in front of a certificate tree's root hash to form
/// the message whose signature is checked during certificate verification.
/// The leading `0x0D` byte (13) equals the length of the text `ic-state-root`.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Root key every newly constructed [`Agent`] starts out with (133 bytes).
/// `fetch_root_key` only replaces the stored key while it still equals this value.
// NOTE(review): presumably the DER-encoded public key of the IC mainnet — confirm.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// The wasm variant carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
/// Client handle used to send queries, update calls and `read_state` requests,
/// and to verify the certificates that come back.
///
/// Most state lives behind `Arc`s, so clones produced by the derived `Clone`
/// share the same root key, subnet cache and request semaphore.
#[derive(Clone)]
pub struct Agent {
    /// Source of nonces attached to update calls and, on request, to queries.
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity that provides the sender principal and signs request envelopes.
    identity: Arc<dyn Identity>,
    /// Validity window added to "now" when computing request expiry; also the
    /// maximum tolerated age of certificates and query signatures.
    ingress_expiry: Duration,
    /// Root public key used when a certificate carries no delegation.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service through which every request is sent.
    client: Arc<dyn HttpService>,
    /// Supplies the base URL for each outgoing request.
    route_provider: Arc<dyn RouteProvider>,
    /// Cache of subnet information consulted when verifying query signatures.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// Acquired by the query/read_state/call endpoint helpers before sending.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified.
    verify_query_signatures: bool,
    /// Optional cap, in bytes, on the size of an HTTP response body.
    max_response_body_size: Option<usize>,
    /// Upper bound on the total time spent polling for a request's status.
    max_polling_time: Duration,
    /// Retry count handed to the HTTP service on every call.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
168
169impl fmt::Debug for Agent {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171 f.debug_struct("Agent")
172 .field("ingress_expiry", &self.ingress_expiry)
173 .finish_non_exhaustive()
174 }
175}
176
177impl Agent {
178 pub fn builder() -> builder::AgentBuilder {
181 Default::default()
182 }
183
184 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186 let client = config.http_service.unwrap_or_else(|| {
187 Arc::new(Retry429Logic {
188 client: config.client.unwrap_or_else(|| {
189 #[cfg(not(target_family = "wasm"))]
190 {
191 Client::builder()
192 .use_rustls_tls()
193 .timeout(Duration::from_secs(360))
194 .build()
195 .expect("Could not create HTTP client.")
196 }
197 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198 {
199 Client::new()
200 }
201 }),
202 })
203 });
204 Ok(Agent {
205 nonce_factory: config.nonce_factory,
206 identity: config.identity,
207 ingress_expiry: config.ingress_expiry,
208 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209 client: client.clone(),
210 route_provider: if let Some(route_provider) = config.route_provider {
211 route_provider
212 } else if let Some(url) = config.url {
213 if config.background_dynamic_routing {
214 assert!(
215 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217 );
218 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219 UrlUntilReady::new(url, async move {
220 DynamicRouteProviderBuilder::new(
221 LatencyRoutingSnapshot::new(),
222 seeds,
223 client,
224 )
225 .build()
226 .await
227 }) as Arc<dyn RouteProvider>
228 } else {
229 Arc::new(url)
230 }
231 } else {
232 panic!("either route_provider or url must be specified");
233 },
234 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235 verify_query_signatures: config.verify_query_signatures,
236 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237 max_response_body_size: config.max_response_body_size,
238 max_tcp_error_retries: config.max_tcp_error_retries,
239 max_polling_time: config.max_polling_time,
240 })
241 }
242
243 pub fn set_identity<I>(&mut self, identity: I)
249 where
250 I: 'static + Identity,
251 {
252 self.identity = Arc::new(identity);
253 }
254 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260 self.identity = identity;
261 }
262
263 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273 return Ok(());
275 }
276 let status = self.status().await?;
277 let Some(root_key) = status.root_key else {
278 return Err(AgentError::NoRootKeyInStatus(status));
279 };
280 self.set_root_key(root_key);
281 Ok(())
282 }
283
284 pub fn set_root_key(&self, root_key: Vec<u8>) {
289 *self.root_key.write().unwrap() = root_key;
290 }
291
292 pub fn read_root_key(&self) -> Vec<u8> {
294 self.root_key.read().unwrap().clone()
295 }
296
297 fn get_expiry_date(&self) -> u64 {
298 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300 if self.ingress_expiry.as_secs() > 90 {
301 rounded = rounded.replace_second(0).unwrap();
302 }
303 rounded.unix_timestamp_nanos().try_into().unwrap()
304 }
305
306 pub fn get_principal(&self) -> Result<Principal, String> {
308 self.identity.sender()
309 }
310
311 async fn query_endpoint<A>(
312 &self,
313 effective_canister_id: Principal,
314 serialized_bytes: Vec<u8>,
315 ) -> Result<A, AgentError>
316 where
317 A: serde::de::DeserializeOwned,
318 {
319 let _permit = self.concurrent_requests_semaphore.acquire().await;
320 let bytes = self
321 .execute(
322 Method::POST,
323 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324 Some(serialized_bytes),
325 )
326 .await?
327 .1;
328 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329 }
330
331 async fn read_state_endpoint<A>(
332 &self,
333 effective_canister_id: Principal,
334 serialized_bytes: Vec<u8>,
335 ) -> Result<A, AgentError>
336 where
337 A: serde::de::DeserializeOwned,
338 {
339 let _permit = self.concurrent_requests_semaphore.acquire().await;
340 let endpoint = format!(
341 "api/v2/canister/{}/read_state",
342 effective_canister_id.to_text()
343 );
344 let bytes = self
345 .execute(Method::POST, &endpoint, Some(serialized_bytes))
346 .await?
347 .1;
348 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349 }
350
351 async fn read_subnet_state_endpoint<A>(
352 &self,
353 subnet_id: Principal,
354 serialized_bytes: Vec<u8>,
355 ) -> Result<A, AgentError>
356 where
357 A: serde::de::DeserializeOwned,
358 {
359 let _permit = self.concurrent_requests_semaphore.acquire().await;
360 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361 let bytes = self
362 .execute(Method::POST, &endpoint, Some(serialized_bytes))
363 .await?
364 .1;
365 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366 }
367
368 async fn call_endpoint(
369 &self,
370 effective_canister_id: Principal,
371 serialized_bytes: Vec<u8>,
372 ) -> Result<TransportCallResponse, AgentError> {
373 let _permit = self.concurrent_requests_semaphore.acquire().await;
374 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375 let (status_code, response_body) = self
376 .execute(Method::POST, &endpoint, Some(serialized_bytes))
377 .await?;
378
379 if status_code == StatusCode::ACCEPTED {
380 return Ok(TransportCallResponse::Accepted);
381 }
382
383 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384 }
385
386 #[allow(clippy::too_many_arguments)]
389 async fn query_raw(
390 &self,
391 canister_id: Principal,
392 effective_canister_id: Principal,
393 method_name: String,
394 arg: Vec<u8>,
395 ingress_expiry_datetime: Option<u64>,
396 use_nonce: bool,
397 explicit_verify_query_signatures: Option<bool>,
398 ) -> Result<Vec<u8>, AgentError> {
399 let content = self.query_content(
400 canister_id,
401 method_name,
402 arg,
403 ingress_expiry_datetime,
404 use_nonce,
405 )?;
406 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
407 self.query_inner(
408 effective_canister_id,
409 serialized_bytes,
410 content.to_request_id(),
411 explicit_verify_query_signatures,
412 )
413 .await
414 }
415
416 pub async fn query_signed(
420 &self,
421 effective_canister_id: Principal,
422 signed_query: Vec<u8>,
423 ) -> Result<Vec<u8>, AgentError> {
424 let envelope: Envelope =
425 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
426 self.query_inner(
427 effective_canister_id,
428 signed_query,
429 envelope.content.to_request_id(),
430 None,
431 )
432 .await
433 }
434
/// Sends a signed query and returns the reply bytes, optionally verifying the
/// node signatures attached to the response.
///
/// `explicit_verify_query_signatures` overrides the agent-wide
/// `verify_query_signatures` setting when it is `Some`.
///
/// # Errors
///
/// Besides transport and decoding errors, verification can fail with
/// `MissingSignature`, `TooManySignatures`, `CertificateOutdated`,
/// `CertificateNotAuthorized`, `DerKeyLengthMismatch`, `DerPrefixMismatch`,
/// `MalformedPublicKey`, `MalformedSignature` or
/// `QuerySignatureVerificationFailed`. A rejected query is reported as
/// `UncertifiedReject`.
async fn query_inner(
    &self,
    effective_canister_id: Principal,
    signed_query: Vec<u8>,
    request_id: RequestId,
    explicit_verify_query_signatures: Option<bool>,
) -> Result<Vec<u8>, AgentError> {
    let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
        // Run the query and the subnet lookup concurrently; the first error aborts both.
        let (response, mut subnet) = futures_util::try_join!(
            self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
            self.get_subnet_by_canister(&effective_canister_id)
        )?;
        // There must be at least one signature, and no more than the subnet has node keys.
        if response.signatures().is_empty() {
            return Err(AgentError::MissingSignature);
        } else if response.signatures().len() > subnet.node_keys.len() {
            return Err(AgentError::TooManySignatures {
                had: response.signatures().len(),
                needed: subnet.node_keys.len(),
            });
        }
        for signature in response.signatures() {
            // Reject signatures whose timestamp is older than the ingress expiry window.
            if OffsetDateTime::now_utc()
                - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
                > self.ingress_expiry
            {
                return Err(AgentError::CertificateOutdated(self.ingress_expiry));
            }
            let signable = response.signable(request_id, signature.timestamp);
            let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
                node_key
            } else {
                // Unknown signer: the subnet information may be stale, so fetch it
                // again once before giving up.
                subnet = self
                    .fetch_subnet_by_canister(&effective_canister_id)
                    .await?;
                subnet
                    .node_keys
                    .get(&signature.identity)
                    .ok_or(AgentError::CertificateNotAuthorized())?
            };
            // Expected layout: 12-byte DER header followed by the 32-byte public key.
            if node_key.len() != 44 {
                return Err(AgentError::DerKeyLengthMismatch {
                    expected: 44,
                    actual: node_key.len(),
                });
            }
            const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
            if node_key[..12] != DER_PREFIX {
                return Err(AgentError::DerPrefixMismatch {
                    expected: DER_PREFIX.to_vec(),
                    actual: node_key[..12].to_vec(),
                });
            }
            // The length check above guarantees that exactly 32 bytes remain, so the
            // array conversion cannot fail.
            let pubkey =
                VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
                    .map_err(|_| AgentError::MalformedPublicKey)?;
            let sig = Signature::from(
                <[u8; 64]>::try_from(&signature.signature[..])
                    .map_err(|_| AgentError::MalformedSignature)?,
            );

            match pubkey.verify(&sig, &signable) {
                Err(Ed25519Error::InvalidSignature) => {
                    return Err(AgentError::QuerySignatureVerificationFailed)
                }
                Err(Ed25519Error::InvalidSliceLength) => {
                    return Err(AgentError::MalformedSignature)
                }
                Err(Ed25519Error::MalformedPublicKey) => {
                    return Err(AgentError::MalformedPublicKey)
                }
                Ok(()) => (),
                _ => unreachable!(),
            }
        }
        response
    } else {
        // Verification disabled: take the response as-is.
        self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
            .await?
    };

    match response {
        QueryResponse::Replied { reply, .. } => Ok(reply.arg),
        QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject(reject)),
    }
}
523
524 fn query_content(
525 &self,
526 canister_id: Principal,
527 method_name: String,
528 arg: Vec<u8>,
529 ingress_expiry_datetime: Option<u64>,
530 use_nonce: bool,
531 ) -> Result<EnvelopeContent, AgentError> {
532 Ok(EnvelopeContent::Query {
533 sender: self.identity.sender().map_err(AgentError::SigningError)?,
534 canister_id,
535 method_name,
536 arg,
537 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
538 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
539 })
540 }
541
542 async fn update_raw(
544 &self,
545 canister_id: Principal,
546 effective_canister_id: Principal,
547 method_name: String,
548 arg: Vec<u8>,
549 ingress_expiry_datetime: Option<u64>,
550 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
551 let nonce = self.nonce_factory.generate();
552 let content = self.update_content(
553 canister_id,
554 method_name,
555 arg,
556 ingress_expiry_datetime,
557 nonce,
558 )?;
559 let request_id = to_request_id(&content)?;
560 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
561
562 let response_body = self
563 .call_endpoint(effective_canister_id, serialized_bytes)
564 .await?;
565
566 match response_body {
567 TransportCallResponse::Replied { certificate } => {
568 let certificate =
569 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
570
571 self.verify(&certificate, effective_canister_id)?;
572 let status = lookup_request_status(&certificate, &request_id)?;
573
574 match status {
575 RequestStatusResponse::Replied(reply) => {
576 Ok(CallResponse::Response((reply.arg, certificate)))
577 }
578 RequestStatusResponse::Rejected(reject_response) => {
579 Err(AgentError::CertifiedReject(reject_response))?
580 }
581 _ => Ok(CallResponse::Poll(request_id)),
582 }
583 }
584 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
585 TransportCallResponse::NonReplicatedRejection(reject_response) => {
586 Err(AgentError::UncertifiedReject(reject_response))
587 }
588 }
589 }
590
591 pub async fn update_signed(
595 &self,
596 effective_canister_id: Principal,
597 signed_update: Vec<u8>,
598 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
599 let envelope: Envelope =
600 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
601 let request_id = to_request_id(&envelope.content)?;
602
603 let response_body = self
604 .call_endpoint(effective_canister_id, signed_update)
605 .await?;
606
607 match response_body {
608 TransportCallResponse::Replied { certificate } => {
609 let certificate =
610 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
611
612 self.verify(&certificate, effective_canister_id)?;
613 let status = lookup_request_status(&certificate, &request_id)?;
614
615 match status {
616 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
617 RequestStatusResponse::Rejected(reject_response) => {
618 Err(AgentError::CertifiedReject(reject_response))?
619 }
620 _ => Ok(CallResponse::Poll(request_id)),
621 }
622 }
623 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
624 TransportCallResponse::NonReplicatedRejection(reject_response) => {
625 Err(AgentError::UncertifiedReject(reject_response))
626 }
627 }
628 }
629
630 fn update_content(
631 &self,
632 canister_id: Principal,
633 method_name: String,
634 arg: Vec<u8>,
635 ingress_expiry_datetime: Option<u64>,
636 nonce: Option<Vec<u8>>,
637 ) -> Result<EnvelopeContent, AgentError> {
638 Ok(EnvelopeContent::Call {
639 canister_id,
640 method_name,
641 arg,
642 nonce,
643 sender: self.identity.sender().map_err(AgentError::SigningError)?,
644 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
645 })
646 }
647
648 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
649 ExponentialBackoffBuilder::new()
650 .with_initial_interval(Duration::from_millis(500))
651 .with_max_interval(Duration::from_secs(1))
652 .with_multiplier(1.4)
653 .with_max_elapsed_time(Some(self.max_polling_time))
654 .build()
655 }
656
657 pub async fn wait_signed(
659 &self,
660 request_id: &RequestId,
661 effective_canister_id: Principal,
662 signed_request_status: Vec<u8>,
663 ) -> Result<(Vec<u8>, Certificate), AgentError> {
664 let mut retry_policy = self.get_retry_policy();
665
666 let mut request_accepted = false;
667 let (resp, cert) = self
668 .request_status_signed(
669 request_id,
670 effective_canister_id,
671 signed_request_status.clone(),
672 )
673 .await?;
674 loop {
675 match resp {
676 RequestStatusResponse::Unknown => {}
677
678 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
679 if !request_accepted {
680 retry_policy.reset();
681 request_accepted = true;
682 }
683 }
684
685 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
686 return Ok((arg, cert))
687 }
688
689 RequestStatusResponse::Rejected(response) => {
690 return Err(AgentError::CertifiedReject(response))
691 }
692
693 RequestStatusResponse::Done => {
694 return Err(AgentError::RequestStatusDoneNoReply(String::from(
695 *request_id,
696 )))
697 }
698 };
699
700 match retry_policy.next_backoff() {
701 Some(duration) => crate::util::sleep(duration).await,
702
703 None => return Err(AgentError::TimeoutWaitingForResponse()),
704 }
705 }
706 }
707
708 pub async fn wait(
710 &self,
711 request_id: &RequestId,
712 effective_canister_id: Principal,
713 ) -> Result<(Vec<u8>, Certificate), AgentError> {
714 let mut retry_policy = self.get_retry_policy();
715
716 let mut request_accepted = false;
717 loop {
718 let (resp, cert) = self
719 .request_status_raw(request_id, effective_canister_id)
720 .await?;
721 match resp {
722 RequestStatusResponse::Unknown => {}
723
724 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
725 if !request_accepted {
726 retry_policy.reset();
734 request_accepted = true;
735 }
736 }
737
738 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
739 return Ok((arg, cert))
740 }
741
742 RequestStatusResponse::Rejected(response) => {
743 return Err(AgentError::CertifiedReject(response))
744 }
745
746 RequestStatusResponse::Done => {
747 return Err(AgentError::RequestStatusDoneNoReply(String::from(
748 *request_id,
749 )))
750 }
751 };
752
753 match retry_policy.next_backoff() {
754 Some(duration) => crate::util::sleep(duration).await,
755
756 None => return Err(AgentError::TimeoutWaitingForResponse()),
757 }
758 }
759 }
760
761 pub async fn read_state_raw(
764 &self,
765 paths: Vec<Vec<Label>>,
766 effective_canister_id: Principal,
767 ) -> Result<Certificate, AgentError> {
768 let content = self.read_state_content(paths)?;
769 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
770
771 let read_state_response: ReadStateResponse = self
772 .read_state_endpoint(effective_canister_id, serialized_bytes)
773 .await?;
774 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
775 .map_err(AgentError::InvalidCborData)?;
776 self.verify(&cert, effective_canister_id)?;
777 Ok(cert)
778 }
779
780 pub async fn read_subnet_state_raw(
783 &self,
784 paths: Vec<Vec<Label>>,
785 subnet_id: Principal,
786 ) -> Result<Certificate, AgentError> {
787 let content = self.read_state_content(paths)?;
788 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
789
790 let read_state_response: ReadStateResponse = self
791 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
792 .await?;
793 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
794 .map_err(AgentError::InvalidCborData)?;
795 self.verify_for_subnet(&cert, subnet_id)?;
796 Ok(cert)
797 }
798
799 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
800 Ok(EnvelopeContent::ReadState {
801 sender: self.identity.sender().map_err(AgentError::SigningError)?,
802 paths,
803 ingress_expiry: self.get_expiry_date(),
804 })
805 }
806
807 pub fn verify(
810 &self,
811 cert: &Certificate,
812 effective_canister_id: Principal,
813 ) -> Result<(), AgentError> {
814 self.verify_cert(cert, effective_canister_id)?;
815 self.verify_cert_timestamp(cert)?;
816 Ok(())
817 }
818
819 fn verify_cert(
820 &self,
821 cert: &Certificate,
822 effective_canister_id: Principal,
823 ) -> Result<(), AgentError> {
824 let sig = &cert.signature;
825
826 let root_hash = cert.tree.digest();
827 let mut msg = vec![];
828 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
829 msg.extend_from_slice(&root_hash);
830
831 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
832 let key = extract_der(der_key)?;
833
834 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
835 .map_err(|_| AgentError::CertificateVerificationFailed())?;
836 Ok(())
837 }
838
839 pub fn verify_for_subnet(
842 &self,
843 cert: &Certificate,
844 subnet_id: Principal,
845 ) -> Result<(), AgentError> {
846 self.verify_cert_for_subnet(cert, subnet_id)?;
847 self.verify_cert_timestamp(cert)?;
848 Ok(())
849 }
850
851 fn verify_cert_for_subnet(
852 &self,
853 cert: &Certificate,
854 subnet_id: Principal,
855 ) -> Result<(), AgentError> {
856 let sig = &cert.signature;
857
858 let root_hash = cert.tree.digest();
859 let mut msg = vec![];
860 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
861 msg.extend_from_slice(&root_hash);
862
863 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
864 let key = extract_der(der_key)?;
865
866 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
867 .map_err(|_| AgentError::CertificateVerificationFailed())?;
868 Ok(())
869 }
870
871 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
872 let time = lookup_time(cert)?;
873 if (OffsetDateTime::now_utc()
874 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
875 .abs()
876 > self.ingress_expiry
877 {
878 Err(AgentError::CertificateOutdated(self.ingress_expiry))
879 } else {
880 Ok(())
881 }
882 }
883
884 fn check_delegation(
885 &self,
886 delegation: &Option<Delegation>,
887 effective_canister_id: Principal,
888 ) -> Result<Vec<u8>, AgentError> {
889 match delegation {
890 None => Ok(self.read_root_key()),
891 Some(delegation) => {
892 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
893 .map_err(AgentError::InvalidCborData)?;
894 if cert.delegation.is_some() {
895 return Err(AgentError::CertificateHasTooManyDelegations);
896 }
897 self.verify_cert(&cert, effective_canister_id)?;
898 let canister_range_lookup = [
899 "subnet".as_bytes(),
900 delegation.subnet_id.as_ref(),
901 "canister_ranges".as_bytes(),
902 ];
903 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
904 let ranges: Vec<(Principal, Principal)> =
905 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
906 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
907 return Err(AgentError::CertificateNotAuthorized());
909 }
910
911 let public_key_path = [
912 "subnet".as_bytes(),
913 delegation.subnet_id.as_ref(),
914 "public_key".as_bytes(),
915 ];
916 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
917 }
918 }
919 }
920
921 fn check_delegation_for_subnet(
922 &self,
923 delegation: &Option<Delegation>,
924 subnet_id: Principal,
925 ) -> Result<Vec<u8>, AgentError> {
926 match delegation {
927 None => Ok(self.read_root_key()),
928 Some(delegation) => {
929 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
930 .map_err(AgentError::InvalidCborData)?;
931 if cert.delegation.is_some() {
932 return Err(AgentError::CertificateHasTooManyDelegations);
933 }
934 self.verify_cert_for_subnet(&cert, subnet_id)?;
935 let public_key_path = [
936 "subnet".as_bytes(),
937 delegation.subnet_id.as_ref(),
938 "public_key".as_bytes(),
939 ];
940 let pk = lookup_value(&cert.tree, public_key_path)
941 .map_err(|_| AgentError::CertificateNotAuthorized())?
942 .to_vec();
943 Ok(pk)
944 }
945 }
946 }
947
948 pub async fn read_state_canister_info(
951 &self,
952 canister_id: Principal,
953 path: &str,
954 ) -> Result<Vec<u8>, AgentError> {
955 let paths: Vec<Vec<Label>> = vec![vec![
956 "canister".into(),
957 Label::from_bytes(canister_id.as_slice()),
958 path.into(),
959 ]];
960
961 let cert = self.read_state_raw(paths, canister_id).await?;
962
963 lookup_canister_info(cert, canister_id, path)
964 }
965
966 pub async fn read_state_canister_metadata(
968 &self,
969 canister_id: Principal,
970 path: &str,
971 ) -> Result<Vec<u8>, AgentError> {
972 let paths: Vec<Vec<Label>> = vec![vec![
973 "canister".into(),
974 Label::from_bytes(canister_id.as_slice()),
975 "metadata".into(),
976 path.into(),
977 ]];
978
979 let cert = self.read_state_raw(paths, canister_id).await?;
980
981 lookup_canister_metadata(cert, canister_id, path)
982 }
983
984 pub async fn read_state_subnet_metrics(
986 &self,
987 subnet_id: Principal,
988 ) -> Result<SubnetMetrics, AgentError> {
989 let paths = vec![vec![
990 "subnet".into(),
991 Label::from_bytes(subnet_id.as_slice()),
992 "metrics".into(),
993 ]];
994 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
995 lookup_subnet_metrics(cert, subnet_id)
996 }
997
998 pub async fn request_status_raw(
1000 &self,
1001 request_id: &RequestId,
1002 effective_canister_id: Principal,
1003 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1004 let paths: Vec<Vec<Label>> =
1005 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1006
1007 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1008
1009 Ok((lookup_request_status(&cert, request_id)?, cert))
1010 }
1011
1012 pub async fn request_status_signed(
1016 &self,
1017 request_id: &RequestId,
1018 effective_canister_id: Principal,
1019 signed_request_status: Vec<u8>,
1020 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1021 let _envelope: Envelope =
1022 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1023 let read_state_response: ReadStateResponse = self
1024 .read_state_endpoint(effective_canister_id, signed_request_status)
1025 .await?;
1026
1027 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1028 .map_err(AgentError::InvalidCborData)?;
1029 self.verify(&cert, effective_canister_id)?;
1030 Ok((lookup_request_status(&cert, request_id)?, cert))
1031 }
1032
1033 pub fn update<S: Into<String>>(
1036 &self,
1037 canister_id: &Principal,
1038 method_name: S,
1039 ) -> UpdateBuilder {
1040 UpdateBuilder::new(self, *canister_id, method_name.into())
1041 }
1042
1043 pub async fn status(&self) -> Result<Status, AgentError> {
1045 let endpoint = "api/v2/status";
1046 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1047
1048 let cbor: serde_cbor::Value =
1049 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1050
1051 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1052 }
1053
1054 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1057 QueryBuilder::new(self, *canister_id, method_name.into())
1058 }
1059
1060 pub fn sign_request_status(
1063 &self,
1064 effective_canister_id: Principal,
1065 request_id: RequestId,
1066 ) -> Result<SignedRequestStatus, AgentError> {
1067 let paths: Vec<Vec<Label>> =
1068 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1069 let read_state_content = self.read_state_content(paths)?;
1070 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1071 let ingress_expiry = read_state_content.ingress_expiry();
1072 let sender = *read_state_content.sender();
1073 Ok(SignedRequestStatus {
1074 ingress_expiry,
1075 sender,
1076 effective_canister_id,
1077 request_id,
1078 signed_request_status,
1079 })
1080 }
1081
1082 async fn get_subnet_by_canister(
1083 &self,
1084 canister: &Principal,
1085 ) -> Result<Arc<Subnet>, AgentError> {
1086 let subnet = self
1087 .subnet_key_cache
1088 .lock()
1089 .unwrap()
1090 .get_subnet_by_canister(canister);
1091 if let Some(subnet) = subnet {
1092 Ok(subnet)
1093 } else {
1094 self.fetch_subnet_by_canister(canister).await
1095 }
1096 }
1097
1098 pub async fn fetch_api_boundary_nodes_by_canister_id(
1100 &self,
1101 canister_id: Principal,
1102 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1103 let paths = vec![vec!["api_boundary_nodes".into()]];
1104 let certificate = self.read_state_raw(paths, canister_id).await?;
1105 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1106 Ok(api_boundary_nodes)
1107 }
1108
1109 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1111 &self,
1112 subnet_id: Principal,
1113 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1114 let paths = vec![vec!["api_boundary_nodes".into()]];
1115 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1116 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1117 Ok(api_boundary_nodes)
1118 }
1119
1120 async fn fetch_subnet_by_canister(
1121 &self,
1122 canister: &Principal,
1123 ) -> Result<Arc<Subnet>, AgentError> {
1124 let cert = self
1125 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1126 .await?;
1127
1128 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1129 let subnet = Arc::new(subnet);
1130 self.subnet_key_cache
1131 .lock()
1132 .unwrap()
1133 .insert_subnet(subnet_id, subnet.clone());
1134 Ok(subnet)
1135 }
1136
1137 async fn request(
1138 &self,
1139 method: Method,
1140 endpoint: &str,
1141 body: Option<Vec<u8>>,
1142 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1143 let create_request_with_generated_url = || -> Result<Request, AgentError> {
1144 let url = self.route_provider.route()?.join(endpoint)?;
1145 let mut http_request = Request::new(method.clone(), url);
1146 http_request
1147 .headers_mut()
1148 .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1149 *http_request.body_mut() = body.clone().map(Body::from);
1150 Ok(http_request)
1151 };
1152
1153 let response = self
1154 .client
1155 .call(
1156 &create_request_with_generated_url,
1157 self.max_tcp_error_retries,
1158 )
1159 .await?;
1160
1161 let http_status = response.status();
1162 let response_headers = response.headers().clone();
1163
1164 if matches!(self
1166 .max_response_body_size
1167 .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1168 {
1169 return Err(AgentError::ResponseSizeExceededLimit());
1170 }
1171
1172 let mut body: Vec<u8> = response
1173 .content_length()
1174 .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1175
1176 let mut stream = response.bytes_stream();
1177
1178 while let Some(chunk) = stream.next().await {
1179 let chunk = chunk?;
1180
1181 if matches!(self
1183 .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1184 {
1185 return Err(AgentError::ResponseSizeExceededLimit());
1186 }
1187
1188 body.extend_from_slice(chunk.as_ref());
1189 }
1190
1191 Ok((http_status, response_headers, body))
1192 }
1193
1194 async fn execute(
1195 &self,
1196 method: Method,
1197 endpoint: &str,
1198 body: Option<Vec<u8>>,
1199 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1200 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1201
1202 let status = request_result.0;
1203 let headers = request_result.1;
1204 let body = request_result.2;
1205
1206 if status.is_client_error() || status.is_server_error() {
1207 Err(AgentError::HttpError(HttpErrorPayload {
1208 status: status.into(),
1209 content_type: headers
1210 .get(CONTENT_TYPE)
1211 .and_then(|value| value.to_str().ok())
1212 .map(str::to_string),
1213 content: body,
1214 }))
1215 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1216 Err(AgentError::InvalidHttpResponse(format!(
1217 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1218 )))
1219 } else {
1220 Ok((status, body))
1221 }
1222 }
1223}
1224
1225fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1228 ranges
1229 .iter()
1230 .any(|r| principal >= &r.0 && principal <= &r.1)
1231}
1232
1233fn sign_envelope(
1234 content: &EnvelopeContent,
1235 identity: Arc<dyn Identity>,
1236) -> Result<Vec<u8>, AgentError> {
1237 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1238
1239 let envelope = Envelope {
1240 content: Cow::Borrowed(content),
1241 sender_pubkey: signature.public_key,
1242 sender_sig: signature.signature,
1243 sender_delegation: signature.delegations,
1244 };
1245
1246 let mut serialized_bytes = Vec::new();
1247 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1248 serializer.self_describe()?;
1249 envelope.serialize(&mut serializer)?;
1250
1251 Ok(serialized_bytes)
1252}
1253
1254pub fn signed_query_inspect(
1257 sender: Principal,
1258 canister_id: Principal,
1259 method_name: &str,
1260 arg: &[u8],
1261 ingress_expiry: u64,
1262 signed_query: Vec<u8>,
1263) -> Result<(), AgentError> {
1264 let envelope: Envelope =
1265 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1266 match envelope.content.as_ref() {
1267 EnvelopeContent::Query {
1268 ingress_expiry: ingress_expiry_cbor,
1269 sender: sender_cbor,
1270 canister_id: canister_id_cbor,
1271 method_name: method_name_cbor,
1272 arg: arg_cbor,
1273 nonce: _nonce,
1274 } => {
1275 if ingress_expiry != *ingress_expiry_cbor {
1276 return Err(AgentError::CallDataMismatch {
1277 field: "ingress_expiry".to_string(),
1278 value_arg: ingress_expiry.to_string(),
1279 value_cbor: ingress_expiry_cbor.to_string(),
1280 });
1281 }
1282 if sender != *sender_cbor {
1283 return Err(AgentError::CallDataMismatch {
1284 field: "sender".to_string(),
1285 value_arg: sender.to_string(),
1286 value_cbor: sender_cbor.to_string(),
1287 });
1288 }
1289 if canister_id != *canister_id_cbor {
1290 return Err(AgentError::CallDataMismatch {
1291 field: "canister_id".to_string(),
1292 value_arg: canister_id.to_string(),
1293 value_cbor: canister_id_cbor.to_string(),
1294 });
1295 }
1296 if method_name != *method_name_cbor {
1297 return Err(AgentError::CallDataMismatch {
1298 field: "method_name".to_string(),
1299 value_arg: method_name.to_string(),
1300 value_cbor: method_name_cbor.clone(),
1301 });
1302 }
1303 if arg != *arg_cbor {
1304 return Err(AgentError::CallDataMismatch {
1305 field: "arg".to_string(),
1306 value_arg: format!("{arg:?}"),
1307 value_cbor: format!("{arg_cbor:?}"),
1308 });
1309 }
1310 }
1311 EnvelopeContent::Call { .. } => {
1312 return Err(AgentError::CallDataMismatch {
1313 field: "request_type".to_string(),
1314 value_arg: "query".to_string(),
1315 value_cbor: "call".to_string(),
1316 })
1317 }
1318 EnvelopeContent::ReadState { .. } => {
1319 return Err(AgentError::CallDataMismatch {
1320 field: "request_type".to_string(),
1321 value_arg: "query".to_string(),
1322 value_cbor: "read_state".to_string(),
1323 })
1324 }
1325 }
1326 Ok(())
1327}
1328
1329pub fn signed_update_inspect(
1332 sender: Principal,
1333 canister_id: Principal,
1334 method_name: &str,
1335 arg: &[u8],
1336 ingress_expiry: u64,
1337 signed_update: Vec<u8>,
1338) -> Result<(), AgentError> {
1339 let envelope: Envelope =
1340 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1341 match envelope.content.as_ref() {
1342 EnvelopeContent::Call {
1343 nonce: _nonce,
1344 ingress_expiry: ingress_expiry_cbor,
1345 sender: sender_cbor,
1346 canister_id: canister_id_cbor,
1347 method_name: method_name_cbor,
1348 arg: arg_cbor,
1349 } => {
1350 if ingress_expiry != *ingress_expiry_cbor {
1351 return Err(AgentError::CallDataMismatch {
1352 field: "ingress_expiry".to_string(),
1353 value_arg: ingress_expiry.to_string(),
1354 value_cbor: ingress_expiry_cbor.to_string(),
1355 });
1356 }
1357 if sender != *sender_cbor {
1358 return Err(AgentError::CallDataMismatch {
1359 field: "sender".to_string(),
1360 value_arg: sender.to_string(),
1361 value_cbor: sender_cbor.to_string(),
1362 });
1363 }
1364 if canister_id != *canister_id_cbor {
1365 return Err(AgentError::CallDataMismatch {
1366 field: "canister_id".to_string(),
1367 value_arg: canister_id.to_string(),
1368 value_cbor: canister_id_cbor.to_string(),
1369 });
1370 }
1371 if method_name != *method_name_cbor {
1372 return Err(AgentError::CallDataMismatch {
1373 field: "method_name".to_string(),
1374 value_arg: method_name.to_string(),
1375 value_cbor: method_name_cbor.clone(),
1376 });
1377 }
1378 if arg != *arg_cbor {
1379 return Err(AgentError::CallDataMismatch {
1380 field: "arg".to_string(),
1381 value_arg: format!("{arg:?}"),
1382 value_cbor: format!("{arg_cbor:?}"),
1383 });
1384 }
1385 }
1386 EnvelopeContent::ReadState { .. } => {
1387 return Err(AgentError::CallDataMismatch {
1388 field: "request_type".to_string(),
1389 value_arg: "call".to_string(),
1390 value_cbor: "read_state".to_string(),
1391 })
1392 }
1393 EnvelopeContent::Query { .. } => {
1394 return Err(AgentError::CallDataMismatch {
1395 field: "request_type".to_string(),
1396 value_arg: "call".to_string(),
1397 value_cbor: "query".to_string(),
1398 })
1399 }
1400 }
1401 Ok(())
1402}
1403
1404pub fn signed_request_status_inspect(
1407 sender: Principal,
1408 request_id: &RequestId,
1409 ingress_expiry: u64,
1410 signed_request_status: Vec<u8>,
1411) -> Result<(), AgentError> {
1412 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1413 let envelope: Envelope =
1414 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1415 match envelope.content.as_ref() {
1416 EnvelopeContent::ReadState {
1417 ingress_expiry: ingress_expiry_cbor,
1418 sender: sender_cbor,
1419 paths: paths_cbor,
1420 } => {
1421 if ingress_expiry != *ingress_expiry_cbor {
1422 return Err(AgentError::CallDataMismatch {
1423 field: "ingress_expiry".to_string(),
1424 value_arg: ingress_expiry.to_string(),
1425 value_cbor: ingress_expiry_cbor.to_string(),
1426 });
1427 }
1428 if sender != *sender_cbor {
1429 return Err(AgentError::CallDataMismatch {
1430 field: "sender".to_string(),
1431 value_arg: sender.to_string(),
1432 value_cbor: sender_cbor.to_string(),
1433 });
1434 }
1435
1436 if paths != *paths_cbor {
1437 return Err(AgentError::CallDataMismatch {
1438 field: "paths".to_string(),
1439 value_arg: format!("{paths:?}"),
1440 value_cbor: format!("{paths_cbor:?}"),
1441 });
1442 }
1443 }
1444 EnvelopeContent::Query { .. } => {
1445 return Err(AgentError::CallDataMismatch {
1446 field: "request_type".to_string(),
1447 value_arg: "read_state".to_string(),
1448 value_cbor: "query".to_string(),
1449 })
1450 }
1451 EnvelopeContent::Call { .. } => {
1452 return Err(AgentError::CallDataMismatch {
1453 field: "request_type".to_string(),
1454 value_arg: "read_state".to_string(),
1455 value_cbor: "call".to_string(),
1456 })
1457 }
1458 }
1459 Ok(())
1460}
1461
1462#[derive(Clone)]
1463struct SubnetCache {
1464 subnets: TimedCache<Principal, Arc<Subnet>>,
1465 canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1466}
1467
1468impl SubnetCache {
1469 fn new() -> Self {
1470 Self {
1471 subnets: TimedCache::with_lifespan(300),
1472 canister_index: RangeInclusiveMap::new_with_step_fns(),
1473 }
1474 }
1475
1476 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1477 self.canister_index
1478 .get(canister)
1479 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1480 .filter(|subnet| subnet.canister_ranges.contains(canister))
1481 }
1482
1483 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1484 self.subnets.cache_set(subnet_id, subnet.clone());
1485 for range in subnet.canister_ranges.iter() {
1486 self.canister_index.insert(range.clone(), subnet_id);
1487 }
1488 }
1489}
1490
1491#[derive(Clone, Copy)]
1492struct PrincipalStep;
1493
1494impl StepFns<Principal> for PrincipalStep {
1495 fn add_one(start: &Principal) -> Principal {
1496 let bytes = start.as_slice();
1497 let mut arr = [0; 29];
1498 arr[..bytes.len()].copy_from_slice(bytes);
1499 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1500 *byte = byte.wrapping_add(1);
1501 if *byte != 0 {
1502 break;
1503 }
1504 }
1505 Principal::from_slice(&arr[..bytes.len()])
1506 }
1507 fn sub_one(start: &Principal) -> Principal {
1508 let bytes = start.as_slice();
1509 let mut arr = [0; 29];
1510 arr[..bytes.len()].copy_from_slice(bytes);
1511 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1512 *byte = byte.wrapping_sub(1);
1513 if *byte != 255 {
1514 break;
1515 }
1516 }
1517 Principal::from_slice(&arr[..bytes.len()])
1518 }
1519}
1520
/// Information about one subnet, as produced by `lookup_subnet` and held in `SubnetCache`.
#[derive(Clone)]
pub(crate) struct Subnet {
    // Presumably the subnet's public key; the underscore prefix suggests it is currently
    // unused — confirm against `lookup_subnet`.
    _key: Vec<u8>,
    // Key bytes per node, indexed by the node's principal.
    node_keys: HashMap<Principal, Vec<u8>>,
    // Canister id ranges of this subnet; consulted by the cache to decide whether a
    // canister belongs to it.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1529
/// One API boundary node entry, as returned by the `fetch_api_boundary_nodes_*` methods.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain of the node.
    pub domain: String,
    /// IPv6 address of the node, in textual form.
    pub ipv6_address: String,
    /// IPv4 address of the node in textual form, when one is present.
    pub ipv4_address: Option<String>,
}
1540
/// A builder for a query call, bound to the [`Agent`] that will sign or send it.
///
/// Configure it with the `with_*` / `expire_*` methods, then either send it (`call`,
/// `call_with_verification`, `call_without_verification`, or `.await` the builder) or
/// produce a signed request with `sign`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    agent: &'agent Agent,
    /// The effective canister id passed along when the query is sent. Defaults to
    /// `canister_id`.
    pub effective_canister_id: Principal,
    /// The canister the query is addressed to.
    pub canister_id: Principal,
    /// The name of the canister method to query.
    pub method_name: String,
    /// The raw argument bytes. Empty by default.
    pub arg: Vec<u8>,
    /// Explicit expiry in nanoseconds since the Unix epoch, or `None` to leave the choice to
    /// the agent.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether nonce generation was requested for this query. `false` by default.
    pub use_nonce: bool,
}
1561
1562impl<'agent> QueryBuilder<'agent> {
1563 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1565 Self {
1566 agent,
1567 effective_canister_id: canister_id,
1568 canister_id,
1569 method_name,
1570 arg: vec![],
1571 ingress_expiry_datetime: None,
1572 use_nonce: false,
1573 }
1574 }
1575
1576 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1578 self.effective_canister_id = canister_id;
1579 self
1580 }
1581
1582 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1584 self.arg = arg.into();
1585 self
1586 }
1587
1588 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1590 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1591 self
1592 }
1593
1594 pub fn expire_after(mut self, duration: Duration) -> Self {
1596 self.ingress_expiry_datetime = Some(
1597 OffsetDateTime::now_utc()
1598 .saturating_add(duration.try_into().expect("negative duration"))
1599 .unix_timestamp_nanos() as u64,
1600 );
1601 self
1602 }
1603
1604 pub fn with_nonce_generation(mut self) -> Self {
1607 self.use_nonce = true;
1608 self
1609 }
1610
1611 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1613 self.agent
1614 .query_raw(
1615 self.canister_id,
1616 self.effective_canister_id,
1617 self.method_name,
1618 self.arg,
1619 self.ingress_expiry_datetime,
1620 self.use_nonce,
1621 None,
1622 )
1623 .await
1624 }
1625
1626 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1631 self.agent
1632 .query_raw(
1633 self.canister_id,
1634 self.effective_canister_id,
1635 self.method_name,
1636 self.arg,
1637 self.ingress_expiry_datetime,
1638 self.use_nonce,
1639 Some(true),
1640 )
1641 .await
1642 }
1643
1644 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1649 self.agent
1650 .query_raw(
1651 self.canister_id,
1652 self.effective_canister_id,
1653 self.method_name,
1654 self.arg,
1655 self.ingress_expiry_datetime,
1656 self.use_nonce,
1657 Some(false),
1658 )
1659 .await
1660 }
1661
1662 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1665 let effective_canister_id = self.effective_canister_id;
1666 let identity = self.agent.identity.clone();
1667 let content = self.into_envelope()?;
1668 let signed_query = sign_envelope(&content, identity)?;
1669 let EnvelopeContent::Query {
1670 ingress_expiry,
1671 sender,
1672 canister_id,
1673 method_name,
1674 arg,
1675 nonce,
1676 } = content
1677 else {
1678 unreachable!()
1679 };
1680 Ok(SignedQuery {
1681 ingress_expiry,
1682 sender,
1683 canister_id,
1684 method_name,
1685 arg,
1686 effective_canister_id,
1687 signed_query,
1688 nonce,
1689 })
1690 }
1691
1692 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1694 self.agent.query_content(
1695 self.canister_id,
1696 self.method_name,
1697 self.arg,
1698 self.ingress_expiry_datetime,
1699 self.use_nonce,
1700 )
1701 }
1702}
1703
1704impl<'agent> IntoFuture for QueryBuilder<'agent> {
1705 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1706 type Output = Result<Vec<u8>, AgentError>;
1707 fn into_future(self) -> Self::IntoFuture {
1708 Box::pin(self.call())
1709 }
1710}
1711
/// An in-flight update call.
///
/// Awaiting it yields the [`CallResponse`] of the submission; `and_wait` additionally waits
/// for the final result when the submission only returned a request id to poll.
pub struct UpdateCall<'agent> {
    // Agent used to wait for the result when the call has to be polled.
    agent: &'agent Agent,
    // The pending submission of the update call.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    // Effective canister id passed to `Agent::wait` when polling.
    effective_canister_id: Principal,
}
1718
1719impl fmt::Debug for UpdateCall<'_> {
1720 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1721 f.debug_struct("UpdateCall")
1722 .field("agent", &self.agent)
1723 .field("effective_canister_id", &self.effective_canister_id)
1724 .finish_non_exhaustive()
1725 }
1726}
1727
1728impl Future for UpdateCall<'_> {
1729 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1730 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1731 self.response_future.as_mut().poll(cx)
1732 }
1733}
1734
1735impl<'a> UpdateCall<'a> {
1736 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1738 let response = self.response_future.await?;
1739
1740 match response {
1741 CallResponse::Response(response) => Ok(response),
1742 CallResponse::Poll(request_id) => {
1743 self.agent
1744 .wait(&request_id, self.effective_canister_id)
1745 .await
1746 }
1747 }
1748 }
1749}
/// A builder for an update call, bound to the [`Agent`] that will sign or send it.
///
/// Configure it with the `with_*` / `expire_*` methods, then either send it (`call`,
/// `call_and_wait`, or `.await` the builder) or produce a signed request with `sign`.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    agent: &'agent Agent,
    /// The effective canister id passed along when the call is sent. Defaults to
    /// `canister_id`.
    pub effective_canister_id: Principal,
    /// The canister the call is addressed to.
    pub canister_id: Principal,
    /// The name of the canister method to call.
    pub method_name: String,
    /// The raw argument bytes. Empty by default.
    pub arg: Vec<u8>,
    /// Explicit expiry in nanoseconds since the Unix epoch, or `None` to leave the choice to
    /// the agent.
    pub ingress_expiry_datetime: Option<u64>,
}
1768
1769impl<'agent> UpdateBuilder<'agent> {
1770 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1772 Self {
1773 agent,
1774 effective_canister_id: canister_id,
1775 canister_id,
1776 method_name,
1777 arg: vec![],
1778 ingress_expiry_datetime: None,
1779 }
1780 }
1781
1782 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1784 self.effective_canister_id = canister_id;
1785 self
1786 }
1787
1788 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1790 self.arg = arg.into();
1791 self
1792 }
1793
1794 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1796 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1797 self
1798 }
1799
1800 pub fn expire_after(mut self, duration: Duration) -> Self {
1802 self.ingress_expiry_datetime = Some(
1803 OffsetDateTime::now_utc()
1804 .saturating_add(duration.try_into().expect("negative duration"))
1805 .unix_timestamp_nanos() as u64,
1806 );
1807 self
1808 }
1809
1810 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1813 self.call().and_wait().await.map(|x| x.0)
1814 }
1815
1816 pub fn call(self) -> UpdateCall<'agent> {
1819 let response_future = async move {
1820 self.agent
1821 .update_raw(
1822 self.canister_id,
1823 self.effective_canister_id,
1824 self.method_name,
1825 self.arg,
1826 self.ingress_expiry_datetime,
1827 )
1828 .await
1829 };
1830 UpdateCall {
1831 agent: self.agent,
1832 response_future: Box::pin(response_future),
1833 effective_canister_id: self.effective_canister_id,
1834 }
1835 }
1836
1837 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1840 let identity = self.agent.identity.clone();
1841 let effective_canister_id = self.effective_canister_id;
1842 let content = self.into_envelope()?;
1843 let signed_update = sign_envelope(&content, identity)?;
1844 let request_id = to_request_id(&content)?;
1845 let EnvelopeContent::Call {
1846 nonce,
1847 ingress_expiry,
1848 sender,
1849 canister_id,
1850 method_name,
1851 arg,
1852 } = content
1853 else {
1854 unreachable!()
1855 };
1856 Ok(SignedUpdate {
1857 nonce,
1858 ingress_expiry,
1859 sender,
1860 canister_id,
1861 method_name,
1862 arg,
1863 effective_canister_id,
1864 signed_update,
1865 request_id,
1866 })
1867 }
1868
1869 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1871 let nonce = self.agent.nonce_factory.generate();
1872 self.agent.update_content(
1873 self.canister_id,
1874 self.method_name,
1875 self.arg,
1876 self.ingress_expiry_datetime,
1877 nonce,
1878 )
1879 }
1880}
1881
1882impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1883 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1884 type Output = Result<Vec<u8>, AgentError>;
1885 fn into_future(self) -> Self::IntoFuture {
1886 Box::pin(self.call_and_wait())
1887 }
1888}
1889
/// An HTTP transport that the agent can send requests through.
///
/// On `wasm` targets the returned futures are not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Sends a request and returns the response.
    ///
    /// `req` is a factory rather than a request so that an implementation can build a
    /// fresh request for each attempt. `max_retries` is the retry budget the caller grants
    /// the implementation; the implementations in this file apply it to connection
    /// failures on non-`wasm` targets and ignore it on `wasm`.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        max_retries: usize,
    ) -> Result<Response, AgentError>;
}
1901#[cfg(not(target_family = "wasm"))]
1902#[async_trait]
1903impl<T> HttpService for T
1904where
1905 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
1906 for<'a> <&'a Self as Service<Request>>::Future: Send,
1907 T: Send + Sync + Debug + ?Sized,
1908{
1909 #[allow(clippy::needless_arbitrary_self_type)]
1910 async fn call<'a>(
1911 mut self: &'a Self,
1912 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1913 max_retries: usize,
1914 ) -> Result<Response, AgentError> {
1915 let mut retry_count = 0;
1916 loop {
1917 match Service::call(&mut self, req()?).await {
1918 Err(err) => {
1919 if err.is_connect() {
1921 if retry_count >= max_retries {
1922 return Err(AgentError::TransportError(err));
1923 }
1924 retry_count += 1;
1925 }
1926 }
1927 Ok(resp) => return Ok(resp),
1928 }
1929 }
1930 }
1931}
1932
/// Blanket [`HttpService`] implementation for anything whose shared reference is a
/// `tower` [`Service`] over `reqwest` requests (`wasm` targets).
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    /// Sends the request built by `req` exactly once; the retry budget is ignored.
    // `self` must be a mutable binding of `&Self` because `Service::call` takes
    // `&mut &T`.
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        _: usize,
    ) -> Result<Response, AgentError> {
        let request = req()?;
        let response = Service::call(&mut self, request).await?;
        Ok(response)
    }
}
1949
1950#[derive(Debug)]
1951struct Retry429Logic {
1952 client: Client,
1953}
1954
1955#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1956#[cfg_attr(not(target_family = "wasm"), async_trait)]
1957impl HttpService for Retry429Logic {
1958 async fn call<'a>(
1959 &'a self,
1960 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1961 _max_tcp_retries: usize,
1962 ) -> Result<Response, AgentError> {
1963 let mut retries = 0;
1964 loop {
1965 #[cfg(not(target_family = "wasm"))]
1966 let resp = self.client.call(req, _max_tcp_retries).await?;
1967 #[cfg(target_family = "wasm")]
1969 let resp = self.client.execute(req()?).await?;
1970 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
1971 if retries == 6 {
1972 break Ok(resp);
1973 } else {
1974 retries += 1;
1975 crate::util::sleep(Duration::from_millis(250)).await;
1976 continue;
1977 }
1978 } else {
1979 break Ok(resp);
1980 }
1981 }
1982 }
1983}
1984
// Tests that need no reachable replica; skipped on wasm targets.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;

    // Signs six updates in a row and counts how often the ingress expiry strictly
    // increases. At most two distinct increasing values are tolerated, which presumably
    // relies on the agent rounding expiry timestamps — confirm against `update_content`.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // `None < Some(_)`, so the first expiry always counts as a new timestamp.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    // Checks that an agent limited to two concurrent requests opens only two connections
    // when three queries are started at once.
    #[tokio::test]
    async fn client_ratelimit() {
        // Mock server: counts accepted connections and drains them without ever replying,
        // so every request stays in flight.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Start one more query than the concurrency limit allows.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned queries time to connect before inspecting the counter.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}