1use std::collections::VecDeque;
39use std::fs::File;
40use std::io::{self, Read, Seek, SeekFrom, Write};
41use std::path::Path;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
45use bytes::{Bytes, BytesMut};
46use parking_lot::Mutex;
47
/// Tuning parameters for the I/O layer.
///
/// NOTE(review): despite the io_uring naming, the implementations in this
/// file use portable blocking I/O; several fields (queue depth, polling,
/// direct I/O) are carried for a future io_uring backend and are not
/// consulted by the blocking code paths. `registered_buffers * buffer_size`
/// IS used: it caps the WAL batch size in `PortableWalWriter`.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    /// Submission-queue depth (entries).
    pub sq_entries: u32,
    /// Enable kernel-side submission polling (SQPOLL-style) — TODO confirm
    /// intended semantics once a real io_uring backend exists.
    pub kernel_poll: bool,
    /// Idle time in milliseconds before a kernel polling thread would sleep.
    pub sq_poll_idle_ms: u32,
    /// Maximum number of operations allowed in flight at once.
    pub max_inflight: usize,
    /// Use direct (unbuffered) I/O.
    pub direct_io: bool,
    /// Number of pre-registered buffers.
    pub registered_buffers: usize,
    /// Size of each registered buffer, in bytes.
    pub buffer_size: usize,
}
70
71impl Default for IoUringConfig {
72 fn default() -> Self {
73 Self {
74 sq_entries: 1024,
75 kernel_poll: false, sq_poll_idle_ms: 10,
77 max_inflight: 256,
78 direct_io: false,
79 registered_buffers: 64,
80 buffer_size: 64 * 1024, }
82 }
83}
84
85impl IoUringConfig {
86 pub fn high_throughput() -> Self {
88 Self {
89 sq_entries: 4096,
90 kernel_poll: true,
91 sq_poll_idle_ms: 100,
92 max_inflight: 1024,
93 direct_io: true,
94 registered_buffers: 256,
95 buffer_size: 256 * 1024, }
97 }
98
99 pub fn low_latency() -> Self {
101 Self {
102 sq_entries: 256,
103 kernel_poll: true,
104 sq_poll_idle_ms: 1,
105 max_inflight: 64,
106 direct_io: true,
107 registered_buffers: 32,
108 buffer_size: 16 * 1024, }
110 }
111
112 pub fn minimal() -> Self {
114 Self {
115 sq_entries: 128,
116 kernel_poll: false,
117 sq_poll_idle_ms: 0,
118 max_inflight: 32,
119 direct_io: false,
120 registered_buffers: 8,
121 buffer_size: 4 * 1024, }
123 }
124}
125
/// Reports whether the running kernel is new enough for io_uring.
///
/// Parses `/proc/sys/kernel/osrelease` and requires version 5.6 or newer
/// (the release where the core io_uring opcodes stabilized). Any read or
/// parse failure conservatively reports `false`.
#[cfg(target_os = "linux")]
pub fn is_io_uring_available() -> bool {
    let release = match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(r) => r,
        Err(_) => return false,
    };

    // Expect "major.minor[.patch][-suffix]"; bail out if there is no minor.
    let mut fields = release.trim().split('.');
    let (major_str, minor_str) = match (fields.next(), fields.next()) {
        (Some(major), Some(minor)) => (major, minor),
        _ => return false,
    };

    // Unparseable components degrade to 0, which fails the version check.
    let major: u32 = major_str.parse().unwrap_or(0);
    let minor: u32 = minor_str
        .split('-')
        .next()
        .unwrap_or("0")
        .parse()
        .unwrap_or(0);

    major > 5 || (major == 5 && minor >= 6)
}
156
/// io_uring is Linux-only; on all other targets it is never available.
#[cfg(not(target_os = "linux"))]
pub fn is_io_uring_available() -> bool {
    false
}
161
/// Shared, lock-free operation counters.
///
/// All fields are atomics so multiple threads can update them concurrently;
/// take a point-in-time copy via `snapshot()`.
#[derive(Debug, Default)]
pub struct IoUringStats {
    /// Operations handed to the backend.
    pub ops_submitted: AtomicU64,
    /// Operations that finished. In this file, submit and complete are
    /// bumped together since the blocking backend completes synchronously.
    pub ops_completed: AtomicU64,
    /// Total payload bytes written.
    pub bytes_written: AtomicU64,
    /// Total payload bytes read.
    pub bytes_read: AtomicU64,
    /// Completion-queue overflow events — never incremented in this file;
    /// reserved for a real io_uring backend.
    pub cqe_overflows: AtomicU64,
    /// Dropped submission-queue entries — never incremented in this file;
    /// reserved for a real io_uring backend.
    pub sq_dropped: AtomicU64,
}
182
183impl IoUringStats {
184 pub fn new() -> Self {
186 Self::default()
187 }
188
189 pub fn snapshot(&self) -> IoUringStatsSnapshot {
191 IoUringStatsSnapshot {
192 ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
193 ops_completed: self.ops_completed.load(Ordering::Relaxed),
194 bytes_written: self.bytes_written.load(Ordering::Relaxed),
195 bytes_read: self.bytes_read.load(Ordering::Relaxed),
196 cqe_overflows: self.cqe_overflows.load(Ordering::Relaxed),
197 sq_dropped: self.sq_dropped.load(Ordering::Relaxed),
198 }
199 }
200}
201
/// Plain-value copy of [`IoUringStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    /// Operations handed to the backend.
    pub ops_submitted: u64,
    /// Operations that finished.
    pub ops_completed: u64,
    /// Total payload bytes written.
    pub bytes_written: u64,
    /// Total payload bytes read.
    pub bytes_read: u64,
    /// Completion-queue overflow events.
    pub cqe_overflows: u64,
    /// Dropped submission-queue entries.
    pub sq_dropped: u64,
}
212
213impl IoUringStatsSnapshot {
214 pub fn in_flight(&self) -> u64 {
216 self.ops_submitted.saturating_sub(self.ops_completed)
217 }
218
219 pub fn completion_rate(&self) -> f64 {
221 if self.ops_submitted == 0 {
222 1.0
223 } else {
224 self.ops_completed as f64 / self.ops_submitted as f64
225 }
226 }
227}
228
/// Append-only file writer using portable blocking I/O.
///
/// Thread-safe: the file handle is serialized behind a mutex and the
/// logical end-of-file offset is tracked atomically.
pub struct BlockingWriter {
    /// Underlying file handle, guarded so writes are serialized.
    file: Mutex<File>,
    /// Logical offset where the next write will begin (seeded with the
    /// file length at open time).
    offset: AtomicU64,
    /// Shared operation counters.
    stats: Arc<IoUringStats>,
    // Retained for a future io_uring backend; not consulted here.
    #[allow(dead_code)] config: IoUringConfig,
}
252
253impl BlockingWriter {
254 pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
256 let file = std::fs::OpenOptions::new()
257 .create(true)
258 .append(true)
259 .open(path)?;
260
261 let offset = file.metadata()?.len();
262
263 Ok(Self {
264 file: Mutex::new(file),
265 offset: AtomicU64::new(offset),
266 stats: Arc::new(IoUringStats::new()),
267 config,
268 })
269 }
270
271 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
273 let file = std::fs::OpenOptions::new().write(true).open(path)?;
274
275 let offset = file.metadata()?.len();
276
277 Ok(Self {
278 file: Mutex::new(file),
279 offset: AtomicU64::new(offset),
280 stats: Arc::new(IoUringStats::new()),
281 config,
282 })
283 }
284
285 pub fn write(&self, data: &[u8]) -> io::Result<u64> {
287 let mut file = self.file.lock();
288 file.write_all(data)?;
289
290 let offset = self.offset.fetch_add(data.len() as u64, Ordering::AcqRel);
291 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
292 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
293 self.stats
294 .bytes_written
295 .fetch_add(data.len() as u64, Ordering::Relaxed);
296
297 Ok(offset)
298 }
299
300 pub fn flush(&self) -> io::Result<()> {
302 let mut file = self.file.lock();
303 file.flush()?;
304 file.sync_data()
305 }
306
307 pub fn sync(&self) -> io::Result<()> {
309 let file = self.file.lock();
310 file.sync_data()
311 }
312
313 pub fn offset(&self) -> u64 {
315 self.offset.load(Ordering::Acquire)
316 }
317
318 pub fn stats(&self) -> IoUringStatsSnapshot {
320 self.stats.snapshot()
321 }
322}
323
/// Positional file reader using portable blocking I/O.
///
/// NOTE(review): despite the name, reads are synchronous. The file handle
/// is guarded by a mutex because each positional read must seek first.
pub struct AsyncReader {
    /// Underlying file handle, locked per read (seek + read is not atomic).
    file: Mutex<File>,
    /// Shared operation counters.
    stats: Arc<IoUringStats>,
    // Retained for a future io_uring backend; not consulted here.
    #[allow(dead_code)]
    config: IoUringConfig,
}
333
334impl AsyncReader {
335 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
337 let file = std::fs::File::open(path)?;
338
339 Ok(Self {
340 file: Mutex::new(file),
341 stats: Arc::new(IoUringStats::new()),
342 config,
343 })
344 }
345
346 pub fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
348 let mut file = self.file.lock();
349 file.seek(SeekFrom::Start(offset))?;
350 let n = file.read(buf)?;
351
352 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
353 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
354 self.stats.bytes_read.fetch_add(n as u64, Ordering::Relaxed);
355
356 Ok(n)
357 }
358
359 pub fn read_exact_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
361 let mut file = self.file.lock();
362 file.seek(SeekFrom::Start(offset))?;
363 file.read_exact(buf)?;
364
365 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
366 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
367 self.stats
368 .bytes_read
369 .fetch_add(buf.len() as u64, Ordering::Relaxed);
370
371 Ok(())
372 }
373
374 pub fn stats(&self) -> IoUringStatsSnapshot {
376 self.stats.snapshot()
377 }
378}
379
/// Per-kind tallies for the operations currently queued in an [`IoBatch`].
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    /// Total queued operations of any kind.
    pub total_ops: u64,
    /// Queued write operations.
    pub write_ops: u64,
    /// Queued read operations.
    pub read_ops: u64,
    /// Queued sync operations.
    pub sync_ops: u64,
    /// Sum of payload bytes across queued writes.
    pub write_bytes: u64,
    /// Sum of requested bytes across queued reads.
    pub read_bytes: u64,
}
400
/// FIFO queue of I/O operations, executed together by a [`BatchExecutor`]
/// or drained by a WAL flush.
#[derive(Debug, Default)]
pub struct IoBatch {
    // Operations are executed in the order they were pushed.
    operations: VecDeque<IoOperation>,
}
406
/// One queued I/O operation.
#[derive(Debug, Clone)]
pub enum IoOperation {
    /// Write `data` intended for `offset`.
    ///
    /// NOTE(review): both consumers in this file (`BatchExecutor::execute`
    /// and `PortableWalWriter::flush_batch`) ignore the recorded offset and
    /// append via the writer instead.
    Write {
        /// Intended file offset of the write.
        offset: u64,
        /// Payload to write.
        data: Bytes,
    },
    /// Read `len` bytes starting at `offset`.
    Read {
        /// File offset to read from.
        offset: u64,
        /// Number of bytes to read.
        len: usize,
    },
    /// Durability barrier (data sync).
    Sync,
}
427
428impl IoBatch {
429 pub fn new() -> Self {
431 Self {
432 operations: VecDeque::new(),
433 }
434 }
435
436 pub fn write(&mut self, offset: u64, data: impl Into<Bytes>) {
438 self.operations.push_back(IoOperation::Write {
439 offset,
440 data: data.into(),
441 });
442 }
443
444 pub fn read(&mut self, offset: u64, len: usize) {
446 self.operations.push_back(IoOperation::Read { offset, len });
447 }
448
449 pub fn sync(&mut self) {
451 self.operations.push_back(IoOperation::Sync);
452 }
453
454 pub fn len(&self) -> usize {
456 self.operations.len()
457 }
458
459 pub fn is_empty(&self) -> bool {
461 self.operations.is_empty()
462 }
463
464 pub fn clear(&mut self) {
466 self.operations.clear();
467 }
468
469 pub fn drain(&mut self) -> impl Iterator<Item = IoOperation> + '_ {
471 self.operations.drain(..)
472 }
473
474 pub fn pending_write_bytes(&self) -> u64 {
476 self.operations
477 .iter()
478 .map(|op| match op {
479 IoOperation::Write { data, .. } => data.len() as u64,
480 _ => 0,
481 })
482 .sum()
483 }
484
485 pub fn stats(&self) -> BatchStats {
487 let mut stats = BatchStats::default();
488 for op in &self.operations {
489 stats.total_ops += 1;
490 match op {
491 IoOperation::Write { data, .. } => {
492 stats.write_ops += 1;
493 stats.write_bytes += data.len() as u64;
494 }
495 IoOperation::Read { len, .. } => {
496 stats.read_ops += 1;
497 stats.read_bytes += *len as u64;
498 }
499 IoOperation::Sync => {
500 stats.sync_ops += 1;
501 }
502 }
503 }
504 stats
505 }
506
507 pub fn pending_write_ops(&self) -> usize {
509 self.operations
510 .iter()
511 .filter(|op| matches!(op, IoOperation::Write { .. }))
512 .count()
513 }
514
515 pub fn pending_read_ops(&self) -> usize {
517 self.operations
518 .iter()
519 .filter(|op| matches!(op, IoOperation::Read { .. }))
520 .count()
521 }
522}
523
/// Data produced by one completed read from a batch.
#[derive(Debug, Clone)]
pub struct BatchReadResult {
    /// Offset the read started at (echoed from the request).
    pub offset: u64,
    /// Bytes actually read; shorter than requested at end of file.
    pub data: BytesMut,
}
532
/// Executes queued [`IoBatch`] operations against an optional writer
/// and/or reader, sharing the counters of whichever endpoint built it.
pub struct BatchExecutor {
    /// Destination for write/sync operations; writes error when `None`.
    writer: Option<BlockingWriter>,
    /// Source for read operations; reads error when `None`.
    reader: Option<AsyncReader>,
    /// Counters cloned from the wrapped writer or reader.
    stats: Arc<IoUringStats>,
}
547
548impl BatchExecutor {
549 pub fn for_writer(writer: BlockingWriter) -> Self {
551 Self {
552 stats: writer.stats.clone(),
553 writer: Some(writer),
554 reader: None,
555 }
556 }
557
558 pub fn for_reader(reader: AsyncReader) -> Self {
560 Self {
561 stats: reader.stats.clone(),
562 writer: None,
563 reader: Some(reader),
564 }
565 }
566
567 pub fn execute(&self, batch: &mut IoBatch) -> io::Result<Vec<BatchReadResult>> {
571 let mut read_results = Vec::new();
572
573 for op in batch.drain() {
574 match op {
575 IoOperation::Write { offset: _, data } => {
576 if let Some(ref writer) = self.writer {
577 writer.write(&data)?;
578 } else {
579 return Err(io::Error::new(
580 io::ErrorKind::InvalidInput,
581 "No writer configured for batch executor",
582 ));
583 }
584 }
585 IoOperation::Read { offset, len } => {
586 if let Some(ref reader) = self.reader {
587 let mut buf = BytesMut::zeroed(len);
588 let n = reader.read_at(offset, &mut buf)?;
589 buf.truncate(n);
590 read_results.push(BatchReadResult { offset, data: buf });
591 } else {
592 return Err(io::Error::new(
593 io::ErrorKind::InvalidInput,
594 "No reader configured for batch executor",
595 ));
596 }
597 }
598 IoOperation::Sync => {
599 if let Some(ref writer) = self.writer {
600 writer.sync()?;
601 }
602 }
603 }
604 }
605
606 Ok(read_results)
607 }
608
609 pub fn stats(&self) -> IoUringStatsSnapshot {
611 self.stats.snapshot()
612 }
613}
614
/// Write-ahead-log writer built on [`BlockingWriter`], with optional
/// client-side batching of appends.
pub struct PortableWalWriter {
    /// Underlying append-only writer.
    writer: BlockingWriter,
    /// Entries queued by `append_batched`, awaiting `flush_batch`.
    batch: Mutex<IoBatch>,
    /// Bytes currently queued in `batch`; drives the auto-flush check.
    pending_bytes: AtomicU64,
    /// Auto-flush threshold: `registered_buffers * buffer_size`.
    max_batch_bytes: u64,
}
650
impl PortableWalWriter {
    /// Creates (or appends to) the WAL file at `path`.
    ///
    /// The batch auto-flush threshold is derived from the config as
    /// `registered_buffers * buffer_size` bytes.
    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
        let max_batch_bytes = (config.registered_buffers * config.buffer_size) as u64;

