1#![expect(
2 clippy::should_implement_trait,
3 reason = "LogKind::from_str is a constructor pattern, not Trait impl"
4)]
5#![expect(
6 clippy::field_reassign_with_default,
7 reason = "ProviderConfig is built by setting fields after construction"
8)]
9
10use std::{collections::BTreeMap, pin::Pin, sync::Arc};
25
26#[cfg(feature = "pyo3")]
27use anyhow::anyhow;
28use anyhow::{Context, Result};
29use arrow::array::UInt64Array;
30use arrow::compute::kernels::aggregate::max as array_max;
31use arrow::record_batch::RecordBatch;
32use futures_lite::{Stream, StreamExt};
33use provider::common::{evm_query_to_generic, svm_query_to_generic};
34use serde::de::DeserializeOwned;
35
36pub mod evm;
37mod provider;
38mod rayon_async;
39pub mod svm;
40
/// A chain-specific query accepted by [`start_stream`].
///
/// With the `pyo3` feature, this is extracted from a Python object whose `kind`
/// attribute selects the variant and whose `params` attribute holds the query.
#[derive(Debug, Clone)]
pub enum Query {
    /// EVM query (Python `kind == "evm"`).
    Evm(evm::Query),
    /// SVM query (Python `kind == "svm"`).
    Svm(svm::Query),
}
47
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for Query {
    /// Builds a [`Query`] from a Python object exposing a string `kind`
    /// (`"evm"` or `"svm"`) and a `params` attribute with the chain-specific query.
    ///
    /// Fails if either attribute is missing, if `kind` is not a string, if `kind`
    /// names an unknown chain, or if `params` cannot be extracted as that chain's query.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let kind_attr = ob.getattr("kind").context("get kind attribute")?;
        let kind: &str = kind_attr.extract().context("kind as str")?;

        let params = ob.getattr("params").context("get params attribute")?;

        let parsed = match kind {
            "evm" => Self::Evm(params.extract().context("parse query")?),
            "svm" => Self::Svm(params.extract().context("parse query")?),
            _ => return Err(anyhow!("unknown query kind: {kind}").into()),
        };

        Ok(parsed)
    }
}
65
/// Provider configuration consumed by [`start_stream`].
///
/// Build it with [`ProviderConfig::new`] (which leaves `stop_on_head` as `false`
/// and every `Option` field as `None`), then assign the fields you need.
/// With the `pyo3` feature this type also derives `pyo3::FromPyObject`.
///
/// NOTE(review): how each `None` is interpreted (e.g. a provider-specific
/// default) is decided in the `provider` backends, not in this file — confirm there.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "pyo3", derive(pyo3::FromPyObject))]
pub struct ProviderConfig {
    /// Backend that [`start_stream`] dispatches to.
    pub kind: ProviderKind,
    /// Provider endpoint; the tests point this at an SQD portal dataset URL.
    pub url: Option<String>,
    /// Presumably an auth token sent to the provider — confirm in provider code.
    pub bearer_token: Option<String>,
    /// Presumably the retry limit for failed requests — confirm in provider code.
    pub max_num_retries: Option<usize>,
    /// Retry backoff in milliseconds (per the `_ms` suffix) — semantics TODO confirm.
    pub retry_backoff_ms: Option<u64>,
    /// Retry base delay in milliseconds (per the `_ms` suffix) — semantics TODO confirm.
    pub retry_base_ms: Option<u64>,
    /// Retry delay ceiling in milliseconds (per the `_ms` suffix) — semantics TODO confirm.
    pub retry_ceiling_ms: Option<u64>,
    /// Request timeout in milliseconds (per the name) — TODO confirm where applied.
    pub req_timeout_millis: Option<u64>,
    /// Set to `true` by the EVM test, which then expects the stream to terminate;
    /// presumably stops the stream once the chain head is reached — confirm.
    pub stop_on_head: bool,
    /// Presumably the head polling interval in milliseconds — confirm.
    pub head_poll_interval_millis: Option<u64>,
    /// Presumably a stream buffer size — confirm in provider code.
    pub buffer_size: Option<usize>,
    /// Presumably a rate limit for the provider — confirm in provider code.
    pub compute_units_per_second: Option<u64>,
    /// Presumably a request batch size — confirm in provider code.
    pub batch_size: Option<usize>,
    /// Presumably the number of blocks behind head treated as reorg-safe — confirm.
    pub reorg_safe_distance: Option<u64>,
    /// Which tracing RPC method to use; presumably only read by the RPC provider.
    pub trace_method: Option<RpcTraceMethod>,
}
87
88impl ProviderConfig {
89 pub fn new(kind: ProviderKind) -> Self {
90 Self {
91 kind,
92 url: None,
93 bearer_token: None,
94 max_num_retries: None,
95 retry_backoff_ms: None,
96 retry_base_ms: None,
97 retry_ceiling_ms: None,
98 req_timeout_millis: None,
99 stop_on_head: false,
100 head_poll_interval_millis: None,
101 buffer_size: None,
102 compute_units_per_second: None,
103 batch_size: None,
104 reorg_safe_distance: None,
105 trace_method: None,
106 }
107 }
108}
109
/// Tracing RPC method selectable through `ProviderConfig::trace_method`.
///
/// With the `pyo3` feature, it is parsed from the Python strings
/// `"trace_block"` and `"debug_trace_block_by_number"`.
///
/// `PartialEq`/`Eq`/`Hash` are derived so callers can compare methods with `==`
/// or use them as map/set keys instead of pattern-matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcTraceMethod {
    /// Selected by the Python string `"trace_block"`.
    TraceBlock,
    /// Selected by the Python string `"debug_trace_block_by_number"`.
    DebugTraceBlockByNumber,
}
118
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for RpcTraceMethod {
    /// Parses a trace method from its Python string name.
    ///
    /// Accepts `"trace_block"` and `"debug_trace_block_by_number"`. A non-string
    /// object or any other string is rejected with an error.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let method = match out {
            "trace_block" => Self::TraceBlock,
            "debug_trace_block_by_number" => Self::DebugTraceBlockByNumber,
            _ => return Err(anyhow!("unknown trace method: {out}").into()),
        };

        Ok(method)
    }
}
133
/// Data provider backend that [`start_stream`] dispatches to.
///
/// With the `pyo3` feature, it is parsed from the Python strings `"sqd"`,
/// `"hypersync"` and `"rpc"`.
///
/// `PartialEq`/`Eq`/`Hash` are derived so callers can write
/// `config.kind == ProviderKind::Rpc` or key maps by provider.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ProviderKind {
    /// Dispatched to `provider::sqd::start_stream`.
    Sqd,
    /// Dispatched to `provider::hypersync::start_stream`.
    Hypersync,
    /// Dispatched to `provider::rpc::start_stream`.
    Rpc,
}
144
#[cfg(feature = "pyo3")]
impl<'py> pyo3::FromPyObject<'py> for ProviderKind {
    /// Parses a provider kind from its Python string name: `"sqd"`,
    /// `"hypersync"` or `"rpc"`.
    ///
    /// A non-string object or any other string is rejected with an error.
    fn extract_bound(ob: &pyo3::Bound<'py, pyo3::PyAny>) -> pyo3::PyResult<Self> {
        use pyo3::types::PyAnyMethods;

        let out: &str = ob.extract().context("read as string")?;

        let kind = match out {
            "sqd" => Some(Self::Sqd),
            "hypersync" => Some(Self::Hypersync),
            "rpc" => Some(Self::Rpc),
            _ => None,
        };

        kind.ok_or_else(|| anyhow!("unknown provider kind: {out}").into())
    }
}
160
/// One processed chunk yielded by the stream returned from [`start_stream`].
#[derive(Debug)]
pub struct StreamItem {
    /// Output of `tiders_query::run_query` for this chunk, keyed by name
    /// (the tests look up the `"logs"` entry and write one file per key).
    pub data: BTreeMap<String, RecordBatch>,
    /// Largest `UInt64` value found in any `block_number`, `number` or `slot`
    /// column of the provider's raw chunk. It is computed *before* the local
    /// query runs, so it may refer to rows filtered out of `data`. `None` if no
    /// such column or value exists.
    pub last_block: Option<u64>,
}
169
/// Pinned, boxed, `Send + Sync` stream of [`StreamItem`] results returned by [`start_stream`].
pub type DataStream = Pin<Box<dyn Stream<Item = Result<StreamItem>> + Send + Sync>>;
172
/// Pinned, boxed, `Send + Sync` stream of raw per-chunk table maps. This matches the
/// item type [`start_stream`] consumes from a provider backend before the local query runs.
///
/// NOTE(review): presumably the return type of the `provider::*::start_stream`
/// functions — confirm in the `provider` module.
pub(crate) type ProviderStream =
    Pin<Box<dyn Stream<Item = Result<BTreeMap<String, RecordBatch>>> + Send + Sync>>;
