Skip to main content

primal_tokio/
lib.rs

1#![feature(async_closure)]
2#![feature(type_ascription)]
3
4use primal::{Primes, Sieve, SievePrimes};
5use tokio::stream::Stream;
6
7pub fn primes_unbounded() -> impl Stream<Item = usize> {
8    utils::spawn_stream_from_iterator(Primes::all())
9}
10
11struct PrimeFactor {
12    n: usize,
13    #[allow(dead_code)]
14    sieve_box: Box<Sieve>,
15    primes: SievePrimes<'static>,
16}
17
18impl Iterator for PrimeFactor {
19    type Item = Option<(usize, usize)>;
20
21    fn next(&mut self) -> Option<Option<(usize, usize)>> {
22        let p = self.primes.next()?;
23        
24        if self.n == 1 {
25            return None;
26        }
27
28        if p * p > self.n {
29            let result = Some(Some((self.n, 1)));
30            self.n = 1;
31            return result;
32        }
33
34        if self.n % p == 0 {
35            let mut i = 0;
36            while self.n % p == 0 {
37                i += 1;
38                self.n /= p;
39            }
40            Some(Some((p, i)))
41        } else {
42            Some(None)
43        }
44    }
45}
46
47pub fn prime_factor(n: usize) -> impl Stream<Item = (usize, usize)> {
48    let sieve = Sieve::new((n as f64).sqrt() as usize);
49    let sieve_box = Box::new(sieve);
50    let sieve_ref = unsafe { &*(&*sieve_box as *const Sieve) };
51    let primes = sieve_ref.primes_from(0);
52    utils::spawn_stream_from_iterator_yield(PrimeFactor {
53        n,
54        sieve_box,
55        primes,
56    })
57}
58
59mod utils {
60    use tokio::future::poll_fn;
61    use tokio::stream::Stream;
62    use tokio::sync::mpsc;
63    use tokio::task::spawn;
64
65    mod error {
66        use tokio::sync::mpsc::error::*;
67
68        pub enum MpscSendError<T> {
69            ClosedNoValue,
70            Closed(T),
71            Full(T),
72            //Timeout(T),
73        }
74
75        impl<T> From<ClosedError> for MpscSendError<T> {
76            fn from(_err: ClosedError) -> Self {
77                Self::ClosedNoValue
78            }
79        }
80
81        impl<T> From<SendError<T>> for MpscSendError<T> {
82            fn from(err: SendError<T>) -> Self {
83                Self::Closed(err.0)
84            }
85        }
86
87        /*impl<T> From<SendTimeoutError<T>> for MpscSendError<T> {
88            fn from(err: SendTimeoutError<T>) -> Self {
89                match err {
90                    SendTimeoutError::Timeout(x) => Self::Timeout(x),
91                    SendTimeoutError::Closed(x) => Self::Closed(x),
92                }
93            }
94        }*/
95
96        impl<T> From<TrySendError<T>> for MpscSendError<T> {
97            fn from(err: TrySendError<T>) -> Self {
98                match err {
99                    TrySendError::Full(x) => Self::Full(x),
100                    TrySendError::Closed(x) => Self::Closed(x),
101                }
102            }
103        }
104    }
105
106    pub fn spawn_stream_from_iterator<T: Send + 'static>(
107        it: impl Iterator<Item = T> + Send + 'static,
108    ) -> (impl Stream<Item = T> + 'static) {
109        let (mut s, r) = mpsc::channel(1);
110        spawn(async move {
111            for x in it {
112                s.send(x).await?;
113            }
114            Ok::<(), mpsc::error::SendError<T>>(())
115        });
116        r
117    }
118
119    /// None => end of iterator
120    /// Some(None) => yield without value
121    /// Some(Some(...)) => value
122    pub fn spawn_stream_from_iterator_yield<T: Send + 'static>(
123        it: impl Iterator<Item = Option<T>> + Send + 'static,
124    ) -> (impl Stream<Item = T> + 'static) {
125        let (mut s, r) = mpsc::channel(1);
126        spawn(async move {
127            for x in it {
128                match x {
129                    Some(y) => s.send(y).await?,
130                    None => poll_fn(|cx| s.poll_ready(cx)).await?, // Make sure the receiver is still open
131                }
132            }
133            Ok::<(), error::MpscSendError<T>>(())
134        });
135        r
136    }
137}
138
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::stream::StreamExt;

    /// The unbounded stream starts with the first four primes.
    #[tokio::test]
    async fn test_primes_unbounded() {
        let expected = [2, 3, 5, 7];
        let mut stream = primes_unbounded();
        for want in expected.iter() {
            assert_eq!(stream.next().await, Some(*want));
        }
    }

    /// 98_736 = 2*2*2*2*3*11*11*17
    #[tokio::test]
    async fn test_prime_factor() {
        let expected = [(2, 4), (3, 1), (11, 2), (17, 1)];
        let mut stream = prime_factor(98_736);
        for want in expected.iter() {
            assert_eq!(stream.next().await, Some(*want));
        }
        assert_eq!(stream.next().await, None);
    }

    /// 5_706_079_200_624 = 2*2*2*2*3*11*11*982_451_653
    #[tokio::test]
    async fn test_prime_factor_large_prime() {
        let expected = [(2, 4), (3, 1), (11, 2), (982_451_653, 1)];
        let mut stream = prime_factor(5_706_079_200_624);
        for want in expected.iter() {
            assert_eq!(stream.next().await, Some(*want));
        }
        assert_eq!(stream.next().await, None);
    }
}
173}