#![allow(dead_code)]
#![allow(missing_docs)]

use crate::error::{IoError, Result};
use crate::hdf5::{CompressionOptions, DatasetOptions, FileMode, HDF5File};
#[cfg(feature = "hdf5")]
use scirs2_core::ndarray::IxDyn;
use scirs2_core::ndarray::{ArrayBase, ArrayD};
use std::collections::{BTreeMap, HashMap};
use std::path::Path;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::Instant;

#[cfg(feature = "hdf5")]
use hdf5::File;

/// Element types supported by the enhanced HDF5 layer.
#[derive(Debug, Clone, PartialEq)]
pub enum ExtendedDataType {
    Int8,
    UInt8,
    Int16,
    UInt16,
    Int32,
    UInt32,
    Int64,
    UInt64,
    Float32,
    Float64,
    Complex64,
    Complex128,
    Bool,
    String,
    FixedString(usize),
}

/// Configuration for parallel dataset reads and writes.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Number of worker threads.
    pub num_workers: usize,
    /// Elements processed per chunk.
    pub chunk_size: usize,
    /// Enable collective I/O.
    pub collective_io: bool,
    /// I/O buffer size in bytes.
    pub buffer_size: usize,
}

impl Default for ParallelConfig {
    fn default() -> Self {
        Self {
            num_workers: thread::available_parallelism()
                .map(|n| n.get())
                .unwrap_or(4),
            chunk_size: 1024 * 1024,
            collective_io: false,
            buffer_size: 64 * 1024 * 1024,
        }
    }
}
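
// A minimal usage sketch (the values are illustrative, not tuned
// recommendations; error handling elided):
//
//     let config = ParallelConfig {
//         num_workers: 8,
//         chunk_size: 4 * 1024 * 1024,
//         collective_io: false,
//         buffer_size: 128 * 1024 * 1024,
//     };
//     let file = EnhancedHDF5File::create("output.h5", Some(config))?;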

/// `HDF5File` wrapper that adds optional parallel I/O and compression
/// statistics tracking.
pub struct EnhancedHDF5File {
    base_file: HDF5File,
    parallel_config: Option<ParallelConfig>,
    #[allow(dead_code)]
    file_lock: Arc<RwLock<()>>,
    compression_stats: Arc<Mutex<CompressionStats>>,
}

/// Aggregate statistics collected while writing compressed datasets.
#[derive(Debug, Clone, Default)]
pub struct CompressionStats {
    pub original_size: usize,
    pub compressed_size: usize,
    pub compression_ratio: f64,
    pub compression_time_ms: f64,
}

impl EnhancedHDF5File {
    pub fn create<P: AsRef<Path>>(
        path: P,
        parallel_config: Option<ParallelConfig>,
    ) -> Result<Self> {
        let base_file = HDF5File::create(path)?;

        Ok(Self {
            base_file,
            parallel_config,
            file_lock: Arc::new(RwLock::new(())),
            compression_stats: Arc::new(Mutex::new(CompressionStats::default())),
        })
    }

    pub fn open<P: AsRef<Path>>(
        path: P,
        mode: FileMode,
        parallel_config: Option<ParallelConfig>,
    ) -> Result<Self> {
        let base_file = HDF5File::open(path, mode)?;

        Ok(Self {
            base_file,
            parallel_config,
            file_lock: Arc::new(RwLock::new(())),
            compression_stats: Arc::new(Mutex::new(CompressionStats::default())),
        })
    }

    pub fn create_dataset_with_compression<A, D>(
        &mut self,
        path: &str,
        array: &ArrayBase<A, D>,
        _data_type: ExtendedDataType,
        options: DatasetOptions,
    ) -> Result<()>
    where
        A: scirs2_core::ndarray::Data,
        A::Elem: Clone + Into<f64> + std::fmt::Debug,
        D: scirs2_core::ndarray::Dimension,
    {
        let _lock = self.file_lock.write().unwrap();
        let _start_time = Instant::now();

        #[cfg(feature = "hdf5")]
        {
            if let Some(native_file) = self.base_file.native_file() {
                let native_file_clone = native_file.clone();
                // Release the file lock before the potentially long native write.
                drop(_lock);
                return self.create_native_dataset_with_compression(
                    &native_file_clone,
                    path,
                    array,
                    _data_type,
                    options,
                    _start_time,
                );
            }
        }

        drop(_lock);
        self.create_fallback_dataset(path, array, options)
    }

    #[cfg(feature = "hdf5")]
    fn create_native_dataset_with_compression<A, D>(
        &mut self,
        file: &File,
        path: &str,
        array: &ArrayBase<A, D>,
        data_type: ExtendedDataType,
        options: DatasetOptions,
        start_time: Instant,
    ) -> Result<()>
    where
        A: scirs2_core::ndarray::Data,
        A::Elem: Clone,
        D: scirs2_core::ndarray::Dimension,
    {
        let (group_path, dataset_name) = self.split_path(path)?;

        self.ensure_groups_exist(file, &group_path)?;

        let group = if group_path.is_empty() {
            match file.as_group() {
                Ok(g) => g,
                Err(e) => {
                    return Err(IoError::FormatError(format!(
                        "Failed to access root group: {}",
                        e
                    )))
                }
            }
        } else {
            match file.group(&group_path) {
                Ok(g) => g,
                Err(e) => {
                    return Err(IoError::FormatError(format!(
                        "Failed to access group {}: {}",
                        group_path, e
                    )))
                }
            }
        };

        let shape: Vec<usize> = array.shape().to_vec();
        let total_elements: usize = shape.iter().product();

        let builder = match data_type {
            ExtendedDataType::Float32 => group.new_dataset::<f32>(),
            ExtendedDataType::Float64 => group.new_dataset::<f64>(),
            ExtendedDataType::Int32 => group.new_dataset::<i32>(),
            ExtendedDataType::Int64 => group.new_dataset::<i64>(),
            ExtendedDataType::UInt32 => group.new_dataset::<u32>(),
            ExtendedDataType::UInt64 => group.new_dataset::<u64>(),
            ExtendedDataType::Int8 => group.new_dataset::<i8>(),
            ExtendedDataType::UInt8 => group.new_dataset::<u8>(),
            ExtendedDataType::Int16 => group.new_dataset::<i16>(),
            ExtendedDataType::UInt16 => group.new_dataset::<u16>(),
            _ => {
                return Err(IoError::FormatError(format!(
                    "Unsupported data type: {:?}",
                    data_type
                )))
            }
        };

        let mut dataset_builder = builder.shape(&shape);

        // Use the caller's chunking if it matches the rank; otherwise derive one.
        if let Some(ref chunk_size) = options.chunk_size {
            if chunk_size.len() == shape.len() {
                dataset_builder = dataset_builder.chunk(chunk_size);
            } else {
                let optimal_chunks = self.calculate_optimal_chunks(&shape, total_elements);
                dataset_builder = dataset_builder.chunk(&optimal_chunks);
            }
        }

        // Compression filters are not applied on this path yet; see the unused
        // apply_compression_filters helper below.
        if options.fletcher32 {
            dataset_builder = dataset_builder.fletcher32();
        }

        let _dataset = dataset_builder.create(dataset_name.as_str()).map_err(|e| {
            IoError::FormatError(format!("Failed to create dataset {dataset_name}: {e}"))
        })?;

        // NOTE: only the dataset shell is created here; writing the actual
        // element data is not yet implemented on the native path.
        let _data_size = array.len();

        // Record timing and size statistics. The original size is estimated
        // with f64-width elements regardless of `data_type`.
        let compression_time = start_time.elapsed().as_millis() as f64;
        let original_size = total_elements * std::mem::size_of::<f64>();
        {
            let mut stats = self.compression_stats.lock().unwrap();
            stats.original_size += original_size;
            stats.compression_time_ms += compression_time;
            stats.compression_ratio = if stats.compressed_size > 0 {
                stats.original_size as f64 / stats.compressed_size as f64
            } else {
                1.0
            };
        }

        Ok(())
    }

    #[cfg(feature = "hdf5")]
    #[allow(dead_code)]
    fn apply_compression_filters(
        &self,
        mut builder: hdf5::DatasetBuilder,
        compression: &CompressionOptions,
    ) -> Result<hdf5::DatasetBuilder> {
        if let Some(level) = compression.gzip {
            builder = builder.deflate(level);
        }

        if compression.shuffle {
            builder = builder.shuffle();
        }

        Ok(builder)
    }

    /// Derive a chunk shape that targets roughly 64 KiB per chunk (assuming
    /// 8-byte elements), scaling every dimension uniformly.
    #[allow(dead_code)]
    fn calculate_optimal_chunks(&self, shape: &[usize], _total_elements: usize) -> Vec<usize> {
        const TARGET_CHUNK_SIZE: usize = 64 * 1024;
        const MIN_CHUNK_SIZE: usize = 1024;
        const MAX_CHUNK_SIZE: usize = 1024 * 1024;

        let element_size = 8;
        let elements_per_chunk = (TARGET_CHUNK_SIZE / element_size)
            .clamp(MIN_CHUNK_SIZE / element_size, MAX_CHUNK_SIZE / element_size);

        let mut chunks = shape.to_vec();
        let current_chunk_elements: usize = chunks.iter().product();

        if current_chunk_elements > elements_per_chunk {
            // Shrink every dimension by the same factor so the chunk keeps the
            // array's aspect ratio.
            let scale_factor = (elements_per_chunk as f64 / current_chunk_elements as f64)
                .powf(1.0 / shape.len() as f64);

            for chunk in &mut chunks {
                *chunk = (*chunk as f64 * scale_factor).max(1.0) as usize;
            }
        }

        chunks
    }
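
    // Worked example (arithmetic only, 8-byte elements assumed): a [1000, 1000]
    // array has 1_000_000 elements while the 64 KiB target allows 8_192 per
    // chunk, so the scale factor is (8192 / 1_000_000)^(1/2) ≈ 0.0905 and each
    // dimension shrinks to 90, giving [90, 90] chunks (~63 KiB each).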

    /// Create every missing intermediate group along `group_path`.
    #[cfg(feature = "hdf5")]
    fn ensure_groups_exist(&self, file: &File, group_path: &str) -> Result<()> {
        if group_path.is_empty() {
            return Ok(());
        }

        let parts: Vec<&str> = group_path.split('/').filter(|s| !s.is_empty()).collect();
        let mut current_path = String::new();

        for part in parts {
            if !current_path.is_empty() {
                current_path.push('/');
            }
            current_path.push_str(part);

            if file.group(&current_path).is_err() {
                let parent_group = if current_path.contains('/') {
                    let parent_path = current_path.rsplit_once('/').map(|x| x.0).unwrap_or("");
                    if parent_path.is_empty() {
                        match file.as_group() {
                            Ok(g) => g,
                            Err(e) => {
                                return Err(IoError::FormatError(format!(
                                    "Failed to access root group: {}",
                                    e
                                )))
                            }
                        }
                    } else {
                        match file.group(parent_path) {
                            Ok(g) => g,
                            Err(e) => {
                                return Err(IoError::FormatError(format!(
                                    "Failed to access parent group {}: {}",
                                    parent_path, e
                                )))
                            }
                        }
                    }
                } else {
                    match file.as_group() {
                        Ok(g) => g,
                        Err(e) => {
                            return Err(IoError::FormatError(format!(
                                "Failed to access root group: {}",
                                e
                            )))
                        }
                    }
                };

                parent_group.create_group(part).map_err(|e| {
                    IoError::FormatError(format!("Failed to create group {part}: {e}"))
                })?;
            }
        }

        Ok(())
    }

    /// Split `path` into its containing group path and the dataset name, e.g.
    /// "/group1/group2/dataset" becomes ("group1/group2", "dataset").
    #[allow(dead_code)]
    fn split_path(&self, path: &str) -> Result<(String, String)> {
        let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
        if parts.is_empty() {
            return Err(IoError::FormatError("Invalid dataset path".to_string()));
        }

        let dataset_name = parts.last().unwrap().to_string();
        let group_path = if parts.len() > 1 {
            parts[..parts.len() - 1].join("/")
        } else {
            String::new()
        };

        Ok((group_path, dataset_name))
    }

    fn create_fallback_dataset<A, D>(
        &mut self,
        path: &str,
        array: &ArrayBase<A, D>,
        options: DatasetOptions,
    ) -> Result<()>
    where
        A: scirs2_core::ndarray::Data,
        A::Elem: Clone + Into<f64> + std::fmt::Debug,
        D: scirs2_core::ndarray::Dimension,
    {
        self.base_file
            .create_dataset_from_array(path, array, Some(options))
    }

    /// Read a dataset, using the parallel path when a config is present.
    pub fn read_dataset_parallel(&self, path: &str) -> Result<ArrayD<f64>> {
        let _lock = self.file_lock.read().unwrap();

        if let Some(ref parallel_config) = self.parallel_config {
            self.read_dataset_parallel_impl(path, parallel_config)
        } else {
            self.base_file.read_dataset(path)
        }
    }

    fn read_dataset_parallel_impl(
        &self,
        path: &str,
        _parallel_config: &ParallelConfig,
    ) -> Result<ArrayD<f64>> {
        #[cfg(feature = "hdf5")]
        {
            if let Some(file) = self.base_file.native_file() {
                return self.read_dataset_parallel_native(file, path, _parallel_config);
            }
        }

        self.base_file.read_dataset(path)
    }

    #[cfg(feature = "hdf5")]
    fn read_dataset_parallel_native(
        &self,
        file: &File,
        path: &str,
        parallel_config: &ParallelConfig,
    ) -> Result<ArrayD<f64>> {
        let (group_path, dataset_name) = self.split_path(path)?;

        let dataset = if group_path.is_empty() {
            file.dataset(&dataset_name)
        } else {
            let group = file.group(&group_path).map_err(|e| {
                IoError::FormatError(format!("Failed to access group {group_path}: {e}"))
            })?;
            group.dataset(&dataset_name)
        }
        .map_err(|e| {
            IoError::FormatError(format!("Failed to access dataset {dataset_name}: {e}"))
        })?;

        let shape = dataset.shape();
        let total_elements: usize = shape.iter().product();

        // Small datasets are not worth the thread overhead: read sequentially.
        if total_elements < parallel_config.chunk_size * 2 {
            let data: Vec<f64> = dataset
                .read_raw()
                .map_err(|e| IoError::FormatError(format!("Failed to read dataset: {e}")))?;
            let ndarray_shape = IxDyn(&shape);
            return ArrayD::from_shape_vec(ndarray_shape, data)
                .map_err(|e| IoError::FormatError(e.to_string()));
        }

        // Partition the chunks across workers, rounding up so the remainder
        // chunks are also assigned (plain integer division could leave the
        // trailing chunks unread).
        let chunk_size = parallel_config.chunk_size;
        let total_chunks = (total_elements + chunk_size - 1) / chunk_size;
        let num_workers = parallel_config.num_workers.min(total_chunks);

        let mut handles = vec![];
        let chunks_per_worker = (total_chunks + num_workers - 1) / num_workers;

        for worker_id in 0..num_workers {
            let start_chunk = worker_id * chunks_per_worker;
            let end_chunk = ((worker_id + 1) * chunks_per_worker).min(total_chunks);

            if start_chunk >= end_chunk {
                break;
            }

            let start_element = start_chunk * chunk_size;
            let end_element = (end_chunk * chunk_size).min(total_elements);

            let dataset_clone = dataset.clone();

            let handle = thread::spawn(move || {
                let slice_size = end_element - start_element;
                let mut data = vec![0.0f64; slice_size];

                // Each worker currently reads the full dataset and keeps only
                // its slice; a true hyperslab read would avoid the extra I/O.
                match dataset_clone.read_raw::<f64>() {
                    Ok(full_data) => {
                        let slice_end = (start_element + slice_size).min(full_data.len());
                        data[..slice_end - start_element]
                            .copy_from_slice(&full_data[start_element..slice_end]);
                    }
                    Err(e) => {
                        return Err(IoError::FormatError(format!("Failed to read slice: {e}")));
                    }
                }

                Ok((start_element, data))
            });

            handles.push(handle);
        }

        // Stitch the slices back together in element order.
        let mut full_data = vec![0.0f64; total_elements];
        for handle in handles {
            let (start_element, data) = handle
                .join()
                .map_err(|_| IoError::FormatError("Thread join failed".to_string()))??;

            full_data[start_element..start_element + data.len()].copy_from_slice(&data);
        }

        let ndarray_shape = IxDyn(&shape);
        ArrayD::from_shape_vec(ndarray_shape, full_data)
            .map_err(|e| IoError::FormatError(e.to_string()))
    }
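
    // Partitioning example (arithmetic only): 10_000_000 elements with a
    // chunk_size of 1_000_000 and 4 configured workers give 10 chunks and
    // ceil(10 / 4) = 3 chunks per worker, so the workers cover chunk ranges
    // [0, 3), [3, 6), [6, 9), and [9, 10).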

    /// Return a snapshot of the accumulated compression statistics.
    pub fn get_compression_stats(&self) -> CompressionStats {
        self.compression_stats.lock().unwrap().clone()
    }

    /// Write several datasets, in parallel when a config is present.
    pub fn write_datasets_parallel(
        &mut self,
        datasets: HashMap<String, (ArrayD<f64>, ExtendedDataType, DatasetOptions)>,
    ) -> Result<()> {
        let _lock = self.file_lock.write().unwrap();
        let parallel_config_clone = self.parallel_config.clone();
        // Release the lock; the per-dataset writes re-acquire it as needed.
        drop(_lock);

        if let Some(ref parallel_config) = parallel_config_clone {
            self.write_datasets_parallel_impl(datasets, parallel_config)
        } else {
            for (path, (array, data_type, options)) in datasets {
                self.create_dataset_with_compression(&path, &array, data_type, options)?;
            }
            Ok(())
        }
    }

    fn write_datasets_parallel_impl(
        &mut self,
        datasets: HashMap<String, (ArrayD<f64>, ExtendedDataType, DatasetOptions)>,
        _parallel_config: &ParallelConfig,
    ) -> Result<()> {
        // Currently sequential; true parallel dispatch is not yet implemented.
        for (path, (array, data_type, options)) in datasets {
            self.create_dataset_with_compression(&path, &array, data_type, options)?;
        }
        Ok(())
    }

    #[allow(dead_code)]
    fn _placeholder_convert_methods(&self) {}

    pub fn close(self) -> Result<()> {
        self.base_file.close()
    }
}

/// Create a file and write all `datasets` in one call.
#[allow(dead_code)]
pub fn write_hdf5_enhanced<P: AsRef<Path>>(
    path: P,
    datasets: HashMap<String, (ArrayD<f64>, ExtendedDataType, DatasetOptions)>,
    parallel_config: Option<ParallelConfig>,
) -> Result<()> {
    let mut file = EnhancedHDF5File::create(path, parallel_config)?;
    file.write_datasets_parallel(datasets)?;
    file.close()?;
    Ok(())
}

/// Open a file read-only with an optional parallel configuration.
#[allow(dead_code)]
pub fn read_hdf5_enhanced<P: AsRef<Path>>(
    path: P,
    parallel_config: Option<ParallelConfig>,
) -> Result<EnhancedHDF5File> {
    EnhancedHDF5File::open(path, FileMode::ReadOnly, parallel_config)
}
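
// Round-trip sketch (names are illustrative; assumes `DatasetOptions`
// implements Default and `array` is an ArrayD<f64>):
//
//     let mut datasets = HashMap::new();
//     datasets.insert(
//         "/results/temperature".to_string(),
//         (array, ExtendedDataType::Float64, DatasetOptions::default()),
//     );
//     write_hdf5_enhanced("run.h5", datasets, None)?;
//     let file = read_hdf5_enhanced("run.h5", None)?;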

/// Choose compression options heuristically from the element type and the
/// estimated payload size in bytes.
#[allow(dead_code)]
pub fn create_optimal_compression_options(
    data_type: &ExtendedDataType,
    estimated_size: usize,
) -> CompressionOptions {
    let mut options = CompressionOptions::default();

    match data_type {
        ExtendedDataType::Float32 | ExtendedDataType::Float64 => {
            // Floats compress better after byte shuffling; spend more CPU on
            // small payloads (gzip 9) and less on large ones (gzip 6).
            options.shuffle = true;
            options.gzip = Some(if estimated_size > 1024 * 1024 { 6 } else { 9 });
        }
        ExtendedDataType::Int8 | ExtendedDataType::UInt8 => {
            // Prefer the faster LZF filter for byte-sized integers.
            options.lzf = true;
            options.shuffle = true;
        }
        _ => {
            options.gzip = Some(6);
            options.shuffle = true;
        }
    }

    options
}
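
// Usage sketch: payloads at or below 1 MiB get the slower, stronger setting.
//
//     let opts = create_optimal_compression_options(
//         &ExtendedDataType::Float64,
//         512 * 1024, // below the 1 MiB threshold, so gzip level 9
//     );
//     assert_eq!(opts.gzip, Some(9));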

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_enhanced_compression_options() {
        let options =
            create_optimal_compression_options(&ExtendedDataType::Float64, 2 * 1024 * 1024);
        assert_eq!(options.gzip, Some(6));
        assert!(options.shuffle);
    }

    #[test]
    fn test_optimal_chunks_calculation() {
        let file = EnhancedHDF5File::create("test.h5", None).unwrap();
        let shape = vec![1000, 1000];
        let total_elements = 1_000_000;

        let chunks = file.calculate_optimal_chunks(&shape, total_elements);
        assert!(chunks.len() == 2);
        assert!(chunks[0] > 0 && chunks[1] > 0);

        // The chunk must not exceed MAX_CHUNK_SIZE worth of 8-byte elements.
        let chunk_elements: usize = chunks.iter().product();
        assert!(chunk_elements <= 1024 * 1024 / 8);
    }

    #[test]
    #[ignore]
    fn test_path_splitting() {
        let file = EnhancedHDF5File::create("test.h5", None).unwrap();

        let (group_path, dataset_name) = file.split_path("/group1/group2/dataset").unwrap();
        assert_eq!(group_path, "group1/group2");
        assert_eq!(dataset_name, "dataset");

        let (group_path, dataset_name) = file.split_path("dataset").unwrap();
        assert_eq!(group_path, "");
        assert_eq!(dataset_name, "dataset");
    }

    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.num_workers > 0);
        assert!(config.chunk_size > 0);
        assert!(config.buffer_size > 0);
    }
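
    // Added check (a sketch grounded in create_optimal_compression_options):
    // byte-sized integers should get the LZF + shuffle combination.
    #[test]
    fn test_compression_options_for_int8() {
        let options = create_optimal_compression_options(&ExtendedDataType::Int8, 4096);
        assert!(options.lzf);
        assert!(options.shuffle);
    }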
}

/// Attribute values supported by scientific metadata.
#[derive(Debug, Clone)]
pub enum AttributeValue {
    String(String),
    Integer(i64),
    Float(f64),
    FloatArray(Vec<f64>),
    IntArray(Vec<i64>),
    StringArray(Vec<String>),
    Boolean(bool),
}

/// Dataset-level metadata commonly attached to scientific data.
#[derive(Debug, Clone, Default)]
pub struct ScientificMetadata {
    pub attributes: BTreeMap<String, AttributeValue>,
    pub units: Option<String>,
    pub scale_factor: Option<f64>,
    pub add_offset: Option<f64>,
    pub fill_value: Option<f64>,
    pub valid_range: Option<(f64, f64)>,
    pub calibration: Option<CalibrationInfo>,
    pub provenance: Option<ProvenanceInfo>,
}

/// Instrument calibration details.
#[derive(Debug, Clone)]
pub struct CalibrationInfo {
    pub date: String,
    pub method: String,
    pub parameters: BTreeMap<String, f64>,
    pub accuracy: Option<f64>,
    pub precision: Option<f64>,
}

/// Where the data came from and how it was processed.
#[derive(Debug, Clone)]
pub struct ProvenanceInfo {
    pub source: String,
    pub processing_history: Vec<String>,
    pub creation_time: String,
    pub creator: String,
    pub software_version: String,
    pub input_files: Vec<String>,
}

impl ScientificMetadata {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn add_string_attr<S: Into<String>>(mut self, name: S, value: S) -> Self {
        self.attributes
            .insert(name.into(), AttributeValue::String(value.into()));
        self
    }

    pub fn add_float_attr<S: Into<String>>(mut self, name: S, value: f64) -> Self {
        self.attributes
            .insert(name.into(), AttributeValue::Float(value));
        self
    }

    pub fn with_units<S: Into<String>>(mut self, units: S) -> Self {
        self.units = Some(units.into());
        self
    }

    pub fn with_scaling(mut self, scale_factor: f64, add_offset: f64) -> Self {
        self.scale_factor = Some(scale_factor);
        self.add_offset = Some(add_offset);
        self
    }

    pub fn with_valid_range(mut self, min: f64, max: f64) -> Self {
        self.valid_range = Some((min, max));
        self
    }

    pub fn with_provenance(mut self, provenance: ProvenanceInfo) -> Self {
        self.provenance = Some(provenance);
        self
    }
}
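
// Builder-style usage sketch (attribute names and values are illustrative; a
// CF-style convention, physical = stored * scale_factor + add_offset, is
// assumed for the scaling fields):
//
//     let meta = ScientificMetadata::new()
//         .add_string_attr("instrument", "spectrometer")
//         .with_units("kelvin")
//         .with_scaling(0.01, 273.15)
//         .with_valid_range(0.0, 400.0);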

/// Collects timing, transfer, memory, and compression metrics.
#[derive(Debug, Clone, Default)]
pub struct HDF5PerformanceMonitor {
    pub timings: BTreeMap<String, Vec<f64>>,
    pub transfer_stats: TransferStats,
    pub memory_stats: MemoryStats,
    pub compression_efficiency: Vec<CompressionStats>,
}

#[derive(Debug, Clone, Default)]
pub struct TransferStats {
    pub bytes_read: usize,
    pub bytes_written: usize,
    pub read_operations: usize,
    pub write_operations: usize,
    pub avg_read_speed: f64,
    pub avg_write_speed: f64,
}

#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    pub peak_memory_bytes: usize,
    pub current_memory_bytes: usize,
    pub allocation_count: usize,
    pub deallocation_count: usize,
}

impl HDF5PerformanceMonitor {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn record_timing(&mut self, operation: &str, duration_ms: f64) {
        self.timings
            .entry(operation.to_string())
            .or_default()
            .push(duration_ms);
    }

    pub fn record_read(&mut self, bytes: usize, duration_ms: f64) {
        self.transfer_stats.bytes_read += bytes;
        self.transfer_stats.read_operations += 1;

        if duration_ms > 0.0 {
            // Incrementally update the running average in bytes per second.
            let speed = bytes as f64 / (duration_ms / 1000.0);
            let total_ops = self.transfer_stats.read_operations as f64;
            self.transfer_stats.avg_read_speed =
                (self.transfer_stats.avg_read_speed * (total_ops - 1.0) + speed) / total_ops;
        }
    }
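
    // Running-average example (arithmetic only): a first read at
    // 1_000_000 B/s followed by one at 3_000_000 B/s gives
    // avg_read_speed = (1_000_000 * (2 - 1) + 3_000_000) / 2 = 2_000_000 B/s.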

    pub fn record_write(&mut self, bytes: usize, duration_ms: f64) {
        self.transfer_stats.bytes_written += bytes;
        self.transfer_stats.write_operations += 1;

        if duration_ms > 0.0 {
            let speed = bytes as f64 / (duration_ms / 1000.0);
            let total_ops = self.transfer_stats.write_operations as f64;
            self.transfer_stats.avg_write_speed =
                (self.transfer_stats.avg_write_speed * (total_ops - 1.0) + speed) / total_ops;
        }
    }

    pub fn avg_timing(&self, operation: &str) -> Option<f64> {
        self.timings
            .get(operation)
            .map(|times| times.iter().sum::<f64>() / times.len() as f64)
    }

    pub fn get_summary(&self) -> PerformanceSummary {
        let mut operation_averages = BTreeMap::new();

        for (op, times) in &self.timings {
            let avg = times.iter().sum::<f64>() / times.len() as f64;
            operation_averages.insert(op.clone(), avg);
        }

        PerformanceSummary {
            operation_averages,
            total_bytes_transferred: self.transfer_stats.bytes_read
                + self.transfer_stats.bytes_written,
            avg_read_speed_mbps: self.transfer_stats.avg_read_speed / 1_000_000.0,
            avg_write_speed_mbps: self.transfer_stats.avg_write_speed / 1_000_000.0,
            peak_memory_mb: self.memory_stats.peak_memory_bytes as f64 / 1_000_000.0,
            compression_ratio: self
                .compression_efficiency
                .iter()
                .map(|c| c.compression_ratio)
                .sum::<f64>()
                / self.compression_efficiency.len().max(1) as f64,
        }
    }
}

/// Flattened view of the monitor's metrics.
#[derive(Debug, Clone)]
pub struct PerformanceSummary {
    pub operation_averages: BTreeMap<String, f64>,
    pub total_bytes_transferred: usize,
    pub avg_read_speed_mbps: f64,
    pub avg_write_speed_mbps: f64,
    pub peak_memory_mb: f64,
    pub compression_ratio: f64,
}

/// Storage-layout strategies that the access analyzer can recommend.
#[derive(Debug, Clone)]
pub enum LayoutOptimization {
    RowMajor,
    ColumnMajor,
    Chunked(Vec<usize>),
    Tiled {
        tile_width: usize,
        tile_height: usize,
    },
    Striped { strip_size: usize },
}

/// Records dataset access patterns and derives layout recommendations.
#[derive(Debug, Clone)]
pub struct AccessPatternAnalyzer {
    access_patterns: Vec<AccessPattern>,
    recommendations: Vec<LayoutOptimization>,
}

/// A single observed access: an operation over a region, with a frequency.
#[derive(Debug, Clone)]
pub struct AccessPattern {
    pub operation: String,
    /// Per-dimension (start, size) pairs.
    pub region: Vec<(usize, usize)>,
    pub frequency: usize,
    pub timestamp: std::time::Instant,
}

impl AccessPatternAnalyzer {
    pub fn new() -> Self {
        Self {
            access_patterns: Vec::new(),
            recommendations: Vec::new(),
        }
    }

    pub fn record_access(&mut self, operation: String, region: Vec<(usize, usize)>) {
        // Bump the frequency if this exact (operation, region) pair was seen.
        for pattern in &mut self.access_patterns {
            if pattern.operation == operation && pattern.region == region {
                pattern.frequency += 1;
                pattern.timestamp = std::time::Instant::now();
                return;
            }
        }

        self.access_patterns.push(AccessPattern {
            operation,
            region,
            frequency: 1,
            timestamp: std::time::Instant::now(),
        });
    }

    /// Analyze the recorded patterns and refresh the recommendations.
    pub fn analyze(&mut self) -> &Vec<LayoutOptimization> {
        self.recommendations.clear();

        if self.access_patterns.is_empty() {
            return &self.recommendations;
        }

        // Tally total frequency per distinct region.
        let mut pattern_analysis = BTreeMap::new();

        for pattern in &self.access_patterns {
            let key = format!("{:?}", pattern.region);
            let entry = pattern_analysis
                .entry(key)
                .or_insert((0, pattern.region.clone()));
            entry.0 += pattern.frequency;
        }

        if let Some((_, (_, most_common_region))) =
            pattern_analysis.iter().max_by_key(|(_, (freq, _))| *freq)
        {
            if most_common_region.len() == 1 {
                let optimal_strip = most_common_region[0].1.max(1024);
                self.recommendations.push(LayoutOptimization::Striped {
                    strip_size: optimal_strip,
                });
            } else if most_common_region.len() == 2 {
                let (_row_access, row_size) = most_common_region[0];
                let (_col_access, col_size) = most_common_region[1];

                if row_size > col_size * 10 {
                    self.recommendations.push(LayoutOptimization::RowMajor);
                } else if col_size > row_size * 10 {
                    self.recommendations.push(LayoutOptimization::ColumnMajor);
                } else {
                    let tile_width = col_size.clamp(64, 512);
                    let tile_height = row_size.clamp(64, 512);
                    self.recommendations.push(LayoutOptimization::Tiled {
                        tile_width,
                        tile_height,
                    });
                }
            } else {
                let optimal_chunks: Vec<usize> = most_common_region
                    .iter()
                    .map(|(_, size)| size.clamp(&64, &1024))
                    .cloned()
                    .collect();
                self.recommendations
                    .push(LayoutOptimization::Chunked(optimal_chunks));
            }
        }

        &self.recommendations
    }
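
    // Heuristic example: a dominant 2-D region of ((0, 2000), (0, 100)) has a
    // row extent (2000) more than 10x the column extent (100), so analyze()
    // recommends RowMajor; roughly square extents instead fall through to a
    // Tiled layout with sides clamped to 64..=512.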

    /// Summarize the recorded accesses.
    pub fn get_statistics(&self) -> AccessPatternStats {
        let total_accesses = self.access_patterns.iter().map(|p| p.frequency).sum();
        let unique_patterns = self.access_patterns.len();

        let read_count = self
            .access_patterns
            .iter()
            .filter(|p| p.operation.contains("read"))
            .map(|p| p.frequency)
            .sum();

        let write_count = total_accesses - read_count;

        AccessPatternStats {
            total_accesses,
            unique_patterns,
            read_count,
            write_count,
            most_frequent_pattern: self
                .access_patterns
                .iter()
                .max_by_key(|p| p.frequency)
                .map(|p| p.region.clone()),
        }
    }
}

impl Default for AccessPatternAnalyzer {
    fn default() -> Self {
        Self::new()
    }
}

/// Summary statistics over all recorded access patterns.
#[derive(Debug, Clone)]
pub struct AccessPatternStats {
    pub total_accesses: usize,
    pub unique_patterns: usize,
    pub read_count: usize,
    pub write_count: usize,
    pub most_frequent_pattern: Option<Vec<(usize, usize)>>,
}

/// `EnhancedHDF5File` plus performance monitoring, access-pattern analysis,
/// and a metadata cache.
pub struct OptimizedHDF5File {
    pub base_file: EnhancedHDF5File,
    pub performance_monitor: Arc<Mutex<HDF5PerformanceMonitor>>,
    pub access_analyzer: Arc<Mutex<AccessPatternAnalyzer>>,
    pub metadata_cache: Arc<RwLock<BTreeMap<String, ScientificMetadata>>>,
}

impl OptimizedHDF5File {
    pub fn create<P: AsRef<Path>>(
        path: P,
        parallel_config: Option<ParallelConfig>,
    ) -> Result<Self> {
        let base_file = EnhancedHDF5File::create(path, parallel_config)?;

        Ok(Self {
            base_file,
            performance_monitor: Arc::new(Mutex::new(HDF5PerformanceMonitor::new())),
            access_analyzer: Arc::new(Mutex::new(AccessPatternAnalyzer::new())),
            metadata_cache: Arc::new(RwLock::new(BTreeMap::new())),
        })
    }

    pub fn open<P: AsRef<Path>>(
        path: P,
        mode: FileMode,
        parallel_config: Option<ParallelConfig>,
    ) -> Result<Self> {
        let base_file = EnhancedHDF5File::open(path, mode, parallel_config)?;

        Ok(Self {
            base_file,
            performance_monitor: Arc::new(Mutex::new(HDF5PerformanceMonitor::new())),
            access_analyzer: Arc::new(Mutex::new(AccessPatternAnalyzer::new())),
            metadata_cache: Arc::new(RwLock::new(BTreeMap::new())),
        })
    }

    /// Cache scientific metadata for a dataset path. The metadata is kept in
    /// memory only; it is not yet written through to the file.
    pub fn add_scientific_metadata(
        &mut self,
        dataset_path: &str,
        metadata: ScientificMetadata,
    ) -> Result<()> {
        {
            let mut cache = self.metadata_cache.write().unwrap();
            cache.insert(dataset_path.to_string(), metadata.clone());
        }

        Ok(())
    }

    pub fn get_scientific_metadata(&self, dataset_path: &str) -> Option<ScientificMetadata> {
        let cache = self.metadata_cache.read().unwrap();
        cache.get(dataset_path).cloned()
    }

    pub fn get_performance_report(&self) -> PerformanceSummary {
        let monitor = self.performance_monitor.lock().unwrap();
        monitor.get_summary()
    }

    pub fn get_layout_recommendations(&self) -> Vec<LayoutOptimization> {
        let mut analyzer = self.access_analyzer.lock().unwrap();
        analyzer.analyze().clone()
    }

    pub fn record_access(&self, operation: &str, region: Vec<(usize, usize)>) {
        let mut analyzer = self.access_analyzer.lock().unwrap();
        analyzer.record_access(operation.to_string(), region);
    }

    pub fn get_access_statistics(&self) -> AccessPatternStats {
        let analyzer = self.access_analyzer.lock().unwrap();
        analyzer.get_statistics()
    }

    /// Run `operation`, recording its wall-clock duration under
    /// `operation_name`.
    pub fn benchmark_operation<F, R>(&self, operation_name: &str, operation: F) -> Result<R>
    where
        F: FnOnce() -> Result<R>,
    {
        let start_time = Instant::now();
        let result = operation()?;
        let duration = start_time.elapsed().as_secs_f64() * 1000.0;

        {
            let mut monitor = self.performance_monitor.lock().unwrap();
            monitor.record_timing(operation_name, duration);
        }

        Ok(result)
    }
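
    // Usage sketch (the dataset path is illustrative):
    //
    //     let array = file.benchmark_operation("read_main", || {
    //         file.base_file.read_dataset_parallel("/group/main")
    //     })?;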
}

#[cfg(test)]
mod enhanced_tests {
    use super::*;

    #[test]
    fn test_scientific_metadata() {
        let metadata = ScientificMetadata::new()
            .add_string_attr("instrument", "spectrometer")
            .add_float_attr("wavelength", 550.0)
            .with_units("nanometers")
            .with_scaling(1.0, 0.0)
            .with_valid_range(0.0, 1000.0);

        assert_eq!(metadata.units, Some("nanometers".to_string()));
        assert_eq!(metadata.scale_factor, Some(1.0));
        assert_eq!(metadata.valid_range, Some((0.0, 1000.0)));
    }

    #[test]
    fn test_performance_monitor() {
        let mut monitor = HDF5PerformanceMonitor::new();

        monitor.record_timing("read", 10.0);
        monitor.record_timing("read", 20.0);
        monitor.record_read(1024, 10.0);

        assert_eq!(monitor.avg_timing("read"), Some(15.0));
        assert_eq!(monitor.transfer_stats.bytes_read, 1024);
        assert_eq!(monitor.transfer_stats.read_operations, 1);
    }

    #[test]
    fn test_access_pattern_analyzer() {
        let mut analyzer = AccessPatternAnalyzer::new();

        // Two identical reads collapse into one pattern with frequency 2.
        analyzer.record_access("read".to_string(), vec![(0, 100), (0, 50)]);
        analyzer.record_access("read".to_string(), vec![(0, 100), (0, 50)]);
        analyzer.record_access("write".to_string(), vec![(100, 100), (50, 50)]);

        let stats = analyzer.get_statistics();
        assert_eq!(stats.total_accesses, 3);
        assert_eq!(stats.unique_patterns, 2);
        assert_eq!(stats.read_count, 2);

        let recommendations = analyzer.analyze();
        assert!(!recommendations.is_empty());
    }
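
    // Added check (a sketch mirroring record_read above): one write of 1000
    // bytes in 1 ms should register a 1_000_000 B/s running average.
    #[test]
    fn test_performance_monitor_write_speed() {
        let mut monitor = HDF5PerformanceMonitor::new();
        monitor.record_write(1000, 1.0);

        assert_eq!(monitor.transfer_stats.bytes_written, 1000);
        assert_eq!(monitor.transfer_stats.write_operations, 1);
        assert!((monitor.transfer_stats.avg_write_speed - 1_000_000.0).abs() < 1e-6);
    }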
}