1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#![deny(unused_must_use)]
use std::io::Result;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use async_trait::async_trait;
use std::sync::Arc;
pub mod transport;
/// Consumer side of the request queue.
///
/// An executor is created with [`IOExecutor::new`], which also hands back the
/// sender used to enqueue [`Request`]s, and is driven by [`IOExecutor::run`].
pub struct IOExecutor {
    /// Receiving half of the unbounded channel created in [`IOExecutor::new`].
    rx: UnboundedReceiver<Request>,
}
impl IOExecutor {
    /// Builds an executor together with the sending half of its request queue.
    ///
    /// The returned sender is the only way to feed requests to the executor;
    /// it can be cloned to submit from several places.
    pub fn new() -> (Self, UnboundedSender<Request>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (Self { rx: receiver }, sender)
    }

    /// Drains the request queue, answering every request on its own Tokio task.
    ///
    /// `engine` is wrapped in an [`Arc`] so that each spawned task can hold a
    /// handle to it. The spawned tasks are detached (their join handles are
    /// dropped), and nothing in this loop enforces an ordering between them.
    ///
    /// The method returns once `recv` yields `None`, which for a Tokio
    /// unbounded channel means the channel is closed and fully drained.
    pub async fn run<E: StorageEngine>(mut self, engine: E) {
        let shared_engine = Arc::new(engine);
        while let Some(request) = self.rx.recv().await {
            tokio::spawn(Self::dispatch(Arc::clone(&shared_engine), request));
        }
    }

    /// Produces the response for a single request and sends it back on the
    /// request's own reply channel.
    ///
    /// Echo requests are answered directly with the same number; I/O requests
    /// are forwarded to `engine.call` and its result is passed through. The
    /// request's `context` bytes are returned unchanged in the response.
    async fn dispatch<E: StorageEngine>(engine: Arc<E>, request: Request) {
        let Request { inner, tx, context } = request;
        let outcome = match inner {
            IORequestInner::Echo(n) => Ok(IOResponseInner::Echo(n)),
            IORequestInner::IORequest(io) => {
                engine.call(io).await.map(IOResponseInner::IOResponse)
            }
        };
        // Best effort: a send error (receiver gone) is deliberately ignored.
        let _ = tx.send(Response { inner: outcome, context });
    }
}
/// Internal payload of a `Request`: either a storage operation or an echo probe.
enum IORequestInner {
    /// A storage operation that `IOExecutor::run` forwards to the engine.
    IORequest(IORequest),
    /// A number that `IOExecutor::run` sends straight back without touching the engine.
    Echo(u32),
}
/// A storage operation handed to [`StorageEngine::call`].
///
/// The common traits are derived so requests can be logged, duplicated and
/// compared (e.g. in tests of engine implementations).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IORequest {
    /// Write `payload` to the backing store.
    Write {
        /// Start position of the write (presumably a byte offset; confirm
        /// against the engine implementation).
        offset: u64,
        /// Declared length of the write. Nothing in this file checks it
        /// against `payload.len()`.
        length: u32,
        /// Data to be written.
        payload: Vec<u8>,
        /// Presumably "force unit access", i.e. the write should be durable
        /// before it is acknowledged. TODO confirm with the engine.
        fua: bool,
    },
    /// Read data from the backing store.
    Read {
        /// Start position of the read (presumably a byte offset; confirm
        /// against the engine implementation).
        offset: u64,
        /// Requested length of the read.
        length: u32,
    },
    /// Flush request; the exact durability semantics are defined by the engine.
    Flush,
}
/// Internal payload of a `Response`, mirroring the variants of `IORequestInner`.
enum IOResponseInner {
    /// Result produced by the storage engine for an I/O request.
    IOResponse(IOResponse),
    /// The number from an echo request, returned unchanged.
    Echo(u32),
}
/// Successful outcome of a [`StorageEngine::call`].
///
/// The common traits are derived so responses can be logged, duplicated and
/// compared (e.g. in tests of engine implementations).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IOResponse {
    /// The operation completed and carries no data.
    Ok,
    /// The operation completed and returned data.
    Read {
        /// Bytes returned by the engine.
        payload: Vec<u8>,
    },
}
/// One unit of work queued for an [`IOExecutor`].
///
/// All fields are private, so requests can only be built inside this module
/// tree.
pub struct Request {
    /// What is being asked: a storage operation or an echo.
    inner: IORequestInner,
    /// Reply channel on which the executor sends the matching response.
    tx: UnboundedSender<Response>,
    /// Opaque bytes that the executor copies unchanged into the response.
    context: Vec<u8>,
}
/// Reply that `IOExecutor::run` sends on a request's `tx` channel.
struct Response {
    /// Outcome of the request, or the I/O error reported by the engine.
    inner: Result<IOResponseInner>,
    /// The `context` bytes of the originating `Request`, returned unchanged.
    context: Vec<u8>,
}
/// Storage backend driven by [`IOExecutor::run`].
///
/// The `Send + Sync + 'static` bounds are what allow `run` to share one engine
/// behind an `Arc` across the Tokio tasks it spawns.
#[async_trait]
pub trait StorageEngine: Send + Sync + 'static {
    /// Executes a single storage operation.
    ///
    /// # Errors
    ///
    /// Returns a [`std::io::Error`] when the operation fails; `run` passes the
    /// error back to the requester unchanged.
    async fn call(&self, req: IORequest) -> Result<IOResponse>;
}