1pub(crate) mod agent_config;
3pub mod agent_error;
4pub(crate) mod builder;
5#[doc(hidden)]
7#[deprecated(since = "0.38.0", note = "use the AgentBuilder methods")]
8pub mod http_transport;
9pub(crate) mod nonce;
10pub(crate) mod response_authentication;
11pub mod route_provider;
12pub mod status;
13pub mod subnet;
14
15pub use agent_config::AgentConfig;
16pub use agent_error::AgentError;
17use agent_error::{HttpErrorPayload, Operation};
18use async_lock::Semaphore;
19use async_trait::async_trait;
20pub use builder::AgentBuilder;
21use bytes::Bytes;
22use cached::{Cached, TimedCache};
23use http::{header::CONTENT_TYPE, HeaderMap, Method, StatusCode, Uri};
24use ic_ed25519::{PublicKey, SignatureError};
25#[doc(inline)]
26pub use ic_transport_types::{
27 signed, CallResponse, Envelope, EnvelopeContent, RejectCode, RejectResponse, ReplyResponse,
28 RequestStatusResponse,
29};
30pub use nonce::{NonceFactory, NonceGenerator};
31use rangemap::RangeInclusiveMap;
32use reqwest::{Client, Request, Response};
33use route_provider::{
34 dynamic_routing::{dynamic_route_provider::DynamicRouteProviderBuilder, node::Node},
35 RouteProvider, UrlUntilReady,
36};
37pub use subnet::{Subnet, SubnetType};
38use time::OffsetDateTime;
39use tower_service::Service;
40
41#[cfg(test)]
42mod agent_test;
43
44use crate::{
45 agent::response_authentication::{
46 extract_der, lookup_canister_info, lookup_canister_metadata, lookup_canister_ranges,
47 lookup_incomplete_subnet, lookup_request_status, lookup_subnet_and_ranges,
48 lookup_subnet_canister_ranges, lookup_subnet_metrics, lookup_time, lookup_tree,
49 lookup_value,
50 },
51 agent_error::TransportError,
52 export::Principal,
53 identity::Identity,
54 to_request_id, RequestId,
55};
56use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
57use backoff::{exponential::ExponentialBackoff, SystemClock};
58use ic_certification::{Certificate, Delegation, Label};
59use ic_transport_types::{
60 signed::{SignedQuery, SignedRequestStatus, SignedUpdate},
61 QueryResponse, ReadStateResponse, SubnetMetrics, TransportCallResponse,
62};
63use serde::Serialize;
64use status::Status;
65use std::{
66 borrow::Cow,
67 convert::TryFrom,
68 fmt::{self, Debug},
69 future::{Future, IntoFuture},
70 pin::Pin,
71 str::FromStr,
72 sync::{Arc, Mutex, RwLock},
73 task::{Context, Poll},
74 time::Duration,
75};
76
77use crate::agent::response_authentication::lookup_api_boundary_nodes;
78
/// Domain separator that certificate verification prepends to a state tree's
/// root hash to form the message whose BLS signature is checked.
pub(crate) const IC_STATE_ROOT_DOMAIN_SEPARATOR: &[u8; 14] = b"\x0Dic-state-root";

/// Root key every [`Agent`] is initialised with (133 bytes).
///
/// The blob is passed through `extract_der` before use, so it is laid out
/// below as its leading header bytes followed by 96 bytes of key material.
// NOTE(review): presumably the IC mainnet root key — confirm against the spec.
pub(crate) const IC_ROOT_KEY: &[u8; 133] = b"\
    \x30\x81\x82\x30\x1d\
    \x06\x0d\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x01\x02\x01\
    \x06\x0c\x2b\x06\x01\x04\x01\x82\xdc\x7c\x05\x03\x02\x01\
    \x03\x61\x00\
    \x81\x4c\x0e\x6e\xc7\x1f\xab\x58\x3b\x08\xbd\x81\x37\x3c\x25\x5c\
    \x3c\x37\x1b\x2e\x84\x86\x3c\x98\xa4\xf1\xe0\x8b\x74\x23\x5d\x14\
    \xfb\x5d\x9c\x0c\xd5\x46\xd9\x68\x5f\x91\x3a\x0c\x0b\x2c\xc5\x34\
    \x15\x83\xbf\x4b\x43\x92\xe4\x67\xdb\x96\xd6\x5b\x9b\xb4\xcb\x71\
    \x71\x12\xf8\x47\x2e\x0d\x5a\x4d\x14\x50\x5f\xfd\x74\x84\xb0\x12\
    \x91\x09\x1c\x5f\x87\xb9\x88\x83\x46\x3f\x98\x09\x1a\x0b\xaa\xae";
