hypersync_net_types/
request.rs

1use crate::hypersync_net_types_capnp::{query_body, request};
2use crate::{hypersync_net_types_capnp, CapnpReader, Query};
3use serde::{Deserialize, Serialize};
4
5use anyhow::Context;
6use capnp::message::{Builder, HeapAllocator};
7use hypersync_format::FixedSizeData;
8
9/// A 128 bit hash of the query body, used as a unique identifier for the query body
10#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
11pub struct QueryId(pub FixedSizeData<16>);
12impl QueryId {
13    pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
14        let data = FixedSizeData::<16>::try_from(bytes).context("invalid query id length")?;
15        Ok(Self(data))
16    }
17
18    pub fn from_query_body_reader(reader: query_body::Reader<'_>) -> Result<QueryId, capnp::Error> {
19        // See https://capnproto.org/encoding.html#canonicalization
20        // we need to ensure the query body is canonicalized for hashing
21        let num_words = reader.total_size()?.word_count;
22        let first_segment_words = u32::try_from(num_words).map_err(|_| {
23            capnp::Error::failed(format!("query body too large for u32: {num_words} words"))
24        })?;
25
26        let mut canon_builder = capnp::message::Builder::new(
27            HeapAllocator::new().first_segment_words(first_segment_words),
28        );
29        canon_builder.set_root_canonical(reader)?;
30
31        // After canonicalization, there is only one segment.
32        // We can just hash this withouth the segment table
33        let segment = match canon_builder.get_segments_for_output() {
34            capnp::OutputSegments::SingleSegment([segment]) => segment,
35            capnp::OutputSegments::MultiSegment(items) => {
36                return Err(capnp::Error::failed(format!(
37                    "Expected exactly 1 segment, found {}",
38                    items.len(),
39                )))
40            }
41        };
42
43        let hash: u128 = xxhash_rust::xxh3::xxh3_128(segment);
44
45        Ok(QueryId(FixedSizeData::<16>::from(hash.to_be_bytes())))
46    }
47
48    pub fn from_query(query: &Query) -> Result<Self, capnp::Error> {
49        let mut message = Builder::new_default();
50        let mut query_body_builder = message.init_root::<query_body::Builder>();
51        query_body_builder.build_from_query(query)?;
52        Self::from_query_body_reader(query_body_builder.into_reader())
53    }
54}
55
56pub enum Request {
57    QueryBody {
58        should_cache: bool,
59        query: Box<Query>,
60    },
61    QueryId {
62        from_block: u64,
63        to_block: Option<u64>,
64        id: QueryId,
65    },
66}
67
68impl Request {
69    pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
70        let message_reader =
71            capnp::serialize_packed::read_message(bytes, capnp::message::ReaderOptions::new())?;
72        let query = message_reader.get_root::<request::Reader>()?;
73        Request::from_reader(query)
74    }
75}
76
77impl CapnpReader<request::Owned> for Request {
78    fn from_reader(query: request::Reader) -> Result<Self, capnp::Error> {
79        let block_range = query.get_block_range()?;
80        let from_block = block_range.get_from_block();
81        let to_block = if block_range.has_to_block() {
82            Some(block_range.get_to_block()?.get_value())
83        } else {
84            None
85        };
86
87        match query.get_body().which()? {
88            request::body::Which::Query(query_body_reader) => {
89                let body_reader = query_body_reader?;
90                Ok(Self::QueryBody {
91                    should_cache: query.get_should_cache(),
92                    query: Box::new(Query::from_capnp_query_body_reader(
93                        &body_reader,
94                        from_block,
95                        to_block,
96                    )?),
97                })
98            }
99            request::body::Which::QueryId(id_bytes) => {
100                let id = QueryId::from_bytes(id_bytes?)
101                    .map_err(|_| capnp::Error::failed("Invalid query id bytes".to_string()))?;
102
103                Ok(Self::QueryId {
104                    from_block,
105                    to_block,
106                    id,
107                })
108            }
109        }
110    }
111}
112
113impl hypersync_net_types_capnp::block_range::Builder<'_> {
114    pub fn set(&mut self, from_block: u64, to_block: Option<u64>) -> Result<(), capnp::Error> {
115        self.reborrow().set_from_block(from_block);
116
117        if let Some(to_block) = to_block {
118            let mut to_block_builder = self.reborrow().init_to_block();
119            to_block_builder.set_value(to_block)
120        }
121
122        Ok(())
123    }
124}
125
126impl request::Builder<'_> {
127    pub fn build_full_query_from_query(
128        &mut self,
129        query: &Query,
130        should_cache: bool,
131    ) -> Result<(), capnp::Error> {
132        let mut block_range_builder = self.reborrow().init_block_range();
133        block_range_builder.set(query.from_block, query.to_block)?;
134
135        let mut query_body_builder = self.reborrow().init_body().init_query();
136        query_body_builder.build_from_query(query)?;
137
138        self.set_should_cache(should_cache);
139
140        Ok(())
141    }
142
143    pub fn build_query_id_from_query(&mut self, query: &Query) -> Result<(), capnp::Error> {
144        self.reborrow()
145            .init_block_range()
146            .set(query.from_block, query.to_block)?;
147
148        let id = QueryId::from_query(query)?;
149        self.reborrow().init_body().set_query_id(id.0.as_slice());
150        Ok(())
151    }
152}
153
#[cfg(test)]
mod tests {
    use hypersync_format::Address;

