1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28 RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34 dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35 RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45 agent::response_authentication::{
46 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47 lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48 lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 convert::TryFrom,
68 fmt::{self, Debug},
69 future::{Future, IntoFuture},
70 pin::Pin,
71 str::FromStr,
72 sync::{Arc, Mutex, RwLock},
73 task::{Context, Poll},
74 time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
79const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";
80
81const IC_ROOT_KEY: &[u8; 133] = b"\x30\x81\x82\x30\x1d\x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\x03\x61\x00\x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89#[cfg_attr(unix, doc = " ```rust")] #[cfg_attr(not(unix), doc = " ```ignore")]
93#[derive(Clone)]
154pub struct Agent {
155 nonce_factory: Arc<dyn NonceGenerator>,
156 identity: Arc<dyn Identity>,
157 ingress_expiry: Duration,
158 root_key: Arc<RwLock<Vec<u8>>>,
159 client: Arc<dyn HttpService>,
160 route_provider: Arc<dyn RouteProvider>,
161 subnet_key_cache: Arc<Mutex<SubnetCache>>,
162 concurrent_requests_semaphore: Arc<Semaphore>,
163 verify_query_signatures: bool,
164 max_response_body_size: Option<usize>,
165 max_polling_time: Duration,
166 #[allow(dead_code)]
167 max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172 f.debug_struct("Agent")
173 .field("ingress_expiry", &self.ingress_expiry)
174 .finish_non_exhaustive()
175 }
176}
177
178impl Agent {
179 pub fn builder() -> builder::AgentBuilder {
182 Default::default()
183 }
184
185 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187 let client = config.http_service.unwrap_or_else(|| {
188 Arc::new(Retry429Logic {
189 client: config.client.unwrap_or_else(|| {
190 #[cfg(not(target_family = "wasm"))]
191 {
192 Client::builder()
193 .use_rustls_tls()
194 .timeout(Duration::from_secs(360))
195 .build()
196 .expect("Could not create HTTP client.")
197 }
198 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199 {
200 Client::new()
201 }
202 }),
203 })
204 });
205 Ok(Agent {
206 nonce_factory: config.nonce_factory,
207 identity: config.identity,
208 ingress_expiry: config.ingress_expiry,
209 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210 client: client.clone(),
211 route_provider: if let Some(route_provider) = config.route_provider {
212 route_provider
213 } else if let Some(url) = config.url {
214 if config.background_dynamic_routing {
215 assert!(
216 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218 );
219 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220 UrlUntilReady::new(url, async move {
221 let provider =
222 DynamicRouteProviderBuilder::new(seeds, client, None).build();
223 provider.start().await;
224 provider
225 }) as Arc<dyn RouteProvider>
226 } else {
227 Arc::new(url)
228 }
229 } else {
230 panic!("either route_provider or url must be specified");
231 },
232 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233 verify_query_signatures: config.verify_query_signatures,
234 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235 max_response_body_size: config.max_response_body_size,
236 max_tcp_error_retries: config.max_tcp_error_retries,
237 max_polling_time: config.max_polling_time,
238 })
239 }
240
241 pub fn set_identity<I>(&mut self, identity: I)
247 where
248 I: 'static + Identity,
249 {
250 self.identity = Arc::new(identity);
251 }
252 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
258 self.identity = identity;
259 }
260
261 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271 return Ok(());
273 }
274 let status = self.status().await?;
275 let Some(root_key) = status.root_key else {
276 return Err(AgentError::NoRootKeyInStatus(status));
277 };
278 self.set_root_key(root_key);
279 Ok(())
280 }
281
282 pub fn set_root_key(&self, root_key: Vec<u8>) {
287 *self.root_key.write().unwrap() = root_key;
288 }
289
290 pub fn read_root_key(&self) -> Vec<u8> {
292 self.root_key.read().unwrap().clone()
293 }
294
295 fn get_expiry_date(&self) -> u64 {
296 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298 if self.ingress_expiry.as_secs() > 90 {
299 rounded = rounded.replace_second(0).unwrap();
300 }
301 rounded.unix_timestamp_nanos().try_into().unwrap()
302 }
303
304 pub fn get_principal(&self) -> Result<Principal, String> {
306 self.identity.sender()
307 }
308
309 async fn query_endpoint<A>(
310 &self,
311 effective_canister_id: Principal,
312 serialized_bytes: Vec<u8>,
313 ) -> Result<A, AgentError>
314 where
315 A: serde::de::DeserializeOwned,
316 {
317 let _permit = self.concurrent_requests_semaphore.acquire().await;
318 let bytes = self
319 .execute(
320 Method::POST,
321 &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
322 Some(serialized_bytes),
323 )
324 .await?
325 .1;
326 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327 }
328
329 async fn read_state_endpoint<A>(
330 &self,
331 effective_canister_id: Principal,
332 serialized_bytes: Vec<u8>,
333 ) -> Result<A, AgentError>
334 where
335 A: serde::de::DeserializeOwned,
336 {
337 let _permit = self.concurrent_requests_semaphore.acquire().await;
338 let endpoint = format!(
339 "api/v3/canister/{}/read_state",
340 effective_canister_id.to_text()
341 );
342 let bytes = self
343 .execute(Method::POST, &endpoint, Some(serialized_bytes))
344 .await?
345 .1;
346 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347 }
348
349 async fn read_subnet_state_endpoint<A>(
350 &self,
351 subnet_id: Principal,
352 serialized_bytes: Vec<u8>,
353 ) -> Result<A, AgentError>
354 where
355 A: serde::de::DeserializeOwned,
356 {
357 let _permit = self.concurrent_requests_semaphore.acquire().await;
358 let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
359 let bytes = self
360 .execute(Method::POST, &endpoint, Some(serialized_bytes))
361 .await?
362 .1;
363 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364 }
365
366 async fn call_endpoint(
367 &self,
368 effective_canister_id: Principal,
369 serialized_bytes: Vec<u8>,
370 ) -> Result<TransportCallResponse, AgentError> {
371 let _permit = self.concurrent_requests_semaphore.acquire().await;
372 let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
373 let (status_code, response_body) = self
374 .execute(Method::POST, &endpoint, Some(serialized_bytes))
375 .await?;
376
377 if status_code == StatusCode::ACCEPTED {
378 return Ok(TransportCallResponse::Accepted);
379 }
380
381 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382 }
383
384 #[allow(clippy::too_many_arguments)]
387 async fn query_raw(
388 &self,
389 canister_id: Principal,
390 effective_canister_id: Principal,
391 method_name: String,
392 arg: Vec<u8>,
393 ingress_expiry_datetime: Option<u64>,
394 use_nonce: bool,
395 explicit_verify_query_signatures: Option<bool>,
396 ) -> Result<Vec<u8>, AgentError> {
397 let operation = Operation::Call {
398 canister: canister_id,
399 method: method_name.clone(),
400 };
401 let content = self.query_content(
402 canister_id,
403 method_name,
404 arg,
405 ingress_expiry_datetime,
406 use_nonce,
407 )?;
408 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409 self.query_inner(
410 effective_canister_id,
411 serialized_bytes,
412 content.to_request_id(),
413 explicit_verify_query_signatures,
414 operation,
415 )
416 .await
417 }
418
419 pub async fn query_signed(
423 &self,
424 effective_canister_id: Principal,
425 signed_query: Vec<u8>,
426 ) -> Result<Vec<u8>, AgentError> {
427 let envelope: Envelope =
428 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429 let EnvelopeContent::Query {
430 canister_id,
431 method_name,
432 ..
433 } = &*envelope.content
434 else {
435 return Err(AgentError::CallDataMismatch {
436 field: "request_type".to_string(),
437 value_arg: "query".to_string(),
438 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439 "update"
440 } else {
441 "read_state"
442 }
443 .to_string(),
444 });
445 };
446 let operation = Operation::Call {
447 canister: *canister_id,
448 method: method_name.clone(),
449 };
450 self.query_inner(
451 effective_canister_id,
452 signed_query,
453 envelope.content.to_request_id(),
454 None,
455 operation,
456 )
457 .await
458 }
459
460 async fn query_inner(
464 &self,
465 effective_canister_id: Principal,
466 signed_query: Vec<u8>,
467 request_id: RequestId,
468 explicit_verify_query_signatures: Option<bool>,
469 operation: Operation,
470 ) -> Result<Vec<u8>, AgentError> {
471 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472 let (response, mut subnet) = futures_util::try_join!(
473 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474 self.get_subnet_by_canister(&effective_canister_id)
475 )?;
476 if response.signatures().is_empty() {
477 return Err(AgentError::MissingSignature);
478 } else if response.signatures().len() > subnet.node_keys.len() {
479 return Err(AgentError::TooManySignatures {
480 had: response.signatures().len(),
481 needed: subnet.node_keys.len(),
482 });
483 }
484 for signature in response.signatures() {
485 if OffsetDateTime::now_utc()
486 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487 > self.ingress_expiry
488 {
489 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490 }
491 let signable = response.signable(request_id, signature.timestamp);
492 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493 node_key
494 } else {
495 subnet = self
496 .fetch_subnet_by_canister(&effective_canister_id)
497 .await?;
498 subnet
499 .node_keys
500 .get(&signature.identity)
501 .ok_or(AgentError::CertificateNotAuthorized())?
502 };
503 if node_key.len() != 44 {
504 return Err(AgentError::DerKeyLengthMismatch {
505 expected: 44,
506 actual: node_key.len(),
507 });
508 }
509 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510 if node_key[..12] != DER_PREFIX {
511 return Err(AgentError::DerPrefixMismatch {
512 expected: DER_PREFIX.to_vec(),
513 actual: node_key[..12].to_vec(),
514 });
515 }
516 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517 .map_err(|_| AgentError::MalformedPublicKey)?;
518
519 match pubkey.verify_signature(&signable, &signature.signature[..]) {
520 Ok(()) => (),
521 Err(SignatureError::InvalidSignature) => {
522 return Err(AgentError::QuerySignatureVerificationFailed)
523 }
524 Err(SignatureError::InvalidLength) => {
525 return Err(AgentError::MalformedSignature)
526 }
527 _ => unreachable!(),
528 }
529 }
530 response
531 } else {
532 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533 .await?
534 };
535
536 match response {
537 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539 reject,
540 operation: Some(operation),
541 }),
542 }
543 }
544
545 fn query_content(
546 &self,
547 canister_id: Principal,
548 method_name: String,
549 arg: Vec<u8>,
550 ingress_expiry_datetime: Option<u64>,
551 use_nonce: bool,
552 ) -> Result<EnvelopeContent, AgentError> {
553 Ok(EnvelopeContent::Query {
554 sender: self.identity.sender().map_err(AgentError::SigningError)?,
555 canister_id,
556 method_name,
557 arg,
558 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560 })
561 }
562
563 async fn update_raw(
565 &self,
566 canister_id: Principal,
567 effective_canister_id: Principal,
568 method_name: String,
569 arg: Vec<u8>,
570 ingress_expiry_datetime: Option<u64>,
571 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
572 let nonce = self.nonce_factory.generate();
573 let content = self.update_content(
574 canister_id,
575 method_name.clone(),
576 arg,
577 ingress_expiry_datetime,
578 nonce,
579 )?;
580 let operation = Some(Operation::Call {
581 canister: canister_id,
582 method: method_name,
583 });
584 let request_id = to_request_id(&content)?;
585 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
586
587 let response_body = self
588 .call_endpoint(effective_canister_id, serialized_bytes)
589 .await?;
590
591 match response_body {
592 TransportCallResponse::Replied { certificate } => {
593 let certificate =
594 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
595
596 self.verify(&certificate, effective_canister_id)?;
597 let status = lookup_request_status(&certificate, &request_id)?;
598
599 match status {
600 RequestStatusResponse::Replied(reply) => {
601 Ok(CallResponse::Response((reply.arg, certificate)))
602 }
603 RequestStatusResponse::Rejected(reject_response) => {
604 Err(AgentError::CertifiedReject {
605 reject: reject_response,
606 operation,
607 })?
608 }
609 _ => Ok(CallResponse::Poll(request_id)),
610 }
611 }
612 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
613 TransportCallResponse::NonReplicatedRejection(reject_response) => {
614 Err(AgentError::UncertifiedReject {
615 reject: reject_response,
616 operation,
617 })
618 }
619 }
620 }
621
622 pub async fn update_signed(
626 &self,
627 effective_canister_id: Principal,
628 signed_update: Vec<u8>,
629 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
630 let envelope: Envelope =
631 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
632 let EnvelopeContent::Call {
633 canister_id,
634 method_name,
635 ..
636 } = &*envelope.content
637 else {
638 return Err(AgentError::CallDataMismatch {
639 field: "request_type".to_string(),
640 value_arg: "update".to_string(),
641 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
642 "query"
643 } else {
644 "read_state"
645 }
646 .to_string(),
647 });
648 };
649 let operation = Some(Operation::Call {
650 canister: *canister_id,
651 method: method_name.clone(),
652 });
653 let request_id = to_request_id(&envelope.content)?;
654
655 let response_body = self
656 .call_endpoint(effective_canister_id, signed_update)
657 .await?;
658
659 match response_body {
660 TransportCallResponse::Replied { certificate } => {
661 let certificate =
662 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
663
664 self.verify(&certificate, effective_canister_id)?;
665 let status = lookup_request_status(&certificate, &request_id)?;
666
667 match status {
668 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
669 RequestStatusResponse::Rejected(reject_response) => {
670 Err(AgentError::CertifiedReject {
671 reject: reject_response,
672 operation,
673 })?
674 }
675 _ => Ok(CallResponse::Poll(request_id)),
676 }
677 }
678 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
679 TransportCallResponse::NonReplicatedRejection(reject_response) => {
680 Err(AgentError::UncertifiedReject {
681 reject: reject_response,
682 operation,
683 })
684 }
685 }
686 }
687
688 fn update_content(
689 &self,
690 canister_id: Principal,
691 method_name: String,
692 arg: Vec<u8>,
693 ingress_expiry_datetime: Option<u64>,
694 nonce: Option<Vec<u8>>,
695 ) -> Result<EnvelopeContent, AgentError> {
696 Ok(EnvelopeContent::Call {
697 canister_id,
698 method_name,
699 arg,
700 nonce,
701 sender: self.identity.sender().map_err(AgentError::SigningError)?,
702 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
703 })
704 }
705
706 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
707 ExponentialBackoffBuilder::new()
708 .with_initial_interval(Duration::from_millis(500))
709 .with_max_interval(Duration::from_secs(1))
710 .with_multiplier(1.4)
711 .with_max_elapsed_time(Some(self.max_polling_time))
712 .build()
713 }
714
715 pub async fn wait_signed(
717 &self,
718 request_id: &RequestId,
719 effective_canister_id: Principal,
720 signed_request_status: Vec<u8>,
721 ) -> Result<(Vec<u8>, Certificate), AgentError> {
722 let mut retry_policy = self.get_retry_policy();
723
724 let mut request_accepted = false;
725 loop {
726 let (resp, cert) = self
727 .request_status_signed(
728 request_id,
729 effective_canister_id,
730 signed_request_status.clone(),
731 )
732 .await?;
733 match resp {
734 RequestStatusResponse::Unknown => {
735 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
737 return Err(AgentError::TimeoutWaitingForResponse());
738 }
739 }
740
741 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
742 if !request_accepted {
743 retry_policy.reset();
744 request_accepted = true;
745 }
746 }
747
748 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
749 return Ok((arg, cert))
750 }
751
752 RequestStatusResponse::Rejected(response) => {
753 return Err(AgentError::CertifiedReject {
754 reject: response,
755 operation: None,
756 })
757 }
758
759 RequestStatusResponse::Done => {
760 return Err(AgentError::RequestStatusDoneNoReply(String::from(
761 *request_id,
762 )))
763 }
764 };
765
766 match retry_policy.next_backoff() {
767 Some(duration) => crate::util::sleep(duration).await,
768
769 None => return Err(AgentError::TimeoutWaitingForResponse()),
770 }
771 }
772 }
773
774 pub async fn wait(
776 &self,
777 request_id: &RequestId,
778 effective_canister_id: Principal,
779 ) -> Result<(Vec<u8>, Certificate), AgentError> {
780 self.wait_inner(request_id, effective_canister_id, None)
781 .await
782 }
783
784 async fn wait_inner(
785 &self,
786 request_id: &RequestId,
787 effective_canister_id: Principal,
788 operation: Option<Operation>,
789 ) -> Result<(Vec<u8>, Certificate), AgentError> {
790 let mut retry_policy = self.get_retry_policy();
791
792 let mut request_accepted = false;
793 loop {
794 let (resp, cert) = self
795 .request_status_raw(request_id, effective_canister_id)
796 .await?;
797 match resp {
798 RequestStatusResponse::Unknown => {
799 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
801 return Err(AgentError::TimeoutWaitingForResponse());
802 }
803 }
804
805 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
806 if !request_accepted {
807 retry_policy.reset();
815 request_accepted = true;
816 }
817 }
818
819 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
820 return Ok((arg, cert))
821 }
822
823 RequestStatusResponse::Rejected(response) => {
824 return Err(AgentError::CertifiedReject {
825 reject: response,
826 operation,
827 })
828 }
829
830 RequestStatusResponse::Done => {
831 return Err(AgentError::RequestStatusDoneNoReply(String::from(
832 *request_id,
833 )))
834 }
835 };
836
837 match retry_policy.next_backoff() {
838 Some(duration) => crate::util::sleep(duration).await,
839
840 None => return Err(AgentError::TimeoutWaitingForResponse()),
841 }
842 }
843 }
844
845 pub async fn read_state_raw(
848 &self,
849 paths: Vec<Vec<Label>>,
850 effective_canister_id: Principal,
851 ) -> Result<Certificate, AgentError> {
852 let content = self.read_state_content(paths)?;
853 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
854
855 let read_state_response: ReadStateResponse = self
856 .read_state_endpoint(effective_canister_id, serialized_bytes)
857 .await?;
858 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
859 .map_err(AgentError::InvalidCborData)?;
860 self.verify(&cert, effective_canister_id)?;
861 Ok(cert)
862 }
863
864 pub async fn read_subnet_state_raw(
867 &self,
868 paths: Vec<Vec<Label>>,
869 subnet_id: Principal,
870 ) -> Result<Certificate, AgentError> {
871 let content = self.read_state_content(paths)?;
872 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
873
874 let read_state_response: ReadStateResponse = self
875 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
876 .await?;
877 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
878 .map_err(AgentError::InvalidCborData)?;
879 self.verify_for_subnet(&cert, subnet_id)?;
880 Ok(cert)
881 }
882
883 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
884 Ok(EnvelopeContent::ReadState {
885 sender: self.identity.sender().map_err(AgentError::SigningError)?,
886 paths,
887 ingress_expiry: self.get_expiry_date(),
888 })
889 }
890
891 pub fn verify(
894 &self,
895 cert: &Certificate,
896 effective_canister_id: Principal,
897 ) -> Result<(), AgentError> {
898 self.verify_cert(cert, effective_canister_id)?;
899 self.verify_cert_timestamp(cert)?;
900 Ok(())
901 }
902
903 fn verify_cert(
904 &self,
905 cert: &Certificate,
906 effective_canister_id: Principal,
907 ) -> Result<(), AgentError> {
908 let sig = &cert.signature;
909
910 let root_hash = cert.tree.digest();
911 let mut msg = vec![];
912 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
913 msg.extend_from_slice(&root_hash);
914
915 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
916 let key = extract_der(der_key)?;
917
918 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
919 .map_err(|_| AgentError::CertificateVerificationFailed())?;
920 Ok(())
921 }
922
923 pub fn verify_for_subnet(
926 &self,
927 cert: &Certificate,
928 subnet_id: Principal,
929 ) -> Result<(), AgentError> {
930 self.verify_cert_for_subnet(cert, subnet_id)?;
931 self.verify_cert_timestamp(cert)?;
932 Ok(())
933 }
934
935 fn verify_cert_for_subnet(
936 &self,
937 cert: &Certificate,
938 subnet_id: Principal,
939 ) -> Result<(), AgentError> {
940 let sig = &cert.signature;
941
942 let root_hash = cert.tree.digest();
943 let mut msg = vec![];
944 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
945 msg.extend_from_slice(&root_hash);
946
947 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
948 let key = extract_der(der_key)?;
949
950 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
951 .map_err(|_| AgentError::CertificateVerificationFailed())?;
952 Ok(())
953 }
954
955 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
956 let time = lookup_time(cert)?;
959 if (OffsetDateTime::now_utc()
960 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
961 > self.ingress_expiry
962 {
963 Err(AgentError::CertificateOutdated(self.ingress_expiry))
964 } else {
965 Ok(())
966 }
967 }
968
969 fn check_delegation(
970 &self,
971 delegation: &Option<Delegation>,
972 effective_canister_id: Principal,
973 ) -> Result<Vec<u8>, AgentError> {
974 match delegation {
975 None => Ok(self.read_root_key()),
976 Some(delegation) => {
977 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
978 .map_err(AgentError::InvalidCborData)?;
979 if cert.delegation.is_some() {
980 return Err(AgentError::CertificateHasTooManyDelegations);
981 }
982 self.verify_cert(&cert, effective_canister_id)?;
983 let canister_range_shards_lookup =
984 ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
985 let canister_range_shards = lookup_tree(&cert.tree, canister_range_shards_lookup)?;
986 let mut shard_paths = canister_range_shards
987 .list_paths() .into_iter()
989 .map(|mut x| {
990 x.pop() .ok_or_else(AgentError::CertificateVerificationFailed)
992 })
993 .collect::<Result<Vec<_>, _>>()?;
994 if shard_paths.is_empty() {
995 return Err(AgentError::CertificateNotAuthorized());
996 }
997 shard_paths.sort_unstable();
998 let shard_division = shard_paths
999 .partition_point(|shard| shard.as_bytes() <= effective_canister_id.as_slice());
1000 if shard_division == 0 {
1001 return Err(AgentError::CertificateNotAuthorized());
1003 }
1004 let max_potential_shard = &shard_paths[shard_division - 1];
1005 let canister_range_lookup = [max_potential_shard.as_bytes()];
1006 let canister_range = lookup_value(&canister_range_shards, canister_range_lookup)?;
1007 let ranges: Vec<(Principal, Principal)> =
1008 serde_cbor::from_slice(canister_range).map_err(AgentError::InvalidCborData)?;
1009 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1010 return Err(AgentError::CertificateNotAuthorized());
1012 }
1013
1014 let public_key_path = [
1015 "subnet".as_bytes(),
1016 delegation.subnet_id.as_ref(),
1017 "public_key".as_bytes(),
1018 ];
1019 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1020 }
1021 }
1022 }
1023
1024 fn check_delegation_for_subnet(
1025 &self,
1026 delegation: &Option<Delegation>,
1027 subnet_id: Principal,
1028 ) -> Result<Vec<u8>, AgentError> {
1029 match delegation {
1030 None => Ok(self.read_root_key()),
1031 Some(delegation) => {
1032 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1033 .map_err(AgentError::InvalidCborData)?;
1034 if cert.delegation.is_some() {
1035 return Err(AgentError::CertificateHasTooManyDelegations);
1036 }
1037 self.verify_cert_for_subnet(&cert, subnet_id)?;
1038 let public_key_path = [
1039 "subnet".as_bytes(),
1040 subnet_id.as_ref(),
1041 "public_key".as_bytes(),
1042 ];
1043 let pk = lookup_value(&cert.tree, public_key_path)
1044 .map_err(|_| AgentError::CertificateNotAuthorized())?
1045 .to_vec();
1046 Ok(pk)
1047 }
1048 }
1049 }
1050
1051 pub async fn read_state_canister_info(
1054 &self,
1055 canister_id: Principal,
1056 path: &str,
1057 ) -> Result<Vec<u8>, AgentError> {
1058 let paths: Vec<Vec<Label>> = vec![vec![
1059 "canister".into(),
1060 Label::from_bytes(canister_id.as_slice()),
1061 path.into(),
1062 ]];
1063
1064 let cert = self.read_state_raw(paths, canister_id).await?;
1065
1066 lookup_canister_info(cert, canister_id, path)
1067 }
1068
1069 pub async fn read_state_canister_controllers(
1071 &self,
1072 canister_id: Principal,
1073 ) -> Result<Vec<Principal>, AgentError> {
1074 let blob = self
1075 .read_state_canister_info(canister_id, "controllers")
1076 .await?;
1077 let controllers: Vec<Principal> =
1078 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1079 Ok(controllers)
1080 }
1081
1082 pub async fn read_state_canister_module_hash(
1084 &self,
1085 canister_id: Principal,
1086 ) -> Result<Vec<u8>, AgentError> {
1087 self.read_state_canister_info(canister_id, "module_hash")
1088 .await
1089 }
1090
1091 pub async fn read_state_canister_metadata(
1093 &self,
1094 canister_id: Principal,
1095 path: &str,
1096 ) -> Result<Vec<u8>, AgentError> {
1097 let paths: Vec<Vec<Label>> = vec![vec![
1098 "canister".into(),
1099 Label::from_bytes(canister_id.as_slice()),
1100 "metadata".into(),
1101 path.into(),
1102 ]];
1103
1104 let cert = self.read_state_raw(paths, canister_id).await?;
1105
1106 lookup_canister_metadata(cert, canister_id, path)
1107 }
1108
1109 pub async fn read_state_subnet_metrics(
1111 &self,
1112 subnet_id: Principal,
1113 ) -> Result<SubnetMetrics, AgentError> {
1114 let paths = vec![vec![
1115 "subnet".into(),
1116 Label::from_bytes(subnet_id.as_slice()),
1117 "metrics".into(),
1118 ]];
1119 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1120 lookup_subnet_metrics(cert, subnet_id)
1121 }
1122
1123 pub async fn read_state_subnet_canister_ranges(
1125 &self,
1126 subnet_id: Principal,
1127 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1128 let paths = vec![vec![
1129 "subnet".into(),
1130 Label::from_bytes(subnet_id.as_slice()),
1131 "canister_ranges".into(),
1132 ]];
1133 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1134 lookup_subnet_canister_ranges(&cert, subnet_id)
1135 }
1136
1137 pub async fn request_status_raw(
1139 &self,
1140 request_id: &RequestId,
1141 effective_canister_id: Principal,
1142 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1143 let paths: Vec<Vec<Label>> =
1144 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1145
1146 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1147
1148 Ok((lookup_request_status(&cert, request_id)?, cert))
1149 }
1150
1151 pub async fn request_status_signed(
1155 &self,
1156 request_id: &RequestId,
1157 effective_canister_id: Principal,
1158 signed_request_status: Vec<u8>,
1159 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1160 let _envelope: Envelope =
1161 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1162 let read_state_response: ReadStateResponse = self
1163 .read_state_endpoint(effective_canister_id, signed_request_status)
1164 .await?;
1165
1166 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1167 .map_err(AgentError::InvalidCborData)?;
1168 self.verify(&cert, effective_canister_id)?;
1169 Ok((lookup_request_status(&cert, request_id)?, cert))
1170 }
1171
1172 pub fn update<S: Into<String>>(
1175 &self,
1176 canister_id: &Principal,
1177 method_name: S,
1178 ) -> UpdateBuilder<'_> {
1179 UpdateBuilder::new(self, *canister_id, method_name.into())
1180 }
1181
1182 pub async fn status(&self) -> Result<Status, AgentError> {
1184 let endpoint = "api/v2/status";
1185 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1186
1187 let cbor: serde_cbor::Value =
1188 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1189
1190 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1191 }
1192
1193 pub fn query<S: Into<String>>(
1196 &self,
1197 canister_id: &Principal,
1198 method_name: S,
1199 ) -> QueryBuilder<'_> {
1200 QueryBuilder::new(self, *canister_id, method_name.into())
1201 }
1202
1203 pub fn sign_request_status(
1206 &self,
1207 effective_canister_id: Principal,
1208 request_id: RequestId,
1209 ) -> Result<SignedRequestStatus, AgentError> {
1210 let paths: Vec<Vec<Label>> =
1211 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1212 let read_state_content = self.read_state_content(paths)?;
1213 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1214 let ingress_expiry = read_state_content.ingress_expiry();
1215 let sender = *read_state_content.sender();
1216 Ok(SignedRequestStatus {
1217 ingress_expiry,
1218 sender,
1219 effective_canister_id,
1220 request_id,
1221 signed_request_status,
1222 })
1223 }
1224
1225 pub async fn get_subnet_by_canister(
1228 &self,
1229 canister: &Principal,
1230 ) -> Result<Arc<Subnet>, AgentError> {
1231 let subnet = self
1232 .subnet_key_cache
1233 .lock()
1234 .unwrap()
1235 .get_subnet_by_canister(canister);
1236 if let Some(subnet) = subnet {
1237 Ok(subnet)
1238 } else {
1239 self.fetch_subnet_by_canister(canister).await
1240 }
1241 }
1242
1243 pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1246 let subnet = self
1247 .subnet_key_cache
1248 .lock()
1249 .unwrap()
1250 .get_subnet_by_id(subnet_id);
1251 if let Some(subnet) = subnet {
1252 Ok(subnet)
1253 } else {
1254 self.fetch_subnet_by_id(subnet_id).await
1255 }
1256 }
1257
1258 pub async fn fetch_api_boundary_nodes_by_canister_id(
1260 &self,
1261 canister_id: Principal,
1262 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1263 let paths = vec![vec!["api_boundary_nodes".into()]];
1264 let certificate = self.read_state_raw(paths, canister_id).await?;
1265 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1266 Ok(api_boundary_nodes)
1267 }
1268
1269 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1271 &self,
1272 subnet_id: Principal,
1273 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1274 let paths = vec![vec!["api_boundary_nodes".into()]];
1275 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1276 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1277 Ok(api_boundary_nodes)
1278 }
1279
1280 pub async fn fetch_subnet_by_canister(
1285 &self,
1286 canister: &Principal,
1287 ) -> Result<Arc<Subnet>, AgentError> {
1288 let canister_cert = self
1289 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1290 .await?;
1291 let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1292 Principal::from_slice(&delegation.subnet_id)
1293 } else {
1294 Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1296 };
1297 let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1298 let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1299 let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1301 lookup_canister_ranges(&subnet_id, &delegation_cert)?
1302 } else {
1303 lookup_canister_ranges(&subnet_id, &canister_cert)?
1304 };
1305 subnet.canister_ranges = canister_ranges;
1306 if !subnet.canister_ranges.contains(canister) {
1307 return Err(AgentError::CertificateNotAuthorized());
1308 }
1309 let subnet = Arc::new(subnet);
1310 self.subnet_key_cache
1311 .lock()
1312 .unwrap()
1313 .insert_subnet(subnet_id, subnet.clone());
1314 Ok(subnet)
1315 }
1316
1317 pub async fn fetch_subnet_by_id(
1322 &self,
1323 subnet_id: &Principal,
1324 ) -> Result<Arc<Subnet>, AgentError> {
1325 let subnet_cert = self
1326 .read_subnet_state_raw(
1327 vec![
1328 vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1329 vec!["subnet".into(), subnet_id.as_slice().into()],
1330 ],
1331 *subnet_id,
1332 )
1333 .await?;
1334 let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1335 let subnet = Arc::new(subnet);
1336 self.subnet_key_cache
1337 .lock()
1338 .unwrap()
1339 .insert_subnet(*subnet_id, subnet.clone());
1340 Ok(subnet)
1341 }
1342
1343 async fn request(
1344 &self,
1345 method: Method,
1346 endpoint: &str,
1347 body: Option<Vec<u8>>,
1348 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1349 let body = body.map(Bytes::from);
1350
1351 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1352 let url = self.route_provider.route()?.join(endpoint)?;
1353 let uri = Uri::from_str(url.as_str())
1354 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1355 let body = body.clone().unwrap_or_default();
1356 let request = http::Request::builder()
1357 .method(method.clone())
1358 .uri(uri)
1359 .header(CONTENT_TYPE, "application/cbor")
1360 .body(body)
1361 .map_err(|e| {
1362 AgentError::TransportError(TransportError::Generic(format!(
1363 "unable to create request: {e:#}"
1364 )))
1365 })?;
1366
1367 Ok(request)
1368 };
1369
1370 let response = self
1371 .client
1372 .call(
1373 &create_request_with_generated_url,
1374 self.max_tcp_error_retries,
1375 self.max_response_body_size,
1376 )
1377 .await?;
1378
1379 let (parts, body) = response.into_parts();
1380
1381 Ok((parts.status, parts.headers, body.to_vec()))
1382 }
1383
1384 async fn execute(
1385 &self,
1386 method: Method,
1387 endpoint: &str,
1388 body: Option<Vec<u8>>,
1389 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1390 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1391
1392 let status = request_result.0;
1393 let headers = request_result.1;
1394 let body = request_result.2;
1395
1396 if status.is_client_error() || status.is_server_error() {
1397 Err(AgentError::HttpError(HttpErrorPayload {
1398 status: status.into(),
1399 content_type: headers
1400 .get(CONTENT_TYPE)
1401 .and_then(|value| value.to_str().ok())
1402 .map(str::to_string),
1403 content: body,
1404 }))
1405 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1406 Err(AgentError::InvalidHttpResponse(format!(
1407 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1408 )))
1409 } else {
1410 Ok((status, body))
1411 }
1412 }
1413}
1414
1415fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1418 ranges
1419 .iter()
1420 .any(|r| principal >= &r.0 && principal <= &r.1)
1421}
1422
1423fn sign_envelope(
1424 content: &EnvelopeContent,
1425 identity: Arc<dyn Identity>,
1426) -> Result<Vec<u8>, AgentError> {
1427 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1428
1429 let envelope = Envelope {
1430 content: Cow::Borrowed(content),
1431 sender_pubkey: signature.public_key,
1432 sender_sig: signature.signature,
1433 sender_delegation: signature.delegations,
1434 };
1435
1436 let mut serialized_bytes = Vec::new();
1437 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1438 serializer.self_describe()?;
1439 envelope.serialize(&mut serializer)?;
1440
1441 Ok(serialized_bytes)
1442}
1443
1444pub fn signed_query_inspect(
1447 sender: Principal,
1448 canister_id: Principal,
1449 method_name: &str,
1450 arg: &[u8],
1451 ingress_expiry: u64,
1452 signed_query: Vec<u8>,
1453) -> Result<(), AgentError> {
1454 let envelope: Envelope =
1455 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1456 match envelope.content.as_ref() {
1457 EnvelopeContent::Query {
1458 ingress_expiry: ingress_expiry_cbor,
1459 sender: sender_cbor,
1460 canister_id: canister_id_cbor,
1461 method_name: method_name_cbor,
1462 arg: arg_cbor,
1463 nonce: _nonce,
1464 } => {
1465 if ingress_expiry != *ingress_expiry_cbor {
1466 return Err(AgentError::CallDataMismatch {
1467 field: "ingress_expiry".to_string(),
1468 value_arg: ingress_expiry.to_string(),
1469 value_cbor: ingress_expiry_cbor.to_string(),
1470 });
1471 }
1472 if sender != *sender_cbor {
1473 return Err(AgentError::CallDataMismatch {
1474 field: "sender".to_string(),
1475 value_arg: sender.to_string(),
1476 value_cbor: sender_cbor.to_string(),
1477 });
1478 }
1479 if canister_id != *canister_id_cbor {
1480 return Err(AgentError::CallDataMismatch {
1481 field: "canister_id".to_string(),
1482 value_arg: canister_id.to_string(),
1483 value_cbor: canister_id_cbor.to_string(),
1484 });
1485 }
1486 if method_name != *method_name_cbor {
1487 return Err(AgentError::CallDataMismatch {
1488 field: "method_name".to_string(),
1489 value_arg: method_name.to_string(),
1490 value_cbor: method_name_cbor.clone(),
1491 });
1492 }
1493 if arg != *arg_cbor {
1494 return Err(AgentError::CallDataMismatch {
1495 field: "arg".to_string(),
1496 value_arg: format!("{arg:?}"),
1497 value_cbor: format!("{arg_cbor:?}"),
1498 });
1499 }
1500 }
1501 EnvelopeContent::Call { .. } => {
1502 return Err(AgentError::CallDataMismatch {
1503 field: "request_type".to_string(),
1504 value_arg: "query".to_string(),
1505 value_cbor: "call".to_string(),
1506 })
1507 }
1508 EnvelopeContent::ReadState { .. } => {
1509 return Err(AgentError::CallDataMismatch {
1510 field: "request_type".to_string(),
1511 value_arg: "query".to_string(),
1512 value_cbor: "read_state".to_string(),
1513 })
1514 }
1515 }
1516 Ok(())
1517}
1518
1519pub fn signed_update_inspect(
1522 sender: Principal,
1523 canister_id: Principal,
1524 method_name: &str,
1525 arg: &[u8],
1526 ingress_expiry: u64,
1527 signed_update: Vec<u8>,
1528) -> Result<(), AgentError> {
1529 let envelope: Envelope =
1530 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1531 match envelope.content.as_ref() {
1532 EnvelopeContent::Call {
1533 nonce: _nonce,
1534 ingress_expiry: ingress_expiry_cbor,
1535 sender: sender_cbor,
1536 canister_id: canister_id_cbor,
1537 method_name: method_name_cbor,
1538 arg: arg_cbor,
1539 } => {
1540 if ingress_expiry != *ingress_expiry_cbor {
1541 return Err(AgentError::CallDataMismatch {
1542 field: "ingress_expiry".to_string(),
1543 value_arg: ingress_expiry.to_string(),
1544 value_cbor: ingress_expiry_cbor.to_string(),
1545 });
1546 }
1547 if sender != *sender_cbor {
1548 return Err(AgentError::CallDataMismatch {
1549 field: "sender".to_string(),
1550 value_arg: sender.to_string(),
1551 value_cbor: sender_cbor.to_string(),
1552 });
1553 }
1554 if canister_id != *canister_id_cbor {
1555 return Err(AgentError::CallDataMismatch {
1556 field: "canister_id".to_string(),
1557 value_arg: canister_id.to_string(),
1558 value_cbor: canister_id_cbor.to_string(),
1559 });
1560 }
1561 if method_name != *method_name_cbor {
1562 return Err(AgentError::CallDataMismatch {
1563 field: "method_name".to_string(),
1564 value_arg: method_name.to_string(),
1565 value_cbor: method_name_cbor.clone(),
1566 });
1567 }
1568 if arg != *arg_cbor {
1569 return Err(AgentError::CallDataMismatch {
1570 field: "arg".to_string(),
1571 value_arg: format!("{arg:?}"),
1572 value_cbor: format!("{arg_cbor:?}"),
1573 });
1574 }
1575 }
1576 EnvelopeContent::ReadState { .. } => {
1577 return Err(AgentError::CallDataMismatch {
1578 field: "request_type".to_string(),
1579 value_arg: "call".to_string(),
1580 value_cbor: "read_state".to_string(),
1581 })
1582 }
1583 EnvelopeContent::Query { .. } => {
1584 return Err(AgentError::CallDataMismatch {
1585 field: "request_type".to_string(),
1586 value_arg: "call".to_string(),
1587 value_cbor: "query".to_string(),
1588 })
1589 }
1590 }
1591 Ok(())
1592}
1593
1594pub fn signed_request_status_inspect(
1597 sender: Principal,
1598 request_id: &RequestId,
1599 ingress_expiry: u64,
1600 signed_request_status: Vec<u8>,
1601) -> Result<(), AgentError> {
1602 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1603 let envelope: Envelope =
1604 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1605 match envelope.content.as_ref() {
1606 EnvelopeContent::ReadState {
1607 ingress_expiry: ingress_expiry_cbor,
1608 sender: sender_cbor,
1609 paths: paths_cbor,
1610 } => {
1611 if ingress_expiry != *ingress_expiry_cbor {
1612 return Err(AgentError::CallDataMismatch {
1613 field: "ingress_expiry".to_string(),
1614 value_arg: ingress_expiry.to_string(),
1615 value_cbor: ingress_expiry_cbor.to_string(),
1616 });
1617 }
1618 if sender != *sender_cbor {
1619 return Err(AgentError::CallDataMismatch {
1620 field: "sender".to_string(),
1621 value_arg: sender.to_string(),
1622 value_cbor: sender_cbor.to_string(),
1623 });
1624 }
1625
1626 if paths != *paths_cbor {
1627 return Err(AgentError::CallDataMismatch {
1628 field: "paths".to_string(),
1629 value_arg: format!("{paths:?}"),
1630 value_cbor: format!("{paths_cbor:?}"),
1631 });
1632 }
1633 }
1634 EnvelopeContent::Query { .. } => {
1635 return Err(AgentError::CallDataMismatch {
1636 field: "request_type".to_string(),
1637 value_arg: "read_state".to_string(),
1638 value_cbor: "query".to_string(),
1639 })
1640 }
1641 EnvelopeContent::Call { .. } => {
1642 return Err(AgentError::CallDataMismatch {
1643 field: "request_type".to_string(),
1644 value_arg: "read_state".to_string(),
1645 value_cbor: "call".to_string(),
1646 })
1647 }
1648 }
1649 Ok(())
1650}
1651
1652#[derive(Clone)]
1653struct SubnetCache {
1654 subnets: TimedCache<Principal, Arc<Subnet>>,
1655 canister_index: RangeInclusiveMap<Principal, Principal>,
1656}
1657
1658impl SubnetCache {
1659 fn new() -> Self {
1660 Self {
1661 subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1662 canister_index: RangeInclusiveMap::new(),
1663 }
1664 }
1665
1666 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1667 self.canister_index
1668 .get(canister)
1669 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1670 .filter(|subnet| subnet.canister_ranges.contains(canister))
1671 }
1672
1673 fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1674 self.subnets.cache_get(subnet_id).cloned()
1675 }
1676
1677 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1678 self.subnets.cache_set(subnet_id, subnet.clone());
1679 for range in subnet.canister_ranges.iter() {
1680 self.canister_index.insert(range.clone(), subnet_id);
1681 }
1682 }
1683}
1684
1685#[derive(Debug, Clone)]
1687pub struct ApiBoundaryNode {
1688 pub domain: String,
1690 pub ipv6_address: String,
1692 pub ipv4_address: Option<String>,
1694}
1695
1696#[derive(Debug, Clone)]
1700#[non_exhaustive]
1701pub struct QueryBuilder<'agent> {
1702 agent: &'agent Agent,
1703 pub effective_canister_id: Principal,
1705 pub canister_id: Principal,
1707 pub method_name: String,
1709 pub arg: Vec<u8>,
1711 pub ingress_expiry_datetime: Option<u64>,
1713 pub use_nonce: bool,
1715}
1716
1717impl<'agent> QueryBuilder<'agent> {
1718 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1720 Self {
1721 agent,
1722 effective_canister_id: canister_id,
1723 canister_id,
1724 method_name,
1725 arg: vec![],
1726 ingress_expiry_datetime: None,
1727 use_nonce: false,
1728 }
1729 }
1730
1731 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1733 self.effective_canister_id = canister_id;
1734 self
1735 }
1736
1737 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1739 self.arg = arg.into();
1740 self
1741 }
1742
1743 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1745 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1746 self
1747 }
1748
1749 pub fn expire_after(mut self, duration: Duration) -> Self {
1751 self.ingress_expiry_datetime = Some(
1752 OffsetDateTime::now_utc()
1753 .saturating_add(duration.try_into().expect("negative duration"))
1754 .unix_timestamp_nanos() as u64,
1755 );
1756 self
1757 }
1758
1759 pub fn with_nonce_generation(mut self) -> Self {
1762 self.use_nonce = true;
1763 self
1764 }
1765
1766 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1768 self.agent
1769 .query_raw(
1770 self.canister_id,
1771 self.effective_canister_id,
1772 self.method_name,
1773 self.arg,
1774 self.ingress_expiry_datetime,
1775 self.use_nonce,
1776 None,
1777 )
1778 .await
1779 }
1780
1781 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1786 self.agent
1787 .query_raw(
1788 self.canister_id,
1789 self.effective_canister_id,
1790 self.method_name,
1791 self.arg,
1792 self.ingress_expiry_datetime,
1793 self.use_nonce,
1794 Some(true),
1795 )
1796 .await
1797 }
1798
1799 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1804 self.agent
1805 .query_raw(
1806 self.canister_id,
1807 self.effective_canister_id,
1808 self.method_name,
1809 self.arg,
1810 self.ingress_expiry_datetime,
1811 self.use_nonce,
1812 Some(false),
1813 )
1814 .await
1815 }
1816
1817 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1820 let effective_canister_id = self.effective_canister_id;
1821 let identity = self.agent.identity.clone();
1822 let content = self.into_envelope()?;
1823 let signed_query = sign_envelope(&content, identity)?;
1824 let EnvelopeContent::Query {
1825 ingress_expiry,
1826 sender,
1827 canister_id,
1828 method_name,
1829 arg,
1830 nonce,
1831 } = content
1832 else {
1833 unreachable!()
1834 };
1835 Ok(SignedQuery {
1836 ingress_expiry,
1837 sender,
1838 canister_id,
1839 method_name,
1840 arg,
1841 effective_canister_id,
1842 signed_query,
1843 nonce,
1844 })
1845 }
1846
1847 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1849 self.agent.query_content(
1850 self.canister_id,
1851 self.method_name,
1852 self.arg,
1853 self.ingress_expiry_datetime,
1854 self.use_nonce,
1855 )
1856 }
1857}
1858
1859impl<'agent> IntoFuture for QueryBuilder<'agent> {
1860 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
1861 type Output = Result<Vec<u8>, AgentError>;
1862 fn into_future(self) -> Self::IntoFuture {
1863 Box::pin(self.call())
1864 }
1865}
1866
1867pub struct UpdateCall<'agent> {
1869 agent: &'agent Agent,
1870 response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
1871 effective_canister_id: Principal,
1872 canister_id: Principal,
1873 method_name: String,
1874}
1875
1876impl fmt::Debug for UpdateCall<'_> {
1877 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1878 f.debug_struct("UpdateCall")
1879 .field("agent", &self.agent)
1880 .field("effective_canister_id", &self.effective_canister_id)
1881 .finish_non_exhaustive()
1882 }
1883}
1884
1885impl Future for UpdateCall<'_> {
1886 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1887 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1888 self.response_future.as_mut().poll(cx)
1889 }
1890}
1891
1892impl<'a> UpdateCall<'a> {
1893 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1895 let response = self.response_future.await?;
1896
1897 match response {
1898 CallResponse::Response(response) => Ok(response),
1899 CallResponse::Poll(request_id) => {
1900 self.agent
1901 .wait_inner(
1902 &request_id,
1903 self.effective_canister_id,
1904 Some(Operation::Call {
1905 canister: self.canister_id,
1906 method: self.method_name,
1907 }),
1908 )
1909 .await
1910 }
1911 }
1912 }
1913}
1914#[derive(Debug)]
1919pub struct UpdateBuilder<'agent> {
1920 agent: &'agent Agent,
1921 pub effective_canister_id: Principal,
1923 pub canister_id: Principal,
1925 pub method_name: String,
1927 pub arg: Vec<u8>,
1929 pub ingress_expiry_datetime: Option<u64>,
1931}
1932
1933impl<'agent> UpdateBuilder<'agent> {
1934 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1936 Self {
1937 agent,
1938 effective_canister_id: canister_id,
1939 canister_id,
1940 method_name,
1941 arg: vec![],
1942 ingress_expiry_datetime: None,
1943 }
1944 }
1945
1946 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1948 self.effective_canister_id = canister_id;
1949 self
1950 }
1951
1952 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1954 self.arg = arg.into();
1955 self
1956 }
1957
1958 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1960 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1961 self
1962 }
1963
1964 pub fn expire_after(mut self, duration: Duration) -> Self {
1966 self.ingress_expiry_datetime = Some(
1967 OffsetDateTime::now_utc()
1968 .saturating_add(duration.try_into().expect("negative duration"))
1969 .unix_timestamp_nanos() as u64,
1970 );
1971 self
1972 }
1973
1974 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
1977 self.call().and_wait().await.map(|x| x.0)
1978 }
1979
1980 pub fn call(self) -> UpdateCall<'agent> {
1983 let method_name = self.method_name.clone();
1984 let response_future = async move {
1985 self.agent
1986 .update_raw(
1987 self.canister_id,
1988 self.effective_canister_id,
1989 self.method_name,
1990 self.arg,
1991 self.ingress_expiry_datetime,
1992 )
1993 .await
1994 };
1995 UpdateCall {
1996 agent: self.agent,
1997 response_future: Box::pin(response_future),
1998 effective_canister_id: self.effective_canister_id,
1999 canister_id: self.canister_id,
2000 method_name,
2001 }
2002 }
2003
2004 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2007 let identity = self.agent.identity.clone();
2008 let effective_canister_id = self.effective_canister_id;
2009 let content = self.into_envelope()?;
2010 let signed_update = sign_envelope(&content, identity)?;
2011 let request_id = to_request_id(&content)?;
2012 let EnvelopeContent::Call {
2013 nonce,
2014 ingress_expiry,
2015 sender,
2016 canister_id,
2017 method_name,
2018 arg,
2019 } = content
2020 else {
2021 unreachable!()
2022 };
2023 Ok(SignedUpdate {
2024 nonce,
2025 ingress_expiry,
2026 sender,
2027 canister_id,
2028 method_name,
2029 arg,
2030 effective_canister_id,
2031 signed_update,
2032 request_id,
2033 })
2034 }
2035
2036 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2038 let nonce = self.agent.nonce_factory.generate();
2039 self.agent.update_content(
2040 self.canister_id,
2041 self.method_name,
2042 self.arg,
2043 self.ingress_expiry_datetime,
2044 nonce,
2045 )
2046 }
2047}
2048
2049impl<'agent> IntoFuture for UpdateBuilder<'agent> {
2050 type IntoFuture = AgentFuture<'agent, Vec<u8>>;
2051 type Output = Result<Vec<u8>, AgentError>;
2052 fn into_future(self) -> Self::IntoFuture {
2053 Box::pin(self.call_and_wait())
2054 }
2055}
2056
2057#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2059#[cfg_attr(not(target_family = "wasm"), async_trait)]
2060pub trait HttpService: Send + Sync + Debug {
2061 async fn call<'a>(
2063 &'a self,
2064 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2065 max_retries: usize,
2066 size_limit: Option<usize>,
2067 ) -> Result<http::Response<Bytes>, AgentError>;
2068}
2069
2070fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2072 let (parts, body) = req.into_parts();
2073 let body = reqwest::Body::from(body);
2074 let request = http::Request::from_parts(parts, body)
2077 .try_into()
2078 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2079
2080 Ok(request)
2081}
2082
2083#[cfg(not(target_family = "wasm"))]
2085async fn to_http_response(
2086 resp: Response,
2087 size_limit: Option<usize>,
2088) -> Result<http::Response<Bytes>, AgentError> {
2089 use http_body_util::{BodyExt, Limited};
2090
2091 let resp: http::Response<reqwest::Body> = resp.into();
2092 let (parts, body) = resp.into_parts();
2093 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2094 let body = body
2095 .collect()
2096 .await
2097 .map_err(|e| {
2098 AgentError::TransportError(TransportError::Generic(format!(
2099 "unable to read response body: {e:#}"
2100 )))
2101 })?
2102 .to_bytes();
2103 let resp = http::Response::from_parts(parts, body);
2104
2105 Ok(resp)
2106}
2107
2108#[cfg(target_family = "wasm")]
2112async fn to_http_response(
2113 resp: Response,
2114 size_limit: Option<usize>,
2115) -> Result<http::Response<Bytes>, AgentError> {
2116 use futures_util::StreamExt;
2117 use http_body::Frame;
2118 use http_body_util::{Limited, StreamBody};
2119
2120 let status = resp.status();
2122 let headers = resp.headers().clone();
2123
2124 let stream = resp.bytes_stream().map(|x| x.map(Frame::data));
2126 let body = StreamBody::new(stream);
2127 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2128 let body = http_body_util::BodyExt::collect(body)
2129 .await
2130 .map_err(|e| {
2131 AgentError::TransportError(TransportError::Generic(format!(
2132 "unable to read response body: {e:#}"
2133 )))
2134 })?
2135 .to_bytes();
2136
2137 let mut resp = http::Response::new(body);
2138 *resp.status_mut() = status;
2139 *resp.headers_mut() = headers;
2140
2141 Ok(resp)
2142}
2143
2144#[cfg(not(target_family = "wasm"))]
2145#[async_trait]
2146impl<T> HttpService for T
2147where
2148 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2149 for<'a> <&'a Self as Service<Request>>::Future: Send,
2150 T: Send + Sync + Debug + ?Sized,
2151{
2152 #[allow(clippy::needless_arbitrary_self_type)]
2153 async fn call<'a>(
2154 mut self: &'a Self,
2155 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2156 max_retries: usize,
2157 size_limit: Option<usize>,
2158 ) -> Result<http::Response<Bytes>, AgentError> {
2159 let mut retry_count = 0;
2160 loop {
2161 let request = from_http_request(req()?)?;
2162
2163 match Service::call(&mut self, request).await {
2164 Err(err) => {
2165 if err.is_connect() {
2167 if retry_count >= max_retries {
2168 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2169 }
2170 retry_count += 1;
2171 }
2172 else {
2174 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2175 }
2176 }
2177
2178 Ok(resp) => {
2179 let resp = to_http_response(resp, size_limit).await?;
2180 return Ok(resp);
2181 }
2182 }
2183 }
2184 }
2185}
2186
2187#[cfg(target_family = "wasm")]
2188#[async_trait(?Send)]
2189impl<T> HttpService for T
2190where
2191 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2192 T: Send + Sync + Debug + ?Sized,
2193{
2194 #[allow(clippy::needless_arbitrary_self_type)]
2195 async fn call<'a>(
2196 mut self: &'a Self,
2197 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2198 _retries: usize,
2199 _size_limit: Option<usize>,
2200 ) -> Result<http::Response<Bytes>, AgentError> {
2201 let request = from_http_request(req()?)?;
2202 let response = Service::call(&mut self, request)
2203 .await
2204 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2205
2206 to_http_response(response, _size_limit).await
2207 }
2208}
2209
2210#[derive(Debug)]
2211struct Retry429Logic {
2212 client: Client,
2213}
2214
2215#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2216#[cfg_attr(not(target_family = "wasm"), async_trait)]
2217impl HttpService for Retry429Logic {
2218 async fn call<'a>(
2219 &'a self,
2220 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2221 _max_tcp_retries: usize,
2222 _size_limit: Option<usize>,
2223 ) -> Result<http::Response<Bytes>, AgentError> {
2224 let mut retries = 0;
2225 loop {
2226 #[cfg(not(target_family = "wasm"))]
2227 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2228 #[cfg(target_family = "wasm")]
2230 let resp = {
2231 let request = from_http_request(req()?)?;
2232 let resp = self
2233 .client
2234 .execute(request)
2235 .await
2236 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2237
2238 to_http_response(resp, _size_limit).await?
2239 };
2240
2241 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2242 if retries == 6 {
2243 break Ok(resp);
2244 } else {
2245 retries += 1;
2246 crate::util::sleep(Duration::from_millis(250)).await;
2247 continue;
2248 }
2249 } else {
2250 break Ok(resp);
2251 }
2252 }
2253 }
2254}
2255
2256#[cfg(all(test, not(target_family = "wasm")))]
2257mod offline_tests {
2258 use super::*;
2259 use tokio::net::TcpListener;
2260 #[test]
2263 fn rounded_expiry() {
2264 let agent = Agent::builder()
2265 .with_url("http://not-a-real-url")
2266 .build()
2267 .unwrap();
2268 let mut prev_expiry = None;
2269 let mut num_timestamps = 0;
2270 for _ in 0..6 {
2271 let update = agent
2272 .update(&Principal::management_canister(), "not_a_method")
2273 .sign()
2274 .unwrap();
2275 if prev_expiry < Some(update.ingress_expiry) {
2276 prev_expiry = Some(update.ingress_expiry);
2277 num_timestamps += 1;
2278 }
2279 }
2280 assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
2282 }
2283
2284 #[tokio::test]
2285 async fn client_ratelimit() {
2286 let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
2287 let count = Arc::new(Mutex::new(0));
2288 let port = mock_server.local_addr().unwrap().port();
2289 tokio::spawn({
2290 let count = count.clone();
2291 async move {
2292 loop {
2293 let (mut conn, _) = mock_server.accept().await.unwrap();
2294 *count.lock().unwrap() += 1;
2295 tokio::spawn(
2296 async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
2298 );
2299 }
2300 }
2301 });
2302 let agent = Agent::builder()
2303 .with_http_client(Client::builder().http1_only().build().unwrap())
2304 .with_url(format!("http://127.0.0.1:{port}"))
2305 .with_max_concurrent_requests(2)
2306 .build()
2307 .unwrap();
2308 for _ in 0..3 {
2309 let agent = agent.clone();
2310 tokio::spawn(async move {
2311 agent
2312 .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
2313 .call()
2314 .await
2315 });
2316 }
2317 crate::util::sleep(Duration::from_millis(250)).await;
2318 assert_eq!(*count.lock().unwrap(), 2);
2319 }
2320}