glimesh/conn/ws/
connection.rs1use super::{
2 config::Config,
3 socket::{Socket, SocketClient},
4 WebsocketClient,
5};
6use crate::{
7 Auth, Client, GlimeshError, MutationConn, QueryConn, Subscription, SubscriptionConn,
8 WebsocketConnectionError,
9};
10use graphql_client::GraphQLQuery;
11use std::{fmt::Debug, time::Duration};
12
13#[derive(Debug, Default)]
22pub struct ConnectionBuilder {
23 config: Config,
24}
25
26impl ConnectionBuilder {
27 pub async fn connect(self, auth: Auth) -> Result<Connection, WebsocketConnectionError> {
29 let mut socket = Socket::new(auth, self.config);
30 socket.connect().await?;
31 let socket_client = socket.client();
32 socket.stay_conected();
33
34 Ok(Connection {
35 socket: socket_client,
36 })
37 }
38
39 pub fn api_url(mut self, value: impl Into<String>) -> Self {
44 self.config.api_url = value.into();
45 self
46 }
47
48 pub fn version(mut self, value: impl Into<String>) -> Self {
52 self.config.version = value.into();
53 self
54 }
55
56 pub fn outgoing_capacity(mut self, value: usize) -> Self {
60 self.config.outgoing_capacity = value;
61 self
62 }
63
64 pub fn incoming_capacity(mut self, value: usize) -> Self {
68 self.config.incoming_capacity = value;
69 self
70 }
71
72 pub fn ping_interval(mut self, value: Duration) -> Self {
76 self.config.ping_interval = value;
77 self
78 }
79
80 pub fn request_timeout(mut self, value: Duration) -> Self {
84 self.config.request_timeout = value;
85 self
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct Connection {
92 socket: SocketClient,
93}
94
95impl Connection {
96 pub fn builder() -> ConnectionBuilder {
98 ConnectionBuilder::default()
99 }
100
101 pub async fn connect(auth: Auth) -> Result<Self, WebsocketConnectionError> {
103 ConnectionBuilder::default().connect(auth).await
104 }
105
106 pub fn as_client(&self) -> Client<&Self> {
108 Client::new(self)
109 }
110
111 pub fn to_client(&self) -> WebsocketClient {
113 Client::new(self.clone())
114 }
115
116 pub fn into_client(self) -> WebsocketClient {
118 Client::new(self)
119 }
120
121 pub fn close(self) {
123 self.socket.close();
124 }
125
126 async fn request<Q>(
127 &self,
128 variables: Q::Variables,
129 ) -> Result<Q::ResponseData, WebsocketConnectionError>
130 where
131 Q: graphql_client::GraphQLQuery,
132 {
133 let reply = self
134 .socket
135 .request(
136 "__absinthe__:control".into(),
137 "doc".into(),
138 Q::build_query(variables),
139 )
140 .await?;
141
142 let res: graphql_client::Response<Q::ResponseData> = reply.response;
143
144 if let Some(errs) = res.errors {
145 if !errs.is_empty() {
146 return Err(GlimeshError::GraphqlErrors(errs).into());
147 }
148 }
149
150 let data = res.data.ok_or(GlimeshError::NoData)?;
151 Ok(data)
152 }
153}
154
155#[async_trait]
156impl QueryConn for Connection {
157 type Error = WebsocketConnectionError;
158
159 async fn query<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
160 where
161 Q: graphql_client::GraphQLQuery,
162 Q::Variables: Send + Sync,
163 {
164 self.request::<Q>(variables).await
165 }
166}
167
168#[async_trait]
169impl MutationConn for Connection {
170 type Error = WebsocketConnectionError;
171
172 async fn mutate<Q>(&self, variables: Q::Variables) -> Result<Q::ResponseData, Self::Error>
173 where
174 Q: graphql_client::GraphQLQuery,
175 Q::Variables: Send + Sync,
176 {
177 self.request::<Q>(variables).await
178 }
179}
180
181#[async_trait]
182impl SubscriptionConn for Connection {
183 type Error = WebsocketConnectionError;
184
185 async fn subscribe<Q>(
186 &self,
187 variables: Q::Variables,
188 ) -> Result<Subscription<Q::ResponseData>, Self::Error>
189 where
190 Q: GraphQLQuery,
191 Q::Variables: Send + Sync,
192 {
193 self.socket.subscribe(Q::build_query(variables)).await
194 }
195}