Skip to main content

linera_core/
local_node.rs

1// Copyright (c) Facebook, Inc. and its affiliates.
2// Copyright (c) Zefchain Labs, Inc.
3// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, GenericCertificate},
19};
20use linera_execution::{BlobState, Query, QueryOutcome, ResourceTracker};
21use linera_storage::Storage;
22use linera_views::ViewError;
23use thiserror::Error;
24use tracing::{instrument, warn};
25
26use crate::{
27    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
28    notifier::Notifier,
29    worker::{ProcessableCertificate, WorkerError, WorkerState},
30};
31
32/// A local node with a single worker, typically used by clients.
33pub struct LocalNode<S>
34where
35    S: Storage,
36{
37    state: WorkerState<S>,
38}
39
40/// A client to a local node.
41#[derive(Clone)]
42pub struct LocalNodeClient<S>
43where
44    S: Storage,
45{
46    node: Arc<LocalNode<S>>,
47}
48
/// Error type for the operations on a local node.
///
/// `ArithmeticError` and `ViewError` convert automatically via `#[from]`. Worker errors
/// convert through the hand-written `From<WorkerError>` implementation, which maps the
/// worker's "not found" errors onto the dedicated variants of this enum.
#[derive(Debug, Error)]
pub enum LocalNodeError {
    /// An arithmetic error, displayed using the underlying error's own message.
    #[error(transparent)]
    ArithmeticError(#[from] ArithmeticError),

    /// A view error, displayed using the underlying error's own message.
    #[error(transparent)]
    ViewError(#[from] ViewError),

    /// A worker error that has no dedicated variant in this enum.
    #[error("Worker operation failed: {0}")]
    WorkerError(WorkerError),

    /// The local node has no active chain with the given ID.
    #[error("The local node doesn't have an active chain {0}")]
    InactiveChain(ChainId),

    /// The local node returned a chain info response that is not valid.
    #[error("The chain info response received from the local node is invalid")]
    InvalidChainInfoResponse,

    /// The listed blobs could not be found.
    #[error("Blobs not found: {0:?}")]
    BlobsNotFound(Vec<BlobId>),

    /// The listed events could not be found.
    #[error("Events not found: {0:?}")]
    EventsNotFound(Vec<EventId>),
}
73
74impl From<WorkerError> for LocalNodeError {
75    fn from(error: WorkerError) -> Self {
76        match error {
77            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
78            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
79            error => LocalNodeError::WorkerError(error),
80        }
81    }
82}
83
84impl<S> LocalNodeClient<S>
85where
86    S: Storage + Clone + 'static,
87{
88    #[instrument(level = "trace", skip_all)]
89    pub async fn handle_block_proposal(
90        &self,
91        proposal: BlockProposal,
92    ) -> Result<ChainInfoResponse, LocalNodeError> {
93        // In local nodes, cross-chain actions will be handled internally, so we discard them.
94        let (response, _actions) =
95            Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
96        Ok(response)
97    }
98
99    #[instrument(level = "trace", skip_all)]
100    pub async fn handle_certificate<T>(
101        &self,
102        certificate: GenericCertificate<T>,
103        notifier: &impl Notifier,
104    ) -> Result<ChainInfoResponse, LocalNodeError>
105    where
106        T: ProcessableCertificate,
107    {
108        Ok(Box::pin(
109            self.node
110                .state
111                .fully_handle_certificate_with_notifications(certificate, notifier),
112        )
113        .await?)
114    }
115
116    #[instrument(level = "trace", skip_all)]
117    pub async fn handle_chain_info_query(
118        &self,
119        query: ChainInfoQuery,
120    ) -> Result<ChainInfoResponse, LocalNodeError> {
121        // In local nodes, cross-chain actions will be handled internally, so we discard them.
122        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
123        Ok(response)
124    }
125
126    #[instrument(level = "trace", skip_all)]
127    pub fn new(state: WorkerState<S>) -> Self {
128        Self {
129            node: Arc::new(LocalNode { state }),
130        }
131    }
132
133    #[instrument(level = "trace", skip_all)]
134    pub(crate) fn storage_client(&self) -> S {
135        self.node.state.storage_client().clone()
136    }
137
138    /// Executes a block with a policy for handling bundle failures.
139    ///
140    /// Returns the modified block (bundles may be rejected/removed based on the policy),
141    /// the executed block, chain info response, and resource tracker.
142    #[instrument(level = "trace", skip_all)]
143    pub async fn stage_block_execution(
144        &self,
145        block: ProposedBlock,
146        round: Option<u32>,
147        published_blobs: Vec<Blob>,
148        policy: BundleExecutionPolicy,
149    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
150        Ok(self
151            .node
152            .state
153            .stage_block_execution(block, round, published_blobs, policy)
154            .await?)
155    }
156
157    /// Reads blobs from storage.
158    pub async fn read_blobs_from_storage(
159        &self,
160        blob_ids: &[BlobId],
161    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
162        let storage = self.storage_client();
163        Ok(storage
164            .read_blobs(blob_ids)
165            .await?
166            .into_iter()
167            .map(|opt| opt.map(Arc::unwrap_or_clone))
168            .collect())
169    }
170
171    /// Reads blob states from storage.
172    pub async fn read_blob_states_from_storage(
173        &self,
174        blob_ids: &[BlobId],
175    ) -> Result<Vec<BlobState>, LocalNodeError> {
176        let storage = self.storage_client();
177        let mut blobs_not_found = Vec::new();
178        let mut blob_states = Vec::new();
179        for (blob_state, blob_id) in storage
180            .read_blob_states(blob_ids)
181            .await?
182            .into_iter()
183            .zip(blob_ids)
184        {
185            match blob_state {
186                None => blobs_not_found.push(*blob_id),
187                Some(blob_state) => blob_states.push(blob_state),
188            }
189        }
190        if !blobs_not_found.is_empty() {
191            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
192        }
193        Ok(blob_states)
194    }
195
196    /// Looks for the specified blobs in the local chain manager's locking blobs.
197    /// Returns `Ok(None)` if any of the blobs is not found.
198    pub async fn get_locking_blobs(
199        &self,
200        blob_ids: impl IntoIterator<Item = &BlobId>,
201        chain_id: ChainId,
202    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
203        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
204        Ok(self
205            .node
206            .state
207            .get_locking_blobs(chain_id, blob_ids_vec)
208            .await?)
209    }
210
211    /// Writes the given blobs to storage if there is an appropriate blob state.
212    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
213        let storage = self.storage_client();
214        storage.maybe_write_blobs(blobs).await?;
215        Ok(())
216    }
217
218    pub async fn handle_pending_blobs(
219        &self,
220        chain_id: ChainId,
221        blobs: Vec<Blob>,
222    ) -> Result<(), LocalNodeError> {
223        for blob in blobs {
224            self.node.state.handle_pending_blob(chain_id, blob).await?;
225        }
226        Ok(())
227    }
228
229    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
230    /// [`ChainId`].
231    ///
232    /// The returned view holds a lock on the chain state, which prevents the local node from
233    /// changing the state of that chain.
234    #[instrument(level = "trace", skip(self))]
235    pub async fn chain_state_view(
236        &self,
237        chain_id: ChainId,
238    ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
239        Ok(self.node.state.chain_state_view(chain_id).await?)
240    }
241
242    #[instrument(level = "trace", skip(self))]
243    pub(crate) async fn chain_info(
244        &self,
245        chain_id: ChainId,
246    ) -> Result<Box<ChainInfo>, LocalNodeError> {
247        let query = ChainInfoQuery::new(chain_id);
248        Ok(self.handle_chain_info_query(query).await?.info)
249    }
250
251    #[instrument(level = "trace", skip(self, query))]
252    pub async fn query_application(
253        &self,
254        chain_id: ChainId,
255        query: Query,
256        block_hash: Option<CryptoHash>,
257    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
258        let result = self
259            .node
260            .state
261            .query_application(chain_id, query, block_hash)
262            .await?;
263        Ok(result)
264    }
265
266    /// Handles any pending local cross-chain requests.
267    ///
268    /// Does not initialize the sender chain's execution state, so it is safe to
269    /// call even when the sender's `ChainDescription` blob is not in local storage.
270    /// Previously this went through `handle_chain_info_query`, which unconditionally
271    /// initialized the worker and therefore forced a `ChainDescription` download on
272    /// every call.
273    #[instrument(level = "trace", skip(self, notifier))]
274    pub async fn retry_pending_cross_chain_requests(
275        &self,
276        sender_chain: ChainId,
277        notifier: &impl Notifier,
278    ) -> Result<(), LocalNodeError> {
279        let actions = self
280            .node
281            .state
282            .cross_chain_network_actions(sender_chain)
283            .await?;
284        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
285        while let Some(request) = requests.pop_front() {
286            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
287            notifier.notify(&new_actions.notifications);
288            requests.extend(new_actions.cross_chain_requests);
289        }
290        Ok(())
291    }
292
293    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
294    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
295    /// to `receiver_id` to the outbox yet.
296    pub async fn next_outbox_heights(
297        &self,
298        chain_ids: impl IntoIterator<Item = &ChainId>,
299        receiver_id: ChainId,
300    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
301        let futures =
302            FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
303                let (next_block_height, next_height_to_schedule) = match self
304                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
305                    .await
306                {
307                    Ok(info) => info,
308                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
309                        return Ok((*chain_id, BlockHeight::ZERO))
310                    }
311                    Err(err) => Err(err)?,
312                };
313                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
314                    next_block_height.max(scheduled_height)
315                } else {
316                    next_block_height
317                };
318                Ok::<_, LocalNodeError>((*chain_id, next_height))
319            }));
320        futures.try_collect().await
321    }
322
323    pub async fn update_received_certificate_trackers(
324        &self,
325        chain_id: ChainId,
326        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
327    ) -> Result<(), LocalNodeError> {
328        self.node
329            .state
330            .update_received_certificate_trackers(chain_id, new_trackers)
331            .await?;
332        Ok(())
333    }
334
335    pub async fn get_preprocessed_block_hashes(
336        &self,
337        chain_id: ChainId,
338        start: BlockHeight,
339        end: BlockHeight,
340    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
341        Ok(self
342            .node
343            .state
344            .get_preprocessed_block_hashes(chain_id, start, end)
345            .await?)
346    }
347
348    pub async fn get_inbox_next_height(
349        &self,
350        chain_id: ChainId,
351        origin: ChainId,
352    ) -> Result<BlockHeight, LocalNodeError> {
353        Ok(self
354            .node
355            .state
356            .get_inbox_next_height(chain_id, origin)
357            .await?)
358    }
359
360    /// Gets block hashes for the given heights.
361    pub async fn get_block_hashes(
362        &self,
363        chain_id: ChainId,
364        heights: Vec<BlockHeight>,
365    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
366        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
367    }
368
369    /// Gets proposed blobs from the manager for specified blob IDs.
370    pub async fn get_proposed_blobs(
371        &self,
372        chain_id: ChainId,
373        blob_ids: Vec<BlobId>,
374    ) -> Result<Vec<Blob>, LocalNodeError> {
375        Ok(self
376            .node
377            .state
378            .get_proposed_blobs(chain_id, blob_ids)
379            .await?)
380    }
381
382    /// Gets event subscriptions from the chain.
383    pub async fn get_event_subscriptions(
384        &self,
385        chain_id: ChainId,
386    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
387        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
388    }
389
390    /// Gets the `next_expected_events` indices for the given streams.
391    pub async fn next_expected_events(
392        &self,
393        chain_id: ChainId,
394        stream_ids: Vec<StreamId>,
395    ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
396        Ok(self
397            .node
398            .state
399            .next_expected_events(chain_id, stream_ids)
400            .await?)
401    }
402
403    /// Gets the stream event count for a stream.
404    pub async fn get_stream_event_count(
405        &self,
406        chain_id: ChainId,
407        stream_id: StreamId,
408    ) -> Result<Option<u32>, LocalNodeError> {
409        Ok(self
410            .node
411            .state
412            .get_stream_event_count(chain_id, stream_id)
413            .await?)
414    }
415
416    /// Gets received certificate trackers.
417    pub async fn get_received_certificate_trackers(
418        &self,
419        chain_id: ChainId,
420    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
421        Ok(self
422            .node
423            .state
424            .get_received_certificate_trackers(chain_id)
425            .await?)
426    }
427
428    /// Gets tip state and outbox info for next_outbox_heights calculation.
429    pub async fn get_tip_state_and_outbox_info(
430        &self,
431        chain_id: ChainId,
432        receiver_id: ChainId,
433    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
434        Ok(self
435            .node
436            .state
437            .get_tip_state_and_outbox_info(chain_id, receiver_id)
438            .await?)
439    }
440
441    /// Gets the next height to preprocess.
442    pub async fn get_next_height_to_preprocess(
443        &self,
444        chain_id: ChainId,
445    ) -> Result<BlockHeight, LocalNodeError> {
446        Ok(self
447            .node
448            .state
449            .get_next_height_to_preprocess(chain_id)
450            .await?)
451    }
452
453    /// Gets the chain manager's seed for leader election.
454    pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, LocalNodeError> {
455        Ok(self.node.state.get_manager_seed(chain_id).await?)
456    }
457}