1use std::collections::VecDeque;
47use std::fs::{File, OpenOptions};
48use std::io::{self, Seek, SeekFrom, Write};
49use std::path::{Path, PathBuf};
50use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
51use std::sync::Arc;
52
/// Default for `IoUringWalConfig::ring_size`.
const DEFAULT_RING_SIZE: u32 = 256;

/// Default for `IoUringWalConfig::batch_size`: the number of queued entries at which a
/// write triggers a batch submission.
const DEFAULT_BATCH_SIZE: usize = 32;

/// Default for `IoUringWalConfig::batch_timeout_us`.
// NOTE(review): the `_US` suffix suggests microseconds — confirm where the timeout is consumed.
const DEFAULT_BATCH_TIMEOUT_US: u64 = 100;
61
/// Shared handle for observing the outcome of a queued write.
///
/// Clones share the same completion flag and result word, so any clone can be used to
/// wait for, or to publish, the outcome.
#[derive(Clone)]
pub struct CompletionToken {
    /// Identifier given at construction; not read by any method of this type.
    #[allow(dead_code)]
    id: u64,
    /// Set to `true` once a result has been published.
    completed: Arc<AtomicBool>,
    /// Encoded outcome: a byte count on success, or `ERROR_FLAG | code` on failure.
    result: Arc<AtomicU64>,
}

impl CompletionToken {
    /// Bit 63 of the result word marks a failed operation.
    const ERROR_FLAG: u64 = 1 << 63;

    /// The low 32 bits of a failed result hold the error code's exact bit pattern.
    const ERROR_CODE_MASK: u64 = 0xFFFF_FFFF;

    /// Number of busy-spin iterations in `wait` before it starts yielding the thread.
    const SPIN_LIMIT: u32 = 64;

