// exocore_chain/engine/sync_context.rs

1use exocore_core::{
2    cell::{Cell, CellNodes, NodeId},
3    framing::CapnpFrameBuilder,
4};
5use exocore_protos::generated::data_transport_capnp::{
6    chain_sync_request, chain_sync_response, pending_sync_request,
7};
8use exocore_transport::{OutMessage, ServiceType};
9
10use super::{EngineError, Event};
11use crate::block::{BlockHeight, BlockOffset};
12
13/// Synchronization context used by `chain_sync`, `pending_sync` and
14/// `commit_manager` to dispatch messages to other nodes, and dispatch events to
15/// be sent to engine handles.
16pub struct SyncContext {
17    pub events: Vec<Event>,
18    pub messages: Vec<SyncContextMessage>,
19    pub sync_state: SyncState,
20}
21
22impl SyncContext {
23    pub fn new(sync_state: SyncState) -> SyncContext {
24        SyncContext {
25            events: Vec::new(),
26            messages: Vec::new(),
27            sync_state,
28        }
29    }
30
31    pub fn push_pending_sync_request(
32        &mut self,
33        node_id: NodeId,
34        request_builder: CapnpFrameBuilder<pending_sync_request::Owned>,
35    ) {
36        self.messages.push(SyncContextMessage::PendingSyncRequest(
37            node_id,
38            request_builder,
39        ));
40    }
41
42    pub fn push_chain_sync_request(
43        &mut self,
44        node_id: NodeId,
45        request_builder: CapnpFrameBuilder<chain_sync_request::Owned>,
46    ) {
47        self.messages.push(SyncContextMessage::ChainSyncRequest(
48            node_id,
49            request_builder,
50        ));
51    }
52
53    pub fn push_chain_sync_response(
54        &mut self,
55        node_id: NodeId,
56        response_builder: CapnpFrameBuilder<chain_sync_response::Owned>,
57    ) {
58        self.messages.push(SyncContextMessage::ChainSyncResponse(
59            node_id,
60            response_builder,
61        ));
62    }
63
64    pub fn push_event(&mut self, event: Event) {
65        self.events.push(event);
66    }
67}
68
69pub enum SyncContextMessage {
70    PendingSyncRequest(NodeId, CapnpFrameBuilder<pending_sync_request::Owned>),
71    ChainSyncRequest(NodeId, CapnpFrameBuilder<chain_sync_request::Owned>),
72    ChainSyncResponse(NodeId, CapnpFrameBuilder<chain_sync_response::Owned>),
73}
74
75impl SyncContextMessage {
76    pub fn into_out_message(self, cell: &Cell) -> Result<OutMessage, EngineError> {
77        let cell_nodes = cell.nodes();
78        let dest_node = cell_nodes
79            .get(self.dest_node())
80            .map(|n| n.node().clone())
81            .ok_or_else(|| EngineError::NodeNotFound(self.dest_node().clone()))?;
82
83        let message = match self {
84            SyncContextMessage::PendingSyncRequest(_, request_builder) => {
85                OutMessage::from_framed_message(cell, ServiceType::Chain, request_builder)?
86                    .with_destination(dest_node)
87            }
88            SyncContextMessage::ChainSyncRequest(_, request_builder) => {
89                OutMessage::from_framed_message(cell, ServiceType::Chain, request_builder)?
90                    .with_destination(dest_node)
91            }
92            SyncContextMessage::ChainSyncResponse(_, response_builder) => {
93                OutMessage::from_framed_message(cell, ServiceType::Chain, response_builder)?
94                    .with_destination(dest_node)
95            }
96        };
97
98        Ok(message)
99    }
100
101    fn dest_node(&self) -> &NodeId {
102        match self {
103            SyncContextMessage::PendingSyncRequest(to_node, _) => to_node,
104            SyncContextMessage::ChainSyncRequest(to_node, _) => to_node,
105            SyncContextMessage::ChainSyncResponse(to_node, _) => to_node,
106        }
107    }
108}
109
110/// State of the synchronization, used to communicate information between the
111/// `ChainSynchronizer`, `CommitManager` and `PendingSynchronizer`.
112#[derive(Default, Clone, Copy)]
113pub struct SyncState {
114    /// Indicates what is the last block that got cleaned up from pending store,
115    /// and that is now only available from the chain. This is used by the
116    /// `PendingSynchronizer` to know which operations it should not include
117    /// anymore in its requests.
118    pub pending_last_cleanup_block: Option<(BlockOffset, BlockHeight)>,
119}