1use std::fmt::Debug;
4use std::time::Duration;
5
6use blockstore::EitherBlockstore;
7use celestia_types::Blob;
8use celestia_types::nmt::Namespace;
9use libp2p::{Multiaddr, PeerId};
10use serde::{Deserialize, Serialize};
11use serde_wasm_bindgen::to_value;
12use thiserror::Error;
13use tracing::{error, info, warn};
14use wasm_bindgen::prelude::*;
15use wasm_bindgen_futures::spawn_local;
16use web_sys::BroadcastChannel;
17
18use celestia_types::ExtendedHeader;
19use lumina_node::blockstore::{InMemoryBlockstore, IndexedDbBlockstore};
20use lumina_node::events::{EventSubscriber, NodeEventInfo};
21use lumina_node::node::{Node, SyncingInfo};
22use lumina_node::store::{EitherStore, InMemoryStore, IndexedDbStore, SamplingMetadata};
23
24use crate::client::WasmNodeConfig;
25use crate::commands::{NodeCommand, SingleHeaderQuery, WorkerResponse};
26use crate::error::{Context, Error, Result};
27use crate::ports::WorkerServer;
28use crate::utils::random_id;
29use crate::wrapper::libp2p::NetworkInfoSnapshot;
30
/// Blockstore used by the wasm node: either in-memory or persistent (IndexedDB).
pub(crate) type WasmBlockstore = EitherBlockstore<InMemoryBlockstore, IndexedDbBlockstore>;
/// Header/sampling store used by the wasm node: either in-memory or persistent (IndexedDB).
pub(crate) type WasmStore = EitherStore<InMemoryStore, IndexedDbStore>;
33
/// Errors that the worker can report back to clients; serializable so they
/// can cross the message-port boundary.
#[derive(Debug, Serialize, Deserialize, Error)]
pub enum WorkerError {
    /// A command that requires a running node arrived before a successful `StartNode`.
    #[error("node hasn't been started yet")]
    NodeNotRunning,
    /// Transport-level failure while exchanging messages with the worker.
    #[error("error trying to communicate with worker")]
    WorkerCommunicationError(Error),
    /// The worker received a command it could not decode or did not expect.
    #[error("invalid command received")]
    InvalidCommandReceived,
    /// The node reported an error while executing a command.
    #[error("Worker encountered an error: {0:?}")]
    NodeError(Error),
}
51
/// Worker that owns an (optional) running node and services commands
/// received from clients over a message port.
#[wasm_bindgen]
pub struct NodeWorker {
    // Name of the `BroadcastChannel` node events are published on;
    // generated once per worker (see `NodeWorker::new`) so clients can subscribe.
    event_channel_name: String,
    // Running node instance; `None` until a `StartNode` command succeeds.
    node: Option<NodeWorkerInstance>,
    // Server side of the client command channel.
    request_server: WorkerServer,
}
62
/// A started node together with the name of the events channel it broadcasts on.
struct NodeWorkerInstance {
    // The running lumina node.
    node: Node<WasmBlockstore, WasmStore>,
    // Name of the `BroadcastChannel` that `event_forwarder_task` posts events to.
    events_channel_name: String,
}
67
68#[wasm_bindgen]
69impl NodeWorker {
70 #[wasm_bindgen(constructor)]
72 pub fn new(port_like_object: JsValue) -> Self {
73 info!("Created lumina worker");
74
75 let request_server = WorkerServer::new();
76 let port_channel = request_server.get_port_channel();
77
78 port_channel
79 .send(port_like_object)
80 .expect("control channel should be ready to receive now");
81
82 Self {
83 event_channel_name: format!("NodeEventChannel-{}", random_id()),
84 node: None,
85 request_server,
86 }
87 }
88
89 pub async fn run(&mut self) -> Result<(), Error> {
91 loop {
92 let (command, responder) = self.request_server.recv().await?;
93
94 if matches!(&command, NodeCommand::StopNode)
96 && let Some(node) = self.node.take()
97 {
98 node.stop().await;
99 if responder.send(WorkerResponse::NodeStopped(())).is_err() {
100 error!("Failed to send response: channel dropped");
101 }
102 continue;
103 }
104
105 let response = match &mut self.node {
106 Some(node) => node.process_command(command).await,
107 node @ None => match command {
108 NodeCommand::InternalPing => WorkerResponse::InternalPong,
109 NodeCommand::IsRunning => WorkerResponse::IsRunning(false),
110 NodeCommand::GetEventsChannelName => {
111 WorkerResponse::EventsChannelName(self.event_channel_name.clone())
112 }
113 NodeCommand::StartNode(config) => {
114 match NodeWorkerInstance::new(&self.event_channel_name, config).await {
115 Ok(instance) => {
116 let _ = node.insert(instance);
117 WorkerResponse::NodeStarted(Ok(()))
118 }
119 Err(e) => WorkerResponse::NodeStarted(Err(e)),
120 }
121 }
122 _ => {
123 warn!("Worker not running");
124 WorkerResponse::NodeNotRunning
125 }
126 },
127 };
128 if responder.send(response).is_err() {
129 error!("Failed to send response: channel dropped");
130 }
131 }
132 }
133}
134
impl NodeWorkerInstance {
    /// Builds a node from `config`, starts it, and spawns a task forwarding
    /// its events to a `BroadcastChannel` named `events_channel_name`.
    async fn new(events_channel_name: &str, config: WasmNodeConfig) -> Result<Self> {
        let builder = config.into_node_builder().await?;
        let (node, events_sub) = builder.start_subscribed().await?;

        let events_channel = BroadcastChannel::new(events_channel_name)
            .context("Failed to allocate BroadcastChannel")?;

        // Forwarder runs detached; it exits on its own when the event
        // subscription closes or posting to the channel fails.
        spawn_local(event_forwarder_task(events_sub, events_channel));

        Ok(Self {
            node,
            events_channel_name: events_channel_name.to_owned(),
        })
    }

