1use std::time::Duration;
2use std::thread;
3use std::sync::{
4 Arc,
5 atomic::{AtomicBool, Ordering}
6};
7
8use std::io::ErrorKind::Interrupted;
9
10use queen_io::{
11 epoll::{Epoll, Events, Token, Ready, EpollOpt},
12 queue::mpsc::Queue
13};
14
15use nson::{
16 Message,
17 message_id::MessageId
18};
19
20use crate::Wire;
21use crate::error::{Result, Error, RecvError, Code};
22
23pub use hook::{Hook, NonHook};
24pub use switch::Switch;
25pub use slot::Slot;
26
27mod hook;
28mod switch;
29mod slot;
30
31#[derive(Clone)]
32pub struct Socket {
33 inner: Arc<Inner>
34}
35
36struct Inner {
37 id: MessageId,
38 queue: Queue<Packet>,
39 run: AtomicBool
40}
41
42impl Socket {
43 pub fn new(id: MessageId, hook: impl Hook) -> Result<Self> {
44 let queue = Queue::new()?;
45
46 let socket = Socket {
47 inner: Arc::new(Inner {
48 id,
49 queue: queue.clone(),
50 run: AtomicBool::new(true)
51 })
52 };
53
54 let mut main_loop = MainLoop::new(
55 id,
56 queue,
57 hook,
58 )?;
59
60 let socket2 = socket.clone();
61 thread::Builder::new().name("socket".to_string()).spawn(move || {
62 let ret = main_loop.run();
63 if ret.is_err() {
64 log::error!("socket loop exit: {:?}", ret);
65 } else {
66 log::trace!("socket loop exit");
67 }
68
69 socket2.inner.run.store(false, Ordering::Relaxed);
70
71 main_loop.hook.stop(&main_loop.switch);
72 }).unwrap();
73
74 Ok(socket)
75 }
76
77 pub fn id(&self) -> &MessageId {
78 &self.inner.id
79 }
80
81 pub fn stop(&self) {
82 self.inner.run.store(false, Ordering::Relaxed);
83 self.inner.queue.push(Packet::Close);
84 }
85
86 pub fn running(&self) -> bool {
87 self.inner.run.load(Ordering::Relaxed)
88 }
89
90 pub fn connect(
91 &self,
92 slot_id: MessageId,
93 root: bool,
94 attr: Message,
95 capacity: Option<usize>,
96 timeout: Option<Duration>
97 ) -> Result<Wire<Message>> {
98 let (wire1, wire2) = Wire::pipe(capacity.unwrap_or(64), attr)?;
99
100 let packet = Packet::NewSlot(slot_id, root, wire1);
101
102 self.inner.queue.push(packet);
103
104 let ret = wire2.wait(Some(timeout.unwrap_or_else(|| Duration::from_secs(10))))?;
105
106 if let Some(code) = Code::get(&ret) {
107 if code != Code::Ok {
108 return Err(Error::ErrorCode(code))
109 }
110 } else {
111 unreachable!()
112 }
113
114 Ok(wire2)
115 }
116}
117
118impl Drop for Socket {
119 fn drop(&mut self) {
120 if Arc::strong_count(&self.inner) <= 2 {
121 self.stop()
122 }
123 }
124}
125
126struct MainLoop<H> {
127 epoll: Epoll,
128 events: Events,
129 queue: Queue<Packet>,
130 hook: H,
131 switch: Switch
132}
133
134enum Packet {
135 NewSlot(MessageId, bool, Wire<Message>),
136 Close
137}
138
139impl<H: Hook> MainLoop<H> {
140 const QUEUE_TOKEN: Token = Token(usize::max_value());
141
142 fn new(socket_id: MessageId, queue: Queue<Packet>, hook: H) -> Result<MainLoop<H>> {
143 Ok(MainLoop {
144 epoll: Epoll::new()?,
145 events: Events::with_capacity(1024),
146 queue,
147 hook,
148 switch: Switch::new(socket_id)
149 })
150 }
151
152 fn run(&mut self) -> Result<()> {
153 self.epoll.add(&self.queue, Self::QUEUE_TOKEN, Ready::readable(), EpollOpt::level())?;
154
155 loop {
156 let size = match self.epoll.wait(&mut self.events, None) {
157 Ok(size) => size,
158 Err(err) => {
159 if err.kind() == Interrupted {
160 continue;
161 } else {
162 return Err(err.into())
163 }
164 }
165 };
166
167 for i in 0..size {
168 let event = self.events.get(i).unwrap();
169
170 let token = event.token();
171
172 match token {
173 Self::QUEUE_TOKEN => {
174 if let Some(packet) = self.queue.pop() {
175 match packet {
176 Packet::NewSlot(id, root, wire) => {
177 self.switch.add_slot(&self.epoll, &self.hook, id, root, wire)?;
178 }
179 Packet::Close => {
180 return Ok(())
181 }
182 }
183 }
184 }
185 _ => {
186 let token = token.0;
187 if let Some(slot) = self.switch.slots.get(token) {
188 match slot.wire.recv() {
189 Ok(message) => {
190 self.switch.recv_message(&self.epoll, &self.hook, token, message)?;
191 }
192 Err(err) => {
193 if !matches!(err, RecvError::Empty) {
194 self.switch.del_slot(&self.epoll, &self.hook, token)?;
195 }
196 }
197 }
198 }
199 }
200 }
201 }
202 }
203 }
204}