userland_io/
lib.rs

1#![deny(unused_must_use)]
2
3use std::io::Result;
4use tokio::sync::mpsc::{self, UnboundedSender, UnboundedReceiver};
5use async_trait::async_trait;
6use std::sync::Arc;
7
8pub mod transport;
9
10pub struct IOExecutor {
11    rx: UnboundedReceiver<Request>,
12}
13impl IOExecutor {
14    pub fn new() -> (Self, UnboundedSender<Request>) {
15        let (request_tx, request_rx) = mpsc::unbounded_channel();
16        let x = Self {
17            rx: request_rx,
18        };
19        (x, request_tx)
20    }
21    pub async fn run<E: StorageEngine>(mut self, engine: E) {
22        let engine = Arc::new(engine);
23        while let Some(req) = self.rx.recv().await {
24            let engine = Arc::clone(&engine);
25            let fut = async move {
26                let Request { inner, tx, context } = req;
27                match inner {
28                    IORequestInner::Echo(n) => {
29                        let resp = Response { inner: Ok(IOResponseInner::Echo(n)), context };
30                        let _ = tx.send(resp);
31                    },
32                    IORequestInner::IORequest(req) => {
33                        let resp_inner = engine.call(req).await;
34                        let resp = Response { inner: resp_inner.map(|x| IOResponseInner::IOResponse(x)), context };
35                        let _ = tx.send(resp);
36                    }
37                }
38            };
39            tokio::spawn(fut);
40        }
41    }
42}
43enum IORequestInner {
44    IORequest(IORequest),
45    Echo(u32),
46}
47pub enum IORequest {
48    Write {
49        offset: u64,
50        length: u32,
51        payload: Vec<u8>,
52        fua: bool,
53    },
54    Read {
55        offset: u64,
56        length: u32,
57    },
58    Flush,
59    // below not used yet.
60    // Trim {
61    //     offset: u64,
62    //     length: u32,
63    // },
64    // WriteZeros {
65    //     offset: u64,
66    //     length: u32,
67    //     fua: bool,
68    // },
69}
70enum IOResponseInner {
71    IOResponse(IOResponse),
72    Echo(u32),
73}
74pub enum IOResponse {
75    Ok,
76    Read { 
77        payload: Vec<u8>,
78    },
79}
80pub struct Request {
81    inner: IORequestInner,
82    tx: UnboundedSender<Response>,
83    context: Vec<u8>,
84}
85struct Response {
86    inner: Result<IOResponseInner>,
87    context: Vec<u8>,
88}
89
90#[async_trait]
91pub trait StorageEngine: Send + Sync + 'static {
92    async fn call(&self, req: IORequest) -> Result<IOResponse>;
93}