exocore_chain/engine/pending_sync/
sync_range.rs1use std::ops::Bound;
2
3use exocore_core::{
4 framing::FrameReader,
5 sec::hash::{Multihash, MultihashDigestExt, Sha3_256},
6};
7use exocore_protos::generated::data_transport_capnp::pending_sync_range;
8
9use super::{OperationDetailsLevel, PendingSyncConfig};
10use crate::{operation::OperationId, pending::StoredOperation};
11
12pub struct SyncRangesBuilder {
15 config: PendingSyncConfig,
16 pub ranges: Vec<SyncRangeBuilder>,
17}
18
19impl SyncRangesBuilder {
20 pub fn new(config: PendingSyncConfig) -> SyncRangesBuilder {
21 SyncRangesBuilder {
22 config,
23 ranges: Vec::new(),
24 }
25 }
26
27 pub(super) fn push_operation(
30 &mut self,
31 operation: StoredOperation,
32 details: OperationDetailsLevel,
33 ) {
34 if self.ranges.is_empty() {
35 self.push_new_range(Bound::Unbounded);
36 } else {
37 let last_range_size = self.ranges.last().map_or(0, |r| r.operations_count);
38 if last_range_size > self.config.max_operations_per_range {
39 let last_range_to = self.last_range_to_bound().expect("Should had a last range");
40
41 if let Bound::Included(to) = last_range_to {
44 self.push_new_range(Bound::Excluded(to));
45 } else {
46 panic!("Expected current range end bound to be included");
47 }
48 }
49 }
50
51 let last_range = self
52 .ranges
53 .last_mut()
54 .expect("Ranges should have had at least one range");
55 last_range.push_operation(operation, details);
56 }
57
58 pub fn push_new_range(&mut self, from_bound: Bound<OperationId>) {
59 self.ranges
60 .push(SyncRangeBuilder::new(from_bound, Bound::Unbounded));
61 }
62
63 pub fn push_range(&mut self, sync_range: SyncRangeBuilder) {
64 self.ranges.push(sync_range);
65 }
66
67 pub fn set_last_range_to_bound(&mut self, to_bound: Bound<OperationId>) {
68 if let Some(range) = self.ranges.last_mut() {
69 range.to_operation = to_bound;
70 }
71 }
72
73 fn last_range_to_bound(&self) -> Option<Bound<OperationId>> {
74 self.ranges.last().map(|r| r.to_operation)
75 }
76}
77
78pub struct SyncRangeBuilder {
87 pub from_operation: Bound<OperationId>,
88 pub to_operation: Bound<OperationId>,
89
90 pub operations: Vec<StoredOperation>,
91 pub operations_headers: Vec<StoredOperation>,
92 pub operations_count: u32,
93
94 pub hasher: Option<Sha3_256>,
95 pub hash: Option<Multihash<32>>,
96}
97
98impl SyncRangeBuilder {
99 fn new(
100 from_operation: Bound<OperationId>,
101 to_operation: Bound<OperationId>,
102 ) -> SyncRangeBuilder {
103 SyncRangeBuilder {
104 from_operation,
105 to_operation,
106 operations: Vec::new(),
107 operations_headers: Vec::new(),
108 operations_count: 0,
109 hasher: Some(Sha3_256::default()),
110 hash: None,
111 }
112 }
113
114 pub(crate) fn new_hashed(
115 operations_range: (Bound<OperationId>, Bound<OperationId>),
116 operations_hash: Multihash<32>,
117 operations_count: u32,
118 ) -> SyncRangeBuilder {
119 SyncRangeBuilder {
120 from_operation: operations_range.0,
121 to_operation: operations_range.1,
122 operations: Vec::new(),
123 operations_headers: Vec::new(),
124 operations_count,
125 hasher: None,
126 hash: Some(operations_hash),
127 }
128 }
129
130 fn push_operation(&mut self, operation: StoredOperation, details: OperationDetailsLevel) {
131 self.to_operation = Bound::Included(operation.operation_id);
132 self.operations_count += 1;
133
134 if let Some(hasher) = self.hasher.as_mut() {
135 hasher.input_signed_frame(operation.frame.inner().inner())
136 }
137
138 match details {
139 OperationDetailsLevel::Full => {
140 self.operations.push(operation);
141 }
142 OperationDetailsLevel::Header => {
143 self.operations_headers.push(operation);
144 }
145 OperationDetailsLevel::None => {
146 }
148 }
149 }
150
151 pub(crate) fn write_into_sync_range_builder(
152 mut self,
153 range_msg_builder: &mut pending_sync_range::Builder,
154 ) {
155 match self.from_operation {
156 Bound::Included(bound) => {
157 range_msg_builder.set_from_included(true);
158 range_msg_builder.set_from_operation(bound);
159 }
160 Bound::Excluded(bound) => {
161 range_msg_builder.set_from_included(false);
162 range_msg_builder.set_from_operation(bound);
163 }
164 Bound::Unbounded => {
165 range_msg_builder.set_from_included(false);
166 range_msg_builder.set_from_operation(0);
167 }
168 }
169
170 match self.to_operation {
171 Bound::Included(bound) => {
172 range_msg_builder.set_to_included(true);
173 range_msg_builder.set_to_operation(bound);
174 }
175 Bound::Excluded(bound) => {
176 range_msg_builder.set_to_included(false);
177 range_msg_builder.set_to_operation(bound);
178 }
179 Bound::Unbounded => {
180 range_msg_builder.set_to_included(false);
181 range_msg_builder.set_to_operation(0);
182 }
183 }
184
185 range_msg_builder.set_operations_count(self.operations_count);
186
187 if !self.operations_headers.is_empty() {
188 let mut operations_headers_builder = range_msg_builder
189 .reborrow()
190 .init_operations_headers(self.operations_headers.len() as u32);
191 for (i, operation) in self.operations_headers.iter().enumerate() {
192 let mut op_header_builder = operations_headers_builder.reborrow().get(i as u32);
193 op_header_builder.set_group_id(operation.group_id);
194 op_header_builder.set_operation_id(operation.operation_id);
195
196 let signature_data = operation.frame.inner().inner().multihash_bytes();
197 op_header_builder.set_operation_signature(signature_data);
198 }
199 }
200
201 if !self.operations.is_empty() {
202 let mut operations_builder = range_msg_builder
203 .reborrow()
204 .init_operations_frames(self.operations.len() as u32);
205 for (i, operation) in self.operations.iter().enumerate() {
206 operations_builder.set(i as u32, operation.frame.whole_data());
207 }
208 }
209
210 match (self.hash, self.hasher.as_mut()) {
211 (Some(hash), _) => {
212 range_msg_builder.set_operations_hash(hash.to_bytes().as_ref());
213 }
214 (_, Some(hasher)) => {
215 range_msg_builder.set_operations_hash(hasher.to_multihash().to_bytes().as_ref());
216 }
217 _ => {}
218 }
219 }
220}