    /// Creates a token that has not completed yet.
    fn new(id: u64) -> Self {
        Self {
            id,
            completed: Arc::new(AtomicBool::new(false)),
            result: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns `true` once `complete` or `fail` has been called on this token or a clone.
    #[inline]
    pub fn is_completed(&self) -> bool {
        self.completed.load(Ordering::Acquire)
    }

    /// Blocks the calling thread until the token is completed and returns the outcome.
    ///
    /// The wait spins briefly and then yields to the scheduler between checks, so a
    /// long wait does not monopolize a core.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` built from the code passed to `fail`.
    pub fn wait(&self) -> io::Result<usize> {
        let mut spins = 0u32;
        while !self.is_completed() {
            if spins < Self::SPIN_LIMIT {
                spins += 1;
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
        }

        let raw = self.result.load(Ordering::Acquire);
        if raw & Self::ERROR_FLAG != 0 {
            // Reverse the encoding used by `fail`: take the low 32 bits and reinterpret
            // them as `i32`, so negative codes survive the round trip.
            let code = (raw & Self::ERROR_CODE_MASK) as u32 as i32;
            Err(io::Error::from_raw_os_error(code))
        } else {
            Ok(raw as usize)
        }
    }

    /// Publishes a successful outcome of `bytes_written` bytes.
    fn complete(&self, bytes_written: usize) {
        // The result is stored before the flag so a waiter that sees the flag sees the value.
        self.result.store(bytes_written as u64, Ordering::Release);
        self.completed.store(true, Ordering::Release);
    }

    /// Publishes a failed outcome carrying `error_code`.
    fn fail(&self, error_code: i32) {
        // Go through `u32` so a negative code is not sign-extended over the upper bits.
        let encoded = Self::ERROR_FLAG | u64::from(error_code as u32);
        self.result.store(encoded, Ordering::Release);
        self.completed.store(true, Ordering::Release);
    }
}
121
/// One queued write: the payload, the file offset reserved for it, and the token that
/// reports its outcome.
struct SubmissionEntry {
    /// Bytes to be written.
    data: Vec<u8>,
    /// Absolute file offset at which `data` is written.
    offset: u64,
    /// Token that is completed or failed once the write has been attempted.
    token: CompletionToken,
}
135
136struct BatchSubmitter {
142 pending: VecDeque<SubmissionEntry>,
144 batch_size: usize,
146 pending_bytes: usize,
148}
149
150impl BatchSubmitter {
151 fn new(batch_size: usize) -> Self {
152 Self {
153 pending: VecDeque::with_capacity(batch_size),
154 batch_size,
155 pending_bytes: 0,
156 }
157 }
158
159 fn push(&mut self, entry: SubmissionEntry) {
161 self.pending_bytes += entry.data.len();
162 self.pending.push_back(entry);
163 }
164
165 fn should_submit(&self) -> bool {
167 self.pending.len() >= self.batch_size
168 }
169
170 fn take_batch(&mut self) -> Vec<SubmissionEntry> {
172 self.pending_bytes = 0;
173 self.pending.drain(..).collect()
174 }
175
176 fn len(&self) -> usize {
178 self.pending.len()
179 }
180}
181
182#[derive(Clone)]
188pub struct IoUringWalConfig {
189 pub ring_size: u32,
191 pub batch_size: usize,
193 pub batch_timeout_us: u64,
195 pub use_direct_io: bool,
197 pub preallocate_size: u64,
199}
200
201impl Default for IoUringWalConfig {
202 fn default() -> Self {
203 Self {
204 ring_size: DEFAULT_RING_SIZE,
205 batch_size: DEFAULT_BATCH_SIZE,
206 batch_timeout_us: DEFAULT_BATCH_TIMEOUT_US,
207 use_direct_io: false,
208 preallocate_size: 64 * 1024 * 1024, }
210 }
211}
212
213pub struct IoUringWal {
218 #[allow(dead_code)]
220 path: PathBuf,
221 file: File,
223 #[allow(dead_code)]
225 config: IoUringWalConfig,
226 submitter: parking_lot::Mutex<BatchSubmitter>,
228 next_op_id: AtomicU64,
230 write_offset: AtomicU64,
232 total_bytes: AtomicU64,
234 total_ops: AtomicU64,
236 shutdown: AtomicBool,
238}
239
240impl IoUringWal {
241 pub fn open<P: AsRef<Path>>(path: P, config: IoUringWalConfig) -> io::Result<Self> {
243 let path = path.as_ref().to_path_buf();
244
245 let mut options = OpenOptions::new();
246 options.create(true).read(true).write(true);
247
248 let mut file = options.open(&path)?;
252
253 if config.preallocate_size > 0 {
255 let current_len = file.metadata()?.len();
257 if current_len < config.preallocate_size {
258 file.seek(SeekFrom::Start(config.preallocate_size - 1))?;
259 file.write_all(&[0])?;
260 file.seek(SeekFrom::Start(0))?;
261 }
262 }
263
264 Ok(Self {
265 path,
266 file,
267 config: config.clone(),
268 submitter: parking_lot::Mutex::new(BatchSubmitter::new(config.batch_size)),
269 next_op_id: AtomicU64::new(0),
270 write_offset: AtomicU64::new(0),
271 total_bytes: AtomicU64::new(0),
272 total_ops: AtomicU64::new(0),
273 shutdown: AtomicBool::new(false),
274 })
275 }
276
277 pub fn write(&self, data: Vec<u8>) -> io::Result<CompletionToken> {
281 if self.shutdown.load(Ordering::Acquire) {
282 return Err(io::Error::new(io::ErrorKind::Other, "WAL is shutdown"));
283 }
284
285 let op_id = self.next_op_id.fetch_add(1, Ordering::Relaxed);
286 let token = CompletionToken::new(op_id);
287
288 let data_len = data.len() as u64;
289 let offset = self.write_offset.fetch_add(data_len, Ordering::Relaxed);
290
291 let entry = SubmissionEntry {
292 data,
293 offset,
294 token: token.clone(),
295 };
296
297 let should_submit = {
298 let mut submitter = self.submitter.lock();
299 submitter.push(entry);
300 submitter.should_submit()
301 };
302
303 if should_submit {
304 self.submit_batch()?;
305 }
306
307 Ok(token)
308 }
309
310 fn submit_batch(&self) -> io::Result<()> {
312 let entries = {
313 let mut submitter = self.submitter.lock();
314 submitter.take_batch()
315 };
316
317 if entries.is_empty() {
318 return Ok(());
319 }
320
321 self.submit_sync(entries)
324 }
325
326 fn submit_sync(&self, entries: Vec<SubmissionEntry>) -> io::Result<()> {
328 for entry in entries {
333 match self.do_write(&entry) {
334 Ok(bytes) => {
335 entry.token.complete(bytes);
336 self.total_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
337 self.total_ops.fetch_add(1, Ordering::Relaxed);
338 }
339 Err(e) => {
340 entry.token.fail(e.raw_os_error().unwrap_or(-1));
341 }
342 }
343 }
344
345 Ok(())
346 }
347
348 fn do_write(&self, entry: &SubmissionEntry) -> io::Result<usize> {
350 #[cfg(unix)]
352 {
353 use std::os::unix::fs::FileExt;
354 self.file.write_at(&entry.data, entry.offset)
355 }
356
357 #[cfg(not(unix))]
358 {
359 use std::io::{Seek, SeekFrom, Write};
361 let mut file = &self.file;
362 file.seek(SeekFrom::Start(entry.offset))?;
363 file.write_all(&entry.data)?;
364 Ok(entry.data.len())
365 }
366 }
367
368 pub fn flush(&self) -> io::Result<()> {
370 self.submit_batch()?;
372
373 self.file.sync_all()
375 }
376
377 pub fn flush_pending(&self) -> io::Result<()> {
379 self.submit_batch()
380 }
381
382 pub fn stats(&self) -> WalStats {
384 let submitter = self.submitter.lock();
385 WalStats {
386 total_bytes_written: self.total_bytes.load(Ordering::Relaxed),
387 total_operations: self.total_ops.load(Ordering::Relaxed),
388 current_offset: self.write_offset.load(Ordering::Relaxed),
389 pending_entries: submitter.len(),
390 pending_bytes: submitter.pending_bytes,
391 }
392 }
393
394 pub fn shutdown(&self) -> io::Result<()> {
396 self.shutdown.store(true, Ordering::Release);
397 self.flush()
398 }
399}
400
/// Point-in-time snapshot of a WAL's counters, as returned by `IoUringWal::stats`.
#[derive(Debug, Clone)]
pub struct WalStats {
    /// Sum of the byte counts reported by successful writes.
    pub total_bytes_written: u64,
    /// Number of writes that completed successfully.
    pub total_operations: u64,
    /// Next unreserved file offset; includes space reserved by writes still queued.
    pub current_offset: u64,
    /// Number of writes queued but not yet submitted.
    pub pending_entries: usize,
    /// Total payload size of the queued writes.
    pub pending_bytes: usize,
}
415
416pub struct CompletionHandler {
422 tokens: Vec<CompletionToken>,
424}
425
426impl CompletionHandler {
427 pub fn new() -> Self {
429 Self { tokens: Vec::new() }
430 }
431
432 pub fn track(&mut self, token: CompletionToken) {
434 self.tokens.push(token);
435 }
436
437 pub fn wait_all(&self) -> io::Result<Vec<usize>> {
439 let mut results = Vec::with_capacity(self.tokens.len());
440 for token in &self.tokens {
441 results.push(token.wait()?);
442 }
443 Ok(results)
444 }
445
446 pub fn poll(&self) -> Vec<(usize, bool)> {
448 self.tokens.iter()
449 .enumerate()
450 .map(|(i, t)| (i, t.is_completed()))
451 .collect()
452 }
453
454 pub fn completed_count(&self) -> usize {
456 self.tokens.iter().filter(|t| t.is_completed()).count()
457 }
458
459 pub fn all_completed(&self) -> bool {
461 self.tokens.iter().all(|t| t.is_completed())
462 }
463
464 pub fn clear(&mut self) {
466 self.tokens.clear();
467 }
468}
469
470impl Default for CompletionHandler {
471 fn default() -> Self {
472 Self::new()
473 }
474}
475
476pub struct GroupCommitWal {
482 wal: IoUringWal,
484 group_size: usize,
486 #[allow(dead_code)]
488 group_timeout_ms: u64,
489 pending: parking_lot::Mutex<Vec<(Vec<u8>, CompletionToken)>>,
491}
492
493impl GroupCommitWal {
494 pub fn new(wal: IoUringWal, group_size: usize, group_timeout_ms: u64) -> Self {
496 Self {
497 wal,
498 group_size,
499 group_timeout_ms,
500 pending: parking_lot::Mutex::new(Vec::with_capacity(group_size)),
501 }
502 }
503
504 pub fn write(&self, data: Vec<u8>) -> io::Result<CompletionToken> {
506 let token = self.wal.write(data)?;
507
508 let should_flush = {
510 let pending = self.pending.lock();
511 pending.len() >= self.group_size
512 };
513
514 if should_flush {
515 self.wal.flush_pending()?;
516 }
517
518 Ok(token)
519 }
520
521 pub fn flush(&self) -> io::Result<()> {
523 self.wal.flush()
524 }
525}
526
/// Unit tests for the completion token, the batched WAL, the completion handler and
/// the group-commit wrapper. File-backed tests use a fresh temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use tempfile::tempdir;

    /// A token reports incomplete until `complete` is called, then yields the byte count.
    #[test]
    fn test_completion_token() {
        let token = CompletionToken::new(1);
        assert!(!token.is_completed());

        token.complete(100);
        assert!(token.is_completed());
        assert_eq!(token.wait().unwrap(), 100);
    }

    /// A failed token counts as completed and `wait` returns an error.
    #[test]
    fn test_completion_token_error() {
        let token = CompletionToken::new(1);
        token.fail(5);
        assert!(token.is_completed());
        assert!(token.wait().is_err());
    }

    /// A single write below the batch size is only performed by the explicit flush.
    #[test]
    fn test_wal_basic() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let config = IoUringWalConfig {
            batch_size: 4,
            preallocate_size: 1024 * 1024,
            ..Default::default()
        };

        let wal = IoUringWal::open(&wal_path, config).unwrap();

        let token = wal.write(b"hello".to_vec()).unwrap();
        wal.flush().unwrap();

        assert_eq!(token.wait().unwrap(), 5);

        let stats = wal.stats();
        assert_eq!(stats.total_bytes_written, 5);
        assert_eq!(stats.total_operations, 1);
    }

    /// Ten writes with a batch size of four: full batches submit on their own and the
    /// final flush covers the remainder, so every token ends up completed.
    #[test]
    fn test_wal_batch() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let config = IoUringWalConfig {
            batch_size: 4,
            ..Default::default()
        };

        let wal = IoUringWal::open(&wal_path, config).unwrap();

        let mut handler = CompletionHandler::new();

        for i in 0..10 {
            let token = wal.write(format!("entry{}", i).into_bytes()).unwrap();
            handler.track(token);
        }

        wal.flush().unwrap();

        assert!(handler.all_completed());
        assert_eq!(handler.completed_count(), 10);
    }

