mod config;
mod discovery;
mod filter;
mod lightpush;
mod management;
mod peers;
mod relay;
mod store;
pub use aes_gcm::{Aes256Gcm, Key};
pub use multiaddr::Multiaddr;
pub use secp256k1::{PublicKey, SecretKey};
use std::marker::PhantomData;
use std::sync::Mutex;
use std::time::Duration;
use crate::general::{
ContentFilter, FilterSubscriptionResult, MessageId, PeerId, ProtocolId, Result, StoreQuery,
StoreResponse, WakuMessage, WakuPubSubTopic,
};
pub use config::{GossipSubParams, WakuLogLevel, WakuNodeConfig, WebsocketParams};
pub use discovery::{waku_discv5_update_bootnodes, waku_dns_discovery, DnsInfo};
pub use peers::{Protocol, WakuPeerData, WakuPeers};
pub use relay::{waku_create_content_topic, waku_default_pubsub_topic};
static WAKU_NODE_INITIALIZED: Mutex<bool> = Mutex::new(false);
pub trait WakuNodeState {}
pub struct Initialized;
pub struct Running;
impl WakuNodeState for Initialized {}
impl WakuNodeState for Running {}
pub struct WakuNodeHandle<State: WakuNodeState>(PhantomData<State>);
unsafe impl<State: WakuNodeState> Send for WakuNodeHandle<State> {}
unsafe impl<State: WakuNodeState> Sync for WakuNodeHandle<State> {}
impl<State: WakuNodeState> WakuNodeHandle<State> {
pub fn peer_id(&self) -> Result<PeerId> {
management::waku_peer_id()
}
pub fn listen_addresses(&self) -> Result<Vec<Multiaddr>> {
management::waku_listen_addresses()
}
pub fn add_peer(&self, address: &Multiaddr, protocol_id: ProtocolId) -> Result<PeerId> {
peers::waku_add_peers(address, protocol_id)
}
}
fn stop_node() -> Result<()> {
let mut node_initialized = WAKU_NODE_INITIALIZED
.lock()
.expect("Access to the mutex at some point");
*node_initialized = false;
management::waku_stop().map(|_| ())
}
impl WakuNodeHandle<Initialized> {
pub fn start(self) -> Result<WakuNodeHandle<Running>> {
management::waku_start().map(|_| WakuNodeHandle(Default::default()))
}
pub fn stop(self) -> Result<()> {
stop_node()
}
}
impl WakuNodeHandle<Running> {
pub fn stop(self) -> Result<()> {
stop_node()
}
pub fn connect_peer_with_address(
&self,
address: &Multiaddr,
timeout: Option<Duration>,
) -> Result<()> {
peers::waku_connect_peer_with_address(address, timeout)
}
pub fn connect_peer_with_id(&self, peer_id: &PeerId, timeout: Option<Duration>) -> Result<()> {
peers::waku_connect_peer_with_id(peer_id, timeout)
}
pub fn disconnect_peer_with_id(&self, peer_id: &PeerId) -> Result<()> {
peers::waku_disconnect_peer_with_id(peer_id)
}
pub fn peer_count(&self) -> Result<usize> {
peers::waku_peer_count()
}
pub fn peers(&self) -> Result<WakuPeers> {
peers::waku_peers()
}
pub fn relay_publish_message(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
timeout: Option<Duration>,
) -> Result<MessageId> {
relay::waku_relay_publish_message(message, pubsub_topic, timeout)
}
pub fn relay_enough_peers(&self, pubsub_topic: Option<WakuPubSubTopic>) -> Result<bool> {
relay::waku_enough_peers(pubsub_topic)
}
pub fn relay_subscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_subscribe(content_filter)
}
pub fn relay_unsubscribe(&self, content_filter: &ContentFilter) -> Result<()> {
relay::waku_relay_unsubscribe(content_filter)
}
pub fn relay_topics(&self) -> Result<Vec<String>> {
relay::waku_relay_topics()
}
pub fn store_query(
&self,
query: &StoreQuery,
peer_id: &PeerId,
timeout: Option<Duration>,
) -> Result<StoreResponse> {
store::waku_store_query(query, peer_id, timeout)
}
pub fn local_store_query(&self, query: &StoreQuery) -> Result<StoreResponse> {
store::waku_local_store_query(query)
}
pub fn lightpush_publish(
&self,
message: &WakuMessage,
pubsub_topic: Option<WakuPubSubTopic>,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<MessageId> {
lightpush::waku_lightpush_publish(message, pubsub_topic, peer_id, timeout)
}
pub fn filter_subscribe(
&self,
content_filter: &ContentFilter,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<FilterSubscriptionResult> {
filter::waku_filter_subscribe(content_filter, peer_id, timeout)
}
pub fn filter_ping(&self, peer_id: PeerId, timeout: Option<Duration>) -> Result<()> {
filter::waku_filter_ping(peer_id, timeout)
}
pub fn filter_unsubscribe(
&self,
content_filter: &ContentFilter,
peer_id: PeerId,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe(content_filter, peer_id, timeout)
}
pub fn filter_unsubscribe_all(
&self,
peer_id: Option<PeerId>,
timeout: Option<Duration>,
) -> Result<()> {
filter::waku_filter_unsubscribe_all(peer_id, timeout)
}
pub fn discv5_update_bootnodes(bootnodes: Vec<String>) -> Result<()> {
discovery::waku_discv5_update_bootnodes(bootnodes)
}
}
pub fn waku_new(config: Option<WakuNodeConfig>) -> Result<WakuNodeHandle<Initialized>> {
let mut node_initialized = WAKU_NODE_INITIALIZED
.lock()
.expect("Access to the mutex at some point");
if *node_initialized {
return Err("Waku node is already initialized".into());
}
*node_initialized = true;
management::waku_new(config).map(|_| WakuNodeHandle(Default::default()))
}
#[cfg(test)]
mod tests {
use super::waku_new;
use serial_test::serial;
#[test]
#[serial]
fn exclusive_running() {
let handle1 = waku_new(None).unwrap();
let handle2 = waku_new(None);
assert!(handle2.is_err());
let stop_handle = handle1.start().unwrap();
stop_handle.stop().unwrap();
}
}