Skip to main content

linera_client/
client_context.rs

1// Copyright (c) Zefchain Labs, Inc.
2// SPDX-License-Identifier: Apache-2.0
3
4use std::sync::Arc;
5
6use futures::{Future, StreamExt as _, TryStreamExt as _};
7use linera_base::{
8    crypto::{CryptoHash, ValidatorPublicKey},
9    data_types::{Epoch, Timestamp},
10    identifiers::{Account, AccountOwner, ChainId},
11    ownership::ChainOwnership,
12    time::{Duration, Instant},
13    util::future::FutureSyncExt as _,
14};
15use linera_chain::{manager::LockingBlock, types::ConfirmedBlockCertificate};
16use linera_core::{
17    client::{ChainClient, ChainClientError, Client, ListeningMode},
18    data_types::{ChainInfo, ChainInfoQuery, ClientOutcome},
19    join_set_ext::JoinSet,
20    node::ValidatorNode,
21    wallet, Environment, JoinSetExt as _, Wallet as _,
22};
23use linera_rpc::node_provider::{NodeOptions, NodeProvider};
24use linera_version::VersionInfo;
25use thiserror_context::Context;
26use tracing::{debug, info, warn};
27#[cfg(not(web))]
28use {
29    crate::{
30        benchmark::{fungible_transfer, Benchmark, BenchmarkError},
31        client_metrics::ClientMetrics,
32    },
33    futures::stream,
34    linera_base::{
35        crypto::AccountPublicKey,
36        data_types::{Amount, BlockHeight},
37        identifiers::{ApplicationId, BlobType},
38    },
39    linera_execution::{
40        system::{OpenChainConfig, SystemOperation},
41        Operation,
42    },
43    std::{collections::HashSet, path::Path},
44    tokio::{sync::mpsc, task},
45};
46#[cfg(feature = "fs")]
47use {
48    linera_base::{
49        data_types::{BlobContent, Bytecode},
50        identifiers::ModuleId,
51        vm::VmRuntime,
52    },
53    linera_core::client::create_bytecode_blobs,
54    std::{fs, path::PathBuf},
55};
56
57use crate::{
58    chain_listener::{self, ClientContext as _},
59    client_options::{ChainOwnershipConfig, Options},
60    config::GenesisConfig,
61    error, util, Error,
62};
63
/// Results from querying a validator about version, network description, and chain info.
///
/// Every field stores the outcome of one query as a `Result`, so the outcome of each
/// query is kept separately from the others.
pub struct ValidatorQueryResults {
    /// The validator's version information, or the error produced while obtaining it.
    pub version_info: Result<VersionInfo, Error>,
    /// The validator's genesis config hash, or the error produced while obtaining it.
    pub genesis_config_hash: Result<CryptoHash, Error>,
    /// The validator's chain info (if valid and signature check passed).
    pub chain_info: Result<ChainInfo, Error>,
}
73
74impl ValidatorQueryResults {
75    /// Returns a vector of references to all errors in the query results.
76    pub fn errors(&self) -> Vec<&Error> {
77        let mut errors = Vec::new();
78        if let Err(e) = &self.version_info {
79            errors.push(e);
80        }
81        if let Err(e) = &self.genesis_config_hash {
82            errors.push(e);
83        }
84        if let Err(e) = &self.chain_info {
85            errors.push(e);
86        }
87        errors
88    }
89
90    /// Prints validator information to stdout.
91    ///
92    /// Prints public key, address, and optionally weight, version info, and chain info.
93    /// If `reference` is provided, only prints fields that differ from the reference.
94    pub fn print(
95        &self,
96        public_key: Option<&ValidatorPublicKey>,
97        address: Option<&str>,
98        weight: Option<u64>,
99        reference: Option<&ValidatorQueryResults>,
100    ) {
101        if let Some(key) = public_key {
102            println!("Public key: {}", key);
103        }
104        if let Some(address) = address {
105            println!("Address: {}", address);
106        }
107        if let Some(w) = weight {
108            println!("Weight: {}", w);
109        }
110
111        let ref_version = reference.and_then(|ref_results| ref_results.version_info.as_ref().ok());
112        match &self.version_info {
113            Ok(version_info) => {
114                if ref_version.is_none_or(|ref_v| ref_v.crate_version != version_info.crate_version)
115                {
116                    println!("Linera protocol: v{}", version_info.crate_version);
117                }
118                if ref_version.is_none_or(|ref_v| ref_v.rpc_hash != version_info.rpc_hash) {
119                    println!("RPC API hash: {}", version_info.rpc_hash);
120                }
121                if ref_version.is_none_or(|ref_v| ref_v.graphql_hash != version_info.graphql_hash) {
122                    println!("GraphQL API hash: {}", version_info.graphql_hash);
123                }
124                if ref_version.is_none_or(|ref_v| ref_v.wit_hash != version_info.wit_hash) {
125                    println!("WIT API hash: v{}", version_info.wit_hash);
126                }
127                if ref_version.is_none_or(|ref_v| {
128                    (&ref_v.git_commit, ref_v.git_dirty)
129                        != (&version_info.git_commit, version_info.git_dirty)
130                }) {
131                    println!(
132                        "Source code: {}/tree/{}{}",
133                        env!("CARGO_PKG_REPOSITORY"),
134                        version_info.git_commit,
135                        if version_info.git_dirty {
136                            " (dirty)"
137                        } else {
138                            ""
139                        }
140                    );
141                }
142            }
143            Err(err) => println!("Error getting version info: {err}"),
144        }
145
146        let ref_genesis_hash =
147            reference.and_then(|ref_results| ref_results.genesis_config_hash.as_ref().ok());
148        match &self.genesis_config_hash {
149            Ok(hash) if ref_genesis_hash.is_some_and(|ref_hash| ref_hash == hash) => {}
150            Ok(hash) => println!("Genesis config hash: {hash}"),
151            Err(err) => println!("Error getting genesis config: {err}"),
152        }
153
154        let ref_info = reference.and_then(|ref_results| ref_results.chain_info.as_ref().ok());
155        match &self.chain_info {
156            Ok(info) => {
157                if ref_info.is_none_or(|ref_info| info.block_hash != ref_info.block_hash) {
158                    if let Some(hash) = info.block_hash {
159                        println!("Block hash: {}", hash);
160                    } else {
161                        println!("Block hash: None");
162                    }
163                }
164                if ref_info
165                    .is_none_or(|ref_info| info.next_block_height != ref_info.next_block_height)
166                {
167                    println!("Next height: {}", info.next_block_height);
168                }
169                if ref_info.is_none_or(|ref_info| info.timestamp != ref_info.timestamp) {
170                    println!("Timestamp: {}", info.timestamp);
171                }
172                if ref_info.is_none_or(|ref_info| info.epoch != ref_info.epoch) {
173                    println!("Epoch: {}", info.epoch);
174                }
175                if ref_info.is_none_or(|ref_info| {
176                    info.manager.current_round != ref_info.manager.current_round
177                }) {
178                    println!("Round: {}", info.manager.current_round);
179                }
180                if let Some(locking) = &info.manager.requested_locking {
181                    match &**locking {
182                        LockingBlock::Fast(proposal) => {
183                            println!(
184                                "Locking fast block from {}",
185                                proposal.content.block.timestamp
186                            );
187                        }
188                        LockingBlock::Regular(validated) => {
189                            println!(
190                                "Locking block {} in {} from {}",
191                                validated.hash(),
192                                validated.round,
193                                validated.block().header.timestamp
194                            );
195                        }
196                    }
197                }
198            }
199            Err(err) => println!("Error getting chain info: {err}"),
200        }
201    }
202}
203
/// Bundles the core client with the node connection settings, the running chain
/// listener tasks and the optional default chain used by the client-side commands.
pub struct ClientContext<Env: Environment> {
    /// The underlying client, shared behind an `Arc`.
    pub client: Arc<Client<Env>>,
    // TODO(#5083): this doesn't really need to be stored
    /// The genesis configuration; its network description is compared against the one
    /// reported by validators.
    pub genesis_config: crate::config::GenesisConfig,
    /// Copied into `NodeOptions::send_timeout` when node options are built.
    pub send_timeout: Duration,
    /// Copied into `NodeOptions::recv_timeout` when node options are built.
    pub recv_timeout: Duration,
    /// Copied into `NodeOptions::retry_delay` when node options are built.
    pub retry_delay: Duration,
    /// Copied into `NodeOptions::max_retries` when node options are built.
    pub max_retries: u32,
    /// Copied into `NodeOptions::max_backoff` when node options are built.
    pub max_backoff: Duration,
    /// The set of spawned chain listener tasks.
    pub chain_listeners: JoinSet,
    // TODO(#5082): move this into the upstream UI layers (maybe just the CLI)
    /// The chain used when a command does not name one explicitly, if any.
    pub default_chain: Option<ChainId>,
    /// Client metrics; `None` when timing is not enabled in the options.
    #[cfg(not(web))]
    pub client_metrics: Option<ClientMetrics>,
}
219
220impl<Env: Environment> chain_listener::ClientContext for ClientContext<Env> {
221    type Environment = Env;
222
223    fn wallet(&self) -> &Env::Wallet {
224        self.client.wallet()
225    }
226
227    fn storage(&self) -> &Env::Storage {
228        self.client.storage_client()
229    }
230
231    fn client(&self) -> &Arc<Client<Env>> {
232        &self.client
233    }
234
235    #[cfg(not(web))]
236    fn timing_sender(
237        &self,
238    ) -> Option<mpsc::UnboundedSender<(u64, linera_core::client::TimingType)>> {
239        self.client_metrics
240            .as_ref()
241            .map(|metrics| metrics.timing_sender.clone())
242    }
243
244    async fn update_wallet_for_new_chain(
245        &mut self,
246        chain_id: ChainId,
247        owner: Option<AccountOwner>,
248        timestamp: Timestamp,
249        epoch: Epoch,
250    ) -> Result<(), Error> {
251        self.update_wallet_for_new_chain(chain_id, owner, timestamp, epoch)
252            .make_sync()
253            .await
254    }
255
256    async fn update_wallet(&mut self, client: &ChainClient<Env>) -> Result<(), Error> {
257        self.update_wallet_from_client(client).make_sync().await
258    }
259}
260
261impl<S, Si, W> ClientContext<linera_core::environment::Impl<S, NodeProvider, Si, W>>
262where
263    S: linera_core::environment::Storage,
264    Si: linera_core::environment::Signer,
265    W: linera_core::environment::Wallet,
266{
267    // not worth refactoring this because
268    // https://github.com/linera-io/linera-protocol/issues/5082
269    // https://github.com/linera-io/linera-protocol/issues/5083
270    #[allow(clippy::too_many_arguments)]
271    pub async fn new(
272        storage: S,
273        wallet: W,
274        signer: Si,
275        options: &Options,
276        default_chain: Option<ChainId>,
277        genesis_config: GenesisConfig,
278    ) -> Result<Self, Error> {
279        #[cfg(not(web))]
280        let timing_config = options.to_timing_config();
281        let node_provider = NodeProvider::new(NodeOptions {
282            send_timeout: options.send_timeout,
283            recv_timeout: options.recv_timeout,
284            retry_delay: options.retry_delay,
285            max_retries: options.max_retries,
286            max_backoff: options.max_backoff,
287        });
288        let chain_modes: Vec<_> = wallet
289            .items()
290            .map_ok(|(id, _chain)| (id, ListeningMode::FullChain))
291            .try_collect()
292            .await
293            .map_err(error::Inner::wallet)?;
294        let name = match chain_modes.len() {
295            0 => "Client node".to_string(),
296            1 => format!("Client node for {:.8}", chain_modes[0].0),
297            n => format!(
298                "Client node for {:.8} and {} others",
299                chain_modes[0].0,
300                n - 1
301            ),
302        };
303
304        let client = Client::new(
305            linera_core::environment::Impl {
306                network: node_provider,
307                storage,
308                signer,
309                wallet,
310            },
311            genesis_config.admin_chain_id(),
312            options.long_lived_services,
313            chain_modes,
314            name,
315            options.chain_worker_ttl,
316            options.sender_chain_worker_ttl,
317            options.to_chain_client_options(),
318            options.to_requests_scheduler_config(),
319        );
320
321        #[cfg(not(web))]
322        let client_metrics = if timing_config.enabled {
323            Some(ClientMetrics::new(timing_config))
324        } else {
325            None
326        };
327
328        Ok(ClientContext {
329            client: Arc::new(client),
330            default_chain,
331            genesis_config,
332            send_timeout: options.send_timeout,
333            recv_timeout: options.recv_timeout,
334            retry_delay: options.retry_delay,
335            max_retries: options.max_retries,
336            max_backoff: options.max_backoff,
337            chain_listeners: JoinSet::default(),
338            #[cfg(not(web))]
339            client_metrics,
340        })
341    }
342}
343
344impl<Env: Environment> ClientContext<Env> {
345    // TODO(#5084) this (and other injected dependencies) should not be re-exposed by the
346    // client interface
347    /// Returns a reference to the wallet.
348    pub fn wallet(&self) -> &Env::Wallet {
349        self.client.wallet()
350    }
351
352    /// Returns the ID of the admin chain.
353    pub fn admin_chain_id(&self) -> ChainId {
354        self.client.admin_chain_id()
355    }
356
357    /// Retrieve the default account. Current this is the common account of the default
358    /// chain.
359    pub fn default_account(&self) -> Account {
360        Account::chain(self.default_chain())
361    }
362
363    /// Retrieve the default chain.
364    pub fn default_chain(&self) -> ChainId {
365        self.default_chain
366            .expect("default chain requested but none set")
367    }
368
369    pub async fn first_non_admin_chain(&self) -> Result<ChainId, Error> {
370        let admin_chain_id = self.admin_chain_id();
371        std::pin::pin!(self
372            .wallet()
373            .chain_ids()
374            .try_filter(|chain_id| futures::future::ready(*chain_id != admin_chain_id)))
375        .next()
376        .await
377        .expect("No non-admin chain specified in wallet with no non-admin chain")
378        .map_err(Error::wallet)
379    }
380
381    // TODO(#5084) this should match the `NodeProvider` from the `Environment`
382    pub fn make_node_provider(&self) -> NodeProvider {
383        NodeProvider::new(self.make_node_options())
384    }
385
386    fn make_node_options(&self) -> NodeOptions {
387        NodeOptions {
388            send_timeout: self.send_timeout,
389            recv_timeout: self.recv_timeout,
390            retry_delay: self.retry_delay,
391            max_retries: self.max_retries,
392            max_backoff: self.max_backoff,
393        }
394    }
395
396    #[cfg(not(web))]
397    pub fn client_metrics(&self) -> Option<&ClientMetrics> {
398        self.client_metrics.as_ref()
399    }
400
401    pub async fn update_wallet_from_client<Env_: Environment>(
402        &self,
403        client: &ChainClient<Env_>,
404    ) -> Result<(), Error> {
405        let info = client.chain_info().await?;
406        let existing_owner = self
407            .wallet()
408            .get(info.chain_id)
409            .await
410            .map_err(error::Inner::wallet)?
411            .and_then(|chain| chain.owner);
412
413        self.wallet()
414            .insert(
415                info.chain_id,
416                wallet::Chain {
417                    pending_proposal: client.pending_proposal().clone(),
418                    owner: existing_owner,
419                    ..info.as_ref().into()
420                },
421            )
422            .await
423            .map_err(error::Inner::wallet)?;
424
425        Ok(())
426    }
427
428    /// Remembers the new chain and its owner (if any) in the wallet.
429    pub async fn update_wallet_for_new_chain(
430        &mut self,
431        chain_id: ChainId,
432        owner: Option<AccountOwner>,
433        timestamp: Timestamp,
434        epoch: Epoch,
435    ) -> Result<(), Error> {
436        self.wallet()
437            .try_insert(
438                chain_id,
439                linera_core::wallet::Chain::new(owner, epoch, timestamp),
440            )
441            .await
442            .map_err(error::Inner::wallet)?;
443        Ok(())
444    }
445
446    pub async fn process_inbox(
447        &mut self,
448        chain_client: &ChainClient<Env>,
449    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
450        let mut certificates = Vec::new();
451        // Try processing the inbox optimistically without waiting for validator notifications.
452        let (new_certificates, maybe_timeout) = {
453            chain_client.synchronize_from_validators().await?;
454            let result = chain_client.process_inbox_without_prepare().await;
455            self.update_wallet_from_client(chain_client).await?;
456            result?
457        };
458        certificates.extend(new_certificates);
459        if maybe_timeout.is_none() {
460            return Ok(certificates);
461        }
462
463        // Start listening for notifications, so we learn about new rounds and blocks.
464        let (listener, _listen_handle, mut notification_stream) = chain_client.listen().await?;
465        self.chain_listeners.spawn_task(listener);
466
467        loop {
468            let (new_certificates, maybe_timeout) = {
469                let result = chain_client.process_inbox().await;
470                self.update_wallet_from_client(chain_client).await?;
471                result?
472            };
473            certificates.extend(new_certificates);
474            if let Some(timestamp) = maybe_timeout {
475                util::wait_for_next_round(&mut notification_stream, timestamp).await
476            } else {
477                return Ok(certificates);
478            }
479        }
480    }
481
482    pub async fn assign_new_chain_to_key(
483        &mut self,
484        chain_id: ChainId,
485        owner: AccountOwner,
486    ) -> Result<(), Error> {
487        self.client
488            .extend_chain_mode(chain_id, ListeningMode::FullChain);
489        let client = self.make_chain_client(chain_id).await?;
490
491        // Ensure we have the chain description blob.
492        client.get_chain_description().await?;
493
494        // Synchronize and get chain info.
495        client.synchronize_from_validators().await?;
496        let info = client.chain_info().await?;
497
498        // Validate that the owner can propose on this chain (either as owner or via
499        // open_multi_leader_rounds).
500        if !info
501            .manager
502            .ownership
503            .can_propose_in_multi_leader_round(&owner)
504        {
505            tracing::error!("Chain {chain_id} is not owned by {owner}.");
506            return Err(error::Inner::ChainOwnership.into());
507        }
508
509        // Try to modify existing chain entry, setting the owner.
510        let modified = self
511            .wallet()
512            .modify(chain_id, |chain| chain.owner = Some(owner))
513            .await
514            .map_err(error::Inner::wallet)?;
515        // If the chain didn't exist, insert a new entry.
516        if modified.is_none() {
517            self.wallet()
518                .insert(
519                    chain_id,
520                    wallet::Chain {
521                        owner: Some(owner),
522                        timestamp: info.timestamp,
523                        epoch: Some(info.epoch),
524                        ..Default::default()
525                    },
526                )
527                .await
528                .map_err(error::Inner::wallet)
529                .context("assigning new chain")?;
530        }
531        Ok(())
532    }
533
534    /// Applies the given function to the chain client.
535    ///
536    /// Updates the wallet regardless of the outcome. As long as the function returns a round
537    /// timeout, it will wait and retry.
538    pub async fn apply_client_command<E, F, Fut, T>(
539        &mut self,
540        client: &ChainClient<Env>,
541        mut f: F,
542    ) -> Result<T, Error>
543    where
544        F: FnMut(&ChainClient<Env>) -> Fut,
545        Fut: Future<Output = Result<ClientOutcome<T>, E>>,
546        Error: From<E>,
547    {
548        client.prepare_chain().await?;
549        // Try applying f optimistically without validator notifications. Return if committed.
550        let result = f(client).await;
551        self.update_wallet_from_client(client).await?;
552        match result? {
553            ClientOutcome::Committed(t) => return Ok(t),
554            ClientOutcome::Conflict(certificate) => {
555                return Err(ChainClientError::Conflict(certificate.hash()).into());
556            }
557            ClientOutcome::WaitForTimeout(_) => {}
558        }
559
560        // Start listening for notifications, so we learn about new rounds and blocks.
561        let (listener, _listen_handle, mut notification_stream) = client.listen().await?;
562        self.chain_listeners.spawn_task(listener);
563
564        loop {
565            // Try applying f. Return if committed.
566            let result = f(client).await;
567            self.update_wallet_from_client(client).await?;
568            let timeout = match result? {
569                ClientOutcome::Committed(t) => return Ok(t),
570                ClientOutcome::Conflict(certificate) => {
571                    return Err(ChainClientError::Conflict(certificate.hash()).into());
572                }
573                ClientOutcome::WaitForTimeout(timeout) => timeout,
574            };
575            // Otherwise wait and try again in the next round.
576            util::wait_for_next_round(&mut notification_stream, timeout).await;
577        }
578    }
579
580    pub async fn ownership(&mut self, chain_id: Option<ChainId>) -> Result<ChainOwnership, Error> {
581        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
582        let client = self.make_chain_client(chain_id).await?;
583        let info = client.chain_info().await?;
584        Ok(info.manager.ownership)
585    }
586
587    pub async fn change_ownership(
588        &mut self,
589        chain_id: Option<ChainId>,
590        ownership_config: ChainOwnershipConfig,
591    ) -> Result<(), Error> {
592        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
593        let chain_client = self.make_chain_client(chain_id).await?;
594        info!(
595            ?ownership_config, %chain_id, preferred_owner=?chain_client.preferred_owner(),
596            "Changing ownership of a chain"
597        );
598        let time_start = Instant::now();
599        let mut ownership = chain_client.query_chain_ownership().await?;
600        ownership_config.update(&mut ownership)?;
601
602        if ownership.super_owners.is_empty() && ownership.owners.is_empty() {
603            tracing::error!("At least one owner or super owner of the chain has to be set.");
604            return Err(error::Inner::ChainOwnership.into());
605        }
606
607        let certificate = self
608            .apply_client_command(&chain_client, |chain_client| {
609                let ownership = ownership.clone();
610                let chain_client = chain_client.clone();
611                async move {
612                    chain_client
613                        .change_ownership(ownership)
614                        .await
615                        .map_err(Error::from)
616                        .context("Failed to change ownership")
617                }
618            })
619            .await?;
620        let time_total = time_start.elapsed();
621        info!("Operation confirmed after {} ms", time_total.as_millis());
622        debug!("{:?}", certificate);
623        Ok(())
624    }
625
626    pub async fn set_preferred_owner(
627        &mut self,
628        chain_id: Option<ChainId>,
629        preferred_owner: AccountOwner,
630    ) -> Result<(), Error> {
631        let chain_id = chain_id.unwrap_or_else(|| self.default_chain());
632        let mut chain_client = self.make_chain_client(chain_id).await?;
633        let old_owner = chain_client.preferred_owner();
634        info!(%chain_id, ?old_owner, %preferred_owner, "Changing preferred owner for chain");
635        chain_client.set_preferred_owner(preferred_owner);
636        self.update_wallet_from_client(&chain_client).await?;
637        info!("New preferred owner set");
638        Ok(())
639    }
640
641    pub async fn check_compatible_version_info(
642        &self,
643        address: &str,
644        node: &impl ValidatorNode,
645    ) -> Result<VersionInfo, Error> {
646        match node.get_version_info().await {
647            Ok(version_info) if version_info.is_compatible_with(&linera_version::VERSION_INFO) => {
648                debug!(
649                    "Version information for validator {address}: {}",
650                    version_info
651                );
652                Ok(version_info)
653            }
654            Ok(version_info) => Err(error::Inner::UnexpectedVersionInfo {
655                remote: Box::new(version_info),
656                local: Box::new(linera_version::VERSION_INFO.clone()),
657            }
658            .into()),
659            Err(error) => Err(error::Inner::UnavailableVersionInfo {
660                address: address.to_string(),
661                error: Box::new(error),
662            }
663            .into()),
664        }
665    }
666
667    pub async fn check_matching_network_description(
668        &self,
669        address: &str,
670        node: &impl ValidatorNode,
671    ) -> Result<CryptoHash, Error> {
672        let network_description = self.genesis_config.network_description();
673        match node.get_network_description().await {
674            Ok(description) => {
675                if description == network_description {
676                    Ok(description.genesis_config_hash)
677                } else {
678                    Err(error::Inner::UnexpectedNetworkDescription {
679                        remote: Box::new(description),
680                        local: Box::new(network_description),
681                    }
682                    .into())
683                }
684            }
685            Err(error) => Err(error::Inner::UnavailableNetworkDescription {
686                address: address.to_string(),
687                error: Box::new(error),
688            }
689            .into()),
690        }
691    }
692
693    pub async fn check_validator_chain_info_response(
694        &self,
695        public_key: Option<&ValidatorPublicKey>,
696        address: &str,
697        node: &impl ValidatorNode,
698        chain_id: ChainId,
699    ) -> Result<ChainInfo, Error> {
700        let query = ChainInfoQuery::new(chain_id).with_manager_values();
701        match node.handle_chain_info_query(query).await {
702            Ok(response) => {
703                debug!(
704                    "Validator {address} sees chain {chain_id} at block height {} and epoch {:?}",
705                    response.info.next_block_height, response.info.epoch,
706                );
707                if let Some(public_key) = public_key {
708                    if response.check(*public_key).is_ok() {
709                        debug!("Signature for public key {public_key} is OK.");
710                    } else {
711                        return Err(error::Inner::InvalidSignature {
712                            public_key: *public_key,
713                        }
714                        .into());
715                    }
716                } else {
717                    warn!("Not checking signature as public key was not given");
718                }
719                Ok(*response.info)
720            }
721            Err(error) => Err(error::Inner::UnavailableChainInfo {
722                address: address.to_string(),
723                chain_id,
724                error: Box::new(error),
725            }
726            .into()),
727        }
728    }
729
730    /// Query a validator for version info, network description, and chain info.
731    ///
732    /// Returns a `ValidatorQueryResults` struct with the results of all three queries.
733    pub async fn query_validator(
734        &self,
735        address: &str,
736        node: &impl ValidatorNode,
737        chain_id: ChainId,
738        public_key: Option<&ValidatorPublicKey>,
739    ) -> ValidatorQueryResults {
740        let version_info = self.check_compatible_version_info(address, node).await;
741        let genesis_config_hash = self.check_matching_network_description(address, node).await;
742        let chain_info = self
743            .check_validator_chain_info_response(public_key, address, node, chain_id)
744            .await;
745
746        ValidatorQueryResults {
747            version_info,
748            genesis_config_hash,
749            chain_info,
750        }
751    }
752
753    /// Query the local node for version info, network description, and chain info.
754    ///
755    /// Returns a `ValidatorQueryResults` struct with the local node's information.
756    pub async fn query_local_node(
757        &self,
758        chain_id: ChainId,
759    ) -> Result<ValidatorQueryResults, Error> {
760        let version_info = Ok(linera_version::VERSION_INFO.clone());
761        let genesis_config_hash = Ok(self
762            .genesis_config
763            .network_description()
764            .genesis_config_hash);
765        let chain_info = self
766            .make_chain_client(chain_id)
767            .await?
768            .chain_info_with_manager_values()
769            .await
770            .map(|info| *info)
771            .map_err(|e| e.into());
772
773        Ok(ValidatorQueryResults {
774            version_info,
775            genesis_config_hash,
776            chain_info,
777        })
778    }
779}
780
781#[cfg(feature = "fs")]
782impl<Env: Environment> ClientContext<Env> {
783    pub async fn publish_module(
784        &mut self,
785        chain_client: &ChainClient<Env>,
786        contract: PathBuf,
787        service: PathBuf,
788        vm_runtime: VmRuntime,
789    ) -> Result<ModuleId, Error> {
790        info!("Loading bytecode files");
791        let contract_bytecode = Bytecode::load_from_file(&contract)
792            .with_context(|| format!("failed to load contract bytecode from {:?}", &contract))?;
793        let service_bytecode = Bytecode::load_from_file(&service)
794            .with_context(|| format!("failed to load service bytecode from {:?}", &service))?;
795
796        info!("Publishing module");
797        let (blobs, module_id) =
798            create_bytecode_blobs(contract_bytecode, service_bytecode, vm_runtime).await;
799        let (module_id, _) = self
800            .apply_client_command(chain_client, |chain_client| {
801                let blobs = blobs.clone();
802                let chain_client = chain_client.clone();
803                async move {
804                    chain_client
805                        .publish_module_blobs(blobs, module_id)
806                        .await
807                        .context("Failed to publish module")
808                }
809            })
810            .await?;
811
812        info!("{}", "Module published successfully!");
813
814        info!("Synchronizing client and processing inbox");
815        self.process_inbox(chain_client).await?;
816        Ok(module_id)
817    }
818
819    pub async fn publish_data_blob(
820        &mut self,
821        chain_client: &ChainClient<Env>,
822        blob_path: PathBuf,
823    ) -> Result<CryptoHash, Error> {
824        info!("Loading data blob file");
825        let blob_bytes = fs::read(&blob_path).context(format!(
826            "failed to load data blob bytes from {:?}",
827            &blob_path
828        ))?;
829
830        info!("Publishing data blob");
831        self.apply_client_command(chain_client, |chain_client| {
832            let blob_bytes = blob_bytes.clone();
833            let chain_client = chain_client.clone();
834            async move {
835                chain_client
836                    .publish_data_blob(blob_bytes)
837                    .await
838                    .context("Failed to publish data blob")
839            }
840        })
841        .await?;
842
843        info!("{}", "Data blob published successfully!");
844        Ok(CryptoHash::new(&BlobContent::new_data(blob_bytes)))
845    }
846
847    // TODO(#2490): Consider removing or renaming this.
848    pub async fn read_data_blob(
849        &mut self,
850        chain_client: &ChainClient<Env>,
851        hash: CryptoHash,
852    ) -> Result<(), Error> {
853        info!("Verifying data blob");
854        self.apply_client_command(chain_client, |chain_client| {
855            let chain_client = chain_client.clone();
856            async move {
857                chain_client
858                    .read_data_blob(hash)
859                    .await
860                    .context("Failed to verify data blob")
861            }
862        })
863        .await?;
864
865        info!("{}", "Data blob verified successfully!");
866        Ok(())
867    }
868}
869
870#[cfg(not(web))]
871impl<Env: Environment> ClientContext<Env> {
872    pub async fn prepare_for_benchmark(
873        &mut self,
874        num_chains: usize,
875        tokens_per_chain: Amount,
876        fungible_application_id: Option<ApplicationId>,
877        pub_keys: Vec<AccountPublicKey>,
878        chains_config_path: Option<&Path>,
879        close_chains: bool,
880    ) -> Result<Vec<ChainClient<Env>>, Error> {
881        let start = Instant::now();
882        // Below all block proposals are supposed to succeed without retries, we
883        // must make sure that all incoming payments have been accepted on-chain
884        // and that no validator is missing user certificates.
885        self.process_inboxes_and_force_validator_updates().await;
886        info!(
887            "Processed inboxes and forced validator updates in {} ms",
888            start.elapsed().as_millis()
889        );
890
891        let start = Instant::now();
892        let (benchmark_chains, chain_clients) = self
893            .make_benchmark_chains(
894                num_chains,
895                tokens_per_chain,
896                pub_keys,
897                chains_config_path.is_some(),
898                close_chains,
899            )
900            .await?;
901        info!(
902            "Got {} chains in {} ms",
903            num_chains,
904            start.elapsed().as_millis()
905        );
906
907        if let Some(id) = fungible_application_id {
908            let start = Instant::now();
909            self.supply_fungible_tokens(&benchmark_chains, id).await?;
910            info!(
911                "Supplied fungible tokens in {} ms",
912                start.elapsed().as_millis()
913            );
914            // Need to process inboxes to make sure the chains receive the supplied tokens.
915            let start = Instant::now();
916            for chain_client in &chain_clients {
917                chain_client.process_inbox().await?;
918            }
919            info!(
920                "Processed inboxes after supplying fungible tokens in {} ms",
921                start.elapsed().as_millis()
922            );
923        }
924
925        let all_chains = Benchmark::<Env>::get_all_chains(chains_config_path, &benchmark_chains)?;
926        let known_chain_ids: HashSet<_> = benchmark_chains.iter().map(|(id, _)| *id).collect();
927        let unknown_chain_ids: Vec<_> = all_chains
928            .iter()
929            .filter(|id| !known_chain_ids.contains(id))
930            .copied()
931            .collect();
932        if !unknown_chain_ids.is_empty() {
933            // The current client won't have the blobs for the chains in the other wallets. Even
934            // though it will eventually get those blobs, we're getting a head start here and
935            // fetching those blobs in advance.
936            for chain_id in &unknown_chain_ids {
937                self.client.get_chain_description(*chain_id).await?;
938            }
939        }
940
941        Ok(chain_clients)
942    }
943
944    pub async fn wrap_up_benchmark(
945        &mut self,
946        chain_clients: Vec<ChainClient<Env>>,
947        close_chains: bool,
948        wrap_up_max_in_flight: usize,
949    ) -> Result<(), Error> {
950        if close_chains {
951            info!("Closing chains...");
952            let chain_ids: Vec<_> = chain_clients.iter().map(|c| c.chain_id()).collect();
953            let stream = stream::iter(chain_clients)
954                .map(|chain_client| async move {
955                    Benchmark::<Env>::close_benchmark_chain(&chain_client).await?;
956                    info!("Closed chain {:?}", chain_client.chain_id());
957                    Ok::<(), BenchmarkError>(())
958                })
959                .buffer_unordered(wrap_up_max_in_flight);
960            stream.try_collect::<Vec<_>>().await?;
961            // Remove closed chains from wallet (the chain listener may have added them).
962            for chain_id in chain_ids {
963                let _ = self.wallet().remove(chain_id).await;
964            }
965        } else {
966            info!("Processing inbox for all chains...");
967            let stream = stream::iter(chain_clients.clone())
968                .map(|chain_client| async move {
969                    chain_client.process_inbox().await?;
970                    info!("Processed inbox for chain {:?}", chain_client.chain_id());
971                    Ok::<(), ChainClientError>(())
972                })
973                .buffer_unordered(wrap_up_max_in_flight);
974            stream.try_collect::<Vec<_>>().await?;
975
976            info!("Updating wallet from chain clients...");
977            for chain_client in chain_clients {
978                let info = chain_client.chain_info().await?;
979                let client_owner = chain_client.preferred_owner();
980                let pending_proposal = chain_client.pending_proposal().clone();
981                self.wallet()
982                    .insert(
983                        info.chain_id,
984                        wallet::Chain {
985                            pending_proposal,
986                            owner: client_owner,
987                            ..info.as_ref().into()
988                        },
989                    )
990                    .await
991                    .map_err(error::Inner::wallet)?;
992            }
993        }
994
995        Ok(())
996    }
997
    /// Processes the inbox of every chain owned by the wallet, then updates the
    /// wallet from the resulting chain clients.
    ///
    /// A chain client is created for each owned chain ID, and one task per client is
    /// spawned that runs `process_inbox_without_updating_wallet`. Once all tasks
    /// have been joined, the wallet is updated from each returned client.
    ///
    /// # Panics
    ///
    /// This function does not return errors. It panics if listing the owned chains
    /// or creating a chain client fails, if processing any inbox fails, or if
    /// updating the wallet from a client fails.
    async fn process_inboxes_and_force_validator_updates(&mut self) {
        let mut join_set = task::JoinSet::new();

        // Create all chain clients up front; the first wallet or client error
        // aborts via `unwrap`.
        let chain_clients: Vec<_> = self
            .wallet()
            .owned_chain_ids()
            .map_err(|e| error::Inner::wallet(e).into())
            .and_then(|id| self.make_chain_client(id))
            .try_collect()
            .await
            .unwrap();

        // Each task takes ownership of its client and hands it back when done, so
        // that the wallet can be updated from it afterwards.
        for chain_client in chain_clients {
            join_set.spawn(async move {
                Self::process_inbox_without_updating_wallet(&chain_client)
                    .await
                    .expect("Processing inbox should not fail!");
                chain_client
            });
        }

        // The wallet is only written here, after all tasks have been joined.
        for chain_client in join_set.join_all().await {
            self.update_wallet_from_client(&chain_client).await.unwrap();
        }
    }
1023
1024    async fn process_inbox_without_updating_wallet(
1025        chain_client: &ChainClient<Env>,
1026    ) -> Result<Vec<ConfirmedBlockCertificate>, Error> {
1027        // Try processing the inbox optimistically without waiting for validator notifications.
1028        chain_client.synchronize_from_validators().await?;
1029        let (certificates, maybe_timeout) = chain_client.process_inbox_without_prepare().await?;
1030        assert!(
1031            maybe_timeout.is_none(),
1032            "Should not timeout within benchmark!"
1033        );
1034
1035        Ok(certificates)
1036    }
1037
1038    /// Creates chains if necessary, and returns a map of exactly `num_chains` chain IDs
1039    /// with key pairs, as well as a map of the chain clients.
1040    ///
1041    /// If `close_chains` is true, chains are not looked up from or stored in the wallet,
1042    /// since they will be closed after the benchmark and shouldn't be reused.
1043    async fn make_benchmark_chains(
1044        &mut self,
1045        num_chains: usize,
1046        balance: Amount,
1047        pub_keys: Vec<AccountPublicKey>,
1048        wallet_only: bool,
1049        close_chains: bool,
1050    ) -> Result<(Vec<(ChainId, AccountOwner)>, Vec<ChainClient<Env>>), Error> {
1051        let mut chains_found_in_wallet = 0;
1052        let mut benchmark_chains = Vec::with_capacity(num_chains);
1053        let mut chain_clients = Vec::with_capacity(num_chains);
1054        let start = Instant::now();
1055
1056        // When close_chains is true and we're creating our own chains (not wallet_only),
1057        // skip wallet lookup to avoid picking up existing chains that would then be closed.
1058        // When wallet_only is true, chains were pre-created by the parent process and must
1059        // be read from the wallet.
1060        if !close_chains || wallet_only {
1061            let mut owned_chain_ids = std::pin::pin!(self.wallet().owned_chain_ids());
1062            while let Some(chain_id) = owned_chain_ids.next().await {
1063                let chain_id = chain_id.map_err(error::Inner::wallet)?;
1064                if chains_found_in_wallet == num_chains {
1065                    break;
1066                }
1067                let chain_client = self.make_chain_client(chain_id).await?;
1068                let ownership = chain_client.chain_info().await?.manager.ownership;
1069                if !ownership.owners.is_empty() || ownership.super_owners.len() != 1 {
1070                    continue;
1071                }
1072                let owner = *ownership.super_owners.first().unwrap();
1073                chain_client.process_inbox().await?;
1074                benchmark_chains.push((chain_id, owner));
1075                chain_clients.push(chain_client);
1076                chains_found_in_wallet += 1;
1077            }
1078            info!(
1079                "Got {} chains from the wallet in {} ms",
1080                benchmark_chains.len(),
1081                start.elapsed().as_millis()
1082            );
1083        }
1084
1085        let num_chains_to_create = num_chains - chains_found_in_wallet;
1086
1087        let default_chain_client = self.make_chain_client(self.default_chain()).await?;
1088
1089        if num_chains_to_create > 0 {
1090            if wallet_only {
1091                return Err(
1092                    error::Inner::Benchmark(BenchmarkError::NotEnoughChainsInWallet(
1093                        num_chains,
1094                        chains_found_in_wallet,
1095                    ))
1096                    .into(),
1097                );
1098            }
1099            let mut pub_keys_iter = pub_keys.into_iter().take(num_chains_to_create);
1100            let operations_per_block = 900; // Over this we seem to hit the block size limits.
1101            for i in (0..num_chains_to_create).step_by(operations_per_block) {
1102                let num_new_chains = operations_per_block.min(num_chains_to_create - i);
1103                // Each chain gets its own unique owner (previously all chains in a batch
1104                // shared one owner, which could cause conflicts during benchmarking).
1105                let owners: Vec<AccountOwner> = (&mut pub_keys_iter)
1106                    .take(num_new_chains)
1107                    .map(|pk| pk.into())
1108                    .collect();
1109
1110                let certificate = Self::execute_open_chains_operations(
1111                    &default_chain_client,
1112                    balance,
1113                    owners.clone(),
1114                )
1115                .await?;
1116                info!("Block executed successfully");
1117
1118                let block = certificate.block();
1119                for (i, owner) in owners.into_iter().enumerate() {
1120                    let chain_id = block.body.blobs[i]
1121                        .iter()
1122                        .find(|blob| blob.id().blob_type == BlobType::ChainDescription)
1123                        .map(|blob| ChainId(blob.id().hash))
1124                        .expect("failed to create a new chain");
1125                    self.client
1126                        .extend_chain_mode(chain_id, ListeningMode::FullChain);
1127
1128                    let mut chain_client = self.client.create_chain_client(
1129                        chain_id,
1130                        None,
1131                        BlockHeight::ZERO,
1132                        None,
1133                        Some(owner),
1134                        self.timing_sender(),
1135                    );
1136                    chain_client.set_preferred_owner(owner);
1137                    chain_client.process_inbox().await?;
1138                    benchmark_chains.push((chain_id, owner));
1139                    chain_clients.push(chain_client);
1140                }
1141            }
1142
1143            info!(
1144                "Created {} chains in {} ms",
1145                num_chains_to_create,
1146                start.elapsed().as_millis()
1147            );
1148        }
1149
1150        // Only update wallet if chains will be reused (not closed after benchmark)
1151        if !close_chains {
1152            info!("Updating wallet from client");
1153            self.update_wallet_from_client(&default_chain_client)
1154                .await?;
1155        }
1156        info!("Retrying pending outgoing messages");
1157        default_chain_client
1158            .retry_pending_outgoing_messages()
1159            .await
1160            .context("outgoing messages to create the new chains should be delivered")?;
1161        info!("Processing default chain inbox");
1162        default_chain_client.process_inbox().await?;
1163
1164        Ok((benchmark_chains, chain_clients))
1165    }
1166
1167    async fn execute_open_chains_operations(
1168        chain_client: &ChainClient<Env>,
1169        balance: Amount,
1170        owners: Vec<AccountOwner>,
1171    ) -> Result<ConfirmedBlockCertificate, Error> {
1172        let operations: Vec<_> = owners
1173            .iter()
1174            .map(|owner| {
1175                let config = OpenChainConfig {
1176                    ownership: ChainOwnership::single_super(*owner),
1177                    balance,
1178                    application_permissions: Default::default(),
1179                };
1180                Operation::system(SystemOperation::OpenChain(config))
1181            })
1182            .collect();
1183        info!("Executing {} OpenChain operations", operations.len());
1184        Ok(chain_client
1185            .execute_operations(operations, vec![])
1186            .await?
1187            .expect("should execute block with OpenChain operations"))
1188    }
1189
1190    /// Supplies fungible tokens to the chains.
1191    async fn supply_fungible_tokens(
1192        &mut self,
1193        key_pairs: &[(ChainId, AccountOwner)],
1194        application_id: ApplicationId,
1195    ) -> Result<(), Error> {
1196        let default_chain_id = self.default_chain();
1197        let default_key = self
1198            .wallet()
1199            .get(default_chain_id)
1200            .await
1201            .unwrap()
1202            .unwrap()
1203            .owner
1204            .unwrap();
1205        // This should be enough to run the benchmark at 1M TPS for an hour.
1206        let amount = Amount::from_nanos(4);
1207        let operations: Vec<Operation> = key_pairs
1208            .iter()
1209            .map(|(chain_id, owner)| {
1210                fungible_transfer(application_id, *chain_id, default_key, *owner, amount)
1211            })
1212            .collect();
1213        let chain_client = self.make_chain_client(default_chain_id).await?;
1214        // Put at most 1000 fungible token operations in each block.
1215        for operation_chunk in operations.chunks(1000) {
1216            chain_client
1217                .execute_operations(operation_chunk.to_vec(), vec![])
1218                .await?
1219                .expect("should execute block with Transfer operations");
1220        }
1221        self.update_wallet_from_client(&chain_client).await?;
1222
1223        Ok(())
1224    }
1225}