use flexi_logger::{FileSpec, Logger, WriteMode};
use log::debug;
use std::cell::UnsafeCell;
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom};
use std::os::unix::io::AsRawFd;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use io_uring::{opcode, types, IoUring};
/// Capacity, in bytes, of each slot buffer (1 MiB).
const BATCH_SIZE: usize = 1 << 20;
/// Number of slots held by each `RingWriter`.
const RING_LEN: usize = 4;
/// Fixed-size heap allocation, aligned to 4096 bytes, accessed through a raw pointer.
///
/// The type performs no synchronization of its own: [`Buffer::write`] takes
/// `&self` and copies straight into the allocation.
pub struct Buffer {
    /// Base address of the allocation obtained in [`Buffer::new`].
    pub(crate) buffer: UnsafeCell<*mut u8>,
    /// Allocation size in bytes; also used to rebuild the layout on drop.
    size: AtomicUsize,
}

// NOTE(review): nothing in `Buffer` prevents two threads from writing the same
// bytes concurrently. These impls are only sound while callers keep concurrent
// writes on disjoint ranges — confirm against every user of the type.
unsafe impl Send for Buffer {}
unsafe impl Sync for Buffer {}

impl Buffer {
    /// Alignment, in bytes, requested for every allocation.
    const ALIGN: usize = 4096;

    /// Allocates `capacity` bytes aligned to 4096. The contents are uninitialized.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, if the size/alignment pair does not form a
    /// valid layout, or if the allocator returns null.
    pub fn new(capacity: usize) -> Self {
        // The global allocator's contract forbids zero-sized requests.
        assert!(capacity > 0, "buffer capacity must be non-zero");
        let layout =
            std::alloc::Layout::from_size_align(capacity, Self::ALIGN).expect("invalid layout");
        // SAFETY: `layout` has a non-zero size, checked above.
        let ptr = unsafe { std::alloc::alloc(layout) };
        assert!(!ptr.is_null(), "allocation failed");
        Self {
            buffer: UnsafeCell::new(ptr),
            size: AtomicUsize::new(capacity),
        }
    }

    /// Copies `payload` into the buffer starting at byte `offset`.
    ///
    /// # Panics
    ///
    /// Panics if `offset + payload.len()` overflows or exceeds [`Buffer::capacity`].
    pub fn write(&self, offset: usize, payload: &[u8]) {
        let end = offset
            .checked_add(payload.len())
            .expect("buffer write range overflows usize");
        let capacity = self.capacity();
        assert!(
            end <= capacity,
            "buffer write out of bounds: {offset}..{end} exceeds capacity {capacity}"
        );
        // SAFETY: `offset..end` lies inside the allocation (checked above), and
        // `payload` is a borrowed slice that cannot alias this raw allocation.
        unsafe {
            let dst = (*self.buffer.get()).add(offset);
            std::ptr::copy_nonoverlapping(payload.as_ptr(), dst, payload.len());
        }
    }

    /// Returns a raw pointer to the byte at `offset`.
    ///
    /// Bytes that were never written are uninitialized and must not be read.
    ///
    /// # Panics
    ///
    /// Panics if `offset` is greater than [`Buffer::capacity`].
    pub fn as_ptr(&self, offset: usize) -> *const u8 {
        let capacity = self.capacity();
        assert!(
            offset <= capacity,
            "buffer offset out of bounds: {offset} exceeds capacity {capacity}"
        );
        // SAFETY: `offset` is at most one past the end of the allocation.
        unsafe { (*self.buffer.get()).add(offset) }
    }

    /// Returns the allocation size in bytes.
    pub fn capacity(&self) -> usize {
        self.size.load(AtomicOrdering::Relaxed)
    }
}

