1use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8 crypto::{CryptoHash, ValidatorPublicKey},
9 data_types::{Epoch, Timestamp},
10 identifiers::{Account, AccountOwner, ChainId},
11 ownership::ChainOwnership,
12 time::{Duration, Instant},
13 util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17 client::{ChainClient, Client, ListeningMode},
18 data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19 join_set_ext::JoinSet,
20 node::ValidatorNode,
21 wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29 crate::{
30 benchmark::{Benchmark, BenchmarkError},
31 client_metrics::ClientMetrics,
32 },
33 futures::stream,
34 linera_base::{
35 crypto::AccountPublicKey,
36 data_types::{Amount, BlockHeight},
37 identifiers::{ApplicationId, BlobType},
38 },
39 linera_core::client::ChainClientError,
40 linera_execution::{
41 system::{OpenChainConfig, SystemOperation},
42 Operation,
43 },
44 std::{collections::HashSet, iter, path::Path},
45 tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49 linera_base::{
50 data_types::{BlobContent, Bytecode},
51 identifiers::ModuleId,
52 vm::VmRuntime,
53 },
54 linera_core::client::create_bytecode_blobs,
55 std::{fs, path::PathBuf},
56};
57
58use crate::{
59 chain_listener::{self, ClientContext as _},
60 client_options::{ChainOwnershipConfig, Options},
61 config::GenesisConfig,
62 error, util, Error,
63};
64
65pub struct ValidatorQueryResults {
67 pub version_info: Result<VersionInfo, Error>,
69 pub genesis_config_hash: Result<CryptoHash, Error>,
71 pub chain_info: Result<ChainInfo, Error>,
73}
74
75impl ValidatorQueryResults {
76 pub fn errors(&self) -> Vec<&Error> {
78 let mut errors = Vec::new();
79 if let Err(e) = &self.version_info {
80 errors.push(e);
81 }
82 if let Err(e) = &self.genesis_config_hash {
83 errors.push(e);
84 }
85 if let Err(e) = &self.chain_info {
86 errors.push(e);
87 }
88 errors
89 }
90
91 pub fn print(
96 &self,
97 public_key: Option<&ValidatorPublicKey>,
98 address: Option<&str>,
99 weight: Option<u64>,
100 reference: Option<&ValidatorQueryResults>,
101 ) {
102 if let Some(key) = public_key {
103 println!("Public key: {}", key);
104 }
105 if let Some(address) = address {
106 println!("Address: {}", address);
107 }
108 if let Some(w) = weight {
109 println!("Weight: {}", w);
110 }
111
112 let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
113 match &self.version_info {
114 Ok(version_info) => {
115 if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
116 {
117 println!("Linera protocol: v{}", version_info.crate_version);
118 }
119 if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
120 println!("RPC API hash: {}", version_info.rpc_hash);
121 }
122 if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
123 println!("GraphQL API hash: {}", version_info.graphql_hash);
124 }
125 if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
126 println!("WIT API hash: v{}", version_info.wit_hash);
127 }
128 if ref_version.is_none_or(|ref_v| {
129 (&ref_v.git_commit, ref_v.git_dirty)
130 != (&version_info.git_commit, version_info.git_dirty)
131 }) {
132 println!(
133 "Source code: {}/tree/{}{}",
134 env!("CARGO_PKG_REPOSITORY"),
135 version_info.git_commit,
136 if version_info.git_dirty {
137 " (dirty)"
138 } else {
139 ""
140 }
141 );
142 }
143 }
144 Err(err) => println!("Error getting version info: {err}"),
145 }
146
147 let ref_genesis_hash =
148 reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
149 match &self.genesis_config_hash {
150 Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
151 Ok(hash) => println!("Genesis config hash: {hash}"),
152 Err(err) => println!("Error getting genesis config: {err}"),
153 }
154
155 let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
156 match &self.chain_info {
157 Ok(info) => {
158 if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
159 if let Some(hash) = info.block_hash {
160 println!("Block hash: {}", hash);
161 } else {
162 println!("Block hash: None");
163 }
164 }
165 if ref_info
166 .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
167 {
168 println!("Next height: {}", info.next_block_height);
169 }
170 if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
171 println!("Timestamp: {}", info.timestamp);
172 }
173 if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
174 println!("Epoch: {}", info.epoch);
175 }
176 if ref_info.is_none_or(|ref_info| {
177 info.manager.current_round != ref_info.manager.current_round
178 }) {
179 println!("Round: {}", info.manager.current_round);
180 }
181 if let Some(locking) = &info.manager.requested_locking {
182 match &**locking {
183 LockingBlock::Fast(proposal) => {
184 println!(
185 "Locking fast block from {}",
186 proposal.content.block.timestamp
187 );
188 }
189 LockingBlock::Regular(validated) => {
190 println!(
191 "Locking block {} in {} from {}",
192 validated.hash(),
193 validated.round,
194 validated.block().header.timestamp
195 );
196 }
197 }
198 }
199 }
200 Err(err) => println!("Error getting chain info: {err}"),
201 }
202 }
203}
204
205pub struct ClientContext<Env: Environment> {
206 pub client: Arc<Client<Env>>,
207 pub genesis_config: crate::config::GenesisConfig,
209 pub send_timeout: Duration,
210 pub recv_timeout: Duration,
211 pub retry_delay: Duration,
212 pub max_retries: u32,
213 pub chain_listeners: JoinSet,
214 pub default_chain: Option<ChainId>,
216 #[cfg(not(web))]
217 pub client_metrics: Option<ClientMetrics>,
218}
219
220impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
221 type Environment = Env;
222
223 fn wallet(&self) -> &Env::Wallet {
224 self.client.wallet()
225 }
226
227 fn storage(&self) -> &Env::Storage {
228 self.client.storage_client()
229 }
230
231 fn client(&self) -> &Arc<Client<Env>> {
232 &self.client
233 }
234
235 #[cfg(not(web))]
236 fn timing_sender(
237 &self,
238 ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
239 self.client_metrics
240 .as_ref()
241 .map(|metrics| metrics.timing_sender.clone())
242 }
243
244 async fn update_wallet_for_new_chain(
245 &mut self,
246 chain_id: ChainId,
247 owner: Option<AccountOwner>,
248 timestamp: Timestamp,
249 epoch: Epoch,
250 ) -> Result<(), Error> {
251 self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
252 .make_sync()
253 .await
254 }
255
256 async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257 self.update_wallet_from_client(client).make_sync().await
258 }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263 S: linera_core::environment::Storage,
264 Si: linera_core::environment::Signer,
265 W: linera_core::environment::Wallet,
266{
267 #[allow(clippy::too_many_arguments)]
271 pub async fn new(
272 storage: S,
273 wallet: W,
274 signer: Si,
275 options: &Options,
276 default_chain: Option<ChainId>,
277 genesis_config: GenesisConfig,
278 ) -> Result<Self, Error> {
279 #[cfg(not(web))]
280 let timing_config = options.to_timing_config();
281 let node_provider = NodeProvider::new(NodeOptions {
282 send_timeout: options.send_timeout,
283 recv_timeout: options.recv_timeout,
284 retry_delay: options.retry_delay,
285 max_retries: options.max_retries,
286 });
287 let chain_modes: Vec<_> = wallet
288 .items()
289 .map_ok(|(id, _chain)| (id, ListeningMode::FullChain))
290 .try_collect()
291 .await
292 .map_err(error::Inner::wallet)?;
293 let name = match chain_modes.len() {
294 0 => "Client node".to_string(),
295 1 => format!("Client node for {:.8}", chain_modes[0].0),
296 n => format!(
297 "Client node for {:.8} and {} others",
298 chain_modes[0].0,
299 n - 1
300 ),
301 };
302
303 let client = Client::new(
304 linera_core::environment::Impl {
305 network: node_provider,
306 storage,
307 signer,
308 wallet,
309 },
310 genesis_config.admin_id(),
311 options.long_lived_services,
312 chain_modes,
313 name,
314 options.chain_worker_ttl,
315 options.sender_chain_worker_ttl,
316 options.to_chain_client_options(),
317 options.to_requests_scheduler_config(),
318 );
319
320 #[cfg(not(web))]
321 let client_metrics = if timing_config.enabled {
322 Some(ClientMetrics::new(timing_config))
323 } else {
324 None
325 };
326
327 Ok(ClientContext {
328 client: Arc::new(client),
329 default_chain,
330 genesis_config,
331 send_timeout: options.send_timeout,
332 recv_timeout: options.recv_timeout,
333 retry_delay: options.retry_delay,
334 max_retries: options.max_retries,
335 chain_listeners: JoinSet::default(),
336 #[cfg(not(web))]
337 client_metrics,
338 })
339 }
340}
341
342impl<Env: Environment> ClientContext<Env> {
343 pub fn wallet(&self) -> &Env::Wallet {
347 self.client.wallet()
348 }
349
350 pub fn admin_chain(&self) -> ChainId {
352 self.client.admin_chain()
353 }
354
355 pub fn default_account(&self) -> Account {
358 Account::chain(self.default_chain())
359 }
360
361 pub fn default_chain(&self) -> ChainId {
363 self.default_chain
364 .expect("default chain requested but none set")
365 }
366
367 pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
368 let admin_id = self.admin_chain();
369 std::pin::pin!(self
370 .wallet()
371 .chain_ids()
372 .try_filter(|chain_id| futures::future::ready(*chain_id != admin_id)))
373 .next()
374 .await
375 .expect("No non-admin chain specified in wallet with no non-admin chain")
376 .map_err(Error::wallet)
377 }
378
379 pub fn make_node_provider(&self) -> NodeProvider {
381 NodeProvider::new(self.make_node_options())
382 }
383
384 fn make_node_options(&self) -> NodeOptions {
385 NodeOptions {
386 send_timeout: self.send_timeout,
387 recv_timeout: self.recv_timeout,
388 retry_delay: self.retry_delay,
389 max_retries: self.max_retries,
390 }
391 }
392
393 #[cfg(not(web))]
394 pub fn client_metrics(&self) -> Option<&ClientMetrics> {
395 self.client_metrics.as_ref()
396 }
397
398 pub async fn update_wallet_from_client<Env_: Environment>(
399 &self,
400 client: &ChainClient<Env_>,
401 ) -> Result<(), Error> {
402 let info = client.chain_info().await?;
403
404 self.wallet()
405 .insert(
406 info.chain_id,
407 wallet::Chain {
408 pending_proposal: client.pending_proposal().clone(),
409 owner: client.preferred_owner(),
410 ..info.as_ref().into()
411 },
412 )
413 .await
414 .map_err(error::Inner::wallet)?;
415
416 Ok(())
417 }
418
419 pub async fn update_wallet_for_new_chain(
421 &mut self,
422 chain_id: ChainId,
423 owner: Option<AccountOwner>,
424 timestamp: Timestamp,
425 epoch: Epoch,
426 ) -> Result<(), Error> {
427 let _ = self
428 .wallet()
429 .try_insert(
430 chain_id,
431 linera_core::wallet::Chain::new(owner, epoch, timestamp),
432 )
433 .await
434 .map_err(error::Inner::wallet)?;
435 Ok(())
436 }
437
438 pub async fn process_inbox(
439 &mut self,
440 chain_client: &ChainClient<Env>,
441 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
442 let mut certificates = Vec::new();
443 let (new_certificates, maybe_timeout) = {
445 chain_client.synchronize_from_validators().await?;
446 let result = chain_client.process_inbox_without_prepare().await;
447 self.update_wallet_from_client(chain_client).await?;
448 result?
449 };
450 certificates.extend(new_certificates);
451 if maybe_timeout.is_none() {
452 return Ok(certificates);
453 }
454
455 let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
457 self.chain_listeners.spawn_task(listener);
458
459 loop {
460 let (new_certificates, maybe_timeout) = {
461 let result = chain_client.process_inbox().await;
462 self.update_wallet_from_client(chain_client).await?;
463 result?
464 };
465 certificates.extend(new_certificates);
466 if let Some(timestamp) = maybe_timeout {
467 util::wait_for_next_round(&mut notification_stream, timestamp).await
468 } else {
469 return Ok(certificates);
470 }
471 }
472 }
473
474 pub async fn assign_new_chain_to_key(
475 &mut self,
476 chain_id: ChainId,
477 owner: AccountOwner,
478 ) -> Result<(), Error> {
479 self.client
480 .extend_chain_mode(chain_id, ListeningMode::FullChain);
481 let client = self.make_chain_client(chain_id).await?;
482 let chain_description = client.get_chain_description().await?;
483 let config = chain_description.config();
484
485 if !config.ownership.verify_owner(&owner) {
486 tracing::error!(
487 "The chain with the ID returned by the faucet is not owned by you. \
488 Please make sure you are connecting to a genuine faucet."
489 );
490 return Err(error::Inner::ChainOwnership.into());
491 }
492
493 self.wallet()
495 .insert(
496 chain_id,
497 wallet::Chain {
498 owner: Some(owner),
499 timestamp: chain_description.timestamp(),
500 epoch: Some(chain_description.config().epoch),
501 ..Default::default()
502 },
503 )
504 .await
505 .map_err(error::Inner::wallet)
506 .context("assigning new chain")?;
507 Ok(())
508 }
509
510 pub async fn apply_client_command<E, F, Fut, T>(
515 &mut self,
516 client: &ChainClient<Env>,
517 mut f: F,
518 ) -> Result<T, Error>
519 where
520 F: FnMut(&ChainClient<Env>) -> Fut,
521 Fut: Future<Output = Result<ClientOutcome<T>, E>>,
522 Error: From<E>,
523 {
524 client.prepare_chain().await?;
525 let result = f(client).await;
527 self.update_wallet_from_client(client).await?;
528 if let ClientOutcome::Committed(t) = result? {
529 return Ok(t);
530 }
531
532 let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
534 self.chain_listeners.spawn_task(listener);
535
536 loop {
537 let result = f(client).await;
539 self.update_wallet_from_client(client).await?;
540 let timeout = match result? {
541 ClientOutcome::Committed(t) => return Ok(t),
542 ClientOutcome::WaitForTimeout(timeout) => timeout,
543 };
544 util::wait_for_next_round(&mut notification_stream, timeout).await;
546 }
547 }
548
549 pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
550 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
551 let client = self.make_chain_client(chain_id).await?;
552 let info = client.chain_info().await?;
553 Ok(info.manager.ownership)
554 }
555
556 pub async fn change_ownership(
557 &mut self,
558 chain_id: Option<ChainId>,
559 ownership_config: ChainOwnershipConfig,
560 ) -> Result<(), Error> {
561 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
562 let chain_client = self.make_chain_client(chain_id).await?;
563 info!(
564 ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
565 "Changing ownership of a chain"
566 );
567 let time_start = Instant::now();
568 let mut ownership = chain_client.query_chain_ownership().await?;
569 ownership_config.update(&mut ownership)?;
570
571 if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
572 tracing::error!("At least one owner or super owner of the chain has to be set.");
573 return Err(error::Inner::ChainOwnership.into());
574 }
575
576 let certificate = self
577 .apply_client_command(&chain_client, |chain_client| {
578 let ownership = ownership.clone();
579 let chain_client = chain_client.clone();
580 async move {
581 chain_client
582 .change_ownership(ownership)
583 .await
584 .map_err(Error::from)
585 .context("Failed to change ownership")
586 }
587 })
588 .await?;
589 let time_total = time_start.elapsed();
590 info!("Operation confirmed after {} ms", time_total.as_millis());
591 debug!("{:?}", certificate);
592 Ok(())
593 }
594
595 pub async fn set_preferred_owner(
596 &mut self,
597 chain_id: Option<ChainId>,
598 preferred_owner: AccountOwner,
599 ) -> Result<(), Error> {
600 let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
601 let mut chain_client = self.make_chain_client(chain_id).await?;
602 let old_owner = chain_client.preferred_owner();
603 info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
604 chain_client.set_preferred_owner(preferred_owner);
605 self.update_wallet_from_client(&chain_client).await?;
606 info!("New preferred owner set");
607 Ok(())
608 }
609
610 pub async fn check_compatible_version_info(
611 &self,
612 address: &str,
613 node: &impl ValidatorNode,
614 ) -> Result<VersionInfo, Error> {
615 match node.get_version_info().await {
616 Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
617 debug!(
618 "Version information for validator {address}: {}",
619 version_info
620 );
621 Ok(version_info)
622 }
623 Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
624 remote: Box::new(version_info),
625 local: Box::new(linera_version::VERSION_INFO.clone()),
626 }
627 .into()),
628 Err(error) => Err(error::Inner::UnavailableVersionInfo {
629 address: address.to_string(),
630 error: Box::new(error),
631 }
632 .into()),
633 }
634 }
635
636 pub async fn check_matching_network_description(
637 &self,
638 address: &str,
639 node: &impl ValidatorNode,
640 ) -> Result<CryptoHash, Error> {
641 let network_description = self.genesis_config.network_description();
642 match node.get_network_description().await {
643 Ok(description) => {
644 if description == network_description {
645 Ok(description.genesis_config_hash)
646 } else {
647 Err(error::Inner::UnexpectedNetworkDescription {
648 remote: Box::new(description),
649 local: Box::new(network_description),
650 }
651 .into())
652 }
653 }
654 Err(error) => Err(error::Inner::UnavailableNetworkDescription {
655 address: address.to_string(),
656 error: Box::new(error),
657 }
658 .into()),
659 }
660 }
661
662 pub async fn check_validator_chain_info_response(
663 &self,
664 public_key: Option<&ValidatorPublicKey>,
665 address: &str,
666 node: &impl ValidatorNode,
667 chain_id: ChainId,
668 ) -> Result<ChainInfo, Error> {
669 let query = ChainInfoQuery::new(chain_id).with_manager_values();
670 match node.handle_chain_info_query(query).await {
671 Ok(response) => {
672 debug!(
673 "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
674 response.info.next_block_height, response.info.epoch,
675 );
676 if let Some(public_key) = public_key {
677 if response.check(*public_key).is_ok() {
678 debug!("Signature for public key {public_key} is OK.");
679 } else {
680 return Err(error::Inner::InvalidSignature {
681 public_key: *public_key,
682 }
683 .into());
684 }
685 } else {
686 warn!("Not checking signature as public key was not given");
687 }
688 Ok(*response.info)
689 }
690 Err(error) => Err(error::Inner::UnavailableChainInfo {
691 address: address.to_string(),
692 chain_id,
693 error: Box::new(error),
694 }
695 .into()),
696 }
697 }
698
699 pub async fn query_validator(
703 &self,
704 address: &str,
705 node: &impl ValidatorNode,
706 chain_id: ChainId,
707 public_key: Option<&ValidatorPublicKey>,
708 ) -> ValidatorQueryResults {
709 let version_info = self.check_compatible_version_info(address, node).await;
710 let genesis_config_hash = self.check_matching_network_description(address, node).await;
711 let chain_info = self
712 .check_validator_chain_info_response(public_key, address, node, chain_id)
713 .await;
714
715 ValidatorQueryResults {
716 version_info,
717 genesis_config_hash,
718 chain_info,
719 }
720 }
721
722 pub async fn query_local_node(
726 &self,
727 chain_id: ChainId,
728 ) -> Result<ValidatorQueryResults, Error> {
729 let version_info = Ok(linera_version::VERSION_INFO.clone());
730 let genesis_config_hash = Ok(self
731 .genesis_config
732 .network_description()
733 .genesis_config_hash);
734 let chain_info = self
735 .make_chain_client(chain_id)
736 .await?
737 .chain_info_with_manager_values()
738 .await
739 .map(|info| *info)
740 .map_err(|e| e.into());
741
742 Ok(ValidatorQueryResults {
743 version_info,
744 genesis_config_hash,
745 chain_info,
746 })
747 }
748}
749
750#[cfg(feature = "fs")]
751impl<Env: Environment> ClientContext<Env> {
752 pub async fn publish_module(
753 &mut self,
754 chain_client: &ChainClient<Env>,
755 contract: PathBuf,
756 service: PathBuf,
757 vm_runtime: VmRuntime,
758 ) -> Result<ModuleId, Error> {
759 info!("Loading bytecode files");
760 let contract_bytecode = Bytecode::load_from_file(&contract)
761 .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
762 let service_bytecode = Bytecode::load_from_file(&service)
763 .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
764
765 info!("Publishing module");
766 let (blobs, module_id) =
767 create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
768 let (module_id, _) = self
769 .apply_client_command(chain_client, |chain_client| {
770 let blobs = blobs.clone();
771 let chain_client = chain_client.clone();
772 async move {
773 chain_client
774 .publish_module_blobs(blobs, module_id)
775 .await
776 .context("Failed to publish module")
777 }
778 })
779 .await?;
780
781 info!("{}", "Module published successfully!");
782
783 info!("Synchronizing client and processing inbox");
784 self.process_inbox(chain_client).await?;
785 Ok(module_id)
786 }
787
788 pub async fn publish_data_blob(
789 &mut self,
790 chain_client: &ChainClient<Env>,
791 blob_path: PathBuf,
792 ) -> Result<CryptoHash, Error> {
793 info!("Loading data blob file");
794 let blob_bytes = fs::read(&blob_path).context(format!(
795 "failed to load data blob bytes from {:?}",
796 &blob_path
797 ))?;
798
799 info!("Publishing data blob");
800 self.apply_client_command(chain_client, |chain_client| {
801 let blob_bytes = blob_bytes.clone();
802 let chain_client = chain_client.clone();
803 async move {
804 chain_client
805 .publish_data_blob(blob_bytes)
806 .await
807 .context("Failed to publish data blob")
808 }
809 })
810 .await?;
811
812 info!("{}", "Data blob published successfully!");
813 Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
814 }
815
816 pub async fn read_data_blob(
818 &mut self,
819 chain_client: &ChainClient<Env>,
820 hash: CryptoHash,
821 ) -> Result<(), Error> {
822 info!("Verifying data blob");
823 self.apply_client_command(chain_client, |chain_client| {
824 let chain_client = chain_client.clone();
825 async move {
826 chain_client
827 .read_data_blob(hash)
828 .await
829 .context("Failed to verify data blob")
830 }
831 })
832 .await?;
833
834 info!("{}", "Data blob verified successfully!");
835 Ok(())
836 }
837}
838
839#[cfg(not(web))]
840impl<Env: Environment> ClientContext<Env> {
841 pub async fn prepare_for_benchmark(
842 &mut self,
843 num_chains: usize,
844 transactions_per_block: usize,
845 tokens_per_chain: Amount,
846 fungible_application_id: Option<ApplicationId>,
847 pub_keys: Vec<AccountPublicKey>,
848 chains_config_path: Option<&Path>,
849 ) -> Result<(Vec<ChainClient<Env>>, Vec<Vec<Operation>>), Error> {
850 let start = Instant::now();
851 self.process_inboxes_and_force_validator_updates().await;
855 info!(
856 "Processed inboxes and forced validator updates in {} ms",
857 start.elapsed().as_millis()
858 );
859
860 let start = Instant::now();
861 let (benchmark_chains, chain_clients) = self
862 .make_benchmark_chains(
863 num_chains,
864 tokens_per_chain,
865 pub_keys,
866 chains_config_path.is_some(),
867 )
868 .await?;
869 info!(
870 "Got {} chains in {} ms",
871 num_chains,
872 start.elapsed().as_millis()
873 );
874
875 if let Some(id) = fungible_application_id {
876 let start = Instant::now();
877 self.supply_fungible_tokens(&benchmark_chains, id).await?;
878 info!(
879 "Supplied fungible tokens in {} ms",
880 start.elapsed().as_millis()
881 );
882 let start = Instant::now();
884 for chain_client in &chain_clients {
885 chain_client.process_inbox().await?;
886 }
887 info!(
888 "Processed inboxes after supplying fungible tokens in {} ms",
889 start.elapsed().as_millis()
890 );
891 }
892
893 let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
894 let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
895 let unknown_chain_ids: Vec<_> = all_chains
896 .iter()
897 .filter(|id| !known_chain_ids.contains(id))
898 .copied()
899 .collect();
900 if !unknown_chain_ids.is_empty() {
901 for chain_id in &unknown_chain_ids {
905 self.client.get_chain_description(*chain_id).await?;
906 }
907 }
908
909 let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
910 benchmark_chains,
911 transactions_per_block,
912 fungible_application_id,
913 all_chains,
914 )?;
915
916 Ok((chain_clients, blocks_infos))
917 }
918
919 pub async fn wrap_up_benchmark(
920 &mut self,
921 chain_clients: Vec<ChainClient<Env>>,
922 close_chains: bool,
923 wrap_up_max_in_flight: usize,
924 ) -> Result<(), Error> {
925 if close_chains {
926 info!("Closing chains...");
927 let stream = stream::iter(chain_clients)
928 .map(|chain_client| async move {
929 Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
930 info!("Closed chain {:?}", chain_client.chain_id());
931 Ok::<(), BenchmarkError>(())
932 })
933 .buffer_unordered(wrap_up_max_in_flight);
934 stream.try_collect::<Vec<_>>().await?;
935 } else {
936 info!("Processing inbox for all chains...");
937 let stream = stream::iter(chain_clients.clone())
938 .map(|chain_client| async move {
939 chain_client.process_inbox().await?;
940 info!("Processed inbox for chain {:?}", chain_client.chain_id());
941 Ok::<(), ChainClientError>(())
942 })
943 .buffer_unordered(wrap_up_max_in_flight);
944 stream.try_collect::<Vec<_>>().await?;
945
946 info!("Updating wallet from chain clients...");
947 for chain_client in chain_clients {
948 let info = chain_client.chain_info().await?;
949 let client_owner = chain_client.preferred_owner();
950 let pending_proposal = chain_client.pending_proposal().clone();
951 self.wallet()
952 .insert(
953 info.chain_id,
954 wallet::Chain {
955 pending_proposal,
956 owner: client_owner,
957 ..info.as_ref().into()
958 },
959 )
960 .await
961 .map_err(error::Inner::wallet)?;
962 }
963 }
964
965 Ok(())
966 }
967
968 async fn process_inboxes_and_force_validator_updates(&mut self) {
969 let mut join_set = task::JoinSet::new();
970
971 let chain_clients: Vec<_> = self
972 .wallet()
973 .owned_chain_ids()
974 .map_err(|e| error::Inner::wallet(e).into())
975 .and_then(|id| self.make_chain_client(id))
976 .try_collect()
977 .await
978 .unwrap();
979
980 for chain_client in chain_clients {
981 join_set.spawn(async move {
982 Self::process_inbox_without_updating_wallet(&chain_client)
983 .await
984 .expect("Processing inbox should not fail!");
985 chain_client
986 });
987 }
988
989 for chain_client in join_set.join_all().await {
990 self.update_wallet_from_client(&chain_client).await.unwrap();
991 }
992 }
993
994 async fn process_inbox_without_updating_wallet(
995 chain_client: &ChainClient<Env>,
996 ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
997 chain_client.synchronize_from_validators().await?;
999 let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1000 assert!(
1001 maybe_timeout.is_none(),
1002 "Should not timeout within benchmark!"
1003 );
1004
1005 Ok(certificates)
1006 }
1007
1008 async fn make_benchmark_chains(
1011 &mut self,
1012 num_chains: usize,
1013 balance: Amount,
1014 pub_keys: Vec<AccountPublicKey>,
1015 wallet_only: bool,
1016 ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1017 let mut chains_found_in_wallet = 0;
1018 let mut benchmark_chains = Vec::with_capacity(num_chains);
1019 let mut chain_clients = Vec::with_capacity(num_chains);
1020 let start = Instant::now();
1021 let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1022 while let Some(chain_id) = owned_chain_ids.next().await {
1023 let chain_id = chain_id.map_err(error::Inner::wallet)?;
1024 if chains_found_in_wallet == num_chains {
1025 break;
1026 }
1027 let chain_client = self.make_chain_client(chain_id).await?;
1028 let ownership = chain_client.chain_info().await?.manager.ownership;
1029 if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1030 continue;
1031 }
1032 let owner = *ownership.super_owners.first().unwrap();
1033 chain_client.process_inbox().await?;
1034 benchmark_chains.push((chain_id, owner));
1035 chain_clients.push(chain_client);
1036 chains_found_in_wallet += 1;
1037 }
1038 info!(
1039 "Got {} chains from the wallet in {} ms",
1040 benchmark_chains.len(),
1041 start.elapsed().as_millis()
1042 );
1043
1044 let num_chains_to_create = num_chains - chains_found_in_wallet;
1045
1046 let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1047
1048 if num_chains_to_create > 0 {
1049 if wallet_only {
1050 return Err(
1051 error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1052 num_chains,
1053 chains_found_in_wallet,
1054 ))
1055 .into(),
1056 );
1057 }
1058 let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1059 let operations_per_block = 900; for i in (0..num_chains_to_create).step_by(operations_per_block) {
1061 let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1062 let pub_key = pub_keys_iter.next().unwrap();
1063 let owner = pub_key.into();
1064
1065 let certificate = Self::execute_open_chains_operations(
1066 num_new_chains,
1067 &default_chain_client,
1068 balance,
1069 owner,
1070 )
1071 .await?;
1072 info!("Block executed successfully");
1073
1074 let block = certificate.block();
1075 for i in 0..num_new_chains {
1076 let chain_id = block.body.blobs[i]
1077 .iter()
1078 .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1079 .map(|blob| ChainId(blob.id().hash))
1080 .expect("failed to create a new chain");
1081 self.client
1082 .extend_chain_mode(chain_id, ListeningMode::FullChain);
1083
1084 let mut chain_client = self.client.create_chain_client(
1085 chain_id,
1086 None,
1087 BlockHeight::ZERO,
1088 None,
1089 Some(owner),
1090 self.timing_sender(),
1091 );
1092 chain_client.set_preferred_owner(owner);
1093 chain_client.process_inbox().await?;
1094 benchmark_chains.push((chain_id, owner));
1095 chain_clients.push(chain_client);
1096 }
1097 }
1098
1099 info!(
1100 "Created {} chains in {} ms",
1101 num_chains_to_create,
1102 start.elapsed().as_millis()
1103 );
1104 }
1105
1106 info!("Updating wallet from client");
1107 self.update_wallet_from_client(&default_chain_client)
1108 .await?;
1109 info!("Retrying pending outgoing messages");
1110 default_chain_client
1111 .retry_pending_outgoing_messages()
1112 .await
1113 .context("outgoing messages to create the new chains should be delivered")?;
1114 info!("Processing default chain inbox");
1115 default_chain_client.process_inbox().await?;
1116
1117 Ok((benchmark_chains, chain_clients))
1118 }
1119
1120 async fn execute_open_chains_operations(
1121 num_new_chains: usize,
1122 chain_client: &ChainClient<Env>,
1123 balance: Amount,
1124 owner: AccountOwner,
1125 ) -> Result<ConfirmedBlockCertificate, Error> {
1126 let config = OpenChainConfig {
1127 ownership: ChainOwnership::single_super(owner),
1128 balance,
1129 application_permissions: Default::default(),
1130 };
1131 let operations = iter::repeat_n(
1132 Operation::system(SystemOperation::OpenChain(config)),
1133 num_new_chains,
1134 )
1135 .collect();
1136 info!("Executing {} OpenChain operations", num_new_chains);
1137 Ok(chain_client
1138 .execute_operations(operations, vec![])
1139 .await?
1140 .expect("should execute block with OpenChain operations"))
1141 }
1142
1143 async fn supply_fungible_tokens(
1145 &mut self,
1146 key_pairs: &[(ChainId, AccountOwner)],
1147 application_id: ApplicationId,
1148 ) -> Result<(), Error> {
1149 let default_chain_id = self.default_chain();
1150 let default_key = self
1151 .wallet()
1152 .get(default_chain_id)
1153 .await
1154 .unwrap()
1155 .unwrap()
1156 .owner
1157 .unwrap();
1158 let amount = Amount::from_nanos(4);
1160 let operations: Vec<Operation> = key_pairs
1161 .iter()
1162 .map(|(chain_id, owner)| {
1163 Benchmark::<Env>::fungible_transfer(
1164 application_id,
1165 *chain_id,
1166 default_key,
1167 *owner,
1168 amount,
1169 )
1170 })
1171 .collect();
1172 let chain_client = self.make_chain_client(default_chain_id).await?;
1173 for operation_chunk in operations.chunks(1000) {
1175 chain_client
1176 .execute_operations(operation_chunk.to_vec(), vec![])
1177 .await?
1178 .expect("should execute block with Transfer operations");
1179 }
1180 self.update_wallet_from_client(&chain_client).await?;
1181
1182 Ok(())
1183 }
1184}