use super::{Error, Result};
use crate::transfer::CancelReason;
use sos_core::encode;
use sos_signer::ed25519::{
BinaryEd25519Signature, Signature as Ed25519Signature,
};
use std::{
future::Future,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};
use tokio::sync::watch;
mod http;
#[cfg(feature = "listen")]
mod websocket;
pub use self::http::{set_user_agent, HttpClient};
#[cfg(feature = "listen")]
pub use websocket::{changes, connect, ListenOptions, WebSocketHandle};
/// State and configuration for retrying a network operation with
/// exponential back-off.
///
/// The derived `Clone` copies the `Arc`, so clones share the same attempt
/// counter.
#[cfg(not(target_arch = "wasm32"))]
#[derive(Debug, Clone)]
pub struct NetworkRetry {
/// Attempt counter; it is initialised to `1` by the constructor.
retries: Arc<AtomicU32>,
/// Base interval that the back-off delay is computed from; the computed
/// delay is used as a number of milliseconds.
pub reconnect_interval: u16,
/// Threshold that a retry counter is compared against to decide whether
/// the attempts are exhausted.
pub maximum_retries: u32,
}
#[cfg(not(target_arch = "wasm32"))]
impl Default for NetworkRetry {
fn default() -> Self {
Self::new(4, 1000)
}
}
#[cfg(not(target_arch = "wasm32"))]
impl NetworkRetry {
pub fn new(maximum_retries: u32, reconnect_interval: u16) -> Self {
Self {
retries: Arc::new(AtomicU32::from(1)),
reconnect_interval,
maximum_retries,
}
}
pub fn delay(&self, retries: u32) -> Result<u64> {
let factor = 2u64.checked_pow(retries).ok_or(Error::RetryOverflow)?;
Ok(self.reconnect_interval as u64 * factor)
}
pub fn retries(&self) -> u32 {
self.retries.load(Ordering::SeqCst)
}
pub fn maximum(&self) -> u32 {
self.maximum_retries
}
pub fn reset(&self) {
self.retries.store(1, Ordering::SeqCst)
}
pub fn clone_reset(&self) -> Self {
Self {
retries: Arc::new(AtomicU32::from(1)),
reconnect_interval: self.reconnect_interval,
maximum_retries: self.maximum_retries,
}
}
pub fn increment(&self) -> u32 {
self.retries.fetch_add(1, Ordering::SeqCst)
}
pub fn is_exhausted(&self, retries: u32) -> bool {
retries > self.maximum_retries
}
pub async fn wait_and_retry<D, T, F>(
&self,
id: D,
retries: u32,
callback: F,
mut cancel: watch::Receiver<CancelReason>,
) -> Result<T>
where
D: std::fmt::Display,
F: Future<Output = T>,
{
let delay = self.delay(retries)?;
tracing::debug!(
id = %id,
delay = %delay,
retries = %retries,
maximum_retries = %self.maximum_retries,
"retry",
);
tokio::select! {
_ = cancel.changed() => {
let reason = cancel.borrow();
tracing::debug!(id = %id, "retry::canceled");
Err(Error::RetryCanceled(reason.clone()))
}
_ = tokio::time::sleep(Duration::from_millis(delay)) => {
Ok(callback.await)
}
}
}
}
/// Encode a device signature as a base58 string.
///
/// The signature is converted to its binary wrapper type, serialized with
/// `encode` and the resulting bytes are base58 encoded.
///
/// # Errors
///
/// Propagates any error returned by `encode`.
pub(crate) async fn encode_device_signature(
    signature: Ed25519Signature,
) -> Result<String> {
    let binary: BinaryEd25519Signature = signature.into();
    let bytes = encode(&binary).await?;
    let encoded = bs58::encode(bytes).into_string();
    Ok(encoded)
}
/// Prepend the `Bearer ` prefix to an encoded device signature.
pub(crate) fn bearer_prefix(device_signature: &str) -> String {
    const PREFIX: &str = "Bearer ";
    let mut value = String::with_capacity(PREFIX.len() + device_signature.len());
    value.push_str(PREFIX);
    value.push_str(device_signature);
    value
}
#[cfg(any(feature = "listen", feature = "pairing"))]
mod websocket_request {
    use crate::constants::X_SOS_ACCOUNT_ID;
    use super::Result;
    use sos_core::AccountId;
    use tokio_tungstenite::tungstenite::{
        self, client::IntoClientRequest, handshake::client::generate_key,
    };
    use url::Url;

    /// Description of a websocket handshake request that can be converted
    /// into a client request via [`IntoClientRequest`].
    pub struct WebSocketRequest {
        /// Account identifier sent in the `X_SOS_ACCOUNT_ID` header.
        pub account_id: AccountId,
        /// Target URI using the `ws` or `wss` scheme.
        pub uri: Url,
        /// Value sent in the `host` header.
        pub host: String,
        /// Optional value sent in the `authorization` header.
        pub bearer: Option<String>,
        /// Origin of the source URL, sent in the `origin` header.
        pub origin: url::Origin,
    }

    impl WebSocketRequest {
        /// Create a websocket request for `path` resolved against `url`.
        ///
        /// An `http` scheme is mapped to `ws` and `https` to `wss`; no
        /// bearer value is set.
        ///
        /// # Errors
        ///
        /// Returns an error when `path` cannot be joined onto `url`.
        ///
        /// # Panics
        ///
        /// Panics when `url` has no host, or when the joined URL uses a
        /// scheme other than `http` or `https`.
        pub fn new(
            account_id: AccountId,
            url: &Url,
            path: &str,
        ) -> Result<Self> {
            // NOTE(review): `host_str()` appears to exclude any explicit
            // port, so the `host` header would omit it — confirm this is
            // what the server expects.
            let host = url.host_str().unwrap().to_string();
            let origin = url.origin();
            let mut uri = url.join(path)?;
            let websocket_scheme = match uri.scheme() {
                "http" => "ws",
                "https" => "wss",
                _ => panic!("bad url scheme for websocket, requires http(s)"),
            };
            uri.set_scheme(websocket_scheme)
                .expect("failed to set websocket scheme");
            Ok(Self {
                account_id,
                uri,
                host,
                bearer: None,
                origin,
            })
        }

        /// Set the value sent in the `authorization` header.
        pub fn set_bearer(&mut self, bearer: String) {
            self.bearer = Some(bearer);
        }
    }

    impl IntoClientRequest for WebSocketRequest {
        /// Build the HTTP upgrade request for the websocket handshake.
        ///
        /// The `authorization` header is only added when a bearer value
        /// has been set; a fresh `sec-websocket-key` is generated on each
        /// call.
        fn into_client_request(
            self,
        ) -> std::result::Result<http::Request<()>, tungstenite::Error>
        {
            let Self {
                account_id,
                uri,
                host,
                bearer,
                origin,
            } = self;
            let origin_header = origin.unicode_serialization();

            let mut builder = http::Request::builder().uri(uri.to_string());
            if let Some(authorization) = bearer {
                builder = builder.header("authorization", authorization);
            }

            let builder = builder
                .header("sec-websocket-key", generate_key())
                .header("sec-websocket-version", "13")
                .header("host", host)
                .header("origin", origin_header)
                .header("connection", "keep-alive, Upgrade")
                .header(X_SOS_ACCOUNT_ID, account_id.to_string())
                .header("upgrade", "websocket");
            let request = builder.body(())?;
            Ok(request)
        }
    }
}
#[cfg(any(feature = "listen", feature = "pairing"))]
pub use websocket_request::WebSocketRequest;