hypersync_net_types/
request.rs1use crate::hypersync_net_types_capnp::{query_body, request};
2use crate::{hypersync_net_types_capnp, CapnpReader, Query};
3use serde::{Deserialize, Serialize};
4
5use anyhow::Context;
6use capnp::message::Builder;
7use hypersync_format::FixedSizeData;
8
9#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Hash)]
11pub struct QueryId(pub FixedSizeData<16>);
12impl QueryId {
13 pub fn from_bytes(bytes: &[u8]) -> anyhow::Result<Self> {
14 let data = FixedSizeData::<16>::try_from(bytes).context("invalid query id length")?;
15 Ok(Self(data))
16 }
17
18 pub fn from_query_body_reader(reader: query_body::Reader<'_>) -> Result<QueryId, capnp::Error> {
19 let mut canon_builder = capnp::message::Builder::new_default();
22 canon_builder.set_root_canonical(reader)?;
23
24 let segment = match canon_builder.get_segments_for_output() {
27 capnp::OutputSegments::SingleSegment([segment]) => segment,
28 capnp::OutputSegments::MultiSegment(items) => {
29 return Err(capnp::Error::failed(format!(
30 "Expected exactly 1 segment, found {}",
31 items.len(),
32 )))
33 }
34 };
35
36 let hash: u128 = xxhash_rust::xxh3::xxh3_128(segment);
37
38 Ok(QueryId(FixedSizeData::<16>::from(hash.to_be_bytes())))
39 }
40
41 pub fn from_query(query: &Query) -> Result<Self, capnp::Error> {
42 let mut message = Builder::new_default();
43 let mut query_body_builder = message.init_root::<query_body::Builder>();
44 query_body_builder.build_from_query(query)?;
45 Self::from_query_body_reader(query_body_builder.into_reader())
46 }
47}
48
49pub enum Request {
50 QueryBody {
51 should_cache: bool,
52 query: Box<Query>,
53 },
54 QueryId {
55 from_block: u64,
56 to_block: Option<u64>,
57 id: QueryId,
58 },
59}
60
61impl Request {
62 pub fn from_capnp_bytes(bytes: &[u8]) -> Result<Self, capnp::Error> {
63 let message_reader =
64 capnp::serialize_packed::read_message(bytes, capnp::message::ReaderOptions::new())?;
65 let query = message_reader.get_root::<request::Reader>()?;
66 Request::from_reader(query)
67 }
68}
69
70impl CapnpReader<request::Owned> for Request {
71 fn from_reader(query: request::Reader) -> Result<Self, capnp::Error> {
72 let block_range = query.get_block_range()?;
73 let from_block = block_range.get_from_block();
74 let to_block = if block_range.has_to_block() {
75 Some(block_range.get_to_block()?.get_value())
76 } else {
77 None
78 };
79
80 match query.get_body().which()? {
81 request::body::Which::Query(query_body_reader) => {
82 let body_reader = query_body_reader?;
83 Ok(Self::QueryBody {
84 should_cache: query.get_should_cache(),
85 query: Box::new(Query::from_capnp_query_body_reader(
86 &body_reader,
87 from_block,
88 to_block,
89 )?),
90 })
91 }
92 request::body::Which::QueryId(id_bytes) => {
93 let id = QueryId::from_bytes(id_bytes?)
94 .map_err(|_| capnp::Error::failed("Invalid query id bytes".to_string()))?;
95
96 Ok(Self::QueryId {
97 from_block,
98 to_block,
99 id,
100 })
101 }
102 }
103 }
104}
105
106impl hypersync_net_types_capnp::block_range::Builder<'_> {
107 pub fn set(&mut self, from_block: u64, to_block: Option<u64>) -> Result<(), capnp::Error> {
108 self.reborrow().set_from_block(from_block);
109
110 if let Some(to_block) = to_block {
111 let mut to_block_builder = self.reborrow().init_to_block();
112 to_block_builder.set_value(to_block)
113 }
114
115 Ok(())
116 }
117}
118
119impl request::Builder<'_> {
120 pub fn build_full_query_from_query(
121 &mut self,
122 query: &Query,
123 should_cache: bool,
124 ) -> Result<(), capnp::Error> {
125 let mut block_range_builder = self.reborrow().init_block_range();
126 block_range_builder.set(query.from_block, query.to_block)?;
127
128 let mut query_body_builder = self.reborrow().init_body().init_query();
129 query_body_builder.build_from_query(query)?;
130
131 self.set_should_cache(should_cache);
132
133 Ok(())
134 }
135
136 pub fn build_query_id_from_query(&mut self, query: &Query) -> Result<(), capnp::Error> {
137 self.reborrow()
138 .init_block_range()
139 .set(query.from_block, query.to_block)?;
140
141 let id = QueryId::from_query(query)?;
142 self.reborrow().init_body().set_query_id(id.0.as_slice());
143 Ok(())
144 }
145}
146
#[cfg(test)]
mod tests {
    use crate::{
        log::LogField, BlockFilter, BlockSelection, CapnpBuilder, FieldSelection, LogFilter,
        LogSelection,
    };

