1use std::collections::VecDeque;
39use std::fs::File;
40use std::io::{self, Read, Seek, SeekFrom, Write};
41use std::path::Path;
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
45use bytes::{Bytes, BytesMut};
46use parking_lot::Mutex;
47
/// Tuning knobs for the I/O layer.
///
/// NOTE(review): despite the io_uring naming, nothing in this module issues
/// io_uring syscalls — the writers/readers below use synchronous `std::io`.
/// The only knobs with an observable effect here are `registered_buffers`
/// and `buffer_size`, whose product sizes `WalWriter`'s batch threshold.
#[derive(Debug, Clone)]
pub struct IoUringConfig {
    // Submission-queue depth (number of SQ entries).
    pub sq_entries: u32,
    // Enable kernel-side submission-queue polling (SQPOLL) — currently advisory.
    pub kernel_poll: bool,
    // Idle time in milliseconds before a kernel poll thread would sleep.
    pub sq_poll_idle_ms: u32,
    // Maximum number of operations allowed in flight at once.
    pub max_inflight: usize,
    // Request O_DIRECT-style unbuffered I/O — currently advisory.
    pub direct_io: bool,
    // Number of pre-registered buffers; multiplied by `buffer_size` to form
    // `WalWriter::max_batch_bytes`.
    pub registered_buffers: usize,
    // Size in bytes of each registered buffer.
    pub buffer_size: usize,
}
70
71impl Default for IoUringConfig {
72 fn default() -> Self {
73 Self {
74 sq_entries: 1024,
75 kernel_poll: false, sq_poll_idle_ms: 10,
77 max_inflight: 256,
78 direct_io: false,
79 registered_buffers: 64,
80 buffer_size: 64 * 1024, }
82 }
83}
84
85impl IoUringConfig {
86 pub fn high_throughput() -> Self {
88 Self {
89 sq_entries: 4096,
90 kernel_poll: true,
91 sq_poll_idle_ms: 100,
92 max_inflight: 1024,
93 direct_io: true,
94 registered_buffers: 256,
95 buffer_size: 256 * 1024, }
97 }
98
99 pub fn low_latency() -> Self {
101 Self {
102 sq_entries: 256,
103 kernel_poll: true,
104 sq_poll_idle_ms: 1,
105 max_inflight: 64,
106 direct_io: true,
107 registered_buffers: 32,
108 buffer_size: 16 * 1024, }
110 }
111
112 pub fn minimal() -> Self {
114 Self {
115 sq_entries: 128,
116 kernel_poll: false,
117 sq_poll_idle_ms: 0,
118 max_inflight: 32,
119 direct_io: false,
120 registered_buffers: 8,
121 buffer_size: 4 * 1024, }
123 }
124}
125
#[cfg(target_os = "linux")]
/// Reports whether the running kernel is new enough for io_uring use
/// (>= 5.6), by parsing `/proc/sys/kernel/osrelease`.
///
/// Returns `false` on any read or parse failure.
pub fn is_io_uring_available() -> bool {
    let release = match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(v) => v,
        Err(_) => return false,
    };

    // Expect at least "major.minor"; e.g. "6.1.0-13-amd64".
    let release = release.trim();
    let mut fields = release.split('.');
    let (Some(major_str), Some(minor_str)) = (fields.next(), fields.next()) else {
        return false;
    };

    let major: u32 = major_str.parse().unwrap_or(0);
    // The minor field may carry a "-suffix" (e.g. "10-arch1"); keep the
    // leading numeric run only.
    let minor: u32 = minor_str
        .split('-')
        .next()
        .unwrap_or("0")
        .parse()
        .unwrap_or(0);

    // Lexicographic tuple compare: true for 5.6+ and any 6.x+ kernel.
    (major, minor) >= (5, 6)
}
156
/// Fallback for non-Linux targets: io_uring is a Linux-only interface,
/// so availability is always `false`.
#[cfg(not(target_os = "linux"))]
pub fn is_io_uring_available() -> bool {
    false
}
161
/// Lock-free operation/byte counters shared between I/O handles.
///
/// All fields are independent atomics updated with relaxed ordering;
/// see [`IoUringStats::snapshot`] for a point-in-time copy.
#[derive(Debug, Default)]
pub struct IoUringStats {
    // Operations handed to the I/O layer.
    pub ops_submitted: AtomicU64,
    // Operations that finished (in this module, always bumped in lockstep
    // with `ops_submitted` since I/O is synchronous).
    pub ops_completed: AtomicU64,
    // Total payload bytes written.
    pub bytes_written: AtomicU64,
    // Total payload bytes read.
    pub bytes_read: AtomicU64,
    // Completion-queue overflow events — never incremented in this module.
    pub cqe_overflows: AtomicU64,
    // Dropped submission-queue entries — never incremented in this module.
    pub sq_dropped: AtomicU64,
}
182
183impl IoUringStats {
184 pub fn new() -> Self {
186 Self::default()
187 }
188
189 pub fn snapshot(&self) -> IoUringStatsSnapshot {
191 IoUringStatsSnapshot {
192 ops_submitted: self.ops_submitted.load(Ordering::Relaxed),
193 ops_completed: self.ops_completed.load(Ordering::Relaxed),
194 bytes_written: self.bytes_written.load(Ordering::Relaxed),
195 bytes_read: self.bytes_read.load(Ordering::Relaxed),
196 cqe_overflows: self.cqe_overflows.load(Ordering::Relaxed),
197 sq_dropped: self.sq_dropped.load(Ordering::Relaxed),
198 }
199 }
200}
201
/// Plain-value copy of [`IoUringStats`] taken at one point in time.
#[derive(Debug, Clone)]
pub struct IoUringStatsSnapshot {
    pub ops_submitted: u64,
    pub ops_completed: u64,
    pub bytes_written: u64,
    pub bytes_read: u64,
    pub cqe_overflows: u64,
    pub sq_dropped: u64,
}
212
213impl IoUringStatsSnapshot {
214 pub fn in_flight(&self) -> u64 {
216 self.ops_submitted.saturating_sub(self.ops_completed)
217 }
218
219 pub fn completion_rate(&self) -> f64 {
221 if self.ops_submitted == 0 {
222 1.0
223 } else {
224 self.ops_completed as f64 / self.ops_submitted as f64
225 }
226 }
227}
228
/// Sequential file writer with a logical offset counter and shared stats.
///
/// NOTE(review): writes are synchronous `std::io` calls serialized by the
/// mutex; the "async" name reflects intended io_uring usage, not current
/// behavior.
pub struct AsyncWriter {
    // Underlying file; the mutex serializes writers and flushes.
    file: Mutex<File>,
    // Logical end-of-file offset; each write is reported at this offset.
    offset: AtomicU64,
    // Counters shared with any BatchExecutor built from this writer.
    stats: Arc<IoUringStats>,
    // Retained for future io_uring tuning; not consulted by the current code.
    #[allow(dead_code)] config: IoUringConfig,
}
245
246impl AsyncWriter {
247 pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
249 let file = std::fs::OpenOptions::new()
250 .create(true)
251 .append(true)
252 .open(path)?;
253
254 let offset = file.metadata()?.len();
255
256 Ok(Self {
257 file: Mutex::new(file),
258 offset: AtomicU64::new(offset),
259 stats: Arc::new(IoUringStats::new()),
260 config,
261 })
262 }
263
264 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
266 let file = std::fs::OpenOptions::new().write(true).open(path)?;
267
268 let offset = file.metadata()?.len();
269
270 Ok(Self {
271 file: Mutex::new(file),
272 offset: AtomicU64::new(offset),
273 stats: Arc::new(IoUringStats::new()),
274 config,
275 })
276 }
277
278 pub fn write(&self, data: &[u8]) -> io::Result<u64> {
280 let mut file = self.file.lock();
281 file.write_all(data)?;
282
283 let offset = self.offset.fetch_add(data.len() as u64, Ordering::AcqRel);
284 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
285 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
286 self.stats
287 .bytes_written
288 .fetch_add(data.len() as u64, Ordering::Relaxed);
289
290 Ok(offset)
291 }
292
293 pub fn flush(&self) -> io::Result<()> {
295 let mut file = self.file.lock();
296 file.flush()?;
297 file.sync_data()
298 }
299
300 pub fn sync(&self) -> io::Result<()> {
302 let file = self.file.lock();
303 file.sync_data()
304 }
305
306 pub fn offset(&self) -> u64 {
308 self.offset.load(Ordering::Acquire)
309 }
310
311 pub fn stats(&self) -> IoUringStatsSnapshot {
313 self.stats.snapshot()
314 }
315}
316
/// Positioned file reader with shared stats.
///
/// NOTE(review): reads are synchronous seek-then-read calls serialized by
/// the mutex; the "async" name reflects intended io_uring usage only.
pub struct AsyncReader {
    // Underlying file; the mutex serializes seek+read pairs.
    file: Mutex<File>,
    // Counters shared with any BatchExecutor built from this reader.
    stats: Arc<IoUringStats>,
    // Retained for future io_uring tuning; not consulted by the current code.
    #[allow(dead_code)]
    config: IoUringConfig,
}
326
327impl AsyncReader {
328 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
330 let file = std::fs::File::open(path)?;
331
332 Ok(Self {
333 file: Mutex::new(file),
334 stats: Arc::new(IoUringStats::new()),
335 config,
336 })
337 }
338
339 pub fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<usize> {
341 let mut file = self.file.lock();
342 file.seek(SeekFrom::Start(offset))?;
343 let n = file.read(buf)?;
344
345 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
346 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
347 self.stats.bytes_read.fetch_add(n as u64, Ordering::Relaxed);
348
349 Ok(n)
350 }
351
352 pub fn read_exact_at(&self, offset: u64, buf: &mut [u8]) -> io::Result<()> {
354 let mut file = self.file.lock();
355 file.seek(SeekFrom::Start(offset))?;
356 file.read_exact(buf)?;
357
358 self.stats.ops_submitted.fetch_add(1, Ordering::Relaxed);
359 self.stats.ops_completed.fetch_add(1, Ordering::Relaxed);
360 self.stats
361 .bytes_read
362 .fetch_add(buf.len() as u64, Ordering::Relaxed);
363
364 Ok(())
365 }
366
367 pub fn stats(&self) -> IoUringStatsSnapshot {
369 self.stats.snapshot()
370 }
371}
372
/// Per-kind operation counts and byte totals for a queued [`IoBatch`].
#[derive(Debug, Clone, Default)]
pub struct BatchStats {
    // Total queued operations of all kinds.
    pub total_ops: u64,
    // Queued write operations.
    pub write_ops: u64,
    // Queued read operations.
    pub read_ops: u64,
    // Queued sync operations.
    pub sync_ops: u64,
    // Sum of payload bytes across queued writes.
    pub write_bytes: u64,
    // Sum of requested bytes across queued reads.
    pub read_bytes: u64,
}
393
/// FIFO queue of I/O operations to be executed together.
#[derive(Debug, Default)]
pub struct IoBatch {
    // Operations in submission order; drained front-to-back on execute.
    operations: VecDeque<IoOperation>,
}
399
/// A single queued I/O operation.
#[derive(Debug, Clone)]
pub enum IoOperation {
    /// Write `data` at `offset`.
    ///
    /// NOTE(review): both `BatchExecutor::execute` and `WalWriter::flush_batch`
    /// ignore the recorded offset and write sequentially.
    Write {
        offset: u64,
        data: Bytes,
    },
    /// Read `len` bytes starting at `offset`.
    Read {
        offset: u64,
        len: usize,
    },
    /// Sync file data to disk.
    Sync,
}
420
421impl IoBatch {
422 pub fn new() -> Self {
424 Self {
425 operations: VecDeque::new(),
426 }
427 }
428
429 pub fn write(&mut self, offset: u64, data: impl Into<Bytes>) {
431 self.operations.push_back(IoOperation::Write {
432 offset,
433 data: data.into(),
434 });
435 }
436
437 pub fn read(&mut self, offset: u64, len: usize) {
439 self.operations.push_back(IoOperation::Read { offset, len });
440 }
441
442 pub fn sync(&mut self) {
444 self.operations.push_back(IoOperation::Sync);
445 }
446
447 pub fn len(&self) -> usize {
449 self.operations.len()
450 }
451
452 pub fn is_empty(&self) -> bool {
454 self.operations.is_empty()
455 }
456
457 pub fn clear(&mut self) {
459 self.operations.clear();
460 }
461
462 pub fn drain(&mut self) -> impl Iterator<Item = IoOperation> + '_ {
464 self.operations.drain(..)
465 }
466
467 pub fn pending_write_bytes(&self) -> u64 {
469 self.operations
470 .iter()
471 .map(|op| match op {
472 IoOperation::Write { data, .. } => data.len() as u64,
473 _ => 0,
474 })
475 .sum()
476 }
477
478 pub fn stats(&self) -> BatchStats {
480 let mut stats = BatchStats::default();
481 for op in &self.operations {
482 stats.total_ops += 1;
483 match op {
484 IoOperation::Write { data, .. } => {
485 stats.write_ops += 1;
486 stats.write_bytes += data.len() as u64;
487 }
488 IoOperation::Read { len, .. } => {
489 stats.read_ops += 1;
490 stats.read_bytes += *len as u64;
491 }
492 IoOperation::Sync => {
493 stats.sync_ops += 1;
494 }
495 }
496 }
497 stats
498 }
499
500 pub fn pending_write_ops(&self) -> usize {
502 self.operations
503 .iter()
504 .filter(|op| matches!(op, IoOperation::Write { .. }))
505 .count()
506 }
507
508 pub fn pending_read_ops(&self) -> usize {
510 self.operations
511 .iter()
512 .filter(|op| matches!(op, IoOperation::Read { .. }))
513 .count()
514 }
515}
516
/// Result of one queued read executed by [`BatchExecutor`].
#[derive(Debug, Clone)]
pub struct BatchReadResult {
    // Offset the read was issued at.
    pub offset: u64,
    // Bytes actually read (truncated to the short count at EOF).
    pub data: BytesMut,
}
525
/// Executes an [`IoBatch`] against an optional writer and/or reader.
///
/// Built via `for_writer`/`for_reader`; an operation whose required handle
/// is missing fails `execute` (writes/reads) or is skipped (syncs).
pub struct BatchExecutor {
    // Destination for Write and Sync operations, if configured.
    writer: Option<AsyncWriter>,
    // Source for Read operations, if configured.
    reader: Option<AsyncReader>,
    // Shared with the wrapped handle so `stats()` reflects executed ops.
    stats: Arc<IoUringStats>,
}
540
541impl BatchExecutor {
542 pub fn for_writer(writer: AsyncWriter) -> Self {
544 Self {
545 stats: writer.stats.clone(),
546 writer: Some(writer),
547 reader: None,
548 }
549 }
550
551 pub fn for_reader(reader: AsyncReader) -> Self {
553 Self {
554 stats: reader.stats.clone(),
555 writer: None,
556 reader: Some(reader),
557 }
558 }
559
560 pub fn execute(&self, batch: &mut IoBatch) -> io::Result<Vec<BatchReadResult>> {
564 let mut read_results = Vec::new();
565
566 for op in batch.drain() {
567 match op {
568 IoOperation::Write { offset: _, data } => {
569 if let Some(ref writer) = self.writer {
570 writer.write(&data)?;
571 } else {
572 return Err(io::Error::new(
573 io::ErrorKind::InvalidInput,
574 "No writer configured for batch executor",
575 ));
576 }
577 }
578 IoOperation::Read { offset, len } => {
579 if let Some(ref reader) = self.reader {
580 let mut buf = BytesMut::zeroed(len);
581 let n = reader.read_at(offset, &mut buf)?;
582 buf.truncate(n);
583 read_results.push(BatchReadResult { offset, data: buf });
584 } else {
585 return Err(io::Error::new(
586 io::ErrorKind::InvalidInput,
587 "No reader configured for batch executor",
588 ));
589 }
590 }
591 IoOperation::Sync => {
592 if let Some(ref writer) = self.writer {
593 writer.sync()?;
594 }
595 }
596 }
597 }
598
599 Ok(read_results)
600 }
601
602 pub fn stats(&self) -> IoUringStatsSnapshot {
604 self.stats.snapshot()
605 }
606}
607
/// Write-ahead-log writer with optional batching of appends.
///
/// Direct appends (`append*`) hit the file immediately; batched appends
/// accumulate in `batch` and are written out when the pending byte count
/// reaches `max_batch_bytes` or on an explicit flush/sync.
pub struct WalWriter {
    // Underlying sequential writer.
    writer: AsyncWriter,
    // Buffered write operations awaiting flush.
    batch: Mutex<IoBatch>,
    // Bytes currently buffered in `batch`; tracked separately so the
    // auto-flush threshold check avoids locking the batch.
    pending_bytes: AtomicU64,
    // Auto-flush threshold: registered_buffers * buffer_size from the config.
    max_batch_bytes: u64,
}
643
impl WalWriter {
    /// Creates a WAL at `path` (created if missing, appended otherwise).
    ///
    /// The batching threshold is `registered_buffers * buffer_size`.
    pub fn new(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
        let max_batch_bytes = (config.registered_buffers * config.buffer_size) as u64;

        Ok(Self {
            writer: AsyncWriter::new(path, config)?,
            batch: Mutex::new(IoBatch::new()),
            pending_bytes: AtomicU64::new(0),
            max_batch_bytes,
        })
    }

