1use std::{
6 collections::{BTreeMap, HashMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{BlobId, ChainId, EventId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, GenericCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71
72 #[error("Events not found: {0:?}")]
73 EventsNotFound(Vec<EventId>),
74}
75
76impl From<WorkerError> for LocalNodeError {
77 fn from(error: WorkerError) -> Self {
78 match error {
79 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
80 WorkerError::EventsNotFound(event_ids) => LocalNodeError::EventsNotFound(event_ids),
81 error => LocalNodeError::WorkerError(error),
82 }
83 }
84}
85
86impl<S> LocalNodeClient<S>
87where
88 S: Storage + Clone + 'static,
89{
90 #[instrument(level = "trace", skip_all)]
91 pub async fn handle_block_proposal(
92 &self,
93 proposal: BlockProposal,
94 ) -> Result<ChainInfoResponse, LocalNodeError> {
95 let (response, _actions) =
97 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
98 Ok(response)
99 }
100
101 #[instrument(level = "trace", skip_all)]
102 pub async fn handle_certificate<T>(
103 &self,
104 certificate: GenericCertificate<T>,
105 notifier: &impl Notifier,
106 ) -> Result<ChainInfoResponse, LocalNodeError>
107 where
108 T: ProcessableCertificate,
109 {
110 Ok(Box::pin(
111 self.node
112 .state
113 .fully_handle_certificate_with_notifications(certificate, notifier),
114 )
115 .await?)
116 }
117
118 #[instrument(level = "trace", skip_all)]
119 pub async fn handle_chain_info_query(
120 &self,
121 query: ChainInfoQuery,
122 ) -> Result<ChainInfoResponse, LocalNodeError> {
123 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
125 Ok(response)
126 }
127
128 #[instrument(level = "trace", skip_all)]
129 pub fn new(state: WorkerState<S>) -> Self {
130 Self {
131 node: Arc::new(LocalNode { state }),
132 }
133 }
134
135 #[instrument(level = "trace", skip_all)]
136 pub(crate) fn storage_client(&self) -> S {
137 self.node.state.storage_client().clone()
138 }
139
140 #[instrument(level = "trace", skip_all)]
141 pub async fn stage_block_execution(
142 &self,
143 block: ProposedBlock,
144 round: Option<u32>,
145 published_blobs: Vec<Blob>,
146 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
147 Ok(self
148 .node
149 .state
150 .stage_block_execution(block, round, published_blobs)
151 .await?)
152 }
153
154 #[instrument(level = "trace", skip_all)]
159 pub async fn stage_block_execution_with_policy(
160 &self,
161 block: ProposedBlock,
162 round: Option<u32>,
163 published_blobs: Vec<Blob>,
164 policy: BundleExecutionPolicy,
165 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
166 Ok(self
167 .node
168 .state
169 .stage_block_execution_with_policy(block, round, published_blobs, policy)
170 .await?)
171 }
172
173 pub async fn read_blobs_from_storage(
175 &self,
176 blob_ids: &[BlobId],
177 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
178 let storage = self.storage_client();
179 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
180 }
181
182 pub async fn read_blob_states_from_storage(
184 &self,
185 blob_ids: &[BlobId],
186 ) -> Result<Vec<BlobState>, LocalNodeError> {
187 let storage = self.storage_client();
188 let mut blobs_not_found = Vec::new();
189 let mut blob_states = Vec::new();
190 for (blob_state, blob_id) in storage
191 .read_blob_states(blob_ids)
192 .await?
193 .into_iter()
194 .zip(blob_ids)
195 {
196 match blob_state {
197 None => blobs_not_found.push(*blob_id),
198 Some(blob_state) => blob_states.push(blob_state),
199 }
200 }
201 if !blobs_not_found.is_empty() {
202 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
203 }
204 Ok(blob_states)
205 }
206
207 pub async fn get_locking_blobs(
210 &self,
211 blob_ids: impl IntoIterator<Item = &BlobId>,
212 chain_id: ChainId,
213 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
214 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
215 Ok(self
216 .node
217 .state
218 .get_locking_blobs(chain_id, blob_ids_vec)
219 .await?)
220 }
221
222 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
224 let storage = self.storage_client();
225 storage.maybe_write_blobs(blobs).await?;
226 Ok(())
227 }
228
229 pub async fn handle_pending_blobs(
230 &self,
231 chain_id: ChainId,
232 blobs: Vec<Blob>,
233 ) -> Result<(), LocalNodeError> {
234 for blob in blobs {
235 self.node.state.handle_pending_blob(chain_id, blob).await?;
236 }
237 Ok(())
238 }
239
240 #[instrument(level = "trace", skip(self))]
246 pub async fn chain_state_view(
247 &self,
248 chain_id: ChainId,
249 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
250 Ok(self.node.state.chain_state_view(chain_id).await?)
251 }
252
253 #[instrument(level = "trace", skip(self))]
254 pub(crate) async fn chain_info(
255 &self,
256 chain_id: ChainId,
257 ) -> Result<Box<ChainInfo>, LocalNodeError> {
258 let query = ChainInfoQuery::new(chain_id);
259 Ok(self.handle_chain_info_query(query).await?.info)
260 }
261
262 #[instrument(level = "trace", skip(self, query))]
263 pub async fn query_application(
264 &self,
265 chain_id: ChainId,
266 query: Query,
267 block_hash: Option<CryptoHash>,
268 ) -> Result<(QueryOutcome, BlockHeight), LocalNodeError> {
269 let result = self
270 .node
271 .state
272 .query_application(chain_id, query, block_hash)
273 .await?;
274 Ok(result)
275 }
276
277 #[instrument(level = "trace", skip(self))]
279 pub async fn retry_pending_cross_chain_requests(
280 &self,
281 sender_chain: ChainId,
282 ) -> Result<(), LocalNodeError> {
283 let (_response, actions) = self
284 .node
285 .state
286 .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
287 .await?;
288 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
289 while let Some(request) = requests.pop_front() {
290 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
291 requests.extend(new_actions.cross_chain_requests);
292 }
293 Ok(())
294 }
295
296 pub async fn next_outbox_heights(
300 &self,
301 chain_ids: impl IntoIterator<Item = &ChainId>,
302 receiver_id: ChainId,
303 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
304 let futures =
305 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
306 let (next_block_height, next_height_to_schedule) = match self
307 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
308 .await
309 {
310 Ok(info) => info,
311 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
312 return Ok((*chain_id, BlockHeight::ZERO))
313 }
314 Err(err) => Err(err)?,
315 };
316 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
317 next_block_height.max(scheduled_height)
318 } else {
319 next_block_height
320 };
321 Ok::<_, LocalNodeError>((*chain_id, next_height))
322 }));
323 futures.try_collect().await
324 }
325
326 pub async fn update_received_certificate_trackers(
327 &self,
328 chain_id: ChainId,
329 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
330 ) -> Result<(), LocalNodeError> {
331 self.node
332 .state
333 .update_received_certificate_trackers(chain_id, new_trackers)
334 .await?;
335 Ok(())
336 }
337
338 pub async fn get_preprocessed_block_hashes(
339 &self,
340 chain_id: ChainId,
341 start: BlockHeight,
342 end: BlockHeight,
343 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
344 Ok(self
345 .node
346 .state
347 .get_preprocessed_block_hashes(chain_id, start, end)
348 .await?)
349 }
350
351 pub async fn get_inbox_next_height(
352 &self,
353 chain_id: ChainId,
354 origin: ChainId,
355 ) -> Result<BlockHeight, LocalNodeError> {
356 Ok(self
357 .node
358 .state
359 .get_inbox_next_height(chain_id, origin)
360 .await?)
361 }
362
363 pub async fn get_block_hashes(
365 &self,
366 chain_id: ChainId,
367 heights: Vec<BlockHeight>,
368 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
369 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
370 }
371
372 pub async fn get_proposed_blobs(
374 &self,
375 chain_id: ChainId,
376 blob_ids: Vec<BlobId>,
377 ) -> Result<Vec<Blob>, LocalNodeError> {
378 Ok(self
379 .node
380 .state
381 .get_proposed_blobs(chain_id, blob_ids)
382 .await?)
383 }
384
385 pub async fn get_event_subscriptions(
387 &self,
388 chain_id: ChainId,
389 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
390 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
391 }
392
393 pub async fn get_stream_event_count(
395 &self,
396 chain_id: ChainId,
397 stream_id: StreamId,
398 ) -> Result<Option<u32>, LocalNodeError> {
399 Ok(self
400 .node
401 .state
402 .get_stream_event_count(chain_id, stream_id)
403 .await?)
404 }
405
406 pub async fn get_received_certificate_trackers(
408 &self,
409 chain_id: ChainId,
410 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
411 Ok(self
412 .node
413 .state
414 .get_received_certificate_trackers(chain_id)
415 .await?)
416 }
417
418 pub async fn get_tip_state_and_outbox_info(
420 &self,
421 chain_id: ChainId,
422 receiver_id: ChainId,
423 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
424 Ok(self
425 .node
426 .state
427 .get_tip_state_and_outbox_info(chain_id, receiver_id)
428 .await?)
429 }
430
431 pub async fn get_next_height_to_preprocess(
433 &self,
434 chain_id: ChainId,
435 ) -> Result<BlockHeight, LocalNodeError> {
436 Ok(self
437 .node
438 .state
439 .get_next_height_to_preprocess(chain_id)
440 .await?)
441 }
442
443 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, LocalNodeError> {
445 Ok(self.node.state.get_manager_seed(chain_id).await?)
446 }
447}
448
449pub trait LocalChainInfoExt {
452 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
454
455 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
457
458 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
460}
461
462impl LocalChainInfoExt for ChainInfo {
463 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
464 self.requested_committees
465 .ok_or(LocalNodeError::InvalidChainInfoResponse)
466 }
467
468 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
469 self.requested_committees
470 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
471 .remove(&self.epoch)
472 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
473 }
474
475 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
476 self.requested_committees
477 .as_ref()
478 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
479 .get(&self.epoch)
480 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
481 }
482}