sos_protocol/network_client/
mod.rs1use super::{Error, Result};
3use crate::transfer::CancelReason;
4use sos_core::encode;
5use sos_signer::ed25519::{
6 BinaryEd25519Signature, Signature as Ed25519Signature,
7};
8use std::{
9 future::Future,
10 sync::{
11 atomic::{AtomicU32, Ordering},
12 Arc,
13 },
14 time::Duration,
15};
16use tokio::sync::watch;
17
18mod http;
19#[cfg(feature = "listen")]
20mod websocket;
21
22pub use self::http::{set_user_agent, HttpClient};
23
24#[cfg(feature = "listen")]
25pub use websocket::{changes, connect, ListenOptions, WebSocketHandle};
26
/// State for retrying network requests with an exponentially
/// increasing delay.
///
/// The retry counter lives behind an [`Arc`], so values produced by
/// the derived `Clone` implementation share the same counter.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Clone, Debug)]
pub struct NetworkRetry {
    /// Current retry counter, shared between clones.
    retries: Arc<AtomicU32>,

    /// Base interval, in milliseconds, that is scaled up for each retry.
    pub reconnect_interval: u16,

    /// Upper bound on the retry counter before retries are
    /// considered exhausted.
    pub maximum_retries: u32,
}
37
38#[cfg(not(target_arch = "wasm32"))]
39impl Default for NetworkRetry {
40 fn default() -> Self {
41 Self::new(4, 1000)
42 }
43}
44
45#[cfg(not(target_arch = "wasm32"))]
46impl NetworkRetry {
47 pub fn new(maximum_retries: u32, reconnect_interval: u16) -> Self {
53 Self {
54 retries: Arc::new(AtomicU32::from(1)),
55 reconnect_interval,
56 maximum_retries,
57 }
58 }
59
60 pub fn delay(&self, retries: u32) -> Result<u64> {
62 let factor = 2u64.checked_pow(retries).ok_or(Error::RetryOverflow)?;
63 Ok(self.reconnect_interval as u64 * factor)
64 }
65
66 pub fn retries(&self) -> u32 {
68 self.retries.load(Ordering::SeqCst)
69 }
70
71 pub fn maximum(&self) -> u32 {
73 self.maximum_retries
74 }
75
76 pub fn reset(&self) {
78 self.retries.store(1, Ordering::SeqCst)
79 }
80
81 pub fn clone_reset(&self) -> Self {
83 Self {
84 retries: Arc::new(AtomicU32::from(1)),
85 reconnect_interval: self.reconnect_interval,
86 maximum_retries: self.maximum_retries,
87 }
88 }
89
90 pub fn increment(&self) -> u32 {
92 self.retries.fetch_add(1, Ordering::SeqCst)
93 }
94
95 pub fn is_exhausted(&self, retries: u32) -> bool {
97 retries > self.maximum_retries
98 }
99
100 pub async fn wait_and_retry<D, T, F>(
102 &self,
103 id: D,
104 retries: u32,
105 callback: F,
106 mut cancel: watch::Receiver<CancelReason>,
107 ) -> Result<T>
108 where
109 D: std::fmt::Display,
110 F: Future<Output = T>,
111 {
112 let delay = self.delay(retries)?;
113 tracing::debug!(
114 id = %id,
115 delay = %delay,
116 retries = %retries,
117 maximum_retries = %self.maximum_retries,
118 "retry",
119 );
120
121 loop {
122 tokio::select! {
123 _ = cancel.changed() => {
124 let reason = cancel.borrow();
125 tracing::debug!(id = %id, "retry::canceled");
126 return Err(Error::RetryCanceled(reason.clone()));
127 }
128 _ = tokio::time::sleep(Duration::from_millis(delay)) => {
129 return Ok(callback.await)
130 }
131 };
132 }
133 }
134}
135
136pub(crate) async fn encode_device_signature(
137 signature: Ed25519Signature,
138) -> Result<String> {
139 let signature: BinaryEd25519Signature = signature.into();
140 Ok(bs58::encode(encode(&signature).await?).into_string())
141}
142
/// Build the string `Bearer <device_signature>`.
///
/// The scheme name and the signature are separated by a single space;
/// the signature itself is appended unchanged.
pub(crate) fn bearer_prefix(device_signature: &str) -> String {
    const SCHEME: &str = "Bearer ";
    let mut value =
        String::with_capacity(SCHEME.len() + device_signature.len());
    value.push_str(SCHEME);
    value.push_str(device_signature);
    value
}
146
#[cfg(any(feature = "listen", feature = "pairing"))]
mod websocket_request {
    use crate::constants::X_SOS_ACCOUNT_ID;

