exocore_chain/engine/pending_sync/
sync_range.rs

1use std::ops::Bound;
2
3use exocore_core::{
4    framing::FrameReader,
5    sec::hash::{Multihash, MultihashDigestExt, Sha3_256},
6};
7use exocore_protos::generated::data_transport_capnp::pending_sync_range;
8
9use super::{OperationDetailsLevel, PendingSyncConfig};
10use crate::{operation::OperationId, pending::StoredOperation};
11
12/// Collection of SyncRangeBuilder, taking into account maximum operations we
13/// want per range.
14pub struct SyncRangesBuilder {
15    config: PendingSyncConfig,
16    pub ranges: Vec<SyncRangeBuilder>,
17}
18
19impl SyncRangesBuilder {
20    pub fn new(config: PendingSyncConfig) -> SyncRangesBuilder {
21        SyncRangesBuilder {
22            config,
23            ranges: Vec::new(),
24        }
25    }
26
27    /// Pushes the given operation to the latest range, or to a new range if the
28    /// latest is full.
29    pub(super) fn push_operation(
30        &mut self,
31        operation: StoredOperation,
32        details: OperationDetailsLevel,
33    ) {
34        if self.ranges.is_empty() {
35            self.push_new_range(Bound::Unbounded);
36        } else {
37            let last_range_size = self.ranges.last().map_or(0, |r| r.operations_count);
38            if last_range_size > self.config.max_operations_per_range {
39                let last_range_to = self.last_range_to_bound().expect("Should had a last range");
40
41                // converted included into excluded for starting bound of next range since the
42                // item is in current range, not next one
43                if let Bound::Included(to) = last_range_to {
44                    self.push_new_range(Bound::Excluded(to));
45                } else {
46                    panic!("Expected current range end bound to be included");
47                }
48            }
49        }
50
51        let last_range = self
52            .ranges
53            .last_mut()
54            .expect("Ranges should have had at least one range");
55        last_range.push_operation(operation, details);
56    }
57
58    pub fn push_new_range(&mut self, from_bound: Bound<OperationId>) {
59        self.ranges
60            .push(SyncRangeBuilder::new(from_bound, Bound::Unbounded));
61    }
62
63    pub fn push_range(&mut self, sync_range: SyncRangeBuilder) {
64        self.ranges.push(sync_range);
65    }
66
67    pub fn set_last_range_to_bound(&mut self, to_bound: Bound<OperationId>) {
68        if let Some(range) = self.ranges.last_mut() {
69            range.to_operation = to_bound;
70        }
71    }
72
73    fn last_range_to_bound(&self) -> Option<Bound<OperationId>> {
74        self.ranges.last().map(|r| r.to_operation)
75    }
76}
77
78/// Builder for pending_sync_range messages. A pending sync range represents a
79/// range in the Pending Store to be synchronized against a remote node's own
80/// store.
81///
82/// It can describe the operations in 3 ways:
83///  * High level metadata (hash + count)
84///  * Operations headers
85///  * Operations full data
86pub struct SyncRangeBuilder {
87    pub from_operation: Bound<OperationId>,
88    pub to_operation: Bound<OperationId>,
89
90    pub operations: Vec<StoredOperation>,
91    pub operations_headers: Vec<StoredOperation>,
92    pub operations_count: u32,
93
94    pub hasher: Option<Sha3_256>,
95    pub hash: Option<Multihash<32>>,
96}
97
98impl SyncRangeBuilder {
99    fn new(
100        from_operation: Bound<OperationId>,
101        to_operation: Bound<OperationId>,
102    ) -> SyncRangeBuilder {
103        SyncRangeBuilder {
104            from_operation,
105            to_operation,
106            operations: Vec::new(),
107            operations_headers: Vec::new(),
108            operations_count: 0,
109            hasher: Some(Sha3_256::default()),
110            hash: None,
111        }
112    }
113
114    pub(crate) fn new_hashed(
115        operations_range: (Bound<OperationId>, Bound<OperationId>),
116        operations_hash: Multihash<32>,
117        operations_count: u32,
118    ) -> SyncRangeBuilder {
119        SyncRangeBuilder {
120            from_operation: operations_range.0,
121            to_operation: operations_range.1,
122            operations: Vec::new(),
123            operations_headers: Vec::new(),
124            operations_count,
125            hasher: None,
126            hash: Some(operations_hash),
127        }
128    }
129
130    fn push_operation(&mut self, operation: StoredOperation, details: OperationDetailsLevel) {
131        self.to_operation = Bound::Included(operation.operation_id);
132        self.operations_count += 1;
133
134        if let Some(hasher) = self.hasher.as_mut() {
135            hasher.input_signed_frame(operation.frame.inner().inner())
136        }
137
138        match details {
139            OperationDetailsLevel::Full => {
140                self.operations.push(operation);
141            }
142            OperationDetailsLevel::Header => {
143                self.operations_headers.push(operation);
144            }
145            OperationDetailsLevel::None => {
146                // Only included in hash
147            }
148        }
149    }
150
151    pub(crate) fn write_into_sync_range_builder(
152        mut self,
153        range_msg_builder: &mut pending_sync_range::Builder,
154    ) {
155        match self.from_operation {
156            Bound::Included(bound) => {
157                range_msg_builder.set_from_included(true);
158                range_msg_builder.set_from_operation(bound);
159            }
160            Bound::Excluded(bound) => {
161                range_msg_builder.set_from_included(false);
162                range_msg_builder.set_from_operation(bound);
163            }
164            Bound::Unbounded => {
165                range_msg_builder.set_from_included(false);
166                range_msg_builder.set_from_operation(0);
167            }
168        }
169
170        match self.to_operation {
171            Bound::Included(bound) => {
172                range_msg_builder.set_to_included(true);
173                range_msg_builder.set_to_operation(bound);
174            }
175            Bound::Excluded(bound) => {
176                range_msg_builder.set_to_included(false);
177                range_msg_builder.set_to_operation(bound);
178            }
179            Bound::Unbounded => {
180                range_msg_builder.set_to_included(false);
181                range_msg_builder.set_to_operation(0);
182            }
183        }
184
185        range_msg_builder.set_operations_count(self.operations_count);
186
187        if !self.operations_headers.is_empty() {
188            let mut operations_headers_builder = range_msg_builder
189                .reborrow()
190                .init_operations_headers(self.operations_headers.len() as u32);
191            for (i, operation) in self.operations_headers.iter().enumerate() {
192                let mut op_header_builder = operations_headers_builder.reborrow().get(i as u32);
193                op_header_builder.set_group_id(operation.group_id);
194                op_header_builder.set_operation_id(operation.operation_id);
195
196                let signature_data = operation.frame.inner().inner().multihash_bytes();
197                op_header_builder.set_operation_signature(signature_data);
198            }
199        }
200
201        if !self.operations.is_empty() {
202            let mut operations_builder = range_msg_builder
203                .reborrow()
204                .init_operations_frames(self.operations.len() as u32);
205            for (i, operation) in self.operations.iter().enumerate() {
206                operations_builder.set(i as u32, operation.frame.whole_data());
207            }
208        }
209
210        match (self.hash, self.hasher.as_mut()) {
211            (Some(hash), _) => {
212                range_msg_builder.set_operations_hash(hash.to_bytes().as_ref());
213            }
214            (_, Some(hasher)) => {
215                range_msg_builder.set_operations_hash(hasher.to_multihash().to_bytes().as_ref());
216            }
217            _ => {}
218        }
219    }
220}