hypersync_net_types/
request.rs

1use crate::hypersync_net_types_capnp::{query_body, request};
2use crate::{hypersync_net_types_capnp, CapnpReader, Query};
3use serde::{Deserialize, Serialize};
4
5use anyhow::Context;
6use capnp::message::{Builder, HeapAllocator};
7use hypersync_format::FixedSizeData;
8
9/// A 128 bit hash of the query body, used as a unique identifier for the query body
10#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
11pub struct QueryId(pub FixedSizeData<16>);
12impl QueryId {
13    pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
14        let data = FixedSizeData::<16>::try_from(bytes).context("invalid query id length")?;
15        Ok(Self(data))
16    }
17
18    pub fn from_query_body_reader(reader: query_body::Reader<'_>) -> Result<QueryId, capnp::Error> {
19        // See https://capnproto.org/encoding.html#canonicalization
20        // we need to ensure the query body is canonicalized for hashing
21        let num_words = reader.total_size()?.word_count;
22        let first_segment_words = u32::try_from(num_words).map_err(|_| {
23            capnp::Error::failed(format!("query body too large for u32: {num_words} words"))
24        })?;
25
26        // Must account for the additional root pointer word
27        const ROOT_POINTER_WORDS: u32 = 1;
28        let mut canon_builder = capnp::message::Builder::new(
29            HeapAllocator::new().first_segment_words(first_segment_words + ROOT_POINTER_WORDS),
30        );
31        canon_builder.set_root_canonical(reader)?;
32
33        // After canonicalization, there is only one segment.
34        // We can just hash this withouth the segment table
35        let segment = match canon_builder.get_segments_for_output() {
36            capnp::OutputSegments::SingleSegment([segment]) => segment,
37            capnp::OutputSegments::MultiSegment(items) => {
38                return Err(capnp::Error::failed(format!(
39                    "Expected exactly 1 segment, found {}",
40                    items.len(),
41                )))
42            }
43        };
44
45        let hash: u128 = xxhash_rust::xxh3::xxh3_128(segment);
46
47        Ok(QueryId(FixedSizeData::<16>::from(hash.to_be_bytes())))
48    }
49
50    pub fn from_query(query: &Query) -> Result<Self, capnp::Error> {
51        let mut message = Builder::new_default();
52        let mut query_body_builder = message.init_root::<query_body::Builder>();
53        query_body_builder.build_from_query(query)?;
54        Self::from_query_body_reader(query_body_builder.into_reader())
55    }
56}
57
58pub enum Request {
59    QueryBody {
60        should_cache: bool,
61        query: Box<Query>,
62    },
63    QueryId {
64        from_block: u64,
65        to_block: Option<u64>,
66        id: QueryId,
67    },
68}
69
70impl Request {
71    pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
72        let message_reader =
73            capnp::serialize_packed::read_message(bytes, capnp::message::ReaderOptions::new())?;
74        let query = message_reader.get_root::<request::Reader>()?;
75        Request::from_reader(query)
76    }
77}
78
79impl CapnpReader<request::Owned> for Request {
80    fn from_reader(query: request::Reader) -> Result<Self, capnp::Error> {
81        let block_range = query.get_block_range()?;
82        let from_block = block_range.get_from_block();
83        let to_block = if block_range.has_to_block() {
84            Some(block_range.get_to_block()?.get_value())
85        } else {
86            None
87        };
88
89        match query.get_body().which()? {
90            request::body::Which::Query(query_body_reader) => {
91                let body_reader = query_body_reader?;
92                Ok(Self::QueryBody {
93                    should_cache: query.get_should_cache(),
94                    query: Box::new(Query::from_capnp_query_body_reader(
95                        &body_reader,
96                        from_block,
97                        to_block,
98                    )?),
99                })
100            }
101            request::body::Which::QueryId(id_bytes) => {
102                let id = QueryId::from_bytes(id_bytes?)
103                    .map_err(|_| capnp::Error::failed("Invalid query id bytes".to_string()))?;
104
105                Ok(Self::QueryId {
106                    from_block,
107                    to_block,
108                    id,
109                })
110            }
111        }
112    }
113}
114
115impl hypersync_net_types_capnp::block_range::Builder<'_> {
116    pub fn set(&mut self, from_block: u64, to_block: Option<u64>) -> Result<(), capnp::Error> {
117        self.reborrow().set_from_block(from_block);
118
119        if let Some(to_block) = to_block {
120            let mut to_block_builder = self.reborrow().init_to_block();
121            to_block_builder.set_value(to_block)
122        }
123
124        Ok(())
125    }
126}
127
128impl request::Builder<'_> {
129    pub fn build_full_query_from_query(
130        &mut self,
131        query: &Query,
132        should_cache: bool,
133    ) -> Result<(), capnp::Error> {
134        let mut block_range_builder = self.reborrow().init_block_range();
135        block_range_builder.set(query.from_block, query.to_block)?;
136
137        let mut query_body_builder = self.reborrow().init_body().init_query();
138        query_body_builder.build_from_query(query)?;
139
140        self.set_should_cache(should_cache);
141
142        Ok(())
143    }
144
145    pub fn build_query_id_from_query(&mut self, query: &Query) -> Result<(), capnp::Error> {
146        self.reborrow()
147            .init_block_range()
148            .set(query.from_block, query.to_block)?;
149
150        let id = QueryId::from_query(query)?;
151        self.reborrow().init_body().set_query_id(id.0.as_slice());
152        Ok(())
153    }
154}
155
#[cfg(test)]
mod tests {
    use hypersync_format::Address;

