secop-core 0.1.3

A Rust framework for a hardware server speaking the SECoP protocol (core library)
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
// -----------------------------------------------------------------------------
// Rust SECoP playground
//
// This program is free software; you can redistribute it and/or modify it under
// the terms of the GNU General Public License as published by the Free Software
// Foundation; either version 2 of the License, or (at your option) any later
// version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
// FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
// details.
//
// You should have received a copy of the GNU General Public License along with
// this program; if not, write to the Free Software Foundation, Inc.,
// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
//
// Module authors:
//   Georg Brandl <g.brandl@fz-juelich.de>
//
// -----------------------------------------------------------------------------
//
//! This module contains the server instance itself, and associated objects to
//! handle connections and message routing.

use std::error::Error as StdError;
use std::io::{Read as IoRead, Write as IoWrite};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::num::NonZeroU64;
use std::time::Duration;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use log::*;
use memchr::memchr;
use derive_new::new;
use hashbrown::{HashMap, HashSet};
use crossbeam_channel::{unbounded, Sender, Receiver, select, tick};
use serde_json::{Value, json};
use mlzutil::time::localtime;
use parking_lot::{const_mutex, Mutex};

use crate::config::ServerConfig;
use crate::errors::Error;
use crate::module::ModInternals;
use crate::proto::{IncomingMsg, Msg, Msg::*, IDENT_REPLY};

/// Size in bytes of the fixed buffer a handler passes to each `read` call on
/// its client socket; also the initial capacity of its line buffer.
pub const RECVBUF_LEN: usize = 4096;
/// Maximum number of bytes a handler keeps buffered without having seen a
/// complete line.  When the buffer grows beyond this, the connection is closed.
pub const MAX_MSG_LEN: usize = 1024*1024;

/// Handler ID.  This is nonzero so that Option<HandlerId> is the same size.
pub type HandlerId = NonZeroU64;

/// The server instance.  It only carries the configuration and is consumed by
/// `Server::start`, which spawns the module, dispatcher and listener threads.
#[derive(new)]
pub struct Server {
    /// Server configuration; `start` takes the module table, the description
    /// and the equipment ID from it.
    config: ServerConfig,
}

// Aliases for all the common channel types.

/// Sending half used to announce a new handler to the dispatcher: the handler
/// ID together with the sender for replies to that handler's client.
pub type ConSender = Sender<(HandlerId, RepSender)>;
/// Receiving half of `ConSender`, held by the dispatcher.
pub type ConReceiver = Receiver<(HandlerId, RepSender)>;
/// Sending half for requests, tagged with the ID of the originating handler.
/// Used both from handlers to the dispatcher and from the dispatcher to the
/// individual modules.
pub type ReqSender = Sender<(HandlerId, IncomingMsg)>;
/// Receiving half of `ReqSender`.
pub type ReqReceiver = Receiver<(HandlerId, IncomingMsg)>;
/// Sending half for messages that are to be written to a single client.
pub type RepSender = Sender<Msg>;
/// Receiving half of `RepSender`, consumed by the handler's sender thread.
pub type RepReceiver = Receiver<Msg>;
/// Sending half for messages from modules to the dispatcher.  The handler ID
/// is `None` for messages that are not a reply to a specific handler
/// (descriptive data and update events).
pub type ModRepSender = Sender<(Option<HandlerId>, Msg)>;
/// Receiving half of `ModRepSender`, held by the dispatcher.
pub type ModRepReceiver = Receiver<(Option<HandlerId>, Msg)>;

/// Global sender for new connections to the server.
///
/// `None` until `Server::start` has created the dispatcher channels.
pub static CON_SENDER: Mutex<Option<ConSender>> = const_mutex(None);

/// Global sender for new requests to the dispatcher.
///
/// `None` until `Server::start` has created the dispatcher channels.
pub static REQ_SENDER: Mutex<Option<ReqSender>> = const_mutex(None);

