Documentation
use std::time::Duration;
use std::thread;
use std::sync::{
    Arc,
    atomic::{AtomicBool, Ordering}
};

use std::io::ErrorKind::Interrupted;

use queen_io::{
    epoll::{Epoll, Events, Token, Ready, EpollOpt},
    queue::mpsc::Queue
};

use nson::{
    Message,
    message_id::MessageId
};

use crate::Wire;
use crate::error::{Result, Error, RecvError, Code};

pub use hook::{Hook, NonHook};
pub use switch::Switch;
pub use slot::Slot;

mod hook;
mod switch;
mod slot;

#[derive(Clone)]
pub struct Socket {
    inner: Arc<Inner>
}

struct Inner {
    id: MessageId,
    queue: Queue<Packet>,
    run: AtomicBool
}

impl Socket {
    pub fn new(id: MessageId, hook: impl Hook) -> Result<Self> {
        let queue = Queue::new()?;

        let socket = Socket {
            inner: Arc::new(Inner {
                id,
                queue: queue.clone(),
                run: AtomicBool::new(true)
            })
        };

        let mut main_loop = MainLoop::new(
            id,
            queue,
            hook,
        )?;

        let socket2 = socket.clone();
        thread::Builder::new().name("socket".to_string()).spawn(move || {
            let ret = main_loop.run();
            if ret.is_err() {
                log::error!("socket loop exit: {:?}", ret);
            } else {
                log::trace!("socket loop exit");
            }

            socket2.inner.run.store(false, Ordering::Relaxed);

            main_loop.hook.stop(&main_loop.switch);
        }).unwrap();

        Ok(socket)
    }

    pub fn id(&self) -> &MessageId {
        &self.inner.id
    }

    pub fn stop(&self) {
        self.inner.run.store(false, Ordering::Relaxed);
        self.inner.queue.push(Packet::Close);
    }

    pub fn running(&self) -> bool {
        self.inner.run.load(Ordering::Relaxed)
    }

    pub fn connect(
        &self,
        slot_id: MessageId,
        root: bool,
        attr: Message,
        capacity: Option<usize>,
        timeout: Option<Duration>
    ) -> Result<Wire<Message>> {
        let (wire1, wire2) = Wire::pipe(capacity.unwrap_or(64), attr)?;

        let packet = Packet::NewSlot(slot_id, root, wire1);

        self.inner.queue.push(packet);

        let ret = wire2.wait(Some(timeout.unwrap_or_else(|| Duration::from_secs(10))))?;

        if let Some(code) = Code::get(&ret) {
            if code != Code::Ok {
                return Err(Error::ErrorCode(code))
            }
        } else {
            unreachable!()
        }

        Ok(wire2)
    }
}

impl Drop for Socket {
    fn drop(&mut self) {
        if Arc::strong_count(&self.inner) <= 2 {
            self.stop()
        }
    }
}

struct MainLoop<H> {
    epoll: Epoll,
    events: Events,
    queue: Queue<Packet>,
    hook: H,
    switch: Switch
}

enum Packet {
    NewSlot(MessageId, bool, Wire<Message>),
    Close
}

impl<H: Hook> MainLoop<H> {
    const QUEUE_TOKEN: Token = Token(usize::max_value());

    fn new(socket_id: MessageId, queue: Queue<Packet>, hook: H) -> Result<MainLoop<H>> {
        Ok(MainLoop {
            epoll: Epoll::new()?,
            events: Events::with_capacity(1024),
            queue,
            hook,
            switch: Switch::new(socket_id)
        })
    }

    fn run(&mut self) -> Result<()> {
        self.epoll.add(&self.queue, Self::QUEUE_TOKEN, Ready::readable(), EpollOpt::level())?;

        loop {
            let size = match self.epoll.wait(&mut self.events, None) {
                Ok(size) => size,
                Err(err) => {
                    if err.kind() == Interrupted {
                        continue;
                    } else {
                        return Err(err.into())
                    }
                }
            };

            for i in 0..size {
                let event = self.events.get(i).unwrap();

                let token = event.token();

                match token {
                    Self::QUEUE_TOKEN => {
                        if let Some(packet) = self.queue.pop() {
                            match packet {
                                Packet::NewSlot(id, root, wire) => {
                                    self.switch.add_slot(&self.epoll, &self.hook, id, root, wire)?;
                                }
                                Packet::Close => {
                                    return Ok(())
                                }
                            }
                        }
                    }
                    _ => {
                        let token = token.0;
                        if let Some(slot) = self.switch.slots.get(token) {
                            match slot.wire.recv() {
                                Ok(message) => {
                                    self.switch.recv_message(&self.epoll, &self.hook, token, message)?;
                                }
                                Err(err) => {
                                    if !matches!(err, RecvError::Empty) {
                                        self.switch.del_slot(&self.epoll, &self.hook, token)?;
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}