    /// Appends `data` directly (bypassing the batch) and returns the file
    /// offset at which it was written.
    pub fn append(&self, data: &[u8]) -> io::Result<u64> {
        self.writer.write(data)
    }

    /// Queues `data` for a later batched write, auto-flushing once pending
    /// bytes reach the threshold.
    ///
    /// Returns the number of bytes pending in the batch AFTER this append —
    /// NOT a file offset (unlike `append`).
    ///
    /// NOTE(review): the offset recorded in the batch is the writer's current
    /// offset, which does not advance for still-pending entries, so several
    /// queued entries capture the same offset. This is harmless today because
    /// `flush_batch` ignores recorded offsets and writes sequentially.
    pub fn append_batched(&self, data: &[u8]) -> io::Result<u64> {
        let data_len = data.len() as u64;
        let offset = self.writer.offset();

        {
            // Scope the batch lock so it is released before flush_batch
            // (which re-acquires it) can run below.
            let mut batch = self.batch.lock();
            batch.write(offset, Bytes::copy_from_slice(data));
        }

        // fetch_add returns the pre-add value; add data_len back to get the
        // post-append pending total.
        let pending = self.pending_bytes.fetch_add(data_len, Ordering::AcqRel) + data_len;

        if pending >= self.max_batch_bytes {
            self.flush_batch()?;
        }

        Ok(pending)
    }

