hypersync_net_types/
request.rs

1use crate::hypersync_net_types_capnp::{query_body, request};
2use crate::{hypersync_net_types_capnp, CapnpReader, Query};
3use serde::{Deserialize, Serialize};
4
5use anyhow::Context;
6use capnp::message::Builder;
7use hypersync_format::FixedSizeData;
8
9/// A 128 bit hash of the query body, used as a unique identifier for the query body
10#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
11pub struct QueryId(pub FixedSizeData<16>);
12impl QueryId {
13    pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
14        let data = FixedSizeData::<16>::try_from(bytes).context("invalid query id length")?;
15        Ok(Self(data))
16    }
17
18    pub fn from_query_body_reader(reader: query_body::Reader<'_>) -> Result<QueryId, capnp::Error> {
19        // See https://capnproto.org/encoding.html#canonicalization
20        // we need to ensure the query body is canonicalized for hashing
21        let mut canon_builder = capnp::message::Builder::new_default();
22        canon_builder.set_root_canonical(reader)?;
23
24        // After canonicalization, there is only one segment.
25        // We can just hash this withouth the segment table
26        let segment = match canon_builder.get_segments_for_output() {
27            capnp::OutputSegments::SingleSegment([segment]) => segment,
28            capnp::OutputSegments::MultiSegment(items) => {
29                return Err(capnp::Error::failed(format!(
30                    "Expected exactly 1 segment, found {}",
31                    items.len(),
32                )))
33            }
34        };
35
36        let hash: u128 = xxhash_rust::xxh3::xxh3_128(segment);
37
38        Ok(QueryId(FixedSizeData::<16>::from(hash.to_be_bytes())))
39    }
40
41    pub fn from_query(query: &Query) -> Result<Self, capnp::Error> {
42        let mut message = Builder::new_default();
43        let mut query_body_builder = message.init_root::<query_body::Builder>();
44        query_body_builder.build_from_query(query)?;
45        Self::from_query_body_reader(query_body_builder.into_reader())
46    }
47}
48
49pub enum Request {
50    QueryBody {
51        should_cache: bool,
52        query: Box<Query>,
53    },
54    QueryId {
55        from_block: u64,
56        to_block: Option<u64>,
57        id: QueryId,
58    },
59}
60
61impl Request {
62    pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
63        let message_reader =
64            capnp::serialize_packed::read_message(bytes, capnp::message::ReaderOptions::new())?;
65        let query = message_reader.get_root::<request::Reader>()?;
66        Request::from_reader(query)
67    }
68}
69
70impl CapnpReader<request::Owned> for Request {
71    fn from_reader(query: request::Reader) -> Result<Self, capnp::Error> {
72        let block_range = query.get_block_range()?;
73        let from_block = block_range.get_from_block();
74        let to_block = if block_range.has_to_block() {
75            Some(block_range.get_to_block()?.get_value())
76        } else {
77            None
78        };
79
80        match query.get_body().which()? {
81            request::body::Which::Query(query_body_reader) => {
82                let body_reader = query_body_reader?;
83                Ok(Self::QueryBody {
84                    should_cache: query.get_should_cache(),
85                    query: Box::new(Query::from_capnp_query_body_reader(
86                        &body_reader,
87                        from_block,
88                        to_block,
89                    )?),
90                })
91            }
92            request::body::Which::QueryId(id_bytes) => {
93                let id = QueryId::from_bytes(id_bytes?)
94                    .map_err(|_| capnp::Error::failed("Invalid query id bytes".to_string()))?;
95
96                Ok(Self::QueryId {
97                    from_block,
98                    to_block,
99                    id,
100                })
101            }
102        }
103    }
104}
105
106impl hypersync_net_types_capnp::block_range::Builder<'_> {
107    pub fn set(&mut self, from_block: u64, to_block: Option<u64>) -> Result<(), capnp::Error> {
108        self.reborrow().set_from_block(from_block);
109
110        if let Some(to_block) = to_block {
111            let mut to_block_builder = self.reborrow().init_to_block();
112            to_block_builder.set_value(to_block)
113        }
114
115        Ok(())
116    }
117}
118
119impl request::Builder<'_> {
120    pub fn build_full_query_from_query(
121        &mut self,
122        query: &Query,
123        should_cache: bool,
124    ) -> Result<(), capnp::Error> {
125        let mut block_range_builder = self.reborrow().init_block_range();
126        block_range_builder.set(query.from_block, query.to_block)?;
127
128        let mut query_body_builder = self.reborrow().init_body().init_query();
129        query_body_builder.build_from_query(query)?;
130
131        self.set_should_cache(should_cache);
132
133        Ok(())
134    }
135
136    pub fn build_query_id_from_query(&mut self, query: &Query) -> Result<(), capnp::Error> {
137        self.reborrow()
138            .init_block_range()
139            .set(query.from_block, query.to_block)?;
140
141        let id = QueryId::from_query(query)?;
142        self.reborrow().init_body().set_query_id(id.0.as_slice());
143        Ok(())
144    }
145}
146
#[cfg(test)]
mod tests {
    use crate::{
        log::LogField, BlockFilter, BlockSelection, CapnpBuilder, FieldSelection, LogFilter,
        LogSelection,
    };

