use crate::{hypersync_net_types_capnp, types::AnyOf, CapnpBuilder, CapnpReader, Selection};
use anyhow::Context;
use arrayvec::ArrayVec;
use hypersync_format::{Address, FilterWrapper, LogArgument};
use serde::{Deserialize, Serialize};

pub type LogSelection = Selection<LogFilter>;

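/// Filter describing which logs to match, by emitting contract address and by topic values.
///
/// A sketch of typical construction through the builder helpers below (the address and topic
/// values are placeholders, and the block is marked `ignore` because the exact import path
/// depends on the consuming crate):
///
/// ```ignore
/// let filter = LogFilter::all()
///     .and_address(["0xdadB0d80178819F2319190D340ce9A924f783711"])?
///     .and_topic0(["0x1234567890123456789012345678901234567890123456789012345678901234"])?;
/// ```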
#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct LogFilter {
    /// Addresses the log must have been emitted by; an empty list places no constraint.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub address: Vec<Address>,
    /// Optional pre-built filter over addresses (see [`FilterWrapper`]).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub address_filter: Option<FilterWrapper>,
    /// Topic selections by position (topic0..=topic3); each inner list gives the accepted
    /// values for that position, and an empty inner list places no constraint on it.
    #[serde(default, skip_serializing_if = "ArrayVec::is_empty")]
    pub topics: ArrayVec<Vec<LogArgument>, 4>,
}

#[cfg(feature = "arbitrary")]
impl<'input> arbitrary::Arbitrary<'input> for LogFilter {
    fn arbitrary(u: &mut arbitrary::Unstructured<'input>) -> arbitrary::Result<Self> {
        let num_topics = u.arbitrary::<u8>()? % 4 + 1;
        let mut topics = ArrayVec::<Vec<LogArgument>, 4>::new();
        for _ in 0..num_topics {
            topics.push(u.arbitrary()?);
        }

        Ok(Self {
            address: u.arbitrary()?,
            address_filter: u.arbitrary()?,
            topics,
        })
    }
}

impl From<LogFilter> for AnyOf<LogFilter> {
    fn from(filter: LogFilter) -> Self {
        Self::new(filter)
    }
}

impl LogFilter {
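    /// Returns an empty filter, i.e. one that places no constraints on address or topics.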
    pub fn all() -> Self {
        Default::default()
    }

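    /// Combines this filter with `other` into an [`AnyOf`], selecting logs that match either
    /// filter. A minimal sketch (`topic_a` and `topic_b` are placeholder values):
    ///
    /// ```ignore
    /// let either = LogFilter::all().and_topic0([topic_a])?
    ///     .or(LogFilter::all().and_topic0([topic_b])?);
    /// ```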
    pub fn or(self, other: Self) -> AnyOf<Self> {
        AnyOf::new(self).or(other)
    }

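    /// Restricts the filter to logs emitted by any of the given addresses, replacing any
    /// previously set address list. Returns an error if a value cannot be converted into
    /// an [`Address`].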
    pub fn and_address<I, A>(mut self, addresses: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = A>,
        A: TryInto<Address>,
        A::Error: std::error::Error + Send + Sync + 'static,
    {
        let mut converted_addresses: Vec<Address> = Vec::new();
        for (idx, address) in addresses.into_iter().enumerate() {
            converted_addresses.push(
                address
                    .try_into()
                    .with_context(|| format!("invalid address at position {idx}"))?,
            );
        }
        self.address = converted_addresses;
        Ok(self)
    }

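    /// Sets the accepted values for the topic at `topic_idx` (0..=3), replacing any previous
    /// selection at that position. Earlier positions that have not been set yet are padded
    /// with empty selections so the index stays valid.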
    fn and_topic<I, T>(mut self, topic_idx: usize, topics: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = T>,
        T: TryInto<LogArgument>,
        T::Error: std::error::Error + Send + Sync + 'static,
    {
        if topic_idx > 3 {
            anyhow::bail!("topic index should not be greater than 3");
        }

        // Pad with empty selections so that `topic_idx` is a valid index.
        if self.topics.len() <= topic_idx {
            for _ in 0..=(topic_idx - self.topics.len()) {
                self.topics.push(Vec::new());
            }
        }
        let topic_selection = self
            .topics
            .get_mut(topic_idx)
            .expect("topic should exist from previous check");
        // Replace any previous selection for this topic position.
        topic_selection.clear();
        for (idx, topic) in topics.into_iter().enumerate() {
            topic_selection.push(
                topic
                    .try_into()
                    .with_context(|| format!("invalid topic at position {idx}"))?,
            );
        }
        Ok(self)
    }

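    /// Sets the accepted values for topic0, replacing any previous topic0 selection.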
    pub fn and_topic0<I, T>(self, topics: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = T>,
        T: TryInto<LogArgument>,
        T::Error: std::error::Error + Send + Sync + 'static,
    {
        self.and_topic(0, topics)
    }
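
    /// Sets the accepted values for topic1, replacing any previous topic1 selection.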
    pub fn and_topic1<I, T>(self, topics: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = T>,
        T: TryInto<LogArgument>,
        T::Error: std::error::Error + Send + Sync + 'static,
    {
        self.and_topic(1, topics)
    }
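
    /// Sets the accepted values for topic2, replacing any previous topic2 selection.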
    pub fn and_topic2<I, T>(self, topics: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = T>,
        T: TryInto<LogArgument>,
        T::Error: std::error::Error + Send + Sync + 'static,
    {
        self.and_topic(2, topics)
    }
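
    /// Sets the accepted values for topic3, replacing any previous topic3 selection.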
    pub fn and_topic3<I, T>(self, topics: I) -> anyhow::Result<Self>
    where
        I: IntoIterator<Item = T>,
        T: TryInto<LogArgument>,
        T::Error: std::error::Error + Send + Sync + 'static,
    {
        self.and_topic(3, topics)
    }
}

impl CapnpBuilder<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
    fn populate_builder(
        &self,
        builder: &mut hypersync_net_types_capnp::log_filter::Builder,
    ) -> Result<(), capnp::Error> {
        if !self.address.is_empty() {
            let mut addr_list = builder.reborrow().init_address(self.address.len() as u32);
            for (i, addr) in self.address.iter().enumerate() {
                addr_list.set(i as u32, addr.as_slice());
            }
        }

        if let Some(filter) = &self.address_filter {
            builder.reborrow().set_address_filter(filter.0.as_bytes());
        }

        if !self.topics.is_empty() {
            let mut topics_list = builder.reborrow().init_topics(self.topics.len() as u32);
            for (i, topic_vec) in self.topics.iter().enumerate() {
                let mut topic_list = topics_list
                    .reborrow()
                    .init(i as u32, topic_vec.len() as u32);
                for (j, topic) in topic_vec.iter().enumerate() {
                    topic_list.set(j as u32, topic.as_slice());
                }
            }
        }

        Ok(())
    }
}

impl CapnpReader<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
    fn from_reader(
        reader: hypersync_net_types_capnp::log_filter::Reader,
    ) -> Result<Self, capnp::Error> {
        let mut address = Vec::new();

        if reader.has_address() {
            let addr_list = reader.get_address()?;
            for i in 0..addr_list.len() {
                let addr_data = addr_list.get(i)?;
                // Entries that are not exactly 20 bytes are skipped rather than erroring.
                if addr_data.len() == 20 {
                    let mut addr_bytes = [0u8; 20];
                    addr_bytes.copy_from_slice(addr_data);
                    address.push(Address::from(addr_bytes));
                }
            }
        }

        let mut address_filter = None;

        if reader.has_address_filter() {
            let filter_data = reader.get_address_filter()?;
            let Ok(wrapper) = FilterWrapper::from_bytes(filter_data) else {
                return Err(capnp::Error::failed("Invalid address filter".to_string()));
            };
            address_filter = Some(wrapper);
        }

        let mut topics = ArrayVec::new();

        if reader.has_topics() {
            let topics_list = reader.get_topics()?;

            if topics_list.len() > 4 {
                return Err(capnp::Error::failed(
                    "Topics list should not exceed 4 topics".to_string(),
                ));
            }

            for i in 0..topics_list.len() {
                let topic_list = topics_list.get(i)?;
                let mut topic_vec = Vec::new();
                for j in 0..topic_list.len() {
                    let topic_data = topic_list.get(j)?;
                    // Entries that are not exactly 32 bytes are skipped rather than erroring.
                    if topic_data.len() == 32 {
                        let mut topic_bytes = [0u8; 32];
                        topic_bytes.copy_from_slice(topic_data);
                        topic_vec.push(LogArgument::from(topic_bytes));
                    }
                }

                topics.push(topic_vec);
            }
        }

        Ok(Self {
            address,
            address_filter,
            topics,
        })
    }
}

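/// A log field that can be requested in a query response.
///
/// Both serde and strum render variants in `snake_case`, so the string form matches the
/// corresponding column name in the log schema. A small sketch (marked `ignore` because the
/// import path depends on the consuming crate):
///
/// ```ignore
/// assert_eq!(LogField::TransactionHash.as_ref(), "transaction_hash");
/// assert_eq!(LogField::Topic0.to_string(), "topic0");
/// ```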
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    schemars::JsonSchema,
    strum_macros::EnumIter,
    strum_macros::AsRefStr,
    strum_macros::Display,
    strum_macros::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum LogField {
    TransactionHash,
    BlockHash,
    BlockNumber,
    TransactionIndex,
    LogIndex,
    Address,
    Data,
    Removed,

    Topic0,
    Topic1,
    Topic2,
    Topic3,
}

impl Ord for LogField {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.as_ref().cmp(other.as_ref())
    }
}

impl PartialOrd for LogField {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl LogField {
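    /// Returns every [`LogField`] variant as an ordered set.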
    pub fn all() -> std::collections::BTreeSet<Self> {
        use strum::IntoEnumIterator;
        Self::iter().collect()
    }

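    /// Whether the corresponding column is nullable in the log schema: `removed` and the
    /// four topic fields may be absent, all other fields are always present.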
    pub const fn is_nullable(&self) -> bool {
        match self {
            LogField::Removed
            | LogField::Topic0
            | LogField::Topic1
            | LogField::Topic2
            | LogField::Topic3 => true,
            LogField::TransactionHash
            | LogField::BlockHash
            | LogField::BlockNumber
            | LogField::TransactionIndex
            | LogField::LogIndex
            | LogField::Address
            | LogField::Data => false,
        }
    }

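    /// Converts this field to the generated Cap'n Proto `LogField` enum.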
    pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::LogField {
        match self {
            LogField::TransactionHash => {
                crate::hypersync_net_types_capnp::LogField::TransactionHash
            }
            LogField::BlockHash => crate::hypersync_net_types_capnp::LogField::BlockHash,
            LogField::BlockNumber => crate::hypersync_net_types_capnp::LogField::BlockNumber,
            LogField::TransactionIndex => {
                crate::hypersync_net_types_capnp::LogField::TransactionIndex
            }
            LogField::LogIndex => crate::hypersync_net_types_capnp::LogField::LogIndex,
            LogField::Address => crate::hypersync_net_types_capnp::LogField::Address,
            LogField::Data => crate::hypersync_net_types_capnp::LogField::Data,
            LogField::Removed => crate::hypersync_net_types_capnp::LogField::Removed,
            LogField::Topic0 => crate::hypersync_net_types_capnp::LogField::Topic0,
            LogField::Topic1 => crate::hypersync_net_types_capnp::LogField::Topic1,
            LogField::Topic2 => crate::hypersync_net_types_capnp::LogField::Topic2,
            LogField::Topic3 => crate::hypersync_net_types_capnp::LogField::Topic3,
        }
    }

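    /// Converts the generated Cap'n Proto `LogField` enum back into a [`LogField`].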
    pub fn from_capnp(field: crate::hypersync_net_types_capnp::LogField) -> Self {
        match field {
            crate::hypersync_net_types_capnp::LogField::TransactionHash => {
                LogField::TransactionHash
            }
            crate::hypersync_net_types_capnp::LogField::BlockHash => LogField::BlockHash,
            crate::hypersync_net_types_capnp::LogField::BlockNumber => LogField::BlockNumber,
            crate::hypersync_net_types_capnp::LogField::TransactionIndex => {
                LogField::TransactionIndex
            }
            crate::hypersync_net_types_capnp::LogField::LogIndex => LogField::LogIndex,
            crate::hypersync_net_types_capnp::LogField::Address => LogField::Address,
            crate::hypersync_net_types_capnp::LogField::Data => LogField::Data,
            crate::hypersync_net_types_capnp::LogField::Removed => LogField::Removed,
            crate::hypersync_net_types_capnp::LogField::Topic0 => LogField::Topic0,
            crate::hypersync_net_types_capnp::LogField::Topic1 => LogField::Topic1,
            crate::hypersync_net_types_capnp::LogField::Topic2 => LogField::Topic2,
            crate::hypersync_net_types_capnp::LogField::Topic3 => LogField::Topic3,
        }
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;
    use crate::{query::tests::test_query_serde, Query};
    use hypersync_format::Hex;
    use serde_json::json;

    #[test]
    fn test_all_fields_in_schema() {
        let schema = hypersync_schema::log();
        let schema_fields = schema
            .fields
            .iter()
            .map(|f| f.name().clone())
            .collect::<std::collections::BTreeSet<_>>();
        let all_fields = LogField::all()
            .into_iter()
            .map(|f| f.as_ref().to_string())
            .collect::<std::collections::BTreeSet<_>>();
        assert_eq!(schema_fields, all_fields);
    }

    #[test]
    fn test_serde_matches_strum() {
        for field in LogField::all() {
            let serialized = serde_json::to_string(&field).unwrap();
            let strum = serde_json::to_string(&field.as_ref()).unwrap();
            assert_eq!(serialized, strum, "strum value should be the same as serde");
        }
    }

    #[test]
    fn test_log_selection_serde_with_defaults() {
        let log_selection = LogSelection::default();
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with defaults");
    }

    #[test]
    fn test_log_selection_serde_with_full_values() {
        let log_selection = LogFilter {
            address: vec![
                Address::decode_hex("0xdadB0d80178819F2319190D340ce9A924f783711").unwrap(),
            ],
            address_filter: Some(FilterWrapper::new(16, 1)),
            topics: {
                let mut topics = ArrayVec::new();
                topics.push(vec![LogArgument::decode_hex(
                    "0x1234567890123456789012345678901234567890123456789012345678901234",
                )
                .unwrap()]);
                topics.push(vec![LogArgument::decode_hex(
                    "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                )
                .unwrap()]);
                topics
            },
        };
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with full values");
    }

    #[test]
    fn test_log_filter_builder() -> anyhow::Result<()> {
        let lf = LogFilter::all()
            .and_address([
                "0xdadB0d80178819F2319190D340ce9A924f783711",
                "0xdadB0d80178819F2319190D340ce9A924f783712",
            ])?
            .and_topic0([
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            ])?;

        assert_eq!(lf.address.len(), 2);
        assert_eq!(lf.topics.len(), 1);
        assert_eq!(lf.topics[0].len(), 2);
        assert_eq!(lf.address_filter, None);

        let lf =
            lf.and_topic0(["0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd"])?;
        assert_eq!(
            lf.topics[0].len(),
            1,
            "should overwrite previous topic0 selection"
        );

        let lf = lf.and_topic3([
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
        ])?;

        assert_eq!(lf.topics[3].len(), 2, "should have correctly added topic3");
        assert_eq!(
            lf.topics[2].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(
            lf.topics[1].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(lf.topics[0].len(), 1, "topic0 should not have been changed");

        Ok(())
    }

    #[test]
    fn test_log_filter_topic_configurations() {
        fn check_log_filter_json(json: serde_json::Value) {
            let lf: LogFilter = serde_json::from_value(json).unwrap();
            println!("{lf:?}");
            let mut message = capnp::message::Builder::new_default();
            let mut lf_builder =
                message.init_root::<hypersync_net_types_capnp::log_filter::Builder>();
            lf.populate_builder(&mut lf_builder).unwrap();
            let lf_reader = lf_builder.into_reader();
            let read_lf: LogFilter = LogFilter::from_reader(lf_reader).unwrap();
            assert_eq!(lf, read_lf);
        }

        const TOPIC: &str = "0x1234567890123456789012345678901234567890123456789012345678901234";

        check_log_filter_json(json!({"topics": []}));
        check_log_filter_json(json!({"topics": [[], [], [], []]}));
        check_log_filter_json(json!({"topics": [[TOPIC], [], [], []]}));
        check_log_filter_json(json!({"topics": [[], [], [TOPIC]]}));
    }

    #[test]
    fn nullable_fields() {
        let is_nullable_map: HashMap<_, _> = LogField::all()
            .iter()
            .map(|f| (f.to_string(), f.is_nullable()))
            .collect();
        for field in hypersync_schema::log().fields.iter() {
            let should_be_nullable = is_nullable_map.get(field.name().as_str()).unwrap();
            assert_eq!(
                field.is_nullable(),
                *should_be_nullable,
                "field {} nullable mismatch",
                field.name()
            );
        }
    }
}