async_std/fs/
file.rs

1use std::cell::UnsafeCell;
2use std::cmp;
3use std::fmt;
4use std::io::{Read as _, Seek as _, Write as _};
5use std::ops::{Deref, DerefMut};
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9
10use crate::fs::{Metadata, Permissions};
11use crate::future;
12use crate::io::{self, Read, Seek, SeekFrom, Write};
13use crate::path::Path;
14use crate::prelude::*;
15use crate::task::{spawn_blocking, Context, Poll, Waker};
16use crate::utils::Context as _;
17
18/// An open file on the filesystem.
19///
20/// Depending on what options the file was opened with, this type can be used for reading and/or
21/// writing.
22///
23/// Files are automatically closed when they get dropped and any errors detected on closing are
24/// ignored. Use the [`sync_all`] method before dropping a file if such errors need to be handled.
25///
26/// This type is an async version of [`std::fs::File`].
27///
28/// [`sync_all`]: #method.sync_all
29/// [`std::fs::File`]: https://doc.rust-lang.org/std/fs/struct.File.html
30///
31/// # Examples
32///
33/// Create a new file and write some bytes to it:
34///
35/// ```no_run
36/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
37/// #
38/// use async_std::fs::File;
39/// use async_std::prelude::*;
40///
41/// let mut file = File::create("a.txt").await?;
42/// file.write_all(b"Hello, world!").await?;
43/// #
44/// # Ok(()) }) }
45/// ```
46///
47/// Read the contents of a file into a vector of bytes:
48///
49/// ```no_run
50/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
51/// #
52/// use async_std::fs::File;
53/// use async_std::prelude::*;
54///
55/// let mut file = File::open("a.txt").await?;
56/// let mut contents = Vec::new();
57/// file.read_to_end(&mut contents).await?;
58/// #
59/// # Ok(()) }) }
60/// ```
pub struct File {
    /// A shared handle to the underlying `std::fs::File`.
    ///
    /// `File::new` places a clone of this same `Arc` inside the locked `State`, so methods that
    /// never touch the cache (for example `metadata`) can use this handle without taking the
    /// lock.
    file: Arc<std::fs::File>,

