kalosm_streams/
text_stream.rs1use pin_project_lite::pin_project;
4use std::{
5 collections::VecDeque,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10pub use crate::sender::*;
11use futures_util::{Stream, StreamExt};
12
13pub trait TextStream<I: AsRef<str> = String>: Stream<Item = I> {
15 fn words(self) -> WordStream<Self, I>
17 where
18 Self: Sized,
19 {
20 WordStream::new(self)
21 }
22
23 fn sentences(self) -> SentenceStream<Self, I>
25 where
26 Self: Sized,
27 {
28 SentenceStream::new(self)
29 }
30
31 fn paragraphs(self) -> ParagraphStream<Self, I>
33 where
34 Self: Sized,
35 {
36 ParagraphStream::new(self)
37 }
38
39 fn write_to<'a, W: std::io::Write + Send + 'a>(
41 &'a mut self,
42 mut writer: W,
43 ) -> impl std::future::Future<Output = std::io::Result<()>> + Send + 'a
44 where
45 Self: Sized + Unpin + Send,
46 {
47 async move {
48 while let Some(text) = self.next().await {
49 writer.write_all(text.as_ref().as_bytes())?;
50 writer.flush()?;
51 }
52 Ok(())
53 }
54 }
55
56 fn all_text(&mut self) -> impl std::future::Future<Output = String> + Send + '_
58 where
59 Self: Sized + Unpin + Send,
60 {
61 async move {
62 let mut all_text = String::new();
63 while let Some(text) = self.next().await {
64 all_text.push_str(text.as_ref());
65 }
66 all_text
67 }
68 }
69
70 fn to_std_out(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> + Send + '_
72 where
73 Self: Sized + Unpin + Send,
74 {
75 self.write_to(std::io::stdout())
76 }
77}
78
79impl<S: Stream<Item = I>, I: AsRef<str>> TextStream<I> for S {}
80
/// A rule that decides where streamed text is cut into segments.
pub trait Pattern {
    /// Returns `true` when `char` is a delimiter, i.e. the last character of a segment.
    fn matches(&self, char: char) -> bool;
}
86
87pin_project! {
88 pub struct SegmentedStream<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> {
90 #[pin]
91 backing: S,
92 queue: VecDeque<String>,
93 incomplete: String,
94 pattern: P,
95 }
96}
97
98impl<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> SegmentedStream<S, I, P> {
99 fn new(backing: S, pattern: P) -> Self {
101 Self {
102 backing,
103 queue: Default::default(),
104 incomplete: Default::default(),
105 pattern,
106 }
107 }
108}
109
110impl<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> Stream for SegmentedStream<S, I, P> {
111 type Item = String;
112
113 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114 let projected = self.project();
115 let mut backing = projected.backing;
116 let incomplete = projected.incomplete;
117 let queue = projected.queue;
118 if let Some(next) = queue.pop_front() {
119 return Poll::Ready(Some(next));
120 }
121 loop {
122 let poll = backing.as_mut().poll_next(cx);
123 match poll {
124 Poll::Ready(Some(item)) => {
125 let item = item.as_ref();
126 let mut completed = None;
127 for char in item.chars() {
128 if projected.pattern.matches(char) {
129 incomplete.push(char);
130 let full_sentence = std::mem::take(incomplete);
131 if completed.is_some() {
132 queue.push_back(full_sentence);
133 } else {
134 completed = Some(full_sentence);
135 }
136 } else {
137 incomplete.push(char);
138 }
139 }
140 if let Some(completed) = completed {
141 return Poll::Ready(Some(completed));
142 }
143 }
144 Poll::Ready(None) => {
145 if !incomplete.is_empty() {
146 return Poll::Ready(Some(std::mem::take(incomplete)));
147 } else {
148 return Poll::Ready(None);
149 }
150 }
151 _ => {
152 return Poll::Pending;
153 }
154 }
155 }
156 }
157}
158
159struct SentencePattern;
160
161impl Pattern for SentencePattern {
162 fn matches(&self, char: char) -> bool {
163 char == '.' || char == '?' || char == '!'
164 }
165}
166
167pin_project! {
168 pub struct SentenceStream<S: Stream<Item = I>, I: AsRef<str>> {
170 #[pin]
171 segmented: SegmentedStream<S, I, SentencePattern>,
172 }
173}
174
175impl<S: Stream<Item = I>, I: AsRef<str>> SentenceStream<S, I> {
176 fn new(backing: S) -> Self {
178 Self {
179 segmented: SegmentedStream::new(backing, SentencePattern),
180 }
181 }
182}
183
184impl<S: Stream<Item = I>, I: AsRef<str>> Stream for SentenceStream<S, I> {
185 type Item = String;
186
187 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188 self.project().segmented.poll_next(cx)
189 }
190}
191
192pin_project! {
193 pub struct WordStream<S: Stream<Item = I>, I: AsRef<str>> {
195 #[pin]
196 segmented: SegmentedStream<S, I, WordPattern>,
197 }
198}
199
200impl<S: Stream<Item = I>, I: AsRef<str>> WordStream<S, I> {
201 fn new(backing: S) -> Self {
203 Self {
204 segmented: SegmentedStream::new(backing, WordPattern),
205 }
206 }
207}
208
209impl<S: Stream<Item = I>, I: AsRef<str>> Stream for WordStream<S, I> {
210 type Item = String;
211
212 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213 self.project().segmented.poll_next(cx)
214 }
215}
216
217struct WordPattern;
218
219impl Pattern for WordPattern {
220 fn matches(&self, char: char) -> bool {
221 char.is_whitespace()
222 }
223}
224
225pin_project! {
226 pub struct ParagraphStream<S: Stream<Item = I>, I: AsRef<str>> {
228 #[pin]
229 segmented: SegmentedStream<S, I, ParagraphPattern>,
230 }
231}
232
233impl<S: Stream<Item = I>, I: AsRef<str>> ParagraphStream<S, I> {
234 pub fn new(backing: S) -> Self {
236 Self {
237 segmented: SegmentedStream::new(backing, ParagraphPattern),
238 }
239 }
240}
241
242impl<S: Stream<Item = I>, I: AsRef<str>> Stream for ParagraphStream<S, I> {
243 type Item = String;
244
245 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
246 self.project().segmented.poll_next(cx)
247 }
248}
249
250struct ParagraphPattern;
251
252impl Pattern for ParagraphPattern {
253 fn matches(&self, char: char) -> bool {
254 char == '\n'
255 }
256}