1use crate::block::{BlockField, BlockSelection};
2use crate::log::{LogField, LogSelection};
3use crate::trace::{TraceField, TraceSelection};
4use crate::transaction::{TransactionField, TransactionSelection};
5use crate::{hypersync_net_types_capnp, BuilderReader};
6use anyhow::Context;
7use capnp::message::Builder;
8use capnp::message::ReaderOptions;
9use serde::{Deserialize, Serialize};
10use std::collections::BTreeSet;
11
/// A query over a block range, serializable with serde and convertible to and
/// from a Cap'n Proto message (see the `to_*`/`from_*` methods on `Query`).
///
/// In the serde representation every member except `from_block` is left out
/// when it is `None`, empty, or equal to its default value.
#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Query {
    /// Start of the requested block range. It carries no serde default, so it
    /// must be present when deserializing.
    pub from_block: u64,
    /// Optional end of the requested block range; omitted from serde output
    /// when `None`.
    /// NOTE(review): whether this bound is inclusive or exclusive is not
    /// visible in this file — confirm against the server contract.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub to_block: Option<u64>,
    /// Log selections attached to the query; omitted from serde output when
    /// empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub logs: Vec<LogSelection>,
    /// Transaction selections attached to the query; omitted from serde
    /// output when empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub transactions: Vec<TransactionSelection>,
    /// Trace selections attached to the query; omitted from serde output when
    /// empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub traces: Vec<TraceSelection>,
    /// Block selections attached to the query; omitted from serde output when
    /// empty.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub blocks: Vec<BlockSelection>,
    /// Flag copied verbatim into the capnp body; omitted from serde output
    /// when `false`.
    /// NOTE(review): presumably asks for blocks that match no selection to be
    /// returned as well — confirm.
    #[serde(default, skip_serializing_if = "is_default")]
    pub include_all_blocks: bool,
    /// Sets of block/transaction/log/trace fields chosen for this query;
    /// omitted from serde output when all sets are empty.
    #[serde(default, skip_serializing_if = "is_default")]
    pub field_selection: FieldSelection,
    /// Optional block-count limit, encoded as a `u64` in the capnp message.
    /// NOTE(review): presumably caps the number of blocks in a response —
    /// confirm whether it is a hard or soft limit.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_num_blocks: Option<usize>,
    /// Optional transaction-count limit, encoded as a `u64` in the capnp
    /// message (same caveat as `max_num_blocks`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_num_transactions: Option<usize>,
    /// Optional log-count limit, encoded as a `u64` in the capnp message
    /// (same caveat as `max_num_blocks`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_num_logs: Option<usize>,
    /// Optional trace-count limit, encoded as a `u64` in the capnp message
    /// (same caveat as `max_num_blocks`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_num_traces: Option<usize>,
    /// Join mode for the query; omitted from serde output when it equals
    /// `JoinMode::default()`.
    #[serde(default, skip_serializing_if = "is_default")]
    pub join_mode: JoinMode,
}
72
/// Tells whether `t` equals the `Default` value of its own type.
///
/// Referenced by name from the `skip_serializing_if` serde attributes in this
/// file, so that members still holding their default are not serialized.
fn is_default<T>(t: &T) -> bool
where
    T: Default + PartialEq,
{
    let baseline = T::default();
    *t == baseline
}
78
79#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Copy)]
80pub enum JoinMode {
81 Default,
83 JoinAll,
87 JoinNothing,
89}
90
91impl Default for JoinMode {
92 fn default() -> Self {
93 Self::Default
94 }
95}
96
97#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
98pub struct FieldSelection {
99 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
100 pub block: BTreeSet<BlockField>,
101 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
102 pub transaction: BTreeSet<TransactionField>,
103 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
104 pub log: BTreeSet<LogField>,
105 #[serde(default, skip_serializing_if = "BTreeSet::is_empty")]
106 pub trace: BTreeSet<TraceField>,
107}
108
109impl Query {
110 pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
111 let capnp_bytes = self
115 .to_capnp_bytes()
116 .context("Failed converting query to capnp message")?;
117
118 let compressed_bytes = zstd::encode_all(capnp_bytes.as_slice(), 6)
121 .context("Failed compressing capnp message to bytes")?;
122 Ok(compressed_bytes)
123 }
124
125 pub fn from_bytes(bytes: &[u8]) -> Result<Self, Box<dyn std::error::Error>> {
126 let decompressed_bytes = zstd::decode_all(bytes)?;
128 let query = Query::from_capnp_bytes(&decompressed_bytes)?;
129 Ok(query)
130 }
131
132 pub fn to_capnp_bytes(&self) -> Result<Vec<u8>, capnp::Error> {
134 let mut message = Builder::new_default();
135 let query = message.init_root::<hypersync_net_types_capnp::query::Builder>();
136
137 self.populate_capnp_query(query)?;
138
139 let mut buf = Vec::new();
140 capnp::serialize::write_message(&mut buf, &message)?;
141 Ok(buf)
142 }
143
144 pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
146 let message_reader =
147 capnp::serialize::read_message(&mut std::io::Cursor::new(bytes), ReaderOptions::new())?;
148 let query = message_reader.get_root::<hypersync_net_types_capnp::query::Reader>()?;
149
150 Self::from_capnp_query(query)
151 }
152 pub fn to_capnp_bytes_packed(&self) -> Result<Vec<u8>, capnp::Error> {
154 let mut message = Builder::new_default();
155 let query = message.init_root::<hypersync_net_types_capnp::query::Builder>();
156
157 self.populate_capnp_query(query)?;
158
159 let mut buf = Vec::new();
160 capnp::serialize_packed::write_message(&mut buf, &message)?;
161 Ok(buf)
162 }
163
164 pub fn from_capnp_bytes_packed(bytes: &[u8]) -> Result<Self, capnp::Error> {
166 let message_reader = capnp::serialize_packed::read_message(
167 &mut std::io::Cursor::new(bytes),
168 ReaderOptions::new(),
169 )?;
170 let query = message_reader.get_root::<hypersync_net_types_capnp::query::Reader>()?;
171
172 Self::from_capnp_query(query)
173 }
174
175 fn populate_capnp_query(
176 &self,
177 mut query: hypersync_net_types_capnp::query::Builder,
178 ) -> Result<(), capnp::Error> {
179 let mut block_range_builder = query.reborrow().init_block_range();
180 block_range_builder.set_from_block(self.from_block);
181
182 if let Some(to_block) = self.to_block {
183 let mut to_block_builder = block_range_builder.reborrow().init_to_block();
184 to_block_builder.set_value(to_block)
185 }
186
187 let mut body_builder = query.reborrow().init_body();
189
190 body_builder
191 .reborrow()
192 .set_include_all_blocks(self.include_all_blocks);
193
194 if let Some(max_num_blocks) = self.max_num_blocks {
196 let mut max_blocks_builder = body_builder.reborrow().init_max_num_blocks();
197 max_blocks_builder.set_value(max_num_blocks as u64);
198 }
199 if let Some(max_num_transactions) = self.max_num_transactions {
200 let mut max_tx_builder = body_builder.reborrow().init_max_num_transactions();
201 max_tx_builder.set_value(max_num_transactions as u64);
202 }
203 if let Some(max_num_logs) = self.max_num_logs {
204 let mut max_logs_builder = body_builder.reborrow().init_max_num_logs();
205 max_logs_builder.set_value(max_num_logs as u64);
206 }
207 if let Some(max_num_traces) = self.max_num_traces {
208 let mut max_traces_builder = body_builder.reborrow().init_max_num_traces();
209 max_traces_builder.set_value(max_num_traces as u64);
210 }
211
212 let join_mode = match self.join_mode {
214 JoinMode::Default => hypersync_net_types_capnp::JoinMode::Default,
215 JoinMode::JoinAll => hypersync_net_types_capnp::JoinMode::JoinAll,
216 JoinMode::JoinNothing => hypersync_net_types_capnp::JoinMode::JoinNothing,
217 };
218 body_builder.reborrow().set_join_mode(join_mode);
219
220 {
222 let mut field_selection = body_builder.reborrow().init_field_selection();
223
224 let mut block_list = field_selection
226 .reborrow()
227 .init_block(self.field_selection.block.len() as u32);
228 for (i, field) in self.field_selection.block.iter().enumerate() {
229 block_list.set(i as u32, field.to_capnp());
230 }
231
232 let mut tx_list = field_selection
234 .reborrow()
235 .init_transaction(self.field_selection.transaction.len() as u32);
236 for (i, field) in self.field_selection.transaction.iter().enumerate() {
237 tx_list.set(i as u32, field.to_capnp());
238 }
239
240 let mut log_list = field_selection
242 .reborrow()
243 .init_log(self.field_selection.log.len() as u32);
244 for (i, field) in self.field_selection.log.iter().enumerate() {
245 log_list.set(i as u32, field.to_capnp());
246 }
247
248 let mut trace_list = field_selection
250 .reborrow()
251 .init_trace(self.field_selection.trace.len() as u32);
252 for (i, field) in self.field_selection.trace.iter().enumerate() {
253 trace_list.set(i as u32, field.to_capnp());
254 }
255 }
256
257 {
259 let mut logs_list = body_builder.reborrow().init_logs(self.logs.len() as u32);
260 for (i, log_selection) in self.logs.iter().enumerate() {
261 let mut log_sel = logs_list.reborrow().get(i as u32);
262 log_selection.populate_builder(&mut log_sel)?;
263 }
264 }
265
266 {
268 let mut tx_list = body_builder
269 .reborrow()
270 .init_transactions(self.transactions.len() as u32);
271 for (i, tx_selection) in self.transactions.iter().enumerate() {
272 let mut tx_sel = tx_list.reborrow().get(i as u32);
273 tx_selection.populate_builder(&mut tx_sel)?;
274 }
275 }
276
277 {
279 let mut trace_list = body_builder
280 .reborrow()
281 .init_traces(self.traces.len() as u32);
282 for (i, trace_selection) in self.traces.iter().enumerate() {
283 let mut trace_sel = trace_list.reborrow().get(i as u32);
284 trace_selection.populate_builder(&mut trace_sel)?;
285 }
286 }
287
288 {
290 let mut block_list = body_builder
291 .reborrow()
292 .init_blocks(self.blocks.len() as u32);
293 for (i, block_selection) in self.blocks.iter().enumerate() {
294 let mut block_sel = block_list.reborrow().get(i as u32);
295 block_selection.populate_builder(&mut block_sel)?;
296 }
297 }
298
299 Ok(())
300 }
301
302 fn from_capnp_query(
303 query: hypersync_net_types_capnp::query::Reader,
304 ) -> Result<Self, capnp::Error> {
305 let block_range = query.get_block_range()?;
306
307 let from_block = block_range.get_from_block();
308 let to_block = if block_range.has_to_block() {
309 Some(block_range.get_to_block()?.get_value())
310 } else {
311 None
312 };
313 let body = query.get_body()?;
314 let include_all_blocks = body.get_include_all_blocks();
315
316 let field_selection = if body.has_field_selection() {
318 let fs = body.get_field_selection()?;
319
320 let block_fields = if fs.has_block() {
321 let block_list = fs.get_block()?;
322 (0..block_list.len())
323 .map(|i| block_list.get(i).map(BlockField::from_capnp))
324 .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
325 } else {
326 BTreeSet::new()
327 };
328
329 let transaction_fields = if fs.has_transaction() {
330 let tx_list = fs.get_transaction()?;
331 (0..tx_list.len())
332 .map(|i| tx_list.get(i).map(TransactionField::from_capnp))
333 .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
334 } else {
335 BTreeSet::new()
336 };
337
338 let log_fields = if fs.has_log() {
339 let log_list = fs.get_log()?;
340 (0..log_list.len())
341 .map(|i| log_list.get(i).map(LogField::from_capnp))
342 .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
343 } else {
344 BTreeSet::new()
345 };
346
347 let trace_fields = if fs.has_trace() {
348 let trace_list = fs.get_trace()?;
349 (0..trace_list.len())
350 .map(|i| trace_list.get(i).map(TraceField::from_capnp))
351 .collect::<Result<BTreeSet<_>, capnp::NotInSchema>>()?
352 } else {
353 BTreeSet::new()
354 };
355
356 FieldSelection {
357 block: block_fields,
358 transaction: transaction_fields,
359 log: log_fields,
360 trace: trace_fields,
361 }
362 } else {
363 FieldSelection::default()
364 };
365
366 let max_num_blocks = if body.has_max_num_blocks() {
368 let max_blocks_reader = body.get_max_num_blocks()?;
369 let value = max_blocks_reader.get_value();
370 Some(value as usize)
371 } else {
372 None
373 };
374 let max_num_transactions = if body.has_max_num_transactions() {
375 let max_tx_reader = body.get_max_num_transactions()?;
376 let value = max_tx_reader.get_value();
377 Some(value as usize)
378 } else {
379 None
380 };
381 let max_num_logs = if body.has_max_num_logs() {
382 let max_logs_reader = body.get_max_num_logs()?;
383 let value = max_logs_reader.get_value();
384 Some(value as usize)
385 } else {
386 None
387 };
388 let max_num_traces = if body.has_max_num_traces() {
389 let max_traces_reader = body.get_max_num_traces()?;
390 let value = max_traces_reader.get_value();
391 Some(value as usize)
392 } else {
393 None
394 };
395
396 let join_mode = match body.get_join_mode()? {
398 hypersync_net_types_capnp::JoinMode::Default => JoinMode::Default,
399 hypersync_net_types_capnp::JoinMode::JoinAll => JoinMode::JoinAll,
400 hypersync_net_types_capnp::JoinMode::JoinNothing => JoinMode::JoinNothing,
401 };
402
403 let logs = if body.has_logs() {
405 let logs_list = body.get_logs()?;
406 let mut logs = Vec::new();
407 for i in 0..logs_list.len() {
408 let log_reader = logs_list.get(i);
409 logs.push(LogSelection::from_reader(log_reader)?);
410 }
411 logs
412 } else {
413 Vec::new()
414 };
415
416 let transactions = if body.has_transactions() {
417 let tx_list = body.get_transactions()?;
418 let mut transactions = Vec::new();
419 for i in 0..tx_list.len() {
420 let tx_reader = tx_list.get(i);
421 transactions.push(TransactionSelection::from_reader(tx_reader)?);
422 }
423 transactions
424 } else {
425 Vec::new()
426 };
427
428 let traces = if body.has_traces() {
429 let traces_list = body.get_traces()?;
430 let mut traces = Vec::new();
431 for i in 0..traces_list.len() {
432 let trace_reader = traces_list.get(i);
433 traces.push(TraceSelection::from_reader(trace_reader)?);
434 }
435 traces
436 } else {
437 Vec::new()
438 };
439
440 let blocks = if body.has_blocks() {
441 let blocks_list = body.get_blocks()?;
442 let mut blocks = Vec::new();
443 for i in 0..blocks_list.len() {
444 let block_reader = blocks_list.get(i);
445 blocks.push(BlockSelection::from_reader(block_reader)?);
446 }
447 blocks
448 } else {
449 Vec::new()
450 };
451
452 Ok(Query {
453 from_block,
454 to_block,
455 logs,
456 transactions,
457 traces,
458 blocks,
459 include_all_blocks,
460 field_selection,
461 max_num_blocks,
462 max_num_transactions,
463 max_num_logs,
464 max_num_traces,
465 join_mode,
466 })
467 }
468}
469
#[cfg(test)]
pub mod tests {
    use super::*;
    use pretty_assertions::assert_eq;

