1use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13 util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17 client::{ChainClient, ChainClientError, Client, ListeningMode},
18 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19 join_set_ext::JoinSet,
20 node::ValidatorNode,
21 wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{fungible_transfer, Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::stream,
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::{Amount, BlockHeight},
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_execution::{
40 system::{OpenChainConfig, SystemOperation},
41 Operation,
42 },
43 std::{collections::HashSet, path::Path},
44 tokio::{sync::mpsc, task},
45};
46#[cfg(feature = "fs")]
47use {
48 linera_base::{
49 data_types::{BlobContent, Bytecode},
50 identifiers::ModuleId,
51 vm::VmRuntime,
52 },
53 linera_core::client::create_bytecode_blobs,
54 std::{fs, path::PathBuf},
55};
56
57use crate::{
58 chain_listener::{self, ClientContext as _},
59 client_options::{ChainOwnershipConfig, Options},
60 config::GenesisConfig,
61 error, util, Error,
62};
63
/// The outcome of the individual checks run against a validator (or the local node).
///
/// Each field stores either the retrieved value or the error produced while retrieving it,
/// so a failure of one check does not hide the results of the others.
pub struct ValidatorQueryResults {
    /// The version information, or the error encountered while obtaining it.
    pub version_info: Result<VersionInfo, Error>,
    /// The genesis configuration hash, or the error encountered while obtaining it.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The information about the queried chain, or the error encountered while obtaining it.
    pub chain_info: Result<ChainInfo, Error>,
}
73
74impl ValidatorQueryResults {
75 pub fn errors(&self) -> Vec<&Error> {
77 let mut errors = Vec::new();
78 if let Err(e) = &self.version_info {
79 errors.push(e);
80 }
81 if let Err(e) = &self.genesis_config_hash {
82 errors.push(e);
83 }
84 if let Err(e) = &self.chain_info {
85 errors.push(e);
86 }
87 errors
88 }
89
90 pub fn print(
95 &self,
96 public_key: Option<&ValidatorPublicKey>,
97 address: Option<&str>,
98 weight: Option<u64>,
99 reference: Option<&ValidatorQueryResults>,
100 ) {
101 if let Some(key) = public_key {
102 println!("Public key: {}", key);
103 }
104 if let Some(address) = address {
105 println!("Address: {}", address);
106 }
107 if let Some(w) = weight {
108 println!("Weight: {}", w);
109 }
110
111 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
112 match &self.version_info {
113 Ok(version_info) => {
114 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
115 {
116 println!("Linera protocol: v{}", version_info.crate_version);
117 }
118 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
119 println!("RPC API hash: {}", version_info.rpc_hash);
120 }
121 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
122 println!("GraphQL API hash: {}", version_info.graphql_hash);
123 }
124 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
125 println!("WIT API hash: v{}", version_info.wit_hash);
126 }
127 if ref_version.is_none_or(|ref_v| {
128 (&ref_v.git_commit, ref_v.git_dirty)
129 != (&version_info.git_commit, version_info.git_dirty)
130 }) {
131 println!(
132 "Source code: {}/tree/{}{}",
133 env!("CARGO_PKG_REPOSITORY"),
134 version_info.git_commit,
135 if version_info.git_dirty {
136 " (dirty)"
137 } else {
138 ""
139 }
140 );
141 }
142 }
143 Err(err) => println!("Error getting version info: {err}"),
144 }
145
146 let ref_genesis_hash =
147 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
148 match &self.genesis_config_hash {
149 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
150 Ok(hash) => println!("Genesis config hash: {hash}"),
151 Err(err) => println!("Error getting genesis config: {err}"),
152 }
153
154 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
155 match &self.chain_info {
156 Ok(info) => {
157 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
158 if let Some(hash) = info.block_hash {
159 println!("Block hash: {}", hash);
160 } else {
161 println!("Block hash: None");
162 }
163 }
164 if ref_info
165 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
166 {
167 println!("Next height: {}", info.next_block_height);
168 }
169 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
170 println!("Timestamp: {}", info.timestamp);
171 }
172 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
173 println!("Epoch: {}", info.epoch);
174 }
175 if ref_info.is_none_or(|ref_info| {
176 info.manager.current_round != ref_info.manager.current_round
177 }) {
178 println!("Round: {}", info.manager.current_round);
179 }
180 if let Some(locking) = &info.manager.requested_locking {
181 match &**locking {
182 LockingBlock::Fast(proposal) => {
183 println!(
184 "Locking fast block from {}",
185 proposal.content.block.timestamp
186 );
187 }
188 LockingBlock::Regular(validated) => {
189 println!(
190 "Locking block {} in {} from {}",
191 validated.hash(),
192 validated.round,
193 validated.block().header.timestamp
194 );
195 }
196 }
197 }
198 }
199 Err(err) => println!("Error getting chain info: {err}"),
200 }
201 }
202}
203
/// State shared by client commands: the core client, the genesis configuration, the
/// network options used to reach validators, and the running chain listener tasks.
pub struct ClientContext<Env: Environment> {
    /// The underlying core client.
    pub client: Arc<Client<Env>>,
    /// The genesis configuration; its network description is compared against the one
    /// reported by validators in `check_matching_network_description`.
    pub genesis_config: crate::config::GenesisConfig,
    // The five values below are copied from `Options` in `new` and are used by
    // `make_node_options` to build `NodeOptions`.
    pub send_timeout: Duration,
    pub recv_timeout: Duration,
    pub retry_delay: Duration,
    pub max_retries: u32,
    pub max_backoff: Duration,
    /// The set of tasks onto which chain listeners are spawned.
    pub chain_listeners: JoinSet,
    /// The chain used when a command does not name one; `default_chain()` panics if unset.
    pub default_chain: Option<ChainId>,
    /// Timing metrics; `new` only populates this when the timing configuration is enabled.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
219
// Implements the chain listener's context interface by delegating to the wrapped client
// and to the inherent methods of `ClientContext`.
impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
    type Environment = Env;

