hypersync_net_types/
query.rs

1use crate::block::{BlockField, BlockSelection};
2use crate::log::{LogField, LogSelection};
3use crate::trace::{TraceField, TraceSelection};
4use crate::transaction::{TransactionField, TransactionSelection};
5use crate::{hypersync_net_types_capnp, BuilderReader};
6use anyhow::Context;
7use capnp::message::Builder;
8use capnp::message::ReaderOptions;
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeSet;
11
12#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
13pub struct Query {
14    /// The block to start the query from
15    pub from_block: u64,
16    /// The block to end the query at. If not specified, the query will go until the
17    ///  end of data. Exclusive, the returned range will be [from_block..to_block).
18    ///
19    /// The query will return before it reaches this target block if it hits the time limit
20    ///  configured on the server. The user should continue their query by putting the
21    ///  next_block field in the response into from_block field of their next query. This implements
22    ///  pagination.
23    #[serde(skip_serializing_if = "Option::is_none")]
24    pub to_block: Option<u64>,
25    /// List of log selections, these have an OR relationship between them, so the query will return logs
26    /// that match any of these selections.
27    #[serde(default, skip_serializing_if = "Vec::is_empty")]
28    pub logs: Vec<LogSelection>,
29    /// List of transaction selections, the query will return transactions that match any of these selections
30    #[serde(default, skip_serializing_if = "Vec::is_empty")]
31    pub transactions: Vec<TransactionSelection>,
32    /// List of trace selections, the query will return traces that match any of these selections
33    #[serde(default, skip_serializing_if = "Vec::is_empty")]
34    pub traces: Vec<TraceSelection>,
35    /// List of block selections, the query will return blocks that match any of these selections
36    #[serde(default, skip_serializing_if = "Vec::is_empty")]
37    pub blocks: Vec<BlockSelection>,
38    /// Weather to include all blocks regardless of if they are related to a returned transaction or log. Normally
39    ///  the server will return only the blocks that are related to the transaction or logs in the response. But if this
40    ///  is set to true, the server will return data for all blocks in the requested range [from_block, to_block).
41    #[serde(default, skip_serializing_if = "is_default")]
42    pub include_all_blocks: bool,
43    /// Field selection. The user can select which fields they are interested in, requesting less fields will improve
44    ///  query execution time and reduce the payload size so the user should always use a minimal number of fields.
45    #[serde(default, skip_serializing_if = "is_default")]
46    pub field_selection: FieldSelection,
47    /// Maximum number of blocks that should be returned, the server might return more blocks than this number but
48    ///  it won't overshoot by too much.
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub max_num_blocks: Option<usize>,
51    /// Maximum number of transactions that should be returned, the server might return more transactions than this number but
52    ///  it won't overshoot by too much.
53    #[serde(default, skip_serializing_if = "Option::is_none")]
54    pub max_num_transactions: Option<usize>,
55    /// Maximum number of logs that should be returned, the server might return more logs than this number but
56    ///  it won't overshoot by too much.
57    #[serde(default, skip_serializing_if = "Option::is_none")]
58    pub max_num_logs: Option<usize>,
59    /// Maximum number of traces that should be returned, the server might return more traces than this number but
60    ///  it won't overshoot by too much.
61    #[serde(default, skip_serializing_if = "Option::is_none")]
62    pub max_num_traces: Option<usize>,
63    /// Selects join mode for the query,
64    /// Default: join in this order logs -> transactions -> traces -> blocks
65    /// JoinAll: join everything to everything. For example if logSelection matches log0, we get the
66    /// associated transaction of log0 and then we get associated logs of that transaction as well. Applies similarly
67    /// to blocks, traces.
68    /// JoinNothing: join nothing.
69    #[serde(default, skip_serializing_if = "is_default")]
70    pub join_mode: JoinMode,
71}
72
73/// Used to skip serializing a defaulted serde field if
74/// the value matches the default value.
75fn is_default<T: Default + PartialEq>(t: &T) -> bool {
76    t == &T::default()
77}
78
79#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Copy)]
80pub enum JoinMode {
81    /// Join in this order logs -> transactions -> traces -> blocks
82    Default,
83    /// Join everything to everything. For example if logSelection matches log0, we get the
84    /// associated transaction of log0 and then we get associated logs of that transaction as well. Applies similarly
85    /// to blocks, traces.
86    JoinAll,
87    /// JoinNothing: join nothing.
88    JoinNothing,
89}
90
91impl Default for JoinMode {
92    fn default() -> Self {
93        Self::Default
94    }
95}
96
97#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
98pub struct FieldSelection {
99    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
100    pub block: BTreeSet<BlockField>,
101    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
102    pub transaction: BTreeSet<TransactionField>,
103    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
104    pub log: BTreeSet<LogField>,
105    #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
106    pub trace: BTreeSet<TraceField>,
107}
108
109impl Query {
110    pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
111        // Check compression.rs benchmarks
112        // regulas capnp bytes compresses better with zstd than
113        // capnp packed bytes
114        let capnp_bytes = self
115            .to_capnp_bytes()
116            .context("Failed converting query to capnp message")?;
117
118        // ZSTD level 6 seems to have the best tradeoffs in terms of achieving
119        // a small payload, and being fast to decode once encoded.
120        let compressed_bytes = zstd::encode_all(capnp_bytes.as_slice(), 6)
121            .context("Failed compressing capnp message to bytes")?;
122        Ok(compressed_bytes)
123    }
124
125    pub fn from_bytes(bytes: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
126        // Check compression.rs benchmarks
127        let decompressed_bytes = zstd::decode_all(bytes)?;
128        let query = Query::from_capnp_bytes(&decompressed_bytes)?;
129        Ok(query)
130    }
131
132    /// Serialize Query to Cap'n Proto format and return as bytes
133    pub fn to_capnp_bytes(&self) -> Result<Vec<u8>, capnp::Error> {
134        let mut message = Builder::new_default();
135        let query = message.init_root::<hypersync_net_types_capnp::query::Builder>();
136
137        self.populate_capnp_query(query)?;
138
139        let mut buf = Vec::new();
140        capnp::serialize::write_message(&mut buf, &message)?;
141        Ok(buf)
142    }
143
144    /// Deserialize Query from Cap'n Proto bytes
145    pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
146        let message_reader =
147            capnp::serialize::read_message(&mut std::io::Cursor::new(bytes), ReaderOptions::new())?;
148        let query = message_reader.get_root::<hypersync_net_types_capnp::query::Reader>()?;
149
150        Self::from_capnp_query(query)
151    }
152    /// Serialize using packed format (for testing)
153    pub fn to_capnp_bytes_packed(&self) -> Result<Vec<u8>, capnp::Error> {
154        let mut message = Builder::new_default();
155        let query = message.init_root::<hypersync_net_types_capnp::query::Builder>();
156
157        self.populate_capnp_query(query)?;
158
159        let mut buf = Vec::new();
160        capnp::serialize_packed::write_message(&mut buf, &message)?;
161        Ok(buf)
162    }
163
164    /// Deserialize using packed format (for testing)
165    pub fn from_capnp_bytes_packed(bytes: &[u8]) -> Result<Self, capnp::Error> {
166        let message_reader = capnp::serialize_packed::read_message(
167            &mut std::io::Cursor::new(bytes),
168            ReaderOptions::new(),
169        )?;
170        let query = message_reader.get_root::<hypersync_net_types_capnp::query::Reader>()?;
171
172        Self::from_capnp_query(query)
173    }
174
175    fn populate_capnp_query(
176        &self,
177        mut query: hypersync_net_types_capnp::query::Builder,
178    ) -> Result<(), capnp::Error> {
179        let mut block_range_builder = query.reborrow().init_block_range();
180        block_range_builder.set_from_block(self.from_block);
181
182        if let Some(to_block) = self.to_block {
183            let mut to_block_builder = block_range_builder.reborrow().init_to_block();
184            to_block_builder.set_value(to_block)
185        }
186
187        // Hehe
188        let mut body_builder = query.reborrow().init_body();
189
190        body_builder
191            .reborrow()
192            .set_include_all_blocks(self.include_all_blocks);
193
194        // Set max nums using OptUInt64
195        if let Some(max_num_blocks) = self.max_num_blocks {
196            let mut max_blocks_builder = body_builder.reborrow().init_max_num_blocks();
197            max_blocks_builder.set_value(max_num_blocks as u64);
198        }
199        if let Some(max_num_transactions) = self.max_num_transactions {
200            let mut max_tx_builder = body_builder.reborrow().init_max_num_transactions();
201            max_tx_builder.set_value(max_num_transactions as u64);
202        }
203        if let Some(max_num_logs) = self.max_num_logs {
204            let mut max_logs_builder = body_builder.reborrow().init_max_num_logs();
205            max_logs_builder.set_value(max_num_logs as u64);
206        }
207        if let Some(max_num_traces) = self.max_num_traces {
208            let mut max_traces_builder = body_builder.reborrow().init_max_num_traces();
209            max_traces_builder.set_value(max_num_traces as u64);
210        }
211
212        // Set join mode
213        let join_mode = match self.join_mode {
214            JoinMode::Default => hypersync_net_types_capnp::JoinMode::Default,
215            JoinMode::JoinAll => hypersync_net_types_capnp::JoinMode::JoinAll,
216            JoinMode::JoinNothing => hypersync_net_types_capnp::JoinMode::JoinNothing,
217        };
218        body_builder.reborrow().set_join_mode(join_mode);
219
220        // Set field selection
221        {
222            let mut field_selection = body_builder.reborrow().init_field_selection();
223
224            // Set block fields
225            let mut block_list = field_selection
226                .reborrow()
227                .init_block(self.field_selection.block.len() as u32);
228            for (i, field) in self.field_selection.block.iter().enumerate() {
229                block_list.set(i as u32, field.to_capnp());
230            }
231
232            // Set transaction fields
233            let mut tx_list = field_selection
234                .reborrow()
235                .init_transaction(self.field_selection.transaction.len() as u32);
236            for (i, field) in self.field_selection.transaction.iter().enumerate() {
237                tx_list.set(i as u32, field.to_capnp());
238            }
239
240            // Set log fields
241            let mut log_list = field_selection
242                .reborrow()
243                .init_log(self.field_selection.log.len() as u32);
244            for (i, field) in self.field_selection.log.iter().enumerate() {
245                log_list.set(i as u32, field.to_capnp());
246            }
247
248            // Set trace fields
249            let mut trace_list = field_selection
250                .reborrow()
251                .init_trace(self.field_selection.trace.len() as u32);
252            for (i, field) in self.field_selection.trace.iter().enumerate() {
253                trace_list.set(i as u32, field.to_capnp());
254            }
255        }
256
257        // Set logs
258        {
259            let mut logs_list = body_builder.reborrow().init_logs(self.logs.len() as u32);
260            for (i, log_selection) in self.logs.iter().enumerate() {
261                let mut log_sel = logs_list.reborrow().get(i as u32);
262                log_selection.populate_builder(&mut log_sel)?;
263            }
264        }
265
266        // Set transactions
267        {
268            let mut tx_list = body_builder
269                .reborrow()
270                .init_transactions(self.transactions.len() as u32);
271            for (i, tx_selection) in self.transactions.iter().enumerate() {
272                let mut tx_sel = tx_list.reborrow().get(i as u32);
273                tx_selection.populate_builder(&mut tx_sel)?;
274            }
275        }
276
277        // Set traces
278        {
279            let mut trace_list = body_builder
280                .reborrow()
281                .init_traces(self.traces.len() as u32);
282            for (i, trace_selection) in self.traces.iter().enumerate() {
283                let mut trace_sel = trace_list.reborrow().get(i as u32);
284                trace_selection.populate_builder(&mut trace_sel)?;
285            }
286        }
287
288        // Set blocks
289        {
290            let mut block_list = body_builder
291                .reborrow()
292                .init_blocks(self.blocks.len() as u32);
293            for (i, block_selection) in self.blocks.iter().enumerate() {
294                let mut block_sel = block_list.reborrow().get(i as u32);
295                block_selection.populate_builder(&mut block_sel)?;
296            }
297        }
298
299        Ok(())
300    }
301
302    fn from_capnp_query(
303        query: hypersync_net_types_capnp::query::Reader,
304    ) -> Result<Self, capnp::Error> {
305        let block_range = query.get_block_range()?;
306
307        let from_block = block_range.get_from_block();
308        let to_block = if block_range.has_to_block() {
309            Some(block_range.get_to_block()?.get_value())
310        } else {
311            None
312        };
313        let body = query.get_body()?;
314        let include_all_blocks = body.get_include_all_blocks();
315
316        // Parse field selection
317        let field_selection = if body.has_field_selection() {
318            let fs = body.get_field_selection()?;
319
320            let block_fields = if fs.has_block() {
321                let block_list = fs.get_block()?;
322                (0..block_list.len())
323                    .map(|i| block_list.get(i).map(BlockField::from_capnp))
324                    .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
325            } else {
326                BTreeSet::new()
327            };
328
329            let transaction_fields = if fs.has_transaction() {
330                let tx_list = fs.get_transaction()?;
331                (0..tx_list.len())
332                    .map(|i| tx_list.get(i).map(TransactionField::from_capnp))
333                    .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
334            } else {
335                BTreeSet::new()
336            };
337
338            let log_fields = if fs.has_log() {
339                let log_list = fs.get_log()?;
340                (0..log_list.len())
341                    .map(|i| log_list.get(i).map(LogField::from_capnp))
342                    .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
343            } else {
344                BTreeSet::new()
345            };
346
347            let trace_fields = if fs.has_trace() {
348                let trace_list = fs.get_trace()?;
349                (0..trace_list.len())
350                    .map(|i| trace_list.get(i).map(TraceField::from_capnp))
351                    .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
352            } else {
353                BTreeSet::new()
354            };
355
356            FieldSelection {
357                block: block_fields,
358                transaction: transaction_fields,
359                log: log_fields,
360                trace: trace_fields,
361            }
362        } else {
363            FieldSelection::default()
364        };
365
366        // Parse max values using OptUInt64
367        let max_num_blocks = if body.has_max_num_blocks() {
368            let max_blocks_reader = body.get_max_num_blocks()?;
369            let value = max_blocks_reader.get_value();
370            Some(value as usize)
371        } else {
372            None
373        };
374        let max_num_transactions = if body.has_max_num_transactions() {
375            let max_tx_reader = body.get_max_num_transactions()?;
376            let value = max_tx_reader.get_value();
377            Some(value as usize)
378        } else {
379            None
380        };
381        let max_num_logs = if body.has_max_num_logs() {
382            let max_logs_reader = body.get_max_num_logs()?;
383            let value = max_logs_reader.get_value();
384            Some(value as usize)
385        } else {
386            None
387        };
388        let max_num_traces = if body.has_max_num_traces() {
389            let max_traces_reader = body.get_max_num_traces()?;
390            let value = max_traces_reader.get_value();
391            Some(value as usize)
392        } else {
393            None
394        };
395
396        // Parse join mode
397        let join_mode = match body.get_join_mode()? {
398            hypersync_net_types_capnp::JoinMode::Default => JoinMode::Default,
399            hypersync_net_types_capnp::JoinMode::JoinAll => JoinMode::JoinAll,
400            hypersync_net_types_capnp::JoinMode::JoinNothing => JoinMode::JoinNothing,
401        };
402
403        // Parse selections
404        let logs = if body.has_logs() {
405            let logs_list = body.get_logs()?;
406            let mut logs = Vec::new();
407            for i in 0..logs_list.len() {
408                let log_reader = logs_list.get(i);
409                logs.push(LogSelection::from_reader(log_reader)?);
410            }
411            logs
412        } else {
413            Vec::new()
414        };
415
416        let transactions = if body.has_transactions() {
417            let tx_list = body.get_transactions()?;
418            let mut transactions = Vec::new();
419            for i in 0..tx_list.len() {
420                let tx_reader = tx_list.get(i);
421                transactions.push(TransactionSelection::from_reader(tx_reader)?);
422            }
423            transactions
424        } else {
425            Vec::new()
426        };
427
428        let traces = if body.has_traces() {
429            let traces_list = body.get_traces()?;
430            let mut traces = Vec::new();
431            for i in 0..traces_list.len() {
432                let trace_reader = traces_list.get(i);
433                traces.push(TraceSelection::from_reader(trace_reader)?);
434            }
435            traces
436        } else {
437            Vec::new()
438        };
439
440        let blocks = if body.has_blocks() {
441            let blocks_list = body.get_blocks()?;
442            let mut blocks = Vec::new();
443            for i in 0..blocks_list.len() {
444                let block_reader = blocks_list.get(i);
445                blocks.push(BlockSelection::from_reader(block_reader)?);
446            }
447            blocks
448        } else {
449            Vec::new()
450        };
451
452        Ok(Query {
453            from_block,
454            to_block,
455            logs,
456            transactions,
457            traces,
458            blocks,
459            include_all_blocks,
460            field_selection,
461            max_num_blocks,
462            max_num_transactions,
463            max_num_logs,
464            max_num_traces,
465            join_mode,
466        })
467    }
468}
469
470#[cfg(test)]
471pub mod tests {
472    use super::*;
473    use pretty_assertions::assert_eq;
474
475    pub fn test_query_serde(query: Query, label: &str) {
476        fn test_encode_decode<T: PartialEq + std::fmt::Debug>(
477            input: &T,
478            label: String,
479            encode: impl FnOnce(&T) -> Vec<u8>,
480            decode: impl FnOnce(&[u8]) -> T,
481        ) {
482            let val = encode(input);
483            let decoded = decode(&val);
484            assert_eq!(input, &decoded, "{label} does not match");
485        }
486
487        test_encode_decode(
488            &query,
489            label.to_string() + "-capnp",
490            |q| q.to_capnp_bytes().unwrap(),
491            |bytes| Query::from_capnp_bytes(bytes).unwrap(),
492        );
493        test_encode_decode(
494            &query,
495            label.to_string() + "-capnp-packed",
496            |q| q.to_capnp_bytes_packed().unwrap(),
497            |bytes| Query::from_capnp_bytes_packed(bytes).unwrap(),
498        );
499        test_encode_decode(
500            &query,
501            label.to_string() + "-json",
502            |q| serde_json::to_vec(q).unwrap(),
503            |bytes| serde_json::from_slice(bytes).unwrap(),
504        );
505    }
506
507    #[test]
508    pub fn test_query_serde_default() {
509        let query = Query::default();
510        test_query_serde(query, "default");
511    }
512
513    #[test]
514    pub fn test_query_serde_with_non_null_defaults() {
515        let query = Query {
516            from_block: u64::default(),
517            to_block: Some(u64::default()),
518            logs: Vec::default(),
519            transactions: Vec::default(),
520            traces: Vec::default(),
521            blocks: Vec::default(),
522            include_all_blocks: bool::default(),
523            field_selection: FieldSelection::default(),
524            max_num_blocks: Some(usize::default()),
525            max_num_transactions: Some(usize::default()),
526            max_num_logs: Some(usize::default()),
527            max_num_traces: Some(usize::default()),
528            join_mode: JoinMode::default(),
529        };
530        test_query_serde(query, "base query with_non_null_defaults");
531    }
532
533    #[test]
534    pub fn test_query_serde_with_non_null_values() {
535        let query = Query {
536            from_block: 50,
537            to_block: Some(500),
538            logs: Vec::default(),
539            transactions: Vec::default(),
540            traces: Vec::default(),
541            blocks: Vec::default(),
542            include_all_blocks: true,
543            field_selection: FieldSelection::default(),
544            max_num_blocks: Some(50),
545            max_num_transactions: Some(100),
546            max_num_logs: Some(150),
547            max_num_traces: Some(200),
548            join_mode: JoinMode::JoinAll,
549        };
550        test_query_serde(query, "base query with_non_null_values");
551    }
552}