1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13
14pub use agent_config::AgentConfig;
15pub use agent_error::AgentError;
16use agent_error::{HttpErrorPayload, Operation};
17use async_lock::Semaphore;
18use async_trait::async_trait;
19pub use builder::AgentBuilder;
20use cached::{Cached, TimedCache};
21use ed25519_consensus::{Error as Ed25519Error, Signature, VerificationKey};
22use futures_util::StreamExt;
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode};
24#[doc(inline)]
25pub use ic_transport_types::{
26 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
27 RequestStatusResponse,
28};
29pub use nonce::{NonceFactory, NonceGenerator};
30use rangemap::{RangeInclusiveMap, RangeInclusiveSet, StepFns};
31use reqwest::{Body, Client, Request, Response};
32use route_provider::{
33 dynamic_routing::{
34 dynamic_route_provider::DynamicRouteProviderBuilder, node::Node,
35 snapshot::latency_based_routing::LatencyRoutingSnapshot,
36 },
37 RouteProvider, UrlUntilReady,
38};
39use time::OffsetDateTime;
40use tower_service::Service;
41
42#[cfg(test)]
43mod agent_test;
44
45use crate::{
46 agent::response_authentication::{
47 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_request_status,
48 lookup_subnet, lookup_subnet_metrics, lookup_time, lookup_value,
49 },
50 export::Principal,
51 identity::Identity,
52 to_request_id, RequestId,
53};
54use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
55use backoff::{exponential::ExponentialBackoff, SystemClock};
56use ic_certification::{Certificate, Delegation, Label};
57use ic_transport_types::{
58 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
59 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
60};
61use serde::Serialize;
62use status::Status;
63use std::{
64 borrow::Cow,
65 collections::HashMap,
66 convert::TryFrom,
67 fmt::{self, Debug},
68 future::{Future, IntoFuture},
69 pin::Pin,
70 sync::{Arc, Mutex, RwLock},
71 task::{Context, Poll},
72 time::Duration,
73};
74
75use crate::agent::response_authentication::lookup_api_boundary_nodes;
76
77const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
78
79const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
80
81#[cfg(not(target_family = "wasm"))]
82type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
83
84#[cfg(target_family = "wasm")]
85type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
86
87#[derive(Clone)]
153pub struct Agent {
154 nonce_factory: Arc<dyn NonceGenerator>,
155 identity: Arc<dyn Identity>,
156 ingress_expiry: Duration,
157 root_key: Arc<RwLock<Vec<u8>>>,
158 client: Arc<dyn HttpService>,
159 route_provider: Arc<dyn RouteProvider>,
160 subnet_key_cache: Arc<Mutex<SubnetCache>>,
161 concurrent_requests_semaphore: Arc<Semaphore>,
162 verify_query_signatures: bool,
163 max_response_body_size: Option<usize>,
164 max_polling_time: Duration,
165 #[allow(dead_code)]
166 max_tcp_error_retries: usize,
167}
168
169impl fmt::Debug for Agent {
170 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
171 f.debug_struct("Agent")
172 .field("ingress_expiry", &self.ingress_expiry)
173 .finish_non_exhaustive()
174 }
175}
176
177impl Agent {
178 pub fn builder() -> builder::AgentBuilder {
181 Default::default()
182 }
183
184 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
186 let client = config.http_service.unwrap_or_else(|| {
187 Arc::new(Retry429Logic {
188 client: config.client.unwrap_or_else(|| {
189 #[cfg(not(target_family = "wasm"))]
190 {
191 Client::builder()
192 .use_rustls_tls()
193 .timeout(Duration::from_secs(360))
194 .build()
195 .expect("Could not create HTTP client.")
196 }
197 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
198 {
199 Client::new()
200 }
201 }),
202 })
203 });
204 Ok(Agent {
205 nonce_factory: config.nonce_factory,
206 identity: config.identity,
207 ingress_expiry: config.ingress_expiry,
208 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
209 client: client.clone(),
210 route_provider: if let Some(route_provider) = config.route_provider {
211 route_provider
212 } else if let Some(url) = config.url {
213 if config.background_dynamic_routing {
214 assert!(
215 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
216 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
217 );
218 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
219 UrlUntilReady::new(url, async move {
220 DynamicRouteProviderBuilder::new(
221 LatencyRoutingSnapshot::new(),
222 seeds,
223 client,
224 )
225 .build()
226 .await
227 }) as Arc<dyn RouteProvider>
228 } else {
229 Arc::new(url)
230 }
231 } else {
232 panic!("either route_provider or url must be specified");
233 },
234 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
235 verify_query_signatures: config.verify_query_signatures,
236 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
237 max_response_body_size: config.max_response_body_size,
238 max_tcp_error_retries: config.max_tcp_error_retries,
239 max_polling_time: config.max_polling_time,
240 })
241 }
242
243 pub fn set_identity<I>(&mut self, identity: I)
249 where
250 I: 'static + Identity,
251 {
252 self.identity = Arc::new(identity);
253 }
254 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
260 self.identity = identity;
261 }
262
263 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
272 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
273 return Ok(());
275 }
276 let status = self.status().await?;
277 let Some(root_key) = status.root_key else {
278 return Err(AgentError::NoRootKeyInStatus(status));
279 };
280 self.set_root_key(root_key);
281 Ok(())
282 }
283
284 pub fn set_root_key(&self, root_key: Vec<u8>) {
289 *self.root_key.write().unwrap() = root_key;
290 }
291
292 pub fn read_root_key(&self) -> Vec<u8> {
294 self.root_key.read().unwrap().clone()
295 }
296
297 fn get_expiry_date(&self) -> u64 {
298 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
299 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
300 if self.ingress_expiry.as_secs() > 90 {
301 rounded = rounded.replace_second(0).unwrap();
302 }
303 rounded.unix_timestamp_nanos().try_into().unwrap()
304 }
305
306 pub fn get_principal(&self) -> Result<Principal, String> {
308 self.identity.sender()
309 }
310
311 async fn query_endpoint<A>(
312 &self,
313 effective_canister_id: Principal,
314 serialized_bytes: Vec<u8>,
315 ) -> Result<A, AgentError>
316 where
317 A: serde::de::DeserializeOwned,
318 {
319 let _permit = self.concurrent_requests_semaphore.acquire().await;
320 let bytes = self
321 .execute(
322 Method::POST,
323 &format!("api/v2/canister/{}/query", effective_canister_id.to_text()),
324 Some(serialized_bytes),
325 )
326 .await?
327 .1;
328 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
329 }
330
331 async fn read_state_endpoint<A>(
332 &self,
333 effective_canister_id: Principal,
334 serialized_bytes: Vec<u8>,
335 ) -> Result<A, AgentError>
336 where
337 A: serde::de::DeserializeOwned,
338 {
339 let _permit = self.concurrent_requests_semaphore.acquire().await;
340 let endpoint = format!(
341 "api/v2/canister/{}/read_state",
342 effective_canister_id.to_text()
343 );
344 let bytes = self
345 .execute(Method::POST, &endpoint, Some(serialized_bytes))
346 .await?
347 .1;
348 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
349 }
350
351 async fn read_subnet_state_endpoint<A>(
352 &self,
353 subnet_id: Principal,
354 serialized_bytes: Vec<u8>,
355 ) -> Result<A, AgentError>
356 where
357 A: serde::de::DeserializeOwned,
358 {
359 let _permit = self.concurrent_requests_semaphore.acquire().await;
360 let endpoint = format!("api/v2/subnet/{}/read_state", subnet_id.to_text());
361 let bytes = self
362 .execute(Method::POST, &endpoint, Some(serialized_bytes))
363 .await?
364 .1;
365 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
366 }
367
368 async fn call_endpoint(
369 &self,
370 effective_canister_id: Principal,
371 serialized_bytes: Vec<u8>,
372 ) -> Result<TransportCallResponse, AgentError> {
373 let _permit = self.concurrent_requests_semaphore.acquire().await;
374 let endpoint = format!("api/v3/canister/{}/call", effective_canister_id.to_text());
375 let (status_code, response_body) = self
376 .execute(Method::POST, &endpoint, Some(serialized_bytes))
377 .await?;
378
379 if status_code == StatusCode::ACCEPTED {
380 return Ok(TransportCallResponse::Accepted);
381 }
382
383 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
384 }
385
386 #[allow(clippy::too_many_arguments)]
389 async fn query_raw(
390 &self,
391 canister_id: Principal,
392 effective_canister_id: Principal,
393 method_name: String,
394 arg: Vec<u8>,
395 ingress_expiry_datetime: Option<u64>,
396 use_nonce: bool,
397 explicit_verify_query_signatures: Option<bool>,
398 ) -> Result<Vec<u8>, AgentError> {
399 let operation = Operation::Call {
400 canister: canister_id,
401 method: method_name.clone(),
402 };
403 let content = self.query_content(
404 canister_id,
405 method_name,
406 arg,
407 ingress_expiry_datetime,
408 use_nonce,
409 )?;
410 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
411 self.query_inner(
412 effective_canister_id,
413 serialized_bytes,
414 content.to_request_id(),
415 explicit_verify_query_signatures,
416 operation,
417 )
418 .await
419 }
420
421 pub async fn query_signed(
425 &self,
426 effective_canister_id: Principal,
427 signed_query: Vec<u8>,
428 ) -> Result<Vec<u8>, AgentError> {
429 let envelope: Envelope =
430 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
431 let EnvelopeContent::Query {
432 canister_id,
433 method_name,
434 ..
435 } = &*envelope.content
436 else {
437 return Err(AgentError::CallDataMismatch {
438 field: "request_type".to_string(),
439 value_arg: "query".to_string(),
440 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
441 "update"
442 } else {
443 "read_state"
444 }
445 .to_string(),
446 });
447 };
448 let operation = Operation::Call {
449 canister: *canister_id,
450 method: method_name.clone(),
451 };
452 self.query_inner(
453 effective_canister_id,
454 signed_query,
455 envelope.content.to_request_id(),
456 None,
457 operation,
458 )
459 .await
460 }
461
462 async fn query_inner(
466 &self,
467 effective_canister_id: Principal,
468 signed_query: Vec<u8>,
469 request_id: RequestId,
470 explicit_verify_query_signatures: Option<bool>,
471 operation: Operation,
472 ) -> Result<Vec<u8>, AgentError> {
473 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
474 let (response, mut subnet) = futures_util::try_join!(
475 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
476 self.get_subnet_by_canister(&effective_canister_id)
477 )?;
478 if response.signatures().is_empty() {
479 return Err(AgentError::MissingSignature);
480 } else if response.signatures().len() > subnet.node_keys.len() {
481 return Err(AgentError::TooManySignatures {
482 had: response.signatures().len(),
483 needed: subnet.node_keys.len(),
484 });
485 }
486 for signature in response.signatures() {
487 if OffsetDateTime::now_utc()
488 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
489 > self.ingress_expiry
490 {
491 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
492 }
493 let signable = response.signable(request_id, signature.timestamp);
494 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
495 node_key
496 } else {
497 subnet = self
498 .fetch_subnet_by_canister(&effective_canister_id)
499 .await?;
500 subnet
501 .node_keys
502 .get(&signature.identity)
503 .ok_or(AgentError::CertificateNotAuthorized())?
504 };
505 if node_key.len() != 44 {
506 return Err(AgentError::DerKeyLengthMismatch {
507 expected: 44,
508 actual: node_key.len(),
509 });
510 }
511 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
512 if node_key[..12] != DER_PREFIX {
513 return Err(AgentError::DerPrefixMismatch {
514 expected: DER_PREFIX.to_vec(),
515 actual: node_key[..12].to_vec(),
516 });
517 }
518 let pubkey =
519 VerificationKey::try_from(<[u8; 32]>::try_from(&node_key[12..]).unwrap())
520 .map_err(|_| AgentError::MalformedPublicKey)?;
521 let sig = Signature::from(
522 <[u8; 64]>::try_from(&signature.signature[..])
523 .map_err(|_| AgentError::MalformedSignature)?,
524 );
525
526 match pubkey.verify(&sig, &signable) {
527 Err(Ed25519Error::InvalidSignature) => {
528 return Err(AgentError::QuerySignatureVerificationFailed)
529 }
530 Err(Ed25519Error::InvalidSliceLength) => {
531 return Err(AgentError::MalformedSignature)
532 }
533 Err(Ed25519Error::MalformedPublicKey) => {
534 return Err(AgentError::MalformedPublicKey)
535 }
536 Ok(()) => (),
537 _ => unreachable!(),
538 }
539 }
540 response
541 } else {
542 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
543 .await?
544 };
545
546 match response {
547 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
548 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
549 reject,
550 operation: Some(operation),
551 }),
552 }
553 }
554
555 fn query_content(
556 &self,
557 canister_id: Principal,
558 method_name: String,
559 arg: Vec<u8>,
560 ingress_expiry_datetime: Option<u64>,
561 use_nonce: bool,
562 ) -> Result<EnvelopeContent, AgentError> {
563 Ok(EnvelopeContent::Query {
564 sender: self.identity.sender().map_err(AgentError::SigningError)?,
565 canister_id,
566 method_name,
567 arg,
568 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
569 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
570 })
571 }
572
573 async fn update_raw(
575 &self,
576 canister_id: Principal,
577 effective_canister_id: Principal,
578 method_name: String,
579 arg: Vec<u8>,
580 ingress_expiry_datetime: Option<u64>,
581 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
582 let nonce = self.nonce_factory.generate();
583 let content = self.update_content(
584 canister_id,
585 method_name.clone(),
586 arg,
587 ingress_expiry_datetime,
588 nonce,
589 )?;
590 let operation = Some(Operation::Call {
591 canister: canister_id,
592 method: method_name,
593 });
594 let request_id = to_request_id(&content)?;
595 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
596
597 let response_body = self
598 .call_endpoint(effective_canister_id, serialized_bytes)
599 .await?;
600
601 match response_body {
602 TransportCallResponse::Replied { certificate } => {
603 let certificate =
604 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
605
606 self.verify(&certificate, effective_canister_id)?;
607 let status = lookup_request_status(&certificate, &request_id)?;
608
609 match status {
610 RequestStatusResponse::Replied(reply) => {
611 Ok(CallResponse::Response((reply.arg, certificate)))
612 }
613 RequestStatusResponse::Rejected(reject_response) => {
614 Err(AgentError::CertifiedReject {
615 reject: reject_response,
616 operation,
617 })?
618 }
619 _ => Ok(CallResponse::Poll(request_id)),
620 }
621 }
622 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
623 TransportCallResponse::NonReplicatedRejection(reject_response) => {
624 Err(AgentError::UncertifiedReject {
625 reject: reject_response,
626 operation,
627 })
628 }
629 }
630 }
631
632 pub async fn update_signed(
636 &self,
637 effective_canister_id: Principal,
638 signed_update: Vec<u8>,
639 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
640 let envelope: Envelope =
641 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
642 let EnvelopeContent::Call {
643 canister_id,
644 method_name,
645 ..
646 } = &*envelope.content
647 else {
648 return Err(AgentError::CallDataMismatch {
649 field: "request_type".to_string(),
650 value_arg: "update".to_string(),
651 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
652 "query"
653 } else {
654 "read_state"
655 }
656 .to_string(),
657 });
658 };
659 let operation = Some(Operation::Call {
660 canister: *canister_id,
661 method: method_name.clone(),
662 });
663 let request_id = to_request_id(&envelope.content)?;
664
665 let response_body = self
666 .call_endpoint(effective_canister_id, signed_update)
667 .await?;
668
669 match response_body {
670 TransportCallResponse::Replied { certificate } => {
671 let certificate =
672 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
673
674 self.verify(&certificate, effective_canister_id)?;
675 let status = lookup_request_status(&certificate, &request_id)?;
676
677 match status {
678 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
679 RequestStatusResponse::Rejected(reject_response) => {
680 Err(AgentError::CertifiedReject {
681 reject: reject_response,
682 operation,
683 })?
684 }
685 _ => Ok(CallResponse::Poll(request_id)),
686 }
687 }
688 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
689 TransportCallResponse::NonReplicatedRejection(reject_response) => {
690 Err(AgentError::UncertifiedReject {
691 reject: reject_response,
692 operation,
693 })
694 }
695 }
696 }
697
698 fn update_content(
699 &self,
700 canister_id: Principal,
701 method_name: String,
702 arg: Vec<u8>,
703 ingress_expiry_datetime: Option<u64>,
704 nonce: Option<Vec<u8>>,
705 ) -> Result<EnvelopeContent, AgentError> {
706 Ok(EnvelopeContent::Call {
707 canister_id,
708 method_name,
709 arg,
710 nonce,
711 sender: self.identity.sender().map_err(AgentError::SigningError)?,
712 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
713 })
714 }
715
716 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
717 ExponentialBackoffBuilder::new()
718 .with_initial_interval(Duration::from_millis(500))
719 .with_max_interval(Duration::from_secs(1))
720 .with_multiplier(1.4)
721 .with_max_elapsed_time(Some(self.max_polling_time))
722 .build()
723 }
724
725 pub async fn wait_signed(
727 &self,
728 request_id: &RequestId,
729 effective_canister_id: Principal,
730 signed_request_status: Vec<u8>,
731 ) -> Result<(Vec<u8>, Certificate), AgentError> {
732 let mut retry_policy = self.get_retry_policy();
733
734 let mut request_accepted = false;
735 let (resp, cert) = self
736 .request_status_signed(
737 request_id,
738 effective_canister_id,
739 signed_request_status.clone(),
740 )
741 .await?;
742 loop {
743 match resp {
744 RequestStatusResponse::Unknown => {}
745
746 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
747 if !request_accepted {
748 retry_policy.reset();
749 request_accepted = true;
750 }
751 }
752
753 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
754 return Ok((arg, cert))
755 }
756
757 RequestStatusResponse::Rejected(response) => {
758 return Err(AgentError::CertifiedReject {
759 reject: response,
760 operation: None,
761 })
762 }
763
764 RequestStatusResponse::Done => {
765 return Err(AgentError::RequestStatusDoneNoReply(String::from(
766 *request_id,
767 )))
768 }
769 };
770
771 match retry_policy.next_backoff() {
772 Some(duration) => crate::util::sleep(duration).await,
773
774 None => return Err(AgentError::TimeoutWaitingForResponse()),
775 }
776 }
777 }
778
779 pub async fn wait(
781 &self,
782 request_id: &RequestId,
783 effective_canister_id: Principal,
784 ) -> Result<(Vec<u8>, Certificate), AgentError> {
785 self.wait_inner(request_id, effective_canister_id, None)
786 .await
787 }
788
789 async fn wait_inner(
790 &self,
791 request_id: &RequestId,
792 effective_canister_id: Principal,
793 operation: Option<Operation>,
794 ) -> Result<(Vec<u8>, Certificate), AgentError> {
795 let mut retry_policy = self.get_retry_policy();
796
797 let mut request_accepted = false;
798 loop {
799 let (resp, cert) = self
800 .request_status_raw(request_id, effective_canister_id)
801 .await?;
802 match resp {
803 RequestStatusResponse::Unknown => {}
804
805 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806 if !request_accepted {
807 retry_policy.reset();
815 request_accepted = true;
816 }
817 }
818
819 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820 return Ok((arg, cert))
821 }
822
823 RequestStatusResponse::Rejected(response) => {
824 return Err(AgentError::CertifiedReject {
825 reject: response,
826 operation,
827 })
828 }
829
830 RequestStatusResponse::Done => {
831 return Err(AgentError::RequestStatusDoneNoReply(String::from(
832 *request_id,
833 )))
834 }
835 };
836
837 match retry_policy.next_backoff() {
838 Some(duration) => crate::util::sleep(duration).await,
839
840 None => return Err(AgentError::TimeoutWaitingForResponse()),
841 }
842 }
843 }
844
845 pub async fn read_state_raw(
848 &self,
849 paths: Vec<Vec<Label>>,
850 effective_canister_id: Principal,
851 ) -> Result<Certificate, AgentError> {
852 let content = self.read_state_content(paths)?;
853 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855 let read_state_response: ReadStateResponse = self
856 .read_state_endpoint(effective_canister_id, serialized_bytes)
857 .await?;
858 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859 .map_err(AgentError::InvalidCborData)?;
860 self.verify(&cert, effective_canister_id)?;
861 Ok(cert)
862 }
863
864 pub async fn read_subnet_state_raw(
867 &self,
868 paths: Vec<Vec<Label>>,
869 subnet_id: Principal,
870 ) -> Result<Certificate, AgentError> {
871 let content = self.read_state_content(paths)?;
872 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874 let read_state_response: ReadStateResponse = self
875 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876 .await?;
877 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878 .map_err(AgentError::InvalidCborData)?;
879 self.verify_for_subnet(&cert, subnet_id)?;
880 Ok(cert)
881 }
882
883 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884 Ok(EnvelopeContent::ReadState {
885 sender: self.identity.sender().map_err(AgentError::SigningError)?,
886 paths,
887 ingress_expiry: self.get_expiry_date(),
888 })
889 }
890
891 pub fn verify(
894 &self,
895 cert: &Certificate,
896 effective_canister_id: Principal,
897 ) -> Result<(), AgentError> {
898 self.verify_cert(cert, effective_canister_id)?;
899 self.verify_cert_timestamp(cert)?;
900 Ok(())
901 }
902
903 fn verify_cert(
904 &self,
905 cert: &Certificate,
906 effective_canister_id: Principal,
907 ) -> Result<(), AgentError> {
908 let sig = &cert.signature;
909
910 let root_hash = cert.tree.digest();
911 let mut msg = vec![];
912 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913 msg.extend_from_slice(&root_hash);
914
915 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916 let key = extract_der(der_key)?;
917
918 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919 .map_err(|_| AgentError::CertificateVerificationFailed())?;
920 Ok(())
921 }
922
923 pub fn verify_for_subnet(
926 &self,
927 cert: &Certificate,
928 subnet_id: Principal,
929 ) -> Result<(), AgentError> {
930 self.verify_cert_for_subnet(cert, subnet_id)?;
931 self.verify_cert_timestamp(cert)?;
932 Ok(())
933 }
934
935 fn verify_cert_for_subnet(
936 &self,
937 cert: &Certificate,
938 subnet_id: Principal,
939 ) -> Result<(), AgentError> {
940 let sig = &cert.signature;
941
942 let root_hash = cert.tree.digest();
943 let mut msg = vec![];
944 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945 msg.extend_from_slice(&root_hash);
946
947 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948 let key = extract_der(der_key)?;
949
950 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951 .map_err(|_| AgentError::CertificateVerificationFailed())?;
952 Ok(())
953 }
954
955 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956 let time = lookup_time(cert)?;
957 if (OffsetDateTime::now_utc()
958 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
959 .abs()
960 > self.ingress_expiry
961 {
962 Err(AgentError::CertificateOutdated(self.ingress_expiry))
963 } else {
964 Ok(())
965 }
966 }
967
968 fn check_delegation(
969 &self,
970 delegation: &Option<Delegation>,
971 effective_canister_id: Principal,
972 ) -> Result<Vec<u8>, AgentError> {
973 match delegation {
974 None => Ok(self.read_root_key()),
975 Some(delegation) => {
976 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
977 .map_err(AgentError::InvalidCborData)?;
978 if cert.delegation.is_some() {
979 return Err(AgentError::CertificateHasTooManyDelegations);
980 }
981 self.verify_cert(&cert, effective_canister_id)?;
982 let canister_range_lookup = [
983 "subnet".as_bytes(),
984 delegation.subnet_id.as_ref(),
985 "canister_ranges".as_bytes(),
986 ];
987 let canister_range = lookup_value(&cert.tree, canister_range_lookup)?;
988 let ranges: Vec<(Principal, Principal)> =
989 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
990 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
991 return Err(AgentError::CertificateNotAuthorized());
993 }
994
995 let public_key_path = [
996 "subnet".as_bytes(),
997 delegation.subnet_id.as_ref(),
998 "public_key".as_bytes(),
999 ];
1000 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1001 }
1002 }
1003 }
1004
1005 fn check_delegation_for_subnet(
1006 &self,
1007 delegation: &Option<Delegation>,
1008 subnet_id: Principal,
1009 ) -> Result<Vec<u8>, AgentError> {
1010 match delegation {
1011 None => Ok(self.read_root_key()),
1012 Some(delegation) => {
1013 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1014 .map_err(AgentError::InvalidCborData)?;
1015 if cert.delegation.is_some() {
1016 return Err(AgentError::CertificateHasTooManyDelegations);
1017 }
1018 self.verify_cert_for_subnet(&cert, subnet_id)?;
1019 let public_key_path = [
1020 "subnet".as_bytes(),
1021 delegation.subnet_id.as_ref(),
1022 "public_key".as_bytes(),
1023 ];
1024 let pk = lookup_value(&cert.tree, public_key_path)
1025 .map_err(|_| AgentError::CertificateNotAuthorized())?
1026 .to_vec();
1027 Ok(pk)
1028 }
1029 }
1030 }
1031
1032 pub async fn read_state_canister_info(
1035 &self,
1036 canister_id: Principal,
1037 path: &str,
1038 ) -> Result<Vec<u8>, AgentError> {
1039 let paths: Vec<Vec<Label>> = vec![vec![
1040 "canister".into(),
1041 Label::from_bytes(canister_id.as_slice()),
1042 path.into(),
1043 ]];
1044
1045 let cert = self.read_state_raw(paths, canister_id).await?;
1046
1047 lookup_canister_info(cert, canister_id, path)
1048 }
1049
1050 pub async fn read_state_canister_metadata(
1052 &self,
1053 canister_id: Principal,
1054 path: &str,
1055 ) -> Result<Vec<u8>, AgentError> {
1056 let paths: Vec<Vec<Label>> = vec![vec![
1057 "canister".into(),
1058 Label::from_bytes(canister_id.as_slice()),
1059 "metadata".into(),
1060 path.into(),
1061 ]];
1062
1063 let cert = self.read_state_raw(paths, canister_id).await?;
1064
1065 lookup_canister_metadata(cert, canister_id, path)
1066 }
1067
1068 pub async fn read_state_subnet_metrics(
1070 &self,
1071 subnet_id: Principal,
1072 ) -> Result<SubnetMetrics, AgentError> {
1073 let paths = vec![vec![
1074 "subnet".into(),
1075 Label::from_bytes(subnet_id.as_slice()),
1076 "metrics".into(),
1077 ]];
1078 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1079 lookup_subnet_metrics(cert, subnet_id)
1080 }
1081
1082 pub async fn request_status_raw(
1084 &self,
1085 request_id: &RequestId,
1086 effective_canister_id: Principal,
1087 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1088 let paths: Vec<Vec<Label>> =
1089 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1090
1091 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1092
1093 Ok((lookup_request_status(&cert, request_id)?, cert))
1094 }
1095
1096 pub async fn request_status_signed(
1100 &self,
1101 request_id: &RequestId,
1102 effective_canister_id: Principal,
1103 signed_request_status: Vec<u8>,
1104 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1105 let _envelope: Envelope =
1106 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1107 let read_state_response: ReadStateResponse = self
1108 .read_state_endpoint(effective_canister_id, signed_request_status)
1109 .await?;
1110
1111 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1112 .map_err(AgentError::InvalidCborData)?;
1113 self.verify(&cert, effective_canister_id)?;
1114 Ok((lookup_request_status(&cert, request_id)?, cert))
1115 }
1116
1117 pub fn update<S: Into<String>>(
1120 &self,
1121 canister_id: &Principal,
1122 method_name: S,
1123 ) -> UpdateBuilder {
1124 UpdateBuilder::new(self, *canister_id, method_name.into())
1125 }
1126
1127 pub async fn status(&self) -> Result<Status, AgentError> {
1129 let endpoint = "api/v2/status";
1130 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1131
1132 let cbor: serde_cbor::Value =
1133 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1134
1135 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1136 }
1137
1138 pub fn query<S: Into<String>>(&self, canister_id: &Principal, method_name: S) -> QueryBuilder {
1141 QueryBuilder::new(self, *canister_id, method_name.into())
1142 }
1143
1144 pub fn sign_request_status(
1147 &self,
1148 effective_canister_id: Principal,
1149 request_id: RequestId,
1150 ) -> Result<SignedRequestStatus, AgentError> {
1151 let paths: Vec<Vec<Label>> =
1152 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1153 let read_state_content = self.read_state_content(paths)?;
1154 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1155 let ingress_expiry = read_state_content.ingress_expiry();
1156 let sender = *read_state_content.sender();
1157 Ok(SignedRequestStatus {
1158 ingress_expiry,
1159 sender,
1160 effective_canister_id,
1161 request_id,
1162 signed_request_status,
1163 })
1164 }
1165
1166 async fn get_subnet_by_canister(
1167 &self,
1168 canister: &Principal,
1169 ) -> Result<Arc<Subnet>, AgentError> {
1170 let subnet = self
1171 .subnet_key_cache
1172 .lock()
1173 .unwrap()
1174 .get_subnet_by_canister(canister);
1175 if let Some(subnet) = subnet {
1176 Ok(subnet)
1177 } else {
1178 self.fetch_subnet_by_canister(canister).await
1179 }
1180 }
1181
1182 pub async fn fetch_api_boundary_nodes_by_canister_id(
1184 &self,
1185 canister_id: Principal,
1186 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1187 let paths = vec![vec!["api_boundary_nodes".into()]];
1188 let certificate = self.read_state_raw(paths, canister_id).await?;
1189 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1190 Ok(api_boundary_nodes)
1191 }
1192
1193 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1195 &self,
1196 subnet_id: Principal,
1197 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1198 let paths = vec![vec!["api_boundary_nodes".into()]];
1199 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1200 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1201 Ok(api_boundary_nodes)
1202 }
1203
1204 async fn fetch_subnet_by_canister(
1205 &self,
1206 canister: &Principal,
1207 ) -> Result<Arc<Subnet>, AgentError> {
1208 let cert = self
1209 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1210 .await?;
1211
1212 let (subnet_id, subnet) = lookup_subnet(&cert, &self.root_key.read().unwrap())?;
1213 let subnet = Arc::new(subnet);
1214 self.subnet_key_cache
1215 .lock()
1216 .unwrap()
1217 .insert_subnet(subnet_id, subnet.clone());
1218 Ok(subnet)
1219 }
1220
1221 async fn request(
1222 &self,
1223 method: Method,
1224 endpoint: &str,
1225 body: Option<Vec<u8>>,
1226 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1227 let create_request_with_generated_url = || -> Result<Request, AgentError> {
1228 let url = self.route_provider.route()?.join(endpoint)?;
1229 let mut http_request = Request::new(method.clone(), url);
1230 http_request
1231 .headers_mut()
1232 .insert(CONTENT_TYPE, "application/cbor".parse().unwrap());
1233 *http_request.body_mut() = body.clone().map(Body::from);
1234 Ok(http_request)
1235 };
1236
1237 let response = self
1238 .client
1239 .call(
1240 &create_request_with_generated_url,
1241 self.max_tcp_error_retries,
1242 )
1243 .await?;
1244
1245 let http_status = response.status();
1246 let response_headers = response.headers().clone();
1247
1248 if matches!(self
1250 .max_response_body_size
1251 .zip(response.content_length()), Some((size_limit, content_length)) if content_length > size_limit as u64)
1252 {
1253 return Err(AgentError::ResponseSizeExceededLimit());
1254 }
1255
1256 let mut body: Vec<u8> = response
1257 .content_length()
1258 .map_or_else(Vec::new, |n| Vec::with_capacity(n as usize));
1259
1260 let mut stream = response.bytes_stream();
1261
1262 while let Some(chunk) = stream.next().await {
1263 let chunk = chunk?;
1264
1265 if matches!(self
1267 .max_response_body_size, Some(size_limit) if body.len() + chunk.len() > size_limit)
1268 {
1269 return Err(AgentError::ResponseSizeExceededLimit());
1270 }
1271
1272 body.extend_from_slice(chunk.as_ref());
1273 }
1274
1275 Ok((http_status, response_headers, body))
1276 }
1277
1278 async fn execute(
1279 &self,
1280 method: Method,
1281 endpoint: &str,
1282 body: Option<Vec<u8>>,
1283 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1284 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1285
1286 let status = request_result.0;
1287 let headers = request_result.1;
1288 let body = request_result.2;
1289
1290 if status.is_client_error() || status.is_server_error() {
1291 Err(AgentError::HttpError(HttpErrorPayload {
1292 status: status.into(),
1293 content_type: headers
1294 .get(CONTENT_TYPE)
1295 .and_then(|value| value.to_str().ok())
1296 .map(str::to_string),
1297 content: body,
1298 }))
1299 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1300 Err(AgentError::InvalidHttpResponse(format!(
1301 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1302 )))
1303 } else {
1304 Ok((status, body))
1305 }
1306 }
1307}
1308
1309fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1312 ranges
1313 .iter()
1314 .any(|r| principal >= &r.0 && principal <= &r.1)
1315}
1316
1317fn sign_envelope(
1318 content: &EnvelopeContent,
1319 identity: Arc<dyn Identity>,
1320) -> Result<Vec<u8>, AgentError> {
1321 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1322
1323 let envelope = Envelope {
1324 content: Cow::Borrowed(content),
1325 sender_pubkey: signature.public_key,
1326 sender_sig: signature.signature,
1327 sender_delegation: signature.delegations,
1328 };
1329
1330 let mut serialized_bytes = Vec::new();
1331 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1332 serializer.self_describe()?;
1333 envelope.serialize(&mut serializer)?;
1334
1335 Ok(serialized_bytes)
1336}
1337
1338pub fn signed_query_inspect(
1341 sender: Principal,
1342 canister_id: Principal,
1343 method_name: &str,
1344 arg: &[u8],
1345 ingress_expiry: u64,
1346 signed_query: Vec<u8>,
1347) -> Result<(), AgentError> {
1348 let envelope: Envelope =
1349 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1350 match envelope.content.as_ref() {
1351 EnvelopeContent::Query {
1352 ingress_expiry: ingress_expiry_cbor,
1353 sender: sender_cbor,
1354 canister_id: canister_id_cbor,
1355 method_name: method_name_cbor,
1356 arg: arg_cbor,
1357 nonce: _nonce,
1358 } => {
1359 if ingress_expiry != *ingress_expiry_cbor {
1360 return Err(AgentError::CallDataMismatch {
1361 field: "ingress_expiry".to_string(),
1362 value_arg: ingress_expiry.to_string(),
1363 value_cbor: ingress_expiry_cbor.to_string(),
1364 });
1365 }
1366 if sender != *sender_cbor {
1367 return Err(AgentError::CallDataMismatch {
1368 field: "sender".to_string(),
1369 value_arg: sender.to_string(),
1370 value_cbor: sender_cbor.to_string(),
1371 });
1372 }
1373 if canister_id != *canister_id_cbor {
1374 return Err(AgentError::CallDataMismatch {
1375 field: "canister_id".to_string(),
1376 value_arg: canister_id.to_string(),
1377 value_cbor: canister_id_cbor.to_string(),
1378 });
1379 }
1380 if method_name != *method_name_cbor {
1381 return Err(AgentError::CallDataMismatch {
1382 field: "method_name".to_string(),
1383 value_arg: method_name.to_string(),
1384 value_cbor: method_name_cbor.clone(),
1385 });
1386 }
1387 if arg != *arg_cbor {
1388 return Err(AgentError::CallDataMismatch {
1389 field: "arg".to_string(),
1390 value_arg: format!("{arg:?}"),
1391 value_cbor: format!("{arg_cbor:?}"),
1392 });
1393 }
1394 }
1395 EnvelopeContent::Call { .. } => {
1396 return Err(AgentError::CallDataMismatch {
1397 field: "request_type".to_string(),
1398 value_arg: "query".to_string(),
1399 value_cbor: "call".to_string(),
1400 })
1401 }
1402 EnvelopeContent::ReadState { .. } => {
1403 return Err(AgentError::CallDataMismatch {
1404 field: "request_type".to_string(),
1405 value_arg: "query".to_string(),
1406 value_cbor: "read_state".to_string(),
1407 })
1408 }
1409 }
1410 Ok(())
1411}
1412
1413pub fn signed_update_inspect(
1416 sender: Principal,
1417 canister_id: Principal,
1418 method_name: &str,
1419 arg: &[u8],
1420 ingress_expiry: u64,
1421 signed_update: Vec<u8>,
1422) -> Result<(), AgentError> {
1423 let envelope: Envelope =
1424 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1425 match envelope.content.as_ref() {
1426 EnvelopeContent::Call {
1427 nonce: _nonce,
1428 ingress_expiry: ingress_expiry_cbor,
1429 sender: sender_cbor,
1430 canister_id: canister_id_cbor,
1431 method_name: method_name_cbor,
1432 arg: arg_cbor,
1433 } => {
1434 if ingress_expiry != *ingress_expiry_cbor {
1435 return Err(AgentError::CallDataMismatch {
1436 field: "ingress_expiry".to_string(),
1437 value_arg: ingress_expiry.to_string(),
1438 value_cbor: ingress_expiry_cbor.to_string(),
1439 });
1440 }
1441 if sender != *sender_cbor {
1442 return Err(AgentError::CallDataMismatch {
1443 field: "sender".to_string(),
1444 value_arg: sender.to_string(),
1445 value_cbor: sender_cbor.to_string(),
1446 });
1447 }
1448 if canister_id != *canister_id_cbor {
1449 return Err(AgentError::CallDataMismatch {
1450 field: "canister_id".to_string(),
1451 value_arg: canister_id.to_string(),
1452 value_cbor: canister_id_cbor.to_string(),
1453 });
1454 }
1455 if method_name != *method_name_cbor {
1456 return Err(AgentError::CallDataMismatch {
1457 field: "method_name".to_string(),
1458 value_arg: method_name.to_string(),
1459 value_cbor: method_name_cbor.clone(),
1460 });
1461 }
1462 if arg != *arg_cbor {
1463 return Err(AgentError::CallDataMismatch {
1464 field: "arg".to_string(),
1465 value_arg: format!("{arg:?}"),
1466 value_cbor: format!("{arg_cbor:?}"),
1467 });
1468 }
1469 }
1470 EnvelopeContent::ReadState { .. } => {
1471 return Err(AgentError::CallDataMismatch {
1472 field: "request_type".to_string(),
1473 value_arg: "call".to_string(),
1474 value_cbor: "read_state".to_string(),
1475 })
1476 }
1477 EnvelopeContent::Query { .. } => {
1478 return Err(AgentError::CallDataMismatch {
1479 field: "request_type".to_string(),
1480 value_arg: "call".to_string(),
1481 value_cbor: "query".to_string(),
1482 })
1483 }
1484 }
1485 Ok(())
1486}
1487
1488pub fn signed_request_status_inspect(
1491 sender: Principal,
1492 request_id: &RequestId,
1493 ingress_expiry: u64,
1494 signed_request_status: Vec<u8>,
1495) -> Result<(), AgentError> {
1496 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1497 let envelope: Envelope =
1498 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1499 match envelope.content.as_ref() {
1500 EnvelopeContent::ReadState {
1501 ingress_expiry: ingress_expiry_cbor,
1502 sender: sender_cbor,
1503 paths: paths_cbor,
1504 } => {
1505 if ingress_expiry != *ingress_expiry_cbor {
1506 return Err(AgentError::CallDataMismatch {
1507 field: "ingress_expiry".to_string(),
1508 value_arg: ingress_expiry.to_string(),
1509 value_cbor: ingress_expiry_cbor.to_string(),
1510 });
1511 }
1512 if sender != *sender_cbor {
1513 return Err(AgentError::CallDataMismatch {
1514 field: "sender".to_string(),
1515 value_arg: sender.to_string(),
1516 value_cbor: sender_cbor.to_string(),
1517 });
1518 }
1519
1520 if paths != *paths_cbor {
1521 return Err(AgentError::CallDataMismatch {
1522 field: "paths".to_string(),
1523 value_arg: format!("{paths:?}"),
1524 value_cbor: format!("{paths_cbor:?}"),
1525 });
1526 }
1527 }
1528 EnvelopeContent::Query { .. } => {
1529 return Err(AgentError::CallDataMismatch {
1530 field: "request_type".to_string(),
1531 value_arg: "read_state".to_string(),
1532 value_cbor: "query".to_string(),
1533 })
1534 }
1535 EnvelopeContent::Call { .. } => {
1536 return Err(AgentError::CallDataMismatch {
1537 field: "request_type".to_string(),
1538 value_arg: "read_state".to_string(),
1539 value_cbor: "call".to_string(),
1540 })
1541 }
1542 }
1543 Ok(())
1544}
1545
1546#[derive(Clone)]
1547struct SubnetCache {
1548 subnets: TimedCache<Principal, Arc<Subnet>>,
1549 canister_index: RangeInclusiveMap<Principal, Principal, PrincipalStep>,
1550}
1551
1552impl SubnetCache {
1553 fn new() -> Self {
1554 Self {
1555 subnets: TimedCache::with_lifespan(300),
1556 canister_index: RangeInclusiveMap::new_with_step_fns(),
1557 }
1558 }
1559
1560 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1561 self.canister_index
1562 .get(canister)
1563 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1564 .filter(|subnet| subnet.canister_ranges.contains(canister))
1565 }
1566
1567 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1568 self.subnets.cache_set(subnet_id, subnet.clone());
1569 for range in subnet.canister_ranges.iter() {
1570 self.canister_index.insert(range.clone(), subnet_id);
1571 }
1572 }
1573}
1574
1575#[derive(Clone, Copy)]
1576struct PrincipalStep;
1577
1578impl StepFns<Principal> for PrincipalStep {
1579 fn add_one(start: &Principal) -> Principal {
1580 let bytes = start.as_slice();
1581 let mut arr = [0; 29];
1582 arr[..bytes.len()].copy_from_slice(bytes);
1583 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1584 *byte = byte.wrapping_add(1);
1585 if *byte != 0 {
1586 break;
1587 }
1588 }
1589 Principal::from_slice(&arr[..bytes.len()])
1590 }
1591 fn sub_one(start: &Principal) -> Principal {
1592 let bytes = start.as_slice();
1593 let mut arr = [0; 29];
1594 arr[..bytes.len()].copy_from_slice(bytes);
1595 for byte in arr[..bytes.len() - 1].iter_mut().rev() {
1596 *byte = byte.wrapping_sub(1);
1597 if *byte != 255 {
1598 break;
1599 }
1600 }
1601 Principal::from_slice(&arr[..bytes.len()])
1602 }
1603}
1604
1605#[derive(Clone)]
1606pub(crate) struct Subnet {
1607 _key: Vec<u8>,
1610 node_keys: HashMap<Principal, Vec<u8>>,
1611 canister_ranges: RangeInclusiveSet<Principal, PrincipalStep>,
1612}
1613
1614#[derive(Debug, Clone)]
1616pub struct ApiBoundaryNode {
1617 pub domain: String,
1619 pub ipv6_address: String,
1621 pub ipv4_address: Option<String>,
1623}
1624
1625#[derive(Debug, Clone)]
1629#[non_exhaustive]
1630pub struct QueryBuilder<'agent> {
1631 agent: &'agent Agent,
1632 pub effective_canister_id: Principal,
1634 pub canister_id: Principal,
1636 pub method_name: String,
1638 pub arg: Vec<u8>,
1640 pub ingress_expiry_datetime: Option<u64>,
1642 pub use_nonce: bool,
1644}
1645
1646impl<'agent> QueryBuilder<'agent> {
1647 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1649 Self {
1650 agent,
1651 effective_canister_id: canister_id,
1652 canister_id,
1653 method_name,
1654 arg: vec![],
1655 ingress_expiry_datetime: None,
1656 use_nonce: false,
1657 }
1658 }
1659
1660 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1662 self.effective_canister_id = canister_id;
1663 self
1664 }
1665
1666 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1668 self.arg = arg.into();
1669 self
1670 }
1671
1672 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1674 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1675 self
1676 }
1677
1678 pub fn expire_after(mut self, duration: Duration) -> Self {
1680 self.ingress_expiry_datetime = Some(
1681 OffsetDateTime::now_utc()
1682 .saturating_add(duration.try_into().expect("negative duration"))
1683 .unix_timestamp_nanos() as u64,
1684 );
1685 self
1686 }
1687
1688 pub fn with_nonce_generation(mut self) -> Self {
1691 self.use_nonce = true;
1692 self
1693 }
1694
1695 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1697 self.agent
1698 .query_raw(
1699 self.canister_id,
1700 self.effective_canister_id,
1701 self.method_name,
1702 self.arg,
1703 self.ingress_expiry_datetime,
1704 self.use_nonce,
1705 None,
1706 )
1707 .await
1708 }
1709
1710 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1715 self.agent
1716 .query_raw(
1717 self.canister_id,
1718 self.effective_canister_id,
1719 self.method_name,
1720 self.arg,
1721 self.ingress_expiry_datetime,
1722 self.use_nonce,
1723 Some(true),
1724 )
1725 .await
1726 }
1727
1728 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1733 self.agent
1734 .query_raw(
1735 self.canister_id,
1736 self.effective_canister_id,
1737 self.method_name,
1738 self.arg,
1739 self.ingress_expiry_datetime,
1740 self.use_nonce,
1741 Some(false),
1742 )
1743 .await
1744 }
1745
1746 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1749 let effective_canister_id = self.effective_canister_id;
1750 let identity = self.agent.identity.clone();
1751 let content = self.into_envelope()?;
1752 let signed_query = sign_envelope(&content, identity)?;
1753 let EnvelopeContent::Query {
1754 ingress_expiry,
1755 sender,
1756 canister_id,
1757 method_name,
1758 arg,
1759 nonce,
1760 } = content
1761 else {
1762 unreachable!()
1763 };
1764 Ok(SignedQuery {
1765 ingress_expiry,
1766 sender,
1767 canister_id,
1768 method_name,
1769 arg,
1770 effective_canister_id,
1771 signed_query,
1772 nonce,
1773 })
1774 }
1775
1776 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1778 self.agent.query_content(
1779 self.canister_id,
1780 self.method_name,
1781 self.arg,
1782 self.ingress_expiry_datetime,
1783 self.use_nonce,
1784 )
1785 }
1786}
1787
1788impl<'agent> IntoFuture for QueryBuilder<'agent> {
1789 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1790 type Output = Result<Vec<u8>, AgentError>;
1791 fn into_future(self) -> Self::IntoFuture {
1792 Box::pin(self.call())
1793 }
1794}
1795
1796pub struct UpdateCall<'agent> {
1798 agent: &'agent Agent,
1799 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1800 effective_canister_id: Principal,
1801 canister_id: Principal,
1802 method_name: String,
1803}
1804
1805impl fmt::Debug for UpdateCall<'_> {
1806 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1807 f.debug_struct("UpdateCall")
1808 .field("agent", &self.agent)
1809 .field("effective_canister_id", &self.effective_canister_id)
1810 .finish_non_exhaustive()
1811 }
1812}
1813
1814impl Future for UpdateCall<'_> {
1815 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1816 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1817 self.response_future.as_mut().poll(cx)
1818 }
1819}
1820
1821impl<'a> UpdateCall<'a> {
1822 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1824 let response = self.response_future.await?;
1825
1826 match response {
1827 CallResponse::Response(response) => Ok(response),
1828 CallResponse::Poll(request_id) => {
1829 self.agent
1830 .wait_inner(
1831 &request_id,
1832 self.effective_canister_id,
1833 Some(Operation::Call {
1834 canister: self.canister_id,
1835 method: self.method_name,
1836 }),
1837 )
1838 .await
1839 }
1840 }
1841 }
1842}
1843#[derive(Debug)]
1848pub struct UpdateBuilder<'agent> {
1849 agent: &'agent Agent,
1850 pub effective_canister_id: Principal,
1852 pub canister_id: Principal,
1854 pub method_name: String,
1856 pub arg: Vec<u8>,
1858 pub ingress_expiry_datetime: Option<u64>,
1860}
1861
1862impl<'agent> UpdateBuilder<'agent> {
1863 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1865 Self {
1866 agent,
1867 effective_canister_id: canister_id,
1868 canister_id,
1869 method_name,
1870 arg: vec![],
1871 ingress_expiry_datetime: None,
1872 }
1873 }
1874
1875 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1877 self.effective_canister_id = canister_id;
1878 self
1879 }
1880
1881 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1883 self.arg = arg.into();
1884 self
1885 }
1886
1887 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1889 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1890 self
1891 }
1892
1893 pub fn expire_after(mut self, duration: Duration) -> Self {
1895 self.ingress_expiry_datetime = Some(
1896 OffsetDateTime::now_utc()
1897 .saturating_add(duration.try_into().expect("negative duration"))
1898 .unix_timestamp_nanos() as u64,
1899 );
1900 self
1901 }
1902
1903 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1906 self.call().and_wait().await.map(|x| x.0)
1907 }
1908
1909 pub fn call(self) -> UpdateCall<'agent> {
1912 let method_name = self.method_name.clone();
1913 let response_future = async move {
1914 self.agent
1915 .update_raw(
1916 self.canister_id,
1917 self.effective_canister_id,
1918 self.method_name,
1919 self.arg,
1920 self.ingress_expiry_datetime,
1921 )
1922 .await
1923 };
1924 UpdateCall {
1925 agent: self.agent,
1926 response_future: Box::pin(response_future),
1927 effective_canister_id: self.effective_canister_id,
1928 canister_id: self.canister_id,
1929 method_name,
1930 }
1931 }
1932
1933 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
1936 let identity = self.agent.identity.clone();
1937 let effective_canister_id = self.effective_canister_id;
1938 let content = self.into_envelope()?;
1939 let signed_update = sign_envelope(&content, identity)?;
1940 let request_id = to_request_id(&content)?;
1941 let EnvelopeContent::Call {
1942 nonce,
1943 ingress_expiry,
1944 sender,
1945 canister_id,
1946 method_name,
1947 arg,
1948 } = content
1949 else {
1950 unreachable!()
1951 };
1952 Ok(SignedUpdate {
1953 nonce,
1954 ingress_expiry,
1955 sender,
1956 canister_id,
1957 method_name,
1958 arg,
1959 effective_canister_id,
1960 signed_update,
1961 request_id,
1962 })
1963 }
1964
1965 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1967 let nonce = self.agent.nonce_factory.generate();
1968 self.agent.update_content(
1969 self.canister_id,
1970 self.method_name,
1971 self.arg,
1972 self.ingress_expiry_datetime,
1973 nonce,
1974 )
1975 }
1976}
1977
1978impl<'agent> IntoFuture for UpdateBuilder<'agent> {
1979 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1980 type Output = Result<Vec<u8>, AgentError>;
1981 fn into_future(self) -> Self::IntoFuture {
1982 Box::pin(self.call_and_wait())
1983 }
1984}
1985
1986#[cfg_attr(target_family = "wasm", async_trait(?Send))]
1988#[cfg_attr(not(target_family = "wasm"), async_trait)]
1989pub trait HttpService: Send + Sync + Debug {
1990 async fn call<'a>(
1992 &'a self,
1993 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
1994 max_retries: usize,
1995 ) -> Result<Response, AgentError>;
1996}
1997#[cfg(not(target_family = "wasm"))]
1998#[async_trait]
1999impl<T> HttpService for T
2000where
2001 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2002 for<'a> <&'a Self as Service<Request>>::Future: Send,
2003 T: Send + Sync + Debug + ?Sized,
2004{
2005 #[allow(clippy::needless_arbitrary_self_type)]
2006 async fn call<'a>(
2007 mut self: &'a Self,
2008 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2009 max_retries: usize,
2010 ) -> Result<Response, AgentError> {
2011 let mut retry_count = 0;
2012 loop {
2013 match Service::call(&mut self, req()?).await {
2014 Err(err) => {
2015 if err.is_connect() {
2017 if retry_count >= max_retries {
2018 return Err(AgentError::TransportError(err));
2019 }
2020 retry_count += 1;
2021 }
2022 }
2023 Ok(resp) => return Ok(resp),
2024 }
2025 }
2026 }
2027}
2028
2029#[cfg(target_family = "wasm")]
2030#[async_trait(?Send)]
2031impl<T> HttpService for T
2032where
2033 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2034 T: Send + Sync + Debug + ?Sized,
2035{
2036 #[allow(clippy::needless_arbitrary_self_type)]
2037 async fn call<'a>(
2038 mut self: &'a Self,
2039 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2040 _: usize,
2041 ) -> Result<Response, AgentError> {
2042 Ok(Service::call(&mut self, req()?).await?)
2043 }
2044}
2045
2046#[derive(Debug)]
2047struct Retry429Logic {
2048 client: Client,
2049}
2050
2051#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2052#[cfg_attr(not(target_family = "wasm"), async_trait)]
2053impl HttpService for Retry429Logic {
2054 async fn call<'a>(
2055 &'a self,
2056 req: &'a (dyn Fn() -> Result<Request, AgentError> + Send + Sync),
2057 _max_tcp_retries: usize,
2058 ) -> Result<Response, AgentError> {
2059 let mut retries = 0;
2060 loop {
2061 #[cfg(not(target_family = "wasm"))]
2062 let resp = self.client.call(req, _max_tcp_retries).await?;
2063 #[cfg(target_family = "wasm")]
2065 let resp = self.client.execute(req()?).await?;
2066 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2067 if retries == 6 {
2068 break Ok(resp);
2069 } else {
2070 retries += 1;
2071 crate::util::sleep(Duration::from_millis(250)).await;
2072 continue;
2073 }
2074 } else {
2075 break Ok(resp);
2076 }
2077 }
2078 }
2079}
2080
2081#[cfg(all(test, not(target_family = "wasm")))]
2082mod offline_tests {
2083 use super::*;
2084 use tokio::net::TcpListener;
2085 #[test]
2088 fn rounded_expiry() {
2089 let agent = Agent::builder()
2090 .with_url("http://not-a-real-url")
2091 .build()
2092 .unwrap();
2093 let mut prev_expiry = None;
2094 let mut num_timestamps = 0;
2095 for _ in 0..6 {
2096 let update = agent
2097 .update(&Principal::management_canister(), "not_a_method")
2098 .sign()
2099 .unwrap();
2100 if prev_expiry < Some(update.ingress_expiry) {
2101 prev_expiry = Some(update.ingress_expiry);
2102 num_timestamps += 1;
2103 }
2104 }
2105 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2107 }
2108
2109 #[tokio::test]
2110 async fn client_ratelimit() {
2111 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2112 let count = Arc::new(Mutex::new(0));
2113 let port = mock_server.local_addr().unwrap().port();
2114 tokio::spawn({
2115 let count = count.clone();
2116 async move {
2117 loop {
2118 let (mut conn, _) = mock_server.accept().await.unwrap();
2119 *count.lock().unwrap() += 1;
2120 tokio::spawn(
2121 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2123 );
2124 }
2125 }
2126 });
2127 let agent = Agent::builder()
2128 .with_http_client(Client::builder().http1_only().build().unwrap())
2129 .with_url(format!("http://127.0.0.1:{port}"))
2130 .with_max_concurrent_requests(2)
2131 .build()
2132 .unwrap();
2133 for _ in 0..3 {
2134 let agent = agent.clone();
2135 tokio::spawn(async move {
2136 agent
2137 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2138 .call()
2139 .await
2140 });
2141 }
2142 crate::util::sleep(Duration::from_millis(250)).await;
2143 assert_eq!(*count.lock().unwrap(), 2);
2144 }
2145}