    /// Returns the wallet held by the underlying client.
    fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }

    /// Returns the storage held by the underlying client.
    fn storage(&self) -> &Env::Storage {
        self.client.storage_client()
    }

    /// Returns the shared handle to the underlying client.
    fn client(&self) -> &Arc<Client<Env>> {
        &self.client
    }

    /// Returns a clone of the metrics' timing sender, or `None` when metrics are not set up.
    #[cfg(not(web))]
    fn timing_sender(
        &self,
    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
        self.client_metrics
            .as_ref()
            .map(|metrics| metrics.timing_sender.clone())
    }

    /// Records a newly created chain in the wallet.
    async fn update_wallet_for_new_chain(
        &mut self,
        chain_id: ChainId,
        owner: Option<AccountOwner>,
        timestamp: Timestamp,
        epoch: Epoch,
    ) -> Result<(), Error> {
        // This is not a recursive call: Rust's method resolution prefers the inherent
        // `ClientContext::update_wallet_for_new_chain` over this trait method.
        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
            .make_sync()
            .await
    }

    /// Refreshes the wallet entry of the chain tracked by `client`.
    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
        self.update_wallet_from_client(client).make_sync().await
    }
}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263 S: linera_core::environment::Storage,
264 Si: linera_core::environment::Signer,
265 W: linera_core::environment::Wallet,
266{
267 #[allow(clippy::too_many_arguments)]
271 pub async fn new(
272 storage: S,
273 wallet: W,
274 signer: Si,
275 options: &Options,
276 default_chain: Option<ChainId>,
277 genesis_config: GenesisConfig,
278 ) -> Result<Self, Error> {
279 #[cfg(not(web))]
280 let timing_config = options.to_timing_config();
281 let node_provider = NodeProvider::new(NodeOptions {
282 send_timeout: options.send_timeout,
283 recv_timeout: options.recv_timeout,
284 retry_delay: options.retry_delay,
285 max_retries: options.max_retries,
286 max_backoff: options.max_backoff,
287 });
288 let chain_modes: Vec<_> = wallet
289 .items()
290 .map_ok(|(id, _chain)| (id, ListeningMode::FullChain))
291 .try_collect()
292 .await
293 .map_err(error::Inner::wallet)?;
294 let name = match chain_modes.len() {
295 0 => "Client node".to_string(),
296 1 => format!("Client node for {:.8}", chain_modes[0].0),
297 n => format!(
298 "Client node for {:.8} and {} others",
299 chain_modes[0].0,
300 n - 1
301 ),
302 };
303
304 let client = Client::new(
305 linera_core::environment::Impl {
306 network: node_provider,
307 storage,
308 signer,
309 wallet,
310 },
311 genesis_config.admin_chain_id(),
312 options.long_lived_services,
313 chain_modes,
314 name,
315 options.chain_worker_ttl,
316 options.sender_chain_worker_ttl,
317 options.to_chain_client_options(),
318 options.to_requests_scheduler_config(),
319 );
320
321 #[cfg(not(web))]
322 let client_metrics = if timing_config.enabled {
323 Some(ClientMetrics::new(timing_config))
324 } else {
325 None
326 };
327
328 Ok(ClientContext {
329 client: Arc::new(client),
330 default_chain,
331 genesis_config,
332 send_timeout: options.send_timeout,
333 recv_timeout: options.recv_timeout,
334 retry_delay: options.retry_delay,
335 max_retries: options.max_retries,
336 max_backoff: options.max_backoff,
337 chain_listeners: JoinSet::default(),
338 #[cfg(not(web))]
339 client_metrics,
340 })
341 }
342}
343
344impl<Env: Environment> ClientContext<Env> {
    /// Returns the wallet held by the underlying client.
    pub fn wallet(&self) -> &Env::Wallet {
        self.client.wallet()
    }
351
    /// Returns the ID of the admin chain, as known by the underlying client.
    pub fn admin_chain_id(&self) -> ChainId {
        self.client.admin_chain_id()
    }
356
    /// Returns the chain-level account of the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set (see `default_chain`).
    pub fn default_account(&self) -> Account {
        Account::chain(self.default_chain())
    }
362
    /// Returns the ID of the default chain.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set.
    pub fn default_chain(&self) -> ChainId {
        self.default_chain
            .expect("default chain requested but none set")
    }
368
369 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
370 let admin_chain_id = self.admin_chain_id();
371 std::pin::pin!(self
372 .wallet()
373 .chain_ids()
374 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id)))
375 .next()
376 .await
377 .expect("No non-admin chain specified in wallet with no non-admin chain")
378 .map_err(Error::wallet)
379 }
380
    /// Creates a new node provider configured with this context's network options.
    pub fn make_node_provider(&self) -> NodeProvider {
        NodeProvider::new(self.make_node_options())
    }