    /// The cache, current mode and pending errors of the file, protected by an async lock.
    lock: Lock<State>,
}
68
69impl File {
70    /// Creates an async file handle.
71    pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
72        let file = Arc::new(file);
73
74        File {
75            file: file.clone(),
76            lock: Lock::new(State {
77                file,
78                mode: Mode::Idle,
79                cache: Vec::new(),
80                is_flushed,
81                last_read_err: None,
82                last_write_err: None,
83            }),
84        }
85    }
86
87    /// Opens a file in read-only mode.
88    ///
89    /// See the [`OpenOptions::open`] function for more options.
90    ///
91    /// # Errors
92    ///
93    /// An error will be returned in the following situations:
94    ///
95    /// * `path` does not point to an existing file.
96    /// * The current process lacks permissions to read the file.
97    /// * Some other I/O error occurred.
98    ///
99    /// For more details, see the list of errors documented by [`OpenOptions::open`].
100    ///
101    /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
102    ///
103    /// # Examples
104    ///
105    /// ```no_run
106    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
107    /// #
108    /// use async_std::fs::File;
109    ///
110    /// let file = File::open("a.txt").await?;
111    /// #
112    /// # Ok(()) }) }
113    /// ```
114    pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
115        let path = path.as_ref().to_owned();
116        let file = spawn_blocking(move || {
117            std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display()))
118        })
119        .await??;
120        Ok(File::new(file, true))
121    }
122
123    /// Opens a file in write-only mode.
124    ///
125    /// This function will create a file if it does not exist, and will truncate it if it does.
126    ///
127    /// See the [`OpenOptions::open`] function for more options.
128    ///
129    /// # Errors
130    ///
131    /// An error will be returned in the following situations:
132    ///
133    /// * The file's parent directory does not exist.
134    /// * The current process lacks permissions to write to the file.
135    /// * Some other I/O error occurred.
136    ///
137    /// For more details, see the list of errors documented by [`OpenOptions::open`].
138    ///
139    /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
140    ///
141    /// # Examples
142    ///
143    /// ```no_run
144    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
145    /// #
146    /// use async_std::fs::File;
147    ///
148    /// let file = File::create("a.txt").await?;
149    /// #
150    /// # Ok(()) }) }
151    /// ```
152    pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
153        let path = path.as_ref().to_owned();
154        let file = spawn_blocking(move || {
155            std::fs::File::create(&path)
156                .context(|| format!("could not create `{}`", path.display()))
157        })
158        .await??;
159        Ok(File::new(file, true))
160    }
161
162    /// Synchronizes OS-internal buffered contents and metadata to disk.
163    ///
164    /// This function will ensure that all in-memory data reaches the filesystem.
165    ///
166    /// This can be used to handle errors that would otherwise only be caught when the file is
167    /// closed. When a file is dropped, errors in synchronizing this in-memory data are ignored.
168    ///
169    /// # Examples
170    ///
171    /// ```no_run
172    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
173    /// #
174    /// use async_std::fs::File;
175    /// use async_std::prelude::*;
176    ///
177    /// let mut file = File::create("a.txt").await?;
178    /// file.write_all(b"Hello, world!").await?;
179    /// file.sync_all().await?;
180    /// #
181    /// # Ok(()) }) }
182    /// ```
183    pub async fn sync_all(&self) -> io::Result<()> {
184        // Flush the write cache before calling `sync_all()`.
185        let state = future::poll_fn(|cx| {
186            let state = futures_core::ready!(self.lock.poll_lock(cx));
187            state.poll_flush(cx)
188        })
189        .await?;
190
191        spawn_blocking(move || state.file.sync_all()).await?
192    }
193
194    /// Synchronizes OS-internal buffered contents to disk.
195    ///
196    /// This is similar to [`sync_all`], except that file metadata may not be synchronized.
197    ///
198    /// This is intended for use cases that must synchronize the contents of the file, but don't
199    /// need the file metadata synchronized to disk.
200    ///
201    /// Note that some platforms may simply implement this in terms of [`sync_all`].
202    ///
203    /// [`sync_all`]: #method.sync_all
204    ///
205    /// # Examples
206    ///
207    /// ```no_run
208    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
209    /// #
210    /// use async_std::fs::File;
211    /// use async_std::prelude::*;
212    ///
213    /// let mut file = File::create("a.txt").await?;
214    /// file.write_all(b"Hello, world!").await?;
215    /// file.sync_data().await?;
216    /// #
217    /// # Ok(()) }) }
218    /// ```
219    pub async fn sync_data(&self) -> io::Result<()> {
220        // Flush the write cache before calling `sync_data()`.
221        let state = future::poll_fn(|cx| {
222            let state = futures_core::ready!(self.lock.poll_lock(cx));
223            state.poll_flush(cx)
224        })
225        .await?;
226
227        spawn_blocking(move || state.file.sync_data()).await?
228    }
229
230    /// Truncates or extends the file.
231    ///
232    /// If `size` is less than the current file size, then the file will be truncated. If it is
233    /// greater than the current file size, then the file will be extended to `size` and have all
234    /// intermediate data filled with zeros.
235    ///
236    /// The file's cursor stays at the same position, even if the cursor ends up being past the end
237    /// of the file after this operation.
238    ///
239    /// # Examples
240    ///
241    /// ```no_run
242    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
243    /// #
244    /// use async_std::fs::File;
245    ///
246    /// let file = File::create("a.txt").await?;
247    /// file.set_len(10).await?;
248    /// #
249    /// # Ok(()) }) }
250    /// ```
251    pub async fn set_len(&self, size: u64) -> io::Result<()> {
252        // Invalidate the read cache and flush the write cache before calling `set_len()`.
253        let state = future::poll_fn(|cx| {
254            let state = futures_core::ready!(self.lock.poll_lock(cx));
255            let state = futures_core::ready!(state.poll_unread(cx))?;
256            state.poll_flush(cx)
257        })
258        .await?;
259
260        spawn_blocking(move || state.file.set_len(size)).await?
261    }
262
263    /// Reads the file's metadata.
264    ///
265    /// # Examples
266    ///
267    /// ```no_run
268    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
269    /// #
270    /// use async_std::fs::File;
271    ///
272    /// let file = File::open("a.txt").await?;
273    /// let metadata = file.metadata().await?;
274    /// #
275    /// # Ok(()) }) }
276    /// ```
277    pub async fn metadata(&self) -> io::Result<Metadata> {
278        let file = self.file.clone();
279        spawn_blocking(move || file.metadata()).await?
280    }
281
282    /// Changes the permissions on the file.
283    ///
284    /// # Errors
285    ///
286    /// An error will be returned in the following situations:
287    ///
288    /// * The current process lacks permissions to change attributes on the file.
289    /// * Some other I/O error occurred.
290    ///
291    /// # Examples
292    ///
293    /// ```no_run
294    /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
295    /// #
296    /// use async_std::fs::File;
297    ///
298    /// let file = File::create("a.txt").await?;
299    ///
300    /// let mut perms = file.metadata().await?.permissions();
301    /// perms.set_readonly(true);
302    /// file.set_permissions(perms).await?;
303    /// #
304    /// # Ok(()) }) }
305    /// ```
306    pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
307        let file = self.file.clone();
308        spawn_blocking(move || file.set_permissions(perm)).await?
309    }
310}
311
312impl Drop for File {
313    fn drop(&mut self) {
314        // We need to flush the file on drop. Unfortunately, that is not possible to do in a
315        // non-blocking fashion, but our only other option here is losing data remaining in the
316        // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
317        // file destructors so we don't expect this to be a common problem in practice.
318        let _ = self.flush();
319    }
320}
321
322impl fmt::Debug for File {
323    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324        self.file.fmt(f)
325    }
326}
327
328impl Read for File {
329    fn poll_read(
330        self: Pin<&mut Self>,
331        cx: &mut Context<'_>,
332        buf: &mut [u8],
333    ) -> Poll<io::Result<usize>> {
334        Pin::new(&mut &*self).poll_read(cx, buf)
335    }
336}
337
338impl Read for &File {
339    fn poll_read(
340        self: Pin<&mut Self>,
341        cx: &mut Context<'_>,
342        buf: &mut [u8],
343    ) -> Poll<io::Result<usize>> {
344        let state = futures_core::ready!(self.lock.poll_lock(cx));
345        state.poll_read(cx, buf)
346    }
347}
348
349impl Write for File {
350    fn poll_write(
351        self: Pin<&mut Self>,
352        cx: &mut Context<'_>,
353        buf: &[u8],
354    ) -> Poll<io::Result<usize>> {
355        Pin::new(&mut &*self).poll_write(cx, buf)
356    }
357
358    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
359        Pin::new(&mut &*self).poll_flush(cx)
360    }
361
362    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
363        Pin::new(&mut &*self).poll_close(cx)
364    }
365}
366
367impl Write for &File {
368    fn poll_write(
369        self: Pin<&mut Self>,
370        cx: &mut Context<'_>,
371        buf: &[u8],
372    ) -> Poll<io::Result<usize>> {
373        let state = futures_core::ready!(self.lock.poll_lock(cx));
374        state.poll_write(cx, buf)
375    }
376
377    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
378        let state = futures_core::ready!(self.lock.poll_lock(cx));
379        state.poll_flush(cx).map(|res| res.map(drop))
380    }
381
382    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
383        let state = futures_core::ready!(self.lock.poll_lock(cx));
384        state.poll_close(cx)
385    }
386}
387
388impl Seek for File {
389    fn poll_seek(
390        self: Pin<&mut Self>,
391        cx: &mut Context<'_>,
392        pos: SeekFrom,
393    ) -> Poll<io::Result<u64>> {
394        Pin::new(&mut &*self).poll_seek(cx, pos)
395    }
396}
397
398impl Seek for &File {
399    fn poll_seek(
400        self: Pin<&mut Self>,
401        cx: &mut Context<'_>,
402        pos: SeekFrom,
403    ) -> Poll<io::Result<u64>> {
404        let state = futures_core::ready!(self.lock.poll_lock(cx));
405        state.poll_seek(cx, pos)
406    }
407}
408
409impl From<std::fs::File> for File {
410    fn from(file: std::fs::File) -> File {
411        File::new(file, false)
412    }
413}
414
415cfg_unix! {
416    use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
417
418    impl AsRawFd for File {
419        fn as_raw_fd(&self) -> RawFd {
420            self.file.as_raw_fd()
421        }
422    }
423
424    impl FromRawFd for File {
425        unsafe fn from_raw_fd(fd: RawFd) -> File {
426            std::fs::File::from_raw_fd(fd).into()
427        }
428    }
429
430    impl IntoRawFd for File {
431        fn into_raw_fd(self) -> RawFd {
432            let file = self.file.clone();
433            drop(self);
434            Arc::try_unwrap(file)
435                .expect("cannot acquire ownership of the file handle after drop")
436                .into_raw_fd()
437        }
438    }
439}
440
441cfg_windows! {
442    use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
443
444    impl AsRawHandle for File {
445        fn as_raw_handle(&self) -> RawHandle {
446            self.file.as_raw_handle()
447        }
448    }
449
450    impl FromRawHandle for File {
451        unsafe fn from_raw_handle(handle: RawHandle) -> File {
452            std::fs::File::from_raw_handle(handle).into()
453        }
454    }
455
456    impl IntoRawHandle for File {
457        fn into_raw_handle(self) -> RawHandle {
458            let file = self.file.clone();
459            drop(self);
460            Arc::try_unwrap(file)
461                .expect("cannot acquire ownership of the file handle after drop")
462                .into_raw_handle()
463        }
464    }
465}
466
/// An async mutex with non-borrowing lock guards.
///
/// The shared `LockState` lives behind an `Arc`, so a `LockGuard` holds its own reference to it
/// and carries no lifetime tied to the `Lock` it was obtained from.
struct Lock<T>(Arc<LockState<T>>);

