1#[cfg(test)]
63extern crate quickcheck;
64extern crate yaml_rust;
65
66use rand::{Rng, SeedableRng};
67use rand_pcg::Pcg64;
68use std::cmp::PartialEq;
69use std::fs::File;
70use std::fs::OpenOptions;
71use std::io::Write;
72use std::time::{Duration, Instant};
73use streaming_iterator::*;
74use yaml_rust::{Yaml, YamlEmitter};
75
76pub mod algorithms;
77pub mod conjugate_gradient;
78pub mod derivative_descent;
79pub mod utils;
80
81#[inline]
84pub fn take_until<I, F>(it: I, f: F) -> TakeUntil<I, F>
85where
86 I: StreamingIterator,
87 F: FnMut(&I::Item) -> bool,
88{
89 TakeUntil {
90 it,
91 f,
92 state: UntilState::Unfulfilled,
93 }
94}
95
/// Streaming adaptor created by [`take_until`].
///
/// Passes items of `it` through up to and including the first one for which `f` returns
/// `true`, then yields nothing more.
#[derive(Clone)]
pub struct TakeUntil<I, F>
where
    I: StreamingIterator,
    F: FnMut(&I::Item) -> bool,
{
    /// The wrapped iterator.
    pub it: I,
    /// Stop predicate, evaluated on each new item while the state is `Unfulfilled`.
    pub f: F,
    /// How far the adaptor has progressed; see [`UntilState`].
    pub state: UntilState,
}
108
/// Progress of a [`TakeUntil`] adaptor.
#[derive(Clone, PartialEq)]
pub enum UntilState {
    /// The predicate has not matched yet; items are passed through and tested.
    Unfulfilled,
    /// The current item matched the predicate; it is still yielded.
    Fulfilled,
    /// The step after the match has been taken; `get` returns `None` from now on.
    Done,
}
115
116impl<I, F> StreamingIterator for TakeUntil<I, F>
117where
118 I: StreamingIterator,
119 F: FnMut(&I::Item) -> bool,
120{
121 type Item = I::Item;
122 fn advance(&mut self) {
123 match self.state {
124 UntilState::Unfulfilled => {
125 self.it.advance();
126 if let Some(v) = self.it.get() {
127 if (self.f)(v) {
128 self.state = UntilState::Fulfilled
129 }
130 }
131 }
132 UntilState::Fulfilled => self.state = UntilState::Done,
133 UntilState::Done => {}
134 }
135 }
136
137 fn get(&self) -> Option<&Self::Item> {
138 if UntilState::Done == self.state {
139 None
140 } else {
141 self.it.get()
142 }
143 }
144}
145
/// An item paired with a value computed from it by an [`Annotate`] adaptor.
#[derive(Clone, Debug)]
pub struct AnnotatedResult<T, A> {
    /// A clone of the item produced by the wrapped iterator.
    pub result: T,
    /// The value the annotation function returned for `result`.
    pub annotation: A,
}
152
/// Streaming adaptor that pairs each item of `it` with the result of calling `f` on it.
#[derive(Clone, Debug)]
pub struct Annotate<I, T, F, A>
where
    I: Sized + StreamingIterator<Item = T>,
    T: Clone,
    F: FnMut(&T) -> A,
{
    /// The wrapped iterator.
    pub it: I,
    /// Annotation function, called once per item on `advance`.
    pub f: F,
    /// The most recently annotated item, or `None` before the first advance / after the end.
    pub current: Option<AnnotatedResult<T, A>>,
}
165
166impl<I, T, F, A> Annotate<I, T, F, A>
167where
168 I: StreamingIterator<Item = T>,
169 T: Sized + Clone,
170 F: FnMut(&T) -> A,
171{
172 pub fn new(it: I, f: F) -> Annotate<I, T, F, A> {
174 Annotate {
175 it,
176 f,
177 current: None,
178 }
179 }
180}
181
182impl<I, T, F, A> StreamingIterator for Annotate<I, T, F, A>
183where
184 I: StreamingIterator<Item = T>,
185 T: Sized + Clone,
186 F: FnMut(&T) -> A,
187{
188 type Item = AnnotatedResult<T, A>;
189
190 fn advance(&mut self) {
191 self.it.advance();
192 self.current = match self.it.get() {
193 Some(n) => {
194 let annotation = (self.f)(n);
195 Some(AnnotatedResult {
196 annotation,
197 result: n.clone(),
198 })
199 }
200 None => None,
201 }
202 }
203
204 fn get(&self) -> Option<&Self::Item> {
205 match &self.current {
206 Some(tr) => Some(&tr),
207 None => None,
208 }
209 }
210}
211
212pub fn assess<I, T, F, A>(it: I, f: F) -> Annotate<I, T, F, A>
214where
215 T: Clone,
216 F: FnMut(&T) -> A,
217 I: StreamingIterator<Item = T>,
218{
219 Annotate::new(it, f)
220}
221
222pub fn inspect<I, F, T>(it: I, f: F) -> Annotate<I, T, F, ()>
224where
225 I: Sized + StreamingIterator<Item = T>,
226 F: FnMut(&T),
227 T: Clone,
228{
229 Annotate::new(it, f)
230}
231
232pub fn last<I, T>(it: I) -> Option<T>
234where
235 I: StreamingIterator<Item = T>,
236 T: Sized + Clone,
237{
238 it.fold(None, |_acc, i| Some((*i).clone()))
239}
240
/// Streaming adaptor created by [`time`] that records timing information for every step.
#[derive(Clone, Debug)]
pub struct Time<I, T>
where
    I: StreamingIterator<Item = T>,
    T: Clone,
{
    it: I,
    // The latest timed item; `None` before the first advance and after the end of the stream.
    current: Option<TimedResult<T>>,
    // Started when the adaptor is created; `start_time` values are measured against it.
    timer: Instant,
}
253
/// An item together with timing information recorded by a [`Time`] adaptor.
#[derive(Clone, Debug)]
pub struct TimedResult<T> {
    /// A clone of the item produced by the wrapped iterator.
    pub result: T,
    /// Time elapsed between creating the adaptor and the start of this step.
    pub start_time: Duration,
    /// Time spent advancing the wrapped iterator and fetching its item for this step.
    pub duration: Duration,
}
266
267pub fn time<I, T>(it: I) -> Time<I, T>
271where
272 I: Sized + StreamingIterator<Item = T>,
273 T: Sized + Clone,
274{
275 Time {
276 it,
277 current: None,
278 timer: Instant::now(),
279 }
280}
281
282impl<I, T> StreamingIterator for Time<I, T>
283where
284 I: Sized + StreamingIterator<Item = T>,
285 T: Sized + Clone,
286{
287 type Item = TimedResult<T>;
288
289 fn advance(&mut self) {
290 let start_time = self.timer.elapsed();
291 let before = Instant::now();
292 self.it.advance();
293 self.current = match self.it.get() {
294 Some(n) => Some(TimedResult {
295 start_time,
296 duration: before.elapsed(),
297 result: n.clone(),
298 }),
299 None => None,
300 }
301 }
302
303 fn get(&self) -> Option<&Self::Item> {
304 match &self.current {
305 Some(tr) => Some(&tr),
306 None => None,
307 }
308 }
309}
310
/// Streaming adaptor created by [`step_by`]: yields the first item, then every `step`-th one.
#[derive(Clone, Debug)]
pub struct StepBy<I> {
    it: I,
    // Number of items to skip between yields (the user-facing step minus one).
    step: usize,
    // True until the first `advance`, which yields the very first item without skipping.
    first_take: bool,
}
325
326pub fn step_by<I, T>(it: I, step: usize) -> StepBy<I>
332where
333 I: Sized + StreamingIterator<Item = T>,
334{
335 assert!(step != 0);
336 StepBy {
337 it,
338 step: step - 1,
339 first_take: true,
340 }
341}
342
343impl<I> StreamingIterator for StepBy<I>
344where
345 I: StreamingIterator,
346{
347 type Item = I::Item;
348
349 #[inline]
350 fn advance(&mut self) {
351 if self.first_take {
352 self.first_take = false;
353 self.it.advance();
354 } else {
355 self.it.nth(self.step);
356 }
357 }
358
359 #[inline]
360 fn get(&self) -> Option<&I::Item> {
361 self.it.get()
362 }
363}
364
/// Streaming adaptor created by `write_to_file`: writes every item to a file with a
/// user-supplied function while passing the items through unchanged.
#[derive(Debug)]
struct WriteToFile<I, F> {
    /// The wrapped iterator.
    pub it: I,
    /// Called with each item and the output file.
    pub write_function: F,
    /// The output file, opened in append mode.
    pub file_writer: File,
}
372
373#[allow(dead_code)]
375fn write_to_file<I, T, F>(
376 it: I,
377 write_function: F,
378 file_path: String,
379) -> Result<WriteToFile<I, F>, std::io::Error>
380where
381 I: Sized + StreamingIterator<Item = T>,
382 T: std::fmt::Debug,
383 F: FnMut(&T, &mut std::fs::File) -> std::io::Result<()>,
384{
385 let result = match std::fs::metadata(&file_path) {
386 Ok(_) => {
387 panic!("File to which you want to write already exists or permission does not exist. Please rename or remove the file or gain permission.")
388 }
389 Err(_) => {
390 let file_writer = OpenOptions::new()
391 .append(true)
392 .create(true)
393 .open(file_path)?;
394 Ok(WriteToFile {
395 it,
396 write_function,
397 file_writer,
398 })
399 }
400 };
401 result
402}
403
404impl<I, T, F> StreamingIterator for WriteToFile<I, F>
405where
406 I: Sized + StreamingIterator<Item = T>,
407 T: std::fmt::Debug,
408 F: FnMut(&T, &mut std::fs::File) -> std::io::Result<()>,
409{
410 type Item = I::Item;
411
412 #[inline]
413 fn advance(&mut self) {
414 if let Some(item) = self.it.next() {
415 (self.write_function)(&item, &mut self.file_writer)
416 .expect("Write item to file in WriteToFile advance failed.");
417 } else {
418 self.file_writer.flush().expect("Flush of file failed.");
419 }
420 }
421
422 #[inline]
423 fn get(&self) -> Option<&I::Item> {
424 self.it.get()
425 }
426}
427
/// Types that can be converted into a `yaml_rust` [`Yaml`] value for serialization.
pub trait YamlDataType {
    /// Builds the YAML representation of `self`.
    fn create_yaml_object(&self) -> Yaml;
}
432
433impl<T> YamlDataType for &T
435where
436 T: YamlDataType,
437{
438 fn create_yaml_object(&self) -> Yaml {
439 (*self).create_yaml_object()
440 }
441}
442
443impl YamlDataType for i64 {
445 fn create_yaml_object(&self) -> Yaml {
446 Yaml::Integer(*self)
447 }
448}
449
450impl YamlDataType for f64 {
451 fn create_yaml_object(&self) -> Yaml {
452 Yaml::Real((*self).to_string())
453 }
454}
455
456impl YamlDataType for String {
457 fn create_yaml_object(&self) -> Yaml {
458 Yaml::String((*self).to_string())
459 }
460}
461
462impl YamlDataType for Yaml {
465 fn create_yaml_object(&self) -> Yaml {
466 self.clone()
467 }
468}
469
470impl<T> YamlDataType for Vec<T>
472where
473 T: YamlDataType,
474{
475 fn create_yaml_object(&self) -> Yaml {
476 let v: Vec<Yaml> = self.iter().map(|x| x.create_yaml_object()).collect();
477 Yaml::Array(v)
478 }
479}
480
481impl<T, A> YamlDataType for AnnotatedResult<T, A>
482where
483 T: YamlDataType,
484 A: YamlDataType,
485{
486 fn create_yaml_object(&self) -> Yaml {
487 let t = &self.result;
488 let a = &self.annotation;
489 Yaml::Array(vec![t.create_yaml_object(), a.create_yaml_object()])
490 }
491}
492
493impl<T> YamlDataType for WeightedDatum<T>
494where
495 T: YamlDataType,
496{
497 fn create_yaml_object(&self) -> Yaml {
498 let value = &self.value;
499 let weight = &self.weight;
500 Yaml::Array(vec![
501 value.create_yaml_object(),
502 weight.create_yaml_object(),
503 ])
504 }
505}
506
/// Streaming adaptor created by [`write_yaml_documents`]: writes each item as its own YAML
/// document while passing the items through unchanged.
#[derive(Debug)]
pub struct WriteYamlDocuments<I> {
    /// The wrapped iterator.
    pub it: I,
    /// The output file, opened in append mode.
    pub file_writer: File,
}
513
514pub fn write_yaml_documents<I, T>(
516 it: I,
517 file_path: String,
518) -> Result<WriteYamlDocuments<I>, std::io::Error>
519where
520 I: Sized + StreamingIterator<Item = T>,
521 T: std::fmt::Debug,
522{
523 let result = match std::fs::metadata(&file_path) {
524 Ok(_) => {
525 panic!("Failed to create or gain permission of {}, please delete it or gain permission before running this demo. If the demo runs completely, it will delete the file upon completion.", file_path)
526 }
527 Err(_) => {
528 let file_writer = OpenOptions::new()
529 .append(true)
530 .create(true)
531 .open(file_path)?;
532 Ok(WriteYamlDocuments { it, file_writer })
533 }
534 };
535 result
536}
537
538pub fn write_yaml_object<T>(item: &T, file_writer: &mut std::fs::File) -> std::io::Result<()>
541where
542 T: YamlDataType,
543{
544 let yaml_item = item.create_yaml_object();
545 let mut out_str = String::new();
546 let mut emitter = YamlEmitter::new(&mut out_str);
547 emitter
548 .dump(&yaml_item)
549 .expect("Could not convert item to yaml object.");
550 out_str.push('\n');
551 file_writer
552 .write_all(out_str.as_bytes())
553 .expect("Writing value to file failed.");
554 Ok(())
555}
556
557impl<I, T> StreamingIterator for WriteYamlDocuments<I>
558where
559 I: Sized + StreamingIterator<Item = T>,
560 T: std::fmt::Debug + YamlDataType,
561{
562 type Item = I::Item;
563
564 #[inline]
565 fn advance(&mut self) {
566 if let Some(item) = self.it.next() {
567 write_yaml_object(&item, &mut self.file_writer)
568 .expect("Write item to file in WriteYamlDocuments advance failed.");
569 } else {
570 self.file_writer.flush().expect("Flush of file failed.");
571 }
572 }
573
574 #[inline]
575 fn get(&self) -> Option<&I::Item> {
576 self.it.get()
577 }
578}
579
/// An item paired with its position in a stream, as produced by [`Enumerate`].
#[derive(Clone, Debug, std::cmp::PartialEq)]
pub struct Numbered<T> {
    /// Zero-based position of `item`; `Enumerate` starts from a placeholder with count `-1`.
    pub count: i64,
    /// The item, or `None` in `Enumerate`'s initial placeholder.
    pub item: Option<T>,
}
586
587impl<T> YamlDataType for Numbered<T>
588where
589 T: YamlDataType,
590{
591 fn create_yaml_object(&self) -> Yaml {
592 let t = (self.item).as_ref().unwrap();
593 Yaml::Array(vec![Yaml::Integer(self.count), t.create_yaml_object()])
594 }
595}
596
/// Streaming adaptor that pairs each item of `it` with its zero-based position.
#[derive(Clone, Debug)]
pub struct Enumerate<I, T> {
    /// The latest numbered item. It starts as a placeholder (`count: -1`, `item: None`) and
    /// becomes `None` once the wrapped iterator is exhausted.
    pub current: Option<Numbered<T>>,
    /// The wrapped iterator.
    pub it: I,
}
603
604impl<I, T> Enumerate<I, T>
606where
607 I: StreamingIterator<Item = T>,
608{
609 pub fn new(it: I) -> Enumerate<I, T> {
610 Enumerate {
611 current: Some(Numbered {
612 count: -1,
613 item: None,
614 }),
615 it,
616 }
617 }
618}
619
620pub fn enumerate<I, T>(it: I) -> Enumerate<I, T>
622where
623 I: StreamingIterator<Item = T>,
624{
625 Enumerate {
626 current: Some(Numbered {
627 count: -1,
628 item: None,
629 }),
630 it,
631 }
632}
633
634impl<I, T> StreamingIterator for Enumerate<I, T>
635where
636 I: StreamingIterator<Item = T>,
637 T: Clone,
638{
639 type Item = Numbered<T>;
640
641 fn advance(&mut self) {
642 self.it.advance();
643 self.current = match self.it.get() {
644 Some(t) => {
645 if let Some(n) = &self.current {
646 let c = n.count + 1;
647 Some(Numbered {
648 count: c,
649 item: Some(t.clone()),
650 })
651 } else {
652 None
653 }
654 }
655 None => None,
656 }
657 }
658
659 fn get(&self) -> Option<&Self::Item> {
660 match &self.current {
661 Some(t) => Some(&t),
662 None => None,
663 }
664 }
665}
666
/// Streaming adaptor created by [`reservoir_sample`] that keeps a random sample of at most
/// `capacity` items from the stream.
///
/// The `w`/`skip` updates follow the skip-based scheme of Li's "Algorithm L", so the sample is
/// intended to be uniform over the items seen so far.
#[derive(Debug, Clone)]
pub struct ReservoirSample<I, T> {
    it: I,
    /// The current sample.
    pub reservoir: Vec<T>,
    capacity: usize,
    // Algorithm-L style running weight used to draw the next skip length.
    w: f64,
    // Number of items to jump over before the next replacement candidate.
    skip: usize,
    rng: Pcg64,
}
693
694pub fn reservoir_sample<I, T>(
697 it: I,
698 capacity: usize,
699 custom_rng: Option<Pcg64>,
700) -> ReservoirSample<I, T>
701where
702 I: Sized + StreamingIterator<Item = T>,
703 T: Clone,
704{
705 let mut rng = match custom_rng {
706 Some(rng) => rng,
707 None => Pcg64::from_entropy(),
708 };
709 let res: Vec<T> = Vec::new();
710 let w_initial = (rng.gen::<f64>().ln() / (capacity as f64)).exp();
711 ReservoirSample {
712 it,
713 reservoir: res,
714 capacity,
715 w: w_initial,
716 skip: ((rng.gen::<f64>() as f64).ln() / (1. - w_initial).ln()).floor() as usize,
717 rng,
718 }
719}
720
721impl<I, T> StreamingIterator for ReservoirSample<I, T>
722where
723 T: Clone + std::fmt::Debug,
724 I: StreamingIterator<Item = T>,
725{
726 type Item = Vec<T>;
727
728 #[inline]
729 fn advance(&mut self) {
730 if self.reservoir.len() < self.capacity {
731 while self.reservoir.len() < self.capacity {
732 if let Some(datum) = self.it.next() {
733 let cloned_datum = datum.clone();
734 self.reservoir.push(cloned_datum);
735 } else {
736 break;
737 }
738 }
739 } else if let Some(datum) = self.it.nth(self.skip) {
740 let h = self.rng.gen_range(0..self.capacity) as usize;
741 let datum_struct = datum.clone();
742 self.reservoir[h] = datum_struct;
743 self.w *= (self.rng.gen::<f64>().ln() / (self.capacity as f64)).exp();
744 self.skip = ((self.rng.gen::<f64>() as f64).ln() / (1. - self.w).ln()).floor() as usize;
745 }
746 }
747
748 #[inline]
749 fn get(&self) -> Option<&Self::Item> {
750 if let Some(_wd) = &self.it.get() {
751 Some(&self.reservoir)
752 } else {
753 None
754 }
755 }
756}
757
/// A value paired with a sampling weight; construct it with [`new_datum`] to reject
/// non-finite weights.
#[derive(Debug, Clone, PartialEq)]
pub struct WeightedDatum<U> {
    /// The payload.
    pub value: U,
    /// The weight used when computing the item's probability of inclusion.
    pub weight: f64,
}
768
769pub fn new_datum<U>(value: U, weight: f64) -> WeightedDatum<U>
771where
772 U: Clone,
773{
774 if !weight.is_finite() {
775 panic!("The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir.");
776 }
777 WeightedDatum { value, weight }
778}
779
/// Streaming adaptor created by [`wd_iterable`] that turns each item into a
/// [`WeightedDatum`] whose weight is computed by `f`.
#[derive(Debug, Clone)]
pub struct Weight<I, T, F>
where
    I: StreamingIterator<Item = T>,
{
    /// The wrapped iterator.
    pub it: I,
    /// The most recently weighted item, or `None` before the first advance / after the end.
    pub wd: Option<WeightedDatum<T>>,
    /// Weight function, called once per item.
    pub f: F,
}
796
797pub fn wd_iterable<I, T, F>(it: I, f: F) -> Weight<I, T, F>
799where
800 I: StreamingIterator<Item = T>,
801 F: FnMut(&T) -> f64,
802{
803 Weight { it, wd: None, f }
804}
805
806impl<I, T, F> StreamingIterator for Weight<I, T, F>
807where
808 I: StreamingIterator<Item = T>,
809 F: FnMut(&T) -> f64,
810 T: Sized + Clone,
811{
812 type Item = WeightedDatum<T>;
813
814 fn advance(&mut self) {
815 self.it.advance();
816 self.wd = match self.it.get() {
817 Some(item) => {
818 let new_weight = (self.f)(item);
819 let new_item = item.clone();
820 Some(new_datum(new_item, new_weight))
821 }
822 None => None,
823 }
824 }
825
826 fn get(&self) -> Option<&Self::Item> {
827 match &self.wd {
828 Some(wdatum) => Some(&wdatum),
829 None => None,
830 }
831 }
832}
833
/// Streaming adaptor created by [`extract_value`] that drops the weights from a stream of
/// [`WeightedDatum`]s and yields only their values.
#[derive(Clone, Debug)]
pub struct ExtractValue<I, T>
where
    I: StreamingIterator<Item = WeightedDatum<T>>,
{
    it: I,
}
843
844pub fn extract_value<I, T>(it: I) -> ExtractValue<I, T>
847where
848 I: StreamingIterator<Item = WeightedDatum<T>>,
849{
850 ExtractValue { it }
851}
852
853impl<I, T> StreamingIterator for ExtractValue<I, T>
854where
855 I: StreamingIterator<Item = WeightedDatum<T>>,
856{
857 type Item = T;
858 fn advance(&mut self) {
859 self.it.advance();
860 }
861
862 fn get(&self) -> Option<&Self::Item> {
863 match &self.it.get() {
864 Some(item) => Some(&item.value),
865 None => None,
866 }
867 }
868}
869
/// Streaming adaptor created by [`weighted_reservoir_sample`] that keeps a weighted random
/// sample of at most `capacity` items.
///
/// Once the reservoir is full, each new item overwrites a random slot with probability
/// `capacity * weight / weight_sum`.
#[derive(Debug, Clone)]
pub struct WeightedReservoirSample<I, T> {
    it: I,
    /// The current sample.
    pub reservoir: Vec<WeightedDatum<T>>,
    capacity: usize,
    // Sum of the weights of every item consumed so far (including the current one).
    weight_sum: f64,
    rng: Pcg64,
}
899
900pub fn weighted_reservoir_sample<I, T>(
902 it: I,
903 capacity: usize,
904 custom_rng: Option<Pcg64>,
905) -> WeightedReservoirSample<I, T>
906where
907 I: Sized + StreamingIterator<Item = WeightedDatum<T>>,
908 T: Clone,
909{
910 let rng = match custom_rng {
911 Some(rng) => rng,
912 None => Pcg64::from_entropy(),
913 };
914 let reservoir: Vec<WeightedDatum<T>> = Vec::new();
915 WeightedReservoirSample {
916 it,
917 reservoir,
918 capacity,
919 weight_sum: 0.0,
920 rng,
921 }
922}
923
924impl<I, T> StreamingIterator for WeightedReservoirSample<I, T>
925where
926 T: Clone + std::fmt::Debug,
927 I: StreamingIterator<Item = WeightedDatum<T>>,
928{
929 type Item = Vec<WeightedDatum<T>>;
930
931 #[inline]
932 fn advance(&mut self) {
933 if self.reservoir.len() >= self.capacity {
934 if let Some(datum) = self.it.next() {
935 self.weight_sum += datum.weight;
936 let p = &(self.capacity as f64 * datum.weight / self.weight_sum);
937 let j: f64 = self.rng.gen();
938 if j < *p {
939 let h = self.rng.gen_range(0..self.capacity) as usize;
940 let datum_struct = datum.clone();
941 self.reservoir[h] = datum_struct;
942 };
943 }
944 } else {
945 while self.reservoir.len() < self.capacity {
946 if let Some(datum) = self.it.next() {
947 let cloned_datum = datum.clone();
948 self.reservoir.push(cloned_datum);
949 self.weight_sum += datum.weight;
950 } else {
951 break;
952 }
953 }
954 }
955 }
956
957 #[inline]
958 fn get(&self) -> Option<&Self::Item> {
959 if let Some(_wd) = &self.it.get() {
960 Some(&self.reservoir)
961 } else {
962 None
963 }
964 }
965}
966
#[cfg(test)]
mod tests {

