// exocore_chain/engine/mod.rs

1use std::sync::{Arc, RwLock, Weak};
2
3pub use chain_sync::ChainSyncConfig;
4pub use commit_manager::CommitManagerConfig;
5pub use config::EngineConfig;
6pub use error::EngineError;
7use exocore_core::{
8    cell::Cell,
9    framing::{FrameReader, TypedCapnpFrame},
10    futures::{interval, spawn_blocking},
11    time::Clock,
12    utils::handle_set::HandleSet,
13};
14use exocore_protos::generated::{
15    data_transport_capnp::{chain_sync_request, chain_sync_response, pending_sync_request},
16    MessageType,
17};
18use exocore_transport::{InEvent, InMessage, OutEvent, TransportServiceHandle};
19use futures::{channel::mpsc, future::FutureExt, SinkExt, StreamExt};
20pub use handle::{EngineHandle, EngineOperation, EngineOperationStatus, Event};
21pub use pending_sync::PendingSyncConfig;
22pub use request_tracker::RequestTrackerConfig;
23pub use sync_context::{SyncContext, SyncContextMessage, SyncState};
24
25use crate::{chain, operation, operation::NewOperation, pending};
26
27pub(super) mod chain_sync;
28mod commit_manager;
29mod config;
30mod error;
31mod handle;
32mod pending_sync;
33mod request_tracker;
34mod sync_context;
35#[cfg(test)]
36pub(crate) mod testing;
37
38/// The chain engine manages storage and replication of data among the nodes of
39/// the cell.
40///
41/// It contains 2 stores:
42///   * Pending store: temporary store in which operations are stored until they
43///     get committed to chain
44///   * Chain store: persistent store using a block-chain like data structure
45pub struct Engine<T, CS, PS>
46where
47    T: TransportServiceHandle,
48    CS: chain::ChainStore,
49    PS: pending::PendingStore,
50{
51    config: EngineConfig,
52    transport: T,
53    inner: Arc<RwLock<Inner<CS, PS>>>,
54    handle_set: HandleSet,
55}
56
57impl<T, CS, PS> Engine<T, CS, PS>
58where
59    T: TransportServiceHandle,
60    CS: chain::ChainStore,
61    PS: pending::PendingStore,
62{
63    pub fn new(
64        config: EngineConfig,
65        clock: Clock,
66        transport: T,
67        chain_store: CS,
68        pending_store: PS,
69        cell: Cell,
70    ) -> Engine<T, CS, PS> {
71        let pending_synchronizer = pending_sync::PendingSynchronizer::new(
72            config.pending_sync_config,
73            cell.clone(),
74            clock.clone(),
75        );
76        let chain_synchronizer = chain_sync::ChainSynchronizer::new(
77            config.chain_sync_config.clone(),
78            cell.clone(),
79            clock.clone(),
80        );
81        let commit_manager = commit_manager::CommitManager::new(
82            config.commit_manager_config,
83            cell.clone(),
84            clock.clone(),
85        );
86
87        let inner = Arc::new(RwLock::new(Inner {
88            config: config.clone(),
89            cell,
90            clock,
91            pending_store,
92            pending_synchronizer,
93            chain_store,
94            chain_synchronizer,
95            commit_manager,
96            events_stream_sender: Vec::new(),
97            transport_sender: None,
98            sync_state: SyncState::default(),
99        }));
100
101        Engine {
102            config,
103            inner,
104            transport,
105            handle_set: HandleSet::new(),
106        }
107    }
108
109    pub fn get_handle(&mut self) -> EngineHandle<CS, PS> {
110        EngineHandle::new(Arc::downgrade(&self.inner), self.handle_set.get_handle())
111    }
112
113    pub async fn run(mut self) -> Result<(), EngineError> {
114        let config = self.config;
115
116        let (transport_out_sender, mut transport_out_receiver) =
117            mpsc::channel(config.to_transport_channel_size);
118        let mut transport_out_sink = self.transport.get_sink();
119        let outgoing_transport_handler = async move {
120            while let Some(event) = transport_out_receiver.next().await {
121                if let Err(err) = transport_out_sink.send(event).await {
122                    error!("Error sending to transport sink: {}", err);
123                }
124            }
125        };
126
127        let mut transport_in_stream = self.transport.get_stream();
128        let weak_inner = Arc::downgrade(&self.inner);
129        let incoming_transport_handler = async move {
130            while let Some(event) = transport_in_stream.next().await {
131                let result = Self::handle_incoming_event(weak_inner.clone(), event).await;
132                if let Err(err) = result {
133                    error!("Error handling incoming message: {}", err);
134                    if err.is_fatal() {
135                        return;
136                    }
137                }
138            }
139        };
140
141        let weak_inner = Arc::downgrade(&self.inner);
142        let management_timer = async move {
143            let mut interval = interval(config.manager_timer_interval);
144            loop {
145                interval.tick().await;
146                let result = Self::handle_management_timer_tick(weak_inner.clone()).await;
147                if let Err(err) = result {
148                    error!("Error in management timer: {}", err);
149                    if err.is_fatal() {
150                        return;
151                    }
152                }
153            }
154        };
155
156        {
157            let mut unlocked_inner = self.inner.write()?;
158            unlocked_inner.transport_sender = Some(transport_out_sender);
159
160            let chain_last_block = unlocked_inner.chain_store.get_last_block()?;
161            if chain_last_block.is_none() {
162                warn!("{}: Chain has not been initialized (no genesis block). May not be able to start if no other nodes are found.",
163                      unlocked_inner.cell,
164                )
165            }
166
167            unlocked_inner.dispatch_event(&Event::Started);
168        }
169
170        info!("Engine started");
171        futures::select! {
172            _ = outgoing_transport_handler.fuse() => {},
173            _ = incoming_transport_handler.fuse() => {},
174            _ = management_timer.fuse() => {},
175            _ = self.handle_set.on_handles_dropped().fuse() => {},
176            _ = self.transport.fuse() => {},
177        }
178        info!("Engine done");
179
180        Ok(())
181    }
182
183    async fn handle_incoming_event(
184        weak_inner: Weak<RwLock<Inner<CS, PS>>>,
185        event: InEvent,
186    ) -> Result<(), EngineError> {
187        match event {
188            InEvent::Message(msg) => Self::handle_incoming_message(weak_inner, msg).await,
189            InEvent::NodeStatus(_, _) => {
190                // unhandled for now, but could be used by synchronizers
191                Ok(())
192            }
193        }
194    }
195
196    async fn handle_incoming_message(
197        weak_inner: Weak<RwLock<Inner<CS, PS>>>,
198        message: InMessage,
199    ) -> Result<(), EngineError> {
200        let join_result = spawn_blocking(move || {
201            let locked_inner = weak_inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
202            let mut inner = locked_inner.write()?;
203
204            debug!(
205                "{}: Got message of type {} from node {}",
206                inner.cell, message.typ, message.source,
207            );
208
209            match message.typ {
210                <pending_sync_request::Owned as MessageType>::MESSAGE_TYPE => {
211                    let sync_request = message.get_data_as_framed_message()?;
212                    inner.handle_incoming_pending_sync_request(&message, sync_request)?;
213                }
214                <chain_sync_request::Owned as MessageType>::MESSAGE_TYPE => {
215                    let sync_request = message.get_data_as_framed_message()?;
216                    inner.handle_incoming_chain_sync_request(&message, sync_request)?;
217                }
218                <chain_sync_response::Owned as MessageType>::MESSAGE_TYPE => {
219                    let sync_response = message.get_data_as_framed_message()?;
220                    inner.handle_incoming_chain_sync_response(&message, sync_response)?;
221                }
222                msg_type => {
223                    return Err(anyhow!(
224                        "Got an unknown message type: message_type={} service_type={:?}",
225                        msg_type,
226                        message.service_type,
227                    )
228                    .into());
229                }
230            }
231
232            Ok(())
233        })
234        .await;
235
236        match join_result {
237            Ok(res) => res,
238            Err(err) => Err(EngineError::Fatal(anyhow!(
239                "Error joining blocking spawn: {}",
240                err
241            ))),
242        }
243    }
244
245    async fn handle_management_timer_tick(
246        weak_inner: Weak<RwLock<Inner<CS, PS>>>,
247    ) -> Result<(), EngineError> {
248        let join_result = spawn_blocking(move || {
249            let locked_inner = weak_inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
250            let mut inner = locked_inner.write()?;
251            inner.tick_synchronizers()?;
252
253            Ok(())
254        })
255        .await;
256
257        match join_result {
258            Ok(res) => res,
259            Err(err) => Err(EngineError::Fatal(anyhow!(
260                "Error joining blocking spawn: {}",
261                err
262            ))),
263        }
264    }
265}
266
267pub(crate) struct Inner<CS, PS>
268where
269    CS: chain::ChainStore,
270    PS: pending::PendingStore,
271{
272    config: EngineConfig,
273    cell: Cell,
274    clock: Clock,
275    pending_store: PS,
276    pending_synchronizer: pending_sync::PendingSynchronizer<PS>,
277    chain_store: CS,
278    chain_synchronizer: chain_sync::ChainSynchronizer<CS>,
279    commit_manager: commit_manager::CommitManager<PS, CS>,
280    events_stream_sender: Vec<(usize, bool, mpsc::Sender<Event>)>,
281    transport_sender: Option<mpsc::Sender<OutEvent>>,
282    sync_state: SyncState,
283}
284
285impl<CS, PS> Inner<CS, PS>
286where
287    CS: chain::ChainStore,
288    PS: pending::PendingStore,
289{
290    fn handle_new_operation(&mut self, operation: NewOperation) -> Result<(), EngineError> {
291        let mut sync_context = SyncContext::new(self.sync_state);
292        self.pending_synchronizer.handle_new_operation(
293            &mut sync_context,
294            &mut self.pending_store,
295            operation,
296        )?;
297        self.sync_state = sync_context.sync_state;
298
299        // to prevent sending operations that may have already been committed, we don't
300        // propagate pending store changes unless the chain is synchronized
301        if self.chain_is_synchronized() {
302            self.send_messages_from_sync_context(&mut sync_context)?;
303        }
304
305        self.dispatch_events_from_sync_context(&sync_context);
306
307        Ok(())
308    }
309
310    fn handle_incoming_pending_sync_request<R: FrameReader>(
311        &mut self,
312        message: &InMessage,
313        request: TypedCapnpFrame<R, pending_sync_request::Owned>,
314    ) -> Result<(), EngineError> {
315        // to prevent sending operations that may have already been committed, we don't
316        // accept any pending sync requests until the chain is synchronized
317        if !self.chain_is_synchronized() {
318            return Ok(());
319        }
320
321        let mut sync_context = SyncContext::new(self.sync_state);
322        self.pending_synchronizer.handle_incoming_sync_request(
323            &message.source,
324            &mut sync_context,
325            &mut self.pending_store,
326            request,
327        )?;
328        self.sync_state = sync_context.sync_state;
329
330        self.send_messages_from_sync_context(&mut sync_context)?;
331        self.dispatch_events_from_sync_context(&sync_context);
332
333        Ok(())
334    }
335
336    fn handle_incoming_chain_sync_request<F: FrameReader>(
337        &mut self,
338        message: &InMessage,
339        request: TypedCapnpFrame<F, chain_sync_request::Owned>,
340    ) -> Result<(), EngineError> {
341        let mut sync_context = SyncContext::new(self.sync_state);
342        self.chain_synchronizer.handle_sync_request(
343            &mut sync_context,
344            &message.source,
345            &mut self.chain_store,
346            request,
347        )?;
348        self.sync_state = sync_context.sync_state;
349
350        self.send_messages_from_sync_context(&mut sync_context)?;
351        self.dispatch_events_from_sync_context(&sync_context);
352
353        Ok(())
354    }
355
356    fn handle_incoming_chain_sync_response<F: FrameReader>(
357        &mut self,
358        message: &InMessage,
359        response: TypedCapnpFrame<F, chain_sync_response::Owned>,
360    ) -> Result<(), EngineError> {
361        let mut sync_context = SyncContext::new(self.sync_state);
362        self.chain_synchronizer.handle_sync_response(
363            &mut sync_context,
364            &message.source,
365            &mut self.chain_store,
366            response,
367        )?;
368        self.sync_state = sync_context.sync_state;
369
370        self.send_messages_from_sync_context(&mut sync_context)?;
371        self.dispatch_events_from_sync_context(&sync_context);
372
373        Ok(())
374    }
375
376    fn tick_synchronizers(&mut self) -> Result<(), EngineError> {
377        let mut sync_context = SyncContext::new(self.sync_state);
378
379        self.chain_synchronizer
380            .tick(&mut sync_context, &self.chain_store)?;
381
382        // to prevent synchronizing operations that may have been added to the chain, we
383        // should only start doing commit management & pending synchronization once the
384        // chain is synchronized
385
386        if self.chain_is_synchronized() {
387            // commit manager should always be ticked before pending synchronizer so that it
388            // may remove operations that don't need to be synchronized anymore
389            // (ex: been committed)
390            match self.commit_manager.tick(
391                &mut sync_context,
392                &mut self.pending_synchronizer,
393                &mut self.pending_store,
394                &mut self.chain_store,
395            ) {
396                Ok(_) => {}
397                Err(EngineError::OutOfSync) => {
398                    warn!("Commit manager detected is out of sync with cluster. Resetting chain synchronizer.");
399                    self.chain_synchronizer.reset_state();
400                }
401                Err(err) => return Err(err),
402            }
403        }
404
405        // check if chain is still synchronized after commit manager since it may have
406        // detected that we are out of sync
407        if self.chain_is_synchronized() {
408            self.pending_synchronizer
409                .tick(&mut sync_context, &self.pending_store)?;
410        }
411
412        self.sync_state = sync_context.sync_state;
413
414        self.send_messages_from_sync_context(&mut sync_context)?;
415        self.dispatch_events_from_sync_context(&sync_context);
416
417        Ok(())
418    }
419
420    fn send_messages_from_sync_context(
421        &mut self,
422        sync_context: &mut SyncContext,
423    ) -> Result<(), EngineError> {
424        if sync_context.messages.is_empty() {
425            return Ok(());
426        }
427
428        // swap out messages from the sync_context struct to consume them
429        let mut messages = Vec::new();
430        std::mem::swap(&mut sync_context.messages, &mut messages);
431
432        for message in messages {
433            let out_message = message.into_out_message(&self.cell)?;
434            let transport_sender = self.transport_sender.as_mut().unwrap();
435            if let Err(err) = transport_sender.try_send(OutEvent::Message(out_message)) {
436                error!(
437                    "Error sending message from sync context to transport: {}",
438                    err
439                );
440            }
441        }
442
443        Ok(())
444    }
445
446    fn get_new_events_stream(&mut self, handle_id: usize) -> mpsc::Receiver<Event> {
447        let channel_size = self.config.events_stream_buffer_size;
448        let (events_sender, events_receiver) = mpsc::channel(channel_size);
449        self.events_stream_sender
450            .push((handle_id, false, events_sender));
451
452        events_receiver
453    }
454
455    fn dispatch_events_from_sync_context(&mut self, sync_context: &SyncContext) {
456        for event in sync_context.events.iter() {
457            self.dispatch_event(event)
458        }
459    }
460
461    fn dispatch_event(&mut self, event: &Event) {
462        for (handle_id, discontinued, stream_sender) in self.events_stream_sender.iter_mut() {
463            // if we hit a full buffer at last send, the stream got a discontinuity and we
464            // need to advise consumer. we try to emit a discontinuity event,
465            // and if we succeed (buffer has space), we try to send the next event
466            if *discontinued {
467                if let Ok(()) = stream_sender.try_send(Event::StreamDiscontinuity) {
468                    *discontinued = false;
469                } else {
470                    continue;
471                }
472            }
473
474            match stream_sender.try_send(event.clone()) {
475                Ok(()) => {}
476                Err(ref err) if err.is_full() => {
477                    warn!(
478                        "Couldn't send event to handle {} because channel buffer is full. Marking as discontinued",
479                        handle_id
480                    );
481                    *discontinued = true;
482                }
483                Err(err) => {
484                    error!(
485                        "Couldn't send event to handle {} for a reason other than channel buffer full: {:}",
486                        handle_id, err
487                    );
488                }
489            }
490        }
491    }
492
493    fn chain_is_synchronized(&self) -> bool {
494        let chain_sync_status = self.chain_synchronizer.status();
495        chain_sync_status == chain_sync::Status::Synchronized
496    }
497
498    fn unregister_handle(&mut self, handle_id: usize) {
499        // remove all streams that this handle created
500        let mut previous_streams = Vec::new();
501        std::mem::swap(&mut self.events_stream_sender, &mut previous_streams);
502        for (one_handle_id, discontinued, sender) in previous_streams {
503            if one_handle_id != handle_id {
504                self.events_stream_sender
505                    .push((handle_id, discontinued, sender));
506            }
507        }
508    }
509}