// NOTE(review): the soundness argument below is inferred from this file — confirm.
// The inner value sits in an `UnsafeCell` and is only dereferenced through a `LockGuard`;
// `poll_lock` creates a guard only after swapping `locked` from `false` to `true`. Because the
// value may be accessed from whichever thread holds the guard, both impls require `T: Send`.
unsafe impl<T: Send> Send for Lock<T> {}
unsafe impl<T: Send> Sync for Lock<T> {}
472
/// The state of a lock.
///
/// One instance is shared, through an `Arc`, between a `Lock` and the `LockGuard`s created
/// from it.
struct LockState<T> {
    /// Set to `true` when locked.
    ///
    /// `Lock::poll_lock` acquires by swapping in `true`; dropping a `LockGuard` stores `false`.
    locked: AtomicBool,

    /// The inner value.
    ///
    /// Accessed through the `Deref`/`DerefMut` implementations of `LockGuard`.
    value: UnsafeCell<T>,

    /// A list of tasks interested in acquiring the lock.
    ///
    /// Dropping a `LockGuard` drains this list and wakes every entry.
    wakers: Mutex<Vec<Waker>>,
}
484
485impl<T> Lock<T> {
486    /// Creates a new lock initialized with `value`.
487    fn new(value: T) -> Lock<T> {
488        Lock(Arc::new(LockState {
489            locked: AtomicBool::new(false),
490            value: UnsafeCell::new(value),
491            wakers: Mutex::new(Vec::new()),
492        }))
493    }
494
495    /// Attempts to acquire the lock.
496    fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
497        // Try acquiring the lock.
498        if self.0.locked.swap(true, Ordering::Acquire) {
499            // Lock the list of wakers.
500            let mut list = self.0.wakers.lock().unwrap();
501
502            // Try acquiring the lock again.
503            if self.0.locked.swap(true, Ordering::Acquire) {
504                // If failed again, add the current task to the list and return.
505                if list.iter().all(|w| !w.will_wake(cx.waker())) {
506                    list.push(cx.waker().clone());
507                }
508                return Poll::Pending;
509            }
510        }
511
512        // The lock was successfully acquired.
513        Poll::Ready(LockGuard(self.0.clone()))
514    }
515}
516
/// A lock guard.
///
/// When dropped, ownership of the inner value is returned back to the lock.
///
/// The guard owns an `Arc` to the shared lock state rather than borrowing from the `Lock`, which
/// is what allows it to be moved into closures passed to `spawn_blocking`.
struct LockGuard<T>(Arc<LockState<T>>);

