secop_core/
client.rs

1// -----------------------------------------------------------------------------
2// Rust SECoP playground
3//
4// This program is free software; you can redistribute it and/or modify it under
5// the terms of the GNU General Public License as published by the Free Software
6// Foundation; either version 2 of the License, or (at your option) any later
7// version.
8//
9// This program is distributed in the hope that it will be useful, but WITHOUT
10// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11// FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
12// details.
13//
14// You should have received a copy of the GNU General Public License along with
15// this program; if not, write to the Free Software Foundation, Inc.,
16// 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17//
18// Module authors:
19//   Georg Brandl <g.brandl@fz-juelich.de>
20//
21// -----------------------------------------------------------------------------
22//
23//! A client for use with internal and external modules.
24
25use std::net::TcpStream;
26use std::time::Duration;
27use url::Url;
28use crossbeam_channel::unbounded;
29use serde_json::Value;
30
31use crate::errors::{Error, Result};
32use crate::server::{CON_SENDER, REQ_SENDER, next_handler_id,
33                    HandlerId, ReqSender, RepReceiver};
34use crate::proto::{IncomingMsg, Msg};
35
36
/// A client for a single module, which is either served inside this process
/// or lives on a remote SEC node.
///
/// Use [`Client::new`] to select the variant from an address string.
pub enum Client {
    /// Client that loops back requests to a module in this process.
    Local(LocalClient),
    /// Client that accesses a module in some remote SEC node.
    Remote(RemoteClient),
}
41
42impl Client {
43    pub fn new(addr: &str) -> Result<Self> {
44        let baseurl = Url::parse("local://").expect("valid URL");
45        match Url::options().base_url(Some(&baseurl)).parse(addr) {
46            Err(e) => panic!("{}", e),
47            Ok(uri) => match uri.scheme() {
48                "local" => {
49                    let loc = LocalClient::new(&uri.path()[1..]).ok_or_else(
50                        || Error::comm_failed("no local server running"))?;
51                    Ok(Client::Local(loc))
52                }
53                "secop" => {
54                    let host = uri.domain().unwrap_or("localhost");
55                    let port = uri.port().unwrap_or(10767);
56                    let modname = uri.path()[1..].to_owned();
57                    RemoteClient::new(host, port, modname).map(Client::Remote)
58                }
59                s => {
60                    Err(Error::bad_value(format!("invalid URI scheme: {}", s)))
61                }
62            }
63        }
64    }
65
66    pub fn ping(&self) -> Result<()> {
67        match self {
68            Client::Local(c) => c.ping(),
69            Client::Remote(c) => c.ping()
70        }
71    }
72
73    pub fn read(&self, param: &str) -> Result<Value> {
74        match self {
75            Client::Local(c) => c.read(param),
76            Client::Remote(c) => c.read(param)
77        }
78    }
79
80    pub fn change(&self, param: &str, value: Value) -> Result<Value> {
81        match self {
82            Client::Local(c) => c.change(param, value),
83            Client::Remote(c) => c.change(param, value)
84        }
85    }
86
87    pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
88        match self {
89            Client::Local(c) => c.command(cmd, arg),
90            Client::Remote(c) => c.command(cmd, arg)
91        }
92    }
93}
94
95
/// Client that loops back requests to a module in this process.
///
/// When the client is dropped, a `Quit` message is sent for its handler ID
/// on the request channel.
pub struct LocalClient {
    /// Handler ID that accompanies every request sent by this client.
    hid: HandlerId,
    /// Name of the module that requests are addressed to; also used as the
    /// token for pings.
    modname: String,
    /// Maximum time to wait for the reply to a request.
    timeout: Duration,
    /// Sending side of the channel on which requests are submitted.
    req_sender: ReqSender,
    /// Receiving side of the channel on which replies arrive.
    rep_receiver: RepReceiver,
}
104
105impl Drop for LocalClient {
106    fn drop(&mut self) {
107        let _ = self.req_sender.send((self.hid, IncomingMsg::bare(Msg::Quit)));
108    }
109}
110
111impl LocalClient {
112    /// Return a new local client connecting to the given module.  None is
113    /// returned if no local server is running.
114    pub fn new(modname: impl Into<String>) -> Option<Self> {
115        let timeout = Duration::from_secs(2); // TODO configurable
116        let hid = next_handler_id();
117        let con_sender = CON_SENDER.lock().clone()?;
118        let req_sender = REQ_SENDER.lock().clone()?;
119        let (rep_sender, rep_receiver) = unbounded();
120        con_sender.send((hid, rep_sender.clone())).unwrap();
121        Some(Self { hid, modname: modname.into(), timeout, req_sender, rep_receiver })
122    }
123
124    fn transact(&self, msg: Msg) -> Result<Msg> {
125        self.req_sender.send((self.hid, IncomingMsg::bare(msg))).unwrap();
126        match self.rep_receiver.recv_timeout(self.timeout) {
127            Err(_)  => Err(Error::comm_failed("local module timed out")),
128            Ok(msg) => Ok(msg)
129        }
130    }
131
132    pub fn ping(&self) -> Result<()> {
133        match self.transact(Msg::Ping { token: self.modname.clone() })? {
134            Msg::Pong { ref token, .. } if token == &self.modname => Ok(()),
135            msg => Err(Error::protocol(format!("invalid reply message for ping: {}", msg)))
136        }
137    }
138
139    pub fn read(&self, param: &str) -> Result<Value> {
140        let req = Msg::Read { module: self.modname.clone(), param: param.into() };
141        match self.transact(req)? {
142            Msg::Update { data, .. } => Ok(data), // TODO extract value from report
143            msg => Err(Error::protocol(format!("invalid reply message for read: {}", msg)))
144        }
145    }
146
147    pub fn change(&self, param: &str, value: Value) -> Result<Value> {
148        let req = Msg::Change { module: self.modname.clone(), param: param.into(), value };
149        match self.transact(req)? {
150            Msg::Changed { data, .. } => Ok(data), // TODO extract value from report
151            msg => Err(Error::protocol(format!("invalid reply message for change: {}", msg)))
152        }
153    }
154
155    pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
156        let req = Msg::Do { module: self.modname.clone(), command: cmd.into(), arg: arg };
157        match self.transact(req)? {
158            Msg::Done { data, .. } => Ok(data), // TODO extract value from report
159            msg => Err(Error::protocol(format!("invalid reply message for do: {}", msg)))
160        }
161    }
162}
163
/// Client that accesses a module in some remote SEC node.
///
/// NOTE: connecting is not implemented yet; `RemoteClient::new` always
/// returns an error, so no instance can currently be created.
pub struct RemoteClient {
    /// Presumably the connection to the remote node; not used by any code
    /// in this file so far (hence the leading underscore).
    _conn: TcpStream,
    /// Name of the module that requests are addressed to; also used as the
    /// token for pings.
    modname: String,
}
169
170impl RemoteClient {
171    pub fn new(_host: &str, _port: u16, _modname: String) -> Result<Self> {
172        Err(Error::config("remote client connection not yet implemented"))
173    }
174
175    fn transact(&self, _msg: Msg) -> Result<Msg> {
176        unimplemented!()
177    }
178
179    pub fn ping(&self) -> Result<()> {
180        match self.transact(Msg::Ping { token: self.modname.clone() })? {
181            Msg::Pong { ref token, .. } if token == &self.modname => Ok(()),
182            msg => Err(Error::protocol(format!("invalid reply message for ping: {}", msg)))
183        }
184    }
185
186    pub fn read(&self, param: &str) -> Result<Value> {
187        let req = Msg::Read { module: self.modname.clone(), param: param.into() };
188        match self.transact(req)? {
189            Msg::Update { data, .. } => Ok(data), // TODO extract value from report
190            msg => Err(Error::protocol(format!("invalid reply message for read: {}", msg)))
191        }
192    }
193
194    pub fn change(&self, param: &str, value: Value) -> Result<Value> {
195        let req = Msg::Change { module: self.modname.clone(), param: param.into(), value };
196        match self.transact(req)? {
197            Msg::Changed { data, .. } => Ok(data), // TODO extract value from report
198            msg => Err(Error::protocol(format!("invalid reply message for change: {}", msg)))
199        }
200    }
201
202    pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
203        let req = Msg::Do { module: self.modname.clone(), command: cmd.into(), arg: arg };
204        match self.transact(req)? {
205            Msg::Done { data, .. } => Ok(data), // TODO extract value from report
206            msg => Err(Error::protocol(format!("invalid reply message for do: {}", msg)))
207        }
208    }
209}