1use std::{cmp, collections::BTreeSet, error::Error, time::Duration};
2
3use anyhow::{anyhow, Context, Result};
4use arrayvec::ArrayVec;
5use format::{Address, LogArgument};
6use futures::StreamExt;
7use polars_arrow::{array::Array, record_batch::RecordBatch as Chunk};
8use reqwest::Method;
9use skar_net_types::{
10 skar_net_types_capnp, ArchiveHeight, FieldSelection, LogSelection, Query, RollbackGuard,
11 TransactionSelection,
12};
13
14mod column_mapping;
15pub mod config;
16mod decode;
17mod parquet_out;
18mod rayon_async;
19mod transport_format;
20mod types;
21
22pub use column_mapping::{ColumnMapping, DataType};
23pub use config::Config;
24pub use decode::Decoder;
25pub use parquet_out::map_batch_to_binary_view;
26pub use skar_format as format;
27use tokio::sync::mpsc;
28pub use transport_format::{ArrowIpc, TransportFormat};
29pub use types::{ArrowBatch, ParquetConfig, QueryResponse, QueryResponseData, StreamConfig};
30
/// A polars-arrow record batch whose columns are boxed, dynamically typed arrays.
pub type ArrowChunk = Chunk<Box<dyn Array>>;
32
/// HTTP client for a skar server.
///
/// Cloning produces another handle holding clones of the same HTTP client and configuration.
#[derive(Clone)]
pub struct Client {
    /// Underlying `reqwest` client used for every request.
    http_client: reqwest::Client,
    /// Client configuration (server URL, optional bearer token, request timeout).
    cfg: Config,
}
38
39impl Client {
40 pub fn new(cfg: Config) -> Result<Self> {
42 let http_client = reqwest::Client::builder()
43 .no_gzip()
44 .timeout(Duration::from_millis(cfg.http_req_timeout_millis.get()))
45 .pool_max_idle_per_host(0)
46 .build()
47 .unwrap();
48
49 Ok(Self { http_client, cfg })
50 }
51
    /// Runs `query` and writes the results out as parquet according to `config`.
    ///
    /// This is a thin wrapper that forwards this client, the query and the config to
    /// `parquet_out::create_parquet_folder` and returns its result unchanged.
    pub async fn create_parquet_folder(&self, query: Query, config: ParquetConfig) -> Result<()> {
        parquet_out::create_parquet_folder(self, query, config).await
    }
58
59 pub async fn get_height(&self) -> Result<u64> {
61 let mut url = self.cfg.url.clone();
62 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
63 segments.push("height");
64 std::mem::drop(segments);
65 let mut req = self.http_client.request(Method::GET, url);
66
67 if let Some(bearer_token) = &self.cfg.bearer_token {
68 req = req.bearer_auth(bearer_token);
69 }
70
71 let res = req.send().await.context("execute http req")?;
72
73 let status = res.status();
74 if !status.is_success() {
75 return Err(anyhow!("http response status code {}", status));
76 }
77
78 let height: ArchiveHeight = res.json().await.context("read response body json")?;
79
80 Ok(height.height.unwrap_or(0))
81 }
82
83 pub async fn get_height_with_retry(&self) -> Result<u64> {
89 let mut base = 1;
90
91 loop {
92 match self.get_height().await {
93 Ok(res) => return Ok(res),
94 Err(e) => {
95 log::error!("failed to send request to skar server: {:?}", e);
96 }
97 }
98
99 let secs = Duration::from_secs(base);
100 let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
101
102 tokio::time::sleep(secs + millis).await;
103
104 base = std::cmp::min(base + 1, 5);
105 }
106 }
107
108 pub async fn stream<Format: TransportFormat>(
109 &self,
110 query: Query,
111 config: StreamConfig,
112 ) -> Result<mpsc::Receiver<Result<QueryResponse>>> {
113 let (tx, rx) = mpsc::channel(config.concurrency);
114
115 let to_block = match query.to_block {
116 Some(to_block) => to_block,
117 None => {
118 if config.retry {
119 self.get_height_with_retry().await.context("get height")?
120 } else {
121 self.get_height().await.context("get height")?
122 }
123 }
124 };
125
126 let client = self.clone();
127 let step = usize::try_from(config.batch_size).unwrap();
128 tokio::spawn(async move {
129 let mut query = query;
130 let initial_res = if config.retry {
131 client
132 .send_with_retry::<crate::ArrowIpc>(&query)
133 .await
134 .context("run initial query")
135 } else {
136 client
137 .send::<crate::ArrowIpc>(&query)
138 .await
139 .context("run initial query")
140 };
141 match initial_res {
142 Ok(res) => {
143 query.from_block = res.next_block;
144 if tx.send(Ok(res)).await.is_err() {
145 return;
146 }
147 }
148 Err(e) => {
149 tx.send(Err(e)).await.ok();
150 return;
151 }
152 }
153
154 let futs = (query.from_block..to_block)
155 .step_by(step)
156 .map(move |start| {
157 let end = cmp::min(start + config.batch_size, to_block);
158 let mut query = query.clone();
159 query.from_block = start;
160 query.to_block = Some(end);
161
162 Self::run_query_to_end(client.clone(), query, config.retry)
163 });
164
165 let mut stream = futures::stream::iter(futs).buffered(config.concurrency);
166
167 while let Some(resps) = stream.next().await {
168 let resps = match resps {
169 Ok(resps) => resps,
170 Err(e) => {
171 tx.send(Err(e)).await.ok();
172 return;
173 }
174 };
175
176 for resp in resps {
177 if tx.send(Ok(resp)).await.is_err() {
178 return;
179 }
180 }
181 }
182 });
183
184 Ok(rx)
185 }
186
187 async fn run_query_to_end(self, query: Query, retry: bool) -> Result<Vec<QueryResponse>> {
188 let mut resps = Vec::new();
189
190 let to_block = query.to_block.unwrap();
191
192 let mut query = query;
193
194 loop {
195 let resp = if retry {
196 self.send_with_retry::<crate::ArrowIpc>(&query)
197 .await
198 .context("send query")?
199 } else {
200 self.send::<crate::ArrowIpc>(&query)
201 .await
202 .context("send query")?
203 };
204
205 let next_block = resp.next_block;
206
207 resps.push(resp);
208
209 if next_block >= to_block {
210 break;
211 } else {
212 query.from_block = next_block;
213 }
214 }
215
216 Ok(resps)
217 }
218
219 pub async fn send<Format: TransportFormat>(&self, query: &Query) -> Result<QueryResponse> {
224 let mut url = self.cfg.url.clone();
225 let mut segments = url.path_segments_mut().ok().context("get path segments")?;
226 segments.push("query");
227 segments.push(Format::path());
228 std::mem::drop(segments);
229 let mut req = self.http_client.request(Method::POST, url);
230
231 if let Some(bearer_token) = &self.cfg.bearer_token {
232 req = req.bearer_auth(bearer_token);
233 }
234
235 let res = req.json(&query).send().await.context("execute http req")?;
236
237 let status = res.status();
238 if !status.is_success() {
239 let text = res.text().await.context("read text to see error")?;
240
241 return Err(anyhow!(
242 "http response status code {}, err body: {}",
243 status,
244 text
245 ));
246 }
247
248 let bytes = res.bytes().await.context("read response body bytes")?;
249
250 let res = tokio::task::block_in_place(|| {
251 Self::parse_query_response::<Format>(&bytes).context("parse query response")
252 })?;
253
254 Ok(res)
255 }
256
257 pub async fn send_with_retry<Format: TransportFormat>(
266 &self,
267 query: &Query,
268 ) -> Result<QueryResponse> {
269 let mut base = 1;
270
271 loop {
272 match self.send::<Format>(query).await {
273 Ok(res) => return Ok(res),
274 Err(e) => {
275 log::error!("failed to send request to skar server: {:?}", e);
276 }
277 }
278
279 let secs = Duration::from_secs(base);
280 let millis = Duration::from_millis(fastrange_rs::fastrange_64(rand::random(), 1000));
281
282 tokio::time::sleep(secs + millis).await;
283
284 base = std::cmp::min(base + 1, 5);
285 }
286 }
287
288 fn parse_query_response<Format: TransportFormat>(bytes: &[u8]) -> Result<QueryResponse> {
289 let mut opts = capnp::message::ReaderOptions::new();
290 opts.nesting_limit(i32::MAX).traversal_limit_in_words(None);
291 let message_reader =
292 capnp::serialize_packed::read_message(bytes, opts).context("create message reader")?;
293
294 let query_response = message_reader
295 .get_root::<skar_net_types_capnp::query_response::Reader>()
296 .context("get root")?;
297
298 let archive_height = match query_response.get_archive_height() {
299 -1 => None,
300 h => Some(
301 h.try_into()
302 .context("invalid archive height returned from server")?,
303 ),
304 };
305
306 let rollback_guard = if query_response.has_rollback_guard() {
307 let rg = query_response
308 .get_rollback_guard()
309 .context("get rollback guard")?;
310
311 Some(RollbackGuard {
312 block_number: rg.get_block_number(),
313 timestamp: rg.get_timestamp(),
314 hash: rg
315 .get_hash()
316 .context("get rollback guard hash")?
317 .try_into()
318 .context("hash size")?,
319 first_block_number: rg.get_first_block_number(),
320 first_parent_hash: rg
321 .get_first_parent_hash()
322 .context("get rollback guard first parent hash")?
323 .try_into()
324 .context("hash size")?,
325 })
326 } else {
327 None
328 };
329
330 let data = query_response.get_data().context("read data")?;
331
332 let blocks = Format::read_chunks(data.get_blocks().context("get data")?)
333 .context("parse block data")?;
334 let transactions = Format::read_chunks(data.get_transactions().context("get data")?)
335 .context("parse tx data")?;
336 let logs =
337 Format::read_chunks(data.get_logs().context("get data")?).context("parse log data")?;
338 let traces = if data.has_traces() {
339 Format::read_chunks(data.get_traces().context("get data")?)
340 .context("parse traces data")?
341 } else {
342 Vec::new()
343 };
344
345 Ok(QueryResponse {
346 archive_height,
347 next_block: query_response.get_next_block(),
348 total_execution_time: query_response.get_total_execution_time(),
349 data: QueryResponseData {
350 blocks,
351 transactions,
352 logs,
353 traces,
354 },
355 rollback_guard,
356 })
357 }
358
359 pub fn preset_query_blocks_and_transactions(from_block: u64, to_block: Option<u64>) -> Query {
364 let all_block_fields: BTreeSet<String> = skar_schema::block_header()
365 .fields
366 .iter()
367 .map(|x| x.name.clone())
368 .collect();
369
370 let all_tx_fields: BTreeSet<String> = skar_schema::transaction()
371 .fields
372 .iter()
373 .map(|x| x.name.clone())
374 .collect();
375
376 Query {
377 from_block,
378 to_block,
379 include_all_blocks: true,
380 transactions: vec![TransactionSelection::default()],
381 field_selection: FieldSelection {
382 block: all_block_fields,
383 transaction: all_tx_fields,
384 ..Default::default()
385 },
386 ..Default::default()
387 }
388 }
389
390 pub fn preset_query_blocks_and_transaction_hashes(
396 from_block: u64,
397 to_block: Option<u64>,
398 ) -> Query {
399 let mut tx_field_selection = BTreeSet::new();
400 tx_field_selection.insert("block_hash".to_owned());
401 tx_field_selection.insert("block_number".to_owned());
402 tx_field_selection.insert("hash".to_owned());
403
404 let all_block_fields: BTreeSet<String> = skar_schema::block_header()
405 .fields
406 .iter()
407 .map(|x| x.name.clone())
408 .collect();
409
410 Query {
411 from_block,
412 to_block,
413 include_all_blocks: true,
414 transactions: vec![TransactionSelection::default()],
415 field_selection: FieldSelection {
416 block: all_block_fields,
417 transaction: tx_field_selection,
418 ..Default::default()
419 },
420 ..Default::default()
421 }
422 }
423
424 pub fn preset_query_logs<A>(from_block: u64, to_block: Option<u64>, address: A) -> Result<Query>
429 where
430 A: TryInto<Address>,
431 <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
432 {
433 let address = address.try_into().context("convert Address type")?;
434
435 let all_log_fields: BTreeSet<String> = skar_schema::log()
436 .fields
437 .iter()
438 .map(|x| x.name.clone())
439 .collect();
440
441 Ok(Query {
442 from_block,
443 to_block,
444 logs: vec![LogSelection {
445 address: vec![address],
446 ..Default::default()
447 }],
448 field_selection: FieldSelection {
449 log: all_log_fields,
450 ..Default::default()
451 },
452 ..Default::default()
453 })
454 }
455
456 pub fn preset_query_logs_of_event<A, T>(
462 from_block: u64,
463 to_block: Option<u64>,
464 topic0: T,
465 address: A,
466 ) -> Result<Query>
467 where
468 A: TryInto<Address>,
469 <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
470 T: TryInto<LogArgument>,
471 <T as TryInto<LogArgument>>::Error: Error + Send + Sync + 'static,
472 {
473 let topic0 = topic0.try_into().context("convert Topic0 type")?;
474 let mut topics = ArrayVec::<Vec<LogArgument>, 4>::new();
475 topics.insert(0, vec![topic0]);
476
477 let address = address.try_into().context("convert Address type")?;
478
479 let all_log_fields: BTreeSet<String> = skar_schema::log()
480 .fields
481 .iter()
482 .map(|x| x.name.clone())
483 .collect();
484
485 Ok(Query {
486 from_block,
487 to_block,
488 logs: vec![LogSelection {
489 address: vec![address],
490 topics,
491 }],
492 field_selection: FieldSelection {
493 log: all_log_fields,
494 ..Default::default()
495 },
496 ..Default::default()
497 })
498 }
499
500 pub fn preset_query_transactions(from_block: u64, to_block: Option<u64>) -> Query {
505 let all_txn_fields: BTreeSet<String> = skar_schema::transaction()
506 .fields
507 .iter()
508 .map(|x| x.name.clone())
509 .collect();
510
511 Query {
512 from_block,
513 to_block,
514 transactions: vec![TransactionSelection::default()],
515 field_selection: FieldSelection {
516 transaction: all_txn_fields,
517 ..Default::default()
518 },
519 ..Default::default()
520 }
521 }
522
523 pub fn preset_query_transactions_from_address<A>(
528 from_block: u64,
529 to_block: Option<u64>,
530 address: A,
531 ) -> Result<Query>
532 where
533 A: TryInto<Address>,
534 <A as TryInto<Address>>::Error: Error + Send + Sync + 'static,
535 {
536 let address = address.try_into().context("convert Address type")?;
537
538 let all_txn_fields: BTreeSet<String> = skar_schema::transaction()
539 .fields
540 .iter()
541 .map(|x| x.name.clone())
542 .collect();
543
544 Ok(Query {
545 from_block,
546 to_block,
547 transactions: vec![TransactionSelection {
548 from: vec![address],
549 ..Default::default()
550 }],
551 field_selection: FieldSelection {
552 transaction: all_txn_fields,
553 ..Default::default()
554 },
555 ..Default::default()
556 })
557 }
558}