cherry_ingest/
lib.rs

1#![allow(clippy::should_implement_trait)]
2
3use std::{collections::BTreeMap, pin::Pin};
4
5use anyhow::{anyhow, Context, Result};
6use arrow::record_batch::RecordBatch;
7use futures_lite::Stream;
8
9pub mod evm;
10mod provider;
11pub mod svm;
12
13#[derive(Debug, Clone)]
14pub enum Query {
15    Evm(evm::Query),
16    Svm(svm::Query),
17}
18
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    /// Builds a [`Query`] from a Python object exposing `kind` and `params` attributes.
    ///
    /// `kind` must be the string `"evm"` or `"svm"`; it selects which query type
    /// `params` is extracted into.
    ///
    /// # Errors
    ///
    /// Fails when either attribute is missing, when `kind` is not a string,
    /// when `kind` names an unknown query kind, or when `params` cannot be
    /// extracted into the selected query type.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let tag: &str = kind_attr.extract().context("kind as str")?;

        // `params` is read before the tag is inspected, so a missing attribute
        // is reported even when the kind is also invalid.
        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match tag {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            other => return Err(anyhow!("unknown query kind: {}", other).into()),
        };

        Ok(parsed)
    }
}
36
37#[derive(Debug, Clone)]
38#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
39pub struct ProviderConfig {
40    pub kind: ProviderKind,
41    pub query: Query,
42    pub url: Option<String>,
43    pub bearer_token: Option<String>,
44    pub max_num_retries: Option<usize>,
45    pub retry_backoff_ms: Option<u64>,
46    pub retry_base_ms: Option<u64>,
47    pub retry_ceiling_ms: Option<u64>,
48    pub http_req_timeout_millis: Option<u64>,
49}
50
51impl ProviderConfig {
52    pub fn new(kind: ProviderKind, query: Query) -> Self {
53        Self {
54            kind,
55            query,
56            url: None,
57            bearer_token: None,
58            max_num_retries: None,
59            retry_backoff_ms: None,
60            retry_base_ms: None,
61            retry_ceiling_ms: None,
62            http_req_timeout_millis: None,
63        }
64    }
65}
66
/// Identifies which provider backend serves a stream.
#[derive(Clone, Copy, Debug)]
pub enum ProviderKind {
    /// The `sqd` provider.
    Sqd,
    /// The `hypersync` provider.
    Hypersync,
}
72
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    /// Converts a Python string into a [`ProviderKind`].
    ///
    /// Accepts exactly `"sqd"` and `"hypersync"`.
    ///
    /// # Errors
    ///
    /// Fails when the object cannot be read as a string or when the string
    /// is not one of the accepted names.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let name: &str = ob.extract().context("read as string")?;

        let kind = match name {
            "sqd" => Self::Sqd,
            "hypersync" => Self::Hypersync,
            other => return Err(anyhow!("unknown provider kind: {}", other).into()),
        };

        Ok(kind)
    }
}
87
88type DataStream = Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
89
90pub async fn start_stream(provider_config: ProviderConfig) -> Result<DataStream> {
91    match provider_config.kind {
92        ProviderKind::Sqd => provider::sqd::start_stream(provider_config),
93        ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config).await,
94    }
95}