1use std::collections::HashMap;
2use std::os::unix::net::UnixStream as StdUnixStream;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::channel::mpsc;
7use futures::prelude::*;
8use futures::{SinkExt, StreamExt};
9
10use futures_codec::{Framed, JsonCodec};
11
12use async_std::future::timeout;
13use async_std::os::unix::net::UnixStream;
14use async_std::task::{self, JoinHandle};
15
16use humantime::Duration as HumanDuration;
17use structopt::StructOpt;
18
19use tracing::{span, Level};
20
21use dsf_rpc::*;
22use dsf_rpc::{Request as RpcRequest, Response as RpcResponse};
23
24use dsf_core::api::*;
25
26use crate::error::Error;
27
28type RequestMap = Arc<Mutex<HashMap<u64, mpsc::Sender<ResponseKind>>>>;
29
30#[derive(Clone, Debug, StructOpt)]
32pub struct Options {
33 #[structopt(
34 short = "d",
35 long = "daemon-socket",
36 default_value = "/var/run/dsfd/dsf.sock",
37 env = "DSF_SOCK"
38 )]
39 pub daemon_socket: String,
41
42 #[structopt(long, default_value = "10s")]
43 pub timeout: HumanDuration,
44}
45
46impl Options {
47 pub fn new(address: &str, timeout: Duration) -> Self {
48 Self {
49 daemon_socket: address.to_string(),
50 timeout: timeout.into(),
51 }
52 }
53}
54
55#[derive(Debug)]
56pub struct Client {
57 addr: String,
58 sink: mpsc::Sender<RpcRequest>,
59 requests: RequestMap,
60
61 timeout: Duration,
62
63 tx_handle: JoinHandle<()>,
64 rx_handle: JoinHandle<()>,
65}
66
67impl Client {
68 pub fn new(options: &Options) -> Result<Self, Error> {
70 let span = span!(Level::DEBUG, "client", "{}", options.daemon_socket);
71 let _enter = span.enter();
72
73 info!("Client connecting (address: {})", options.daemon_socket);
74
75 let stream = StdUnixStream::connect(&options.daemon_socket)?;
77 let stream = UnixStream::from(stream);
78
79 let codec = JsonCodec::<RpcRequest, RpcResponse>::new();
81 let framed = Framed::new(stream, codec);
82 let (mut unix_sink, mut unix_stream) = framed.split();
83
84 let (internal_sink, mut internal_stream) = mpsc::channel::<RpcRequest>(0);
86 let tx_handle = task::spawn(async move {
87 trace!("started client tx listener");
88 while let Some(msg) = internal_stream.next().await {
89 unix_sink.send(msg).await.unwrap();
90 }
91 });
92
93 let requests = Arc::new(Mutex::new(HashMap::new()));
95 let reqs = requests.clone();
96 let rx_handle = task::spawn(async move {
97 trace!("started client rx listener");
98 while let Some(Ok(resp)) = unix_stream.next().await {
99 Self::handle(&reqs, resp).await.unwrap();
100 }
101 });
102
103 Ok(Client {
104 sink: internal_sink,
105 addr: options.daemon_socket.to_owned(),
106 requests,
107 timeout: *options.timeout,
108 rx_handle,
109 tx_handle,
110 })
111 }
112
113 pub async fn request(&mut self, rk: RequestKind) -> Result<ResponseKind, Error> {
116 let span = span!(Level::DEBUG, "client", "{}", self.addr);
117 let _enter = span.enter();
118
119 debug!("Issuing request: {:?}", rk);
120
121 let resp = self.do_request(rk).await.map(|(v, _)| v)?;
122
123 debug!("Received response: {:?}", resp);
124
125 Ok(resp)
126 }
127
128 async fn do_request(
130 &mut self,
131 rk: RequestKind,
132 ) -> Result<(ResponseKind, mpsc::Receiver<ResponseKind>), Error> {
133 let (tx, mut rx) = mpsc::channel(0);
134 let req = RpcRequest::new(rk);
135 let id = req.req_id();
136
137 trace!("request add lock");
139 self.requests.lock().unwrap().insert(id, tx);
140
141 self.sink.send(req).await.unwrap();
143
144 let res = timeout(self.timeout, rx.next()).await;
146
147 let res = match res {
149 Ok(Some(v)) => Ok(v),
150 Ok(None) => {
152 error!("No response received");
153 Err(Error::None(()))
154 }
155 Err(e) => {
156 error!("Response error: {:?}", e);
157 Err(Error::Timeout)
158 }
159 };
160
161 if let Err(_e) = &res {
163 trace!("request failure lock");
164 self.requests.lock().unwrap().remove(&id);
165 }
166
167 res.map(|v| (v, rx))
168 }
169
170 async fn handle(requests: &RequestMap, resp: RpcResponse) -> Result<(), Error> {
172 let id = resp.req_id();
174
175 trace!("receive request lock");
176 let mut a = match requests.lock().unwrap().get_mut(&id) {
177 Some(a) => a.clone(),
178 None => {
179 error!("Unix RX with no matching request ID");
180 return Err(Error::Unknown);
181 }
182 };
183
184 match a.send(resp.kind()).await {
186 Ok(_) => (),
187 Err(e) => {
188 error!("client send error: {:?}", e);
189 }
190 };
191
192 Ok(())
193 }
194
195 pub async fn status(&mut self) -> Result<StatusInfo, Error> {
197 let req = RequestKind::Status;
198 let resp = self.request(req).await?;
199
200 match resp {
201 ResponseKind::Status(info) => Ok(info),
202 _ => Err(Error::UnrecognizedResult),
203 }
204 }
205
206 pub async fn connect(
208 &mut self,
209 options: peer::ConnectOptions,
210 ) -> Result<peer::ConnectInfo, Error> {
211 let req = RequestKind::Peer(peer::PeerCommands::Connect(options));
212 let resp = self.request(req).await?;
213
214 match resp {
215 ResponseKind::Connected(info) => Ok(info),
216 _ => Err(Error::UnrecognizedResult),
217 }
218 }
219
220 pub async fn find(&mut self, options: peer::SearchOptions) -> Result<peer::PeerInfo, Error> {
222 let req = RequestKind::Peer(peer::PeerCommands::Search(options));
223 let resp = self.request(req).await?;
224
225 match resp {
226 ResponseKind::Peer(info) => Ok(info.clone()),
227 _ => Err(Error::UnrecognizedResult),
228 }
229 }
230
231 pub async fn list(
233 &mut self,
234 options: service::ListOptions,
235 ) -> Result<Vec<service::ServiceInfo>, Error> {
236 let req = RequestKind::Service(service::ServiceCommands::List(options));
237 let resp = self.request(req).await?;
238
239 match resp {
240 ResponseKind::Services(info) => Ok(info),
241 _ => Err(Error::UnrecognizedResult),
242 }
243 }
244
245 pub async fn info(
247 &mut self,
248 options: service::InfoOptions,
249 ) -> Result<(ServiceHandle, ServiceInfo), Error> {
250 let req = RequestKind::Service(service::ServiceCommands::Info(options));
251 let resp = self.request(req).await?;
252
253 match resp {
254 ResponseKind::Service(info) => Ok((ServiceHandle::new(info.id.clone()), info)),
255 _ => Err(Error::UnrecognizedResult),
256 }
257 }
258
259 pub async fn create(
262 &mut self,
263 options: service::CreateOptions,
264 ) -> Result<ServiceHandle, Error> {
265 let req = RequestKind::Service(service::ServiceCommands::Create(options));
266 let resp = self.request(req).await?;
267
268 match resp {
269 ResponseKind::Service(info) => Ok(ServiceHandle::new(info.id.clone())),
270 _ => Err(Error::UnrecognizedResult),
271 }
272 }
273
274 pub async fn register(
276 &mut self,
277 options: RegisterOptions,
278 ) -> Result<dsf_rpc::service::RegisterInfo, Error> {
279 let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Register(options));
280 let resp = self.request(req).await?;
281
282 match resp {
283 ResponseKind::Registered(info) => Ok(info),
284 _ => Err(Error::UnrecognizedResult),
285 }
286 }
287
288 pub async fn locate(
291 &mut self,
292 options: LocateOptions,
293 ) -> Result<(ServiceHandle, LocateInfo), Error> {
294 let id = options.id.clone();
295 let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Locate(options));
296
297 let resp = self.request(req).await?;
298
299 match resp {
300 ResponseKind::Located(info) => {
301 let handle = ServiceHandle { id: id.clone() };
302 Ok((handle, info))
303 }
304 _ => Err(Error::UnrecognizedResult),
305 }
306 }
307
308 pub async fn publish(&mut self, options: PublishOptions) -> Result<PublishInfo, Error> {
310 let req = RequestKind::Data(DataCommands::Publish(options));
311
312 let resp = self.request(req).await?;
313
314 match resp {
315 ResponseKind::Published(info) => Ok(info),
316 _ => Err(Error::UnrecognizedResult),
317 }
318 }
319
320 pub async fn subscribe(
322 &mut self,
323 options: SubscribeOptions,
324 ) -> Result<impl Stream<Item = ResponseKind>, Error> {
325 let req = RequestKind::Service(ServiceCommands::Subscribe(options));
326
327 let (resp, rx) = self.do_request(req).await?;
328
329 match resp {
330 ResponseKind::Subscribed(_info) => Ok(rx),
331 _ => Err(Error::UnrecognizedResult),
332 }
333 }
334
335 pub async fn data(&mut self, options: data::ListOptions) -> Result<Vec<DataInfo>, Error> {
337 let req = RequestKind::Data(DataCommands::List(options));
338
339 let resp = self.request(req).await?;
340
341 match resp {
342 ResponseKind::Data(info) => Ok(info),
343 _ => Err(Error::UnrecognizedResult),
344 }
345 }
346}