385
386 fn make_node_options(&self) -> NodeOptions {
387 NodeOptions {
388 send_timeout: self.send_timeout,
389 recv_timeout: self.recv_timeout,
390 retry_delay: self.retry_delay,
391 max_retries: self.max_retries,
392 max_backoff: self.max_backoff,
393 }
394 }
395
    /// Returns the client metrics, if any were set up.
    #[cfg(not(web))]
    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
        self.client_metrics.as_ref()
    }
400
401 pub async fn update_wallet_from_client<Env_: Environment>(
402 &self,
403 client: &ChainClient<Env_>,
404 ) -> Result<(), Error> {
405 let info = client.chain_info().await?;
406 let existing_owner = self
407 .wallet()
408 .get(info.chain_id)
409 .await
410 .map_err(error::Inner::wallet)?
411 .and_then(|chain| chain.owner);
412
413 self.wallet()
414 .insert(
415 info.chain_id,
416 wallet::Chain {
417 pending_proposal: client.pending_proposal().clone(),
418 owner: existing_owner,
419 ..info.as_ref().into()
420 },
421 )
422 .await
423 .map_err(error::Inner::wallet)?;
424
425 Ok(())
426 }
427
428 pub async fn update_wallet_for_new_chain(
430 &mut self,
431 chain_id: ChainId,
432 owner: Option<AccountOwner>,
433 timestamp: Timestamp,
434 epoch: Epoch,
435 ) -> Result<(), Error> {
436 self.wallet()
437 .try_insert(
438 chain_id,
439 linera_core::wallet::Chain::new(owner, epoch, timestamp),
440 )
441 .await
442 .map_err(error::Inner::wallet)?;
443 Ok(())
444 }
445
446 pub async fn process_inbox(
447 &mut self,
448 chain_client: &ChainClient<Env>,
449 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
450 let mut certificates = Vec::new();
451 let (new_certificates, maybe_timeout) = {
453 chain_client.synchronize_from_validators().await?;
454 let result = chain_client.process_inbox_without_prepare().await;
455 self.update_wallet_from_client(chain_client).await?;
456 result?
457 };
458 certificates.extend(new_certificates);
459 if maybe_timeout.is_none() {
460 return Ok(certificates);
461 }
462
463 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
465 self.chain_listeners.spawn_task(listener);
466
467 loop {
468 let (new_certificates, maybe_timeout) = {
469 let result = chain_client.process_inbox().await;
470 self.update_wallet_from_client(chain_client).await?;
471 result?
472 };
473 certificates.extend(new_certificates);
474 if let Some(timestamp) = maybe_timeout {
475 util::wait_for_next_round(&mut notification_stream, timestamp).await
476 } else {
477 return Ok(certificates);
478 }
479 }
480 }
481
482 pub async fn assign_new_chain_to_key(
483 &mut self,
484 chain_id: ChainId,
485 owner: AccountOwner,
486 ) -> Result<(), Error> {
487 self.client
488 .extend_chain_mode(chain_id, ListeningMode::FullChain);
489 let client = self.make_chain_client(chain_id).await?;
490
491 client.get_chain_description().await?;
493
494 client.synchronize_from_validators().await?;
496 let info = client.chain_info().await?;
497
498 if !info
501 .manager
502 .ownership
503 .can_propose_in_multi_leader_round(&owner)
504 {
505 tracing::error!("Chain {chain_id} is not owned by {owner}.");
506 return Err(error::Inner::ChainOwnership.into());
507 }
508
509 let modified = self
511 .wallet()
512 .modify(chain_id, |chain| chain.owner = Some(owner))
513 .await
514 .map_err(error::Inner::wallet)?;
515 if modified.is_none() {
517 self.wallet()
518 .insert(
519 chain_id,
520 wallet::Chain {
521 owner: Some(owner),
522 timestamp: info.timestamp,
523 epoch: Some(info.epoch),
524 ..Default::default()
525 },
526 )
527 .await
528 .map_err(error::Inner::wallet)
529 .context("assigning new chain")?;
530 }
531 Ok(())
532 }
533
    /// Runs the command `f` on `client`, retrying after round timeouts until it is
    /// either committed or found to conflict with another block.
    ///
    /// The wallet is updated from the client after every invocation of `f`, whether or
    /// not the invocation succeeded.
    ///
    /// # Errors
    ///
    /// Returns `ChainClientError::Conflict` (converted into `Error`) when `f` reports a
    /// conflicting certificate, and propagates errors from `f`, from preparing the chain,
    /// from updating the wallet and from starting the listener.
    pub async fn apply_client_command<E, F, Fut, T>(
        &mut self,
        client: &ChainClient<Env>,
        mut f: F,
    ) -> Result<T, Error>
    where
        F: FnMut(&ChainClient<Env>) -> Fut,
        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
        Error: From<E>,
    {
        client.prepare_chain().await?;
        // First attempt, made before any notification listener exists.
        let result = f(client).await;
        // Update the wallet before inspecting `result`, so it is refreshed even on failure.
        self.update_wallet_from_client(client).await?;
        match result? {
            ClientOutcome::Committed(t) => return Ok(t),
            ClientOutcome::Conflict(certificate) => {
                return Err(ChainClientError::Conflict(certificate.hash()).into());
            }
            // The timeout value of the first attempt is discarded: the command is retried
            // right after the listener is set up, without waiting.
            ClientOutcome::WaitForTimeout(_) => {}
        }

        // `_listen_handle` stays bound until the function returns.
        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
        self.chain_listeners.spawn_task(listener);

        loop {
            let result = f(client).await;
            // As above: refresh the wallet even if the command failed.
            self.update_wallet_from_client(client).await?;
            let timeout = match result? {
                ClientOutcome::Committed(t) => return Ok(t),
                ClientOutcome::Conflict(certificate) => {
                    return Err(ChainClientError::Conflict(certificate.hash()).into());
                }
                ClientOutcome::WaitForTimeout(timeout) => timeout,
            };
            // Wait for the next round (or the timeout) before trying again.
            util::wait_for_next_round(&mut notification_stream, timeout).await;
        }
    }
