1use crate::{hypersync_net_types_capnp, types::AnyOf, CapnpBuilder, CapnpReader, Selection};
2use anyhow::Context;
3use arrayvec::ArrayVec;
4use hypersync_format::{Address, FilterWrapper, LogArgument};
5use serde::{Deserialize, Serialize};
6
/// A [`Selection`] specialised to [`LogFilter`].
pub type LogSelection = Selection<LogFilter>;
8
/// Filter over logs, expressed as a set of addresses, an optional address filter and
/// per-position topic candidates.
///
/// The `Default` value leaves every field empty; [`LogFilter::all`] returns exactly that value.
/// Every field is skipped during serialization when empty/`None` and falls back to its default
/// when missing during deserialization.
#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct LogFilter {
    /// Addresses this filter is restricted to.
    /// NOTE(review): presumably an empty list means "any address" — confirm against the server.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub address: Vec<Address>,
    /// Optional pre-built address filter, carried as a [`FilterWrapper`].
    /// NOTE(review): matching semantics are defined by `hypersync_format` — confirm there.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub address_filter: Option<FilterWrapper>,
    /// Topic candidates by position: entry `i` holds the values set for topic `i`.
    /// The `ArrayVec` capacity caps this at four positions (topic0..=topic3).
    #[serde(default, skip_serializing_if = "ArrayVec::is_empty")]
    pub topics: ArrayVec<Vec<LogArgument>, 4>,
}
22
/// Hand-written `Arbitrary` implementation, compiled only with the `arbitrary` feature.
#[cfg(feature = "arbitrary")]
impl<'input> arbitrary::Arbitrary<'input> for LogFilter {
    /// Generates a filter with between one and four topic positions, followed by an arbitrary
    /// address list and an arbitrary optional address filter.
    fn arbitrary(u: &mut arbitrary::Unstructured<'input>) -> arbitrary::Result<Self> {
        // `% 4 + 1` keeps the count within 1..=4, which never exceeds the ArrayVec capacity.
        let topic_count = u.arbitrary::<u8>()? % 4 + 1;

        // Fallible collect: stops at the first generation error, exactly like `?` in a loop.
        let topics = (0..topic_count)
            .map(|_| u.arbitrary::<Vec<LogArgument>>())
            .collect::<arbitrary::Result<ArrayVec<Vec<LogArgument>, 4>>>()?;

        // Keep the consumption order of `u`: topics first, then address, then address filter.
        let address = u.arbitrary()?;
        let address_filter = u.arbitrary()?;

        Ok(Self {
            address,
            address_filter,
            topics,
        })
    }
}
39
40impl From<LogFilter> for AnyOf<LogFilter> {
41 fn from(filter: LogFilter) -> Self {
42 Self::new(filter)
43 }
44}
45
46impl LogFilter {
47 pub fn all() -> Self {
51 Default::default()
52 }
53
54 pub fn or(self, other: Self) -> AnyOf<Self> {
80 AnyOf::new(self).or(other)
81 }
82
83 pub fn and_address<I, A>(mut self, addresses: I) -> anyhow::Result<Self>
121 where
122 I: IntoIterator<Item = A>,
123 A: TryInto<Address>,
124 A::Error: std::error::Error + Send + Sync + 'static,
125 {
126 let mut converted_addresses: Vec<Address> = Vec::new();
127 for (idx, address) in addresses.into_iter().enumerate() {
128 converted_addresses.push(
129 address
130 .try_into()
131 .with_context(|| format!("invalid address at position {idx}"))?,
132 );
133 }
134 self.address = converted_addresses;
135 Ok(self)
136 }
137
138 fn and_topic<I, T>(mut self, topic_idx: usize, topics: I) -> anyhow::Result<Self>
139 where
140 I: IntoIterator<Item = T>,
141 T: TryInto<LogArgument>,
142 T::Error: std::error::Error + Send + Sync + 'static,
143 {
144 if topic_idx > 3 {
145 anyhow::bail!("topic index should not be greater than 3");
146 }
147
148 if self.topics.len() <= topic_idx {
149 for _ in 0..=(topic_idx - self.topics.len()) {
150 self.topics.push(Vec::new());
151 }
152 }
153 let topic_selection = self
154 .topics
155 .get_mut(topic_idx)
156 .expect("topic should exist from previous check");
157 topic_selection.clear();
158 for (idx, topic) in topics.into_iter().enumerate() {
159 topic_selection.push(
160 topic
161 .try_into()
162 .with_context(|| format!("invalid topic at position {idx}"))?,
163 );
164 }
165 Ok(self)
166 }
167
168 pub fn and_topic0<I, T>(self, topics: I) -> anyhow::Result<Self>
208 where
209 I: IntoIterator<Item = T>,
210 T: TryInto<LogArgument>,
211 T::Error: std::error::Error + Send + Sync + 'static,
212 {
213 self.and_topic(0, topics)
214 }
215 pub fn and_topic1<I, T>(self, topics: I) -> anyhow::Result<Self>
247 where
248 I: IntoIterator<Item = T>,
249 T: TryInto<LogArgument>,
250 T::Error: std::error::Error + Send + Sync + 'static,
251 {
252 self.and_topic(1, topics)
253 }
254 pub fn and_topic2<I, T>(self, topics: I) -> anyhow::Result<Self>
286 where
287 I: IntoIterator<Item = T>,
288 T: TryInto<LogArgument>,
289 T::Error: std::error::Error + Send + Sync + 'static,
290 {
291 self.and_topic(2, topics)
292 }
293 pub fn and_topic3<I, T>(self, topics: I) -> anyhow::Result<Self>
325 where
326 I: IntoIterator<Item = T>,
327 T: TryInto<LogArgument>,
328 T::Error: std::error::Error + Send + Sync + 'static,
329 {
330 self.and_topic(3, topics)
331 }
332}
333
334impl CapnpBuilder<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
335 fn populate_builder(
336 &self,
337 builder: &mut hypersync_net_types_capnp::log_filter::Builder,
338 ) -> Result<(), capnp::Error> {
339 if !self.address.is_empty() {
341 let mut addr_list = builder.reborrow().init_address(self.address.len() as u32);
342 for (i, addr) in self.address.iter().enumerate() {
343 addr_list.set(i as u32, addr.as_slice());
344 }
345 }
346
347 if let Some(filter) = &self.address_filter {
349 builder.reborrow().set_address_filter(filter.0.as_bytes());
350 }
351
352 if !self.topics.is_empty() {
354 let mut topics_list = builder.reborrow().init_topics(self.topics.len() as u32);
355 for (i, topic_vec) in self.topics.iter().enumerate() {
356 let mut topic_list = topics_list
357 .reborrow()
358 .init(i as u32, topic_vec.len() as u32);
359 for (j, topic) in topic_vec.iter().enumerate() {
360 topic_list.set(j as u32, topic.as_slice());
361 }
362 }
363 }
364
365 Ok(())
366 }
367}
368
369impl CapnpReader<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
370 fn from_reader(
372 reader: hypersync_net_types_capnp::log_filter::Reader,
373 ) -> Result<Self, capnp::Error> {
374 let mut address = Vec::new();
375
376 if reader.has_address() {
378 let addr_list = reader.get_address()?;
379 for i in 0..addr_list.len() {
380 let addr_data = addr_list.get(i)?;
381 if addr_data.len() == 20 {
382 let mut addr_bytes = [0u8; 20];
383 addr_bytes.copy_from_slice(addr_data);
384 address.push(Address::from(addr_bytes));
385 }
386 }
387 }
388
389 let mut address_filter = None;
390
391 if reader.has_address_filter() {
393 let filter_data = reader.get_address_filter()?;
394 let Ok(wrapper) = FilterWrapper::from_bytes(filter_data) else {
398 return Err(capnp::Error::failed("Invalid address filter".to_string()));
399 };
400 address_filter = Some(wrapper);
401 }
402
403 let mut topics = ArrayVec::new();
404
405 if reader.has_topics() {
407 let topics_list = reader.get_topics()?;
408
409 if topics_list.len() > 4 {
410 return Err(capnp::Error::failed(
411 "Topics list should not exceed 4 topics".to_string(),
412 ));
413 }
414
415 for i in 0..topics_list.len() {
416 let topic_list = topics_list.get(i)?;
417 let mut topic_vec = Vec::new();
418 for j in 0..topic_list.len() {
419 let topic_data = topic_list.get(j)?;
420 if topic_data.len() == 32 {
421 let mut topic_bytes = [0u8; 32];
422 topic_bytes.copy_from_slice(topic_data);
423 topic_vec.push(LogArgument::from(topic_bytes));
424 }
425 }
426
427 topics.push(topic_vec);
428 }
429 }
430
431 Ok(Self {
432 address,
433 address_filter,
434 topics,
435 })
436 }
437}
438
// Fields of a log that can be requested through a query's log field selection.
//
// Serde (`rename_all`) and strum (`serialize_all`) are both configured for `snake_case`, so a
// variant such as `TransactionHash` is spelled `transaction_hash` in both representations.
//
// NOTE(review): plain `//` comments are used on purpose. This enum derives
// `schemars::JsonSchema`, which copies `///` doc comments into the generated schema as
// descriptions, so adding doc comments here would change the schema output.
#[derive(
    Debug,
    Clone,
    Copy,
    Serialize,
    Deserialize,
    PartialEq,
    Eq,
    schemars::JsonSchema,
    strum_macros::EnumIter,
    strum_macros::AsRefStr,
    strum_macros::Display,
    strum_macros::EnumString,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
