use async_trait::async_trait;
use futures::stream::iter;
use futures::StreamExt;
use off64::usz;
use off64::Off64AsyncRead;
use off64::Off64AsyncWrite;
use off64::Off64Read;
use off64::Off64Write;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::collections::HashMap;
use std::io::SeekFrom;
#[cfg(feature = "tokio_file")]
use std::os::unix::prelude::FileExt;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncSeekExt;
use tokio::sync::Mutex;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tokio::time::Instant;
/// Determines a file's length by opening it and seeking to its end.
///
/// Returns any I/O error raised while opening or seeking.
pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
  // Seeking to `End(0)` yields the resulting absolute position, i.e. the file length.
  File::open(path).await?.seek(SeekFrom::End(0)).await
}
/// Returns the number of microseconds elapsed since `started`.
///
/// # Panics
/// Panics if the elapsed microsecond count does not fit in a `u64`
/// (not reachable in practice).
fn dur_us(started: Instant) -> u64 {
  // The parameter is a point in time, not a duration; elapsed() computes the gap.
  started.elapsed().as_micros().try_into().unwrap()
}
/// A single positioned write, optionally tagged with a deduplication
/// sequence number.
pub struct WriteRequest {
  /// Bytes to write at `offset`.
  data: Vec<u8>,
  /// Absolute file offset to write at.
  offset: u64,
  /// When `Some`, competing writes to the same offset are ordered by this
  /// value during delayed sync; a strictly greater previously-seen seq wins.
  deduplicate_seq: Option<u64>,
}

impl WriteRequest {
  /// Creates a plain (non-deduplicated) write of `data` at `offset`.
  pub fn new(offset: u64, data: Vec<u8>) -> Self {
    Self {
      offset,
      data,
      deduplicate_seq: None,
    }
  }

  /// Creates a deduplicated write; `seq` orders competing writes to the same
  /// offset.
  pub fn deduplicated(offset: u64, seq: u64, data: Vec<u8>) -> Self {
    Self {
      offset,
      data,
      deduplicate_seq: Some(seq),
    }
  }
}
/// Deduplicated writes buffered until the next background sync pass.
#[derive(Default)]
struct PendingDeduplicatedWrites {
// Latest pending payload per file offset; replaced when a newer (>=) seq arrives.
data: HashMap<u64, Vec<u8>>,
// Most recently accepted seq per offset (non-decreasing, per the guard in
// `write_at_with_delayed_sync`). NOTE(review): never cleared, so it persists
// across syncs and grows with the number of distinct offsets — confirm intended.
seen_seq: HashMap<u64, u64>,
}
/// Shared state between writers and the delayed-sync background loop,
/// guarded by `SeekableAsyncFile::pending_sync_state`.
struct PendingSyncState {
// Timestamps of the oldest/newest write awaiting sync; both are set on every
// delayed write and cleared together when the background loop drains them.
earliest_unsynced: Option<Instant>, latest_unsynced: Option<Instant>,
// Futures to signal once the next sync completes.
pending_sync_fut_states: Vec<SignalFutureController>,
// Deduplicated writes buffered for the next background flush.
pending_deduplicated_writes: PendingDeduplicatedWrites,
}
/// Monotonic counters describing the activity of a `SeekableAsyncFile`.
///
/// Counters only ever increase; read them through the accessor methods,
/// which perform relaxed atomic loads.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  sync_background_loops_counter: AtomicU64,
  sync_counter: AtomicU64,
  sync_delayed_counter: AtomicU64,
  sync_longest_delay_us_counter: AtomicU64,
  sync_shortest_delay_us_counter: AtomicU64,
  sync_us_counter: AtomicU64,
  write_bytes_counter: AtomicU64,
  write_counter: AtomicU64,
  write_us_counter: AtomicU64,
}

/// Generates one public getter per counter field, each returning the current
/// value via a relaxed atomic load.
macro_rules! metrics_getters {
  ($($field:ident),+ $(,)?) => {
    impl SeekableAsyncFileMetrics {
      $(
        /// Returns the current value of the corresponding counter.
        pub fn $field(&self) -> u64 {
          self.$field.load(Ordering::Relaxed)
        }
      )+
    }
  };
}