579
    /// Returns the ownership of the given chain, as reported by the chain client's info.
    ///
    /// Falls back to the default chain when `chain_id` is `None`; in that case this
    /// panics if no default chain is set.
    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let client = self.make_chain_client(chain_id).await?;
        let info = client.chain_info().await?;
        Ok(info.manager.ownership)
    }
586
587 pub async fn change_ownership(
588 &mut self,
589 chain_id: Option<ChainId>,
590 ownership_config: ChainOwnershipConfig,
591 ) -> Result<(), Error> {
592 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
593 let chain_client = self.make_chain_client(chain_id).await?;
594 info!(
595 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
596 "Changing ownership of a chain"
597 );
598 let time_start = Instant::now();
599 let mut ownership = chain_client.query_chain_ownership().await?;
600 ownership_config.update(&mut ownership)?;
601
602 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
603 tracing::error!("At least one owner or super owner of the chain has to be set.");
604 return Err(error::Inner::ChainOwnership.into());
605 }
606
607 let certificate = self
608 .apply_client_command(&chain_client, |chain_client| {
609 let ownership = ownership.clone();
610 let chain_client = chain_client.clone();
611 async move {
612 chain_client
613 .change_ownership(ownership)
614 .await
615 .map_err(Error::from)
616 .context("Failed to change ownership")
617 }
618 })
619 .await?;
620 let time_total = time_start.elapsed();
621 info!("Operation confirmed after {} ms", time_total.as_millis());
622 debug!("{:?}", certificate);
623 Ok(())
624 }
625
    /// Sets the preferred owner on a chain client for the given chain and then updates
    /// the wallet from that client.
    ///
    /// Falls back to the default chain when `chain_id` is `None` (panicking if there is
    /// no default chain).
    pub async fn set_preferred_owner(
        &mut self,
        chain_id: Option<ChainId>,
        preferred_owner: AccountOwner,
    ) -> Result<(), Error> {
        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
        let mut chain_client = self.make_chain_client(chain_id).await?;
        let old_owner = chain_client.preferred_owner();
        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
        chain_client.set_preferred_owner(preferred_owner);
        // NOTE(review): `update_wallet_from_client` writes back the owner that is already
        // stored in the wallet, not `chain_client.preferred_owner()`. Confirm that the new
        // preference is persisted elsewhere, or whether it is meant to be stored here.
        self.update_wallet_from_client(&chain_client).await?;
        info!("New preferred owner set");
        Ok(())
    }
