secop_core/
server.rs

1// -----------------------------------------------------------------------------
2// Rust SECoP playground
3//
4// This program is free software; you can redistribute it and/or modify it under
5// the terms of the GNU General Public License as published by the Free Software
6// Foundation; either version 2 of the License, or (at your option) any later
7// version.
8//
9// This program is distributed in the hope that it will be useful, but WITHOUT
10// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11// FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
12// details.
13//
14// You should have received a copy of the GNU General Public License along with
15// this program; if not, write to the Free Software Foundation, Inc.,
16// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17//
18// Module authors:
19//   Georg Brandl <g.brandl@fz-juelich.de>
20//
21// -----------------------------------------------------------------------------
22//
23//! This module contains the server instance itself, and associated objects to
24//! handle connections and message routing.
25
26use std::error::Error as StdError;
27use std::io::{Read as IoRead, Write as IoWrite};
28use std::net::{SocketAddr, TcpListener, TcpStream};
29use std::num::NonZeroU64;
30use std::time::Duration;
31use std::sync::atomic::{AtomicUsize, Ordering};
32use std::thread;
33use log::*;
34use memchr::memchr;
35use derive_new::new;
36use hashbrown::{HashMap, HashSet};
37use crossbeam_channel::{unbounded, Sender, Receiver, select, tick};
38use serde_json::{Value, json};
39use mlzutil::time::localtime;
40use parking_lot::{const_mutex, Mutex};
41
42use crate::config::ServerConfig;
43use crate::errors::Error;
44use crate::module::ModInternals;
45use crate::proto::{IncomingMsg, Msg, Msg::*, IDENT_REPLY};
46
/// Size in bytes of the fixed buffer used for a single read from a client socket.
pub const RECVBUF_LEN: usize = 4 * 1024;
/// Upper limit in bytes for data buffered without a terminating newline; a
/// connection exceeding it is closed by its handler.
pub const MAX_MSG_LEN: usize = 1 << 20;
49
/// Handler ID, identifying one client connection towards the dispatcher.
///
/// This is nonzero so that `Option<HandlerId>` is the same size as a plain
/// `HandlerId`.
pub type HandlerId = NonZeroU64;
52
/// The server instance.
///
/// The `new` constructor is generated by `derive_new` and takes the
/// configuration; `start` then consumes the server.
#[derive(new)]
pub struct Server {
    /// Server configuration; supplies the module configurations as well as the
    /// description and equipment ID placed into the descriptive data.
    config: ServerConfig,
}
57
// Aliases for all the common channel types.

/// Sending side for announcing a new handler: its ID plus the channel on which
/// replies for it are to be sent.
pub type ConSender = Sender<(HandlerId, RepSender)>;
/// Receiving side of [`ConSender`], read by the dispatcher.
pub type ConReceiver = Receiver<(HandlerId, RepSender)>;
/// Sending side for requests, each tagged with the ID of the originating
/// handler.  Used from handlers to the dispatcher and from the dispatcher to
/// the modules.
pub type ReqSender = Sender<(HandlerId, IncomingMsg)>;
/// Receiving side of [`ReqSender`].
pub type ReqReceiver = Receiver<(HandlerId, IncomingMsg)>;
/// Sending side for messages that go out to a single client.
pub type RepSender = Sender<Msg>;
/// Receiving side of [`RepSender`], read by the client's sender thread.
pub type RepReceiver = Receiver<Msg>;
/// Sending side for messages from modules to the dispatcher.  `Some(hid)`
/// marks a reply for one specific handler, `None` a message without a specific
/// recipient (descriptive data and event updates).
pub type ModRepSender = Sender<(Option<HandlerId>, Msg)>;
/// Receiving side of [`ModRepSender`], read by the dispatcher.
pub type ModRepReceiver = Receiver<(Option<HandlerId>, Msg)>;
67
/// Global sender for new connections to the server.
///
/// `None` until `Server::start` has created the dispatcher channels.
pub static CON_SENDER: Mutex<Option<ConSender>> = const_mutex(None);

/// Global sender for new requests to the dispatcher.
///
/// `None` until `Server::start` has created the dispatcher channels.
pub static REQ_SENDER: Mutex<Option<ReqSender>> = const_mutex(None);