        Ok(Self {
            writer: BlockingWriter::new(path, config)?,
            batch: Mutex::new(IoBatch::new()),
            pending_bytes: AtomicU64::new(0),
            max_batch_bytes,
        })
    }

    /// Appends `data` immediately (no batching); returns the offset at
    /// which the entry begins.
    pub fn append(&self, data: &[u8]) -> io::Result<u64> {
        self.writer.write(data)
    }

    /// Queues `data` for a later batched write, flushing automatically once
    /// the accumulated bytes reach `max_batch_bytes`.
    ///
    /// NOTE(review): unlike `append`, the returned value is the number of
    /// bytes pending in the batch after this call, NOT a file offset — and
    /// it is already stale if this call triggered the auto-flush. Confirm
    /// callers don't treat it as an offset.
    pub fn append_batched(&self, data: &[u8]) -> io::Result<u64> {
        let data_len = data.len() as u64;
        // The offset recorded here is only informational; flush_batch
        // appends and does not honor it.
        let offset = self.writer.offset();

        {
            // Hold the batch lock only while queueing.
            let mut batch = self.batch.lock();
            batch.write(offset, Bytes::copy_from_slice(data));
        }

        // fetch_add returns the prior value; add data_len to get the total
        // pending after this append.
        let pending = self.pending_bytes.fetch_add(data_len, Ordering::AcqRel) + data_len;

        if pending >= self.max_batch_bytes {
            self.flush_batch()?;
        }

        Ok(pending)
    }

    /// Writes every queued entry, resets the pending-byte counter, and
    /// flushes + syncs the file. No-op when the batch is empty (in which
    /// case nothing is synced either).
    pub fn flush_batch(&self) -> io::Result<()> {
        // The batch lock is held for the whole flush so concurrent
        // append_batched calls queue behind it.
        let mut batch = self.batch.lock();

        if batch.is_empty() {
            return Ok(());
        }

        for op in batch.drain() {
            match op {
                IoOperation::Write { data, .. } => {
                    // Queued offsets are ignored; the writer appends.
                    self.writer.write(&data)?;
                }
                IoOperation::Sync => {
                    self.writer.sync()?;
                }
                IoOperation::Read { .. } => {
                    // Reads have no meaning in a WAL batch; skipped.
                }
            }
        }

        self.pending_bytes.store(0, Ordering::Release);
        self.writer.flush()?;
        self.writer.sync()
    }

    /// Bytes currently queued for batched writing.
    pub fn pending_batch_bytes(&self) -> u64 {
        self.pending_bytes.load(Ordering::Acquire)
    }

    /// Number of operations currently queued.
    pub fn pending_batch_ops(&self) -> usize {
        self.batch.lock().len()
    }

    /// True when at least one operation is queued.
    pub fn has_pending_batch(&self) -> bool {
        !self.batch.lock().is_empty()
    }

    /// Appends a framed entry immediately:
    /// 4-byte big-endian payload length, payload, 4-byte big-endian CRC32.
    pub fn append_with_checksum(&self, data: &[u8]) -> io::Result<u64> {
        let checksum = crc32fast::hash(data);

        let mut buf = Vec::with_capacity(4 + data.len() + 4);
        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
        buf.extend_from_slice(data);
        buf.extend_from_slice(&checksum.to_be_bytes());

        self.writer.write(&buf)
    }

    /// Queues a framed entry (same layout as `append_with_checksum`) for
    /// batched writing. Returns pending bytes, like `append_batched`.
    pub fn append_with_checksum_batched(&self, data: &[u8]) -> io::Result<u64> {
        let checksum = crc32fast::hash(data);

        let mut buf = Vec::with_capacity(4 + data.len() + 4);
        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
        buf.extend_from_slice(data);
        buf.extend_from_slice(&checksum.to_be_bytes());

        self.append_batched(&buf)
    }

    /// Flushes any queued batch, then flushes and syncs the file.
    ///
    /// The extra flush/sync is required because flush_batch returns early
    /// (without syncing) when the batch is empty.
    pub fn sync(&self) -> io::Result<()> {
        self.flush_batch()?;
        self.writer.flush()?;
        self.writer.sync()
    }

    /// Logical size of the WAL: the writer's current append offset.
    pub fn size(&self) -> u64 {
        self.writer.offset()
    }

    /// The auto-flush threshold in bytes.
    pub fn max_batch_bytes(&self) -> u64 {
        self.max_batch_bytes
    }

    /// Snapshot of the underlying writer's counters.
    pub fn stats(&self) -> IoUringStatsSnapshot {
        self.writer.stats()
    }
}
787
/// Read-only view of a log segment file.
pub struct SegmentReader {
    /// Underlying positional reader.
    reader: AsyncReader,
    /// File length captured at open time; not refreshed afterwards.
    length: u64,
}
797
798impl SegmentReader {
799 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
801 let metadata = std::fs::metadata(&path)?;
802 let length = metadata.len();
803
804 Ok(Self {
805 reader: AsyncReader::open(path, config)?,
806 length,
807 })
808 }
809
810 pub fn read_messages(&self, offset: u64, max_bytes: usize) -> io::Result<BytesMut> {
812 let mut buf = BytesMut::zeroed(max_bytes);
813 let n = self.reader.read_at(offset, &mut buf)?;
814 buf.truncate(n);
815 Ok(buf)
816 }
817
818 pub fn read_range(&self, offset: u64, len: usize) -> io::Result<BytesMut> {
820 let mut buf = BytesMut::zeroed(len);
821 self.reader.read_exact_at(offset, &mut buf)?;
822 Ok(buf)
823 }
824
825 pub fn len(&self) -> u64 {
827 self.length
828 }
829
830 pub fn is_empty(&self) -> bool {
832 self.length == 0
833 }
834
835 pub fn stats(&self) -> IoUringStatsSnapshot {
837 self.reader.stats()
838 }
839}
840
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Default preset carries the documented balanced values.
    #[test]
    fn test_config_defaults() {
        let config = IoUringConfig::default();
        assert_eq!(config.sq_entries, 1024);
        assert!(!config.kernel_poll);
        assert_eq!(config.max_inflight, 256);
    }

    // High-throughput preset enables polling and direct I/O.
    #[test]
    fn test_config_high_throughput() {
        let config = IoUringConfig::high_throughput();
        assert_eq!(config.sq_entries, 4096);
        assert!(config.kernel_poll);
        assert!(config.direct_io);
    }

    // Low-latency preset uses a small queue with polling enabled.
    #[test]
    fn test_config_low_latency() {
        let config = IoUringConfig::low_latency();
        assert_eq!(config.sq_entries, 256);
        assert!(config.kernel_poll);
    }

    // Derived metrics: in_flight difference and completion ratio.
    #[test]
    fn test_stats_snapshot() {
        let stats = IoUringStats::new();
        stats.ops_submitted.store(100, Ordering::Relaxed);
        stats.ops_completed.store(95, Ordering::Relaxed);
        stats.bytes_written.store(10000, Ordering::Relaxed);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.in_flight(), 5);
        assert!((snapshot.completion_rate() - 0.95).abs() < 0.001);
    }

    // Sequential appends return the start offset of each write and bump
    // the counters.
    #[test]
    fn test_async_writer() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.log");

        let config = IoUringConfig::minimal();
        let writer = BlockingWriter::new(&path, config).unwrap();

        let offset = writer.write(b"hello").unwrap();
        assert_eq!(offset, 0);

        let offset = writer.write(b"world").unwrap();
        assert_eq!(offset, 5);

        writer.flush().unwrap();

        let stats = writer.stats();
        assert_eq!(stats.ops_completed, 2);
        assert_eq!(stats.bytes_written, 10);
    }

    // Positional reads: short read via read_at, exact read via
    // read_exact_at, counters track both.
    #[test]
    fn test_async_reader() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.log");

        std::fs::write(&path, b"hello world test data").unwrap();

        let config = IoUringConfig::minimal();
        let reader = AsyncReader::open(&path, config).unwrap();

        let mut buf = [0u8; 5];
        let n = reader.read_at(0, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"hello");

        let mut buf = [0u8; 5];
        reader.read_exact_at(6, &mut buf).unwrap();
        assert_eq!(&buf, b"world");

        let stats = reader.stats();
        assert_eq!(stats.ops_completed, 2);
    }

    // Queue/clear bookkeeping on an IoBatch.
    #[test]
    fn test_io_batch() {
        let mut batch = IoBatch::new();
        assert!(batch.is_empty());

        batch.write(0, Bytes::from_static(b"hello"));
        batch.read(100, 50);
        batch.sync();

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());

        batch.clear();
        assert!(batch.is_empty());
    }

    // Direct and checksummed appends advance the WAL size.
    #[test]
    fn test_wal_writer() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal.log");

        let config = IoUringConfig::minimal();
        let wal = PortableWalWriter::new(&path, config).unwrap();

        let offset = wal.append(b"entry1").unwrap();
        assert_eq!(offset, 0);

        let offset = wal.append_with_checksum(b"entry2").unwrap();
        assert!(offset > 0);

        wal.sync().unwrap();
        assert!(wal.size() > 0);
    }

    // Length capture at open plus short and exact range reads.
    #[test]
    fn test_segment_reader() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("segment.log");

        std::fs::write(&path, b"message1message2message3").unwrap();

        let config = IoUringConfig::minimal();
        let reader = SegmentReader::open(&path, config).unwrap();

        assert_eq!(reader.len(), 24);
        assert!(!reader.is_empty());

        let data = reader.read_messages(0, 100).unwrap();
        assert_eq!(&data[..], b"message1message2message3");

        let data = reader.read_range(8, 8).unwrap();
        assert_eq!(&data[..], b"message2");
    }

    // Smoke test only: result is environment-dependent, so just print it.
    #[test]
    fn test_io_uring_availability() {
        let available = is_io_uring_available();
        println!("io_uring available: {}", available);
    }

    // pending_write_bytes counts only write payloads, ignoring reads.
    #[test]
    fn test_io_batch_pending_write_bytes() {
        let mut batch = IoBatch::new();
        assert_eq!(batch.pending_write_bytes(), 0);

        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.read(100, 50);
        assert_eq!(batch.pending_write_bytes(), 10);
    }

    // drain yields every op and leaves the batch empty.
    #[test]
    fn test_io_batch_drain() {
        let mut batch = IoBatch::new();
        batch.write(0, Bytes::from_static(b"hello"));
        batch.sync();

        let ops: Vec<_> = batch.drain().collect();
        assert_eq!(ops.len(), 2);
        assert!(batch.is_empty());
    }

    // Writer-only executor: writes are appended in queue order; no read
    // results are produced.
    #[test]
    fn test_batch_executor_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch_write.log");

        let config = IoUringConfig::minimal();
        let writer = BlockingWriter::new(&path, config).unwrap();
        let executor = BatchExecutor::for_writer(writer);

        let mut batch = IoBatch::new();
        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.sync();

        let results = executor.execute(&mut batch).unwrap();
        assert!(results.is_empty());
        let contents = std::fs::read(&path).unwrap();
        assert_eq!(&contents, b"helloworld");
    }

    // Reader-only executor: each queued read yields data plus its offset.
    #[test]
    fn test_batch_executor_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch_read.log");
        std::fs::write(&path, b"hello world test data").unwrap();

        let config = IoUringConfig::minimal();
        let reader = AsyncReader::open(&path, config).unwrap();
        let executor = BatchExecutor::for_reader(reader);

        let mut batch = IoBatch::new();
        batch.read(0, 5);
        batch.read(6, 5);

        let results = executor.execute(&mut batch).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(&results[0].data[..], b"hello");
        assert_eq!(results[0].offset, 0);
        assert_eq!(&results[1].data[..], b"world");
        assert_eq!(results[1].offset, 6);
    }

    // Batched appends accumulate until flush_batch writes them out.
    #[test]
    fn test_wal_writer_batched() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_batch.log");

        let config = IoUringConfig::minimal();
        let wal = PortableWalWriter::new(&path, config).unwrap();

        wal.append(b"direct").unwrap();

        wal.append_batched(b"batch1").unwrap();
        wal.append_batched(b"batch2").unwrap();

        assert!(wal.has_pending_batch());
        assert_eq!(wal.pending_batch_ops(), 2);
        assert_eq!(wal.pending_batch_bytes(), 12);

        wal.flush_batch().unwrap();

        assert!(!wal.has_pending_batch());
        assert_eq!(wal.pending_batch_bytes(), 0);

        // 6 direct + 12 batched bytes.
        assert!(wal.size() >= 18);
    }

    // sync() flushes any pending batched checksummed entries.
    #[test]
    fn test_wal_writer_batched_checksum() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_batch_crc.log");

        let config = IoUringConfig::minimal();
        let wal = PortableWalWriter::new(&path, config).unwrap();

        wal.append_with_checksum_batched(b"data1").unwrap();
        wal.append_with_checksum_batched(b"data2").unwrap();

        assert!(wal.has_pending_batch());
        wal.sync().unwrap();
        assert!(!wal.has_pending_batch());
        assert!(wal.size() > 0);
    }

    // Crossing max_batch_bytes triggers an automatic flush.
    #[test]
    fn test_wal_writer_auto_flush() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_auto_flush.log");

        // Tiny threshold (1 buffer * 10 bytes) to force the auto-flush.
        let mut config = IoUringConfig::minimal();
        config.registered_buffers = 1;
        config.buffer_size = 10;
        let wal = PortableWalWriter::new(&path, config).unwrap();
        assert_eq!(wal.max_batch_bytes(), 10);

        // 5 bytes pending: below threshold, stays queued.
        wal.append_batched(b"hello").unwrap();
        assert!(wal.has_pending_batch());

        // 11 bytes pending: crosses threshold, auto-flushes.
        wal.append_batched(b"world!").unwrap();
        assert!(!wal.has_pending_batch());
    }

    // Per-kind tallies from IoBatch::stats, empty and populated.
    #[test]
    fn test_io_batch_stats() {
        let mut batch = IoBatch::new();

        let stats = batch.stats();
        assert_eq!(stats.total_ops, 0);
        assert_eq!(stats.write_ops, 0);
        assert_eq!(stats.read_ops, 0);
        assert_eq!(stats.sync_ops, 0);

        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.read(100, 50);
        batch.read(200, 100);
        batch.sync();

        let stats = batch.stats();
        assert_eq!(stats.total_ops, 5);
        assert_eq!(stats.write_ops, 2);
        assert_eq!(stats.read_ops, 2);
        assert_eq!(stats.sync_ops, 1);
        assert_eq!(stats.write_bytes, 10);
        assert_eq!(stats.read_bytes, 150);
    }

    // pending_write_ops / pending_read_ops count by operation kind.
    #[test]
    fn test_io_batch_pending_ops() {
        let mut batch = IoBatch::new();

        batch.write(0, Bytes::from_static(b"data1"));
        batch.write(5, Bytes::from_static(b"data2"));
        batch.read(100, 50);
        batch.sync();

        assert_eq!(batch.pending_write_ops(), 2);
        assert_eq!(batch.pending_read_ops(), 1);
    }
}