hypersync_net_types/log.rs
1use crate::{hypersync_net_types_capnp, types::AnyOf, CapnpBuilder, CapnpReader, Selection};
2use anyhow::Context;
3use arrayvec::ArrayVec;
4use hypersync_format::{Address, FilterWrapper, LogArgument};
5use serde::{Deserialize, Serialize};
6
7pub type LogSelection = Selection<LogFilter>;
8
9#[derive(Default, Serialize, Deserialize, Clone, Debug, PartialEq)]
10pub struct LogFilter {
11 /// Address of the contract, any logs that has any of these addresses will be returned.
12 /// Empty means match all.
13 #[serde(default, skip_serializing_if = "Vec::is_empty")]
14 pub address: Vec<Address>,
15 #[serde(default, skip_serializing_if = "Option::is_none")]
16 pub address_filter: Option<FilterWrapper>,
17 /// Topics to match, each member of the top level array is another array, if the nth topic matches any
18 /// topic specified in nth element of topics, the log will be returned. Empty means match all.
19 #[serde(default, skip_serializing_if = "ArrayVec::is_empty")]
20 pub topics: ArrayVec<Vec<LogArgument>, 4>,
21}
22
23impl From<LogFilter> for AnyOf<LogFilter> {
24 fn from(filter: LogFilter) -> Self {
25 Self::new(filter)
26 }
27}
28
29impl LogFilter {
30 /// Base filter to match all logs
31 /// Narrow the filter down by chaining additional values for eg.
32 /// `LogFilter::all().and_address(["0xdac17f958d2ee523a2206206994597c13d831ec7"])`
33 pub fn all() -> Self {
34 Default::default()
35 }
36
37 /// Combine this filter with another using logical OR.
38 ///
39 /// Creates an `AnyOf` that matches logs satisfying either this filter or the other filter.
40 /// This allows for fluent chaining of multiple log filters with OR semantics.
41 ///
42 /// # Arguments
43 /// * `other` - Another `LogFilter` to combine with this one
44 ///
45 /// # Returns
46 /// An `AnyOf<LogFilter>` that matches logs satisfying either filter
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use hypersync_net_types::LogFilter;
52 ///
53 /// // Match logs from specific contracts OR with specific topics
54 /// let filter = LogFilter::all()
55 /// .and_address(["0xdac17f958d2ee523a2206206994597c13d831ec7"])?
56 /// .or(
57 /// LogFilter::all()
58 /// .and_topic0(["0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"])? // Transfer event
59 /// );
60 /// # Ok::<(), anyhow::Error>(())
61 /// ```
62 pub fn or(self, other: Self) -> AnyOf<Self> {
63 AnyOf::new(self).or(other)
64 }
65
66 /// Filter logs by any of the provided contract addresses.
67 ///
68 /// This method accepts any iterable of values that can be converted to `Address`.
69 /// Common input types include string slices, byte arrays, and `Address` objects.
70 ///
71 /// # Arguments
72 /// * `addresses` - An iterable of addresses to filter by
73 ///
74 /// # Returns
75 /// * `Ok(Self)` - The updated filter on success
76 /// * `Err(anyhow::Error)` - If any address fails to convert
77 ///
78 /// # Examples
79 ///
80 /// ```
81 /// use hypersync_net_types::LogFilter;
82 ///
83 /// // Filter by a single address using string
84 /// let filter = LogFilter::all()
85 /// .and_address(["0xdac17f958d2ee523a2206206994597c13d831ec7"])?;
86 ///
87 /// // Filter by multiple addresses
88 /// let filter = LogFilter::all()
89 /// .and_address([
90 /// "0xdac17f958d2ee523a2206206994597c13d831ec7", // USDT
91 /// "0xa0b86a33e6c11c8c0c5c0b5e6adee30d1a234567", // Another contract
92 /// ])?;
93 ///
94 /// // Using byte arrays
95 /// let usdt_address = [
96 /// 0xda, 0xc1, 0x7f, 0x95, 0x8d, 0x2e, 0xe5, 0x23, 0xa2, 0x20,
97 /// 0x62, 0x06, 0x99, 0x45, 0x97, 0xc1, 0x3d, 0x83, 0x1e, 0xc7
98 /// ];
99 /// let filter = LogFilter::all()
100 /// .and_address([usdt_address])?;
101 /// # Ok::<(), anyhow::Error>(())
102 /// ```
103 pub fn and_address<I, A>(mut self, addresses: I) -> anyhow::Result<Self>
104 where
105 I: IntoIterator<Item = A>,
106 A: TryInto<Address>,
107 A::Error: std::error::Error + Send + Sync + 'static,
108 {
109 let mut converted_addresses: Vec<Address> = Vec::new();
110 for (idx, address) in addresses.into_iter().enumerate() {
111 converted_addresses.push(
112 address
113 .try_into()
114 .with_context(|| format!("invalid address at position {idx}"))?,
115 );
116 }
117 self.address = converted_addresses;
118 Ok(self)
119 }
120
121 fn and_topic<I, T>(mut self, topic_idx: usize, topics: I) -> anyhow::Result<Self>
122 where
123 I: IntoIterator<Item = T>,
124 T: TryInto<LogArgument>,
125 T::Error: std::error::Error + Send + Sync + 'static,
126 {
127 if topic_idx > 3 {
128 anyhow::bail!("topic index should not be greater than 3");
129 }
130
131 if self.topics.len() <= topic_idx {
132 for _ in 0..=(topic_idx - self.topics.len()) {
133 self.topics.push(Vec::new());
134 }
135 }
136 let topic_selection = self
137 .topics
138 .get_mut(topic_idx)
139 .expect("topic should exist from previous check");
140 topic_selection.clear();
141 for (idx, topic) in topics.into_iter().enumerate() {
142 topic_selection.push(
143 topic
144 .try_into()
145 .with_context(|| format!("invalid topic at position {idx}"))?,
146 );
147 }
148 Ok(self)
149 }
150
151 /// Filter logs by any of the provided topic0 values.
152 ///
153 /// Topic0 typically contains the event signature hash for Ethereum logs.
154 /// This method accepts any iterable of values that can be converted to `LogArgument`.
155 /// Common input types include string slices, byte arrays, and `LogArgument` objects.
156 ///
157 /// # Arguments
158 /// * `topics` - An iterable of topic0 values to filter by
159 ///
160 /// # Returns
161 /// * `Ok(Self)` - The updated filter on success
162 /// * `Err(anyhow::Error)` - If any topic fails to convert
163 ///
164 /// # Examples
165 ///
166 /// ```
167 /// use hypersync_net_types::LogFilter;
168 ///
169 /// // Filter by Transfer event signature
170 /// let transfer_sig = "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef";
171 /// let filter = LogFilter::all()
172 /// .and_topic0([transfer_sig])?;
173 ///
174 /// // Filter by multiple event signatures
175 /// let filter = LogFilter::all()
176 /// .and_topic0([
177 /// "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef", // Transfer
178 /// "0x8c5be1e5ebec7d5bd14f71427d1e84f3dd0314c0f7b2291e5b200ac8c7c3b925", // Approval
179 /// ])?;
180 ///
181 /// // Using byte arrays
182 /// let transfer_bytes = [
183 /// 0xdd, 0xf2, 0x52, 0xad, 0x1b, 0xe2, 0xc8, 0x9b, 0x69, 0xc2, 0xb0, 0x68, 0xfc, 0x37, 0x8d, 0xaa,
184 /// 0x95, 0x2b, 0xa7, 0xf1, 0x63, 0xc4, 0xa1, 0x16, 0x28, 0xf5, 0x5a, 0x4d, 0xf5, 0x23, 0xb3, 0xef
185 /// ];
186 /// let filter = LogFilter::all()
187 /// .and_topic0([transfer_bytes])?;
188 /// # Ok::<(), anyhow::Error>(())
189 /// ```
190 pub fn and_topic0<I, T>(self, topics: I) -> anyhow::Result<Self>
191 where
192 I: IntoIterator<Item = T>,
193 T: TryInto<LogArgument>,
194 T::Error: std::error::Error + Send + Sync + 'static,
195 {
196 self.and_topic(0, topics)
197 }
198 /// Filter logs by any of the provided topic1 values.
199 ///
200 /// Topic1 typically contains the first indexed parameter of an Ethereum event.
201 /// This method accepts any iterable of values that can be converted to `LogArgument`.
202 /// Common input types include string slices, byte arrays, and `LogArgument` objects.
203 ///
204 /// # Arguments
205 /// * `topics` - An iterable of topic1 values to filter by
206 ///
207 /// # Returns
208 /// * `Ok(Self)` - The updated filter on success
209 /// * `Err(anyhow::Error)` - If any topic fails to convert
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use hypersync_net_types::LogFilter;
215 ///
216 /// // Filter by specific sender address in Transfer events (topic1 = from)
217 /// let sender_address = "0x000000000000000000000000dac17f958d2ee523a2206206994597c13d831ec7";
218 /// let filter = LogFilter::all()
219 /// .and_topic1([sender_address])?;
220 ///
221 /// // Filter by multiple possible senders
222 /// let filter = LogFilter::all()
223 /// .and_topic1([
224 /// "0x000000000000000000000000dac17f958d2ee523a2206206994597c13d831ec7",
225 /// "0x000000000000000000000000a0b86a33e6c11c8c0c5c0b5e6adee30d1a234567",
226 /// ])?;
227 /// # Ok::<(), anyhow::Error>(())
228 /// ```
229 pub fn and_topic1<I, T>(self, topics: I) -> anyhow::Result<Self>
230 where
231 I: IntoIterator<Item = T>,
232 T: TryInto<LogArgument>,
233 T::Error: std::error::Error + Send + Sync + 'static,
234 {
235 self.and_topic(1, topics)
236 }
237 /// Filter logs by any of the provided topic2 values.
238 ///
239 /// Topic2 typically contains the second indexed parameter of an Ethereum event.
240 /// This method accepts any iterable of values that can be converted to `LogArgument`.
241 /// Common input types include string slices, byte arrays, and `LogArgument` objects.
242 ///
243 /// # Arguments
244 /// * `topics` - An iterable of topic2 values to filter by
245 ///
246 /// # Returns
247 /// * `Ok(Self)` - The updated filter on success
248 /// * `Err(anyhow::Error)` - If any topic fails to convert
249 ///
250 /// # Examples
251 ///
252 /// ```
253 /// use hypersync_net_types::LogFilter;
254 ///
255 /// // Filter by specific recipient address in Transfer events (topic2 = to)
256 /// let recipient_address = "0x000000000000000000000000a0b86a33e6c11c8c0c5c0b5e6adee30d1a234567";
257 /// let filter = LogFilter::all()
258 /// .and_topic2([recipient_address])?;
259 ///
260 /// // Filter by multiple possible recipients
261 /// let filter = LogFilter::all()
262 /// .and_topic2([
263 /// "0x000000000000000000000000a0b86a33e6c11c8c0c5c0b5e6adee30d1a234567",
264 /// "0x000000000000000000000000dac17f958d2ee523a2206206994597c13d831ec7",
265 /// ])?;
266 /// # Ok::<(), anyhow::Error>(())
267 /// ```
268 pub fn and_topic2<I, T>(self, topics: I) -> anyhow::Result<Self>
269 where
270 I: IntoIterator<Item = T>,
271 T: TryInto<LogArgument>,
272 T::Error: std::error::Error + Send + Sync + 'static,
273 {
274 self.and_topic(2, topics)
275 }
276 /// Filter logs by any of the provided topic3 values.
277 ///
278 /// Topic3 typically contains the third indexed parameter of an Ethereum event.
279 /// This method accepts any iterable of values that can be converted to `LogArgument`.
280 /// Common input types include string slices, byte arrays, and `LogArgument` objects.
281 ///
282 /// # Arguments
283 /// * `topics` - An iterable of topic3 values to filter by
284 ///
285 /// # Returns
286 /// * `Ok(Self)` - The updated filter on success
287 /// * `Err(anyhow::Error)` - If any topic fails to convert
288 ///
289 /// # Examples
290 ///
291 /// ```
292 /// use hypersync_net_types::LogFilter;
293 ///
294 /// // Filter by specific token ID in NFT Transfer events (topic3 = tokenId)
295 /// let token_id = "0x0000000000000000000000000000000000000000000000000000000000000001";
296 /// let filter = LogFilter::all()
297 /// .and_topic3([token_id])?;
298 ///
299 /// // Filter by multiple token IDs
300 /// let filter = LogFilter::all()
301 /// .and_topic3([
302 /// "0x0000000000000000000000000000000000000000000000000000000000000001",
303 /// "0x0000000000000000000000000000000000000000000000000000000000000002",
304 /// ])?;
305 /// # Ok::<(), anyhow::Error>(())
306 /// ```
307 pub fn and_topic3<I, T>(self, topics: I) -> anyhow::Result<Self>
308 where
309 I: IntoIterator<Item = T>,
310 T: TryInto<LogArgument>,
311 T::Error: std::error::Error + Send + Sync + 'static,
312 {
313 self.and_topic(3, topics)
314 }
315}
316
317impl CapnpBuilder<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
318 fn populate_builder(
319 &self,
320 builder: &mut hypersync_net_types_capnp::log_filter::Builder,
321 ) -> Result<(), capnp::Error> {
322 // Set addresses
323 if !self.address.is_empty() {
324 let mut addr_list = builder.reborrow().init_address(self.address.len() as u32);
325 for (i, addr) in self.address.iter().enumerate() {
326 addr_list.set(i as u32, addr.as_slice());
327 }
328 }
329
330 // Set address filter
331 if let Some(filter) = &self.address_filter {
332 builder.reborrow().set_address_filter(filter.0.as_bytes());
333 }
334
335 // Set topics
336 if !self.topics.is_empty() {
337 let mut topics_list = builder.reborrow().init_topics(self.topics.len() as u32);
338 for (i, topic_vec) in self.topics.iter().enumerate() {
339 let mut topic_list = topics_list
340 .reborrow()
341 .init(i as u32, topic_vec.len() as u32);
342 for (j, topic) in topic_vec.iter().enumerate() {
343 topic_list.set(j as u32, topic.as_slice());
344 }
345 }
346 }
347
348 Ok(())
349 }
350}
351
352impl CapnpReader<hypersync_net_types_capnp::log_filter::Owned> for LogFilter {
353 /// Deserialize LogSelection from Cap'n Proto reader
354 fn from_reader(
355 reader: hypersync_net_types_capnp::log_filter::Reader,
356 ) -> Result<Self, capnp::Error> {
357 let mut address = Vec::new();
358
359 // Parse addresses
360 if reader.has_address() {
361 let addr_list = reader.get_address()?;
362 for i in 0..addr_list.len() {
363 let addr_data = addr_list.get(i)?;
364 if addr_data.len() == 20 {
365 let mut addr_bytes = [0u8; 20];
366 addr_bytes.copy_from_slice(addr_data);
367 address.push(Address::from(addr_bytes));
368 }
369 }
370 }
371
372 let mut address_filter = None;
373
374 // Parse address filter
375 if reader.has_address_filter() {
376 let filter_data = reader.get_address_filter()?;
377 // For now, skip filter deserialization - this would need proper Filter construction
378 // log_selection.address_filter = Some(FilterWrapper::from_keys(std::iter::empty(), None).unwrap());
379
380 let Ok(wrapper) = FilterWrapper::from_bytes(filter_data) else {
381 return Err(capnp::Error::failed("Invalid address filter".to_string()));
382 };
383 address_filter = Some(wrapper);
384 }
385
386 let mut topics = ArrayVec::new();
387
388 // Parse topics
389 if reader.has_topics() {
390 let topics_list = reader.get_topics()?;
391 for i in 0..topics_list.len() {
392 let topic_list = topics_list.get(i)?;
393 let mut topic_vec = Vec::new();
394 for j in 0..topic_list.len() {
395 let topic_data = topic_list.get(j)?;
396 if topic_data.len() == 32 {
397 let mut topic_bytes = [0u8; 32];
398 topic_bytes.copy_from_slice(topic_data);
399 topic_vec.push(LogArgument::from(topic_bytes));
400 }
401 }
402 if i < 4 && !topic_vec.is_empty() {
403 topics.push(topic_vec);
404 }
405 }
406 }
407
408 Ok(Self {
409 address,
410 address_filter,
411 topics,
412 })
413 }
414}
415
416#[derive(
417 Debug,
418 Clone,
419 Copy,
420 Serialize,
421 Deserialize,
422 PartialEq,
423 Eq,
424 schemars::JsonSchema,
425 strum_macros::EnumIter,
426 strum_macros::AsRefStr,
427 strum_macros::Display,
428 strum_macros::EnumString,
429)]
430#[serde(rename_all = "snake_case")]
431#[strum(serialize_all = "snake_case")]
432pub enum LogField {
433 // Core log fields
434 TransactionHash,
435 BlockHash,
436 BlockNumber,
437 TransactionIndex,
438 LogIndex,
439 Address,
440 Data,
441 Removed,
442
443 // Topic fields
444 Topic0,
445 Topic1,
446 Topic2,
447 Topic3,
448}
449
450impl Ord for LogField {
451 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
452 self.as_ref().cmp(other.as_ref())
453 }
454}
455
456impl PartialOrd for LogField {
457 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
458 Some(self.cmp(other))
459 }
460}
461
462impl LogField {
463 pub fn all() -> std::collections::BTreeSet<Self> {
464 use strum::IntoEnumIterator;
465 Self::iter().collect()
466 }
467
468 /// Convert LogField to Cap'n Proto enum
469 pub fn to_capnp(&self) -> crate::hypersync_net_types_capnp::LogField {
470 match self {
471 LogField::TransactionHash => {
472 crate::hypersync_net_types_capnp::LogField::TransactionHash
473 }
474 LogField::BlockHash => crate::hypersync_net_types_capnp::LogField::BlockHash,
475 LogField::BlockNumber => crate::hypersync_net_types_capnp::LogField::BlockNumber,
476 LogField::TransactionIndex => {
477 crate::hypersync_net_types_capnp::LogField::TransactionIndex
478 }
479 LogField::LogIndex => crate::hypersync_net_types_capnp::LogField::LogIndex,
480 LogField::Address => crate::hypersync_net_types_capnp::LogField::Address,
481 LogField::Data => crate::hypersync_net_types_capnp::LogField::Data,
482 LogField::Removed => crate::hypersync_net_types_capnp::LogField::Removed,
483 LogField::Topic0 => crate::hypersync_net_types_capnp::LogField::Topic0,
484 LogField::Topic1 => crate::hypersync_net_types_capnp::LogField::Topic1,
485 LogField::Topic2 => crate::hypersync_net_types_capnp::LogField::Topic2,
486 LogField::Topic3 => crate::hypersync_net_types_capnp::LogField::Topic3,
487 }
488 }
489
490 /// Convert Cap'n Proto enum to LogField
491 pub fn from_capnp(field: crate::hypersync_net_types_capnp::LogField) -> Self {
492 match field {
493 crate::hypersync_net_types_capnp::LogField::TransactionHash => {
494 LogField::TransactionHash
495 }
496 crate::hypersync_net_types_capnp::LogField::BlockHash => LogField::BlockHash,
497 crate::hypersync_net_types_capnp::LogField::BlockNumber => LogField::BlockNumber,
498 crate::hypersync_net_types_capnp::LogField::TransactionIndex => {
499 LogField::TransactionIndex
500 }
501 crate::hypersync_net_types_capnp::LogField::LogIndex => LogField::LogIndex,
502 crate::hypersync_net_types_capnp::LogField::Address => LogField::Address,
503 crate::hypersync_net_types_capnp::LogField::Data => LogField::Data,
504 crate::hypersync_net_types_capnp::LogField::Removed => LogField::Removed,
505 crate::hypersync_net_types_capnp::LogField::Topic0 => LogField::Topic0,
506 crate::hypersync_net_types_capnp::LogField::Topic1 => LogField::Topic1,
507 crate::hypersync_net_types_capnp::LogField::Topic2 => LogField::Topic2,
508 crate::hypersync_net_types_capnp::LogField::Topic3 => LogField::Topic3,
509 }
510 }
511}
512
#[cfg(test)]
mod tests {
    use hypersync_format::Hex;

