queen/
socket.rs

1use std::time::Duration;
2use std::thread;
3use std::sync::{
4    Arc,
5    atomic::{AtomicBool, Ordering}
6};
7
8use std::io::ErrorKind::Interrupted;
9
10use queen_io::{
11    epoll::{Epoll, Events, Token, Ready, EpollOpt},
12    queue::mpsc::Queue
13};
14
15use nson::{
16    Message,
17    message_id::MessageId
18};
19
20use crate::Wire;
21use crate::error::{Result, Error, RecvError, Code};
22
23pub use hook::{Hook, NonHook};
24pub use switch::Switch;
25pub use slot::Slot;
26
27mod hook;
28mod switch;
29mod slot;
30
31#[derive(Clone)]
32pub struct Socket {
33    inner: Arc<Inner>
34}
35
36struct Inner {
37    id: MessageId,
38    queue: Queue<Packet>,
39    run: AtomicBool
40}
41
42impl Socket {
43    pub fn new(id: MessageId, hook: impl Hook) -> Result<Self> {
44        let queue = Queue::new()?;
45
46        let socket = Socket {
47            inner: Arc::new(Inner {
48                id,
49                queue: queue.clone(),
50                run: AtomicBool::new(true)
51            })
52        };
53
54        let mut main_loop = MainLoop::new(
55            id,
56            queue,
57            hook,
58        )?;
59
60        let socket2 = socket.clone();
61        thread::Builder::new().name("socket".to_string()).spawn(move || {
62            let ret = main_loop.run();
63            if ret.is_err() {
64                log::error!("socket loop exit: {:?}", ret);
65            } else {
66                log::trace!("socket loop exit");
67            }
68
69            socket2.inner.run.store(false, Ordering::Relaxed);
70
71            main_loop.hook.stop(&main_loop.switch);
72        }).unwrap();
73
74        Ok(socket)
75    }
76
77    pub fn id(&self) -> &MessageId {
78        &self.inner.id
79    }
80
81    pub fn stop(&self) {
82        self.inner.run.store(false, Ordering::Relaxed);
83        self.inner.queue.push(Packet::Close);
84    }
85
86    pub fn running(&self) -> bool {
87        self.inner.run.load(Ordering::Relaxed)
88    }
89
90    pub fn connect(
91        &self,
92        slot_id: MessageId,
93        root: bool,
94        attr: Message,
95        capacity: Option<usize>,
96        timeout: Option<Duration>
97    ) -> Result<Wire<Message>> {
98        let (wire1, wire2) = Wire::pipe(capacity.unwrap_or(64), attr)?;
99
100        let packet = Packet::NewSlot(slot_id, root, wire1);
101
102        self.inner.queue.push(packet);
103
104        let ret = wire2.wait(Some(timeout.unwrap_or_else(|| Duration::from_secs(10))))?;
105
106        if let Some(code) = Code::get(&ret) {
107            if code != Code::Ok {
108                return Err(Error::ErrorCode(code))
109            }
110        } else {
111            unreachable!()
112        }
113
114        Ok(wire2)
115    }
116}
117
118impl Drop for Socket {
119    fn drop(&mut self) {
120        if Arc::strong_count(&self.inner) <= 2 {
121            self.stop()
122        }
123    }
124}
125
126struct MainLoop<H> {
127    epoll: Epoll,
128    events: Events,
129    queue: Queue<Packet>,
130    hook: H,
131    switch: Switch
132}
133
134enum Packet {
135    NewSlot(MessageId, bool, Wire<Message>),
136    Close
137}
138
139impl<H: Hook> MainLoop<H> {
140    const QUEUE_TOKEN: Token = Token(usize::max_value());
141
142    fn new(socket_id: MessageId, queue: Queue<Packet>, hook: H) -> Result<MainLoop<H>> {
143        Ok(MainLoop {
144            epoll: Epoll::new()?,
145            events: Events::with_capacity(1024),
146            queue,
147            hook,
148            switch: Switch::new(socket_id)
149        })
150    }
151
152    fn run(&mut self) -> Result<()> {
153        self.epoll.add(&self.queue, Self::QUEUE_TOKEN, Ready::readable(), EpollOpt::level())?;
154
155        loop {
156            let size = match self.epoll.wait(&mut self.events, None) {
157                Ok(size) => size,
158                Err(err) => {
159                    if err.kind() == Interrupted {
160                        continue;
161                    } else {
162                        return Err(err.into())
163                    }
164                }
165            };
166
167            for i in 0..size {
168                let event = self.events.get(i).unwrap();
169
170                let token = event.token();
171
172                match token {
173                    Self::QUEUE_TOKEN => {
174                        if let Some(packet) = self.queue.pop() {
175                            match packet {
176                                Packet::NewSlot(id, root, wire) => {
177                                    self.switch.add_slot(&self.epoll, &self.hook, id, root, wire)?;
178                                }
179                                Packet::Close => {
180                                    return Ok(())
181                                }
182                            }
183                        }
184                    }
185                    _ => {
186                        let token = token.0;
187                        if let Some(slot) = self.switch.slots.get(token) {
188                            match slot.wire.recv() {
189                                Ok(message) => {
190                                    self.switch.recv_message(&self.epoll, &self.hook, token, message)?;
191                                }
192                                Err(err) => {
193                                    if !matches!(err, RecvError::Empty) {
194                                        self.switch.del_slot(&self.epoll, &self.hook, token)?;
195                                    }
196                                }
197                            }
198                        }
199                    }
200                }
201            }
202        }
203    }
204}