impl Drop for Buffer {
    /// Releases the allocation using the same size and alignment it was created with.
    fn drop(&mut self) {
        let layout = std::alloc::Layout::from_size_align(self.capacity(), Self::ALIGN)
            .expect("invalid layout");
        // SAFETY: the pointer came from `alloc` with this exact layout in `new`
        // and is released only here.
        unsafe { std::alloc::dealloc(*self.buffer.get_mut(), layout) };
    }
}
/// Lifecycle state of a ring slot.
#[derive(PartialEq)]
enum SlotState {
    /// The slot has no outstanding write; this is the state every slot starts in.
    Open,
    /// A write covering the slot has been queued and its completion has not been reaped.
    InFlight,
}
/// One entry of the writer's ring: a buffer plus the bookkeeping for its last write.
struct Slot {
    /// Backing storage that appended payloads are copied into.
    buf: Buffer,
    /// Whether a write for this slot is currently outstanding.
    state: SlotState,
    /// File offset assigned to this slot when it was last submitted.
    disk_offset: u64,
    /// Number of bytes covered by the last submitted write; reset to 0 once reaped.
    sealed_len: usize,
}
impl Slot {
fn new() -> Self {
Self {
buf: Buffer::new(BATCH_SIZE),
state: SlotState::Open,
disk_offset: 0,
sealed_len: 0,
}
}
}
/// A contiguous region of the output file covered by one reaped write.
#[derive(Debug, Clone, Copy)]
pub struct CompletedRange {
    /// Byte offset in the file at which the region starts.
    pub disk_offset: u64,
    /// Length of the region in bytes.
    pub sealed_len: usize,
}
/// Batches appended payloads into a ring of slot buffers and writes each
/// filled slot to a file through io_uring.
struct RingWriter {
    /// The ring of slot buffers.
    slots: [Slot; RING_LEN],
    /// Index of the slot currently receiving appended data.
    head: usize,
    /// Number of bytes already placed in the head slot.
    cursor: AtomicUsize,
    /// Shared counter from which each submitted slot reserves its file offset.
    file_offset: Arc<AtomicU64>,
    /// The io_uring instance used for the writes.
    ring: IoUring,
    /// Writes queued on the ring whose completions have not been reaped yet.
    pending: usize,
    /// Raw descriptor of the destination file.
    fd: i32,
    /// Ranges recorded as completions are reaped; handed out by `finish`.
    completed: Vec<CompletedRange>,
}
impl RingWriter {
    /// Creates a writer targeting `fd`, with `RING_LEN` freshly allocated slots
    /// and its own 256-entry io_uring instance.
    ///
    /// `file_offset` is the shared counter from which every submitted slot
    /// reserves its position in the file.
    ///
    /// # Errors
    ///
    /// Returns an error if the io_uring instance cannot be created.
    fn new(
        fd: i32,
        file_offset: Arc<AtomicU64>,
    ) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        Ok(Self {
            slots: std::array::from_fn(|_| Slot::new()),
            head: 0,
            cursor: AtomicUsize::new(0),
            file_offset,
            ring: IoUring::new(256)?,
            pending: 0,
            fd,
            completed: Vec::new(),
        })
    }

    /// Copies `payload` into the current slot, sealing and submitting that slot
    /// first when the payload would not fit in the remaining space.
    ///
    /// # Errors
    ///
    /// Returns an error if `payload` is larger than a whole slot (`BATCH_SIZE`
    /// bytes) or if sealing the current slot fails. After an error the writer
    /// should not be used for further appends.
    pub fn append(
        &mut self,
        payload: &[u8],
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let len = payload.len();
        if len > BATCH_SIZE {
            return Err(format!(
                "payload of {} bytes exceeds the {}-byte slot capacity",
                len, BATCH_SIZE
            )
            .into());
        }
        let mut offset = self.cursor.load(AtomicOrdering::SeqCst);
        if offset + len > BATCH_SIZE {
            // Seal exactly the bytes written so far; the new head starts empty.
            self.seal_and_advance(offset)?;
            offset = 0;
        }
        self.slots[self.head].buf.write(offset, payload);
        // The cursor only moves once the payload is in place, so a failed seal
        // never leaves it pointing past the end of the slot.
        self.cursor.store(offset + len, AtomicOrdering::SeqCst);
        Ok(())
    }

    /// Seals the current slot if it holds any data and moves on to the next one.
    ///
    /// The write is queued on the ring; this does not wait for it to complete
    /// unless the next slot is still in flight.
    ///
    /// # Errors
    ///
    /// Propagates any submission or completion error.
    pub fn flush(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let filled = self.cursor.load(AtomicOrdering::SeqCst);
        if filled > 0 {
            self.seal_and_advance(filled)?;
        }
        Ok(())
    }

    /// Flushes the current slot, waits for every outstanding write, and returns
    /// the ranges that completed since the last call.
    ///
    /// # Errors
    ///
    /// Returns an error if submitting or waiting fails, or if any write
    /// completed with an error or a short byte count.
    pub fn finish(
        &mut self,
    ) -> Result<Vec<CompletedRange>, Box<dyn std::error::Error + Send + Sync>> {
        self.flush()?;
        while self.pending > 0 {
            self.ring.submit_and_wait(self.pending)?;
            self.reap_all()?;
        }
        Ok(std::mem::take(&mut self.completed))
    }

    /// Submits the first `valid_len` bytes of the head slot (if any), then makes
    /// the next slot the head, waiting for it to become free if necessary.
    fn seal_and_advance(
        &mut self,
        valid_len: usize,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        if valid_len > 0 {
            self.submit_slot(self.head, valid_len)?;
        }
        let next = (self.head + 1) % RING_LEN;
        // The next slot may still belong to an earlier write; block until the
        // kernel has handed it back.
        while self.slots[next].state == SlotState::InFlight {
            self.ring.submit_and_wait(1)?;
            self.reap_all()?;
        }
        self.head = next;
        self.cursor.store(0, AtomicOrdering::SeqCst);
        Ok(())
    }

    /// Reserves `valid_len` bytes of file space and queues a write of slot
    /// `idx`'s first `valid_len` bytes at that offset.
    ///
    /// The slot is marked in flight only after the entry has actually been
    /// pushed, so a failed push can never leave a slot waiting on a completion
    /// that will not arrive.
    fn submit_slot(
        &mut self,
        idx: usize,
        valid_len: usize,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let capacity = self.slots[idx].buf.capacity();
        if valid_len > capacity {
            return Err(format!(
                "sealed length {} exceeds slot capacity {}",
                valid_len, capacity
            )
            .into());
        }
        let len32 = u32::try_from(valid_len)?;
        let disk_off = self
            .file_offset
            .fetch_add(valid_len as u64, AtomicOrdering::SeqCst);
        // user_data layout: slot index in the high 32 bits, byte count in the low 32.
        let user_data = ((idx as u64) << 32) | u64::from(len32);
        let write_e = opcode::Write::new(
            types::Fd(self.fd),
            self.slots[idx].buf.as_ptr(0),
            len32,
        )
        .offset(disk_off)
        .build()
        .user_data(user_data);
        // SAFETY: the entry points into `self.slots[idx].buf`, which this writer
        // owns and does not write to again until the completion is reaped.
        unsafe {
            if self.ring.submission().push(&write_e).is_err() {
                // Queue full: hand what is queued to the kernel and retry once.
                self.ring.submit()?;
                self.ring
                    .submission()
                    .push(&write_e)
                    .map_err(|_| "submission queue full after drain")?;
            }
        }
        let slot = &mut self.slots[idx];
        slot.disk_offset = disk_off;
        slot.sealed_len = valid_len;
        slot.state = SlotState::InFlight;
        self.pending += 1;
        Ok(())
    }

    /// Drains every available completion, reopening the corresponding slots.
    ///
    /// Successful full-length writes are recorded in `completed`. All available
    /// completions are consumed even when one of them failed, so `pending`
    /// stays accurate; the first failure is then returned.
    fn reap_all(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let mut failure: Option<std::io::Error> = None;
        for cqe in self.ring.completion() {
            let ud = cqe.user_data();
            let idx = (ud >> 32) as usize;
            let len = (ud & 0xFFFF_FFFF) as usize;
            let outcome = cqe.result();
            let disk_offset = self.slots[idx].disk_offset;

            self.slots[idx].state = SlotState::Open;
            self.slots[idx].sealed_len = 0;
            self.pending = self.pending.saturating_sub(1);

            if outcome < 0 {
                // A negative result carries the negated errno of the failed write.
                if failure.is_none() {
                    failure = Some(std::io::Error::from_raw_os_error(-outcome));
                }
            } else if outcome as usize != len {
                if failure.is_none() {
                    failure = Some(std::io::Error::new(
                        std::io::ErrorKind::WriteZero,
                        format!(
                            "short write at offset {}: {} of {} bytes",
                            disk_offset, outcome, len
                        ),
                    ));
                }
            } else {
                self.completed.push(CompletedRange {
                    disk_offset,
                    sealed_len: len,
                });
            }
        }
        match failure {
            Some(err) => Err(err.into()),
            None => Ok(()),
        }
    }
}

