1use std::collections::VecDeque;
44use std::fs::{File, OpenOptions};
45use std::io::{self, Seek, SeekFrom, Write};
46use std::path::{Path, PathBuf};
47use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
48use std::sync::Arc;
49
/// Default for `IoUringWalConfig::ring_size`.
const DEFAULT_RING_SIZE: u32 = 256;

/// Default for `IoUringWalConfig::batch_size`: the number of queued entries
/// at which a write triggers submission of the batch.
const DEFAULT_BATCH_SIZE: usize = 32;

/// Default for `IoUringWalConfig::batch_timeout_us`
/// (presumably microseconds, going by the `_us` suffix — confirm with the consumer).
const DEFAULT_BATCH_TIMEOUT_US: u64 = 100;
58
/// Handle for observing the outcome of a single queued write.
///
/// Clones share the same completion flag and result slot, so an outcome
/// reported through one clone is visible through every other clone.
#[derive(Clone)]
pub struct CompletionToken {
    /// Identifier supplied at construction; stored but never read by this type.
    #[allow(dead_code)]
    id: u64,
    /// Becomes `true` once `result` holds the final outcome.
    completed: Arc<AtomicBool>,
    /// Encoded outcome: bit 63 clear means the value is a byte count;
    /// bit 63 set means the low 32 bits hold the `i32` error code.
    result: Arc<AtomicU64>,
}

impl CompletionToken {
    /// Marks `result` as carrying an error code instead of a byte count.
    const ERROR_FLAG: u64 = 1 << 63;
    /// Selects the 32 bits that carry the error code.
    const ERROR_CODE_MASK: u64 = 0xFFFF_FFFF;
    /// Code meaning "the failure had no OS error code" (what
    /// `raw_os_error().unwrap_or(-1)` yields for such errors).
    const NO_OS_ERROR_CODE: i32 = -1;
    /// Busy-spin iterations in `wait` before yielding the thread instead.
    const SPIN_LIMIT: u32 = 1024;

    /// Creates a token that is not yet completed.
    fn new(id: u64) -> Self {
        Self {
            id,
            completed: Arc::new(AtomicBool::new(false)),
            result: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns `true` once `complete` or `fail` has been called on any clone.
    #[inline]
    pub fn is_completed(&self) -> bool {
        self.completed.load(Ordering::Acquire)
    }

    /// Blocks the calling thread until the token is completed.
    ///
    /// # Errors
    ///
    /// Returns the recorded failure: an error built from the stored OS error
    /// code, or an `ErrorKind::Other` error when the failure carried no OS
    /// code (stored as `-1`).
    pub fn wait(&self) -> io::Result<usize> {
        // Spin briefly for completions that are about to land, then yield so
        // a long wait does not monopolise a core.
        let mut spins = 0u32;
        while !self.is_completed() {
            if spins < Self::SPIN_LIMIT {
                spins += 1;
                std::hint::spin_loop();
            } else {
                std::thread::yield_now();
            }
        }

        let result = self.result.load(Ordering::Acquire);
        if (result & Self::ERROR_FLAG) == 0 {
            return Ok(result as usize);
        }

        // Reverse the encoding done in `fail`: low 32 bits -> u32 -> i32.
        let code = (result & Self::ERROR_CODE_MASK) as u32 as i32;
        if code == Self::NO_OS_ERROR_CODE {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "write failed without an OS error code",
            ))
        } else {
            Err(io::Error::from_raw_os_error(code))
        }
    }

    /// Records a successful outcome of `bytes_written` bytes and publishes it.
    fn complete(&self, bytes_written: usize) {
        // The result is stored before the flag so a waiter that observes the
        // flag also observes the result.
        self.result.store(bytes_written as u64, Ordering::Release);
        self.completed.store(true, Ordering::Release);
    }

