oxygengine-network-backend-native 0.46.1

Network Native backend module for Oxygengine
Documentation
use crate::{client::NativeClient, utils::DoOnDrop};
use network::{
    client::{Client, ClientId, ClientState, MessageId},
    server::{Server, ServerId, ServerState},
};
use std::{
    collections::{HashMap, VecDeque},
    io::ErrorKind,
    net::TcpListener,
    ops::Range,
    sync::{Arc, RwLock},
    thread::{sleep, Builder as ThreadBuilder, JoinHandle},
    time::Duration,
};

const LISTENER_SLEEP_MS: u64 = 10;

type MsgData = (ClientId, MessageId, Vec<u8>);

pub struct NativeServer {
    id: ServerId,
    state: Arc<RwLock<ServerState>>,
    clients: Arc<RwLock<HashMap<ClientId, NativeClient>>>,
    clients_ids_cached: Vec<ClientId>,
    messages: VecDeque<MsgData>,
    thread: Option<JoinHandle<()>>,
}

impl Drop for NativeServer {
    fn drop(&mut self) {
        self.cleanup();
    }
}

impl NativeServer {
    fn cleanup(&mut self) {
        if let Ok(mut state) = self.state.write() {
            *state = ServerState::Closed;
        }
        if let Some(thread) = self.thread.take() {
            thread.join().unwrap();
        }
    }
}

impl Server for NativeServer {
    fn open(url: &str) -> Option<Self> {
        let sid = ServerId::default();
        let url = url.to_owned();
        let state = Arc::new(RwLock::new(ServerState::Starting));
        let state2 = state.clone();
        let clients = Arc::new(RwLock::new(HashMap::default()));
        let clients2 = clients.clone();
        let thread = Some(
            ThreadBuilder::new()
                .name(format!("Server: {:?}", sid))
                .spawn(move || {
                    let state3 = state2.clone();
                    let _ = DoOnDrop::new(move || {
                        if let Ok(mut state) = state3.write() {
                            *state = ServerState::Closed;
                        }
                    });
                    let listener = TcpListener::bind(&url).unwrap();
                    listener.set_nonblocking(true).unwrap_or_else(|_| {
                        panic!(
                            "Server {:?} cannot set non-blocking listening on: {}",
                            sid, &url
                        )
                    });
                    if let Ok(mut state) = state2.write() {
                        *state = ServerState::Open;
                    }
                    for stream in listener.incoming() {
                        if let Ok(state) = state2.read() {
                            if *state == ServerState::Closed {
                                break;
                            }
                        }
                        match stream {
                            Ok(stream) => {
                                let client = NativeClient::from(stream);
                                let id = client.id();
                                if let Ok(mut clients) = clients2.write() {
                                    clients.insert(id, client);
                                }
                            }
                            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
                                sleep(Duration::from_millis(LISTENER_SLEEP_MS));
                            }
                            Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
                                break;
                            }
                            Err(e) => {
                                panic!("Server {:?} listener {} got IO error: {}", sid, &url, e)
                            }
                        }
                    }
                    if let Ok(mut state) = state2.write() {
                        *state = ServerState::Closed;
                    }
                })
                .unwrap(),
        );
        Some(Self {
            id: sid,
            state,
            clients,
            clients_ids_cached: vec![],
            messages: Default::default(),
            thread,
        })
    }

    fn close(mut self) -> Self {
        self.cleanup();
        self
    }

    fn id(&self) -> ServerId {
        self.id
    }

    fn state(&self) -> ServerState {
        if let Ok(state) = self.state.read() {
            *state
        } else {
            ServerState::default()
        }
    }

    fn clients(&self) -> &[ClientId] {
        &self.clients_ids_cached
    }

    fn disconnect(&mut self, id: ClientId) {
        if let Ok(mut clients) = self.clients.write() {
            if let Some(client) = clients.remove(&id) {
                client.close();
            }
        }
    }

    fn disconnect_all(&mut self) {
        if let Ok(mut clients) = self.clients.write() {
            for (_, client) in clients.drain() {
                client.close();
            }
        }
    }

    fn send(&mut self, id: ClientId, msg_id: MessageId, data: &[u8]) -> Option<Range<usize>> {
        if self.state() != ServerState::Open {
            return None;
        }
        if let Ok(mut clients) = self.clients.write() {
            if let Some(client) = clients.get_mut(&id) {
                if let Some(size) = client.send(msg_id, data) {
                    return Some(size);
                }
            }
        }
        None
    }

    fn send_all(&mut self, id: MessageId, data: &[u8]) {
        if self.state() != ServerState::Open {
            return;
        }
        if let Ok(mut clients) = self.clients.write() {
            for client in clients.values_mut() {
                client.send(id, data);
            }
        }
    }

    fn read(&mut self) -> Option<(ClientId, MessageId, Vec<u8>)> {
        self.messages.pop_front()
    }

    fn read_all(&mut self) -> Vec<MsgData> {
        self.messages.drain(..).collect()
    }

    fn process(&mut self) {
        if let Ok(mut clients) = self.clients.write() {
            for (id, client) in clients.iter_mut() {
                self.messages.extend(
                    client
                        .read_all()
                        .into_iter()
                        .map(|(mid, data)| (*id, mid, data)),
                );
            }
            clients.retain(|_, client| client.state() != ClientState::Closed);
            self.clients_ids_cached.clear();
            for id in clients.keys() {
                self.clients_ids_cached.push(*id);
            }
        }
    }
}