linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13    util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17    client::{ChainClient, Client, ListeningMode},
18    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19    join_set_ext::JoinSet,
20    node::ValidatorNode,
21    wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::stream,
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::{Amount, BlockHeight},
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_core::client::ChainClientError,
40    linera_execution::{
41        system::{OpenChainConfig, SystemOperation},
42        Operation,
43    },
44    std::{collections::HashSet, iter, path::Path},
45    tokio::{sync::mpsc, task},
46};
47#[cfg(feature = "fs")]
48use {
49    linera_base::{
50        data_types::{BlobContent, Bytecode},
51        identifiers::ModuleId,
52        vm::VmRuntime,
53    },
54    linera_core::client::create_bytecode_blobs,
55    std::{fs, path::PathBuf},
56};
57
58use crate::{
59    chain_listener::{self, ClientContext as _},
60    client_options::{ChainOwnershipConfig, Options},
61    config::GenesisConfig,
62    error, util, Error,
63};
64
65/// Results from querying a validator about version, network description, and chain info.
66pub struct ValidatorQueryResults {
67    /// The validator's version information.
68    pub version_info: Result<VersionInfo, Error>,
69    /// The validator's genesis config hash.
70    pub genesis_config_hash: Result<CryptoHash, Error>,
71    /// The validator's chain info (if valid and signature check passed).
72    pub chain_info: Result<ChainInfo, Error>,
73}
74
75impl ValidatorQueryResults {
76    /// Returns a vector of references to all errors in the query results.
77    pub fn errors(&self) -> Vec<&Error> {
78        let mut errors = Vec::new();
79        if let Err(e) = &self.version_info {
80            errors.push(e);
81        }
82        if let Err(e) = &self.genesis_config_hash {
83            errors.push(e);
84        }
85        if let Err(e) = &self.chain_info {
86            errors.push(e);
87        }
88        errors
89    }
90
91    /// Prints validator information to stdout.
92    ///
93    /// Prints public key, address, and optionally weight, version info, and chain info.
94    /// If `reference` is provided, only prints fields that differ from the reference.
95    pub fn print(
96        &self,
97        public_key: Option<&ValidatorPublicKey>,
98        address: Option<&str>,
99        weight: Option<u64>,
100        reference: Option<&ValidatorQueryResults>,
101    ) {
102        if let Some(key) = public_key {
103            println!("Public key: {}", key);
104        }
105        if let Some(address) = address {
106            println!("Address: {}", address);
107        }
108        if let Some(w) = weight {
109            println!("Weight: {}", w);
110        }
111
112        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
113        match &self.version_info {
114            Ok(version_info) => {
115                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
116                {
117                    println!("Linera protocol: v{}", version_info.crate_version);
118                }
119                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
120                    println!("RPC API hash: {}", version_info.rpc_hash);
121                }
122                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
123                    println!("GraphQL API hash: {}", version_info.graphql_hash);
124                }
125                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
126                    println!("WIT API hash: v{}", version_info.wit_hash);
127                }
128                if ref_version.is_none_or(|ref_v| {
129                    (&ref_v.git_commit, ref_v.git_dirty)
130                        != (&version_info.git_commit, version_info.git_dirty)
131                }) {
132                    println!(
133                        "Source code: {}/tree/{}{}",
134                        env!("CARGO_PKG_REPOSITORY"),
135                        version_info.git_commit,
136                        if version_info.git_dirty {
137                            " (dirty)"
138                        } else {
139                            ""
140                        }
141                    );
142                }
143            }
144            Err(err) => println!("Error getting version info: {err}"),
145        }
146
147        let ref_genesis_hash =
148            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
149        match &self.genesis_config_hash {
150            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
151            Ok(hash) => println!("Genesis config hash: {hash}"),
152            Err(err) => println!("Error getting genesis config: {err}"),
153        }
154
155        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
156        match &self.chain_info {
157            Ok(info) => {
158                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
159                    if let Some(hash) = info.block_hash {
160                        println!("Block hash: {}", hash);
161                    } else {
162                        println!("Block hash: None");
163                    }
164                }
165                if ref_info
166                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
167                {
168                    println!("Next height: {}", info.next_block_height);
169                }
170                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
171                    println!("Timestamp: {}", info.timestamp);
172                }
173                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
174                    println!("Epoch: {}", info.epoch);
175                }
176                if ref_info.is_none_or(|ref_info| {
177                    info.manager.current_round != ref_info.manager.current_round
178                }) {
179                    println!("Round: {}", info.manager.current_round);
180                }
181                if let Some(locking) = &info.manager.requested_locking {
182                    match &**locking {
183                        LockingBlock::Fast(proposal) => {
184                            println!(
185                                "Locking fast block from {}",
186                                proposal.content.block.timestamp
187                            );
188                        }
189                        LockingBlock::Regular(validated) => {
190                            println!(
191                                "Locking block {} in {} from {}",
192                                validated.hash(),
193                                validated.round,
194                                validated.block().header.timestamp
195                            );
196                        }
197                    }
198                }
199            }
200            Err(err) => println!("Error getting chain info: {err}"),
201        }
202    }
203}
204
205pub struct ClientContext<Env: Environment> {
206    pub client: Arc<Client<Env>>,
207    // TODO(#5083): this doesn't really need to be stored
208    pub genesis_config: crate::config::GenesisConfig,
209    pub send_timeout: Duration,
210    pub recv_timeout: Duration,
211    pub retry_delay: Duration,
212    pub max_retries: u32,
213    pub chain_listeners: JoinSet,
214    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
215    pub default_chain: Option<ChainId>,
216    #[cfg(not(web))]
217    pub client_metrics: Option<ClientMetrics>,
218}
219
220impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
221    type Environment = Env;
222
223    fn wallet(&self) -> &Env::Wallet {
224        self.client.wallet()
225    }
226
227    fn storage(&self) -> &Env::Storage {
228        self.client.storage_client()
229    }
230
231    fn client(&self) -> &Arc<Client<Env>> {
232        &self.client
233    }
234
235    #[cfg(not(web))]
236    fn timing_sender(
237        &self,
238    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
239        self.client_metrics
240            .as_ref()
241            .map(|metrics| metrics.timing_sender.clone())
242    }
243
244    async fn update_wallet_for_new_chain(
245        &mut self,
246        chain_id: ChainId,
247        owner: Option<AccountOwner>,
248        timestamp: Timestamp,
249        epoch: Epoch,
250    ) -> Result<(), Error> {
251        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
252            .make_sync()
253            .await
254    }
255
256    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257        self.update_wallet_from_client(client).make_sync().await
258    }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263    S: linera_core::environment::Storage,
264    Si: linera_core::environment::Signer,
265    W: linera_core::environment::Wallet,
266{
267    // not worth refactoring this because
268    // https://github.com/linera-io/linera-protocol/issues/5082
269    // https://github.com/linera-io/linera-protocol/issues/5083
270    #[allow(clippy::too_many_arguments)]
271    pub async fn new(
272        storage: S,
273        wallet: W,
274        signer: Si,
275        options: &Options,
276        default_chain: Option<ChainId>,
277        genesis_config: GenesisConfig,
278    ) -> Result<Self, Error> {
279        #[cfg(not(web))]
280        let timing_config = options.to_timing_config();
281        let node_provider = NodeProvider::new(NodeOptions {
282            send_timeout: options.send_timeout,
283            recv_timeout: options.recv_timeout,
284            retry_delay: options.retry_delay,
285            max_retries: options.max_retries,
286        });
287        let chain_modes: Vec<_> = wallet
288            .items()
289            .map_ok(|(id, _chain)| (id, ListeningMode::FullChain))
290            .try_collect()
291            .await
292            .map_err(error::Inner::wallet)?;
293        let name = match chain_modes.len() {
294            0 => "Client node".to_string(),
295            1 => format!("Client node for {:.8}", chain_modes[0].0),
296            n => format!(
297                "Client node for {:.8} and {} others",
298                chain_modes[0].0,
299                n - 1
300            ),
301        };
302
303        let client = Client::new(
304            linera_core::environment::Impl {
305                network: node_provider,
306                storage,
307                signer,
308                wallet,
309            },
310            genesis_config.admin_id(),
311            options.long_lived_services,
312            chain_modes,
313            name,
314            options.chain_worker_ttl,
315            options.sender_chain_worker_ttl,
316            options.to_chain_client_options(),
317            options.to_requests_scheduler_config(),
318        );
319
320        #[cfg(not(web))]
321        let client_metrics = if timing_config.enabled {
322            Some(ClientMetrics::new(timing_config))
323        } else {
324            None
325        };
326
327        Ok(ClientContext {
328            client: Arc::new(client),
329            default_chain,
330            genesis_config,
331            send_timeout: options.send_timeout,
332            recv_timeout: options.recv_timeout,
333            retry_delay: options.retry_delay,
334            max_retries: options.max_retries,
335            chain_listeners: JoinSet::default(),
336            #[cfg(not(web))]
337            client_metrics,
338        })
339    }
340}
341
342impl<Env: Environment> ClientContext<Env> {
343    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
344    // client interface
345    /// Returns a reference to the wallet.
346    pub fn wallet(&self) -> &Env::Wallet {
347        self.client.wallet()
348    }
349
350    /// Returns the ID of the admin chain.
351    pub fn admin_chain(&self) -> ChainId {
352        self.client.admin_chain()
353    }
354
355    /// Retrieve the default account. Current this is the common account of the default
356    /// chain.
357    pub fn default_account(&self) -> Account {
358        Account::chain(self.default_chain())
359    }
360
361    /// Retrieve the default chain.
362    pub fn default_chain(&self) -> ChainId {
363        self.default_chain
364            .expect("default chain requested but none set")
365    }
366
367    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
368        let admin_id = self.admin_chain();
369        std::pin::pin!(self
370            .wallet()
371            .chain_ids()
372            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_id)))
373        .next()
374        .await
375        .expect("No non-admin chain specified in wallet with no non-admin chain")
376        .map_err(Error::wallet)
377    }
378
379    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
380    pub fn make_node_provider(&self) -> NodeProvider {
381        NodeProvider::new(self.make_node_options())
382    }
383
384    fn make_node_options(&self) -> NodeOptions {
385        NodeOptions {
386            send_timeout: self.send_timeout,
387            recv_timeout: self.recv_timeout,
388            retry_delay: self.retry_delay,
389            max_retries: self.max_retries,
390        }
391    }
392
393    #[cfg(not(web))]
394    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
395        self.client_metrics.as_ref()
396    }
397
398    pub async fn update_wallet_from_client<Env_: Environment>(
399        &self,
400        client: &ChainClient<Env_>,
401    ) -> Result<(), Error> {
402        let info = client.chain_info().await?;
403
404        self.wallet()
405            .insert(
406                info.chain_id,
407                wallet::Chain {
408                    pending_proposal: client.pending_proposal().clone(),
409                    owner: client.preferred_owner(),
410                    ..info.as_ref().into()
411                },
412            )
413            .await
414            .map_err(error::Inner::wallet)?;
415
416        Ok(())
417    }
418
419    /// Remembers the new chain and its owner (if any) in the wallet.
420    pub async fn update_wallet_for_new_chain(
421        &mut self,
422        chain_id: ChainId,
423        owner: Option<AccountOwner>,
424        timestamp: Timestamp,
425        epoch: Epoch,
426    ) -> Result<(), Error> {
427        let _ = self
428            .wallet()
429            .try_insert(
430                chain_id,
431                linera_core::wallet::Chain::new(owner, epoch, timestamp),
432            )
433            .await
434            .map_err(error::Inner::wallet)?;
435        Ok(())
436    }
437
438    pub async fn process_inbox(
439        &mut self,
440        chain_client: &ChainClient<Env>,
441    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
442        let mut certificates = Vec::new();
443        // Try processing the inbox optimistically without waiting for validator notifications.
444        let (new_certificates, maybe_timeout) = {
445            chain_client.synchronize_from_validators().await?;
446            let result = chain_client.process_inbox_without_prepare().await;
447            self.update_wallet_from_client(chain_client).await?;
448            result?
449        };
450        certificates.extend(new_certificates);
451        if maybe_timeout.is_none() {
452            return Ok(certificates);
453        }
454
455        // Start listening for notifications, so we learn about new rounds and blocks.
456        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
457        self.chain_listeners.spawn_task(listener);
458
459        loop {
460            let (new_certificates, maybe_timeout) = {
461                let result = chain_client.process_inbox().await;
462                self.update_wallet_from_client(chain_client).await?;
463                result?
464            };
465            certificates.extend(new_certificates);
466            if let Some(timestamp) = maybe_timeout {
467                util::wait_for_next_round(&mut notification_stream, timestamp).await
468            } else {
469                return Ok(certificates);
470            }
471        }
472    }
473
474    pub async fn assign_new_chain_to_key(
475        &mut self,
476        chain_id: ChainId,
477        owner: AccountOwner,
478    ) -> Result<(), Error> {
479        self.client
480            .extend_chain_mode(chain_id, ListeningMode::FullChain);
481        let client = self.make_chain_client(chain_id).await?;
482        let chain_description = client.get_chain_description().await?;
483        let config = chain_description.config();
484
485        if !config.ownership.verify_owner(&owner) {
486            tracing::error!(
487                "The chain with the ID returned by the faucet is not owned by you. \
488                Please make sure you are connecting to a genuine faucet."
489            );
490            return Err(error::Inner::ChainOwnership.into());
491        }
492
493        // TODO(#5247): Do not reset preexisting epoch and timestamp.
494        self.wallet()
495            .insert(
496                chain_id,
497                wallet::Chain {
498                    owner: Some(owner),
499                    timestamp: chain_description.timestamp(),
500                    epoch: Some(chain_description.config().epoch),
501                    ..Default::default()
502                },
503            )
504            .await
505            .map_err(error::Inner::wallet)
506            .context("assigning new chain")?;
507        Ok(())
508    }
509
510    /// Applies the given function to the chain client.
511    ///
512    /// Updates the wallet regardless of the outcome. As long as the function returns a round
513    /// timeout, it will wait and retry.
514    pub async fn apply_client_command<E, F, Fut, T>(
515        &mut self,
516        client: &ChainClient<Env>,
517        mut f: F,
518    ) -> Result<T, Error>
519    where
520        F: FnMut(&ChainClient<Env>) -> Fut,
521        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
522        Error: From<E>,
523    {
524        client.prepare_chain().await?;
525        // Try applying f optimistically without validator notifications. Return if committed.
526        let result = f(client).await;
527        self.update_wallet_from_client(client).await?;
528        if let ClientOutcome::Committed(t) = result? {
529            return Ok(t);
530        }
531
532        // Start listening for notifications, so we learn about new rounds and blocks.
533        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
534        self.chain_listeners.spawn_task(listener);
535
536        loop {
537            // Try applying f. Return if committed.
538            let result = f(client).await;
539            self.update_wallet_from_client(client).await?;
540            let timeout = match result? {
541                ClientOutcome::Committed(t) => return Ok(t),
542                ClientOutcome::WaitForTimeout(timeout) => timeout,
543            };
544            // Otherwise wait and try again in the next round.
545            util::wait_for_next_round(&mut notification_stream, timeout).await;
546        }
547    }
548
549    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
550        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
551        let client = self.make_chain_client(chain_id).await?;
552        let info = client.chain_info().await?;
553        Ok(info.manager.ownership)
554    }
555
556    pub async fn change_ownership(
557        &mut self,
558        chain_id: Option<ChainId>,
559        ownership_config: ChainOwnershipConfig,
560    ) -> Result<(), Error> {
561        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
562        let chain_client = self.make_chain_client(chain_id).await?;
563        info!(
564            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
565            "Changing ownership of a chain"
566        );
567        let time_start = Instant::now();
568        let mut ownership = chain_client.query_chain_ownership().await?;
569        ownership_config.update(&mut ownership)?;
570
571        if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
572            tracing::error!("At least one owner or super owner of the chain has to be set.");
573            return Err(error::Inner::ChainOwnership.into());
574        }
575
576        let certificate = self
577            .apply_client_command(&chain_client, |chain_client| {
578                let ownership = ownership.clone();
579                let chain_client = chain_client.clone();
580                async move {
581                    chain_client
582                        .change_ownership(ownership)
583                        .await
584                        .map_err(Error::from)
585                        .context("Failed to change ownership")
586                }
587            })
588            .await?;
589        let time_total = time_start.elapsed();
590        info!("Operation confirmed after {} ms", time_total.as_millis());
591        debug!("{:?}", certificate);
592        Ok(())
593    }
594
595    pub async fn set_preferred_owner(
596        &mut self,
597        chain_id: Option<ChainId>,
598        preferred_owner: AccountOwner,
599    ) -> Result<(), Error> {
600        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
601        let mut chain_client = self.make_chain_client(chain_id).await?;
602        let old_owner = chain_client.preferred_owner();
603        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
604        chain_client.set_preferred_owner(preferred_owner);
605        self.update_wallet_from_client(&chain_client).await?;
606        info!("New preferred owner set");
607        Ok(())
608    }
609
610    pub async fn check_compatible_version_info(
611        &self,
612        address: &str,
613        node: &impl ValidatorNode,
614    ) -> Result<VersionInfo, Error> {
615        match node.get_version_info().await {
616            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
617                debug!(
618                    "Version information for validator {address}: {}",
619                    version_info
620                );
621                Ok(version_info)
622            }
623            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
624                remote: Box::new(version_info),
625                local: Box::new(linera_version::VERSION_INFO.clone()),
626            }
627            .into()),
628            Err(error) => Err(error::Inner::UnavailableVersionInfo {
629                address: address.to_string(),
630                error: Box::new(error),
631            }
632            .into()),
633        }
634    }
635
636    pub async fn check_matching_network_description(
637        &self,
638        address: &str,
639        node: &impl ValidatorNode,
640    ) -> Result<CryptoHash, Error> {
641        let network_description = self.genesis_config.network_description();
642        match node.get_network_description().await {
643            Ok(description) => {
644                if description == network_description {
645                    Ok(description.genesis_config_hash)
646                } else {
647                    Err(error::Inner::UnexpectedNetworkDescription {
648                        remote: Box::new(description),
649                        local: Box::new(network_description),
650                    }
651                    .into())
652                }
653            }
654            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
655                address: address.to_string(),
656                error: Box::new(error),
657            }
658            .into()),
659        }
660    }
661
662    pub async fn check_validator_chain_info_response(
663        &self,
664        public_key: Option<&ValidatorPublicKey>,
665        address: &str,
666        node: &impl ValidatorNode,
667        chain_id: ChainId,
668    ) -> Result<ChainInfo, Error> {
669        let query = ChainInfoQuery::new(chain_id).with_manager_values();
670        match node.handle_chain_info_query(query).await {
671            Ok(response) => {
672                debug!(
673                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
674                    response.info.next_block_height, response.info.epoch,
675                );
676                if let Some(public_key) = public_key {
677                    if response.check(*public_key).is_ok() {
678                        debug!("Signature for public key {public_key} is OK.");
679                    } else {
680                        return Err(error::Inner::InvalidSignature {
681                            public_key: *public_key,
682                        }
683                        .into());
684                    }
685                } else {
686                    warn!("Not checking signature as public key was not given");
687                }
688                Ok(*response.info)
689            }
690            Err(error) => Err(error::Inner::UnavailableChainInfo {
691                address: address.to_string(),
692                chain_id,
693                error: Box::new(error),
694            }
695            .into()),
696        }
697    }
698
699    /// Query a validator for version info, network description, and chain info.
700    ///
701    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
702    pub async fn query_validator(
703        &self,
704        address: &str,
705        node: &impl ValidatorNode,
706        chain_id: ChainId,
707        public_key: Option<&ValidatorPublicKey>,
708    ) -> ValidatorQueryResults {
709        let version_info = self.check_compatible_version_info(address, node).await;
710        let genesis_config_hash = self.check_matching_network_description(address, node).await;
711        let chain_info = self
712            .check_validator_chain_info_response(public_key, address, node, chain_id)
713            .await;
714
715        ValidatorQueryResults {
716            version_info,
717            genesis_config_hash,
718            chain_info,
719        }
720    }
721
722    /// Query the local node for version info, network description, and chain info.
723    ///
724    /// Returns a `ValidatorQueryResults` struct with the local node's information.
725    pub async fn query_local_node(
726        &self,
727        chain_id: ChainId,
728    ) -> Result<ValidatorQueryResults, Error> {
729        let version_info = Ok(linera_version::VERSION_INFO.clone());
730        let genesis_config_hash = Ok(self
731            .genesis_config
732            .network_description()
733            .genesis_config_hash);
734        let chain_info = self
735            .make_chain_client(chain_id)
736            .await?
737            .chain_info_with_manager_values()
738            .await
739            .map(|info| *info)
740            .map_err(|e| e.into());
741
742        Ok(ValidatorQueryResults {
743            version_info,
744            genesis_config_hash,
745            chain_info,
746        })
747    }
748}
749
750#[cfg(feature = "fs")]
751impl<Env: Environment> ClientContext<Env> {
752    pub async fn publish_module(
753        &mut self,
754        chain_client: &ChainClient<Env>,
755        contract: PathBuf,
756        service: PathBuf,
757        vm_runtime: VmRuntime,
758    ) -> Result<ModuleId, Error> {
759        info!("Loading bytecode files");
760        let contract_bytecode = Bytecode::load_from_file(&contract)
761            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
762        let service_bytecode = Bytecode::load_from_file(&service)
763            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
764
765        info!("Publishing module");
766        let (blobs, module_id) =
767            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
768        let (module_id, _) = self
769            .apply_client_command(chain_client, |chain_client| {
770                let blobs = blobs.clone();
771                let chain_client = chain_client.clone();
772                async move {
773                    chain_client
774                        .publish_module_blobs(blobs, module_id)
775                        .await
776                        .context("Failed to publish module")
777                }
778            })
779            .await?;
780
781        info!("{}", "Module published successfully!");
782
783        info!("Synchronizing client and processing inbox");
784        self.process_inbox(chain_client).await?;
785        Ok(module_id)
786    }
787
788    pub async fn publish_data_blob(
789        &mut self,
790        chain_client: &ChainClient<Env>,
791        blob_path: PathBuf,
792    ) -> Result<CryptoHash, Error> {
793        info!("Loading data blob file");
794        let blob_bytes = fs::read(&blob_path).context(format!(
795            "failed to load data blob bytes from {:?}",
796            &blob_path
797        ))?;
798
799        info!("Publishing data blob");
800        self.apply_client_command(chain_client, |chain_client| {
801            let blob_bytes = blob_bytes.clone();
802            let chain_client = chain_client.clone();
803            async move {
804                chain_client
805                    .publish_data_blob(blob_bytes)
806                    .await
807                    .context("Failed to publish data blob")
808            }
809        })
810        .await?;
811
812        info!("{}", "Data blob published successfully!");
813        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
814    }
815
816    // TODO(#2490): Consider removing or renaming this.
817    pub async fn read_data_blob(
818        &mut self,
819        chain_client: &ChainClient<Env>,
820        hash: CryptoHash,
821    ) -> Result<(), Error> {
822        info!("Verifying data blob");
823        self.apply_client_command(chain_client, |chain_client| {
824            let chain_client = chain_client.clone();
825            async move {
826                chain_client
827                    .read_data_blob(hash)
828                    .await
829                    .context("Failed to verify data blob")
830            }
831        })
832        .await?;
833
834        info!("{}", "Data blob verified successfully!");
835        Ok(())
836    }
837}
838
839#[cfg(not(web))]
840impl<Env: Environment> ClientContext<Env> {
841    pub async fn prepare_for_benchmark(
842        &mut self,
843        num_chains: usize,
844        transactions_per_block: usize,
845        tokens_per_chain: Amount,
846        fungible_application_id: Option<ApplicationId>,
847        pub_keys: Vec<AccountPublicKey>,
848        chains_config_path: Option<&Path>,
849    ) -> Result<(Vec<ChainClient<Env>>, Vec<Vec<Operation>>), Error> {
850        let start = Instant::now();
851        // Below all block proposals are supposed to succeed without retries, we
852        // must make sure that all incoming payments have been accepted on-chain
853        // and that no validator is missing user certificates.
854        self.process_inboxes_and_force_validator_updates().await;
855        info!(
856            "Processed inboxes and forced validator updates in {} ms",
857            start.elapsed().as_millis()
858        );
859
860        let start = Instant::now();
861        let (benchmark_chains, chain_clients) = self
862            .make_benchmark_chains(
863                num_chains,
864                tokens_per_chain,
865                pub_keys,
866                chains_config_path.is_some(),
867            )
868            .await?;
869        info!(
870            "Got {} chains in {} ms",
871            num_chains,
872            start.elapsed().as_millis()
873        );
874
875        if let Some(id) = fungible_application_id {
876            let start = Instant::now();
877            self.supply_fungible_tokens(&benchmark_chains, id).await?;
878            info!(
879                "Supplied fungible tokens in {} ms",
880                start.elapsed().as_millis()
881            );
882            // Need to process inboxes to make sure the chains receive the supplied tokens.
883            let start = Instant::now();
884            for chain_client in &chain_clients {
885                chain_client.process_inbox().await?;
886            }
887            info!(
888                "Processed inboxes after supplying fungible tokens in {} ms",
889                start.elapsed().as_millis()
890            );
891        }
892
893        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
894        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
895        let unknown_chain_ids: Vec<_> = all_chains
896            .iter()
897            .filter(|id| !known_chain_ids.contains(id))
898            .copied()
899            .collect();
900        if !unknown_chain_ids.is_empty() {
901            // The current client won't have the blobs for the chains in the other wallets. Even
902            // though it will eventually get those blobs, we're getting a head start here and
903            // fetching those blobs in advance.
904            for chain_id in &unknown_chain_ids {
905                self.client.get_chain_description(*chain_id).await?;
906            }
907        }
908
909        let blocks_infos = Benchmark::<Env>::make_benchmark_block_info(
910            benchmark_chains,
911            transactions_per_block,
912            fungible_application_id,
913            all_chains,
914        )?;
915
916        Ok((chain_clients, blocks_infos))
917    }
918
919    pub async fn wrap_up_benchmark(
920        &mut self,
921        chain_clients: Vec<ChainClient<Env>>,
922        close_chains: bool,
923        wrap_up_max_in_flight: usize,
924    ) -> Result<(), Error> {
925        if close_chains {
926            info!("Closing chains...");
927            let stream = stream::iter(chain_clients)
928                .map(|chain_client| async move {
929                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
930                    info!("Closed chain {:?}", chain_client.chain_id());
931                    Ok::<(), BenchmarkError>(())
932                })
933                .buffer_unordered(wrap_up_max_in_flight);
934            stream.try_collect::<Vec<_>>().await?;
935        } else {
936            info!("Processing inbox for all chains...");
937            let stream = stream::iter(chain_clients.clone())
938                .map(|chain_client| async move {
939                    chain_client.process_inbox().await?;
940                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
941                    Ok::<(), ChainClientError>(())
942                })
943                .buffer_unordered(wrap_up_max_in_flight);
944            stream.try_collect::<Vec<_>>().await?;
945
946            info!("Updating wallet from chain clients...");
947            for chain_client in chain_clients {
948                let info = chain_client.chain_info().await?;
949                let client_owner = chain_client.preferred_owner();
950                let pending_proposal = chain_client.pending_proposal().clone();
951                self.wallet()
952                    .insert(
953                        info.chain_id,
954                        wallet::Chain {
955                            pending_proposal,
956                            owner: client_owner,
957                            ..info.as_ref().into()
958                        },
959                    )
960                    .await
961                    .map_err(error::Inner::wallet)?;
962            }
963        }
964
965        Ok(())
966    }
967
968    async fn process_inboxes_and_force_validator_updates(&mut self) {
969        let mut join_set = task::JoinSet::new();
970
971        let chain_clients: Vec<_> = self
972            .wallet()
973            .owned_chain_ids()
974            .map_err(|e| error::Inner::wallet(e).into())
975            .and_then(|id| self.make_chain_client(id))
976            .try_collect()
977            .await
978            .unwrap();
979
980        for chain_client in chain_clients {
981            join_set.spawn(async move {
982                Self::process_inbox_without_updating_wallet(&chain_client)
983                    .await
984                    .expect("Processing inbox should not fail!");
985                chain_client
986            });
987        }
988
989        for chain_client in join_set.join_all().await {
990            self.update_wallet_from_client(&chain_client).await.unwrap();
991        }
992    }
993
994    async fn process_inbox_without_updating_wallet(
995        chain_client: &ChainClient<Env>,
996    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
997        // Try processing the inbox optimistically without waiting for validator notifications.
998        chain_client.synchronize_from_validators().await?;
999        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1000        assert!(
1001            maybe_timeout.is_none(),
1002            "Should not timeout within benchmark!"
1003        );
1004
1005        Ok(certificates)
1006    }
1007
1008    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1009    /// with key pairs, as well as a map of the chain clients.
1010    async fn make_benchmark_chains(
1011        &mut self,
1012        num_chains: usize,
1013        balance: Amount,
1014        pub_keys: Vec<AccountPublicKey>,
1015        wallet_only: bool,
1016    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1017        let mut chains_found_in_wallet = 0;
1018        let mut benchmark_chains = Vec::with_capacity(num_chains);
1019        let mut chain_clients = Vec::with_capacity(num_chains);
1020        let start = Instant::now();
1021        let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1022        while let Some(chain_id) = owned_chain_ids.next().await {
1023            let chain_id = chain_id.map_err(error::Inner::wallet)?;
1024            if chains_found_in_wallet == num_chains {
1025                break;
1026            }
1027            let chain_client = self.make_chain_client(chain_id).await?;
1028            let ownership = chain_client.chain_info().await?.manager.ownership;
1029            if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1030                continue;
1031            }
1032            let owner = *ownership.super_owners.first().unwrap();
1033            chain_client.process_inbox().await?;
1034            benchmark_chains.push((chain_id, owner));
1035            chain_clients.push(chain_client);
1036            chains_found_in_wallet += 1;
1037        }
1038        info!(
1039            "Got {} chains from the wallet in {} ms",
1040            benchmark_chains.len(),
1041            start.elapsed().as_millis()
1042        );
1043
1044        let num_chains_to_create = num_chains - chains_found_in_wallet;
1045
1046        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1047
1048        if num_chains_to_create > 0 {
1049            if wallet_only {
1050                return Err(
1051                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1052                        num_chains,
1053                        chains_found_in_wallet,
1054                    ))
1055                    .into(),
1056                );
1057            }
1058            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1059            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1060            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1061                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1062                let pub_key = pub_keys_iter.next().unwrap();
1063                let owner = pub_key.into();
1064
1065                let certificate = Self::execute_open_chains_operations(
1066                    num_new_chains,
1067                    &default_chain_client,
1068                    balance,
1069                    owner,
1070                )
1071                .await?;
1072                info!("Block executed successfully");
1073
1074                let block = certificate.block();
1075                for i in 0..num_new_chains {
1076                    let chain_id = block.body.blobs[i]
1077                        .iter()
1078                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1079                        .map(|blob| ChainId(blob.id().hash))
1080                        .expect("failed to create a new chain");
1081                    self.client
1082                        .extend_chain_mode(chain_id, ListeningMode::FullChain);
1083
1084                    let mut chain_client = self.client.create_chain_client(
1085                        chain_id,
1086                        None,
1087                        BlockHeight::ZERO,
1088                        None,
1089                        Some(owner),
1090                        self.timing_sender(),
1091                    );
1092                    chain_client.set_preferred_owner(owner);
1093                    chain_client.process_inbox().await?;
1094                    benchmark_chains.push((chain_id, owner));
1095                    chain_clients.push(chain_client);
1096                }
1097            }
1098
1099            info!(
1100                "Created {} chains in {} ms",
1101                num_chains_to_create,
1102                start.elapsed().as_millis()
1103            );
1104        }
1105
1106        info!("Updating wallet from client");
1107        self.update_wallet_from_client(&default_chain_client)
1108            .await?;
1109        info!("Retrying pending outgoing messages");
1110        default_chain_client
1111            .retry_pending_outgoing_messages()
1112            .await
1113            .context("outgoing messages to create the new chains should be delivered")?;
1114        info!("Processing default chain inbox");
1115        default_chain_client.process_inbox().await?;
1116
1117        Ok((benchmark_chains, chain_clients))
1118    }
1119
1120    async fn execute_open_chains_operations(
1121        num_new_chains: usize,
1122        chain_client: &ChainClient<Env>,
1123        balance: Amount,
1124        owner: AccountOwner,
1125    ) -> Result<ConfirmedBlockCertificate, Error> {
1126        let config = OpenChainConfig {
1127            ownership: ChainOwnership::single_super(owner),
1128            balance,
1129            application_permissions: Default::default(),
1130        };
1131        let operations = iter::repeat_n(
1132            Operation::system(SystemOperation::OpenChain(config)),
1133            num_new_chains,
1134        )
1135        .collect();
1136        info!("Executing {} OpenChain operations", num_new_chains);
1137        Ok(chain_client
1138            .execute_operations(operations, vec![])
1139            .await?
1140            .expect("should execute block with OpenChain operations"))
1141    }
1142
1143    /// Supplies fungible tokens to the chains.
1144    async fn supply_fungible_tokens(
1145        &mut self,
1146        key_pairs: &[(ChainId, AccountOwner)],
1147        application_id: ApplicationId,
1148    ) -> Result<(), Error> {
1149        let default_chain_id = self.default_chain();
1150        let default_key = self
1151            .wallet()
1152            .get(default_chain_id)
1153            .await
1154            .unwrap()
1155            .unwrap()
1156            .owner
1157            .unwrap();
1158        // This should be enough to run the benchmark at 1M TPS for an hour.
1159        let amount = Amount::from_nanos(4);
1160        let operations: Vec<Operation> = key_pairs
1161            .iter()
1162            .map(|(chain_id, owner)| {
1163                Benchmark::<Env>::fungible_transfer(
1164                    application_id,
1165                    *chain_id,
1166                    default_key,
1167                    *owner,
1168                    amount,
1169                )
1170            })
1171            .collect();
1172        let chain_client = self.make_chain_client(default_chain_id).await?;
1173        // Put at most 1000 fungible token operations in each block.
1174        for operation_chunk in operations.chunks(1000) {
1175            chain_client
1176                .execute_operations(operation_chunk.to_vec(), vec![])
1177                .await?
1178                .expect("should execute block with Transfer operations");
1179        }
1180        self.update_wallet_from_client(&chain_client).await?;
1181
1182        Ok(())
1183    }
1184}