metrics_getters!(
  sync_background_loops_counter,
  sync_counter,
  sync_delayed_counter,
  sync_longest_delay_us_counter,
  sync_shortest_delay_us_counter,
  sync_us_counter,
  write_bytes_counter,
  write_counter,
  write_us_counter,
);
/// A file handle whose writes complete immediately while fsyncs are batched:
/// a background loop periodically syncs and then wakes pending waiters.
///
/// Cloning is cheap — clones share the same underlying fd/mmap, metrics and
/// pending-sync state via `Arc`.
#[derive(Clone)]
pub struct SeekableAsyncFile {
// Shared std fd used for positioned reads/writes and `sync_data` (tokio_file backend).
#[cfg(feature = "tokio_file")]
fd: Arc<std::fs::File>,
// Raw memory map of the file (mmap backend).
#[cfg(feature = "mmap")]
mmap: Arc<memmap2::MmapRaw>,
// Length of the mapping in bytes, taken from the `size` argument to `open`.
#[cfg(feature = "mmap")]
mmap_len: usize,
// Interval between background sync attempts, in microseconds.
sync_delay_us: u64,
metrics: Arc<SeekableAsyncFileMetrics>,
// Writers enqueue wakers and deduplicated writes here; the background loop drains it.
pending_sync_state: Arc<Mutex<PendingSyncState>>,
}
impl SeekableAsyncFile {
  /// Opens `path` for reading and writing.
  ///
  /// `flags` is passed through as custom flags to the underlying open call.
  /// With the `mmap` feature, `size` fixes the length of the memory mapping.
  ///
  /// # Panics
  /// Panics if the file cannot be opened or (with `mmap`) mapped, or if
  /// `sync_delay` in microseconds overflows a `u64`.
  ///
  /// NOTE(review): with both `tokio_file` and `mmap` enabled, `fd` is moved
  /// into the `fd` field before `&fd` is borrowed for the mapping — the
  /// features appear to be mutually exclusive; confirm.
  pub async fn open(
    path: &Path,
    #[cfg(feature = "mmap")] size: u64,
    metrics: Arc<SeekableAsyncFileMetrics>,
    sync_delay: Duration,
    flags: i32,
  ) -> Self {
    let async_fd = OpenOptions::new()
      .read(true)
      .write(true)
      .custom_flags(flags)
      .open(path)
      .await
      .unwrap();
    // Convert to a std File so positioned I/O and syncing can run on blocking threads.
    let fd = async_fd.into_std().await;
    SeekableAsyncFile {
      #[cfg(feature = "tokio_file")]
      fd: Arc::new(fd),
      #[cfg(feature = "mmap")]
      mmap: Arc::new(memmap2::MmapRaw::map_raw(&fd).unwrap()),
      #[cfg(feature = "mmap")]
      mmap_len: usz!(size),
      sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
      metrics,
      pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
        earliest_unsynced: None,
        latest_unsynced: None,
        pending_sync_fut_states: Vec::new(),
        pending_deduplicated_writes: PendingDeduplicatedWrites::default(),
      })),
    }
  }

  /// Returns a raw const pointer into the mapping at `offset`.
  ///
  /// # Safety
  /// `offset` must lie within the mapping, and the pointer must not be used
  /// beyond the lifetime of this file's mapping.
  #[cfg(feature = "mmap")]
  pub unsafe fn get_mmap_raw_ptr(&self, offset: u64) -> *const u8 {
    self.mmap.as_ptr().add(usz!(offset))
  }

  /// Returns a raw mutable pointer into the mapping at `offset`.
  ///
  /// # Safety
  /// Same requirements as `get_mmap_raw_ptr`; the caller is also responsible
  /// for avoiding data races on the pointed-to bytes.
  #[cfg(feature = "mmap")]
  pub unsafe fn get_mmap_raw_mut_ptr(&self, offset: u64) -> *mut u8 {
    self.mmap.as_mut_ptr().add(usz!(offset))
  }

  // Records one write of `len` bytes that took `call_us` microseconds.
  fn bump_write_metrics(&self, len: u64, call_us: u64) {
    self
      .metrics
      .write_bytes_counter
      .fetch_add(len, Ordering::Relaxed);
    self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .write_us_counter
      .fetch_add(call_us, Ordering::Relaxed);
  }

  /// Performs `writes`, then resolves once a subsequent background sync has
  /// run.
  ///
  /// Non-deduplicated writes are written immediately; deduplicated writes are
  /// buffered and flushed by the background loop, where a previously-seen
  /// strictly greater `deduplicate_seq` for the same offset wins (an equal
  /// seq overwrites).
  ///
  /// Requires `start_delayed_data_sync_background_loop` to be running,
  /// otherwise the returned future never resolves.
  pub async fn write_at_with_delayed_sync(&self, writes: Vec<WriteRequest>) {
    let count: u64 = writes.len().try_into().unwrap();
    let mut deduplicated = vec![];
    for w in writes {
      match w.deduplicate_seq {
        Some(_) => deduplicated.push(w),
        None => Off64AsyncWrite::write_at(self, w.offset, w.data).await,
      };
    }
    let (fut, fut_ctl) = SignalFuture::new();
    {
      let mut state = self.pending_sync_state.lock().await;
      let now = Instant::now();
      // Both timestamps are always set here, which is what makes the
      // unwraps in the background loop safe.
      state.earliest_unsynced.get_or_insert(now);
      state.latest_unsynced = Some(now);
      state.pending_sync_fut_states.push(fut_ctl.clone());
      for w in deduplicated {
        // Accept unless we've already seen a strictly greater seq for this offset.
        if state
          .pending_deduplicated_writes
          .seen_seq
          .get(&w.offset)
          .filter(|s| **s > w.deduplicate_seq.unwrap())
          .is_none()
        {
          state
            .pending_deduplicated_writes
            .data
            .insert(w.offset, w.data);
          state
            .pending_deduplicated_writes
            .seen_seq
            .insert(w.offset, w.deduplicate_seq.unwrap());
        };
      }
    };
    // NOTE(review): this adds the number of writes, not the number of delayed
    // syncs — confirm that is the intended meaning of `sync_delayed_counter`.
    self
      .metrics
      .sync_delayed_counter
      .fetch_add(count, Ordering::Relaxed);
    fut.await;
  }

  /// Background loop: every `sync_delay_us` microseconds, flushes buffered
  /// deduplicated writes, syncs the file, and signals futures returned by
  /// `write_at_with_delayed_sync`. Never returns; run it as its own task.
  pub async fn start_delayed_data_sync_background_loop(&self) {
    // Reused across iterations to avoid reallocating every loop.
    let mut futures_to_wake = Vec::new();
    let mut deduplicated_writes = Vec::new();
    loop {
      sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;
      struct SyncNow {
        longest_delay_us: u64,
        shortest_delay_us: u64,
      }
      // Drain pending state while holding the lock; perform the actual I/O
      // only after releasing it.
      let sync_now = {
        let mut state = self.pending_sync_state.lock().await;
        if !state.pending_sync_fut_states.is_empty()
          || !state.pending_deduplicated_writes.data.is_empty()
        {
          // Unwraps are safe: the timestamps are set whenever anything is
          // enqueued (see write_at_with_delayed_sync).
          let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
          let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());
          state.earliest_unsynced = None;
          state.latest_unsynced = None;
          futures_to_wake.extend(state.pending_sync_fut_states.drain(..));
          // Only `data` is drained; `seen_seq` persists so dedup ordering
          // holds across syncs. NOTE(review): confirm that is intended.
          deduplicated_writes.extend(state.pending_deduplicated_writes.data.drain());
          Some(SyncNow {
            longest_delay_us,
            shortest_delay_us,
          })
        } else {
          None
        }
      };
      if let Some(SyncNow {
        longest_delay_us,
        shortest_delay_us,
      }) = sync_now
      {
        self
          .metrics
          .sync_longest_delay_us_counter
          .fetch_add(longest_delay_us, Ordering::Relaxed);
        self
          .metrics
          .sync_shortest_delay_us_counter
          .fetch_add(shortest_delay_us, Ordering::Relaxed);
        // Flush buffered deduplicated writes concurrently, then sync.
        iter(deduplicated_writes.drain(..))
          .for_each_concurrent(None, |(offset, data)| async move {
            Off64AsyncWrite::write_at(self, offset, data).await;
          })
          .await;
        self.sync_data().await;
        // Waiters are signalled only after the sync has completed.
        for ft in futures_to_wake.drain(..) {
          ft.signal();
        }
      };
      self
        .metrics
        .sync_background_loops_counter
        .fetch_add(1, Ordering::Relaxed);
    }
  }

  /// Syncs written data to the underlying storage (fd `sync_data` or mmap
  /// flush, depending on the enabled feature) on a blocking thread, then
  /// records sync metrics.
  ///
  /// # Panics
  /// Panics if the sync/flush fails or the blocking task is cancelled.
  pub async fn sync_data(&self) {
    #[cfg(feature = "tokio_file")]
    let fd = self.fd.clone();
    #[cfg(feature = "mmap")]
    let mmap = self.mmap.clone();
    let started = Instant::now();
    spawn_blocking(move || {
      #[cfg(feature = "tokio_file")]
      fd.sync_data().unwrap();
      #[cfg(feature = "mmap")]
      mmap.flush().unwrap();
    })
    .await
    .unwrap();
    // Measured duration includes blocking-task scheduling, not just the sync itself.
    let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .sync_us_counter
      .fetch_add(sync_us, Ordering::Relaxed);
  }
}
#[cfg(feature = "mmap")]
impl<'a> Off64Read<'a, Vec<u8>> for SeekableAsyncFile {
  /// Synchronously copies `len` bytes starting at `offset` out of the memory map.
  fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let start = usz!(offset);
    let end = start + usz!(len);
    // SAFETY: the mapping is valid for `mmap_len` bytes for the life of `self`;
    // the slice index below bounds-checks `start..end` before copying.
    let mapped = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr(), self.mmap_len) };
    mapped[start..end].to_vec()
  }
}
#[cfg(feature = "mmap")]
impl<'a> Off64Write<Vec<u8>> for SeekableAsyncFile {
  /// Synchronously copies `data` into the memory map at `offset`, then
  /// records write metrics (bytes, count, duration).
  fn write_at<'v>(&self, offset: u64, data: Vec<u8>) -> () {
    let start = usz!(offset);
    let len = data.len();
    let started = Instant::now();
    // SAFETY: the mapping is valid for `mmap_len` bytes for the life of `self`;
    // the destination range is bounds-checked by the slice index below.
    let mapped = unsafe { std::slice::from_raw_parts_mut(self.mmap.as_mut_ptr(), self.mmap_len) };
    mapped[start..start + len].copy_from_slice(&data);
    let elapsed_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len.try_into().unwrap(), elapsed_us);
  }
}
#[async_trait]
impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
  /// Reads `len` bytes at `offset` by copying from the memory map on a
  /// blocking thread.
  #[cfg(feature = "mmap")]
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let offset = usz!(offset);
    let len = usz!(len);
    // Clone the Arc so the mapping stays alive inside the blocking task.
    let mmap = self.mmap.clone();
    let mmap_len = self.mmap_len;
    spawn_blocking(move || {
      // SAFETY: the mapping is valid for `mmap_len` bytes while the Arc is
      // held; the slice index below bounds-checks `offset + len`.
      let memory = unsafe { std::slice::from_raw_parts(mmap.as_ptr(), mmap_len) };
      memory[offset..offset + len].to_vec()
    })
    .await
    .unwrap()
  }

  /// Reads exactly `len` bytes at `offset` via `FileExt::read_exact_at` on a
  /// blocking thread. Panics on I/O error (including reaching EOF early).
  #[cfg(feature = "tokio_file")]
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let fd = self.fd.clone();
    spawn_blocking(move || {
      let mut buf = vec![0u8; len.try_into().unwrap()];
      fd.read_exact_at(&mut buf, offset).unwrap();
      buf
    })
    .await
    .unwrap()
  }
}
#[async_trait]
impl Off64AsyncWrite<Vec<u8>> for SeekableAsyncFile {
  /// Copies `data` into the memory map at `offset` on a blocking thread, then
  /// records write metrics. The measured duration includes blocking-task
  /// scheduling, not just the copy.
  #[cfg(feature = "mmap")]
  async fn write_at(&self, offset: u64, data: Vec<u8>) {
    let offset = usz!(offset);
    let len = data.len();
    let started = Instant::now();
    // Clone the Arc so the mapping stays alive inside the blocking task.
    let mmap = self.mmap.clone();
    let mmap_len = self.mmap_len;
    spawn_blocking(move || {
      // SAFETY: the mapping is valid for `mmap_len` bytes while the Arc is
      // held; the destination range is bounds-checked by the slice index below.
      let memory = unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap_len) };
      memory[offset..offset + len].copy_from_slice(&data);
    })
    .await
    .unwrap();
    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len.try_into().unwrap(), call_us);
  }

  /// Writes all of `data` at `offset` via `FileExt::write_all_at` on a
  /// blocking thread, then records write metrics. Panics on I/O error.
  #[cfg(feature = "tokio_file")]
  async fn write_at(&self, offset: u64, data: Vec<u8>) {
    let fd = self.fd.clone();
    let len: u64 = data.len().try_into().unwrap();
    let started = Instant::now();
    spawn_blocking(move || fd.write_all_at(&data, offset).unwrap())
      .await
      .unwrap();
    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len, call_us);
  }
}