177
178fn extract_last_block(data: &BTreeMap<String, RecordBatch>) -> Option<u64> {
185 const BLOCK_ID_COLUMNS: &[&str] = &["block_number", "number", "slot"];
186
187 let mut out: Option<u64> = None;
188 for batch in data.values() {
189 for col_name in BLOCK_ID_COLUMNS {
190 let Some(col) = batch.column_by_name(col_name) else {
191 continue;
192 };
193 let Some(arr) = col.as_any().downcast_ref::<UInt64Array>() else {
194 continue;
195 };
196 if let Some(m) = array_max(arr) {
197 out = Some(out.map_or(m, |cur| cur.max(m)));
198 }
199 }
200 }
201 out
202}
203
204fn make_req_fields<T: DeserializeOwned>(query: &tiders_query::Query) -> Result<T> {
205 let mut req_fields_query = query.clone();
206 req_fields_query
207 .add_request_and_include_fields()
208 .context("add req and include fields")?;
209
210 let fields = req_fields_query
211 .fields
212 .into_iter()
213 .map(|(k, v)| -> Result<_> {
214 Ok((
215 k.strip_suffix('s')
216 .context("field key should end with 's'")?
217 .to_owned(),
218 v.into_iter()
219 .map(|v| (v, true))
220 .collect::<BTreeMap<String, bool>>(),
221 ))
222 })
223 .collect::<Result<BTreeMap<String, _>>>()?;
224
225 let json_value = serde_json::to_value(&fields).context("serialize fields to JSON")?;
226 serde_json::from_value(json_value).context("deserialize fields from JSON")
227}
228
229pub async fn start_stream(provider_config: ProviderConfig, mut query: Query) -> Result<DataStream> {
231 let generic_query = match &mut query {
232 Query::Evm(evm_query) => {
233 let generic_query = evm_query_to_generic(evm_query).context("validate evm query")?;
234
235 evm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
236
237 generic_query
238 }
239 Query::Svm(svm_query) => {
240 let generic_query = svm_query_to_generic(svm_query);
241
242 svm_query.fields = make_req_fields(&generic_query).context("make req fields")?;
243
244 generic_query
245 }
246 };
247 let generic_query = Arc::new(generic_query);
248
249 let stream = match provider_config.kind {
250 ProviderKind::Sqd => {
251 provider::sqd::start_stream(provider_config, query).context("start sqd stream")?
252 }
253 ProviderKind::Hypersync => provider::hypersync::start_stream(provider_config, query)
254 .await
255 .context("start hypersync stream")?,
256 ProviderKind::Rpc => {
257 provider::rpc::start_stream(&provider_config, query).context("start rpc stream")?
258 }
259 };
260
261 let stream = stream.then(move |res| {
262 let generic_query = Arc::clone(&generic_query);
263 async {
264 rayon_async::spawn(move || {
265 res.and_then(move |data| {
266 let last_block = extract_last_block(&data);
267 let data = tiders_query::run_query(&data, &generic_query)
268 .context("run local query")?;
269 Ok(StreamItem { data, last_block })
270 })
271 })
272 .await
273 .context("rayon task was cancelled")
274 .and_then(|r| r)
275 }
276 });
277
278 Ok(Box::pin(stream))
279}
280
#[cfg(test)]
mod tests {
    // Both tests hit public SQD portal endpoints over the network, so they are
    // `#[ignore]`d and must be run explicitly (e.g. `cargo test -- --ignored`).