    use super::Result;
    use sos_core::AccountId;
    use tokio_tungstenite::tungstenite::{
        self, client::IntoClientRequest, handshake::client::generate_key,
    };
    use url::Url;

    /// Builder for the HTTP request that opens a websocket connection.
    pub struct WebSocketRequest {
        /// Account identifier sent in the `X_SOS_ACCOUNT_ID` header.
        pub account_id: AccountId,
        /// Target URI using the `ws` or `wss` scheme.
        pub uri: Url,
        /// Value for the `host` header.
        // NOTE(review): populated from `Url::host_str()`, which does not
        // include the port, so the `host` header carries no port even
        // for non-default ports — confirm this is what servers expect.
        pub host: String,
        /// Optional value for the `authorization` header.
        pub bearer: Option<String>,
        /// Origin of the remote server, sent in the `origin` header.
        pub origin: url::Origin,
    }

    impl WebSocketRequest {
        /// Create a websocket request for `path` joined onto `url`.
        ///
        /// The `http` scheme is mapped to `ws` and `https` to `wss`.
        ///
        /// # Errors
        ///
        /// Returns an error when `url` has no host or when `path`
        /// cannot be joined onto `url`.
        ///
        /// # Panics
        ///
        /// Panics when the joined URL uses a scheme other than `http`
        /// or `https`.
        pub fn new(
            account_id: AccountId,
            url: &Url,
            path: &str,
        ) -> Result<Self> {
            let origin = url.origin();
            // A URL without a host is caller input, not a broken
            // invariant, so report it as an error instead of panicking.
            let host = url
                .host_str()
                .ok_or(url::ParseError::EmptyHost)?
                .to_string();

            let mut uri = url.join(path)?;
            let scheme = match uri.scheme() {
                "http" => "ws",
                "https" => "wss",
                _ => {
                    panic!("bad url scheme for websocket, requires http(s)")
                }
            };

            uri.set_scheme(scheme)
                .expect("failed to set websocket scheme");

            Ok(Self {
                account_id,
                host,
                uri,
                origin,
                bearer: None,
            })
        }

        /// Set the value sent in the `authorization` header.
        pub fn set_bearer(&mut self, bearer: String) {
            self.bearer = Some(bearer);
        }
    }

    impl IntoClientRequest for WebSocketRequest {
        /// Convert into the HTTP upgrade request for the websocket
        /// handshake.
        ///
        /// The `authorization` header is only added when a bearer
        /// value has been set.
        fn into_client_request(
            self,
        ) -> std::result::Result<http::Request<()>, tungstenite::Error>
        {
            let origin = self.origin.unicode_serialization();
            let mut request =
                http::Request::builder().uri(self.uri.to_string());
            if let Some(bearer) = self.bearer {
                request = request.header("authorization", bearer);
            }
            request = request
                .header("sec-websocket-key", generate_key())
                .header("sec-websocket-version", "13")
                .header("host", self.host)
                .header("origin", origin)
                .header("connection", "keep-alive, Upgrade")
                .header(X_SOS_ACCOUNT_ID, self.account_id.to_string())
                .header("upgrade", "websocket");
            Ok(request.body(())?)
        }
    }
}
232
233#[cfg(any(feature = "listen", feature = "pairing"))]
234pub use websocket_request::WebSocketRequest;