exocore_chain/engine/
handle.rs

1use std::{
2    ops::RangeBounds,
3    sync::{Arc, RwLock, Weak},
4};
5
6use bytes::Bytes;
7use exocore_core::utils::handle_set::Handle;
8use exocore_protos::generated::data_chain_capnp::chain_operation;
9use futures::prelude::*;
10
11use super::{EngineError, Inner};
12use crate::{
13    block::{Block, BlockHeight, BlockOffset, DataBlock},
14    chain::{self, ChainData},
15    operation,
16    operation::{OperationBuilder, OperationId},
17    pending,
18    pending::CommitStatus,
19};
20
21/// Handle ot the Engine, allowing communication with the engine.
22/// The engine itself is owned by a future executor.
23pub struct EngineHandle<CS, PS>
24where
25    CS: chain::ChainStore,
26    PS: pending::PendingStore,
27{
28    inner: Weak<RwLock<Inner<CS, PS>>>,
29    handle: Handle,
30}
31
32impl<CS, PS> EngineHandle<CS, PS>
33where
34    CS: chain::ChainStore,
35    PS: pending::PendingStore,
36{
37    pub(crate) fn new(inner: Weak<RwLock<Inner<CS, PS>>>, handle: Handle) -> EngineHandle<CS, PS> {
38        EngineHandle { inner, handle }
39    }
40
41    pub fn on_started(&self) -> impl Future<Output = ()> {
42        self.handle.on_set_started()
43    }
44
45    pub fn write_entry_operation(&self, data: &[u8]) -> Result<OperationId, EngineError> {
46        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
47        let mut unlocked_inner = inner.write()?;
48
49        let my_node = unlocked_inner.cell.local_node();
50        let operation_id = unlocked_inner.clock.consistent_time(my_node).into();
51
52        let operation_builder = OperationBuilder::new_entry(operation_id, my_node.id(), data);
53        let operation = operation_builder.sign_and_build(my_node)?;
54
55        unlocked_inner.handle_new_operation(operation)?;
56
57        Ok(operation_id)
58    }
59
60    pub fn get_chain_segments(&self) -> Result<chain::Segments, EngineError> {
61        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
62        let unlocked_inner = inner.read()?;
63        Ok(unlocked_inner.chain_store.segments())
64    }
65
66    pub fn get_chain_operation(
67        &self,
68        block_offset: BlockOffset,
69        operation_id: OperationId,
70    ) -> Result<Option<EngineOperation>, EngineError> {
71        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
72        let unlocked_inner = inner.read()?;
73
74        let block = unlocked_inner.chain_store.get_block(block_offset)?;
75        EngineOperation::from_chain(block, operation_id)
76    }
77
78    pub fn get_chain_operations(
79        &self,
80        from_offset: Option<BlockOffset>,
81    ) -> ChainOperationsIterator<CS, PS> {
82        ChainOperationsIterator::new(self.inner.clone(), from_offset)
83    }
84
85    pub fn get_chain_last_block_info(
86        &self,
87    ) -> Result<Option<(BlockOffset, BlockHeight)>, EngineError> {
88        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
89        let unlocked_inner = inner.read()?;
90        let last_block = unlocked_inner.chain_store.get_last_block()?;
91
92        if let Some(last_block) = last_block {
93            let height = last_block.get_height()?;
94            Ok(Some((last_block.offset, height)))
95        } else {
96            Ok(None)
97        }
98    }
99
100    pub fn get_chain_block_info(
101        &self,
102        offset: BlockOffset,
103    ) -> Result<Option<(BlockOffset, BlockHeight)>, EngineError> {
104        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
105        let unlocked_inner = inner.read()?;
106        match unlocked_inner.chain_store.get_block(offset) {
107            Ok(block) => {
108                let height = block.get_height()?;
109                Ok(Some((block.offset, height)))
110            }
111            Err(err) if err.is_fatal() => Err(err.into()),
112            Err(_err) => {
113                // non-fatal just mean we didn't find block
114                Ok(None)
115            }
116        }
117    }
118
119    pub fn get_chain_block(
120        &self,
121        offset: BlockOffset,
122    ) -> Result<Option<DataBlock<ChainData>>, EngineError> {
123        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
124        let unlocked_inner = inner.read()?;
125        match unlocked_inner.chain_store.get_block(offset) {
126            Ok(block) => Ok(Some(block)),
127            Err(err) if err.is_fatal() => Err(err.into()),
128            Err(_err) => {
129                // non-fatal just mean we didn't find block
130                Ok(None)
131            }
132        }
133    }
134
135    pub fn get_pending_operation(
136        &self,
137        operation_id: OperationId,
138    ) -> Result<Option<EngineOperation>, EngineError> {
139        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
140        let unlocked_inner = inner.read()?;
141
142        let operation = unlocked_inner
143            .pending_store
144            .get_operation(operation_id)?
145            .map(EngineOperation::from_pending);
146
147        Ok(operation)
148    }
149
150    pub fn get_pending_operations<R: RangeBounds<OperationId>>(
151        &self,
152        operations_range: R,
153    ) -> Result<Vec<EngineOperation>, EngineError> {
154        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
155        let unlocked_inner = inner.read()?;
156
157        let operations = unlocked_inner
158            .pending_store
159            .operations_iter(operations_range)?
160            .map(EngineOperation::from_pending)
161            .collect::<Vec<_>>();
162        Ok(operations)
163    }
164
165    pub fn get_operation(
166        &self,
167        operation_id: OperationId,
168    ) -> Result<Option<EngineOperation>, EngineError> {
169        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
170        let unlocked_inner = inner.read()?;
171
172        // first check if it's in pending store with a clear commit status
173        let pending_operation = unlocked_inner.pending_store.get_operation(operation_id)?;
174        let pending_operation = if let Some(pending_operation) = pending_operation {
175            if pending_operation.commit_status != CommitStatus::Unknown {
176                return Ok(Some(EngineOperation::from_pending(pending_operation)));
177            }
178
179            Some(pending_operation)
180        } else {
181            None
182        };
183
184        // if it's not found in pending store, or that it didn't have a clear status, we
185        // check in chain
186        if let Some(block) = unlocked_inner
187            .chain_store
188            .get_block_by_operation_id(operation_id)?
189        {
190            if let Some(chain_operation) = EngineOperation::from_chain(block, operation_id)? {
191                return Ok(Some(chain_operation));
192            }
193        }
194
195        // if we're here, the operation was either absent, or just had a unknown status
196        // in pending store we return the pending store operation (if any)
197        Ok(pending_operation.map(EngineOperation::from_pending))
198    }
199
200    /// Take the events stream receiver out of this `Handle`.
201    /// This stream is bounded and consumptions should be non-blocking to
202    /// prevent losing events. Calling the engine on every call should be
203    /// throttled in the case of a big read amplification.
204    pub fn take_events_stream(&mut self) -> Result<impl Stream<Item = Event>, EngineError> {
205        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
206        let mut unlocked_inner = inner.write()?;
207
208        let stream = unlocked_inner.get_new_events_stream(self.handle.id());
209
210        Ok(stream)
211    }
212}
213
214impl<CS, PS> Drop for EngineHandle<CS, PS>
215where
216    CS: chain::ChainStore,
217    PS: pending::PendingStore,
218{
219    fn drop(&mut self) {
220        debug!("Engine handle got dropped.");
221
222        if let Some(inner) = self.inner.upgrade() {
223            if let Ok(mut unlocked_inner) = inner.write() {
224                unlocked_inner.unregister_handle(self.handle.id());
225            }
226        }
227    }
228}
229
230impl<CS, PS> Clone for EngineHandle<CS, PS>
231where
232    CS: chain::ChainStore,
233    PS: pending::PendingStore,
234{
235    fn clone(&self) -> Self {
236        EngineHandle::new(self.inner.clone(), self.handle.clone())
237    }
238}
239
240/// Events dispatched to handles to notify changes in the different stores.
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub enum Event {
243    /// The engine is now started
244    Started,
245
246    /// The stream of events hit the maximum buffer size, and some events got
247    /// discarded. Consumer state should be rebuilt from scratch to prevent
248    /// having inconsistencies.
249    StreamDiscontinuity,
250
251    /// An operation added to the pending store.
252    NewPendingOperation(OperationId),
253
254    /// A new block got added to the chain.
255    NewChainBlock(BlockOffset),
256
257    /// The chain has diverged from given offset, which mean it will get
258    /// re-written with new blocks. Operations after this offset should
259    /// ignored.
260    ChainDiverged(BlockOffset),
261}
262
263/// Operation that comes either from the chain or from the pending store
264pub struct EngineOperation {
265    pub operation_id: OperationId,
266    pub status: EngineOperationStatus,
267    pub operation_frame: Arc<super::operation::OperationFrame<Bytes>>,
268}
269
270impl EngineOperation {
271    fn from_pending(operation: pending::StoredOperation) -> EngineOperation {
272        let status = match operation.commit_status {
273            pending::CommitStatus::Committed(offset, height) => {
274                EngineOperationStatus::Committed(offset, height)
275            }
276            _ => EngineOperationStatus::Pending,
277        };
278
279        EngineOperation {
280            operation_id: operation.operation_id,
281            status,
282            operation_frame: operation.frame,
283        }
284    }
285
286    fn from_chain<B: Block>(
287        block: B,
288        operation_id: OperationId,
289    ) -> Result<Option<EngineOperation>, EngineError> {
290        if let Some(operation) = block.get_operation(operation_id)? {
291            let height = block.get_height()?;
292            return Ok(Some(EngineOperation {
293                operation_id,
294                status: EngineOperationStatus::Committed(block.offset(), height),
295                operation_frame: Arc::new(operation.to_owned()),
296            }));
297        }
298
299        Ok(None)
300    }
301}
302
303impl crate::operation::Operation for EngineOperation {
304    fn get_operation_reader(&self) -> Result<chain_operation::Reader, operation::Error> {
305        Ok(self.operation_frame.get_reader()?)
306    }
307}
308
309#[derive(Debug, PartialEq, Eq)]
310pub enum EngineOperationStatus {
311    Committed(BlockOffset, BlockHeight),
312    Pending,
313}
314
315impl EngineOperationStatus {
316    pub fn is_committed(&self) -> bool {
317        matches!(self, EngineOperationStatus::Committed(_offset, _height))
318    }
319}
320
321/// Iterator of operations in the chain
322pub struct ChainOperationsIterator<CS, PS>
323where
324    CS: chain::ChainStore,
325    PS: pending::PendingStore,
326{
327    next_offset: BlockOffset,
328    current_operations: Vec<EngineOperation>,
329    inner: Weak<RwLock<Inner<CS, PS>>>,
330}
331
332impl<CS, PS> ChainOperationsIterator<CS, PS>
333where
334    CS: chain::ChainStore,
335    PS: pending::PendingStore,
336{
337    fn new(
338        inner: Weak<RwLock<Inner<CS, PS>>>,
339        from_offset: Option<BlockOffset>,
340    ) -> ChainOperationsIterator<CS, PS> {
341        ChainOperationsIterator {
342            next_offset: from_offset.unwrap_or(0),
343            current_operations: Vec::new(),
344            inner,
345        }
346    }
347
348    fn fetch_next_block(&mut self) -> Result<(), EngineError> {
349        let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
350        let inner = inner.read()?;
351
352        // since a block may not contain operations (ex: genesis), we need to loop until
353        // we find one
354        while self.current_operations.is_empty() {
355            let block = inner.chain_store.get_block(self.next_offset)?;
356            let height = block.get_height()?;
357
358            for operation in block.operations_iter()? {
359                let operation_reader = operation.get_reader()?;
360                let operation_id = operation_reader.get_operation_id();
361
362                self.current_operations.push(EngineOperation {
363                    operation_id,
364                    status: EngineOperationStatus::Committed(block.offset, height),
365                    operation_frame: Arc::new(operation.to_owned()),
366                });
367            }
368
369            // need to reverse as we will pop from end
370            self.current_operations.reverse();
371            self.next_offset = block.next_offset();
372        }
373
374        Ok(())
375    }
376}
377
378impl<CS, PS> Iterator for ChainOperationsIterator<CS, PS>
379where
380    CS: chain::ChainStore,
381    PS: pending::PendingStore,
382{
383    type Item = EngineOperation;
384
385    fn next(&mut self) -> Option<Self::Item> {
386        if self.current_operations.is_empty() {
387            if let Err(EngineError::ChainStore(chain::Error::OutOfBound(_))) =
388                self.fetch_next_block()
389            {
390                return None;
391            }
392        }
393
394        self.current_operations.pop()
395    }
396}