// NOTE(review): rationale inferred from this file — confirm. A guard gives `&mut T` access via
// `DerefMut`, so sending it to another thread requires `T: Send`; sharing a `&LockGuard<T>` only
// exposes `&T` via `Deref`, hence the `T: Sync` bound.
unsafe impl<T: Send> Send for LockGuard<T> {}
unsafe impl<T: Sync> Sync for LockGuard<T> {}
524
525impl<T> LockGuard<T> {
526    /// Registers a task interested in acquiring the lock.
527    ///
528    /// When this lock guard gets dropped, all registered tasks will be woken up.
529    fn register(&self, cx: &Context<'_>) {
530        let mut list = self.0.wakers.lock().unwrap();
531
532        if list.iter().all(|w| !w.will_wake(cx.waker())) {
533            list.push(cx.waker().clone());
534        }
535    }
536}
537
538impl<T> Drop for LockGuard<T> {
539    fn drop(&mut self) {
540        // Release the lock.
541        self.0.locked.store(false, Ordering::Release);
542
543        // Wake up all registered tasks interested in acquiring the lock.
544        for w in self.0.wakers.lock().unwrap().drain(..) {
545            w.wake();
546        }
547    }
548}
549
550impl<T> Deref for LockGuard<T> {
551    type Target = T;
552
553    fn deref(&self) -> &T {
554        unsafe { &*self.0.value.get() }
555    }
556}
557
558impl<T> DerefMut for LockGuard<T> {
559    fn deref_mut(&mut self) -> &mut T {
560        unsafe { &mut *self.0.value.get() }
561    }
562}
563
/// Modes a file can be in.
///
/// The file can either be in idle mode, reading mode, or writing mode. The mode determines how
/// the contents of `State::cache` are to be interpreted.
enum Mode {
    /// The cache is empty.
    Idle,