640
641 pub async fn check_compatible_version_info(
642 &self,
643 address: &str,
644 node: &impl ValidatorNode,
645 ) -> Result<VersionInfo, Error> {
646 match node.get_version_info().await {
647 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
648 debug!(
649 "Version information for validator {address}: {}",
650 version_info
651 );
652 Ok(version_info)
653 }
654 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
655 remote: Box::new(version_info),
656 local: Box::new(linera_version::VERSION_INFO.clone()),
657 }
658 .into()),
659 Err(error) => Err(error::Inner::UnavailableVersionInfo {
660 address: address.to_string(),
661 error: Box::new(error),
662 }
663 .into()),
664 }
665 }
666
667 pub async fn check_matching_network_description(
668 &self,
669 address: &str,
670 node: &impl ValidatorNode,
671 ) -> Result<CryptoHash, Error> {
672 let network_description = self.genesis_config.network_description();
673 match node.get_network_description().await {
674 Ok(description) => {
675 if description == network_description {
676 Ok(description.genesis_config_hash)
677 } else {
678 Err(error::Inner::UnexpectedNetworkDescription {
679 remote: Box::new(description),
680 local: Box::new(network_description),
681 }
682 .into())
683 }
684 }
685 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
686 address: address.to_string(),
687 error: Box::new(error),
688 }
689 .into()),
690 }
691 }
692
693 pub async fn check_validator_chain_info_response(
694 &self,
695 public_key: Option<&ValidatorPublicKey>,
696 address: &str,
697 node: &impl ValidatorNode,
698 chain_id: ChainId,
699 ) -> Result<ChainInfo, Error> {
700 let query = ChainInfoQuery::new(chain_id).with_manager_values();
701 match node.handle_chain_info_query(query).await {
702 Ok(response) => {
703 debug!(
704 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
705 response.info.next_block_height, response.info.epoch,
706 );
707 if let Some(public_key) = public_key {
708 if response.check(*public_key).is_ok() {
709 debug!("Signature for public key {public_key} is OK.");
710 } else {
711 return Err(error::Inner::InvalidSignature {
712 public_key: *public_key,
713 }
714 .into());
715 }
716 } else {
717 warn!("Not checking signature as public key was not given");
718 }
719 Ok(*response.info)
720 }
721 Err(error) => Err(error::Inner::UnavailableChainInfo {
722 address: address.to_string(),
723 chain_id,
724 error: Box::new(error),
725 }
726 .into()),
727 }
728 }
729
730 pub async fn query_validator(
734 &self,
735 address: &str,
736 node: &impl ValidatorNode,
737 chain_id: ChainId,
738 public_key: Option<&ValidatorPublicKey>,
739 ) -> ValidatorQueryResults {
740 let version_info = self.check_compatible_version_info(address, node).await;
741 let genesis_config_hash = self.check_matching_network_description(address, node).await;
742 let chain_info = self
743 .check_validator_chain_info_response(public_key, address, node, chain_id)
744 .await;
745
746 ValidatorQueryResults {
747 version_info,
748 genesis_config_hash,
749 chain_info,
750 }
751 }
752
753 pub async fn query_local_node(
757 &self,
758 chain_id: ChainId,
759 ) -> Result<ValidatorQueryResults, Error> {
760 let version_info = Ok(linera_version::VERSION_INFO.clone());
761 let genesis_config_hash = Ok(self
762 .genesis_config
763 .network_description()
764 .genesis_config_hash);
765 let chain_info = self
766 .make_chain_client(chain_id)
767 .await?
768 .chain_info_with_manager_values()
769 .await
770 .map(|info| *info)
771 .map_err(|e| e.into());
772
773 Ok(ValidatorQueryResults {
774 version_info,
775 genesis_config_hash,
776 chain_info,
777 })
778 }
779}
780
781#[cfg(feature = "fs")]
782impl<Env: Environment> ClientContext<Env> {
783 pub async fn publish_module(
784 &mut self,
785 chain_client: &ChainClient<Env>,
786 contract: PathBuf,
787 service: PathBuf,
788 vm_runtime: VmRuntime,
789 ) -> Result<ModuleId, Error> {
790 info!("Loading bytecode files");
791 let contract_bytecode = Bytecode::load_from_file(&contract)
792 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
793 let service_bytecode = Bytecode::load_from_file(&service)
794 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
795
796 info!("Publishing module");
797 let (blobs, module_id) =
798 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
799 let (module_id, _) = self
800 .apply_client_command(chain_client, |chain_client| {
801 let blobs = blobs.clone();
802 let chain_client = chain_client.clone();
803 async move {
804 chain_client
805 .publish_module_blobs(blobs, module_id)
806 .await
807 .context("Failed to publish module")
808 }
809 })
810 .await?;
811
812 info!("{}", "Module published successfully!");
813
814 info!("Synchronizing client and processing inbox");
815 self.process_inbox(chain_client).await?;
816 Ok(module_id)
817 }
818
819 pub async fn publish_data_blob(
820 &mut self,
821 chain_client: &ChainClient<Env>,
822 blob_path: PathBuf,
823 ) -> Result<CryptoHash, Error> {
824 info!("Loading data blob file");
825 let blob_bytes = fs::read(&blob_path).context(format!(
826 "failed to load data blob bytes from {:?}",
827 &blob_path
828 ))?;
829
830 info!("Publishing data blob");
831 self.apply_client_command(chain_client, |chain_client| {
832 let blob_bytes = blob_bytes.clone();
833 let chain_client = chain_client.clone();
834 async move {
835 chain_client
836 .publish_data_blob(blob_bytes)
837 .await
838 .context("Failed to publish data blob")
839 }
840 })
841 .await?;
842
843 info!("{}", "Data blob published successfully!");
844 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
845 }
846
    /// Submits a command on the client's chain that reads the data blob with the given
    /// hash, which serves to verify the blob.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be applied.
    pub async fn read_data_blob(
        &mut self,
        chain_client: &ChainClient<Env>,
        hash: CryptoHash,
    ) -> Result<(), Error> {
        info!("Verifying data blob");
        self.apply_client_command(chain_client, |chain_client| {
            // Clone the client so the future owns everything it needs for this attempt.
            let chain_client = chain_client.clone();
            async move {
                chain_client
                    .read_data_blob(hash)
                    .await
                    .context("Failed to verify data blob")
            }
        })
        .await?;

        info!("{}", "Data blob verified successfully!");
        Ok(())
    }
868}
869
870#[cfg(not(web))]
871impl<Env: Environment> ClientContext<Env> {
872 pub async fn prepare_for_benchmark(
873 &mut self,
874 num_chains: usize,
875 tokens_per_chain: Amount,
876 fungible_application_id: Option<ApplicationId>,
877 pub_keys: Vec<AccountPublicKey>,
878 chains_config_path: Option<&Path>,
879 close_chains: bool,
880 ) -> Result<Vec<ChainClient<Env>>, Error> {
881 let start = Instant::now();
882 self.process_inboxes_and_force_validator_updates().await;
886 info!(
887 "Processed inboxes and forced validator updates in {} ms",
888 start.elapsed().as_millis()
889 );
890
891 let start = Instant::now();
892 let (benchmark_chains, chain_clients) = self
893 .make_benchmark_chains(
894 num_chains,
895 tokens_per_chain,
896 pub_keys,
897 chains_config_path.is_some(),
898 close_chains,
899 )
900 .await?;
901 info!(
902 "Got {} chains in {} ms",
903 num_chains,
904 start.elapsed().as_millis()
905 );
906
907 if let Some(id) = fungible_application_id {
908 let start = Instant::now();
909 self.supply_fungible_tokens(&benchmark_chains, id).await?;
910 info!(
911 "Supplied fungible tokens in {} ms",
912 start.elapsed().as_millis()
913 );
914 let start = Instant::now();
916 for chain_client in &chain_clients {
917 chain_client.process_inbox().await?;
918 }
919 info!(
920 "Processed inboxes after supplying fungible tokens in {} ms",
921 start.elapsed().as_millis()
922 );
923 }
924
925 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
926 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
927 let unknown_chain_ids: Vec<_> = all_chains
928 .iter()
929 .filter(|id| !known_chain_ids.contains(id))
930 .copied()
931 .collect();
932 if !unknown_chain_ids.is_empty() {
933 for chain_id in &unknown_chain_ids {
937 self.client.get_chain_description(*chain_id).await?;
938 }
939 }
940
941 Ok(chain_clients)
942 }
943
944 pub async fn wrap_up_benchmark(
945 &mut self,
946 chain_clients: Vec<ChainClient<Env>>,
947 close_chains: bool,
948 wrap_up_max_in_flight: usize,
949 ) -> Result<(), Error> {
950 if close_chains {
951 info!("Closing chains...");
952 let chain_ids: Vec<_> = chain_clients.iter().map(|c| c.chain_id()).collect();
953 let stream = stream::iter(chain_clients)
954 .map(|chain_client| async move {
955 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
956 info!("Closed chain {:?}", chain_client.chain_id());
957 Ok::<(), BenchmarkError>(())
958 })
959 .buffer_unordered(wrap_up_max_in_flight);
960 stream.try_collect::<Vec<_>>().await?;
961 for chain_id in chain_ids {
963 let _ = self.wallet().remove(chain_id).await;
964 }
965 } else {
966 info!("Processing inbox for all chains...");
967 let stream = stream::iter(chain_clients.clone())
968 .map(|chain_client| async move {
969 chain_client.process_inbox().await?;
970 info!("Processed inbox for chain {:?}", chain_client.chain_id());
971 Ok::<(), ChainClientError>(())
972 })
973 .buffer_unordered(wrap_up_max_in_flight);
974 stream.try_collect::<Vec<_>>().await?;
975
976 info!("Updating wallet from chain clients...");
977 for chain_client in chain_clients {
978 let info = chain_client.chain_info().await?;
979 let client_owner = chain_client.preferred_owner();
980 let pending_proposal = chain_client.pending_proposal().clone();
981 self.wallet()
982 .insert(
983 info.chain_id,
984 wallet::Chain {
985 pending_proposal,
986 owner: client_owner,
987 ..info.as_ref().into()
988 },
989 )
990 .await
991 .map_err(error::Inner::wallet)?;
992 }
993 }
994
995 Ok(())
996 }
997
    /// Processes the inboxes of all chains owned according to the wallet, one spawned
    /// task per chain, and afterwards updates the wallet from each chain client.
    ///
    /// # Panics
    ///
    /// Panics if the owned chains cannot be listed, if a chain client cannot be created,
    /// if processing an inbox fails, or if updating the wallet fails.
    async fn process_inboxes_and_force_validator_updates(&mut self) {
        let mut join_set = task::JoinSet::new();

        // Create a chain client for every owned chain; any failure here aborts via `unwrap`.
        let chain_clients: Vec<_> = self
            .wallet()
            .owned_chain_ids()
            .map_err(|e| error::Inner::wallet(e).into())
            .and_then(|id| self.make_chain_client(id))
            .try_collect()
            .await
            .unwrap();

        for chain_client in chain_clients {
            // Each task hands its client back so the wallet can be updated afterwards.
            join_set.spawn(async move {
                Self::process_inbox_without_updating_wallet(&chain_client)
                    .await
                    .expect("Processing inbox should not fail!");
                chain_client
            });
        }

        // Wallet updates are performed here, one after the other, once all tasks are done.
        for chain_client in join_set.join_all().await {
            self.update_wallet_from_client(&chain_client).await.unwrap();
        }
    }
1023
1024 async fn process_inbox_without_updating_wallet(
1025 chain_client: &ChainClient<Env>,
1026 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1027 chain_client.synchronize_from_validators().await?;
1029 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1030 assert!(
1031 maybe_timeout.is_none(),
1032 "Should not timeout within benchmark!"
1033 );
1034
1035 Ok(certificates)
1036 }
1037
    /// Collects up to `num_chains` benchmark chains and their chain clients.
    ///
    /// Unless chains are going to be closed (or `wallet_only` is set), suitable chains
    /// already owned in the wallet are reused first. The remaining chains are created by
    /// executing `OpenChain` operations on the default chain, one owner per public key
    /// taken from `pub_keys`.
    ///
    /// # Errors
    ///
    /// Returns `BenchmarkError::NotEnoughChainsInWallet` if `wallet_only` is set and the
    /// wallet does not provide enough chains; propagates client and wallet errors.
    ///
    /// # Panics
    ///
    /// Panics if no default chain is set, or if no chain description blob is found for a
    /// newly opened chain.
    async fn make_benchmark_chains(
        &mut self,
        num_chains: usize,
        balance: Amount,
        pub_keys: Vec<AccountPublicKey>,
        wallet_only: bool,
        close_chains: bool,
    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
        let mut chains_found_in_wallet = 0;
        let mut benchmark_chains = Vec::with_capacity(num_chains);
        let mut chain_clients = Vec::with_capacity(num_chains);
        let start = Instant::now();

        // Wallet chains are skipped only when chains will be closed and new chains may
        // be created.
        if !close_chains || wallet_only {
            let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
            while let Some(chain_id) = owned_chain_ids.next().await {
                let chain_id = chain_id.map_err(error::Inner::wallet)?;
                if chains_found_in_wallet == num_chains {
                    break;
                }
                let chain_client = self.make_chain_client(chain_id).await?;
                let ownership = chain_client.chain_info().await?.manager.ownership;
                // Only chains with exactly one super owner and no regular owners are used.
                if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
                    continue;
                }
                let owner = *ownership.super_owners.first().unwrap();
                chain_client.process_inbox().await?;
                benchmark_chains.push((chain_id, owner));
                chain_clients.push(chain_client);
                chains_found_in_wallet += 1;
            }
            info!(
                "Got {} chains from the wallet in {} ms",
                benchmark_chains.len(),
                start.elapsed().as_millis()
            );
        }

        // Cannot underflow: the loop above stops once `num_chains` chains were found.
        let num_chains_to_create = num_chains - chains_found_in_wallet;

        let default_chain_client = self.make_chain_client(self.default_chain()).await?;

        if num_chains_to_create > 0 {
            if wallet_only {
                return Err(
                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
                        num_chains,
                        chains_found_in_wallet,
                    ))
                    .into(),
                );
            }
            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
            // Upper bound on the number of `OpenChain` operations put into a single block.
            let operations_per_block = 900;
            for i in (0..num_chains_to_create).step_by(operations_per_block) {
                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
                // Borrow the iterator mutably so that the next batch continues where this
                // one stopped.
                let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
                    .take(num_new_chains)
                    .map(|pk| pk.into())
                    .collect();

                let certificate = Self::execute_open_chains_operations(
                    &default_chain_client,
                    balance,
                    owners.clone(),
                )
                .await?;
                info!("Block executed successfully");

                let block = certificate.block();
                for (i, owner) in owners.into_iter().enumerate() {
                    // The new chain's ID is the hash of the chain description blob found
                    // in the `i`-th blob list of the block body.
                    // NOTE(review): presumably `blobs[i]` corresponds to the `i`-th
                    // operation of the block — confirm.
                    let chain_id = block.body.blobs[i]
                        .iter()
                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
                        .map(|blob| ChainId(blob.id().hash))
                        .expect("failed to create a new chain");
                    self.client
                        .extend_chain_mode(chain_id, ListeningMode::FullChain);

                    let mut chain_client = self.client.create_chain_client(
                        chain_id,
                        None,
                        BlockHeight::ZERO,
                        None,
                        Some(owner),
                        self.timing_sender(),
                    );
                    chain_client.set_preferred_owner(owner);
                    chain_client.process_inbox().await?;
                    benchmark_chains.push((chain_id, owner));
                    chain_clients.push(chain_client);
                }
            }

            info!(
                "Created {} chains in {} ms",
                num_chains_to_create,
                start.elapsed().as_millis()
            );
        }

        // The wallet is only refreshed from the default chain when chains are kept.
        if !close_chains {
            info!("Updating wallet from client");
            self.update_wallet_from_client(&default_chain_client)
                .await?;
        }
        info!("Retrying pending outgoing messages");
        default_chain_client
            .retry_pending_outgoing_messages()
            .await
            .context("outgoing messages to create the new chains should be delivered")?;
        info!("Processing default chain inbox");
        default_chain_client.process_inbox().await?;

        Ok((benchmark_chains, chain_clients))
    }
