pub mod callback;
pub mod entry;
use crate::Result;
pub use self::entry::*;
use crate::nt::callback::*;
use crate::proto::server::ServerState;
use crate::proto::{client::ClientState, Client, NTBackend, Server, State};
use futures_channel::mpsc::{channel, unbounded, Sender};
use futures_util::StreamExt;
use nt_network::types::EntryValue;
use nt_network::Packet;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::panic::RefUnwindSafe;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::runtime::Runtime;
/// Handle to a NetworkTables backend, generic over the backend marker `T`
/// (the impls in this module cover `Client` and `Server`).
///
/// All operations go through the shared, mutex-protected backend state.
/// Dropping the handle sends a `()` on the close channel.
pub struct NetworkTables<T: NTBackend> {
/// Backend-specific state, shared via `Arc` and guarded by a `Mutex`.
state: Arc<Mutex<T::State>>,
/// Sending half of the close channel; its receiving half is handed to the
/// backend state when the handle is created.
close_tx: Sender<()>,
}
/// Operations that are only available on a client-backed [`NetworkTables`].
impl NetworkTables<Client> {
    /// Creates a client handle whose state is built by `ClientState::new`.
    ///
    /// `ip` and `client_name` are copied into owned strings and passed to the
    /// client state together with the receiving half of a new close channel
    /// (capacity 1). The sending half is stored on the returned handle.
    pub async fn connect(ip: &str, client_name: &str) -> Result<NetworkTables<Client>> {
        let (close_tx, close_rx) = channel::<()>(1);
        let server_ip = ip.to_owned();
        let name = client_name.to_owned();
        let state = ClientState::new(server_ip, name, close_rx).await;
        Ok(Self { state, close_tx })
    }

    /// Starts a fresh connection task for this client and waits for it to
    /// report on its ready channel.
    ///
    /// Steps, in order:
    /// 1. The handle's close sender is replaced with a new one (the previous
    ///    sender is dropped).
    /// 2. Under the state lock, the packet sender is replaced and both
    ///    `entries` and `pending_entries` are cleared.
    /// 3. A new OS thread is spawned that runs
    ///    `crate::proto::client::conn::connection` on its own tokio runtime.
    /// 4. This future resolves once the ready channel yields a value or is
    ///    closed; the received value is discarded.
    ///
    /// NOTE(review): nothing here explicitly stops a previous connection other
    /// than dropping the old close sender — presumably this should only be
    /// called when the old connection is already dead; confirm.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned. Failures to build the runtime
    /// or errors returned by the connection panic on the spawned thread, not
    /// on the caller.
    pub async fn reconnect(&mut self) {
        let (ready_tx, mut ready_rx) = unbounded();
        let (packet_tx, packet_rx) = unbounded::<Box<dyn Packet>>();
        let (new_close_tx, close_rx) = channel::<()>(1);

        // Swap the close sender first, then reset the shared state.
        self.close_tx = new_close_tx;
        {
            let mut guard = self.state.lock().unwrap();
            guard.packet_tx = packet_tx;
            guard.entries_mut().clear();
            guard.pending_entries.clear();
        }

        let shared_state = Arc::clone(&self.state);
        let worker = move || {
            let mut runtime = Runtime::new().unwrap();
            let conn = crate::proto::client::conn::connection(
                shared_state,
                packet_rx,
                ready_tx,
                close_rx,
            );
            runtime.block_on(conn).unwrap();
        };
        thread::spawn(worker);

        let _ = ready_rx.next().await;
    }

    /// Creates a client handle whose state is built by `ClientState::new_ws`.
    ///
    /// Works like [`NetworkTables::connect`] but uses the websocket state
    /// constructor.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `ClientState::new_ws`.
    #[cfg(feature = "websocket")]
    pub async fn connect_ws(ip: &str, client_name: &str) -> Result<NetworkTables<Client>> {
        let (close_tx, close_rx) = channel::<()>(1);
        let server_ip = ip.to_owned();
        let name = client_name.to_owned();
        let state = ClientState::new_ws(server_ip, name, close_rx).await?;
        Ok(Self { state, close_tx })
    }

    /// Websocket counterpart of [`NetworkTables::reconnect`].
    ///
    /// Performs the same channel replacement and state reset, but the spawned
    /// thread runs `crate::proto::client::conn::connection_ws`.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned. Failures to build the runtime
    /// or errors returned by the connection panic on the spawned thread, not
    /// on the caller.
    #[cfg(feature = "websocket")]
    pub async fn reconnect_ws(&mut self) {
        let (ready_tx, mut ready_rx) = unbounded();
        let (packet_tx, packet_rx) = unbounded::<Box<dyn Packet>>();
        let (new_close_tx, close_rx) = channel::<()>(1);

        // Swap the close sender first, then reset the shared state.
        self.close_tx = new_close_tx;
        {
            let mut guard = self.state.lock().unwrap();
            guard.packet_tx = packet_tx;
            guard.entries_mut().clear();
            guard.pending_entries.clear();
        }

        let shared_state = Arc::clone(&self.state);
        let worker = move || {
            let mut runtime = Runtime::new().unwrap();
            let conn = crate::proto::client::conn::connection_ws(
                shared_state,
                packet_rx,
                ready_tx,
                close_rx,
            );
            runtime.block_on(conn).unwrap();
        };
        thread::spawn(worker);

        let _ = ready_rx.next().await;
    }

