1#![allow(clippy::should_implement_trait)]
2
3use std::{collections::BTreeMap, pin::Pin};
4
5use anyhow::{anyhow, Context, Result};
6use arrow::record_batch::RecordBatch;
7use futures_lite::Stream;
8
9pub mod evm;
10mod provider;
11pub mod svm;
12
13#[derive(Debug, Clone)]
14pub enum Query {
15 Evm(evm::Query),
16 Svm(svm::Query),
17}
18
/// Builds a [`Query`] from a Python object exposing `kind` and `params`
/// attributes.
///
/// `kind` must be the string `"evm"` or `"svm"`; `params` is then extracted
/// as the matching query type. Every failure is reported as an `anyhow` error
/// converted into a Python exception.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        // The attribute object has to outlive the `&str` borrowed from it.
        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let tag = kind_attr.extract::<&str>().context("kind as str")?;

        // `params` is fetched before the tag is inspected, so a missing
        // attribute is reported ahead of an unknown kind.
        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match tag {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            other => return Err(anyhow!("unknown query kind: {}", other).into()),
        };

        Ok(parsed)
    }
}
36
37#[derive(Debug, Clone)]
38#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
39pub struct ProviderConfig {
40 pub kind: ProviderKind,
41 pub query: Query,
42 pub url: Option<String>,
43 pub bearer_token: Option<String>,
44 pub max_num_retries: Option<usize>,
45 pub retry_backoff_ms: Option<u64>,
46 pub retry_base_ms: Option<u64>,
47 pub retry_ceiling_ms: Option<u64>,
48 pub http_req_timeout_millis: Option<u64>,
49}
50
51impl ProviderConfig {
52 pub fn new(kind: ProviderKind, query: Query) -> Self {
53 Self {
54 kind,
55 query,
56 url: None,
57 bearer_token: None,
58 max_num_retries: None,
59 retry_backoff_ms: None,
60 retry_base_ms: None,
61 retry_ceiling_ms: None,
62 http_req_timeout_millis: None,
63 }
64 }
65}
66
/// Identifies which provider backend [`start_stream`] dispatches to.
#[derive(Clone, Copy, Debug)]
pub enum ProviderKind {
    /// Selects the `sqd` provider module.
    Sqd,
    /// Selects the `hypersync` provider module.
    Hypersync,
}
72
/// Parses a [`ProviderKind`] from a Python string.
///
/// Accepts exactly `"sqd"` and `"hypersync"`; any other string, or a value
/// that cannot be read as a string, yields an error converted into a Python
/// exception.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        match ob.extract::<&str>().context("read as string")? {
            "sqd" => Ok(Self::Sqd),
            "hypersync" => Ok(Self::Hypersync),
            other => Err(anyhow!("unknown provider kind: {}", other).into()),
        }
    }
}
87
88type DataStream = Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
89
90pub async fn start_stream(provider_config: ProviderConfig) -> Result<DataStream> {
91 match provider_config.kind {
92 ProviderKind::Sqd => provider::sqd::start_stream(provider_config),
93 ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config).await,
94 }
95}