mfio_netfs/net/
client.rs

1use std::collections::{BTreeMap, VecDeque};
2
3use core::future::poll_fn;
4use core::marker::PhantomData;
5use core::mem::MaybeUninit;
6use core::task::{Context, Poll, Waker};
7
8use mfio::backend::*;
9use mfio::error::Result;
10use mfio::io::*;
11use mfio::mferr;
12
13use mfio::tarc::{Arc, BaseArc};
14use mfio::traits::*;
15
16use parking_lot::Mutex;
17use slab::Slab;
18
19use super::{FsRequest, FsResponse, HeaderRouter, ReadDirResponse, Request, Response};
20use cglue::result::IntError;
21use futures::{future::FutureExt, pin_mut};
22use mfio::error::Error;
23use mfio_rt::{DirEntry, DirHandle, DirOp, Fs, Metadata, OpenOptions};
24
25use core::future::Future;
26use core::pin::Pin;
27use mfio::stdeq::Seekable;
28use std::path::Path;
29
30use std::io::{self /* , Read as _, Write as _ */};
31use std::net::{SocketAddr, TcpStream};
32use std::path::PathBuf;
33
34use flume::{r#async::SendFut, Sender};
35use futures::Stream;
36use log::*;
37use tracing::instrument::Instrument;
38
/// A file I/O operation queued for transmission over the connection.
struct FileOperation {
    /// Identifier of the target file; sent as `file_id` in the request header.
    file_id: u32,
    /// Position of the operation; sent as `pos` in the request header.
    pos: u64,
    /// Direction of the operation together with the packet it carries.
    ty: OpType,
}
44
45impl FileOperation {
46    fn write_msg<Io: PacketIo<Read, NoPos>>(
47        self,
48        router: &HeaderRouter<'_, Request, Read, Io>,
49        packet_id: u32,
50    ) -> Result<InFlightOpType> {
51        let Self { file_id, pos, ty } = self;
52        ty.write_msg(router, file_id, pos, packet_id)
53    }
54}
55
/// Maps a packet permission type to the [`OpType`] variant that carries packets of that
/// permission.
trait IntoOp: PacketPerms {
    /// Wraps `pkt` in the matching [`OpType`] variant.
    fn into_op(pkt: BoundPacketView<Self>) -> OpType;
}
59
60impl IntoOp for Read {
61    fn into_op(pkt: BoundPacketView<Self>) -> OpType {
62        OpType::Write(pkt)
63    }
64}
65
66impl IntoOp for Write {
67    fn into_op(pkt: BoundPacketView<Self>) -> OpType {
68        OpType::Read(pkt)
69    }
70}
71
/// A splittable packet tracked as a set of remaining pieces ("shards").
///
/// Each shard is keyed by its starting offset within the original packet, so that arbitrary
/// sub-ranges can be carved out with [`ShardedPacket::extract`].
struct ShardedPacket<T: Splittable<u64>> {
    /// Remaining shards, keyed by start offset.
    shards: BTreeMap<u64, T>,
}
75
76impl<T: Splittable<u64>> From<T> for ShardedPacket<T> {
77    fn from(pkt: T) -> Self {
78        Self {
79            shards: std::iter::once((0, pkt)).collect(),
80        }
81    }
82}
83
84impl<T: Splittable<u64>> ShardedPacket<T> {
85    fn is_empty(&self) -> bool {
86        // TODO: do this or self.len() == 0?
87        self.shards.is_empty()
88    }
89
90    fn extract(&mut self, idx: u64, len: u64) -> Option<T> {
91        let (&shard_idx, _) = self.shards.range(..=idx).next_back()?;
92        let mut shard = self.shards.remove(&shard_idx)?;
93
94        if idx > shard_idx {
95            let (left, right) = shard.split_at(idx - shard_idx);
96            self.shards.insert(shard_idx, left);
97            shard = right;
98        }
99
100        if len < shard.len() {
101            let (left, right) = shard.split_at(len);
102            self.shards.insert(idx + len, right);
103            shard = left;
104        }
105
106        Some(shard)
107    }
108}
109
/// Bookkeeping entry for an operation that has been assigned a packet id.
enum InFlightOpType {
    /// Placeholder occupying a slot before the actual entry is known.
    Reserved,
    /// A read whose destination packet is handed out piece by piece.
    Read(ShardedPacket<BoundPacketView<Write>>),
    /// A write, identified by the key returned from the router's `send_pkt`.
    Write(usize),
}
115
/// Direction of a file operation along with the packet it operates on.
enum OpType {
    /// Read request; the packet has `Write` permission.
    Read(BoundPacketView<Write>),
    /// Write request; the packet has `Read` permission.
    Write(BoundPacketView<Read>),
}
120
121impl OpType {
122    fn write_msg<Io: PacketIo<Read, NoPos>>(
123        self,
124        router: &HeaderRouter<'_, Request, Read, Io>,
125        file_id: u32,
126        pos: u64,
127        packet_id: u32,
128    ) -> Result<InFlightOpType> {
129        match self {
130            Self::Read(v) => {
131                router.send_hdr(|_| Request::Read {
132                    file_id,
133                    pos,
134                    packet_id: packet_id as _,
135                    len: v.len(),
136                });
137                Ok(InFlightOpType::Read(v.into()))
138            }
139            Self::Write(v) => {
140                let len = v.len();
141                let key = router.send_pkt(
142                    |_| Request::Write {
143                        file_id,
144                        pos,
145                        packet_id,
146                        len,
147                    },
148                    v,
149                );
150                Ok(InFlightOpType::Write(key))
151            }
152        }
153    }
154}
155
/// Lifecycle of a filesystem request stored in `NetFsState::fs_reqs`.
#[derive(Debug)]
enum FsRequestState {
    /// The request has been registered but not yet written to the connection.
    Started {
        /// Request payload to be serialized.
        req: FsRequest,
        /// Directory id placed in the request header.
        dir_id: u16,
        /// Waker to notify once the request completes.
        waker: Option<Waker>,
    },
    /// The request has been sent and a response is awaited.
    Processing {
        /// Waker to notify once the request completes.
        waker: Option<Waker>,
    },
    /// A response has been received.
    Complete {
        /// Decoded response; `None` if the payload could not be deserialized.
        resp: Option<FsResponse>,
    },
}
170
/// Mutable state shared between the backend loops and the user-facing handles.
struct NetFsState {
    /// File operations awaiting responses; the slab key is used as the packet id.
    in_flight_ops: Slab<InFlightOpType>,
    /// Filesystem requests; the slab key is used as the request id.
    fs_reqs: Slab<FsRequestState>,
    /// Open directory listing streams, keyed by stream id.
    read_dir_streams: BTreeMap<u16, ReadDirStream>,
}
176
/// Sending halves of the channels used to submit work to the backend loops.
struct Senders {
    /// File read/write operations.
    ops: Sender<FileOperation>,
    /// Ids of filesystem requests that are ready to be sent.
    fs_reqs: Sender<u32>,
    /// Ids of directory streams that want more entries.
    read_dir_reqs: Sender<u16>,
    /// Ids of files to close.
    close_reqs: Sender<u32>,
}
183
/// Client for a filesystem served over a TCP connection.
pub struct NetworkFs {
    /// Container holding the future that drives the connection.
    backend: BackendContainer<DynBackend>,
    /// Handle to the directory with id 0, returned by `current_dir`.
    cwd: NetworkFsDir,
    /// Runtime the TCP stream is registered with.
    fs: Arc<mfio_rt::NativeRt>,
    /// Whether to call `cancel_all_ops` on the runtime when this object is dropped.
    cancel_ops_on_drop: bool,
}
190
191impl Drop for NetworkFs {
192    fn drop(&mut self) {
193        if self.cancel_ops_on_drop {
194            self.fs.cancel_all_ops();
195        }
196    }
197}
198
199impl NetworkFs {
200    pub fn try_new(addr: SocketAddr) -> Result<Self> {
201        let fs = Arc::new(
202            mfio_rt::NativeRt::builder()
203                .thread(true)
204                .enable_all()
205                .build()
206                .unwrap(),
207        );
208        Self::with_fs(addr, fs, true)
209    }
210
211    /// Create a new network FS to the provided target.
212    ///
213    /// # Arguments
214    ///
215    /// - `addr` address to connect to.
216    /// - `fs` filesystem instance to use.
217    /// - `cancel_ops_on_drop` whether to issue global cancellation to all outstanding operations
218    ///   when drop is called. This is required when using `cfg(mfio_assume_linear_types)` in order
219    ///   to prevent panic upon dropping the network filesystem.
220    pub fn with_fs(
221        addr: SocketAddr,
222        fs: Arc<mfio_rt::NativeRt>,
223        cancel_ops_on_drop: bool,
224    ) -> Result<Self> {
225        let stream = TcpStream::connect(addr)?;
226
227        let stream = fs.register_stream(stream);
228
229        let state = BaseArc::new(Mutex::new(NetFsState {
230            in_flight_ops: Default::default(),
231            fs_reqs: Default::default(),
232            read_dir_streams: Default::default(),
233        }));
234
235        let (ops, ops_rx) = flume::unbounded();
236        let (fs_reqs, fs_reqs_rx) = flume::bounded(16);
237        let (close_reqs, close_reqs_rx) = flume::unbounded();
238        let (read_dir_reqs, read_dir_reqs_rx) = flume::unbounded();
239
240        let senders = Senders {
241            ops,
242            fs_reqs,
243            close_reqs,
244            read_dir_reqs,
245        };
246
247        let backend = {
248            let state = state.clone();
249            async move {
250                let read_end = &stream;
251                let state = &state;
252                let stream = &stream;
253
254                let router = HeaderRouter::new(stream);
255
256                let results_loop = async {
257                    // Parse responses
258
259                    let mut tmp_buf: Vec<MaybeUninit<u8>> = vec![];
260
261                    loop {
262                        let resp = match {
263                            IoRead::read::<Response>(read_end, NoPos::new())
264                                .instrument(tracing::span!(
265                                    tracing::Level::TRACE,
266                                    "response header"
267                                ))
268                                .await
269                        } {
270                            Ok(resp) => resp,
271                            Err(e) => {
272                                error!("No resp: {e}");
273                                break;
274                            }
275                        };
276
277                        // Verify that the tag is proper, since otherwise we may jump to the wrong place of
278                        // code. TODO: use proper deserialization techniques
279                        // SAFETY: memunsafe made safe
280                        // while adding this check saves us from memory safety bugs, this will probably
281                        // still lead to arbitrarily large allocations that make us crash.
282                        let tag = unsafe { *(&resp as *const _ as *const u8) };
283                        assert!(tag < 4, "incoming data tag is invalid {tag}");
284
285                        trace!("Response: {resp:?}");
286
287                        match resp {
288                            Response::Read {
289                                packet_id,
290                                idx,
291                                len,
292                                err,
293                            } => {
294                                async move {
295                                    let pkt = {
296                                        let mut state = state.lock();
297                                        if let Some(InFlightOpType::Read(shards)) =
298                                            state.in_flight_ops.get_mut(packet_id as usize)
299                                        {
300                                            let pkt = shards.extract(idx, len).unwrap();
301                                            assert_eq!(pkt.len(), len);
302
303                                            if shards.is_empty() {
304                                                state.in_flight_ops.remove(packet_id as usize);
305                                            }
306
307                                            core::mem::drop(state);
308
309                                            if let Some(err) = err.map(Error::from_int_err) {
310                                                pkt.error(err);
311                                                None
312                                            } else {
313                                                Some(pkt)
314                                            }
315                                        } else {
316                                            None
317                                        }
318                                    };
319
320                                    if let Some(pkt) = pkt {
321                                        trace!("Send read {}", pkt.len());
322                                        // FIXME we should wait for this to finish perhaps?
323                                        read_end.send_io(NoPos::new(), pkt);
324                                    }
325                                }
326                                .instrument(tracing::span!(
327                                    tracing::Level::TRACE,
328                                    "read response",
329                                    packet_id,
330                                    idx,
331                                    len
332                                ))
333                                .await;
334                            }
335                            Response::Write {
336                                packet_id,
337                                idx,
338                                len,
339                                err,
340                            } => {
341                                let read_span = tracing::span!(
342                                    tracing::Level::TRACE,
343                                    "write response",
344                                    packet_id,
345                                    idx,
346                                    len
347                                );
348                                async {
349                                    let mut state = state.lock();
350                                    if let Some(InFlightOpType::Write(key)) =
351                                        state.in_flight_ops.get_mut(packet_id as usize)
352                                    {
353                                        if router.pkt_result(
354                                            *key,
355                                            idx,
356                                            len,
357                                            err.map(Error::from_int_err),
358                                        ) {
359                                            state.in_flight_ops.remove(packet_id as usize);
360                                        }
361                                    }
362                                }
363                                .instrument(read_span)
364                                .await
365                            }
366                            Response::Fs { req_id, resp_len } => {
367                                let fs_span = tracing::span!(
368                                    tracing::Level::TRACE,
369                                    "fs response",
370                                    req_id,
371                                    resp_len
372                                );
373                                tmp_buf = async move {
374                                    // TODO: make a wrapper for this...
375                                    let resp_len = resp_len as usize;
376                                    if tmp_buf.capacity() < resp_len {
377                                        tmp_buf.reserve(resp_len - tmp_buf.len());
378                                    }
379                                    // SAFETY: the data here is unininitialized
380                                    unsafe { tmp_buf.set_len(resp_len) };
381
382                                    let tmp_buf = if resp_len > 0 {
383                                        let buf = VecPacket::from(tmp_buf);
384                                        let buf =
385                                            read_end.read_all(NoPos::new(), buf).await.unwrap();
386                                        buf.take()
387                                    } else {
388                                        tmp_buf
389                                    };
390
391                                    let resp: Option<FsResponse> = postcard::from_bytes(unsafe {
392                                        &*(&tmp_buf[..] as *const [MaybeUninit<_>] as *const [u8])
393                                    })
394                                    .ok();
395
396                                    log::trace!("Fs Response {req_id}: {resp:?}");
397
398                                    let mut state = state.lock();
399                                    if let Some(req) = state.fs_reqs.get_mut(req_id as usize) {
400                                        log::trace!("State: {req:?}");
401                                        if let FsRequestState::Processing { waker } = req {
402                                            let waker = waker.take();
403
404                                            *req = FsRequestState::Complete { resp };
405
406                                            if let Some(waker) = waker {
407                                                waker.wake();
408                                            }
409                                        }
410                                    }
411
412                                    tmp_buf
413                                }
414                                .instrument(fs_span)
415                                .await
416                            }
417                            Response::ReadDir { stream_id, len } => {
418                                let fs_span = tracing::span!(
419                                    tracing::Level::TRACE,
420                                    "read dir",
421                                    stream_id,
422                                    len,
423                                );
424                                tmp_buf = async move {
425                                    let closed = (len & (1 << 31)) != 0;
426                                    let len = len & !(1 << 31);
427                                    log::trace!("ReadDir resp: {closed} {len}");
428                                    // TODO: make a wrapper for this...
429                                    let len = len as usize;
430                                    if tmp_buf.capacity() < len {
431                                        tmp_buf.reserve(len - tmp_buf.len());
432                                    }
433
434                                    // SAFETY: the data here is unininitialized
435                                    unsafe { tmp_buf.set_len(len) };
436
437                                    let buf = VecPacket::from(tmp_buf);
438                                    let buf = read_end.read_all(NoPos::new(), buf).await.unwrap();
439                                    let tmp_buf = buf.take();
440
441                                    let resp: Vec<ReadDirResponse> = postcard::from_bytes(unsafe {
442                                        &*(&tmp_buf[..] as *const [MaybeUninit<u8>]
443                                            as *const [u8])
444                                    })
445                                    .unwrap();
446
447                                    log::trace!("Resulting: {}", resp.len());
448
449                                    let mut state = state.lock();
450                                    if let Some(stream) = state.read_dir_streams.get_mut(&stream_id)
451                                    {
452                                        let waker = stream.waker.take();
453
454                                        if closed || resp.is_empty() {
455                                            log::trace!("Closing {closed}");
456                                            stream.closed = true;
457                                        }
458
459                                        stream.resp_count += resp.len();
460                                        stream.results.extend(
461                                            resp.into_iter()
462                                                .map(|r| r.map_err(Error::from_int_err)),
463                                        );
464
465                                        if let Some(waker) = waker {
466                                            waker.wake();
467                                        }
468                                    }
469
470                                    tmp_buf
471                                }
472                                .instrument(fs_span)
473                                .await
474                            }
475                        }
476                    }
477                }
478                .instrument(tracing::span!(tracing::Level::TRACE, "results_loop"))
479                .fuse();
480
481                let ops_loop = async {
482                    while let Ok(op) = ops_rx.recv_async().await {
483                        let key = state.lock().in_flight_ops.insert(InFlightOpType::Reserved);
484                        assert!(key <= u32::MAX as usize);
485
486                        async {
487                            let op = op.write_msg(&router, key as u32).unwrap();
488
489                            *state.lock().in_flight_ops.get_mut(key).unwrap() = op;
490                        }
491                        .instrument(tracing::span!(tracing::Level::TRACE, "op send", key))
492                        .await
493                    }
494                }
495                .instrument(tracing::span!(tracing::Level::TRACE, "ops_loop"))
496                .fuse();
497
498                let read_dir_loop = async {
499                    while let Ok(stream_id) = read_dir_reqs_rx.recv_async().await {
500                        log::trace!("Read dir {stream_id}");
501                        async {
502                            let mut state = state.lock();
503
504                            if let Some(req) = state.read_dir_streams.get_mut(&stream_id) {
505                                if !req.closed {
506                                    let count = req.compute_count();
507                                    core::mem::drop(state);
508                                    router.send_hdr(|_| Request::ReadDir { stream_id, count });
509                                }
510                            }
511                        }
512                        .instrument(tracing::span!(
513                            tracing::Level::TRACE,
514                            "read dir more",
515                            stream_id,
516                        ))
517                        .await;
518                    }
519                }
520                .instrument(tracing::span!(tracing::Level::TRACE, "open_loop"))
521                .fuse();
522
523                let fs_loop = async {
524                    while let Ok(req_id) = fs_reqs_rx.recv_async().await {
525                        async {
526                            let msg = {
527                                let mut state = state.lock();
528
529                                if let Some(fs_req) = state.fs_reqs.get_mut(req_id as usize) {
530                                    if let FsRequestState::Started { req, dir_id, waker } = fs_req {
531                                        log::trace!("Request: {req:?}");
532                                        let buf = postcard::to_allocvec(&req).unwrap();
533                                        // TODO: verify buflen
534                                        let ret = Some((
535                                            Request::Fs {
536                                                req_id,
537                                                dir_id: *dir_id,
538                                                req_len: buf.len() as u16,
539                                            },
540                                            OwnedPacket::from(buf.into_boxed_slice()),
541                                        ));
542
543                                        *fs_req = FsRequestState::Processing {
544                                            waker: waker.take(),
545                                        };
546
547                                        ret
548                                    } else {
549                                        None
550                                    }
551                                } else {
552                                    None
553                                }
554                            };
555
556                            if let Some((header, buf)) = msg {
557                                router.send_bytes(|_| header, buf);
558                            }
559                        }
560                        .instrument(tracing::span!(tracing::Level::TRACE, "open send", req_id))
561                        .await;
562                    }
563                }
564                .instrument(tracing::span!(tracing::Level::TRACE, "open_loop"))
565                .fuse();
566
567                let close_loop = async {
568                    while let Ok(file_id) = close_reqs_rx.recv_async().await {
569                        async {
570                            router.send_hdr(|_| Request::FileClose { file_id });
571                        }
572                        .instrument(tracing::span!(tracing::Level::TRACE, "close send", file_id))
573                        .await
574                    }
575                }
576                .instrument(tracing::span!(tracing::Level::TRACE, "close_loop"))
577                .fuse();
578
579                let combined = async move {
580                    pin_mut!(fs_loop);
581                    pin_mut!(read_dir_loop);
582                    pin_mut!(close_loop);
583                    pin_mut!(ops_loop);
584                    pin_mut!(results_loop);
585
586                    futures::select! {
587                        _ = fs_loop => log::error!("Fs done"),
588                        _ = read_dir_loop => log::error!("Read dir done"),
589                        _ = close_loop => log::error!("Close done"),
590                        _ = ops_loop => log::error!("Ops done"),
591                        _ = results_loop => log::error!("Results done"),
592                    }
593                };
594
595                combined.await;
596
597                poll_fn(|_| {
598                    log::error!("Network backend polled to completion!");
599                    Poll::Pending
600                })
601                .await
602                //fs.with_backend(combined).0.await;
603            }
604        };
605
606        Ok(Self {
607            fs,
608            cwd: NetworkFsDir {
609                dir_id: 0,
610                state,
611                senders: senders.into(),
612            },
613            backend: BackendContainer::new_dyn(backend),
614            cancel_ops_on_drop,
615        })
616    }
617}
618
619impl IoBackend for NetworkFs {
620    type Backend = DynBackend;
621
622    fn polling_handle(&self) -> Option<PollingHandle> {
623        self.fs.polling_handle()
624    }
625
626    fn get_backend(&self) -> BackendHandle<Self::Backend> {
627        self.backend.acquire_nested(self.fs.get_backend())
628    }
629}
630
impl Fs for NetworkFs {
    type DirHandle<'a> = NetworkFsDir;

    /// Returns the directory handle stored in `cwd`.
    fn current_dir(&self) -> &Self::DirHandle<'_> {
        &self.cwd
    }
}
638
/// Handle to a directory of a [`NetworkFs`].
pub struct NetworkFsDir {
    /// Directory id attached to the filesystem requests issued through this handle.
    dir_id: u16,
    /// State shared with the backend loops.
    state: BaseArc<Mutex<NetFsState>>,
    /// Channels used to submit work to the backend loops.
    senders: BaseArc<Senders>,
}
644
645impl DirHandle for NetworkFsDir {
646    type FileHandle = Seekable<FileWrapper, u64>;
647    type OpenFileFuture<'a> = OpenFileFuture<'a>;
648    type PathFuture<'a> = PathFuture<'a>;
649    type OpenDirFuture<'a> = OpenDirFuture<'a>;
650    type ReadDir<'a> = ReadDir<'a>;
651    type ReadDirFuture<'a> = ReadDirFuture<'a>;
652    type MetadataFuture<'a> = MetadataFuture<'a>;
653    type OpFuture<'a> = OpFuture<'a>;
654
655    /// Gets the absolute path to this `DirHandle`.
656    fn path(&self) -> Self::PathFuture<'_> {
657        PathOp::make_future(self, FsRequest::Path)
658    }
659
660    /// Reads the directory contents.
661    ///
662    /// This function, upon success, returns a stream that can be used to list files and
663    /// subdirectories within this dir.
664    ///
665    /// Note that on various platforms this may behave differently. For instance, Unix platforms
666    /// support holding
667    fn read_dir(&self) -> Self::ReadDirFuture<'_> {
668        ReadDirOp::make_future(self, FsRequest::ReadDir)
669    }
670
671    /// Opens a file.
672    ///
673    /// This function accepts an absolute or relative path to a file for reading. If the path is
674    /// relative, it is opened relative to this `DirHandle`.
675    fn open_file<'a, P: AsRef<Path> + ?Sized>(
676        &'a self,
677        path: &'a P,
678        options: OpenOptions,
679    ) -> Self::OpenFileFuture<'a> {
680        OpenFileOp::make_future(
681            self,
682            FsRequest::OpenFile {
683                path: path.as_ref().to_string_lossy().into(),
684                options,
685            },
686        )
687    }
688
689    /// Opens a directory.
690    ///
691    /// This function accepts an absolute or relative path to a directory for reading. If the path
692    /// is relative, it is opened relative to this `DirHandle`.
693    fn open_dir<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::OpenDirFuture<'a> {
694        OpenDirOp::make_future(
695            self,
696            FsRequest::OpenDir {
697                path: path.as_ref().to_string_lossy().into(),
698            },
699        )
700    }
701
702    fn metadata<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::MetadataFuture<'a> {
703        MetadataOp::make_future(
704            self,
705            FsRequest::Metadata {
706                path: path.as_ref().to_string_lossy().into(),
707            },
708        )
709    }
710
711    /// Do an operation.
712    ///
713    /// This function performs an operation from the [`DirOp`](DirOp) enum.
714    fn do_op<'a, P: AsRef<Path> + ?Sized>(&'a self, operation: DirOp<&'a P>) -> Self::OpFuture<'a> {
715        OpOp::make_future(self, FsRequest::DirOp(operation.into_string()))
716    }
717}
718
719trait FsRequestProc {
720    type Output<'a>: 'a;
721    type Future<'a>: Future<Output = Self::Output<'a>> + 'a;
722
723    fn finish<'a>(
724        req: FsResponse,
725        state: &mut NetFsState,
726        dir: &'a NetworkFsDir,
727    ) -> Self::Output<'a>;
728
729    fn make_future2<'a>(
730        fs: &'a NetworkFsDir,
731        req_id: usize,
732        send: Option<SendFut<'a, u32>>,
733    ) -> Self::Future<'a>;
734
735    fn make_future(fs: &NetworkFsDir, req: FsRequest) -> Self::Future<'_> {
736        let state = &mut *fs.state.lock();
737
738        let req_id = state.fs_reqs.insert(FsRequestState::Started {
739            dir_id: fs.dir_id,
740            waker: None,
741            req,
742        });
743
744        assert!(req_id <= u32::MAX as usize);
745
746        Self::make_future2(
747            fs,
748            req_id,
749            Some(fs.senders.fs_reqs.send_async(req_id as u32)),
750        )
751    }
752}
753
/// Shared-state side of a directory listing stream.
struct ReadDirStream {
    /// Set when a request for more entries has been queued.
    queued_read: bool,
    /// Set once a response flagged the end of the stream or carried no entries.
    closed: bool,
    /// Entries received but not yet taken by the consumer.
    results: VecDeque<Result<DirEntry>>,
    /// Waker of the consumer waiting for more entries.
    waker: Option<Waker>,
    /// Total number of entries received so far.
    resp_count: usize,
}
761
762impl ReadDirStream {
763    fn compute_count(&self) -> u16 {
764        // Automatically scale the request count to have better performance on larger directories
765        core::cmp::min(128 * ((self.resp_count + 1).ilog2() + 1), u16::MAX as u32) as u16
766    }
767}
768
/// Stream of directory entries of a [`NetworkFsDir`].
pub struct ReadDir<'a> {
    /// Directory handle providing the shared state and channels.
    fs: &'a NetworkFsDir,
    /// Key of this stream in `NetFsState::read_dir_streams`.
    stream_id: u16,
    /// Pending submission of a "read more" request, if any.
    send: Option<SendFut<'a, u16>>,
    /// Entries already taken from the shared state, yielded one at a time.
    cache: VecDeque<Result<DirEntry>>,
    /// Set once the stream is known to produce no further entries.
    closed: bool,
}
776
777impl<'a> Stream for ReadDir<'a> {
778    type Item = Result<DirEntry>;
779
780    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
781        let this = unsafe { self.get_unchecked_mut() };
782
783        loop {
784            if let Some(item) = this.cache.pop_front() {
785                break Poll::Ready(Some(item));
786            } else if !this.closed {
787                // If there's any read request pending to be sent, try to finish sending it.
788                if let Some(send) = this.send.as_mut() {
789                    let send = unsafe { Pin::new_unchecked(send) };
790
791                    if let Poll::Ready(res) = send.poll(cx) {
792                        this.send = None;
793                        if res.is_err() {
794                            this.closed = true;
795                        }
796                    } else {
797                        break Poll::Pending;
798                    }
799                }
800
801                let state = &mut *this.fs.state.lock();
802                if let Some(stream) = state.read_dir_streams.get_mut(&this.stream_id) {
803                    // TODO: have better read-ahead caching strategy.
804                    if !stream.results.is_empty() {
805                        core::mem::swap(&mut this.cache, &mut stream.results);
806                    }
807
808                    if stream.closed {
809                        state.read_dir_streams.remove(&this.stream_id);
810                    } else if !stream.queued_read {
811                        stream.waker = Some(cx.waker().clone());
812                        if !stream.queued_read {
813                            stream.queued_read = true;
814                            this.send =
815                                Some(this.fs.senders.read_dir_reqs.send_async(this.stream_id));
816                        }
817                    } else {
818                        stream.waker = Some(cx.waker().clone());
819                        break Poll::Pending;
820                    }
821                } else {
822                    this.closed = true;
823                    break Poll::Ready(None);
824                }
825            } else {
826                break Poll::Ready(None);
827            }
828        }
829    }
830}
831
// Generates the two types backing one kind of filesystem request:
//
// * `$op`  - a unit struct implementing `FsRequestProc`, whose `finish` turns the raw
//            `FsResponse` into `$rettype` by calling the `$block` closure.
// * `$fut` - the future handed to the caller. It first drives the submission of the request id
//            over the channel, then waits for the matching entry in `state.fs_reqs` to complete.
macro_rules! fs_op {
    ($fut:ident, $op:ident, $block:expr => $rettype:ty) => {
        pub struct $op;

        impl FsRequestProc for $op {
            type Output<'a> = $rettype;
            type Future<'a> = $fut<'a>;

            fn finish<'a>(
                resp: FsResponse,
                state: &mut NetFsState,
                dir: &'a NetworkFsDir,
            ) -> Self::Output<'a> {
                #[allow(clippy::redundant_closure_call)]
                ($block)(resp, state, dir)
            }

            fn make_future2<'a>(
                fs: &'a NetworkFsDir,
                req_id: usize,
                send: Option<SendFut<'a, u32>>,
            ) -> Self::Future<'a> {
                $fut { fs, req_id, send }
            }
        }

        pub struct $fut<'a> {
            fs: &'a NetworkFsDir,
            req_id: usize,
            send: Option<SendFut<'a, u32>>,
        }

        impl<'a> Future for $fut<'a> {
            type Output = $rettype;

            fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                // SAFETY: we are not moving the contents of self, or self itself.
                let this = unsafe { self.get_unchecked_mut() };

                // Phase 1: make sure the request has been accepted by the channel.
                if let Some(pending_send) = this.send.as_mut() {
                    // SAFETY: `send` is stored inside the pinned future and is only ever
                    // dropped in place (by assigning `None`), never moved.
                    let pending_send = unsafe { Pin::new_unchecked(pending_send) };

                    match pending_send.poll(cx) {
                        Poll::Ready(Ok(_)) => this.send = None,
                        Poll::Ready(Err(_)) => {
                            return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
                        }
                        Poll::Pending => return Poll::Pending,
                    }
                }

                // Phase 2: inspect the request slot in the shared state.
                let state = &mut *this.fs.state.lock();

                match state
                    .fs_reqs
                    .get_mut(this.req_id)
                    .expect("Request was not found")
                {
                    // Still in flight: leave our waker behind and wait.
                    FsRequestState::Started { waker, .. }
                    | FsRequestState::Processing { waker } => {
                        *waker = Some(cx.waker().clone());
                        Poll::Pending
                    }
                    FsRequestState::Complete { .. } => {
                        // Remove the entry
                        let FsRequestState::Complete { resp } = state.fs_reqs.remove(this.req_id)
                        else {
                            unreachable!()
                        };

                        Poll::Ready(match resp {
                            Some(resp) => $op::finish(resp, state, this.fs),
                            None => Err(mferr!(Response, NotFound, Filesystem)),
                        })
                    }
                }
            }
        }
    };
}
918
919fs_op!(
920    OpenFileFuture,
921    OpenFileOp,
922    |resp, _, dir: &NetworkFsDir| {
923        if let FsResponse::OpenFile { file_id } = resp {
924            file_id
925                .map(|file_id| {
926                    FileWrapper(IoWrapper {
927                        file_id,
928                        senders: dir.senders.clone(),
929                        _phantom: PhantomData,
930                    })
931                    .into()
932                })
933                .map_err(Error::from_int_err)
934        } else {
935            Err(mferr!(Response, Invalid, Filesystem))
936        }
937    } => Result<Seekable<FileWrapper, u64>>
938);
939
940fs_op!(
941    PathFuture,
942    PathOp,
943    |resp, _, _| {
944        if let FsResponse::Path { path } = resp {
945            path.map(From::from).map_err(Error::from_int_err)
946        } else {
947            Err(mferr!(Response, Invalid, Filesystem))
948        }
949    } => Result<PathBuf>
950);
951
952fs_op!(
953    ReadDirFuture,
954    ReadDirOp,
955    |resp, state: &mut NetFsState, dir| {
956        if let FsResponse::ReadDir { stream_id } = resp {
957            stream_id
958                .map(|stream_id| {
959                    state.read_dir_streams.insert(stream_id, ReadDirStream {
960                        queued_read: false,
961                        closed: false,
962                        results: Default::default(),
963                        waker: None,
964                        resp_count: 0,
965                    });
966                    ReadDir {
967                        fs: dir,
968                        stream_id,
969                        cache: Default::default(),
970                        closed: false,
971                        send: None,
972                    }
973                })
974                .map_err(Error::from_int_err)
975        } else {
976            Err(mferr!(Response, Invalid, Filesystem))
977        }
978    } => Result<ReadDir<'a>>
979);
980
981fs_op!(
982    OpenDirFuture,
983    OpenDirOp,
984    |resp, _, dir: &NetworkFsDir| {
985        if let FsResponse::OpenDir { dir_id } = resp {
986            dir_id
987                .map(|dir_id| NetworkFsDir {
988                    dir_id: dir_id.into(),
989                    senders: dir.senders.clone(),
990                    state: dir.state.clone(),
991                })
992                .map_err(Error::from_int_err)
993        } else {
994            Err(mferr!(Response, Invalid, Filesystem))
995        }
996    } => Result<NetworkFsDir>
997);
998
999fs_op!(
1000    MetadataFuture,
1001    MetadataOp,
1002    |resp, _, _| {
1003        if let FsResponse::Metadata { metadata } = resp {
1004            metadata.map_err(Error::from_int_err)
1005        } else {
1006            Err(mferr!(Response, Invalid, Filesystem))
1007        }
1008    } => Result<Metadata>
1009);
1010
1011fs_op!(
1012    OpFuture,
1013    OpOp,
1014    |resp, _, _| {
1015        if let FsResponse::DirOp(res) = resp {
1016            if let Some(err) = res.map(Error::from_int_err) {
1017                Err(err)
1018            } else {
1019                Ok(())
1020            }
1021        } else {
1022            Err(mferr!(Response, Invalid, Filesystem))
1023        }
1024    } => Result<()>
1025);
1026
1027impl<Perms: IntoOp, Param: IoAt> PacketIo<Perms, u64> for IoWrapper<Param> {
1028    fn send_io(&self, pos: u64, packet: BoundPacketView<Perms>) {
1029        let operation = FileOperation {
1030            file_id: self.file_id,
1031            pos,
1032            ty: Perms::into_op(packet),
1033        };
1034
1035        // TODO: what to do with error?
1036        let _ = self.senders.ops.send(operation);
1037    }
1038}
1039
1040pub struct IoWrapper<Param: IoAt> {
1041    file_id: u32,
1042    senders: BaseArc<Senders>,
1043    _phantom: PhantomData<Param>,
1044}
1045
1046impl<Param: IoAt> Drop for IoWrapper<Param> {
1047    fn drop(&mut self) {
1048        Param::drop_io_wrapper(self);
1049    }
1050}
1051
1052impl PacketIo<Read, u64> for FileWrapper {
1053    fn send_io(&self, param: u64, view: BoundPacketView<Read>) {
1054        self.0.send_io(param, view)
1055    }
1056}
1057
1058impl PacketIo<Write, u64> for FileWrapper {
1059    fn send_io(&self, param: u64, view: BoundPacketView<Write>) {
1060        self.0.send_io(param, view)
1061    }
1062}
1063
1064pub struct FileWrapper(IoWrapper<u64>);
1065
1066pub trait IoAt: Sized + Clone {
1067    fn drop_io_wrapper(io: &mut IoWrapper<Self>);
1068}
1069
1070impl IoAt for u64 {
1071    fn drop_io_wrapper(io: &mut IoWrapper<Self>) {
1072        let _ = io.senders.close_reqs.send(io.file_id);
1073    }
1074}