/// Counter that hands out handler IDs.  It starts at 1 because a `HandlerId`
/// must be nonzero.
static NEXT_HID: AtomicUsize = AtomicUsize::new(1);

pub fn next_handler_id() -> HandlerId {
    NonZeroU64::new(NEXT_HID.fetch_add(1, Ordering::SeqCst) as u64).expect("is nonzero")
}

impl Server {
    /// Listen for connections on the TCP socket and spawn handlers for it.
    fn tcp_listener(tcp_sock: TcpListener) {
        mlzlog::set_thread_prefix("TCP: ");
        info!("listener started");
        let con_sender = CON_SENDER.lock().clone().expect("no server running?");
        while let Ok((stream, addr)) = tcp_sock.accept() {
            info!("[{}] new client connected", addr);
            // create the handler and start its main thread
            let new_req_sender = REQ_SENDER.lock().clone().expect("no server running?");
            let (rep_sender, rep_receiver) = unbounded();
            let disp_rep_sender = rep_sender.clone();
            let hid = next_handler_id();
            con_sender.send((hid, disp_rep_sender)).unwrap();
            thread::spawn(move || Handler::new(hid, stream, addr,
                                               new_req_sender, rep_sender, rep_receiver).handle());
        }
    }

    /// Main server function; start threads to accept clients on the listening
    /// socket, the dispatcher, and the individual modules.
    ///
    /// `addr` is the address the TCP listener binds to.  `mod_runner` is called
    /// once per configured module with that module's `ModInternals` and is
    /// expected to start the module.
    ///
    /// Returns an error if the socket cannot be bound or if `mod_runner` fails
    /// for a module.  On success the function returns immediately; the server
    /// keeps running in the spawned threads.
    pub fn start<F>(mut self, addr: &str, mod_runner: F) -> Result<(), Box<dyn StdError>>
        where F: Fn(ModInternals) -> Result<(), Box<dyn StdError>>
    {
        // Bind the socket first: if the address is not usable we want to fail
        // before any global state is set or any thread has been started.
        let tcp_sock = TcpListener::bind(addr)?;

        // create a few channels we need for the dispatcher:
        // sending info about incoming connections to the dispatcher
        let (con_sender, con_receiver) = unbounded();
        *CON_SENDER.lock() = Some(con_sender);
        // sending requests from all handlers to the dispatcher
        let (req_sender, req_receiver) = unbounded();
        *REQ_SENDER.lock() = Some(req_sender);
        // sending replies from all modules to the dispatcher
        let (rep_sender, rep_receiver) = unbounded();

        // create the modules
        let mut active_sets = HashMap::new();
        let mut mod_senders = HashMap::new();

        for (name, modcfg) in self.config.modules.drain() {
            // channel to send requests to the module
            let (mod_sender, mod_receiver) = unbounded();
            // replies go via a single one
            let mod_rep_sender = rep_sender.clone();
            let tickers = (tick(Duration::from_secs(1)), tick(Duration::from_secs(1)));
            let int = ModInternals::new(name.clone(), modcfg, mod_receiver, mod_rep_sender, tickers);
            active_sets.insert(name.clone(), HashSet::new());
            mod_senders.insert(name, mod_sender);
            mod_runner(int)?;
        }

        // the "modules" entry is filled in by the dispatcher as the modules
        // report their descriptive data
        let descriptive = json!({
            "description": self.config.description,
            "equipment_id": self.config.equipment_id,
            "firmware": concat!("secop-rs ", env!("CARGO_PKG_VERSION")),
            "modules": {}
        });

        // create the dispatcher
        let dispatcher = Dispatcher {
            descriptive,
            active: active_sets,
            handlers: HashMap::new(),
            modules: mod_senders,
            connections: con_receiver,
            requests: req_receiver,
            replies: rep_receiver,
        };
        thread::spawn(move || dispatcher.run());

        // start the handler thread for the TCP socket
        thread::spawn(move || Server::tcp_listener(tcp_sock));
        Ok(())
    }
}

