cherry_ingest/
lib.rs

1#![allow(clippy::should_implement_trait)]
2#![allow(clippy::field_reassign_with_default)]
3
4use std::{collections::BTreeMap, pin::Pin};
5
6use anyhow::{anyhow, Context, Result};
7use arrow::record_batch::RecordBatch;
8use futures_lite::Stream;
9
10pub mod evm;
11mod provider;
12pub mod svm;
13
14#[derive(Debug, Clone)]
15pub enum Query {
16    Evm(evm::Query),
17    Svm(svm::Query),
18}
19
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    /// Builds a [`Query`] from a Python object that exposes a string `kind`
    /// attribute (`"evm"` or `"svm"`) and a `params` attribute holding the
    /// query for that kind.
    ///
    /// # Errors
    ///
    /// Fails if either attribute is missing, if `kind` cannot be read as a
    /// string, if `kind` is not a recognised value, or if `params` cannot be
    /// extracted into the matching query type.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let kind_name: &str = kind_attr.extract().context("kind as str")?;

        // `params` is fetched before `kind` is validated, so a missing `params`
        // attribute is reported even when `kind` is unrecognised.
        let params = ob.getattr("params").context("get params attribute")?;

        let query = match kind_name {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            other => return Err(anyhow!("unknown query kind: {}", other).into()),
        };

        Ok(query)
    }
}
37
38#[derive(Debug, Clone)]
39#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
40pub struct ProviderConfig {
41    pub kind: ProviderKind,
42    pub query: Query,
43    pub url: Option<String>,
44    pub bearer_token: Option<String>,
45    pub max_num_retries: Option<usize>,
46    pub retry_backoff_ms: Option<u64>,
47    pub retry_base_ms: Option<u64>,
48    pub retry_ceiling_ms: Option<u64>,
49    pub req_timeout_millis: Option<u64>,
50    pub stop_on_head: bool,
51    pub head_poll_interval_millis: Option<u64>,
52    pub buffer_size: Option<usize>,
53}
54
55impl ProviderConfig {
56    pub fn new(kind: ProviderKind, query: Query) -> Self {
57        Self {
58            kind,
59            query,
60            url: None,
61            bearer_token: None,
62            max_num_retries: None,
63            retry_backoff_ms: None,
64            retry_base_ms: None,
65            retry_ceiling_ms: None,
66            req_timeout_millis: None,
67            stop_on_head: false,
68            head_poll_interval_millis: None,
69            buffer_size: None,
70        }
71    }
72}
73
/// Selects which data provider backend [`start_stream`] dispatches to.
///
/// Equality and hashing are derived so callers can compare kinds directly or
/// use them as keys in sets and maps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProviderKind {
    Sqd,
    Hypersync,
    YellowstoneGrpc,
}
80
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    /// Parses a [`ProviderKind`] from a Python string.
    ///
    /// Accepted values are `"sqd"`, `"hypersync"` and `"yellowstone_grpc"`.
    ///
    /// # Errors
    ///
    /// Fails if the object cannot be read as a string or names an unknown
    /// provider.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let name: &str = ob.extract().context("read as string")?;

        let kind = match name {
            "sqd" => Self::Sqd,
            "hypersync" => Self::Hypersync,
            "yellowstone_grpc" => Self::YellowstoneGrpc,
            unknown => return Err(anyhow!("unknown provider kind: {}", unknown).into()),
        };

        Ok(kind)
    }
}
96
97type DataStream = Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
98
99pub async fn start_stream(provider_config: ProviderConfig) -> Result<DataStream> {
100    match provider_config.kind {
101        ProviderKind::Sqd => provider::sqd::start_stream(provider_config),
102        ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config).await,
103        ProviderKind::YellowstoneGrpc => {
104            provider::yellowstone_grpc::start_stream(provider_config).await
105        }
106    }
107}