use crate::journal::{JournalHandle, Lsn};
use crate::{Error, Result};
use std::sync::atomic::Ordering;
use std::sync::Arc;
#[cfg(all(target_os = "linux", feature = "async"))]
use crate::async_io::completion_driver::AsyncIoUring;
/// Size argument handed to `AsyncIoUring::new` when a journal lazily creates its
/// native ring (see `native_ring`). Presumably the submission-queue depth —
/// NOTE(review): confirm against `AsyncIoUring::new`.
#[cfg(all(target_os = "linux", feature = "async"))]
const JOURNAL_IOURING_DEPTH: u32 = 256;
impl JournalHandle {
    /// Appends `record` to the journal from async code and returns the LSN just
    /// past the written frame.
    ///
    /// Direct-mode journals always hand the synchronous `append` to tokio's
    /// blocking pool. Otherwise, when built for Linux with the `async` feature
    /// and a native io_uring ring can be obtained, the frame is written through
    /// that ring. In every remaining case the blocking pool is used.
    ///
    /// # Errors
    /// Fails if `super::require_runtime` rejects the calling context, if the
    /// underlying append fails, or if the blocking task's `JoinError` surfaces
    /// (mapped to [`Error::Io`]).
    pub async fn append_async(self: Arc<Self>, record: Vec<u8>) -> Result<Lsn> {
        super::require_runtime()?;
        // Direct mode never probes for a ring, so `native_iouring_active` stays false for it.
        #[cfg(all(target_os = "linux", feature = "async"))]
        if !self.direct {
            if let Some(ring) = self.native_ring() {
                return self.append_native(ring, record).await;
            }
        }
        let joined = tokio::task::spawn_blocking(move || self.append(&record)).await;
        joined.map_err(join_error_to_io)?
    }

    /// Waits until everything up to `lsn` is durable, without blocking the runtime.
    ///
    /// Returns immediately when `synced_lsn` already covers `lsn`. Otherwise the
    /// path is chosen as in [`JournalHandle::append_async`]: the native io_uring
    /// group-commit when available and not in direct mode, else the synchronous
    /// `sync_through` on tokio's blocking pool.
    ///
    /// # Errors
    /// Fails if `super::require_runtime` rejects the calling context, if the sync
    /// fails, or if the blocking task's `JoinError` surfaces.
    pub async fn sync_through_async(self: Arc<Self>, lsn: Lsn) -> Result<()> {
        super::require_runtime()?;
        let already_durable = self.synced_lsn.load(Ordering::Acquire) >= lsn.as_u64();
        if already_durable {
            return Ok(());
        }
        #[cfg(all(target_os = "linux", feature = "async"))]
        if !self.direct {
            if let Some(ring) = self.native_ring() {
                return self.sync_through_native(ring, lsn).await;
            }
        }
        let joined = tokio::task::spawn_blocking(move || self.sync_through(lsn)).await;
        joined.map_err(join_error_to_io)?
    }

    /// Reports whether this journal currently holds a native io_uring ring.
    ///
    /// This is `false` until a ring has been successfully created by
    /// `native_ring`. In this file that happens lazily on a non-direct
    /// `append_async` / `sync_through_async`. It is always `false` when the crate
    /// is built without Linux + the `async` feature.
    #[must_use]
    pub fn native_iouring_active(&self) -> bool {
        #[cfg(all(target_os = "linux", feature = "async"))]
        {
            self.native_ring
                .get()
                .is_some_and(|slot| slot.is_some())
        }
        #[cfg(not(all(target_os = "linux", feature = "async")))]
        {
            false
        }
    }
}
#[cfg(all(target_os = "linux", feature = "async"))]
impl JournalHandle {
    /// Returns this journal's io_uring instance, creating it on first use.
    ///
    /// The outcome of the first `AsyncIoUring::new` attempt is cached in
    /// `native_ring`. If creation fails, `None` is stored and every later call
    /// returns `None` without retrying, so callers stay on the blocking path.
    fn native_ring(&self) -> Option<&AsyncIoUring> {
        let outer = self.native_ring.get_or_init(|| {
            AsyncIoUring::new(JOURNAL_IOURING_DEPTH).ok().map(Arc::new)
        });
        outer.as_ref().map(|arc| arc.as_ref())
    }