    /// Writes out every queued batch entry sequentially, then flushes and
    /// fdatasyncs the file. No-op if the batch is empty.
    pub fn flush_batch(&self) -> io::Result<()> {
        let mut batch = self.batch.lock();

        if batch.is_empty() {
            return Ok(());
        }

        for op in batch.drain() {
            match op {
                IoOperation::Write { data, .. } => {
                    // Recorded offsets are ignored; entries are appended in
                    // queue order.
                    self.writer.write(&data)?;
                }
                IoOperation::Sync => {
                    self.writer.sync()?;
                }
                IoOperation::Read { .. } => {
                    // Reads make no sense in a WAL flush; silently skipped.
                }
            }
        }

        // Reset while still holding the batch lock so a concurrent
        // append_batched can't observe a drained batch with stale pending.
        self.pending_bytes.store(0, Ordering::Release);
        self.writer.flush()?;
        self.writer.sync()
    }

    /// Bytes currently queued in the batch.
    pub fn pending_batch_bytes(&self) -> u64 {
        self.pending_bytes.load(Ordering::Acquire)
    }

    /// Number of operations currently queued in the batch.
    pub fn pending_batch_ops(&self) -> usize {
        self.batch.lock().len()
    }

    /// True when at least one operation is queued.
    pub fn has_pending_batch(&self) -> bool {
        !self.batch.lock().is_empty()
    }