#[cfg_attr(feature = "arbitrary", derive(arbitrary::Arbitrary))]
pub enum LogField {
    // Position and payload fields of the log.
    TransactionHash,
    BlockHash,
    BlockNumber,
    TransactionIndex,
    LogIndex,
    Address,
    Data,
    Removed,

    // Topic slots 0 through 3.
    Topic0,
    Topic1,
    Topic2,
    Topic3,
}
473
474impl Ord for LogField {
475 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
476 self.as_ref().cmp(other.as_ref())
477 }
478}
479
/// Partial ordering that always agrees with the [`Ord`] implementation: it wraps the result of
/// `cmp` in `Some`, so it never returns `None`.
impl PartialOrd for LogField {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
485
486impl LogField {
487 pub fn all() -> std::collections::BTreeSet<Self> {
488 use strum::IntoEnumIterator;
489 Self::iter().collect()
490 }
491
492 pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::LogField {
494 match self {
495 LogField::TransactionHash => {
496 crate::hypersync_net_types_capnp::LogField::TransactionHash
497 }
498 LogField::BlockHash => crate::hypersync_net_types_capnp::LogField::BlockHash,
499 LogField::BlockNumber => crate::hypersync_net_types_capnp::LogField::BlockNumber,
500 LogField::TransactionIndex => {
501 crate::hypersync_net_types_capnp::LogField::TransactionIndex
502 }
503 LogField::LogIndex => crate::hypersync_net_types_capnp::LogField::LogIndex,
504 LogField::Address => crate::hypersync_net_types_capnp::LogField::Address,
505 LogField::Data => crate::hypersync_net_types_capnp::LogField::Data,
506 LogField::Removed => crate::hypersync_net_types_capnp::LogField::Removed,
507 LogField::Topic0 => crate::hypersync_net_types_capnp::LogField::Topic0,
508 LogField::Topic1 => crate::hypersync_net_types_capnp::LogField::Topic1,
509 LogField::Topic2 => crate::hypersync_net_types_capnp::LogField::Topic2,
510 LogField::Topic3 => crate::hypersync_net_types_capnp::LogField::Topic3,
511 }
512 }
513
514 pub fn from_capnp(field: crate::hypersync_net_types_capnp::LogField) -> Self {
516 match field {
517 crate::hypersync_net_types_capnp::LogField::TransactionHash => {
518 LogField::TransactionHash
519 }
520 crate::hypersync_net_types_capnp::LogField::BlockHash => LogField::BlockHash,
521 crate::hypersync_net_types_capnp::LogField::BlockNumber => LogField::BlockNumber,
522 crate::hypersync_net_types_capnp::LogField::TransactionIndex => {
523 LogField::TransactionIndex
524 }
525 crate::hypersync_net_types_capnp::LogField::LogIndex => LogField::LogIndex,
526 crate::hypersync_net_types_capnp::LogField::Address => LogField::Address,
527 crate::hypersync_net_types_capnp::LogField::Data => LogField::Data,
528 crate::hypersync_net_types_capnp::LogField::Removed => LogField::Removed,
529 crate::hypersync_net_types_capnp::LogField::Topic0 => LogField::Topic0,
530 crate::hypersync_net_types_capnp::LogField::Topic1 => LogField::Topic1,
531 crate::hypersync_net_types_capnp::LogField::Topic2 => LogField::Topic2,
532 crate::hypersync_net_types_capnp::LogField::Topic3 => LogField::Topic3,
533 }
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{query::tests::test_query_serde, Query};
    use hypersync_format::Hex;
    use serde_json::json;

    /// The set of `LogField` names must match the field names of the log schema exactly.
    #[test]
    fn test_all_fields_in_schema() {
        let schema = hypersync_schema::log();
        let schema_fields = schema
            .fields
            .iter()
            .map(|f| f.name.clone())
            .collect::<std::collections::BTreeSet<_>>();
        let all_fields = LogField::all()
            .into_iter()
            .map(|f| f.as_ref().to_string())
            .collect::<std::collections::BTreeSet<_>>();
        assert_eq!(schema_fields, all_fields);
    }

    /// Serde and strum must agree on the string form of every variant.
    #[test]
    fn test_serde_matches_strum() {
        for field in LogField::all() {
            let serialized = serde_json::to_string(&field).unwrap();
            let strum = serde_json::to_string(&field.as_ref()).unwrap();
            assert_eq!(serialized, strum, "strum value should be the same as serde");
        }
    }

    /// A default log selection survives the query serde round trip.
    #[test]
    fn test_log_selection_serde_with_defaults() {
        let log_selection = LogSelection::default();
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with defaults");
    }

    /// A filter with every field populated survives the query serde round trip.
    #[test]
    fn test_log_selection_serde_with_full_values() {
        let log_selection = LogFilter {
            address: vec![
                Address::decode_hex("0xdadB0d80178819F2319190D340ce9A924f783711").unwrap(),
            ],
            address_filter: Some(FilterWrapper::new(16, 1)),
            topics: {
                let mut topics = ArrayVec::new();
                topics.push(vec![LogArgument::decode_hex(
                    "0x1234567890123456789012345678901234567890123456789012345678901234",
                )
                .unwrap()]);
                topics.push(vec![LogArgument::decode_hex(
                    "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                )
                .unwrap()]);
                topics
            },
        };
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with full values");
    }

    /// The builder methods overwrite a position on repeated calls and pad skipped positions.
    #[test]
    fn test_log_filter_builder() -> anyhow::Result<()> {
        let lf = LogFilter::all()
            .and_address([
                "0xdadB0d80178819F2319190D340ce9A924f783711",
                "0xdadB0d80178819F2319190D340ce9A924f783712",
            ])?
            .and_topic0([
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            ])?;

        assert_eq!(lf.address.len(), 2);
        assert_eq!(lf.topics.len(), 1);
        assert_eq!(lf.topics[0].len(), 2);
        assert_eq!(lf.address_filter, None);

        let lf =
            lf.and_topic0(["0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd"])?;
        assert_eq!(
            lf.topics[0].len(),
            1,
            "should overwrite previous topic0 selection"
        );

        let lf = lf.and_topic3([
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
        ])?;

        assert_eq!(lf.topics[3].len(), 2, "should have correctly added topic3");
        assert_eq!(
            lf.topics[2].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(
            lf.topics[1].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(lf.topics[0].len(), 1, "topic0 should not have been changed");

        Ok(())
    }

    /// Different topic layouts must survive a JSON -> capnp -> `LogFilter` round trip unchanged.
    #[test]
    fn test_log_filter_topic_configurations() {
        /// Deserializes `json`, writes it to a capnp message, reads it back and compares.
        fn check_log_filter_json(json: serde_json::Value) {
            let lf: LogFilter = serde_json::from_value(json).unwrap();
            println!("{lf:?}");
            let mut message = capnp::message::Builder::new_default();
            let mut lf_builder =
                message.init_root::<hypersync_net_types_capnp::log_filter::Builder>();
            lf.populate_builder(&mut lf_builder).unwrap();
            let lf_reader = lf_builder.into_reader();
            let read_lf: LogFilter = LogFilter::from_reader(lf_reader).unwrap();
            assert_eq!(lf, read_lf);
        }

        const TOPIC: &str = "0x1234567890123456789012345678901234567890123456789012345678901234";

        // No positions, all-empty positions, every position filled, and sparse layouts.
        check_log_filter_json(json!({"topics": []}));
        check_log_filter_json(json!({"topics": [[], [], [], []]}));
        check_log_filter_json(json!({"topics": [[TOPIC], [TOPIC], [TOPIC], [TOPIC]]}));
        check_log_filter_json(json!({"topics": [[TOPIC], [], [], []]}));
        check_log_filter_json(json!({"topics": [[], [], [TOPIC]]}));
    }
}