    use super::*;
    use crate::{query::tests::test_query_serde, Query};

    /// The set of `LogField` names must equal the set of field names in the log schema.
    #[test]
    fn test_all_fields_in_schema() {
        let schema = hypersync_schema::log();
        let schema_fields = schema
            .fields
            .iter()
            .map(|f| f.name.clone())
            .collect::<std::collections::BTreeSet<_>>();
        let all_fields = LogField::all()
            .into_iter()
            .map(|f| f.as_ref().to_string())
            .collect::<std::collections::BTreeSet<_>>();
        assert_eq!(schema_fields, all_fields);
    }

    /// serde and strum must produce the same string for every field.
    #[test]
    fn test_serde_matches_strum() {
        for field in LogField::all() {
            let serialized = serde_json::to_string(&field).unwrap();
            let strum = serde_json::to_string(&field.as_ref()).unwrap();
            assert_eq!(serialized, strum, "strum value should be the same as serde");
        }
    }

    /// A default (empty) log selection passes the shared query serde check.
    #[test]
    fn test_log_selection_serde_with_defaults() {
        let log_selection = LogSelection::default();
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with defaults");
    }

    /// A filter with every criterion populated passes the shared query serde check.
    #[test]
    fn test_log_selection_serde_with_full_values() {
        let log_selection = LogFilter {
            address: vec![
                Address::decode_hex("0xdadB0d80178819F2319190D340ce9A924f783711").unwrap(),
            ],
            address_filter: Some(FilterWrapper::new(16, 1)),
            topics: {
                let mut topics = ArrayVec::new();
                topics.push(vec![LogArgument::decode_hex(
                    "0x1234567890123456789012345678901234567890123456789012345678901234",
                )
                .unwrap()]);
                topics.push(vec![LogArgument::decode_hex(
                    "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                )
                .unwrap()]);
                topics
            },
        };
        let query = Query::new()
            .where_logs(log_selection)
            .select_log_fields(LogField::all());

        test_query_serde(query, "log selection with full values");
    }

    /// Exercises the builder chain: setting addresses and topics, overwriting a topic
    /// position, and padding skipped positions with empty lists.
    #[test]
    fn test_log_filter_builder() -> anyhow::Result<()> {
        let lf = LogFilter::all()
            .and_address([
                "0xdadB0d80178819F2319190D340ce9A924f783711",
                "0xdadB0d80178819F2319190D340ce9A924f783712",
            ])?
            .and_topic0([
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
                "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            ])?;

        assert_eq!(lf.address.len(), 2);
        assert_eq!(lf.topics.len(), 1);
        assert_eq!(lf.topics[0].len(), 2);
        assert_eq!(lf.address_filter, None);

        // Calling and_topic0 again replaces the earlier selection instead of extending it.
        let lf =
            lf.and_topic0(["0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd"])?;
        assert_eq!(
            lf.topics[0].len(),
            1,
            "should overwrite previous topic0 selection"
        );

        // Jumping straight to topic3 must create empty entries for positions 1 and 2.
        let lf = lf.and_topic3([
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
            "0xabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcdefabcd",
        ])?;

        assert_eq!(lf.topics[3].len(), 2, "should have correctly added topic3");
        assert_eq!(
            lf.topics[2].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(
            lf.topics[1].len(),
            0,
            "should have added empty topics before the first non-empty topic"
        );
        assert_eq!(lf.topics[0].len(), 1, "topic0 should not have been changed");

        Ok(())
    }
}