1use std::error::Error as StdError;
27use std::io::{Read as IoRead, Write as IoWrite};
28use std::net::{SocketAddr, TcpListener, TcpStream};
29use std::num::NonZeroU64;
30use std::time::Duration;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::thread;
33use log::*;
34use memchr::memchr;
35use derive_new::new;
36use hashbrown::{HashMap, HashSet};
37use crossbeam_channel::{unbounded, Sender, Receiver, select, tick};
38use serde_json::{Value, json};
39use mlzutil::time::localtime;
40use parking_lot::{const_mutex, Mutex};
41
42use crate::config::ServerConfig;
43use crate::errors::Error;
44use crate::module::ModInternals;
45use crate::proto::{IncomingMsg, Msg, Msg::*, IDENT_REPLY};
46
47pub const RECVBUF_LEN: usize = 4096;
48pub const MAX_MSG_LEN: usize = 1024*1024;
49
50pub type HandlerId = NonZeroU64;
52
53#[derive(new)]
54pub struct Server {
55 config: ServerConfig,
56}
57
58pub type ConSender = Sender<(HandlerId, RepSender)>;
60pub type ConReceiver = Receiver<(HandlerId, RepSender)>;
61pub type ReqSender = Sender<(HandlerId, IncomingMsg)>;
62pub type ReqReceiver = Receiver<(HandlerId, IncomingMsg)>;
63pub type RepSender = Sender<Msg>;
64pub type RepReceiver = Receiver<Msg>;
65pub type ModRepSender = Sender<(Option<HandlerId>, Msg)>;
66pub type ModRepReceiver = Receiver<(Option<HandlerId>, Msg)>;
67
68pub static CON_SENDER: Mutex<Option<ConSender>> = const_mutex(None);
70
71pub static REQ_SENDER: Mutex<Option<ReqSender>> = const_mutex(None);
73
74static NEXT_HID: AtomicUsize = AtomicUsize::new(1);
75
76pub fn next_handler_id() -> HandlerId {
77 NonZeroU64::new(NEXT_HID.fetch_add(1, Ordering::SeqCst) as u64).expect("is nonzero")
78}
79
80impl Server {
81 fn tcp_listener(tcp_sock: TcpListener) {
83 mlzlog::set_thread_prefix("TCP: ");
84 info!("listener started");
85 let con_sender = CON_SENDER.lock().clone().expect("no server running?");
86 while let Ok((stream, addr)) = tcp_sock.accept() {
87 info!("[{}] new client connected", addr);
88 let new_req_sender = REQ_SENDER.lock().clone().expect("no server running?");
90 let (rep_sender, rep_receiver) = unbounded();
91 let disp_rep_sender = rep_sender.clone();
92 let hid = next_handler_id();
93 con_sender.send((hid, disp_rep_sender)).unwrap();
94 thread::spawn(move || Handler::new(hid, stream, addr,
95 new_req_sender, rep_sender, rep_receiver).handle());
96 }
97 }
98
99 pub fn start<F>(mut self, addr: &str, mod_runner: F) -> Result<(), Box<dyn StdError>>
102 where F: Fn(ModInternals) -> Result<(), Box<dyn StdError>>
103 {
104 let (con_sender, con_receiver) = unbounded();
107 *CON_SENDER.lock() = Some(con_sender);
108 let (req_sender, req_receiver) = unbounded();
110 *REQ_SENDER.lock() = Some(req_sender);
111 let (rep_sender, rep_receiver) = unbounded();
113
114 let mut active_sets = HashMap::new();
116 let mut mod_senders = HashMap::new();
117
118 for (name, modcfg) in self.config.modules.drain() {
119 let (mod_sender, mod_receiver) = unbounded();
121 let mod_rep_sender = rep_sender.clone();
123 let tickers = (tick(Duration::from_secs(1)), tick(Duration::from_secs(1)));
124 let int = ModInternals::new(name.clone(), modcfg, mod_receiver, mod_rep_sender, tickers);
125 active_sets.insert(name.clone(), HashSet::new());
126 mod_senders.insert(name, mod_sender);
127 mod_runner(int)?;
128 }
129
130 let descriptive = json!({
131 "description": self.config.description,
132 "equipment_id": self.config.equipment_id,
133 "firmware": concat!("secop-rs ", env!("CARGO_PKG_VERSION")),
134 "modules": {}
135 });
136
137 let dispatcher = Dispatcher {
139 descriptive: descriptive,
140 active: active_sets,
141 handlers: HashMap::new(),
142 modules: mod_senders,
143 connections: con_receiver,
144 requests: req_receiver,
145 replies: rep_receiver,
146 };
147 thread::spawn(move || dispatcher.run());
148
149 let tcp_sock = TcpListener::bind(addr)?;
151 thread::spawn(move || Server::tcp_listener(tcp_sock));
152 Ok(())
153 }
154}
155
156struct Dispatcher {
159 descriptive: Value,
160 handlers: HashMap<HandlerId, RepSender>,
161 active: HashMap<String, HashSet<HandlerId>>,
162 modules: HashMap<String, ReqSender>,
163 connections: ConReceiver,
164 requests: ReqReceiver,
165 replies: ModRepReceiver,
166}
167
168impl Dispatcher {
169 fn send_back(&self, hid: HandlerId, msg: Msg) {
170 if let Some(chan) = self.handlers.get(&hid) {
171 let _ = chan.send(msg);
172 }
173 }
174
175 fn run(mut self) {
176 mlzlog::set_thread_prefix("Dispatcher: ");
177
178 let mut global_activate_remaining = 0;
180
181 loop {
182 select! {
183 recv(self.connections) -> res => if let Ok((hid, conn)) = res {
184 debug!("got handler {}", hid);
185 self.handlers.insert(hid, conn);
186 },
187 recv(self.requests) -> res => if let Ok((hid, req)) = res {
188 debug!("got request {} -> {}", hid, req);
189 match req.1 {
190 Do { ref module, .. } |
191 Change { ref module, .. } |
192 Read { ref module, .. } => {
193 if let Some(chan) = self.modules.get(module) {
195 chan.send((hid, req)).unwrap();
196 } else {
197 self.send_back(hid, Error::no_module().into_msg(req.0));
198 }
199 }
200 Activate { ref module } => {
201 if !module.is_empty() {
207 if let Some(chan) = self.modules.get(module) {
209 chan.send((hid, req)).unwrap();
210 } else {
211 self.send_back(hid, Error::no_module().into_msg(req.0));
212 continue;
213 }
214 } else {
215 if global_activate_remaining > 0 {
217 self.send_back(hid, Error::protocol(
219 "already activating").into_msg(req.0));
220 continue;
221 }
222 for chan in self.modules.values() {
226 chan.send((hid, req.clone())).unwrap();
227 }
228 global_activate_remaining = self.modules.len();
229 }
230 }
231 Deactivate { module } => {
232 if !module.is_empty() {
234 if !self.modules.contains_key(&module) {
236 self.send_back(hid, Error::no_module().into_msg(req.0));
237 continue;
238 }
239 self.active.get_mut(&module).expect("always there").remove(&hid);
240 } else {
241 for module in self.modules.keys() {
243 self.active.get_mut(module).expect("always there").remove(&hid);
244 }
245 }
246 self.send_back(hid, Inactive { module });
247 }
248 Describe => {
249 self.send_back(hid, Describing {
250 id: ".".into(),
251 structure: self.descriptive.clone()
252 });
253 }
254 Quit => {
255 self.handlers.remove(&hid);
257 for set in self.active.values_mut() {
258 set.remove(&hid);
259 }
260 }
261 _ => warn!("message should not arrive here: {}", req.1),
262 }
263 },
264 recv(self.replies) -> res => if let Ok((hid, rep)) = res {
265 match hid {
266 None => match rep {
267 Describing { id, structure } => {
270 let obj = self.descriptive["modules"].as_object_mut().expect("object");
271 obj.insert(id, structure);
272 }
273 Update { ref module, .. } => {
275 debug!("got {}", rep);
276 for &hid in &self.active[module] {
277 self.send_back(hid, rep.clone());
278 }
279 }
280 _ => ()
281 },
282 Some(hid) => match rep {
284 InitUpdates { module, updates } => {
285 for msg in updates {
286 self.send_back(hid, msg);
287 }
288 if !module.is_empty() {
289 self.send_back(hid, Active { module: module.clone() });
290 self.active.get_mut(&module).expect("always there").insert(hid);
291 } else {
292 global_activate_remaining -= 1;
293 if global_activate_remaining == 0 {
294 self.send_back(hid, Active { module: "".into() });
295 for set in self.active.values_mut() {
296 set.insert(hid);
297 }
298 }
299 }
300 }
301 _ => {
302 debug!("got reply {} for {}", rep, hid);
303 self.send_back(hid, rep)
304 }
305 }
306 }
307 }
308 }
309 }
310 }
311}
312
313pub struct Handler {
319 client: TcpStream,
320 hid: HandlerId,
322 req_sender: ReqSender,
324 rep_sender: RepSender,
326}
327
328impl Handler {
329 pub fn new(hid: HandlerId, client: TcpStream, addr: SocketAddr, req_sender: ReqSender,
330 rep_sender: RepSender, rep_receiver: RepReceiver) -> Handler {
331 let send_client = client.try_clone().expect("could not clone socket");
333 let thread_name = addr.to_string();
334 thread::spawn(move || Handler::sender(&thread_name, send_client, rep_receiver));
335 mlzlog::set_thread_prefix(format!("[{}] ", addr));
336 Handler { hid, client, req_sender, rep_sender }
337 }
338
339 fn sender(name: &str, client: TcpStream, rep_receiver: RepReceiver) {
341 mlzlog::set_thread_prefix(format!("[{}] ", name));
342 let mut client = std::io::BufWriter::new(client);
343 for to_send in rep_receiver {
344 if let Err(err) = write!(client, "{}\n", to_send) {
345 warn!("write error in sender: {}", err);
346 break;
347 }
348 let _ = client.flush();
349 }
350 info!("sender quit");
351 }
352
353 fn send_back(&self, msg: Msg) {
355 self.rep_sender.send(msg).expect("sending to client failed");
356 }
357
358 fn handle_msg(&self, msg: IncomingMsg) {
360 match msg.1 {
361 Change { .. } | Do { .. } | Read { .. } | Describe |
363 Activate { .. } | Deactivate { .. } => {
364 self.req_sender.send((self.hid, msg)).unwrap();
365 }
366 Ping { token } => {
368 let data = json!([null, {"t": localtime()}]);
369 self.send_back(Pong { token, data });
370 }
371 Idn => {
372 self.send_back(IdnReply { encoded: IDENT_REPLY.into() });
373 }
374 _ => {
375 warn!("message {:?} not handled yet", msg.1);
376 }
377 }
378 }
379
380 fn process(&self, line: String) {
382 match Msg::parse(line) {
383 Ok(msg) => {
384 debug!("processing {}", msg);
385 self.handle_msg(msg);
386 }
387 Err(msg) => {
388 warn!("failed to parse line: {}", msg);
390 self.send_back(msg);
391 }
392 }
393 }
394
395 pub fn handle(mut self) {
397 let mut buf = Vec::with_capacity(RECVBUF_LEN);
398 let mut recvbuf = [0u8; RECVBUF_LEN];
399
400 loop {
401 let got = match self.client.read(&mut recvbuf) {
403 Err(err) => {
404 warn!("error in recv, closing connection: {}", err);
405 break;
406 },
407 Ok(0) => break, Ok(got) => got,
409 };
410 buf.extend_from_slice(&recvbuf[..got]);
412 let mut from = 0;
414 while let Some(to) = memchr(b'\n', &buf[from..]) {
415 let line_str = String::from_utf8_lossy(&buf[from..from+to]);
416 let line_str = line_str.trim_end_matches('\r');
417 self.process(line_str.to_owned());
418 from += to + 1;
419 }
420 buf.drain(..from);
421 if buf.len() > MAX_MSG_LEN {
423 warn!("hit request length limit, closing connection");
424 break;
425 }
426 }
427 self.req_sender.send((self.hid, IncomingMsg::bare(Quit))).unwrap();
428 info!("handler is finished");
429 }
430}