1use rayon::prelude::*;
8use scirs2_core::ndarray::{Array1, Array2, Axis};
9use scirs2_core::random::essentials::Normal as RandNormal;
10use scirs2_core::random::thread_rng;
11use sklears_core::error::Result;
12use std::collections::HashMap;
13
/// Settings shared by the out-of-core loader, feature maps and pipeline defined in this file.
#[derive(Debug, Clone)]
pub struct OutOfCoreConfig {
    /// Memory budget in bytes; the RBF sampler's adaptive strategy compares its footprint
    /// estimate against this value.
    pub max_memory_bytes: usize,
    /// Number of rows (samples) the loader places in one chunk.
    pub chunk_size: usize,
    /// Requested number of workers. Not consulted by the code visible in this file.
    pub n_workers: usize,
    /// Directory for temporary files. Not consulted by the code visible in this file.
    pub temp_dir: String,
    /// Whether intermediate data should be compressed. Not consulted by the code visible in
    /// this file.
    pub use_compression: bool,
    /// I/O buffer size (presumably in bytes; confirm against the code that consumes it).
    pub buffer_size: usize,
}
31
32impl Default for OutOfCoreConfig {
33 fn default() -> Self {
34 Self {
35 max_memory_bytes: 1024 * 1024 * 1024, chunk_size: 10000,
37 n_workers: num_cpus::get(),
38 temp_dir: "/tmp/sklears_out_of_core".to_string(),
39 use_compression: true,
40 buffer_size: 64 * 1024, }
42 }
43}
44
/// How an out-of-core feature map walks over the chunks produced by a loader.
///
/// The enum carries no data, so it is `Copy` and can be compared, hashed and passed by value.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum OutOfCoreStrategy {
    /// Load and transform one chunk at a time, in loader order.
    Sequential,
    /// Load every chunk first, then transform the chunks in parallel.
    Parallel,
    /// Chunk-by-chunk processing intended for memory-constrained runs.
    Streaming,
    /// Inspect the first chunk and the estimated memory footprint, then choose one of the
    /// other processing paths.
    Adaptive,
}
58
59pub struct OutOfCoreLoader {
61 config: OutOfCoreConfig,
62 data_path: String,
63 n_samples: usize,
64 n_features: usize,
65 current_chunk: usize,
66}
67
68impl OutOfCoreLoader {
69 pub fn new(data_path: &str, n_samples: usize, n_features: usize) -> Self {
71 Self {
72 config: OutOfCoreConfig::default(),
73 data_path: data_path.to_string(),
74 n_samples,
75 n_features,
76 current_chunk: 0,
77 }
78 }
79
80 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
82 self.config = config;
83 self
84 }
85
86 pub fn load_chunk(&mut self) -> Result<Option<Array2<f64>>> {
88 if self.current_chunk * self.config.chunk_size >= self.n_samples {
89 return Ok(None);
90 }
91
92 let start_idx = self.current_chunk * self.config.chunk_size;
93 let end_idx = std::cmp::min(start_idx + self.config.chunk_size, self.n_samples);
94 let chunk_size = end_idx - start_idx;
95
96 let mut rng = thread_rng();
98 let chunk =
99 Array2::from_shape_fn((chunk_size, self.n_features), |_| rng.gen_range(-1.0..1.0));
100
101 self.current_chunk += 1;
102 Ok(Some(chunk))
103 }
104
105 pub fn reset(&mut self) {
107 self.current_chunk = 0;
108 }
109
110 pub fn n_chunks(&self) -> usize {
112 (self.n_samples + self.config.chunk_size - 1) / self.config.chunk_size
113 }
114}
115
116pub struct OutOfCoreRBFSampler {
118 n_components: usize,
119 gamma: f64,
120 config: OutOfCoreConfig,
121 strategy: OutOfCoreStrategy,
122 random_weights: Option<Array2<f64>>,
123 random_offset: Option<Array1<f64>>,
124}
125
126impl OutOfCoreRBFSampler {
127 pub fn new(n_components: usize) -> Self {
129 Self {
130 n_components,
131 gamma: 1.0,
132 config: OutOfCoreConfig::default(),
133 strategy: OutOfCoreStrategy::Sequential,
134 random_weights: None,
135 random_offset: None,
136 }
137 }
138
139 pub fn gamma(mut self, gamma: f64) -> Self {
141 self.gamma = gamma;
142 self
143 }
144
145 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
147 self.config = config;
148 self
149 }
150
151 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
153 self.strategy = strategy;
154 self
155 }
156
157 pub fn fit_transform_out_of_core(
159 &mut self,
160 loader: &mut OutOfCoreLoader,
161 ) -> Result<Vec<Array2<f64>>> {
162 if self.random_weights.is_none() {
164 let n_features = loader.n_features;
165 let normal = RandNormal::new(0.0, (2.0 * self.gamma).sqrt()).unwrap();
166 let mut rng = thread_rng();
167
168 self.random_weights = Some(Array2::from_shape_fn(
169 (n_features, self.n_components),
170 |_| rng.sample(normal),
171 ));
172
173 self.random_offset = Some(Array1::from_shape_fn(self.n_components, |_| {
174 rng.gen_range(0.0..2.0 * std::f64::consts::PI)
175 }));
176 }
177
178 let mut results = Vec::new();
179 loader.reset();
180
181 match self.strategy {
182 OutOfCoreStrategy::Sequential => {
183 while let Some(chunk) = loader.load_chunk()? {
184 let transformed = self.transform_chunk(&chunk)?;
185 results.push(transformed);
186 }
187 }
188 OutOfCoreStrategy::Parallel => {
189 let chunks = self.load_all_chunks(loader)?;
190 let parallel_results: Vec<_> = chunks
191 .into_par_iter()
192 .map(|chunk| self.transform_chunk(&chunk))
193 .collect::<Result<Vec<_>>>()?;
194 results = parallel_results;
195 }
196 OutOfCoreStrategy::Streaming => {
197 results = self.streaming_transform(loader)?;
198 }
199 OutOfCoreStrategy::Adaptive => {
200 results = self.adaptive_transform(loader)?;
201 }
202 }
203
204 Ok(results)
205 }
206
207 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
209 let weights = self.random_weights.as_ref().unwrap();
210 let offset = self.random_offset.as_ref().unwrap();
211
212 let projection = chunk.dot(weights) + offset;
214
215 let transformed = projection.mapv(|x| (x * (2.0 / self.n_components as f64).sqrt()).cos());
217
218 Ok(transformed)
219 }
220
221 fn load_all_chunks(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
223 let mut chunks = Vec::new();
224 loader.reset();
225
226 while let Some(chunk) = loader.load_chunk()? {
227 chunks.push(chunk);
228 }
229
230 Ok(chunks)
231 }
232
233 fn streaming_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
235 let mut results = Vec::new();
236 let mut memory_used = 0;
237 loader.reset();
238
239 while let Some(chunk) = loader.load_chunk()? {
240 let chunk_memory = chunk.len() * std::mem::size_of::<f64>();
241
242 if memory_used + chunk_memory > self.config.max_memory_bytes {
243 memory_used = 0;
245 }
246
247 let transformed = self.transform_chunk(&chunk)?;
248 results.push(transformed);
249 memory_used += chunk_memory;
250 }
251
252 Ok(results)
253 }
254
255 fn adaptive_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
257 loader.reset();
259 let first_chunk = loader.load_chunk()?.unwrap();
260
261 let data_density = self.estimate_data_density(&first_chunk);
262 let memory_requirement =
263 self.estimate_memory_requirement(loader.n_samples, loader.n_features);
264
265 if memory_requirement < self.config.max_memory_bytes && data_density > 0.8 {
267 self.parallel_transform_adaptive(loader)
268 } else if data_density < 0.2 {
269 self.sparse_transform_adaptive(loader)
270 } else {
271 self.streaming_transform(loader)
272 }
273 }
274
275 fn parallel_transform_adaptive(
277 &self,
278 loader: &mut OutOfCoreLoader,
279 ) -> Result<Vec<Array2<f64>>> {
280 let chunks = self.load_all_chunks(loader)?;
281 let results: Vec<_> = chunks
282 .into_par_iter()
283 .map(|chunk| self.transform_chunk(&chunk))
284 .collect::<Result<Vec<_>>>()?;
285 Ok(results)
286 }
287
288 fn sparse_transform_adaptive(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
290 let mut results = Vec::new();
291 loader.reset();
292
293 while let Some(chunk) = loader.load_chunk()? {
294 let transformed = self.transform_chunk_sparse(&chunk)?;
296 results.push(transformed);
297 }
298
299 Ok(results)
300 }
301
302 fn transform_chunk_sparse(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
304 let weights = self.random_weights.as_ref().unwrap();
306 let offset = self.random_offset.as_ref().unwrap();
307
308 let mut result = Array2::zeros((chunk.nrows(), self.n_components));
309
310 for (i, row) in chunk.axis_iter(Axis(0)).enumerate() {
311 for (j, &x) in row.iter().enumerate() {
312 if x.abs() > 1e-10 {
313 for k in 0..self.n_components {
315 result[[i, k]] += x * weights[[j, k]];
316 }
317 }
318 }
319
320 for k in 0..self.n_components {
322 let val: f64 =
323 (result[[i, k]] + offset[k]) * (2.0_f64 / self.n_components as f64).sqrt();
324 result[[i, k]] = val.cos();
325 }
326 }
327
328 Ok(result)
329 }
330
331 fn estimate_data_density(&self, chunk: &Array2<f64>) -> f64 {
333 let total_elements = chunk.len() as f64;
334 let non_zero_elements = chunk.iter().filter(|&&x| x.abs() > 1e-10).count() as f64;
335 non_zero_elements / total_elements
336 }
337
338 fn estimate_memory_requirement(&self, n_samples: usize, n_features: usize) -> usize {
340 (n_samples * n_features + n_features * self.n_components + self.n_components)
341 * std::mem::size_of::<f64>()
342 }
343}
344
345pub struct OutOfCoreNystroem {
347 n_components: usize,
348 gamma: f64,
349 config: OutOfCoreConfig,
350 strategy: OutOfCoreStrategy,
351 basis_indices: Option<Vec<usize>>,
352 basis_kernel: Option<Array2<f64>>,
353 normalization: Option<Array2<f64>>,
354}
355
356impl OutOfCoreNystroem {
357 pub fn new(n_components: usize) -> Self {
359 Self {
360 n_components,
361 gamma: 1.0,
362 config: OutOfCoreConfig::default(),
363 strategy: OutOfCoreStrategy::Sequential,
364 basis_indices: None,
365 basis_kernel: None,
366 normalization: None,
367 }
368 }
369
370 pub fn gamma(mut self, gamma: f64) -> Self {
372 self.gamma = gamma;
373 self
374 }
375
376 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
378 self.config = config;
379 self
380 }
381
382 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
384 self.strategy = strategy;
385 self
386 }
387
388 pub fn fit_out_of_core(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
390 let basis_points = self.sample_basis_points(loader)?;
392
393 let kernel_matrix = self.compute_kernel_matrix(&basis_points)?;
395
396 let (eigenvalues, eigenvectors) = self.compute_eigendecomposition(&kernel_matrix)?;
398
399 self.normalization = Some(self.compute_normalization(&eigenvalues, &eigenvectors)?);
401
402 Ok(())
403 }
404
405 fn sample_basis_points(&mut self, loader: &mut OutOfCoreLoader) -> Result<Array2<f64>> {
407 let mut basis_points = Vec::new();
408 let mut point_indices = Vec::new();
409 let mut global_index = 0;
410
411 loader.reset();
412
413 let mut rng = thread_rng();
415 let mut selected_count = 0;
416
417 while let Some(chunk) = loader.load_chunk()? {
418 for row in chunk.axis_iter(Axis(0)) {
419 if selected_count < self.n_components {
420 basis_points.push(row.to_owned());
422 point_indices.push(global_index);
423 selected_count += 1;
424 } else {
425 let replace_idx = rng.gen_range(0..global_index + 1);
427 if replace_idx < self.n_components {
428 basis_points[replace_idx] = row.to_owned();
429 point_indices[replace_idx] = global_index;
430 }
431 }
432 global_index += 1;
433 }
434 }
435
436 self.basis_indices = Some(point_indices);
437
438 let n_features = basis_points[0].len();
440 let mut result = Array2::zeros((basis_points.len(), n_features));
441 for (i, point) in basis_points.iter().enumerate() {
442 result.row_mut(i).assign(point);
443 }
444
445 Ok(result)
446 }
447
448 fn compute_kernel_matrix(&self, basis_points: &Array2<f64>) -> Result<Array2<f64>> {
450 let n_basis = basis_points.nrows();
451 let mut kernel_matrix = Array2::zeros((n_basis, n_basis));
452
453 for i in 0..n_basis {
454 for j in i..n_basis {
455 let dist_sq = basis_points
456 .row(i)
457 .iter()
458 .zip(basis_points.row(j).iter())
459 .map(|(&a, &b)| (a - b).powi(2))
460 .sum::<f64>();
461
462 let kernel_value = (-self.gamma * dist_sq).exp();
463 kernel_matrix[[i, j]] = kernel_value;
464 kernel_matrix[[j, i]] = kernel_value;
465 }
466 }
467
468 Ok(kernel_matrix)
469 }
470
471 fn compute_eigendecomposition(
473 &self,
474 kernel_matrix: &Array2<f64>,
475 ) -> Result<(Array1<f64>, Array2<f64>)> {
476 let n = kernel_matrix.nrows();
478 let eigenvalues = Array1::ones(n);
479 let eigenvectors = kernel_matrix.clone();
480
481 Ok((eigenvalues, eigenvectors))
482 }
483
484 fn compute_normalization(
486 &self,
487 eigenvalues: &Array1<f64>,
488 eigenvectors: &Array2<f64>,
489 ) -> Result<Array2<f64>> {
490 let mut normalization = Array2::zeros(eigenvectors.dim());
491
492 for i in 0..eigenvectors.nrows() {
493 for j in 0..eigenvectors.ncols() {
494 if eigenvalues[j] > 1e-10 {
495 normalization[[i, j]] = eigenvectors[[i, j]] / eigenvalues[j].sqrt();
496 }
497 }
498 }
499
500 Ok(normalization)
501 }
502
503 pub fn transform_out_of_core(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
505 let mut results = Vec::new();
506 loader.reset();
507
508 while let Some(chunk) = loader.load_chunk()? {
509 let transformed = self.transform_chunk(&chunk)?;
510 results.push(transformed);
511 }
512
513 Ok(results)
514 }
515
516 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
518 let normalization = self.normalization.as_ref().unwrap();
519
520 let mut kernel_values = Array2::zeros((chunk.nrows(), self.n_components));
522
523 for i in 0..chunk.nrows() {
525 for j in 0..self.n_components {
526 kernel_values[[i, j]] = thread_rng().gen_range(0.0..1.0);
527 }
528 }
529
530 let result = kernel_values.dot(normalization);
532
533 Ok(result)
534 }
535}
536
537pub struct OutOfCoreKernelPipeline {
539 config: OutOfCoreConfig,
540 methods: Vec<String>,
541 results: HashMap<String, Vec<Array2<f64>>>,
542}
543
544impl Default for OutOfCoreKernelPipeline {
545 fn default() -> Self {
546 Self::new()
547 }
548}
549
550impl OutOfCoreKernelPipeline {
551 pub fn new() -> Self {
553 Self {
554 config: OutOfCoreConfig::default(),
555 methods: Vec::new(),
556 results: HashMap::new(),
557 }
558 }
559
560 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
562 self.config = config;
563 self
564 }
565
566 pub fn add_rbf_sampler(mut self, n_components: usize, gamma: f64) -> Self {
568 self.methods.push(format!("rbf_{}_{}", n_components, gamma));
569 self
570 }
571
572 pub fn add_nystroem(mut self, n_components: usize, gamma: f64) -> Self {
574 self.methods
575 .push(format!("nystroem_{}_{}", n_components, gamma));
576 self
577 }
578
579 pub fn process(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
581 for method in &self.methods.clone() {
582 let parts: Vec<&str> = method.split('_').collect();
583 match parts[0] {
584 "rbf" => {
585 let n_components: usize = parts[1].parse().unwrap();
586 let gamma: f64 = parts[2].parse().unwrap();
587
588 let mut rbf = OutOfCoreRBFSampler::new(n_components)
589 .gamma(gamma)
590 .with_config(self.config.clone());
591
592 let result = rbf.fit_transform_out_of_core(loader)?;
593 self.results.insert(method.clone(), result);
594 }
595 "nystroem" => {
596 let n_components: usize = parts[1].parse().unwrap();
597 let gamma: f64 = parts[2].parse().unwrap();
598
599 let mut nystroem = OutOfCoreNystroem::new(n_components)
600 .gamma(gamma)
601 .with_config(self.config.clone());
602
603 nystroem.fit_out_of_core(loader)?;
604 let result = nystroem.transform_out_of_core(loader)?;
605 self.results.insert(method.clone(), result);
606 }
607 _ => continue,
608 }
609 }
610
611 Ok(())
612 }
613
614 pub fn get_results(&self, method: &str) -> Option<&Vec<Array2<f64>>> {
616 self.results.get(method)
617 }
618
619 pub fn get_all_results(&self) -> &HashMap<String, Vec<Array2<f64>>> {
621 &self.results
622 }
623}
624
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    /// Path handed to the loader in every test; the loader never opens it.
    const TEST_PATH: &str = "test_data.csv";

    /// Runs a 100-component RBF sampler with `strategy` over a single-chunk loader and checks
    /// the output shape.
    fn assert_single_chunk_rbf(sampler: OutOfCoreRBFSampler, strategy: OutOfCoreStrategy) {
        let mut rbf = sampler.gamma(1.0).with_strategy(strategy);
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 500, 3);

        let results = rbf.fit_transform_out_of_core(&mut loader).unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].ncols(), 100);
    }

    #[test]
    fn test_out_of_core_config() {
        let config = OutOfCoreConfig::default();
        assert_eq!(config.max_memory_bytes, 1024 * 1024 * 1024);
        assert_eq!(config.chunk_size, 10000);
        assert!(config.n_workers > 0);
    }

    #[test]
    fn test_out_of_core_loader() {
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 1000, 5);
        assert_eq!(loader.n_chunks(), 1);

        let chunk = loader
            .load_chunk()
            .unwrap()
            .expect("a loader with samples yields a first chunk");
        assert_eq!(chunk.nrows(), 1000);
        assert_eq!(chunk.ncols(), 5);
    }

    /// Chunk splitting, the short final chunk, exhaustion and `reset`.
    #[test]
    fn test_loader_splits_into_multiple_chunks() {
        let config = OutOfCoreConfig {
            chunk_size: 100,
            ..Default::default()
        };
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 250, 4).with_config(config);
        assert_eq!(loader.n_chunks(), 3);

        let mut rows = Vec::new();
        while let Some(chunk) = loader.load_chunk().unwrap() {
            assert_eq!(chunk.ncols(), 4);
            rows.push(chunk.nrows());
        }
        assert_eq!(rows, vec![100, 100, 50]);
        assert!(loader.load_chunk().unwrap().is_none());

        loader.reset();
        let first_again = loader.load_chunk().unwrap().unwrap();
        assert_eq!(first_again.nrows(), 100);
    }

    #[test]
    fn test_out_of_core_rbf_sampler() {
        assert_single_chunk_rbf(
            OutOfCoreRBFSampler::new(100),
            OutOfCoreStrategy::Sequential,
        );
    }

    /// One output per chunk, with row counts that follow the loader's chunking.
    #[test]
    fn test_rbf_sampler_multiple_chunks() {
        let config = OutOfCoreConfig {
            chunk_size: 100,
            ..Default::default()
        };
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 250, 3).with_config(config);
        let mut rbf = OutOfCoreRBFSampler::new(20)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Sequential);

        let results = rbf.fit_transform_out_of_core(&mut loader).unwrap();

        let rows: Vec<usize> = results.iter().map(|block| block.nrows()).collect();
        assert_eq!(rows, vec![100, 100, 50]);
        assert!(results.iter().all(|block| block.ncols() == 20));
    }

    #[test]
    fn test_out_of_core_nystroem() {
        let mut nystroem = OutOfCoreNystroem::new(50)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Sequential);
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 300, 4);

        nystroem.fit_out_of_core(&mut loader).unwrap();
        let results = nystroem.transform_out_of_core(&mut loader).unwrap();

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].ncols(), 50);
    }

    #[test]
    fn test_out_of_core_pipeline() {
        let mut pipeline = OutOfCoreKernelPipeline::new()
            .add_rbf_sampler(100, 1.0)
            .add_nystroem(50, 0.5);
        let mut loader = OutOfCoreLoader::new(TEST_PATH, 1000, 5);

        pipeline.process(&mut loader).unwrap();

        assert_eq!(pipeline.get_all_results().len(), 2);
    }

    #[test]
    fn test_parallel_strategy() {
        assert_single_chunk_rbf(OutOfCoreRBFSampler::new(100), OutOfCoreStrategy::Parallel);
    }

    #[test]
    fn test_streaming_strategy() {
        // A tiny budget so the streaming path runs under memory pressure.
        let config = OutOfCoreConfig {
            max_memory_bytes: 1024,
            ..Default::default()
        };
        assert_single_chunk_rbf(
            OutOfCoreRBFSampler::new(100).with_config(config),
            OutOfCoreStrategy::Streaming,
        );
    }

    #[test]
    fn test_adaptive_strategy() {
        assert_single_chunk_rbf(OutOfCoreRBFSampler::new(100), OutOfCoreStrategy::Adaptive);
    }

    #[test]
    fn test_memory_estimation() {
        let rbf = OutOfCoreRBFSampler::new(100).gamma(1.0);
        let memory_req = rbf.estimate_memory_requirement(1000, 10);
        assert!(memory_req > 0);
    }

    #[test]
    fn test_data_density_estimation() {
        let rbf = OutOfCoreRBFSampler::new(100).gamma(1.0);

        let empty_of_signal = Array2::zeros((10, 5));
        assert_eq!(rbf.estimate_data_density(&empty_of_signal), 0.0);

        let fully_dense = Array2::ones((10, 5));
        assert_eq!(rbf.estimate_data_density(&fully_dense), 1.0);
    }
}