1use std::net::TcpStream;
26use std::time::Duration;
27use url::Url;
28use crossbeam_channel::unbounded;
29use serde_json::Value;
30
31use crate::errors::{Error, Result};
32use crate::server::{CON_SENDER, REQ_SENDER, next_handler_id,
33 HandlerId, ReqSender, RepReceiver};
34use crate::proto::{IncomingMsg, Msg};
35
36
37pub enum Client {
38 Local(LocalClient),
39 Remote(RemoteClient),
40}
41
42impl Client {
43 pub fn new(addr: &str) -> Result<Self> {
44 let baseurl = Url::parse("local://").expect("valid URL");
45 match Url::options().base_url(Some(&baseurl)).parse(addr) {
46 Err(e) => panic!("{}", e),
47 Ok(uri) => match uri.scheme() {
48 "local" => {
49 let loc = LocalClient::new(&uri.path()[1..]).ok_or_else(
50 || Error::comm_failed("no local server running"))?;
51 Ok(Client::Local(loc))
52 }
53 "secop" => {
54 let host = uri.domain().unwrap_or("localhost");
55 let port = uri.port().unwrap_or(10767);
56 let modname = uri.path()[1..].to_owned();
57 RemoteClient::new(host, port, modname).map(Client::Remote)
58 }
59 s => {
60 Err(Error::bad_value(format!("invalid URI scheme: {}", s)))
61 }
62 }
63 }
64 }
65
66 pub fn ping(&self) -> Result<()> {
67 match self {
68 Client::Local(c) => c.ping(),
69 Client::Remote(c) => c.ping()
70 }
71 }
72
73 pub fn read(&self, param: &str) -> Result<Value> {
74 match self {
75 Client::Local(c) => c.read(param),
76 Client::Remote(c) => c.read(param)
77 }
78 }
79
80 pub fn change(&self, param: &str, value: Value) -> Result<Value> {
81 match self {
82 Client::Local(c) => c.change(param, value),
83 Client::Remote(c) => c.change(param, value)
84 }
85 }
86
87 pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
88 match self {
89 Client::Local(c) => c.command(cmd, arg),
90 Client::Remote(c) => c.command(cmd, arg)
91 }
92 }
93}
94
95
96pub struct LocalClient {
98 hid: HandlerId,
99 modname: String,
100 timeout: Duration,
101 req_sender: ReqSender,
102 rep_receiver: RepReceiver,
103}
104
105impl Drop for LocalClient {
106 fn drop(&mut self) {
107 let _ = self.req_sender.send((self.hid, IncomingMsg::bare(Msg::Quit)));
108 }
109}
110
111impl LocalClient {
112 pub fn new(modname: impl Into<String>) -> Option<Self> {
115 let timeout = Duration::from_secs(2); let hid = next_handler_id();
117 let con_sender = CON_SENDER.lock().clone()?;
118 let req_sender = REQ_SENDER.lock().clone()?;
119 let (rep_sender, rep_receiver) = unbounded();
120 con_sender.send((hid, rep_sender.clone())).unwrap();
121 Some(Self { hid, modname: modname.into(), timeout, req_sender, rep_receiver })
122 }
123
124 fn transact(&self, msg: Msg) -> Result<Msg> {
125 self.req_sender.send((self.hid, IncomingMsg::bare(msg))).unwrap();
126 match self.rep_receiver.recv_timeout(self.timeout) {
127 Err(_) => Err(Error::comm_failed("local module timed out")),
128 Ok(msg) => Ok(msg)
129 }
130 }
131
132 pub fn ping(&self) -> Result<()> {
133 match self.transact(Msg::Ping { token: self.modname.clone() })? {
134 Msg::Pong { ref token, .. } if token == &self.modname => Ok(()),
135 msg => Err(Error::protocol(format!("invalid reply message for ping: {}", msg)))
136 }
137 }
138
139 pub fn read(&self, param: &str) -> Result<Value> {
140 let req = Msg::Read { module: self.modname.clone(), param: param.into() };
141 match self.transact(req)? {
142 Msg::Update { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for read: {}", msg)))
144 }
145 }
146
147 pub fn change(&self, param: &str, value: Value) -> Result<Value> {
148 let req = Msg::Change { module: self.modname.clone(), param: param.into(), value };
149 match self.transact(req)? {
150 Msg::Changed { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for change: {}", msg)))
152 }
153 }
154
155 pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
156 let req = Msg::Do { module: self.modname.clone(), command: cmd.into(), arg: arg };
157 match self.transact(req)? {
158 Msg::Done { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for do: {}", msg)))
160 }
161 }
162}
163
164pub struct RemoteClient {
166 _conn: TcpStream,
167 modname: String,
168}
169
170impl RemoteClient {
171 pub fn new(_host: &str, _port: u16, _modname: String) -> Result<Self> {
172 Err(Error::config("remote client connection not yet implemented"))
173 }
174
175 fn transact(&self, _msg: Msg) -> Result<Msg> {
176 unimplemented!()
177 }
178
179 pub fn ping(&self) -> Result<()> {
180 match self.transact(Msg::Ping { token: self.modname.clone() })? {
181 Msg::Pong { ref token, .. } if token == &self.modname => Ok(()),
182 msg => Err(Error::protocol(format!("invalid reply message for ping: {}", msg)))
183 }
184 }
185
186 pub fn read(&self, param: &str) -> Result<Value> {
187 let req = Msg::Read { module: self.modname.clone(), param: param.into() };
188 match self.transact(req)? {
189 Msg::Update { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for read: {}", msg)))
191 }
192 }
193
194 pub fn change(&self, param: &str, value: Value) -> Result<Value> {
195 let req = Msg::Change { module: self.modname.clone(), param: param.into(), value };
196 match self.transact(req)? {
197 Msg::Changed { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for change: {}", msg)))
199 }
200 }
201
202 pub fn command(&self, cmd: &str, arg: Value) -> Result<Value> {
203 let req = Msg::Do { module: self.modname.clone(), command: cmd.into(), arg: arg };
204 match self.transact(req)? {
205 Msg::Done { data, .. } => Ok(data), msg => Err(Error::protocol(format!("invalid reply message for do: {}", msg)))
207 }
208 }
209}