    use crate::{
        log::LogField, BlockFilter, BlockSelection, CapnpBuilder, FieldSelection, LogFilter,
        LogSelection,
    };

    use super::*;

    #[test]
    fn test_query_id() {
        let query = Query {
            logs: vec![Default::default()].into_iter().collect(),
            field_selection: FieldSelection {
                log: LogField::all(),
                ..Default::default()
            },
            ..Default::default()
        };

        let query_id = QueryId::from_query(&query).unwrap();

        // The id must be a pure function of the query...
        assert_eq!(query_id, QueryId::from_query(&query).unwrap());
        // ...and must tell it apart from a different query.
        assert_ne!(query_id, QueryId::from_query(&Query::default()).unwrap());
    }

    /// Regression test: this used to fail because the heap allocator was configured with a
    /// default first segment size of 1024 words.
    #[test]
    fn test_large_query_id() {
        let addresses: Vec<Address> = (0u64..200_000)
            .map(|i| {
                // Right-align the big-endian counter inside a 20-byte address.
                let mut padded_bytes = vec![0u8; 20];
                padded_bytes[12..].copy_from_slice(&i.to_be_bytes());
                Address::try_from(padded_bytes).unwrap()
            })
            .collect();
        let query = Query::new().where_logs(LogFilter::all().and_address(addresses).unwrap());

        // `expect` instead of `assert!(is_ok())` so a failure reports the underlying error.
        QueryId::from_query(&query).expect("query id of a large query body");
    }

    #[test]
    fn test_needs_canonicalization_for_hashing() {
        fn add_log_selection(query_body_builder: &mut query_body::Builder) {
            let mut logs_builder = query_body_builder.reborrow().init_logs(1).get(0);
            LogSelection::from(LogFilter {
                address: vec![FixedSizeData::<20>::from([1u8; 20])],
                ..Default::default()
            })
            .populate_builder(&mut logs_builder)
            .unwrap();
        }

        fn add_block_selection(query_body_builder: &mut query_body::Builder) {
            let mut block_selection_builder = query_body_builder.reborrow().init_blocks(1).get(0);
            BlockSelection::from(BlockFilter {
                hash: vec![FixedSizeData::<32>::from([1u8; 32])],
                ..Default::default()
            })
            .populate_builder(&mut block_selection_builder)
            .unwrap();
        }

        /// XXH3-128 of `message` serialized with standard (unpacked) framing, segment table
        /// included.
        fn hash_message(message: &Builder<HeapAllocator>) -> u128 {
            let mut buf = Vec::new();
            capnp::serialize::write_message(&mut buf, message).unwrap();
            xxhash_rust::xxh3::xxh3_128(&buf)
        }

        /// Builds a body with one log selection and one block selection in the given order.
        ///
        /// Returns a tuple of:
        /// - the hash of the raw message;
        /// - the hash of its canonical copy;
        /// - the body's `QueryId`.
        fn hash_body(logs_first: bool) -> (u128, u128, QueryId) {
            let mut message = Builder::new_default();
            let mut query_body_builder = message.init_root::<query_body::Builder>();
            if logs_first {
                add_log_selection(&mut query_body_builder);
                add_block_selection(&mut query_body_builder);
            } else {
                // Opposite insertion order: the allocator lays the body out differently and
                // does not canonicalize it.
                add_block_selection(&mut query_body_builder);
                add_log_selection(&mut query_body_builder);
            }

            let mut message_canon = Builder::new_default();
            message_canon
                .set_root_canonical(query_body_builder.reborrow().into_reader())
                .unwrap();
            let query_id =
                QueryId::from_query_body_reader(query_body_builder.into_reader()).unwrap();

            (hash_message(&message), hash_message(&message_canon), query_id)
        }

        let (hash_a, hash_a_canon, id_a) = hash_body(true);
        let (hash_b, hash_b_canon, id_b) = hash_body(false);

        assert_ne!(
            hash_a, hash_b,
            "queries should be different since they are not canonicalized"
        );
        assert_eq!(
            hash_a_canon, hash_b_canon,
            "queries should be the same since they are canonicalized"
        );
        // The production id must inherit the canonicalization guarantee.
        assert_eq!(
            id_a, id_b,
            "query ids should be the same since QueryId canonicalizes the body"
        );
    }
}
269}