    /// Appends a framed record directly: 4-byte big-endian payload length,
    /// payload, 4-byte big-endian CRC32 of the payload.
    ///
    /// Returns the file offset of the record start.
    pub fn append_with_checksum(&self, data: &[u8]) -> io::Result<u64> {
        let checksum = crc32fast::hash(data);

        let mut buf = Vec::with_capacity(4 + data.len() + 4);
        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
        buf.extend_from_slice(data);
        buf.extend_from_slice(&checksum.to_be_bytes());

        self.writer.write(&buf)
    }

    /// Batched variant of [`Self::append_with_checksum`]; same record
    /// framing, but returns pending batch bytes (see `append_batched`).
    pub fn append_with_checksum_batched(&self, data: &[u8]) -> io::Result<u64> {
        let checksum = crc32fast::hash(data);

        let mut buf = Vec::with_capacity(4 + data.len() + 4);
        buf.extend_from_slice(&(data.len() as u32).to_be_bytes());
        buf.extend_from_slice(data);
        buf.extend_from_slice(&checksum.to_be_bytes());

        self.append_batched(&buf)
    }

    /// Flushes any pending batch, then flushes and fdatasyncs the file.
    ///
    /// The explicit flush/sync here is not redundant: flush_batch returns
    /// early (without syncing) when the batch is empty.
    pub fn sync(&self) -> io::Result<()> {
        self.flush_batch()?;
        self.writer.flush()?;
        self.writer.sync()
    }

