use crate::perms::oauth::SharedToken;
use futures_util::future::Either;
use futures_util::stream::{self, Stream, StreamExt};
use serde::de::DeserializeOwned;
use signet_tx_cache::{
error::TxCacheError,
types::{BundleKey, BundleList, CacheObject, CacheResponse, CachedBundle},
TxCache,
};
use thiserror::Error;
use tokio::sync::watch;
use tracing::instrument;
pub type Result<T> = core::result::Result<T, BuilderTxCacheError>;
#[derive(Debug, Error)]
pub enum BuilderTxCacheError {
#[error("auth token unavailable (background auth task stopped): {0}")]
TokenRetrieval(#[from] watch::error::RecvError),
#[error(transparent)]
TxCache(#[from] TxCacheError),
}
impl From<reqwest::Error> for BuilderTxCacheError {
fn from(err: reqwest::Error) -> Self {
BuilderTxCacheError::TxCache(TxCacheError::from(err))
}
}
impl From<url::ParseError> for BuilderTxCacheError {
fn from(err: url::ParseError) -> Self {
BuilderTxCacheError::TxCache(TxCacheError::Url(err))
}
}
const BUNDLES: &str = "bundles";
#[cfg(feature = "sse")]
const BUNDLES_FEED: &str = "bundles/feed";
#[derive(Debug, Clone)]
pub struct BuilderTxCache {
tx_cache: TxCache,
token: SharedToken,
}
impl std::ops::Deref for BuilderTxCache {
type Target = TxCache;
fn deref(&self) -> &Self::Target {
&self.tx_cache
}
}
impl std::ops::DerefMut for BuilderTxCache {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.tx_cache
}
}
impl BuilderTxCache {
pub fn new(url: reqwest::Url, token: SharedToken) -> Self {
Self {
tx_cache: TxCache::new(url),
token,
}
}
pub fn new_from_string(url: &str, token: SharedToken) -> Result<Self> {
let tx_cache = TxCache::new_from_string(url)?;
Ok(Self { tx_cache, token })
}
pub const fn new_with_client(
url: reqwest::Url,
client: reqwest::Client,
token: SharedToken,
) -> Self {
Self {
tx_cache: TxCache::new_with_client(url, client),
token,
}
}
pub const fn inner(&self) -> &TxCache {
&self.tx_cache
}
pub const fn token(&self) -> &SharedToken {
&self.token
}
async fn get_inner_with_token<T>(&self, join: &str, query: Option<T::Key>) -> Result<T>
where
T: DeserializeOwned + CacheObject,
{
let url = self.tx_cache.url().join(join)?;
let secret = self.token.secret().await?;
self.tx_cache
.client()
.get(url)
.query(&query)
.bearer_auth(secret)
.send()
.await?
.error_for_status()?
.json::<T>()
.await
.map_err(Into::into)
}
#[instrument(skip_all)]
pub async fn get_bundles(&self, query: Option<BundleKey>) -> Result<CacheResponse<BundleList>> {
self.get_inner_with_token::<CacheResponse<BundleList>>(BUNDLES, query)
.await
}
fn get_bundle_url_path(&self, bundle_id: &str) -> String {
format!("{BUNDLES}/{bundle_id}")
}
pub fn stream_bundles(&self) -> impl Stream<Item = Result<CachedBundle>> + Send + '_ {
stream::unfold(Some(None), move |cursor| async move {
let cursor = cursor?;
match self.get_bundles(cursor).await {
Ok(response) => {
let (inner, next_cursor) = response.into_parts();
let bundles = stream::iter(inner.bundles).map(Ok);
Some((Either::Left(bundles), next_cursor.map(Some)))
}
Err(error) => Some((Either::Right(stream::once(async { Err(error) })), None)),
}
})
.flatten()
}
#[instrument(skip_all)]
pub async fn get_bundle(&self, bundle_id: &str) -> Result<CachedBundle> {
let url_path = self.get_bundle_url_path(bundle_id);
let url = self.tx_cache.url().join(&url_path)?;
let secret = self.token.secret().await?;
self.tx_cache
.client()
.get(url)
.bearer_auth(secret)
.send()
.await?
.error_for_status()?
.json::<CachedBundle>()
.await
.map_err(Into::into)
}
}
#[cfg(feature = "sse")]
impl BuilderTxCache {
#[instrument(skip_all)]
pub async fn subscribe_bundles(
&self,
) -> Result<impl Stream<Item = Result<CachedBundle>> + Send> {
let secret = self.token.secret().await?;
let stream = self
.tx_cache
.subscribe_inner::<CachedBundle>(BUNDLES_FEED, Some(&secret))
.await?;
Ok(stream.map(|r| r.map_err(Into::into)))
}
}