/// The dispatcher acts as a central piece connected to both modules and clients,
/// all via channels.
struct Dispatcher {
    /// JSON structure sent to clients in reply to a describe request.  Its
    /// "modules" object is filled from the descriptive data the modules send.
    descriptive: Value,
    /// Reply channel for every connected handler, by handler ID.
    handlers: HashMap<HandlerId, RepSender>,
    /// For every module name, the handlers that have activated it and
    /// therefore receive its update events.
    active: HashMap<String, HashSet<HandlerId>>,
    /// Request channel for every module, by module name.
    modules: HashMap<String, ReqSender>,
    /// Incoming announcements of new handlers.
    connections: ConReceiver,
    /// Incoming requests from all handlers.
    requests: ReqReceiver,
    /// Incoming replies and events from all modules.
    replies: ModRepReceiver,
}

impl Dispatcher {
    /// Queue `msg` for the client of handler `hid`.
    ///
    /// Nothing happens if the handler is not (or no longer) registered; a
    /// failure to send on the handler's channel is ignored as well.
    fn send_back(&self, hid: HandlerId, msg: Msg) {
        let reply_chan = match self.handlers.get(&hid) {
            Some(reply_chan) => reply_chan,
            None => return,
        };
        let _ = reply_chan.send(msg);
    }

    fn run(mut self) {
        mlzlog::set_thread_prefix("Dispatcher: ");

        // > 0 if a global activation is currently being processed.
        let mut global_activate_remaining = 0;

        loop {
            select! {
                recv(self.connections) -> res => if let Ok((hid, conn)) = res {
                    debug!("got handler {}", hid);
                    self.handlers.insert(hid, conn);
                },
                recv(self.requests) -> res => if let Ok((hid, req)) = res {
                    debug!("got request {} -> {}", hid, req);
                    match req.1 {
                        Do { ref module, .. } |
                        Change { ref module, .. } |
                        Read { ref module, .. } => {
                            // check if module exists
                            if let Some(chan) = self.modules.get(module) {
                                chan.send((hid, req)).unwrap();
                            } else {
                                self.send_back(hid, Error::no_module().into_msg(req.0));
                            }
                        }
                        Activate { ref module } => {
                            // The activate message requires an "update" of all parameters
                            // to be sent before "active".  Other events should not be sent.
                            // To do this, we send this on to the module / all modules.
                            //
                            // When all replies arrived, we trigger the Active message.
                            if !module.is_empty() {
                                // check if module exists, send message on to it
                                if let Some(chan) = self.modules.get(module) {
                                    chan.send((hid, req)).unwrap();
                                } else {
                                    self.send_back(hid, Error::no_module().into_msg(req.0));
                                    continue;
                                }
                            } else {
                                // this is a global activation
                                if global_activate_remaining > 0 {
                                    // only one can be inflight
                                    self.send_back(hid, Error::protocol(
                                        "already activating").into_msg(req.0));
                                    continue;
                                }
                                // send this on to all modules - the "module" entry
                                // (which is empty here) will be replicated in the
                                // responding InitUpdates message
                                for chan in self.modules.values() {
                                    chan.send((hid, req.clone())).unwrap();
                                }
                                global_activate_remaining = self.modules.len();
                            }
                        }
                        Deactivate { module } => {
                            // Deactivation is done instantly, much easier than activation.
                            if !module.is_empty() {
                                // check if module exists
                                if !self.modules.contains_key(&module) {
                                    self.send_back(hid, Error::no_module().into_msg(req.0));
                                    continue;
                                }
                                self.active.get_mut(&module).expect("always there").remove(&hid);
                            } else {
                                // remove handler as active from all modules
                                for module in self.modules.keys() {
                                    self.active.get_mut(module).expect("always there").remove(&hid);
                                }
                            }
                            self.send_back(hid, Inactive { module });
                        }
                        Describe => {
                            self.send_back(hid, Describing {
                                id: ".".into(),
                                structure: self.descriptive.clone()
                            });
                        }
                        Quit => {
                            // the handler has quit - also remove it from all active lists
                            self.handlers.remove(&hid);
                            for set in self.active.values_mut() {
                                set.remove(&hid);
                            }
                        }
                        _ => warn!("message should not arrive here: {}", req.1),
                    }
                },
                recv(self.replies) -> res => if let Ok((hid, rep)) = res {
                    match hid {
                        None => match rep {
                            // update of descriptive data, isn't sent on to clients
                            // but cached here
                            Describing { id, structure } => {
                                let obj = self.descriptive["modules"].as_object_mut().expect("object");
                                obj.insert(id, structure);
                            }
                            // event update from a module, check where to send it
                            Update { ref module, .. } => {
                                debug!("got {}", rep);
                                for &hid in &self.active[module] {
                                    self.send_back(hid, rep.clone());
                                }
                            }
                            _ => ()
                        },
                        // specific reply from a module
                        Some(hid) => match rep {
                            InitUpdates { module, updates } => {
                                for msg in updates {
                                    self.send_back(hid, msg);
                                }
                                if !module.is_empty() {
                                    self.send_back(hid, Active { module: module.clone() });
                                    self.active.get_mut(&module).expect("always there").insert(hid);
                                } else {
                                    global_activate_remaining -= 1;
                                    if global_activate_remaining == 0 {
                                        self.send_back(hid, Active { module: "".into() });
                                        for set in self.active.values_mut() {
                                            set.insert(hid);
                                        }
                                    }
                                }
                            }
                            _ => {
                                debug!("got reply {} for {}", rep, hid);
                                self.send_back(hid, rep)
                            }
                        }
                    }
                }
            }
        }
    }
}