    use super::*;
    use crate::svm::*;
    use parquet::arrow::ArrowWriter;
    use std::fs::File;

    /// Streams a single slot (329443000) from the SQD Solana mainnet portal.
    /// The instruction filter is one program id (presumably the SPL Token
    /// program) plus a fixed discriminator. The test takes only the first
    /// stream item and writes each of its tables to `<name>.parquet` in the
    /// current working directory. It makes no assertions on the data.
    #[tokio::test]
    #[ignore]
    async fn simple_svm_start_stream() {
        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/solana-mainnet".to_string());

        // Decode the base58 program id into the 32-byte `Address` the query expects.
        let program_id = "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA";
        let program_id: [u8; 32] = bs58::decode(program_id)
            .into_vec()
            .unwrap()
            .try_into()
            .unwrap();
        let program_id = Address(program_id);

        let query = crate::Query::Svm(svm::Query {
            from_block: 329443000,
            to_block: Some(329443000),
            include_all_blocks: false,
            // Select every instruction field; other tables use their defaults.
            fields: Fields {
                instruction: InstructionFields::all(),
                transaction: TransactionFields::default(),
                log: LogFields::default(),
                balance: BalanceFields::default(),
                token_balance: TokenBalanceFields::default(),
                reward: RewardFields::default(),
                block: BlockFields::default(),
            },
            instructions: vec![
                InstructionRequest {
                    program_id: vec![program_id],
                    discriminator: vec![Data(vec![12, 96, 49, 128, 22])],
                    ..Default::default()
                },
            ],
            transactions: vec![],
            logs: vec![],
            balances: vec![],
            token_balances: vec![],
            rewards: vec![],
        });
        let mut stream = start_stream(provider_config, query).await.unwrap();
        let item = stream.next().await.unwrap().unwrap();
        // Dump each returned table for manual inspection (files land in the CWD).
        for (k, v) in item.data.into_iter() {
            let mut file = File::create(format!("{}.parquet", k)).unwrap();
            let mut writer = ArrowWriter::try_new(&mut file, v.schema(), None).unwrap();
            writer.write(&v).unwrap();
            writer.close().unwrap();
        }
    }