    use super::*;
    use crate::utils::generate_stream_with_constant_probability;
    use crate::utils::mean_of_means_of_step_stream;
    use std::convert::TryInto;
    use std::io::Read;
    use std::iter;

    /// Best-effort removal of a test output file, e.g. one left behind by an aborted run.
    ///
    /// Errors (typically "not found") are ignored on purpose: usually the file does not exist.
    fn remove_stale_file(path: &str) {
        let _ = std::fs::remove_file(path);
    }

    #[test]
    fn test_last() {
        let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        let iter = convert(v);
        assert!(last(iter) == Some(9));
    }

    #[test]
    fn test_last_none() {
        let v: Vec<u32> = vec![];
        assert!(last(convert(v)) == None);
    }

    #[test]
    fn step_by_test() {
        let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
        let iter = convert(v);
        let mut iter = step_by(iter, 3);
        let mut index = 0i64;
        while let Some(element) = iter.next() {
            assert_eq!(*element, index * 3);
            index += 1;
        }
        // 0, 3, 6 and 9 must all have been produced; an empty stream would otherwise pass.
        assert_eq!(index, 4);
    }

    #[test]
    fn annotate_test() {
        let v = vec![0., 1., 2.];
        let iter = convert(v);
        fn f(num: &f64) -> f64 {
            num * 2.
        }
        let target_annotations = vec![0., 2., 4.];
        let mut annotations: Vec<f64> = Vec::with_capacity(3);
        let mut ann_iter = Annotate::new(iter, f);
        while let Some(n) = ann_iter.next() {
            annotations.push(n.annotation);
        }
        assert_eq!(annotations, target_annotations);
    }