    use super::*;

    /// Builds a query body with `populate` and hashes the serialized message
    /// twice: once exactly as the builder laid it out and once after copying it
    /// into a canonical message. Returns `(raw_hash, canonical_hash)`.
    fn raw_and_canonical_hashes(
        populate: impl FnOnce(&mut query_body::Builder<'_>),
    ) -> (u128, u128) {
        fn hash_message(message: &Builder<capnp::message::HeapAllocator>) -> u128 {
            let mut buf = Vec::new();
            capnp::serialize::write_message(&mut buf, message).unwrap();
            xxhash_rust::xxh3::xxh3_128(&buf)
        }

        let mut message = Builder::new_default();
        let mut query_body_builder = message.init_root::<query_body::Builder>();
        populate(&mut query_body_builder);

        let mut message_canon = Builder::new_default();
        message_canon
            .set_root_canonical(query_body_builder.into_reader())
            .unwrap();

        (hash_message(&message), hash_message(&message_canon))
    }

    /// The id of a query is 16 bytes long, stable across calls, and differs
    /// from the id of a query with a different body.
    #[test]
    fn test_query_id() {
        let query = Query {
            logs: vec![Default::default()].into_iter().collect(),
            field_selection: FieldSelection {
                log: LogField::all(),
                ..Default::default()
            },
            ..Default::default()
        };

        let query_id = QueryId::from_query(&query).unwrap();
        assert_eq!(query_id.0.as_slice().len(), 16);
        assert_eq!(
            query_id,
            QueryId::from_query(&query).unwrap(),
            "hashing the same query twice should give the same id"
        );

        let default_id = QueryId::from_query(&Query::default()).unwrap();
        assert_ne!(
            query_id, default_id,
            "queries with different bodies should have different ids"
        );
    }

    /// Two bodies with the same content but a different insertion order only
    /// hash to the same value once they are canonicalized.
    #[test]
    fn test_needs_canonicalization_for_hashing() {
        fn add_log_selection(query_body_builder: &mut query_body::Builder) {
            let mut logs_builder = query_body_builder.reborrow().init_logs(1).get(0);
            LogSelection::from(LogFilter {
                address: vec![FixedSizeData::<20>::from([1u8; 20])],
                ..Default::default()
            })
            .populate_builder(&mut logs_builder)
            .unwrap();
        }
        fn add_block_selection(query_body_builder: &mut query_body::Builder) {
            let mut block_selection_builder = query_body_builder.reborrow().init_blocks(1).get(0);
            BlockSelection::from(BlockFilter {
                hash: vec![FixedSizeData::<32>::from([1u8; 32])],
                ..Default::default()
            })
            .populate_builder(&mut block_selection_builder)
            .unwrap();
        }

        let (hash_a, hash_a_canon) = raw_and_canonical_hashes(|body| {
            add_log_selection(body);
            add_block_selection(body);
        });

        // Insert block then log (the opposite order); the allocator will not canonicalize.
        let (hash_b, hash_b_canon) = raw_and_canonical_hashes(|body| {
            add_block_selection(body);
            add_log_selection(body);
        });

        assert_ne!(
            hash_a, hash_b,
            "queries should be different since they are not canonicalized"
        );

        assert_eq!(
            hash_a_canon, hash_b_canon,
            "queries should be the same since they are canonicalized"
        );
    }
}
241}