    /// Registers `action` with the client state for the given
    /// `callback_type`.
    ///
    /// The callback receives a `&SocketAddr` when invoked.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn add_connection_callback(
        &self,
        callback_type: ConnectionCallbackType,
        action: impl FnMut(&SocketAddr) + Send + 'static,
    ) {
        let lock_result = self.state.lock();
        lock_result
            .unwrap()
            .add_connection_callback(callback_type, action);
    }

    /// Forwards an RPC call to the client state.
    ///
    /// `id` and `parameter` are passed through unchanged; `callback` takes a
    /// `Vec<u8>` (presumably the RPC result bytes — confirm against
    /// `ClientState::call_rpc`).
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn call_rpc(
        &self,
        id: u16,
        parameter: Vec<u8>,
        callback: impl Fn(Vec<u8>) + Send + 'static,
    ) {
        let lock_result = self.state.lock();
        lock_result.unwrap().call_rpc(id, parameter, callback);
    }
}
/// Operations that are only available on a server-backed [`NetworkTables`].
impl NetworkTables<Server> {
    /// Creates a server handle whose state is built by `ServerState::new`.
    ///
    /// `ip` and `server_name` are copied into owned strings and passed to the
    /// server state together with the receiving half of a new close channel
    /// (capacity 1). The sending half is stored on the returned handle.
    pub fn bind(ip: &str, server_name: &str) -> NetworkTables<Server> {
        let (close_tx, close_rx) = channel::<()>(1);
        let bind_ip = ip.to_owned();
        let name = server_name.to_owned();
        let state = ServerState::new(bind_ip, name, close_rx);
        Self { state, close_tx }
    }

    /// Registers `action` with the server state for the given
    /// `callback_type`.
    ///
    /// The callback receives a `&SocketAddr` when invoked.
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn add_connection_callback(
        &mut self,
        callback_type: ConnectionCallbackType,
        action: impl FnMut(&SocketAddr) + Send + 'static,
    ) {
        let lock_result = self.state.lock();
        lock_result
            .unwrap()
            .add_server_callback(callback_type, action);
    }

    /// Forwards an RPC definition to the server state.
    ///
    /// `data` describes the entry and `callback` maps a `Vec<u8>` to a
    /// `Vec<u8>` (presumably request bytes to response bytes — confirm
    /// against `ServerState::create_rpc`).
    ///
    /// # Panics
    ///
    /// Panics if the state mutex is poisoned.
    pub fn create_rpc(
        &mut self,
        data: EntryData,
        callback: impl Fn(Vec<u8>) -> Vec<u8> + Send + Sync + RefUnwindSafe + 'static,
    ) {
        let lock_result = self.state.lock();
        lock_result.unwrap().create_rpc(data, callback);
    }
}
/// Operations shared by every backend.
///
/// Every method that touches the state locks the state mutex and panics if
/// that mutex is poisoned.
impl<T: NTBackend> NetworkTables<T> {
    /// Runs `op` with exclusive access to the backend state.
    ///
    /// The lock is released as soon as `op` returns, so callers never hold it
    /// across an `.await`.
    fn with_locked_state<R>(&self, op: impl FnOnce(&mut T::State) -> R) -> R {
        let mut guard = self.state.lock().unwrap();
        op(&mut *guard)
    }

    /// Returns a cloned snapshot of the state's entry map, keyed by entry id.
    pub fn entries(&self) -> HashMap<u16, EntryData> {
        self.with_locked_state(|state| state.entries().clone())
    }

    /// Builds an [`Entry`] for `id` that refers back to this handle.
    ///
    /// No lookup is performed here; the value is constructed directly from
    /// `self` and `id`.
    pub fn get_entry(&self, id: u16) -> Entry<T> {
        Entry::new(self, id)
    }

    /// Asks the state to create an entry from `data` and waits for the id
    /// delivered on the channel it returns.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the state's `create_entry`.
    ///
    /// # Panics
    ///
    /// Panics if the id channel ends without yielding a value.
    pub async fn create_entry(&self, data: EntryData) -> crate::Result<u16> {
        let mut id_rx = self.with_locked_state(|state| state.create_entry(data))?;
        let assigned_id = id_rx.next().await.unwrap();
        Ok(assigned_id)
    }

    /// Forwards a delete request for the entry `id` to the state.
    pub fn delete_entry(&self, id: u16) {
        self.with_locked_state(|state| {
            state.delete_entry(id);
        });
    }

    /// Forwards a clear-all-entries request to the state.
    pub fn clear_entries(&self) {
        self.with_locked_state(|state| {
            state.clear_entries();
        });
    }

    /// Forwards a value update for the entry `id` to the state.
    pub fn update_entry(&self, id: u16, new_value: EntryValue) {
        self.with_locked_state(|state| {
            state.update_entry(id, new_value);
        });
    }

    /// Registers `cb` with the state for the given `action`.
    ///
    /// The callback receives a `&EntryData` when invoked.
    pub fn add_callback<F>(&mut self, action: CallbackType, cb: F)
    where
        F: FnMut(&EntryData) + Send + 'static,
    {
        self.with_locked_state(|state| {
            state.add_callback(action, cb);
        });
    }

    /// Forwards a flags update for the entry `id` to the state.
    pub fn update_entry_flags(&self, id: u16, new_flags: u8) {
        self.with_locked_state(|state| {
            state.update_entry_flags(id, new_flags);
        });
    }
}
/// Notifies the backend over the close channel when the handle goes away.
impl<T: NTBackend> Drop for NetworkTables<T> {
    /// Attempts a non-blocking send of `()` on the close channel.
    ///
    /// The send is best-effort: any failure is deliberately ignored so that
    /// dropping never panics.
    fn drop(&mut self) {
        let Self { close_tx, .. } = self;
        let _ = close_tx.try_send(());
    }
}