async_std/fs/file.rs
1use std::cell::UnsafeCell;
2use std::cmp;
3use std::fmt;
4use std::io::{Read as _, Seek as _, Write as _};
5use std::ops::{Deref, DerefMut};
6use std::pin::Pin;
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::{Arc, Mutex};
9
10use crate::fs::{Metadata, Permissions};
11use crate::future;
12use crate::io::{self, Read, Seek, SeekFrom, Write};
13use crate::path::Path;
14use crate::prelude::*;
15use crate::task::{spawn_blocking, Context, Poll, Waker};
16use crate::utils::Context as _;
17
18/// An open file on the filesystem.
19///
20/// Depending on what options the file was opened with, this type can be used for reading and/or
21/// writing.
22///
23/// Files are automatically closed when they get dropped and any errors detected on closing are
24/// ignored. Use the [`sync_all`] method before dropping a file if such errors need to be handled.
25///
26/// This type is an async version of [`std::fs::File`].
27///
28/// [`sync_all`]: #method.sync_all
29/// [`std::fs::File`]: https://doc.rust-lang.org/std/fs/struct.File.html
30///
31/// # Examples
32///
33/// Create a new file and write some bytes to it:
34///
35/// ```no_run
36/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
37/// #
38/// use async_std::fs::File;
39/// use async_std::prelude::*;
40///
41/// let mut file = File::create("a.txt").await?;
42/// file.write_all(b"Hello, world!").await?;
43/// #
44/// # Ok(()) }) }
45/// ```
46///
47/// Read the contents of a file into a vector of bytes:
48///
49/// ```no_run
50/// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
51/// #
52/// use async_std::fs::File;
53/// use async_std::prelude::*;
54///
55/// let mut file = File::open("a.txt").await?;
56/// let mut contents = Vec::new();
57/// file.read_to_end(&mut contents).await?;
58/// #
59/// # Ok(()) }) }
60/// ```
61pub struct File {
62 /// A reference to the inner file.
63 file: Arc<std::fs::File>,
64
65 /// The state of the file protected by an async lock.
66 lock: Lock<State>,
67}
68
69impl File {
70 /// Creates an async file handle.
71 pub(crate) fn new(file: std::fs::File, is_flushed: bool) -> File {
72 let file = Arc::new(file);
73
74 File {
75 file: file.clone(),
76 lock: Lock::new(State {
77 file,
78 mode: Mode::Idle,
79 cache: Vec::new(),
80 is_flushed,
81 last_read_err: None,
82 last_write_err: None,
83 }),
84 }
85 }
86
87 /// Opens a file in read-only mode.
88 ///
89 /// See the [`OpenOptions::open`] function for more options.
90 ///
91 /// # Errors
92 ///
93 /// An error will be returned in the following situations:
94 ///
95 /// * `path` does not point to an existing file.
96 /// * The current process lacks permissions to read the file.
97 /// * Some other I/O error occurred.
98 ///
99 /// For more details, see the list of errors documented by [`OpenOptions::open`].
100 ///
101 /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
102 ///
103 /// # Examples
104 ///
105 /// ```no_run
106 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
107 /// #
108 /// use async_std::fs::File;
109 ///
110 /// let file = File::open("a.txt").await?;
111 /// #
112 /// # Ok(()) }) }
113 /// ```
114 pub async fn open<P: AsRef<Path>>(path: P) -> io::Result<File> {
115 let path = path.as_ref().to_owned();
116 let file = spawn_blocking(move || {
117 std::fs::File::open(&path).context(|| format!("could not open `{}`", path.display()))
118 })
119 .await??;
120 Ok(File::new(file, true))
121 }
122
123 /// Opens a file in write-only mode.
124 ///
125 /// This function will create a file if it does not exist, and will truncate it if it does.
126 ///
127 /// See the [`OpenOptions::open`] function for more options.
128 ///
129 /// # Errors
130 ///
131 /// An error will be returned in the following situations:
132 ///
133 /// * The file's parent directory does not exist.
134 /// * The current process lacks permissions to write to the file.
135 /// * Some other I/O error occurred.
136 ///
137 /// For more details, see the list of errors documented by [`OpenOptions::open`].
138 ///
139 /// [`OpenOptions::open`]: struct.OpenOptions.html#method.open
140 ///
141 /// # Examples
142 ///
143 /// ```no_run
144 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
145 /// #
146 /// use async_std::fs::File;
147 ///
148 /// let file = File::create("a.txt").await?;
149 /// #
150 /// # Ok(()) }) }
151 /// ```
152 pub async fn create<P: AsRef<Path>>(path: P) -> io::Result<File> {
153 let path = path.as_ref().to_owned();
154 let file = spawn_blocking(move || {
155 std::fs::File::create(&path)
156 .context(|| format!("could not create `{}`", path.display()))
157 })
158 .await??;
159 Ok(File::new(file, true))
160 }
161
162 /// Synchronizes OS-internal buffered contents and metadata to disk.
163 ///
164 /// This function will ensure that all in-memory data reaches the filesystem.
165 ///
166 /// This can be used to handle errors that would otherwise only be caught when the file is
167 /// closed. When a file is dropped, errors in synchronizing this in-memory data are ignored.
168 ///
169 /// # Examples
170 ///
171 /// ```no_run
172 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
173 /// #
174 /// use async_std::fs::File;
175 /// use async_std::prelude::*;
176 ///
177 /// let mut file = File::create("a.txt").await?;
178 /// file.write_all(b"Hello, world!").await?;
179 /// file.sync_all().await?;
180 /// #
181 /// # Ok(()) }) }
182 /// ```
183 pub async fn sync_all(&self) -> io::Result<()> {
184 // Flush the write cache before calling `sync_all()`.
185 let state = future::poll_fn(|cx| {
186 let state = futures_core::ready!(self.lock.poll_lock(cx));
187 state.poll_flush(cx)
188 })
189 .await?;
190
191 spawn_blocking(move || state.file.sync_all()).await?
192 }
193
194 /// Synchronizes OS-internal buffered contents to disk.
195 ///
196 /// This is similar to [`sync_all`], except that file metadata may not be synchronized.
197 ///
198 /// This is intended for use cases that must synchronize the contents of the file, but don't
199 /// need the file metadata synchronized to disk.
200 ///
201 /// Note that some platforms may simply implement this in terms of [`sync_all`].
202 ///
203 /// [`sync_all`]: #method.sync_all
204 ///
205 /// # Examples
206 ///
207 /// ```no_run
208 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
209 /// #
210 /// use async_std::fs::File;
211 /// use async_std::prelude::*;
212 ///
213 /// let mut file = File::create("a.txt").await?;
214 /// file.write_all(b"Hello, world!").await?;
215 /// file.sync_data().await?;
216 /// #
217 /// # Ok(()) }) }
218 /// ```
219 pub async fn sync_data(&self) -> io::Result<()> {
220 // Flush the write cache before calling `sync_data()`.
221 let state = future::poll_fn(|cx| {
222 let state = futures_core::ready!(self.lock.poll_lock(cx));
223 state.poll_flush(cx)
224 })
225 .await?;
226
227 spawn_blocking(move || state.file.sync_data()).await?
228 }
229
230 /// Truncates or extends the file.
231 ///
232 /// If `size` is less than the current file size, then the file will be truncated. If it is
233 /// greater than the current file size, then the file will be extended to `size` and have all
234 /// intermediate data filled with zeros.
235 ///
236 /// The file's cursor stays at the same position, even if the cursor ends up being past the end
237 /// of the file after this operation.
238 ///
239 /// # Examples
240 ///
241 /// ```no_run
242 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
243 /// #
244 /// use async_std::fs::File;
245 ///
246 /// let file = File::create("a.txt").await?;
247 /// file.set_len(10).await?;
248 /// #
249 /// # Ok(()) }) }
250 /// ```
251 pub async fn set_len(&self, size: u64) -> io::Result<()> {
252 // Invalidate the read cache and flush the write cache before calling `set_len()`.
253 let state = future::poll_fn(|cx| {
254 let state = futures_core::ready!(self.lock.poll_lock(cx));
255 let state = futures_core::ready!(state.poll_unread(cx))?;
256 state.poll_flush(cx)
257 })
258 .await?;
259
260 spawn_blocking(move || state.file.set_len(size)).await?
261 }
262
263 /// Reads the file's metadata.
264 ///
265 /// # Examples
266 ///
267 /// ```no_run
268 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
269 /// #
270 /// use async_std::fs::File;
271 ///
272 /// let file = File::open("a.txt").await?;
273 /// let metadata = file.metadata().await?;
274 /// #
275 /// # Ok(()) }) }
276 /// ```
277 pub async fn metadata(&self) -> io::Result<Metadata> {
278 let file = self.file.clone();
279 spawn_blocking(move || file.metadata()).await?
280 }
281
282 /// Changes the permissions on the file.
283 ///
284 /// # Errors
285 ///
286 /// An error will be returned in the following situations:
287 ///
288 /// * The current process lacks permissions to change attributes on the file.
289 /// * Some other I/O error occurred.
290 ///
291 /// # Examples
292 ///
293 /// ```no_run
294 /// # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
295 /// #
296 /// use async_std::fs::File;
297 ///
298 /// let file = File::create("a.txt").await?;
299 ///
300 /// let mut perms = file.metadata().await?.permissions();
301 /// perms.set_readonly(true);
302 /// file.set_permissions(perms).await?;
303 /// #
304 /// # Ok(()) }) }
305 /// ```
306 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
307 let file = self.file.clone();
308 spawn_blocking(move || file.set_permissions(perm)).await?
309 }
310}
311
312impl Drop for File {
313 fn drop(&mut self) {
314 // We need to flush the file on drop. Unfortunately, that is not possible to do in a
315 // non-blocking fashion, but our only other option here is losing data remaining in the
316 // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
317 // file destructors so we don't expect this to be a common problem in practice.
318 let _ = self.flush();
319 }
320}
321
322impl fmt::Debug for File {
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324 self.file.fmt(f)
325 }
326}
327
328impl Read for File {
329 fn poll_read(
330 self: Pin<&mut Self>,
331 cx: &mut Context<'_>,
332 buf: &mut [u8],
333 ) -> Poll<io::Result<usize>> {
334 Pin::new(&mut &*self).poll_read(cx, buf)
335 }
336}
337
338impl Read for &File {
339 fn poll_read(
340 self: Pin<&mut Self>,
341 cx: &mut Context<'_>,
342 buf: &mut [u8],
343 ) -> Poll<io::Result<usize>> {
344 let state = futures_core::ready!(self.lock.poll_lock(cx));
345 state.poll_read(cx, buf)
346 }
347}
348
349impl Write for File {
350 fn poll_write(
351 self: Pin<&mut Self>,
352 cx: &mut Context<'_>,
353 buf: &[u8],
354 ) -> Poll<io::Result<usize>> {
355 Pin::new(&mut &*self).poll_write(cx, buf)
356 }
357
358 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
359 Pin::new(&mut &*self).poll_flush(cx)
360 }
361
362 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
363 Pin::new(&mut &*self).poll_close(cx)
364 }
365}
366
367impl Write for &File {
368 fn poll_write(
369 self: Pin<&mut Self>,
370 cx: &mut Context<'_>,
371 buf: &[u8],
372 ) -> Poll<io::Result<usize>> {
373 let state = futures_core::ready!(self.lock.poll_lock(cx));
374 state.poll_write(cx, buf)
375 }
376
377 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
378 let state = futures_core::ready!(self.lock.poll_lock(cx));
379 state.poll_flush(cx).map(|res| res.map(drop))
380 }
381
382 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
383 let state = futures_core::ready!(self.lock.poll_lock(cx));
384 state.poll_close(cx)
385 }
386}
387
388impl Seek for File {
389 fn poll_seek(
390 self: Pin<&mut Self>,
391 cx: &mut Context<'_>,
392 pos: SeekFrom,
393 ) -> Poll<io::Result<u64>> {
394 Pin::new(&mut &*self).poll_seek(cx, pos)
395 }
396}
397
398impl Seek for &File {
399 fn poll_seek(
400 self: Pin<&mut Self>,
401 cx: &mut Context<'_>,
402 pos: SeekFrom,
403 ) -> Poll<io::Result<u64>> {
404 let state = futures_core::ready!(self.lock.poll_lock(cx));
405 state.poll_seek(cx, pos)
406 }
407}
408
409impl From<std::fs::File> for File {
410 fn from(file: std::fs::File) -> File {
411 File::new(file, false)
412 }
413}
414
415cfg_unix! {
416 use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
417
418 impl AsRawFd for File {
419 fn as_raw_fd(&self) -> RawFd {
420 self.file.as_raw_fd()
421 }
422 }
423
424 impl FromRawFd for File {
425 unsafe fn from_raw_fd(fd: RawFd) -> File {
426 std::fs::File::from_raw_fd(fd).into()
427 }
428 }
429
430 impl IntoRawFd for File {
431 fn into_raw_fd(self) -> RawFd {
432 let file = self.file.clone();
433 drop(self);
434 Arc::try_unwrap(file)
435 .expect("cannot acquire ownership of the file handle after drop")
436 .into_raw_fd()
437 }
438 }
439}
440
441cfg_windows! {
442 use crate::os::windows::io::{AsRawHandle, FromRawHandle, IntoRawHandle, RawHandle};
443
444 impl AsRawHandle for File {
445 fn as_raw_handle(&self) -> RawHandle {
446 self.file.as_raw_handle()
447 }
448 }
449
450 impl FromRawHandle for File {
451 unsafe fn from_raw_handle(handle: RawHandle) -> File {
452 std::fs::File::from_raw_handle(handle).into()
453 }
454 }
455
456 impl IntoRawHandle for File {
457 fn into_raw_handle(self) -> RawHandle {
458 let file = self.file.clone();
459 drop(self);
460 Arc::try_unwrap(file)
461 .expect("cannot acquire ownership of the file handle after drop")
462 .into_raw_handle()
463 }
464 }
465}
466
467/// An async mutex with non-borrowing lock guards.
468struct Lock<T>(Arc<LockState<T>>);
469
470unsafe impl<T: Send> Send for Lock<T> {}
471unsafe impl<T: Send> Sync for Lock<T> {}
472
473/// The state of a lock.
474struct LockState<T> {
475 /// Set to `true` when locked.
476 locked: AtomicBool,
477
478 /// The inner value.
479 value: UnsafeCell<T>,
480
481 /// A list of tasks interested in acquiring the lock.
482 wakers: Mutex<Vec<Waker>>,
483}
484
485impl<T> Lock<T> {
486 /// Creates a new lock initialized with `value`.
487 fn new(value: T) -> Lock<T> {
488 Lock(Arc::new(LockState {
489 locked: AtomicBool::new(false),
490 value: UnsafeCell::new(value),
491 wakers: Mutex::new(Vec::new()),
492 }))
493 }
494
495 /// Attempts to acquire the lock.
496 fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<LockGuard<T>> {
497 // Try acquiring the lock.
498 if self.0.locked.swap(true, Ordering::Acquire) {
499 // Lock the list of wakers.
500 let mut list = self.0.wakers.lock().unwrap();
501
502 // Try acquiring the lock again.
503 if self.0.locked.swap(true, Ordering::Acquire) {
504 // If failed again, add the current task to the list and return.
505 if list.iter().all(|w| !w.will_wake(cx.waker())) {
506 list.push(cx.waker().clone());
507 }
508 return Poll::Pending;
509 }
510 }
511
512 // The lock was successfully acquired.
513 Poll::Ready(LockGuard(self.0.clone()))
514 }
515}
516
517/// A lock guard.
518///
519/// When dropped, ownership of the inner value is returned back to the lock.
520struct LockGuard<T>(Arc<LockState<T>>);
521
522unsafe impl<T: Send> Send for LockGuard<T> {}
523unsafe impl<T: Sync> Sync for LockGuard<T> {}
524
525impl<T> LockGuard<T> {
526 /// Registers a task interested in acquiring the lock.
527 ///
528 /// When this lock guard gets dropped, all registered tasks will be woken up.
529 fn register(&self, cx: &Context<'_>) {
530 let mut list = self.0.wakers.lock().unwrap();
531
532 if list.iter().all(|w| !w.will_wake(cx.waker())) {
533 list.push(cx.waker().clone());
534 }
535 }
536}
537
538impl<T> Drop for LockGuard<T> {
539 fn drop(&mut self) {
540 // Release the lock.
541 self.0.locked.store(false, Ordering::Release);
542
543 // Wake up all registered tasks interested in acquiring the lock.
544 for w in self.0.wakers.lock().unwrap().drain(..) {
545 w.wake();
546 }
547 }
548}
549
550impl<T> Deref for LockGuard<T> {
551 type Target = T;
552
553 fn deref(&self) -> &T {
554 unsafe { &*self.0.value.get() }
555 }
556}
557
558impl<T> DerefMut for LockGuard<T> {
559 fn deref_mut(&mut self) -> &mut T {
560 unsafe { &mut *self.0.value.get() }
561 }
562}
563
564/// Modes a file can be in.
565///
566/// The file can either be in idle mode, reading mode, or writing mode.
567enum Mode {
568 /// The cache is empty.
569 Idle,
570
571 /// The cache contains data read from the inner file.
572 ///
573 /// The `usize` represents how many bytes from the beginning of cache have been consumed.
574 Reading(usize),
575
576 /// The cache contains data that needs to be written to the inner file.
577 Writing,
578}
579
580/// The current state of a file.
581///
582/// The `File` struct protects this state behind a lock.
583///
584/// Filesystem operations that get spawned as blocking tasks will acquire the lock, take ownership
585/// of the state and return it back once the operation completes.
586struct State {
587 /// The inner file.
588 file: Arc<std::fs::File>,
589
590 /// The current mode (idle, reading, or writing).
591 mode: Mode,
592
593 /// The read/write cache.
594 ///
595 /// If in reading mode, the cache contains a chunk of data that has been read from the file.
596 /// If in writing mode, the cache contains data that will eventually be written to the file.
597 cache: Vec<u8>,
598
599 /// Set to `true` if the file is flushed.
600 ///
601 /// When a file is flushed, the write cache and the inner file's buffer are empty.
602 is_flushed: bool,
603
604 /// The last read error that came from an async operation.
605 last_read_err: Option<io::Error>,
606
607 /// The last write error that came from an async operation.
608 last_write_err: Option<io::Error>,
609}
610
611impl LockGuard<State> {
612 /// Seeks to a new position in the file.
613 fn poll_seek(mut self, cx: &mut Context<'_>, pos: SeekFrom) -> Poll<io::Result<u64>> {
614 // If this operation doesn't move the cursor, then poll the current position inside the
615 // file. This call should not block because it doesn't touch the actual file on disk.
616 if pos == SeekFrom::Current(0) {
617 // Poll the internal file cursor.
618 let internal = (&*self.file).seek(SeekFrom::Current(0))?;
619
620 // Factor in the difference caused by caching.
621 let actual = match self.mode {
622 Mode::Idle => internal,
623 Mode::Reading(start) => internal - self.cache.len() as u64 + start as u64,
624 Mode::Writing => internal + self.cache.len() as u64,
625 };
626 return Poll::Ready(Ok(actual));
627 }
628
629 // If the file is in reading mode and the cache will stay valid after seeking, then adjust
630 // the current position in the read cache without invaliding it.
631 if let Mode::Reading(start) = self.mode {
632 if let SeekFrom::Current(diff) = pos {
633 if let Some(new) = (start as i64).checked_add(diff) {
634 if 0 <= new && new <= self.cache.len() as i64 {
635 // Poll the internal file cursor.
636 let internal = (&*self.file).seek(SeekFrom::Current(0))?;
637
638 // Adjust the current position in the read cache.
639 self.mode = Mode::Reading(new as usize);
640
641 // Factor in the difference caused by caching.
642 return Poll::Ready(Ok(internal - self.cache.len() as u64 + new as u64));
643 }
644 }
645 }
646 }
647
648 // Invalidate the read cache and flush the write cache before calling `seek()`.
649 self = futures_core::ready!(self.poll_unread(cx))?;
650 self = futures_core::ready!(self.poll_flush(cx))?;
651
652 // Seek to the new position. This call should not block because it only changes the
653 // internal offset into the file and doesn't touch the actual file on disk.
654 Poll::Ready((&*self.file).seek(pos))
655 }
656
657 /// Reads some bytes from the file into a buffer.
658 fn poll_read(mut self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll<io::Result<usize>> {
659 // If an async operation has left a read error, return it now.
660 if let Some(err) = self.last_read_err.take() {
661 return Poll::Ready(Err(err));
662 }
663
664 match self.mode {
665 Mode::Idle => {}
666 Mode::Reading(start) => {
667 // How many bytes in the cache are available for reading.
668 let available = self.cache.len() - start;
669
670 // If there is cached unconsumed data or if the cache is empty, we can read from
671 // it. Empty cache in reading mode indicates that the last operation didn't read
672 // any bytes, i.e. it reached the end of the file.
673 if available > 0 || self.cache.is_empty() {
674 // Copy data from the cache into the buffer.
675 let n = cmp::min(available, buf.len());
676 buf[..n].copy_from_slice(&self.cache[start..n]);
677
678 // Move the read cursor forward.
679 self.mode = Mode::Reading(start + n);
680
681 return Poll::Ready(Ok(n));
682 }
683 }
684 Mode::Writing => {
685 // If we're in writing mode, flush the write cache.
686 self = futures_core::ready!(self.poll_flush(cx))?;
687 }
688 }
689
690 // Make the cache as long as `buf`.
691 if self.cache.len() < buf.len() {
692 let diff = buf.len() - self.cache.len();
693 self.cache.reserve(diff);
694 }
695 unsafe {
696 self.cache.set_len(buf.len());
697 }
698
699 // Register current task's interest in the file lock.
700 self.register(cx);
701
702 // Start a read operation asynchronously.
703 spawn_blocking(move || {
704 // Read some data from the file into the cache.
705 let res = {
706 let State { file, cache, .. } = &mut *self;
707 (&**file).read(cache)
708 };
709
710 match res {
711 Ok(n) => {
712 // Update cache length and switch to reading mode, starting from index 0.
713 unsafe {
714 self.cache.set_len(n);
715 }
716 self.mode = Mode::Reading(0);
717 }
718 Err(err) => {
719 // Save the error and switch to idle mode.
720 self.cache.clear();
721 self.mode = Mode::Idle;
722 self.last_read_err = Some(err);
723 }
724 }
725 });
726
727 Poll::Pending
728 }
729
730 /// Invalidates the read cache.
731 ///
732 /// This method will also move the internal file's cursor backwards by the number of unconsumed
733 /// bytes in the read cache.
734 fn poll_unread(mut self, _: &mut Context<'_>) -> Poll<io::Result<Self>> {
735 match self.mode {
736 Mode::Idle | Mode::Writing => Poll::Ready(Ok(self)),
737 Mode::Reading(start) => {
738 // The number of unconsumed bytes in the read cache.
739 let n = self.cache.len() - start;
740
741 if n > 0 {
742 // Seek `n` bytes backwards. This call should not block because it only changes
743 // the internal offset into the file and doesn't touch the actual file on disk.
744 //
745 // We ignore errors here because special files like `/dev/random` are not
746 // seekable.
747 let _ = (&*self.file).seek(SeekFrom::Current(-(n as i64)));
748 }
749
750 // Switch to idle mode.
751 self.cache.clear();
752 self.mode = Mode::Idle;
753
754 Poll::Ready(Ok(self))
755 }
756 }
757 }
758
759 /// Writes some data from a buffer into the file.
760 fn poll_write(mut self, cx: &mut Context<'_>, buf: &[u8]) -> Poll<io::Result<usize>> {
761 // If an async operation has left a write error, return it now.
762 if let Some(err) = self.last_write_err.take() {
763 return Poll::Ready(Err(err));
764 }
765
766 // If we're in reading mode, invalidate the read buffer.
767 self = futures_core::ready!(self.poll_unread(cx))?;
768
769 // If necessary, grow the cache to have as much capacity as `buf`.
770 if self.cache.capacity() < buf.len() {
771 let diff = buf.len() - self.cache.capacity();
772 self.cache.reserve(diff);
773 }
774
775 // How many bytes can be written into the cache before filling up.
776 let available = self.cache.capacity() - self.cache.len();
777
778 // If there is space available in the cache or if the buffer is empty, we can write data
779 // into the cache.
780 if available > 0 || buf.is_empty() {
781 let n = cmp::min(available, buf.len());
782 let start = self.cache.len();
783
784 // Copy data from the buffer into the cache.
785 unsafe {
786 self.cache.set_len(start + n);
787 }
788 self.cache[start..start + n].copy_from_slice(&buf[..n]);
789
790 // Mark the file as not flushed and switch to writing mode.
791 self.is_flushed = false;
792 self.mode = Mode::Writing;
793 Poll::Ready(Ok(n))
794 } else {
795 // Drain the write cache because it's full.
796 futures_core::ready!(self.poll_drain(cx))?;
797 Poll::Pending
798 }
799 }
800
801 /// Drains the write cache.
802 fn poll_drain(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
803 // If an async operation has left a write error, return it now.
804 if let Some(err) = self.last_write_err.take() {
805 return Poll::Ready(Err(err));
806 }
807
808 match self.mode {
809 Mode::Idle | Mode::Reading(..) => Poll::Ready(Ok(self)),
810 Mode::Writing => {
811 // Register current task's interest in the file lock.
812 self.register(cx);
813
814 // Start a write operation asynchronously.
815 spawn_blocking(move || {
816 match (&*self.file).write_all(&self.cache) {
817 Ok(_) => {
818 // Switch to idle mode.
819 self.cache.clear();
820 self.mode = Mode::Idle;
821 }
822 Err(err) => {
823 // Save the error.
824 self.last_write_err = Some(err);
825 }
826 };
827 });
828
829 Poll::Pending
830 }
831 }
832 }
833
834 /// Flushes the write cache into the file.
835 fn poll_flush(mut self, cx: &mut Context<'_>) -> Poll<io::Result<Self>> {
836 // If the file is already in flushed state, return.
837 if self.is_flushed {
838 return Poll::Ready(Ok(self));
839 }
840
841 // If there is data in the write cache, drain it.
842 self = futures_core::ready!(self.poll_drain(cx))?;
843
844 // Register current task's interest in the file lock.
845 self.register(cx);
846
847 // Start a flush operation asynchronously.
848 spawn_blocking(move || {
849 match (&*self.file).flush() {
850 Ok(()) => {
851 // Mark the file as flushed.
852 self.is_flushed = true;
853 }
854 Err(err) => {
855 // Save the error.
856 self.last_write_err = Some(err);
857 }
858 }
859 });
860
861 Poll::Pending
862 }
863
864 // This function does nothing because we're not sure about `AsyncWrite::poll_close()`'s exact
865 // semantics nor whether it will stay in the `AsyncWrite` trait.
866 fn poll_close(self, _: &mut Context<'_>) -> Poll<io::Result<()>> {
867 Poll::Ready(Ok(()))
868 }
869}