1use core::future::{ready, Future, Ready};
7use core::pin::Pin;
8use core::task::{Context, Poll};
9use futures::Stream;
10use mfio::backend::*;
11use mfio::error::{Code, Error, Location, Result as MfioResult, State, Subject};
12use mfio::io::{BoundPacketView, NoPos, PacketIo, Read, Write};
13use mfio::mferr;
14use mfio::stdeq::Seekable;
15use mfio::tarc::BaseArc;
16use std::fs;
17use std::net::{SocketAddr, TcpStream, ToSocketAddrs};
18use std::path::{Path, PathBuf};
19
20use crate::util::from_io_error;
21use crate::{
22 DirEntry, DirHandle, DirOp, Fs, Metadata, OpenOptions, Shutdown, Tcp, TcpListenerHandle,
23 TcpStreamHandle,
24};
25
26#[cfg(test)]
27use crate::{net_test_suite, test_suite};
28
29mod impls;
30
// Generates the enum-dispatch layer over every compiled-in runtime backend.
//
// Input is a comma-separated list of `Variant => module` pairs, each optionally preceded by
// `#[cfg(...)]` attributes. For every pair the macro emits one variant in `NativeRtInstance`,
// `NativeFile`, `NativeTcpStream`, `NativeTcpConnectFuture` and `NativeTcpListener`, plus one
// flag and one setter in `NativeRtBuilder`. Each generated method is a `match` that forwards to
// the corresponding item in `impls::<module>`.
31macro_rules! fs_dispatch {
 32 ($($(#[cfg($meta:meta)])* $name:ident => $mod:ident),*$(,)?) => {
33
        // Backend-agnostic runtime instance: one variant per backend, wrapping its `Runtime`.
 34 pub enum NativeRtInstance {
 35 $($(#[cfg($meta)] #[cfg_attr(docsrs, doc(cfg($meta)))])* $name(impls::$mod::Runtime)),*
 36 }
37
 38 impl NativeRtInstance {
            // Hands a std file to the active backend and wraps the result in `NativeFile`.
 39 fn register_file(&self, file: std::fs::File) -> NativeFile {
 40 match self {
 41 $($(#[cfg($meta)])* Self::$name(v) => NativeFile::$name(v.register_file(file))),*
 42 }
 43 }
44
            // Hands a std TCP stream to the active backend and wraps the result in
            // `NativeTcpStream`.
 45 pub fn register_stream(&self, stream: TcpStream) -> NativeTcpStream {
 51 match self {
 52 $($(#[cfg($meta)])* Self::$name(v) => NativeTcpStream::$name(v.register_stream(stream))),*
 53 }
 54 }
55
            // Returns the active backend's `map_options` function, which is applied to
            // `fs::OpenOptions` before a file is opened.
 56 fn get_map_options(&self) -> fn(fs::OpenOptions) -> fs::OpenOptions {
 57 match self {
 58 $($(#[cfg($meta)])* Self::$name(_) => impls::$mod::map_options),*
 59 }
 60 }
61
            // Forwards to the active backend's `cancel_all_ops`.
 62 pub fn cancel_all_ops(&self) {
 63 match self {
 64 $($(#[cfg($meta)])* Self::$name(v) => v.cancel_all_ops()),*
 65 }
 66 }
 67 }
68
        // Debug output is the stringified path `NativeRt::<Variant>`; the inner runtime is not
        // printed.
 69 impl core::fmt::Debug for NativeRtInstance {
 70 fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
 71 match self {
 72 $($(#[cfg($meta)])* Self::$name(_) => write!(f, stringify!(NativeRt::$name))),*
 73 }
 74 }
 75 }
76
        // `IoBackend` is forwarded verbatim to whichever backend is active.
 77 impl IoBackend for NativeRtInstance {
 78 type Backend = DynBackend;
79
 80 fn polling_handle(&self) -> Option<PollingHandle> {
 81 match self {
 82 $($(#[cfg($meta)])* Self::$name(v) => v.polling_handle()),*
 83 }
 84 }
85
 86 fn get_backend(&self) -> BackendHandle<Self::Backend> {
 87 match self {
 88 $($(#[cfg($meta)])* Self::$name(v) => v.get_backend()),*
 89 }
 90 }
 91 }
92
        // The two attribute repetitions below build rustdoc list items for the builder: the
        // first emits an item only for backends whose `cfg` predicate holds, the second emits
        // one item for every backend passed to the macro.
 93 $(#[cfg_attr(all($($meta)*), doc = concat!("- ", stringify!($mod)))])*
 99 $(#[doc = concat!("- `", stringify!($mod), "`")])*
 103 #[derive(Default)]
 106 pub struct NativeRtBuilder {
 107 $($(#[cfg($meta)])* $mod: bool),*
 108 }
109
 110 impl NativeRtBuilder {
            // Builder with every compiled-in backend flag set to `true`.
 111 pub fn all_backends() -> Self {
 113 Self {
 114 $($(#[cfg($meta)])* $mod: true),*
 115 }
 116 }
117
            // Builder configured from the `MFIO_FS_BACKENDS` environment variable.
            //
            // The value is split on `,` and a backend is enabled only when its module name
            // appears as an exact element (no trimming is performed). If the variable cannot
            // be read, all backends are enabled.
 118 pub fn env_backends() -> Self {
 125 match std::env::var("MFIO_FS_BACKENDS") {
 126 Ok(val) => {
 127 let vals = val.split(',').collect::<Vec<_>>();
 128 Self {
 129 $($(#[cfg($meta)])* $mod: vals.contains(&stringify!($mod))),*
 130 }
 131 }
 132 Err(_) => {
 133 Self::all_backends()
 134 }
 135 }
 136 }
137
            // Discards the current configuration and returns `all_backends()`.
 138 pub fn enable_all(self) -> Self {
 139 let _ = self;
 140 Self::all_backends()
 141 }
142
            // One setter per backend, named after the backend module.
 143 $($(#[cfg($meta)] #[cfg_attr(docsrs, doc(cfg($meta)))])*
 144 #[doc = concat!("Enables the ", stringify!($mod), " backend.")]
 145 pub fn $mod(self, $mod: bool) -> Self {
 146 Self {
 147 $mod,
 148 ..self
 149 }
 150 })*
151
            // Tries the enabled backends in macro-declaration order and returns the first one
            // whose `Runtime::try_new()` succeeds; individual failures are discarded.
            //
            // If none succeeds, returns an error with HTTP-style code 501 and
            // `Backend` / `Unsupported` / `Filesystem` classification.
 152 pub fn build(self) -> mfio::error::Result<NativeRt> {
 153 $($(#[cfg($meta)])* if self.$mod {
 154 if let Ok(v) = impls::$mod::Runtime::try_new() {
 155 return Ok(NativeRtInstance::$name(v).into());
 156 }
 157 })*
158
 159 Err(Error {
 160 code: Code::from_http(501).unwrap(),
 161 subject: Subject::Backend,
 162 state: State::Unsupported,
 163 location: Location::Filesystem,
 164 })
 165 }
166
            // Attempts to construct every enabled backend and returns `(module name, result)`
            // pairs in declaration order, keeping failures instead of skipping them.
 167 pub fn build_each(self) -> Vec<(&'static str, mfio::error::Result<NativeRt>)> {
 168 let mut ret = vec![];
169
 170 $($(#[cfg($meta)])* if self.$mod {
 171 ret.push((
 172 stringify!($mod),
 173 impls::$mod::Runtime::try_new()
 174 .map_err(|e| e.into())
 175 .map(|v| NativeRtInstance::$name(v).into())
 176 ));
 177 })*
178
 179 ret
 180 }
 181 }
182
        // File handle: one variant per backend, wrapping that backend's `FileWrapper`.
 183 pub enum NativeFile {
 184 $($(#[cfg($meta)])* $name(impls::$mod::FileWrapper)),*
 185 }
186
        // Positioned (u64 parameter) write and read requests are forwarded to the backend's
        // file wrapper.
 187 impl PacketIo<Write, u64> for NativeFile {
 188 fn send_io(&self, param: u64, view: BoundPacketView<Write>) {
 189 match self {
 190 $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
 191 }
 192 }
 193 }
194
 195 impl PacketIo<Read, u64> for NativeFile {
 196 fn send_io(&self, param: u64, view: BoundPacketView<Read>) {
 197 match self {
 198 $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
 199 }
 200 }
 201 }
202
        // TCP stream handle: one variant per backend, wrapping that backend's `TcpStream`.
 203 pub enum NativeTcpStream {
 204 $($(#[cfg($meta)])* $name(impls::$mod::TcpStream)),*
 205 }
206
        // Dropping a stream only emits a trace-level log line here; any actual teardown is
        // left to the wrapped backend type.
 207 impl Drop for NativeTcpStream {
 208 fn drop(&mut self) {
 209 log::trace!("Drop stream");
 210 }
 211 }
212
        // Position-less (`NoPos`) write and read requests are forwarded to the backend stream.
 213 impl PacketIo<Write, NoPos> for NativeTcpStream {
 214 fn send_io(&self, param: NoPos, view: BoundPacketView<Write>) {
 215 match self {
 216 $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
 217 }
 218 }
 219 }
220
 221 impl PacketIo<Read, NoPos> for NativeTcpStream {
 222 fn send_io(&self, param: NoPos, view: BoundPacketView<Read>) {
 223 match self {
 224 $($(#[cfg($meta)])* Self::$name(v) => v.send_io(param, view)),*
 225 }
 226 }
 227 }
228
 229 impl TcpStreamHandle for NativeTcpStream {
 230 fn local_addr(&self) -> MfioResult<SocketAddr> {
 231 match self {
 232 $($(#[cfg($meta)])* Self::$name(v) => v.local_addr()),*
 233 }
 234 }
235
 236 fn peer_addr(&self) -> MfioResult<SocketAddr> {
 237 match self {
 238 $($(#[cfg($meta)])* Self::$name(v) => v.peer_addr()),*
 239 }
 240 }
241
 242 fn shutdown(&self, how: Shutdown) -> MfioResult<()> {
 243 match self {
 244 $($(#[cfg($meta)])* Self::$name(v) => v.shutdown(how)),*
 245 }
 246 }
 247 }
248
        // Connect future: one variant per backend, wrapping that backend's `TcpConnectFuture`.
 249 pub enum NativeTcpConnectFuture<'a, A: ToSocketAddrs + 'a> {
 250 $($(#[cfg($meta)])* $name(impls::$mod::TcpConnectFuture<'a, A>)),*
 251 }
252
 253 impl<'a, A: ToSocketAddrs + Send> Future for NativeTcpConnectFuture<'a, A> {
 254 type Output = MfioResult<NativeTcpStream>;
255
 256 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                // NOTE(review): manual pin projection. Within this function the variant
                // payload is only re-pinned in place and polled, never moved out.
 257 let this = unsafe { self.get_unchecked_mut() };
 259 match this {
 260 $($(#[cfg($meta)])* Self::$name(v) => {
 261 if let Poll::Ready(v) = unsafe { Pin::new_unchecked(v).poll(cx) } {
                        // Re-wrap the backend stream in the matching `NativeTcpStream` variant.
 262 Poll::Ready(v.map(NativeTcpStream::$name))
 263 } else {
 264 Poll::Pending
 265 }
 266 }),*
 267 }
 268 }
 269 }
270
 271 impl Tcp for NativeRtInstance {
 272 type StreamHandle = NativeTcpStream;
 273 type ListenerHandle = NativeTcpListener;
 274 type ConnectFuture<'a, A: ToSocketAddrs + Send + 'a> = NativeTcpConnectFuture<'a, A>;
 275 type BindFuture<'a, A: ToSocketAddrs + Send + 'a> = core::future::Ready<MfioResult<NativeTcpListener>>;
276
 277 fn connect<'a, A: ToSocketAddrs + Send + 'a>(
 278 &'a self,
 279 addrs: A,
 280 ) -> Self::ConnectFuture<'a, A> {
 281 match self {
 282 $($(#[cfg($meta)])* Self::$name(v) => NativeTcpConnectFuture::$name(v.tcp_connect(addrs))),*
 283 }
 284 }
285
            // Binding is done synchronously with `std::net::TcpListener::bind`; the bound
            // listener is then registered with the active backend and the outcome is returned
            // as an already-completed future.
 286 fn bind<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::BindFuture<'a, A> {
 287 let listener = std::net::TcpListener::bind(addrs);
 288 core::future::ready(
 289 listener.map(|l| match self {
 290 $($(#[cfg($meta)])* Self::$name(v) => NativeTcpListener::$name(v.register_listener(l))),*
 291 }).map_err(from_io_error)
 292 )
 293 }
 294 }
295
        // TCP listener: one variant per backend, wrapping that backend's `TcpListener`.
 296 pub enum NativeTcpListener {
 297 $($(#[cfg($meta)])* $name(impls::$mod::TcpListener)),*
 298 }
299
        // Stream of accepted connections; each item is `(stream, peer address)`.
 300 impl Stream for NativeTcpListener {
 301 type Item = (NativeTcpStream, SocketAddr);
302
 303 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
                // NOTE(review): same manual pin projection as in the connect future above.
 304 let this = unsafe { self.get_unchecked_mut() };
 305 match this {
 306 $($(#[cfg($meta)])* Self::$name(v) => {
 307 if let Poll::Ready(v) = unsafe { Pin::new_unchecked(v).poll_next(cx) } {
 308 Poll::Ready(v.map(|(a, b)| (NativeTcpStream::$name(a), b)))
 309 } else {
 310 Poll::Pending
 311 }
 312 }),*
 313 }
 314 }
 315 }
316
 317 impl TcpListenerHandle for NativeTcpListener {
 318 type StreamHandle = NativeTcpStream;
319
 320 fn local_addr(&self) -> MfioResult<SocketAddr> {
 321 match self {
 322 $($(#[cfg($meta)])* Self::$name(v) => v.local_addr()),*
 323 }
 324 }
 325 }
 326 }
327}
328
// Instantiates the dispatch types for the backends below.
//
// Declaration order matters: `NativeRtBuilder::build` tries enabled backends top to bottom
// and returns the first one that initializes. The first three are gated on platform, feature
// flag and `not(miri)`; `Default => thread` carries no `cfg` and is therefore always compiled.
329fs_dispatch! {
 330 #[cfg(all(not(miri), target_os = "linux", feature = "io-uring"))]
 331 IoUring => io_uring,
 332 #[cfg(all(not(miri), target_os = "windows", feature = "iocp"))]
 333 Iocp => iocp,
 334 #[cfg(all(not(miri), unix, feature = "mio"))]
 335 Mio => mio,
 336 Default => thread,
337}
338
339const _: () = {
340 const fn verify_send<T: Send>() {}
341 const fn verify_sync<T: Sync>() {}
342
343 verify_send::<NativeRtInstance>();
344 verify_send::<NativeFile>();
345 verify_send::<NativeTcpStream>();
346 verify_send::<NativeTcpListener>();
347};
348
349#[derive(Debug)]
440pub struct NativeRt {
441 cwd: NativeRtDir,
442}
443
444impl IoBackend for NativeRt {
445 type Backend = <NativeRtInstance as IoBackend>::Backend;
446
447 fn polling_handle(&self) -> Option<PollingHandle> {
448 self.cwd.instance.polling_handle()
449 }
450
451 fn get_backend(&self) -> BackendHandle<Self::Backend> {
452 self.cwd.instance.get_backend()
453 }
454}
455
456impl Tcp for NativeRt {
457 type StreamHandle = <NativeRtInstance as Tcp>::StreamHandle;
458 type ListenerHandle = <NativeRtInstance as Tcp>::ListenerHandle;
459 type ConnectFuture<'a, A: ToSocketAddrs + Send + 'a> =
460 <NativeRtInstance as Tcp>::ConnectFuture<'a, A>;
461 type BindFuture<'a, A: ToSocketAddrs + Send + 'a> =
462 <NativeRtInstance as Tcp>::BindFuture<'a, A>;
463
464 fn connect<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::ConnectFuture<'a, A> {
465 self.cwd.instance.connect(addrs)
466 }
467
468 fn bind<'a, A: ToSocketAddrs + Send + 'a>(&'a self, addrs: A) -> Self::BindFuture<'a, A> {
469 self.cwd.instance.bind(addrs)
470 }
471}
472
473impl Default for NativeRt {
474 fn default() -> Self {
475 NativeRtBuilder::env_backends()
476 .build()
477 .expect("Could not initialize any FS backend")
478 }
479}
480
481impl Fs for NativeRt {
482 type DirHandle<'a> = NativeRtDir;
483
484 fn current_dir(&self) -> &Self::DirHandle<'_> {
485 &self.cwd
486 }
487}
488
489impl NativeRt {
490 pub fn builder() -> NativeRtBuilder {
491 NativeRtBuilder::default()
492 }
493
494 pub fn instance(&self) -> &BaseArc<NativeRtInstance> {
495 &self.cwd.instance
496 }
497
498 pub fn run<'a, Func: FnOnce(&'a NativeRt) -> F, F: Future>(
499 &'a mut self,
500 func: Func,
501 ) -> F::Output {
502 self.block_on(func(self))
503 }
504
505 pub fn register_stream(&self, stream: TcpStream) -> NativeTcpStream {
507 self.cwd.instance.register_stream(stream)
508 }
509
510 pub fn cancel_all_ops(&self) {
511 self.cwd.instance.cancel_all_ops()
512 }
513
514 pub fn set_cwd(&mut self, dir: PathBuf) {
515 self.cwd.dir = Some(dir);
516 }
517}
518
519impl From<NativeRtInstance> for NativeRt {
520 fn from(instance: NativeRtInstance) -> Self {
521 let (ops, rx) = flume::bounded(16);
522
523 std::thread::spawn(move || rx.into_iter().for_each(RtBgOp::process));
525
526 Self {
527 cwd: NativeRtDir {
528 dir: None,
529 ops,
530 instance: BaseArc::from(instance),
531 },
532 }
533 }
534}
535
536impl NativeRtDir {
537 fn join_path<P: AsRef<Path>>(&self, other: P) -> std::io::Result<PathBuf> {
538 if other.as_ref().is_absolute() {
539 Ok(other.as_ref().into())
540 } else {
541 self.get_path().map(|v| v.join(other))
542 }
543 }
544 fn get_path(&self) -> std::io::Result<PathBuf> {
545 if let Some(dir) = self.dir.clone() {
546 Ok(dir)
547 } else {
548 std::env::current_dir()
549 }
550 }
551}
552
553impl DirHandle for NativeRtDir {
554 type FileHandle = Seekable<NativeFile, u64>;
555 type OpenFileFuture<'a> = OpenFileFuture<'a>;
556 type PathFuture<'a> = Ready<MfioResult<PathBuf>>;
557 type OpenDirFuture<'a> = Ready<MfioResult<Self>>;
558 type ReadDir<'a> = ReadDir;
559 type ReadDirFuture<'a> = Ready<MfioResult<ReadDir>>;
560 type MetadataFuture<'a> = MetadataFuture;
561 type OpFuture<'a> = OpFuture;
562
563 fn path(&self) -> Self::PathFuture<'_> {
565 ready(self.get_path().map_err(from_io_error))
566 }
567
568 fn read_dir(&self) -> Self::ReadDirFuture<'_> {
576 ready(
577 self.get_path()
578 .and_then(|v| v.read_dir())
579 .map_err(from_io_error)
580 .map(|instance| ReadDir { instance }),
581 )
582 }
583
584 fn open_file<'a, P: AsRef<Path> + ?Sized>(
589 &'a self,
590 path: &'a P,
591 options: OpenOptions,
592 ) -> Self::OpenFileFuture<'a> {
593 let (tx, rx) = oneshot::channel();
594
595 if let Ok(path) = self.join_path(path) {
596 let _ = self.ops.send(RtBgOp::OpenFile {
597 path,
598 options,
599 map_options: self.instance.get_map_options(),
600 completion: tx,
601 });
602 } else {
603 let _ = tx.send(Err(mferr!(Directory, Unavailable, Filesystem)));
604 }
605
606 OpenFileFuture {
607 rt: self,
608 completion: rx,
609 }
610 }
611
612 fn open_dir<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::OpenDirFuture<'a> {
617 let dir = self.join_path(path).map_err(from_io_error).and_then(|v| {
618 if v.is_dir() {
619 Ok(Self {
620 dir: Some(v),
621 ops: self.ops.clone(),
622 instance: self.instance.clone(),
623 })
624 } else if v.exists() {
625 Err(mferr!(Path, Invalid, Filesystem))
626 } else {
627 Err(mferr!(Path, NotFound, Filesystem))
628 }
629 });
630
631 ready(dir)
632 }
633
634 fn metadata<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::MetadataFuture<'a> {
635 let (tx, rx) = oneshot::channel();
636
637 if let Ok(path) = self.join_path(path) {
638 let _ = self.ops.send(RtBgOp::Metadata {
639 path,
640 completion: tx,
641 });
642 } else {
643 let _ = tx.send(Err(mferr!(Directory, Unavailable, Filesystem)));
644 }
645
646 MetadataFuture { completion: rx }
647 }
648
649 fn do_op<'a, P: AsRef<Path> + ?Sized>(&'a self, operation: DirOp<&'a P>) -> Self::OpFuture<'a> {
653 let (tx, rx) = oneshot::channel();
654
655 let _ = self.ops.send(RtBgOp::DirOp {
656 op: operation.as_path().into_pathbuf(),
657 completion: tx,
658 });
659
660 OpFuture { completion: rx }
661 }
662}
663
/// Stream over the entries of a directory, backed by a standard-library directory iterator.
pub struct ReadDir {
    /// Underlying std iterator that produces the entries.
    instance: fs::ReadDir,
}
667
668impl Stream for ReadDir {
669 type Item = MfioResult<DirEntry>;
670
671 fn poll_next(self: Pin<&mut Self>, _: &mut Context) -> Poll<Option<Self::Item>> {
672 let this = unsafe { self.get_unchecked_mut() };
673
674 Poll::Ready(
675 this.instance
676 .next()
677 .map(|v| v.map(From::from).map_err(from_io_error)),
678 )
679 }
680}
681
682pub struct MetadataFuture {
683 completion: oneshot::Receiver<MfioResult<Metadata>>,
684}
685
686impl Future for MetadataFuture {
687 type Output = MfioResult<Metadata>;
688
689 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
690 let this = unsafe { self.get_unchecked_mut() };
691 let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
692 match completion.poll(cx) {
693 Poll::Ready(Ok(res)) => Poll::Ready(res),
694 Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
695 Poll::Pending => Poll::Pending,
696 }
697 }
698}
699
700pub struct OpenFileFuture<'a> {
701 rt: &'a NativeRtDir,
702 completion: oneshot::Receiver<MfioResult<std::fs::File>>,
703}
704
705impl<'a> Future for OpenFileFuture<'a> {
706 type Output = MfioResult<<NativeRtDir as DirHandle>::FileHandle>;
707
708 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
709 let this = unsafe { self.get_unchecked_mut() };
710 let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
711 match completion.poll(cx) {
712 Poll::Ready(Ok(file)) => {
713 Poll::Ready(file.map(|f| this.rt.instance.register_file(f).into()))
714 }
715 Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
716 Poll::Pending => Poll::Pending,
717 }
718 }
719}
720
721pub struct OpFuture {
722 completion: oneshot::Receiver<MfioResult<()>>,
723}
724
725impl Future for OpFuture {
726 type Output = MfioResult<()>;
727
728 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
729 let this = unsafe { self.get_unchecked_mut() };
730 let completion = unsafe { Pin::new_unchecked(&mut this.completion) };
731 match completion.poll(cx) {
732 Poll::Ready(Ok(res)) => Poll::Ready(res),
733 Poll::Ready(Err(_)) => Poll::Ready(Err(mferr!(Output, BrokenPipe, Filesystem))),
734 Poll::Pending => Poll::Pending,
735 }
736 }
737}
738
739enum RtBgOp {
740 OpenFile {
741 path: PathBuf,
742 options: OpenOptions,
743 map_options: fn(std::fs::OpenOptions) -> std::fs::OpenOptions,
744 completion: oneshot::Sender<MfioResult<std::fs::File>>,
745 },
746 DirOp {
747 op: DirOp<PathBuf>,
748 completion: oneshot::Sender<MfioResult<()>>,
749 },
750 Metadata {
751 path: PathBuf,
752 completion: oneshot::Sender<MfioResult<Metadata>>,
753 },
754}
755
756impl RtBgOp {
757 fn process(self) {
758 match self {
759 Self::OpenFile {
760 path,
761 options,
762 map_options,
763 completion,
764 } => {
765 let mut fs_options = fs::OpenOptions::new();
766
767 fs_options
768 .read(options.read)
769 .write(options.write)
770 .create(options.create)
771 .create_new(options.create_new)
772 .truncate(options.truncate);
773
774 let file = map_options(fs_options)
775 .open(path)
776 .map_err(crate::util::from_io_error);
777
778 let _ = completion.send(file);
779 }
780 Self::DirOp { op, completion } => {
781 let ret = match op {
782 DirOp::SetPermissions { .. } => {
783 Err(std::io::ErrorKind::Unsupported.into())
787 }
788 DirOp::RemoveDir { path } => fs::remove_dir(path),
789 DirOp::RemoveDirAll { path } => fs::remove_dir_all(path),
790 DirOp::CreateDir { path } => fs::create_dir(path),
791 DirOp::CreateDirAll { path } => fs::create_dir_all(path),
792 DirOp::RemoveFile { path } => fs::remove_file(path),
793 DirOp::Rename { from, to } => fs::rename(from, to),
794 DirOp::Copy { from, to } => fs::copy(from, to).map(|_| ()),
795 DirOp::HardLink { from, to } => fs::hard_link(from, to),
796 };
797 let _ = completion.send(ret.map_err(from_io_error));
798 }
799 Self::Metadata { path, completion } => {
800 let to_epoch_duration = |v: std::time::SystemTime| {
801 v.duration_since(std::time::SystemTime::UNIX_EPOCH).ok()
802 };
803
804 let res = path
805 .metadata()
806 .map(|m| Metadata {
807 permissions: m.permissions().into(),
808 len: m.len(),
809 modified: m.modified().ok().and_then(to_epoch_duration),
810 accessed: m.accessed().ok().and_then(to_epoch_duration),
811 created: m.created().ok().and_then(to_epoch_duration),
812 })
813 .map_err(from_io_error);
814 let _ = completion.send(res);
815 }
816 }
817 }
818}
819
820#[derive(Debug)]
821pub struct NativeRtDir {
822 dir: Option<PathBuf>,
823 ops: flume::Sender<RtBgOp>,
824 instance: BaseArc<NativeRtInstance>,
825}
826
// Unit tests that run against every backend returned by
// `NativeRtBuilder::all_backends().build_each()`. Each test unwraps the build result, so a
// backend that is compiled in but fails to initialize makes the test panic.
827#[cfg(test)]
828mod tests {
 829 use super::*;
 834 use core::future::poll_fn;
 835 use core::mem::MaybeUninit;
 836 use core::task::Poll;
 837 use mfio::stdeq::*;
 838 use mfio::traits::*;
 839 use std::fs::write;
 840 use std::io::Seek;
841
    // Writes a small file into the temp directory, then opens it on each backend and reads
    // its first 8 bytes starting at position 0. Only success of the read is checked, not the
    // bytes read.
 842 #[test]
 843 fn simple_io() {
 844 let test_string = "Test test 42";
 847 let mut filepath = std::env::temp_dir();
 848 filepath.push("mfio-fs-test-simple-io");
849
 850 write(&filepath, test_string.as_bytes()).unwrap();
851
 852 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 853 println!("{backend}");
 854 fs.unwrap().run(|fs: &NativeRt| async {
 855 let fh = fs
 856 .open(&filepath, OpenOptions::new().read(true))
 857 .await
 858 .unwrap();
859
 860 let mut d = [MaybeUninit::uninit(); 8];
861
 862 fh.read_all(0, &mut d[..]).await.unwrap();
 863 });
 864 }
 865 }
866
    // Same steps as `simple_io`, using a different file name.
 867 #[test]
 868 fn read_all() {
 869 let test_string = "Test test 42";
 872 let mut filepath = std::env::temp_dir();
 873 filepath.push("mfio-fs-test-read-all");
874
 875 write(&filepath, test_string.as_bytes()).unwrap();
876
 877 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 878 println!("{backend}");
 879 fs.unwrap().run(|fs| async {
 880 let fh = fs
 881 .open(&filepath, OpenOptions::new().read(true))
 882 .await
 883 .unwrap();
884
 885 let mut d = [MaybeUninit::uninit(); 8];
886
 887 fh.read_all(0, &mut d[..]).await.unwrap();
 888 });
 889 }
 890 }
891
    // Round trip: writes the bytes 0..128 through the stream-style API, checks the position
    // advanced by the written length, rewinds, reads everything back and compares.
 892 #[test]
 893 fn write_test() {
 894 let mut test_data = vec![];
895
 896 for i in 0u8..128 {
 897 test_data.extend(i.to_ne_bytes());
 898 }
899
 900 let mut filepath = std::env::temp_dir();
 901 filepath.push("mfio-fs-test-write");
902
 903 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 904 println!("{backend}");
 905 fs.unwrap().run(|fs| async {
 906 let mut fh = fs
 907 .open(
 908 &filepath,
 909 OpenOptions::new()
 910 .read(true)
 911 .write(true)
 912 .create(true)
 913 .truncate(true),
 914 )
 915 .await
 916 .unwrap();
917
 918 AsyncWrite::write(&fh, &test_data).await.unwrap();
919
 920 assert_eq!(test_data.len(), fh.get_pos() as usize);
921
 922 fh.rewind().unwrap();
923
 924 let mut output = vec![];
 926 AsyncRead::read_to_end(&fh, &mut output).await.unwrap();
927
 928 assert_eq!(test_data.len(), fh.get_pos() as usize);
 929 assert_eq!(test_data, output);
930
 931 core::mem::drop(fh);
 932 });
 933 }
 934 }
935
    // Reads a whole file with `read_to_end` and checks both the final position and the
    // content against what was written.
 936 #[test]
 937 fn read_to_end() {
 938 let test_string = "Test test 42";
 939 let mut filepath = std::env::temp_dir();
 940 filepath.push("mfio-fs-test-read-to-end");
941
 942 write(&filepath, test_string.as_bytes()).unwrap();
944
 945 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 946 println!("{backend}");
 947 fs.unwrap().run(|fs| async {
 948 let fh = fs
 949 .open(&filepath, OpenOptions::new().read(true))
 950 .await
 951 .unwrap();
952
 953 let mut output = vec![];
 954 AsyncRead::read_to_end(&fh, &mut output).await.unwrap();
955
 956 assert_eq!(test_string.len(), fh.get_pos() as usize);
 957 assert_eq!(test_string.as_bytes(), output);
 958 });
 959 }
 960 }
961
    // Two rounds of: return `Pending` once, get woken from another thread after 200 ms,
    // then complete. The test passes if `run` returns, i.e. the external wake is observed.
 962 #[test]
 963 fn wake_test_single() {
 964 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 965 println!("{backend}");
 966 fs.unwrap().run(|_| async move {
 967 for i in 0..2 {
 968 println!("{i}");
 969 let mut signaled = false;
 970 poll_fn(|cx| {
 971 println!("{signaled}");
 972 if signaled {
 973 Poll::Ready(())
 974 } else {
 975 signaled = true;
 976 let waker = cx.waker().clone();
 977 std::thread::spawn(|| {
 978 std::thread::sleep(std::time::Duration::from_millis(200));
 979 println!("WAKE");
 980 waker.wake();
 981 });
 982 Poll::Pending
 983 }
 984 })
 985 .await;
 986 }
 987 });
 988 }
 989 }
990
    // Invokes a waker after the runtime that created it is gone. The future completes
    // immediately while a helper thread keeps a cloned waker; the runtime (a temporary of the
    // `run` statement) is dropped, and only then is the thread released via `tx1` to call
    // `wake()`. `rx2` makes the test wait until the thread has finished doing so.
 991 #[test]
 992 fn wake_test_dropped() {
 993 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 994 println!("{backend}");
 995 let (tx1, rx1) = std::sync::mpsc::channel();
 996 let rx1 = BaseArc::new(parking_lot::Mutex::new(rx1));
 997 let (tx2, rx2) = std::sync::mpsc::channel();
998
 999 {
 1000 fs.unwrap().run(|_| async move {
 1001 poll_fn(|cx| {
 1002 let tx2 = tx2.clone();
 1003 let rx1 = rx1.clone();
 1004 let waker = cx.waker().clone();
 1005 std::thread::spawn(move || {
 1006 rx1.lock().recv().unwrap();
 1007 println!("WAKE");
 1008 waker.wake();
 1009 println!("Woke");
 1010 tx2.send(()).unwrap();
 1011 });
 1012 Poll::Ready(())
 1013 })
 1014 .await;
 1015 });
 1016 }
1017
 1018 tx1.send(()).unwrap();
 1019 rx2.recv().unwrap();
 1020 }
 1021 }
1022
    // Many rounds (2000, or 20 under miri) of pending-then-woken, where the wake comes from a
    // freshly spawned thread with no delay, so it may arrive before or after the poll returns.
 1023 #[test]
 1024 fn wake_test_lot() {
 1025 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 1026 println!("{backend}");
 1027 fs.unwrap().run(|_| async move {
 1028 #[cfg(miri)]
 1029 let wakes = 20;
 1030 #[cfg(not(miri))]
 1031 let wakes = 2000;
 1032 for i in 0..wakes {
 1033 println!("{i}");
 1034 let mut signaled = false;
 1035 poll_fn(|cx| {
 1036 println!("{signaled}");
 1037 if signaled {
 1038 Poll::Ready(())
 1039 } else {
 1040 signaled = true;
 1041 let waker = cx.waker().clone();
 1042 std::thread::spawn(|| {
 1043 println!("WAKE");
 1044 waker.wake();
 1045 println!("Woke");
 1046 });
 1047 Poll::Pending
 1048 }
 1049 })
 1050 .await;
 1051 }
 1052 });
 1053 }
 1054 }
1055
    // Many rounds in which the future wakes itself from inside `poll` before returning
    // `Pending`; it must be polled again for the test to finish.
 1056 #[test]
 1057 fn self_wake() {
 1058 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 1060 println!("{backend}");
 1061 fs.unwrap().run(|_| async move {
 1062 #[cfg(miri)]
 1063 let wakes = 20;
 1064 #[cfg(not(miri))]
 1065 let wakes = 2000;
 1066 for i in 0..wakes {
 1067 println!("{i}");
 1068 let mut signaled = false;
 1069 poll_fn(|cx| {
 1070 println!("{signaled}");
 1071 if signaled {
 1072 Poll::Ready(())
 1073 } else {
 1074 signaled = true;
 1075 cx.waker().wake_by_ref();
 1076 Poll::Pending
 1077 }
 1078 })
 1079 .await;
 1080 }
 1081 });
 1082 }
 1083 }
1084
    // First poll: self-wake. Second poll: hand the waker to a thread that sleeps 200 ms,
    // sends on `tx`, then wakes. Third poll: complete. The final assertion requires the
    // thread's message to already be in the channel when `run` returns, which fails if the
    // third poll happened before the thread's wake (for example from a duplicated self-wake).
 1085 #[test]
 1086 fn self_no_doublewake() {
 1087 for (backend, fs) in NativeRtBuilder::all_backends().build_each() {
 1089 println!("{backend}");
1090
 1091 let (tx, rx) = std::sync::mpsc::channel();
1092
 1093 fs.unwrap().run(|_| async move {
 1094 let mut signaled = 0;
 1095 poll_fn(|cx| {
 1096 println!("{signaled}");
 1097 if signaled > 1 {
 1098 Poll::Ready(())
 1099 } else {
 1100 signaled += 1;
 1101 if signaled == 1 {
 1102 cx.waker().wake_by_ref();
 1103 } else {
 1104 let waker = cx.waker().clone();
 1105 let tx = tx.clone();
 1106 std::thread::spawn(move || {
 1107 std::thread::sleep(std::time::Duration::from_millis(200));
 1108 println!("WAKE");
 1109 tx.send(()).unwrap();
 1110 waker.wake();
 1111 });
 1112 }
 1113 Poll::Pending
 1114 }
 1115 })
 1116 .await;
 1117 });
1118
 1119 assert_eq!(Ok(()), rx.try_recv());
 1120 }
 1121 }
1122}
1123
// Instantiations of the crate's shared test suites (`test_suite!` for filesystem tests,
// `net_test_suite!` for networking tests) against the native runtime.
//
// Every instantiation comes in two flavours: `*_default` uses `NativeRt::default()`, while
// `*_all` loops over `builder().enable_all().build_each()` and silently skips backends that
// failed to build. The `smol` and `tokio` submodules repeat the same four instantiations but
// drive the runtime through the corresponding executor integration instead of `NativeRt::run`.
//
// NOTE(review): `staticify`, `TempDir` and `TestRun` are not defined in this file; presumably
// they are brought into scope by the suite macros or `super::*` - confirm.
1124#[cfg(test)]
1125mod suite_tests {
 1126 use super::*;
    // Filesystem suite on the default runtime, with the runtime's working directory pointed at
    // a fresh temporary directory per test.
 1127 test_suite!(tests_default, |test_name, closure| {
 1128 let _ = ::env_logger::builder().is_test(true).try_init();
 1129 let mut rt = crate::NativeRt::default();
 1130 let rt = staticify(&mut rt);
 1131 let dir = TempDir::new(test_name).unwrap();
 1132 rt.set_cwd(dir.path().to_path_buf());
 1133 rt.run(move |rt| {
 1134 let run = TestRun::new(rt, dir);
 1135 closure(run)
 1136 });
 1137 });
1138
    // Filesystem suite on every backend that builds successfully.
 1139 test_suite!(tests_all, |test_name, closure| {
 1140 let _ = ::env_logger::builder().is_test(true).try_init();
 1141 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1142 println!("{name}");
 1143 if let Ok(mut rt) = rt {
 1144 let rt = staticify(&mut rt);
 1145 let dir = TempDir::new(test_name).unwrap();
 1146 rt.set_cwd(dir.path().to_path_buf());
 1147 rt.run(move |rt| {
 1148 let run = TestRun::new(rt, dir);
 1149 closure(run)
 1150 });
 1151 }
 1152 }
 1153 });
1154
    // Networking suite on the default runtime.
 1155 net_test_suite!(net_tests_default, |closure| {
 1156 let _ = ::env_logger::builder().is_test(true).try_init();
 1157 let mut rt = crate::NativeRt::default();
 1158 let rt = staticify(&mut rt);
 1159 rt.run(closure);
 1160 });
1161
    // Networking suite on every backend that builds successfully.
 1162 net_test_suite!(net_tests_all, |closure| {
 1163 let _ = ::env_logger::builder().is_test(true).try_init();
 1164 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1165 println!("{name}");
 1166 if let Ok(mut rt) = rt {
 1167 let rt = staticify(&mut rt);
 1168 rt.run(closure);
 1169 }
 1170 }
 1171 });
1172
    // Same suites, driven from inside `smol::block_on` through the `AsyncIo` integration.
    // Compiled only on unix and not under miri.
 1173 #[cfg(all(unix, not(miri)))]
 1175 mod smol {
 1176 use super::*;
1177
 1178 test_suite!(tests_default, |test_name, closure| {
 1179 let _ = ::env_logger::builder().is_test(true).try_init();
1180
 1181 smol::block_on(async {
 1182 use mfio::backend::{integrations::async_io::AsyncIo, *};
1183
 1184 let mut rt = crate::NativeRt::default();
 1185 let rt = staticify(&mut rt);
 1186 let dir = TempDir::new(test_name).unwrap();
 1187 rt.set_cwd(dir.path().to_path_buf());
1188
 1189 AsyncIo::run_with_mut(rt, move |rt| {
 1190 let run = TestRun::new(rt, dir);
 1191 closure(run)
 1192 })
 1193 .await;
 1194 });
 1195 });
1196
 1197 test_suite!(tests_all, |test_name, closure| {
 1198 let _ = ::env_logger::builder().is_test(true).try_init();
1199
 1200 smol::block_on(async {
 1201 use mfio::backend::{integrations::async_io::AsyncIo, *};
1202
 1203 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1204 println!("{name}");
 1205 if let Ok(mut rt) = rt {
 1206 let rt = staticify(&mut rt);
 1207 let dir = TempDir::new(test_name).unwrap();
 1208 rt.set_cwd(dir.path().to_path_buf());
 1209 AsyncIo::run_with_mut(rt, move |rt| {
 1210 let run = TestRun::new(rt, dir);
 1211 closure(run)
 1212 })
 1213 .await;
 1214 }
 1215 }
 1216 });
 1217 });
1218
 1219 net_test_suite!(net_tests_default, |closure| {
 1220 let _ = ::env_logger::builder().is_test(true).try_init();
1221
 1222 smol::block_on(async {
 1223 use mfio::backend::{integrations::async_io::AsyncIo, *};
 1224 let mut rt = crate::NativeRt::default();
 1225 let rt = staticify(&mut rt);
 1226 AsyncIo::run_with_mut(rt, closure).await;
 1227 });
 1228 });
1229
 1230 net_test_suite!(net_tests_all, |closure| {
 1231 let _ = ::env_logger::builder().is_test(true).try_init();
 1232 smol::block_on(async {
 1233 use mfio::backend::{integrations::async_io::AsyncIo, *};
 1234 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1235 println!("{name}");
 1236 if let Ok(mut rt) = rt {
 1237 let rt = staticify(&mut rt);
 1238 AsyncIo::run_with_mut(rt, closure).await;
 1239 }
 1240 }
 1241 });
 1242 });
 1243 }
1244
    // Same suites, driven from inside a freshly created tokio runtime through the `Tokio`
    // integration. Compiled only on unix and not under miri.
 1245 #[cfg(all(unix, not(miri)))]
 1246 mod tokio {
 1247 use super::*;
1248
 1249 test_suite!(tests_default, |test_name, closure| {
 1250 let _ = ::env_logger::builder().is_test(true).try_init();
1251
 1252 tokio::runtime::Runtime::new().unwrap().block_on(async {
 1253 use mfio::backend::{integrations::tokio::Tokio, *};
1254
 1255 let mut rt = crate::NativeRt::default();
 1256 let rt = staticify(&mut rt);
 1257 let dir = TempDir::new(test_name).unwrap();
 1258 rt.set_cwd(dir.path().to_path_buf());
1259
 1260 Tokio::run_with_mut(rt, move |rt| {
 1261 let run = TestRun::new(rt, dir);
 1262 closure(run)
 1263 })
 1264 .await;
 1265 });
 1266 });
1267
 1268 test_suite!(tests_all, |test_name, closure| {
 1269 let _ = ::env_logger::builder().is_test(true).try_init();
1270
 1271 tokio::runtime::Runtime::new().unwrap().block_on(async {
 1272 use mfio::backend::{integrations::tokio::Tokio, *};
1273
 1274 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1275 println!("{name}");
 1276 if let Ok(mut rt) = rt {
 1277 let rt = staticify(&mut rt);
 1278 let dir = TempDir::new(test_name).unwrap();
 1279 rt.set_cwd(dir.path().to_path_buf());
 1280 Tokio::run_with_mut(rt, move |rt| {
 1281 let run = TestRun::new(rt, dir);
 1282 closure(run)
 1283 })
 1284 .await;
 1285 }
 1286 }
 1287 });
 1288 });
1289
 1290 net_test_suite!(net_tests_default, |closure| {
 1291 let _ = ::env_logger::builder().is_test(true).try_init();
1292
 1293 tokio::runtime::Runtime::new().unwrap().block_on(async {
 1294 use mfio::backend::{integrations::tokio::Tokio, *};
 1295 let mut rt = crate::NativeRt::default();
 1296 let rt = staticify(&mut rt);
 1297 Tokio::run_with_mut(rt, closure).await;
 1298 });
 1299 });
1300
 1301 net_test_suite!(net_tests_all, |closure| {
 1302 let _ = ::env_logger::builder().is_test(true).try_init();
 1303 tokio::runtime::Runtime::new().unwrap().block_on(async {
 1304 use mfio::backend::{integrations::tokio::Tokio, *};
 1305 for (name, rt) in crate::NativeRt::builder().enable_all().build_each() {
 1306 println!("{name}");
 1307 if let Ok(mut rt) = rt {
 1308 let rt = staticify(&mut rt);
 1309 Tokio::run_with_mut(rt, closure).await;
 1310 }
 1311 }
 1312 });
 1313 });
 1314 }
1315}