Skip to main content

tempest_rt/fs/
write_at.rs

1//! Async positioned write operation.
2
3use std::{
4    io,
5    marker::PhantomData,
6    mem::replace,
7    pin::Pin,
8    task::{Context, Poll},
9};
10
11use tempest_io::{Io, IoBuf, OpHandle, WriteHandle};
12
13use crate::context::{current_io, get_op_handle};
14
15enum WriteAtState<B: IoBuf> {
16    NeedsSubmit {
17        buf: B,
18        op_handle: Option<OpHandle>,
19    },
20    InFlight {
21        write_handle: WriteHandle<B>,
22        op_handle: OpHandle,
23    },
24    Done,
25}
26
27/// Future that writes to a file at a given offset, resolving to `(bytes_written, buf)`.
28///
29/// May write fewer bytes than provided; use [`write_exact`] for a full write.
30///
31/// [`write_exact`]: crate::fs::write_exact
32#[must_use = "futures do nothing unless awaited"]
33pub struct WriteAt<B: IoBuf, I: Io> {
34    state: WriteAtState<B>,
35    fd: I::Fd,
36    offset: u64,
37    _marker: PhantomData<fn() -> I>,
38}
39
40/// Writes `buf` to `fd` at `offset`. Returns `(bytes_written, buf)`.
41pub fn write_at<B: IoBuf, I: Io>(fd: I::Fd, buf: B, offset: u64) -> WriteAt<B, I> {
42    WriteAt {
43        state: WriteAtState::NeedsSubmit {
44            buf,
45            op_handle: None,
46        },
47        fd,
48        offset,
49        _marker: PhantomData,
50    }
51}
52
53impl<B: IoBuf, I: Io> Unpin for WriteAt<B, I> {}
54
55impl<B: IoBuf, I: Io> Future for WriteAt<B, I> {
56    type Output = (io::Result<usize>, B);
57
58    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
59        let this = self.get_mut();
60
61        match replace(&mut this.state, WriteAtState::Done) {
62            WriteAtState::NeedsSubmit { buf, op_handle } => {
63                let op_handle = op_handle.unwrap_or_else(|| unsafe { get_op_handle(cx) });
64
65                // SAFETY: we do not hold on to io outside of this function
66                let io = unsafe { current_io::<I>() };
67                match io.write_at(this.fd, buf, this.offset, op_handle) {
68                    Ok(write_handle) => {
69                        this.state = WriteAtState::InFlight {
70                            write_handle,
71                            op_handle,
72                        };
73                        Poll::Pending
74                    }
75                    Err((e, buf)) if e.kind() == io::ErrorKind::WouldBlock => {
76                        this.state = WriteAtState::NeedsSubmit {
77                            buf,
78                            op_handle: Some(op_handle),
79                        };
80                        cx.waker().wake_by_ref();
81                        Poll::Pending
82                    }
83                    Err((e, buf)) => Poll::Ready((Err(e), buf)),
84                }
85            }
86            WriteAtState::InFlight {
87                write_handle,
88                op_handle,
89            } => {
90                // SAFETY: we do not hold on to io outside of this function
91                let io = unsafe { current_io::<I>() };
92                match io.get_cqe(op_handle) {
93                    Some(Ok(bytes_written)) => {
94                        let buf = write_handle.complete();
95                        Poll::Ready((Ok(bytes_written as usize), buf))
96                    }
97                    Some(Err(e)) => {
98                        let buf = write_handle.complete();
99                        Poll::Ready((Err(e), buf))
100                    }
101                    None => Poll::Pending,
102                }
103            }
104            WriteAtState::Done => panic!("polled after completion"),
105        }
106    }
107}