dsf_client/
client.rs

1use std::collections::HashMap;
2use std::os::unix::net::UnixStream as StdUnixStream;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::channel::mpsc;
7use futures::prelude::*;
8use futures::{SinkExt, StreamExt};
9
10use futures_codec::{Framed, JsonCodec};
11
12use async_std::future::timeout;
13use async_std::os::unix::net::UnixStream;
14use async_std::task::{self, JoinHandle};
15
16use humantime::Duration as HumanDuration;
17use structopt::StructOpt;
18
19use tracing::{span, Level};
20
21use dsf_rpc::*;
22use dsf_rpc::{Request as RpcRequest, Response as RpcResponse};
23
24use dsf_core::api::*;
25
26use crate::error::Error;
27
28type RequestMap = Arc<Mutex<HashMap<u64, mpsc::Sender<ResponseKind>>>>;
29
30/// Options for client instantiation
31#[derive(Clone, Debug, StructOpt)]
32pub struct Options {
33    #[structopt(
34        short = "d",
35        long = "daemon-socket",
36        default_value = "/var/run/dsfd/dsf.sock",
37        env = "DSF_SOCK"
38    )]
39    /// Specify the socket to bind the DSF daemon
40    pub daemon_socket: String,
41
42    #[structopt(long, default_value = "10s")]
43    pub timeout: HumanDuration,
44}
45
46impl Options {
47    pub fn new(address: &str, timeout: Duration) -> Self {
48        Self {
49            daemon_socket: address.to_string(),
50            timeout: timeout.into(),
51        }
52    }
53}
54
55#[derive(Debug)]
56pub struct Client {
57    addr: String,
58    sink: mpsc::Sender<RpcRequest>,
59    requests: RequestMap,
60
61    timeout: Duration,
62
63    tx_handle: JoinHandle<()>,
64    rx_handle: JoinHandle<()>,
65}
66
67impl Client {
68    /// Create a new client
69    pub fn new(options: &Options) -> Result<Self, Error> {
70        let span = span!(Level::DEBUG, "client", "{}", options.daemon_socket);
71        let _enter = span.enter();
72
73        info!("Client connecting (address: {})", options.daemon_socket);
74
75        // Connect to stream
76        let stream = StdUnixStream::connect(&options.daemon_socket)?;
77        let stream = UnixStream::from(stream);
78
79        // Build codec and split
80        let codec = JsonCodec::<RpcRequest, RpcResponse>::new();
81        let framed = Framed::new(stream, codec);
82        let (mut unix_sink, mut unix_stream) = framed.split();
83
84        // Create sending task
85        let (internal_sink, mut internal_stream) = mpsc::channel::<RpcRequest>(0);
86        let tx_handle = task::spawn(async move {
87            trace!("started client tx listener");
88            while let Some(msg) = internal_stream.next().await {
89                unix_sink.send(msg).await.unwrap();
90            }
91        });
92
93        // Create receiving task
94        let requests = Arc::new(Mutex::new(HashMap::new()));
95        let reqs = requests.clone();
96        let rx_handle = task::spawn(async move {
97            trace!("started client rx listener");
98            while let Some(Ok(resp)) = unix_stream.next().await {
99                Self::handle(&reqs, resp).await.unwrap();
100            }
101        });
102
103        Ok(Client {
104            sink: internal_sink,
105            addr: options.daemon_socket.to_owned(),
106            requests,
107            timeout: *options.timeout,
108            rx_handle,
109            tx_handle,
110        })
111    }
112
113    /// Issue a request to the daemon using a client instance, returning a response
114    // TODO: #[instrument] when futures 0.3 support is viable
115    pub async fn request(&mut self, rk: RequestKind) -> Result<ResponseKind, Error> {
116        let span = span!(Level::DEBUG, "client", "{}", self.addr);
117        let _enter = span.enter();
118
119        debug!("Issuing request: {:?}", rk);
120
121        let resp = self.do_request(rk).await.map(|(v, _)| v)?;
122
123        debug!("Received response: {:?}", resp);
124
125        Ok(resp)
126    }
127
128    // TODO: #[instrument]
129    async fn do_request(
130        &mut self,
131        rk: RequestKind,
132    ) -> Result<(ResponseKind, mpsc::Receiver<ResponseKind>), Error> {
133        let (tx, mut rx) = mpsc::channel(0);
134        let req = RpcRequest::new(rk);
135        let id = req.req_id();
136
137        // Add to tracking
138        trace!("request add lock");
139        self.requests.lock().unwrap().insert(id, tx);
140
141        // Send message
142        self.sink.send(req).await.unwrap();
143
144        // Await and return response
145        let res = timeout(self.timeout, rx.next()).await;
146
147        // TODO: Handle timeout errors
148        let res = match res {
149            Ok(Some(v)) => Ok(v),
150            // TODO: this seems like it should be a yeild / retry point..?
151            Ok(None) => {
152                error!("No response received");
153                Err(Error::None(()))
154            }
155            Err(e) => {
156                error!("Response error: {:?}", e);
157                Err(Error::Timeout)
158            }
159        };
160
161        // Remove request on failure
162        if let Err(_e) = &res {
163            trace!("request failure lock");
164            self.requests.lock().unwrap().remove(&id);
165        }
166
167        res.map(|v| (v, rx))
168    }
169
170    // Internal function to handle received messages
171    async fn handle(requests: &RequestMap, resp: RpcResponse) -> Result<(), Error> {
172        // Find matching sender
173        let id = resp.req_id();
174
175        trace!("receive request lock");
176        let mut a = match requests.lock().unwrap().get_mut(&id) {
177            Some(a) => a.clone(),
178            None => {
179                error!("Unix RX with no matching request ID");
180                return Err(Error::Unknown);
181            }
182        };
183
184        // Forward response
185        match a.send(resp.kind()).await {
186            Ok(_) => (),
187            Err(e) => {
188                error!("client send error: {:?}", e);
189            }
190        };
191
192        Ok(())
193    }
194
195    /// Fetch daemon status information
196    pub async fn status(&mut self) -> Result<StatusInfo, Error> {
197        let req = RequestKind::Status;
198        let resp = self.request(req).await?;
199
200        match resp {
201            ResponseKind::Status(info) => Ok(info),
202            _ => Err(Error::UnrecognizedResult),
203        }
204    }
205
206    /// Connect to another DSF instance
207    pub async fn connect(
208        &mut self,
209        options: peer::ConnectOptions,
210    ) -> Result<peer::ConnectInfo, Error> {
211        let req = RequestKind::Peer(peer::PeerCommands::Connect(options));
212        let resp = self.request(req).await?;
213
214        match resp {
215            ResponseKind::Connected(info) => Ok(info),
216            _ => Err(Error::UnrecognizedResult),
217        }
218    }
219
220    /// Search for a peer using the database
221    pub async fn find(&mut self, options: peer::SearchOptions) -> Result<peer::PeerInfo, Error> {
222        let req = RequestKind::Peer(peer::PeerCommands::Search(options));
223        let resp = self.request(req).await?;
224
225        match resp {
226            ResponseKind::Peer(info) => Ok(info.clone()),
227            _ => Err(Error::UnrecognizedResult),
228        }
229    }
230
231    /// List known services
232    pub async fn list(
233        &mut self,
234        options: service::ListOptions,
235    ) -> Result<Vec<service::ServiceInfo>, Error> {
236        let req = RequestKind::Service(service::ServiceCommands::List(options));
237        let resp = self.request(req).await?;
238
239        match resp {
240            ResponseKind::Services(info) => Ok(info),
241            _ => Err(Error::UnrecognizedResult),
242        }
243    }
244
245    /// Fetch information for a given service
246    pub async fn info(
247        &mut self,
248        options: service::InfoOptions,
249    ) -> Result<(ServiceHandle, ServiceInfo), Error> {
250        let req = RequestKind::Service(service::ServiceCommands::Info(options));
251        let resp = self.request(req).await?;
252
253        match resp {
254            ResponseKind::Service(info) => Ok((ServiceHandle::new(info.id.clone()), info)),
255            _ => Err(Error::UnrecognizedResult),
256        }
257    }
258
259    /// Create a new service with the provided options
260    /// This MUST be stored locally for reuse
261    pub async fn create(
262        &mut self,
263        options: service::CreateOptions,
264    ) -> Result<ServiceHandle, Error> {
265        let req = RequestKind::Service(service::ServiceCommands::Create(options));
266        let resp = self.request(req).await?;
267
268        match resp {
269            ResponseKind::Service(info) => Ok(ServiceHandle::new(info.id.clone())),
270            _ => Err(Error::UnrecognizedResult),
271        }
272    }
273
274    /// Register a service instance in the distributed database
275    pub async fn register(
276        &mut self,
277        options: RegisterOptions,
278    ) -> Result<dsf_rpc::service::RegisterInfo, Error> {
279        let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Register(options));
280        let resp = self.request(req).await?;
281
282        match resp {
283            ResponseKind::Registered(info) => Ok(info),
284            _ => Err(Error::UnrecognizedResult),
285        }
286    }
287
288    /// Locate a service instance in the distributed database
289    /// This returns a future that will resolve to the desired service or an error
290    pub async fn locate(
291        &mut self,
292        options: LocateOptions,
293    ) -> Result<(ServiceHandle, LocateInfo), Error> {
294        let id = options.id.clone();
295        let req = RequestKind::Service(dsf_rpc::service::ServiceCommands::Locate(options));
296
297        let resp = self.request(req).await?;
298
299        match resp {
300            ResponseKind::Located(info) => {
301                let handle = ServiceHandle { id: id.clone() };
302                Ok((handle, info))
303            }
304            _ => Err(Error::UnrecognizedResult),
305        }
306    }
307
308    /// Publish data using an existing service
309    pub async fn publish(&mut self, options: PublishOptions) -> Result<PublishInfo, Error> {
310        let req = RequestKind::Data(DataCommands::Publish(options));
311
312        let resp = self.request(req).await?;
313
314        match resp {
315            ResponseKind::Published(info) => Ok(info),
316            _ => Err(Error::UnrecognizedResult),
317        }
318    }
319
320    /// Subscribe to data from a given service
321    pub async fn subscribe(
322        &mut self,
323        options: SubscribeOptions,
324    ) -> Result<impl Stream<Item = ResponseKind>, Error> {
325        let req = RequestKind::Service(ServiceCommands::Subscribe(options));
326
327        let (resp, rx) = self.do_request(req).await?;
328
329        match resp {
330            ResponseKind::Subscribed(_info) => Ok(rx),
331            _ => Err(Error::UnrecognizedResult),
332        }
333    }
334
335    /// Fetch data from a given service
336    pub async fn data(&mut self, options: data::ListOptions) -> Result<Vec<DataInfo>, Error> {
337        let req = RequestKind::Data(DataCommands::List(options));
338
339        let resp = self.request(req).await?;
340
341        match resp {
342            ResponseKind::Data(info) => Ok(info),
343            _ => Err(Error::UnrecognizedResult),
344        }
345    }
346}