    /// Appends one framed record through io_uring and returns the LSN just past it.
    ///
    /// Space is reserved by atomically bumping `next_lsn` by the framed length.
    /// The frame is then written at the reserved offset, so concurrent appenders
    /// receive disjoint byte ranges.
    ///
    /// # Errors
    /// Propagates framing and write errors, and reports a short write as
    /// [`Error::Io`].
    ///
    /// NOTE(review): the reservation in `next_lsn` is not rolled back on failure.
    /// A failed or short write therefore leaves a gap at `[start, end)` that later
    /// appends skip over — confirm recovery tolerates such holes.
    async fn append_native(&self, ring: &AsyncIoUring, record: Vec<u8>) -> Result<Lsn> {
        use std::os::fd::AsRawFd;
        let frame = crate::journal::format::encode_frame_owned(&record)?;
        let frame_len = frame.len() as u64;
        let start = self.next_lsn.fetch_add(frame_len, Ordering::Release);
        let end = start + frame_len;
        let fd = self.file.as_raw_fd();
        let n =
            crate::async_io::iouring_substrate::write_at_native(ring, fd, &frame, start).await?;
        if n != frame.len() {
            return Err(Error::Io(std::io::Error::other(
                "native io_uring write returned short count on journal append",
            )));
        }
        Ok(Lsn::new(end))
    }

    /// Makes the journal durable through `lsn` using io_uring `fdatasync`, with
    /// group commit.
    ///
    /// One task (the leader) sets `group_commit.state.in_flight` and issues the
    /// sync. Concurrent callers yield and re-check until either `synced_lsn` /
    /// `committed_lsn` covers their LSN or they can become the leader themselves.
    ///
    /// The leader's claim on `in_flight` is owned by a drop guard. If this future
    /// is dropped while the `fdatasync` is pending (timeout, `select!`, task
    /// abort), the flag is still cleared and followers are notified. They are not
    /// left spinning on a leader that no longer exists.
    ///
    /// # Errors
    /// Returns the error from `fdatasync_native`. In that case `committed_lsn` and
    /// `synced_lsn` are left unchanged.
    async fn sync_through_native(&self, ring: &AsyncIoUring, lsn: Lsn) -> Result<()> {
        use std::os::fd::AsRawFd;

        // Owns the leader's claim on `in_flight`. Dropping it (normally or because the
        // enclosing future was cancelled) publishes `synced_frontier` when set, clears
        // `in_flight`, and wakes followers.
        struct LeaderClaim<'a> {
            journal: &'a JournalHandle,
            synced_frontier: Option<u64>,
        }

