aleph_bft_mock/
dataio.rs

1use aleph_bft_types::{DataProvider as DataProviderT, FinalizationHandler as FinalizationHandlerT};
2use async_trait::async_trait;
3use codec::{Decode, Encode};
4use futures::{channel::mpsc::unbounded, future::pending, AsyncWrite};
5use log::error;
6use parking_lot::Mutex;
7use std::{
8    io::{self},
9    pin::Pin,
10    sync::Arc,
11    task::{self, Poll},
12};
13
14type Receiver<T> = futures::channel::mpsc::UnboundedReceiver<T>;
15type Sender<T> = futures::channel::mpsc::UnboundedSender<T>;
16
17pub type Data = u32;
18
19#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default)]
20pub struct DataProvider {
21    counter: usize,
22    n_data: Option<usize>,
23}
24
25impl DataProvider {
26    pub fn new() -> Self {
27        Self {
28            counter: 0,
29            n_data: None,
30        }
31    }
32
33    pub fn new_finite(n_data: usize) -> Self {
34        Self {
35            counter: 0,
36            n_data: Some(n_data),
37        }
38    }
39    pub fn new_range(start: usize, end: usize) -> Self {
40        Self {
41            counter: start,
42            n_data: Some(end),
43        }
44    }
45}
46
47#[async_trait]
48impl DataProviderT for DataProvider {
49    type Output = Data;
50
51    async fn get_data(&mut self) -> Option<Data> {
52        let result = self.counter as u32;
53        self.counter += 1;
54        if let Some(n_data) = self.n_data {
55            if n_data < self.counter {
56                return None;
57            }
58        }
59        Some(result)
60    }
61}
62
63#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Default, Decode, Encode)]
64pub struct StalledDataProvider {}
65
66impl StalledDataProvider {
67    pub fn new() -> Self {
68        Self {}
69    }
70}
71
72#[async_trait]
73impl DataProviderT for StalledDataProvider {
74    type Output = Data;
75
76    async fn get_data(&mut self) -> Option<Data> {
77        pending().await
78    }
79}
80
81#[derive(Clone, Debug)]
82pub struct FinalizationHandler {
83    tx: Sender<Data>,
84}
85
86impl FinalizationHandlerT<Data> for FinalizationHandler {
87    fn data_finalized(&mut self, data: Data) {
88        if let Err(e) = self.tx.unbounded_send(data) {
89            error!(target: "finalization-handler", "Error when sending data from FinalizationHandler {:?}.", e);
90        }
91    }
92}
93
94impl FinalizationHandler {
95    pub fn new() -> (Self, Receiver<Data>) {
96        let (tx, rx) = unbounded();
97
98        (Self { tx }, rx)
99    }
100}
101
102#[derive(Clone, Debug, Default)]
103pub struct Saver {
104    data: Arc<Mutex<Vec<u8>>>,
105}
106
107impl Saver {
108    pub fn new() -> Self {
109        Self {
110            data: Arc::new(Mutex::new(vec![])),
111        }
112    }
113}
114
115impl AsyncWrite for Saver {
116    fn poll_write(
117        self: Pin<&mut Self>,
118        _: &mut task::Context<'_>,
119        buf: &[u8],
120    ) -> Poll<io::Result<usize>> {
121        self.data.lock().extend_from_slice(buf);
122        Poll::Ready(Ok(buf.len()))
123    }
124
125    fn poll_flush(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
126        Poll::Ready(Ok(()))
127    }
128
129    fn poll_close(self: Pin<&mut Self>, _: &mut task::Context<'_>) -> Poll<io::Result<()>> {
130        Poll::Ready(Ok(()))
131    }
132}
133
134impl From<Arc<Mutex<Vec<u8>>>> for Saver {
135    fn from(data: Arc<Mutex<Vec<u8>>>) -> Self {
136        Self { data }
137    }
138}
139
140pub type Loader = futures::io::Cursor<Vec<u8>>;