/// Counter holding the next handler ID to hand out.  It starts at 1 because
/// handler IDs must be nonzero.
static NEXT_HID: AtomicUsize = AtomicUsize::new(1);
75
76pub fn next_handler_id() -> HandlerId {
77    NonZeroU64::new(NEXT_HID.fetch_add(1, Ordering::SeqCst) as u64).expect("is nonzero")
78}
79
80impl Server {
81    /// Listen for connections on the TCP socket and spawn handlers for it.
82    fn tcp_listener(tcp_sock: TcpListener) {
83        mlzlog::set_thread_prefix("TCP: ");
84        info!("listener started");
85        let con_sender = CON_SENDER.lock().clone().expect("no server running?");
86        while let Ok((stream, addr)) = tcp_sock.accept() {
87            info!("[{}] new client connected", addr);
88            // create the handler and start its main thread
89            let new_req_sender = REQ_SENDER.lock().clone().expect("no server running?");
90            let (rep_sender, rep_receiver) = unbounded();
91            let disp_rep_sender = rep_sender.clone();
92            let hid = next_handler_id();
93            con_sender.send((hid, disp_rep_sender)).unwrap();
94            thread::spawn(move || Handler::new(hid, stream, addr,
95                                               new_req_sender, rep_sender, rep_receiver).handle());
96        }
97    }
98
99    /// Main server function; start threads to accept clients on the listening
100    /// socket, the dispatcher, and the individual modules.
101    pub fn start<F>(mut self, addr: &str, mod_runner: F) -> Result<(), Box<dyn StdError>>
102        where F: Fn(ModInternals) -> Result<(), Box<dyn StdError>>
103    {
104        // create a few channels we need for the dispatcher:
105        // sending info about incoming connections to the dispatcher
106        let (con_sender, con_receiver) = unbounded();
107        *CON_SENDER.lock() = Some(con_sender);
108        // sending requests from all handlers to the dispatcher
109        let (req_sender, req_receiver) = unbounded();
110        *REQ_SENDER.lock() = Some(req_sender);
111        // sending replies from all modules to the dispatcher
112        let (rep_sender, rep_receiver) = unbounded();
113
114        // create the modules
115        let mut active_sets = HashMap::new();
116        let mut mod_senders = HashMap::new();
117
118        for (name, modcfg) in self.config.modules.drain() {
119            // channel to send requests to the module
120            let (mod_sender, mod_receiver) = unbounded();
121            // replies go via a single one
122            let mod_rep_sender = rep_sender.clone();
123            let tickers = (tick(Duration::from_secs(1)), tick(Duration::from_secs(1)));
124            let int = ModInternals::new(name.clone(), modcfg, mod_receiver, mod_rep_sender, tickers);
125            active_sets.insert(name.clone(), HashSet::new());
126            mod_senders.insert(name, mod_sender);
127            mod_runner(int)?;
128        }
129
130        let descriptive = json!({
131            "description": self.config.description,
132            "equipment_id": self.config.equipment_id,
133            "firmware": concat!("secop-rs ", env!("CARGO_PKG_VERSION")),
134            "modules": {}
135        });
136
137        // create the dispatcher
138        let dispatcher = Dispatcher {
139            descriptive: descriptive,
140            active: active_sets,
141            handlers: HashMap::new(),
142            modules: mod_senders,
143            connections: con_receiver,
144            requests: req_receiver,
145            replies: rep_receiver,
146        };
147        thread::spawn(move || dispatcher.run());
148
149        // create the TCP socket and start its handler thread
150        let tcp_sock = TcpListener::bind(addr)?;
151        thread::spawn(move || Server::tcp_listener(tcp_sock));
152        Ok(())
153    }
154}
155
/// The dispatcher acts as a central piece connected to both modules and clients,
/// all via channels.
struct Dispatcher {
    /// Descriptive data sent in reply to `Describe` requests.  Its "modules"
    /// object is filled from the `Describing` messages sent by the modules.
    descriptive: Value,
    /// Reply channel of every registered handler, by handler ID.
    handlers: HashMap<HandlerId, RepSender>,
    /// For every module name, the handlers that get its `Update` events.
    active: HashMap<String, HashSet<HandlerId>>,
    /// Request channel of every module, by module name.
    modules: HashMap<String, ReqSender>,
    /// Incoming registrations of new handlers.
    connections: ConReceiver,
    /// Incoming requests from all handlers.
    requests: ReqReceiver,
    /// Incoming replies and events from all modules.
    replies: ModRepReceiver,
}
167
168impl Dispatcher {
169    fn send_back(&self, hid: HandlerId, msg: Msg) {
170        if let Some(chan) = self.handlers.get(&hid) {
171            let _ = chan.send(msg);
172        }
173    }
174
175    fn run(mut self) {
176        mlzlog::set_thread_prefix("Dispatcher: ");
177
178        // > 0 if a global activation is currently being processed.
179        let mut global_activate_remaining = 0;
180
181        loop {
182            select! {
183                recv(self.connections) -> res => if let Ok((hid, conn)) = res {
184                    debug!("got handler {}", hid);
185                    self.handlers.insert(hid, conn);
186                },
187                recv(self.requests) -> res => if let Ok((hid, req)) = res {
188                    debug!("got request {} -> {}", hid, req);
189                    match req.1 {
190                        Do { ref module, .. } |
191                        Change { ref module, .. } |
192                        Read { ref module, .. } => {
193                            // check if module exists
194                            if let Some(chan) = self.modules.get(module) {
195                                chan.send((hid, req)).unwrap();
196                            } else {
197                                self.send_back(hid, Error::no_module().into_msg(req.0));
198                            }
199                        }
200                        Activate { ref module } => {
201                            // The activate message requires an "update" of all parameters
202                            // to be sent before "active".  Other events should not be sent.
203                            // To do this, we send this on to the module / all modules.
204                            //
205                            // When all replies arrived, we trigger the Active message.
206                            if !module.is_empty() {
207                                // check if module exists, send message on to it
208                                if let Some(chan) = self.modules.get(module) {
209                                    chan.send((hid, req)).unwrap();
210                                } else {
211                                    self.send_back(hid, Error::no_module().into_msg(req.0));
212                                    continue;
213                                }
214                            } else {
215                                // this is a global activation
216                                if global_activate_remaining > 0 {
217                                    // only one can be inflight
218                                    self.send_back(hid, Error::protocol(
219                                        "already activating").into_msg(req.0));
220                                    continue;
221                                }
222                                // send this on to all modules - the "module" entry
223                                // (which is empty here) will be replicated in the
224                                // responding InitUpdates message
225                                for chan in self.modules.values() {
226                                    chan.send((hid, req.clone())).unwrap();
227                                }
228                                global_activate_remaining = self.modules.len();
229                            }
230                        }
231                        Deactivate { module } => {
232                            // Deactivation is done instantly, much easier than activation.
233                            if !module.is_empty() {
234                                // check if module exists
235                                if !self.modules.contains_key(&module) {
236                                    self.send_back(hid, Error::no_module().into_msg(req.0));
237                                    continue;
238                                }
239                                self.active.get_mut(&module).expect("always there").remove(&hid);
240                            } else {
241                                // remove handler as active from all modules
242                                for module in self.modules.keys() {
243                                    self.active.get_mut(module).expect("always there").remove(&hid);
244                                }
245                            }
246                            self.send_back(hid, Inactive { module });
247                        }
248                        Describe => {
249                            self.send_back(hid, Describing {
250                                id: ".".into(),
251                                structure: self.descriptive.clone()
252                            });
253                        }
254                        Quit => {
255                            // the handler has quit - also remove it from all active lists
256                            self.handlers.remove(&hid);
257                            for set in self.active.values_mut() {
258                                set.remove(&hid);
259                            }
260                        }
261                        _ => warn!("message should not arrive here: {}", req.1),
262                    }
263                },
264                recv(self.replies) -> res => if let Ok((hid, rep)) = res {
265                    match hid {
266                        None => match rep {
267                            // update of descriptive data, isn't sent on to clients
268                            // but cached here
269                            Describing { id, structure } => {
270                                let obj = self.descriptive["modules"].as_object_mut().expect("object");
271                                obj.insert(id, structure);
272                            }
273                            // event update from a module, check where to send it
274                            Update { ref module, .. } => {
275                                debug!("got {}", rep);
276                                for &hid in &self.active[module] {
277                                    self.send_back(hid, rep.clone());
278                                }
279                            }
280                            _ => ()
281                        },
282                        // specific reply from a module
283                        Some(hid) => match rep {
284                            InitUpdates { module, updates } => {
285                                for msg in updates {
286                                    self.send_back(hid, msg);
287                                }
288                                if !module.is_empty() {
289                                    self.send_back(hid, Active { module: module.clone() });
290                                    self.active.get_mut(&module).expect("always there").insert(hid);
291                                } else {
292                                    global_activate_remaining -= 1;
293                                    if global_activate_remaining == 0 {
294                                        self.send_back(hid, Active { module: "".into() });
295                                        for set in self.active.values_mut() {
296                                            set.insert(hid);
297                                        }
298                                    }
299                                }
300                            }
301                            _ => {
302                                debug!("got reply {} for {}", rep, hid);
303                                self.send_back(hid, rep)
304                            }
305                        }
306                    }
307                }
308            }
309        }
310    }
311}
312
/// The Handler represents a single client connection, both the read and
/// write halves.
///
/// The write half is in its own thread to be able to send back replies (which
/// can come both from the Handler and the Dispatcher) instantly.
pub struct Handler {
    /// Client socket, used here for reading; a clone of it is given to the
    /// sender thread for writing.
    client: TcpStream,
    /// Assigned handler ID.
    hid: HandlerId,
    /// Sender for incoming requests, to the dispatcher.
    req_sender: ReqSender,
    /// Sender for outgoing replies, to the sender thread.
    rep_sender: RepSender,
}
327
328impl Handler {
329    pub fn new(hid: HandlerId, client: TcpStream, addr: SocketAddr, req_sender: ReqSender,
330               rep_sender: RepSender, rep_receiver: RepReceiver) -> Handler {
331        // spawn a thread that handles sending replies and events back
332        let send_client = client.try_clone().expect("could not clone socket");
333        let thread_name = addr.to_string();
334        thread::spawn(move || Handler::sender(&thread_name, send_client, rep_receiver));
335        mlzlog::set_thread_prefix(format!("[{}] ", addr));
336        Handler { hid, client, req_sender, rep_sender }
337    }
338
339    /// Thread that sends back replies and events to the client.
340    fn sender(name: &str, client: TcpStream, rep_receiver: RepReceiver) {
341        mlzlog::set_thread_prefix(format!("[{}] ", name));
342        let mut client = std::io::BufWriter::new(client);
343        for to_send in rep_receiver {
344            if let Err(err) = write!(client, "{}\n", to_send) {
345                warn!("write error in sender: {}", err);
346                break;
347            }
348            let _ = client.flush();
349        }
350        info!("sender quit");
351    }
352
353    /// Send a message back to the client.
354    fn send_back(&self, msg: Msg) {
355        self.rep_sender.send(msg).expect("sending to client failed");
356    }
357
358    /// Handle an incoming correctly-parsed message.
359    fn handle_msg(&self, msg: IncomingMsg) {
360        match msg.1 {
361            // most messages must go through the dispatcher to a module
362            Change { .. } | Do { .. } | Read { .. } | Describe |
363            Activate { .. } | Deactivate { .. } => {
364                self.req_sender.send((self.hid, msg)).unwrap();
365            }
366            // but a few of them we can respond to from here
367            Ping { token } => {
368                let data = json!([null, {"t": localtime()}]);
369                self.send_back(Pong { token, data });
370            }
371            Idn => {
372                self.send_back(IdnReply { encoded: IDENT_REPLY.into() });
373            }
374            _ => {
375                warn!("message {:?} not handled yet", msg.1);
376            }
377        }
378    }
379
380    /// Process a single line (message).
381    fn process(&self, line: String) {
382        match Msg::parse(line) {
383            Ok(msg) => {
384                debug!("processing {}", msg);
385                self.handle_msg(msg);
386            }
387            Err(msg) => {
388                // error while parsing: msg will be an ErrorRep
389                warn!("failed to parse line: {}", msg);
390                self.send_back(msg);
391            }
392        }
393    }
394
395    /// Handle incoming stream of messages.
396    pub fn handle(mut self) {
397        let mut buf = Vec::with_capacity(RECVBUF_LEN);
398        let mut recvbuf = [0u8; RECVBUF_LEN];
399
400        loop {
401            // read a chunk of incoming data
402            let got = match self.client.read(&mut recvbuf) {
403                Err(err) => {
404                    warn!("error in recv, closing connection: {}", err);
405                    break;
406                },
407                Ok(0)    => break,  // no data from blocking read...
408                Ok(got)  => got,
409            };
410            // convert to string and add to our buffer
411            buf.extend_from_slice(&recvbuf[..got]);
412            // process all whole lines we got
413            let mut from = 0;
414            while let Some(to) = memchr(b'\n', &buf[from..]) {
415                let line_str = String::from_utf8_lossy(&buf[from..from+to]);
416                let line_str = line_str.trim_end_matches('\r');
417                self.process(line_str.to_owned());
418                from += to + 1;
419            }
420            buf.drain(..from);
421            // limit the incoming request length
422            if buf.len() > MAX_MSG_LEN {
423                warn!("hit request length limit, closing connection");
424                break;
425            }
426        }
427        self.req_sender.send((self.hid, IncomingMsg::bare(Quit))).unwrap();
428        info!("handler is finished");
429    }
430}