    use super::*;

    /// A query id must be reproducible, must change with the query, and must
    /// survive a round trip through its raw bytes.
    #[test]
    fn test_query_id() {
        let query = Query {
            logs: vec![Default::default()].into_iter().collect(),
            field_selection: FieldSelection {
                log: LogField::all(),
                ..Default::default()
            },
            ..Default::default()
        };

        let query_id = QueryId::from_query(&query).unwrap();
        println!("{query_id:?}");

        // Same query, same id: the id is only usable as a key if it is stable.
        assert_eq!(
            query_id,
            QueryId::from_query(&query).unwrap(),
            "hashing the same query twice should give the same id"
        );

        // A query with a different body must not share the id.
        assert_ne!(
            query_id,
            QueryId::from_query(&Query::default()).unwrap(),
            "queries with different bodies should have different ids"
        );

        // The raw 16 bytes parse back into the same id.
        assert_eq!(
            QueryId::from_bytes(query_id.0.as_slice()).unwrap(),
            query_id,
            "id should round-trip through its bytes"
        );
    }

    /// Two query bodies with the same content, populated in a different order,
    /// serialize to different bytes unless they are canonicalized first.
    #[test]
    fn test_needs_canonicalization_for_hashing() {
        fn add_log_selection(query_body_builder: &mut query_body::Builder) {
            let mut logs_builder = query_body_builder.reborrow().init_logs(1).get(0);
            LogSelection::from(LogFilter {
                address: vec![FixedSizeData::<20>::from([1u8; 20])],
                ..Default::default()
            })
            .populate_builder(&mut logs_builder)
            .unwrap();
        }

        fn add_block_selection(query_body_builder: &mut query_body::Builder) {
            let mut block_selection_builder = query_body_builder.reborrow().init_blocks(1).get(0);
            BlockSelection::from(BlockFilter {
                hash: vec![FixedSizeData::<32>::from([1u8; 32])],
                ..Default::default()
            })
            .populate_builder(&mut block_selection_builder)
            .unwrap();
        }

        /// Builds a query body holding one log selection and one block selection
        /// and returns `(hash of the message as built, hash of its canonical copy)`.
        /// `logs_first` only controls the order in which the two are written.
        fn raw_and_canonical_hashes(logs_first: bool) -> (u128, u128) {
            let mut message = Builder::new_default();
            let mut query_body_builder = message.init_root::<query_body::Builder>();
            if logs_first {
                add_log_selection(&mut query_body_builder);
                add_block_selection(&mut query_body_builder);
            } else {
                add_block_selection(&mut query_body_builder);
                add_log_selection(&mut query_body_builder);
            }

            let mut message_canon = Builder::new_default();
            message_canon
                .set_root_canonical(query_body_builder.into_reader())
                .unwrap();

            let mut buf = Vec::new();
            capnp::serialize::write_message(&mut buf, &message).unwrap();
            let hash = xxhash_rust::xxh3::xxh3_128(&buf);

            let mut buf = Vec::new();
            capnp::serialize::write_message(&mut buf, &message_canon).unwrap();
            let hash_canon = xxhash_rust::xxh3::xxh3_128(&buf);

            (hash, hash_canon)
        }

        let (hash_a, hash_a_canon) = raw_and_canonical_hashes(true);
        let (hash_b, hash_b_canon) = raw_and_canonical_hashes(false);

        assert_ne!(
            hash_a, hash_b,
            "queries should be different since they are not canonicalized"
        );

        assert_eq!(
            hash_a_canon, hash_b_canon,
            "queries should be the same since they are canonicalized"
        );
    }
}