    /// Writes four integers as separate YAML documents and checks the exact file contents.
    #[test]
    fn write_yaml_documents_test() {
        let test_file_path = "./write_yaml_documents_test.yaml";
        remove_stale_file(test_file_path);
        let v: Vec<i64> = vec![0, 1, 2, 3];
        let v_iter = convert(v);
        let mut yaml_iter = write_yaml_documents(v_iter, String::from(test_file_path))
            .expect("Create File and initialize yaml_iter failed.");
        while let Some(_) = yaml_iter.next() {}
        let mut read_file =
            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
        let mut contents = String::new();
        read_file
            .read_to_string(&mut contents)
            .expect("Could not read data from file.");
        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
        assert_eq!("---\n0\n---\n1\n---\n2\n---\n3\n", &contents);
    }

    /// Writes two integer vectors as YAML sequences, one document each.
    #[test]
    fn write_vec_to_yaml_test() {
        let test_file_path = "./vec_to_file_test.yaml";
        remove_stale_file(test_file_path);
        let v: Vec<Vec<i64>> = vec![vec![0, 1], vec![2, 3]];
        let vc = convert(v.iter());
        let mut vc = write_yaml_documents(vc, String::from(test_file_path))
            .expect("Vec to Yaml: Create File and initialize yaml_iter failed.");
        while let Some(_) = vc.next() {}
        let mut read_file =
            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
        let mut contents = String::new();
        read_file
            .read_to_string(&mut contents)
            .expect("Could not read data from file.");
        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
        assert_eq!("---\n- 0\n- 1\n---\n- 2\n- 3\n", &contents);
    }

