use nightshade::prelude::tracing;
use serde::Serialize;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
/// Commands queued by the public API functions and executed by the
/// background network task (see `apply`).
enum Outbound {
/// Publish a JSON string `payload` on `topic`.
Publish { topic: String, payload: String },
/// Publish a raw byte buffer on `topic`.
PublishBytes { topic: String, bytes: Vec<u8> },
/// Subscribe the client to every listed topic.
Subscribe(Vec<String>),
/// Unsubscribe the client from every listed topic.
Unsubscribe(Vec<String>),
}
/// Events sent from the background network task back to the owning
/// `Network`, where `drain_network` consumes them.
enum Inbound {
/// The client is connected; carries the id reported by `hearsay::client_id`.
Connected { client_id: String },
/// The client's message stream ended (`hearsay::next_message` returned `None`).
Disconnected,
/// A message was received from the broker.
Message(NetMessage),
}
/// A message received from the network, copied field-for-field from the
/// message yielded by `hearsay::next_message`.
#[derive(Debug, Clone)]
pub struct NetMessage {
/// Topic the message arrived on.
pub topic: String,
/// Textual payload as delivered by hearsay.
pub payload: String,
/// Optional binary payload.
// NOTE(review): presumably `Some` only for messages sent via
// `publish_bytes` — confirm against the hearsay message type.
pub bytes: Option<Vec<u8>>,
}
/// Handle to a network session whose I/O runs on a background thread.
///
/// Created by `host` or `join`. The public free functions in this file
/// send commands through `outbound`; `drain_network` reads events from
/// `inbound` and refreshes the cached connection state.
pub struct Network {
/// Command queue read by the background task.
outbound: UnboundedSender<Outbound>,
/// Event queue written by the background task.
inbound: UnboundedReceiver<Inbound>,
/// Connection state as of the last `drain_network` call.
connected: bool,
/// Client id from the last `Connected` event; empty until one is drained.
client_id: String,
}
/// How the background task should bring the session up.
enum Role {
/// Start a broker locally, then connect to it as a client.
Host {
/// Address the broker is started on and the client connects to.
broker_address: String,
/// Address for the optional websocket listener; `None` skips it.
websocket_address: Option<String>,
},
/// Connect as a client to a broker that is already running.
Join {
/// Address of the broker to connect to.
broker_address: String,
},
}
/// Hosts a session: starts a broker on `address` and connects to it.
///
/// A websocket listener is also requested on the next port up
/// (`host:port + 1`) when `address` parses as `host:port`; otherwise no
/// websocket listener is started.
pub fn host(address: &str) -> Network {
    let role = Role::Host {
        websocket_address: next_port(address),
        broker_address: address.to_owned(),
    };
    start(role)
}
/// Joins an existing session by connecting to the broker at `address`.
pub fn join(address: &str) -> Network {
    let broker_address = String::from(address);
    start(Role::Join { broker_address })
}
/// Queues a subscription to each of `topics`.
///
/// The request is handed to the background task; a closed command queue
/// is ignored.
pub fn subscribe(network: &Network, topics: &[&str]) {
    let names: Vec<String> = topics.iter().copied().map(String::from).collect();
    let _ = network.outbound.send(Outbound::Subscribe(names));
}
/// Queues removal of the subscription to each of `topics`.
///
/// The request is handed to the background task; a closed command queue
/// is ignored.
pub fn unsubscribe(network: &Network, topics: &[&str]) {
    let names: Vec<String> = topics.iter().copied().map(String::from).collect();
    let _ = network.outbound.send(Outbound::Unsubscribe(names));
}
pub fn publish<T: Serialize>(network: &Network, topic: &str, payload: &T) {
match serde_json::to_string(payload) {
Ok(message) => publish_json(network, topic, &message),
Err(error) => tracing::error!("Failed to serialize network payload: {error}"),
}
}
/// Queues an already-encoded JSON string for publication on `topic`.
///
/// The string is forwarded as-is; a closed command queue is ignored.
pub fn publish_json(network: &Network, topic: &str, json: &str) {
    let command = Outbound::Publish {
        topic: topic.to_owned(),
        payload: json.to_owned(),
    };
    let _ = network.outbound.send(command);
}
/// Queues a copy of `bytes` for publication on `topic`.
///
/// A closed command queue is ignored.
pub fn publish_bytes(network: &Network, topic: &str, bytes: &[u8]) {
    let command = Outbound::PublishBytes {
        topic: topic.to_owned(),
        bytes: Vec::from(bytes),
    };
    let _ = network.outbound.send(command);
}
pub fn drain_network(network: &mut Network) -> Vec<NetMessage> {
let mut messages = Vec::new();
while let Ok(event) = network.inbound.try_recv() {
match event {
Inbound::Connected { client_id } => {
network.connected = true;
network.client_id = client_id;
}
Inbound::Disconnected => network.connected = false,
Inbound::Message(message) => messages.push(message),
}
}
messages
}
pub fn is_connected(network: &Network) -> bool {
network.connected
}
/// Returns the client id recorded by the most recent connection event.
///
/// The id is an empty string until `drain_network` has processed a
/// connection event.
pub fn client_id(network: &Network) -> &str {
    network.client_id.as_str()
}
/// Derives the `host:port + 1` address that follows `address`.
///
/// Returns `None` when `address` has no `:` separator, when the text after
/// the last `:` is not a valid `u16`, or when the port is already 65535.
fn next_port(address: &str) -> Option<String> {
    let (host, port_text) = address.rsplit_once(':')?;
    port_text
        .parse::<u16>()
        .ok()
        .and_then(|port| port.checked_add(1))
        .map(|bumped| format!("{host}:{bumped}"))
}
/// Spawns the background network thread for `role` and returns the handle
/// used to talk to it.
///
/// The returned `Network` starts out disconnected with an empty client id.
/// The thread is detached; it is not joined anywhere in this function.
fn start(role: Role) -> Network {
    let (command_tx, command_rx) = unbounded_channel();
    let (event_tx, event_rx) = unbounded_channel();

    std::thread::spawn(move || run_runtime(role, command_rx, event_tx));

    Network {
        outbound: command_tx,
        inbound: event_rx,
        connected: false,
        client_id: String::new(),
    }
}
fn run_runtime(
role: Role,
outbound: UnboundedReceiver<Outbound>,
inbound: UnboundedSender<Inbound>,
) {
let runtime = match tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
{
Ok(runtime) => runtime,
Err(error) => {
tracing::error!("Failed to start network runtime: {error}");
return;
}
};
runtime.block_on(runtime_loop(role, outbound, inbound));
}
/// Main async loop of the background network task.
///
/// 1. For `Role::Host`, starts the broker (and, when an address is given,
///    the websocket listener). A broker failure is logged and ends the
///    task; a websocket listener failure is only logged as a warning.
/// 2. Creates a client and connects it to the broker address. A connect
///    failure is logged and ends the task.
/// 3. Reports `Inbound::Connected`, then loops:
///    * while connected, it executes queued `Outbound` commands and
///      forwards received messages as `Inbound::Message`; when the message
///      stream ends it reports `Inbound::Disconnected`;
///    * while disconnected, it keeps executing queued commands and probes
///      `hearsay::is_connected` on a fixed period, reporting
///      `Inbound::Connected` once the client is back.
///
/// The loop ends when the command channel is closed, i.e. when every
/// `UnboundedSender<Outbound>` (held by `Network`) has been dropped.
async fn runtime_loop(
    role: Role,
    mut outbound: UnboundedReceiver<Outbound>,
    inbound: UnboundedSender<Inbound>,
) {
    /// How often a dropped connection is probed for recovery.
    const RECONNECT_POLL: std::time::Duration = std::time::Duration::from_millis(500);

    let broker_address = match &role {
        Role::Host {
            broker_address,
            websocket_address,
        } => match hearsay::start_broker(broker_address).await {
            Ok(broker) => {
                if let Some(websocket_address) = websocket_address
                    && let Err(error) =
                        hearsay::start_websocket_listener(&broker, websocket_address).await
                {
                    tracing::warn!("Failed to start websocket listener: {error}");
                }
                // NOTE(review): the broker handle is deliberately leaked so
                // its destructor never runs; presumably this keeps the
                // broker alive for the rest of the process — confirm
                // against hearsay's drop semantics.
                std::mem::forget(broker);
                broker_address.clone()
            }
            Err(error) => {
                tracing::error!("Failed to host session: {error}");
                return;
            }
        },
        Role::Join { broker_address } => broker_address.clone(),
    };

    let mut client = hearsay::create_client("nightshade", hearsay::ClientSettings::default());
    if let Err(error) = hearsay::connect(&mut client, &broker_address).await {
        tracing::error!("Failed to connect to session: {error}");
        return;
    }
    let client_id = hearsay::client_id(&client).await;
    let _ = inbound.send(Inbound::Connected { client_id });

    let mut connected = true;
    // Deadline of the next reconnect probe. It is kept outside the loop so
    // that handling a command does not restart the wait: with a fresh
    // `sleep` per iteration, a steady command stream (faster than the poll
    // period) would keep the probe from ever firing.
    let mut next_probe = tokio::time::Instant::now() + RECONNECT_POLL;
    loop {
        if connected {
            // NOTE(review): the `next_message` future is dropped whenever a
            // command wins the race; this assumes it is cancel-safe —
            // confirm against hearsay.
            tokio::select! {
                command = outbound.recv() => match command {
                    Some(command) => apply(command, &mut client).await,
                    None => break,
                },
                message = hearsay::next_message(&mut client) => match message {
                    Some(message) => {
                        let _ = inbound.send(Inbound::Message(NetMessage {
                            topic: message.topic,
                            payload: message.payload,
                            bytes: message.bytes,
                        }));
                    }
                    None => {
                        connected = false;
                        next_probe = tokio::time::Instant::now() + RECONNECT_POLL;
                        let _ = inbound.send(Inbound::Disconnected);
                    }
                },
            }
        } else {
            tokio::select! {
                command = outbound.recv() => match command {
                    Some(command) => apply(command, &mut client).await,
                    None => break,
                },
                _ = tokio::time::sleep_until(next_probe) => {
                    next_probe = tokio::time::Instant::now() + RECONNECT_POLL;
                    if hearsay::is_connected(&client).await {
                        connected = true;
                        let client_id = hearsay::client_id(&client).await;
                        let _ = inbound.send(Inbound::Connected { client_id });
                    }
                }
            }
        }
    }
}
/// Executes one queued command against the hearsay client.
///
/// Publishes use `hearsay::Route::Global`. The result of every hearsay
/// call is discarded.
async fn apply(command: Outbound, client: &mut hearsay::Client) {
    /// Borrows owned topic names as the `&str` list passed to hearsay.
    fn borrowed(names: &[String]) -> Vec<&str> {
        names.iter().map(String::as_str).collect()
    }

    match command {
        Outbound::Subscribe(names) => {
            let names = borrowed(&names);
            let _ = hearsay::subscribe(client, &names).await;
        }
        Outbound::Unsubscribe(names) => {
            let names = borrowed(&names);
            let _ = hearsay::unsubscribe(client, &names).await;
        }
        Outbound::Publish { topic, payload } => {
            let _ = hearsay::publish_json(client, &topic, &payload, hearsay::Route::Global).await;
        }
        Outbound::PublishBytes { topic, bytes } => {
            let _ = hearsay::publish_bytes(client, &topic, &bytes, hearsay::Route::Global).await;
        }
    }
}