rasi_mio/
fs.rs

1use std::{
2    fs::OpenOptions,
3    io::{Read, Seek, Write},
4    sync::Mutex,
5    task::Poll,
6};
7
8use rasi::fs::{register_fs_driver, FileOpenMode};
9
10use crate::utils::ready;
11
12pub struct MioFileSystemDriver;
13
14impl rasi::fs::syscall::Driver for MioFileSystemDriver {
15    fn open_file(
16        &self,
17        path: &std::path::Path,
18        open_mode: rasi::fs::FileOpenMode,
19    ) -> std::io::Result<rasi::fs::File> {
20        let mut ops = OpenOptions::new();
21
22        if open_mode.contains(FileOpenMode::Create) {
23            ops.create(true);
24        }
25
26        if open_mode.contains(FileOpenMode::CreateNew) {
27            ops.create_new(true);
28        }
29
30        if open_mode.contains(FileOpenMode::Append) {
31            ops.append(true);
32        }
33
34        if open_mode.contains(FileOpenMode::Readable) {
35            ops.read(true);
36        }
37
38        if open_mode.contains(FileOpenMode::Truncate) {
39            ops.truncate(true);
40        }
41
42        if open_mode.contains(FileOpenMode::Writable) {
43            ops.write(true);
44        }
45
46        let file = ops.open(path)?;
47
48        Ok(MioFile(file).into())
49    }
50
51    fn canonicalize(&self, path: &std::path::Path) -> std::io::Result<std::path::PathBuf> {
52        path.canonicalize()
53    }
54
55    fn poll_copy(
56        &self,
57        _cx: &mut std::task::Context<'_>,
58        from: &std::path::Path,
59        to: &std::path::Path,
60    ) -> std::task::Poll<std::io::Result<u64>> {
61        ready(|| std::fs::copy(from, to))
62    }
63
64    fn poll_create_dir(
65        &self,
66        _cx: &mut std::task::Context<'_>,
67        path: &std::path::Path,
68    ) -> std::task::Poll<std::io::Result<()>> {
69        ready(|| std::fs::create_dir(path))
70    }
71
72    fn poll_create_dir_all(
73        &self,
74        _cx: &mut std::task::Context<'_>,
75        path: &std::path::Path,
76    ) -> std::task::Poll<std::io::Result<()>> {
77        ready(|| std::fs::create_dir_all(path))
78    }
79
80    fn poll_hard_link(
81        &self,
82        _cx: &mut std::task::Context<'_>,
83        from: &std::path::Path,
84        to: &std::path::Path,
85    ) -> std::task::Poll<std::io::Result<()>> {
86        ready(|| std::fs::hard_link(from, to))
87    }
88
89    fn poll_metadata(
90        &self,
91        _cx: &mut std::task::Context<'_>,
92        path: &std::path::Path,
93    ) -> std::task::Poll<std::io::Result<std::fs::Metadata>> {
94        ready(|| std::fs::metadata(path))
95    }
96
97    fn poll_read_link(
98        &self,
99        _cx: &mut std::task::Context<'_>,
100        path: &std::path::Path,
101    ) -> std::task::Poll<std::io::Result<std::path::PathBuf>> {
102        ready(|| std::fs::read_link(path))
103    }
104
105    fn poll_remove_dir(
106        &self,
107        _cx: &mut std::task::Context<'_>,
108        path: &std::path::Path,
109    ) -> std::task::Poll<std::io::Result<()>> {
110        ready(|| std::fs::remove_dir(path))
111    }
112
113    fn poll_remove_dir_all(
114        &self,
115        _cx: &mut std::task::Context<'_>,
116        path: &std::path::Path,
117    ) -> std::task::Poll<std::io::Result<()>> {
118        ready(|| std::fs::remove_dir_all(path))
119    }
120
121    fn poll_remove_file(
122        &self,
123        _cx: &mut std::task::Context<'_>,
124        path: &std::path::Path,
125    ) -> std::task::Poll<std::io::Result<()>> {
126        ready(|| std::fs::remove_file(path))
127    }
128
129    fn poll_rename(
130        &self,
131        _cx: &mut std::task::Context<'_>,
132        from: &std::path::Path,
133        to: &std::path::Path,
134    ) -> std::task::Poll<std::io::Result<()>> {
135        ready(|| std::fs::rename(from, to))
136    }
137
138    fn poll_set_permissions(
139        &self,
140        _cx: &mut std::task::Context<'_>,
141        path: &std::path::Path,
142        perm: &std::fs::Permissions,
143    ) -> std::task::Poll<std::io::Result<()>> {
144        ready(|| std::fs::set_permissions(path, perm.clone()))
145    }
146
147    fn poll_symlink_metadata(
148        &self,
149        _cx: &mut std::task::Context<'_>,
150        path: &std::path::Path,
151    ) -> std::task::Poll<std::io::Result<std::fs::Metadata>> {
152        ready(|| std::fs::symlink_metadata(path))
153    }
154
155    fn read_dir(&self, path: &std::path::Path) -> std::io::Result<rasi::fs::ReadDir> {
156        Ok(MioReadDir(Mutex::new(std::fs::read_dir(path)?)).into())
157    }
158
159    #[cfg(windows)]
160    /// Opens the named pipe identified by `addr`.
161    fn named_pipe_client_open(
162        &self,
163        addr: &std::ffi::OsStr,
164    ) -> std::io::Result<rasi::fs::windows::NamedPipeStream> {
165        use std::{os::windows::io::FromRawHandle, ptr::null};
166
167        use mio::{Interest, Token};
168        use windows_sys::Win32::{
169            Foundation::{GENERIC_READ, GENERIC_WRITE, INVALID_HANDLE_VALUE},
170            Storage::FileSystem::{CreateFileW, FILE_FLAG_OVERLAPPED, OPEN_EXISTING},
171        };
172
173        use crate::{net::MioSocket, reactor::global_reactor, token::TokenSequence};
174
175        let addr = windows::encode_addr(addr);
176
177        let desired_access = GENERIC_READ | GENERIC_WRITE;
178
179        let flag = FILE_FLAG_OVERLAPPED;
180
181        unsafe {
182            let handle = CreateFileW(
183                addr.as_ptr(),
184                desired_access,
185                0,
186                null(),
187                OPEN_EXISTING,
188                flag,
189                std::ptr::null_mut(),
190            );
191
192            if handle == INVALID_HANDLE_VALUE {
193                return Err(std::io::Error::last_os_error());
194            }
195
196            let mut socket = mio::windows::NamedPipe::from_raw_handle(handle as _);
197
198            let token = Token::next();
199
200            global_reactor().register(
201                &mut socket,
202                token,
203                Interest::READABLE.add(Interest::WRITABLE),
204            )?;
205
206            Ok(windows::MioNamedPipeStream(MioSocket { token, socket }, false).into())
207        }
208    }
209
210    #[cfg(windows)]
211    /// Creates the named pipe identified by `addr` for use as a server.
212    ///
213    /// This uses the [`CreateNamedPipe`] function.
214    fn named_pipe_server_create(
215        &self,
216        addr: &std::ffi::OsStr,
217    ) -> std::io::Result<rasi::fs::windows::NamedPipeListener> {
218        Ok(windows::MioNamedPipeListener::new(addr).into())
219    }
220}
221
222#[cfg(windows)]
223mod windows {
224    use std::{
225        ffi::OsStr,
226        io::{Error, Read, Write},
227        os::windows::{ffi::OsStrExt, io::FromRawHandle},
228        ptr::null,
229        sync::Mutex,
230        task::Poll,
231    };
232
233    use mio::{Interest, Token};
234    use windows_sys::Win32::{
235        Foundation::INVALID_HANDLE_VALUE,
236        Storage::FileSystem::{FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX},
237        System::Pipes::{
238            CreateNamedPipeW, PIPE_REJECT_REMOTE_CLIENTS, PIPE_TYPE_BYTE, PIPE_UNLIMITED_INSTANCES,
239        },
240    };
241
242    use crate::{
243        net::MioSocket, reactor::global_reactor, token::TokenSequence, utils::would_block,
244    };
245
246    pub fn encode_addr(addr: &OsStr) -> Box<[u16]> {
247        let len = addr.encode_wide().count();
248        let mut vec = Vec::with_capacity(len + 1);
249        vec.extend(addr.encode_wide());
250        vec.push(0);
251        vec.into_boxed_slice()
252    }
253
254    pub struct MioNamedPipeListener {
255        addr: Box<[u16]>,
256        buffer_size: u32,
257        next: Mutex<Option<MioSocket<mio::windows::NamedPipe>>>,
258    }
259
260    impl MioNamedPipeListener {
261        pub fn new(addr: &std::ffi::OsStr) -> Self {
262            MioNamedPipeListener {
263                addr: encode_addr(addr),
264                buffer_size: 512,
265                next: Default::default(),
266            }
267        }
268
269        fn create_stream(&self) -> std::io::Result<()> {
270            let mut next = self.next.lock().unwrap();
271
272            if next.is_some() {
273                return Ok(());
274            }
275
276            let pipe_mode = PIPE_TYPE_BYTE | PIPE_REJECT_REMOTE_CLIENTS;
277
278            let open_mode = FILE_FLAG_OVERLAPPED | PIPE_ACCESS_DUPLEX;
279
280            unsafe {
281                let handle = CreateNamedPipeW(
282                    self.addr.as_ptr(),
283                    open_mode,
284                    pipe_mode,
285                    PIPE_UNLIMITED_INSTANCES,
286                    self.buffer_size,
287                    self.buffer_size,
288                    0,
289                    null(),
290                );
291
292                if handle == INVALID_HANDLE_VALUE {
293                    return Err(Error::last_os_error());
294                }
295
296                let mut socket = mio::windows::NamedPipe::from_raw_handle(handle as _);
297
298                let token = Token::next();
299
300                global_reactor().register(
301                    &mut socket,
302                    token,
303                    Interest::READABLE.add(Interest::WRITABLE),
304                )?;
305
306                *next = Some(MioSocket { token, socket });
307
308                return Ok(());
309            }
310        }
311    }
312
313    impl rasi::fs::syscall::windows::DriverNamedPipeListener for MioNamedPipeListener {
314        fn poll_ready(
315            &self,
316            _cx: &mut std::task::Context<'_>,
317        ) -> std::task::Poll<std::io::Result<()>> {
318            Poll::Ready(Ok(()))
319        }
320
321        fn poll_next(
322            &self,
323            cx: &mut std::task::Context<'_>,
324        ) -> std::task::Poll<std::io::Result<rasi::fs::windows::NamedPipeStream>> {
325            self.create_stream()?;
326
327            let stream = self.next.lock().unwrap().take().unwrap();
328
329            global_reactor().once(stream.token, Interest::WRITABLE, cx.waker().clone());
330
331            loop {
332                match stream.connect() {
333                    Ok(_) => {
334                        return {
335                            global_reactor().remove_listeners(stream.token, Interest::WRITABLE);
336
337                            Poll::Ready(Ok(MioNamedPipeStream(stream, true).into()))
338                        }
339                    }
340                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
341                        *self.next.lock().unwrap() = Some(stream);
342                        return Poll::Pending;
343                    }
344                    Err(err) if err.kind() == std::io::ErrorKind::Interrupted => {
345                        continue;
346                    }
347                    Err(err) => {
348                        global_reactor().remove_listeners(stream.token, Interest::WRITABLE);
349                        return Poll::Ready(Err(err));
350                    }
351                }
352            }
353        }
354    }
355
356    pub struct MioNamedPipeStream(pub MioSocket<mio::windows::NamedPipe>, pub bool);
357
358    impl rasi::fs::syscall::windows::DriverNamedPipeStream for MioNamedPipeStream {
359        fn poll_ready(
360            &self,
361            _cx: &mut std::task::Context<'_>,
362        ) -> std::task::Poll<std::io::Result<()>> {
363            Poll::Ready(Ok(()))
364        }
365
366        fn poll_write(
367            &self,
368            cx: &mut std::task::Context<'_>,
369            buf: &[u8],
370        ) -> std::task::Poll<std::io::Result<usize>> {
371            would_block(self.0.token, cx.waker().clone(), Interest::WRITABLE, || {
372                (&self.0.socket).write(buf)
373            })
374        }
375
376        fn poll_read(
377            &self,
378            cx: &mut std::task::Context<'_>,
379            buf: &mut [u8],
380        ) -> std::task::Poll<std::io::Result<usize>> {
381            would_block(self.0.token, cx.waker().clone(), Interest::READABLE, || {
382                (&self.0.socket).read(buf)
383            })
384        }
385
386        fn poll_close(
387            &self,
388            _cx: &mut std::task::Context<'_>,
389        ) -> std::task::Poll<std::io::Result<()>> {
390            if self.1 {
391                self.0.socket.disconnect()?;
392            }
393
394            Poll::Ready(Ok(()))
395        }
396    }
397}
398
399struct MioFile(std::fs::File);
400
401impl rasi::fs::syscall::DriverFile for MioFile {
402    fn poll_ready(&self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<std::io::Result<()>> {
403        Poll::Ready(Ok(()))
404    }
405
406    fn poll_write(
407        &self,
408        _cx: &mut std::task::Context<'_>,
409        buf: &[u8],
410    ) -> std::task::Poll<std::io::Result<usize>> {
411        ready(|| (&self.0).write(buf))
412    }
413
414    fn poll_read(
415        &self,
416        _cx: &mut std::task::Context<'_>,
417        buf: &mut [u8],
418    ) -> std::task::Poll<std::io::Result<usize>> {
419        ready(|| (&self.0).read(buf))
420    }
421
422    fn poll_flush(&self, _cx: &mut std::task::Context<'_>) -> std::task::Poll<std::io::Result<()>> {
423        ready(|| (&self.0).flush())
424    }
425
426    fn poll_seek(
427        &self,
428        _cx: &mut std::task::Context<'_>,
429        pos: std::io::SeekFrom,
430    ) -> std::task::Poll<std::io::Result<u64>> {
431        ready(|| (&self.0).seek(pos))
432    }
433
434    fn poll_meta(
435        &self,
436        _cx: &mut std::task::Context<'_>,
437    ) -> std::task::Poll<std::io::Result<std::fs::Metadata>> {
438        ready(|| (&self.0).metadata())
439    }
440
441    fn poll_set_permissions(
442        &self,
443        _cx: &mut std::task::Context<'_>,
444        perm: &std::fs::Permissions,
445    ) -> std::task::Poll<std::io::Result<()>> {
446        ready(|| (&self.0).set_permissions(perm.clone()))
447    }
448
449    fn poll_set_len(
450        &self,
451        _cx: &mut std::task::Context<'_>,
452        size: u64,
453    ) -> std::task::Poll<std::io::Result<()>> {
454        ready(|| (&self.0).set_len(size))
455    }
456}
457
458struct MioReadDir(Mutex<std::fs::ReadDir>);
459
460impl rasi::fs::syscall::DriverReadDir for MioReadDir {
461    fn poll_ready(&self, _cx: &mut std::task::Context<'_>) -> Poll<std::io::Result<()>> {
462        Poll::Ready(Ok(()))
463    }
464
465    fn poll_next(
466        &self,
467        _cx: &mut std::task::Context<'_>,
468    ) -> Poll<Option<std::io::Result<rasi::fs::DirEntry>>> {
469        ready(|| {
470            self.0
471                .lock()
472                .unwrap()
473                .next()
474                .map(|r| r.map(|e| MioDirEntry(e).into()))
475        })
476    }
477}
478
479struct MioDirEntry(std::fs::DirEntry);
480
481impl rasi::fs::syscall::DriverDirEntry for MioDirEntry {
482    fn name(&self) -> String {
483        self.0.file_name().to_string_lossy().into_owned()
484    }
485
486    fn path(&self) -> std::path::PathBuf {
487        self.0.path()
488    }
489
490    fn meta(&self) -> std::io::Result<std::fs::Metadata> {
491        self.0.metadata()
492    }
493
494    fn file_type(&self) -> std::io::Result<std::fs::FileType> {
495        self.0.file_type()
496    }
497}
498
499/// This function using [`register_fs_driver`] to register the `MioFileSystemDriver` to global registry.
500///
501/// So you may not call this function twice, otherwise will cause a panic. [`read more`](`register_fs_driver`)
502pub fn register_mio_filesystem() {
503    register_fs_driver(MioFileSystemDriver)
504}
505
506#[cfg(test)]
507mod tests {
508
509    use rasi_spec::fs::run_fs_spec;
510
511    use super::*;
512
513    #[futures_test::test]
514    async fn test_mio_fs() {
515        static DRIVER: MioFileSystemDriver = MioFileSystemDriver;
516
517        run_fs_spec(&DRIVER).await;
518
519        #[cfg(windows)]
520        rasi_spec::ipc::run_ipc_spec(&DRIVER).await;
521    }
522}