    /// An `AnnotatedResult` serializes as `[result, annotation]`.
    #[test]
    fn annotated_result_to_yaml_test() {
        let ann = AnnotatedResult {
            result: 0,
            annotation: "zero".to_string(),
        };
        let test_file_path = "./annotated_result_test.yaml";
        // The file is opened in append mode, so leftovers from an aborted run must go first.
        remove_stale_file(test_file_path);
        let mut file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(test_file_path)
            .expect("Could not open test file.");
        write_yaml_object(&ann, &mut file)
            .unwrap_or_else(|e| panic!("write_yaml_object Failed for {}: {}", test_file_path, e));
        drop(file);
        let contents = utils::read_yaml_to_string(test_file_path)
            .unwrap_or_else(|e| panic!("Could not read {}: {:?}", test_file_path, e));
        remove_stale_file(test_file_path);
        assert_eq!("---\n- 0\n- zero\n", &contents);
    }

    /// A `Numbered` serializes as `[count, item]`.
    #[test]
    fn numbered_to_yaml_test() {
        let num = Numbered {
            count: 0,
            item: Some(0.1),
        };
        let test_file_path = "./numbered_test.yaml";
        // The file is opened in append mode, so leftovers from an aborted run must go first.
        remove_stale_file(test_file_path);
        let mut file = OpenOptions::new()
            .append(true)
            .create(true)
            .open(test_file_path)
            .expect("Could not open test file.");
        write_yaml_object(&num, &mut file).expect("write_yaml_object Failed.");
        drop(file);
        let contents = utils::read_yaml_to_string(test_file_path).expect("Could not read file.");
        remove_stale_file(test_file_path);
        assert_eq!("---\n- 0\n- 0.1\n", &contents);
    }

