//! Zero-copy I/O primitives: memory-mapped array views and readers/writers,
//! plus SIMD- and NUMA-aware helpers for processing mapped data in bulk.

#![allow(dead_code)]
#![allow(missing_docs)]

use crate::error::{IoError, Result};
use memmap2::{Mmap, MmapMut, MmapOptions};
use scirs2_core::ndarray::{Array1, ArrayView, ArrayView1, ArrayViewMut, IxDyn};
use scirs2_core::parallel_ops::*;
use scirs2_core::simd_ops::{PlatformCapabilities, SimdUnifiedOps};
use std::fs::{File, OpenOptions};
use std::marker::PhantomData;
use std::mem;
use std::path::Path;
use std::slice;

#[cfg(feature = "async")]
use tokio::sync::Semaphore;

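/// Immutable zero-copy view of a memory-mapped file, interpreted as a
/// contiguous array of `T` with a given shape.
///
/// The view borrows the `Mmap` and never copies the underlying bytes.
/// Callers must ensure the file actually contains valid `T` values; the
/// mapping itself is page-aligned, which satisfies the alignment of all
/// primitive element types at offset zero.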
pub struct ZeroCopyArrayView<'a, T> {
    mmap: &'a Mmap,
    shape: Vec<usize>,
    _phantom: PhantomData<T>,
}

impl<'a, T> ZeroCopyArrayView<'a, T>
where
    T: 'static + Copy,
{
    /// Creates a view after checking that the mapping is large enough for
    /// `shape`'s element count.
    pub fn new(mmap: &'a Mmap, shape: Vec<usize>) -> Result<Self> {
        let expected_bytes = shape.iter().product::<usize>() * mem::size_of::<T>();
        if mmap.len() < expected_bytes {
            return Err(IoError::FormatError(format!(
                "Memory map too small: expected {} bytes, got {}",
                expected_bytes,
                mmap.len()
            )));
        }

        Ok(Self {
            mmap,
            shape,
            _phantom: PhantomData,
        })
    }

    /// Returns an `ndarray` view of the mapped data without copying.
    pub fn as_array_view(&self) -> ArrayView<T, IxDyn> {
        let ptr = self.mmap.as_ptr() as *const T;
        // SAFETY: `new` verified the mapping holds at least product(shape)
        // elements, and the mmap base address is page-aligned.
        let slice = unsafe { slice::from_raw_parts(ptr, self.shape.iter().product()) };

        ArrayView::from_shape(IxDyn(&self.shape), slice).expect("Shape mismatch in zero-copy view")
    }

    /// Returns the mapped data as a flat slice without copying.
    pub fn as_slice(&self) -> &[T] {
        let ptr = self.mmap.as_ptr() as *const T;
        let len = self.shape.iter().product();
        // SAFETY: length and alignment were validated in `new`.
        unsafe { slice::from_raw_parts(ptr, len) }
    }

    /// Applies `op` to the underlying slice; the input is borrowed, the
    /// output is newly allocated.
    pub fn apply_simd_operation<F>(&self, op: F) -> Result<Vec<T>>
    where
        F: Fn(&[T]) -> Vec<T>,
    {
        let slice = self.as_slice();
        Ok(op(slice))
    }
}

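/// Mutable zero-copy view of a writable memory-mapped file. Writes land
/// directly in the mapping; use the owning writer's `flush` to persist
/// them. The same validity and alignment caveats as [`ZeroCopyArrayView`]
/// apply.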
pub struct ZeroCopyArrayViewMut<'a, T> {
    mmap: &'a mut MmapMut,
    shape: Vec<usize>,
    _phantom: PhantomData<T>,
}

impl<'a, T> ZeroCopyArrayViewMut<'a, T>
where
    T: 'static + Copy,
{
    /// Creates a mutable view after checking the mapping is large enough.
    pub fn new(mmap: &'a mut MmapMut, shape: Vec<usize>) -> Result<Self> {
        let expected_bytes = shape.iter().product::<usize>() * mem::size_of::<T>();
        if mmap.len() < expected_bytes {
            return Err(IoError::FormatError(format!(
                "Memory map too small: expected {} bytes, got {}",
                expected_bytes,
                mmap.len()
            )));
        }

        Ok(Self {
            mmap,
            shape,
            _phantom: PhantomData,
        })
    }

    /// Returns a mutable `ndarray` view of the mapped data without copying.
    pub fn as_array_view_mut(&mut self) -> ArrayViewMut<T, IxDyn> {
        let ptr = self.mmap.as_mut_ptr() as *mut T;
        // SAFETY: length and alignment were validated in `new`.
        let slice = unsafe { slice::from_raw_parts_mut(ptr, self.shape.iter().product()) };

        ArrayViewMut::from_shape(IxDyn(&self.shape), slice)
            .expect("Shape mismatch in zero-copy view")
    }

    /// Returns the mapped data as a flat mutable slice without copying.
    pub fn as_slice_mut(&mut self) -> &mut [T] {
        let ptr = self.mmap.as_mut_ptr() as *mut T;
        let len = self.shape.iter().product();
        // SAFETY: length and alignment were validated in `new`.
        unsafe { slice::from_raw_parts_mut(ptr, len) }
    }

    /// Applies `op` in place to the underlying slice.
    pub fn apply_simd_operation_inplace<F>(&mut self, op: F) -> Result<()>
    where
        F: Fn(&mut [T]),
    {
        let slice = self.as_slice_mut();
        op(slice);
        Ok(())
    }
}

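/// Lazily memory-maps a file and hands out zero-copy views into it.
///
/// A minimal usage sketch (file name and shape are illustrative, hence the
/// `ignore` fence):
///
/// ```ignore
/// let mut reader = ZeroCopyReader::new("matrix.bin")?;
/// let view = reader.read_array::<f64>(vec![10, 10])?;
/// assert_eq!(view.as_array_view().shape(), &[10, 10]);
/// ```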
pub struct ZeroCopyReader {
    file: File,
    mmap: Option<Mmap>,
}

impl ZeroCopyReader {
    /// Opens `path` for reading; the file is not mapped until first use.
    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
        let file = File::open(path).map_err(|e| IoError::FileError(e.to_string()))?;
        Ok(Self { file, mmap: None })
    }

    /// Maps the whole file into memory, caching the mapping for reuse.
    pub fn map_file(&mut self) -> Result<&Mmap> {
        if self.mmap.is_none() {
            // SAFETY: we map a file we hold open; callers must not rely on
            // the contents staying stable if another process truncates or
            // rewrites the file concurrently.
            let mmap = unsafe {
                MmapOptions::new()
                    .map(&self.file)
                    .map_err(|e| IoError::FileError(e.to_string()))?
            };
            self.mmap = Some(mmap);
        }
        Ok(self.mmap.as_ref().unwrap())
    }

    /// Interprets the mapped file as an array of `T` with the given shape.
    pub fn read_array<T>(&mut self, shape: Vec<usize>) -> Result<ZeroCopyArrayView<T>>
    where
        T: 'static + Copy,
    {
        let mmap = self.map_file()?;
        ZeroCopyArrayView::new(mmap, shape)
    }

    /// Borrows `len` raw bytes starting at `offset` without copying.
    pub fn read_slice(&mut self, offset: usize, len: usize) -> Result<&[u8]> {
        let mmap = self.map_file()?;
        if offset + len > mmap.len() {
            return Err(IoError::Other(
                "Slice extends beyond file boundaries".to_string(),
            ));
        }
        Ok(&mmap[offset..offset + len])
    }
}

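/// Creates a fixed-size file and exposes it as a writable memory map.
///
/// A minimal usage sketch (file name and size are illustrative):
///
/// ```ignore
/// let mut writer = ZeroCopyWriter::new("out.bin", 4096)?;
/// writer.write_slice(0, b"header")?;
/// writer.flush()?; // persist dirty pages to disk
/// ```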
pub struct ZeroCopyWriter {
    file: File,
    mmap: Option<MmapMut>,
}

impl ZeroCopyWriter {
    /// Creates (or truncates) `path` and resizes it to `size` bytes.
    pub fn new<P: AsRef<Path>>(path: P, size: usize) -> Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .map_err(|e| IoError::FileError(e.to_string()))?;

        // The file must have its final length before it can be mapped.
        file.set_len(size as u64)
            .map_err(|e| IoError::FileError(e.to_string()))?;

        Ok(Self { file, mmap: None })
    }

    /// Maps the whole file writably, caching the mapping for reuse.
    pub fn map_file_mut(&mut self) -> Result<&mut MmapMut> {
        if self.mmap.is_none() {
            // SAFETY: we map a file we created and hold open.
            let mmap = unsafe {
                MmapOptions::new()
                    .map_mut(&self.file)
                    .map_err(|e| IoError::FileError(e.to_string()))?
            };
            self.mmap = Some(mmap);
        }
        Ok(self.mmap.as_mut().unwrap())
    }

    /// Interprets the mapped file as a mutable array of `T` with `shape`.
    pub fn write_array<T>(&mut self, shape: Vec<usize>) -> Result<ZeroCopyArrayViewMut<T>>
    where
        T: 'static + Copy,
    {
        let mmap = self.map_file_mut()?;
        ZeroCopyArrayViewMut::new(mmap, shape)
    }

    /// Copies `data` into the mapping at `offset`.
    pub fn write_slice(&mut self, offset: usize, data: &[u8]) -> Result<()> {
        let mmap = self.map_file_mut()?;
        if offset + data.len() > mmap.len() {
            return Err(IoError::Other(
                "Write extends beyond file boundaries".to_string(),
            ));
        }
        mmap[offset..offset + data.len()].copy_from_slice(data);
        Ok(())
    }

    /// Flushes dirty pages to disk, if the file has been mapped.
    pub fn flush(&mut self) -> Result<()> {
        if let Some(ref mut mmap) = self.mmap {
            mmap.flush()
                .map_err(|e| IoError::FileError(e.to_string()))?;
        }
        Ok(())
    }
}

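/// Splits delimiter-separated text directly out of a borrowed byte buffer;
/// lines and fields are sub-slices of the input, so nothing is copied.
///
/// For example:
///
/// ```ignore
/// let reader = ZeroCopyCsvReader::new(b"a,b\n1,2", b',');
/// for line in reader.lines() {
///     let fields = reader.parse_line(line); // Vec<&str> borrowing the input
/// }
/// ```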
pub struct ZeroCopyCsvReader<'a> {
    data: &'a [u8],
    delimiter: u8,
}

impl<'a> ZeroCopyCsvReader<'a> {
    pub fn new(data: &'a [u8], delimiter: u8) -> Self {
        Self { data, delimiter }
    }

    /// Iterates over lines (split on `\n`) without copying.
    pub fn lines(&self) -> ZeroCopyLineIterator<'a> {
        ZeroCopyLineIterator {
            data: self.data,
            pos: 0,
        }
    }

    /// Splits `line` on the configured delimiter. Fields that are not valid
    /// UTF-8 are skipped, and a trailing empty field (a line ending in the
    /// delimiter) is dropped.
    pub fn parse_line(&self, line: &'a [u8]) -> Vec<&'a str> {
        let mut fields = Vec::new();
        let mut start = 0;

        for (i, &byte) in line.iter().enumerate() {
            if byte == self.delimiter {
                if let Ok(field) = std::str::from_utf8(&line[start..i]) {
                    fields.push(field);
                }
                start = i + 1;
            }
        }

        if start < line.len() {
            if let Ok(field) = std::str::from_utf8(&line[start..]) {
                fields.push(field);
            }
        }

        fields
    }
}

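/// Iterator over `\n`-terminated lines of a byte buffer. Yielded slices
/// exclude the newline; a `\r` from CRLF input is not stripped.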
pub struct ZeroCopyLineIterator<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> Iterator for ZeroCopyLineIterator<'a> {
    type Item = &'a [u8];

    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.data.len() {
            return None;
        }

        let start = self.pos;
        while self.pos < self.data.len() && self.data[self.pos] != b'\n' {
            self.pos += 1;
        }

        let line = &self.data[start..self.pos];

        // Skip the newline itself so the next call starts on the next line.
        if self.pos < self.data.len() {
            self.pos += 1;
        }

        Some(line)
    }
}

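/// Cursor-style reader over a borrowed byte buffer: scalar values are read
/// with unaligned loads, and slices are borrowed without copying.
///
/// For example:
///
/// ```ignore
/// let bytes = 42u32.to_ne_bytes();
/// let mut reader = ZeroCopyBinaryReader::new(&bytes);
/// assert_eq!(reader.read::<u32>()?, 42);
/// ```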
pub struct ZeroCopyBinaryReader<'a> {
    data: &'a [u8],
    pos: usize,
}

impl<'a> ZeroCopyBinaryReader<'a> {
    pub fn new(data: &'a [u8]) -> Self {
        Self { data, pos: 0 }
    }

    /// Reads one `T` from the current position, advancing the cursor.
    pub fn read<T: Copy>(&mut self) -> Result<T> {
        let size = mem::size_of::<T>();
        if self.pos + size > self.data.len() {
            return Err(IoError::Other("Not enough data".to_string()));
        }

        // SAFETY: bounds were checked above; `read_unaligned` tolerates any
        // alignment of the source bytes.
        let value = unsafe {
            let ptr = self.data.as_ptr().add(self.pos) as *const T;
            ptr.read_unaligned()
        };

        self.pos += size;
        Ok(value)
    }

    /// Borrows the next `len` bytes without copying.
    pub fn read_slice(&mut self, len: usize) -> Result<&'a [u8]> {
        if self.pos + len > self.data.len() {
            return Err(IoError::Other("Not enough data".to_string()));
        }

        let slice = &self.data[self.pos..self.pos + len];
        self.pos += len;
        Ok(slice)
    }

    /// Returns all bytes from the current position to the end.
    pub fn remaining(&self) -> &'a [u8] {
        &self.data[self.pos..]
    }

    /// Reads `count` native-endian `f32` values into an owned array.
    pub fn read_f32_array_simd(&mut self, count: usize) -> Result<Array1<f32>> {
        let bytes_needed = count * mem::size_of::<f32>();
        if self.pos + bytes_needed > self.data.len() {
            return Err(IoError::Other("Not enough data for f32 array".to_string()));
        }

        // Copy byte-wise into an aligned buffer: the source position may not
        // be 4-byte aligned, so building a `&[f32]` directly over it would be
        // undefined behavior.
        let mut values = vec![0.0f32; count];
        unsafe {
            std::ptr::copy_nonoverlapping(
                self.data.as_ptr().add(self.pos),
                values.as_mut_ptr() as *mut u8,
                bytes_needed,
            );
        }

        self.pos += bytes_needed;
        Ok(Array1::from_vec(values))
    }

    /// Reads `count` native-endian `f64` values into an owned array.
    pub fn read_f64_array_simd(&mut self, count: usize) -> Result<Array1<f64>> {
        let bytes_needed = count * mem::size_of::<f64>();
        if self.pos + bytes_needed > self.data.len() {
            return Err(IoError::Other("Not enough data for f64 array".to_string()));
        }

        // Same aligned-copy approach as the f32 variant.
        let mut values = vec![0.0f64; count];
        unsafe {
            std::ptr::copy_nonoverlapping(
                self.data.as_ptr().add(self.pos),
                values.as_mut_ptr() as *mut u8,
                bytes_needed,
            );
        }

        self.pos += bytes_needed;
        Ok(Array1::from_vec(values))
    }
}

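/// Arithmetic computed directly over memory-mapped buffers. The mmap base
/// address is page-aligned, so reinterpreting it as `f32`/`f64` at offset
/// zero is sound; only the results are newly allocated.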
pub mod simd_zero_copy {
    use super::*;
    use scirs2_core::ndarray::{Array2, ArrayView2};

    pub struct SimdZeroCopyOpsF32;

    impl SimdZeroCopyOpsF32 {
        /// Element-wise addition of two mapped `f32` buffers.
        pub fn add_mmap(a_mmap: &Mmap, b_mmap: &Mmap, shape: &[usize]) -> Result<Array1<f32>> {
            if a_mmap.len() != b_mmap.len() {
                return Err(IoError::Other(
                    "Memory maps must have same size".to_string(),
                ));
            }

            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f32>();

            if a_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }

            // SAFETY: lengths were checked above; mmap bases are page-aligned.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f32, count) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f32, count) };

            let a_view = ArrayView1::from_shape(count, a_slice).unwrap();
            let b_view = ArrayView1::from_shape(count, b_slice).unwrap();

            // Simple element-wise loop; the compiler can auto-vectorize it.
            let result: Array1<f32> = a_view
                .iter()
                .zip(b_view.iter())
                .map(|(&a, &b)| a + b)
                .collect();
            Ok(result)
        }

        /// Multiplies a mapped `f32` buffer by `scalar`.
        pub fn scalar_mul_mmap(mmap: &Mmap, scalar: f32, shape: &[usize]) -> Result<Array1<f32>> {
            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f32>();

            if mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }

            // SAFETY: length was checked above; the mmap base is page-aligned.
            let slice = unsafe { slice::from_raw_parts(mmap.as_ptr() as *const f32, count) };

            let view = ArrayView1::from_shape(count, slice).unwrap();

            let result: Array1<f32> = view.iter().map(|&x| x * scalar).collect();
            Ok(result)
        }

        /// Dot product of two mapped `f32` buffers of length `len`.
        pub fn dot_mmap(a_mmap: &Mmap, b_mmap: &Mmap, len: usize) -> Result<f32> {
            let expected_bytes = len * mem::size_of::<f32>();

            if a_mmap.len() < expected_bytes || b_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory maps too small".to_string()));
            }

            // SAFETY: lengths were checked above; mmap bases are page-aligned.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f32, len) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f32, len) };

            let a_view = ArrayView1::from_shape(len, a_slice).unwrap();
            let b_view = ArrayView1::from_shape(len, b_slice).unwrap();

            let result: f32 = a_view.iter().zip(b_view.iter()).map(|(&a, &b)| a * b).sum();
            Ok(result)
        }
    }

    pub struct SimdZeroCopyOpsF64;

    impl SimdZeroCopyOpsF64 {
        /// Element-wise addition of two mapped `f64` buffers.
        pub fn add_mmap(a_mmap: &Mmap, b_mmap: &Mmap, shape: &[usize]) -> Result<Array1<f64>> {
            if a_mmap.len() != b_mmap.len() {
                return Err(IoError::Other(
                    "Memory maps must have same size".to_string(),
                ));
            }

            let count = shape.iter().product::<usize>();
            let expected_bytes = count * mem::size_of::<f64>();

            if a_mmap.len() < expected_bytes {
                return Err(IoError::Other("Memory map too small for shape".to_string()));
            }

            // SAFETY: lengths were checked above; mmap bases are page-aligned.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f64, count) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f64, count) };

            let a_view = ArrayView1::from_shape(count, a_slice).unwrap();
            let b_view = ArrayView1::from_shape(count, b_slice).unwrap();

            let result: Array1<f64> = a_view
                .iter()
                .zip(b_view.iter())
                .map(|(&a, &b)| a + b)
                .collect();
            Ok(result)
        }

        /// Matrix multiply over two mapped row-major `f64` matrices using the
        /// platform SIMD gemm kernel, following the usual BLAS convention
        /// (`C = alpha * A * B + beta * C`, with `C` starting zeroed here).
        pub fn gemm_mmap(
            a_mmap: &Mmap,
            b_mmap: &Mmap,
            ashape: (usize, usize),
            bshape: (usize, usize),
            alpha: f64,
            beta: f64,
        ) -> Result<Array2<f64>> {
            let (m, k1) = ashape;
            let (k2, n) = bshape;

            if k1 != k2 {
                return Err(IoError::Other(
                    "Matrix dimensions don't match for multiplication".to_string(),
                ));
            }

            let a_expected = m * k1 * mem::size_of::<f64>();
            let b_expected = k2 * n * mem::size_of::<f64>();

            if a_mmap.len() < a_expected || b_mmap.len() < b_expected {
                return Err(IoError::Other(
                    "Memory maps too small for matrices".to_string(),
                ));
            }

            // SAFETY: lengths were checked above; mmap bases are page-aligned.
            let a_slice = unsafe { slice::from_raw_parts(a_mmap.as_ptr() as *const f64, m * k1) };
            let b_slice = unsafe { slice::from_raw_parts(b_mmap.as_ptr() as *const f64, k2 * n) };

            let a_view = ArrayView2::from_shape((m, k1), a_slice).unwrap();
            let b_view = ArrayView2::from_shape((k2, n), b_slice).unwrap();

            let mut c = Array2::<f64>::zeros((m, n));

            f64::simd_gemm(alpha, &a_view, &b_view, beta, &mut c);

            Ok(c)
        }
    }
}

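/// Chunked processing of a memory-mapped file with NUMA and readahead
/// hints, running chunks as concurrent tasks when the `async` feature is
/// enabled.
///
/// A minimal sketch (file name, chunk size, and closure are illustrative):
///
/// ```ignore
/// let config = AsyncConfig::default();
/// let mut proc = AsyncZeroCopyProcessor::<f64>::new("data.bin", 4096, config)?;
/// let sums = proc
///     .process_async(vec![1_000_000], |chunk| chunk.iter().sum::<f64>())
///     .await?;
/// ```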
pub struct AsyncZeroCopyProcessor<T> {
    reader: ZeroCopyReader,
    chunk_size: usize,
    numa_node: Option<usize>,
    memory_policy: NumaMemoryPolicy,
    async_config: AsyncConfig,
    _phantom: PhantomData<T>,
}

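/// Placement policy for mapped memory across NUMA nodes.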
#[derive(Debug, Clone, Copy)]
pub enum NumaMemoryPolicy {
    /// Allocate on the NUMA node of the CPU that touches the memory.
    Local,
    /// Bind allocations to the given node.
    Bind(usize),
    /// Interleave pages round-robin across nodes.
    Interleave,
    /// Defer to the operating system's default placement.
    Default,
}

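/// Tuning knobs for asynchronous zero-copy processing.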
#[derive(Debug, Clone)]
pub struct AsyncConfig {
    /// Maximum number of chunk tasks allowed in flight at once.
    pub max_concurrent_operations: usize,
    /// How many chunks ahead to prefetch.
    pub prefetch_distance: usize,
    /// Whether to hint sequential readahead to the OS.
    pub enable_readahead: bool,
    /// Readahead window in bytes.
    pub readahead_size: usize,
    /// Prefer io_uring-backed I/O where available (Linux only).
    pub use_io_uring: bool,
    /// Access-pattern hint applied to the mapping.
    pub memory_advice: MemoryAdvice,
}

impl Default for AsyncConfig {
    fn default() -> Self {
        Self {
            max_concurrent_operations: 4,
            prefetch_distance: 8,
            enable_readahead: true,
            readahead_size: 64 * 1024, // 64 KiB
            use_io_uring: cfg!(target_os = "linux"),
            memory_advice: MemoryAdvice::Sequential,
        }
    }
}

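/// Access-pattern hints mirroring the POSIX `madvise` advice values
/// (`MADV_NORMAL`, `MADV_SEQUENTIAL`, `MADV_RANDOM`, `MADV_WILLNEED`,
/// `MADV_DONTNEED`).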
#[derive(Debug, Clone, Copy)]
pub enum MemoryAdvice {
    Normal,
    Sequential,
    Random,
    WillNeed,
    DontNeed,
}

impl<T: Copy + Send + Sync + 'static> AsyncZeroCopyProcessor<T> {
    /// Opens `path` and picks a NUMA node heuristically.
    pub fn new<P: AsRef<Path>>(path: P, chunk_size: usize, config: AsyncConfig) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;
        let numa_node = Self::detect_optimal_numa_node();

        Ok(Self {
            reader,
            chunk_size,
            numa_node,
            memory_policy: NumaMemoryPolicy::Local,
            async_config: config,
            _phantom: PhantomData,
        })
    }

    /// Opens `path` with memory explicitly bound to `numa_node`.
    pub fn with_numa_binding<P: AsRef<Path>>(
        path: P,
        chunk_size: usize,
        numa_node: usize,
        config: AsyncConfig,
    ) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;

        Ok(Self {
            reader,
            chunk_size,
            numa_node: Some(numa_node),
            memory_policy: NumaMemoryPolicy::Bind(numa_node),
            async_config: config,
            _phantom: PhantomData,
        })
    }

    fn detect_optimal_numa_node() -> Option<usize> {
        #[cfg(target_os = "linux")]
        {
            // Placeholder heuristic: spread processes across two nodes by
            // PID. A full implementation would query the NUMA topology of
            // the current CPU instead.
            use std::process;
            Some(process::id() as usize % 2)
        }
        #[cfg(not(target_os = "linux"))]
        {
            None
        }
    }

    /// Stub for OS memory-advice hooks; a full implementation would issue
    /// the corresponding `madvise` call for `[addr, addr + len)` in each arm.
    fn apply_memory_advice(&self, addr: *const u8, len: usize) -> Result<()> {
        match self.async_config.memory_advice {
            MemoryAdvice::Normal => {
                // madvise(addr, len, MADV_NORMAL)
            }
            MemoryAdvice::Sequential => {
                // madvise(addr, len, MADV_SEQUENTIAL)
            }
            MemoryAdvice::Random => {
                // madvise(addr, len, MADV_RANDOM)
            }
            MemoryAdvice::WillNeed => {
                // madvise(addr, len, MADV_WILLNEED)
            }
            MemoryAdvice::DontNeed => {
                // madvise(addr, len, MADV_DONTNEED)
            }
        }

        // Parameters are unused until the madvise calls are wired up.
        let _ = (addr, len);

        Ok(())
    }

    /// Processes the file in aligned chunks, applying `processor` to each
    /// chunk and returning the results in chunk order.
    pub async fn process_async<F, R>(&mut self, shape: Vec<usize>, processor: F) -> Result<Vec<R>>
    where
        F: Fn(&[T]) -> R + Send + Sync + Clone + 'static,
        R: Send + 'static,
    {
        let _capabilities = PlatformCapabilities::detect();

        // Copy configuration out of `self` before borrowing the reader.
        // Underscore-prefixed locals stay warning-free when the `async`
        // feature (which uses them) is disabled.
        let numa_node = self.numa_node;
        let memory_advice = self.async_config.memory_advice;
        let memory_policy = self.memory_policy;
        let _max_concurrent_operations = self.async_config.max_concurrent_operations;
        let enable_readahead = self.async_config.enable_readahead;
        let aligned_chunk_size = self.calculate_aligned_chunk_size();

        let mmap = self.reader.map_file()?;

        let total_elements: usize = shape.iter().product();
        let element_size = mem::size_of::<T>();
        let total_bytes = total_elements * element_size;

        if mmap.len() < total_bytes {
            return Err(IoError::Other(
                "File too small for specified shape".to_string(),
            ));
        }

        apply_memory_advice_static(mmap.as_ptr(), mmap.len(), memory_advice)?;

        if let Some(numa_node) = numa_node {
            configure_numa_policy_static(numa_node, memory_policy)?;
        }

        // SAFETY: bounds checked above; the mmap base is page-aligned.
        let ptr = mmap.as_ptr() as *const T;
        let data_slice = unsafe { slice::from_raw_parts(ptr, total_elements) };

        let chunks: Vec<_> = data_slice.chunks(aligned_chunk_size).collect();
        let num_chunks = chunks.len();

        // Bound the number of concurrently running chunk tasks.
        #[cfg(feature = "async")]
        let semaphore = std::sync::Arc::new(Semaphore::new(_max_concurrent_operations));

        let tasks: Vec<_> = chunks
            .into_iter()
            .enumerate()
            .map(|(idx, chunk)| {
                let processor = processor.clone();
                #[cfg(feature = "async")]
                let permit = semaphore.clone();
                // Each spawned task needs owned data, so the chunk is copied
                // here; zero-copy access ends at the task boundary.
                let chunk_data = chunk.to_vec();
                let _num_chunks_local = num_chunks;
                let _enable_readahead_local = enable_readahead;

                #[cfg(feature = "async")]
                {
                    tokio::spawn(async move {
                        let _permit = permit.acquire().await.unwrap();

                        if idx + 1 < _num_chunks_local && _enable_readahead_local {
                            // Readahead of the next chunk would be issued here.
                        }

                        (idx, processor(&chunk_data))
                    })
                }
                #[cfg(not(feature = "async"))]
                {
                    // Without the async runtime, wrap the result in an
                    // already-completed future so the awaits below still work.
                    std::future::ready((idx, processor(&chunk_data)))
                }
            })
            .collect();

        // Collect results back into chunk order.
        let mut results: Vec<Option<R>> = (0..tasks.len()).map(|_| None).collect();

        #[cfg(feature = "async")]
        {
            for task in tasks {
                let (idx, result) = task
                    .await
                    .map_err(|e| IoError::Other(format!("Async task failed: {}", e)))?;
                results[idx] = Some(result);
            }
        }

        #[cfg(not(feature = "async"))]
        {
            for task in tasks {
                let (idx, result) = task.await;
                results[idx] = Some(result);
            }
        }

        Ok(results.into_iter().map(|r| r.unwrap()).collect())
    }

    /// Stub for NUMA policy configuration; real bindings would call
    /// `mbind`/`set_mempolicy` here.
    fn configure_numa_policy(&self, numa_node: usize) -> Result<()> {
        #[cfg(target_os = "linux")]
        {
            match self.memory_policy {
                NumaMemoryPolicy::Bind(node) => {
                    eprintln!("Binding memory to NUMA node {node}");
                }
                NumaMemoryPolicy::Interleave => {
                    eprintln!("Enabling NUMA interleaving");
                }
                NumaMemoryPolicy::Local => {
                    eprintln!("Using local NUMA node {numa_node}");
                }
                NumaMemoryPolicy::Default => {
                    // Leave the OS default policy in place.
                }
            }
        }

        // The parameter is only read on Linux.
        #[cfg(not(target_os = "linux"))]
        let _ = numa_node;

        Ok(())
    }

    /// Rounds the configured chunk size up to a page multiple; since 4096 is
    /// divisible by 64, the result is automatically cache-line aligned too.
    fn calculate_aligned_chunk_size(&self) -> usize {
        let base_chunk_size = self.chunk_size;
        let page_size = 4096; // typical page size in bytes
        let cache_line_size = 64; // typical cache line size in bytes

        let aligned_to_page = ((base_chunk_size + page_size - 1) / page_size) * page_size;

        ((aligned_to_page + cache_line_size - 1) / cache_line_size) * cache_line_size
    }

    /// Snapshot of the processor's NUMA placement and the host topology.
    pub fn get_numa_info(&self) -> NumaTopologyInfo {
        NumaTopologyInfo {
            current_node: self.numa_node,
            total_nodes: Self::get_total_numa_nodes(),
            memory_policy: self.memory_policy,
            node_distances: Self::get_numa_distances(),
        }
    }

    fn get_total_numa_nodes() -> usize {
        #[cfg(target_os = "linux")]
        {
            // Count the node* entries exposed by sysfs.
            std::fs::read_dir("/sys/devices/system/node/")
                .map(|entries| {
                    entries
                        .filter_map(|entry| entry.ok())
                        .filter(|entry| entry.file_name().to_string_lossy().starts_with("node"))
                        .count()
                })
                .unwrap_or(1)
        }
        #[cfg(not(target_os = "linux"))]
        {
            1 // assume a single node on non-Linux platforms
        }
    }

    fn get_numa_distances() -> Vec<Vec<u8>> {
        #[cfg(target_os = "linux")]
        {
            // Placeholder matrix using the conventional ACPI SLIT values
            // (10 = local, 20 = remote); real distances are available in
            // /sys/devices/system/node/node*/distance.
            let num_nodes = Self::get_total_numa_nodes();
            let mut distances = vec![vec![0u8; num_nodes]; num_nodes];

            for (i, distance_row) in distances.iter_mut().enumerate().take(num_nodes) {
                for (j, distance_cell) in distance_row.iter_mut().enumerate().take(num_nodes) {
                    *distance_cell = if i == j { 10 } else { 20 };
                }
            }

            distances
        }
        #[cfg(not(target_os = "linux"))]
        {
            vec![vec![10]]
        }
    }
}

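/// Snapshot of the host NUMA topology as seen by an
/// [`AsyncZeroCopyProcessor`].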
#[derive(Debug, Clone)]
pub struct NumaTopologyInfo {
    pub current_node: Option<usize>,
    pub total_nodes: usize,
    pub memory_policy: NumaMemoryPolicy,
    pub node_distances: Vec<Vec<u8>>,
}

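/// Synchronous counterpart of [`AsyncZeroCopyProcessor`]: maps a file and
/// processes fixed-size chunks, in parallel for large inputs.
///
/// A minimal sketch (file name, shape, and closure are illustrative):
///
/// ```ignore
/// let mut proc = ZeroCopyStreamProcessor::<f32>::new("data.bin", 1024)?;
/// let maxima = proc.process_parallel(vec![1 << 20], |chunk| {
///     chunk.iter().cloned().fold(f32::MIN, f32::max)
/// })?;
/// ```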
pub struct ZeroCopyStreamProcessor<T> {
    reader: ZeroCopyReader,
    chunk_size: usize,
    _phantom: PhantomData<T>,
}

impl<T: Copy + 'static> ZeroCopyStreamProcessor<T> {
    pub fn new<P: AsRef<Path>>(path: P, chunk_size: usize) -> Result<Self> {
        let reader = ZeroCopyReader::new(path)?;
        Ok(Self {
            reader,
            chunk_size,
            _phantom: PhantomData,
        })
    }

    /// Applies `processor` to each chunk, using the parallel iterator for
    /// large inputs and a plain sequential pass otherwise.
    pub fn process_parallel<F, R>(&mut self, shape: Vec<usize>, processor: F) -> Result<Vec<R>>
    where
        F: Fn(&[T]) -> R + Send + Sync,
        R: Send,
        T: Send + Sync,
    {
        let capabilities = PlatformCapabilities::detect();
        let mmap = self.reader.map_file()?;

        let total_elements: usize = shape.iter().product();
        let element_size = mem::size_of::<T>();
        let total_bytes = total_elements * element_size;

        if mmap.len() < total_bytes {
            return Err(IoError::Other(
                "File too small for specified shape".to_string(),
            ));
        }

        // SAFETY: bounds checked above; the mmap base is page-aligned.
        let ptr = mmap.as_ptr() as *const T;
        let data_slice = unsafe { slice::from_raw_parts(ptr, total_elements) };

        if capabilities.simd_available && total_elements > 10000 {
            // Parallel path for large inputs on capable hardware.
            let results: Vec<R> = data_slice
                .chunks(self.chunk_size)
                .collect::<Vec<_>>()
                .into_par_iter()
                .map(&processor)
                .collect();

            Ok(results)
        } else {
            let results: Vec<R> = data_slice.chunks(self.chunk_size).map(processor).collect();

            Ok(results)
        }
    }
}

#[allow(dead_code)]
fn apply_memory_advice_static(
    addr: *const u8,
    len: usize,
    memory_advice: MemoryAdvice,
) -> Result<()> {
    match memory_advice {
        MemoryAdvice::Normal => {
            // madvise(addr, len, MADV_NORMAL)
        }
        MemoryAdvice::Sequential => {
            // madvise(addr, len, MADV_SEQUENTIAL)
        }
        MemoryAdvice::Random => {
            // madvise(addr, len, MADV_RANDOM)
        }
        MemoryAdvice::WillNeed => {
            // madvise(addr, len, MADV_WILLNEED)
        }
        MemoryAdvice::DontNeed => {
            // madvise(addr, len, MADV_DONTNEED)
        }
    }

    // Parameters are unused until the madvise calls are wired up.
    let _ = (addr, len);

    Ok(())
}

#[allow(dead_code)]
fn configure_numa_policy_static(numa_node: usize, memory_policy: NumaMemoryPolicy) -> Result<()> {
    #[cfg(target_os = "linux")]
    {
        match memory_policy {
            NumaMemoryPolicy::Bind(node) => {
                eprintln!("Binding memory to NUMA node {node}");
            }
            NumaMemoryPolicy::Interleave => {
                eprintln!("Enabling NUMA interleaving");
            }
            NumaMemoryPolicy::Local => {
                eprintln!("Using local NUMA node {numa_node}");
            }
            NumaMemoryPolicy::Default => {
                // Leave the OS default policy in place.
            }
        }
    }

    // Parameters are only read on Linux.
    #[cfg(not(target_os = "linux"))]
    let _ = (numa_node, memory_policy);

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    #[test]
    fn test_zero_copy_reader() -> Result<()> {
        let mut file = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let data: Vec<f64> = (0..100).map(|i| i as f64).collect();
        // View the f64 buffer as raw bytes for writing.
        let bytes = unsafe { slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 8) };
        file.write_all(bytes)
            .map_err(|e| IoError::FileError(e.to_string()))?;

        let mut reader = ZeroCopyReader::new(file.path())?;
        let array_view = reader.read_array::<f64>(vec![10, 10])?;
        let view = array_view.as_array_view();

        assert_eq!(view.shape(), &[10, 10]);
        assert_eq!(view[[0, 0]], 0.0);
        assert_eq!(view[[9, 9]], 99.0);

        Ok(())
    }

    #[test]
    fn test_zero_copy_csv() {
        let data = b"a,b,c\n1,2,3\n4,5,6";
        let reader = ZeroCopyCsvReader::new(data, b',');

        let lines: Vec<_> = reader.lines().collect();
        assert_eq!(lines.len(), 3);

        let fields = reader.parse_line(lines[0]);
        assert_eq!(fields, vec!["a", "b", "c"]);
    }

    #[test]
    fn test_simd_zero_copy_add() -> Result<()> {
        let mut file1 = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let mut file2 = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;

        let data1: Vec<f32> = (0..100).map(|i| i as f32).collect();
        let data2: Vec<f32> = (0..100).map(|i| (i * 2) as f32).collect();

        let bytes1 = unsafe { slice::from_raw_parts(data1.as_ptr() as *const u8, data1.len() * 4) };
        let bytes2 = unsafe { slice::from_raw_parts(data2.as_ptr() as *const u8, data2.len() * 4) };

        file1
            .write_all(bytes1)
            .map_err(|e| IoError::FileError(e.to_string()))?;
        file2
            .write_all(bytes2)
            .map_err(|e| IoError::FileError(e.to_string()))?;

        let mmap1 = unsafe {
            MmapOptions::new()
                .map(file1.as_file())
                .map_err(|e| IoError::FileError(e.to_string()))?
        };
        let mmap2 = unsafe {
            MmapOptions::new()
                .map(file2.as_file())
                .map_err(|e| IoError::FileError(e.to_string()))?
        };

        let result = simd_zero_copy::SimdZeroCopyOpsF32::add_mmap(&mmap1, &mmap2, &[100])?;

        // result[i] = i + 2 * i = 3 * i
        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0.0);
        assert_eq!(result[50], 150.0);
        assert_eq!(result[99], 297.0);

        Ok(())
    }

    #[test]
    fn test_async_config() {
        let config = AsyncConfig::default();
        assert_eq!(config.max_concurrent_operations, 4);
        assert!(config.enable_readahead);
        assert_eq!(config.readahead_size, 64 * 1024);
    }

    #[test]
    fn test_numa_topology_info() {
        let total_nodes = AsyncZeroCopyProcessor::<f64>::get_total_numa_nodes();
        assert!(total_nodes >= 1);

        let distances = AsyncZeroCopyProcessor::<f64>::get_numa_distances();
        assert_eq!(distances.len(), total_nodes);
        if !distances.is_empty() {
            assert_eq!(distances[0].len(), total_nodes);
        }
    }

    #[test]
    fn test_memory_advice() {
        let advice = MemoryAdvice::Sequential;
        match advice {
            MemoryAdvice::Sequential => {}
            _ => panic!("Unexpected memory advice"),
        }
    }

    #[test]
    fn test_numa_memory_policy() {
        let policy = NumaMemoryPolicy::Local;
        match policy {
            NumaMemoryPolicy::Local => {}
            _ => panic!("Unexpected NUMA policy"),
        }

        let bind_policy = NumaMemoryPolicy::Bind(0);
        if let NumaMemoryPolicy::Bind(node) = bind_policy {
            assert_eq!(node, 0);
        }
    }

    #[cfg(feature = "async")]
    #[tokio::test]
    async fn test_async_zero_copy_processor() -> Result<()> {
        let mut file = NamedTempFile::new().map_err(|e| IoError::FileError(e.to_string()))?;
        let data: Vec<f64> = (0..1000).map(|i| i as f64).collect();
        let bytes = unsafe { slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 8) };
        file.write_all(bytes)
            .map_err(|e| IoError::FileError(e.to_string()))?;

        let config = AsyncConfig::default();
        let mut processor = AsyncZeroCopyProcessor::new(file.path(), 100, config)?;

        let shape = vec![1000];
        let results = processor
            .process_async(shape, |chunk: &[f64]| chunk.iter().sum::<f64>())
            .await?;

        assert!(!results.is_empty());

        let numa_info = processor.get_numa_info();
        assert!(numa_info.total_nodes >= 1);

        Ok(())
    }
}