1#![feature(async_closure)]
2#![feature(type_ascription)]
3
4use primal::{Primes, Sieve, SievePrimes};
5use tokio::stream::Stream;
6
7pub fn primes_unbounded() -> impl Stream<Item = usize> {
8 utils::spawn_stream_from_iterator(Primes::all())
9}
10
11struct PrimeFactor {
12 n: usize,
13 #[allow(dead_code)]
14 sieve_box: Box<Sieve>,
15 primes: SievePrimes<'static>,
16}
17
18impl Iterator for PrimeFactor {
19 type Item = Option<(usize, usize)>;
20
21 fn next(&mut self) -> Option<Option<(usize, usize)>> {
22 let p = self.primes.next()?;
23
24 if self.n == 1 {
25 return None;
26 }
27
28 if p * p > self.n {
29 let result = Some(Some((self.n, 1)));
30 self.n = 1;
31 return result;
32 }
33
34 if self.n % p == 0 {
35 let mut i = 0;
36 while self.n % p == 0 {
37 i += 1;
38 self.n /= p;
39 }
40 Some(Some((p, i)))
41 } else {
42 Some(None)
43 }
44 }
45}
46
47pub fn prime_factor(n: usize) -> impl Stream<Item = (usize, usize)> {
48 let sieve = Sieve::new((n as f64).sqrt() as usize);
49 let sieve_box = Box::new(sieve);
50 let sieve_ref = unsafe { &*(&*sieve_box as *const Sieve) };
51 let primes = sieve_ref.primes_from(0);
52 utils::spawn_stream_from_iterator_yield(PrimeFactor {
53 n,
54 sieve_box,
55 primes,
56 })
57}
58
59mod utils {
60 use tokio::future::poll_fn;
61 use tokio::stream::Stream;
62 use tokio::sync::mpsc;
63 use tokio::task::spawn;
64
65 mod error {
66 use tokio::sync::mpsc::error::*;
67
68 pub enum MpscSendError<T> {
69 ClosedNoValue,
70 Closed(T),
71 Full(T),
72 }
74
75 impl<T> From<ClosedError> for MpscSendError<T> {
76 fn from(_err: ClosedError) -> Self {
77 Self::ClosedNoValue
78 }
79 }
80
81 impl<T> From<SendError<T>> for MpscSendError<T> {
82 fn from(err: SendError<T>) -> Self {
83 Self::Closed(err.0)
84 }
85 }
86
87 impl<T> From<TrySendError<T>> for MpscSendError<T> {
97 fn from(err: TrySendError<T>) -> Self {
98 match err {
99 TrySendError::Full(x) => Self::Full(x),
100 TrySendError::Closed(x) => Self::Closed(x),
101 }
102 }
103 }
104 }
105
106 pub fn spawn_stream_from_iterator<T: Send + 'static>(
107 it: impl Iterator<Item = T> + Send + 'static,
108 ) -> (impl Stream<Item = T> + 'static) {
109 let (mut s, r) = mpsc::channel(1);
110 spawn(async move {
111 for x in it {
112 s.send(x).await?;
113 }
114 Ok::<(), mpsc::error::SendError<T>>(())
115 });
116 r
117 }
118
119 pub fn spawn_stream_from_iterator_yield<T: Send + 'static>(
123 it: impl Iterator<Item = Option<T>> + Send + 'static,
124 ) -> (impl Stream<Item = T> + 'static) {
125 let (mut s, r) = mpsc::channel(1);
126 spawn(async move {
127 for x in it {
128 match x {
129 Some(y) => s.send(y).await?,
130 None => poll_fn(|cx| s.poll_ready(cx)).await?, }
132 }
133 Ok::<(), error::MpscSendError<T>>(())
134 });
135 r
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use super::*;
142
143 use tokio::stream::StreamExt;
144
145 #[tokio::test]
146 async fn test_primes_unbounded() {
147 let mut primes = primes_unbounded();
148 assert_eq!(primes.next().await, Some(2));
149 assert_eq!(primes.next().await, Some(3));
150 assert_eq!(primes.next().await, Some(5));
151 assert_eq!(primes.next().await, Some(7));
152 }
153
154 #[tokio::test]
155 async fn test_prime_factor() {
156 let mut factors = prime_factor(98_736); assert_eq!(factors.next().await, Some((2, 4)));
158 assert_eq!(factors.next().await, Some((3, 1)));
159 assert_eq!(factors.next().await, Some((11, 2)));
160 assert_eq!(factors.next().await, Some((17, 1)));
161 assert_eq!(factors.next().await, None);
162 }
163
164 #[tokio::test]
165 async fn test_prime_factor_large_prime() {
166 let mut factors = prime_factor(5_706_079_200_624); assert_eq!(factors.next().await, Some((2, 4)));
168 assert_eq!(factors.next().await, Some((3, 1)));
169 assert_eq!(factors.next().await, Some((11, 2)));
170 assert_eq!(factors.next().await, Some((982_451_653, 1)));
171 assert_eq!(factors.next().await, None);
172 }
173}