Skip to main content

tiders_ingest/
lib.rs

// Crate-wide clippy lint expectations. `expect` is used rather than `allow` so
// that rustc reports `unfulfilled_lint_expectations` once a lint stops firing,
// which keeps these suppressions from silently going stale.
#![expect(
    clippy::should_implement_trait,
    reason = "LogKind::from_str is a constructor pattern, not Trait impl"
)]
// NOTE(review): `ProviderConfig` has no `Default` impl in this file, so this
// expectation presumably fires elsewhere (e.g. a `Fields::default()` followed
// by a field assignment) — confirm the reason text still matches the code.
#![expect(
    clippy::field_reassign_with_default,
    reason = "ProviderConfig is built by setting fields after construction"
)]
9
10//! # tiders-ingest
11//!
12//! Streams blockchain data from multiple provider backends as Apache Arrow RecordBatches.
13//!
14//! Supports both EVM (Ethereum) and SVM (Solana) chains through a unified [`Query`] enum.
15//! Data is fetched from one of three provider backends:
16//!
17//! - **SQD** ([`ProviderKind::Sqd`]) — SQD Network portal for historical data.
18//! - **HyperSync** ([`ProviderKind::Hypersync`]) — Envio HyperSync for fast historical data.
19//! - **RPC** ([`ProviderKind::Rpc`]) — Direct JSON-RPC node connection.
20//!
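//! Use [`start_stream`] to create an async stream of [`StreamItem`]s. Each item's `data`
//! field is a `BTreeMap<String, RecordBatch>` whose keys are table names (e.g. "blocks",
//! "transactions", "logs"), and its `last_block` field is the highest block id seen in
//! that chunk.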
23
24use std::{collections::BTreeMap, pin::Pin, sync::Arc};
25
26#[cfg(feature = "pyo3")]
27use anyhow::anyhow;
28use anyhow::{Context, Result};
29use arrow::array::UInt64Array;
30use arrow::compute::kernels::aggregate::max as array_max;
31use arrow::record_batch::RecordBatch;
32use futures_lite::{Stream, StreamExt};
33use provider::common::{evm_query_to_generic, svm_query_to_generic};
34use serde::de::DeserializeOwned;
35
36pub mod evm;
37mod provider;
38mod rayon_async;
39pub mod svm;
40
41/// Top-level query type: either an EVM or SVM blockchain data query.
42#[derive(Debug, Clone)]
43pub enum Query {
44    Evm(evm::Query),
45    Svm(svm::Query),
46}
47
/// Builds a [`Query`] from a Python object that exposes a string attribute
/// `kind` (`"evm"` or `"svm"`) and a `params` attribute holding the
/// VM-specific query, which is extracted into the matching variant.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let kind_name: &str = kind_attr.extract().context("kind as str")?;
        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match kind_name {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            other => return Err(anyhow!("unknown query kind: {other}").into()),
        };
        Ok(parsed)
    }
}
65
66/// Configuration for a data provider, including connection details and retry settings.
67#[derive(Debug, Clone)]
68#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
69pub struct ProviderConfig {
70    pub kind: ProviderKind,
71    pub url: Option<String>,
72    pub bearer_token: Option<String>,
73    pub max_num_retries: Option<usize>,
74    pub retry_backoff_ms: Option<u64>,
75    pub retry_base_ms: Option<u64>,
76    pub retry_ceiling_ms: Option<u64>,
77    pub req_timeout_millis: Option<u64>,
78    pub stop_on_head: bool,
79    pub head_poll_interval_millis: Option<u64>,
80    pub buffer_size: Option<usize>,
81    // RPC-specific fields
82    pub compute_units_per_second: Option<u64>,
83    pub batch_size: Option<usize>,
84    pub reorg_safe_distance: Option<u64>,
85    pub trace_method: Option<RpcTraceMethod>,
86}
87
88impl ProviderConfig {
89    pub fn new(kind: ProviderKind) -> Self {
90        Self {
91            kind,
92            url: None,
93            bearer_token: None,
94            max_num_retries: None,
95            retry_backoff_ms: None,
96            retry_base_ms: None,
97            retry_ceiling_ms: None,
98            req_timeout_millis: None,
99            stop_on_head: false,
100            head_poll_interval_millis: None,
101            buffer_size: None,
102            compute_units_per_second: None,
103            batch_size: None,
104            reorg_safe_distance: None,
105            trace_method: None,
106        }
107    }
108}
109
/// JSON-RPC method the RPC backend calls to fetch EVM execution traces.
///
/// Configured through the RPC-specific `ProviderConfig::trace_method` field.
#[derive(Clone, Copy, Debug)]
pub enum RpcTraceMethod {
    /// Calls `trace_block` (Parity/OpenEthereum-style tracing API).
    TraceBlock,
    /// Calls `debug_traceBlockByNumber` (Geth-style tracing API).
    DebugTraceBlockByNumber,
}
118
/// Parses a Python string — `"trace_block"` or `"debug_trace_block_by_number"`
/// — into an [`RpcTraceMethod`]; any other string is rejected with an error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for RpcTraceMethod {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let name: &str = ob.extract().context("read as string")?;
        let method = match name {
            "trace_block" => Self::TraceBlock,
            "debug_trace_block_by_number" => Self::DebugTraceBlockByNumber,
            unknown => return Err(anyhow!("unknown trace method: {unknown}").into()),
        };
        Ok(method)
    }
}
133
/// Backend that [`start_stream`] pulls data from.
///
/// `start_stream` dispatches on this value to pick the provider implementation.
#[derive(Clone, Copy, Debug)]
pub enum ProviderKind {
    /// SQD Network portal.
    Sqd,
    /// Envio HyperSync.
    Hypersync,
    /// A JSON-RPC node, queried directly.
    Rpc,
}
144
/// Parses a Python string — `"sqd"`, `"hypersync"` or `"rpc"` — into a
/// [`ProviderKind`]; any other string is rejected with an error.
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let name: &str = ob.extract().context("read as string")?;
        let kind = match name {
            "sqd" => Self::Sqd,
            "hypersync" => Self::Hypersync,
            "rpc" => Self::Rpc,
            unknown => return Err(anyhow!("unknown provider kind: {unknown}").into()),
        };
        Ok(kind)
    }
}
160
161/// One yielded chunk from [`start_stream`]: the projected tables plus the
162/// highest block-id observed in this chunk (`None` if the chunk has no rows
163/// in any table carrying a block-id column).
164#[derive(Debug)]
165pub struct StreamItem {
166    pub data: BTreeMap<String, RecordBatch>,
167    pub last_block: Option<u64>,
168}
169
170/// Public stream type returned by [`start_stream`].
171pub type DataStream = Pin<Box<dyn Stream<Item = Result<StreamItem>> + Send + Sync>>;
172
173/// Internal stream type returned by each provider before projection /
174/// `last_block` extraction.
175pub(crate) type ProviderStream =
176    Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
177
178/// Returns the maximum value across every column in `data` whose name is one
179/// of the known block-id column names (`block_number` for EVM non-block
180/// tables, `number` for the EVM blocks table, `slot` for SVM tables).
181///
182/// Runs against pre-projection batches, where these columns are guaranteed to
183/// be present (they are required by the local query filter step).
184fn extract_last_block(data: &BTreeMap<String, RecordBatch>) -> Option<u64> {
185    const BLOCK_ID_COLUMNS: &[&str] = &["block_number", "number", "slot"];
186
187    let mut out: Option<u64> = None;
188    for batch in data.values() {
189        for col_name in BLOCK_ID_COLUMNS {
190            let Some(col) = batch.column_by_name(col_name) else {
191                continue;
192            };
193            let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() else {
194                continue;
195            };
196            if let Some(m) = array_max(arr) {
197                out = Some(out.map_or(m, |cur| cur.max(m)));
198            }
199        }
200    }
201    out
202}
203
204fn make_req_fields<T: DeserializeOwned>(query: &tiders_query::Query) -> Result<T> {
205    let mut req_fields_query = query.clone();
206    req_fields_query
207        .add_request_and_include_fields()
208        .context("add req and include fields")?;
209
210    let fields = req_fields_query
211        .fields
212        .into_iter()
213        .map(|(k, v)| -> Result<_> {
214            Ok((
215                k.strip_suffix('s')
216                    .context("field key should end with 's'")?
217                    .to_owned(),
218                v.into_iter()
219                    .map(|v| (v, true))
220                    .collect::<BTreeMap<String, bool>>(),
221            ))
222        })
223        .collect::<Result<BTreeMap<String, _>>>()?;
224
225    let json_value = serde_json::to_value(&fields).context("serialize fields to JSON")?;
226    serde_json::from_value(json_value).context("deserialize fields from JSON")
227}
228
229/// Creates an async stream of Arrow RecordBatches from the configured provider.
230pub async fn start_stream(provider_config: ProviderConfig, mut query: Query) -> Result<DataStream> {
231    let generic_query = match &mut query {
232        Query::Evm(evm_query) => {
233            let generic_query = evm_query_to_generic(evm_query).context("validate evm query")?;
234
235            evm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
236
237            generic_query
238        }
239        Query::Svm(svm_query) => {
240            let generic_query = svm_query_to_generic(svm_query);
241
242            svm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
243
244            generic_query
245        }
246    };
247    let generic_query = Arc::new(generic_query);
248
249    let stream = match provider_config.kind {
250        ProviderKind::Sqd => {
251            provider::sqd::start_stream(provider_config, query).context("start sqd stream")?
252        }
253        ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config, query)
254            .await
255            .context("start hypersync stream")?,
256        ProviderKind::Rpc => {
257            provider::rpc::start_stream(&provider_config, query).context("start rpc stream")?
258        }
259    };
260
261    let stream = stream.then(move |res| {
262        let generic_query = Arc::clone(&generic_query);
263        async {
264            rayon_async::spawn(move || {
265                res.and_then(move |data| {
266                    let last_block = extract_last_block(&data);
267                    let data = tiders_query::run_query(&data, &generic_query)
268                        .context("run local query")?;
269                    Ok(StreamItem { data, last_block })
270                })
271            })
272            .await
273            .context("rayon task was cancelled")
274            .and_then(|r| r)
275        }
276    });
277
278    Ok(Box::pin(stream))
279}
280
#[cfg(test)]
mod tests {

    use super::*;
    use crate::svm::*;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;

    /// Live smoke test against the SQD solana-mainnet portal. It requests
    /// instructions of the SPL Token program (`Tokenkeg…`) that match one
    /// discriminator in a single slot. Every table of the first yielded chunk is
    /// then written to `<table>.parquet` in the current directory for manual
    /// inspection.
    #[tokio::test]
    #[ignore]
    async fn simple_svm_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/solana-mainnet".to_string());

        let program_id = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";
        let program_id: [u8; 32] = bs58::decode(program_id)
            .into_vec()
            .unwrap()
            .try_into()
            .unwrap();
        let program_id = Address(program_id);

        let query = crate::Query::Svm(svm::Query {
            from_block: 329443000,
            to_block: Some(329443000),
            include_all_blocks: false,
            fields: Fields {
                instruction: InstructionFields::all(),
                transaction: TransactionFields::default(),
                log: LogFields::default(),
                balance: BalanceFields::default(),
                token_balance: TokenBalanceFields::default(),
                reward: RewardFields::default(),
                block: BlockFields::default(),
            },
            instructions: vec![InstructionRequest {
                program_id: vec![program_id],
                discriminator: vec![Data(vec![12, 96, 49, 128, 22])],
                ..Default::default()
            }],
            transactions: vec![],
            logs: vec![],
            balances: vec![],
            token_balances: vec![],
            rewards: vec![],
        });
        let mut stream = start_stream(provider_config, query).await.unwrap();
        let item = stream.next().await.unwrap().unwrap();
        for (table, batch) in item.data {
            let mut file = File::create(format!("{table}.parquet")).unwrap();
            let mut writer = ArrowWriter::try_new(&mut file, batch.schema(), None).unwrap();
            writer.write(&batch).unwrap();
            writer.close().unwrap();
        }
    }

    /// End-to-end smoke test for the public `start_stream` surface against the
    /// SQD ethereum-mainnet portal. It fetches ERC-20 Transfer logs over a small
    /// block range and checks the `StreamItem::last_block` invariants:
    /// - every reported value lies within `[from_block, to_block]`;
    /// - values never decrease across items;
    /// - the highest value equals `to_block` once the stream is exhausted.
    ///
    /// The last check relies on block `to_block` itself containing a Transfer
    /// log, because `last_block` is derived from returned rows. The test also
    /// checks that at least one log was returned.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore]
    async fn simple_start_stream() {
        use crate::evm::{Fields, LogFields, LogRequest, Query as EvmQuery, Topic};

        // keccak256("Transfer(address,address,uint256)")
        let transfer_topic0: [u8; 32] = {
            let mut out = [0u8; 32];
            faster_hex::hex_decode(
                b"ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
                &mut out,
            )
            .unwrap();
            out
        };

        let from_block = 18_000_000u64;
        let to_block = 18_000_010u64;

        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/ethereum-mainnet".to_string());
        provider_config.stop_on_head = true;

        // To run against HyperSync instead, swap in:
        // let mut provider_config = ProviderConfig::new(ProviderKind::Hypersync);
        // provider_config.url = Some("https://eth.hypersync.xyz/".to_string());
        // provider_config.stop_on_head = true;
        // provider_config.bearer_token = Some("add_token".to_string());

        let mut fields = Fields::default();
        fields.log = LogFields {
            log_index: true,
            block_number: true,
            address: true,
            topic0: true,
            ..LogFields::default()
        };

        let query = crate::Query::Evm(EvmQuery {
            from_block,
            to_block: Some(to_block),
            include_all_blocks: false,
            logs: vec![LogRequest {
                topic0: vec![Topic(transfer_topic0)],
                ..LogRequest::default()
            }],
            transactions: vec![],
            traces: vec![],
            fields,
        });

        let mut stream = start_stream(provider_config, query).await.unwrap();

        println!("from_block={from_block} to_block={to_block}");

        let mut highest: Option<u64> = None;
        let mut total_rows: usize = 0;

        while let Some(res) = stream.next().await {
            let item = res.unwrap();
            println!("item last_block={:?}", item.last_block);

            if let Some(lb) = item.last_block {
                // Every yielded last_block must sit inside the requested window.
                assert!(
                    lb >= from_block,
                    "last_block {} < from_block {}",
                    lb,
                    from_block
                );
                assert!(lb <= to_block, "last_block {} > to_block {}", lb, to_block);

                // Monotonicity across items.
                if let Some(prev) = highest {
                    assert!(lb >= prev, "last_block went backwards: {} -> {}", prev, lb);
                }
                highest = Some(lb);
            }

            if let Some(logs) = item.data.get("logs") {
                total_rows += logs.num_rows();
            }
        }

        println!(
            "final: from_block={} to_block={} last_block={:?} total_rows={}",
            from_block, to_block, highest, total_rows
        );

        // The stream must have delivered at least one Transfer log in a
        // 10-block window of mainnet, and must have reached the configured
        // upper bound by the time it closed.
        assert!(
            total_rows > 0,
            "expected at least one Transfer log in window"
        );
        assert_eq!(highest, Some(to_block));
    }
}