mfio_rt/native/
mod.rs

1//! Native async runtime
2//!
3//! This module provides native OS-backed runtime/backend implementations. To start, please access
4//! [`NativeRt`] or [`NativeRtBuilder`] types.
5
6use core::future::{ready, Future, Ready};
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures::Stream;
10use mfio::backend::*;
11use mfio::error::{Code, Error, Location, Result as MfioResult, State, Subject};
12use mfio::io::{BoundPacketView, NoPos, PacketIo, Read, Write};
13use mfio::mferr;
14use mfio::stdeq::Seekable;
15use mfio::tarc::BaseArc;
16use std::fs;
17use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
18use std::path::{Path, PathBuf};
19
20use crate::util::from_io_error;
21use crate::{
22    DirEntry, DirHandle, DirOp, Fs, Metadata, OpenOptions, Shutdown, Tcp, TcpListenerHandle,
23    TcpStreamHandle,
24};
25
26#[cfg(test)]
27use crate::{net_test_suite, test_suite};
28
29mod impls;
30
31macro_rules! fs_dispatch {
32    ($($(#[cfg($meta:meta)])* $name:ident => $mod:ident),*$(,)?) => {
33
34        pub enum NativeRtInstance {
35            $($(#[cfg($meta)] #[cfg_attr(docsrs, doc(cfg($meta)))])* $name(impls::$mod::Runtime)),*
36        }
37
38        impl NativeRtInstance {
39            fn register_file(&self, file: std::fs::File) -> NativeFile {
40                match self {
41                    $($(#[cfg($meta)])* Self::$name(v) => NativeFile::$name(v.register_file(file))),*
42                }
43            }
44
45            /// Registers a non-seekable I/O stream
46            ///
47            /// TODO: this perhaps should be private. We can't expose register_file publicly,
48            /// because on iocp, files need to be opened with appropriate (unchangeable!) flags.
49            /// Perhaps we should mirror this with streams.
50            pub fn register_stream(&self, stream: TcpStream) -> NativeTcpStream {
51                match self {
52                    $($(#[cfg($meta)])* Self::$name(v) => NativeTcpStream::$name(v.register_stream(stream))),*
53                }
54            }
55
56            fn get_map_options(&self) -> fn(fs::OpenOptions) -> fs::OpenOptions {
57                match self {
58                    $($(#[cfg($meta)])* Self::$name(_) => impls::$mod::map_options),*
59                }
60            }
61
62            pub fn cancel_all_ops(&self) {
63                match self {
64                    $($(#[cfg($meta)])* Self::$name(v) => v.cancel_all_ops()),*
65                }
66            }
67        }
68
69        impl core::fmt::Debug for NativeRtInstance {
70            fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
71                match self {
72                    $($(#[cfg($meta)])* Self::$name(_) => write!(f, stringify!(NativeRt::$name))),*
73                }
74            }
75        }
76
77        impl IoBackend for NativeRtInstance {
78            type Backend = DynBackend;
79
80            fn polling_handle(&self) -> Option<PollingHandle> {
81                match self {
82                    $($(#[cfg($meta)])* Self::$name(v) => v.polling_handle()),*
83                }
84            }
85
86            fn get_backend(&self) -> BackendHandle<Self::Backend> {
87                match self {
88                    $($(#[cfg($meta)])* Self::$name(v) => v.get_backend()),*
89                }
90            }
91        }
92
93        /// Builder for the [`NativeRt`](NativeRt).
94        ///
95        /// This builder allows configuring the I/O backends to try to construct for the filesystem
96        /// handle. Note that the order of backends is fixed, and is as follows:
97        ///
98        $(#[cfg_attr(all($($meta)*), doc = concat!("- ", stringify!($mod)))])*
99        ///
100        /// The full order (including unsupported/disabled backends) is as follows:
101        ///
102        $(#[doc = concat!("- `", stringify!($mod), "`")])*
103        ///
104        /// If you wish to customize the construction order, please use multiple builders.
105        #[derive(Default)]
106        pub struct NativeRtBuilder {
107            $($(#[cfg($meta)])* $mod: bool),*
108        }
109
110        impl NativeRtBuilder {
111            /// Get a `NativeRtBuilder` with all backends enabled.
112            pub fn all_backends() -> Self {
113                Self {
114                    $($(#[cfg($meta)])* $mod: true),*
115                }
116            }
117
118            /// Get a `NativeRtBuilder` with backends specified by environment.
119            ///
120            /// This function attempts to parse `MFIO_FS_BACKENDS` environment variable and load
121            /// backends specified by it. If the environment variable is not present, or
122            /// non-unicode, this function falls back to using
123            /// [`all_backends`](NativeRtBuilder::all_backends).
124            pub fn env_backends() -> Self {
125                match std::env::var("MFIO_FS_BACKENDS") {
126                    Ok(val) => {
127                        let vals = val.split(',').collect::<Vec<_>>();
128                        Self {
129                            $($(#[cfg($meta)])* $mod: vals.contains(&stringify!($mod))),*
130                        }
131                    }
132                    Err(_) => {
133                        Self::all_backends()
134                    }
135                }
136            }
137
138            pub fn enable_all(self) -> Self {
139                let _ = self;
140                Self::all_backends()
141            }
142
143            $($(#[cfg($meta)] #[cfg_attr(docsrs, doc(cfg($meta)))])*
144            #[doc = concat!("Enables the ", stringify!($mod), " backend.")]
145            pub fn $mod(self, $mod: bool) -> Self {
146                Self {
147                    $mod,
148                    ..self
149                }
150            })*
151
152            pub fn build(self) -> mfio::error::Result<NativeRt> {
153                $($(#[cfg($meta)])* if self.$mod {
154                    if let Ok(v) = impls::$mod::Runtime::try_new() {
155                        return Ok(NativeRtInstance::$name(v).into());
156                    }
157                })*
158
159                Err(Error {
160                    code: Code::from_http(501).unwrap(),
161                    subject: Subject::Backend,
162                    state: State::Unsupported,
163                    location: Location::Filesystem,
164                })
165            }
166
167            pub fn build_each(self) -> Vec<(&'static str, mfio::error::Result<NativeRt>)> {
168                let mut ret = vec![];
169
170                $($(#[cfg($meta)])* if self.$mod {
171                    ret.push((
172                        stringify!($mod),
173                        impls::$mod::Runtime::try_new()
174                            .map_err(|e| e.into())
175                            .map(|v| NativeRtInstance::$name(v).into())
176                    ));
177                })*
178
179                ret
180            }
181        }
182
183        pub enum NativeFile {
184            $($(#[cfg($meta)])* $name(impls::$mod::FileWrapper)),*
185        }
186
187        impl PacketIo<Write, u64> for NativeFile {
188            fn send_io(&self, param: u64, view: BoundPacketView<Write>) {
189                match self {
190                    $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
191                }
192            }
193        }
194
195        impl PacketIo<Read, u64> for NativeFile {
196            fn send_io(&self, param: u64, view: BoundPacketView<Read>) {
197                match self {
198                    $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
199                }
200            }
201        }
202
203        pub enum NativeTcpStream {
204            $($(#[cfg($meta)])* $name(impls::$mod::TcpStream)),*
205        }
206
207        impl Drop for NativeTcpStream {
208            fn drop(&mut self) {
209                log::trace!("Drop stream");
210            }
211        }
212
213        impl PacketIo<Write, NoPos> for NativeTcpStream {
214            fn send_io(&self, param: NoPos, view: BoundPacketView<Write>) {
215                match self {
216                    $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
217                }
218            }
219        }
220
221        impl PacketIo<Read, NoPos> for NativeTcpStream {
222            fn send_io(&self, param: NoPos, view: BoundPacketView<Read>) {
223                match self {
224                    $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
225                }
226            }
227        }
228
229        impl TcpStreamHandle for NativeTcpStream {
230            fn local_addr(&self) -> MfioResult<SocketAddr> {
231                match self {
232                    $($(#[cfg($meta)])* Self::$name(v) => v.local_addr()),*
233                }
234            }
235
236            fn peer_addr(&self) -> MfioResult<SocketAddr> {
237                match self {
238                    $($(#[cfg($meta)])* Self::$name(v) => v.peer_addr()),*
239                }
240            }
241
242            fn shutdown(&self, how: Shutdown) -> MfioResult<()> {
243                match self {
244                    $($(#[cfg($meta)])* Self::$name(v) => v.shutdown(how)),*
245                }
246            }
247        }
248
249        pub enum NativeTcpConnectFuture<'a, A: ToSocketAddrs + 'a> {
250            $($(#[cfg($meta)])* $name(impls::$mod::TcpConnectFuture<'a, A>)),*
251        }
252
253        impl<'a, A: ToSocketAddrs + Send> Future for NativeTcpConnectFuture<'a, A> {
254            type Output = MfioResult<NativeTcpStream>;
255
256            fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
257                // SAFETY: we are not moving the inner value
258                let this = unsafe { self.get_unchecked_mut() };
259                match this {
260                    $($(#[cfg($meta)])* Self::$name(v) => {
261                        if let Poll::Ready(v) = unsafe { Pin::new_unchecked(v).poll(cx) } {
262                            Poll::Ready(v.map(NativeTcpStream::$name))
263                        } else {
264                            Poll::Pending
265                        }
266                    }),*
267                }
268            }
269        }
270
271        impl Tcp for NativeRtInstance {
272            type StreamHandle = NativeTcpStream;
273            type ListenerHandle = NativeTcpListener;
274            type ConnectFuture<'a, A: ToSocketAddrs + Send + 'a> = NativeTcpConnectFuture<'a, A>;
275            type BindFuture<'a, A: ToSocketAddrs + Send + 'a> = core::future::Ready<MfioResult<NativeTcpListener>>;
276
277            fn connect<'a, A: ToSocketAddrs + Send + 'a>(
278                &'a self,
279                addrs: A,
280            ) -> Self::ConnectFuture<'a, A> {
281                match self {
282                    $($(#[cfg($meta)])* Self::$name(v) => NativeTcpConnectFuture::$name(v.tcp_connect(addrs))),*
283                }
284            }
285
286            fn bind<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::BindFuture<'a, A> {
287                let listener = std::net::TcpListener::bind(addrs);
288                core::future::ready(
289                    listener.map(|l| match self {
290                        $($(#[cfg($meta)])* Self::$name(v) => NativeTcpListener::$name(v.register_listener(l))),*
291                    }).map_err(from_io_error)
292                )
293            }
294        }
295
296        pub enum NativeTcpListener {
297            $($(#[cfg($meta)])* $name(impls::$mod::TcpListener)),*
298        }
299
300        impl Stream for NativeTcpListener {
301            type Item = (NativeTcpStream, SocketAddr);
302
303            fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
304                let this = unsafe { self.get_unchecked_mut() };
305                match this {
306                    $($(#[cfg($meta)])* Self::$name(v) => {
307                        if let Poll::Ready(v) = unsafe { Pin::new_unchecked(v).poll_next(cx) } {
308                            Poll::Ready(v.map(|(a, b)| (NativeTcpStream::$name(a), b)))
309                        } else {
310                            Poll::Pending
311                        }
312                    }),*
313                }
314            }
315        }
316
317        impl TcpListenerHandle for NativeTcpListener {
318            type StreamHandle = NativeTcpStream;
319
320            fn local_addr(&self) -> MfioResult<SocketAddr> {
321                match self {
322                    $($(#[cfg($meta)])* Self::$name(v) => v.local_addr()),*
323                }
324            }
325        }
326    }
327}
328
329fs_dispatch! {
330    #[cfg(all(not(miri), target_os = "linux", feature = "io-uring"))]
331    IoUring => io_uring,
332    #[cfg(all(not(miri), target_os = "windows", feature = "iocp"))]
333    Iocp => iocp,
334    #[cfg(all(not(miri), unix, feature = "mio"))]
335    Mio => mio,
336    Default => thread,
337}
338
339const _: () = {
340    const fn verify_send<T: Send>() {}
341    const fn verify_sync<T: Sync>() {}
342
343    verify_send::<NativeRtInstance>();
344    verify_send::<NativeFile>();
345    verify_send::<NativeTcpStream>();
346    verify_send::<NativeTcpListener>();
347};
348
349/// Native OS backed runtime
350///
351/// # Examples
352///
353/// Read a file:
354/// ```
355/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
356/// use mfio::io::*;
357/// use mfio::stdeq::*;
358/// use mfio_rt::*;
359/// use std::fs::write;
360/// use std::path::Path;
361///
362/// let test_string = "Test test 42";
363/// let mut filepath = std::env::temp_dir();
364/// filepath.push("mfio-fs-test-read");
365///
366/// // Create a test file:
367/// write(&filepath, test_string.as_bytes())?;
368///
369/// // Create mfio's filesystem
370/// NativeRt::default()
371///     .run(|fs| async move {
372///         let fh = fs.open(&filepath, OpenOptions::new().read(true)).await?;
373///
374///         let mut output = vec![];
375///         fh.read_to_end(&mut output).await?;
376///
377///         assert_eq!(test_string.len(), fh.get_pos() as usize);
378///         assert_eq!(test_string.as_bytes(), output);
379///         mfio::error::Result::Ok(())
380///     })
381///     .unwrap();
382///
383/// # Ok(())
384/// # }
385/// ```
386///
387/// Write a file:
388/// ```
389/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
390/// # pollster::block_on(async move {
391/// use mfio::io::*;
392/// use mfio::stdeq::*;
393/// use mfio_rt::*;
394/// use std::io::Seek;
395/// use std::path::Path;
396///
397/// let mut test_data = vec![];
398///
399/// for i in 0u8..128 {
400///     test_data.extend(i.to_ne_bytes());
401/// }
402///
403/// let mut filepath = std::env::temp_dir();
404/// filepath.push("mfio-fs-test-write");
405///
406/// // Create mfio's filesystem
407/// NativeRt::default()
408///     .run(|fs| async move {
409///         let mut fh = fs
410///             .open(
411///                 &filepath,
412///                 OpenOptions::new()
413///                     .read(true)
414///                     .write(true)
415///                     .create(true)
416///                     .truncate(true),
417///             )
418///             .await?;
419///
420///         fh.write(&test_data).await?;
421///
422///         assert_eq!(test_data.len(), fh.get_pos() as usize);
423///
424///         fh.rewind();
425///
426///         // Read the data back out
427///         let mut output = vec![];
428///         fh.read_to_end(&mut output).await?;
429///
430///         assert_eq!(test_data.len(), fh.get_pos() as usize);
431///         assert_eq!(test_data, output);
432///         mfio::error::Result::Ok(())
433///     })
434///     .unwrap();
435/// # Ok(())
436/// # })
437/// # }
438/// ```
439#[derive(Debug)]
440pub struct NativeRt {
441    cwd: NativeRtDir,
442}
443
444impl IoBackend for NativeRt {
445    type Backend = <NativeRtInstance as IoBackend>::Backend;
446
447    fn polling_handle(&self) -> Option<PollingHandle> {
448        self.cwd.instance.polling_handle()
449    }
450
451    fn get_backend(&self) -> BackendHandle<Self::Backend> {
452        self.cwd.instance.get_backend()
453    }
454}
455
456impl Tcp for NativeRt {
457    type StreamHandle = <NativeRtInstance as Tcp>::StreamHandle;
458    type ListenerHandle = <NativeRtInstance as Tcp>::ListenerHandle;
459    type ConnectFuture<'a, A: ToSocketAddrs + Send + 'a> =
460        <NativeRtInstance as Tcp>::ConnectFuture<'a, A>;
461    type BindFuture<'a, A: ToSocketAddrs + Send + 'a> =
462        <NativeRtInstance as Tcp>::BindFuture<'a, A>;
463
464    fn connect<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::ConnectFuture<'a, A> {
465        self.cwd.instance.connect(addrs)
466    }
467
468    fn bind<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::BindFuture<'a, A> {
469        self.cwd.instance.bind(addrs)
470    }
471}
472
473impl Default for NativeRt {
474    fn default() -> Self {
475        NativeRtBuilder::env_backends()
476            .build()
477            .expect("Could not initialize any FS backend")
478    }
479}
480
481impl Fs for NativeRt {
482    type DirHandle<'a> = NativeRtDir;
483
484    fn current_dir(&self) -> &Self::DirHandle<'_> {
485        &self.cwd
486    }
487}
488
489impl NativeRt {
490    pub fn builder() -> NativeRtBuilder {
491        NativeRtBuilder::default()
492    }
493
494    pub fn instance(&self) -> &BaseArc<NativeRtInstance> {
495        &self.cwd.instance
496    }
497
498    pub fn run<'a, Func: FnOnce(&'a NativeRt) -> F, F: Future>(
499        &'a mut self,
500        func: Func,
501    ) -> F::Output {
502        self.block_on(func(self))
503    }
504
505    /// Registers a non-seekable I/O stream
506    pub fn register_stream(&self, stream: TcpStream) -> NativeTcpStream {
507        self.cwd.instance.register_stream(stream)
508    }
509
510    pub fn cancel_all_ops(&self) {
511        self.cwd.instance.cancel_all_ops()
512    }
513
514    pub fn set_cwd(&mut self, dir: PathBuf) {
515        self.cwd.dir = Some(dir);
516    }
517}
518
519impl From<NativeRtInstance> for NativeRt {
520    fn from(instance: NativeRtInstance) -> Self {
521        let (ops, rx) = flume::bounded(16);
522
523        // TODO: store the join handle
524        std::thread::spawn(move || rx.into_iter().for_each(RtBgOp::process));
525
526        Self {
527            cwd: NativeRtDir {
528                dir: None,
529                ops,
530                instance: BaseArc::from(instance),
531            },
532        }
533    }
534}
535
536impl NativeRtDir {
537    fn join_path<P: AsRef<Path>>(&self, other: P) -> std::io::Result<PathBuf> {
538        if other.as_ref().is_absolute() {
539            Ok(other.as_ref().into())
540        } else {
541            self.get_path().map(|v| v.join(other))
542        }
543    }
544    fn get_path(&self) -> std::io::Result<PathBuf> {
545        if let Some(dir) = self.dir.clone() {
546            Ok(dir)
547        } else {
548            std::env::current_dir()
549        }
550    }
551}
552
553impl DirHandle for NativeRtDir {
554    type FileHandle = Seekable<NativeFile, u64>;
555    type OpenFileFuture<'a> = OpenFileFuture<'a>;
556    type PathFuture<'a> = Ready<MfioResult<PathBuf>>;
557    type OpenDirFuture<'a> = Ready<MfioResult<Self>>;
558    type ReadDir<'a> = ReadDir;
559    type ReadDirFuture<'a> = Ready<MfioResult<ReadDir>>;
560    type MetadataFuture<'a> = MetadataFuture;
561    type OpFuture<'a> = OpFuture;
562
563    /// Gets the absolute path to this `DirHandle`.
564    fn path(&self) -> Self::PathFuture<'_> {
565        ready(self.get_path().map_err(from_io_error))
566    }
567
568    /// Reads the directory contents.
569    ///
570    /// This function, upon success, returns a stream that can be used to list files and
571    /// subdirectories within this dir.
572    ///
573    /// Note that on various platforms this may behave differently. For instance, Unix platforms
574    /// support holding
575    fn read_dir(&self) -> Self::ReadDirFuture<'_> {
576        ready(
577            self.get_path()
578                .and_then(|v| v.read_dir())
579                .map_err(from_io_error)
580                .map(|instance| ReadDir { instance }),
581        )
582    }
583
584    /// Opens a file.
585    ///
586    /// This function accepts an absolute or relative path to a file for reading. If the path is
587    /// relative, it is opened relative to this `DirHandle`.
588    fn open_file<'a, P: AsRef<Path> + ?Sized>(
589        &'a self,
590        path: &'a P,
591        options: OpenOptions,
592    ) -> Self::OpenFileFuture<'a> {
593        let (tx, rx) = oneshot::channel();
594
595        if let Ok(path) = self.join_path(path) {
596            let _ = self.ops.send(RtBgOp::OpenFile {
597                path,
598                options,
599                map_options: self.instance.get_map_options(),
600                completion: tx,
601            });
602        } else {
603            let _ = tx.send(Err(mferr!(Directory, Unavailable, Filesystem)));
604        }
605
606        OpenFileFuture {
607            rt: self,
608            completion: rx,
609        }
610    }
611
612    /// Opens a directory.
613    ///
614    /// This function accepts an absolute or relative path to a directory for reading. If the path
615    /// is relative, it is opened relative to this `DirHandle`.
616    fn open_dir<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::OpenDirFuture<'a> {
617        let dir = self.join_path(path).map_err(from_io_error).and_then(|v| {
618            if v.is_dir() {
619                Ok(Self {
620                    dir: Some(v),
621                    ops: self.ops.clone(),
622                    instance: self.instance.clone(),
623                })
624            } else if v.exists() {
625                Err(mferr!(Path, Invalid, Filesystem))
626            } else {
627                Err(mferr!(Path, NotFound, Filesystem))
628            }
629        });
630
631        ready(dir)
632    }
633
634    fn metadata<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::MetadataFuture<'a> {
635        let (tx, rx) = oneshot::channel();
636
637        if let Ok(path) = self.join_path(path) {
638            let _ = self.ops.send(RtBgOp::Metadata {
639                path,
640                completion: tx,
641            });
642        } else {
643            let _ = tx.send(Err(mferr!(Directory, Unavailable, Filesystem)));
644        }
645
646        MetadataFuture { completion: rx }
647    }
648
649    /// Do an operation.
650    ///
651    /// This function performs an operation from the [`DirOp`] enum.
652    fn do_op<'a, P: AsRef<Path> + ?Sized>(&'a self, operation: DirOp<&'a P>) -> Self::OpFuture<'a> {
653        let (tx, rx) = oneshot::channel();
654
655        let _ = self.ops.send(RtBgOp::DirOp {
656            op: operation.as_path().into_pathbuf(),
657            completion: tx,
658        });
659
660        OpFuture { completion: rx }
661    }
662}
663
664pub struct ReadDir {
665    instance: fs::ReadDir,
666}
667
668impl Stream for ReadDir {
669    type Item = MfioResult<DirEntry>;
670
671    fn poll_next(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
672        let this = unsafe { self.get_unchecked_mut() };
673
674        Poll::Ready(
675            this.instance
676                .next()
677                .map(|v| v.map(From::from).map_err(from_io_error)),
678        )
679    }
680}
681
682pub struct MetadataFuture {
683    completion: oneshot::Receiver<MfioResult<Metadata>>,
684}
685
686impl Future for MetadataFuture {
687    type Output = MfioResult<Metadata>;
688
689    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
690        let this = unsafe { self.get_unchecked_mut() };
691        let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
692        match completion.poll(cx) {
693            Poll::Ready(Ok(res)) => Poll::Ready(res),
694            Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
695            Poll::Pending => Poll::Pending,
696        }
697    }
698}
699
700pub struct OpenFileFuture<'a> {
701    rt: &'a NativeRtDir,
702    completion: oneshot::Receiver<MfioResult<std::fs::File>>,
703}
704
705impl<'a> Future for OpenFileFuture<'a> {
706    type Output = MfioResult<<NativeRtDir as DirHandle>::FileHandle>;
707
708    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
709        let this = unsafe { self.get_unchecked_mut() };
710        let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
711        match completion.poll(cx) {
712            Poll::Ready(Ok(file)) => {
713                Poll::Ready(file.map(|f| this.rt.instance.register_file(f).into()))
714            }
715            Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
716            Poll::Pending => Poll::Pending,
717        }
718    }
719}
720
721pub struct OpFuture {
722    completion: oneshot::Receiver<MfioResult<()>>,
723}
724
725impl Future for OpFuture {
726    type Output = MfioResult<()>;
727
728    fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
729        let this = unsafe { self.get_unchecked_mut() };
730        let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
731        match completion.poll(cx) {
732            Poll::Ready(Ok(res)) => Poll::Ready(res),
733            Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
734            Poll::Pending => Poll::Pending,
735        }
736    }
737}
738
739enum RtBgOp {
740    OpenFile {
741        path: PathBuf,
742        options: OpenOptions,
743        map_options: fn(std::fs::OpenOptions) -> std::fs::OpenOptions,
744        completion: oneshot::Sender<MfioResult<std::fs::File>>,
745    },
746    DirOp {
747        op: DirOp<PathBuf>,
748        completion: oneshot::Sender<MfioResult<()>>,
749    },
750    Metadata {
751        path: PathBuf,
752        completion: oneshot::Sender<MfioResult<Metadata>>,
753    },
754}
755
756impl RtBgOp {
757    fn process(self) {
758        match self {
759            Self::OpenFile {
760                path,
761                options,
762                map_options,
763                completion,
764            } => {
765                let mut fs_options = fs::OpenOptions::new();
766
767                fs_options
768                    .read(options.read)
769                    .write(options.write)
770                    .create(options.create)
771                    .create_new(options.create_new)
772                    .truncate(options.truncate);
773
774                let file = map_options(fs_options)
775                    .open(path)
776                    .map_err(crate::util::from_io_error);
777
778                let _ = completion.send(file);
779            }
780            Self::DirOp { op, completion } => {
781                let ret = match op {
782                    DirOp::SetPermissions { .. } => {
783                        // FIXME
784                        // This needs to be made platform specific, because we don't have a way to
785                        // build permissions object ourselves.
786                        Err(std::io::ErrorKind::Unsupported.into())
787                    }
788                    DirOp::RemoveDir { path } => fs::remove_dir(path),
789                    DirOp::RemoveDirAll { path } => fs::remove_dir_all(path),
790                    DirOp::CreateDir { path } => fs::create_dir(path),
791                    DirOp::CreateDirAll { path } => fs::create_dir_all(path),
792                    DirOp::RemoveFile { path } => fs::remove_file(path),
793                    DirOp::Rename { from, to } => fs::rename(from, to),
794                    DirOp::Copy { from, to } => fs::copy(from, to).map(|_| ()),
795                    DirOp::HardLink { from, to } => fs::hard_link(from, to),
796                };
797                let _ = completion.send(ret.map_err(from_io_error));
798            }
799            Self::Metadata { path, completion } => {
800                let to_epoch_duration = |v: std::time::SystemTime| {
801                    v.duration_since(std::time::SystemTime::UNIX_EPOCH).ok()
802                };
803
804                let res = path
805                    .metadata()
806                    .map(|m| Metadata {
807                        permissions: m.permissions().into(),
808                        len: m.len(),
809                        modified: m.modified().ok().and_then(to_epoch_duration),
810                        accessed: m.accessed().ok().and_then(to_epoch_duration),
811                        created: m.created().ok().and_then(to_epoch_duration),
812                    })
813                    .map_err(from_io_error);
814                let _ = completion.send(res);
815            }
816        }
817    }
818}
819
820#[derive(Debug)]
821pub struct NativeRtDir {
822    dir: Option<PathBuf>,
823    ops: flume::Sender<RtBgOp>,
824    instance: BaseArc<NativeRtInstance>,
825}
826
827#[cfg(test)]
828mod tests {
829    //! There is not much to test here! But it is invaluable to use say thread pool based
830    //! implementation to verify data races when using miri/tsan. `mfio-fs` proves to be incredibly
831    //! valuable in doing that!
832
833    use super::*;
834    use core::future::poll_fn;
835    use core::mem::MaybeUninit;
836    use core::task::Poll;
837    use mfio::stdeq::*;
838    use mfio::traits::*;
839    use std::fs::write;
840    use std::io::Seek;
841
842    #[test]
843    fn simple_io() {
844        // Running this test under miri verifies correctness of basic
845        // cross-thread communication.
846        let test_string = "Test test 42";
847        let mut filepath = std::env::temp_dir();
848        filepath.push("mfio-fs-test-simple-io");
849
850        write(&filepath, test_string.as_bytes()).unwrap();
851
852        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
853            println!("{backend}");
854            fs.unwrap().run(|fs: &NativeRt| async {
855                let fh = fs
856                    .open(&filepath, OpenOptions::new().read(true))
857                    .await
858                    .unwrap();
859
860                let mut d = [MaybeUninit::uninit(); 8];
861
862                fh.read_all(0, &mut d[..]).await.unwrap();
863            });
864        }
865    }
866
867    #[test]
868    fn read_all() {
869        // Running this test under miri verifies correctness of basic
870        // cross-thread communication.
871        let test_string = "Test test 42";
872        let mut filepath = std::env::temp_dir();
873        filepath.push("mfio-fs-test-read-all");
874
875        write(&filepath, test_string.as_bytes()).unwrap();
876
877        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
878            println!("{backend}");
879            fs.unwrap().run(|fs| async {
880                let fh = fs
881                    .open(&filepath, OpenOptions::new().read(true))
882                    .await
883                    .unwrap();
884
885                let mut d = [MaybeUninit::uninit(); 8];
886
887                fh.read_all(0, &mut d[..]).await.unwrap();
888            });
889        }
890    }
891
892    #[test]
893    fn write_test() {
894        let mut test_data = vec![];
895
896        for i in 0u8..128 {
897            test_data.extend(i.to_ne_bytes());
898        }
899
900        let mut filepath = std::env::temp_dir();
901        filepath.push("mfio-fs-test-write");
902
903        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
904            println!("{backend}");
905            fs.unwrap().run(|fs| async {
906                let mut fh = fs
907                    .open(
908                        &filepath,
909                        OpenOptions::new()
910                            .read(true)
911                            .write(true)
912                            .create(true)
913                            .truncate(true),
914                    )
915                    .await
916                    .unwrap();
917
918                AsyncWrite::write(&fh, &test_data).await.unwrap();
919
920                assert_eq!(test_data.len(), fh.get_pos() as usize);
921
922                fh.rewind().unwrap();
923
924                // Read the data back out
925                let mut output = vec![];
926                AsyncRead::read_to_end(&fh, &mut output).await.unwrap();
927
928                assert_eq!(test_data.len(), fh.get_pos() as usize);
929                assert_eq!(test_data, output);
930
931                core::mem::drop(fh);
932            });
933        }
934    }
935
936    #[test]
937    fn read_to_end() {
938        let test_string = "Test test 42";
939        let mut filepath = std::env::temp_dir();
940        filepath.push("mfio-fs-test-read-to-end");
941
942        // Create a test file:
943        write(&filepath, test_string.as_bytes()).unwrap();
944
945        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
946            println!("{backend}");
947            fs.unwrap().run(|fs| async {
948                let fh = fs
949                    .open(&filepath, OpenOptions::new().read(true))
950                    .await
951                    .unwrap();
952
953                let mut output = vec![];
954                AsyncRead::read_to_end(&fh, &mut output).await.unwrap();
955
956                assert_eq!(test_string.len(), fh.get_pos() as usize);
957                assert_eq!(test_string.as_bytes(), output);
958            });
959        }
960    }
961
962    #[test]
963    fn wake_test_single() {
964        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
965            println!("{backend}");
966            fs.unwrap().run(|_| async move {
967                for i in 0..2 {
968                    println!("{i}");
969                    let mut signaled = false;
970                    poll_fn(|cx| {
971                        println!("{signaled}");
972                        if signaled {
973                            Poll::Ready(())
974                        } else {
975                            signaled = true;
976                            let waker = cx.waker().clone();
977                            std::thread::spawn(|| {
978                                std::thread::sleep(std::time::Duration::from_millis(200));
979                                println!("WAKE");
980                                waker.wake();
981                            });
982                            Poll::Pending
983                        }
984                    })
985                    .await;
986                }
987            });
988        }
989    }
990
991    #[test]
992    fn wake_test_dropped() {
993        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
994            println!("{backend}");
995            let (tx1, rx1) = std::sync::mpsc::channel();
996            let rx1 = BaseArc::new(parking_lot::Mutex::new(rx1));
997            let (tx2, rx2) = std::sync::mpsc::channel();
998
999            {
1000                fs.unwrap().run(|_| async move {
1001                    poll_fn(|cx| {
1002                        let tx2 = tx2.clone();
1003                        let rx1 = rx1.clone();
1004                        let waker = cx.waker().clone();
1005                        std::thread::spawn(move || {
1006                            rx1.lock().recv().unwrap();
1007                            println!("WAKE");
1008                            waker.wake();
1009                            println!("Woke");
1010                            tx2.send(()).unwrap();
1011                        });
1012                        Poll::Ready(())
1013                    })
1014                    .await;
1015                });
1016            }
1017
1018            tx1.send(()).unwrap();
1019            rx2.recv().unwrap();
1020        }
1021    }
1022
1023    #[test]
1024    fn wake_test_lot() {
1025        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
1026            println!("{backend}");
1027            fs.unwrap().run(|_| async move {
1028                #[cfg(miri)]
1029                let wakes = 20;
1030                #[cfg(not(miri))]
1031                let wakes = 2000;
1032                for i in 0..wakes {
1033                    println!("{i}");
1034                    let mut signaled = false;
1035                    poll_fn(|cx| {
1036                        println!("{signaled}");
1037                        if signaled {
1038                            Poll::Ready(())
1039                        } else {
1040                            signaled = true;
1041                            let waker = cx.waker().clone();
1042                            std::thread::spawn(|| {
1043                                println!("WAKE");
1044                                waker.wake();
1045                                println!("Woke");
1046                            });
1047                            Poll::Pending
1048                        }
1049                    })
1050                    .await;
1051                }
1052            });
1053        }
1054    }
1055
1056    #[test]
1057    fn self_wake() {
1058        // Verifies that all backends support self waking correctly
1059        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
1060            println!("{backend}");
1061            fs.unwrap().run(|_| async move {
1062                #[cfg(miri)]
1063                let wakes = 20;
1064                #[cfg(not(miri))]
1065                let wakes = 2000;
1066                for i in 0..wakes {
1067                    println!("{i}");
1068                    let mut signaled = false;
1069                    poll_fn(|cx| {
1070                        println!("{signaled}");
1071                        if signaled {
1072                            Poll::Ready(())
1073                        } else {
1074                            signaled = true;
1075                            cx.waker().wake_by_ref();
1076                            Poll::Pending
1077                        }
1078                    })
1079                    .await;
1080                }
1081            });
1082        }
1083    }
1084
1085    #[test]
1086    fn self_no_doublewake() {
1087        // Verifies that no backend incorrectly wakes itself up when not needed
1088        for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
1089            println!("{backend}");
1090
1091            let (tx, rx) = std::sync::mpsc::channel();
1092
1093            fs.unwrap().run(|_| async move {
1094                let mut signaled = 0;
1095                poll_fn(|cx| {
1096                    println!("{signaled}");
1097                    if signaled > 1 {
1098                        Poll::Ready(())
1099                    } else {
1100                        signaled += 1;
1101                        if signaled == 1 {
1102                            cx.waker().wake_by_ref();
1103                        } else {
1104                            let waker = cx.waker().clone();
1105                            let tx = tx.clone();
1106                            std::thread::spawn(move || {
1107                                std::thread::sleep(std::time::Duration::from_millis(200));
1108                                println!("WAKE");
1109                                tx.send(()).unwrap();
1110                                waker.wake();
1111                            });
1112                        }
1113                        Poll::Pending
1114                    }
1115                })
1116                .await;
1117            });
1118
1119            assert_eq!(Ok(()), rx.try_recv());
1120        }
1121    }
1122}
1123
1124#[cfg(test)]
1125mod suite_tests {
1126    use super::*;
1127    test_suite!(tests_default, |test_name, closure| {
1128        let _ = ::env_logger::builder().is_test(true).try_init();
1129        let mut rt = crate::NativeRt::default();
1130        let rt = staticify(&mut rt);
1131        let dir = TempDir::new(test_name).unwrap();
1132        rt.set_cwd(dir.path().to_path_buf());
1133        rt.run(move |rt| {
1134            let run = TestRun::new(rt, dir);
1135            closure(run)
1136        });
1137    });
1138
1139    test_suite!(tests_all, |test_name, closure| {
1140        let _ = ::env_logger::builder().is_test(true).try_init();
1141        for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1142            println!("{name}");
1143            if let Ok(mut rt) = rt {
1144                let rt = staticify(&mut rt);
1145                let dir = TempDir::new(test_name).unwrap();
1146                rt.set_cwd(dir.path().to_path_buf());
1147                rt.run(move |rt| {
1148                    let run = TestRun::new(rt, dir);
1149                    closure(run)
1150                });
1151            }
1152        }
1153    });
1154
1155    net_test_suite!(net_tests_default, |closure| {
1156        let _ = ::env_logger::builder().is_test(true).try_init();
1157        let mut rt = crate::NativeRt::default();
1158        let rt = staticify(&mut rt);
1159        rt.run(closure);
1160    });
1161
1162    net_test_suite!(net_tests_all, |closure| {
1163        let _ = ::env_logger::builder().is_test(true).try_init();
1164        for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1165            println!("{name}");
1166            if let Ok(mut rt) = rt {
1167                let rt = staticify(&mut rt);
1168                rt.run(closure);
1169            }
1170        }
1171    });
1172
1173    // Test with different async runtimes
1174    #[cfg(all(unix, not(miri)))]
1175    mod smol {
1176        use super::*;
1177
1178        test_suite!(tests_default, |test_name, closure| {
1179            let _ = ::env_logger::builder().is_test(true).try_init();
1180
1181            smol::block_on(async {
1182                use mfio::backend::{integrations::async_io::AsyncIo, *};
1183
1184                let mut rt = crate::NativeRt::default();
1185                let rt = staticify(&mut rt);
1186                let dir = TempDir::new(test_name).unwrap();
1187                rt.set_cwd(dir.path().to_path_buf());
1188
1189                AsyncIo::run_with_mut(rt, move |rt| {
1190                    let run = TestRun::new(rt, dir);
1191                    closure(run)
1192                })
1193                .await;
1194            });
1195        });
1196
1197        test_suite!(tests_all, |test_name, closure| {
1198            let _ = ::env_logger::builder().is_test(true).try_init();
1199
1200            smol::block_on(async {
1201                use mfio::backend::{integrations::async_io::AsyncIo, *};
1202
1203                for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1204                    println!("{name}");
1205                    if let Ok(mut rt) = rt {
1206                        let rt = staticify(&mut rt);
1207                        let dir = TempDir::new(test_name).unwrap();
1208                        rt.set_cwd(dir.path().to_path_buf());
1209                        AsyncIo::run_with_mut(rt, move |rt| {
1210                            let run = TestRun::new(rt, dir);
1211                            closure(run)
1212                        })
1213                        .await;
1214                    }
1215                }
1216            });
1217        });
1218
1219        net_test_suite!(net_tests_default, |closure| {
1220            let _ = ::env_logger::builder().is_test(true).try_init();
1221
1222            smol::block_on(async {
1223                use mfio::backend::{integrations::async_io::AsyncIo, *};
1224                let mut rt = crate::NativeRt::default();
1225                let rt = staticify(&mut rt);
1226                AsyncIo::run_with_mut(rt, closure).await;
1227            });
1228        });
1229
1230        net_test_suite!(net_tests_all, |closure| {
1231            let _ = ::env_logger::builder().is_test(true).try_init();
1232            smol::block_on(async {
1233                use mfio::backend::{integrations::async_io::AsyncIo, *};
1234                for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1235                    println!("{name}");
1236                    if let Ok(mut rt) = rt {
1237                        let rt = staticify(&mut rt);
1238                        AsyncIo::run_with_mut(rt, closure).await;
1239                    }
1240                }
1241            });
1242        });
1243    }
1244
1245    #[cfg(all(unix, not(miri)))]
1246    mod tokio {
1247        use super::*;
1248
1249        test_suite!(tests_default, |test_name, closure| {
1250            let _ = ::env_logger::builder().is_test(true).try_init();
1251
1252            tokio::runtime::Runtime::new().unwrap().block_on(async {
1253                use mfio::backend::{integrations::tokio::Tokio, *};
1254
1255                let mut rt = crate::NativeRt::default();
1256                let rt = staticify(&mut rt);
1257                let dir = TempDir::new(test_name).unwrap();
1258                rt.set_cwd(dir.path().to_path_buf());
1259
1260                Tokio::run_with_mut(rt, move |rt| {
1261                    let run = TestRun::new(rt, dir);
1262                    closure(run)
1263                })
1264                .await;
1265            });
1266        });
1267
1268        test_suite!(tests_all, |test_name, closure| {
1269            let _ = ::env_logger::builder().is_test(true).try_init();
1270
1271            tokio::runtime::Runtime::new().unwrap().block_on(async {
1272                use mfio::backend::{integrations::tokio::Tokio, *};
1273
1274                for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1275                    println!("{name}");
1276                    if let Ok(mut rt) = rt {
1277                        let rt = staticify(&mut rt);
1278                        let dir = TempDir::new(test_name).unwrap();
1279                        rt.set_cwd(dir.path().to_path_buf());
1280                        Tokio::run_with_mut(rt, move |rt| {
1281                            let run = TestRun::new(rt, dir);
1282                            closure(run)
1283                        })
1284                        .await;
1285                    }
1286                }
1287            });
1288        });
1289
1290        net_test_suite!(net_tests_default, |closure| {
1291            let _ = ::env_logger::builder().is_test(true).try_init();
1292
1293            tokio::runtime::Runtime::new().unwrap().block_on(async {
1294                use mfio::backend::{integrations::tokio::Tokio, *};
1295                let mut rt = crate::NativeRt::default();
1296                let rt = staticify(&mut rt);
1297                Tokio::run_with_mut(rt, closure).await;
1298            });
1299        });
1300
1301        net_test_suite!(net_tests_all, |closure| {
1302            let _ = ::env_logger::builder().is_test(true).try_init();
1303            tokio::runtime::Runtime::new().unwrap().block_on(async {
1304                use mfio::backend::{integrations::tokio::Tokio, *};
1305                for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
1306                    println!("{name}");
1307                    if let Ok(mut rt) = rt {
1308                        let rt = staticify(&mut rt);
1309                        Tokio::run_with_mut(rt, closure).await;
1310                    }
1311                }
1312            });
1313        });
1314    }
1315}