pub(crate) mod sys;
pub(crate) mod wrapper;
use std::sync::{Arc, Mutex, mpsc};
use std::thread;
use std::time::Duration;
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
use tracing::{debug, error, info};
use crate::ds::{
DeliveryServiceError,
transport::{DeliveryService, InboundPacket, OutboundPacket},
};
use wrapper::WakuNodeCtx;
use super::{GROUP_VERSION, SUBTOPICS};
/// Returns the relay pubsub topic used by this transport.
///
/// The node thread subscribes to this topic and publishes every outbound
/// packet on it.
// NOTE(review): the trailing `15/1` appears to mirror the `clusterId: 15` /
// `shards: [1]` values in the node configuration — confirm they must stay in sync.
pub fn pubsub_topic() -> String {
    String::from("/waku/2/rs/15/1")
}
/// Builds one content topic per entry of `SUBTOPICS` for `group_name`.
///
/// Every topic is produced by [`build_content_topic`] with the crate-wide
/// `GROUP_VERSION`; the result follows the iteration order of `SUBTOPICS`.
pub fn build_content_topics(group_name: &str) -> Vec<String> {
    let mut topics = Vec::new();
    for subtopic in SUBTOPICS.iter() {
        topics.push(build_content_topic(group_name, GROUP_VERSION, subtopic));
    }
    topics
}
/// Formats a content topic as `/{group_name}/{group_version}/{subtopic}/proto`.
///
/// The arguments are inserted verbatim; no escaping or validation is done,
/// so a `/` inside any argument adds extra path segments to the result.
pub fn build_content_topic(group_name: &str, group_version: &str, subtopic: &str) -> String {
    // The leading empty segment yields the initial '/' once joined.
    let segments = ["", group_name, group_version, subtopic, "proto"];
    segments.join("/")
}
/// A publish request queued from the `send` method to the node thread.
struct OutboundCommand {
/// Packet the node thread should publish.
pkt: OutboundPacket,
/// Channel on which the node thread reports the publish result back to the
/// caller that queued this command.
reply: mpsc::SyncSender<Result<String, DeliveryServiceError>>,
}
/// Shared, mutex-protected list holding one sender per inbound subscriber.
///
/// `subscribe` pushes new senders; the node's event callback delivers each
/// parsed inbound packet to all of them and prunes disconnected ones.
type SubscriberList = Arc<Mutex<Vec<mpsc::SyncSender<InboundPacket>>>>;
/// Value returned by a successful `WakuDeliveryService::start`.
pub struct WakuStartResult {
/// Handle used to send packets and to subscribe to inbound ones.
pub service: WakuDeliveryService,
/// ENR reported by the node thread at startup. It is `None` when discv5 is
/// disabled or when the ENR could not be retrieved.
pub enr: Option<String>,
}
/// Cloneable handle to the Waku node running on its own `waku-node` thread.
///
/// Clones share the same outbound queue and the same subscriber list.
#[derive(Clone)]
pub struct WakuDeliveryService {
/// Queue of publish commands consumed by the node thread.
outbound: mpsc::SyncSender<OutboundCommand>,
/// Senders of every receiver handed out by `subscribe`.
subscribers: SubscriberList,
/// ENR reported by the node thread at startup, if any.
enr: Option<String>,
}
/// Settings used to build the node configuration in the node thread.
#[derive(Debug, Clone)]
pub struct WakuConfig {
    /// Port passed to the node as its `port` setting.
    pub node_port: u16,
    /// Whether the discv5 options are added to the node configuration.
    pub discv5: bool,
    /// UDP port passed as `discv5UdpPort`; only used when `discv5` is true.
    pub discv5_udp_port: u16,
    /// ENRs passed as `discv5BootstrapNodes`; only used when `discv5` is true
    /// and the list is non-empty.
    pub discv5_bootstrap_enrs: Vec<String>,
}