    /// The cache contains data read from the inner file.
    ///
    /// The `usize` represents how many bytes from the beginning of cache have been consumed.
    /// An empty cache in this mode means the last read returned zero bytes.
    Reading(usize),

    /// The cache contains data that needs to be written to the inner file.
    Writing,
}
579
/// The current state of a file.
///
/// The `File` struct protects this state behind a lock.
///
/// Filesystem operations that get spawned as blocking tasks will acquire the lock, take ownership
/// of the state and return it back once the operation completes.
struct State {
    /// The inner file.
    ///
    /// This is a clone of the same `Arc` that `File` keeps in its own `file` field.
    file: Arc<std::fs::File>,

    /// The current mode (idle, reading, or writing).
    mode: Mode,

    /// The read/write cache.
    ///
    /// If in reading mode, the cache contains a chunk of data that has been read from the file.
    /// If in writing mode, the cache contains data that will eventually be written to the file.
    cache: Vec<u8>,

    /// Set to `true` if the file is flushed.
    ///
    /// When a file is flushed, the write cache and the inner file's buffer are empty.
    /// Cleared whenever data is placed into the write cache.
    is_flushed: bool,

    /// The last read error that came from an async operation.
    ///
    /// Stored by a blocking read task and returned by the next call to `poll_read`.
    last_read_err: Option<io::Error>,

