use std::ops::Bound;
use exocore_core::{
framing::FrameReader,
sec::hash::{Multihash, MultihashDigestExt, Sha3_256},
};
use exocore_protos::generated::data_transport_capnp::pending_sync_range;
use super::{OperationDetailsLevel, PendingSyncConfig};
use crate::{operation::OperationId, pending::StoredOperation};
pub struct SyncRangesBuilder {
config: PendingSyncConfig,
pub ranges: Vec<SyncRangeBuilder>,
}
impl SyncRangesBuilder {
pub fn new(config: PendingSyncConfig) -> SyncRangesBuilder {
SyncRangesBuilder {
config,
ranges: Vec::new(),
}
}
pub(super) fn push_operation(
&mut self,
operation: StoredOperation,
details: OperationDetailsLevel,
) {
if self.ranges.is_empty() {
self.push_new_range(Bound::Unbounded);
} else {
let last_range_size = self.ranges.last().map_or(0, |r| r.operations_count);
if last_range_size > self.config.max_operations_per_range {
let last_range_to = self.last_range_to_bound().expect("Should had a last range");
if let Bound::Included(to) = last_range_to {
self.push_new_range(Bound::Excluded(to));
} else {
panic!("Expected current range end bound to be included");
}
}
}
let last_range = self
.ranges
.last_mut()
.expect("Ranges should have had at least one range");
last_range.push_operation(operation, details);
}
pub fn push_new_range(&mut self, from_bound: Bound<OperationId>) {
self.ranges
.push(SyncRangeBuilder::new(from_bound, Bound::Unbounded));
}
pub fn push_range(&mut self, sync_range: SyncRangeBuilder) {
self.ranges.push(sync_range);
}
pub fn set_last_range_to_bound(&mut self, to_bound: Bound<OperationId>) {
if let Some(range) = self.ranges.last_mut() {
range.to_operation = to_bound;
}
}
fn last_range_to_bound(&self) -> Option<Bound<OperationId>> {
self.ranges.last().map(|r| r.to_operation)
}
}
pub struct SyncRangeBuilder {
pub from_operation: Bound<OperationId>,
pub to_operation: Bound<OperationId>,
pub operations: Vec<StoredOperation>,
pub operations_headers: Vec<StoredOperation>,
pub operations_count: u32,
pub hasher: Option<Sha3_256>,
pub hash: Option<Multihash>,
}
impl SyncRangeBuilder {
fn new(
from_operation: Bound<OperationId>,
to_operation: Bound<OperationId>,
) -> SyncRangeBuilder {
SyncRangeBuilder {
from_operation,
to_operation,
operations: Vec::new(),
operations_headers: Vec::new(),
operations_count: 0,
hasher: Some(Sha3_256::default()),
hash: None,
}
}
pub(crate) fn new_hashed(
operations_range: (Bound<OperationId>, Bound<OperationId>),
operations_hash: Multihash,
operations_count: u32,
) -> SyncRangeBuilder {
SyncRangeBuilder {
from_operation: operations_range.0,
to_operation: operations_range.1,
operations: Vec::new(),
operations_headers: Vec::new(),
operations_count,
hasher: None,
hash: Some(operations_hash),
}
}
fn push_operation(&mut self, operation: StoredOperation, details: OperationDetailsLevel) {
self.to_operation = Bound::Included(operation.operation_id);
self.operations_count += 1;
if let Some(hasher) = self.hasher.as_mut() {
hasher.input_signed_frame(operation.frame.inner().inner())
}
match details {
OperationDetailsLevel::Full => {
self.operations.push(operation);
}
OperationDetailsLevel::Header => {
self.operations_headers.push(operation);
}
OperationDetailsLevel::None => {
}
}
}
pub(crate) fn write_into_sync_range_builder(
self,
range_msg_builder: &mut pending_sync_range::Builder,
) {
match self.from_operation {
Bound::Included(bound) => {
range_msg_builder.set_from_included(true);
range_msg_builder.set_from_operation(bound);
}
Bound::Excluded(bound) => {
range_msg_builder.set_from_included(false);
range_msg_builder.set_from_operation(bound);
}
Bound::Unbounded => {
range_msg_builder.set_from_included(false);
range_msg_builder.set_from_operation(0);
}
}
match self.to_operation {
Bound::Included(bound) => {
range_msg_builder.set_to_included(true);
range_msg_builder.set_to_operation(bound);
}
Bound::Excluded(bound) => {
range_msg_builder.set_to_included(false);
range_msg_builder.set_to_operation(bound);
}
Bound::Unbounded => {
range_msg_builder.set_to_included(false);
range_msg_builder.set_to_operation(0);
}
}
range_msg_builder.set_operations_count(self.operations_count);
if !self.operations_headers.is_empty() {
let mut operations_headers_builder = range_msg_builder
.reborrow()
.init_operations_headers(self.operations_headers.len() as u32);
for (i, operation) in self.operations_headers.iter().enumerate() {
let mut op_header_builder = operations_headers_builder.reborrow().get(i as u32);
op_header_builder.set_group_id(operation.group_id);
op_header_builder.set_operation_id(operation.operation_id);
let signature_data = operation.frame.inner().inner().multihash_bytes();
op_header_builder.set_operation_signature(signature_data);
}
}
if !self.operations.is_empty() {
let mut operations_builder = range_msg_builder
.reborrow()
.init_operations_frames(self.operations.len() as u32);
for (i, operation) in self.operations.iter().enumerate() {
operations_builder.set(i as u32, operation.frame.whole_data());
}
}
match (self.hash, self.hasher) {
(Some(hash), _) => {
range_msg_builder.set_operations_hash(hash.to_bytes().as_ref());
}
(_, Some(hasher)) => {
range_msg_builder.set_operations_hash(hasher.to_multihash().to_bytes().as_ref());
}
_ => {}
}
}
}