    /// `enumerate` numbers items from zero in stream order.
    #[test]
    fn enumerate_test() {
        let v = vec![0, 1, 2];
        let stream = convert(v.iter());
        let mut stream = enumerate(stream);
        let mut count = 0;
        while let Some(item) = stream.next() {
            println!("item: {:#?} \n count: {}\n\n", item, count);
            assert_eq!(
                *item,
                Numbered {
                    count,
                    item: Some(&count)
                }
            );
            count += 1;
        }
        // Every element must have been yielded; an empty stream would otherwise pass.
        assert_eq!(count, 3);
    }

    /// Nested vectors serialize as nested YAML sequences.
    #[test]
    fn write_vec_vec_to_yaml_test() {
        let test_file_path = "./vec_vec_to_file_test.yaml";
        remove_stale_file(test_file_path);
        let data_1: Vec<i64> = vec![3, 6, 9];
        let data_2: Vec<i64> = vec![5, 10, 15];
        let data_1 = data_1.iter().enumerate();
        let data_2 = data_2.iter().enumerate();
        let mut data_1_vec: Vec<Vec<i64>> = Vec::new();
        let mut data_2_vec: Vec<Vec<i64>> = Vec::new();
        for (a, b) in data_1 {
            data_1_vec.push(vec![a.try_into().unwrap(), *b])
        }
        for (a, b) in data_2 {
            data_2_vec.push(vec![a.try_into().unwrap(), *b])
        }
        let v: Vec<Vec<Vec<i64>>> = vec![data_1_vec, data_2_vec];
        let v = v.iter();
        let v = convert(v);
        let mut v = write_yaml_documents(v, String::from(test_file_path))
            .expect("Vec to Yaml: Create File and initialize yaml_iter failed.");
        while let Some(item) = v.next() {
            println!("{:#?}", item);
        }
        let mut read_file =
            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
        let mut contents = String::new();
        read_file
            .read_to_string(&mut contents)
            .expect("Could not read data from file.");
        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
        assert_eq!("---\n- - 0\n  - 3\n- - 1\n  - 6\n- - 2\n  - 9\n---\n- - 0\n  - 5\n- - 1\n  - 10\n- - 2\n  - 15\n", &contents);
    }