/// The Handler represents a single client connection, both the read and
/// write halves.
///
/// The write half is in its own thread to be able to send back replies (which
/// can come both from the Handler and the Dispatcher) instantly.
pub struct Handler {
    /// The client socket; incoming requests are read from it.  The sender
    /// thread works on its own clone of the socket.
    client: TcpStream,
    /// Assigned handler ID.
    hid: HandlerId,
    /// Sender for incoming requests, to the dispatcher.
    req_sender: ReqSender,
    /// Sender for outgoing replies, to the sender thread.
    rep_sender: RepSender,
}

impl Handler {
    /// Create the handler for a freshly accepted client.
    ///
    /// This also spawns the thread that writes everything arriving on
    /// `rep_receiver` to the client, and sets the log prefix of the calling
    /// thread to the client address.
    ///
    /// Panics if the socket cannot be cloned for the sender thread.
    pub fn new(hid: HandlerId, client: TcpStream, addr: SocketAddr, req_sender: ReqSender,
               rep_sender: RepSender, rep_receiver: RepReceiver) -> Handler {
        let peer_name = addr.to_string();
        // the writing half works on its own clone of the socket
        let write_half = client.try_clone().expect("could not clone socket");
        thread::spawn(move || Handler::sender(&peer_name, write_half, rep_receiver));
        mlzlog::set_thread_prefix(format!("[{}] ", addr));
        Handler { hid, client, req_sender, rep_sender }
    }

    /// Thread that sends back replies and events to the client.
    ///
    /// Every message received on `rep_receiver` is written as one line and
    /// flushed immediately.  The thread ends when all senders for the channel
    /// are gone, or on the first write or flush error.  Since the writes are
    /// buffered, a broken connection usually shows up on the flush, so it
    /// must not be ignored.
    ///
    /// On exit the socket is shut down, so that a handler still blocked in a
    /// read on the same connection notices that it is dead.
    fn sender(name: &str, client: TcpStream, rep_receiver: RepReceiver) {
        mlzlog::set_thread_prefix(format!("[{}] ", name));
        let mut client = std::io::BufWriter::new(client);
        for to_send in rep_receiver {
            let written = writeln!(client, "{}", to_send).and_then(|()| client.flush());
            if let Err(err) = written {
                warn!("write error in sender: {}", err);
                break;
            }
        }
        // best effort: the socket may already be closed by the peer
        let _ = client.get_ref().shutdown(std::net::Shutdown::Both);
        info!("sender quit");
    }

