helium_api/
lib.rs

1use async_trait::async_trait;
2use futures::{
3    future, stream, Future as StdFuture, FutureExt, Stream as StdStream, StreamExt, TryFutureExt,
4};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use std::{pin::Pin, time::Duration};
7
8/// A type alias for `Future` that may return `crate::error::Error`
9pub type Future<T> = Pin<Box<dyn StdFuture<Output = Result<T>> + Send>>;
10
11/// A type alias for `Stream` that may result in `crate::error::Error`
12pub type Stream<T> = Pin<Box<dyn StdStream<Item = Result<T>> + Send>>;
13
14mod error;
15
16pub use error::{Error, Result};
17
18pub mod accounts;
19pub mod blocks;
20pub mod hotspots;
21pub mod models;
22pub mod oracle;
23pub mod ouis;
24pub mod pending_transactions;
25pub mod transactions;
26pub mod validators;
27pub mod vars;
28
29/// The default timeout for API requests
30pub const DEFAULT_TIMEOUT: u64 = 120;
31/// The default base URL if none is specified.
32pub const DEFAULT_BASE_URL: &str = "https://api.helium.io/v1";
33/// A utility constant to pass an empty query slice to the various client fetch
34/// functions
35pub const NO_QUERY: &[&str; 0] = &[""; 0];
36
37#[derive(Clone, Deserialize, Debug)]
38pub(crate) struct Data<T> {
39    pub data: T,
40    pub cursor: Option<String>,
41}
42
43#[derive(Clone, Debug)]
44pub struct Client {
45    base_url: String,
46    client: reqwest::Client,
47}
48
49impl Client {
50    /// Create a new client using a given base URL and a default
51    /// timeout. The library will use absoluate paths based on this
52    /// base_url.
53    pub fn new_with_base_url(base_url: String, user_agent: &str) -> Self {
54        Self::new_with_timeout(base_url, user_agent, DEFAULT_TIMEOUT)
55    }
56
57    /// Create a new client using a given base URL, and request
58    /// timeout value.  The library will use absoluate paths based on
59    /// the given base_url.
60    pub fn new_with_timeout(base_url: String, user_agent: &str, timeout: u64) -> Self {
61        let client = reqwest::Client::builder()
62            .gzip(true)
63            .user_agent(user_agent)
64            .timeout(Duration::from_secs(timeout))
65            .build()
66            .unwrap();
67        Self { base_url, client }
68    }
69
70    pub(crate) fn fetch_data<T, Q>(&self, path: &str, query: &Q) -> Future<Data<T>>
71    where
72        T: 'static + DeserializeOwned + std::marker::Send,
73        Q: Serialize + ?Sized,
74    {
75        let request_url = format!("{}{}", self.base_url, path);
76        self.client
77            .get(&request_url)
78            .query(query)
79            .send()
80            .map_err(error::Error::from)
81            .and_then(|response| match response.error_for_status() {
82                Ok(result) => {
83                    let data: Future<Data<T>> = result.json().map_err(error::Error::from).boxed();
84                    data
85                }
86                Err(e) => future::err(error::Error::from(e)).boxed(),
87            })
88            .boxed()
89    }
90
91    pub(crate) fn fetch_stream<E, Q>(&self, path: &str, query: &Q) -> Stream<E>
92    where
93        E: 'static + DeserializeOwned + std::marker::Send,
94        Q: Serialize + ?Sized,
95    {
96        let path = path.to_string();
97        let client = self.clone();
98        client
99            .fetch_data::<Vec<E>, _>(&path, query)
100            .map_ok(move |mut data| {
101                data.data.reverse();
102                stream::try_unfold(
103                    (data, client, path),
104                    |(mut data, client, path)| async move {
105                        match data.data.pop() {
106                            Some(entry) => Ok(Some((entry, (data, client, path)))),
107                            None => match data.cursor {
108                                Some(cursor) => {
109                                    //loop until we find next bit of data or run
110                                    // out of cursors
111                                    let mut data: Data<Vec<E>>;
112                                    let mut cursor = cursor;
113                                    loop {
114                                        data = client
115                                            .fetch_data::<Vec<E>, _>(&path, &[("cursor", &cursor)])
116                                            .await?;
117
118                                        if !data.data.is_empty() {
119                                            data.data.reverse();
120                                            let entry = data.data.pop().unwrap();
121                                            break Ok(Some((entry, (data, client, path))));
122                                        } else if data.cursor.is_none() {
123                                            break Ok(None);
124                                        }
125                                        cursor = data.cursor.unwrap();
126                                    }
127                                }
128                                None => Ok(None),
129                            },
130                        }
131                    },
132                )
133            })
134            .try_flatten_stream()
135            .boxed()
136    }
137
138    pub(crate) async fn fetch<T, Q>(&self, path: &str, query: &Q) -> error::Result<T>
139    where
140        T: 'static + DeserializeOwned + std::marker::Send,
141        Q: Serialize + ?Sized,
142    {
143        let result = self.fetch_data(path, query).await?;
144        Ok(result.data)
145    }
146
147    pub(crate) fn post<T, R>(&self, path: &str, json: &T) -> Future<R>
148    where
149        T: Serialize + ?Sized,
150        R: 'static + DeserializeOwned + std::marker::Send,
151    {
152        let request_url = format!("{}{}", self.base_url, path);
153        self.client
154            .post(&request_url)
155            .json(json)
156            .send()
157            .map_err(error::Error::from)
158            .and_then(|response| match response.error_for_status() {
159                Ok(result) => {
160                    let data: Future<R> = result
161                        .json()
162                        .map_err(error::Error::from)
163                        .map_ok(|v: Data<R>| v.data)
164                        .boxed();
165                    data
166                }
167                Err(e) => future::err(error::Error::from(e)).boxed(),
168            })
169            .boxed()
170    }
171}
172
173impl<T: ?Sized> IntoVec for T where T: StdStream {}
174
175#[async_trait]
176pub trait IntoVec: StreamExt {
177    async fn into_vec<T>(self) -> Result<Vec<T>>
178    where
179        Self: Sized,
180        T: std::marker::Send,
181        Vec<Result<T>>: Extend<Self::Item>,
182    {
183        self.collect::<Vec<Result<T>>>().await.into_iter().collect()
184    }
185}
186
187#[cfg(test)]
188fn get_test_client() -> Client {
189    use std::{env, thread, time};
190    const USER_AGENT: &str = "helium-api-test/0.1.0";
191    const BASE_URL: &str = "https://api.helium.io/v1";
192    let duration = time::Duration::from_millis(env::var("TEST_DELAY_MS").map_or(0, |v| {
193        v.parse::<u64>()
194            .expect("TEST_DELAY_MS cannot be parsed as u64")
195    }));
196    thread::sleep(duration);
197    Client::new_with_base_url(BASE_URL.into(), USER_AGENT)
198}