    /// A stream exactly as long as the capacity fills the reservoir in order.
    #[test]
    fn fill_reservoir_test() {
        let v: Vec<f64> = vec![0.5, 0.2];
        let iter = convert(v);
        let mut iter = reservoir_sample(iter, 2, None);
        let reservoir = iter
            .next()
            .expect("The reservoir should be yielded once it is full.");
        assert_eq!(reservoir[0], 0.5);
        assert_eq!(reservoir[1], 0.2);
    }

    /// After many 1s follow the initial 0s, the reservoir should consist mostly of 1s.
    #[test]
    fn reservoir_replacement_test() {
        let stream_length = 1000usize;
        let capacity = 5usize;
        let initial_stream = iter::repeat(0).take(capacity);
        let final_stream = iter::repeat(1).take(stream_length - capacity);
        let stream = initial_stream.chain(final_stream);
        let stream = convert(stream);
        let mut res_iter = reservoir_sample(stream, capacity, None);
        if let Some(reservoir) = res_iter.next() {
            println!("Initial reservoir: \n {:#?} \n", reservoir);
            assert!(reservoir.iter().all(|x| *x == 0));
        } else {
            panic!("The initial reservoir was None.");
        };

        let mut final_reservoir: Vec<usize> = vec![0, 0, 0, 0, 0];
        let mut count: usize = 0;
        while let Some(reservoir) = res_iter.next() {
            count += 1;
            final_reservoir = reservoir.to_vec();
        }
        println!(
            "Final reservoir after {:?} iterations: \n {:#?} \n ",
            count, final_reservoir
        );
        assert!(final_reservoir.into_iter().sum::<usize>() >= 4);
    }

