lumina_node_wasm/
client.rsuse std::time::Duration;
use js_sys::Array;
use libp2p::identity::Keypair;
use serde::{Deserialize, Serialize};
use serde_wasm_bindgen::to_value;
use tracing::{debug, error};
use wasm_bindgen::prelude::*;
use web_sys::BroadcastChannel;
use lumina_node::blockstore::IndexedDbBlockstore;
use lumina_node::network::{canonical_network_bootnodes, network_id};
use lumina_node::node::NodeConfig;
use lumina_node::store::IndexedDbStore;
use crate::commands::{CheckableResponseExt, NodeCommand, SingleHeaderQuery};
use crate::error::{Context, Result};
use crate::ports::WorkerClient;
use crate::utils::{
is_safari, js_value_from_display, request_storage_persistence, resolve_dnsaddr_multiaddress,
timeout, Network,
};
use crate::wrapper::libp2p::NetworkInfoSnapshot;
use crate::wrapper::node::{PeerTrackerInfoSnapshot, SyncingInfoSnapshot};
#[wasm_bindgen(inspectable, js_name = NodeConfig)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WasmNodeConfig {
pub network: Network,
#[wasm_bindgen(getter_with_clone)]
pub bootnodes: Vec<String>,
pub custom_syncing_window_secs: Option<u32>,
}
#[wasm_bindgen]
struct NodeClient {
worker: WorkerClient,
}
#[wasm_bindgen]
impl NodeClient {
#[wasm_bindgen(constructor)]
pub async fn new(port: JsValue) -> Result<NodeClient> {
if !is_safari()? {
if let Err(e) = request_storage_persistence().await {
error!("Error requesting storage persistence: {e}");
}
}
let worker = WorkerClient::new(port)?;
loop {
if timeout(100, worker.exec(NodeCommand::InternalPing))
.await
.is_ok()
{
break;
}
}
debug!("Connected to worker");
Ok(Self { worker })
}
#[wasm_bindgen(js_name = addConnectionToWorker)]
pub async fn add_connection_to_worker(&self, port: &JsValue) -> Result<()> {
self.worker.add_connection_to_worker(port).await
}
#[wasm_bindgen(js_name = isRunning)]
pub async fn is_running(&self) -> Result<bool> {
let command = NodeCommand::IsRunning;
let response = self.worker.exec(command).await?;
let running = response.into_is_running().check_variant()?;
Ok(running)
}
pub async fn start(&self, config: &WasmNodeConfig) -> Result<()> {
let command = NodeCommand::StartNode(config.clone());
let response = self.worker.exec(command).await?;
response.into_node_started().check_variant()??;
Ok(())
}
pub async fn stop(&self) -> Result<()> {
let command = NodeCommand::StopNode;
let response = self.worker.exec(command).await?;
response.into_node_stopped().check_variant()?;
Ok(())
}
#[wasm_bindgen(js_name = localPeerId)]
pub async fn local_peer_id(&self) -> Result<String> {
let command = NodeCommand::GetLocalPeerId;
let response = self.worker.exec(command).await?;
let peer_id = response.into_local_peer_id().check_variant()?;
Ok(peer_id)
}
#[wasm_bindgen(js_name = peerTrackerInfo)]
pub async fn peer_tracker_info(&self) -> Result<PeerTrackerInfoSnapshot> {
let command = NodeCommand::GetPeerTrackerInfo;
let response = self.worker.exec(command).await?;
let peer_info = response.into_peer_tracker_info().check_variant()?;
Ok(peer_info.into())
}
#[wasm_bindgen(js_name = waitConnected)]
pub async fn wait_connected(&self) -> Result<()> {
let command = NodeCommand::WaitConnected { trusted: false };
let response = self.worker.exec(command).await?;
let _ = response.into_connected().check_variant()?;
Ok(())
}
#[wasm_bindgen(js_name = waitConnectedTrusted)]
pub async fn wait_connected_trusted(&self) -> Result<()> {
let command = NodeCommand::WaitConnected { trusted: true };
let response = self.worker.exec(command).await?;
response.into_connected().check_variant()?
}
#[wasm_bindgen(js_name = networkInfo)]
pub async fn network_info(&self) -> Result<NetworkInfoSnapshot> {
let command = NodeCommand::GetNetworkInfo;
let response = self.worker.exec(command).await?;
response.into_network_info().check_variant()?
}
pub async fn listeners(&self) -> Result<Array> {
let command = NodeCommand::GetListeners;
let response = self.worker.exec(command).await?;
let listeners = response.into_listeners().check_variant()?;
let result = listeners?.iter().map(js_value_from_display).collect();
Ok(result)
}
#[wasm_bindgen(js_name = connectedPeers)]
pub async fn connected_peers(&self) -> Result<Array> {
let command = NodeCommand::GetConnectedPeers;
let response = self.worker.exec(command).await?;
let peers = response.into_connected_peers().check_variant()?;
let result = peers?.iter().map(js_value_from_display).collect();
Ok(result)
}
#[wasm_bindgen(js_name = setPeerTrust)]
pub async fn set_peer_trust(&self, peer_id: &str, is_trusted: bool) -> Result<()> {
let command = NodeCommand::SetPeerTrust {
peer_id: peer_id.parse()?,
is_trusted,
};
let response = self.worker.exec(command).await?;
response.into_set_peer_trust().check_variant()?
}
#[wasm_bindgen(js_name = requestHeadHeader)]
pub async fn request_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::Head);
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = requestHeaderByHash)]
pub async fn request_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHash(hash.parse()?));
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = requestHeaderByHeight)]
pub async fn request_header_by_height(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::RequestHeader(SingleHeaderQuery::ByHeight(height));
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = requestVerifiedHeaders)]
pub async fn request_verified_headers(
&self,
from_header: JsValue,
amount: u64,
) -> Result<Array> {
let command = NodeCommand::GetVerifiedHeaders {
from: from_header,
amount,
};
let response = self.worker.exec(command).await?;
let headers = response.into_headers().check_variant()?;
headers.into()
}
#[wasm_bindgen(js_name = syncerInfo)]
pub async fn syncer_info(&self) -> Result<SyncingInfoSnapshot> {
let command = NodeCommand::GetSyncerInfo;
let response = self.worker.exec(command).await?;
let syncer_info = response.into_syncer_info().check_variant()?;
Ok(syncer_info?.into())
}
#[wasm_bindgen(js_name = getNetworkHeadHeader)]
pub async fn get_network_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::LastSeenNetworkHead;
let response = self.worker.exec(command).await?;
let header = response.into_last_seen_network_head().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = getLocalHeadHeader)]
pub async fn get_local_head_header(&self) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::Head);
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = getHeaderByHash)]
pub async fn get_header_by_hash(&self, hash: &str) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHash(hash.parse()?));
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = getHeaderByHeight)]
pub async fn get_header_by_height(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::GetHeader(SingleHeaderQuery::ByHeight(height));
let response = self.worker.exec(command).await?;
let header = response.into_header().check_variant()?;
header.into()
}
#[wasm_bindgen(js_name = getHeaders)]
pub async fn get_headers(
&self,
start_height: Option<u64>,
end_height: Option<u64>,
) -> Result<Array> {
let command = NodeCommand::GetHeadersRange {
start_height,
end_height,
};
let response = self.worker.exec(command).await?;
let headers = response.into_headers().check_variant()?;
headers.into()
}
#[wasm_bindgen(js_name = getSamplingMetadata)]
pub async fn get_sampling_metadata(&self, height: u64) -> Result<JsValue> {
let command = NodeCommand::GetSamplingMetadata { height };
let response = self.worker.exec(command).await?;
let metadata = response.into_sampling_metadata().check_variant()?;
Ok(to_value(&metadata?)?)
}
#[wasm_bindgen(js_name = eventsChannel)]
pub async fn events_channel(&self) -> Result<BroadcastChannel> {
let command = NodeCommand::GetEventsChannelName;
let response = self.worker.exec(command).await?;
let name = response.into_events_channel_name().check_variant()?;
Ok(BroadcastChannel::new(&name).unwrap())
}
}
#[wasm_bindgen(js_class = NodeConfig)]
impl WasmNodeConfig {
pub fn default(network: Network) -> WasmNodeConfig {
WasmNodeConfig {
network,
bootnodes: canonical_network_bootnodes(network.into())
.map(|addr| addr.to_string())
.collect::<Vec<_>>(),
custom_syncing_window_secs: None,
}
}
pub(crate) async fn into_node_config(
self,
) -> Result<NodeConfig<IndexedDbBlockstore, IndexedDbStore>> {
let network_id = network_id(self.network.into());
let store = IndexedDbStore::new(network_id)
.await
.context("Failed to open the store")?;
let blockstore = IndexedDbBlockstore::new(&format!("{network_id}-blockstore"))
.await
.context("Failed to open the blockstore")?;
let p2p_local_keypair = Keypair::generate_ed25519();
let mut p2p_bootnodes = Vec::with_capacity(self.bootnodes.len());
for addr in self.bootnodes {
let addr = addr
.parse()
.with_context(|| format!("invalid multiaddr: '{addr}"))?;
let resolved_addrs = resolve_dnsaddr_multiaddress(addr).await?;
p2p_bootnodes.extend(resolved_addrs.into_iter());
}
let syncing_window = self
.custom_syncing_window_secs
.map(|d| Duration::from_secs(d.into()));
Ok(NodeConfig {
network_id: network_id.to_string(),
p2p_bootnodes,
p2p_local_keypair,
p2p_listen_on: vec![],
sync_batch_size: 128,
custom_syncing_window: syncing_window,
blockstore,
store,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use celestia_rpc::{prelude::*, Client};
use celestia_types::p2p::PeerId;
use celestia_types::ExtendedHeader;
use gloo_timers::future::sleep;
use libp2p::{multiaddr::Protocol, Multiaddr};
use rexie::Rexie;
use serde_wasm_bindgen::from_value;
use wasm_bindgen_futures::spawn_local;
use wasm_bindgen_test::wasm_bindgen_test;
use web_sys::MessageChannel;
use crate::worker::NodeWorker;
const WS_URL: &str = "ws://127.0.0.1:26658";
#[wasm_bindgen_test]
async fn request_network_head_header() {
remove_database().await.expect("failed to clear db");
let rpc_client = Client::new(WS_URL).await.unwrap();
let bridge_ma = fetch_bridge_webtransport_multiaddr(&rpc_client).await;
let client = spawn_connected_node(vec![bridge_ma.to_string()]).await;
let info = client.network_info().await.unwrap();
assert_eq!(info.num_peers, 1);
let bridge_head_header = rpc_client.header_network_head().await.unwrap();
let head_header: ExtendedHeader =
from_value(client.request_head_header().await.unwrap()).unwrap();
assert_eq!(head_header, bridge_head_header);
rpc_client
.p2p_close_peer(&PeerId(
client.local_peer_id().await.unwrap().parse().unwrap(),
))
.await
.unwrap();
}
#[wasm_bindgen_test]
async fn discover_network_peers() {
crate::utils::setup_logging();
remove_database().await.expect("failed to clear db");
let rpc_client = Client::new(WS_URL).await.unwrap();
let bridge_ma = fetch_bridge_webtransport_multiaddr(&rpc_client).await;
let client = spawn_connected_node(vec![bridge_ma.to_string()]).await;
let info = client.network_info().await.unwrap();
assert_eq!(info.num_peers, 1);
sleep(Duration::from_millis(300)).await;
client.wait_connected().await.unwrap();
let info = client.network_info().await.unwrap();
assert_eq!(info.num_peers, 2);
rpc_client
.p2p_close_peer(&PeerId(
client.local_peer_id().await.unwrap().parse().unwrap(),
))
.await
.unwrap();
}
async fn spawn_connected_node(bootnodes: Vec<String>) -> NodeClient {
let message_channel = MessageChannel::new().unwrap();
let mut worker = NodeWorker::new(message_channel.port1().into());
spawn_local(async move {
worker.run().await.unwrap();
});
let client = NodeClient::new(message_channel.port2().into())
.await
.unwrap();
assert!(!client.is_running().await.expect("node ready to be run"));
client
.start(&WasmNodeConfig {
network: Network::Private,
bootnodes,
custom_syncing_window_secs: None,
})
.await
.unwrap();
assert!(client.is_running().await.expect("running node"));
client.wait_connected_trusted().await.expect("to connect");
client
}
async fn fetch_bridge_webtransport_multiaddr(client: &Client) -> Multiaddr {
let bridge_info = client.p2p_info().await.unwrap();
let mut ma = bridge_info
.addrs
.into_iter()
.find(|ma| {
let not_localhost = !ma
.iter()
.any(|prot| prot == Protocol::Ip4("127.0.0.1".parse().unwrap()));
let webtransport = ma
.protocol_stack()
.any(|protocol| protocol == "webtransport");
not_localhost && webtransport
})
.expect("Bridge doesn't listen on webtransport");
if !ma.protocol_stack().any(|protocol| protocol == "p2p") {
ma.push(Protocol::P2p(bridge_info.id.into()))
}
ma
}
async fn remove_database() -> rexie::Result<()> {
Rexie::delete("private").await?;
Rexie::delete("private-blockstore").await?;
Ok(())
}
}