    /// Streams `Transfer` logs (topic0 `ddf252ad…`) for blocks
    /// 18_000_000..=18_000_010 from the SQD Ethereum mainnet portal. It checks
    /// that every reported `last_block` lies inside the requested range and
    /// never decreases, that the last reported value equals `to_block`, and that
    /// at least one `logs` row was returned.
    #[tokio::test(flavor = "multi_thread")]
    #[ignore]
    async fn simple_start_stream() {
        use crate::evm::{Fields, LogFields, LogRequest, Query as EvmQuery, Topic};

        // topic0 of the Transfer event, decoded from hex into a 32-byte array.
        let transfer_topic0: [u8; 32] = {
            let mut out = [0u8; 32];
            faster_hex::hex_decode(
                b"ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
                &mut out,
            )
            .unwrap();
            out
        };

        let from_block = 18_000_000u64;
        let to_block = 18_000_010u64;
        let mut provider_config = ProviderConfig::new(ProviderKind::Sqd);
        provider_config.url = Some("https://portal.sqd.dev/datasets/ethereum-mainnet".to_string());
        // The `while let` loop below relies on the stream ending; presumably this
        // flag (together with `to_block`) makes that happen — confirm in provider code.
        provider_config.stop_on_head = true;

        // NOTE(review): this default-then-assign pattern is the only visible trigger for the
        // crate-level `#![expect(clippy::field_reassign_with_default)]`; keep in sync.
        let mut fields = Fields::default();
        fields.log = LogFields {
            log_index: true,
            block_number: true,
            address: true,
            topic0: true,
            ..LogFields::default()
        };

        let query = crate::Query::Evm(EvmQuery {
            from_block,
            to_block: Some(to_block),
            include_all_blocks: false,
            logs: vec![LogRequest {
                topic0: vec![Topic(transfer_topic0)],
                ..LogRequest::default()
            }],
            transactions: vec![],
            traces: vec![],
            fields,
        });

        let mut stream = start_stream(provider_config, query).await.unwrap();

        println!("from_block={} to_block={}", from_block, to_block);

        // Last non-`None` `last_block` seen so far, and running count of log rows.
        let mut highest: Option<u64> = None;
        let mut total_rows: usize = 0;

        while let Some(res) = stream.next().await {
            let item = res.unwrap();
            println!("item last_block={:?}", item.last_block);

            // Every reported block must fall inside the requested window.
            if let Some(lb) = item.last_block {
                assert!(
                    lb >= from_block,
                    "last_block {} < from_block {}",
                    lb,
                    from_block
                );
                assert!(lb <= to_block, "last_block {} > to_block {}", lb, to_block);
            }

            // Progress must be monotonic across items.
            if let (Some(prev), Some(cur)) = (highest, item.last_block) {
                assert!(
                    cur >= prev,
                    "last_block went backwards: {} -> {}",
                    prev,
                    cur
                );
            }
            if item.last_block.is_some() {
                highest = item.last_block;
            }

            if let Some(logs) = item.data.get("logs") {
                total_rows += logs.num_rows();
            }
        }

        println!(
            "final: from_block={} to_block={} last_block={:?} total_rows={}",
            from_block, to_block, highest, total_rows
        );

        assert!(
            total_rows > 0,
            "expected at least one Transfer log in window"
        );
        // The stream must have reported progress all the way to the end of the window.
        assert_eq!(highest, Some(to_block));
    }
}