impl Default for WakuConfig {
    /// Returns the default configuration: node port 60000, discv5 disabled,
    /// discv5 UDP port 9000 and no bootstrap ENRs.
    fn default() -> Self {
        let discv5_bootstrap_enrs = Vec::new();
        WakuConfig {
            node_port: 60_000,
            discv5: false,
            discv5_udp_port: 9_000,
            discv5_bootstrap_enrs,
        }
    }
}
impl WakuDeliveryService {
/// Spawns the `waku-node` thread and waits for its startup report.
///
/// The call blocks until the node thread sends its readiness message, which
/// on the success path happens after the node has started, subscribed to the
/// relay topic and installed its event callback.
///
/// # Errors
///
/// Returns `DeliveryServiceError::Other` when the thread cannot be spawned or
/// when it exits without reporting (for example after a panic, which is
/// caught and logged). Any startup error reported by the node thread is
/// returned unchanged.
pub fn start(cfg: WakuConfig) -> Result<WakuStartResult, DeliveryServiceError> {
    let subscribers: SubscriberList = Arc::new(Mutex::new(Vec::new()));
    let (out_tx, out_rx) = mpsc::sync_channel::<OutboundCommand>(256);
    let (ready_tx, ready_rx) = mpsc::channel::<Result<Option<String>, DeliveryServiceError>>();

    let thread_subscribers = Arc::clone(&subscribers);
    let thread_body = move || {
        // Catch panics so they are logged instead of silently killing the thread.
        let outcome = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            Self::node_thread(cfg, out_rx, thread_subscribers, ready_tx);
        }));
        if let Err(panic) = outcome {
            // Panic payloads are usually either `&str` or `String`.
            let msg = if let Some(text) = panic.downcast_ref::<&str>() {
                text.to_string()
            } else if let Some(text) = panic.downcast_ref::<String>() {
                text.clone()
            } else {
                "unknown panic".to_string()
            };
            error!("waku-node thread panicked: {msg}");
        }
    };

    // The join handle is dropped on purpose: the thread runs detached.
    thread::Builder::new()
        .name("waku-node".into())
        .spawn(thread_body)
        .map_err(|e| DeliveryServiceError::Other(anyhow::anyhow!(e)))?;

    // A receive error means the thread ended before reporting anything.
    let enr = match ready_rx.recv() {
        Ok(report) => report?,
        Err(e) => return Err(DeliveryServiceError::Other(anyhow::anyhow!(e))),
    };

    Ok(WakuStartResult {
        service: Self {
            outbound: out_tx,
            subscribers,
            enr: enr.clone(),
        },
        enr,
    })
}
/// Returns the ENR reported by the node at startup, if one was obtained.
pub fn enr(&self) -> Option<&str> {
    match &self.enr {
        Some(enr) => Some(enr.as_str()),
        None => None,
    }
}
pub fn shutdown(self) {
drop(self.outbound);
}
fn node_thread(
cfg: WakuConfig,
out_rx: mpsc::Receiver<OutboundCommand>,
subscribers: SubscriberList,
ready_tx: mpsc::Sender<Result<Option<String>, DeliveryServiceError>>,
) {
let mut config = serde_json::json!({
"host": "0.0.0.0",
"port": cfg.node_port,
"relay": true,
"clusterId": 15,
"shards": [1],
"numShardsInNetwork": 8,
"logLevel": "FATAL",
});
if cfg.discv5 {
config["discv5Discovery"] = serde_json::json!(true);
config["discv5UdpPort"] = serde_json::json!(cfg.discv5_udp_port);
if !cfg.discv5_bootstrap_enrs.is_empty() {
config["discv5BootstrapNodes"] = serde_json::json!(cfg.discv5_bootstrap_enrs);
}
}
let config_json = config.to_string();
let waku = match WakuNodeCtx::new(&config_json) {
Ok(w) => w,
Err(e) => {
let _ = ready_tx.send(Err(DeliveryServiceError::WakuNodeAlreadyInitialized(e)));
return;
}
};
if let Err(e) = waku.start() {
let _ = ready_tx.send(Err(DeliveryServiceError::WakuNodeAlreadyInitialized(e)));
return;
}
info!("Waku node started");
thread::sleep(Duration::from_secs(2));
let local_enr = if cfg.discv5 {
match waku.get_enr() {
Ok(enr) => {
info!("Local ENR: {enr}");
Some(enr)
}
Err(e) => {
info!("Could not retrieve ENR: {e}");
None
}
}
} else {
None
};
let topic = pubsub_topic();
if let Err(e) = waku.relay_subscribe(&topic) {
info!("relay_subscribe returned (may already be subscribed): {e}");
}
let subs_for_cb = subscribers.clone();
let event_closure = move |_ret: i32, data: &str| {
if let Some(pkt) = Self::parse_event(data) {
let guard = subs_for_cb.lock();
let mut guard = match guard {
Ok(g) => g,
Err(e) => {
error!("Subscriber mutex poisoned: {e}");
return;
}
};
guard.retain(|tx| match tx.try_send(pkt.clone()) {
Ok(()) => true,
Err(mpsc::TrySendError::Full(_)) => true, Err(mpsc::TrySendError::Disconnected(_)) => false, });
}
};
let _event_cb_guard = waku.set_event_callback(event_closure);
let _ = ready_tx.send(Ok(local_enr));
let topic = pubsub_topic();
while let Ok(cmd) = out_rx.recv() {
let res = Self::do_publish(&waku, &topic, cmd.pkt);
let _ = cmd.reply.try_send(res);
}
info!("Waku outbound loop finished");
}
/// Serializes `pkt` as a JSON relay message and publishes it on `pubsub_topic`.
///
/// The message carries the base64-encoded payload, the content topic derived
/// from the packet's group id and subtopic, the current time in nanoseconds
/// since the Unix epoch (0 if the clock is before the epoch), version `2`,
/// and the base64-encoded `app_id` as `meta`. The publish call is given a
/// timeout argument of `10_000`.
///
/// # Errors
///
/// Logs and returns `DeliveryServiceError::WakuPublishMessageError` when the
/// node rejects the publish.
fn do_publish(
    waku: &WakuNodeCtx,
    pubsub_topic: &str,
    pkt: OutboundPacket,
) -> Result<String, DeliveryServiceError> {
    let nanos_since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let msg_json = serde_json::json!({
        "payload": BASE64.encode(&pkt.payload),
        "contentTopic": build_content_topic(&pkt.group_id, GROUP_VERSION, &pkt.subtopic),
        "timestamp": nanos_since_epoch as u64,
        "version": 2,
        "meta": BASE64.encode(&pkt.app_id),
    })
    .to_string();

    match waku.relay_publish(pubsub_topic, &msg_json, 10_000) {
        Ok(published) => Ok(published),
        Err(e) => {
            error!("Failed to relay publish: {e}");
            Err(DeliveryServiceError::WakuPublishMessageError(e))
        }
    }
}
/// Converts a JSON event string from the node into an [`InboundPacket`].
///
/// Returns `None` when the event is not valid JSON, has no `wakuMessage`
/// object, lacks a string `payload` that decodes as base64, lacks a string
/// `contentTopic`, or when that topic cannot be parsed. A missing or
/// non-integer `timestamp` becomes `0`, and a missing or undecodable `meta`
/// becomes an empty `app_id`.
fn parse_event(data: &str) -> Option<InboundPacket> {
    let event: serde_json::Value = serde_json::from_str(data).ok()?;
    let waku_msg = event.get("wakuMessage")?;

    // Required fields: any failure here rejects the whole event.
    let payload = waku_msg
        .get("payload")
        .and_then(|p| p.as_str())
        .and_then(|p| BASE64.decode(p).ok())?;
    let (group_id, subtopic) = waku_msg
        .get("contentTopic")
        .and_then(|t| t.as_str())
        .and_then(|t| Self::parse_content_topic(t))?;

    // Optional fields fall back to neutral defaults.
    let timestamp = match waku_msg.get("timestamp").and_then(|t| t.as_i64()) {
        Some(ts) => ts,
        None => 0,
    };
    let app_id = match waku_msg.get("meta").and_then(|m| m.as_str()) {
        Some(encoded) => BASE64.decode(encoded).unwrap_or_default(),
        None => Vec::new(),
    };

    debug!("Inbound message: group={group_id} subtopic={subtopic}");
    Some(InboundPacket {
        payload,
        subtopic,
        group_id,
        app_id,
        timestamp,
    })
}
/// Extracts `(group_id, subtopic)` from a content topic of the form
/// `/{group_id}/{version}/{subtopic}/...`.
///
/// Returns `None` when the topic does not start with `/`, has fewer than
/// three segments after that slash, or has an empty group id or subtopic.
/// The version segment and anything after the subtopic are not inspected.
fn parse_content_topic(ct: &str) -> Option<(String, String)> {
    // Require the leading '/'. Without this check a topic such as
    // "group/1/welcome/proto" would have its first segment discarded and the
    // remaining fields shifted by one, yielding a bogus group id and subtopic.
    let mut segments = ct.strip_prefix('/')?.split('/');
    let group_id = segments.next()?;
    let _version = segments.next()?;
    let subtopic = segments.next()?;
    if group_id.is_empty() || subtopic.is_empty() {
        return None;
    }
    Some((group_id.to_owned(), subtopic.to_owned()))
}
}
impl DeliveryService for WakuDeliveryService {
    /// Queues `pkt` for publishing and blocks until the node thread replies.
    ///
    /// # Errors
    ///
    /// Returns `DeliveryServiceError::Other` when the node thread is no longer
    /// receiving commands or drops the reply channel without answering;
    /// otherwise returns whatever result the node thread reported.
    fn send(&self, pkt: OutboundPacket) -> Result<String, DeliveryServiceError> {
        // One-slot channel: exactly one reply is expected per command.
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        let command = OutboundCommand {
            pkt,
            reply: reply_tx,
        };
        if let Err(e) = self.outbound.send(command) {
            return Err(DeliveryServiceError::Other(anyhow::anyhow!(e)));
        }
        match reply_rx.recv() {
            Ok(outcome) => outcome,
            Err(e) => Err(DeliveryServiceError::Other(anyhow::anyhow!(e))),
        }
    }

    /// Registers a new subscriber and returns its receiving end.
    ///
    /// The channel buffers up to 256 packets. If the subscriber list's mutex
    /// is poisoned the sender is never registered, so the returned receiver
    /// will not get any packets; the failure is logged.
    fn subscribe(&self) -> mpsc::Receiver<InboundPacket> {
        let (tx, rx) = mpsc::sync_channel(256);
        let registration = self.subscribers.lock();
        match registration {
            Err(e) => {
                error!("Subscriber mutex poisoned, subscription lost: {e}");
            }
            Ok(mut list) => list.push(tx),
        }
        rx
    }
}