    /// Send a message back to the client.
    fn send_back(&self, msg: Msg) {
        self.rep_sender.send(msg).expect("sending to client failed");
    }

    /// Handle an incoming correctly-parsed message.
    ///
    /// Identification and ping requests are answered right here; requests
    /// that concern modules or the server description are passed on to the
    /// dispatcher together with our handler ID.  Anything else is only logged.
    fn handle_msg(&self, msg: IncomingMsg) {
        match msg.1 {
            // these can be answered without involving the dispatcher
            Idn => self.send_back(IdnReply { encoded: IDENT_REPLY.into() }),
            Ping { token } => {
                let data = json!([null, {"t": localtime()}]);
                self.send_back(Pong { token, data });
            }
            // everything else we know goes through the dispatcher
            Describe |
            Activate { .. } | Deactivate { .. } |
            Change { .. } | Do { .. } | Read { .. } => {
                self.req_sender.send((self.hid, msg)).unwrap();
            }
            _ => warn!("message {:?} not handled yet", msg.1),
        }
    }

    /// Process a single line (message).
    ///
    /// A line that parses is handed to `handle_msg`; otherwise the error
    /// reply produced by the parser is sent back to the client.
    fn process(&self, line: String) {
        let parsed = match Msg::parse(line) {
            Ok(parsed) => parsed,
            Err(error_reply) => {
                // error while parsing: the reply will be an ErrorRep
                warn!("failed to parse line: {}", error_reply);
                self.send_back(error_reply);
                return;
            }
        };
        debug!("processing {}", parsed);
        self.handle_msg(parsed);
    }

    /// Handle incoming stream of messages.
    ///
    /// Reads from the client until the connection is closed, a read error
    /// occurs, or more than `MAX_MSG_LEN` bytes are pending without a line
    /// end.  Every complete line (terminated by `\n`, trailing `\r` removed,
    /// invalid UTF-8 replaced) is passed to `process`.  At the end the
    /// dispatcher is told that this handler has quit.
    pub fn handle(mut self) {
        let mut buf = Vec::with_capacity(RECVBUF_LEN);
        let mut recvbuf = [0u8; RECVBUF_LEN];

        loop {
            // read a chunk of incoming data
            let got = match self.client.read(&mut recvbuf) {
                Err(err) => {
                    warn!("error in recv, closing connection: {}", err);
                    break;
                },
                Ok(0)    => break,  // no data from blocking read...
                Ok(got)  => got,
            };
            // What is left in `buf` from earlier reads is known to contain no
            // newline, so only the new bytes need to be searched.  This keeps
            // the work linear for a long line arriving in many small chunks.
            let mut search = buf.len();
            // add the raw bytes to our buffer
            buf.extend_from_slice(&recvbuf[..got]);
            // process all whole lines we got
            let mut from = 0;
            while let Some(pos) = memchr(b'\n', &buf[search..]) {
                let end = search + pos;
                let line_str = String::from_utf8_lossy(&buf[from..end]);
                let line_str = line_str.trim_end_matches('\r');
                self.process(line_str.to_owned());
                from = end + 1;
                search = from;
            }
            buf.drain(..from);
            // limit the incoming request length
            if buf.len() > MAX_MSG_LEN {
                warn!("hit request length limit, closing connection");
                break;
            }
        }
        // tell the dispatcher to forget about us; if it is gone there is
        // nothing left to clean up
        if self.req_sender.send((self.hid, IncomingMsg::bare(Quit))).is_err() {
            warn!("dispatcher is gone, could not announce disconnect");
        }
        info!("handler is finished");
    }
}