    /// Asserts that `query` survives a round trip through every encoding
    /// offered by [`Query`]: unpacked capnp, packed capnp, zstd-compressed
    /// capnp (`to_bytes`/`from_bytes`) and JSON.
    ///
    /// `label` prefixes the assertion message so a failure identifies both
    /// the calling test and the encoding that broke. The function is `pub`
    /// so that it can be called from outside this module.
    pub fn test_query_serde(query: Query, label: &str) {
        /// Encodes `input`, decodes the bytes again and requires equality.
        fn test_encode_decode<T: PartialEq + std::fmt::Debug>(
            input: &T,
            label: String,
            encode: impl FnOnce(&T) -> Vec<u8>,
            decode: impl FnOnce(&[u8]) -> T,
        ) {
            let encoded = encode(input);
            let decoded = decode(&encoded);
            assert_eq!(input, &decoded, "{label} does not match");
        }

        test_encode_decode(
            &query,
            format!("{label}-capnp"),
            |q| q.to_capnp_bytes().unwrap(),
            |bytes| Query::from_capnp_bytes(bytes).unwrap(),
        );
        test_encode_decode(
            &query,
            format!("{label}-capnp-packed"),
            |q| q.to_capnp_bytes_packed().unwrap(),
            |bytes| Query::from_capnp_bytes_packed(bytes).unwrap(),
        );
        // Covers the public compressed wire format, which the capnp-only
        // round trips above do not reach.
        test_encode_decode(
            &query,
            format!("{label}-capnp-zstd"),
            |q| q.to_bytes().unwrap(),
            |bytes| Query::from_bytes(bytes).unwrap(),
        );
        test_encode_decode(
            &query,
            format!("{label}-json"),
            |q| serde_json::to_vec(q).unwrap(),
            |bytes| serde_json::from_slice(bytes).unwrap(),
        );
    }

