1use std::{
6 collections::{BTreeMap, HashMap, VecDeque},
7 sync::Arc,
8};
9
10use futures::{stream::FuturesUnordered, TryStreamExt as _};
11use linera_base::{
12 crypto::{CryptoHash, ValidatorPublicKey},
13 data_types::{ArithmeticError, Blob, BlockHeight, Epoch},
14 identifiers::{BlobId, ChainId, StreamId},
15};
16use linera_chain::{
17 data_types::{BlockProposal, BundleExecutionPolicy, ProposedBlock},
18 types::{Block, GenericCertificate},
19 ChainStateView,
20};
21use linera_execution::{committee::Committee, BlobState, Query, QueryOutcome, ResourceTracker};
22use linera_storage::Storage;
23use linera_views::ViewError;
24use thiserror::Error;
25use tokio::sync::OwnedRwLockReadGuard;
26use tracing::{instrument, warn};
27
28use crate::{
29 data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
30 notifier::Notifier,
31 worker::{ProcessableCertificate, WorkerError, WorkerState},
32};
33
34pub struct LocalNode<S>
36where
37 S: Storage,
38{
39 state: WorkerState<S>,
40}
41
42#[derive(Clone)]
44pub struct LocalNodeClient<S>
45where
46 S: Storage,
47{
48 node: Arc<LocalNode<S>>,
49}
50
51#[derive(Debug, Error)]
53pub enum LocalNodeError {
54 #[error(transparent)]
55 ArithmeticError(#[from] ArithmeticError),
56
57 #[error(transparent)]
58 ViewError(#[from] ViewError),
59
60 #[error("Worker operation failed: {0}")]
61 WorkerError(WorkerError),
62
63 #[error("The local node doesn't have an active chain {0}")]
64 InactiveChain(ChainId),
65
66 #[error("The chain info response received from the local node is invalid")]
67 InvalidChainInfoResponse,
68
69 #[error("Blobs not found: {0:?}")]
70 BlobsNotFound(Vec<BlobId>),
71}
72
73impl From<WorkerError> for LocalNodeError {
74 fn from(error: WorkerError) -> Self {
75 match error {
76 WorkerError::BlobsNotFound(blob_ids) => LocalNodeError::BlobsNotFound(blob_ids),
77 error => LocalNodeError::WorkerError(error),
78 }
79 }
80}
81
82impl<S> LocalNodeClient<S>
83where
84 S: Storage + Clone + 'static,
85{
86 #[instrument(level = "trace", skip_all)]
87 pub async fn handle_block_proposal(
88 &self,
89 proposal: BlockProposal,
90 ) -> Result<ChainInfoResponse, LocalNodeError> {
91 let (response, _actions) =
93 Box::pin(self.node.state.handle_block_proposal(proposal)).await?;
94 Ok(response)
95 }
96
97 #[instrument(level = "trace", skip_all)]
98 pub async fn handle_certificate<T>(
99 &self,
100 certificate: GenericCertificate<T>,
101 notifier: &impl Notifier,
102 ) -> Result<ChainInfoResponse, LocalNodeError>
103 where
104 T: ProcessableCertificate,
105 {
106 Ok(Box::pin(
107 self.node
108 .state
109 .fully_handle_certificate_with_notifications(certificate, notifier),
110 )
111 .await?)
112 }
113
114 #[instrument(level = "trace", skip_all)]
115 pub async fn handle_chain_info_query(
116 &self,
117 query: ChainInfoQuery,
118 ) -> Result<ChainInfoResponse, LocalNodeError> {
119 let (response, _actions) = self.node.state.handle_chain_info_query(query).await?;
121 Ok(response)
122 }
123
124 #[instrument(level = "trace", skip_all)]
125 pub fn new(state: WorkerState<S>) -> Self {
126 Self {
127 node: Arc::new(LocalNode { state }),
128 }
129 }
130
131 #[instrument(level = "trace", skip_all)]
132 pub(crate) fn storage_client(&self) -> S {
133 self.node.state.storage_client().clone()
134 }
135
136 #[instrument(level = "trace", skip_all)]
137 pub async fn stage_block_execution(
138 &self,
139 block: ProposedBlock,
140 round: Option<u32>,
141 published_blobs: Vec<Blob>,
142 ) -> Result<(Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
143 Ok(self
144 .node
145 .state
146 .stage_block_execution(block, round, published_blobs)
147 .await?)
148 }
149
150 #[instrument(level = "trace", skip_all)]
155 pub async fn stage_block_execution_with_policy(
156 &self,
157 block: ProposedBlock,
158 round: Option<u32>,
159 published_blobs: Vec<Blob>,
160 policy: BundleExecutionPolicy,
161 ) -> Result<(ProposedBlock, Block, ChainInfoResponse, ResourceTracker), LocalNodeError> {
162 Ok(self
163 .node
164 .state
165 .stage_block_execution_with_policy(block, round, published_blobs, policy)
166 .await?)
167 }
168
169 pub async fn read_blobs_from_storage(
171 &self,
172 blob_ids: &[BlobId],
173 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
174 let storage = self.storage_client();
175 Ok(storage.read_blobs(blob_ids).await?.into_iter().collect())
176 }
177
178 pub async fn read_blob_states_from_storage(
180 &self,
181 blob_ids: &[BlobId],
182 ) -> Result<Vec<BlobState>, LocalNodeError> {
183 let storage = self.storage_client();
184 let mut blobs_not_found = Vec::new();
185 let mut blob_states = Vec::new();
186 for (blob_state, blob_id) in storage
187 .read_blob_states(blob_ids)
188 .await?
189 .into_iter()
190 .zip(blob_ids)
191 {
192 match blob_state {
193 None => blobs_not_found.push(*blob_id),
194 Some(blob_state) => blob_states.push(blob_state),
195 }
196 }
197 if !blobs_not_found.is_empty() {
198 return Err(LocalNodeError::BlobsNotFound(blobs_not_found));
199 }
200 Ok(blob_states)
201 }
202
203 pub async fn get_locking_blobs(
206 &self,
207 blob_ids: impl IntoIterator<Item = &BlobId>,
208 chain_id: ChainId,
209 ) -> Result<Option<Vec<Blob>>, LocalNodeError> {
210 let blob_ids_vec: Vec<_> = blob_ids.into_iter().copied().collect();
211 Ok(self
212 .node
213 .state
214 .get_locking_blobs(chain_id, blob_ids_vec)
215 .await?)
216 }
217
218 pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
220 let storage = self.storage_client();
221 storage.maybe_write_blobs(blobs).await?;
222 Ok(())
223 }
224
225 pub async fn handle_pending_blobs(
226 &self,
227 chain_id: ChainId,
228 blobs: Vec<Blob>,
229 ) -> Result<(), LocalNodeError> {
230 for blob in blobs {
231 self.node.state.handle_pending_blob(chain_id, blob).await?;
232 }
233 Ok(())
234 }
235
236 #[instrument(level = "trace", skip(self))]
242 pub async fn chain_state_view(
243 &self,
244 chain_id: ChainId,
245 ) -> Result<OwnedRwLockReadGuard<ChainStateView<S::Context>>, LocalNodeError> {
246 Ok(self.node.state.chain_state_view(chain_id).await?)
247 }
248
249 #[instrument(level = "trace", skip(self))]
250 pub(crate) async fn chain_info(
251 &self,
252 chain_id: ChainId,
253 ) -> Result<Box<ChainInfo>, LocalNodeError> {
254 let query = ChainInfoQuery::new(chain_id);
255 Ok(self.handle_chain_info_query(query).await?.info)
256 }
257
258 #[instrument(level = "trace", skip(self, query))]
259 pub async fn query_application(
260 &self,
261 chain_id: ChainId,
262 query: Query,
263 block_hash: Option<CryptoHash>,
264 ) -> Result<QueryOutcome, LocalNodeError> {
265 let outcome = self
266 .node
267 .state
268 .query_application(chain_id, query, block_hash)
269 .await?;
270 Ok(outcome)
271 }
272
273 #[instrument(level = "trace", skip(self))]
275 pub async fn retry_pending_cross_chain_requests(
276 &self,
277 sender_chain: ChainId,
278 ) -> Result<(), LocalNodeError> {
279 let (_response, actions) = self
280 .node
281 .state
282 .handle_chain_info_query(ChainInfoQuery::new(sender_chain).with_network_actions())
283 .await?;
284 let mut requests = VecDeque::from_iter(actions.cross_chain_requests);
285 while let Some(request) = requests.pop_front() {
286 let new_actions = self.node.state.handle_cross_chain_request(request).await?;
287 requests.extend(new_actions.cross_chain_requests);
288 }
289 Ok(())
290 }
291
292 pub async fn next_outbox_heights(
296 &self,
297 chain_ids: impl IntoIterator<Item = &ChainId>,
298 receiver_id: ChainId,
299 ) -> Result<BTreeMap<ChainId, BlockHeight>, LocalNodeError> {
300 let futures =
301 FuturesUnordered::from_iter(chain_ids.into_iter().map(|chain_id| async move {
302 let (next_block_height, next_height_to_schedule) = match self
303 .get_tip_state_and_outbox_info(*chain_id, receiver_id)
304 .await
305 {
306 Ok(info) => info,
307 Err(LocalNodeError::BlobsNotFound(_) | LocalNodeError::InactiveChain(_)) => {
308 return Ok((*chain_id, BlockHeight::ZERO))
309 }
310 Err(err) => Err(err)?,
311 };
312 let next_height = if let Some(scheduled_height) = next_height_to_schedule {
313 next_block_height.max(scheduled_height)
314 } else {
315 next_block_height
316 };
317 Ok::<_, LocalNodeError>((*chain_id, next_height))
318 }));
319 futures.try_collect().await
320 }
321
322 pub async fn update_received_certificate_trackers(
323 &self,
324 chain_id: ChainId,
325 new_trackers: BTreeMap<ValidatorPublicKey, u64>,
326 ) -> Result<(), LocalNodeError> {
327 self.node
328 .state
329 .update_received_certificate_trackers(chain_id, new_trackers)
330 .await?;
331 Ok(())
332 }
333
334 pub async fn get_preprocessed_block_hashes(
335 &self,
336 chain_id: ChainId,
337 start: BlockHeight,
338 end: BlockHeight,
339 ) -> Result<Vec<linera_base::crypto::CryptoHash>, LocalNodeError> {
340 Ok(self
341 .node
342 .state
343 .get_preprocessed_block_hashes(chain_id, start, end)
344 .await?)
345 }
346
347 pub async fn get_inbox_next_height(
348 &self,
349 chain_id: ChainId,
350 origin: ChainId,
351 ) -> Result<BlockHeight, LocalNodeError> {
352 Ok(self
353 .node
354 .state
355 .get_inbox_next_height(chain_id, origin)
356 .await?)
357 }
358
359 pub async fn get_block_hashes(
361 &self,
362 chain_id: ChainId,
363 heights: Vec<BlockHeight>,
364 ) -> Result<Vec<CryptoHash>, LocalNodeError> {
365 Ok(self.node.state.get_block_hashes(chain_id, heights).await?)
366 }
367
368 pub async fn get_proposed_blobs(
370 &self,
371 chain_id: ChainId,
372 blob_ids: Vec<BlobId>,
373 ) -> Result<Vec<Blob>, LocalNodeError> {
374 Ok(self
375 .node
376 .state
377 .get_proposed_blobs(chain_id, blob_ids)
378 .await?)
379 }
380
381 pub async fn get_event_subscriptions(
383 &self,
384 chain_id: ChainId,
385 ) -> Result<crate::worker::EventSubscriptionsResult, LocalNodeError> {
386 Ok(self.node.state.get_event_subscriptions(chain_id).await?)
387 }
388
389 pub async fn get_stream_event_count(
391 &self,
392 chain_id: ChainId,
393 stream_id: StreamId,
394 ) -> Result<Option<u32>, LocalNodeError> {
395 Ok(self
396 .node
397 .state
398 .get_stream_event_count(chain_id, stream_id)
399 .await?)
400 }
401
402 pub async fn get_received_certificate_trackers(
404 &self,
405 chain_id: ChainId,
406 ) -> Result<HashMap<ValidatorPublicKey, u64>, LocalNodeError> {
407 Ok(self
408 .node
409 .state
410 .get_received_certificate_trackers(chain_id)
411 .await?)
412 }
413
414 pub async fn get_tip_state_and_outbox_info(
416 &self,
417 chain_id: ChainId,
418 receiver_id: ChainId,
419 ) -> Result<(BlockHeight, Option<BlockHeight>), LocalNodeError> {
420 Ok(self
421 .node
422 .state
423 .get_tip_state_and_outbox_info(chain_id, receiver_id)
424 .await?)
425 }
426
427 pub async fn get_next_height_to_preprocess(
429 &self,
430 chain_id: ChainId,
431 ) -> Result<BlockHeight, LocalNodeError> {
432 Ok(self
433 .node
434 .state
435 .get_next_height_to_preprocess(chain_id)
436 .await?)
437 }
438
439 pub async fn get_manager_seed(&self, chain_id: ChainId) -> Result<u64, LocalNodeError> {
441 Ok(self.node.state.get_manager_seed(chain_id).await?)
442 }
443}
444
445pub trait LocalChainInfoExt {
448 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError>;
450
451 fn into_current_committee(self) -> Result<Committee, LocalNodeError>;
453
454 fn current_committee(&self) -> Result<&Committee, LocalNodeError>;
456}
457
458impl LocalChainInfoExt for ChainInfo {
459 fn into_committees(self) -> Result<BTreeMap<Epoch, Committee>, LocalNodeError> {
460 self.requested_committees
461 .ok_or(LocalNodeError::InvalidChainInfoResponse)
462 }
463
464 fn into_current_committee(self) -> Result<Committee, LocalNodeError> {
465 self.requested_committees
466 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
467 .remove(&self.epoch)
468 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
469 }
470
471 fn current_committee(&self) -> Result<&Committee, LocalNodeError> {
472 self.requested_committees
473 .as_ref()
474 .ok_or(LocalNodeError::InvalidChainInfoResponse)?
475 .get(&self.epoch)
476 .ok_or(LocalNodeError::InactiveChain(self.chain_id))
477 }
478}