sos_protocol/network_client/
mod.rs

1//! HTTP transport trait and implementations.
2use super::{Error, Result};
3use crate::transfer::CancelReason;
4use sos_core::encode;
5use sos_signer::ed25519::{
6    BinaryEd25519Signature, Signature as Ed25519Signature,
7};
8use std::{
9    future::Future,
10    sync::{
11        atomic::{AtomicU32, Ordering},
12        Arc,
13    },
14    time::Duration,
15};
16use tokio::sync::watch;
17
18mod http;
19#[cfg(feature = "listen")]
20mod websocket;
21
22pub use self::http::{set_user_agent, HttpClient};
23
24#[cfg(feature = "listen")]
25pub use websocket::{changes, connect, ListenOptions, WebSocketHandle};
26
27/// Network retry state and logic for exponential backoff.
28#[cfg(not(target_arch = "wasm32"))]
29#[derive(Debug, Clone)]
30pub struct NetworkRetry {
31    retries: Arc<AtomicU32>,
32    /// Reconnect interval.
33    pub reconnect_interval: u16,
34    /// Maximum number of retries.
35    pub maximum_retries: u32,
36}
37
38#[cfg(not(target_arch = "wasm32"))]
39impl Default for NetworkRetry {
40    fn default() -> Self {
41        Self::new(4, 1000)
42    }
43}
44
45#[cfg(not(target_arch = "wasm32"))]
46impl NetworkRetry {
47    /// Create a new network retry.
48    ///
49    /// The reconnect interval is a *base interval* in milliseconds
50    /// for the exponential backoff so use a small value such as
51    /// `1000` or `2000`.
52    pub fn new(maximum_retries: u32, reconnect_interval: u16) -> Self {
53        Self {
54            retries: Arc::new(AtomicU32::from(1)),
55            reconnect_interval,
56            maximum_retries,
57        }
58    }
59
60    /// Exponential backoff millisecond delay for a retry counter.
61    pub fn delay(&self, retries: u32) -> Result<u64> {
62        let factor = 2u64.checked_pow(retries).ok_or(Error::RetryOverflow)?;
63        Ok(self.reconnect_interval as u64 * factor)
64    }
65
66    /// Current number of retries.
67    pub fn retries(&self) -> u32 {
68        self.retries.load(Ordering::SeqCst)
69    }
70
71    /// Maximum number of retries.
72    pub fn maximum(&self) -> u32 {
73        self.maximum_retries
74    }
75
76    /// Reset retries counter.
77    pub fn reset(&self) {
78        self.retries.store(1, Ordering::SeqCst)
79    }
80
81    /// Clone of this network retry with the retry counter reset.
82    pub fn clone_reset(&self) -> Self {
83        Self {
84            retries: Arc::new(AtomicU32::from(1)),
85            reconnect_interval: self.reconnect_interval,
86            maximum_retries: self.maximum_retries,
87        }
88    }
89
90    /// Increment for next retry attempt.
91    pub fn increment(&self) -> u32 {
92        self.retries.fetch_add(1, Ordering::SeqCst)
93    }
94
95    /// Determine if retry attempts are exhausted.
96    pub fn is_exhausted(&self, retries: u32) -> bool {
97        retries > self.maximum_retries
98    }
99
100    /// Wait and then retry.
101    pub async fn wait_and_retry<D, T, F>(
102        &self,
103        id: D,
104        retries: u32,
105        callback: F,
106        mut cancel: watch::Receiver<CancelReason>,
107    ) -> Result<T>
108    where
109        D: std::fmt::Display,
110        F: Future<Output = T>,
111    {
112        let delay = self.delay(retries)?;
113        tracing::debug!(
114            id = %id,
115            delay = %delay,
116            retries = %retries,
117            maximum_retries = %self.maximum_retries,
118            "retry",
119        );
120
121        loop {
122            tokio::select! {
123                _ = cancel.changed() => {
124                    let reason = cancel.borrow();
125                    tracing::debug!(id = %id, "retry::canceled");
126                    return Err(Error::RetryCanceled(reason.clone()));
127                }
128                _ = tokio::time::sleep(Duration::from_millis(delay)) => {
129                    return Ok(callback.await)
130                }
131            };
132        }
133    }
134}
135
136pub(crate) async fn encode_device_signature(
137    signature: Ed25519Signature,
138) -> Result<String> {
139    let signature: BinaryEd25519Signature = signature.into();
140    Ok(bs58::encode(encode(&signature).await?).into_string())
141}
142
143pub(crate) fn bearer_prefix(device_signature: &str) -> String {
144    format!("Bearer {}", device_signature)
145}
146
147#[cfg(any(feature = "listen", feature = "pairing"))]
148mod websocket_request {
149    use crate::constants::X_SOS_ACCOUNT_ID;
150
151    use super::Result;
152    use sos_core::AccountId;
153    use tokio_tungstenite::tungstenite::{
154        self, client::IntoClientRequest, handshake::client::generate_key,
155    };
156    use url::Url;
157
158    /// Build a websocket connection request.
159    pub struct WebSocketRequest {
160        /// Account identifier.
161        pub account_id: AccountId,
162        /// Remote URI.
163        pub uri: Url,
164        /// Remote host.
165        pub host: String,
166        /// Bearer authentication.
167        pub bearer: Option<String>,
168        /// URL origin.
169        pub origin: url::Origin,
170    }
171
172    impl WebSocketRequest {
173        /// Create a new websocket request.
174        pub fn new(
175            account_id: AccountId,
176            url: &Url,
177            path: &str,
178        ) -> Result<Self> {
179            let origin = url.origin();
180            let host = url.host_str().unwrap().to_string();
181
182            let mut uri = url.join(path)?;
183            let scheme = if uri.scheme() == "http" {
184                "ws"
185            } else if uri.scheme() == "https" {
186                "wss"
187            } else {
188                panic!("bad url scheme for websocket, requires http(s)");
189            };
190
191            uri.set_scheme(scheme)
192                .expect("failed to set websocket scheme");
193
194            Ok(Self {
195                account_id,
196                host,
197                uri,
198                origin,
199                bearer: None,
200            })
201        }
202
203        /// Set bearer authorization.
204        pub fn set_bearer(&mut self, bearer: String) {
205            self.bearer = Some(bearer);
206        }
207    }
208
209    impl IntoClientRequest for WebSocketRequest {
210        fn into_client_request(
211            self,
212        ) -> std::result::Result<http::Request<()>, tungstenite::Error>
213        {
214            let origin = self.origin.unicode_serialization();
215            let mut request =
216                http::Request::builder().uri(self.uri.to_string());
217            if let Some(bearer) = self.bearer {
218                request = request.header("authorization", bearer);
219            }
220            request = request
221                .header("sec-websocket-key", generate_key())
222                .header("sec-websocket-version", "13")
223                .header("host", self.host)
224                .header("origin", origin)
225                .header("connection", "keep-alive, Upgrade")
226                .header(X_SOS_ACCOUNT_ID, self.account_id.to_string())
227                .header("upgrade", "websocket");
228            Ok(request.body(())?)
229        }
230    }
231}
232
233#[cfg(any(feature = "listen", feature = "pairing"))]
234pub use websocket_request::WebSocketRequest;