use async_trait::async_trait;
use futures::{
future, stream, Future as StdFuture, FutureExt, Stream as StdStream, StreamExt, TryFutureExt,
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::{pin::Pin, time::Duration};
pub type Future<T> = Pin<Box<dyn StdFuture<Output = Result<T>> + Send>>;
pub type Stream<T> = Pin<Box<dyn StdStream<Item = Result<T>> + Send>>;
mod error;
mod values;
pub use error::{Error, Result};
pub use values::{Hnt, Hst, Usd};
pub mod accounts;
pub mod blocks;
pub mod hotspots;
pub mod oracle;
pub mod ouis;
pub mod pending_transactions;
pub mod validators;
pub mod vars;
pub const DEFAULT_TIMEOUT: u64 = 120;
pub const DEFAULT_BASE_URL: &str = "https://api.helium.io/v1";
pub const NO_QUERY: &[&str; 0] = &[""; 0];
#[derive(Clone, Deserialize, Debug)]
pub(crate) struct Data<T> {
pub data: T,
pub cursor: Option<String>,
}
#[derive(Clone, Debug)]
pub struct Client {
base_url: String,
client: reqwest::Client,
}
impl Default for Client {
fn default() -> Self {
Self::new_with_base_url(DEFAULT_BASE_URL.to_string())
}
}
impl Client {
pub fn new_with_base_url(base_url: String) -> Self {
Self::new_with_timeout(base_url, DEFAULT_TIMEOUT)
}
pub fn new_with_timeout(base_url: String, timeout: u64) -> Self {
let client = reqwest::Client::builder()
.gzip(true)
.timeout(Duration::from_secs(timeout))
.build()
.unwrap();
Self { base_url, client }
}
pub(crate) fn fetch_data<T, Q>(&self, path: &str, query: &Q) -> Future<Data<T>>
where
T: 'static + DeserializeOwned + std::marker::Send,
Q: Serialize + ?Sized,
{
let request_url = format!("{}{}", self.base_url, path);
self.client
.get(&request_url)
.query(query)
.send()
.map_err(error::Error::from)
.and_then(|response| match response.error_for_status() {
Ok(result) => {
let data: Future<Data<T>> = result.json().map_err(error::Error::from).boxed();
data
}
Err(e) => future::err(error::Error::from(e)).boxed(),
})
.boxed()
}
pub(crate) fn fetch_stream<E, Q>(&self, path: &str, query: &Q) -> Stream<E>
where
E: 'static + DeserializeOwned + std::marker::Send,
Q: Serialize + ?Sized,
{
let path = path.to_string();
let client = self.clone();
client
.fetch_data::<Vec<E>, _>(&path, query)
.map_ok(move |mut data| {
data.data.reverse();
stream::try_unfold(
(data, client, path),
|(mut data, client, path)| async move {
match data.data.pop() {
Some(entry) => Ok(Some((entry, (data, client, path)))),
None => match data.cursor {
Some(cursor) => {
let mut data = client
.fetch_data::<Vec<E>, _>(&path, &[("cursor", &cursor)])
.await?;
data.data.reverse();
match data.data.pop() {
Some(entry) => Ok(Some((entry, (data, client, path)))),
None => Ok(None),
}
}
None => Ok(None),
},
}
},
)
})
.try_flatten_stream()
.boxed()
}
pub(crate) async fn fetch<T, Q>(&self, path: &str, query: &Q) -> error::Result<T>
where
T: 'static + DeserializeOwned + std::marker::Send,
Q: Serialize + ?Sized,
{
let result = self.fetch_data(path, query).await?;
Ok(result.data)
}
pub(crate) fn post<T, R>(&self, path: &str, json: &T) -> Future<R>
where
T: Serialize + ?Sized,
R: 'static + DeserializeOwned + std::marker::Send,
{
let request_url = format!("{}{}", self.base_url, path);
self.client
.post(&request_url)
.json(json)
.send()
.map_err(error::Error::from)
.and_then(|response| match response.error_for_status() {
Ok(result) => {
let data: Future<R> = result
.json()
.map_err(error::Error::from)
.map_ok(|v: Data<R>| v.data)
.boxed();
data
}
Err(e) => future::err(error::Error::from(e)).boxed(),
})
.boxed()
}
}
#[async_trait]
pub trait IntoVec {
type Item;
async fn into_vec(self) -> Result<Vec<Self::Item>>;
}
#[async_trait]
impl<T> IntoVec for Stream<T>
where
T: std::marker::Send,
{
type Item = T;
async fn into_vec(self) -> Result<Vec<Self::Item>> {
self.collect::<Vec<error::Result<Self::Item>>>()
.await
.into_iter()
.collect()
}
}