1use async_trait::async_trait;
2use futures::{
3 future, stream, Future as StdFuture, FutureExt, Stream as StdStream, StreamExt, TryFutureExt,
4};
5use serde::{de::DeserializeOwned, Deserialize, Serialize};
6use std::{pin::Pin, time::Duration};
7
/// A heap-allocated, pinned, `Send` future whose output is this crate's
/// `Result<T>`.
pub type Future<T> = Pin<Box<dyn StdFuture<Output = Result<T>> + Send>>;
10
/// A heap-allocated, pinned, `Send` stream whose items are this crate's
/// `Result<T>`.
pub type Stream<T> = Pin<Box<dyn StdStream<Item = Result<T>> + Send>>;
13
// The `error` module itself is private; only its `Error` and `Result` items
// are re-exported below.
mod error;

pub use error::{Error, Result};

// Public submodules.
// NOTE(review): the names suggest one module per API resource — confirm
// against the individual module files.
pub mod accounts;
pub mod blocks;
pub mod hotspots;
pub mod models;
pub mod oracle;
pub mod ouis;
pub mod pending_transactions;
pub mod transactions;
pub mod validators;
pub mod vars;
28
/// Request timeout, in seconds, used by `Client::new_with_base_url`.
pub const DEFAULT_TIMEOUT: u64 = 120;
/// A base URL for API requests.
///
/// NOTE(review): not referenced by the constructors in this file; presumably
/// the value callers use when they have no other base URL — confirm.
pub const DEFAULT_BASE_URL: &str = "https://api.helium.io/v1";
/// An empty query: a reference to a zero-length array of `&str`.
///
/// NOTE(review): presumably meant to be passed as the `query` argument of the
/// client's fetch helpers when a request has no query parameters — confirm
/// against callers.
pub const NO_QUERY: &[&str; 0] = &[""; 0];
36
/// Envelope that JSON response bodies are deserialized into: a `data` payload
/// plus an optional pagination `cursor`.
#[derive(Clone, Deserialize, Debug)]
pub(crate) struct Data<T> {
    /// The decoded payload.
    pub data: T,
    /// Cursor for a further page, if the response carried one.
    /// `Client::fetch_stream` sends it back as the `cursor` query parameter
    /// to request the next page.
    pub cursor: Option<String>,
}
42
/// HTTP client for an API rooted at a base URL.
#[derive(Clone, Debug)]
pub struct Client {
    /// Prefix that every request path is appended to verbatim.
    base_url: String,
    /// Underlying `reqwest` client used to send every request.
    client: reqwest::Client,
}
48
49impl Client {
50 pub fn new_with_base_url(base_url: String, user_agent: &str) -> Self {
54 Self::new_with_timeout(base_url, user_agent, DEFAULT_TIMEOUT)
55 }
56
57 pub fn new_with_timeout(base_url: String, user_agent: &str, timeout: u64) -> Self {
61 let client = reqwest::Client::builder()
62 .gzip(true)
63 .user_agent(user_agent)
64 .timeout(Duration::from_secs(timeout))
65 .build()
66 .unwrap();
67 Self { base_url, client }
68 }
69
70 pub(crate) fn fetch_data<T, Q>(&self, path: &str, query: &Q) -> Future<Data<T>>
71 where
72 T: 'static + DeserializeOwned + std::marker::Send,
73 Q: Serialize + ?Sized,
74 {
75 let request_url = format!("{}{}", self.base_url, path);
76 self.client
77 .get(&request_url)
78 .query(query)
79 .send()
80 .map_err(error::Error::from)
81 .and_then(|response| match response.error_for_status() {
82 Ok(result) => {
83 let data: Future<Data<T>> = result.json().map_err(error::Error::from).boxed();
84 data
85 }
86 Err(e) => future::err(error::Error::from(e)).boxed(),
87 })
88 .boxed()
89 }
90
91 pub(crate) fn fetch_stream<E, Q>(&self, path: &str, query: &Q) -> Stream<E>
92 where
93 E: 'static + DeserializeOwned + std::marker::Send,
94 Q: Serialize + ?Sized,
95 {
96 let path = path.to_string();
97 let client = self.clone();
98 client
99 .fetch_data::<Vec<E>, _>(&path, query)
100 .map_ok(move |mut data| {
101 data.data.reverse();
102 stream::try_unfold(
103 (data, client, path),
104 |(mut data, client, path)| async move {
105 match data.data.pop() {
106 Some(entry) => Ok(Some((entry, (data, client, path)))),
107 None => match data.cursor {
108 Some(cursor) => {
109 let mut data: Data<Vec<E>>;
112 let mut cursor = cursor;
113 loop {
114 data = client
115 .fetch_data::<Vec<E>, _>(&path, &[("cursor", &cursor)])
116 .await?;
117
118 if !data.data.is_empty() {
119 data.data.reverse();
120 let entry = data.data.pop().unwrap();
121 break Ok(Some((entry, (data, client, path))));
122 } else if data.cursor.is_none() {
123 break Ok(None);
124 }
125 cursor = data.cursor.unwrap();
126 }
127 }
128 None => Ok(None),
129 },
130 }
131 },
132 )
133 })
134 .try_flatten_stream()
135 .boxed()
136 }
137
138 pub(crate) async fn fetch<T, Q>(&self, path: &str, query: &Q) -> error::Result<T>
139 where
140 T: 'static + DeserializeOwned + std::marker::Send,
141 Q: Serialize + ?Sized,
142 {
143 let result = self.fetch_data(path, query).await?;
144 Ok(result.data)
145 }
146
147 pub(crate) fn post<T, R>(&self, path: &str, json: &T) -> Future<R>
148 where
149 T: Serialize + ?Sized,
150 R: 'static + DeserializeOwned + std::marker::Send,
151 {
152 let request_url = format!("{}{}", self.base_url, path);
153 self.client
154 .post(&request_url)
155 .json(json)
156 .send()
157 .map_err(error::Error::from)
158 .and_then(|response| match response.error_for_status() {
159 Ok(result) => {
160 let data: Future<R> = result
161 .json()
162 .map_err(error::Error::from)
163 .map_ok(|v: Data<R>| v.data)
164 .boxed();
165 data
166 }
167 Err(e) => future::err(error::Error::from(e)).boxed(),
168 })
169 .boxed()
170 }
171}
172
173impl<T: ?Sized> IntoVec for T where T: StdStream {}
174
175#[async_trait]
176pub trait IntoVec: StreamExt {
177 async fn into_vec<T>(self) -> Result<Vec<T>>
178 where
179 Self: Sized,
180 T: std::marker::Send,
181 Vec<Result<T>>: Extend<Self::Item>,
182 {
183 self.collect::<Vec<Result<T>>>().await.into_iter().collect()
184 }
185}
186
/// Builds the `Client` used by tests.
///
/// Before constructing the client, sleeps for the number of milliseconds
/// given by the `TEST_DELAY_MS` environment variable. When the variable
/// cannot be read, the delay is zero.
///
/// # Panics
///
/// Panics if `TEST_DELAY_MS` is set to a value that does not parse as `u64`.
#[cfg(test)]
fn get_test_client() -> Client {
    use std::{env, thread, time::Duration};
    const USER_AGENT: &str = "helium-api-test/0.1.0";
    const BASE_URL: &str = "https://api.helium.io/v1";

    let delay_ms = match env::var("TEST_DELAY_MS") {
        Ok(raw) => raw
            .parse::<u64>()
            .expect("TEST_DELAY_MS cannot be parsed as u64"),
        Err(_) => 0,
    };
    thread::sleep(Duration::from_millis(delay_ms));

    Client::new_with_base_url(BASE_URL.into(), USER_AGENT)
}