1#![deny(unused_must_use)]
2
3use std::io::Result;
4use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
5use async_trait::async_trait;
6use std::sync::Arc;
7
8pub mod transport;
9
10pub struct IOExecutor {
11 rx: UnboundedReceiver<Request>,
12}
13impl IOExecutor {
14 pub fn new() -> (Self, UnboundedSender<Request>) {
15 let (request_tx, request_rx) = mpsc::unbounded_channel();
16 let x = Self {
17 rx: request_rx,
18 };
19 (x, request_tx)
20 }
21 pub async fn run<E: StorageEngine>(mut self, engine: E) {
22 let engine = Arc::new(engine);
23 while let Some(req) = self.rx.recv().await {
24 let engine = Arc::clone(&engine);
25 let fut = async move {
26 let Request { inner, tx, context } = req;
27 match inner {
28 IORequestInner::Echo(n) => {
29 let resp = Response { inner: Ok(IOResponseInner::Echo(n)), context };
30 let _ = tx.send(resp);
31 },
32 IORequestInner::IORequest(req) => {
33 let resp_inner = engine.call(req).await;
34 let resp = Response { inner: resp_inner.map(|x| IOResponseInner::IOResponse(x)), context };
35 let _ = tx.send(resp);
36 }
37 }
38 };
39 tokio::spawn(fut);
40 }
41 }
42}
43enum IORequestInner {
44 IORequest(IORequest),
45 Echo(u32),
46}
/// A storage operation handed to a [`StorageEngine`] through [`StorageEngine::call`].
///
/// The executor forwards these values untouched; how each field is interpreted is decided by
/// the engine implementation.
pub enum IORequest {
    /// Write `payload` to storage.
    Write {
        /// Position of the write (presumably a byte offset; confirm with the engine).
        offset: u64,
        /// Declared size of the write. NOTE(review): presumably expected to match
        /// `payload.len()`; nothing in this file checks that.
        length: u32,
        /// Data to be written.
        payload: Vec<u8>,
        /// NOTE(review): looks like a "force unit access" flag (write must reach stable
        /// storage before completing); confirm against the engine implementation.
        fua: bool,
    },
    /// Read data back from storage.
    Read {
        /// Position of the read (presumably a byte offset; confirm with the engine).
        offset: u64,
        /// Amount of data requested (presumably in bytes; confirm with the engine).
        length: u32,
    },
    /// Flush request; carries no parameters.
    Flush,
}
70enum IOResponseInner {
71 IOResponse(IOResponse),
72 Echo(u32),
73}
/// Successful result of a storage operation.
pub enum IOResponse {
    /// The operation succeeded and has no data to return (presumably the reply to writes and
    /// flushes; confirm with the engine implementation).
    Ok,
    /// The operation succeeded and produced data.
    Read {
        /// Bytes returned by the engine.
        payload: Vec<u8>,
    },
}
80pub struct Request {
81 inner: IORequestInner,
82 tx: UnboundedSender<Response>,
83 context: Vec<u8>,
84}
85struct Response {
86 inner: Result<IOResponseInner>,
87 context: Vec<u8>,
88}
89
90#[async_trait]
91pub trait StorageEngine: Send + Sync + 'static {
92 async fn call(&self, req: IORequest) -> Result<IOResponse>;
93}