glimesh/conn/ws/
connection.rs

1use super::{
2    config::Config,
3    socket::{Socket, SocketClient},
4    WebsocketClient,
5};
6use crate::{
7    Auth, Client, GlimeshError, MutationConn, QueryConn, Subscription, SubscriptionConn,
8    WebsocketConnectionError,
9};
10use graphql_client::GraphQLQuery;
11use std::{fmt::Debug, time::Duration};
12
13/// Configure and build a websocket [`Connection`].
14///
15/// ## Usage:
16/// ```rust
17/// let connection = Connection::builder()
18///     .api_url("ws://localhost:8080/api/socket/websocket")
19///     .build();
20/// ```
21#[derive(Debug, Default)]
22pub struct ConnectionBuilder {
23    config: Config,
24}
25
26impl ConnectionBuilder {
27    /// Build the websocket connection from the set options and connect to the socket.
28    pub async fn connect(self, auth: Auth) -> Result<Connection, WebsocketConnectionError> {
29        let mut socket = Socket::new(auth, self.config);
30        socket.connect().await?;
31        let socket_client = socket.client();
32        socket.stay_conected();
33
34        Ok(Connection {
35            socket: socket_client,
36        })
37    }
38
39    /// Set the base api url used for request.
40    /// Useful if running Glimesh locally for example.
41    ///
42    /// Defaults to `wss://glimesh.tv/api/socket/websocket`
43    pub fn api_url(mut self, value: impl Into<String>) -> Self {
44        self.config.api_url = value.into();
45        self
46    }
47
48    /// Set the version passed as query param `vsn`.
49    ///
50    /// This defaults to `2.0.0` and is all glimesh supports at the time of writing.
51    pub fn version(mut self, value: impl Into<String>) -> Self {
52        self.config.version = value.into();
53        self
54    }
55
56    /// Number of messages to buffer before sending messages blocks the sender.
57    ///
58    /// This defaults to 100
59    pub fn outgoing_capacity(mut self, value: usize) -> Self {
60        self.config.outgoing_capacity = value;
61        self
62    }
63
64    /// Number of messages buffered before older messages a dropped if they aren't being handled in time.
65    ///
66    /// This defaults to 10_000 to allow for bursts of messages.
67    pub fn incoming_capacity(mut self, value: usize) -> Self {
68        self.config.incoming_capacity = value;
69        self
70    }
71
72    /// Number of seconds between each socket ping
73    ///
74    /// This defaults to 30 seconds.
75    pub fn ping_interval(mut self, value: Duration) -> Self {
76        self.config.ping_interval = value;
77        self
78    }
79
80    /// How long to wait for a response to a request over the socket before timing out.
81    ///
82    /// This defaults to 30 seconds.
83    pub fn request_timeout(mut self, value: Duration) -> Self {
84        self.config.request_timeout = value;
85        self
86    }
87}
88
89/// Connect to glimesh over websockets
90#[derive(Debug, Clone)]
91pub struct Connection {
92    socket: SocketClient,
93}
94
95impl Connection {
96    /// Create a [`ConnectionBuilder`] to configure various options.
97    pub fn builder() -> ConnectionBuilder {
98        ConnectionBuilder::default()
99    }
100
101    /// Create a connection with the default options.
102    pub async fn connect(auth: Auth) -> Result<Self, WebsocketConnectionError> {
103        ConnectionBuilder::default().connect(auth).await
104    }
105
106    /// Create a client with reference to this connection
107    pub fn as_client(&self) -> Client<&Self> {
108        Client::new(self)
109    }
110
111    /// Create a client with a clone of this connection
112    pub fn to_client(&self) -> WebsocketClient {
113        Client::new(self.clone())
114    }
115
116    /// Convert this connection into a client
117    pub fn into_client(self) -> WebsocketClient {
118        Client::new(self)
119    }
120
121    /// Terminate the socket connection
122    pub fn close(self) {
123        self.socket.close();
124    }
125
126    async fn request<Q>(
127        &self,
128        variables: Q::Variables,
129    ) -> Result<Q::ResponseData, WebsocketConnectionError>
130    where
131        Q: graphql_client::GraphQLQuery,
132    {
133        let reply = self
134            .socket
135            .request(
136                "__absinthe__:control".into(),
137                "doc".into(),
138                Q::build_query(variables),
139            )
140            .await?;
141
142        let res: graphql_client::Response<Q::ResponseData> = reply.response;
143
144        if let Some(errs) = res.errors {
145            if !errs.is_empty() {
146                return Err(GlimeshError::GraphqlErrors(errs).into());
147            }
148        }
149
150        let data = res.data.ok_or(GlimeshError::NoData)?;
151        Ok(data)
152    }
153}
154
155#[async_trait]
156impl QueryConn for Connection {
157    type Error = WebsocketConnectionError;
158
159    async fn query<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
160    where
161        Q: graphql_client::GraphQLQuery,
162        Q::Variables: Send + Sync,
163    {
164        self.request::<Q>(variables).await
165    }
166}
167
168#[async_trait]
169impl MutationConn for Connection {
170    type Error = WebsocketConnectionError;
171
172    async fn mutate<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
173    where
174        Q: graphql_client::GraphQLQuery,
175        Q::Variables: Send + Sync,
176    {
177        self.request::<Q>(variables).await
178    }
179}
180
181#[async_trait]
182impl SubscriptionConn for Connection {
183    type Error = WebsocketConnectionError;
184
185    async fn subscribe<Q>(
186        &self,
187        variables: Q::Variables,
188    ) -> Result<Subscription<Q::ResponseData>, Self::Error>
189    where
190        Q: GraphQLQuery,
191        Q::Variables: Send + Sync,
192    {
193        self.socket.subscribe(Q::build_query(variables)).await
194    }
195}