1use std::{
2 ops::RangeBounds,
3 sync::{Arc, RwLock, Weak},
4};
5
6use bytes::Bytes;
7use exocore_core::utils::handle_set::Handle;
8use exocore_protos::generated::data_chain_capnp::chain_operation;
9use futures::prelude::*;
10
11use super::{EngineError, Inner};
12use crate::{
13 block::{Block, BlockHeight, BlockOffset, DataBlock},
14 chain::{self, ChainData},
15 operation,
16 operation::{OperationBuilder, OperationId},
17 pending,
18 pending::CommitStatus,
19};
20
21pub struct EngineHandle<CS, PS>
24where
25 CS: chain::ChainStore,
26 PS: pending::PendingStore,
27{
28 inner: Weak<RwLock<Inner<CS, PS>>>,
29 handle: Handle,
30}
31
32impl<CS, PS> EngineHandle<CS, PS>
33where
34 CS: chain::ChainStore,
35 PS: pending::PendingStore,
36{
37 pub(crate) fn new(inner: Weak<RwLock<Inner<CS, PS>>>, handle: Handle) -> EngineHandle<CS, PS> {
38 EngineHandle { inner, handle }
39 }
40
41 pub fn on_started(&self) -> impl Future<Output = ()> {
42 self.handle.on_set_started()
43 }
44
45 pub fn write_entry_operation(&self, data: &[u8]) -> Result<OperationId, EngineError> {
46 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
47 let mut unlocked_inner = inner.write()?;
48
49 let my_node = unlocked_inner.cell.local_node();
50 let operation_id = unlocked_inner.clock.consistent_time(my_node).into();
51
52 let operation_builder = OperationBuilder::new_entry(operation_id, my_node.id(), data);
53 let operation = operation_builder.sign_and_build(my_node)?;
54
55 unlocked_inner.handle_new_operation(operation)?;
56
57 Ok(operation_id)
58 }
59
60 pub fn get_chain_segments(&self) -> Result<chain::Segments, EngineError> {
61 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
62 let unlocked_inner = inner.read()?;
63 Ok(unlocked_inner.chain_store.segments())
64 }
65
66 pub fn get_chain_operation(
67 &self,
68 block_offset: BlockOffset,
69 operation_id: OperationId,
70 ) -> Result<Option<EngineOperation>, EngineError> {
71 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
72 let unlocked_inner = inner.read()?;
73
74 let block = unlocked_inner.chain_store.get_block(block_offset)?;
75 EngineOperation::from_chain(block, operation_id)
76 }
77
78 pub fn get_chain_operations(
79 &self,
80 from_offset: Option<BlockOffset>,
81 ) -> ChainOperationsIterator<CS, PS> {
82 ChainOperationsIterator::new(self.inner.clone(), from_offset)
83 }
84
85 pub fn get_chain_last_block_info(
86 &self,
87 ) -> Result<Option<(BlockOffset, BlockHeight)>, EngineError> {
88 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
89 let unlocked_inner = inner.read()?;
90 let last_block = unlocked_inner.chain_store.get_last_block()?;
91
92 if let Some(last_block) = last_block {
93 let height = last_block.get_height()?;
94 Ok(Some((last_block.offset, height)))
95 } else {
96 Ok(None)
97 }
98 }
99
100 pub fn get_chain_block_info(
101 &self,
102 offset: BlockOffset,
103 ) -> Result<Option<(BlockOffset, BlockHeight)>, EngineError> {
104 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
105 let unlocked_inner = inner.read()?;
106 match unlocked_inner.chain_store.get_block(offset) {
107 Ok(block) => {
108 let height = block.get_height()?;
109 Ok(Some((block.offset, height)))
110 }
111 Err(err) if err.is_fatal() => Err(err.into()),
112 Err(_err) => {
113 Ok(None)
115 }
116 }
117 }
118
119 pub fn get_chain_block(
120 &self,
121 offset: BlockOffset,
122 ) -> Result<Option<DataBlock<ChainData>>, EngineError> {
123 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
124 let unlocked_inner = inner.read()?;
125 match unlocked_inner.chain_store.get_block(offset) {
126 Ok(block) => Ok(Some(block)),
127 Err(err) if err.is_fatal() => Err(err.into()),
128 Err(_err) => {
129 Ok(None)
131 }
132 }
133 }
134
135 pub fn get_pending_operation(
136 &self,
137 operation_id: OperationId,
138 ) -> Result<Option<EngineOperation>, EngineError> {
139 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
140 let unlocked_inner = inner.read()?;
141
142 let operation = unlocked_inner
143 .pending_store
144 .get_operation(operation_id)?
145 .map(EngineOperation::from_pending);
146
147 Ok(operation)
148 }
149
150 pub fn get_pending_operations<R: RangeBounds<OperationId>>(
151 &self,
152 operations_range: R,
153 ) -> Result<Vec<EngineOperation>, EngineError> {
154 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
155 let unlocked_inner = inner.read()?;
156
157 let operations = unlocked_inner
158 .pending_store
159 .operations_iter(operations_range)?
160 .map(EngineOperation::from_pending)
161 .collect::<Vec<_>>();
162 Ok(operations)
163 }
164
165 pub fn get_operation(
166 &self,
167 operation_id: OperationId,
168 ) -> Result<Option<EngineOperation>, EngineError> {
169 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
170 let unlocked_inner = inner.read()?;
171
172 let pending_operation = unlocked_inner.pending_store.get_operation(operation_id)?;
174 let pending_operation = if let Some(pending_operation) = pending_operation {
175 if pending_operation.commit_status != CommitStatus::Unknown {
176 return Ok(Some(EngineOperation::from_pending(pending_operation)));
177 }
178
179 Some(pending_operation)
180 } else {
181 None
182 };
183
184 if let Some(block) = unlocked_inner
187 .chain_store
188 .get_block_by_operation_id(operation_id)?
189 {
190 if let Some(chain_operation) = EngineOperation::from_chain(block, operation_id)? {
191 return Ok(Some(chain_operation));
192 }
193 }
194
195 Ok(pending_operation.map(EngineOperation::from_pending))
198 }
199
200 pub fn take_events_stream(&mut self) -> Result<impl Stream<Item = Event>, EngineError> {
205 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
206 let mut unlocked_inner = inner.write()?;
207
208 let stream = unlocked_inner.get_new_events_stream(self.handle.id());
209
210 Ok(stream)
211 }
212}
213
214impl<CS, PS> Drop for EngineHandle<CS, PS>
215where
216 CS: chain::ChainStore,
217 PS: pending::PendingStore,
218{
219 fn drop(&mut self) {
220 debug!("Engine handle got dropped.");
221
222 if let Some(inner) = self.inner.upgrade() {
223 if let Ok(mut unlocked_inner) = inner.write() {
224 unlocked_inner.unregister_handle(self.handle.id());
225 }
226 }
227 }
228}
229
230impl<CS, PS> Clone for EngineHandle<CS, PS>
231where
232 CS: chain::ChainStore,
233 PS: pending::PendingStore,
234{
235 fn clone(&self) -> Self {
236 EngineHandle::new(self.inner.clone(), self.handle.clone())
237 }
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
242pub enum Event {
243 Started,
245
246 StreamDiscontinuity,
250
251 NewPendingOperation(OperationId),
253
254 NewChainBlock(BlockOffset),
256
257 ChainDiverged(BlockOffset),
261}
262
263pub struct EngineOperation {
265 pub operation_id: OperationId,
266 pub status: EngineOperationStatus,
267 pub operation_frame: Arc<super::operation::OperationFrame<Bytes>>,
268}
269
270impl EngineOperation {
271 fn from_pending(operation: pending::StoredOperation) -> EngineOperation {
272 let status = match operation.commit_status {
273 pending::CommitStatus::Committed(offset, height) => {
274 EngineOperationStatus::Committed(offset, height)
275 }
276 _ => EngineOperationStatus::Pending,
277 };
278
279 EngineOperation {
280 operation_id: operation.operation_id,
281 status,
282 operation_frame: operation.frame,
283 }
284 }
285
286 fn from_chain<B: Block>(
287 block: B,
288 operation_id: OperationId,
289 ) -> Result<Option<EngineOperation>, EngineError> {
290 if let Some(operation) = block.get_operation(operation_id)? {
291 let height = block.get_height()?;
292 return Ok(Some(EngineOperation {
293 operation_id,
294 status: EngineOperationStatus::Committed(block.offset(), height),
295 operation_frame: Arc::new(operation.to_owned()),
296 }));
297 }
298
299 Ok(None)
300 }
301}
302
303impl crate::operation::Operation for EngineOperation {
304 fn get_operation_reader(&self) -> Result<chain_operation::Reader, operation::Error> {
305 Ok(self.operation_frame.get_reader()?)
306 }
307}
308
309#[derive(Debug, PartialEq, Eq)]
310pub enum EngineOperationStatus {
311 Committed(BlockOffset, BlockHeight),
312 Pending,
313}
314
315impl EngineOperationStatus {
316 pub fn is_committed(&self) -> bool {
317 matches!(self, EngineOperationStatus::Committed(_offset, _height))
318 }
319}
320
321pub struct ChainOperationsIterator<CS, PS>
323where
324 CS: chain::ChainStore,
325 PS: pending::PendingStore,
326{
327 next_offset: BlockOffset,
328 current_operations: Vec<EngineOperation>,
329 inner: Weak<RwLock<Inner<CS, PS>>>,
330}
331
332impl<CS, PS> ChainOperationsIterator<CS, PS>
333where
334 CS: chain::ChainStore,
335 PS: pending::PendingStore,
336{
337 fn new(
338 inner: Weak<RwLock<Inner<CS, PS>>>,
339 from_offset: Option<BlockOffset>,
340 ) -> ChainOperationsIterator<CS, PS> {
341 ChainOperationsIterator {
342 next_offset: from_offset.unwrap_or(0),
343 current_operations: Vec::new(),
344 inner,
345 }
346 }
347
348 fn fetch_next_block(&mut self) -> Result<(), EngineError> {
349 let inner = self.inner.upgrade().ok_or(EngineError::InnerUpgrade)?;
350 let inner = inner.read()?;
351
352 while self.current_operations.is_empty() {
355 let block = inner.chain_store.get_block(self.next_offset)?;
356 let height = block.get_height()?;
357
358 for operation in block.operations_iter()? {
359 let operation_reader = operation.get_reader()?;
360 let operation_id = operation_reader.get_operation_id();
361
362 self.current_operations.push(EngineOperation {
363 operation_id,
364 status: EngineOperationStatus::Committed(block.offset, height),
365 operation_frame: Arc::new(operation.to_owned()),
366 });
367 }
368
369 self.current_operations.reverse();
371 self.next_offset = block.next_offset();
372 }
373
374 Ok(())
375 }
376}
377
378impl<CS, PS> Iterator for ChainOperationsIterator<CS, PS>
379where
380 CS: chain::ChainStore,
381 PS: pending::PendingStore,
382{
383 type Item = EngineOperation;
384
385 fn next(&mut self) -> Option<Self::Item> {
386 if self.current_operations.is_empty() {
387 if let Err(EngineError::ChainStore(chain::Error::OutOfBound(_))) =
388 self.fetch_next_block()
389 {
390 return None;
391 }
392 }
393
394 self.current_operations.pop()
395 }
396}