impl Drop for RingWriter {
    /// Best-effort wait for outstanding writes, so the slot buffers are not
    /// freed while queued writes may still reference them (for example when an
    /// error made the owner return before calling `finish`).
    fn drop(&mut self) {
        while self.pending > 0 {
            if self.ring.submit_and_wait(1).is_err() {
                break;
            }
            // Write failures cannot be reported from a destructor.
            let _ = self.reap_all();
        }
    }
}
/// Writes messages to `data_output.log` from several threads, each with its own
/// `RingWriter`, then reads the file back and mirrors it into the
/// flexi_logger output under `logs/`.
///
/// # Errors
///
/// Returns an error if the logger cannot be started, if any file operation
/// fails, or if a writer thread reports an error.
///
/// # Panics
///
/// Panics if a writer thread itself panicked.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    const WRITER_THREADS: usize = 5;
    const MESSAGES_PER_THREAD: usize = 30_000;

    let logger_handle = Logger::try_with_str("debug")?
        .log_to_file(
            FileSpec::default()
                .directory("logs")
                .basename("verification")
                .suffix("log"),
        )
        .write_mode(WriteMode::BufferAndFlush)
        .start()?;

    println!("Writing 150,000 messages using io_uring (4-slot 1 MB ring, pre-reserved addresses)...");
    let file = OpenOptions::new()
        .create(true)
        .write(true)
        .read(true)
        .truncate(true)
        .open("data_output.log")?;
    // Every thread writes through this one descriptor; `file` must stay open
    // until all of them have been joined.
    let fd = file.as_raw_fd();
    let file_offset = Arc::new(AtomicU64::new(0));
    let start = Instant::now();

    let mut handles = Vec::with_capacity(WRITER_THREADS);
    for thread_id in 0..WRITER_THREADS {
        let file_offset_clone = Arc::clone(&file_offset);
        let handle = thread::spawn(
            move || -> Result<Vec<CompletedRange>, Box<dyn std::error::Error + Send + Sync>> {
                let mut writer = RingWriter::new(fd, file_offset_clone)?;
                for i in 0..MESSAGES_PER_THREAD {
                    let message_num = i + MESSAGES_PER_THREAD * thread_id;
                    let message = format!(
                        "[{:02}] [Thread-{}] Message {:06}\n",
                        thread_id, thread_id, message_num
                    );
                    writer.append(message.as_bytes())?;
                }
                let ranges = writer.finish()?;
                println!(
                    "Thread {} done — {} completed range(s)",
                    thread_id,
                    ranges.len()
                );
                Ok(ranges)
            },
        );
        handles.push(handle);
    }

    // Join every thread before inspecting results so none is still using `fd`
    // when an error makes us return early.
    let outcomes: Vec<_> = handles.into_iter().map(|handle| handle.join()).collect();
    drop(file);

    let mut all_ranges: Vec<CompletedRange> = Vec::new();
    let mut total_messages = 0usize;
    for outcome in outcomes {
        let ranges = outcome
            .expect("writer thread panicked")
            // `?` cannot convert the Send + Sync boxed error on its own; coerce it first.
            .map_err(|err| -> Box<dyn std::error::Error> { err })?;
        total_messages += MESSAGES_PER_THREAD;
        all_ranges.extend(ranges);
    }

    let elapsed = start.elapsed();
    println!(
        "Writing complete! {} messages in {:.2}s ({:.0}/s)",
        total_messages,
        elapsed.as_secs_f64(),
        total_messages as f64 / elapsed.as_secs_f64(),
    );

    all_ranges.sort_by_key(|r| r.disk_offset);
    debug!("\n--- {} completed flush range(s) ---", all_ranges.len());
    println!("\n--- {} completed flush range(s) ---", all_ranges.len());
    let mut read_file = File::open("data_output.log")?;
    for (i, range) in all_ranges.iter().enumerate() {
        let mut buf = vec![0u8; range.sealed_len];
        read_file.seek(SeekFrom::Start(range.disk_offset))?;
        read_file.read_exact(&mut buf)?;
        let text = String::from_utf8_lossy(&buf);
        debug!("[range {i}] disk_offset={} sealed_len={}", range.disk_offset, range.sealed_len);
        println!("[range {i}] disk_offset={} sealed_len={}", range.disk_offset, range.sealed_len);
        debug!("{}", text);
    }

    println!("\nReading data file and logging to flexi_logger...");
    let log_file = File::open("data_output.log")?;
    let reader = BufReader::new(log_file);
    let mut line_count = 0usize;
    for line in reader.lines() {
        // Propagate read errors instead of silently skipping the line.
        let content = line?;
        debug!("{}", content);
        line_count += 1;
    }
    println!("Logged {} lines to flexi_logger", line_count);

    drop(logger_handle);
    std::thread::sleep(std::time::Duration::from_millis(500));
    println!("\nCheck logs/verification.log for all messages!");
    Ok(())
}