1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use super::{
    config::Config,
    socket::{Socket, SocketClient},
    WebsocketClient,
};
use crate::{
    Auth, Client, GlimeshError, MutationConn, QueryConn, SubscriptionConn, WebsocketConnectionError,
};
use futures::stream::BoxStream;
use graphql_client::GraphQLQuery;
use std::{fmt::Debug, time::Duration};

/// Configure and build a websocket [`Connection`].
///
/// Obtain a builder with [`Connection::builder`], chain the setters for any options that
/// should differ from their defaults, then call [`ConnectionBuilder::connect`] to open
/// the socket.
///
/// ## Usage:
/// The snippet assumes an async context and an `auth` value of type `Auth`.
/// ```rust,ignore
/// let connection = Connection::builder()
///     .api_url("ws://localhost:8080/api/socket/websocket")
///     .connect(auth)
///     .await?;
/// ```
#[derive(Debug, Default)]
pub struct ConnectionBuilder {
    /// Options collected by the setter methods; passed to `Socket::new` on connect.
    config: Config,
}

impl ConnectionBuilder {
    /// Build the websocket connection from the set options and connect to the socket.
    ///
    /// Consumes the builder: its configuration is moved into a new `Socket` together
    /// with `auth`, the socket is connected, and the resulting [`Connection`] wraps the
    /// socket's client handle.
    ///
    /// # Errors
    ///
    /// Returns the [`WebsocketConnectionError`] produced by the initial socket connect
    /// if that connect fails.
    pub async fn connect(self, auth: Auth) -> Result<Connection, WebsocketConnectionError> {
        let mut socket = Socket::new(auth, self.config);
        socket.connect().await?;
        // Take the client handle first; the socket itself is then handed to
        // `stay_conected` (spelling as declared on `Socket`).
        // NOTE(review): presumably this keeps the connection alive in the background —
        // confirm in the `socket` module.
        let socket_client = socket.client();
        socket.stay_conected();

        Ok(Connection {
            socket: socket_client,
        })
    }

    /// Set the base api url used for request.
    /// Useful if running Glimesh locally for example.
    ///
    /// Defaults to `wss://glimesh.tv/api/socket/websocket`
    #[must_use]
    pub fn api_url(mut self, value: impl Into<String>) -> Self {
        self.config.api_url = value.into();
        self
    }

    /// Set the version passed as query param `vsn`.
    ///
    /// This defaults to `2.0.0` and is all glimesh supports at the time of writing.
    #[must_use]
    pub fn version(mut self, value: impl Into<String>) -> Self {
        self.config.version = value.into();
        self
    }

    /// Number of messages to buffer before sending messages blocks the sender.
    ///
    /// This defaults to 100
    #[must_use]
    pub fn outgoing_capacity(mut self, value: usize) -> Self {
        self.config.outgoing_capacity = value;
        self
    }

    /// Number of messages buffered before older messages are dropped if they aren't
    /// being handled in time.
    ///
    /// This defaults to 10_000 to allow for bursts of messages.
    #[must_use]
    pub fn incoming_capacity(mut self, value: usize) -> Self {
        self.config.incoming_capacity = value;
        self
    }

    /// Interval between each socket ping.
    ///
    /// This defaults to 30 seconds.
    #[must_use]
    pub fn ping_interval(mut self, value: Duration) -> Self {
        self.config.ping_interval = value;
        self
    }

    /// How long to wait for a response to a request over the socket before timing out.
    ///
    /// This defaults to 30 seconds.
    #[must_use]
    pub fn request_timeout(mut self, value: Duration) -> Self {
        self.config.request_timeout = value;
        self
    }
}

/// Connect to glimesh over websockets
///
/// Create one with [`Connection::connect`] for the default options, or via
/// [`Connection::builder`] to customise them first. The type is `Clone`; a clone
/// carries a clone of the same socket client handle.
#[derive(Debug, Clone)]
pub struct Connection {
    /// Client handle obtained from the socket when the connection was established;
    /// all requests and subscriptions go through it.
    socket: SocketClient,
}

impl Connection {
    /// Create a [`ConnectionBuilder`] to configure various options.
    pub fn builder() -> ConnectionBuilder {
        ConnectionBuilder::default()
    }

    /// Create a connection with the default options.
    pub async fn connect(auth: Auth) -> Result<Self, WebsocketConnectionError> {
        ConnectionBuilder::default().connect(auth).await
    }

    /// Create a client with reference to this connection
    pub fn as_client(&self) -> Client<&Self, WebsocketConnectionError> {
        Client::new(self)
    }

    /// Create a client with a clone of this connection
    pub fn to_client(&self) -> WebsocketClient {
        Client::new(self.clone())
    }

    /// Convert this connection into a client
    pub fn into_client(self) -> WebsocketClient {
        Client::new(self)
    }

    /// Terminate the socket connection
    pub fn close(self) {
        self.socket.close();
    }

    async fn request<Q>(
        &self,
        variables: Q::Variables,
    ) -> Result<Q::ResponseData, WebsocketConnectionError>
    where
        Q: graphql_client::GraphQLQuery,
    {
        let reply = self
            .socket
            .request(
                "__absinthe__:control".into(),
                "doc".into(),
                Q::build_query(variables),
            )
            .await?;

        let res: graphql_client::Response<Q::ResponseData> = reply.response;

        if let Some(errs) = res.errors {
            if errs.len() > 0 {
                return Err(GlimeshError::GraphqlErrors(errs).into());
            }
        }

        let data = res.data.ok_or_else(|| GlimeshError::NoData)?;
        Ok(data)
    }
}

#[async_trait]
impl QueryConn for Connection {
    type Error = WebsocketConnectionError;

    async fn query<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
    where
        Q: graphql_client::GraphQLQuery,
        Q::Variables: Send + Sync,
    {
        self.request::<Q>(variables).await
    }
}

#[async_trait]
impl MutationConn for Connection {
    type Error = WebsocketConnectionError;

    async fn mutate<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
    where
        Q: graphql_client::GraphQLQuery,
        Q::Variables: Send + Sync,
    {
        self.request::<Q>(variables).await
    }
}

#[async_trait]
impl SubscriptionConn for Connection {
    type Error = WebsocketConnectionError;

    async fn subscribe<'a, Q>(
        &self,
        variables: Q::Variables,
    ) -> Result<BoxStream<'a, Q::ResponseData>, Self::Error>
    where
        Q: GraphQLQuery,
        Q::Variables: Send + Sync,
    {
        self.socket.subscribe(Q::build_query(variables)).await
    }
}