    /// The last write error that came from an async operation.
    ///
    /// Stored by a blocking write or flush task and returned by the next call to `poll_write`
    /// or `poll_drain`.
    last_write_err: Option<io::Error>,
}
610
611impl LockGuard<State> {
612    /// Seeks to a new position in the file.
613    fn poll_seek(mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>> {
614        // If this operation doesn't move the cursor, then poll the current position inside the
615        // file. This call should not block because it doesn't touch the actual file on disk.
616        if pos == SeekFrom::Current(0) {
617            // Poll the internal file cursor.
618            let internal = (&*self.file).seek(SeekFrom::Current(0))?;
619
620            // Factor in the difference caused by caching.
621            let actual = match self.mode {
622                Mode::Idle => internal,
623                Mode::Reading(start) => internal - self.cache.len() as u64 + start as u64,
624                Mode::Writing => internal + self.cache.len() as u64,
625            };
626            return Poll::Ready(Ok(actual));
627        }
628
629        // If the file is in reading mode and the cache will stay valid after seeking, then adjust
630        // the current position in the read cache without invaliding it.
631        if let Mode::Reading(start) = self.mode {
632            if let SeekFrom::Current(diff) = pos {
633                if let Some(new) = (start as i64).checked_add(diff) {
634                    if 0 <= new && new <= self.cache.len() as i64 {
635                        // Poll the internal file cursor.
636                        let internal = (&*self.file).seek(SeekFrom::Current(0))?;
637
638                        // Adjust the current position in the read cache.
639                        self.mode = Mode::Reading(new as usize);
640
641                        // Factor in the difference caused by caching.
642                        return Poll::Ready(Ok(internal - self.cache.len() as u64 + new as u64));
643                    }
644                }
645            }
646        }
647
648        // Invalidate the read cache and flush the write cache before calling `seek()`.
649        self = futures_core::ready!(self.poll_unread(cx))?;
650        self = futures_core::ready!(self.poll_flush(cx))?;
651
652        // Seek to the new position. This call should not block because it only changes the
653        // internal offset into the file and doesn't touch the actual file on disk.
654        Poll::Ready((&*self.file).seek(pos))
655    }
656
657    /// Reads some bytes from the file into a buffer.
658    fn poll_read(mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
659        // If an async operation has left a read error, return it now.
660        if let Some(err) = self.last_read_err.take() {
661            return Poll::Ready(Err(err));
662        }
663
664        match self.mode {
665            Mode::Idle => {}
666            Mode::Reading(start) => {
667                // How many bytes in the cache are available for reading.
668                let available = self.cache.len() - start;
669
670                // If there is cached unconsumed data or if the cache is empty, we can read from
671                // it. Empty cache in reading mode indicates that the last operation didn't read
672                // any bytes, i.e. it reached the end of the file.
673                if available > 0 || self.cache.is_empty() {
674                    // Copy data from the cache into the buffer.
675                    let n = cmp::min(available, buf.len());
676                    buf[..n].copy_from_slice(&self.cache[start..n]);
677
678                    // Move the read cursor forward.
679                    self.mode = Mode::Reading(start + n);
680
681                    return Poll::Ready(Ok(n));
682                }
683            }
684            Mode::Writing => {
685                // If we're in writing mode, flush the write cache.
686                self = futures_core::ready!(self.poll_flush(cx))?;
687            }
688        }
689
690        // Make the cache as long as `buf`.
691        if self.cache.len() < buf.len() {
692            let diff = buf.len() - self.cache.len();
693            self.cache.reserve(diff);
694        }
695        unsafe {
696            self.cache.set_len(buf.len());
697        }
698
699        // Register current task's interest in the file lock.
700        self.register(cx);
701
702        // Start a read operation asynchronously.
703        spawn_blocking(move || {
704            // Read some data from the file into the cache.
705            let res = {
706                let State { file, cache, .. } = &mut *self;
707                (&**file).read(cache)
708            };
709
710            match res {
711                Ok(n) => {
712                    // Update cache length and switch to reading mode, starting from index 0.
713                    unsafe {
714                        self.cache.set_len(n);
715                    }
716                    self.mode = Mode::Reading(0);
717                }
718                Err(err) => {
719                    // Save the error and switch to idle mode.
720                    self.cache.clear();
721                    self.mode = Mode::Idle;
722                    self.last_read_err = Some(err);
723                }
724            }
725        });
726
727        Poll::Pending
728    }
729
730    /// Invalidates the read cache.
731    ///
732    /// This method will also move the internal file's cursor backwards by the number of unconsumed
733    /// bytes in the read cache.
734    fn poll_unread(mut self, _: &mut Context<'_>) -> Poll<io::Result<Self>> {
735        match self.mode {
736            Mode::Idle | Mode::Writing => Poll::Ready(Ok(self)),
737            Mode::Reading(start) => {
738                // The number of unconsumed bytes in the read cache.
739                let n = self.cache.len() - start;
740
741                if n > 0 {
742                    // Seek `n` bytes backwards. This call should not block because it only changes
743                    // the internal offset into the file and doesn't touch the actual file on disk.
744                    //
745                    // We ignore errors here because special files like `/dev/random` are not
746                    // seekable.
747                    let _ = (&*self.file).seek(SeekFrom::Current(-(n as i64)));
748                }
749
750                // Switch to idle mode.
751                self.cache.clear();
752                self.mode = Mode::Idle;
753
754                Poll::Ready(Ok(self))
755            }
756        }
757    }
758
759    /// Writes some data from a buffer into the file.
760    fn poll_write(mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
761        // If an async operation has left a write error, return it now.
762        if let Some(err) = self.last_write_err.take() {
763            return Poll::Ready(Err(err));
764        }
765
766        // If we're in reading mode, invalidate the read buffer.
767        self = futures_core::ready!(self.poll_unread(cx))?;
768
769        // If necessary, grow the cache to have as much capacity as `buf`.
770        if self.cache.capacity() < buf.len() {
771            let diff = buf.len() - self.cache.capacity();
772            self.cache.reserve(diff);
773        }
774
775        // How many bytes can be written into the cache before filling up.
776        let available = self.cache.capacity() - self.cache.len();
777
778        // If there is space available in the cache or if the buffer is empty, we can write data
779        // into the cache.
780        if available > 0 || buf.is_empty() {
781            let n = cmp::min(available, buf.len());
782            let start = self.cache.len();
783
784            // Copy data from the buffer into the cache.
785            unsafe {
786                self.cache.set_len(start + n);
787            }
788            self.cache[start..start + n].copy_from_slice(&buf[..n]);
789
790            // Mark the file as not flushed and switch to writing mode.
791            self.is_flushed = false;
792            self.mode = Mode::Writing;
793            Poll::Ready(Ok(n))
794        } else {
795            // Drain the write cache because it's full.
796            futures_core::ready!(self.poll_drain(cx))?;
797            Poll::Pending
798        }
799    }
800
801    /// Drains the write cache.
802    fn poll_drain(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
803        // If an async operation has left a write error, return it now.
804        if let Some(err) = self.last_write_err.take() {
805            return Poll::Ready(Err(err));
806        }
807
808        match self.mode {
809            Mode::Idle | Mode::Reading(..) => Poll::Ready(Ok(self)),
810            Mode::Writing => {
811                // Register current task's interest in the file lock.
812                self.register(cx);
813
814                // Start a write operation asynchronously.
815                spawn_blocking(move || {
816                    match (&*self.file).write_all(&self.cache) {
817                        Ok(_) => {
818                            // Switch to idle mode.
819                            self.cache.clear();
820                            self.mode = Mode::Idle;
821                        }
822                        Err(err) => {
823                            // Save the error.
824                            self.last_write_err = Some(err);
825                        }
826                    };
827                });
828
829                Poll::Pending
830            }
831        }
832    }
833
834    /// Flushes the write cache into the file.
835    fn poll_flush(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
836        // If the file is already in flushed state, return.
837        if self.is_flushed {
838            return Poll::Ready(Ok(self));
839        }
840
841        // If there is data in the write cache, drain it.
842        self = futures_core::ready!(self.poll_drain(cx))?;
843
844        // Register current task's interest in the file lock.
845        self.register(cx);
846
847        // Start a flush operation asynchronously.
848        spawn_blocking(move || {
849            match (&*self.file).flush() {
850                Ok(()) => {
851                    // Mark the file as flushed.
852                    self.is_flushed = true;
853                }
854                Err(err) => {
855                    // Save the error.
856                    self.last_write_err = Some(err);
857                }
858            }
859        });
860
861        Poll::Pending
862    }
863
864    // This function does nothing because we're not sure about `AsyncWrite::poll_close()`'s exact
865    // semantics nor whether it will stay in the `AsyncWrite` trait.
866    fn poll_close(self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
867        Poll::Ready(Ok(()))
868    }
869}