    use crate::{
        log::LogField, BlockFilter, BlockSelection, CapnpBuilder, FieldSelection, LogFilter,
        LogSelection,
    };

    use super::*;

    /// Serializes `message` with standard stream framing (segment table
    /// included) and hashes the resulting bytes.
    fn hash_serialized(message: &Builder<HeapAllocator>) -> u128 {
        let mut buf = Vec::new();
        capnp::serialize::write_message(&mut buf, message).unwrap();
        xxhash_rust::xxh3::xxh3_128(&buf)
    }

    /// Builds a query body by applying `fillers` in order.
    ///
    /// Returns `(hash of the raw message, hash of a canonicalized copy, QueryId)`.
    fn hash_query_body(fillers: &[fn(&mut query_body::Builder)]) -> (u128, u128, QueryId) {
        let mut message = Builder::new_default();
        let mut body = message.init_root::<query_body::Builder>();
        for fill in fillers {
            fill(&mut body);
        }
        let id = QueryId::from_query_body_reader(body.reborrow().into_reader()).unwrap();

        let mut message_canon = Builder::new_default();
        message_canon
            .set_root_canonical(body.into_reader())
            .unwrap();

        (hash_serialized(&message), hash_serialized(&message_canon), id)
    }

    #[test]
    fn test_query_id() {
        let query = Query {
            logs: vec![Default::default()].into_iter().collect(),
            field_selection: FieldSelection {
                log: LogField::all(),
                ..Default::default()
            },
            ..Default::default()
        };

        let query_id = QueryId::from_query(&query).unwrap();
        // Hashing is deterministic: the same query always maps to the same id.
        assert_eq!(query_id, QueryId::from_query(&query).unwrap());
        // A different body yields a different id.
        assert_ne!(query_id, QueryId::from_query(&Query::default()).unwrap());
        // The raw bytes round-trip through `from_bytes`.
        assert_eq!(QueryId::from_bytes(query_id.0.as_slice()).unwrap(), query_id);
    }

    #[test]
    fn test_query_id_from_bytes_rejects_wrong_length() {
        assert!(QueryId::from_bytes(&[0u8; 15]).is_err());
        assert!(QueryId::from_bytes(&[0u8; 17]).is_err());
        assert!(QueryId::from_bytes(&[]).is_err());
    }

    /// Regression test. This failed when the heap allocator used its default
    /// first segment size of 1024 words.
    #[test]
    fn test_large_query_id() {
        let addresses = (0u64..200_000)
            .map(|i| {
                let mut padded = vec![0u8; 20];
                padded[12..].copy_from_slice(&i.to_be_bytes());
                Address::try_from(padded).unwrap()
            })
            .collect::<Vec<_>>();
        let query = Query::new().where_logs(LogFilter::all().and_address(addresses).unwrap());
        QueryId::from_query(&query).expect("large query bodies should hash without error");
    }

    #[test]
    fn test_needs_canonicalization_for_hashing() {
        fn add_log_selection(query_body_builder: &mut query_body::Builder) {
            let mut logs_builder = query_body_builder.reborrow().init_logs(1).get(0);
            LogSelection::from(LogFilter {
                address: vec![FixedSizeData::<20>::from([1u8; 20])],
                ..Default::default()
            })
            .populate_builder(&mut logs_builder)
            .unwrap();
        }
        fn add_block_selection(query_body_builder: &mut query_body::Builder) {
            let mut block_selection_builder = query_body_builder.reborrow().init_blocks(1).get(0);
            BlockSelection::from(BlockFilter {
                hash: vec![FixedSizeData::<32>::from([1u8; 32])],
                ..Default::default()
            })
            .populate_builder(&mut block_selection_builder)
            .unwrap();
        }

        let (hash_a, hash_a_canon, id_a) =
            hash_query_body(&[add_log_selection, add_block_selection]);
        // Insert block then log (the opposite order); the allocator does not
        // canonicalize on its own.
        let (hash_b, hash_b_canon, id_b) =
            hash_query_body(&[add_block_selection, add_log_selection]);

        assert_ne!(
            hash_a, hash_b,
            "queries should be different since they are not canonicalized"
        );
        assert_eq!(
            hash_a_canon, hash_b_canon,
            "queries should be the same since they are canonicalized"
        );
        assert_eq!(
            id_a, id_b,
            "QueryId must not depend on the builder's allocation order"
        );
    }
}