1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49 },
50 export::Principal,
51 identity::Identity,
52 to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64 borrow::Cow,
65 collections::HashMap,
66 convert::TryFrom,
67 fmt::{self, Debug},
68 future::{Future, IntoFuture},
69 pin::Pin,
70 sync::{Arc, Mutex, RwLock},
71 task::{Context, Poll},
72 time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
/// Domain-separator prefix that `verify_cert` / `verify_cert_for_subnet` prepend to a
/// certificate tree's root hash to build the message whose BLS signature is checked.
const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Root key that every freshly constructed `Agent` starts out with (see `Agent::new`).
/// `fetch_root_key` only replaces the stored key while it still equals this value.
// NOTE(review): presumably the DER-encoded root public key of the IC mainnet — confirm.
const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// On non-wasm targets the future is additionally required to be `Send`.
#[cfg(not(target_family = "wasm"))]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;

/// Boxed, pinned future resolving to `Result<V, AgentError>`.
/// The wasm variant of the alias carries no `Send` bound.
#[cfg(target_family = "wasm")]
type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
/// Client handle that signs requests with an [`Identity`], sends them through an HTTP
/// service and verifies the certificates and signatures contained in the responses.
///
/// The shared components are held behind `Arc`, so a value produced by the derived
/// `Clone` refers to the same root key, subnet cache and request semaphore.
#[derive(Clone)]
pub struct Agent {
    /// Produces the nonces attached to update calls and, when requested, to queries.
    nonce_factory: Arc<dyn NonceGenerator>,
    /// Identity used to sign envelopes and to supply the sender principal.
    identity: Arc<dyn Identity>,
    /// Added to the current time to compute request expiry; also the maximum tolerated
    /// age of certificates and query signatures.
    ingress_expiry: Duration,
    /// Root public key certificates are checked against when they carry no delegation.
    /// Starts as `IC_ROOT_KEY` and can be replaced through `set_root_key`.
    root_key: Arc<RwLock<Vec<u8>>>,
    /// HTTP service used to send requests; `Agent::new` falls back to a `Retry429Logic`
    /// wrapper around a `reqwest` client when the configuration supplies none.
    client: Arc<dyn HttpService>,
    /// Supplies the base URL each request is routed to.
    route_provider: Arc<dyn RouteProvider>,
    /// Cache of subnet information, looked up by canister when verifying query signatures.
    subnet_key_cache: Arc<Mutex<SubnetCache>>,
    /// A permit is acquired from this semaphore before each query/read_state/call request,
    /// sized from `max_concurrent_requests` in the configuration.
    concurrent_requests_semaphore: Arc<Semaphore>,
    /// Default for whether query responses have their node signatures verified.
    verify_query_signatures: bool,
    /// Copied from the configuration; its use is outside the code shown here.
    max_response_body_size: Option<usize>,
    /// Upper bound on the total time spent polling for a request's status.
    max_polling_time: Duration,
    /// Copied from the configuration; marked `dead_code` because it may go unread.
    #[allow(dead_code)]
    max_tcp_error_retries: usize,
}
168
169impl fmt::Debug for Agent {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171 f.debug_struct("Agent")
172 .field("ingress_expiry", &self.ingress_expiry)
173 .finish_non_exhaustive()
174 }
175}
176
177impl Agent {
178 pub fn builder() -> builder::AgentBuilder {
181 Default::default()
182 }
183
184 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186 let client = config.http_service.unwrap_or_else(|| {
187 Arc::new(Retry429Logic {
188 client: config.client.unwrap_or_else(|| {
189 #[cfg(not(target_family = "wasm"))]
190 {
191 Client::builder()
192 .use_rustls_tls()
193 .timeout(Duration::from_secs(360))
194 .build()
195 .expect("Could not create HTTP client.")
196 }
197 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198 {
199 Client::new()
200 }
201 }),
202 })
203 });
204 Ok(Agent {
205 nonce_factory: config.nonce_factory,
206 identity: config.identity,
207 ingress_expiry: config.ingress_expiry,
208 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209 client: client.clone(),
210 route_provider: if let Some(route_provider) = config.route_provider {
211 route_provider
212 } else if let Some(url) = config.url {
213 if config.background_dynamic_routing {
214 assert!(
215 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217 );
218 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219 UrlUntilReady::new(url, async move {
220 DynamicRouteProviderBuilder::new(
221 LatencyRoutingSnapshot::new(),
222 seeds,
223 client,
224 )
225 .build()
226 .await
227 }) as Arc<dyn RouteProvider>
228 } else {
229 Arc::new(url)
230 }
231 } else {
232 panic!("either route_provider or url must be specified");
233 },
234 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235 verify_query_signatures: config.verify_query_signatures,
236 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237 max_response_body_size: config.max_response_body_size,
238 max_tcp_error_retries: config.max_tcp_error_retries,
239 max_polling_time: config.max_polling_time,
240 })
241 }
242
243 pub fn set_identity<I>(&mut self, identity: I)
249 where
250 I: 'static + Identity,
251 {
252 self.identity = Arc::new(identity);
253 }
254 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260 self.identity = identity;
261 }
262
263 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273 return Ok(());
275 }
276 let status = self.status().await?;
277 let Some(root_key) = status.root_key else {
278 return Err(AgentError::NoRootKeyInStatus(status));
279 };
280 self.set_root_key(root_key);
281 Ok(())
282 }
283
284 pub fn set_root_key(&self, root_key: Vec<u8>) {
289 *self.root_key.write().unwrap() = root_key;
290 }
291
292 pub fn read_root_key(&self) -> Vec<u8> {
294 self.root_key.read().unwrap().clone()
295 }
296
297 fn get_expiry_date(&self) -> u64 {
298 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300 if self.ingress_expiry.as_secs() > 90 {
301 rounded = rounded.replace_second(0).unwrap();
302 }
303 rounded.unix_timestamp_nanos().try_into().unwrap()
304 }
305
306 pub fn get_principal(&self) -> Result<Principal, String> {
308 self.identity.sender()
309 }
310
311 async fn query_endpoint<A>(
312 &self,
313 effective_canister_id: Principal,
314 serialized_bytes: Vec<u8>,
315 ) -> Result<A, AgentError>
316 where
317 A: serde::de::DeserializeOwned,
318 {
319 let _permit = self.concurrent_requests_semaphore.acquire().await;
320 let bytes = self
321 .execute(
322 Method::POST,
323 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324 Some(serialized_bytes),
325 )
326 .await?
327 .1;
328 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329 }
330
331 async fn read_state_endpoint<A>(
332 &self,
333 effective_canister_id: Principal,
334 serialized_bytes: Vec<u8>,
335 ) -> Result<A, AgentError>
336 where
337 A: serde::de::DeserializeOwned,
338 {
339 let _permit = self.concurrent_requests_semaphore.acquire().await;
340 let endpoint = format!(
341 "api/v2/canister/{}/read_state",
342 effective_canister_id.to_text()
343 );
344 let bytes = self
345 .execute(Method::POST, &endpoint, Some(serialized_bytes))
346 .await?
347 .1;
348 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349 }
350
351 async fn read_subnet_state_endpoint<A>(
352 &self,
353 subnet_id: Principal,
354 serialized_bytes: Vec<u8>,
355 ) -> Result<A, AgentError>
356 where
357 A: serde::de::DeserializeOwned,
358 {
359 let _permit = self.concurrent_requests_semaphore.acquire().await;
360 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361 let bytes = self
362 .execute(Method::POST, &endpoint, Some(serialized_bytes))
363 .await?
364 .1;
365 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366 }
367
368 async fn call_endpoint(
369 &self,
370 effective_canister_id: Principal,
371 serialized_bytes: Vec<u8>,
372 ) -> Result<TransportCallResponse, AgentError> {
373 let _permit = self.concurrent_requests_semaphore.acquire().await;
374 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375 let (status_code, response_body) = self
376 .execute(Method::POST, &endpoint, Some(serialized_bytes))
377 .await?;
378
379 if status_code == StatusCode::ACCEPTED {
380 return Ok(TransportCallResponse::Accepted);
381 }
382
383 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384 }
385
386 #[allow(clippy::too_many_arguments)]
389 async fn query_raw(
390 &self,
391 canister_id: Principal,
392 effective_canister_id: Principal,
393 method_name: String,
394 arg: Vec<u8>,
395 ingress_expiry_datetime: Option<u64>,
396 use_nonce: bool,
397 explicit_verify_query_signatures: Option<bool>,
398 ) -> Result<Vec<u8>, AgentError> {
399 let operation = Operation::Call {
400 canister: canister_id,
401 method: method_name.clone(),
402 };
403 let content = self.query_content(
404 canister_id,
405 method_name,
406 arg,
407 ingress_expiry_datetime,
408 use_nonce,
409 )?;
410 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411 self.query_inner(
412 effective_canister_id,
413 serialized_bytes,
414 content.to_request_id(),
415 explicit_verify_query_signatures,
416 operation,
417 )
418 .await
419 }
420
421 pub async fn query_signed(
425 &self,
426 effective_canister_id: Principal,
427 signed_query: Vec<u8>,
428 ) -> Result<Vec<u8>, AgentError> {
429 let envelope: Envelope =
430 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431 let EnvelopeContent::Query {
432 canister_id,
433 method_name,
434 ..
435 } = &*envelope.content
436 else {
437 return Err(AgentError::CallDataMismatch {
438 field: "request_type".to_string(),
439 value_arg: "query".to_string(),
440 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441 "update"
442 } else {
443 "read_state"
444 }
445 .to_string(),
446 });
447 };
448 let operation = Operation::Call {
449 canister: *canister_id,
450 method: method_name.clone(),
451 };
452 self.query_inner(
453 effective_canister_id,
454 signed_query,
455 envelope.content.to_request_id(),
456 None,
457 operation,
458 )
459 .await
460 }
461
462 async fn query_inner(
466 &self,
467 effective_canister_id: Principal,
468 signed_query: Vec<u8>,
469 request_id: RequestId,
470 explicit_verify_query_signatures: Option<bool>,
471 operation: Operation,
472 ) -> Result<Vec<u8>, AgentError> {
473 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474 let (response, mut subnet) = futures_util::try_join!(
475 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476 self.get_subnet_by_canister(&effective_canister_id)
477 )?;
478 if response.signatures().is_empty() {
479 return Err(AgentError::MissingSignature);
480 } else if response.signatures().len() > subnet.node_keys.len() {
481 return Err(AgentError::TooManySignatures {
482 had: response.signatures().len(),
483 needed: subnet.node_keys.len(),
484 });
485 }
486 for signature in response.signatures() {
487 if OffsetDateTime::now_utc()
488 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489 > self.ingress_expiry
490 {
491 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492 }
493 let signable = response.signable(request_id, signature.timestamp);
494 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495 node_key
496 } else {
497 subnet = self
498 .fetch_subnet_by_canister(&effective_canister_id)
499 .await?;
500 subnet
501 .node_keys
502 .get(&signature.identity)
503 .ok_or(AgentError::CertificateNotAuthorized())?
504 };
505 if node_key.len() != 44 {
506 return Err(AgentError::DerKeyLengthMismatch {
507 expected: 44,
508 actual: node_key.len(),
509 });
510 }
511 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512 if node_key[..12] != DER_PREFIX {
513 return Err(AgentError::DerPrefixMismatch {
514 expected: DER_PREFIX.to_vec(),
515 actual: node_key[..12].to_vec(),
516 });
517 }
518 let pubkey =
519 VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
520 .map_err(|_| AgentError::MalformedPublicKey)?;
521 let sig = Signature::from(
522 <[u8; 64]>::try_from(&signature.signature[..])
523 .map_err(|_| AgentError::MalformedSignature)?,
524 );
525
526 match pubkey.verify(&sig, &signable) {
527 Err(Ed25519Error::InvalidSignature) => {
528 return Err(AgentError::QuerySignatureVerificationFailed)
529 }
530 Err(Ed25519Error::InvalidSliceLength) => {
531 return Err(AgentError::MalformedSignature)
532 }
533 Err(Ed25519Error::MalformedPublicKey) => {
534 return Err(AgentError::MalformedPublicKey)
535 }
536 Ok(()) => (),
537 _ => unreachable!(),
538 }
539 }
540 response
541 } else {
542 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
543 .await?
544 };
545
546 match response {
547 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
548 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
549 reject,
550 operation: Some(operation),
551 }),
552 }
553 }
554
555 fn query_content(
556 &self,
557 canister_id: Principal,
558 method_name: String,
559 arg: Vec<u8>,
560 ingress_expiry_datetime: Option<u64>,
561 use_nonce: bool,
562 ) -> Result<EnvelopeContent, AgentError> {
563 Ok(EnvelopeContent::Query {
564 sender: self.identity.sender().map_err(AgentError::SigningError)?,
565 canister_id,
566 method_name,
567 arg,
568 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
569 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
570 })
571 }
572
573 async fn update_raw(
575 &self,
576 canister_id: Principal,
577 effective_canister_id: Principal,
578 method_name: String,
579 arg: Vec<u8>,
580 ingress_expiry_datetime: Option<u64>,
581 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
582 let nonce = self.nonce_factory.generate();
583 let content = self.update_content(
584 canister_id,
585 method_name.clone(),
586 arg,
587 ingress_expiry_datetime,
588 nonce,
589 )?;
590 let operation = Some(Operation::Call {
591 canister: canister_id,
592 method: method_name,
593 });
594 let request_id = to_request_id(&content)?;
595 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
596
597 let response_body = self
598 .call_endpoint(effective_canister_id, serialized_bytes)
599 .await?;
600
601 match response_body {
602 TransportCallResponse::Replied { certificate } => {
603 let certificate =
604 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
605
606 self.verify(&certificate, effective_canister_id)?;
607 let status = lookup_request_status(&certificate, &request_id)?;
608
609 match status {
610 RequestStatusResponse::Replied(reply) => {
611 Ok(CallResponse::Response((reply.arg, certificate)))
612 }
613 RequestStatusResponse::Rejected(reject_response) => {
614 Err(AgentError::CertifiedReject {
615 reject: reject_response,
616 operation,
617 })?
618 }
619 _ => Ok(CallResponse::Poll(request_id)),
620 }
621 }
622 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
623 TransportCallResponse::NonReplicatedRejection(reject_response) => {
624 Err(AgentError::UncertifiedReject {
625 reject: reject_response,
626 operation,
627 })
628 }
629 }
630 }
631
632 pub async fn update_signed(
636 &self,
637 effective_canister_id: Principal,
638 signed_update: Vec<u8>,
639 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
640 let envelope: Envelope =
641 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
642 let EnvelopeContent::Call {
643 canister_id,
644 method_name,
645 ..
646 } = &*envelope.content
647 else {
648 return Err(AgentError::CallDataMismatch {
649 field: "request_type".to_string(),
650 value_arg: "update".to_string(),
651 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
652 "query"
653 } else {
654 "read_state"
655 }
656 .to_string(),
657 });
658 };
659 let operation = Some(Operation::Call {
660 canister: *canister_id,
661 method: method_name.clone(),
662 });
663 let request_id = to_request_id(&envelope.content)?;
664
665 let response_body = self
666 .call_endpoint(effective_canister_id, signed_update)
667 .await?;
668
669 match response_body {
670 TransportCallResponse::Replied { certificate } => {
671 let certificate =
672 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
673
674 self.verify(&certificate, effective_canister_id)?;
675 let status = lookup_request_status(&certificate, &request_id)?;
676
677 match status {
678 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
679 RequestStatusResponse::Rejected(reject_response) => {
680 Err(AgentError::CertifiedReject {
681 reject: reject_response,
682 operation,
683 })?
684 }
685 _ => Ok(CallResponse::Poll(request_id)),
686 }
687 }
688 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
689 TransportCallResponse::NonReplicatedRejection(reject_response) => {
690 Err(AgentError::UncertifiedReject {
691 reject: reject_response,
692 operation,
693 })
694 }
695 }
696 }
697
698 fn update_content(
699 &self,
700 canister_id: Principal,
701 method_name: String,
702 arg: Vec<u8>,
703 ingress_expiry_datetime: Option<u64>,
704 nonce: Option<Vec<u8>>,
705 ) -> Result<EnvelopeContent, AgentError> {
706 Ok(EnvelopeContent::Call {
707 canister_id,
708 method_name,
709 arg,
710 nonce,
711 sender: self.identity.sender().map_err(AgentError::SigningError)?,
712 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
713 })
714 }
715
716 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
717 ExponentialBackoffBuilder::new()
718 .with_initial_interval(Duration::from_millis(500))
719 .with_max_interval(Duration::from_secs(1))
720 .with_multiplier(1.4)
721 .with_max_elapsed_time(Some(self.max_polling_time))
722 .build()
723 }
724
725 pub async fn wait_signed(
727 &self,
728 request_id: &RequestId,
729 effective_canister_id: Principal,
730 signed_request_status: Vec<u8>,
731 ) -> Result<(Vec<u8>, Certificate), AgentError> {
732 let mut retry_policy = self.get_retry_policy();
733
734 let mut request_accepted = false;
735 let (resp, cert) = self
736 .request_status_signed(
737 request_id,
738 effective_canister_id,
739 signed_request_status.clone(),
740 )
741 .await?;
742 loop {
743 match resp {
744 RequestStatusResponse::Unknown => {}
745
746 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
747 if !request_accepted {
748 retry_policy.reset();
749 request_accepted = true;
750 }
751 }
752
753 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
754 return Ok((arg, cert))
755 }
756
757 RequestStatusResponse::Rejected(response) => {
758 return Err(AgentError::CertifiedReject {
759 reject: response,
760 operation: None,
761 })
762 }
763
764 RequestStatusResponse::Done => {
765 return Err(AgentError::RequestStatusDoneNoReply(String::from(
766 *request_id,
767 )))
768 }
769 };
770
771 match retry_policy.next_backoff() {
772 Some(duration) => crate::util::sleep(duration).await,
773
774 None => return Err(AgentError::TimeoutWaitingForResponse()),
775 }
776 }
777 }
778
779 pub async fn wait(
781 &self,
782 request_id: &RequestId,
783 effective_canister_id: Principal,
784 ) -> Result<(Vec<u8>, Certificate), AgentError> {
785 self.wait_inner(request_id, effective_canister_id, None)
786 .await
787 }
788
789 async fn wait_inner(
790 &self,
791 request_id: &RequestId,
792 effective_canister_id: Principal,
793 operation: Option<Operation>,
794 ) -> Result<(Vec<u8>, Certificate), AgentError> {
795 let mut retry_policy = self.get_retry_policy();
796
797 let mut request_accepted = false;
798 loop {
799 let (resp, cert) = self
800 .request_status_raw(request_id, effective_canister_id)
801 .await?;
802 match resp {
803 RequestStatusResponse::Unknown => {}
804
805 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806 if !request_accepted {
807 retry_policy.reset();
815 request_accepted = true;
816 }
817 }
818
819 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820 return Ok((arg, cert))
821 }
822
823 RequestStatusResponse::Rejected(response) => {
824 return Err(AgentError::CertifiedReject {
825 reject: response,
826 operation,
827 })
828 }
829
830 RequestStatusResponse::Done => {
831 return Err(AgentError::RequestStatusDoneNoReply(String::from(
832 *request_id,
833 )))
834 }
835 };
836
837 match retry_policy.next_backoff() {
838 Some(duration) => crate::util::sleep(duration).await,
839
840 None => return Err(AgentError::TimeoutWaitingForResponse()),
841 }
842 }
843 }
844
845 pub async fn read_state_raw(
848 &self,
849 paths: Vec<Vec<Label>>,
850 effective_canister_id: Principal,
851 ) -> Result<Certificate, AgentError> {
852 let content = self.read_state_content(paths)?;
853 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855 let read_state_response: ReadStateResponse = self
856 .read_state_endpoint(effective_canister_id, serialized_bytes)
857 .await?;
858 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859 .map_err(AgentError::InvalidCborData)?;
860 self.verify(&cert, effective_canister_id)?;
861 Ok(cert)
862 }
863
864 pub async fn read_subnet_state_raw(
867 &self,
868 paths: Vec<Vec<Label>>,
869 subnet_id: Principal,
870 ) -> Result<Certificate, AgentError> {
871 let content = self.read_state_content(paths)?;
872 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874 let read_state_response: ReadStateResponse = self
875 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876 .await?;
877 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878 .map_err(AgentError::InvalidCborData)?;
879 self.verify_for_subnet(&cert, subnet_id)?;
880 Ok(cert)
881 }
882
883 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884 Ok(EnvelopeContent::ReadState {
885 sender: self.identity.sender().map_err(AgentError::SigningError)?,
886 paths,
887 ingress_expiry: self.get_expiry_date(),
888 })
889 }
890
891 pub fn verify(
894 &self,
895 cert: &Certificate,
896 effective_canister_id: Principal,
897 ) -> Result<(), AgentError> {
898 self.verify_cert(cert, effective_canister_id)?;
899 self.verify_cert_timestamp(cert)?;
900 Ok(())
901 }
902
903 fn verify_cert(
904 &self,
905 cert: &Certificate,
906 effective_canister_id: Principal,
907 ) -> Result<(), AgentError> {
908 let sig = &cert.signature;
909
910 let root_hash = cert.tree.digest();
911 let mut msg = vec![];
912 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913 msg.extend_from_slice(&root_hash);
914
915 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916 let key = extract_der(der_key)?;
917
918 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919 .map_err(|_| AgentError::CertificateVerificationFailed())?;
920 Ok(())
921 }
922
923 pub fn verify_for_subnet(
926 &self,
927 cert: &Certificate,
928 subnet_id: Principal,
929 ) -> Result<(), AgentError> {
930 self.verify_cert_for_subnet(cert, subnet_id)?;
931 self.verify_cert_timestamp(cert)?;
932 Ok(())
933 }
934
935 fn verify_cert_for_subnet(
936 &self,
937 cert: &Certificate,
938 subnet_id: Principal,
939 ) -> Result<(), AgentError> {
940 let sig = &cert.signature;
941
942 let root_hash = cert.tree.digest();
943 let mut msg = vec![];
944 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945 msg.extend_from_slice(&root_hash);
946
947 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948 let key = extract_der(der_key)?;
949
950 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951 .map_err(|_| AgentError::CertificateVerificationFailed())?;
952 Ok(())
953 }
954
955 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956 let time = lookup_time(cert)?;
957 if (OffsetDateTime::now_utc()
958 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
959 .abs()
960 > self.ingress_expiry
961 {
962 Err(AgentError::CertificateOutdated(self.ingress_expiry))
963 } else {
964 Ok(())
965 }
966 }
967
968 fn check_delegation(
969 &self,
970 delegation: &Option<Delegation>,
971 effective_canister_id: Principal,
972 ) -> Result<Vec<u8>, AgentError> {
973 match delegation {
974 None => Ok(self.read_root_key()),
975 Some(delegation) => {
976 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977 .map_err(AgentError::InvalidCborData)?;
978 if cert.delegation.is_some() {
979 return Err(AgentError::CertificateHasTooManyDelegations);
980 }
981 self.verify_cert(&cert, effective_canister_id)?;
982 let canister_range_lookup = [
983 "subnet".as_bytes(),
984 delegation.subnet_id.as_ref(),
985 "canister_ranges".as_bytes(),
986 ];
987 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
988 let ranges: Vec<(Principal, Principal)> =
989 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
990 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
991 return Err(AgentError::CertificateNotAuthorized());
993 }
994
995 let public_key_path = [
996 "subnet".as_bytes(),
997 delegation.subnet_id.as_ref(),
998 "public_key".as_bytes(),
999 ];
1000 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1001 }
1002 }
1003 }
1004
1005 fn check_delegation_for_subnet(
1006 &self,
1007 delegation: &Option<Delegation>,
1008 subnet_id: Principal,
1009 ) -> Result<Vec<u8>, AgentError> {
1010 match delegation {
1011 None => Ok(self.read_root_key()),
1012 Some(delegation) => {
1013 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1014 .map_err(AgentError::InvalidCborData)?;
1015 if cert.delegation.is_some() {
1016 return Err(AgentError::CertificateHasTooManyDelegations);
1017 }
1018 self.verify_cert_for_subnet(&cert, subnet_id)?;
1019 let public_key_path = [
1020 "subnet".as_bytes(),
1021 delegation.subnet_id.as_ref(),
1022 "public_key".as_bytes(),
1023 ];
1024 let pk = lookup_value(&cert.tree, public_key_path)
1025 .map_err(|_| AgentError::CertificateNotAuthorized())?
1026 .to_vec();
1027 Ok(pk)
1028 }
1029 }
1030 }
1031
1032 pub async fn read_state_canister_info(
1035 &self,
1036 canister_id: Principal,
1037 path: &str,
1038 ) -> Result<Vec<u8>, AgentError> {
1039 let paths: Vec<Vec<Label>> = vec![vec![
1040 "canister".into(),
1041 Label::from_bytes(canister_id.as_slice()),
1042 path.into(),
1043 ]];
1044
1045 let cert = self.read_state_raw(paths, canister_id).await?;
1046
1047 lookup_canister_info(cert, canister_id, path)
1048 }
1049
1050 pub async fn read_state_canister_controllers(
1052 &self,
1053 canister_id: Principal,
1054 ) -> Result<Vec<Principal>, AgentError> {
1055 let blob = self
1056 .read_state_canister_info(canister_id, "controllers")
1057 .await?;
1058 let controllers: Vec<Principal> =
1059 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1060 Ok(controllers)
1061 }
1062
1063 pub async fn read_state_canister_module_hash(
1065 &self,
1066 canister_id: Principal,
1067 ) -> Result<Vec<u8>, AgentError> {
1068 self.read_state_canister_info(canister_id, "module_hash")
1069 .await
1070 }
1071
1072 pub async fn read_state_canister_metadata(
1074 &self,
1075 canister_id: Principal,
1076 path: &str,
1077 ) -> Result<Vec<u8>, AgentError> {
1078 let paths: Vec<Vec<Label>> = vec![vec![
1079 "canister".into(),
1080 Label::from_bytes(canister_id.as_slice()),
1081 "metadata".into(),
1082 path.into(),
1083 ]];
1084
1085 let cert = self.read_state_raw(paths, canister_id).await?;
1086
1087 lookup_canister_metadata(cert, canister_id, path)
1088 }
1089
1090 pub async fn read_state_subnet_metrics(
1092 &self,
1093 subnet_id: Principal,
1094 ) -> Result<SubnetMetrics, AgentError> {
1095 let paths = vec![vec![
1096 "subnet".into(),
1097 Label::from_bytes(subnet_id.as_slice()),
1098 "metrics".into(),
1099 ]];
1100 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1101 lookup_subnet_metrics(cert, subnet_id)
1102 }
1103
1104 pub async fn request_status_raw(
1106 &self,
1107 request_id: &RequestId,
1108 effective_canister_id: Principal,
1109 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1110 let paths: Vec<Vec<Label>> =
1111 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1112
1113 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1114
1115 Ok((lookup_request_status(&cert, request_id)?, cert))
1116 }
1117
1118 pub async fn request_status_signed(
1122 &self,
1123 request_id: &RequestId,
1124 effective_canister_id: Principal,
1125 signed_request_status: Vec<u8>,
1126 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1127 let _envelope: Envelope =
1128 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1129 let read_state_response: ReadStateResponse = self
1130 .read_state_endpoint(effective_canister_id, signed_request_status)
1131 .await?;
1132
1133 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1134 .map_err(AgentError::InvalidCborData)?;
1135 self.verify(&cert, effective_canister_id)?;
1136 Ok((lookup_request_status(&cert, request_id)?, cert))
1137 }
1138
1139 pub fn update<S: Into<String>>(
1142 &self,
1143 canister_id: &Principal,
1144 method_name: S,
1145 ) -> UpdateBuilder {
1146 UpdateBuilder::new(self, *canister_id, method_name.into())
1147 }
1148
1149 pub async fn status(&self) -> Result<Status, AgentError> {
1151 let endpoint = "api/v2/status";
1152 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1153
1154 let cbor: serde_cbor::Value =
1155 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1156
1157 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1158 }
1159
1160 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1163 QueryBuilder::new(self, *canister_id, method_name.into())
1164 }
1165
1166 pub fn sign_request_status(
1169 &self,
1170 effective_canister_id: Principal,
1171 request_id: RequestId,
1172 ) -> Result<SignedRequestStatus, AgentError> {
1173 let paths: Vec<Vec<Label>> =
1174 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1175 let read_state_content = self.read_state_content(paths)?;
1176 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1177 let ingress_expiry = read_state_content.ingress_expiry();
1178 let sender = *read_state_content.sender();
1179 Ok(SignedRequestStatus {
1180 ingress_expiry,
1181 sender,
1182 effective_canister_id,
1183 request_id,
1184 signed_request_status,
1185 })
1186 }
1187
1188 async fn get_subnet_by_canister(
1189 &self,
1190 canister: &Principal,
1191 ) -> Result<Arc<Subnet>, AgentError> {
1192 let subnet = self
1193 .subnet_key_cache
1194 .lock()
1195 .unwrap()
1196 .get_subnet_by_canister(canister);
1197 if let Some(subnet) = subnet {
1198 Ok(subnet)
1199 } else {
1200 self.fetch_subnet_by_canister(canister).await
1201 }
1202 }
1203
1204 pub async fn fetch_api_boundary_nodes_by_canister_id(
1206 &self,
1207 canister_id: Principal,
1208 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1209 let paths = vec![vec!["api_boundary_nodes".into()]];
1210 let certificate = self.read_state_raw(paths, canister_id).await?;
1211 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1212 Ok(api_boundary_nodes)
1213 }
1214
1215 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1217 &self,
1218 subnet_id: Principal,
1219 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1220 let paths = vec![vec!["api_boundary_nodes".into()]];
1221 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1222 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1223 Ok(api_boundary_nodes)
1224 }
1225
1226 async fn fetch_subnet_by_canister(
1227 &self,
1228 canister: &Principal,
1229 ) -> Result<Arc<Subnet>, AgentError> {
1230 let cert = self
1231 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1232 .await?;
1233
1234 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1235 let subnet = Arc::new(subnet);
1236 self.subnet_key_cache
1237 .lock()
1238 .unwrap()
1239 .insert_subnet(subnet_id, subnet.clone());
1240 Ok(subnet)
1241 }
1242
1243 async fn request(
1244 &self,
1245 method: Method,
1246 endpoint: &str,
1247 body: Option<Vec<u8>>,
1248 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1249 let create_request_with_generated_url = || -> Result<Request, AgentError> {
1250 let url = self.route_provider.route()?.join(endpoint)?;
1251 let mut http_request = Request::new(method.clone(), url);
1252 http_request
1253 .headers_mut()
1254 .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1255 *http_request.body_mut() = body.clone().map(Body::from);
1256 Ok(http_request)
1257 };
1258
1259 let response = self
1260 .client
1261 .call(
1262 &create_request_with_generated_url,
1263 self.max_tcp_error_retries,
1264 )
1265 .await?;
1266
1267 let http_status = response.status();
1268 let response_headers = response.headers().clone();
1269
1270 if matches!(self
1272 .max_response_body_size
1273 .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1274 {
1275 return Err(AgentError::ResponseSizeExceededLimit());
1276 }
1277
1278 let mut body: Vec<u8> = response
1279 .content_length()
1280 .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1281
1282 let mut stream = response.bytes_stream();
1283
1284 while let Some(chunk) = stream.next().await {
1285 let chunk = chunk?;
1286
1287 if matches!(self
1289 .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1290 {
1291 return Err(AgentError::ResponseSizeExceededLimit());
1292 }
1293
1294 body.extend_from_slice(chunk.as_ref());
1295 }
1296
1297 Ok((http_status, response_headers, body))
1298 }
1299
1300 async fn execute(
1301 &self,
1302 method: Method,
1303 endpoint: &str,
1304 body: Option<Vec<u8>>,
1305 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1306 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1307
1308 let status = request_result.0;
1309 let headers = request_result.1;
1310 let body = request_result.2;
1311
1312 if status.is_client_error() || status.is_server_error() {
1313 Err(AgentError::HttpError(HttpErrorPayload {
1314 status: status.into(),
1315 content_type: headers
1316 .get(CONTENT_TYPE)
1317 .and_then(|value| value.to_str().ok())
1318 .map(str::to_string),
1319 content: body,
1320 }))
1321 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1322 Err(AgentError::InvalidHttpResponse(format!(
1323 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1324 )))
1325 } else {
1326 Ok((status, body))
1327 }
1328 }
1329}
1330
1331fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1334 ranges
1335 .iter()
1336 .any(|r| principal >= &r.0 && principal <= &r.1)
1337}
1338
1339fn sign_envelope(
1340 content: &EnvelopeContent,
1341 identity: Arc<dyn Identity>,
1342) -> Result<Vec<u8>, AgentError> {
1343 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1344
1345 let envelope = Envelope {
1346 content: Cow::Borrowed(content),
1347 sender_pubkey: signature.public_key,
1348 sender_sig: signature.signature,
1349 sender_delegation: signature.delegations,
1350 };
1351
1352 let mut serialized_bytes = Vec::new();
1353 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1354 serializer.self_describe()?;
1355 envelope.serialize(&mut serializer)?;
1356
1357 Ok(serialized_bytes)
1358}
1359
1360pub fn signed_query_inspect(
1363 sender: Principal,
1364 canister_id: Principal,
1365 method_name: &str,
1366 arg: &[u8],
1367 ingress_expiry: u64,
1368 signed_query: Vec<u8>,
1369) -> Result<(), AgentError> {
1370 let envelope: Envelope =
1371 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1372 match envelope.content.as_ref() {
1373 EnvelopeContent::Query {
1374 ingress_expiry: ingress_expiry_cbor,
1375 sender: sender_cbor,
1376 canister_id: canister_id_cbor,
1377 method_name: method_name_cbor,
1378 arg: arg_cbor,
1379 nonce: _nonce,
1380 } => {
1381 if ingress_expiry != *ingress_expiry_cbor {
1382 return Err(AgentError::CallDataMismatch {
1383 field: "ingress_expiry".to_string(),
1384 value_arg: ingress_expiry.to_string(),
1385 value_cbor: ingress_expiry_cbor.to_string(),
1386 });
1387 }
1388 if sender != *sender_cbor {
1389 return Err(AgentError::CallDataMismatch {
1390 field: "sender".to_string(),
1391 value_arg: sender.to_string(),
1392 value_cbor: sender_cbor.to_string(),
1393 });
1394 }
1395 if canister_id != *canister_id_cbor {
1396 return Err(AgentError::CallDataMismatch {
1397 field: "canister_id".to_string(),
1398 value_arg: canister_id.to_string(),
1399 value_cbor: canister_id_cbor.to_string(),
1400 });
1401 }
1402 if method_name != *method_name_cbor {
1403 return Err(AgentError::CallDataMismatch {
1404 field: "method_name".to_string(),
1405 value_arg: method_name.to_string(),
1406 value_cbor: method_name_cbor.clone(),
1407 });
1408 }
1409 if arg != *arg_cbor {
1410 return Err(AgentError::CallDataMismatch {
1411 field: "arg".to_string(),
1412 value_arg: format!("{arg:?}"),
1413 value_cbor: format!("{arg_cbor:?}"),
1414 });
1415 }
1416 }
1417 EnvelopeContent::Call { .. } => {
1418 return Err(AgentError::CallDataMismatch {
1419 field: "request_type".to_string(),
1420 value_arg: "query".to_string(),
1421 value_cbor: "call".to_string(),
1422 })
1423 }
1424 EnvelopeContent::ReadState { .. } => {
1425 return Err(AgentError::CallDataMismatch {
1426 field: "request_type".to_string(),
1427 value_arg: "query".to_string(),
1428 value_cbor: "read_state".to_string(),
1429 })
1430 }
1431 }
1432 Ok(())
1433}
1434
1435pub fn signed_update_inspect(
1438 sender: Principal,
1439 canister_id: Principal,
1440 method_name: &str,
1441 arg: &[u8],
1442 ingress_expiry: u64,
1443 signed_update: Vec<u8>,
1444) -> Result<(), AgentError> {
1445 let envelope: Envelope =
1446 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1447 match envelope.content.as_ref() {
1448 EnvelopeContent::Call {
1449 nonce: _nonce,
1450 ingress_expiry: ingress_expiry_cbor,
1451 sender: sender_cbor,
1452 canister_id: canister_id_cbor,
1453 method_name: method_name_cbor,
1454 arg: arg_cbor,
1455 } => {
1456 if ingress_expiry != *ingress_expiry_cbor {
1457 return Err(AgentError::CallDataMismatch {
1458 field: "ingress_expiry".to_string(),
1459 value_arg: ingress_expiry.to_string(),
1460 value_cbor: ingress_expiry_cbor.to_string(),
1461 });
1462 }
1463 if sender != *sender_cbor {
1464 return Err(AgentError::CallDataMismatch {
1465 field: "sender".to_string(),
1466 value_arg: sender.to_string(),
1467 value_cbor: sender_cbor.to_string(),
1468 });
1469 }
1470 if canister_id != *canister_id_cbor {
1471 return Err(AgentError::CallDataMismatch {
1472 field: "canister_id".to_string(),
1473 value_arg: canister_id.to_string(),
1474 value_cbor: canister_id_cbor.to_string(),
1475 });
1476 }
1477 if method_name != *method_name_cbor {
1478 return Err(AgentError::CallDataMismatch {
1479 field: "method_name".to_string(),
1480 value_arg: method_name.to_string(),
1481 value_cbor: method_name_cbor.clone(),
1482 });
1483 }
1484 if arg != *arg_cbor {
1485 return Err(AgentError::CallDataMismatch {
1486 field: "arg".to_string(),
1487 value_arg: format!("{arg:?}"),
1488 value_cbor: format!("{arg_cbor:?}"),
1489 });
1490 }
1491 }
1492 EnvelopeContent::ReadState { .. } => {
1493 return Err(AgentError::CallDataMismatch {
1494 field: "request_type".to_string(),
1495 value_arg: "call".to_string(),
1496 value_cbor: "read_state".to_string(),
1497 })
1498 }
1499 EnvelopeContent::Query { .. } => {
1500 return Err(AgentError::CallDataMismatch {
1501 field: "request_type".to_string(),
1502 value_arg: "call".to_string(),
1503 value_cbor: "query".to_string(),
1504 })
1505 }
1506 }
1507 Ok(())
1508}
1509
1510pub fn signed_request_status_inspect(
1513 sender: Principal,
1514 request_id: &RequestId,
1515 ingress_expiry: u64,
1516 signed_request_status: Vec<u8>,
1517) -> Result<(), AgentError> {
1518 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1519 let envelope: Envelope =
1520 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1521 match envelope.content.as_ref() {
1522 EnvelopeContent::ReadState {
1523 ingress_expiry: ingress_expiry_cbor,
1524 sender: sender_cbor,
1525 paths: paths_cbor,
1526 } => {
1527 if ingress_expiry != *ingress_expiry_cbor {
1528 return Err(AgentError::CallDataMismatch {
1529 field: "ingress_expiry".to_string(),
1530 value_arg: ingress_expiry.to_string(),
1531 value_cbor: ingress_expiry_cbor.to_string(),
1532 });
1533 }
1534 if sender != *sender_cbor {
1535 return Err(AgentError::CallDataMismatch {
1536 field: "sender".to_string(),
1537 value_arg: sender.to_string(),
1538 value_cbor: sender_cbor.to_string(),
1539 });
1540 }
1541
1542 if paths != *paths_cbor {
1543 return Err(AgentError::CallDataMismatch {
1544 field: "paths".to_string(),
1545 value_arg: format!("{paths:?}"),
1546 value_cbor: format!("{paths_cbor:?}"),
1547 });
1548 }
1549 }
1550 EnvelopeContent::Query { .. } => {
1551 return Err(AgentError::CallDataMismatch {
1552 field: "request_type".to_string(),
1553 value_arg: "read_state".to_string(),
1554 value_cbor: "query".to_string(),
1555 })
1556 }
1557 EnvelopeContent::Call { .. } => {
1558 return Err(AgentError::CallDataMismatch {
1559 field: "request_type".to_string(),
1560 value_arg: "read_state".to_string(),
1561 value_cbor: "call".to_string(),
1562 })
1563 }
1564 }
1565 Ok(())
1566}
1567
/// Cache of subnets known to the agent.
///
/// Subnets are stored by subnet id in a time-limited cache, and a separate
/// range map allows finding the subnet id responsible for a canister id.
#[derive(Clone)]
struct SubnetCache {
    /// Cached subnets keyed by subnet id; entries expire with the cache's lifespan.
    subnets: TimedCache<Principal, Arc<Subnet>>,
    /// Maps inclusive canister-id ranges to the id of the subnet they were registered for.
    canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
}
1573
1574impl SubnetCache {
1575 fn new() -> Self {
1576 Self {
1577 subnets: TimedCache::with_lifespan(300),
1578 canister_index: RangeInclusiveMap::new_with_step_fns(),
1579 }
1580 }
1581
1582 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1583 self.canister_index
1584 .get(canister)
1585 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1586 .filter(|subnet| subnet.canister_ranges.contains(canister))
1587 }
1588
1589 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1590 self.subnets.cache_set(subnet_id, subnet.clone());
1591 for range in subnet.canister_ranges.iter() {
1592 self.canister_index.insert(range.clone(), subnet_id);
1593 }
1594 }
1595}
1596
/// Marker type supplying the step functions (`add_one` / `sub_one`) that the
/// range collections in this module use for [`Principal`] keys.
#[derive(Clone, Copy)]
struct PrincipalStep;
1599
/// Step functions over the byte representation of a [`Principal`].
///
/// Both functions keep the principal's length unchanged and treat the bytes
/// before the final one as a big-endian counter (carry/borrow moves toward
/// the first byte).
impl StepFns<Principal> for PrincipalStep {
    /// Returns a principal of the same length with the counter bytes
    /// incremented by one, wrapping around when every counter byte is `0xFF`.
    fn add_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Scratch buffer; 29 is presumably the maximum principal length — confirm.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // NOTE(review): the slice stops one short of the end, so the final
        // byte is never modified; and `bytes.len() - 1` underflows for an
        // empty principal — confirm callers never pass one.
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_add(1);
            // A non-zero result means no carry into the next byte.
            if *byte != 0 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
    /// Returns a principal of the same length with the counter bytes
    /// decremented by one, wrapping around when every counter byte is `0x00`.
    fn sub_one(start: &Principal) -> Principal {
        let bytes = start.as_slice();
        // Scratch buffer; 29 is presumably the maximum principal length — confirm.
        let mut arr = [0; 29];
        arr[..bytes.len()].copy_from_slice(bytes);
        // NOTE(review): same slice bounds as `add_one` — the final byte is
        // skipped and an empty principal would underflow `bytes.len() - 1`.
        for byte in arr[..bytes.len() - 1].iter_mut().rev() {
            *byte = byte.wrapping_sub(1);
            // A result other than 255 means no borrow from the next byte.
            if *byte != 255 {
                break;
            }
        }
        Principal::from_slice(&arr[..bytes.len()])
    }
}
1626
/// Description of a subnet as held in the agent's subnet cache.
#[derive(Clone)]
pub(crate) struct Subnet {
    /// Key bytes of the subnet — presumably its public key; not read in the
    /// code visible here (hence the underscore prefix).
    _key: Vec<u8>,
    /// Key bytes per node, indexed by the node's principal — presumably the
    /// nodes' public keys; verify against `lookup_subnet`.
    node_keys: HashMap<Principal, Vec<u8>>,
    /// Inclusive canister-id ranges assigned to this subnet.
    canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
}
1635
/// An API boundary node entry as returned by the
/// `fetch_api_boundary_nodes_by_*` methods.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name of the node.
    pub domain: String,
    /// IPv6 address of the node, in textual form.
    pub ipv6_address: String,
    /// IPv4 address of the node in textual form, if it has one.
    pub ipv4_address: Option<String>,
}
1646
/// Builder for a query call, created by `Agent::query` or
/// [`QueryBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either execute
/// it with one of the `call*` methods (or by awaiting the builder), or
/// produce a signed request with [`QueryBuilder::sign`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// Agent used to build, sign and send the query.
    agent: &'agent Agent,
    /// Effective canister id of the request; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister whose method is queried.
    pub canister_id: Principal,
    /// Name of the method to query.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit ingress expiry in nanoseconds since the Unix epoch, or
    /// `None` when no explicit expiry was requested.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether a nonce should be generated for the query; `false` by default.
    pub use_nonce: bool,
}
1667
1668impl<'agent> QueryBuilder<'agent> {
1669 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1671 Self {
1672 agent,
1673 effective_canister_id: canister_id,
1674 canister_id,
1675 method_name,
1676 arg: vec![],
1677 ingress_expiry_datetime: None,
1678 use_nonce: false,
1679 }
1680 }
1681
1682 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1684 self.effective_canister_id = canister_id;
1685 self
1686 }
1687
1688 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1690 self.arg = arg.into();
1691 self
1692 }
1693
1694 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1696 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1697 self
1698 }
1699
1700 pub fn expire_after(mut self, duration: Duration) -> Self {
1702 self.ingress_expiry_datetime = Some(
1703 OffsetDateTime::now_utc()
1704 .saturating_add(duration.try_into().expect("negative duration"))
1705 .unix_timestamp_nanos() as u64,
1706 );
1707 self
1708 }
1709
1710 pub fn with_nonce_generation(mut self) -> Self {
1713 self.use_nonce = true;
1714 self
1715 }
1716
1717 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1719 self.agent
1720 .query_raw(
1721 self.canister_id,
1722 self.effective_canister_id,
1723 self.method_name,
1724 self.arg,
1725 self.ingress_expiry_datetime,
1726 self.use_nonce,
1727 None,
1728 )
1729 .await
1730 }
1731
1732 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1737 self.agent
1738 .query_raw(
1739 self.canister_id,
1740 self.effective_canister_id,
1741 self.method_name,
1742 self.arg,
1743 self.ingress_expiry_datetime,
1744 self.use_nonce,
1745 Some(true),
1746 )
1747 .await
1748 }
1749
1750 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1755 self.agent
1756 .query_raw(
1757 self.canister_id,
1758 self.effective_canister_id,
1759 self.method_name,
1760 self.arg,
1761 self.ingress_expiry_datetime,
1762 self.use_nonce,
1763 Some(false),
1764 )
1765 .await
1766 }
1767
1768 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1771 let effective_canister_id = self.effective_canister_id;
1772 let identity = self.agent.identity.clone();
1773 let content = self.into_envelope()?;
1774 let signed_query = sign_envelope(&content, identity)?;
1775 let EnvelopeContent::Query {
1776 ingress_expiry,
1777 sender,
1778 canister_id,
1779 method_name,
1780 arg,
1781 nonce,
1782 } = content
1783 else {
1784 unreachable!()
1785 };
1786 Ok(SignedQuery {
1787 ingress_expiry,
1788 sender,
1789 canister_id,
1790 method_name,
1791 arg,
1792 effective_canister_id,
1793 signed_query,
1794 nonce,
1795 })
1796 }
1797
1798 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1800 self.agent.query_content(
1801 self.canister_id,
1802 self.method_name,
1803 self.arg,
1804 self.ingress_expiry_datetime,
1805 self.use_nonce,
1806 )
1807 }
1808}
1809
1810impl<'agent> IntoFuture for QueryBuilder<'agent> {
1811 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1812 type Output = Result<Vec<u8>, AgentError>;
1813 fn into_future(self) -> Self::IntoFuture {
1814 Box::pin(self.call())
1815 }
1816}
1817
/// An in-flight update call, returned by [`UpdateBuilder::call`].
///
/// Awaiting it yields the [`CallResponse`] of the submission; use
/// [`UpdateCall::and_wait`] to additionally wait for the final result when
/// the response asks for polling.
pub struct UpdateCall<'agent> {
    /// Agent used to wait for the result of the call.
    agent: &'agent Agent,
    /// Future resolving to the response of the submitted update call.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// Effective canister id of the request.
    effective_canister_id: Principal,
    /// Canister the update call targets.
    canister_id: Principal,
    /// Name of the method being called.
    method_name: String,
}
1826
1827impl fmt::Debug for UpdateCall<'_> {
1828 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1829 f.debug_struct("UpdateCall")
1830 .field("agent", &self.agent)
1831 .field("effective_canister_id", &self.effective_canister_id)
1832 .finish_non_exhaustive()
1833 }
1834}
1835
1836impl Future for UpdateCall<'_> {
1837 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1838 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1839 self.response_future.as_mut().poll(cx)
1840 }
1841}
1842
1843impl<'a> UpdateCall<'a> {
1844 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1846 let response = self.response_future.await?;
1847
1848 match response {
1849 CallResponse::Response(response) => Ok(response),
1850 CallResponse::Poll(request_id) => {
1851 self.agent
1852 .wait_inner(
1853 &request_id,
1854 self.effective_canister_id,
1855 Some(Operation::Call {
1856 canister: self.canister_id,
1857 method: self.method_name,
1858 }),
1859 )
1860 .await
1861 }
1862 }
1863 }
1864}
/// Builder for an update call, created by `Agent::update` or
/// [`UpdateBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either submit
/// it with [`UpdateBuilder::call`] / [`UpdateBuilder::call_and_wait`] (or by
/// awaiting the builder), or produce a signed request with
/// [`UpdateBuilder::sign`].
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// Agent used to build, sign and send the update call.
    agent: &'agent Agent,
    /// Effective canister id of the request; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister whose method is called.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit ingress expiry in nanoseconds since the Unix epoch, or
    /// `None` when no explicit expiry was requested.
    pub ingress_expiry_datetime: Option<u64>,
}
1883
1884impl<'agent> UpdateBuilder<'agent> {
1885 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1887 Self {
1888 agent,
1889 effective_canister_id: canister_id,
1890 canister_id,
1891 method_name,
1892 arg: vec![],
1893 ingress_expiry_datetime: None,
1894 }
1895 }
1896
1897 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1899 self.effective_canister_id = canister_id;
1900 self
1901 }
1902
1903 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1905 self.arg = arg.into();
1906 self
1907 }
1908
1909 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1911 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1912 self
1913 }
1914
1915 pub fn expire_after(mut self, duration: Duration) -> Self {
1917 self.ingress_expiry_datetime = Some(
1918 OffsetDateTime::now_utc()
1919 .saturating_add(duration.try_into().expect("negative duration"))
1920 .unix_timestamp_nanos() as u64,
1921 );
1922 self
1923 }
1924
1925 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1928 self.call().and_wait().await.map(|x| x.0)
1929 }
1930
1931 pub fn call(self) -> UpdateCall<'agent> {
1934 let method_name = self.method_name.clone();
1935 let response_future = async move {
1936 self.agent
1937 .update_raw(
1938 self.canister_id,
1939 self.effective_canister_id,
1940 self.method_name,
1941 self.arg,
1942 self.ingress_expiry_datetime,
1943 )
1944 .await
1945 };
1946 UpdateCall {
1947 agent: self.agent,
1948 response_future: Box::pin(response_future),
1949 effective_canister_id: self.effective_canister_id,
1950 canister_id: self.canister_id,
1951 method_name,
1952 }
1953 }
1954
1955 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1958 let identity = self.agent.identity.clone();
1959 let effective_canister_id = self.effective_canister_id;
1960 let content = self.into_envelope()?;
1961 let signed_update = sign_envelope(&content, identity)?;
1962 let request_id = to_request_id(&content)?;
1963 let EnvelopeContent::Call {
1964 nonce,
1965 ingress_expiry,
1966 sender,
1967 canister_id,
1968 method_name,
1969 arg,
1970 } = content
1971 else {
1972 unreachable!()
1973 };
1974 Ok(SignedUpdate {
1975 nonce,
1976 ingress_expiry,
1977 sender,
1978 canister_id,
1979 method_name,
1980 arg,
1981 effective_canister_id,
1982 signed_update,
1983 request_id,
1984 })
1985 }
1986
1987 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1989 let nonce = self.agent.nonce_factory.generate();
1990 self.agent.update_content(
1991 self.canister_id,
1992 self.method_name,
1993 self.arg,
1994 self.ingress_expiry_datetime,
1995 nonce,
1996 )
1997 }
1998}
1999
2000impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2001 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2002 type Output = Result<Vec<u8>, AgentError>;
2003 fn into_future(self) -> Self::IntoFuture {
2004 Box::pin(self.call_and_wait())
2005 }
2006}
2007
/// Abstraction over the HTTP transport used by the agent.
///
/// On wasm targets the returned future is not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Performs an HTTP request and returns the response.
    ///
    /// `req` is a factory rather than a finished request, so an
    /// implementation can build a fresh request for every attempt it makes.
    /// `max_retries` is the retry budget an implementation may apply to
    /// failed attempts; implementations are free to ignore it.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        max_retries: usize,
    ) -> Result<Response, AgentError>;
}
2019#[cfg(not(target_family = "wasm"))]
2020#[async_trait]
2021impl<T> HttpService for T
2022where
2023 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2024 for<'a> <&'a Self as Service<Request>>::Future: Send,
2025 T: Send + Sync + Debug + ?Sized,
2026{
2027 #[allow(clippy::needless_arbitrary_self_type)]
2028 async fn call<'a>(
2029 mut self: &'a Self,
2030 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2031 max_retries: usize,
2032 ) -> Result<Response, AgentError> {
2033 let mut retry_count = 0;
2034 loop {
2035 match Service::call(&mut self, req()?).await {
2036 Err(err) => {
2037 if err.is_connect() {
2039 if retry_count >= max_retries {
2040 return Err(AgentError::TransportError(err));
2041 }
2042 retry_count += 1;
2043 }
2044 }
2045 Ok(resp) => return Ok(resp),
2046 }
2047 }
2048 }
2049}
2050
/// Blanket [`HttpService`] implementation for any type whose shared
/// reference is a `tower` [`Service`] over `reqwest` requests (wasm
/// targets).
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    /// Builds one request with `req` and sends it once; the retry budget is
    /// ignored on this target.
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
        _: usize,
    ) -> Result<Response, AgentError> {
        let request = req()?;
        let response = Service::call(&mut self, request).await?;
        Ok(response)
    }
}
2067
/// [`HttpService`] wrapper around a `reqwest` [`Client`] that re-sends
/// requests answered with `429 Too Many Requests`.
#[derive(Debug)]
struct Retry429Logic {
    /// Client used to send the requests.
    client: Client,
}
2072
2073#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2074#[cfg_attr(not(target_family = "wasm"), async_trait)]
2075impl HttpService for Retry429Logic {
2076 async fn call<'a>(
2077 &'a self,
2078 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2079 _max_tcp_retries: usize,
2080 ) -> Result<Response, AgentError> {
2081 let mut retries = 0;
2082 loop {
2083 #[cfg(not(target_family = "wasm"))]
2084 let resp = self.client.call(req, _max_tcp_retries).await?;
2085 #[cfg(target_family = "wasm")]
2087 let resp = self.client.execute(req()?).await?;
2088 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2089 if retries == 6 {
2090 break Ok(resp);
2091 } else {
2092 retries += 1;
2093 crate::util::sleep(Duration::from_millis(250)).await;
2094 continue;
2095 }
2096 } else {
2097 break Ok(resp);
2098 }
2099 }
2100 }
2101}
2102
/// Tests that need no replica: they either never send a request or talk to
/// a local mock listener.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    /// Signs six update calls back to back and checks that they carry at
    /// most two distinct (increasing) ingress expiry values — i.e. that
    /// expiries produced in quick succession mostly coincide, presumably
    /// because the agent rounds them.
    #[test]
    fn rounded_expiry() {
        // The URL is never contacted; signing happens locally.
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // Count every time the expiry moves forward (the first value counts too).
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    /// Checks that `with_max_concurrent_requests(2)` keeps a third
    /// simultaneous request from opening a connection.
    #[tokio::test]
    async fn client_ratelimit() {
        // Mock server: counts accepted connections and never answers, so
        // every request stays in flight.
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    // Drain whatever the client sends without replying.
                    tokio::spawn(
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        // Start three queries concurrently; none of them can complete.
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        // Give the spawned tasks time to connect, then expect exactly two connections.
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}