    /// Records a failed outcome carrying `error_code` and publishes it.
    fn fail(&self, error_code: i32) {
        // Go through `u32` so a negative code occupies exactly the low 32
        // bits instead of being sign-extended across the whole word.
        let code_bits = u64::from(error_code as u32);
        self.result
            .store(Self::ERROR_FLAG | code_bits, Ordering::Release);
        self.completed.store(true, Ordering::Release);
    }
}
118
119struct SubmissionEntry {
125 data: Vec<u8>,
127 offset: u64,
129 token: CompletionToken,
131}
132
133struct BatchSubmitter {
139 pending: VecDeque<SubmissionEntry>,
141 batch_size: usize,
143 pending_bytes: usize,
145}
146
147impl BatchSubmitter {
148 fn new(batch_size: usize) -> Self {
149 Self {
150 pending: VecDeque::with_capacity(batch_size),
151 batch_size,
152 pending_bytes: 0,
153 }
154 }
155
156 fn push(&mut self, entry: SubmissionEntry) {
158 self.pending_bytes += entry.data.len();
159 self.pending.push_back(entry);
160 }
161
162 fn should_submit(&self) -> bool {
164 self.pending.len() >= self.batch_size
165 }
166
167 fn take_batch(&mut self) -> Vec<SubmissionEntry> {
169 self.pending_bytes = 0;
170 self.pending.drain(..).collect()
171 }
172
173 fn len(&self) -> usize {
175 self.pending.len()
176 }
177}
178
179#[derive(Clone)]
185pub struct IoUringWalConfig {
186 pub ring_size: u32,
188 pub batch_size: usize,
190 pub batch_timeout_us: u64,
192 pub use_direct_io: bool,
194 pub preallocate_size: u64,
196}
197
198impl Default for IoUringWalConfig {
199 fn default() -> Self {
200 Self {
201 ring_size: DEFAULT_RING_SIZE,
202 batch_size: DEFAULT_BATCH_SIZE,
203 batch_timeout_us: DEFAULT_BATCH_TIMEOUT_US,
204 use_direct_io: false,
205 preallocate_size: 64 * 1024 * 1024, }
207 }
208}
209
210pub struct IoUringWal {
215 #[allow(dead_code)]
217 path: PathBuf,
218 file: File,
220 #[allow(dead_code)]
222 config: IoUringWalConfig,
223 submitter: parking_lot::Mutex<BatchSubmitter>,
225 next_op_id: AtomicU64,
227 write_offset: AtomicU64,
229 total_bytes: AtomicU64,
231 total_ops: AtomicU64,
233 shutdown: AtomicBool,
235}
236
237impl IoUringWal {
238 pub fn open<P: AsRef<Path>>(path: P, config: IoUringWalConfig) -> io::Result<Self> {
240 let path = path.as_ref().to_path_buf();
241
242 let mut options = OpenOptions::new();
243 options.create(true).read(true).write(true);
244
245 let mut file = options.open(&path)?;
249
250 if config.preallocate_size > 0 {
252 let current_len = file.metadata()?.len();
254 if current_len < config.preallocate_size {
255 file.seek(SeekFrom::Start(config.preallocate_size - 1))?;
256 file.write_all(&[0])?;
257 file.seek(SeekFrom::Start(0))?;
258 }
259 }
260
261 Ok(Self {
262 path,
263 file,
264 config: config.clone(),
265 submitter: parking_lot::Mutex::new(BatchSubmitter::new(config.batch_size)),
266 next_op_id: AtomicU64::new(0),
267 write_offset: AtomicU64::new(0),
268 total_bytes: AtomicU64::new(0),
269 total_ops: AtomicU64::new(0),
270 shutdown: AtomicBool::new(false),
271 })
272 }
273
274 pub fn write(&self, data: Vec<u8>) -> io::Result<CompletionToken> {
278 if self.shutdown.load(Ordering::Acquire) {
279 return Err(io::Error::new(io::ErrorKind::Other, "WAL is shutdown"));
280 }
281
282 let op_id = self.next_op_id.fetch_add(1, Ordering::Relaxed);
283 let token = CompletionToken::new(op_id);
284
285 let data_len = data.len() as u64;
286 let offset = self.write_offset.fetch_add(data_len, Ordering::Relaxed);
287
288 let entry = SubmissionEntry {
289 data,
290 offset,
291 token: token.clone(),
292 };
293
294 let should_submit = {
295 let mut submitter = self.submitter.lock();
296 submitter.push(entry);
297 submitter.should_submit()
298 };
299
300 if should_submit {
301 self.submit_batch()?;
302 }
303
304 Ok(token)
305 }
306
307 fn submit_batch(&self) -> io::Result<()> {
309 let entries = {
310 let mut submitter = self.submitter.lock();
311 submitter.take_batch()
312 };
313
314 if entries.is_empty() {
315 return Ok(());
316 }
317
318 self.submit_sync(entries)
321 }
322
323 fn submit_sync(&self, entries: Vec<SubmissionEntry>) -> io::Result<()> {
325 for entry in entries {
330 match self.do_write(&entry) {
331 Ok(bytes) => {
332 entry.token.complete(bytes);
333 self.total_bytes.fetch_add(bytes as u64, Ordering::Relaxed);
334 self.total_ops.fetch_add(1, Ordering::Relaxed);
335 }
336 Err(e) => {
337 entry.token.fail(e.raw_os_error().unwrap_or(-1));
338 }
339 }
340 }
341
342 Ok(())
343 }
344
345 fn do_write(&self, entry: &SubmissionEntry) -> io::Result<usize> {
347 #[cfg(unix)]
349 {
350 use std::os::unix::fs::FileExt;
351 self.file.write_at(&entry.data, entry.offset)
352 }
353
354 #[cfg(not(unix))]
355 {
356 use std::io::{Seek, SeekFrom, Write};
358 let mut file = &self.file;
359 file.seek(SeekFrom::Start(entry.offset))?;
360 file.write_all(&entry.data)?;
361 Ok(entry.data.len())
362 }
363 }
364
365 pub fn flush(&self) -> io::Result<()> {
367 self.submit_batch()?;
369
370 self.file.sync_all()
372 }
373
374 pub fn flush_pending(&self) -> io::Result<()> {
376 self.submit_batch()
377 }
378
379 pub fn stats(&self) -> WalStats {
381 let submitter = self.submitter.lock();
382 WalStats {
383 total_bytes_written: self.total_bytes.load(Ordering::Relaxed),
384 total_operations: self.total_ops.load(Ordering::Relaxed),
385 current_offset: self.write_offset.load(Ordering::Relaxed),
386 pending_entries: submitter.len(),
387 pending_bytes: submitter.pending_bytes,
388 }
389 }
390
391 pub fn shutdown(&self) -> io::Result<()> {
393 self.shutdown.store(true, Ordering::Release);
394 self.flush()
395 }
396}
397
/// Snapshot of WAL counters and queue depth, as returned by `IoUringWal::stats`.
#[derive(Debug, Clone)]
pub struct WalStats {
    /// Sum of the byte counts reported by successfully written entries.
    pub total_bytes_written: u64,
    /// Number of entries written successfully.
    pub total_operations: u64,
    /// Next unreserved file offset (includes entries queued but not yet written).
    pub current_offset: u64,
    /// Entries queued and not yet taken for submission.
    pub pending_entries: usize,
    /// Total payload bytes of the queued entries.
    pub pending_bytes: usize,
}
412
413pub struct CompletionHandler {
419 tokens: Vec<CompletionToken>,
421}
422
423impl CompletionHandler {
424 pub fn new() -> Self {
426 Self { tokens: Vec::new() }
427 }
428
429 pub fn track(&mut self, token: CompletionToken) {
431 self.tokens.push(token);
432 }
433
434 pub fn wait_all(&self) -> io::Result<Vec<usize>> {
436 let mut results = Vec::with_capacity(self.tokens.len());
437 for token in &self.tokens {
438 results.push(token.wait()?);
439 }
440 Ok(results)
441 }
442
443 pub fn poll(&self) -> Vec<(usize, bool)> {
445 self.tokens.iter()
446 .enumerate()
447 .map(|(i, t)| (i, t.is_completed()))
448 .collect()
449 }
450
451 pub fn completed_count(&self) -> usize {
453 self.tokens.iter().filter(|t| t.is_completed()).count()
454 }
455
456 pub fn all_completed(&self) -> bool {
458 self.tokens.iter().all(|t| t.is_completed())
459 }
460
461 pub fn clear(&mut self) {
463 self.tokens.clear();
464 }
465}
466
467impl Default for CompletionHandler {
468 fn default() -> Self {
469 Self::new()
470 }
471}
472
473pub struct GroupCommitWal {
479 wal: IoUringWal,
481 group_size: usize,
483 #[allow(dead_code)]
485 group_timeout_ms: u64,
486 pending: parking_lot::Mutex<Vec<(Vec<u8>, CompletionToken)>>,
488}
489
490impl GroupCommitWal {
491 pub fn new(wal: IoUringWal, group_size: usize, group_timeout_ms: u64) -> Self {
493 Self {
494 wal,
495 group_size,
496 group_timeout_ms,
497 pending: parking_lot::Mutex::new(Vec::with_capacity(group_size)),
498 }
499 }
500
501 pub fn write(&self, data: Vec<u8>) -> io::Result<CompletionToken> {
503 let token = self.wal.write(data)?;
504
505 let should_flush = {
507 let pending = self.pending.lock();
508 pending.len() >= self.group_size
509 };
510
511 if should_flush {
512 self.wal.flush_pending()?;
513 }
514
515 Ok(token)
516 }
517
518 pub fn flush(&self) -> io::Result<()> {
520 self.wal.flush()
521 }
522}
523
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use tempfile::tempdir;

    /// A fresh token is incomplete; after `complete` it reports the byte count.
    #[test]
    fn test_completion_token() {
        let tok = CompletionToken::new(1);
        assert!(!tok.is_completed());

        tok.complete(100);

        assert!(tok.is_completed());
        assert_eq!(tok.wait().unwrap(), 100);
    }

    /// A failed token counts as completed and `wait` yields an error.
    #[test]
    fn test_completion_token_error() {
        let tok = CompletionToken::new(1);

        tok.fail(5);

        assert!(tok.is_completed());
        assert!(tok.wait().is_err());
    }

    /// One write followed by a flush is reflected in the token and the stats.
    #[test]
    fn test_wal_basic() {
        let tmp = tempdir().unwrap();
        let log_path = tmp.path().join("test.wal");

        let cfg = IoUringWalConfig {
            batch_size: 4,
            preallocate_size: 1024 * 1024,
            ..Default::default()
        };
        let log = IoUringWal::open(&log_path, cfg).unwrap();

        let tok = log.write(b"hello".to_vec()).unwrap();
        log.flush().unwrap();
        assert_eq!(tok.wait().unwrap(), 5);

        let snapshot = log.stats();
        assert_eq!(snapshot.total_bytes_written, 5);
        assert_eq!(snapshot.total_operations, 1);
    }

    /// Ten writes with a batch size of four are all completed after a flush.
    #[test]
    fn test_wal_batch() {
        let tmp = tempdir().unwrap();
        let log_path = tmp.path().join("test.wal");

        let cfg = IoUringWalConfig {
            batch_size: 4,
            ..Default::default()
        };
        let log = IoUringWal::open(&log_path, cfg).unwrap();

        let mut tracker = CompletionHandler::new();
        (0..10).for_each(|i| {
            let payload = format!("entry{}", i).into_bytes();
            tracker.track(log.write(payload).unwrap());
        });

        log.flush().unwrap();

        assert!(tracker.all_completed());
        assert_eq!(tracker.completed_count(), 10);
    }

    /// Four threads each write and await 100 entries; every one is counted.
    #[test]
    fn test_wal_concurrent() {
        let tmp = tempdir().unwrap();
        let log_path = tmp.path().join("test.wal");

        let cfg = IoUringWalConfig {
            batch_size: 1,
            ..Default::default()
        };
        let log = Arc::new(IoUringWal::open(&log_path, cfg).unwrap());

        // Collecting first spawns every worker before any of them is joined.
        let workers: Vec<_> = (0..4)
            .map(|t| {
                let log = log.clone();
                thread::spawn(move || {
                    for i in 0..100 {
                        let payload = format!("thread{}:entry{}", t, i).into_bytes();
                        log.write(payload).unwrap().wait().unwrap();
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        log.flush().unwrap();

        assert_eq!(log.stats().total_operations, 400);
    }

    /// The handler tracks completion across clones of the tokens it holds.
    #[test]
    fn test_completion_handler() {
        let mut tracker = CompletionHandler::new();

        let first = CompletionToken::new(1);
        let second = CompletionToken::new(2);
        let third = CompletionToken::new(3);
        for tok in [&first, &second, &third] {
            tracker.track(tok.clone());
        }

        assert_eq!(tracker.completed_count(), 0);

        first.complete(10);
        assert_eq!(tracker.completed_count(), 1);

        second.complete(20);
        third.complete(30);
        assert!(tracker.all_completed());

        let outcomes = tracker.wait_all().unwrap();
        assert_eq!(outcomes, vec![10, 20, 30]);
    }

    /// Every token issued through the group-commit wrapper is done after a flush.
    #[test]
    fn test_group_commit() {
        let tmp = tempdir().unwrap();
        let log_path = tmp.path().join("test.wal");

        let log = IoUringWal::open(&log_path, IoUringWalConfig::default()).unwrap();
        let grouped = GroupCommitWal::new(log, 10, 100);

        let issued: Vec<_> = (0..25)
            .map(|i| grouped.write(format!("entry{}", i).into_bytes()).unwrap())
            .collect();

        grouped.flush().unwrap();

        for tok in issued {
            assert!(tok.is_completed());
        }
    }
}