    /// Logical size of the WAL file (excludes still-pending batch bytes).
    pub fn size(&self) -> u64 {
        self.writer.offset()
    }

    /// The auto-flush threshold in bytes.
    pub fn max_batch_bytes(&self) -> u64 {
        self.max_batch_bytes
    }

    /// Point-in-time copy of the underlying writer's counters.
    pub fn stats(&self) -> IoUringStatsSnapshot {
        self.writer.stats()
    }
}
780
/// Read-only view over a log segment file, with its length captured at
/// open time (later appends by other handles are not reflected in `len`).
pub struct SegmentReader {
    // Underlying positioned reader.
    reader: AsyncReader,
    // File size in bytes as of open().
    length: u64,
}
790
791impl SegmentReader {
792 pub fn open(path: impl AsRef<Path>, config: IoUringConfig) -> io::Result<Self> {
794 let metadata = std::fs::metadata(&path)?;
795 let length = metadata.len();
796
797 Ok(Self {
798 reader: AsyncReader::open(path, config)?,
799 length,
800 })
801 }
802
803 pub fn read_messages(&self, offset: u64, max_bytes: usize) -> io::Result<BytesMut> {
805 let mut buf = BytesMut::zeroed(max_bytes);
806 let n = self.reader.read_at(offset, &mut buf)?;
807 buf.truncate(n);
808 Ok(buf)
809 }
810
811 pub fn read_range(&self, offset: u64, len: usize) -> io::Result<BytesMut> {
813 let mut buf = BytesMut::zeroed(len);
814 self.reader.read_exact_at(offset, &mut buf)?;
815 Ok(buf)
816 }
817
818 pub fn len(&self) -> u64 {
820 self.length
821 }
822
823 pub fn is_empty(&self) -> bool {
825 self.length == 0
826 }
827
828 pub fn stats(&self) -> IoUringStatsSnapshot {
830 self.reader.stats()
831 }
832}
833
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // --- config presets ---

    #[test]
    fn test_config_defaults() {
        let config = IoUringConfig::default();
        assert_eq!(config.sq_entries, 1024);
        assert!(!config.kernel_poll);
        assert_eq!(config.max_inflight, 256);
    }

    #[test]
    fn test_config_high_throughput() {
        let config = IoUringConfig::high_throughput();
        assert_eq!(config.sq_entries, 4096);
        assert!(config.kernel_poll);
        assert!(config.direct_io);
    }

    #[test]
    fn test_config_low_latency() {
        let config = IoUringConfig::low_latency();
        assert_eq!(config.sq_entries, 256);
        assert!(config.kernel_poll);
    }

    // --- stats ---

    #[test]
    fn test_stats_snapshot() {
        let stats = IoUringStats::new();
        stats.ops_submitted.store(100, Ordering::Relaxed);
        stats.ops_completed.store(95, Ordering::Relaxed);
        stats.bytes_written.store(10000, Ordering::Relaxed);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.in_flight(), 5);
        assert!((snapshot.completion_rate() - 0.95).abs() < 0.001);
    }

    // --- writer / reader ---

    #[test]
    fn test_async_writer() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.log");
        let writer = AsyncWriter::new(&path, IoUringConfig::minimal()).unwrap();

        // Returned offsets are the start positions of each write.
        assert_eq!(writer.write(b"hello").unwrap(), 0);
        assert_eq!(writer.write(b"world").unwrap(), 5);

        writer.flush().unwrap();

        let stats = writer.stats();
        assert_eq!(stats.ops_completed, 2);
        assert_eq!(stats.bytes_written, 10);
    }

    #[test]
    fn test_async_reader() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("test.log");
        std::fs::write(&path, b"hello world test data").unwrap();

        let reader = AsyncReader::open(&path, IoUringConfig::minimal()).unwrap();

        let mut buf = [0u8; 5];
        let n = reader.read_at(0, &mut buf).unwrap();
        assert_eq!(n, 5);
        assert_eq!(&buf, b"hello");

        let mut buf = [0u8; 5];
        reader.read_exact_at(6, &mut buf).unwrap();
        assert_eq!(&buf, b"world");

        assert_eq!(reader.stats().ops_completed, 2);
    }

    // --- batches ---

    #[test]
    fn test_io_batch() {
        let mut batch = IoBatch::new();
        assert!(batch.is_empty());

        batch.write(0, Bytes::from_static(b"hello"));
        batch.read(100, 50);
        batch.sync();

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());

        batch.clear();
        assert!(batch.is_empty());
    }

    #[test]
    fn test_io_batch_pending_write_bytes() {
        let mut batch = IoBatch::new();
        assert_eq!(batch.pending_write_bytes(), 0);

        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.read(100, 50);

        // Reads do not count toward pending write bytes.
        assert_eq!(batch.pending_write_bytes(), 10);
    }

    #[test]
    fn test_io_batch_drain() {
        let mut batch = IoBatch::new();
        batch.write(0, Bytes::from_static(b"hello"));
        batch.sync();

        let ops: Vec<_> = batch.drain().collect();
        assert_eq!(ops.len(), 2);
        assert!(batch.is_empty());
    }

    #[test]
    fn test_io_batch_stats() {
        let mut batch = IoBatch::new();

        // Empty batch reports all-zero stats.
        let stats = batch.stats();
        assert_eq!(stats.total_ops, 0);
        assert_eq!(stats.write_ops, 0);
        assert_eq!(stats.read_ops, 0);
        assert_eq!(stats.sync_ops, 0);

        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.read(100, 50);
        batch.read(200, 100);
        batch.sync();

        let stats = batch.stats();
        assert_eq!(stats.total_ops, 5);
        assert_eq!(stats.write_ops, 2);
        assert_eq!(stats.read_ops, 2);
        assert_eq!(stats.sync_ops, 1);
        assert_eq!(stats.write_bytes, 10);
        assert_eq!(stats.read_bytes, 150);
    }

    #[test]
    fn test_io_batch_pending_ops() {
        let mut batch = IoBatch::new();
        batch.write(0, Bytes::from_static(b"data1"));
        batch.write(5, Bytes::from_static(b"data2"));
        batch.read(100, 50);
        batch.sync();

        assert_eq!(batch.pending_write_ops(), 2);
        assert_eq!(batch.pending_read_ops(), 1);
    }

    // --- batch executor ---

    #[test]
    fn test_batch_executor_write() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch_write.log");

        let writer = AsyncWriter::new(&path, IoUringConfig::minimal()).unwrap();
        let executor = BatchExecutor::for_writer(writer);

        let mut batch = IoBatch::new();
        batch.write(0, Bytes::from_static(b"hello"));
        batch.write(5, Bytes::from_static(b"world"));
        batch.sync();

        // A write-only batch yields no read results.
        let results = executor.execute(&mut batch).unwrap();
        assert!(results.is_empty());

        let contents = std::fs::read(&path).unwrap();
        assert_eq!(&contents, b"helloworld");
    }

    #[test]
    fn test_batch_executor_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("batch_read.log");
        std::fs::write(&path, b"hello world test data").unwrap();

        let reader = AsyncReader::open(&path, IoUringConfig::minimal()).unwrap();
        let executor = BatchExecutor::for_reader(reader);

        let mut batch = IoBatch::new();
        batch.read(0, 5);
        batch.read(6, 5);

        let results = executor.execute(&mut batch).unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(&results[0].data[..], b"hello");
        assert_eq!(results[0].offset, 0);
        assert_eq!(&results[1].data[..], b"world");
        assert_eq!(results[1].offset, 6);
    }

    // --- WAL writer ---

    #[test]
    fn test_wal_writer() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal.log");
        let wal = WalWriter::new(&path, IoUringConfig::minimal()).unwrap();

        assert_eq!(wal.append(b"entry1").unwrap(), 0);
        assert!(wal.append_with_checksum(b"entry2").unwrap() > 0);

        wal.sync().unwrap();
        assert!(wal.size() > 0);
    }

    #[test]
    fn test_wal_writer_batched() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_batch.log");
        let wal = WalWriter::new(&path, IoUringConfig::minimal()).unwrap();

        // One direct write, then two batched appends of 6 bytes each.
        wal.append(b"direct").unwrap();
        wal.append_batched(b"batch1").unwrap();
        wal.append_batched(b"batch2").unwrap();

        assert!(wal.has_pending_batch());
        assert_eq!(wal.pending_batch_ops(), 2);
        assert_eq!(wal.pending_batch_bytes(), 12);

        wal.flush_batch().unwrap();

        assert!(!wal.has_pending_batch());
        assert_eq!(wal.pending_batch_bytes(), 0);

        // 6 direct + 12 batched bytes.
        assert!(wal.size() >= 18);
    }

    #[test]
    fn test_wal_writer_batched_checksum() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_batch_crc.log");
        let wal = WalWriter::new(&path, IoUringConfig::minimal()).unwrap();

        wal.append_with_checksum_batched(b"data1").unwrap();
        wal.append_with_checksum_batched(b"data2").unwrap();

        assert!(wal.has_pending_batch());

        // sync() must flush the batch as well.
        wal.sync().unwrap();
        assert!(!wal.has_pending_batch());
        assert!(wal.size() > 0);
    }

    #[test]
    fn test_wal_writer_auto_flush() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("wal_auto_flush.log");

        // Threshold = registered_buffers * buffer_size = 10 bytes.
        let mut config = IoUringConfig::minimal();
        config.registered_buffers = 1;
        config.buffer_size = 10;

        let wal = WalWriter::new(&path, config).unwrap();
        assert_eq!(wal.max_batch_bytes(), 10);

        // 5 pending bytes: below threshold, stays queued.
        wal.append_batched(b"hello").unwrap();
        assert!(wal.has_pending_batch());

        // 11 pending bytes: crosses threshold, triggers auto-flush.
        wal.append_batched(b"world!").unwrap();
        assert!(!wal.has_pending_batch());
    }

    // --- segment reader ---

    #[test]
    fn test_segment_reader() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("segment.log");
        std::fs::write(&path, b"message1message2message3").unwrap();

        let reader = SegmentReader::open(&path, IoUringConfig::minimal()).unwrap();

        assert_eq!(reader.len(), 24);
        assert!(!reader.is_empty());

        let data = reader.read_messages(0, 100).unwrap();
        assert_eq!(&data[..], b"message1message2message3");

        let data = reader.read_range(8, 8).unwrap();
        assert_eq!(&data[..], b"message2");
    }

    // --- misc ---

    #[test]
    fn test_io_uring_availability() {
        // Environment-dependent; just exercise the probe.
        let available = is_io_uring_available();
        println!("io_uring available: {}", available);
    }
}