        impl Drop for LeaderClaim<'_> {
            fn drop(&mut self) {
                let mut state = self.journal.group_commit.state.lock();
                if let Some(frontier) = self.synced_frontier {
                    if frontier > state.committed_lsn {
                        state.committed_lsn = frontier;
                        self.journal.synced_lsn.store(frontier, Ordering::Release);
                    }
                }
                state.in_flight = false;
                drop(state);
                let _ = self.journal.group_commit.cv_followers.notify_all();
            }
        }

        let lsn_off = lsn.as_u64();
        loop {
            if self.synced_lsn.load(Ordering::Acquire) >= lsn_off {
                return Ok(());
            }
            // Never park the executor thread on the state mutex: if it is contended,
            // yield to the scheduler and retry.
            let Some(mut state) = self.group_commit.state.try_lock() else {
                tokio::task::yield_now().await;
                continue;
            };
            if state.committed_lsn >= lsn_off {
                return Ok(());
            }
            if state.in_flight {
                // Another leader is syncing; re-check after it publishes its frontier.
                drop(state);
                tokio::task::yield_now().await;
                continue;
            }
            state.in_flight = true;
            drop(state);
            // From here until `drop(claim)`, cancellation still releases `in_flight`.
            let mut claim = LeaderClaim {
                journal: self,
                synced_frontier: None,
            };
            // NOTE(review): `next_lsn` is the reservation counter — `append_native` bumps
            // it before its write completes — so this frontier can include bytes whose
            // write is still in flight and thus not covered by the fdatasync below.
            // Confirm this matches the blocking `sync_through`, or track a written frontier.
            let frontier = self.next_lsn.load(Ordering::Acquire);
            let fd = self.file.as_raw_fd();
            let result = crate::async_io::iouring_substrate::fdatasync_native(ring, fd).await;
            if result.is_ok() {
                claim.synced_frontier = Some(frontier);
            }
            // Publishes the frontier (on success), clears `in_flight`, wakes followers.
            drop(claim);
            return result;
        }
    }
}
fn join_error_to_io(e: tokio::task::JoinError) -> Error {
Error::Io(std::io::Error::other(format!(
"spawn_blocking task failed: {e}"
)))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, Ordering};

    /// Per-process counter that keeps temp file names unique across tests.
    static C: AtomicU64 = AtomicU64::new(0);

    /// Builds a temp-dir path unique to this process, this call, and `tag`.
    fn tmp_path(tag: &str) -> PathBuf {
        let n = C.fetch_add(1, Ordering::Relaxed);
        std::env::temp_dir().join(format!(
            "fsys_journal_async_test_{}_{}_{tag}",
            std::process::id(),
            n
        ))
    }

    /// Best-effort removal of the journal file when the test scope ends.
    struct Cleanup(PathBuf);
    impl Drop for Cleanup {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Runs `fut`, panicking instead of hanging if it exceeds the timeout.
    async fn with_timeout<F, T>(fut: F) -> T
    where
        F: std::future::Future<Output = T>,
    {
        const TIMEOUT_SECS: u64 = 15;
        match tokio::time::timeout(std::time::Duration::from_secs(TIMEOUT_SECS), fut).await {
            Ok(v) => v,
            Err(_) => panic!(
                "test exceeded {TIMEOUT_SECS}s timeout — likely a hang in the async journal path"
            ),
        }
    }

    #[tokio::test]
    async fn append_async_returns_lsn_advanced_by_framed_len() {
        with_timeout(async {
            let path = tmp_path("append_async");
            let _g = Cleanup(path.clone());
            let fs = builder().build().expect("handle");
            let log = Arc::new(fs.journal(&path).expect("journal"));
            let lsn1 = log
                .clone()
                .append_async(b"hello".to_vec())
                .await
                .expect("a1");
            assert_eq!(lsn1, Lsn::new(5 + 12));
            let lsn2 = log
                .clone()
                .append_async(b" world".to_vec())
                .await
                .expect("a2");
            assert_eq!(lsn2, Lsn::new(17 + 6 + 12));
        })
        .await;
    }

    #[tokio::test]
    async fn sync_through_async_advances_synced_lsn() {
        with_timeout(async {
            let path = tmp_path("sync_through_async");
            let _g = Cleanup(path.clone());
            let fs = builder().build().expect("handle");
            let log = Arc::new(fs.journal(&path).expect("journal"));
            let lsn = log
                .clone()
                .append_async(b"durable".to_vec())
                .await
                .expect("append");
            log.clone().sync_through_async(lsn).await.expect("sync");
            assert!(log.synced_lsn() >= lsn);
        })
        .await;
    }

    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn concurrent_async_appends_all_succeed() {
        with_timeout(async {
            let path = tmp_path("concurrent_async");
            let _g = Cleanup(path.clone());
            let fs = builder().build().expect("handle");
            let log = Arc::new(fs.journal(&path).expect("journal"));
            let mut joins = Vec::new();
            for i in 0..32 {
                let log = log.clone();
                let payload = format!("rec {i:04}").into_bytes();
                joins.push(tokio::spawn(async move { log.append_async(payload).await }));
            }
            let mut max_lsn = Lsn::ZERO;
            for j in joins {
                let lsn = j.await.expect("join").expect("append_async");
                if lsn > max_lsn {
                    max_lsn = lsn;
                }
            }
            log.clone()
                .sync_through_async(max_lsn)
                .await
                .expect("final sync");
            assert!(log.synced_lsn() >= max_lsn);
        })
        .await;
    }

    #[tokio::test]
    async fn direct_mode_async_round_trip() {
        with_timeout(async {
            let path = tmp_path("direct_async");
            let _g = Cleanup(path.clone());
            let fs = builder().build().expect("handle");
            let log = Arc::new(
                fs.journal_with(&path, crate::JournalOptions::new().direct(true))
                    .expect("direct journal"),
            );
            let lsn = log
                .clone()
                .append_async(b"async direct payload".to_vec())
                .await
                .expect("append_async");
            log.clone()
                .sync_through_async(lsn)
                .await
                .expect("sync_through_async");
            assert!(log.synced_lsn() >= lsn);
            assert!(!log.native_iouring_active());
        })
        .await;
    }

    /// Ring creation may legitimately fail on a given host, so either I/O path is
    /// acceptable. The probe outcome must be cached, though, and whichever path is
    /// active must produce correct LSNs and durable syncs.
    #[cfg(target_os = "linux")]
    #[tokio::test]
    async fn native_iouring_engages_on_linux_when_available() {
        with_timeout(async {
            let path = tmp_path("native_engage");
            let _g = Cleanup(path.clone());
            let fs = builder().build().expect("handle");
            let log = Arc::new(fs.journal(&path).expect("journal"));
            let lsn = log
                .clone()
                .append_async(b"trigger".to_vec())
                .await
                .expect("append");
            assert_eq!(lsn, Lsn::new(7 + 12));
            let active = log.native_iouring_active();
            assert_eq!(
                log.native_iouring_active(),
                active,
                "ring probe outcome must be cached"
            );
            log.clone().sync_through_async(lsn).await.expect("sync");
            assert!(log.synced_lsn() >= lsn);
            assert_eq!(log.native_iouring_active(), active);
        })
        .await;
    }
}