1use std::{
6 collections::{BTreeMap, HashMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, GenericCertificate},
19};
20use linera_execution::{BlobState, Query, QueryOutcome, ResourceTracker};
21use linera_storage::Storage;
22use linera_views::ViewError;
23use thiserror::Error;
24use tracing::{instrument, warn};
25
26use crate::{
27 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
28 notifier::Notifier,
29 worker::{ProcessableCertificate, WorkerError, WorkerState},
30};
31
32pub struct LocalNode<S>
34where
35 S: Storage,
36{
37 state: WorkerState<S>,
38}
39
40#[derive(Clone)]
42pub struct LocalNodeClient<S>
43where
44 S: Storage,
45{
46 node: Arc<LocalNode<S>>,
47}
48
49#[derive(Debug, Error)]
51pub enum LocalNodeError {
52 #[error(transparent)]
53 ArithmeticError(#[from] ArithmeticError),
54
55 #[error(transparent)]
56 ViewError(#[from] ViewError),
57
58 #[error("Worker operation failed: {0}")]
59 WorkerError(WorkerError),
60
61 #[error("The local node doesn't have an active chain {0}")]
62 InactiveChain(ChainId),
63
64 #[error("The chain info response received from the local node is invalid")]
65 InvalidChainInfoResponse,
66
67 #[error("Blobs not found: {0:?}")]
68 BlobsNotFound(Vec<BlobId>),
69
70 #[error("Events not found: {0:?}")]
71 EventsNotFound(Vec<EventId>),
72}
73
74impl From<WorkerError> for LocalNodeError {
75 fn from(error: WorkerError) -> Self {
76 match error {
77 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
78 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
79 error => LocalNodeError::WorkerError(error),
80 }
81 }
82}
83
84impl<S> LocalNodeClient<S>
85where
86 S: Storage + Clone + 'static,
87{
88 #[instrument(level = "trace", skip_all)]
89 pub async fn handle_block_proposal(
90 &self,
91 proposal: BlockProposal,
92 ) -> Result<ChainInfoResponse, LocalNodeError> {
93 let (response, _actions) =
95 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
96 Ok(response)
97 }
98
99 #[instrument(level = "trace", skip_all)]
100 pub async fn handle_certificate<T>(
101 &self,
102 certificate: GenericCertificate<T>,
103 notifier: &impl Notifier,
104 ) -> Result<ChainInfoResponse, LocalNodeError>
105 where
106 T: ProcessableCertificate,
107 {
108 Ok(Box::pin(
109 self.node
110 .state
111 .fully_handle_certificate_with_notifications(certificate, notifier),
112 )
113 .await?)
114 }
115
116 #[instrument(level = "trace", skip_all)]
117 pub async fn handle_chain_info_query(
118 &self,
119 query: ChainInfoQuery,
120 ) -> Result<ChainInfoResponse, LocalNodeError> {
121 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
123 Ok(response)
124 }
125
126 #[instrument(level = "trace", skip_all)]
127 pub fn new(state: WorkerState<S>) -> Self {
128 Self {
129 node: Arc::new(LocalNode { state }),
130 }
131 }
132
133 #[instrument(level = "trace", skip_all)]
134 pub(crate) fn storage_client(&self) -> S {
135 self.node.state.storage_client().clone()
136 }
137
138 #[instrument(level = "trace", skip_all)]
143 pub async fn stage_block_execution(
144 &self,
145 block: ProposedBlock,
146 round: Option<u32>,
147 published_blobs: Vec<Blob>,
148 policy: BundleExecutionPolicy,
149 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
150 Ok(self
151 .node
152 .state
153 .stage_block_execution(block, round, published_blobs, policy)
154 .await?)
155 }
156
157 pub async fn read_blobs_from_storage(
159 &self,
160 blob_ids: &[BlobId],
161 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
162 let storage = self.storage_client();
163 Ok(storage
164 .read_blobs(blob_ids)
165 .await?
166 .into_iter()
167 .map(|opt| opt.map(Arc::unwrap_or_clone))
168 .collect())
169 }
170
171 pub async fn read_blob_states_from_storage(
173 &self,
174 blob_ids: &[BlobId],
175 ) -> Result<Vec<BlobState>, LocalNodeError> {
176 let storage = self.storage_client();
177 let mut blobs_not_found = Vec::new();
178 let mut blob_states = Vec::new();
179 for (blob_state, blob_id) in storage
180 .read_blob_states(blob_ids)
181 .await?
182 .into_iter()
183 .zip(blob_ids)
184 {
185 match blob_state {
186 None => blobs_not_found.push(*blob_id),
187 Some(blob_state) => blob_states.push(blob_state),
188 }
189 }
190 if !blobs_not_found.is_empty() {
191 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
192 }
193 Ok(blob_states)
194 }
195
196 pub async fn get_locking_blobs(
199 &self,
200 blob_ids: impl IntoIterator<Item = &BlobId>,
201 chain_id: ChainId,
202 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
203 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
204 Ok(self
205 .node
206 .state
207 .get_locking_blobs(chain_id, blob_ids_vec)
208 .await?)
209 }
210
211 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
213 let storage = self.storage_client();
214 storage.maybe_write_blobs(blobs).await?;
215 Ok(())
216 }
217
218 pub async fn handle_pending_blobs(
219 &self,
220 chain_id: ChainId,
221 blobs: Vec<Blob>,
222 ) -> Result<(), LocalNodeError> {
223 for blob in blobs {
224 self.node.state.handle_pending_blob(chain_id, blob).await?;
225 }
226 Ok(())
227 }
228
229 #[instrument(level = "trace", skip(self))]
235 pub async fn chain_state_view(
236 &self,
237 chain_id: ChainId,
238 ) -> Result<crate::worker::ChainStateViewReadGuard<S>, LocalNodeError> {
239 Ok(self.node.state.chain_state_view(chain_id).await?)
240 }
241
242 #[instrument(level = "trace", skip(self))]
243 pub(crate) async fn chain_info(
244 &self,
245 chain_id: ChainId,
246 ) -> Result<Box<ChainInfo>, LocalNodeError> {
247 let query = ChainInfoQuery::new(chain_id);
248 Ok(self.handle_chain_info_query(query).await?.info)
249 }
250
251 #[instrument(level = "trace", skip(self, query))]
252 pub async fn query_application(
253 &self,
254 chain_id: ChainId,
255 query: Query,
256 block_hash: Option<CryptoHash>,
257 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
258 let result = self
259 .node
260 .state
261 .query_application(chain_id, query, block_hash)
262 .await?;
263 Ok(result)
264 }
265
266 #[instrument(level = "trace", skip(self, notifier))]
274 pub async fn retry_pending_cross_chain_requests(
275 &self,
276 sender_chain: ChainId,
277 notifier: &impl Notifier,
278 ) -> Result<(), LocalNodeError> {
279 let actions = self
280 .node
281 .state
282 .cross_chain_network_actions(sender_chain)
283 .await?;
284 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
285 while let Some(request) = requests.pop_front() {
286 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
287 notifier.notify(&new_actions.notifications);
288 requests.extend(new_actions.cross_chain_requests);
289 }
290 Ok(())
291 }
292
293 pub async fn next_outbox_heights(
297 &self,
298 chain_ids: impl IntoIterator<Item = &ChainId>,
299 receiver_id: ChainId,
300 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
301 let futures =
302 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
303 let (next_block_height, next_height_to_schedule) = match self
304 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
305 .await
306 {
307 Ok(info) => info,
308 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
309 return Ok((*chain_id, BlockHeight::ZERO))
310 }
311 Err(err) => Err(err)?,
312 };
313 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
314 next_block_height.max(scheduled_height)
315 } else {
316 next_block_height
317 };
318 Ok::<_, LocalNodeError>((*chain_id, next_height))
319 }));
320 futures.try_collect().await
321 }
322
323 pub async fn update_received_certificate_trackers(
324 &self,
325 chain_id: ChainId,
326 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
327 ) -> Result<(), LocalNodeError> {
328 self.node
329 .state
330 .update_received_certificate_trackers(chain_id, new_trackers)
331 .await?;
332 Ok(())
333 }
334
335 pub async fn get_preprocessed_block_hashes(
336 &self,
337 chain_id: ChainId,
338 start: BlockHeight,
339 end: BlockHeight,
340 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
341 Ok(self
342 .node
343 .state
344 .get_preprocessed_block_hashes(chain_id, start, end)
345 .await?)
346 }
347
348 pub async fn get_inbox_next_height(
349 &self,
350 chain_id: ChainId,
351 origin: ChainId,
352 ) -> Result<BlockHeight, LocalNodeError> {
353 Ok(self
354 .node
355 .state
356 .get_inbox_next_height(chain_id, origin)
357 .await?)
358 }
359
360 pub async fn get_block_hashes(
362 &self,
363 chain_id: ChainId,
364 heights: Vec<BlockHeight>,
365 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
366 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
367 }
368
369 pub async fn get_proposed_blobs(
371 &self,
372 chain_id: ChainId,
373 blob_ids: Vec<BlobId>,
374 ) -> Result<Vec<Blob>, LocalNodeError> {
375 Ok(self
376 .node
377 .state
378 .get_proposed_blobs(chain_id, blob_ids)
379 .await?)
380 }
381
382 pub async fn get_event_subscriptions(
384 &self,
385 chain_id: ChainId,
386 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
387 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
388 }
389
390 pub async fn next_expected_events(
392 &self,
393 chain_id: ChainId,
394 stream_ids: Vec<StreamId>,
395 ) -> Result<BTreeMap<StreamId, u32>, LocalNodeError> {
396 Ok(self
397 .node
398 .state
399 .next_expected_events(chain_id, stream_ids)
400 .await?)
401 }
402
403 pub async fn get_stream_event_count(
405 &self,
406 chain_id: ChainId,
407 stream_id: StreamId,
408 ) -> Result<Option<u32>, LocalNodeError> {
409 Ok(self
410 .node
411 .state
412 .get_stream_event_count(chain_id, stream_id)
413 .await?)
414 }
415
416 pub async fn get_received_certificate_trackers(
418 &self,
419 chain_id: ChainId,
420 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
421 Ok(self
422 .node
423 .state
424 .get_received_certificate_trackers(chain_id)
425 .await?)
426 }
427
428 pub async fn get_tip_state_and_outbox_info(
430 &self,
431 chain_id: ChainId,
432 receiver_id: ChainId,
433 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
434 Ok(self
435 .node
436 .state
437 .get_tip_state_and_outbox_info(chain_id, receiver_id)
438 .await?)
439 }
440
441 pub async fn get_next_height_to_preprocess(
443 &self,
444 chain_id: ChainId,
445 ) -> Result<BlockHeight, LocalNodeError> {
446 Ok(self
447 .node
448 .state
449 .get_next_height_to_preprocess(chain_id)
450 .await?)
451 }
452
453 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, LocalNodeError> {
455 Ok(self.node.state.get_manager_seed(chain_id).await?)
456 }
457}