1use rayon::prelude::*;
8use scirs2_core::ndarray::{Array1, Array2, Axis};
9use scirs2_core::random::essentials::Normal as RandNormal;
10use scirs2_core::random::thread_rng;
11use sklears_core::error::Result;
12use std::collections::HashMap;
13
14#[derive(Clone, Debug)]
16pub struct OutOfCoreConfig {
18 pub max_memory_bytes: usize,
20 pub chunk_size: usize,
22 pub n_workers: usize,
24 pub temp_dir: String,
26 pub use_compression: bool,
28 pub buffer_size: usize,
30}
31
32impl Default for OutOfCoreConfig {
33 fn default() -> Self {
34 Self {
35 max_memory_bytes: 1024 * 1024 * 1024, chunk_size: 10000,
37 n_workers: num_cpus::get(),
38 temp_dir: "/tmp/sklears_out_of_core".to_string(),
39 use_compression: true,
40 buffer_size: 64 * 1024, }
42 }
43}
44
/// How `OutOfCoreRBFSampler::fit_transform_out_of_core` schedules the chunks of a loader.
///
/// The default is [`OutOfCoreStrategy::Sequential`], the same strategy every sampler
/// constructor in this module starts with.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
pub enum OutOfCoreStrategy {
    /// Load and transform one chunk at a time on the calling thread.
    #[default]
    Sequential,
    /// Load every chunk first, then transform the chunks in parallel with rayon.
    Parallel,
    /// Transform chunk by chunk as they are loaded.
    Streaming,
    /// Choose between parallel, sparse and streaming processing by inspecting the first
    /// chunk and the estimated memory footprint.
    Adaptive,
}
58
59pub struct OutOfCoreLoader {
61 config: OutOfCoreConfig,
62 data_path: String,
63 n_samples: usize,
64 n_features: usize,
65 current_chunk: usize,
66}
67
68impl OutOfCoreLoader {
69 pub fn new(data_path: &str, n_samples: usize, n_features: usize) -> Self {
71 Self {
72 config: OutOfCoreConfig::default(),
73 data_path: data_path.to_string(),
74 n_samples,
75 n_features,
76 current_chunk: 0,
77 }
78 }
79
80 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
82 self.config = config;
83 self
84 }
85
86 pub fn load_chunk(&mut self) -> Result<Option<Array2<f64>>> {
88 if self.current_chunk * self.config.chunk_size >= self.n_samples {
89 return Ok(None);
90 }
91
92 let start_idx = self.current_chunk * self.config.chunk_size;
93 let end_idx = std::cmp::min(start_idx + self.config.chunk_size, self.n_samples);
94 let chunk_size = end_idx - start_idx;
95
96 let mut rng = thread_rng();
98 let chunk = Array2::from_shape_fn((chunk_size, self.n_features), |_| {
99 rng.random_range(-1.0..1.0)
100 });
101
102 self.current_chunk += 1;
103 Ok(Some(chunk))
104 }
105
106 pub fn reset(&mut self) {
108 self.current_chunk = 0;
109 }
110
111 pub fn n_chunks(&self) -> usize {
113 (self.n_samples + self.config.chunk_size - 1) / self.config.chunk_size
114 }
115}
116
117pub struct OutOfCoreRBFSampler {
119 n_components: usize,
120 gamma: f64,
121 config: OutOfCoreConfig,
122 strategy: OutOfCoreStrategy,
123 random_weights: Option<Array2<f64>>,
124 random_offset: Option<Array1<f64>>,
125}
126
127impl OutOfCoreRBFSampler {
128 pub fn new(n_components: usize) -> Self {
130 Self {
131 n_components,
132 gamma: 1.0,
133 config: OutOfCoreConfig::default(),
134 strategy: OutOfCoreStrategy::Sequential,
135 random_weights: None,
136 random_offset: None,
137 }
138 }
139
140 pub fn gamma(mut self, gamma: f64) -> Self {
142 self.gamma = gamma;
143 self
144 }
145
146 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
148 self.config = config;
149 self
150 }
151
152 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
154 self.strategy = strategy;
155 self
156 }
157
158 pub fn fit_transform_out_of_core(
160 &mut self,
161 loader: &mut OutOfCoreLoader,
162 ) -> Result<Vec<Array2<f64>>> {
163 if self.random_weights.is_none() {
165 let n_features = loader.n_features;
166 let normal =
167 RandNormal::new(0.0, (2.0 * self.gamma).sqrt()).expect("operation should succeed");
168 let mut rng = thread_rng();
169
170 self.random_weights = Some(Array2::from_shape_fn(
171 (n_features, self.n_components),
172 |_| rng.sample(normal),
173 ));
174
175 self.random_offset = Some(Array1::from_shape_fn(self.n_components, |_| {
176 rng.random_range(0.0..2.0 * std::f64::consts::PI)
177 }));
178 }
179
180 let mut results = Vec::new();
181 loader.reset();
182
183 match self.strategy {
184 OutOfCoreStrategy::Sequential => {
185 while let Some(chunk) = loader.load_chunk()? {
186 let transformed = self.transform_chunk(&chunk)?;
187 results.push(transformed);
188 }
189 }
190 OutOfCoreStrategy::Parallel => {
191 let chunks = self.load_all_chunks(loader)?;
192 let parallel_results: Vec<_> = chunks
193 .into_par_iter()
194 .map(|chunk| self.transform_chunk(&chunk))
195 .collect::<Result<Vec<_>>>()?;
196 results = parallel_results;
197 }
198 OutOfCoreStrategy::Streaming => {
199 results = self.streaming_transform(loader)?;
200 }
201 OutOfCoreStrategy::Adaptive => {
202 results = self.adaptive_transform(loader)?;
203 }
204 }
205
206 Ok(results)
207 }
208
209 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
211 let weights = self
212 .random_weights
213 .as_ref()
214 .expect("operation should succeed");
215 let offset = self
216 .random_offset
217 .as_ref()
218 .expect("operation should succeed");
219
220 let projection = chunk.dot(weights) + offset;
222
223 let transformed = projection.mapv(|x| (x * (2.0 / self.n_components as f64).sqrt()).cos());
225
226 Ok(transformed)
227 }
228
229 fn load_all_chunks(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
231 let mut chunks = Vec::new();
232 loader.reset();
233
234 while let Some(chunk) = loader.load_chunk()? {
235 chunks.push(chunk);
236 }
237
238 Ok(chunks)
239 }
240
241 fn streaming_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
243 let mut results = Vec::new();
244 let mut memory_used = 0;
245 loader.reset();
246
247 while let Some(chunk) = loader.load_chunk()? {
248 let chunk_memory = chunk.len() * std::mem::size_of::<f64>();
249
250 if memory_used + chunk_memory > self.config.max_memory_bytes {
251 memory_used = 0;
253 }
254
255 let transformed = self.transform_chunk(&chunk)?;
256 results.push(transformed);
257 memory_used += chunk_memory;
258 }
259
260 Ok(results)
261 }
262
263 fn adaptive_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
265 loader.reset();
267 let first_chunk = loader.load_chunk()?.expect("operation should succeed");
268
269 let data_density = self.estimate_data_density(&first_chunk);
270 let memory_requirement =
271 self.estimate_memory_requirement(loader.n_samples, loader.n_features);
272
273 if memory_requirement < self.config.max_memory_bytes && data_density > 0.8 {
275 self.parallel_transform_adaptive(loader)
276 } else if data_density < 0.2 {
277 self.sparse_transform_adaptive(loader)
278 } else {
279 self.streaming_transform(loader)
280 }
281 }
282
283 fn parallel_transform_adaptive(
285 &self,
286 loader: &mut OutOfCoreLoader,
287 ) -> Result<Vec<Array2<f64>>> {
288 let chunks = self.load_all_chunks(loader)?;
289 let results: Vec<_> = chunks
290 .into_par_iter()
291 .map(|chunk| self.transform_chunk(&chunk))
292 .collect::<Result<Vec<_>>>()?;
293 Ok(results)
294 }
295
296 fn sparse_transform_adaptive(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
298 let mut results = Vec::new();
299 loader.reset();
300
301 while let Some(chunk) = loader.load_chunk()? {
302 let transformed = self.transform_chunk_sparse(&chunk)?;
304 results.push(transformed);
305 }
306
307 Ok(results)
308 }
309
310 fn transform_chunk_sparse(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
312 let weights = self
314 .random_weights
315 .as_ref()
316 .expect("operation should succeed");
317 let offset = self
318 .random_offset
319 .as_ref()
320 .expect("operation should succeed");
321
322 let mut result = Array2::zeros((chunk.nrows(), self.n_components));
323
324 for (i, row) in chunk.axis_iter(Axis(0)).enumerate() {
325 for (j, &x) in row.iter().enumerate() {
326 if x.abs() > 1e-10 {
327 for k in 0..self.n_components {
329 result[[i, k]] += x * weights[[j, k]];
330 }
331 }
332 }
333
334 for k in 0..self.n_components {
336 let val: f64 =
337 (result[[i, k]] + offset[k]) * (2.0_f64 / self.n_components as f64).sqrt();
338 result[[i, k]] = val.cos();
339 }
340 }
341
342 Ok(result)
343 }
344
345 fn estimate_data_density(&self, chunk: &Array2<f64>) -> f64 {
347 let total_elements = chunk.len() as f64;
348 let non_zero_elements = chunk.iter().filter(|&&x| x.abs() > 1e-10).count() as f64;
349 non_zero_elements / total_elements
350 }
351
352 fn estimate_memory_requirement(&self, n_samples: usize, n_features: usize) -> usize {
354 (n_samples * n_features + n_features * self.n_components + self.n_components)
355 * std::mem::size_of::<f64>()
356 }
357}
358
359pub struct OutOfCoreNystroem {
361 n_components: usize,
362 gamma: f64,
363 config: OutOfCoreConfig,
364 strategy: OutOfCoreStrategy,
365 basis_indices: Option<Vec<usize>>,
366 basis_kernel: Option<Array2<f64>>,
367 normalization: Option<Array2<f64>>,
368}
369
370impl OutOfCoreNystroem {
371 pub fn new(n_components: usize) -> Self {
373 Self {
374 n_components,
375 gamma: 1.0,
376 config: OutOfCoreConfig::default(),
377 strategy: OutOfCoreStrategy::Sequential,
378 basis_indices: None,
379 basis_kernel: None,
380 normalization: None,
381 }
382 }
383
384 pub fn gamma(mut self, gamma: f64) -> Self {
386 self.gamma = gamma;
387 self
388 }
389
390 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
392 self.config = config;
393 self
394 }
395
396 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
398 self.strategy = strategy;
399 self
400 }
401
402 pub fn fit_out_of_core(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
404 let basis_points = self.sample_basis_points(loader)?;
406
407 let kernel_matrix = self.compute_kernel_matrix(&basis_points)?;
409
410 let (eigenvalues, eigenvectors) = self.compute_eigendecomposition(&kernel_matrix)?;
412
413 self.normalization = Some(self.compute_normalization(&eigenvalues, &eigenvectors)?);
415
416 Ok(())
417 }
418
419 fn sample_basis_points(&mut self, loader: &mut OutOfCoreLoader) -> Result<Array2<f64>> {
421 let mut basis_points = Vec::new();
422 let mut point_indices = Vec::new();
423 let mut global_index = 0;
424
425 loader.reset();
426
427 let mut rng = thread_rng();
429 let mut selected_count = 0;
430
431 while let Some(chunk) = loader.load_chunk()? {
432 for row in chunk.axis_iter(Axis(0)) {
433 if selected_count < self.n_components {
434 basis_points.push(row.to_owned());
436 point_indices.push(global_index);
437 selected_count += 1;
438 } else {
439 let replace_idx = rng.random_range(0..global_index + 1);
441 if replace_idx < self.n_components {
442 basis_points[replace_idx] = row.to_owned();
443 point_indices[replace_idx] = global_index;
444 }
445 }
446 global_index += 1;
447 }
448 }
449
450 self.basis_indices = Some(point_indices);
451
452 let n_features = basis_points[0].len();
454 let mut result = Array2::zeros((basis_points.len(), n_features));
455 for (i, point) in basis_points.iter().enumerate() {
456 result.row_mut(i).assign(point);
457 }
458
459 Ok(result)
460 }
461
462 fn compute_kernel_matrix(&self, basis_points: &Array2<f64>) -> Result<Array2<f64>> {
464 let n_basis = basis_points.nrows();
465 let mut kernel_matrix = Array2::zeros((n_basis, n_basis));
466
467 for i in 0..n_basis {
468 for j in i..n_basis {
469 let dist_sq = basis_points
470 .row(i)
471 .iter()
472 .zip(basis_points.row(j).iter())
473 .map(|(&a, &b)| (a - b).powi(2))
474 .sum::<f64>();
475
476 let kernel_value = (-self.gamma * dist_sq).exp();
477 kernel_matrix[[i, j]] = kernel_value;
478 kernel_matrix[[j, i]] = kernel_value;
479 }
480 }
481
482 Ok(kernel_matrix)
483 }
484
485 fn compute_eigendecomposition(
487 &self,
488 kernel_matrix: &Array2<f64>,
489 ) -> Result<(Array1<f64>, Array2<f64>)> {
490 let n = kernel_matrix.nrows();
492 let eigenvalues = Array1::ones(n);
493 let eigenvectors = kernel_matrix.clone();
494
495 Ok((eigenvalues, eigenvectors))
496 }
497
498 fn compute_normalization(
500 &self,
501 eigenvalues: &Array1<f64>,
502 eigenvectors: &Array2<f64>,
503 ) -> Result<Array2<f64>> {
504 let mut normalization = Array2::zeros(eigenvectors.dim());
505
506 for i in 0..eigenvectors.nrows() {
507 for j in 0..eigenvectors.ncols() {
508 if eigenvalues[j] > 1e-10 {
509 normalization[[i, j]] = eigenvectors[[i, j]] / eigenvalues[j].sqrt();
510 }
511 }
512 }
513
514 Ok(normalization)
515 }
516
517 pub fn transform_out_of_core(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
519 let mut results = Vec::new();
520 loader.reset();
521
522 while let Some(chunk) = loader.load_chunk()? {
523 let transformed = self.transform_chunk(&chunk)?;
524 results.push(transformed);
525 }
526
527 Ok(results)
528 }
529
530 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
532 let normalization = self
533 .normalization
534 .as_ref()
535 .expect("operation should succeed");
536
537 let mut kernel_values = Array2::zeros((chunk.nrows(), self.n_components));
539
540 for i in 0..chunk.nrows() {
542 for j in 0..self.n_components {
543 kernel_values[[i, j]] = thread_rng().gen_range(0.0..1.0);
544 }
545 }
546
547 let result = kernel_values.dot(normalization);
549
550 Ok(result)
551 }
552}
553
554pub struct OutOfCoreKernelPipeline {
556 config: OutOfCoreConfig,
557 methods: Vec<String>,
558 results: HashMap<String, Vec<Array2<f64>>>,
559}
560
561impl Default for OutOfCoreKernelPipeline {
562 fn default() -> Self {
563 Self::new()
564 }
565}
566
567impl OutOfCoreKernelPipeline {
568 pub fn new() -> Self {
570 Self {
571 config: OutOfCoreConfig::default(),
572 methods: Vec::new(),
573 results: HashMap::new(),
574 }
575 }
576
577 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
579 self.config = config;
580 self
581 }
582
583 pub fn add_rbf_sampler(mut self, n_components: usize, gamma: f64) -> Self {
585 self.methods.push(format!("rbf_{}_{}", n_components, gamma));
586 self
587 }
588
589 pub fn add_nystroem(mut self, n_components: usize, gamma: f64) -> Self {
591 self.methods
592 .push(format!("nystroem_{}_{}", n_components, gamma));
593 self
594 }
595
596 pub fn process(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
598 for method in &self.methods.clone() {
599 let parts: Vec<&str> = method.split('_').collect();
600 match parts[0] {
601 "rbf" => {
602 let n_components: usize = parts[1].parse().expect("operation should succeed");
603 let gamma: f64 = parts[2].parse().expect("operation should succeed");
604
605 let mut rbf = OutOfCoreRBFSampler::new(n_components)
606 .gamma(gamma)
607 .with_config(self.config.clone());
608
609 let result = rbf.fit_transform_out_of_core(loader)?;
610 self.results.insert(method.clone(), result);
611 }
612 "nystroem" => {
613 let n_components: usize = parts[1].parse().expect("operation should succeed");
614 let gamma: f64 = parts[2].parse().expect("operation should succeed");
615
616 let mut nystroem = OutOfCoreNystroem::new(n_components)
617 .gamma(gamma)
618 .with_config(self.config.clone());
619
620 nystroem.fit_out_of_core(loader)?;
621 let result = nystroem.transform_out_of_core(loader)?;
622 self.results.insert(method.clone(), result);
623 }
624 _ => continue,
625 }
626 }
627
628 Ok(())
629 }
630
631 pub fn get_results(&self, method: &str) -> Option<&Vec<Array2<f64>>> {
633 self.results.get(method)
634 }
635
636 pub fn get_all_results(&self) -> &HashMap<String, Vec<Array2<f64>>> {
638 &self.results
639 }
640}
641
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    /// Fits `sampler` on a fresh 500 × 3 loader and returns the transformed chunks.
    fn fit_rbf_on_500x3(mut sampler: OutOfCoreRBFSampler) -> Vec<Array2<f64>> {
        let mut loader = OutOfCoreLoader::new("test_data.csv", 500, 3);
        sampler
            .fit_transform_out_of_core(&mut loader)
            .expect("operation should succeed")
    }

    /// 500 rows fit in one default-sized chunk, and every sampler here asks for 100 features.
    fn assert_one_chunk_of_100_features(results: &[Array2<f64>]) {
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].ncols(), 100);
    }

    #[test]
    fn test_out_of_core_config() {
        let OutOfCoreConfig {
            max_memory_bytes,
            chunk_size,
            n_workers,
            ..
        } = OutOfCoreConfig::default();
        assert_eq!(max_memory_bytes, 1024 * 1024 * 1024);
        assert_eq!(chunk_size, 10000);
        assert!(n_workers > 0);
    }

    #[test]
    fn test_out_of_core_loader() {
        let mut loader = OutOfCoreLoader::new("test_data.csv", 1000, 5);
        assert_eq!(loader.n_chunks(), 1);

        let maybe_chunk = loader.load_chunk().expect("operation should succeed");
        assert!(maybe_chunk.is_some());

        let first = maybe_chunk.expect("operation should succeed");
        assert_eq!((first.nrows(), first.ncols()), (1000, 5));
    }

    #[test]
    fn test_out_of_core_rbf_sampler() {
        let sampler = OutOfCoreRBFSampler::new(100)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Sequential);
        assert_one_chunk_of_100_features(&fit_rbf_on_500x3(sampler));
    }

    #[test]
    fn test_out_of_core_nystroem() {
        let mut loader = OutOfCoreLoader::new("test_data.csv", 300, 4);
        let mut nystroem = OutOfCoreNystroem::new(50)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Sequential);

        nystroem
            .fit_out_of_core(&mut loader)
            .expect("operation should succeed");
        let chunks = nystroem
            .transform_out_of_core(&mut loader)
            .expect("operation should succeed");

        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].ncols(), 50);
    }

    #[test]
    fn test_out_of_core_pipeline() {
        let mut loader = OutOfCoreLoader::new("test_data.csv", 1000, 5);
        let mut pipeline = OutOfCoreKernelPipeline::new()
            .add_rbf_sampler(100, 1.0)
            .add_nystroem(50, 0.5);

        pipeline
            .process(&mut loader)
            .expect("operation should succeed");

        assert_eq!(pipeline.get_all_results().len(), 2);
    }

    #[test]
    fn test_parallel_strategy() {
        let sampler = OutOfCoreRBFSampler::new(100)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Parallel);
        assert_one_chunk_of_100_features(&fit_rbf_on_500x3(sampler));
    }

    #[test]
    fn test_streaming_strategy() {
        let tight_budget = OutOfCoreConfig {
            max_memory_bytes: 1024,
            ..Default::default()
        };
        let sampler = OutOfCoreRBFSampler::new(100)
            .gamma(1.0)
            .with_config(tight_budget)
            .with_strategy(OutOfCoreStrategy::Streaming);
        assert_one_chunk_of_100_features(&fit_rbf_on_500x3(sampler));
    }

    #[test]
    fn test_adaptive_strategy() {
        let sampler = OutOfCoreRBFSampler::new(100)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Adaptive);
        assert_one_chunk_of_100_features(&fit_rbf_on_500x3(sampler));
    }

    #[test]
    fn test_memory_estimation() {
        let sampler = OutOfCoreRBFSampler::new(100).gamma(1.0);
        assert!(sampler.estimate_memory_requirement(1000, 10) > 0);
    }

    #[test]
    fn test_data_density_estimation() {
        let sampler = OutOfCoreRBFSampler::new(100).gamma(1.0);

        let all_zero = Array2::zeros((10, 5));
        assert_eq!(sampler.estimate_data_density(&all_zero), 0.0);

        let all_one = Array2::ones((10, 5));
        assert_eq!(sampler.estimate_data_density(&all_one), 1.0);
    }
}