lumina_node_wasm/
worker.rs

1//! Worker component
2
3use std::fmt::Debug;
4use std::time::Duration;
5
6use blockstore::EitherBlockstore;
7use celestia_types::Blob;
8use celestia_types::nmt::Namespace;
9use libp2p::{Multiaddr, PeerId};
10use serde::{Deserialize, Serialize};
11use serde_wasm_bindgen::to_value;
12use thiserror::Error;
13use tracing::{error, info, warn};
14use wasm_bindgen::prelude::*;
15use wasm_bindgen_futures::spawn_local;
16use web_sys::BroadcastChannel;
17
18use celestia_types::ExtendedHeader;
19use lumina_node::blockstore::{InMemoryBlockstore, IndexedDbBlockstore};
20use lumina_node::events::{EventSubscriber, NodeEventInfo};
21use lumina_node::node::{Node, SyncingInfo};
22use lumina_node::store::{EitherStore, InMemoryStore, IndexedDbStore, SamplingMetadata};
23
24use crate::client::WasmNodeConfig;
25use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse};
26use crate::error::{Context, Error, Result};
27use crate::ports::WorkerServer;
28use crate::utils::random_id;
29use crate::wrapper::libp2p::NetworkInfoSnapshot;
30
/// Blockstore type used by the wasm node: either the in-memory or the IndexedDB-backed variant.
pub(crate) type WasmBlockstore = EitherBlockstore<InMemoryBlockstore, IndexedDbBlockstore>;
/// Store type used by the wasm node: either the in-memory or the IndexedDB-backed variant.
pub(crate) type WasmStore = EitherStore<InMemoryStore, IndexedDbStore>;
33
/// Errors produced by `NodeWorker`
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
// NOTE(review): earlier revisions of these docs pointed at `NodeDriver`, which is not referenced
// anywhere else in this file — presumably the client type is now `NodeClient`; confirm.
#[derive(Debug, Serialize, Deserialize, Error)]
pub enum WorkerError {
    /// Worker is initialised, but the node has not been started yet. Start the node (the worker
    /// handles this through the `StartNode` command) before retrying.
    #[error("node hasn't been started yet")]
    NodeNotRunning,
    /// Communication with worker has been broken and we're unable to send or receive messages
    /// from it. Try creating a new client instance.
    ///
    /// The wrapped error is carried along but is not part of the `Display` message.
    #[error("error trying to communicate with worker")]
    WorkerCommunicationError(Error),
    /// Worker received an unrecognised command.
    #[error("invalid command received")]
    InvalidCommandReceived,
    /// Worker encountered an error coming from lumina-node. The wrapped error is rendered with
    /// its `Debug` representation in the `Display` message.
    #[error("Worker encountered an error: {0:?}")]
    NodeError(Error),
}
51
/// `NodeWorker` is responsible for receiving commands from connected [`NodeClient`]s, executing
/// them and sending a response back, as well as accepting new `NodeClient` connections.
///
/// A worker starts out without a node; one is created when a `StartNode` command is processed
/// and removed again when `StopNode` is processed.
///
/// [`NodeClient`]: crate::client::NodeClient
#[wasm_bindgen]
pub struct NodeWorker {
    /// Name of the `BroadcastChannel` on which node events are published. Generated once in the
    /// constructor and handed to every node instance this worker starts.
    event_channel_name: String,
    /// The running node instance, or `None` while no node is started.
    node: Option<NodeWorkerInstance>,
    /// Source of incoming requests; each one is a command paired with a responder.
    request_server: WorkerServer,
}
62
/// A started node together with the name of the channel its events are forwarded to.
struct NodeWorkerInstance {
    /// The running node, backed by the wasm blockstore and store types.
    node: Node<WasmBlockstore, WasmStore>,
    /// Name of the `BroadcastChannel` that node events are posted to.
    events_channel_name: String,
}
67
68#[wasm_bindgen]
69impl NodeWorker {
70    /// Create a new `NodeWorker` with a port-like JS object.
71    #[wasm_bindgen(constructor)]
72    pub fn new(port_like_object: JsValue) -> Self {
73        info!("Created lumina worker");
74
75        let request_server = WorkerServer::new();
76        let port_channel = request_server.get_port_channel();
77
78        port_channel
79            .send(port_like_object)
80            .expect("control channel should be ready to receive now");
81
82        Self {
83            event_channel_name: format!("NodeEventChannel-{}", random_id()),
84            node: None,
85            request_server,
86        }
87    }
88
89    /// Run `NodeWorker` main loop.
90    pub async fn run(&mut self) -> Result<(), Error> {
91        loop {
92            let (command, responder) = self.request_server.recv().await?;
93
94            // StopNode needs special handling because `NodeWorkerInstance` needs to be consumed.
95            if matches!(&command, NodeCommand::StopNode)
96                && let Some(node) = self.node.take()
97            {
98                node.stop().await;
99                if responder.send(WorkerResponse::NodeStopped(())).is_err() {
100                    error!("Failed to send response: channel dropped");
101                }
102                continue;
103            }
104
105            let response = match &mut self.node {
106                Some(node) => node.process_command(command).await,
107                node @ None => match command {
108                    NodeCommand::InternalPing => WorkerResponse::InternalPong,
109                    NodeCommand::IsRunning => WorkerResponse::IsRunning(false),
110                    NodeCommand::GetEventsChannelName => {
111                        WorkerResponse::EventsChannelName(self.event_channel_name.clone())
112                    }
113                    NodeCommand::StartNode(config) => {
114                        match NodeWorkerInstance::new(&self.event_channel_name, config).await {
115                            Ok(instance) => {
116                                let _ = node.insert(instance);
117                                WorkerResponse::NodeStarted(Ok(()))
118                            }
119                            Err(e) => WorkerResponse::NodeStarted(Err(e)),
120                        }
121                    }
122                    _ => {
123                        warn!("Worker not running");
124                        WorkerResponse::NodeNotRunning
125                    }
126                },
127            };
128            if responder.send(response).is_err() {
129                error!("Failed to send response: channel dropped");
130            }
131        }
132    }
133}
134
135impl NodeWorkerInstance {
136    async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
137        let builder = config.into_node_builder().await?;
138        let (node, events_sub) = builder.start_subscribed().await?;
139
140        let events_channel = BroadcastChannel::new(events_channel_name)
141            .context("Failed to allocate BroadcastChannel")?;
142
143        spawn_local(event_forwarder_task(events_sub, events_channel));
144
145        Ok(Self {
146            node,
147            events_channel_name: events_channel_name.to_owned(),
148        })
149    }
150
151    async fn stop(self) {
152        self.node.stop().await;
153    }
154
155    async fn get_syncer_info(&mut self) -> Result<SyncingInfo> {
156        Ok(self.node.syncer_info().await?)
157    }
158
159    async fn get_network_info(&mut self) -> Result<NetworkInfoSnapshot> {
160        Ok(self.node.network_info().await?.into())
161    }
162
163    async fn set_peer_trust(&mut self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
164        Ok(self.node.set_peer_trust(peer_id, is_trusted).await?)
165    }
166
167    async fn get_connected_peers(&mut self) -> Result<Vec<String>> {
168        Ok(self
169            .node
170            .connected_peers()
171            .await?
172            .iter()
173            .map(|id| id.to_string())
174            .collect())
175    }
176
177    async fn get_listeners(&mut self) -> Result<Vec<Multiaddr>> {
178        Ok(self.node.listeners().await?)
179    }
180
181    async fn wait_connected(&mut self, trusted: bool) -> Result<()> {
182        if trusted {
183            self.node.wait_connected_trusted().await?;
184        } else {
185            self.node.wait_connected().await?;
186        }
187        Ok(())
188    }
189
190    async fn request_header(&mut self, query: SingleHeaderQuery) -> Result<ExtendedHeader> {
191        Ok(match query {
192            SingleHeaderQuery::Head => self.node.request_head_header().await,
193            SingleHeaderQuery::ByHash(hash) => self.node.request_header_by_hash(&hash).await,
194            SingleHeaderQuery::ByHeight(height) => self.node.request_header_by_height(height).await,
195        }?)
196    }
197
198    async fn get_header(&mut self, query: SingleHeaderQuery) -> Result<ExtendedHeader> {
199        Ok(match query {
200            SingleHeaderQuery::Head => self.node.get_local_head_header().await,
201            SingleHeaderQuery::ByHash(hash) => self.node.get_header_by_hash(&hash).await,
202            SingleHeaderQuery::ByHeight(height) => self.node.get_header_by_height(height).await,
203        }?)
204    }
205
206    async fn get_verified_headers(
207        &mut self,
208        from: ExtendedHeader,
209        amount: u64,
210    ) -> Result<Vec<ExtendedHeader>> {
211        Ok(self.node.request_verified_headers(&from, amount).await?)
212    }
213
214    async fn get_headers_range(
215        &mut self,
216        start_height: Option<u64>,
217        end_height: Option<u64>,
218    ) -> Result<Vec<ExtendedHeader>> {
219        Ok(match (start_height, end_height) {
220            (None, None) => self.node.get_headers(..).await,
221            (Some(start), None) => self.node.get_headers(start..).await,
222            (None, Some(end)) => self.node.get_headers(..=end).await,
223            (Some(start), Some(end)) => self.node.get_headers(start..=end).await,
224        }?)
225    }
226
227    async fn get_last_seen_network_head(&mut self) -> Result<Option<ExtendedHeader>> {
228        Ok(self.node.get_network_head_header().await?)
229    }
230
231    async fn get_sampling_metadata(&mut self, height: u64) -> Result<Option<SamplingMetadata>> {
232        Ok(self.node.get_sampling_metadata(height).await?)
233    }
234
235    async fn request_all_blobs(
236        &mut self,
237        namespace: Namespace,
238        block_height: u64,
239        timeout_secs: Option<f64>,
240    ) -> Result<Vec<Blob>> {
241        let timeout = timeout_secs.map(Duration::from_secs_f64);
242        Ok(self
243            .node
244            .request_all_blobs(namespace, block_height, timeout)
245            .await?)
246    }
247
248    async fn process_command(&mut self, command: NodeCommand) -> WorkerResponse {
249        match command {
250            NodeCommand::IsRunning => WorkerResponse::IsRunning(true),
251            NodeCommand::StartNode(_) => {
252                WorkerResponse::NodeStarted(Err(Error::new("Node already started")))
253            }
254            NodeCommand::StopNode => unreachable!("StopNode is handled in `run()`"),
255            NodeCommand::GetLocalPeerId => {
256                WorkerResponse::LocalPeerId(self.node.local_peer_id().to_string())
257            }
258            NodeCommand::GetEventsChannelName => {
259                WorkerResponse::EventsChannelName(self.events_channel_name.clone())
260            }
261            NodeCommand::GetSyncerInfo => WorkerResponse::SyncerInfo(self.get_syncer_info().await),
262            NodeCommand::GetPeerTrackerInfo => {
263                let peer_tracker_info = self.node.peer_tracker_info();
264                WorkerResponse::PeerTrackerInfo(peer_tracker_info)
265            }
266            NodeCommand::GetNetworkInfo => {
267                WorkerResponse::NetworkInfo(self.get_network_info().await)
268            }
269            NodeCommand::GetConnectedPeers => {
270                WorkerResponse::ConnectedPeers(self.get_connected_peers().await)
271            }
272            NodeCommand::SetPeerTrust {
273                peer_id,
274                is_trusted,
275            } => WorkerResponse::SetPeerTrust(self.set_peer_trust(peer_id, is_trusted).await),
276            NodeCommand::WaitConnected { trusted } => {
277                WorkerResponse::Connected(self.wait_connected(trusted).await)
278            }
279            NodeCommand::GetListeners => WorkerResponse::Listeners(self.get_listeners().await),
280            NodeCommand::RequestHeader(query) => {
281                WorkerResponse::Header(self.request_header(query).await)
282            }
283            NodeCommand::GetHeader(query) => WorkerResponse::Header(self.get_header(query).await),
284            NodeCommand::GetVerifiedHeaders { from, amount } => {
285                WorkerResponse::Headers(self.get_verified_headers(from, amount).await)
286            }
287            NodeCommand::GetHeadersRange {
288                start_height,
289                end_height,
290            } => WorkerResponse::Headers(self.get_headers_range(start_height, end_height).await),
291            NodeCommand::LastSeenNetworkHead => {
292                WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await)
293            }
294            NodeCommand::GetSamplingMetadata { height } => {
295                WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)
296            }
297            NodeCommand::RequestAllBlobs {
298                namespace,
299                block_height,
300                timeout_secs,
301            } => WorkerResponse::Blobs(
302                self.request_all_blobs(namespace, block_height, timeout_secs)
303                    .await,
304            ),
305            NodeCommand::InternalPing => WorkerResponse::InternalPong,
306        }
307    }
308}
309
310async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
311    #[derive(Serialize)]
312    struct Event {
313        message: String,
314        is_error: bool,
315        #[serde(flatten)]
316        info: NodeEventInfo,
317    }
318
319    while let Ok(ev) = events_sub.recv().await {
320        let ev = Event {
321            message: ev.event.to_string(),
322            is_error: ev.event.is_error(),
323            info: ev,
324        };
325
326        if let Ok(val) = to_value(&ev)
327            && events_channel.post_message(&val).is_err()
328        {
329            break;
330        }
331    }
332}