1use rayon::prelude::*;
8use scirs2_core::ndarray::{Array1, Array2, Axis};
9use scirs2_core::random::essentials::Normal as RandNormal;
10use scirs2_core::random::Distribution;
11use scirs2_core::random::{thread_rng, Rng};
12use sklears_core::error::Result;
13use std::collections::HashMap;
14
/// Tuning knobs shared by the out-of-core loader and the kernel approximators.
///
/// Within this module only `chunk_size` and `max_memory_bytes` are consulted;
/// the remaining fields are carried in the configuration but not read here
/// (NOTE(review): confirm whether other modules rely on them).
#[derive(Clone, Debug)]
pub struct OutOfCoreConfig {
    /// Memory budget in bytes, compared against size estimates when a
    /// transformer decides how to process the data.
    pub max_memory_bytes: usize,
    /// Number of rows handed out per chunk by `OutOfCoreLoader`.
    pub chunk_size: usize,
    /// Desired number of worker threads (not read in this module).
    pub n_workers: usize,
    /// Directory intended for temporary files (not read in this module).
    pub temp_dir: String,
    /// Whether temporary data should be compressed (not read in this module).
    pub use_compression: bool,
    /// I/O buffer size in bytes (not read in this module).
    pub buffer_size: usize,
}
32
33impl Default for OutOfCoreConfig {
34 fn default() -> Self {
35 Self {
36 max_memory_bytes: 1024 * 1024 * 1024, chunk_size: 10000,
38 n_workers: num_cpus::get(),
39 temp_dir: "/tmp/sklears_out_of_core".to_string(),
40 use_compression: true,
41 buffer_size: 64 * 1024, }
43 }
44}
45
/// How a transformer walks the chunks produced by an `OutOfCoreLoader`
/// (consulted by `OutOfCoreRBFSampler`).
#[derive(Clone, Debug, PartialEq)]
pub enum OutOfCoreStrategy {
    /// Load and transform one chunk at a time, in loader order.
    Sequential,
    /// Load every chunk first, then transform them in parallel with rayon.
    Parallel,
    /// Transform chunks one at a time as they are streamed from the loader.
    Streaming,
    /// Choose among the other approaches using the density of the first chunk
    /// and an estimate of the memory requirement.
    Adaptive,
}
59
60pub struct OutOfCoreLoader {
62 config: OutOfCoreConfig,
63 data_path: String,
64 n_samples: usize,
65 n_features: usize,
66 current_chunk: usize,
67}
68
69impl OutOfCoreLoader {
70 pub fn new(data_path: &str, n_samples: usize, n_features: usize) -> Self {
72 Self {
73 config: OutOfCoreConfig::default(),
74 data_path: data_path.to_string(),
75 n_samples,
76 n_features,
77 current_chunk: 0,
78 }
79 }
80
81 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
83 self.config = config;
84 self
85 }
86
87 pub fn load_chunk(&mut self) -> Result<Option<Array2<f64>>> {
89 if self.current_chunk * self.config.chunk_size >= self.n_samples {
90 return Ok(None);
91 }
92
93 let start_idx = self.current_chunk * self.config.chunk_size;
94 let end_idx = std::cmp::min(start_idx + self.config.chunk_size, self.n_samples);
95 let chunk_size = end_idx - start_idx;
96
97 let mut rng = thread_rng();
99 let chunk =
100 Array2::from_shape_fn((chunk_size, self.n_features), |_| rng.gen_range(-1.0..1.0));
101
102 self.current_chunk += 1;
103 Ok(Some(chunk))
104 }
105
106 pub fn reset(&mut self) {
108 self.current_chunk = 0;
109 }
110
111 pub fn n_chunks(&self) -> usize {
113 (self.n_samples + self.config.chunk_size - 1) / self.config.chunk_size
114 }
115}
116
117pub struct OutOfCoreRBFSampler {
119 n_components: usize,
120 gamma: f64,
121 config: OutOfCoreConfig,
122 strategy: OutOfCoreStrategy,
123 random_weights: Option<Array2<f64>>,
124 random_offset: Option<Array1<f64>>,
125}
126
127impl OutOfCoreRBFSampler {
128 pub fn new(n_components: usize) -> Self {
130 Self {
131 n_components,
132 gamma: 1.0,
133 config: OutOfCoreConfig::default(),
134 strategy: OutOfCoreStrategy::Sequential,
135 random_weights: None,
136 random_offset: None,
137 }
138 }
139
140 pub fn gamma(mut self, gamma: f64) -> Self {
142 self.gamma = gamma;
143 self
144 }
145
146 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
148 self.config = config;
149 self
150 }
151
152 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
154 self.strategy = strategy;
155 self
156 }
157
158 pub fn fit_transform_out_of_core(
160 &mut self,
161 loader: &mut OutOfCoreLoader,
162 ) -> Result<Vec<Array2<f64>>> {
163 if self.random_weights.is_none() {
165 let n_features = loader.n_features;
166 let normal = RandNormal::new(0.0, (2.0 * self.gamma).sqrt()).unwrap();
167 let mut rng = thread_rng();
168
169 self.random_weights = Some(Array2::from_shape_fn(
170 (n_features, self.n_components),
171 |_| rng.sample(normal),
172 ));
173
174 self.random_offset = Some(Array1::from_shape_fn(self.n_components, |_| {
175 rng.gen_range(0.0..2.0 * std::f64::consts::PI)
176 }));
177 }
178
179 let mut results = Vec::new();
180 loader.reset();
181
182 match self.strategy {
183 OutOfCoreStrategy::Sequential => {
184 while let Some(chunk) = loader.load_chunk()? {
185 let transformed = self.transform_chunk(&chunk)?;
186 results.push(transformed);
187 }
188 }
189 OutOfCoreStrategy::Parallel => {
190 let chunks = self.load_all_chunks(loader)?;
191 let parallel_results: Vec<_> = chunks
192 .into_par_iter()
193 .map(|chunk| self.transform_chunk(&chunk))
194 .collect::<Result<Vec<_>>>()?;
195 results = parallel_results;
196 }
197 OutOfCoreStrategy::Streaming => {
198 results = self.streaming_transform(loader)?;
199 }
200 OutOfCoreStrategy::Adaptive => {
201 results = self.adaptive_transform(loader)?;
202 }
203 }
204
205 Ok(results)
206 }
207
208 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
210 let weights = self.random_weights.as_ref().unwrap();
211 let offset = self.random_offset.as_ref().unwrap();
212
213 let projection = chunk.dot(weights) + offset;
215
216 let transformed = projection.mapv(|x| (x * (2.0 / self.n_components as f64).sqrt()).cos());
218
219 Ok(transformed)
220 }
221
222 fn load_all_chunks(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
224 let mut chunks = Vec::new();
225 loader.reset();
226
227 while let Some(chunk) = loader.load_chunk()? {
228 chunks.push(chunk);
229 }
230
231 Ok(chunks)
232 }
233
234 fn streaming_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
236 let mut results = Vec::new();
237 let mut memory_used = 0;
238 loader.reset();
239
240 while let Some(chunk) = loader.load_chunk()? {
241 let chunk_memory = chunk.len() * std::mem::size_of::<f64>();
242
243 if memory_used + chunk_memory > self.config.max_memory_bytes {
244 memory_used = 0;
246 }
247
248 let transformed = self.transform_chunk(&chunk)?;
249 results.push(transformed);
250 memory_used += chunk_memory;
251 }
252
253 Ok(results)
254 }
255
256 fn adaptive_transform(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
258 loader.reset();
260 let first_chunk = loader.load_chunk()?.unwrap();
261
262 let data_density = self.estimate_data_density(&first_chunk);
263 let memory_requirement =
264 self.estimate_memory_requirement(loader.n_samples, loader.n_features);
265
266 if memory_requirement < self.config.max_memory_bytes && data_density > 0.8 {
268 self.parallel_transform_adaptive(loader)
269 } else if data_density < 0.2 {
270 self.sparse_transform_adaptive(loader)
271 } else {
272 self.streaming_transform(loader)
273 }
274 }
275
276 fn parallel_transform_adaptive(
278 &self,
279 loader: &mut OutOfCoreLoader,
280 ) -> Result<Vec<Array2<f64>>> {
281 let chunks = self.load_all_chunks(loader)?;
282 let results: Vec<_> = chunks
283 .into_par_iter()
284 .map(|chunk| self.transform_chunk(&chunk))
285 .collect::<Result<Vec<_>>>()?;
286 Ok(results)
287 }
288
289 fn sparse_transform_adaptive(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
291 let mut results = Vec::new();
292 loader.reset();
293
294 while let Some(chunk) = loader.load_chunk()? {
295 let transformed = self.transform_chunk_sparse(&chunk)?;
297 results.push(transformed);
298 }
299
300 Ok(results)
301 }
302
303 fn transform_chunk_sparse(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
305 let weights = self.random_weights.as_ref().unwrap();
307 let offset = self.random_offset.as_ref().unwrap();
308
309 let mut result = Array2::zeros((chunk.nrows(), self.n_components));
310
311 for (i, row) in chunk.axis_iter(Axis(0)).enumerate() {
312 for (j, &x) in row.iter().enumerate() {
313 if x.abs() > 1e-10 {
314 for k in 0..self.n_components {
316 result[[i, k]] += x * weights[[j, k]];
317 }
318 }
319 }
320
321 for k in 0..self.n_components {
323 let val: f64 =
324 (result[[i, k]] + offset[k]) * (2.0_f64 / self.n_components as f64).sqrt();
325 result[[i, k]] = val.cos();
326 }
327 }
328
329 Ok(result)
330 }
331
332 fn estimate_data_density(&self, chunk: &Array2<f64>) -> f64 {
334 let total_elements = chunk.len() as f64;
335 let non_zero_elements = chunk.iter().filter(|&&x| x.abs() > 1e-10).count() as f64;
336 non_zero_elements / total_elements
337 }
338
339 fn estimate_memory_requirement(&self, n_samples: usize, n_features: usize) -> usize {
341 (n_samples * n_features + n_features * self.n_components + self.n_components)
342 * std::mem::size_of::<f64>()
343 }
344}
345
346pub struct OutOfCoreNystroem {
348 n_components: usize,
349 gamma: f64,
350 config: OutOfCoreConfig,
351 strategy: OutOfCoreStrategy,
352 basis_indices: Option<Vec<usize>>,
353 basis_kernel: Option<Array2<f64>>,
354 normalization: Option<Array2<f64>>,
355}
356
357impl OutOfCoreNystroem {
358 pub fn new(n_components: usize) -> Self {
360 Self {
361 n_components,
362 gamma: 1.0,
363 config: OutOfCoreConfig::default(),
364 strategy: OutOfCoreStrategy::Sequential,
365 basis_indices: None,
366 basis_kernel: None,
367 normalization: None,
368 }
369 }
370
371 pub fn gamma(mut self, gamma: f64) -> Self {
373 self.gamma = gamma;
374 self
375 }
376
377 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
379 self.config = config;
380 self
381 }
382
383 pub fn with_strategy(mut self, strategy: OutOfCoreStrategy) -> Self {
385 self.strategy = strategy;
386 self
387 }
388
389 pub fn fit_out_of_core(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
391 let basis_points = self.sample_basis_points(loader)?;
393
394 let kernel_matrix = self.compute_kernel_matrix(&basis_points)?;
396
397 let (eigenvalues, eigenvectors) = self.compute_eigendecomposition(&kernel_matrix)?;
399
400 self.normalization = Some(self.compute_normalization(&eigenvalues, &eigenvectors)?);
402
403 Ok(())
404 }
405
406 fn sample_basis_points(&mut self, loader: &mut OutOfCoreLoader) -> Result<Array2<f64>> {
408 let mut basis_points = Vec::new();
409 let mut point_indices = Vec::new();
410 let mut global_index = 0;
411
412 loader.reset();
413
414 let mut rng = thread_rng();
416 let mut selected_count = 0;
417
418 while let Some(chunk) = loader.load_chunk()? {
419 for (local_idx, row) in chunk.axis_iter(Axis(0)).enumerate() {
420 if selected_count < self.n_components {
421 basis_points.push(row.to_owned());
423 point_indices.push(global_index);
424 selected_count += 1;
425 } else {
426 let replace_idx = rng.gen_range(0..=global_index);
428 if replace_idx < self.n_components {
429 basis_points[replace_idx] = row.to_owned();
430 point_indices[replace_idx] = global_index;
431 }
432 }
433 global_index += 1;
434 }
435 }
436
437 self.basis_indices = Some(point_indices);
438
439 let n_features = basis_points[0].len();
441 let mut result = Array2::zeros((basis_points.len(), n_features));
442 for (i, point) in basis_points.iter().enumerate() {
443 result.row_mut(i).assign(point);
444 }
445
446 Ok(result)
447 }
448
449 fn compute_kernel_matrix(&self, basis_points: &Array2<f64>) -> Result<Array2<f64>> {
451 let n_basis = basis_points.nrows();
452 let mut kernel_matrix = Array2::zeros((n_basis, n_basis));
453
454 for i in 0..n_basis {
455 for j in i..n_basis {
456 let dist_sq = basis_points
457 .row(i)
458 .iter()
459 .zip(basis_points.row(j).iter())
460 .map(|(&a, &b)| (a - b).powi(2))
461 .sum::<f64>();
462
463 let kernel_value = (-self.gamma * dist_sq).exp();
464 kernel_matrix[[i, j]] = kernel_value;
465 kernel_matrix[[j, i]] = kernel_value;
466 }
467 }
468
469 Ok(kernel_matrix)
470 }
471
472 fn compute_eigendecomposition(
474 &self,
475 kernel_matrix: &Array2<f64>,
476 ) -> Result<(Array1<f64>, Array2<f64>)> {
477 let n = kernel_matrix.nrows();
479 let eigenvalues = Array1::ones(n);
480 let eigenvectors = kernel_matrix.clone();
481
482 Ok((eigenvalues, eigenvectors))
483 }
484
485 fn compute_normalization(
487 &self,
488 eigenvalues: &Array1<f64>,
489 eigenvectors: &Array2<f64>,
490 ) -> Result<Array2<f64>> {
491 let mut normalization = Array2::zeros(eigenvectors.dim());
492
493 for i in 0..eigenvectors.nrows() {
494 for j in 0..eigenvectors.ncols() {
495 if eigenvalues[j] > 1e-10 {
496 normalization[[i, j]] = eigenvectors[[i, j]] / eigenvalues[j].sqrt();
497 }
498 }
499 }
500
501 Ok(normalization)
502 }
503
504 pub fn transform_out_of_core(&self, loader: &mut OutOfCoreLoader) -> Result<Vec<Array2<f64>>> {
506 let mut results = Vec::new();
507 loader.reset();
508
509 while let Some(chunk) = loader.load_chunk()? {
510 let transformed = self.transform_chunk(&chunk)?;
511 results.push(transformed);
512 }
513
514 Ok(results)
515 }
516
517 fn transform_chunk(&self, chunk: &Array2<f64>) -> Result<Array2<f64>> {
519 let normalization = self.normalization.as_ref().unwrap();
520
521 let mut kernel_values = Array2::zeros((chunk.nrows(), self.n_components));
523
524 for i in 0..chunk.nrows() {
526 for j in 0..self.n_components {
527 kernel_values[[i, j]] = thread_rng().gen_range(0.0..1.0);
528 }
529 }
530
531 let result = kernel_values.dot(normalization);
533
534 Ok(result)
535 }
536}
537
538pub struct OutOfCoreKernelPipeline {
540 config: OutOfCoreConfig,
541 methods: Vec<String>,
542 results: HashMap<String, Vec<Array2<f64>>>,
543}
544
545impl OutOfCoreKernelPipeline {
546 pub fn new() -> Self {
548 Self {
549 config: OutOfCoreConfig::default(),
550 methods: Vec::new(),
551 results: HashMap::new(),
552 }
553 }
554
555 pub fn with_config(mut self, config: OutOfCoreConfig) -> Self {
557 self.config = config;
558 self
559 }
560
561 pub fn add_rbf_sampler(mut self, n_components: usize, gamma: f64) -> Self {
563 self.methods.push(format!("rbf_{}_{}", n_components, gamma));
564 self
565 }
566
567 pub fn add_nystroem(mut self, n_components: usize, gamma: f64) -> Self {
569 self.methods
570 .push(format!("nystroem_{}_{}", n_components, gamma));
571 self
572 }
573
574 pub fn process(&mut self, loader: &mut OutOfCoreLoader) -> Result<()> {
576 for method in &self.methods.clone() {
577 let parts: Vec<&str> = method.split('_').collect();
578 match parts[0] {
579 "rbf" => {
580 let n_components: usize = parts[1].parse().unwrap();
581 let gamma: f64 = parts[2].parse().unwrap();
582
583 let mut rbf = OutOfCoreRBFSampler::new(n_components)
584 .gamma(gamma)
585 .with_config(self.config.clone());
586
587 let result = rbf.fit_transform_out_of_core(loader)?;
588 self.results.insert(method.clone(), result);
589 }
590 "nystroem" => {
591 let n_components: usize = parts[1].parse().unwrap();
592 let gamma: f64 = parts[2].parse().unwrap();
593
594 let mut nystroem = OutOfCoreNystroem::new(n_components)
595 .gamma(gamma)
596 .with_config(self.config.clone());
597
598 nystroem.fit_out_of_core(loader)?;
599 let result = nystroem.transform_out_of_core(loader)?;
600 self.results.insert(method.clone(), result);
601 }
602 _ => continue,
603 }
604 }
605
606 Ok(())
607 }
608
609 pub fn get_results(&self, method: &str) -> Option<&Vec<Array2<f64>>> {
611 self.results.get(method)
612 }
613
614 pub fn get_all_results(&self) -> &HashMap<String, Vec<Array2<f64>>> {
616 &self.results
617 }
618}
619
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    /// Path handed to every loader in these tests.
    const DATA_PATH: &str = "test_data.csv";

    /// Asserts that `results` holds exactly one chunk with `width` columns.
    fn assert_one_chunk_of_width(results: &[Array2<f64>], width: usize) {
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].ncols(), width);
    }

    /// Fits and transforms 500×3 rows with a 100-component, `gamma = 1` RBF
    /// sampler using the given strategy and configuration.
    fn run_rbf(strategy: OutOfCoreStrategy, config: OutOfCoreConfig) -> Vec<Array2<f64>> {
        let mut sampler = OutOfCoreRBFSampler::new(100)
            .gamma(1.0)
            .with_config(config)
            .with_strategy(strategy);
        let mut loader = OutOfCoreLoader::new(DATA_PATH, 500, 3);
        sampler.fit_transform_out_of_core(&mut loader).unwrap()
    }

    #[test]
    fn test_out_of_core_config() {
        let OutOfCoreConfig {
            max_memory_bytes,
            chunk_size,
            n_workers,
            ..
        } = OutOfCoreConfig::default();
        assert_eq!(max_memory_bytes, 1 << 30);
        assert_eq!(chunk_size, 10_000);
        assert!(n_workers >= 1);
    }

    #[test]
    fn test_out_of_core_loader() {
        let mut loader = OutOfCoreLoader::new(DATA_PATH, 1000, 5);
        assert_eq!(loader.n_chunks(), 1);

        let chunk = loader
            .load_chunk()
            .unwrap()
            .expect("first chunk should exist");
        assert_eq!(chunk.dim(), (1000, 5));
    }

    #[test]
    fn test_out_of_core_rbf_sampler() {
        let results = run_rbf(OutOfCoreStrategy::Sequential, OutOfCoreConfig::default());
        assert_one_chunk_of_width(&results, 100);
    }

    #[test]
    fn test_out_of_core_nystroem() {
        let mut nystroem = OutOfCoreNystroem::new(50)
            .gamma(1.0)
            .with_strategy(OutOfCoreStrategy::Sequential);
        let mut loader = OutOfCoreLoader::new(DATA_PATH, 300, 4);

        nystroem.fit_out_of_core(&mut loader).unwrap();
        let results = nystroem.transform_out_of_core(&mut loader).unwrap();
        assert_one_chunk_of_width(&results, 50);
    }

    #[test]
    fn test_out_of_core_pipeline() {
        let mut pipeline = OutOfCoreKernelPipeline::new()
            .add_rbf_sampler(100, 1.0)
            .add_nystroem(50, 0.5);
        let mut loader = OutOfCoreLoader::new(DATA_PATH, 1000, 5);

        pipeline.process(&mut loader).unwrap();
        assert_eq!(pipeline.get_all_results().len(), 2);
    }

    #[test]
    fn test_parallel_strategy() {
        let results = run_rbf(OutOfCoreStrategy::Parallel, OutOfCoreConfig::default());
        assert_one_chunk_of_width(&results, 100);
    }

    #[test]
    fn test_streaming_strategy() {
        let tight_budget = OutOfCoreConfig {
            max_memory_bytes: 1024,
            ..Default::default()
        };
        let results = run_rbf(OutOfCoreStrategy::Streaming, tight_budget);
        assert_one_chunk_of_width(&results, 100);
    }

    #[test]
    fn test_adaptive_strategy() {
        let results = run_rbf(OutOfCoreStrategy::Adaptive, OutOfCoreConfig::default());
        assert_one_chunk_of_width(&results, 100);
    }

    #[test]
    fn test_memory_estimation() {
        let sampler = OutOfCoreRBFSampler::new(100).gamma(1.0);
        assert!(sampler.estimate_memory_requirement(1000, 10) > 0);
    }

    #[test]
    fn test_data_density_estimation() {
        let sampler = OutOfCoreRBFSampler::new(100).gamma(1.0);
        assert_eq!(sampler.estimate_data_density(&Array2::zeros((10, 5))), 0.0);
        assert_eq!(sampler.estimate_data_density(&Array2::ones((10, 5))), 1.0);
    }
}
749}