82
83#[cfg(not(target_family = "wasm"))]
84type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + Send + 'a>>;
85
86#[cfg(target_family = "wasm")]
87type AgentFuture<'a, V> = Pin<Box<dyn Future<Output = Result<V, AgentError>> + 'a>>;
88
89#[cfg_attr(unix, doc = " ```rust")] #[cfg_attr(not(unix), doc = " ```ignore")]
93#[derive(Clone)]
154pub struct Agent {
155 nonce_factory: Arc<dyn NonceGenerator>,
156 identity: Arc<dyn Identity>,
157 ingress_expiry: Duration,
158 root_key: Arc<RwLock<Vec<u8>>>,
159 client: Arc<dyn HttpService>,
160 route_provider: Arc<dyn RouteProvider>,
161 subnet_key_cache: Arc<Mutex<SubnetCache>>,
162 concurrent_requests_semaphore: Arc<Semaphore>,
163 verify_query_signatures: bool,
164 max_response_body_size: Option<usize>,
165 max_polling_time: Duration,
166 #[allow(dead_code)]
167 max_tcp_error_retries: usize,
168}
169
170impl fmt::Debug for Agent {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
172 f.debug_struct("Agent")
173 .field("ingress_expiry", &self.ingress_expiry)
174 .finish_non_exhaustive()
175 }
176}
177
178impl Agent {
179 pub fn builder() -> builder::AgentBuilder {
182 Default::default()
183 }
184
185 pub fn new(config: agent_config::AgentConfig) -> Result<Agent, AgentError> {
187 let client = config.http_service.unwrap_or_else(|| {
188 Arc::new(Retry429Logic {
189 client: config.client.unwrap_or_else(|| {
190 #[cfg(not(target_family = "wasm"))]
191 {
192 Client::builder()
193 .use_rustls_tls()
194 .timeout(Duration::from_secs(360))
195 .build()
196 .expect("Could not create HTTP client.")
197 }
198 #[cfg(all(target_family = "wasm", feature = "wasm-bindgen"))]
199 {
200 Client::new()
201 }
202 }),
203 })
204 });
205 Ok(Agent {
206 nonce_factory: config.nonce_factory,
207 identity: config.identity,
208 ingress_expiry: config.ingress_expiry,
209 root_key: Arc::new(RwLock::new(IC_ROOT_KEY.to_vec())),
210 client: client.clone(),
211 route_provider: if let Some(route_provider) = config.route_provider {
212 route_provider
213 } else if let Some(url) = config.url {
214 if config.background_dynamic_routing {
215 assert!(
216 url.scheme() == "https" && url.path() == "/" && url.port().is_none() && url.domain().is_some(),
217 "in dynamic routing mode, URL must be in the exact form https://domain with no path, port, IP, or non-HTTPS scheme"
218 );
219 let seeds = vec![Node::new(url.domain().unwrap()).unwrap()];
220 UrlUntilReady::new(url, async move {
221 let provider =
222 DynamicRouteProviderBuilder::new(seeds, client, None).build();
223 provider.start().await;
224 provider
225 }) as Arc<dyn RouteProvider>
226 } else {
227 Arc::new(url)
228 }
229 } else {
230 panic!("either route_provider or url must be specified");
231 },
232 subnet_key_cache: Arc::new(Mutex::new(SubnetCache::new())),
233 verify_query_signatures: config.verify_query_signatures,
234 concurrent_requests_semaphore: Arc::new(Semaphore::new(config.max_concurrent_requests)),
235 max_response_body_size: config.max_response_body_size,
236 max_tcp_error_retries: config.max_tcp_error_retries,
237 max_polling_time: config.max_polling_time,
238 })
239 }
240
241 pub fn set_identity<I>(&mut self, identity: I)
247 where
248 I: 'static + Identity,
249 {
250 self.identity = Arc::new(identity);
251 }
252 pub fn set_arc_identity(&mut self, identity: Arc<dyn Identity>) {
258 self.identity = identity;
259 }
260
261 pub async fn fetch_root_key(&self) -> Result<(), AgentError> {
270 if self.read_root_key()[..] != IC_ROOT_KEY[..] {
271 return Ok(());
273 }
274 let status = self.status().await?;
275 let Some(root_key) = status.root_key else {
276 return Err(AgentError::NoRootKeyInStatus(status));
277 };
278 self.set_root_key(root_key);
279 Ok(())
280 }
281
282 pub fn set_root_key(&self, root_key: Vec<u8>) {
287 *self.root_key.write().unwrap() = root_key;
288 }
289
290 pub fn read_root_key(&self) -> Vec<u8> {
292 self.root_key.read().unwrap().clone()
293 }
294
295 fn get_expiry_date(&self) -> u64 {
296 let expiry_raw = OffsetDateTime::now_utc() + self.ingress_expiry;
297 let mut rounded = expiry_raw.replace_nanosecond(0).unwrap();
298 if self.ingress_expiry.as_secs() > 90 {
299 rounded = rounded.replace_second(0).unwrap();
300 }
301 rounded.unix_timestamp_nanos().try_into().unwrap()
302 }
303
304 pub fn get_principal(&self) -> Result<Principal, String> {
306 self.identity.sender()
307 }
308
309 async fn query_endpoint<A>(
310 &self,
311 effective_canister_id: Principal,
312 serialized_bytes: Vec<u8>,
313 ) -> Result<A, AgentError>
314 where
315 A: serde::de::DeserializeOwned,
316 {
317 let _permit = self.concurrent_requests_semaphore.acquire().await;
318 let bytes = self
319 .execute(
320 Method::POST,
321 &format!("api/v3/canister/{}/query", effective_canister_id.to_text()),
322 Some(serialized_bytes),
323 )
324 .await?
325 .1;
326 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
327 }
328
329 async fn read_state_endpoint<A>(
330 &self,
331 effective_canister_id: Principal,
332 serialized_bytes: Vec<u8>,
333 ) -> Result<A, AgentError>
334 where
335 A: serde::de::DeserializeOwned,
336 {
337 let _permit = self.concurrent_requests_semaphore.acquire().await;
338 let endpoint = format!(
339 "api/v3/canister/{}/read_state",
340 effective_canister_id.to_text()
341 );
342 let bytes = self
343 .execute(Method::POST, &endpoint, Some(serialized_bytes))
344 .await?
345 .1;
346 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
347 }
348
349 async fn read_subnet_state_endpoint<A>(
350 &self,
351 subnet_id: Principal,
352 serialized_bytes: Vec<u8>,
353 ) -> Result<A, AgentError>
354 where
355 A: serde::de::DeserializeOwned,
356 {
357 let _permit = self.concurrent_requests_semaphore.acquire().await;
358 let endpoint = format!("api/v3/subnet/{}/read_state", subnet_id.to_text());
359 let bytes = self
360 .execute(Method::POST, &endpoint, Some(serialized_bytes))
361 .await?
362 .1;
363 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)
364 }
365
366 async fn call_endpoint(
367 &self,
368 effective_canister_id: Principal,
369 serialized_bytes: Vec<u8>,
370 ) -> Result<TransportCallResponse, AgentError> {
371 let _permit = self.concurrent_requests_semaphore.acquire().await;
372 let endpoint = format!("api/v4/canister/{}/call", effective_canister_id.to_text());
373 let (status_code, response_body) = self
374 .execute(Method::POST, &endpoint, Some(serialized_bytes))
375 .await?;
376
377 if status_code == StatusCode::ACCEPTED {
378 return Ok(TransportCallResponse::Accepted);
379 }
380
381 serde_cbor::from_slice(&response_body).map_err(AgentError::InvalidCborData)
382 }
383
384 #[allow(clippy::too_many_arguments)]
387 async fn query_raw(
388 &self,
389 canister_id: Principal,
390 effective_canister_id: Principal,
391 method_name: String,
392 arg: Vec<u8>,
393 ingress_expiry_datetime: Option<u64>,
394 use_nonce: bool,
395 explicit_verify_query_signatures: Option<bool>,
396 ) -> Result<Vec<u8>, AgentError> {
397 let operation = Operation::Call {
398 canister: canister_id,
399 method: method_name.clone(),
400 };
401 let content = self.query_content(
402 canister_id,
403 method_name,
404 arg,
405 ingress_expiry_datetime,
406 use_nonce,
407 )?;
408 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
409 self.query_inner(
410 effective_canister_id,
411 serialized_bytes,
412 content.to_request_id(),
413 explicit_verify_query_signatures,
414 operation,
415 )
416 .await
417 }
418
419 pub async fn query_signed(
423 &self,
424 effective_canister_id: Principal,
425 signed_query: Vec<u8>,
426 ) -> Result<Vec<u8>, AgentError> {
427 let envelope: Envelope =
428 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
429 let EnvelopeContent::Query {
430 canister_id,
431 method_name,
432 ..
433 } = &*envelope.content
434 else {
435 return Err(AgentError::CallDataMismatch {
436 field: "request_type".to_string(),
437 value_arg: "query".to_string(),
438 value_cbor: if matches!(*envelope.content, EnvelopeContent::Call { .. }) {
439 "update"
440 } else {
441 "read_state"
442 }
443 .to_string(),
444 });
445 };
446 let operation = Operation::Call {
447 canister: *canister_id,
448 method: method_name.clone(),
449 };
450 self.query_inner(
451 effective_canister_id,
452 signed_query,
453 envelope.content.to_request_id(),
454 None,
455 operation,
456 )
457 .await
458 }
459
460 async fn query_inner(
464 &self,
465 effective_canister_id: Principal,
466 signed_query: Vec<u8>,
467 request_id: RequestId,
468 explicit_verify_query_signatures: Option<bool>,
469 operation: Operation,
470 ) -> Result<Vec<u8>, AgentError> {
471 let response = if explicit_verify_query_signatures.unwrap_or(self.verify_query_signatures) {
472 let (response, mut subnet) = futures_util::try_join!(
473 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query),
474 self.get_subnet_by_canister(&effective_canister_id)
475 )?;
476 if response.signatures().is_empty() {
477 return Err(AgentError::MissingSignature);
478 } else if response.signatures().len() > subnet.node_keys.len() {
479 return Err(AgentError::TooManySignatures {
480 had: response.signatures().len(),
481 needed: subnet.node_keys.len(),
482 });
483 }
484 for signature in response.signatures() {
485 if OffsetDateTime::now_utc()
486 - OffsetDateTime::from_unix_timestamp_nanos(signature.timestamp.into()).unwrap()
487 > self.ingress_expiry
488 {
489 return Err(AgentError::CertificateOutdated(self.ingress_expiry));
490 }
491 let signable = response.signable(request_id, signature.timestamp);
492 let node_key = if let Some(node_key) = subnet.node_keys.get(&signature.identity) {
493 node_key
494 } else {
495 subnet = self
496 .fetch_subnet_by_canister(&effective_canister_id)
497 .await?;
498 subnet
499 .node_keys
500 .get(&signature.identity)
501 .ok_or(AgentError::CertificateNotAuthorized())?
502 };
503 if node_key.len() != 44 {
504 return Err(AgentError::DerKeyLengthMismatch {
505 expected: 44,
506 actual: node_key.len(),
507 });
508 }
509 const DER_PREFIX: [u8; 12] = [48, 42, 48, 5, 6, 3, 43, 101, 112, 3, 33, 0];
510 if node_key[..12] != DER_PREFIX {
511 return Err(AgentError::DerPrefixMismatch {
512 expected: DER_PREFIX.to_vec(),
513 actual: node_key[..12].to_vec(),
514 });
515 }
516 let pubkey = PublicKey::deserialize_raw(&node_key[12..])
517 .map_err(|_| AgentError::MalformedPublicKey)?;
518
519 match pubkey.verify_signature(&signable, &signature.signature[..]) {
520 Ok(()) => (),
521 Err(SignatureError::InvalidSignature) => {
522 return Err(AgentError::QuerySignatureVerificationFailed)
523 }
524 Err(SignatureError::InvalidLength) => {
525 return Err(AgentError::MalformedSignature)
526 }
527 _ => unreachable!(),
528 }
529 }
530 response
531 } else {
532 self.query_endpoint::<QueryResponse>(effective_canister_id, signed_query)
533 .await?
534 };
535
536 match response {
537 QueryResponse::Replied { reply, .. } => Ok(reply.arg),
538 QueryResponse::Rejected { reject, .. } => Err(AgentError::UncertifiedReject {
539 reject,
540 operation: Some(operation),
541 }),
542 }
543 }
544
545 fn query_content(
546 &self,
547 canister_id: Principal,
548 method_name: String,
549 arg: Vec<u8>,
550 ingress_expiry_datetime: Option<u64>,
551 use_nonce: bool,
552 ) -> Result<EnvelopeContent, AgentError> {
553 Ok(EnvelopeContent::Query {
554 sender: self.identity.sender().map_err(AgentError::SigningError)?,
555 canister_id,
556 method_name,
557 arg,
558 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
559 nonce: use_nonce.then(|| self.nonce_factory.generate()).flatten(),
560 sender_info: self.identity.sender_info(),
561 })
562 }
563
564 async fn update_raw(
566 &self,
567 canister_id: Principal,
568 effective_canister_id: Principal,
569 method_name: String,
570 arg: Vec<u8>,
571 ingress_expiry_datetime: Option<u64>,
572 ) -> Result<CallResponse<(Vec<u8>, Certificate)>, AgentError> {
573 let nonce = self.nonce_factory.generate();
574 let content = self.update_content(
575 canister_id,
576 method_name.clone(),
577 arg,
578 ingress_expiry_datetime,
579 nonce,
580 )?;
581 let operation = Some(Operation::Call {
582 canister: canister_id,
583 method: method_name,
584 });
585 let request_id = to_request_id(&content)?;
586 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
587
588 let response_body = self
589 .call_endpoint(effective_canister_id, serialized_bytes)
590 .await?;
591
592 match response_body {
593 TransportCallResponse::Replied { certificate } => {
594 let certificate =
595 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
596
597 self.verify(&certificate, effective_canister_id)?;
598 let status = lookup_request_status(&certificate, &request_id)?;
599
600 match status {
601 RequestStatusResponse::Replied(reply) => {
602 Ok(CallResponse::Response((reply.arg, certificate)))
603 }
604 RequestStatusResponse::Rejected(reject_response) => {
605 Err(AgentError::CertifiedReject {
606 reject: reject_response,
607 operation,
608 })?
609 }
610 _ => Ok(CallResponse::Poll(request_id)),
611 }
612 }
613 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
614 TransportCallResponse::NonReplicatedRejection(reject_response) => {
615 Err(AgentError::UncertifiedReject {
616 reject: reject_response,
617 operation,
618 })
619 }
620 }
621 }
622
623 pub async fn update_signed(
627 &self,
628 effective_canister_id: Principal,
629 signed_update: Vec<u8>,
630 ) -> Result<CallResponse<Vec<u8>>, AgentError> {
631 let envelope: Envelope =
632 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
633 let EnvelopeContent::Call {
634 canister_id,
635 method_name,
636 ..
637 } = &*envelope.content
638 else {
639 return Err(AgentError::CallDataMismatch {
640 field: "request_type".to_string(),
641 value_arg: "update".to_string(),
642 value_cbor: if matches!(*envelope.content, EnvelopeContent::Query { .. }) {
643 "query"
644 } else {
645 "read_state"
646 }
647 .to_string(),
648 });
649 };
650 let operation = Some(Operation::Call {
651 canister: *canister_id,
652 method: method_name.clone(),
653 });
654 let request_id = to_request_id(&envelope.content)?;
655
656 let response_body = self
657 .call_endpoint(effective_canister_id, signed_update)
658 .await?;
659
660 match response_body {
661 TransportCallResponse::Replied { certificate } => {
662 let certificate =
663 serde_cbor::from_slice(&certificate).map_err(AgentError::InvalidCborData)?;
664
665 self.verify(&certificate, effective_canister_id)?;
666 let status = lookup_request_status(&certificate, &request_id)?;
667
668 match status {
669 RequestStatusResponse::Replied(reply) => Ok(CallResponse::Response(reply.arg)),
670 RequestStatusResponse::Rejected(reject_response) => {
671 Err(AgentError::CertifiedReject {
672 reject: reject_response,
673 operation,
674 })?
675 }
676 _ => Ok(CallResponse::Poll(request_id)),
677 }
678 }
679 TransportCallResponse::Accepted => Ok(CallResponse::Poll(request_id)),
680 TransportCallResponse::NonReplicatedRejection(reject_response) => {
681 Err(AgentError::UncertifiedReject {
682 reject: reject_response,
683 operation,
684 })
685 }
686 }
687 }
688
689 fn update_content(
690 &self,
691 canister_id: Principal,
692 method_name: String,
693 arg: Vec<u8>,
694 ingress_expiry_datetime: Option<u64>,
695 nonce: Option<Vec<u8>>,
696 ) -> Result<EnvelopeContent, AgentError> {
697 Ok(EnvelopeContent::Call {
698 canister_id,
699 method_name,
700 arg,
701 nonce,
702 sender: self.identity.sender().map_err(AgentError::SigningError)?,
703 ingress_expiry: ingress_expiry_datetime.unwrap_or_else(|| self.get_expiry_date()),
704 sender_info: self.identity.sender_info(),
705 })
706 }
707
708 fn get_retry_policy(&self) -> ExponentialBackoff<SystemClock> {
709 ExponentialBackoffBuilder::new()
710 .with_initial_interval(Duration::from_millis(500))
711 .with_max_interval(Duration::from_secs(1))
712 .with_multiplier(1.4)
713 .with_max_elapsed_time(Some(self.max_polling_time))
714 .build()
715 }
716
717 pub async fn wait_signed(
719 &self,
720 request_id: &RequestId,
721 effective_canister_id: Principal,
722 signed_request_status: Vec<u8>,
723 ) -> Result<(Vec<u8>, Certificate), AgentError> {
724 let mut retry_policy = self.get_retry_policy();
725
726 let mut request_accepted = false;
727 loop {
728 let (resp, cert) = self
729 .request_status_signed(
730 request_id,
731 effective_canister_id,
732 signed_request_status.clone(),
733 )
734 .await?;
735 match resp {
736 RequestStatusResponse::Unknown => {
737 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
739 return Err(AgentError::TimeoutWaitingForResponse());
740 }
741 }
742
743 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
744 if !request_accepted {
745 retry_policy.reset();
746 request_accepted = true;
747 }
748 }
749
750 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
751 return Ok((arg, cert))
752 }
753
754 RequestStatusResponse::Rejected(response) => {
755 return Err(AgentError::CertifiedReject {
756 reject: response,
757 operation: None,
758 })
759 }
760
761 RequestStatusResponse::Done => {
762 return Err(AgentError::RequestStatusDoneNoReply(String::from(
763 *request_id,
764 )))
765 }
766 };
767
768 match retry_policy.next_backoff() {
769 Some(duration) => crate::util::sleep(duration).await,
770
771 None => return Err(AgentError::TimeoutWaitingForResponse()),
772 }
773 }
774 }
775
776 pub async fn wait(
778 &self,
779 request_id: &RequestId,
780 effective_canister_id: Principal,
781 ) -> Result<(Vec<u8>, Certificate), AgentError> {
782 self.wait_inner(request_id, effective_canister_id, None)
783 .await
784 }
785
786 async fn wait_inner(
787 &self,
788 request_id: &RequestId,
789 effective_canister_id: Principal,
790 operation: Option<Operation>,
791 ) -> Result<(Vec<u8>, Certificate), AgentError> {
792 let mut retry_policy = self.get_retry_policy();
793
794 let mut request_accepted = false;
795 loop {
796 let (resp, cert) = self
797 .request_status_raw(request_id, effective_canister_id)
798 .await?;
799 match resp {
800 RequestStatusResponse::Unknown => {
801 if retry_policy.get_elapsed_time() > Duration::from_secs(5 * 60) {
803 return Err(AgentError::TimeoutWaitingForResponse());
804 }
805 }
806
807 RequestStatusResponse::Received | RequestStatusResponse::Processing => {
808 if !request_accepted {
809 retry_policy.reset();
817 request_accepted = true;
818 }
819 }
820
821 RequestStatusResponse::Replied(ReplyResponse { arg, .. }) => {
822 return Ok((arg, cert))
823 }
824
825 RequestStatusResponse::Rejected(response) => {
826 return Err(AgentError::CertifiedReject {
827 reject: response,
828 operation,
829 })
830 }
831
832 RequestStatusResponse::Done => {
833 return Err(AgentError::RequestStatusDoneNoReply(String::from(
834 *request_id,
835 )))
836 }
837 };
838
839 match retry_policy.next_backoff() {
840 Some(duration) => crate::util::sleep(duration).await,
841
842 None => return Err(AgentError::TimeoutWaitingForResponse()),
843 }
844 }
845 }
846
847 pub async fn read_state_raw(
850 &self,
851 paths: Vec<Vec<Label>>,
852 effective_canister_id: Principal,
853 ) -> Result<Certificate, AgentError> {
854 let content = self.read_state_content(paths)?;
855 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
856
857 let read_state_response: ReadStateResponse = self
858 .read_state_endpoint(effective_canister_id, serialized_bytes)
859 .await?;
860 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
861 .map_err(AgentError::InvalidCborData)?;
862 self.verify(&cert, effective_canister_id)?;
863 Ok(cert)
864 }
865
866 pub async fn read_subnet_state_raw(
869 &self,
870 paths: Vec<Vec<Label>>,
871 subnet_id: Principal,
872 ) -> Result<Certificate, AgentError> {
873 let content = self.read_state_content(paths)?;
874 let serialized_bytes = sign_envelope(&content, self.identity.clone())?;
875
876 let read_state_response: ReadStateResponse = self
877 .read_subnet_state_endpoint(subnet_id, serialized_bytes)
878 .await?;
879 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
880 .map_err(AgentError::InvalidCborData)?;
881 self.verify_for_subnet(&cert, subnet_id)?;
882 Ok(cert)
883 }
884
885 fn read_state_content(&self, paths: Vec<Vec<Label>>) -> Result<EnvelopeContent, AgentError> {
886 Ok(EnvelopeContent::ReadState {
887 sender: self.identity.sender().map_err(AgentError::SigningError)?,
888 paths,
889 ingress_expiry: self.get_expiry_date(),
890 })
891 }
892
893 pub fn verify(
896 &self,
897 cert: &Certificate,
898 effective_canister_id: Principal,
899 ) -> Result<(), AgentError> {
900 self.verify_cert(cert, effective_canister_id)?;
901 self.verify_cert_timestamp(cert)?;
902 Ok(())
903 }
904
905 fn verify_cert(
906 &self,
907 cert: &Certificate,
908 effective_canister_id: Principal,
909 ) -> Result<(), AgentError> {
910 let sig = &cert.signature;
911
912 let root_hash = cert.tree.digest();
913 let mut msg = vec![];
914 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
915 msg.extend_from_slice(&root_hash);
916
917 let der_key = self.check_delegation(&cert.delegation, effective_canister_id)?;
918 let key = extract_der(der_key)?;
919
920 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
921 .map_err(|_| AgentError::CertificateVerificationFailed())?;
922 Ok(())
923 }
924
925 pub fn verify_for_subnet(
928 &self,
929 cert: &Certificate,
930 subnet_id: Principal,
931 ) -> Result<(), AgentError> {
932 self.verify_cert_for_subnet(cert, subnet_id)?;
933 self.verify_cert_timestamp(cert)?;
934 Ok(())
935 }
936
937 fn verify_cert_for_subnet(
938 &self,
939 cert: &Certificate,
940 subnet_id: Principal,
941 ) -> Result<(), AgentError> {
942 let sig = &cert.signature;
943
944 let root_hash = cert.tree.digest();
945 let mut msg = vec![];
946 msg.extend_from_slice(IC_STATE_ROOT_DOMAIN_SEPARATOR);
947 msg.extend_from_slice(&root_hash);
948
949 let der_key = self.check_delegation_for_subnet(&cert.delegation, subnet_id)?;
950 let key = extract_der(der_key)?;
951
952 ic_verify_bls_signature::verify_bls_signature(sig, &msg, &key)
953 .map_err(|_| AgentError::CertificateVerificationFailed())?;
954 Ok(())
955 }
956
957 fn verify_cert_timestamp(&self, cert: &Certificate) -> Result<(), AgentError> {
958 let time = lookup_time(cert)?;
961 if (OffsetDateTime::now_utc()
962 - OffsetDateTime::from_unix_timestamp_nanos(time.into()).unwrap())
963 > self.ingress_expiry
964 {
965 Err(AgentError::CertificateOutdated(self.ingress_expiry))
966 } else {
967 Ok(())
968 }
969 }
970
971 fn check_delegation(
972 &self,
973 delegation: &Option<Delegation>,
974 effective_canister_id: Principal,
975 ) -> Result<Vec<u8>, AgentError> {
976 match delegation {
977 None => Ok(self.read_root_key()),
978 Some(delegation) => {
979 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
980 .map_err(AgentError::InvalidCborData)?;
981 if cert.delegation.is_some() {
982 return Err(AgentError::CertificateHasTooManyDelegations);
983 }
984 self.verify_cert(&cert, effective_canister_id)?;
985 let canister_range_shards_lookup =
986 ["canister_ranges".as_bytes(), delegation.subnet_id.as_ref()];
987 let ranges: Vec<(Principal, Principal)> =
988 match lookup_tree(&cert.tree, canister_range_shards_lookup) {
989 Ok(canister_range_shards) => {
990 let mut shard_paths = canister_range_shards
991 .list_paths() .into_iter()
993 .map(|mut x| {
994 x.pop() .ok_or_else(AgentError::CertificateVerificationFailed)
996 })
997 .collect::<Result<Vec<_>, _>>()?;
998 if shard_paths.is_empty() {
999 return Err(AgentError::CertificateNotAuthorized());
1000 }
1001 shard_paths.sort_unstable();
1002 let shard_division = shard_paths.partition_point(|shard| {
1003 shard.as_bytes() <= effective_canister_id.as_slice()
1004 });
1005 if shard_division == 0 {
1006 return Err(AgentError::CertificateNotAuthorized());
1008 }
1009 let max_potential_shard = &shard_paths[shard_division - 1];
1010 let canister_range = lookup_value(
1011 &canister_range_shards,
1012 [max_potential_shard.as_bytes()],
1013 )?;
1014 serde_cbor::from_slice(canister_range)
1015 .map_err(AgentError::InvalidCborData)?
1016 }
1017 Err(AgentError::LookupPathAbsent(_) | AgentError::LookupPathUnknown(_)) => {
1019 let subnet_ranges_path = [
1020 "subnet".as_bytes(),
1021 delegation.subnet_id.as_ref(),
1022 "canister_ranges".as_bytes(),
1023 ];
1024 let canister_range = lookup_value(&cert.tree, subnet_ranges_path)?;
1025 serde_cbor::from_slice(canister_range)
1026 .map_err(AgentError::InvalidCborData)?
1027 }
1028 Err(e) => return Err(e),
1029 };
1030 if !principal_is_within_ranges(&effective_canister_id, &ranges[..]) {
1031 return Err(AgentError::CertificateNotAuthorized());
1033 }
1034
1035 let public_key_path = [
1036 "subnet".as_bytes(),
1037 delegation.subnet_id.as_ref(),
1038 "public_key".as_bytes(),
1039 ];
1040 lookup_value(&cert.tree, public_key_path).map(<[u8]>::to_vec)
1041 }
1042 }
1043 }
1044
1045 fn check_delegation_for_subnet(
1046 &self,
1047 delegation: &Option<Delegation>,
1048 subnet_id: Principal,
1049 ) -> Result<Vec<u8>, AgentError> {
1050 match delegation {
1051 None => Ok(self.read_root_key()),
1052 Some(delegation) => {
1053 let cert: Certificate = serde_cbor::from_slice(&delegation.certificate)
1054 .map_err(AgentError::InvalidCborData)?;
1055 if cert.delegation.is_some() {
1056 return Err(AgentError::CertificateHasTooManyDelegations);
1057 }
1058 self.verify_cert_for_subnet(&cert, subnet_id)?;
1059 let public_key_path = [
1060 "subnet".as_bytes(),
1061 subnet_id.as_ref(),
1062 "public_key".as_bytes(),
1063 ];
1064 let pk = lookup_value(&cert.tree, public_key_path)
1065 .map_err(|_| AgentError::CertificateNotAuthorized())?
1066 .to_vec();
1067 Ok(pk)
1068 }
1069 }
1070 }
1071
1072 pub async fn read_state_canister_info(
1075 &self,
1076 canister_id: Principal,
1077 path: &str,
1078 ) -> Result<Vec<u8>, AgentError> {
1079 let paths: Vec<Vec<Label>> = vec![vec![
1080 "canister".into(),
1081 Label::from_bytes(canister_id.as_slice()),
1082 path.into(),
1083 ]];
1084
1085 let cert = self.read_state_raw(paths, canister_id).await?;
1086
1087 lookup_canister_info(cert, canister_id, path)
1088 }
1089
1090 pub async fn read_state_canister_controllers(
1092 &self,
1093 canister_id: Principal,
1094 ) -> Result<Vec<Principal>, AgentError> {
1095 let blob = self
1096 .read_state_canister_info(canister_id, "controllers")
1097 .await?;
1098 let controllers: Vec<Principal> =
1099 serde_cbor::from_slice(&blob).map_err(AgentError::InvalidCborData)?;
1100 Ok(controllers)
1101 }
1102
1103 pub async fn read_state_canister_module_hash(
1105 &self,
1106 canister_id: Principal,
1107 ) -> Result<Vec<u8>, AgentError> {
1108 self.read_state_canister_info(canister_id, "module_hash")
1109 .await
1110 }
1111
1112 pub async fn read_state_canister_metadata(
1114 &self,
1115 canister_id: Principal,
1116 path: &str,
1117 ) -> Result<Vec<u8>, AgentError> {
1118 let paths: Vec<Vec<Label>> = vec![vec![
1119 "canister".into(),
1120 Label::from_bytes(canister_id.as_slice()),
1121 "metadata".into(),
1122 path.into(),
1123 ]];
1124
1125 let cert = self.read_state_raw(paths, canister_id).await?;
1126
1127 lookup_canister_metadata(cert, canister_id, path)
1128 }
1129
1130 pub async fn read_state_subnet_metrics(
1132 &self,
1133 subnet_id: Principal,
1134 ) -> Result<SubnetMetrics, AgentError> {
1135 let paths = vec![vec![
1136 "subnet".into(),
1137 Label::from_bytes(subnet_id.as_slice()),
1138 "metrics".into(),
1139 ]];
1140 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1141 lookup_subnet_metrics(cert, subnet_id)
1142 }
1143
1144 pub async fn read_state_subnet_canister_ranges(
1146 &self,
1147 subnet_id: Principal,
1148 ) -> Result<Vec<(Principal, Principal)>, AgentError> {
1149 let paths = vec![vec![
1150 "subnet".into(),
1151 Label::from_bytes(subnet_id.as_slice()),
1152 "canister_ranges".into(),
1153 ]];
1154 let cert = self.read_subnet_state_raw(paths, subnet_id).await?;
1155 lookup_subnet_canister_ranges(&cert, subnet_id)
1156 }
1157
1158 pub async fn request_status_raw(
1160 &self,
1161 request_id: &RequestId,
1162 effective_canister_id: Principal,
1163 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1164 let paths: Vec<Vec<Label>> =
1165 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1166
1167 let cert = self.read_state_raw(paths, effective_canister_id).await?;
1168
1169 Ok((lookup_request_status(&cert, request_id)?, cert))
1170 }
1171
1172 pub async fn request_status_signed(
1176 &self,
1177 request_id: &RequestId,
1178 effective_canister_id: Principal,
1179 signed_request_status: Vec<u8>,
1180 ) -> Result<(RequestStatusResponse, Certificate), AgentError> {
1181 let _envelope: Envelope =
1182 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1183 let read_state_response: ReadStateResponse = self
1184 .read_state_endpoint(effective_canister_id, signed_request_status)
1185 .await?;
1186
1187 let cert: Certificate = serde_cbor::from_slice(&read_state_response.certificate)
1188 .map_err(AgentError::InvalidCborData)?;
1189 self.verify(&cert, effective_canister_id)?;
1190 Ok((lookup_request_status(&cert, request_id)?, cert))
1191 }
1192
1193 pub fn update<S: Into<String>>(
1196 &self,
1197 canister_id: &Principal,
1198 method_name: S,
1199 ) -> UpdateBuilder<'_> {
1200 UpdateBuilder::new(self, *canister_id, method_name.into())
1201 }
1202
1203 pub async fn status(&self) -> Result<Status, AgentError> {
1205 let endpoint = "api/v2/status";
1206 let bytes = self.execute(Method::GET, endpoint, None).await?.1;
1207
1208 let cbor: serde_cbor::Value =
1209 serde_cbor::from_slice(&bytes).map_err(AgentError::InvalidCborData)?;
1210
1211 Status::try_from(&cbor).map_err(|_| AgentError::InvalidReplicaStatus)
1212 }
1213
1214 pub fn query<S: Into<String>>(
1217 &self,
1218 canister_id: &Principal,
1219 method_name: S,
1220 ) -> QueryBuilder<'_> {
1221 QueryBuilder::new(self, *canister_id, method_name.into())
1222 }
1223
1224 pub fn sign_request_status(
1227 &self,
1228 effective_canister_id: Principal,
1229 request_id: RequestId,
1230 ) -> Result<SignedRequestStatus, AgentError> {
1231 let paths: Vec<Vec<Label>> =
1232 vec![vec!["request_status".into(), request_id.to_vec().into()]];
1233 let read_state_content = self.read_state_content(paths)?;
1234 let signed_request_status = sign_envelope(&read_state_content, self.identity.clone())?;
1235 let ingress_expiry = read_state_content.ingress_expiry();
1236 let sender = *read_state_content.sender();
1237 Ok(SignedRequestStatus {
1238 ingress_expiry,
1239 sender,
1240 effective_canister_id,
1241 request_id,
1242 signed_request_status,
1243 })
1244 }
1245
1246 pub async fn get_subnet_by_canister(
1249 &self,
1250 canister: &Principal,
1251 ) -> Result<Arc<Subnet>, AgentError> {
1252 let subnet = self
1253 .subnet_key_cache
1254 .lock()
1255 .unwrap()
1256 .get_subnet_by_canister(canister);
1257 if let Some(subnet) = subnet {
1258 Ok(subnet)
1259 } else {
1260 self.fetch_subnet_by_canister(canister).await
1261 }
1262 }
1263
1264 pub async fn get_subnet_by_id(&self, subnet_id: &Principal) -> Result<Arc<Subnet>, AgentError> {
1267 let subnet = self
1268 .subnet_key_cache
1269 .lock()
1270 .unwrap()
1271 .get_subnet_by_id(subnet_id);
1272 if let Some(subnet) = subnet {
1273 Ok(subnet)
1274 } else {
1275 self.fetch_subnet_by_id(subnet_id).await
1276 }
1277 }
1278
1279 pub async fn fetch_api_boundary_nodes_by_canister_id(
1281 &self,
1282 canister_id: Principal,
1283 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1284 let paths = vec![vec!["api_boundary_nodes".into()]];
1285 let certificate = self.read_state_raw(paths, canister_id).await?;
1286 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1287 Ok(api_boundary_nodes)
1288 }
1289
1290 pub async fn fetch_api_boundary_nodes_by_subnet_id(
1292 &self,
1293 subnet_id: Principal,
1294 ) -> Result<Vec<ApiBoundaryNode>, AgentError> {
1295 let paths = vec![vec!["api_boundary_nodes".into()]];
1296 let certificate = self.read_subnet_state_raw(paths, subnet_id).await?;
1297 let api_boundary_nodes = lookup_api_boundary_nodes(certificate)?;
1298 Ok(api_boundary_nodes)
1299 }
1300
1301 pub async fn fetch_subnet_by_canister(
1306 &self,
1307 canister: &Principal,
1308 ) -> Result<Arc<Subnet>, AgentError> {
1309 let canister_cert = self
1310 .read_state_raw(vec![vec!["subnet".into()]], *canister)
1311 .await?;
1312 let subnet_id = if let Some(delegation) = canister_cert.delegation.as_ref() {
1313 Principal::from_slice(&delegation.subnet_id)
1314 } else {
1315 Principal::self_authenticating(&self.root_key.read().unwrap()[..])
1317 };
1318 let mut subnet = lookup_incomplete_subnet(&subnet_id, &canister_cert)?;
1319 let canister_ranges = if let Some(delegation) = canister_cert.delegation.as_ref() {
1320 let delegation_cert: Certificate = serde_cbor::from_slice(&delegation.certificate)?;
1322 lookup_canister_ranges(&subnet_id, &delegation_cert)?
1323 } else {
1324 lookup_canister_ranges(&subnet_id, &canister_cert)?
1325 };
1326 subnet.canister_ranges = canister_ranges;
1327 if !subnet.canister_ranges.contains(canister) {
1328 return Err(AgentError::CertificateNotAuthorized());
1329 }
1330 let subnet = Arc::new(subnet);
1331 self.subnet_key_cache
1332 .lock()
1333 .unwrap()
1334 .insert_subnet(subnet_id, subnet.clone());
1335 Ok(subnet)
1336 }
1337
1338 pub async fn fetch_subnet_by_id(
1343 &self,
1344 subnet_id: &Principal,
1345 ) -> Result<Arc<Subnet>, AgentError> {
1346 let subnet_cert = self
1347 .read_subnet_state_raw(
1348 vec![
1349 vec!["canister_ranges".into(), subnet_id.as_slice().into()],
1350 vec!["subnet".into(), subnet_id.as_slice().into()],
1351 ],
1352 *subnet_id,
1353 )
1354 .await?;
1355 let subnet = lookup_subnet_and_ranges(subnet_id, &subnet_cert)?;
1356 let subnet = Arc::new(subnet);
1357 self.subnet_key_cache
1358 .lock()
1359 .unwrap()
1360 .insert_subnet(*subnet_id, subnet.clone());
1361 Ok(subnet)
1362 }
1363
1364 async fn request(
1365 &self,
1366 method: Method,
1367 endpoint: &str,
1368 body: Option<Vec<u8>>,
1369 ) -> Result<(StatusCode, HeaderMap, Vec<u8>), AgentError> {
1370 let body = body.map(Bytes::from);
1371
1372 let create_request_with_generated_url = || -> Result<http::Request<Bytes>, AgentError> {
1373 let url = self.route_provider.route()?.join(endpoint)?;
1374 let uri = Uri::from_str(url.as_str())
1375 .map_err(|e| AgentError::InvalidReplicaUrl(e.to_string()))?;
1376 let body = body.clone().unwrap_or_default();
1377 let request = http::Request::builder()
1378 .method(method.clone())
1379 .uri(uri)
1380 .header(CONTENT_TYPE, "application/cbor")
1381 .body(body)
1382 .map_err(|e| {
1383 AgentError::TransportError(TransportError::Generic(format!(
1384 "unable to create request: {e:#}"
1385 )))
1386 })?;
1387
1388 Ok(request)
1389 };
1390
1391 let response = self
1392 .client
1393 .call(
1394 &create_request_with_generated_url,
1395 self.max_tcp_error_retries,
1396 self.max_response_body_size,
1397 )
1398 .await?;
1399
1400 let (parts, body) = response.into_parts();
1401
1402 Ok((parts.status, parts.headers, body.to_vec()))
1403 }
1404
1405 async fn execute(
1406 &self,
1407 method: Method,
1408 endpoint: &str,
1409 body: Option<Vec<u8>>,
1410 ) -> Result<(StatusCode, Vec<u8>), AgentError> {
1411 let request_result = self.request(method.clone(), endpoint, body.clone()).await?;
1412
1413 let status = request_result.0;
1414 let headers = request_result.1;
1415 let body = request_result.2;
1416
1417 if status.is_client_error() || status.is_server_error() {
1418 Err(AgentError::HttpError(HttpErrorPayload {
1419 status: status.into(),
1420 content_type: headers
1421 .get(CONTENT_TYPE)
1422 .and_then(|value| value.to_str().ok())
1423 .map(str::to_string),
1424 content: body,
1425 }))
1426 } else if !(status == StatusCode::OK || status == StatusCode::ACCEPTED) {
1427 Err(AgentError::InvalidHttpResponse(format!(
1428 "Expected `200`, `202`, 4xx`, or `5xx` HTTP status code. Got: {status}",
1429 )))
1430 } else {
1431 Ok((status, body))
1432 }
1433 }
1434}
1435
1436fn principal_is_within_ranges(principal: &Principal, ranges: &[(Principal, Principal)]) -> bool {
1439 ranges
1440 .iter()
1441 .any(|r| principal >= &r.0 && principal <= &r.1)
1442}
1443
1444fn sign_envelope(
1445 content: &EnvelopeContent,
1446 identity: Arc<dyn Identity>,
1447) -> Result<Vec<u8>, AgentError> {
1448 let signature = identity.sign(content).map_err(AgentError::SigningError)?;
1449
1450 let envelope = Envelope {
1451 content: Cow::Borrowed(content),
1452 sender_pubkey: signature.public_key,
1453 sender_sig: signature.signature,
1454 sender_delegation: signature.delegations,
1455 };
1456
1457 let mut serialized_bytes = Vec::new();
1458 let mut serializer = serde_cbor::Serializer::new(&mut serialized_bytes);
1459 serializer.self_describe()?;
1460 envelope.serialize(&mut serializer)?;
1461
1462 Ok(serialized_bytes)
1463}
1464
1465pub fn signed_query_inspect(
1468 sender: Principal,
1469 canister_id: Principal,
1470 method_name: &str,
1471 arg: &[u8],
1472 ingress_expiry: u64,
1473 signed_query: Vec<u8>,
1474) -> Result<(), AgentError> {
1475 let envelope: Envelope =
1476 serde_cbor::from_slice(&signed_query).map_err(AgentError::InvalidCborData)?;
1477 match envelope.content.as_ref() {
1478 EnvelopeContent::Query {
1479 ingress_expiry: ingress_expiry_cbor,
1480 sender: sender_cbor,
1481 canister_id: canister_id_cbor,
1482 method_name: method_name_cbor,
1483 arg: arg_cbor,
1484 nonce: _nonce,
1485 sender_info: _,
1486 } => {
1487 if ingress_expiry != *ingress_expiry_cbor {
1488 return Err(AgentError::CallDataMismatch {
1489 field: "ingress_expiry".to_string(),
1490 value_arg: ingress_expiry.to_string(),
1491 value_cbor: ingress_expiry_cbor.to_string(),
1492 });
1493 }
1494 if sender != *sender_cbor {
1495 return Err(AgentError::CallDataMismatch {
1496 field: "sender".to_string(),
1497 value_arg: sender.to_string(),
1498 value_cbor: sender_cbor.to_string(),
1499 });
1500 }
1501 if canister_id != *canister_id_cbor {
1502 return Err(AgentError::CallDataMismatch {
1503 field: "canister_id".to_string(),
1504 value_arg: canister_id.to_string(),
1505 value_cbor: canister_id_cbor.to_string(),
1506 });
1507 }
1508 if method_name != *method_name_cbor {
1509 return Err(AgentError::CallDataMismatch {
1510 field: "method_name".to_string(),
1511 value_arg: method_name.to_string(),
1512 value_cbor: method_name_cbor.clone(),
1513 });
1514 }
1515 if arg != *arg_cbor {
1516 return Err(AgentError::CallDataMismatch {
1517 field: "arg".to_string(),
1518 value_arg: format!("{arg:?}"),
1519 value_cbor: format!("{arg_cbor:?}"),
1520 });
1521 }
1522 }
1523 EnvelopeContent::Call { .. } => {
1524 return Err(AgentError::CallDataMismatch {
1525 field: "request_type".to_string(),
1526 value_arg: "query".to_string(),
1527 value_cbor: "call".to_string(),
1528 })
1529 }
1530 EnvelopeContent::ReadState { .. } => {
1531 return Err(AgentError::CallDataMismatch {
1532 field: "request_type".to_string(),
1533 value_arg: "query".to_string(),
1534 value_cbor: "read_state".to_string(),
1535 })
1536 }
1537 }
1538 Ok(())
1539}
1540
1541pub fn signed_update_inspect(
1544 sender: Principal,
1545 canister_id: Principal,
1546 method_name: &str,
1547 arg: &[u8],
1548 ingress_expiry: u64,
1549 signed_update: Vec<u8>,
1550) -> Result<(), AgentError> {
1551 let envelope: Envelope =
1552 serde_cbor::from_slice(&signed_update).map_err(AgentError::InvalidCborData)?;
1553 match envelope.content.as_ref() {
1554 EnvelopeContent::Call {
1555 nonce: _nonce,
1556 ingress_expiry: ingress_expiry_cbor,
1557 sender: sender_cbor,
1558 canister_id: canister_id_cbor,
1559 method_name: method_name_cbor,
1560 arg: arg_cbor,
1561 sender_info: _,
1562 } => {
1563 if ingress_expiry != *ingress_expiry_cbor {
1564 return Err(AgentError::CallDataMismatch {
1565 field: "ingress_expiry".to_string(),
1566 value_arg: ingress_expiry.to_string(),
1567 value_cbor: ingress_expiry_cbor.to_string(),
1568 });
1569 }
1570 if sender != *sender_cbor {
1571 return Err(AgentError::CallDataMismatch {
1572 field: "sender".to_string(),
1573 value_arg: sender.to_string(),
1574 value_cbor: sender_cbor.to_string(),
1575 });
1576 }
1577 if canister_id != *canister_id_cbor {
1578 return Err(AgentError::CallDataMismatch {
1579 field: "canister_id".to_string(),
1580 value_arg: canister_id.to_string(),
1581 value_cbor: canister_id_cbor.to_string(),
1582 });
1583 }
1584 if method_name != *method_name_cbor {
1585 return Err(AgentError::CallDataMismatch {
1586 field: "method_name".to_string(),
1587 value_arg: method_name.to_string(),
1588 value_cbor: method_name_cbor.clone(),
1589 });
1590 }
1591 if arg != *arg_cbor {
1592 return Err(AgentError::CallDataMismatch {
1593 field: "arg".to_string(),
1594 value_arg: format!("{arg:?}"),
1595 value_cbor: format!("{arg_cbor:?}"),
1596 });
1597 }
1598 }
1599 EnvelopeContent::ReadState { .. } => {
1600 return Err(AgentError::CallDataMismatch {
1601 field: "request_type".to_string(),
1602 value_arg: "call".to_string(),
1603 value_cbor: "read_state".to_string(),
1604 })
1605 }
1606 EnvelopeContent::Query { .. } => {
1607 return Err(AgentError::CallDataMismatch {
1608 field: "request_type".to_string(),
1609 value_arg: "call".to_string(),
1610 value_cbor: "query".to_string(),
1611 })
1612 }
1613 }
1614 Ok(())
1615}
1616
1617pub fn signed_request_status_inspect(
1620 sender: Principal,
1621 request_id: &RequestId,
1622 ingress_expiry: u64,
1623 signed_request_status: Vec<u8>,
1624) -> Result<(), AgentError> {
1625 let paths: Vec<Vec<Label>> = vec![vec!["request_status".into(), request_id.to_vec().into()]];
1626 let envelope: Envelope =
1627 serde_cbor::from_slice(&signed_request_status).map_err(AgentError::InvalidCborData)?;
1628 match envelope.content.as_ref() {
1629 EnvelopeContent::ReadState {
1630 ingress_expiry: ingress_expiry_cbor,
1631 sender: sender_cbor,
1632 paths: paths_cbor,
1633 } => {
1634 if ingress_expiry != *ingress_expiry_cbor {
1635 return Err(AgentError::CallDataMismatch {
1636 field: "ingress_expiry".to_string(),
1637 value_arg: ingress_expiry.to_string(),
1638 value_cbor: ingress_expiry_cbor.to_string(),
1639 });
1640 }
1641 if sender != *sender_cbor {
1642 return Err(AgentError::CallDataMismatch {
1643 field: "sender".to_string(),
1644 value_arg: sender.to_string(),
1645 value_cbor: sender_cbor.to_string(),
1646 });
1647 }
1648
1649 if paths != *paths_cbor {
1650 return Err(AgentError::CallDataMismatch {
1651 field: "paths".to_string(),
1652 value_arg: format!("{paths:?}"),
1653 value_cbor: format!("{paths_cbor:?}"),
1654 });
1655 }
1656 }
1657 EnvelopeContent::Query { .. } => {
1658 return Err(AgentError::CallDataMismatch {
1659 field: "request_type".to_string(),
1660 value_arg: "read_state".to_string(),
1661 value_cbor: "query".to_string(),
1662 })
1663 }
1664 EnvelopeContent::Call { .. } => {
1665 return Err(AgentError::CallDataMismatch {
1666 field: "request_type".to_string(),
1667 value_arg: "read_state".to_string(),
1668 value_cbor: "call".to_string(),
1669 })
1670 }
1671 }
1672 Ok(())
1673}
1674
/// In-memory cache of subnets, addressable by subnet id or by canister id.
#[derive(Clone)]
struct SubnetCache {
    /// Cached subnets keyed by subnet id; entries expire after the cache's lifespan.
    subnets: TimedCache<Principal, Arc<Subnet>>,
    /// Maps inclusive canister-id ranges to the id of the subnet they were inserted for.
    canister_index: RangeInclusiveMap<Principal, Principal>,
}
1680
1681impl SubnetCache {
1682 fn new() -> Self {
1683 Self {
1684 subnets: TimedCache::with_lifespan(Duration::from_secs(300)),
1685 canister_index: RangeInclusiveMap::new(),
1686 }
1687 }
1688
1689 fn get_subnet_by_canister(&mut self, canister: &Principal) -> Option<Arc<Subnet>> {
1690 self.canister_index
1691 .get(canister)
1692 .and_then(|subnet_id| self.subnets.cache_get(subnet_id).cloned())
1693 .filter(|subnet| subnet.canister_ranges.contains(canister))
1694 }
1695
1696 fn get_subnet_by_id(&mut self, subnet_id: &Principal) -> Option<Arc<Subnet>> {
1697 self.subnets.cache_get(subnet_id).cloned()
1698 }
1699
1700 fn insert_subnet(&mut self, subnet_id: Principal, subnet: Arc<Subnet>) {
1701 self.subnets.cache_set(subnet_id, subnet.clone());
1702 for range in subnet.canister_ranges.iter() {
1703 self.canister_index.insert(range.clone(), subnet_id);
1704 }
1705 }
1706}
1707
/// Description of a single API boundary node, as produced by
/// `lookup_api_boundary_nodes`.
#[derive(Debug, Clone)]
pub struct ApiBoundaryNode {
    /// Domain name of the node.
    pub domain: String,
    /// IPv6 address of the node, in textual form.
    pub ipv6_address: String,
    /// IPv4 address of the node in textual form, if it has one.
    pub ipv4_address: Option<String>,
}
1718
/// Builder for a query call, created by `Agent::query` or [`QueryBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either execute it
/// (`call`, `call_with_verification`, `call_without_verification`) or turn it
/// into a signed request with `sign`.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct QueryBuilder<'agent> {
    /// Agent used to build, sign and send the query.
    agent: &'agent Agent,
    /// Effective canister id passed to the agent when sending; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister the query is addressed to.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit expiry as nanoseconds since the Unix epoch, or `None` if unset.
    pub ingress_expiry_datetime: Option<u64>,
    /// Whether a nonce should be generated for the query; `false` by default.
    pub use_nonce: bool,
}
1739
1740impl<'agent> QueryBuilder<'agent> {
1741 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1743 Self {
1744 agent,
1745 effective_canister_id: canister_id,
1746 canister_id,
1747 method_name,
1748 arg: vec![],
1749 ingress_expiry_datetime: None,
1750 use_nonce: false,
1751 }
1752 }
1753
1754 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1756 self.effective_canister_id = canister_id;
1757 self
1758 }
1759
1760 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1762 self.arg = arg.into();
1763 self
1764 }
1765
1766 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1768 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1769 self
1770 }
1771
1772 pub fn expire_after(mut self, duration: Duration) -> Self {
1774 self.ingress_expiry_datetime = Some(
1775 OffsetDateTime::now_utc()
1776 .saturating_add(duration.try_into().expect("negative duration"))
1777 .unix_timestamp_nanos() as u64,
1778 );
1779 self
1780 }
1781
1782 pub fn with_nonce_generation(mut self) -> Self {
1785 self.use_nonce = true;
1786 self
1787 }
1788
1789 pub async fn call(self) -> Result<Vec<u8>, AgentError> {
1791 self.agent
1792 .query_raw(
1793 self.canister_id,
1794 self.effective_canister_id,
1795 self.method_name,
1796 self.arg,
1797 self.ingress_expiry_datetime,
1798 self.use_nonce,
1799 None,
1800 )
1801 .await
1802 }
1803
1804 pub async fn call_with_verification(self) -> Result<Vec<u8>, AgentError> {
1809 self.agent
1810 .query_raw(
1811 self.canister_id,
1812 self.effective_canister_id,
1813 self.method_name,
1814 self.arg,
1815 self.ingress_expiry_datetime,
1816 self.use_nonce,
1817 Some(true),
1818 )
1819 .await
1820 }
1821
1822 pub async fn call_without_verification(self) -> Result<Vec<u8>, AgentError> {
1827 self.agent
1828 .query_raw(
1829 self.canister_id,
1830 self.effective_canister_id,
1831 self.method_name,
1832 self.arg,
1833 self.ingress_expiry_datetime,
1834 self.use_nonce,
1835 Some(false),
1836 )
1837 .await
1838 }
1839
1840 pub fn sign(self) -> Result<SignedQuery, AgentError> {
1843 let effective_canister_id = self.effective_canister_id;
1844 let identity = self.agent.identity.clone();
1845 let content = self.into_envelope()?;
1846 let signed_query = sign_envelope(&content, identity)?;
1847 let EnvelopeContent::Query {
1848 ingress_expiry,
1849 sender,
1850 canister_id,
1851 method_name,
1852 arg,
1853 nonce,
1854 sender_info,
1855 } = content
1856 else {
1857 unreachable!()
1858 };
1859 Ok(SignedQuery {
1860 ingress_expiry,
1861 sender,
1862 canister_id,
1863 method_name,
1864 arg,
1865 effective_canister_id,
1866 signed_query,
1867 nonce,
1868 sender_info,
1869 })
1870 }
1871
1872 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
1874 self.agent.query_content(
1875 self.canister_id,
1876 self.method_name,
1877 self.arg,
1878 self.ingress_expiry_datetime,
1879 self.use_nonce,
1880 )
1881 }
1882}
1883
// Awaiting a `QueryBuilder` directly is equivalent to awaiting `QueryBuilder::call`.
impl<'agent> IntoFuture for QueryBuilder<'agent> {
    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
    type Output = Result<Vec<u8>, AgentError>;
    /// Boxes and pins the future returned by `call`.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.call())
    }
}
1891
/// An in-flight update call, returned by `UpdateBuilder::call`.
///
/// It can be awaited directly to obtain the immediate [`CallResponse`], or
/// finished with `and_wait`, which also handles the `CallResponse::Poll` case.
pub struct UpdateCall<'agent> {
    /// Agent used for waiting on the request when the response says to poll.
    agent: &'agent Agent,
    /// Future that submits the update and yields the immediate response.
    response_future: AgentFuture<'agent, CallResponse<(Vec<u8>, Certificate)>>,
    /// Effective canister id the call was submitted with.
    effective_canister_id: Principal,
    /// Canister the call is addressed to.
    canister_id: Principal,
    /// Name of the method being called.
    method_name: String,
}
1900
1901impl fmt::Debug for UpdateCall<'_> {
1902 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1903 f.debug_struct("UpdateCall")
1904 .field("agent", &self.agent)
1905 .field("effective_canister_id", &self.effective_canister_id)
1906 .finish_non_exhaustive()
1907 }
1908}
1909
1910impl Future for UpdateCall<'_> {
1911 type Output = Result<CallResponse<(Vec<u8>, Certificate)>, AgentError>;
1912 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1913 self.response_future.as_mut().poll(cx)
1914 }
1915}
1916
1917impl<'a> UpdateCall<'a> {
1918 pub async fn and_wait(self) -> Result<(Vec<u8>, Certificate), AgentError> {
1920 let response = self.response_future.await?;
1921
1922 match response {
1923 CallResponse::Response(response) => Ok(response),
1924 CallResponse::Poll(request_id) => {
1925 self.agent
1926 .wait_inner(
1927 &request_id,
1928 self.effective_canister_id,
1929 Some(Operation::Call {
1930 canister: self.canister_id,
1931 method: self.method_name,
1932 }),
1933 )
1934 .await
1935 }
1936 }
1937 }
1938}
/// Builder for an update call, created by `Agent::update` or [`UpdateBuilder::new`].
///
/// Configure it with the `with_*` / `expire_*` methods, then either submit it
/// (`call`, `call_and_wait`) or turn it into a signed request with `sign`.
#[derive(Debug)]
pub struct UpdateBuilder<'agent> {
    /// Agent used to build, sign and send the update.
    agent: &'agent Agent,
    /// Effective canister id passed to the agent when sending; defaults to `canister_id`.
    pub effective_canister_id: Principal,
    /// Canister the update is addressed to.
    pub canister_id: Principal,
    /// Name of the method to call.
    pub method_name: String,
    /// Raw argument bytes; empty by default.
    pub arg: Vec<u8>,
    /// Explicit expiry as nanoseconds since the Unix epoch, or `None` if unset.
    pub ingress_expiry_datetime: Option<u64>,
}
1957
1958impl<'agent> UpdateBuilder<'agent> {
1959 pub fn new(agent: &'agent Agent, canister_id: Principal, method_name: String) -> Self {
1961 Self {
1962 agent,
1963 effective_canister_id: canister_id,
1964 canister_id,
1965 method_name,
1966 arg: vec![],
1967 ingress_expiry_datetime: None,
1968 }
1969 }
1970
1971 pub fn with_effective_canister_id(mut self, canister_id: Principal) -> Self {
1973 self.effective_canister_id = canister_id;
1974 self
1975 }
1976
1977 pub fn with_arg<A: Into<Vec<u8>>>(mut self, arg: A) -> Self {
1979 self.arg = arg.into();
1980 self
1981 }
1982
1983 pub fn expire_at(mut self, time: impl Into<OffsetDateTime>) -> Self {
1985 self.ingress_expiry_datetime = Some(time.into().unix_timestamp_nanos() as u64);
1986 self
1987 }
1988
1989 pub fn expire_after(mut self, duration: Duration) -> Self {
1991 self.ingress_expiry_datetime = Some(
1992 OffsetDateTime::now_utc()
1993 .saturating_add(duration.try_into().expect("negative duration"))
1994 .unix_timestamp_nanos() as u64,
1995 );
1996 self
1997 }
1998
1999 pub async fn call_and_wait(self) -> Result<Vec<u8>, AgentError> {
2002 self.call().and_wait().await.map(|x| x.0)
2003 }
2004
2005 pub fn call(self) -> UpdateCall<'agent> {
2008 let method_name = self.method_name.clone();
2009 let response_future = async move {
2010 self.agent
2011 .update_raw(
2012 self.canister_id,
2013 self.effective_canister_id,
2014 self.method_name,
2015 self.arg,
2016 self.ingress_expiry_datetime,
2017 )
2018 .await
2019 };
2020 UpdateCall {
2021 agent: self.agent,
2022 response_future: Box::pin(response_future),
2023 effective_canister_id: self.effective_canister_id,
2024 canister_id: self.canister_id,
2025 method_name,
2026 }
2027 }
2028
2029 pub fn sign(self) -> Result<SignedUpdate, AgentError> {
2032 let identity = self.agent.identity.clone();
2033 let effective_canister_id = self.effective_canister_id;
2034 let content = self.into_envelope()?;
2035 let signed_update = sign_envelope(&content, identity)?;
2036 let request_id = to_request_id(&content)?;
2037 let EnvelopeContent::Call {
2038 nonce,
2039 ingress_expiry,
2040 sender,
2041 canister_id,
2042 method_name,
2043 arg,
2044 sender_info,
2045 } = content
2046 else {
2047 unreachable!()
2048 };
2049 Ok(SignedUpdate {
2050 nonce,
2051 ingress_expiry,
2052 sender,
2053 canister_id,
2054 method_name,
2055 arg,
2056 effective_canister_id,
2057 signed_update,
2058 request_id,
2059 sender_info,
2060 })
2061 }
2062
2063 pub fn into_envelope(self) -> Result<EnvelopeContent, AgentError> {
2065 let nonce = self.agent.nonce_factory.generate();
2066 self.agent.update_content(
2067 self.canister_id,
2068 self.method_name,
2069 self.arg,
2070 self.ingress_expiry_datetime,
2071 nonce,
2072 )
2073 }
2074}
2075
// Awaiting an `UpdateBuilder` directly is equivalent to awaiting
// `UpdateBuilder::call_and_wait`.
impl<'agent> IntoFuture for UpdateBuilder<'agent> {
    type IntoFuture = AgentFuture<'agent, Vec<u8>>;
    type Output = Result<Vec<u8>, AgentError>;
    /// Boxes and pins the future returned by `call_and_wait`.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(self.call_and_wait())
    }
}
2083
/// Abstraction over the HTTP client the agent sends its requests through.
///
/// On wasm targets the generated futures are not required to be `Send`.
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
#[cfg_attr(not(target_family = "wasm"), async_trait)]
pub trait HttpService: Send + Sync + Debug {
    /// Performs an HTTP request and returns the response with a fully read body.
    ///
    /// * `req` - factory invoked to build the request; an implementation may
    ///   invoke it more than once, e.g. when retrying.
    /// * `max_retries` - retry budget handed over by the caller; how it is used
    ///   is up to the implementation.
    /// * `size_limit` - optional upper bound on the response body size in bytes.
    async fn call<'a>(
        &'a self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        max_retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError>;
}
2096
2097fn from_http_request(req: http::Request<Bytes>) -> Result<Request, AgentError> {
2099 let (parts, body) = req.into_parts();
2100 let body = reqwest::Body::from(body);
2101 let request = http::Request::from_parts(parts, body)
2104 .try_into()
2105 .map_err(|e: reqwest::Error| AgentError::InvalidReplicaUrl(e.to_string()))?;
2106
2107 Ok(request)
2108}
2109
2110#[cfg(not(target_family = "wasm"))]
2112async fn to_http_response(
2113 resp: Response,
2114 size_limit: Option<usize>,
2115) -> Result<http::Response<Bytes>, AgentError> {
2116 use http_body_util::{BodyExt, Limited};
2117
2118 let resp: http::Response<reqwest::Body> = resp.into();
2119 let (parts, body) = resp.into_parts();
2120 let body = Limited::new(body, size_limit.unwrap_or(usize::MAX));
2121 let body = body
2122 .collect()
2123 .await
2124 .map_err(|e| {
2125 AgentError::TransportError(TransportError::Generic(format!(
2126 "unable to read response body: {e:#}"
2127 )))
2128 })?
2129 .to_bytes();
2130 let resp = http::Response::from_parts(parts, body);
2131
2132 Ok(resp)
2133}
2134
/// Converts a `reqwest` response into an `http::Response<Bytes>`, reading the
/// whole body (wasm targets).
///
/// Status and headers are copied first because the response is consumed when
/// its byte stream is taken. The stream is wrapped in `Limited` with
/// `size_limit` as the bound, or `usize::MAX` when no limit is given.
///
/// # Errors
///
/// Any failure while collecting the body is reported as a generic transport error.
#[cfg(target_family = "wasm")]
async fn to_http_response(
    resp: Response,
    size_limit: Option<usize>,
) -> Result<http::Response<Bytes>, AgentError> {
    use futures_util::StreamExt;
    use http_body::Frame;
    use http_body_util::{Limited, StreamBody};

    let status = resp.status();
    let headers = resp.headers().clone();

    let frames = resp.bytes_stream().map(|chunk| chunk.map(Frame::data));
    let max_len = size_limit.unwrap_or(usize::MAX);
    let limited = Limited::new(StreamBody::new(frames), max_len);

    let payload = http_body_util::BodyExt::collect(limited)
        .await
        .map_err(|e| {
            AgentError::TransportError(TransportError::Generic(format!(
                "unable to read response body: {e:#}"
            )))
        })?
        .to_bytes();

    let mut converted = http::Response::new(payload);
    *converted.status_mut() = status;
    *converted.headers_mut() = headers;

    Ok(converted)
}
2170
2171#[cfg(not(target_family = "wasm"))]
2172#[async_trait]
2173impl<T> HttpService for T
2174where
2175 for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
2176 for<'a> <&'a Self as Service<Request>>::Future: Send,
2177 T: Send + Sync + Debug + ?Sized,
2178{
2179 #[allow(clippy::needless_arbitrary_self_type)]
2180 async fn call<'a>(
2181 mut self: &'a Self,
2182 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2183 max_retries: usize,
2184 size_limit: Option<usize>,
2185 ) -> Result<http::Response<Bytes>, AgentError> {
2186 let mut retry_count = 0;
2187 loop {
2188 let request = from_http_request(req()?)?;
2189
2190 match Service::call(&mut self, request).await {
2191 Err(err) => {
2192 if err.is_connect() {
2194 if retry_count >= max_retries {
2195 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2196 }
2197 retry_count += 1;
2198 }
2199 else {
2201 return Err(AgentError::TransportError(TransportError::Reqwest(err)));
2202 }
2203 }
2204
2205 Ok(resp) => {
2206 let resp = to_http_response(resp, size_limit).await?;
2207 return Ok(resp);
2208 }
2209 }
2210 }
2211 }
2212}
2213
// Blanket implementation for any type whose shared reference is a tower
// `Service` over `reqwest` requests (wasm targets).
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl<T> HttpService for T
where
    for<'a> &'a T: Service<Request, Response = Response, Error = reqwest::Error>,
    T: Send + Sync + Debug + ?Sized,
{
    /// Sends the request exactly once; the retry count is ignored here.
    ///
    /// The response is converted with `to_http_response`, which applies
    /// `size_limit`.
    #[allow(clippy::needless_arbitrary_self_type)]
    async fn call<'a>(
        mut self: &'a Self,
        req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
        _retries: usize,
        size_limit: Option<usize>,
    ) -> Result<http::Response<Bytes>, AgentError> {
        let request = from_http_request(req()?)?;

        match Service::call(&mut self, request).await {
            Ok(response) => to_http_response(response, size_limit).await,
            Err(e) => Err(AgentError::TransportError(TransportError::Reqwest(e))),
        }
    }
}
2236
/// [`HttpService`] wrapper around a `reqwest` client that retries requests
/// answered with HTTP 429 (Too Many Requests).
#[derive(Debug)]
struct Retry429Logic {
    /// Underlying client the requests are sent with.
    client: Client,
}
2241
2242#[cfg_attr(target_family = "wasm", async_trait(?Send))]
2243#[cfg_attr(not(target_family = "wasm"), async_trait)]
2244impl HttpService for Retry429Logic {
2245 async fn call<'a>(
2246 &'a self,
2247 req: &'a (dyn Fn() -> Result<http::Request<Bytes>, AgentError> + Send + Sync),
2248 _max_tcp_retries: usize,
2249 _size_limit: Option<usize>,
2250 ) -> Result<http::Response<Bytes>, AgentError> {
2251 let mut retries = 0;
2252 loop {
2253 #[cfg(not(target_family = "wasm"))]
2254 let resp = self.client.call(req, _max_tcp_retries, _size_limit).await?;
2255 #[cfg(target_family = "wasm")]
2257 let resp = {
2258 let request = from_http_request(req()?)?;
2259 let resp = self
2260 .client
2261 .execute(request)
2262 .await
2263 .map_err(|e| AgentError::TransportError(TransportError::Reqwest(e)))?;
2264
2265 to_http_response(resp, _size_limit).await?
2266 };
2267
2268 if resp.status() == StatusCode::TOO_MANY_REQUESTS {
2269 if retries == 6 {
2270 break Ok(resp);
2271 } else {
2272 retries += 1;
2273 crate::util::sleep(Duration::from_millis(250)).await;
2274 continue;
2275 }
2276 } else {
2277 break Ok(resp);
2278 }
2279 }
2280 }
2281}
2282
// Tests that do not need a reachable replica.
#[cfg(all(test, not(target_family = "wasm")))]
mod offline_tests {
    use super::*;
    use tokio::net::TcpListener;
    // Signs six updates in quick succession and checks that at most two distinct
    // (increasing) `ingress_expiry` values are observed among them.
    #[test]
    fn rounded_expiry() {
        let agent = Agent::builder()
            .with_url("http://not-a-real-url")
            .build()
            .unwrap();
        let mut prev_expiry = None;
        let mut num_timestamps = 0;
        for _ in 0..6 {
            let update = agent
                .update(&Principal::management_canister(), "not_a_method")
                .sign()
                .unwrap();
            // Count each time the expiry moves forward.
            if prev_expiry < Some(update.ingress_expiry) {
                prev_expiry = Some(update.ingress_expiry);
                num_timestamps += 1;
            }
        }
        assert!(num_timestamps <= 2, "num_timestamps:{num_timestamps} > 2");
    }