    #[test]
    fn test_datum_struct() {
        let samp = new_datum(String::from("hi"), 1.0);
        assert_eq!(samp.value, String::from("hi"));
        assert_eq!(samp.weight, 1.0);
    }

    #[test]
    #[should_panic(
        expected = "The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir."
    )]
    fn test_new_datum_infinite() {
        let _wd: WeightedDatum<String> = new_datum(String::from("some value"), f64::INFINITY);
    }

    /// A stream exactly as long as the capacity fills the weighted reservoir in order.
    #[test]
    fn fill_weighted_reservoir_test() {
        let v: Vec<WeightedDatum<f64>> = vec![new_datum(0.5, 1.), new_datum(0.2, 2.)];
        let iter = convert(v);
        let mut iter = weighted_reservoir_sample(iter, 2, None);
        let reservoir = iter
            .next()
            .expect("The weighted reservoir should be yielded once it is full.");
        assert_eq!(
            reservoir[0],
            WeightedDatum {
                value: 0.5f64,
                weight: 1.0f64
            }
        );
        assert_eq!(
            reservoir[1],
            WeightedDatum {
                value: 0.2f64,
                weight: 2.0f64
            }
        );
    }

    #[test]
    fn stream_smaller_than_weighted_reservoir_test() {
        let stream_vec = vec![new_datum(1, 1.0), new_datum(2, 1.0)];
        let stream = convert(stream_vec);
        let mut stream = weighted_reservoir_sample(stream, 3, None);
        while let Some(_reservoir) = stream.next() {
            println!("{:#?}", _reservoir);
        }
    }

    #[test]
    fn test_constant_probability() {
        let stream_length = 10usize;
        let capacity = 3usize;
        let probability = 0.01;
        let initial_weight = 1.0;
        let mut stream = generate_stream_with_constant_probability(
            stream_length,
            capacity,
            probability,
            initial_weight,
            0,
            1,
        );
        let mut weight_sum = initial_weight;
        stream.nth(capacity - 1);
        while let Some(item) = stream.next() {
            weight_sum += item.weight;
            let p = capacity as f64 * item.weight / weight_sum;
            assert!((p - probability).abs() < 0.01 * probability);
        }
    }

    #[test]
    #[should_panic(
        expected = "The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir."
    )]
    fn test_constant_probability_fail_from_inf_weight() {
        let stream_length: usize = 10_usize.pow(4);
        let capacity = 3usize;
        let probability = 0.999999999;
        let initial_weight = 1.0;
        let mut stream = generate_stream_with_constant_probability(
            stream_length,
            capacity,
            probability,
            initial_weight,
            0,
            1,
        );
        while let Some(_item) = stream.next() {}
    }

    #[test]
    fn test_stream_vec_generator() {
        let stream_length = 50usize;
        let capacity = 10usize;
        let probability = 0.01;
        let initial_weight = 1.0;
        let stream = generate_stream_with_constant_probability(
            stream_length,
            capacity,
            probability,
            initial_weight,
            0,
            1,
        );
        let mut stream = convert(stream);
        let mut index: usize = 0;
        while let Some(item) = stream.next() {
            match index {
                x if x < capacity => assert_eq!(
                    item.value, 0,
                    "Error: item value was {} for index={}",
                    item.value, x
                ),
                _ => assert_eq!(
                    item.value, 1,
                    "Error: item value was {} for index={}",
                    item.value, index
                ),
            }
            index += 1;
        }
    }

    /// With a tiny inclusion probability the initial 0s should never be replaced.
    #[test]
    fn wrs_no_replacement_test() {
        let stream_length = 20usize;
        let capacity = 10usize;
        let probability = 0.001;
        let initial_weight = 1.0;
        let stream = generate_stream_with_constant_probability(
            stream_length,
            capacity,
            probability,
            initial_weight,
            0,
            1,
        );
        let stream = convert(stream);
        let mut wrs_iter = weighted_reservoir_sample(stream, capacity, None);
        let reservoir = wrs_iter.next().expect("The initial reservoir was None.");
        assert!(reservoir.iter().all(|wd| wd.value == 0));

        if let Some(reservoir) = wrs_iter.nth(stream_length - capacity - 1) {
            assert!(reservoir.iter().all(|wd| wd.value == 0));
        } else {
            panic!("The final reservoir was None.");
        };
    }

    /// With negligible initial weights, the later 1s should replace every initial 0.
    #[test]
    fn wrs_complete_replacement_test() {
        let stream_length = 200usize;
        let capacity = 15usize;
        let probability = 0.9;
        let initial_weight = 1.0e-20;
        let stream = generate_stream_with_constant_probability(
            stream_length,
            capacity,
            probability,
            initial_weight,
            0,
            1,
        );
        let stream = convert(stream);
        let mut wrs_iter = weighted_reservoir_sample(stream, capacity, None);
        let reservoir = wrs_iter.next().expect("The initial reservoir was None.");
        assert!(reservoir.iter().all(|wd| wd.value == 0));

        if let Some(reservoir) = wrs_iter.nth(stream_length - capacity - 1) {
            assert!(reservoir.iter().all(|wd| wd.value == 1));
        } else {
            panic!("The final reservoir was None.");
        };
    }

    #[test]
    fn wrs_mean_test() {
        let mean_means = mean_of_means_of_step_stream();
        assert!((mean_means - 0.5).abs() < 0.05 * 0.5);
    }

    /// Repeats `wrs_mean_test` many times and reports how often it would fail.
    #[test]
    #[ignore]
    fn wrs_mean_test_looped() {
        let mut failures = 0usize;
        let number_of_runs = 3_000usize;
        for _j in 0..number_of_runs {
            let mean_means = mean_of_means_of_step_stream();
            if (mean_means - 0.5).abs() > 0.05 * 0.5 {
                failures += 1;
            };
        }
        println!(
            "failures: {:?}, number of runs: {}",
            failures, number_of_runs
        );
    }
}