    /// The all-default query round-trips through every encoding.
    #[test]
    pub fn test_query_serde_default() {
        let query = Query::default();
        test_query_serde(query, "default");
    }

    /// Optional members set to `Some(default)` must come back as `Some`, not
    /// be collapsed into `None`.
    #[test]
    pub fn test_query_serde_with_non_null_defaults() {
        let query = Query {
            from_block: u64::default(),
            to_block: Some(u64::default()),
            logs: Vec::default(),
            transactions: Vec::default(),
            traces: Vec::default(),
            blocks: Vec::default(),
            include_all_blocks: bool::default(),
            field_selection: FieldSelection::default(),
            max_num_blocks: Some(usize::default()),
            max_num_transactions: Some(usize::default()),
            max_num_logs: Some(usize::default()),
            max_num_traces: Some(usize::default()),
            join_mode: JoinMode::default(),
        };
        test_query_serde(query, "base query with_non_null_defaults");
    }

    /// Non-default scalar values round-trip unchanged.
    #[test]
    pub fn test_query_serde_with_non_null_values() {
        let query = Query {
            from_block: 50,
            to_block: Some(500),
            logs: Vec::default(),
            transactions: Vec::default(),
            traces: Vec::default(),
            blocks: Vec::default(),
            include_all_blocks: true,
            field_selection: FieldSelection::default(),
            max_num_blocks: Some(50),
            max_num_transactions: Some(100),
            max_num_logs: Some(150),
            max_num_traces: Some(200),
            join_mode: JoinMode::JoinAll,
        };
        test_query_serde(query, "base query with_non_null_values");
    }
}