    // Starts three queries against a mock server with the agent limited to two
    // concurrent requests, and checks that only two connections were accepted
    // after 250 ms.
    #[tokio::test]
    async fn client_ratelimit() {
        let mock_server = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let count = Arc::new(Mutex::new(0));
        let port = mock_server.local_addr().unwrap().port();
        // Accept loop: count every connection, then drain it without ever replying.
        tokio::spawn({
            let count = count.clone();
            async move {
                loop {
                    let (mut conn, _) = mock_server.accept().await.unwrap();
                    *count.lock().unwrap() += 1;
                    tokio::spawn(
                        async move { tokio::io::copy(&mut conn, &mut tokio::io::sink()).await },
                    );
                }
            }
        });
        let agent = Agent::builder()
            .with_http_client(Client::builder().http1_only().build().unwrap())
            .with_url(format!("http://127.0.0.1:{port}"))
            .with_max_concurrent_requests(2)
            .build()
            .unwrap();
        for _ in 0..3 {
            let agent = agent.clone();
            tokio::spawn(async move {
                agent
                    .query(&"ryjl3-tyaaa-aaaaa-aaaba-cai".parse().unwrap(), "greet")
                    .call()
                    .await
            });
        }
        crate::util::sleep(Duration::from_millis(250)).await;
        assert_eq!(*count.lock().unwrap(), 2);
    }
}