hrana_client/
client.rs

1use tokio::sync::{mpsc, oneshot};
2
3use crate::conn::{spawn_hrana_conn, ConnFut};
4use crate::error::{Error, Result};
5use crate::op::Op;
6use crate::Stream;
7
8#[derive(Clone)]
9pub struct Client {
10    conn_sender: mpsc::UnboundedSender<Op>,
11}
12
13impl Client {
14    /// Connects to the remote hrana server.
15    ///
16    /// Returns a `Client` handle, along with a `HranaConnFut` that drives the socket connection,
17    /// and needs to be awaited
18    ///
19    /// # Example:
20    ///
21    /// ```ignore
22    /// let (client, fut) = Client::connect("ws://localhost:8080", None).await?;
23    /// let handle = tokio::task::spawn(fut);
24    /// let stream = client.open_stream().await?;
25    /// // do things with stream...
26    ///
27    /// // collect errors
28    /// handle.await??
29    /// ```
30    pub async fn connect(url: &str, jwt: Option<String>) -> Result<(Self, ConnFut)> {
31        let (conn_sender, handle) = spawn_hrana_conn(url, jwt).await?;
32        Ok((Self { conn_sender }, handle))
33    }
34
35    /// Open a new stream on this client
36    pub async fn open_stream(&self) -> Result<Stream> {
37        let (ret, recv) = oneshot::channel();
38        let op = Op::OpenStream { ret };
39        self.conn_sender.send(op).map_err(|_| Error::Shutdown)?;
40        let stream = recv.await.map_err(|_| Error::Shutdown)?;
41
42        Ok(stream)
43    }
44
45    /// Client shutdown.
46    /// Causes all inflight request abort with a `Shutdown` error.
47    pub async fn shutdown(&self) -> Result<()> {
48        let (ret, recv) = oneshot::channel();
49        let op = Op::Shutdown { ret };
50        self.conn_sender.send(op).map_err(|_| Error::Shutdown)?;
51        recv.await.map_err(|_| Error::Shutdown)?;
52
53        Ok(())
54    }
55}