    /// Four threads each write 100 entries and wait on every token.
    #[test]
    fn test_wal_concurrent() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let config = IoUringWalConfig {
            // A batch size of one makes every write submit, so `wait` below cannot stall.
            batch_size: 1,
            ..Default::default()
        };

        let wal = Arc::new(IoUringWal::open(&wal_path, config).unwrap());

        let mut handles = vec![];

        for t in 0..4 {
            let wal = wal.clone();
            handles.push(thread::spawn(move || {
                for i in 0..100 {
                    let data = format!("thread{}:entry{}", t, i);
                    let token = wal.write(data.into_bytes()).unwrap();
                    token.wait().unwrap();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        wal.flush().unwrap();

        let stats = wal.stats();
        assert_eq!(stats.total_operations, 400);
    }

    /// The handler sees completions made through clones of the tokens it tracks.
    #[test]
    fn test_completion_handler() {
        let mut handler = CompletionHandler::new();

        let t1 = CompletionToken::new(1);
        let t2 = CompletionToken::new(2);
        let t3 = CompletionToken::new(3);

        handler.track(t1.clone());
        handler.track(t2.clone());
        handler.track(t3.clone());

        assert_eq!(handler.completed_count(), 0);

        t1.complete(10);
        assert_eq!(handler.completed_count(), 1);

        t2.complete(20);
        t3.complete(30);

        assert!(handler.all_completed());

        let results = handler.wait_all().unwrap();
        assert_eq!(results, vec![10, 20, 30]);
    }

    /// After 25 writes through the group-commit wrapper and a flush, every token is completed.
    #[test]
    fn test_group_commit() {
        let dir = tempdir().unwrap();
        let wal_path = dir.path().join("test.wal");

        let wal = IoUringWal::open(&wal_path, IoUringWalConfig::default()).unwrap();
        let group_wal = GroupCommitWal::new(wal, 10, 100);

        let mut tokens = vec![];
        for i in 0..25 {
            tokens.push(group_wal.write(format!("entry{}", i).into_bytes()).unwrap());
        }

        group_wal.flush().unwrap();

        for token in tokens {
            assert!(token.is_completed());
        }
    }
}