1use aleph_bft_types::{DataProvider as DataProviderT, FinalizationHandler as FinalizationHandlerT};
2use async_trait::async_trait;
3use codec::{Decode, Encode};
4use futures::{channel::mpsc::unbounded, future::pending, AsyncWrite};
5use log::error;
6use parking_lot::Mutex;
7use std::{
8 io::{self},
9 pin::Pin,
10 sync::Arc,
11 task::{self, Poll},
12};
13
14type Receiver<T> = futures::channel::mpsc::UnboundedReceiver<T>;
15type Sender<T> = futures::channel::mpsc::UnboundedSender<T>;
16
/// Payload type produced by the data providers in this file and consumed by
/// [`FinalizationHandler`]; a plain 32-bit unsigned integer.
pub type Data = u32;
18
/// Counter-based data source.
///
/// `counter` is the next value to hand out; `n_data`, when present, is the
/// bound compared against the counter in `get_data` to decide when the
/// provider is exhausted. With `n_data == None` there is no bound.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DataProvider {
    counter: usize,
    n_data: Option<usize>,
}

impl DataProvider {
    /// Creates a provider starting at `0` with no upper bound.
    pub fn new() -> Self {
        // The derived `Default` yields `counter: 0, n_data: None`.
        Self::default()
    }

    /// Creates a provider starting at `0` and bounded by `n_data`.
    pub fn new_finite(n_data: usize) -> Self {
        Self::new_range(0, n_data)
    }

    /// Creates a provider whose counter starts at `start` and is bounded by `end`.
    pub fn new_range(start: usize, end: usize) -> Self {
        let counter = start;
        let n_data = Some(end);
        Self { counter, n_data }
    }
}
46
47#[async_trait]
48impl DataProviderT for DataProvider {
49 type Output = Data;
50
51 async fn get_data(&mut self) -> Option<Data> {
52 let result = self.counter as u32;
53 self.counter += 1;
54 if let Some(n_data) = self.n_data {
55 if n_data < self.counter {
56 return None;
57 }
58 }
59 Some(result)
60 }
61}
62
63#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default, Decode, Encode)]
64pub struct StalledDataProvider {}
65
66impl StalledDataProvider {
67 pub fn new() -> Self {
68 Self {}
69 }
70}
71
72#[async_trait]
73impl DataProviderT for StalledDataProvider {
74 type Output = Data;
75
76 async fn get_data(&mut self) -> Option<Data> {
77 pending().await
78 }
79}
80
81#[derive(Clone, Debug)]
82pub struct FinalizationHandler {
83 tx: Sender<Data>,
84}
85
86impl FinalizationHandlerT<Data> for FinalizationHandler {
87 fn data_finalized(&mut self, data: Data) {
88 if let Err(e) = self.tx.unbounded_send(data) {
89 error!(target: "finalization-handler", "Error when sending data from FinalizationHandler {:?}.", e);
90 }
91 }
92}
93
94impl FinalizationHandler {
95 pub fn new() -> (Self, Receiver<Data>) {
96 let (tx, rx) = unbounded();
97
98 (Self { tx }, rx)
99 }
100}
101
102#[derive(Clone, Debug, Default)]
103pub struct Saver {
104 data: Arc<Mutex<Vec<u8>>>,
105}
106
107impl Saver {
108 pub fn new() -> Self {
109 Self {
110 data: Arc::new(Mutex::new(vec![])),
111 }
112 }
113}
114
115impl AsyncWrite for Saver {
116 fn poll_write(
117 self: Pin<&mut Self>,
118 _: &mut task::Context<'_>,
119 buf: &[u8],
120 ) -> Poll<io::Result<usize>> {
121 self.data.lock().extend_from_slice(buf);
122 Poll::Ready(Ok(buf.len()))
123 }
124
125 fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
126 Poll::Ready(Ok(()))
127 }
128
129 fn poll_close(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
130 Poll::Ready(Ok(()))
131 }
132}
133
134impl From<Arc<Mutex<Vec<u8>>>> for Saver {
135 fn from(data: Arc<Mutex<Vec<u8>>>) -> Self {
136 Self { data }
137 }
138}
139
140pub type Loader = futures::io::Cursor<Vec<u8>>;