nt 3.0.0

A NetworkTables revision 3 library for Rust
Documentation
use super::State;
use crate::error::Error;
use crate::{
    Action, CallbackType, ConnectionAction, ConnectionCallbackType, EntryData, EntryValue,
    RpcCallback,
};
use futures_channel::mpsc::{channel, unbounded, Receiver, Sender, UnboundedSender};
use futures_util::StreamExt;
use multimap::MultiMap;
use nt_network::{
    ClearAllEntries, EntryAssignment, EntryDelete, EntryFlagsUpdate, EntryUpdate, Packet,
    RpcExecute,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use tokio::runtime::Runtime;

pub(crate) mod conn;

pub struct ClientState {
    pub(crate) connected: bool,
    ip: String,
    name: String,
    entries: HashMap<u16, EntryData>,
    callbacks: MultiMap<CallbackType, Box<Action>>,
    connection_callbacks: MultiMap<ConnectionCallbackType, Box<ConnectionAction>>,
    pub(crate) pending_entries: HashMap<String, Sender<u16>>,
    pub(crate) packet_tx: UnboundedSender<Box<dyn Packet>>,
    rpc_callbacks: HashMap<u16, Box<RpcCallback>>,
    next_rpc_id: u16,
}

impl ClientState {
    pub async fn new(ip: String, name: String, close_rx: Receiver<()>) -> Arc<Mutex<ClientState>> {
        let (packet_tx, packet_rx) = unbounded::<Box<dyn Packet>>();
        let (ready_tx, mut ready_rx) = unbounded::<()>();

        let state = Arc::new(Mutex::new(ClientState {
            connected: false,
            ip,
            name,
            entries: HashMap::new(),
            callbacks: MultiMap::new(),
            connection_callbacks: MultiMap::new(),
            pending_entries: HashMap::new(),
            packet_tx,
            rpc_callbacks: HashMap::new(),
            next_rpc_id: 0,
        }));

        let rt_state = state.clone();
        thread::spawn(move || {
            let mut rt = Runtime::new().unwrap();
            rt.block_on(conn::connection(rt_state, packet_rx, ready_tx, close_rx))
                .unwrap();
        });

        ready_rx.next().await;
        state
    }

    #[cfg(feature = "websocket")]
    pub async fn new_ws(
        url: String,
        name: String,
        close_rx: Receiver<()>,
    ) -> crate::Result<Arc<Mutex<ClientState>>> {
        let (packet_tx, packet_rx) = unbounded::<Box<dyn Packet>>();
        let (ready_tx, mut ready_rx) = unbounded::<()>();

        let state = Arc::new(Mutex::new(ClientState {
            connected: false,
            ip: url,
            name,
            entries: HashMap::new(),
            callbacks: MultiMap::new(),
            connection_callbacks: MultiMap::new(),
            pending_entries: HashMap::new(),
            packet_tx,
            rpc_callbacks: HashMap::new(),
            next_rpc_id: 0,
        }));

        let rt_state = state.clone();
        thread::spawn(move || {
            let mut rt = Runtime::new().unwrap();

            let _ = rt.block_on(conn::connection_ws(rt_state, packet_rx, ready_tx, close_rx));
        });

        if let None = ready_rx.next().await {
            return Err(Error::ConnectionAborted);
        }
        Ok(state)
    }

    pub fn add_connection_callback(
        &mut self,
        callback_type: ConnectionCallbackType,
        action: impl FnMut(&SocketAddr) + Send + 'static,
    ) {
        self.connection_callbacks
            .insert(callback_type, Box::new(action));
    }

    pub fn call_rpc(
        &mut self,
        id: u16,
        parameter: Vec<u8>,
        callback: impl Fn(Vec<u8>) + Send + 'static,
    ) {
        self.rpc_callbacks
            .insert(self.next_rpc_id, Box::new(callback));
        self.packet_tx
            .unbounded_send(Box::new(RpcExecute::new(id, self.next_rpc_id, parameter)))
            .unwrap();

        self.next_rpc_id += 1;
    }
}

impl State for ClientState {
    fn entries(&self) -> &HashMap<u16, EntryData> {
        &self.entries
    }

    fn entries_mut(&mut self) -> &mut HashMap<u16, EntryData> {
        &mut self.entries
    }

    fn create_entry(&mut self, data: EntryData) -> crate::Result<Receiver<u16>> {
        if !self.connected {
            return Err(Error::BrokenPipe);
        }
        let (tx, rx) = channel::<u16>(1);
        self.pending_entries.insert(data.name.clone(), tx);
        self.packet_tx
            .unbounded_send(Box::new(EntryAssignment::new(
                data.name.clone(),
                data.entry_type(),
                0xFFFF,
                data.seqnum,
                data.flags,
                data.value,
            )))
            .unwrap();
        Ok(rx)
    }

    fn delete_entry(&mut self, id: u16) {
        let packet = EntryDelete::new(id);
        self.packet_tx.unbounded_send(Box::new(packet)).unwrap();
    }

    fn update_entry(&mut self, id: u16, new_value: EntryValue) {
        if let Some(entry) = self.entries.get_mut(&id) {
            entry.value = new_value.clone();
            entry.seqnum += 1;
            self.packet_tx
                .unbounded_send(Box::new(EntryUpdate::new(
                    id,
                    entry.seqnum,
                    entry.entry_type(),
                    new_value,
                )))
                .unwrap();
        }
    }

    fn update_entry_flags(&mut self, id: u16, flags: u8) {
        if let Some(entry) = self.entries.get_mut(&id) {
            entry.flags = flags;
            self.packet_tx
                .unbounded_send(Box::new(EntryFlagsUpdate::new(id, flags)))
                .unwrap();
        }
    }

    fn clear_entries(&mut self) {
        self.packet_tx
            .unbounded_send(Box::new(ClearAllEntries::new()))
            .unwrap();
        self.entries.clear();
    }

    fn add_callback(
        &mut self,
        callback_type: CallbackType,
        action: impl FnMut(&EntryData) + Send + 'static,
    ) {
        self.callbacks.insert(callback_type, Box::new(action));
    }
}