iterative_methods/
lib.rs

1//! # Iterative methods
2//!
3//! Implements [iterative
4//! methods](https://en.wikipedia.org/wiki/Iterative_method) and
5//! utilities for using and developing them as
6//! [StreamingIterators](https://crates.io/crates/streaming-iterator). A
7//! series of [blog
8//! posts](https://daniel-vainsencher.github.io/book/iterative_methods_part_1.html)
9//! provide a gentle introduction.
10//!
11//!
12//! ... but ok fine, here is a really quick example:
13//!```
14//!# extern crate iterative_methods;
15//!# use iterative_methods::derivative_descent::*;
16//!# use iterative_methods::*;
17//!# use streaming_iterator::*;
18//!#
19//!// Problem: minimize the convex parabola f(x) = x^2 + x
20//!let function = |x| x * x + x;
21//!
22//!// An iterative solution by gradient descent
23//!let derivative = |x| 2.0 * x + 1.0;
24//!let step_size = 0.2;
25//!let x_0 = 2.0;
26//!
//!// Au naturel:
28//!let mut x = x_0;
29//!for i in 0..10 {
30//!    x -= step_size * derivative(x);
31//!    println!("x_{} = {:.2}; f(x_{}) = {:.4}", i, x, i, x * x + x);
32//!}
33//!
34//!// Using replaceable components:
35//!let dd = DerivativeDescent::new(function, derivative, step_size, x_0);
36//!let dd = enumerate(dd);
37//!let mut dd = dd.take(10);
38//!while let Some(&Numbered{item: Some(ref curr), count}) = dd.next() {
39//!    println!("x_{} = {:.2}; f(x_{}) = {:.4}", count, curr.x, count, curr.value());
40//!}
41//!```
42//!
//! Both produce the exact same output (below). The first, more common
//! approach is much easier to look at: the descent step is right
//! there. The second separates the algorithm and every other concern
//! into easily reusable and composable components. If that sounds
//! useful, have fun exploring.
48//!
49//!```ignore
50//! x_0 = 1.00; f(x_0) = 2.0000
51//! x_1 = 0.40; f(x_1) = 0.5600
52//! x_2 = 0.04; f(x_2) = 0.0416
53//! x_3 = -0.18; f(x_3) = -0.1450
54//! x_4 = -0.31; f(x_4) = -0.2122
55//! x_5 = -0.38; f(x_5) = -0.2364
56//! x_6 = -0.43; f(x_6) = -0.2451
57//! x_7 = -0.46; f(x_7) = -0.2482
58//! x_8 = -0.47; f(x_8) = -0.2494
59//! x_9 = -0.48; f(x_9) = -0.2498
60//!```
61
62#[cfg(test)]
63extern crate quickcheck;
64extern crate yaml_rust;
65
66use rand::{Rng, SeedableRng};
67use rand_pcg::Pcg64;
68use std::cmp::PartialEq;
69use std::fs::File;
70use std::fs::OpenOptions;
71use std::io::Write;
72use std::time::{Duration, Instant};
73use streaming_iterator::*;
74use yaml_rust::{Yaml, YamlEmitter};
75
76pub mod algorithms;
77pub mod conjugate_gradient;
78pub mod derivative_descent;
79pub mod utils;
80
81/// Creates an iterator which returns initial elements until and
82/// including the first satisfying a predicate.
83#[inline]
84pub fn take_until<I, F>(it: I, f: F) -> TakeUntil<I, F>
85where
86    I: StreamingIterator,
87    F: FnMut(&I::Item) -> bool,
88{
89    TakeUntil {
90        it,
91        f,
92        state: UntilState::Unfulfilled,
93    }
94}
95
/// An adaptor that returns initial elements until and including the
/// first satisfying a predicate.
#[derive(Clone)]
pub struct TakeUntil<I, F>
where
    I: StreamingIterator,
    F: FnMut(&I::Item) -> bool,
{
    /// The wrapped streaming iterator.
    pub it: I,
    /// Predicate applied to each item; the first item for which it
    /// returns `true` is the last one yielded.
    pub f: F,
    /// Tracks whether the predicate has been satisfied yet.
    pub state: UntilState,
}
108
/// Progress of a `TakeUntil` adaptor.
///
/// Derives `Debug`, `Copy` and `Eq` in addition to `Clone` and
/// `PartialEq` so the state can be printed, compared and passed by
/// value.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UntilState {
    /// No item has satisfied the predicate yet.
    Unfulfilled,
    /// The current item satisfied the predicate; it is the last one yielded.
    Fulfilled,
    /// The satisfying item has been passed; nothing more is yielded.
    Done,
}
115
116impl<I, F> StreamingIterator for TakeUntil<I, F>
117where
118    I: StreamingIterator,
119    F: FnMut(&I::Item) -> bool,
120{
121    type Item = I::Item;
122    fn advance(&mut self) {
123        match self.state {
124            UntilState::Unfulfilled => {
125                self.it.advance();
126                if let Some(v) = self.it.get() {
127                    if (self.f)(v) {
128                        self.state = UntilState::Fulfilled
129                    }
130                }
131            }
132            UntilState::Fulfilled => self.state = UntilState::Done,
133            UntilState::Done => {}
134        }
135    }
136
137    fn get(&self) -> Option<&Self::Item> {
138        if UntilState::Done == self.state {
139            None
140        } else {
141            self.it.get()
142        }
143    }
144}
145
/// Store a generic annotation next to the state.
#[derive(Clone, Debug)]
pub struct AnnotatedResult<T, A> {
    /// The annotated value itself.
    pub result: T,
    /// The annotation stored alongside `result`.
    pub annotation: A,
}
152
/// An adaptor that annotates every underlying item `x` with `f(x)`.
#[derive(Clone, Debug)]
pub struct Annotate<I, T, F, A>
where
    I: Sized + StreamingIterator<Item = T>,
    T: Clone,
    F: FnMut(&T) -> A,
{
    /// The wrapped streaming iterator.
    pub it: I,
    /// Function computing the annotation from a reference to the item.
    pub f: F,
    /// The most recently produced annotated item, if any.
    pub current: Option<AnnotatedResult<T, A>>,
}
165
166impl<I, T, F, A> Annotate<I, T, F, A>
167where
168    I: StreamingIterator<Item = T>,
169    T: Sized + Clone,
170    F: FnMut(&T) -> A,
171{
172    /// Annotate every underlying item with the result of applying `f` to it.
173    pub fn new(it: I, f: F) -> Annotate<I, T, F, A> {
174        Annotate {
175            it,
176            f,
177            current: None,
178        }
179    }
180}
181
182impl<I, T, F, A> StreamingIterator for Annotate<I, T, F, A>
183where
184    I: StreamingIterator<Item = T>,
185    T: Sized + Clone,
186    F: FnMut(&T) -> A,
187{
188    type Item = AnnotatedResult<T, A>;
189
190    fn advance(&mut self) {
191        self.it.advance();
192        self.current = match self.it.get() {
193            Some(n) => {
194                let annotation = (self.f)(n);
195                Some(AnnotatedResult {
196                    annotation,
197                    result: n.clone(),
198                })
199            }
200            None => None,
201        }
202    }
203
204    fn get(&self) -> Option<&Self::Item> {
205        match &self.current {
206            Some(tr) => Some(&tr),
207            None => None,
208        }
209    }
210}
211
212/// Annotate every underlying item with its score, as defined by `f`.
213pub fn assess<I, T, F, A>(it: I, f: F) -> Annotate<I, T, F, A>
214where
215    T: Clone,
216    F: FnMut(&T) -> A,
217    I: StreamingIterator<Item = T>,
218{
219    Annotate::new(it, f)
220}
221
222/// Apply `f(_)->()` to every underlying item (for side-effects).
223pub fn inspect<I, F, T>(it: I, f: F) -> Annotate<I, T, F, ()>
224where
225    I: Sized + StreamingIterator<Item = T>,
226    F: FnMut(&T),
227    T: Clone,
228{
229    Annotate::new(it, f)
230}
231
232/// Get the item before the first None, assuming any exist.
233pub fn last<I, T>(it: I) -> Option<T>
234where
235    I: StreamingIterator<Item = T>,
236    T: Sized + Clone,
237{
238    it.fold(None, |_acc, i| Some((*i).clone()))
239}
240
/// Adaptor that times every call to `advance` on adaptee. Stores
/// start time and duration.
#[derive(Clone, Debug)]
pub struct Time<I, T>
where
    I: StreamingIterator<Item = T>,
    T: Clone,
{
    /// The wrapped streaming iterator being timed.
    it: I,
    /// The most recent item together with its timing information.
    current: Option<TimedResult<T>>,
    /// Reference instant against which `start_time` values are measured.
    timer: Instant,
}
253
/// Wrapper for Time.
///
/// TimedResult decorates with two duration fields: start_time is
/// relative to the creation of the process generating results, and
/// duration is relative to the start of the creation of the current
/// result.
#[derive(Clone, Debug)]
pub struct TimedResult<T> {
    /// The timed value.
    pub result: T,
    /// Time elapsed on the adaptor's timer when computation of this
    /// result began.
    pub start_time: Duration,
    /// Time taken to produce this result.
    pub duration: Duration,
}
266
267/// Wrap each value of a streaming iterator with the durations:
268/// - between the call to this function and start of the value's computation
269/// - it took to calculate that value
270pub fn time<I, T>(it: I) -> Time<I, T>
271where
272    I: Sized + StreamingIterator<Item = T>,
273    T: Sized + Clone,
274{
275    Time {
276        it,
277        current: None,
278        timer: Instant::now(),
279    }
280}
281
282impl<I, T> StreamingIterator for Time<I, T>
283where
284    I: Sized + StreamingIterator<Item = T>,
285    T: Sized + Clone,
286{
287    type Item = TimedResult<T>;
288
289    fn advance(&mut self) {
290        let start_time = self.timer.elapsed();
291        let before = Instant::now();
292        self.it.advance();
293        self.current = match self.it.get() {
294            Some(n) => Some(TimedResult {
295                start_time,
296                duration: before.elapsed(),
297                result: n.clone(),
298            }),
299            None => None,
300        }
301    }
302
303    fn get(&self) -> Option<&Self::Item> {
304        match &self.current {
305            Some(tr) => Some(&tr),
306            None => None,
307        }
308    }
309}
310
/// An iterator for stepping iterators by a custom amount.
///
/// This is a StreamingIterator version of [std::iter::StepBy.](https://doc.rust-lang.org/std/iter/struct.StepBy.html)
///
/// The iterator adaptor step_by(it, step) wraps a StreamingIterator. A
/// `step` is specified and only the items located every `step` are returned.
///
/// Iterator indices begin at 0, thus step_by() converts step -> step - 1
#[derive(Clone, Debug)]
pub struct StepBy<I> {
    /// The wrapped streaming iterator.
    it: I,
    /// The requested step minus one, as stored by `step_by()`.
    step: usize,
    /// `true` until the first call to `advance`, which takes a single step.
    first_take: bool,
}
325
326/// Creates an iterator starting at the same point, but stepping by the given amount at each iteration.
327///
328/// This is a `StreamingIterator` version of [step_by](https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.step_by)
329/// in [`std::iter::Iterator`](https://doc.rust-lang.org/std/iter/trait.Iterator.html)
330
331pub fn step_by<I, T>(it: I, step: usize) -> StepBy<I>
332where
333    I: Sized + StreamingIterator<Item = T>,
334{
335    assert!(step != 0);
336    StepBy {
337        it,
338        step: step - 1,
339        first_take: true,
340    }
341}
342
343impl<I> StreamingIterator for StepBy<I>
344where
345    I: StreamingIterator,
346{
347    type Item = I::Item;
348
349    #[inline]
350    fn advance(&mut self) {
351        if self.first_take {
352            self.first_take = false;
353            self.it.advance();
354        } else {
355            self.it.nth(self.step);
356        }
357    }
358
359    #[inline]
360    fn get(&self) -> Option<&I::Item> {
361        self.it.get()
362    }
363}
364
/// Write items of StreamingIterator to a file.
#[derive(Debug)]
struct WriteToFile<I, F> {
    /// The wrapped streaming iterator whose items are written.
    pub it: I,
    /// Callback that writes one item to the given file.
    pub write_function: F,
    /// Handle of the file being written to.
    pub file_writer: File,
}
372
373/// An adaptor that calls a function to write each item to a file.
374#[allow(dead_code)]
375fn write_to_file<I, T, F>(
376    it: I,
377    write_function: F,
378    file_path: String,
379) -> Result<WriteToFile<I, F>, std::io::Error>
380where
381    I: Sized + StreamingIterator<Item = T>,
382    T: std::fmt::Debug,
383    F: FnMut(&T, &mut std::fs::File) -> std::io::Result<()>,
384{
385    let result = match std::fs::metadata(&file_path) {
386        Ok(_) => {
387            panic!("File to which you want to write already exists or permission does not exist. Please rename or remove the file or gain permission.")
388        }
389        Err(_) => {
390            let file_writer = OpenOptions::new()
391                .append(true)
392                .create(true)
393                .open(file_path)?;
394            Ok(WriteToFile {
395                it,
396                write_function,
397                file_writer,
398            })
399        }
400    };
401    result
402}
403
404impl<I, T, F> StreamingIterator for WriteToFile<I, F>
405where
406    I: Sized + StreamingIterator<Item = T>,
407    T: std::fmt::Debug,
408    F: FnMut(&T, &mut std::fs::File) -> std::io::Result<()>,
409{
410    type Item = I::Item;
411
412    #[inline]
413    fn advance(&mut self) {
414        if let Some(item) = self.it.next() {
415            (self.write_function)(&item, &mut self.file_writer)
416                .expect("Write item to file in WriteToFile advance failed.");
417        } else {
418            self.file_writer.flush().expect("Flush of file failed.");
419        }
420    }
421
422    #[inline]
423    fn get(&self) -> Option<&I::Item> {
424        self.it.get()
425    }
426}
427
/// Define a trait object for converting to YAML objects.
pub trait YamlDataType {
    /// Builds the `Yaml` value representing `self`.
    fn create_yaml_object(&self) -> Yaml;
}
432
433/// Allow for references.
434impl<T> YamlDataType for &T
435where
436    T: YamlDataType,
437{
438    fn create_yaml_object(&self) -> Yaml {
439        (*self).create_yaml_object()
440    }
441}
442
443/// Implement for basic scalar types.
444impl YamlDataType for i64 {
445    fn create_yaml_object(&self) -> Yaml {
446        Yaml::Integer(*self)
447    }
448}
449
450impl YamlDataType for f64 {
451    fn create_yaml_object(&self) -> Yaml {
452        Yaml::Real((*self).to_string())
453    }
454}
455
456impl YamlDataType for String {
457    fn create_yaml_object(&self) -> Yaml {
458        Yaml::String((*self).to_string())
459    }
460}
461
462// Does this clone cause memory or speed issues?
463// This circular impl was necessary to allow impl YamlDataType for Vec<T> where T impl YamlDataType.
464impl YamlDataType for Yaml {
465    fn create_yaml_object(&self) -> Yaml {
466        self.clone()
467    }
468}
469
470/// This allows recursive wrapping of YamlDataType in Vec, e.g. Vec<Vec<Vec<T>>>.
471impl<T> YamlDataType for Vec<T>
472where
473    T: YamlDataType,
474{
475    fn create_yaml_object(&self) -> Yaml {
476        let v: Vec<Yaml> = self.iter().map(|x| x.create_yaml_object()).collect();
477        Yaml::Array(v)
478    }
479}
480
481impl<T, A> YamlDataType for AnnotatedResult<T, A>
482where
483    T: YamlDataType,
484    A: YamlDataType,
485{
486    fn create_yaml_object(&self) -> Yaml {
487        let t = &self.result;
488        let a = &self.annotation;
489        Yaml::Array(vec![t.create_yaml_object(), a.create_yaml_object()])
490    }
491}
492
493impl<T> YamlDataType for WeightedDatum<T>
494where
495    T: YamlDataType,
496{
497    fn create_yaml_object(&self) -> Yaml {
498        let value = &self.value;
499        let weight = &self.weight;
500        Yaml::Array(vec![
501            value.create_yaml_object(),
502            weight.create_yaml_object(),
503        ])
504    }
505}
506
/// Write items of StreamingIterator to a Yaml file.
#[derive(Debug)]
pub struct WriteYamlDocuments<I> {
    /// The wrapped streaming iterator whose items are written.
    pub it: I,
    /// Handle of the YAML file being written to.
    pub file_writer: File,
}
513
514/// Adaptor that writes each item to a YAML document.
515pub fn write_yaml_documents<I, T>(
516    it: I,
517    file_path: String,
518) -> Result<WriteYamlDocuments<I>, std::io::Error>
519where
520    I: Sized + StreamingIterator<Item = T>,
521    T: std::fmt::Debug,
522{
523    let result = match std::fs::metadata(&file_path) {
524        Ok(_) => {
525            panic!("Failed to create or gain permission of {}, please delete it or gain permission before running this demo. If the demo runs completely, it will delete the file upon completion.", file_path)
526        }
527        Err(_) => {
528            let file_writer = OpenOptions::new()
529                .append(true)
530                .create(true)
531                .open(file_path)?;
532            Ok(WriteYamlDocuments { it, file_writer })
533        }
534    };
535    result
536}
537
538/// Function used by WriteYamlDocuments to specify how to write each item to file.
539///
540pub fn write_yaml_object<T>(item: &T, file_writer: &mut std::fs::File) -> std::io::Result<()>
541where
542    T: YamlDataType,
543{
544    let yaml_item = item.create_yaml_object();
545    let mut out_str = String::new();
546    let mut emitter = YamlEmitter::new(&mut out_str);
547    emitter
548        .dump(&yaml_item)
549        .expect("Could not convert item to yaml object.");
550    out_str.push('\n');
551    file_writer
552        .write_all(out_str.as_bytes())
553        .expect("Writing value to file failed.");
554    Ok(())
555}
556
557impl<I, T> StreamingIterator for WriteYamlDocuments<I>
558where
559    I: Sized + StreamingIterator<Item = T>,
560    T: std::fmt::Debug + YamlDataType,
561{
562    type Item = I::Item;
563
564    #[inline]
565    fn advance(&mut self) {
566        if let Some(item) = self.it.next() {
567            write_yaml_object(&item, &mut self.file_writer)
568                .expect("Write item to file in WriteYamlDocuments advance failed.");
569        } else {
570            self.file_writer.flush().expect("Flush of file failed.");
571        }
572    }
573
574    #[inline]
575    fn get(&self) -> Option<&I::Item> {
576        self.it.get()
577    }
578}
579
/// A struct that wraps an `Item` as `Option<Item>` and annotates it with an `i64`. Used by `Enumerate`.
#[derive(Clone, Debug, std::cmp::PartialEq)]
pub struct Numbered<T> {
    /// The number attached to the item.
    pub count: i64,
    /// The wrapped item, or `None` when no item is present.
    pub item: Option<T>,
}
586
587impl<T> YamlDataType for Numbered<T>
588where
589    T: YamlDataType,
590{
591    fn create_yaml_object(&self) -> Yaml {
592        let t = (self.item).as_ref().unwrap();
593        Yaml::Array(vec![Yaml::Integer(self.count), t.create_yaml_object()])
594    }
595}
596
/// An adaptor that enumerates items.
#[derive(Clone, Debug)]
pub struct Enumerate<I, T> {
    /// The most recent numbered item. The constructors initialise it to
    /// a placeholder with `count: -1` and `item: None`.
    pub current: Option<Numbered<T>>,
    /// The wrapped streaming iterator.
    pub it: I,
}
603
604/// Define a constructor in the Enumerate context.
605impl<I, T> Enumerate<I, T>
606where
607    I: StreamingIterator<Item = T>,
608{
609    pub fn new(it: I) -> Enumerate<I, T> {
610        Enumerate {
611            current: Some(Numbered {
612                count: -1,
613                item: None,
614            }),
615            it,
616        }
617    }
618}
619
620/// A constructor for Enumerate.
621pub fn enumerate<I, T>(it: I) -> Enumerate<I, T>
622where
623    I: StreamingIterator<Item = T>,
624{
625    Enumerate {
626        current: Some(Numbered {
627            count: -1,
628            item: None,
629        }),
630        it,
631    }
632}
633
634impl<I, T> StreamingIterator for Enumerate<I, T>
635where
636    I: StreamingIterator<Item = T>,
637    T: Clone,
638{
639    type Item = Numbered<T>;
640
641    fn advance(&mut self) {
642        self.it.advance();
643        self.current = match self.it.get() {
644            Some(t) => {
645                if let Some(n) = &self.current {
646                    let c = n.count + 1;
647                    Some(Numbered {
648                        count: c,
649                        item: Some(t.clone()),
650                    })
651                } else {
652                    None
653                }
654            }
655            None => None,
656        }
657    }
658
659    fn get(&self) -> Option<&Self::Item> {
660        match &self.current {
661            Some(t) => Some(&t),
662            None => None,
663        }
664    }
665}
666
/// Adaptor to reservoir sample.
///
/// `ReservoirSample` wraps a `StreamingIterator`, `I` and
/// produces a `StreamingIterator` whose items are samples of size `capacity`
/// from the stream of `I`. (This is not the capacity of the `Vec` which holds the `reservoir`;
/// Rather, the length of the `reservoir` is normally referred to as its `capacity`.)
/// To produce a `reservoir` of length `capacity` on the first call, the
/// first call of the `advance` method automatically advances the input
/// iterator `capacity` steps. Subsequent calls of `advance` on `ReservoirSample`
/// advance `I` by `skip + 1` steps and will at most replace a single element of the `reservoir`.
///
/// The random rng is of type `Pcg64` by default, which allows seeded rng. This should be
/// extended to generic type bound by traits for implementing seeding.
///
/// See Algorithm L in <https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm> and
/// <https://dl.acm.org/doi/abs/10.1145/198429.198435>
#[derive(Debug, Clone)]
pub struct ReservoirSample<I, T> {
    /// The wrapped streaming iterator being sampled.
    it: I,
    /// The current sample.
    pub reservoir: Vec<T>,
    /// Target number of items held in `reservoir`.
    capacity: usize,
    /// Running weight from which the next `skip` is derived.
    w: f64,
    /// Argument passed to `nth` on the wrapped iterator for the next replacement.
    skip: usize,
    /// Random number generator driving the sampling.
    rng: Pcg64,
}
693
694/// An adaptor for which the items are random samples of the underlying iterator up to the item processed.
695/// The constructor for ReservoirSample.
696pub fn reservoir_sample<I, T>(
697    it: I,
698    capacity: usize,
699    custom_rng: Option<Pcg64>,
700) -> ReservoirSample<I, T>
701where
702    I: Sized + StreamingIterator<Item = T>,
703    T: Clone,
704{
705    let mut rng = match custom_rng {
706        Some(rng) => rng,
707        None => Pcg64::from_entropy(),
708    };
709    let res: Vec<T> = Vec::new();
710    let w_initial = (rng.gen::<f64>().ln() / (capacity as f64)).exp();
711    ReservoirSample {
712        it,
713        reservoir: res,
714        capacity,
715        w: w_initial,
716        skip: ((rng.gen::<f64>() as f64).ln() / (1. - w_initial).ln()).floor() as usize,
717        rng,
718    }
719}
720
721impl<I, T> StreamingIterator for ReservoirSample<I, T>
722where
723    T: Clone + std::fmt::Debug,
724    I: StreamingIterator<Item = T>,
725{
726    type Item = Vec<T>;
727
728    #[inline]
729    fn advance(&mut self) {
730        if self.reservoir.len() < self.capacity {
731            while self.reservoir.len() < self.capacity {
732                if let Some(datum) = self.it.next() {
733                    let cloned_datum = datum.clone();
734                    self.reservoir.push(cloned_datum);
735                } else {
736                    break;
737                }
738            }
739        } else if let Some(datum) = self.it.nth(self.skip) {
740            let h = self.rng.gen_range(0..self.capacity) as usize;
741            let datum_struct = datum.clone();
742            self.reservoir[h] = datum_struct;
743            self.w *= (self.rng.gen::<f64>().ln() / (self.capacity as f64)).exp();
744            self.skip = ((self.rng.gen::<f64>() as f64).ln() / (1. - self.w).ln()).floor() as usize;
745        }
746    }
747
748    #[inline]
749    fn get(&self) -> Option<&Self::Item> {
750        if let Some(_wd) = &self.it.get() {
751            Some(&self.reservoir)
752        } else {
753            None
754        }
755    }
756}
757
/// Wrapper for Weight.
///
/// The WeightedDatum struct wraps the values of a data set to include
/// a weight for each datum. Currently, the main motivation for this
/// is to use it for Weighted Reservoir Sampling (WRS).
#[derive(Debug, Clone, PartialEq)]
pub struct WeightedDatum<U> {
    /// The wrapped data value.
    pub value: U,
    /// The weight attached to `value`.
    pub weight: f64,
}
768
769/// Constructor for WeightedDatum.
770pub fn new_datum<U>(value: U, weight: f64) -> WeightedDatum<U>
771where
772    U: Clone,
773{
774    if !weight.is_finite() {
775        panic!("The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir.");
776    }
777    WeightedDatum { value, weight }
778}
779
/// Adaptor wrapping items with a computed weight.
///
/// Weight provides an easy conversion of any iterable to one whose items are WeightedDatum.
/// Weight holds an iterator and a function. The function is defined by the user to extract
/// weights from the iterable and package the old items and extracted weights into items as
/// WeightedDatum
#[derive(Debug, Clone)]
pub struct Weight<I, T, F>
where
    I: StreamingIterator<Item = T>,
{
    /// The wrapped streaming iterator.
    pub it: I,
    /// The most recently produced weighted item, if any.
    pub wd: Option<WeightedDatum<T>>,
    /// Function computing the weight of an item.
    pub f: F,
}
796
797/// Annotates items of an iterable with a weight using a function `f`.
798pub fn wd_iterable<I, T, F>(it: I, f: F) -> Weight<I, T, F>
799where
800    I: StreamingIterator<Item = T>,
801    F: FnMut(&T) -> f64,
802{
803    Weight { it, wd: None, f }
804}
805
806impl<I, T, F> StreamingIterator for Weight<I, T, F>
807where
808    I: StreamingIterator<Item = T>,
809    F: FnMut(&T) -> f64,
810    T: Sized + Clone,
811{
812    type Item = WeightedDatum<T>;
813
814    fn advance(&mut self) {
815        self.it.advance();
816        self.wd = match self.it.get() {
817            Some(item) => {
818                let new_weight = (self.f)(item);
819                let new_item = item.clone();
820                Some(new_datum(new_item, new_weight))
821            }
822            None => None,
823        }
824    }
825
826    fn get(&self) -> Option<&Self::Item> {
827        match &self.wd {
828            Some(wdatum) => Some(&wdatum),
829            None => None,
830        }
831    }
832}
833
/// An adaptor that converts items from `WeightedDatum<T>` to `T`.
///
#[derive(Clone, Debug)]
pub struct ExtractValue<I, T>
where
    I: StreamingIterator<Item = WeightedDatum<T>>,
{
    /// The wrapped streaming iterator of weighted items.
    it: I,
}
843
844/// The constructor for ExtractValue. Apply it to a StreamingIterator with
845/// `Item = WeightedDatum<T>` and it returns a StreamingIterator with `Item = T`.
846pub fn extract_value<I, T>(it: I) -> ExtractValue<I, T>
847where
848    I: StreamingIterator<Item = WeightedDatum<T>>,
849{
850    ExtractValue { it }
851}
852
853impl<I, T> StreamingIterator for ExtractValue<I, T>
854where
855    I: StreamingIterator<Item = WeightedDatum<T>>,
856{
857    type Item = T;
858    fn advance(&mut self) {
859        self.it.advance();
860    }
861
862    fn get(&self) -> Option<&Self::Item> {
863        match &self.it.get() {
864            Some(item) => Some(&item.value),
865            None => None,
866        }
867    }
868}
869
/// Adaptor that reservoir samples with weights
///
/// Uses the algorithm of M. T. Chao.
/// `WeightedReservoirSample` wraps a `StreamingIterator`, `I`, whose items must be of type `WeightedDatum` and
/// produces a `StreamingIterator` whose items are samples of size `capacity`
/// from the stream of `I`. (This is not the capacity of the `Vec` which holds the `reservoir`;
/// Rather, the length of the `reservoir` is normally referred to as its `capacity`.)
/// To produce a `reservoir` of length `capacity` on the first call, the
/// first call of the `advance` method automatically advances the input
/// iterator `capacity` steps. Subsequent calls of `advance` on `WeightedReservoirSample`
/// advance `I` one step and will at most replace a single element of the `reservoir`.
///
/// The random rng is of type `Pcg64` by default, which allows seeded rng.
///
/// See <https://en.wikipedia.org/wiki/Reservoir_sampling#Weighted_random_sampling>,
/// <https://arxiv.org/abs/1910.11069>, or for the original paper,
/// <https://doi.org/10.1093/biomet/69.3.653>.
///
/// Future work might include implementing parallelized batch processing:
/// <https://dl.acm.org/doi/10.1145/3350755.3400287>
#[derive(Debug, Clone)]
pub struct WeightedReservoirSample<I, T> {
    /// The wrapped streaming iterator of weighted items.
    it: I,
    /// The current sample.
    pub reservoir: Vec<WeightedDatum<T>>,
    /// Target number of items held in `reservoir`.
    capacity: usize,
    /// Sum of the weights of all items consumed so far.
    weight_sum: f64,
    /// Random number generator driving the sampling.
    rng: Pcg64,
}
899
900/// Create a random sample of the underlying weighted stream.
901pub fn weighted_reservoir_sample<I, T>(
902    it: I,
903    capacity: usize,
904    custom_rng: Option<Pcg64>,
905) -> WeightedReservoirSample<I, T>
906where
907    I: Sized + StreamingIterator<Item = WeightedDatum<T>>,
908    T: Clone,
909{
910    let rng = match custom_rng {
911        Some(rng) => rng,
912        None => Pcg64::from_entropy(),
913    };
914    let reservoir: Vec<WeightedDatum<T>> = Vec::new();
915    WeightedReservoirSample {
916        it,
917        reservoir,
918        capacity,
919        weight_sum: 0.0,
920        rng,
921    }
922}
923
924impl<I, T> StreamingIterator for WeightedReservoirSample<I, T>
925where
926    T: Clone + std::fmt::Debug,
927    I: StreamingIterator<Item = WeightedDatum<T>>,
928{
929    type Item = Vec<WeightedDatum<T>>;
930
931    #[inline]
932    fn advance(&mut self) {
933        if self.reservoir.len() >= self.capacity {
934            if let Some(datum) = self.it.next() {
935                self.weight_sum += datum.weight;
936                let p = &(self.capacity as f64 * datum.weight / self.weight_sum);
937                let j: f64 = self.rng.gen();
938                if j < *p {
939                    let h = self.rng.gen_range(0..self.capacity) as usize;
940                    let datum_struct = datum.clone();
941                    self.reservoir[h] = datum_struct;
942                };
943            }
944        } else {
945            while self.reservoir.len() < self.capacity {
946                if let Some(datum) = self.it.next() {
947                    let cloned_datum = datum.clone();
948                    self.reservoir.push(cloned_datum);
949                    self.weight_sum += datum.weight;
950                } else {
951                    break;
952                }
953            }
954        }
955    }
956
957    #[inline]
958    fn get(&self) -> Option<&Self::Item> {
959        if let Some(_wd) = &self.it.get() {
960            Some(&self.reservoir)
961        } else {
962            None
963        }
964    }
965}
966
967/// Unit Tests Module
968#[cfg(test)]
969mod tests {
970
971    use super::*;
972    use crate::utils::generate_stream_with_constant_probability;
973    use crate::utils::mean_of_means_of_step_stream;
974    use std::convert::TryInto;
975    use std::io::Read;
976    use std::iter;
977
978    #[test]
979    fn test_last() {
980        let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
981        let iter = convert(v.clone());
982        assert!(last(iter) == Some(9));
983    }
984
985    #[test]
986    fn test_last_none() {
987        let v: Vec<u32> = vec![];
988        assert!(last(convert(v.clone())) == None);
989    }
990
991    #[test]
992    fn step_by_test() {
993        let v = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
994        let iter = convert(v);
995        let mut iter = step_by(iter, 3);
996        let mut _index = 0i64;
997        while let Some(element) = iter.next() {
998            assert_eq!(*element, _index * 3);
999            _index = _index + 1;
1000        }
1001    }
1002
1003    #[test]
1004    fn annotate_test() {
1005        let v = vec![0., 1., 2.];
1006        let iter = convert(v);
1007        fn f(num: &f64) -> f64 {
1008            num * 2.
1009        }
1010        let target_annotations = vec![0., 2., 4.];
1011        let mut annotations: Vec<f64> = Vec::with_capacity(3);
1012        let mut ann_iter = Annotate::new(iter, f);
1013        while let Some(n) = ann_iter.next() {
1014            annotations.push(n.annotation);
1015        }
1016        assert_eq!(annotations, target_annotations);
1017    }
1018
1019    /// WriteYamlDocuments Test: Write stream of scalars to yaml
1020    ///
1021    /// This writes a stream of scalars to a yaml file using WriteYamlDocuments iterable.
1022    /// It would fail if the file path used to write the data already existed
1023    /// due to the functionality of write_yaml_documents().
1024    #[test]
1025    fn write_yaml_documents_test() {
1026        let test_file_path = "./write_yaml_documents_test.yaml";
1027        let v: Vec<i64> = vec![0, 1, 2, 3];
1028        let v_iter = convert(v.clone());
1029        let mut yaml_iter = write_yaml_documents(v_iter, String::from(test_file_path))
1030            .expect("Create File and initialize yaml_iter failed.");
1031        while let Some(_) = yaml_iter.next() {}
1032        let mut read_file =
1033            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
1034        let mut contents = String::new();
1035        read_file
1036            .read_to_string(&mut contents)
1037            .expect("Could not read data from file.");
1038        // The following line is to be used when the test is revised to read the contents of the file.
1039        // let docs = Yaml::from_str(&contents);
1040        // This could be used instead of Yaml::from_str; not sure of tradeoffs.
1041        // let docs = YamlLoader::load_from_str(&contents).expect("Could not load contents of file to yaml object.");
1042        // Remove the file for the next run of the test.
1043        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
1044        assert_eq!("---\n0\n---\n1\n---\n2\n---\n3\n", &contents);
1045    }
1046
1047    /// WriteYamlDocuments Test: Write stream of vecs to yaml
1048    ///
1049    /// This writes a stream of vecs to a yaml file using WriteYamlDocuments iterable.
1050    /// It would fail if the file path used to write the data already existed
1051    /// due to the functionality of write_to_file().
1052    #[test]
1053    fn write_vec_to_yaml_test() {
1054        let test_file_path = "./vec_to_file_test.yaml";
1055        let v: Vec<Vec<i64>> = vec![vec![0, 1], vec![2, 3]];
1056        // println!("{:#?}", v);
1057        let vc = v.clone();
1058        let vc = vc.iter();
1059        let vc = convert(vc);
1060        let mut vc = write_yaml_documents(vc, String::from(test_file_path))
1061            .expect("Vec to Yaml: Create File and initialize yaml_iter failed.");
1062        while let Some(_) = vc.next() {}
1063        let mut read_file =
1064            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
1065        let mut contents = String::new();
1066        read_file
1067            .read_to_string(&mut contents)
1068            .expect("Could not read data from file.");
1069        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
1070        assert_eq!("---\n- 0\n- 1\n---\n- 2\n- 3\n", &contents);
1071    }
1072
1073    /// Test write_yaml_object works on AnnotatedResult
1074    /// This shows that that write_yaml_object works on a custom struct.
1075    #[test]
1076    fn annotated_result_to_yaml_test() {
1077        let ann = AnnotatedResult {
1078            result: 0,
1079            annotation: "zero".to_string(),
1080        };
1081        let test_file_path = "./annotated_result_test.yaml";
1082        let mut file = OpenOptions::new()
1083            .append(true)
1084            .create(true)
1085            .open(test_file_path)
1086            .expect("Could not open test file.");
1087        write_yaml_object(&ann, &mut file)
1088            .expect(&format!("write_yaml_object Failed for {}", test_file_path));
1089        let contents = utils::read_yaml_to_string(test_file_path)
1090            .expect(&format!("Could not read {}", test_file_path));
1091        assert_eq!("---\n- 0\n- zero\n", &contents);
1092    }
1093
1094    /// Test that write_yaml_object works on Numbered.
1095    /// This shows that that write_yaml_object works on a custom struct.
1096    #[test]
1097    fn numbered_to_yaml_test() {
1098        let num = Numbered {
1099            count: 0,
1100            item: Some(0.1),
1101        };
1102        let test_file_path = "./numbered_test.yaml";
1103        let mut file = OpenOptions::new()
1104            .append(true)
1105            .create(true)
1106            .open(test_file_path)
1107            .expect("Could not open test file.");
1108        write_yaml_object(&num, &mut file).expect("write_yaml_object Failed.");
1109        let contents = utils::read_yaml_to_string(test_file_path).expect("Could not read file.");
1110        assert_eq!("---\n- 0\n- 0.1\n", &contents);
1111    }
1112
1113    // Test that enumerate() adaptor produces items wrapped in a Numbered struct with the enumeration count.
1114    #[test]
1115    fn enumerate_test() {
1116        let v = vec![0, 1, 2];
1117        let stream = v.iter();
1118        let stream = convert(stream);
1119        let mut stream = enumerate(stream);
1120        let mut count = 0;
1121        while let Some(item) = stream.next() {
1122            println!("item: {:#?} \n count: {}\n\n", item, count);
1123            assert_eq!(
1124                *item,
1125                Numbered {
1126                    count: count,
1127                    item: Some(&count)
1128                }
1129            );
1130            count += 1;
1131        }
1132    }
1133    /// A stream of 2 items, each of type Vec<Vec<i64>>, is written to .yaml. The stream used is:
1134    /// ---
1135    /// - - 0
1136    ///   - 3
1137    /// - - 1
1138    ///   - 6
1139    /// - - 2
1140    ///   - 9
1141    /// ---
1142    /// - - 0
1143    ///   - 5
1144    /// - - 1
1145    ///   - 10
1146    /// - - 2
1147    ///   - 15
1148    #[test]
1149    fn write_vec_vec_to_yaml_test() {
1150        let test_file_path = "./vec_vec_to_file_test.yaml";
1151        let data_1: Vec<i64> = vec![3, 6, 9];
1152        let data_2: Vec<i64> = vec![5, 10, 15];
1153        let data_1 = data_1.iter().enumerate();
1154        let data_2 = data_2.iter().enumerate();
1155        let mut data_1_vec: Vec<Vec<i64>> = Vec::new();
1156        let mut data_2_vec: Vec<Vec<i64>> = Vec::new();
1157        for (a, b) in data_1 {
1158            data_1_vec.push(vec![a.try_into().unwrap(), *b])
1159        }
1160        for (a, b) in data_2 {
1161            data_2_vec.push(vec![a.try_into().unwrap(), *b])
1162        }
1163        let v: Vec<Vec<Vec<i64>>> = vec![data_1_vec, data_2_vec];
1164        let v = v.iter();
1165        let v = convert(v);
1166        let mut v = write_yaml_documents(v, String::from(test_file_path))
1167            .expect("Vec to Yaml: Create File and initialize yaml_iter failed.");
1168        while let Some(item) = v.next() {
1169            println!("{:#?}", item);
1170        }
1171        let mut read_file =
1172            File::open(test_file_path).expect("Could not open file with test data to asserteq.");
1173        let mut contents = String::new();
1174        read_file
1175            .read_to_string(&mut contents)
1176            .expect("Could not read data from file.");
1177        std::fs::remove_file(test_file_path).expect("Could not remove data file for test.");
1178        assert_eq!("---\n- - 0\n  - 3\n- - 1\n  - 6\n- - 2\n  - 9\n---\n- - 0\n  - 5\n- - 1\n  - 10\n- - 2\n  - 15\n", &contents);
1179    }
1180
1181    /// Tests for the ReservoirSample adaptor
1182    ///
1183    /// This test asserts that the reservoir is filled with the correct items.
1184    #[test]
1185    fn fill_reservoir_test() {
1186        // v is the data stream.
1187        let v: Vec<f64> = vec![0.5, 0.2];
1188        let iter = convert(v);
1189        let mut iter = reservoir_sample(iter, 2, None);
1190        if let Some(reservoir) = iter.next() {
1191            assert_eq!(reservoir[0], 0.5);
1192            assert_eq!(reservoir[1], 0.2);
1193        }
1194    }
1195
1196    #[test]
1197    /// Test that the initial reservoir of zeros is eventually filled with at least 4 ones.
1198    /// Running the test 10000 times resulted in 9997 tests passed. Thus the fail rate is significantly less
1199    /// than 1 in 1000. If the test fails more than 1 in 1000 times, then a change in the library has introduced a bug.
1200    fn reservoir_replacement_test() {
1201        let stream_length = 1000usize;
1202        // reservoir capacity:
1203        let capacity = 5usize;
1204        // Generate a stream with items initially 0 and then 1:
1205        let initial_stream = iter::repeat(0).take(capacity);
1206        let final_stream = iter::repeat(1).take(stream_length - capacity);
1207        let stream = initial_stream.chain(final_stream);
1208        let stream = convert(stream);
1209        let mut res_iter = reservoir_sample(stream, capacity, None);
1210        if let Some(reservoir) = res_iter.next() {
1211            println!("Initial reservoir: \n {:#?} \n", reservoir);
1212            assert!(reservoir.into_iter().all(|x| *x == 0));
1213        } else {
1214            panic!("The initial reservoir was None.");
1215        };
1216
1217        let mut final_reservoir: Vec<usize> = vec![0, 0, 0, 0, 0];
1218        let mut count: usize = 0;
1219        while let Some(reservoir) = res_iter.next() {
1220            count += 1;
1221            final_reservoir = reservoir.to_vec();
1222        }
1223        println!(
1224            "Final reservoir after {:?} iterations: \n {:#?} \n ",
1225            count, final_reservoir
1226        );
1227        assert!(final_reservoir.into_iter().sum::<usize>() >= 4);
1228    }
1229
1230    /// Tests for the WeightedReservoirSample adaptor
1231    #[test]
1232    fn test_datum_struct() {
1233        let samp = new_datum(String::from("hi"), 1.0);
1234        assert_eq!(samp.value, String::from("hi"));
1235        assert_eq!(samp.weight, 1.0);
1236    }
1237
1238    #[test]
1239    #[should_panic(
1240        expected = "The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir."
1241    )]
1242    fn test_new_datum_infinite() {
1243        let _wd: WeightedDatum<String> = new_datum(String::from("some value"), f64::INFINITY);
1244    }
1245
1246    /// This test asserts that the weighted reservoir is filled with the correct items.
1247    #[test]
1248    fn fill_weighted_reservoir_test() {
1249        // v is the data stream.
1250        let v: Vec<WeightedDatum<f64>> = vec![new_datum(0.5, 1.), new_datum(0.2, 2.)];
1251        let iter = convert(v);
1252        let mut iter = weighted_reservoir_sample(iter, 2, None);
1253        if let Some(reservoir) = iter.next() {
1254            assert_eq!(
1255                reservoir[0],
1256                WeightedDatum {
1257                    value: 0.5f64,
1258                    weight: 1.0f64
1259                }
1260            );
1261            assert_eq!(
1262                reservoir[1],
1263                WeightedDatum {
1264                    value: 0.2f64,
1265                    weight: 2.0f64
1266                }
1267            );
1268        }
1269    }
1270
1271    #[test]
1272    fn stream_smaller_than_weighted_reservoir_test() {
1273        let stream_vec = vec![new_datum(1, 1.0), new_datum(2, 1.0)];
1274        let stream = convert(stream_vec);
1275        let mut stream = weighted_reservoir_sample(stream, 3, None);
1276        while let Some(_reservoir) = stream.next() {
1277            println!("{:#?}", _reservoir);
1278        }
1279    }
1280
1281    #[test]
1282    fn test_constant_probability() {
1283        let stream_length = 10usize;
1284        // reservoir capacity:
1285        let capacity = 3usize;
1286        let probability = 0.01;
1287        let initial_weight = 1.0;
1288        // We create a stream with constant probability for all elements:
1289        let mut stream = generate_stream_with_constant_probability(
1290            stream_length,
1291            capacity,
1292            probability,
1293            initial_weight,
1294            0,
1295            1,
1296        );
1297        let mut weight_sum = initial_weight;
1298        // Cue the stream to the first "final value" element:
1299        stream.nth(capacity - 1);
1300        // Check that the probabilities are approximately correct.
1301        while let Some(item) = stream.next() {
1302            weight_sum += item.weight;
1303            let p = capacity as f64 * item.weight / weight_sum;
1304            assert!((p - probability).abs() < 0.01 * probability);
1305        }
1306    }
1307
1308    #[test]
1309    #[should_panic(
1310        expected = "The weight is not finite and therefore cannot be used to compute the probability of inclusion in the reservoir."
1311    )]
1312    fn test_constant_probability_fail_from_inf_weight() {
1313        let stream_length: usize = 10_usize.pow(4);
1314        // reservoir capacity:
1315        let capacity = 3usize;
1316        let probability = 0.999999999;
1317        let initial_weight = 1.0;
1318        // We create a stream with constant probability for all elements:
1319        let mut stream = generate_stream_with_constant_probability(
1320            stream_length,
1321            capacity,
1322            probability,
1323            initial_weight,
1324            0,
1325            1,
1326        );
1327        while let Some(_item) = stream.next() {
1328            ()
1329        }
1330    }
1331
1332    #[test]
1333    fn test_stream_vec_generator() {
1334        let stream_length = 50usize;
1335        // reservoir capacity:
1336        let capacity = 10usize;
1337        let probability = 0.01;
1338        let initial_weight = 1.0;
1339        // We create a stream with constant probability for all elements:
1340        let stream = generate_stream_with_constant_probability(
1341            stream_length,
1342            capacity,
1343            probability,
1344            initial_weight,
1345            0,
1346            1,
1347        );
1348        let mut stream = convert(stream);
1349        let mut _index: usize = 0;
1350        while let Some(item) = stream.next() {
1351            match _index {
1352                x if x < capacity => assert_eq!(
1353                    item.value, 0,
1354                    "Error: item value was {} for index={}",
1355                    item.value, x
1356                ),
1357                _ => assert_eq!(
1358                    item.value, 1,
1359                    "Error: item value was {} for index={}",
1360                    item.value, _index
1361                ),
1362            }
1363            _index = _index + 1;
1364        }
1365    }
1366
1367    #[test]
1368    fn wrs_no_replacement_test() {
1369        let stream_length = 20usize;
1370        // reservoir capacity:
1371        let capacity = 10usize;
1372        let probability = 0.001;
1373        let initial_weight = 1.0;
1374        // We create a stream with constant probability for all elements:
1375        let stream = generate_stream_with_constant_probability(
1376            stream_length,
1377            capacity,
1378            probability,
1379            initial_weight,
1380            0,
1381            1,
1382        );
1383        let stream = convert(stream);
1384        let mut wrs_iter = weighted_reservoir_sample(stream, capacity, None);
1385        if let Some(reservoir) = wrs_iter.next() {
1386            assert!(reservoir.into_iter().all(|wd| wd.value == 0));
1387        };
1388
1389        if let Some(reservoir) = wrs_iter.nth(stream_length - capacity - 1) {
1390            assert!(reservoir.into_iter().all(|wd| wd.value == 0));
1391        } else {
1392            panic!("The final reservoir was None.");
1393        };
1394    }
1395
1396    // Add link to derivation of bounds.
1397    /// This _probabilistic_ test asserts that all items of the initial
1398    /// reservoir will be replaced by the end of the streaming processes.
1399    /// It uses a stream in which the probability of each item being
1400    /// added to the reseroivr is close to 1. By using a large enough
1401    /// stream, we can ensure that the test fails very infrequently.
1402    /// The probability of the test failing is less than .001 if three
1403    /// conditions are met: 1) the
1404    /// reservoir capacity = 20, 2) the probability of each item being
1405    /// added is >=0.9, and 3) the length of the stream is >=460. A
1406    /// derivation of bounds that ensure a given level of success for
1407    /// the test can be found in the docs [LINK].
1408    // Consider wrapping the test in a for loop that runs the test 10^6 times
1409    // and counts the number of failures.
1410    #[test]
1411    fn wrs_complete_replacement_test() {
1412        let stream_length = 200usize;
1413        // reservoir capacity:
1414        let capacity = 15usize;
1415        let probability = 0.9;
1416        let initial_weight = 1.0e-20;
1417        // We create a stream whose probabilities are all 0.9:
1418        let stream = generate_stream_with_constant_probability(
1419            stream_length,
1420            capacity,
1421            probability,
1422            initial_weight,
1423            0,
1424            1,
1425        );
1426        let stream = convert(stream);
1427        let mut wrs_iter = weighted_reservoir_sample(stream, capacity, None);
1428        if let Some(reservoir) = wrs_iter.next() {
1429            assert!(reservoir.into_iter().all(|wd| wd.value == 0));
1430        };
1431
1432        if let Some(reservoir) = wrs_iter.nth(stream_length - capacity - 1) {
1433            assert!(reservoir.into_iter().all(|wd| wd.value == 1));
1434        } else {
1435            panic!("The final reservoir was None.");
1436        };
1437    }
1438
1439    // For a stream of the form [(0,1),..,(0,1),(1,1),..,(1,1)] with equal numbers
1440    // of zero and one values and all weights equal to 1, we expect weighted reservoir
1441    // sampling to reduce to reservoir sampling (no weights) and thus to produce
1442    // a reservoir whose mean estimates the mean of the entire stream, which in this case
1443    // is 0.5. A reservoir sample is generated 50 times. Each time the mean is calculated
1444    // and the mean of these means is taken. It is asserted that this mean of means is
1445    // within 5% of the true mean, 0.5.
1446    //
1447    // In wrs_mean_test_looped(), the current test (wrs_mean_test()) was run 3000 times
1448    // with one failure resulting. Thus we estimate the failure rate to be approximately
1449    // 1 in 3000. If this test fails more than once for you, there is likely a problem.
1450    #[test]
1451    fn wrs_mean_test() {
1452        let mean_means = mean_of_means_of_step_stream();
1453        assert!((mean_means - 0.5).abs() < 0.05 * 0.5);
1454    }
1455
1456    // This test is used to estimate the failure rate of wrs_mean_test (see above).
1457    // wrs_mean_test() is run 3000 times. In our experience this has led to one
1458    // failure. Thus we estimate the failure rate of wrs_mean_test() to be approximately
1459    // 1 in 3000.
1460    #[test]
1461    #[ignore]
1462    fn wrs_mean_test_looped() {
1463        let mut failures = 0usize;
1464        let number_of_runs = 3_000usize;
1465        for _j in 0..number_of_runs {
1466            let mean_means = mean_of_means_of_step_stream();
1467            if (mean_means - 0.5).abs() > 0.05 * 0.5 {
1468                failures += 1;
1469            };
1470        }
1471        println!(
1472            "failures: {:?}, number of runs: {}",
1473            failures, number_of_runs
1474        );
1475    }
1476}