1use std::collections::{BTreeMap, VecDeque};
2
3use core::future::poll_fn;
4use core::marker::PhantomData;
5use core::mem::MaybeUninit;
6use core::task::{Context, Poll, Waker};
7
8use mfio::backend::*;
9use mfio::error::Result;
10use mfio::io::*;
11use mfio::mferr;
12
13use mfio::tarc::{Arc, BaseArc};
14use mfio::traits::*;
15
16use parking_lot::Mutex;
17use slab::Slab;
18
19use super::{FsRequest, FsResponse, HeaderRouter, ReadDirResponse, Request, Response};
20use cglue::result::IntError;
21use futures::{future::FutureExt, pin_mut};
22use mfio::error::Error;
23use mfio_rt::{DirEntry, DirHandle, DirOp, Fs, Metadata, OpenOptions};
24
25use core::future::Future;
26use core::pin::Pin;
27use mfio::stdeq::Seekable;
28use std::path::Path;
29
30use std::io::{self };
31use std::net::{SocketAddr, TcpStream};
32use std::path::PathBuf;
33
34use flume::{r#async::SendFut, Sender};
35use futures::Stream;
36use log::*;
37use tracing::instrument::Instrument;
38
39struct FileOperation {
40 file_id: u32,
41 pos: u64,
42 ty: OpType,
43}
44
45impl FileOperation {
46 fn write_msg<Io: PacketIo<Read, NoPos>>(
47 self,
48 router: &HeaderRouter<'_, Request, Read, Io>,
49 packet_id: u32,
50 ) -> Result<InFlightOpType> {
51 let Self { file_id, pos, ty } = self;
52 ty.write_msg(router, file_id, pos, packet_id)
53 }
54}
55
56trait IntoOp: PacketPerms {
57 fn into_op(pkt: BoundPacketView<Self>) -> OpType;
58}
59
60impl IntoOp for Read {
61 fn into_op(pkt: BoundPacketView<Self>) -> OpType {
62 OpType::Write(pkt)
63 }
64}
65
66impl IntoOp for Write {
67 fn into_op(pkt: BoundPacketView<Self>) -> OpType {
68 OpType::Read(pkt)
69 }
70}
71
72struct ShardedPacket<T: Splittable<u64>> {
73 shards: BTreeMap<u64, T>,
74}
75
76impl<T: Splittable<u64>> From<T> for ShardedPacket<T> {
77 fn from(pkt: T) -> Self {
78 Self {
79 shards: std::iter::once((0, pkt)).collect(),
80 }
81 }
82}
83
84impl<T: Splittable<u64>> ShardedPacket<T> {
85 fn is_empty(&self) -> bool {
86 self.shards.is_empty()
88 }
89
90 fn extract(&mut self, idx: u64, len: u64) -> Option<T> {
91 let (&shard_idx, _) = self.shards.range(..=idx).next_back()?;
92 let mut shard = self.shards.remove(&shard_idx)?;
93
94 if idx > shard_idx {
95 let (left, right) = shard.split_at(idx - shard_idx);
96 self.shards.insert(shard_idx, left);
97 shard = right;
98 }
99
100 if len < shard.len() {
101 let (left, right) = shard.split_at(len);
102 self.shards.insert(idx + len, right);
103 shard = left;
104 }
105
106 Some(shard)
107 }
108}
109
110enum InFlightOpType {
111 Reserved,
112 Read(ShardedPacket<BoundPacketView<Write>>),
113 Write(usize),
114}
115
116enum OpType {
117 Read(BoundPacketView<Write>),
118 Write(BoundPacketView<Read>),
119}
120
121impl OpType {
122 fn write_msg<Io: PacketIo<Read, NoPos>>(
123 self,
124 router: &HeaderRouter<'_, Request, Read, Io>,
125 file_id: u32,
126 pos: u64,
127 packet_id: u32,
128 ) -> Result<InFlightOpType> {
129 match self {
130 Self::Read(v) => {
131 router.send_hdr(|_| Request::Read {
132 file_id,
133 pos,
134 packet_id: packet_id as _,
135 len: v.len(),
136 });
137 Ok(InFlightOpType::Read(v.into()))
138 }
139 Self::Write(v) => {
140 let len = v.len();
141 let key = router.send_pkt(
142 |_| Request::Write {
143 file_id,
144 pos,
145 packet_id,
146 len,
147 },
148 v,
149 );
150 Ok(InFlightOpType::Write(key))
151 }
152 }
153 }
154}
155
156#[derive(Debug)]
157enum FsRequestState {
158 Started {
159 req: FsRequest,
160 dir_id: u16,
161 waker: Option<Waker>,
162 },
163 Processing {
164 waker: Option<Waker>,
165 },
166 Complete {
167 resp: Option<FsResponse>,
168 },
169}
170
171struct NetFsState {
172 in_flight_ops: Slab<InFlightOpType>,
173 fs_reqs: Slab<FsRequestState>,
174 read_dir_streams: BTreeMap<u16, ReadDirStream>,
175}
176
177struct Senders {
178 ops: Sender<FileOperation>,
179 fs_reqs: Sender<u32>,
180 read_dir_reqs: Sender<u16>,
181 close_reqs: Sender<u32>,
182}
183
184pub struct NetworkFs {
185 backend: BackendContainer<DynBackend>,
186 cwd: NetworkFsDir,
187 fs: Arc<mfio_rt::NativeRt>,
188 cancel_ops_on_drop: bool,
189}
190
191impl Drop for NetworkFs {
192 fn drop(&mut self) {
193 if self.cancel_ops_on_drop {
194 self.fs.cancel_all_ops();
195 }
196 }
197}
198
199impl NetworkFs {
200 pub fn try_new(addr: SocketAddr) -> Result<Self> {
201 let fs = Arc::new(
202 mfio_rt::NativeRt::builder()
203 .thread(true)
204 .enable_all()
205 .build()
206 .unwrap(),
207 );
208 Self::with_fs(addr, fs, true)
209 }
210
211 pub fn with_fs(
221 addr: SocketAddr,
222 fs: Arc<mfio_rt::NativeRt>,
223 cancel_ops_on_drop: bool,
224 ) -> Result<Self> {
225 let stream = TcpStream::connect(addr)?;
226
227 let stream = fs.register_stream(stream);
228
229 let state = BaseArc::new(Mutex::new(NetFsState {
230 in_flight_ops: Default::default(),
231 fs_reqs: Default::default(),
232 read_dir_streams: Default::default(),
233 }));
234
235 let (ops, ops_rx) = flume::unbounded();
236 let (fs_reqs, fs_reqs_rx) = flume::bounded(16);
237 let (close_reqs, close_reqs_rx) = flume::unbounded();
238 let (read_dir_reqs, read_dir_reqs_rx) = flume::unbounded();
239
240 let senders = Senders {
241 ops,
242 fs_reqs,
243 close_reqs,
244 read_dir_reqs,
245 };
246
247 let backend = {
248 let state = state.clone();
249 async move {
250 let read_end = &stream;
251 let state = &state;
252 let stream = &stream;
253
254 let router = HeaderRouter::new(stream);
255
256 let results_loop = async {
257 let mut tmp_buf: Vec<MaybeUninit<u8>> = vec![];
260
261 loop {
262 let resp = match {
263 IoRead::read::<Response>(read_end, NoPos::new())
264 .instrument(tracing::span!(
265 tracing::Level::TRACE,
266 "response header"
267 ))
268 .await
269 } {
270 Ok(resp) => resp,
271 Err(e) => {
272 error!("No resp: {e}");
273 break;
274 }
275 };
276
277 let tag = unsafe { *(&resp as *const _ as *const u8) };
283 assert!(tag < 4, "incoming data tag is invalid {tag}");
284
285 trace!("Response: {resp:?}");
286
287 match resp {
288 Response::Read {
289 packet_id,
290 idx,
291 len,
292 err,
293 } => {
294 async move {
295 let pkt = {
296 let mut state = state.lock();
297 if let Some(InFlightOpType::Read(shards)) =
298 state.in_flight_ops.get_mut(packet_id as usize)
299 {
300 let pkt = shards.extract(idx, len).unwrap();
301 assert_eq!(pkt.len(), len);
302
303 if shards.is_empty() {
304 state.in_flight_ops.remove(packet_id as usize);
305 }
306
307 core::mem::drop(state);
308
309 if let Some(err) = err.map(Error::from_int_err) {
310 pkt.error(err);
311 None
312 } else {
313 Some(pkt)
314 }
315 } else {
316 None
317 }
318 };
319
320 if let Some(pkt) = pkt {
321 trace!("Send read {}", pkt.len());
322 read_end.send_io(NoPos::new(), pkt);
324 }
325 }
326 .instrument(tracing::span!(
327 tracing::Level::TRACE,
328 "read response",
329 packet_id,
330 idx,
331 len
332 ))
333 .await;
334 }
335 Response::Write {
336 packet_id,
337 idx,
338 len,
339 err,
340 } => {
341 let read_span = tracing::span!(
342 tracing::Level::TRACE,
343 "write response",
344 packet_id,
345 idx,
346 len
347 );
348 async {
349 let mut state = state.lock();
350 if let Some(InFlightOpType::Write(key)) =
351 state.in_flight_ops.get_mut(packet_id as usize)
352 {
353 if router.pkt_result(
354 *key,
355 idx,
356 len,
357 err.map(Error::from_int_err),
358 ) {
359 state.in_flight_ops.remove(packet_id as usize);
360 }
361 }
362 }
363 .instrument(read_span)
364 .await
365 }
366 Response::Fs { req_id, resp_len } => {
367 let fs_span = tracing::span!(
368 tracing::Level::TRACE,
369 "fs response",
370 req_id,
371 resp_len
372 );
373 tmp_buf = async move {
374 let resp_len = resp_len as usize;
376 if tmp_buf.capacity() < resp_len {
377 tmp_buf.reserve(resp_len - tmp_buf.len());
378 }
379 unsafe { tmp_buf.set_len(resp_len) };
381
382 let tmp_buf = if resp_len > 0 {
383 let buf = VecPacket::from(tmp_buf);
384 let buf =
385 read_end.read_all(NoPos::new(), buf).await.unwrap();
386 buf.take()
387 } else {
388 tmp_buf
389 };
390
391 let resp: Option<FsResponse> = postcard::from_bytes(unsafe {
392 &*(&tmp_buf[..] as *const [MaybeUninit<_>] as *const [u8])
393 })
394 .ok();
395
396 log::trace!("Fs Response {req_id}: {resp:?}");
397
398 let mut state = state.lock();
399 if let Some(req) = state.fs_reqs.get_mut(req_id as usize) {
400 log::trace!("State: {req:?}");
401 if let FsRequestState::Processing { waker } = req {
402 let waker = waker.take();
403
404 *req = FsRequestState::Complete { resp };
405
406 if let Some(waker) = waker {
407 waker.wake();
408 }
409 }
410 }
411
412 tmp_buf
413 }
414 .instrument(fs_span)
415 .await
416 }
417 Response::ReadDir { stream_id, len } => {
418 let fs_span = tracing::span!(
419 tracing::Level::TRACE,
420 "read dir",
421 stream_id,
422 len,
423 );
424 tmp_buf = async move {
425 let closed = (len & (1 << 31)) != 0;
426 let len = len & !(1 << 31);
427 log::trace!("ReadDir resp: {closed} {len}");
428 let len = len as usize;
430 if tmp_buf.capacity() < len {
431 tmp_buf.reserve(len - tmp_buf.len());
432 }
433
434 unsafe { tmp_buf.set_len(len) };
436
437 let buf = VecPacket::from(tmp_buf);
438 let buf = read_end.read_all(NoPos::new(), buf).await.unwrap();
439 let tmp_buf = buf.take();
440
441 let resp: Vec<ReadDirResponse> = postcard::from_bytes(unsafe {
442 &*(&tmp_buf[..] as *const [MaybeUninit<u8>]
443 as *const [u8])
444 })
445 .unwrap();
446
447 log::trace!("Resulting: {}", resp.len());
448
449 let mut state = state.lock();
450 if let Some(stream) = state.read_dir_streams.get_mut(&stream_id)
451 {
452 let waker = stream.waker.take();
453
454 if closed || resp.is_empty() {
455 log::trace!("Closing {closed}");
456 stream.closed = true;
457 }
458
459 stream.resp_count += resp.len();
460 stream.results.extend(
461 resp.into_iter()
462 .map(|r| r.map_err(Error::from_int_err)),
463 );
464
465 if let Some(waker) = waker {
466 waker.wake();
467 }
468 }
469
470 tmp_buf
471 }
472 .instrument(fs_span)
473 .await
474 }
475 }
476 }
477 }
478 .instrument(tracing::span!(tracing::Level::TRACE, "results_loop"))
479 .fuse();
480
481 let ops_loop = async {
482 while let Ok(op) = ops_rx.recv_async().await {
483 let key = state.lock().in_flight_ops.insert(InFlightOpType::Reserved);
484 assert!(key <= u32::MAX as usize);
485
486 async {
487 let op = op.write_msg(&router, key as u32).unwrap();
488
489 *state.lock().in_flight_ops.get_mut(key).unwrap() = op;
490 }
491 .instrument(tracing::span!(tracing::Level::TRACE, "op send", key))
492 .await
493 }
494 }
495 .instrument(tracing::span!(tracing::Level::TRACE, "ops_loop"))
496 .fuse();
497
498 let read_dir_loop = async {
499 while let Ok(stream_id) = read_dir_reqs_rx.recv_async().await {
500 log::trace!("Read dir {stream_id}");
501 async {
502 let mut state = state.lock();
503
504 if let Some(req) = state.read_dir_streams.get_mut(&stream_id) {
505 if !req.closed {
506 let count = req.compute_count();
507 core::mem::drop(state);
508 router.send_hdr(|_| Request::ReadDir { stream_id, count });
509 }
510 }
511 }
512 .instrument(tracing::span!(
513 tracing::Level::TRACE,
514 "read dir more",
515 stream_id,
516 ))
517 .await;
518 }
519 }
520 .instrument(tracing::span!(tracing::Level::TRACE, "open_loop"))
521 .fuse();
522
523 let fs_loop = async {
524 while let Ok(req_id) = fs_reqs_rx.recv_async().await {
525 async {
526 let msg = {
527 let mut state = state.lock();
528
529 if let Some(fs_req) = state.fs_reqs.get_mut(req_id as usize) {
530 if let FsRequestState::Started { req, dir_id, waker } = fs_req {
531 log::trace!("Request: {req:?}");
532 let buf = postcard::to_allocvec(&req).unwrap();
533 let ret = Some((
535 Request::Fs {
536 req_id,
537 dir_id: *dir_id,
538 req_len: buf.len() as u16,
539 },
540 OwnedPacket::from(buf.into_boxed_slice()),
541 ));
542
543 *fs_req = FsRequestState::Processing {
544 waker: waker.take(),
545 };
546
547 ret
548 } else {
549 None
550 }
551 } else {
552 None
553 }
554 };
555
556 if let Some((header, buf)) = msg {
557 router.send_bytes(|_| header, buf);
558 }
559 }
560 .instrument(tracing::span!(tracing::Level::TRACE, "open send", req_id))
561 .await;
562 }
563 }
564 .instrument(tracing::span!(tracing::Level::TRACE, "open_loop"))
565 .fuse();
566
567 let close_loop = async {
568 while let Ok(file_id) = close_reqs_rx.recv_async().await {
569 async {
570 router.send_hdr(|_| Request::FileClose { file_id });
571 }
572 .instrument(tracing::span!(tracing::Level::TRACE, "close send", file_id))
573 .await
574 }
575 }
576 .instrument(tracing::span!(tracing::Level::TRACE, "close_loop"))
577 .fuse();
578
579 let combined = async move {
580 pin_mut!(fs_loop);
581 pin_mut!(read_dir_loop);
582 pin_mut!(close_loop);
583 pin_mut!(ops_loop);
584 pin_mut!(results_loop);
585
586 futures::select! {
587 _ = fs_loop => log::error!("Fs done"),
588 _ = read_dir_loop => log::error!("Read dir done"),
589 _ = close_loop => log::error!("Close done"),
590 _ = ops_loop => log::error!("Ops done"),
591 _ = results_loop => log::error!("Results done"),
592 }
593 };
594
595 combined.await;
596
597 poll_fn(|_| {
598 log::error!("Network backend polled to completion!");
599 Poll::Pending
600 })
601 .await
602 }
604 };
605
606 Ok(Self {
607 fs,
608 cwd: NetworkFsDir {
609 dir_id: 0,
610 state,
611 senders: senders.into(),
612 },
613 backend: BackendContainer::new_dyn(backend),
614 cancel_ops_on_drop,
615 })
616 }
617}
618
619impl IoBackend for NetworkFs {
620 type Backend = DynBackend;
621
622 fn polling_handle(&self) -> Option<PollingHandle> {
623 self.fs.polling_handle()
624 }
625
626 fn get_backend(&self) -> BackendHandle<Self::Backend> {
627 self.backend.acquire_nested(self.fs.get_backend())
628 }
629}
630
631impl Fs for NetworkFs {
632 type DirHandle<'a> = NetworkFsDir;
633
634 fn current_dir(&self) -> &Self::DirHandle<'_> {
635 &self.cwd
636 }
637}
638
639pub struct NetworkFsDir {
640 dir_id: u16,
641 state: BaseArc<Mutex<NetFsState>>,
642 senders: BaseArc<Senders>,
643}
644
645impl DirHandle for NetworkFsDir {
646 type FileHandle = Seekable<FileWrapper, u64>;
647 type OpenFileFuture<'a> = OpenFileFuture<'a>;
648 type PathFuture<'a> = PathFuture<'a>;
649 type OpenDirFuture<'a> = OpenDirFuture<'a>;
650 type ReadDir<'a> = ReadDir<'a>;
651 type ReadDirFuture<'a> = ReadDirFuture<'a>;
652 type MetadataFuture<'a> = MetadataFuture<'a>;
653 type OpFuture<'a> = OpFuture<'a>;
654
655 fn path(&self) -> Self::PathFuture<'_> {
657 PathOp::make_future(self, FsRequest::Path)
658 }
659
660 fn read_dir(&self) -> Self::ReadDirFuture<'_> {
668 ReadDirOp::make_future(self, FsRequest::ReadDir)
669 }
670
671 fn open_file<'a, P: AsRef<Path> + ?Sized>(
676 &'a self,
677 path: &'a P,
678 options: OpenOptions,
679 ) -> Self::OpenFileFuture<'a> {
680 OpenFileOp::make_future(
681 self,
682 FsRequest::OpenFile {
683 path: path.as_ref().to_string_lossy().into(),
684 options,
685 },
686 )
687 }
688
689 fn open_dir<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::OpenDirFuture<'a> {
694 OpenDirOp::make_future(
695 self,
696 FsRequest::OpenDir {
697 path: path.as_ref().to_string_lossy().into(),
698 },
699 )
700 }
701
702 fn metadata<'a, P: AsRef<Path> + ?Sized>(&'a self, path: &'a P) -> Self::MetadataFuture<'a> {
703 MetadataOp::make_future(
704 self,
705 FsRequest::Metadata {
706 path: path.as_ref().to_string_lossy().into(),
707 },
708 )
709 }
710
711 fn do_op<'a, P: AsRef<Path> + ?Sized>(&'a self, operation: DirOp<&'a P>) -> Self::OpFuture<'a> {
715 OpOp::make_future(self, FsRequest::DirOp(operation.into_string()))
716 }
717}
718
719trait FsRequestProc {
720 type Output<'a>: 'a;
721 type Future<'a>: Future<Output = Self::Output<'a>> + 'a;
722
723 fn finish<'a>(
724 req: FsResponse,
725 state: &mut NetFsState,
726 dir: &'a NetworkFsDir,
727 ) -> Self::Output<'a>;
728
729 fn make_future2<'a>(
730 fs: &'a NetworkFsDir,
731 req_id: usize,
732 send: Option<SendFut<'a, u32>>,
733 ) -> Self::Future<'a>;
734
735 fn make_future(fs: &NetworkFsDir, req: FsRequest) -> Self::Future<'_> {
736 let state = &mut *fs.state.lock();
737
738 let req_id = state.fs_reqs.insert(FsRequestState::Started {
739 dir_id: fs.dir_id,
740 waker: None,
741 req,
742 });
743
744 assert!(req_id <= u32::MAX as usize);
745
746 Self::make_future2(
747 fs,
748 req_id,
749 Some(fs.senders.fs_reqs.send_async(req_id as u32)),
750 )
751 }
752}
753
754struct ReadDirStream {
755 queued_read: bool,
756 closed: bool,
757 results: VecDeque<Result<DirEntry>>,
758 waker: Option<Waker>,
759 resp_count: usize,
760}
761
762impl ReadDirStream {
763 fn compute_count(&self) -> u16 {
764 core::cmp::min(128 * ((self.resp_count + 1).ilog2() + 1), u16::MAX as u32) as u16
766 }
767}
768
769pub struct ReadDir<'a> {
770 fs: &'a NetworkFsDir,
771 stream_id: u16,
772 send: Option<SendFut<'a, u16>>,
773 cache: VecDeque<Result<DirEntry>>,
774 closed: bool,
775}
776
777impl<'a> Stream for ReadDir<'a> {
778 type Item = Result<DirEntry>;
779
780 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
781 let this = unsafe { self.get_unchecked_mut() };
782
783 loop {
784 if let Some(item) = this.cache.pop_front() {
785 break Poll::Ready(Some(item));
786 } else if !this.closed {
787 if let Some(send) = this.send.as_mut() {
789 let send = unsafe { Pin::new_unchecked(send) };
790
791 if let Poll::Ready(res) = send.poll(cx) {
792 this.send = None;
793 if res.is_err() {
794 this.closed = true;
795 }
796 } else {
797 break Poll::Pending;
798 }
799 }
800
801 let state = &mut *this.fs.state.lock();
802 if let Some(stream) = state.read_dir_streams.get_mut(&this.stream_id) {
803 if !stream.results.is_empty() {
805 core::mem::swap(&mut this.cache, &mut stream.results);
806 }
807
808 if stream.closed {
809 state.read_dir_streams.remove(&this.stream_id);
810 } else if !stream.queued_read {
811 stream.waker = Some(cx.waker().clone());
812 if !stream.queued_read {
813 stream.queued_read = true;
814 this.send =
815 Some(this.fs.senders.read_dir_reqs.send_async(this.stream_id));
816 }
817 } else {
818 stream.waker = Some(cx.waker().clone());
819 break Poll::Pending;
820 }
821 } else {
822 this.closed = true;
823 break Poll::Ready(None);
824 }
825 } else {
826 break Poll::Ready(None);
827 }
828 }
829 }
830}
831
// Generates, for one kind of filesystem request, a marker type `$op` implementing
// `FsRequestProc` and a future type `$fut` that submits the request id to the backend and then
// waits for the response. `$block` is a closure `(resp, state, dir) -> $rettype` that converts
// the response into the final result.
macro_rules! fs_op {
    ($fut:ident, $op:ident, $block:expr => $rettype:ty) => {
        pub struct $op;

        impl FsRequestProc for $op {
            type Output<'a> = $rettype;
            type Future<'a> = $fut<'a>;

            fn finish<'a>(
                resp: FsResponse,
                state: &mut NetFsState,
                dir: &'a NetworkFsDir,
            ) -> Self::Output<'a> {
                #[allow(clippy::redundant_closure_call)]
                ($block)(resp, state, dir)
            }

            fn make_future2<'a>(
                fs: &'a NetworkFsDir,
                req_id: usize,
                send: Option<SendFut<'a, u32>>,
            ) -> Self::Future<'a> {
                $fut { fs, req_id, send }
            }
        }

        pub struct $fut<'a> {
            fs: &'a NetworkFsDir,
            req_id: usize,
            send: Option<SendFut<'a, u32>>,
        }

        impl<'a> Future for $fut<'a> {
            type Output = $rettype;

            fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
                // SAFETY: `send` is only polled in place or overwritten, never moved out.
                let this = unsafe { self.get_unchecked_mut() };

                loop {
                    if let Some(send) = &mut this.send {
                        // SAFETY: `send` lives inside the pinned `self` and is not moved.
                        let send = unsafe { Pin::new_unchecked(send) };
                        match send.poll(cx) {
                            Poll::Ready(Ok(_)) => this.send = None,
                            Poll::Ready(Err(_)) => {
                                // The id never reached the backend, so no response can
                                // arrive for it: drop the finished send future and release
                                // the request slot instead of leaking it.
                                this.send = None;
                                let mut state = this.fs.state.lock();
                                if state.fs_reqs.contains(this.req_id) {
                                    state.fs_reqs.remove(this.req_id);
                                }
                                break Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
                            }
                            Poll::Pending => break Poll::Pending,
                        }
                    } else {
                        let state = &mut *this.fs.state.lock();

                        break match state
                            .fs_reqs
                            .get_mut(this.req_id)
                            .expect("Request was not found")
                        {
                            FsRequestState::Complete { .. } => Poll::Ready({
                                // Take the entry out so that the response can be consumed.
                                let resp = state.fs_reqs.remove(this.req_id);

                                let FsRequestState::Complete { resp } = resp else {
                                    unreachable!()
                                };

                                if let Some(resp) = resp {
                                    $op::finish(resp, state, this.fs)
                                } else {
                                    Err(mferr!(Response, NotFound, Filesystem))
                                }
                            }),
                            FsRequestState::Started { waker, .. } => {
                                *waker = Some(cx.waker().clone());
                                Poll::Pending
                            }
                            FsRequestState::Processing { waker } => {
                                *waker = Some(cx.waker().clone());
                                Poll::Pending
                            }
                        };
                    }
                }
            }
        }
    };
}
918
919fs_op!(
920 OpenFileFuture,
921 OpenFileOp,
922 |resp, _, dir: &NetworkFsDir| {
923 if let FsResponse::OpenFile { file_id } = resp {
924 file_id
925 .map(|file_id| {
926 FileWrapper(IoWrapper {
927 file_id,
928 senders: dir.senders.clone(),
929 _phantom: PhantomData,
930 })
931 .into()
932 })
933 .map_err(Error::from_int_err)
934 } else {
935 Err(mferr!(Response, Invalid, Filesystem))
936 }
937 } => Result<Seekable<FileWrapper, u64>>
938);
939
940fs_op!(
941 PathFuture,
942 PathOp,
943 |resp, _, _| {
944 if let FsResponse::Path { path } = resp {
945 path.map(From::from).map_err(Error::from_int_err)
946 } else {
947 Err(mferr!(Response, Invalid, Filesystem))
948 }
949 } => Result<PathBuf>
950);
951
952fs_op!(
953 ReadDirFuture,
954 ReadDirOp,
955 |resp, state: &mut NetFsState, dir| {
956 if let FsResponse::ReadDir { stream_id } = resp {
957 stream_id
958 .map(|stream_id| {
959 state.read_dir_streams.insert(stream_id, ReadDirStream {
960 queued_read: false,
961 closed: false,
962 results: Default::default(),
963 waker: None,
964 resp_count: 0,
965 });
966 ReadDir {
967 fs: dir,
968 stream_id,
969 cache: Default::default(),
970 closed: false,
971 send: None,
972 }
973 })
974 .map_err(Error::from_int_err)
975 } else {
976 Err(mferr!(Response, Invalid, Filesystem))
977 }
978 } => Result<ReadDir<'a>>
979);
980
981fs_op!(
982 OpenDirFuture,
983 OpenDirOp,
984 |resp, _, dir: &NetworkFsDir| {
985 if let FsResponse::OpenDir { dir_id } = resp {
986 dir_id
987 .map(|dir_id| NetworkFsDir {
988 dir_id: dir_id.into(),
989 senders: dir.senders.clone(),
990 state: dir.state.clone(),
991 })
992 .map_err(Error::from_int_err)
993 } else {
994 Err(mferr!(Response, Invalid, Filesystem))
995 }
996 } => Result<NetworkFsDir>
997);
998
999fs_op!(
1000 MetadataFuture,
1001 MetadataOp,
1002 |resp, _, _| {
1003 if let FsResponse::Metadata { metadata } = resp {
1004 metadata.map_err(Error::from_int_err)
1005 } else {
1006 Err(mferr!(Response, Invalid, Filesystem))
1007 }
1008 } => Result<Metadata>
1009);
1010
1011fs_op!(
1012 OpFuture,
1013 OpOp,
1014 |resp, _, _| {
1015 if let FsResponse::DirOp(res) = resp {
1016 if let Some(err) = res.map(Error::from_int_err) {
1017 Err(err)
1018 } else {
1019 Ok(())
1020 }
1021 } else {
1022 Err(mferr!(Response, Invalid, Filesystem))
1023 }
1024 } => Result<()>
1025);
1026
1027impl<Perms: IntoOp, Param: IoAt> PacketIo<Perms, u64> for IoWrapper<Param> {
1028 fn send_io(&self, pos: u64, packet: BoundPacketView<Perms>) {
1029 let operation = FileOperation {
1030 file_id: self.file_id,
1031 pos,
1032 ty: Perms::into_op(packet),
1033 };
1034
1035 let _ = self.senders.ops.send(operation);
1037 }
1038}
1039
1040pub struct IoWrapper<Param: IoAt> {
1041 file_id: u32,
1042 senders: BaseArc<Senders>,
1043 _phantom: PhantomData<Param>,
1044}
1045
1046impl<Param: IoAt> Drop for IoWrapper<Param> {
1047 fn drop(&mut self) {
1048 Param::drop_io_wrapper(self);
1049 }
1050}
1051
1052impl PacketIo<Read, u64> for FileWrapper {
1053 fn send_io(&self, param: u64, view: BoundPacketView<Read>) {
1054 self.0.send_io(param, view)
1055 }
1056}
1057
1058impl PacketIo<Write, u64> for FileWrapper {
1059 fn send_io(&self, param: u64, view: BoundPacketView<Write>) {
1060 self.0.send_io(param, view)
1061 }
1062}
1063
1064pub struct FileWrapper(IoWrapper<u64>);
1065
1066pub trait IoAt: Sized + Clone {
1067 fn drop_io_wrapper(io: &mut IoWrapper<Self>);
1068}
1069
1070impl IoAt for u64 {
1071 fn drop_io_wrapper(io: &mut IoWrapper<Self>) {
1072 let _ = io.senders.close_reqs.send(io.file_id);
1073 }
1074}