use crate::{
os::windows::{winprelude::*, FileHandle, NeedsFlush},
UnpinExt, LOCK_POISON,
};
use std::{
future::{self, Future},
io,
sync::{atomic::Ordering::*, Mutex},
task::{ready, Context, Poll},
};
use tokio::task::JoinHandle;
type FlushJH = JoinHandle<io::Result<()>>;
#[derive(Debug)]
pub struct TokioFlusher {
join_handle: Mutex<Option<FlushJH>>,
}
impl TokioFlusher {
pub(crate) const fn new() -> Self {
Self {
join_handle: Mutex::new(None),
}
}
#[inline]
pub(crate) async fn flush_atomic(
&self,
file_handle: BorrowedHandle<'_>,
needs_flush: &NeedsFlush,
) -> io::Result<()> {
future::poll_fn(|cx| self.poll_flush_atomic(file_handle, needs_flush, cx)).await
}
pub(crate) fn poll_flush_atomic(
&self,
file_handle: BorrowedHandle<'_>,
needs_flush: &NeedsFlush,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
if !needs_flush.get(Acquire) {
return Poll::Ready(Ok(()));
}
let mut flush = self.join_handle.lock().expect(LOCK_POISON);
if !needs_flush.get(Relaxed) {
return Poll::Ready(Ok(()));
}
let jh = Self::ensure_flush_start(&mut flush, file_handle);
let rslt = ready!(jh.pin().poll(cx)).unwrap();
if rslt.is_ok() {
needs_flush.clear();
}
*flush = None;
Poll::Ready(rslt)
}
pub(crate) fn poll_flush_mut(
&self,
file_handle: BorrowedHandle<'_>,
needs_flush: &mut bool,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
if !*needs_flush {
return Poll::Ready(Ok(()));
}
let mut flush = self.join_handle.lock().expect(LOCK_POISON);
let jh = Self::ensure_flush_start(&mut flush, file_handle);
let rslt = ready!(jh.pin().poll(cx)).unwrap();
if rslt.is_ok() {
*needs_flush = false;
}
*flush = None;
Poll::Ready(rslt)
}
fn ensure_flush_start<'opt>(
join_handle: &'opt mut Option<FlushJH>,
file_handle: BorrowedHandle<'_>,
) -> &'opt mut FlushJH {
if let Some(jh) = join_handle {
return jh;
}
let handle = file_handle.as_int_handle();
let task = tokio::task::spawn_blocking(move || FileHandle::flush_hndl(handle));
join_handle.insert(task)
}
}
impl Default for TokioFlusher {
#[inline]
fn default() -> Self {
Self::new()
}
}