    /// Stops the underlying node, consuming the instance.
    async fn stop(self) {
        self.node.stop().await;
    }

    /// Returns the syncer's current state.
    async fn get_syncer_info(&mut self) -> Result<SyncingInfo> {
        Ok(self.node.syncer_info().await?)
    }

    /// Returns a snapshot of libp2p network information.
    async fn get_network_info(&mut self) -> Result<NetworkInfoSnapshot> {
        Ok(self.node.network_info().await?.into())
    }

    /// Marks `peer_id` as trusted or untrusted.
    async fn set_peer_trust(&mut self, peer_id: PeerId, is_trusted: bool) -> Result<()> {
        Ok(self.node.set_peer_trust(peer_id, is_trusted).await?)
    }

    /// Returns the currently connected peers as stringified peer ids.
    async fn get_connected_peers(&mut self) -> Result<Vec<String>> {
        Ok(self
            .node
            .connected_peers()
            .await?
            .iter()
            .map(|id| id.to_string())
            .collect())
    }

    /// Returns the multiaddresses the node is listening on.
    async fn get_listeners(&mut self) -> Result<Vec<Multiaddr>> {
        Ok(self.node.listeners().await?)
    }

    /// Waits until the node is connected to any peer, or — when `trusted`
    /// is set — to a trusted peer.
    async fn wait_connected(&mut self, trusted: bool) -> Result<()> {
        if trusted {
            self.node.wait_connected_trusted().await?;
        } else {
            self.node.wait_connected().await?;
        }
        Ok(())
    }

    /// Requests a header from the network, selected by `query`.
    async fn request_header(&mut self, query: SingleHeaderQuery) -> Result<ExtendedHeader> {
        Ok(match query {
            SingleHeaderQuery::Head => self.node.request_head_header().await,
            SingleHeaderQuery::ByHash(hash) => self.node.request_header_by_hash(&hash).await,
            SingleHeaderQuery::ByHeight(height) => self.node.request_header_by_height(height).await,
        }?)
    }

    /// Fetches a header from the local store, selected by `query`.
    async fn get_header(&mut self, query: SingleHeaderQuery) -> Result<ExtendedHeader> {
        Ok(match query {
            SingleHeaderQuery::Head => self.node.get_local_head_header().await,
            SingleHeaderQuery::ByHash(hash) => self.node.get_header_by_hash(&hash).await,
            SingleHeaderQuery::ByHeight(height) => self.node.get_header_by_height(height).await,
        }?)
    }

    /// Requests `amount` headers following `from`, verified against it.
    async fn get_verified_headers(
        &mut self,
        from: ExtendedHeader,
        amount: u64,
    ) -> Result<Vec<ExtendedHeader>> {
        Ok(self.node.request_verified_headers(&from, amount).await?)
    }

    /// Fetches stored headers in the given (optionally open-ended) height range.
    async fn get_headers_range(
        &mut self,
        start_height: Option<u64>,
        end_height: Option<u64>,
    ) -> Result<Vec<ExtendedHeader>> {
        // `None` bounds translate to unbounded range ends.
        Ok(match (start_height, end_height) {
            (None, None) => self.node.get_headers(..).await,
            (Some(start), None) => self.node.get_headers(start..).await,
            (None, Some(end)) => self.node.get_headers(..=end).await,
            (Some(start), Some(end)) => self.node.get_headers(start..=end).await,
        }?)
    }

