1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
#![deny(unused_must_use)]

use std::io::Result;
use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
use async_trait::async_trait;
use std::sync::Arc;

pub mod transport;

/// Consumer side of the request channel.
///
/// Built by `IOExecutor::new`, which also hands out the matching
/// `UnboundedSender<Request>`; `IOExecutor::run` then drains the channel.
pub struct IOExecutor {
    /// Receiving half of the unbounded request channel created in `new`.
    rx: UnboundedReceiver<Request>,
}
impl IOExecutor {
    /// Creates an executor together with the sender used to submit requests.
    ///
    /// Both halves come from one freshly created unbounded channel: the
    /// executor keeps the receiver, the caller gets the sender.
    pub fn new() -> (Self, UnboundedSender<Request>) {
        let (sender, receiver) = mpsc::unbounded_channel();
        (Self { rx: receiver }, sender)
    }

    /// Serves incoming requests until `recv` on the request channel yields `None`.
    ///
    /// `engine` is wrapped in an `Arc` once; each received request is handled on
    /// its own `tokio::spawn`ed task holding a clone of that `Arc`. The spawned
    /// tasks are detached: `run` does not await their completion.
    ///
    /// * `Echo(n)` requests are answered with `Echo(n)` without calling the engine.
    /// * `IORequest` requests are passed to `StorageEngine::call`, and its
    ///   `Result` (success or error) is returned to the requester.
    ///
    /// The request's `context` is moved into the response unchanged.
    pub async fn run<E: StorageEngine>(mut self, engine: E) {
        let shared = Arc::new(engine);
        while let Some(request) = self.rx.recv().await {
            let engine = Arc::clone(&shared);
            tokio::spawn(async move {
                let Request { inner, tx, context } = request;
                let outcome = match inner {
                    IORequestInner::Echo(n) => Ok(IOResponseInner::Echo(n)),
                    IORequestInner::IORequest(io) => engine
                        .call(io)
                        .await
                        .map(IOResponseInner::IOResponse),
                };
                // The send result is ignored on purpose: a failure here is not
                // propagated anywhere, the response is simply dropped.
                let _ = tx.send(Response { inner: outcome, context });
            });
        }
    }
}
/// Payload of a `Request` as dispatched by `IOExecutor::run`.
enum IORequestInner {
    /// A storage request; `IOExecutor::run` forwards it to `StorageEngine::call`.
    IORequest(IORequest),
    /// Answered by `IOExecutor::run` with `IOResponseInner::Echo` carrying the
    /// same value; the storage engine is not called.
    Echo(u32),
}
/// A storage operation handed to `StorageEngine::call`.
///
/// NOTE(review): units of `offset`/`length` are not established in this file —
/// presumably bytes; confirm against the engine implementations.
pub enum IORequest {
    /// Write `payload` at `offset`.
    Write {
        offset: u64,
        length: u32,
        // NOTE(review): relationship between `length` and `payload.len()` is not
        // enforced here — presumably they are expected to match; confirm.
        payload: Vec<u8>,
        // Presumably "force unit access" (write must reach stable storage
        // before completion) — TODO confirm with the engine implementations.
        fua: bool,
    },
    /// Read `length` units starting at `offset`.
    Read {
        offset: u64,
        length: u32,
    },
    /// Flush request; carries no parameters.
    Flush,
    // The variants below are not used yet.
    // Trim {
    //     offset: u64,
    //     length: u32,
    // },
    // WriteZeros {
    //     offset: u64,
    //     length: u32,
    //     fua: bool,
    // },
}
/// Successful payload of a `Response`, mirroring the variants of `IORequestInner`.
enum IOResponseInner {
    /// Value returned by `StorageEngine::call` for an `IORequestInner::IORequest`.
    IOResponse(IOResponse),
    /// Reply to `IORequestInner::Echo`, carrying the same `u32` that was sent.
    Echo(u32),
}
/// Successful result of `StorageEngine::call`.
pub enum IOResponse {
    /// Success with no data attached.
    // Presumably the reply to `Write` and `Flush` — confirm with the engine implementations.
    Ok,
    /// Success carrying a byte payload.
    // Presumably the data read for `IORequest::Read` — confirm with the engine implementations.
    Read { 
        payload: Vec<u8>,
    },
}
/// A unit of work submitted to `IOExecutor` through its request channel.
///
/// All fields are private to this module.
pub struct Request {
    /// What to do: a storage request or an echo.
    inner: IORequestInner,
    /// Channel on which `IOExecutor::run` sends the `Response` for this request.
    tx: UnboundedSender<Response>,
    /// Opaque bytes that `IOExecutor::run` moves into the `Response` unchanged.
    context: Vec<u8>,
}
/// Reply sent by `IOExecutor::run` on a request's `tx` channel.
struct Response {
    /// Outcome of the request: the response payload, or the `std::io::Error`
    /// returned by `StorageEngine::call`.
    inner: Result<IOResponseInner>,
    /// The `context` bytes taken from the originating `Request`, unchanged.
    context: Vec<u8>,
}

/// Backend that executes `IORequest`s on behalf of `IOExecutor::run`.
///
/// `IOExecutor::run` places the engine in an `Arc` and calls it from
/// `tokio::spawn`ed tasks, hence the `Send + Sync + 'static` bounds and the
/// `&self` receiver.
#[async_trait]
pub trait StorageEngine: Send + Sync + 'static {
    /// Executes one request and returns its response, or a `std::io::Error`
    /// on failure.
    async fn call(&self, req: IORequest) -> Result<IOResponse>;
}