Skip to main content

linera_core/
local_node.rs

// Copyright (c) Facebook, Inc. and its affiliates.
// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
4
5use std::{
6    collections::{BTreeMap, HashMap, VecDeque},
7    sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12    crypto::{CryptoHash, ValidatorPublicKey},
13    data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14    identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17    data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18    types::{Block, GenericCertificate},
19    ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29    data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30    notifier::Notifier,
31    worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34/// A local node with a single worker, typically used by clients.
35pub struct LocalNode<S>
36where
37    S: Storage,
38{
39    state: WorkerState<S>,
40}
41
42/// A client to a local node.
43#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46    S: Storage,
47{
48    node: Arc<LocalNode<S>>,
49}
50
51/// Error type for the operations on a local node.
52#[derive(Debug, Error)]
53pub enum LocalNodeError {
54    #[error(transparent)]
55    ArithmeticError(#[from] ArithmeticError),
56
57    #[error(transparent)]
58    ViewError(#[from] ViewError),
59
60    #[error("Worker operation failed: {0}")]
61    WorkerError(WorkerError),
62
63    #[error("The local node doesn't have an active chain {0}")]
64    InactiveChain(ChainId),
65
66    #[error("The chain info response received from the local node is invalid")]
67    InvalidChainInfoResponse,
68
69    #[error("Blobs not found: {0:?}")]
70    BlobsNotFound(Vec<BlobId>),
71
72    #[error("Events not found: {0:?}")]
73    EventsNotFound(Vec<EventId>),
74}
75
76impl From<WorkerError> for LocalNodeError {
77    fn from(error: WorkerError) -> Self {
78        match error {
79            WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80            WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
81            error => LocalNodeError::WorkerError(error),
82        }
83    }
84}
85
86impl<S> LocalNodeClient<S>
87where
88    S: Storage + Clone + 'static,
89{
90    #[instrument(level = "trace", skip_all)]
91    pub async fn handle_block_proposal(
92        &self,
93        proposal: BlockProposal,
94    ) -> Result<ChainInfoResponse, LocalNodeError> {
95        // In local nodes, cross-chain actions will be handled internally, so we discard them.
96        let (response, _actions) =
97            Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
98        Ok(response)
99    }
100
101    #[instrument(level = "trace", skip_all)]
102    pub async fn handle_certificate<T>(
103        &self,
104        certificate: GenericCertificate<T>,
105        notifier: &impl Notifier,
106    ) -> Result<ChainInfoResponse, LocalNodeError>
107    where
108        T: ProcessableCertificate,
109    {
110        Ok(Box::pin(
111            self.node
112                .state
113                .fully_handle_certificate_with_notifications(certificate, notifier),
114        )
115        .await?)
116    }
117
118    #[instrument(level = "trace", skip_all)]
119    pub async fn handle_chain_info_query(
120        &self,
121        query: ChainInfoQuery,
122    ) -> Result<ChainInfoResponse, LocalNodeError> {
123        // In local nodes, cross-chain actions will be handled internally, so we discard them.
124        let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
125        Ok(response)
126    }
127
128    #[instrument(level = "trace", skip_all)]
129    pub fn new(state: WorkerState<S>) -> Self {
130        Self {
131            node: Arc::new(LocalNode { state }),
132        }
133    }
134
135    #[instrument(level = "trace", skip_all)]
136    pub(crate) fn storage_client(&self) -> S {
137        self.node.state.storage_client().clone()
138    }
139
140    #[instrument(level = "trace", skip_all)]
141    pub async fn stage_block_execution(
142        &self,
143        block: ProposedBlock,
144        round: Option<u32>,
145        published_blobs: Vec<Blob>,
146    ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
147        Ok(self
148            .node
149            .state
150            .stage_block_execution(block, round, published_blobs)
151            .await?)
152    }
153
154    /// Executes a block with a policy for handling bundle failures.
155    ///
156    /// Returns the modified block (bundles may be rejected/removed based on the policy),
157    /// the executed block, chain info response, and resource tracker.
158    #[instrument(level = "trace", skip_all)]
159    pub async fn stage_block_execution_with_policy(
160        &self,
161        block: ProposedBlock,
162        round: Option<u32>,
163        published_blobs: Vec<Blob>,
164        policy: BundleExecutionPolicy,
165    ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
166        Ok(self
167            .node
168            .state
169            .stage_block_execution_with_policy(block, round, published_blobs, policy)
170            .await?)
171    }
172
173    /// Reads blobs from storage.
174    pub async fn read_blobs_from_storage(
175        &self,
176        blob_ids: &[BlobId],
177    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
178        let storage = self.storage_client();
179        Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
180    }
181
182    /// Reads blob states from storage.
183    pub async fn read_blob_states_from_storage(
184        &self,
185        blob_ids: &[BlobId],
186    ) -> Result<Vec<BlobState>, LocalNodeError> {
187        let storage = self.storage_client();
188        let mut blobs_not_found = Vec::new();
189        let mut blob_states = Vec::new();
190        for (blob_state, blob_id) in storage
191            .read_blob_states(blob_ids)
192            .await?
193            .into_iter()
194            .zip(blob_ids)
195        {
196            match blob_state {
197                None => blobs_not_found.push(*blob_id),
198                Some(blob_state) => blob_states.push(blob_state),
199            }
200        }
201        if !blobs_not_found.is_empty() {
202            return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
203        }
204        Ok(blob_states)
205    }
206
207    /// Looks for the specified blobs in the local chain manager's locking blobs.
208    /// Returns `Ok(None)` if any of the blobs is not found.
209    pub async fn get_locking_blobs(
210        &self,
211        blob_ids: impl IntoIterator<Item = &BlobId>,
212        chain_id: ChainId,
213    ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
214        let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
215        Ok(self
216            .node
217            .state
218            .get_locking_blobs(chain_id, blob_ids_vec)
219            .await?)
220    }
221
222    /// Writes the given blobs to storage if there is an appropriate blob state.
223    pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
224        let storage = self.storage_client();
225        storage.maybe_write_blobs(blobs).await?;
226        Ok(())
227    }
228
229    pub async fn handle_pending_blobs(
230        &self,
231        chain_id: ChainId,
232        blobs: Vec<Blob>,
233    ) -> Result<(), LocalNodeError> {
234        for blob in blobs {
235            self.node.state.handle_pending_blob(chain_id, blob).await?;
236        }
237        Ok(())
238    }
239
240    /// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
241    /// [`ChainId`].
242    ///
243    /// The returned view holds a lock on the chain state, which prevents the local node from
244    /// changing the state of that chain.
245    #[instrument(level = "trace", skip(self))]
246    pub async fn chain_state_view(
247        &self,
248        chain_id: ChainId,
249    ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
250        Ok(self.node.state.chain_state_view(chain_id).await?)
251    }
252
253    #[instrument(level = "trace", skip(self))]
254    pub(crate) async fn chain_info(
255        &self,
256        chain_id: ChainId,
257    ) -> Result<Box<ChainInfo>, LocalNodeError> {
258        let query = ChainInfoQuery::new(chain_id);
259        Ok(self.handle_chain_info_query(query).await?.info)
260    }
261
262    #[instrument(level = "trace", skip(self, query))]
263    pub async fn query_application(
264        &self,
265        chain_id: ChainId,
266        query: Query,
267        block_hash: Option<CryptoHash>,
268    ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
269        let result = self
270            .node
271            .state
272            .query_application(chain_id, query, block_hash)
273            .await?;
274        Ok(result)
275    }
276
277    /// Handles any pending local cross-chain requests.
278    #[instrument(level = "trace", skip(self))]
279    pub async fn retry_pending_cross_chain_requests(
280        &self,
281        sender_chain: ChainId,
282    ) -> Result<(), LocalNodeError> {
283        let (_response, actions) = self
284            .node
285            .state
286            .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
287            .await?;
288        let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
289        while let Some(request) = requests.pop_front() {
290            let new_actions = self.node.state.handle_cross_chain_request(request).await?;
291            requests.extend(new_actions.cross_chain_requests);
292        }
293        Ok(())
294    }
295
296    /// Given a list of chain IDs, returns a map that assigns to each of them the next block
297    /// height to schedule, i.e. the lowest block height for which we haven't added the messages
298    /// to `receiver_id` to the outbox yet.
299    pub async fn next_outbox_heights(
300        &self,
301        chain_ids: impl IntoIterator<Item = &ChainId>,
302        receiver_id: ChainId,
303    ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
304        let futures =
305            FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
306                let (next_block_height, next_height_to_schedule) = match self
307                    .get_tip_state_and_outbox_info(*chain_id, receiver_id)
308                    .await
309                {
310                    Ok(info) => info,
311                    Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
312                        return Ok((*chain_id, BlockHeight::ZERO))
313                    }
314                    Err(err) => Err(err)?,
315                };
316                let next_height = if let Some(scheduled_height) = next_height_to_schedule {
317                    next_block_height.max(scheduled_height)
318                } else {
319                    next_block_height
320                };
321                Ok::<_, LocalNodeError>((*chain_id, next_height))
322            }));
323        futures.try_collect().await
324    }
325
326    pub async fn update_received_certificate_trackers(
327        &self,
328        chain_id: ChainId,
329        new_trackers: BTreeMap<ValidatorPublicKey, u64>,
330    ) -> Result<(), LocalNodeError> {
331        self.node
332            .state
333            .update_received_certificate_trackers(chain_id, new_trackers)
334            .await?;
335        Ok(())
336    }
337
338    pub async fn get_preprocessed_block_hashes(
339        &self,
340        chain_id: ChainId,
341        start: BlockHeight,
342        end: BlockHeight,
343    ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
344        Ok(self
345            .node
346            .state
347            .get_preprocessed_block_hashes(chain_id, start, end)
348            .await?)
349    }
350
351    pub async fn get_inbox_next_height(
352        &self,
353        chain_id: ChainId,
354        origin: ChainId,
355    ) -> Result<BlockHeight, LocalNodeError> {
356        Ok(self
357            .node
358            .state
359            .get_inbox_next_height(chain_id, origin)
360            .await?)
361    }
362
363    /// Gets block hashes for the given heights.
364    pub async fn get_block_hashes(
365        &self,
366        chain_id: ChainId,
367        heights: Vec<BlockHeight>,
368    ) -> Result<Vec<CryptoHash>, LocalNodeError> {
369        Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
370    }
371
372    /// Gets proposed blobs from the manager for specified blob IDs.
373    pub async fn get_proposed_blobs(
374        &self,
375        chain_id: ChainId,
376        blob_ids: Vec<BlobId>,
377    ) -> Result<Vec<Blob>, LocalNodeError> {
378        Ok(self
379            .node
380            .state
381            .get_proposed_blobs(chain_id, blob_ids)
382            .await?)
383    }
384
385    /// Gets event subscriptions from the chain.
386    pub async fn get_event_subscriptions(
387        &self,
388        chain_id: ChainId,
389    ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
390        Ok(self.node.state.get_event_subscriptions(chain_id).await?)
391    }
392
393    /// Gets the stream event count for a stream.
394    pub async fn get_stream_event_count(
395        &self,
396        chain_id: ChainId,
397        stream_id: StreamId,
398    ) -> Result<Option<u32>, LocalNodeError> {
399        Ok(self
400            .node
401            .state
402            .get_stream_event_count(chain_id, stream_id)
403            .await?)
404    }
405
406    /// Gets received certificate trackers.
407    pub async fn get_received_certificate_trackers(
408        &self,
409        chain_id: ChainId,
410    ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
411        Ok(self
412            .node
413            .state
414            .get_received_certificate_trackers(chain_id)
415            .await?)
416    }
417
418    /// Gets tip state and outbox info for next_outbox_heights calculation.
419    pub async fn get_tip_state_and_outbox_info(
420        &self,
421        chain_id: ChainId,
422        receiver_id: ChainId,
423    ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
424        Ok(self
425            .node
426            .state
427            .get_tip_state_and_outbox_info(chain_id, receiver_id)
428            .await?)
429    }
430
431    /// Gets the next height to preprocess.
432    pub async fn get_next_height_to_preprocess(
433        &self,
434        chain_id: ChainId,
435    ) -> Result<BlockHeight, LocalNodeError> {
436        Ok(self
437            .node
438            .state
439            .get_next_height_to_preprocess(chain_id)
440            .await?)
441    }
442
443    /// Gets the chain manager's seed for leader election.
444    pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, LocalNodeError> {
445        Ok(self.node.state.get_manager_seed(chain_id).await?)
446    }
447}
448
449/// Extension trait for [`ChainInfo`]s from our local node. These should always be valid and
450/// contain the requested information.
451pub trait LocalChainInfoExt {
452    /// Returns the requested map of committees.
453    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
454
455    /// Returns the current committee.
456    fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
457
458    /// Returns a reference to the current committee.
459    fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
460}
461
462impl LocalChainInfoExt for ChainInfo {
463    fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
464        self.requested_committees
465            .ok_or(LocalNodeError::InvalidChainInfoResponse)
466    }
467
468    fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
469        self.requested_committees
470            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
471            .remove(&self.epoch)
472            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
473    }
474
475    fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
476        self.requested_committees
477            .as_ref()
478            .ok_or(LocalNodeError::InvalidChainInfoResponse)?
479            .get(&self.epoch)
480            .ok_or(LocalNodeError::InactiveChain(self.chain_id))
481    }
482}