    /// Returns the most recent network head header seen, if any.
    async fn get_last_seen_network_head(&mut self) -> Result<Option<ExtendedHeader>> {
        Ok(self.node.get_network_head_header().await?)
    }

    /// Returns sampling metadata recorded for the block at `height`, if any.
    async fn get_sampling_metadata(&mut self, height: u64) -> Result<Option<SamplingMetadata>> {
        Ok(self.node.get_sampling_metadata(height).await?)
    }

    /// Requests all blobs in `namespace` at `block_height`, with an optional
    /// timeout given in (fractional) seconds.
    async fn request_all_blobs(
        &mut self,
        namespace: Namespace,
        block_height: u64,
        timeout_secs: Option<f64>,
    ) -> Result<Vec<Blob>> {
        // NOTE(review): `Duration::from_secs_f64` panics on negative,
        // non-finite, or overflowing values — confirm `timeout_secs` is
        // validated before it reaches this point.
        let timeout = timeout_secs.map(Duration::from_secs_f64);
        Ok(self
            .node
            .request_all_blobs(namespace, block_height, timeout)
            .await?)
    }

    /// Dispatches a single client command to the matching handler and wraps
    /// the outcome in the corresponding `WorkerResponse` variant.
    async fn process_command(&mut self, command: NodeCommand) -> WorkerResponse {
        match command {
            NodeCommand::IsRunning => WorkerResponse::IsRunning(true),
            NodeCommand::StartNode(_) => {
                WorkerResponse::NodeStarted(Err(Error::new("Node already started")))
            }
            // The worker intercepts `StopNode` before dispatching here,
            // because stopping consumes the instance.
            NodeCommand::StopNode => unreachable!("StopNode is handled in `run()`"),
            NodeCommand::GetLocalPeerId => {
                WorkerResponse::LocalPeerId(self.node.local_peer_id().to_string())
            }
            NodeCommand::GetEventsChannelName => {
                WorkerResponse::EventsChannelName(self.events_channel_name.clone())
            }
            NodeCommand::GetSyncerInfo => WorkerResponse::SyncerInfo(self.get_syncer_info().await),
            NodeCommand::GetPeerTrackerInfo => {
                let peer_tracker_info = self.node.peer_tracker_info();
                WorkerResponse::PeerTrackerInfo(peer_tracker_info)
            }
            NodeCommand::GetNetworkInfo => {
                WorkerResponse::NetworkInfo(self.get_network_info().await)
            }
            NodeCommand::GetConnectedPeers => {
                WorkerResponse::ConnectedPeers(self.get_connected_peers().await)
            }
            NodeCommand::SetPeerTrust {
                peer_id,
                is_trusted,
            } => WorkerResponse::SetPeerTrust(self.set_peer_trust(peer_id, is_trusted).await),
            NodeCommand::WaitConnected { trusted } => {
                WorkerResponse::Connected(self.wait_connected(trusted).await)
            }
            NodeCommand::GetListeners => WorkerResponse::Listeners(self.get_listeners().await),
            NodeCommand::RequestHeader(query) => {
                WorkerResponse::Header(self.request_header(query).await)
            }
            NodeCommand::GetHeader(query) => WorkerResponse::Header(self.get_header(query).await),
            NodeCommand::GetVerifiedHeaders { from, amount } => {
                WorkerResponse::Headers(self.get_verified_headers(from, amount).await)
            }
            NodeCommand::GetHeadersRange {
                start_height,
                end_height,
            } => WorkerResponse::Headers(self.get_headers_range(start_height, end_height).await),
            NodeCommand::LastSeenNetworkHead => {
                WorkerResponse::LastSeenNetworkHead(self.get_last_seen_network_head().await)
            }
            NodeCommand::GetSamplingMetadata { height } => {
                WorkerResponse::SamplingMetadata(self.get_sampling_metadata(height).await)
            }
            NodeCommand::RequestAllBlobs {
                namespace,
                block_height,
                timeout_secs,
            } => WorkerResponse::Blobs(
                self.request_all_blobs(namespace, block_height, timeout_secs)
                    .await,
            ),
            NodeCommand::InternalPing => WorkerResponse::InternalPong,
        }
    }
}
309
310async fn event_forwarder_task(mut events_sub: EventSubscriber, events_channel: BroadcastChannel) {
311 #[derive(Serialize)]
312 struct Event {
313 message: String,
314 is_error: bool,
315 #[serde(flatten)]
316 info: NodeEventInfo,
317 }
318
319 while let Ok(ev) = events_sub.recv().await {
320 let ev = Event {
321 message: ev.event.to_string(),
322 is_error: ev.event.is_error(),
323 info: ev,
324 };
325
326 if let Ok(val) = to_value(&ev)
327 && events_channel.post_message(&val).is_err()
328 {
329 break;
330 }
331 }
332}