1166
1167 async fn execute_open_chains_operations(
1168 chain_client: &ChainClient<Env>,
1169 balance: Amount,
1170 owners: Vec<AccountOwner>,
1171 ) -> Result<ConfirmedBlockCertificate, Error> {
1172 let operations: Vec<_> = owners
1173 .iter()
1174 .map(|owner| {
1175 let config = OpenChainConfig {
1176 ownership: ChainOwnership::single_super(*owner),
1177 balance,
1178 application_permissions: Default::default(),
1179 };
1180 Operation::system(SystemOperation::OpenChain(config))
1181 })
1182 .collect();
1183 info!("Executing {} OpenChain operations", operations.len());
1184 Ok(chain_client
1185 .execute_operations(operations, vec![])
1186 .await?
1187 .expect("should execute block with OpenChain operations"))
1188 }
1189
1190 async fn supply_fungible_tokens(
1192 &mut self,
1193 key_pairs: &[(ChainId, AccountOwner)],
1194 application_id: ApplicationId,
1195 ) -> Result<(), Error> {
1196 let default_chain_id = self.default_chain();
1197 let default_key = self
1198 .wallet()
1199 .get(default_chain_id)
1200 .await
1201 .unwrap()
1202 .unwrap()
1203 .owner
1204 .unwrap();
1205 let amount = Amount::from_nanos(4);
1207 let operations: Vec<Operation> = key_pairs
1208 .iter()
1209 .map(|(chain_id, owner)| {
1210 fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1211 })
1212 .collect();
1213 let chain_client = self.make_chain_client(default_chain_id).await?;
1214 for operation_chunk in operations.chunks(1000) {
1216 chain_client
1217 .execute_operations(operation_chunk.to_vec(), vec![])
1218 .await?
1219 .expect("should execute block with Transfer operations");
1220 }
1221 self.update_wallet_from_client(&chain_client).await?;
1222
1223 Ok(())
1224 }
1225}