roblib-client 0.1.0

A client library for a dank engine
Documentation
use crate::Transport;
use anyhow::Result;
use roblib::{
    cmd::{self, has_return, Command},
    event::Event,
};
use serde::Deserialize;
use std::{collections::HashMap, io::Cursor, sync::Arc};

use super::Subscribable;

type D<'a> = bincode::Deserializer<bincode::de::read::SliceReader<'a>, bincode::DefaultOptions>;
type Handler = Box<dyn (for<'a> FnMut(D<'a>) -> Result<()>) + Send + Sync>;

struct UdpInner {
    events: std::sync::Mutex<HashMap<roblib::event::ConcreteType, u32>>,
    handlers: std::sync::Mutex<HashMap<u32, Handler>>,
    running: std::sync::RwLock<bool>,
}

pub struct Udp {
    inner: Arc<UdpInner>,

    sock: std::net::UdpSocket,
    id: std::sync::Mutex<u32>,
}

impl Udp {
    pub fn connect(addr: impl std::net::ToSocketAddrs) -> Result<Self> {
        let sock = std::net::UdpSocket::bind("0.0.0.0:0")?;
        sock.connect(addr)?;

        let sock2 = sock.try_clone()?;

        let inner = Arc::new(UdpInner {
            handlers: HashMap::new().into(),
            events: HashMap::new().into(),
            running: true.into(),
        });

        let i2 = inner.clone();
        std::thread::spawn(move || Self::recieve(i2, sock2));

        Ok(Self {
            id: super::ID_START.into(),
            inner,
            sock,
        })
    }

    fn recieve(inner: Arc<UdpInner>, sock: std::net::UdpSocket) -> Result<()> {
        let mut buf = [0; 512];
        loop {
            let running = inner.running.read().unwrap();
            if !*running {
                break;
            }
            drop(running);

            let len = sock.recv(&mut buf)?;
            let buf = &buf[..len];

            let mut curs = Cursor::new(buf);
            let id: u32 = bincode::Options::deserialize_from(bincode::options(), &mut curs)?;
            if let Some(h) = inner.handlers.lock().unwrap().get_mut(&id) {
                let pos = curs.position() as usize;
                let rest = &curs.into_inner()[pos..];
                h(bincode::Deserializer::from_slice(rest, bincode::options()))?;
            }
        }
        Ok(())
    }

    fn cmd_id<C>(&self, cmd: C, id: u32) -> Result<C::Return>
    where
        C: Command,
    {
        let concrete: cmd::Concrete = cmd.into();
        self.sock.send(&bincode::Options::serialize(
            bincode::options(),
            &(id, concrete),
        )?)?;

        Ok(if has_return::<C>() {
            let (tx, rx) = std::sync::mpsc::sync_channel(1);

            let a: Handler = Box::new(move |mut des: D| {
                let r = C::Return::deserialize(&mut des)?;
                tx.send(r).unwrap();
                Ok::<(), anyhow::Error>(())
            });

            self.inner.handlers.lock().unwrap().insert(id, a);

            rx.recv()?
        } else {
            unsafe { std::mem::zeroed() }
        })
    }
}

impl Transport for Udp {
    fn cmd<C>(&self, cmd: C) -> Result<C::Return>
    where
        C: Command,
    {
        let mut id_handle = self.id.lock().unwrap();
        let id = *id_handle;
        *id_handle = id + 1;
        drop(id_handle);

        let res = self.cmd_id(cmd, id);
        self.inner.handlers.lock().unwrap().remove(&id);
        res
    }
}

impl Subscribable for Udp {
    fn subscribe<E, F>(&self, ev: E, mut handler: F) -> Result<()>
    where
        E: Event,
        F: (FnMut(E::Item) -> Result<()>) + Send + Sync + 'static,
    {
        let mut id_handle = self.id.lock().unwrap();
        let id = *id_handle;
        *id_handle = id + 1;
        drop(id_handle);

        self.inner.handlers.lock().unwrap().insert(
            id,
            Box::new(move |mut des| handler(E::Item::deserialize(&mut des)?)),
        );

        self.cmd_id(cmd::Subscribe(ev.into()), id)?;

        Ok(())
    }

    fn unsubscribe<E: roblib::event::Event>(&self, ev: E) -> Result<()> {
        let ev = ev.into();
        let cmd: cmd::Concrete = cmd::Unsubscribe(ev.clone()).into();

        self.sock
            .send(&bincode::Options::serialize(bincode::options(), &cmd)?)?;

        let id = self.inner.events.lock().unwrap().remove(&ev).unwrap();

        self.inner.handlers.lock().unwrap().remove(&id);

        Ok(())
    }
}