1use std::sync::{Arc, RwLock, Weak};
2
3pub use chain_sync::ChainSyncConfig;
4pub use commit_manager::CommitManagerConfig;
5pub use config::EngineConfig;
6pub use error::EngineError;
7use exocore_core::{
8 cell::Cell,
9 framing::{FrameReader, TypedCapnpFrame},
10 futures::{interval, spawn_blocking},
11 time::Clock,
12 utils::handle_set::HandleSet,
13};
14use exocore_protos::generated::{
15 data_transport_capnp::{chain_sync_request, chain_sync_response, pending_sync_request},
16 MessageType,
17};
18use exocore_transport::{InEvent, InMessage, OutEvent, TransportServiceHandle};
19use futures::{channel::mpsc, future::FutureExt, SinkExt, StreamExt};
20pub use handle::{EngineHandle, EngineOperation, EngineOperationStatus, Event};
21pub use pending_sync::PendingSyncConfig;
22pub use request_tracker::RequestTrackerConfig;
23pub use sync_context::{SyncContext, SyncContextMessage, SyncState};
24
25use crate::{chain, operation, operation::NewOperation, pending};
26
27pub(super) mod chain_sync;
28mod commit_manager;
29mod config;
30mod error;
31mod handle;
32mod pending_sync;
33mod request_tracker;
34mod sync_context;
35#[cfg(test)]
36pub(crate) mod testing;
37
38pub struct Engine<T, CS, PS>
46where
47 T: TransportServiceHandle,
48 CS: chain::ChainStore,
49 PS: pending::PendingStore,
50{
51 config: EngineConfig,
52 transport: T,
53 inner: Arc<RwLock<Inner<CS, PS>>>,
54 handle_set: HandleSet,
55}
56
57impl<T, CS, PS> Engine<T, CS, PS>
58where
59 T: TransportServiceHandle,
60 CS: chain::ChainStore,
61 PS: pending::PendingStore,
62{
63 pub fn new(
64 config: EngineConfig,
65 clock: Clock,
66 transport: T,
67 chain_store: CS,
68 pending_store: PS,
69 cell: Cell,
70 ) -> Engine<T, CS, PS> {
71 let pending_synchronizer = pending_sync::PendingSynchronizer::new(
72 config.pending_sync_config,
73 cell.clone(),
74 clock.clone(),
75 );
76 let chain_synchronizer = chain_sync::ChainSynchronizer::new(
77 config.chain_sync_config.clone(),
78 cell.clone(),
79 clock.clone(),
80 );
81 let commit_manager = commit_manager::CommitManager::new(
82 config.commit_manager_config,
83 cell.clone(),
84 clock.clone(),
85 );
86
87 let inner = Arc::new(RwLock::new(Inner {
88 config: config.clone(),
89 cell,
90 clock,
91 pending_store,
92 pending_synchronizer,
93 chain_store,
94 chain_synchronizer,
95 commit_manager,
96 events_stream_sender: Vec::new(),
97 transport_sender: None,
98 sync_state: SyncState::default(),
99 }));
100
101 Engine {
102 config,
103 inner,
104 transport,
105 handle_set: HandleSet::new(),
106 }
107 }
108
109 pub fn get_handle(&mut self) -> EngineHandle<CS, PS> {
110 EngineHandle::new(Arc::downgrade(&self.inner), self.handle_set.get_handle())
111 }
112
113 pub async fn run(mut self) -> Result<(), EngineError> {
114 let config = self.config;
115
116 let (transport_out_sender, mut transport_out_receiver) =
117 mpsc::channel(config.to_transport_channel_size);
118 let mut transport_out_sink = self.transport.get_sink();
119 let outgoing_transport_handler = async move {
120 while let Some(event) = transport_out_receiver.next().await {
121 if let Err(err) = transport_out_sink.send(event).await {
122 error!("Error sending to transport sink: {}", err);
123 }
124 }
125 };
126
127 let mut transport_in_stream = self.transport.get_stream();
128 let weak_inner = Arc::downgrade(&self.inner);
129 let incoming_transport_handler = async move {
130 while let Some(event) = transport_in_stream.next().await {
131 let result = Self::handle_incoming_event(weak_inner.clone(), event).await;
132 if let Err(err) = result {
133 error!("Error handling incoming message: {}", err);
134 if err.is_fatal() {
135 return;
136 }
137 }
138 }
139 };
140
141 let weak_inner = Arc::downgrade(&self.inner);
142 let management_timer = async move {
143 let mut interval = interval(config.manager_timer_interval);
144 loop {
145 interval.tick().await;
146 let result = Self::handle_management_timer_tick(weak_inner.clone()).await;
147 if let Err(err) = result {
148 error!("Error in management timer: {}", err);
149 if err.is_fatal() {
150 return;
151 }
152 }
153 }
154 };
155
156 {
157 let mut unlocked_inner = self.inner.write()?;
158 unlocked_inner.transport_sender = Some(transport_out_sender);
159
160 let chain_last_block = unlocked_inner.chain_store.get_last_block()?;
161 if chain_last_block.is_none() {
162 warn!("{}: Chain has not been initialized (no genesis block). May not be able to start if no other nodes are found.",
163 unlocked_inner.cell,
164 )
165 }
166
167 unlocked_inner.dispatch_event(&Event::Started);
168 }
169
170 info!("Engine started");
171 futures::select! {
172 _ = outgoing_transport_handler.fuse() => {},
173 _ = incoming_transport_handler.fuse() => {},
174 _ = management_timer.fuse() => {},
175 _ = self.handle_set.on_handles_dropped().fuse() => {},
176 _ = self.transport.fuse() => {},
177 }
178 info!("Engine done");
179
180 Ok(())
181 }
182
183 async fn handle_incoming_event(
184 weak_inner: Weak<RwLock<Inner<CS, PS>>>,
185 event: InEvent,
186 ) -> Result<(), EngineError> {
187 match event {
188 InEvent::Message(msg) => Self::handle_incoming_message(weak_inner, msg).await,
189 InEvent::NodeStatus(_, _) => {
190 Ok(())
192 }
193 }
194 }
195
196 async fn handle_incoming_message(
197 weak_inner: Weak<RwLock<Inner<CS, PS>>>,
198 message: InMessage,
199 ) -> Result<(), EngineError> {
200 let join_result = spawn_blocking(move || {
201 let locked_inner = weak_inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
202 let mut inner = locked_inner.write()?;
203
204 debug!(
205 "{}: Got message of type {} from node {}",
206 inner.cell, message.typ, message.source,
207 );
208
209 match message.typ {
210 <pending_sync_request::Owned as MessageType>::MESSAGE_TYPE => {
211 let sync_request = message.get_data_as_framed_message()?;
212 inner.handle_incoming_pending_sync_request(&message, sync_request)?;
213 }
214 <chain_sync_request::Owned as MessageType>::MESSAGE_TYPE => {
215 let sync_request = message.get_data_as_framed_message()?;
216 inner.handle_incoming_chain_sync_request(&message, sync_request)?;
217 }
218 <chain_sync_response::Owned as MessageType>::MESSAGE_TYPE => {
219 let sync_response = message.get_data_as_framed_message()?;
220 inner.handle_incoming_chain_sync_response(&message, sync_response)?;
221 }
222 msg_type => {
223 return Err(anyhow!(
224 "Got an unknown message type: message_type={} service_type={:?}",
225 msg_type,
226 message.service_type,
227 )
228 .into());
229 }
230 }
231
232 Ok(())
233 })
234 .await;
235
236 match join_result {
237 Ok(res) => res,
238 Err(err) => Err(EngineError::Fatal(anyhow!(
239 "Error joining blocking spawn: {}",
240 err
241 ))),
242 }
243 }
244
245 async fn handle_management_timer_tick(
246 weak_inner: Weak<RwLock<Inner<CS, PS>>>,
247 ) -> Result<(), EngineError> {
248 let join_result = spawn_blocking(move || {
249 let locked_inner = weak_inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
250 let mut inner = locked_inner.write()?;
251 inner.tick_synchronizers()?;
252
253 Ok(())
254 })
255 .await;
256
257 match join_result {
258 Ok(res) => res,
259 Err(err) => Err(EngineError::Fatal(anyhow!(
260 "Error joining blocking spawn: {}",
261 err
262 ))),
263 }
264 }
265}
266
267pub(crate) struct Inner<CS, PS>
268where
269 CS: chain::ChainStore,
270 PS: pending::PendingStore,
271{
272 config: EngineConfig,
273 cell: Cell,
274 clock: Clock,
275 pending_store: PS,
276 pending_synchronizer: pending_sync::PendingSynchronizer<PS>,
277 chain_store: CS,
278 chain_synchronizer: chain_sync::ChainSynchronizer<CS>,
279 commit_manager: commit_manager::CommitManager<PS, CS>,
280 events_stream_sender: Vec<(usize, bool, mpsc::Sender<Event>)>,
281 transport_sender: Option<mpsc::Sender<OutEvent>>,
282 sync_state: SyncState,
283}
284
285impl<CS, PS> Inner<CS, PS>
286where
287 CS: chain::ChainStore,
288 PS: pending::PendingStore,
289{
290 fn handle_new_operation(&mut self, operation: NewOperation) -> Result<(), EngineError> {
291 let mut sync_context = SyncContext::new(self.sync_state);
292 self.pending_synchronizer.handle_new_operation(
293 &mut sync_context,
294 &mut self.pending_store,
295 operation,
296 )?;
297 self.sync_state = sync_context.sync_state;
298
299 if self.chain_is_synchronized() {
302 self.send_messages_from_sync_context(&mut sync_context)?;
303 }
304
305 self.dispatch_events_from_sync_context(&sync_context);
306
307 Ok(())
308 }
309
310 fn handle_incoming_pending_sync_request<R: FrameReader>(
311 &mut self,
312 message: &InMessage,
313 request: TypedCapnpFrame<R, pending_sync_request::Owned>,
314 ) -> Result<(), EngineError> {
315 if !self.chain_is_synchronized() {
318 return Ok(());
319 }
320
321 let mut sync_context = SyncContext::new(self.sync_state);
322 self.pending_synchronizer.handle_incoming_sync_request(
323 &message.source,
324 &mut sync_context,
325 &mut self.pending_store,
326 request,
327 )?;
328 self.sync_state = sync_context.sync_state;
329
330 self.send_messages_from_sync_context(&mut sync_context)?;
331 self.dispatch_events_from_sync_context(&sync_context);
332
333 Ok(())
334 }
335
336 fn handle_incoming_chain_sync_request<F: FrameReader>(
337 &mut self,
338 message: &InMessage,
339 request: TypedCapnpFrame<F, chain_sync_request::Owned>,
340 ) -> Result<(), EngineError> {
341 let mut sync_context = SyncContext::new(self.sync_state);
342 self.chain_synchronizer.handle_sync_request(
343 &mut sync_context,
344 &message.source,
345 &mut self.chain_store,
346 request,
347 )?;
348 self.sync_state = sync_context.sync_state;
349
350 self.send_messages_from_sync_context(&mut sync_context)?;
351 self.dispatch_events_from_sync_context(&sync_context);
352
353 Ok(())
354 }
355
356 fn handle_incoming_chain_sync_response<F: FrameReader>(
357 &mut self,
358 message: &InMessage,
359 response: TypedCapnpFrame<F, chain_sync_response::Owned>,
360 ) -> Result<(), EngineError> {
361 let mut sync_context = SyncContext::new(self.sync_state);
362 self.chain_synchronizer.handle_sync_response(
363 &mut sync_context,
364 &message.source,
365 &mut self.chain_store,
366 response,
367 )?;
368 self.sync_state = sync_context.sync_state;
369
370 self.send_messages_from_sync_context(&mut sync_context)?;
371 self.dispatch_events_from_sync_context(&sync_context);
372
373 Ok(())
374 }
375
376 fn tick_synchronizers(&mut self) -> Result<(), EngineError> {
377 let mut sync_context = SyncContext::new(self.sync_state);
378
379 self.chain_synchronizer
380 .tick(&mut sync_context, &self.chain_store)?;
381
382 if self.chain_is_synchronized() {
387 match self.commit_manager.tick(
391 &mut sync_context,
392 &mut self.pending_synchronizer,
393 &mut self.pending_store,
394 &mut self.chain_store,
395 ) {
396 Ok(_) => {}
397 Err(EngineError::OutOfSync) => {
398 warn!("Commit manager detected is out of sync with cluster. Resetting chain synchronizer.");
399 self.chain_synchronizer.reset_state();
400 }
401 Err(err) => return Err(err),
402 }
403 }
404
405 if self.chain_is_synchronized() {
408 self.pending_synchronizer
409 .tick(&mut sync_context, &self.pending_store)?;
410 }
411
412 self.sync_state = sync_context.sync_state;
413
414 self.send_messages_from_sync_context(&mut sync_context)?;
415 self.dispatch_events_from_sync_context(&sync_context);
416
417 Ok(())
418 }
419
420 fn send_messages_from_sync_context(
421 &mut self,
422 sync_context: &mut SyncContext,
423 ) -> Result<(), EngineError> {
424 if sync_context.messages.is_empty() {
425 return Ok(());
426 }
427
428 let mut messages = Vec::new();
430 std::mem::swap(&mut sync_context.messages, &mut messages);
431
432 for message in messages {
433 let out_message = message.into_out_message(&self.cell)?;
434 let transport_sender = self.transport_sender.as_mut().unwrap();
435 if let Err(err) = transport_sender.try_send(OutEvent::Message(out_message)) {
436 error!(
437 "Error sending message from sync context to transport: {}",
438 err
439 );
440 }
441 }
442
443 Ok(())
444 }
445
446 fn get_new_events_stream(&mut self, handle_id: usize) -> mpsc::Receiver<Event> {
447 let channel_size = self.config.events_stream_buffer_size;
448 let (events_sender, events_receiver) = mpsc::channel(channel_size);
449 self.events_stream_sender
450 .push((handle_id, false, events_sender));
451
452 events_receiver
453 }
454
455 fn dispatch_events_from_sync_context(&mut self, sync_context: &SyncContext) {
456 for event in sync_context.events.iter() {
457 self.dispatch_event(event)
458 }
459 }
460
461 fn dispatch_event(&mut self, event: &Event) {
462 for (handle_id, discontinued, stream_sender) in self.events_stream_sender.iter_mut() {
463 if *discontinued {
467 if let Ok(()) = stream_sender.try_send(Event::StreamDiscontinuity) {
468 *discontinued = false;
469 } else {
470 continue;
471 }
472 }
473
474 match stream_sender.try_send(event.clone()) {
475 Ok(()) => {}
476 Err(ref err) if err.is_full() => {
477 warn!(
478 "Couldn't send event to handle {} because channel buffer is full. Marking as discontinued",
479 handle_id
480 );
481 *discontinued = true;
482 }
483 Err(err) => {
484 error!(
485 "Couldn't send event to handle {} for a reason other than channel buffer full: {:}",
486 handle_id, err
487 );
488 }
489 }
490 }
491 }
492
493 fn chain_is_synchronized(&self) -> bool {
494 let chain_sync_status = self.chain_synchronizer.status();
495 chain_sync_status == chain_sync::Status::Synchronized
496 }
497
498 fn unregister_handle(&mut self, handle_id: usize) {
499 let mut previous_streams = Vec::new();
501 std::mem::swap(&mut self.events_stream_sender, &mut previous_streams);
502 for (one_handle_id, discontinued, sender) in previous_streams {
503 if one_handle_id != handle_id {
504 self.events_stream_sender
505 .push((handle_id, discontinued, sender));
506 }
507 }
508 }
509}