kalosm_streams/
text_stream.rs

1//! Streams for text data.
2
3use pin_project_lite::pin_project;
4use std::{
5    collections::VecDeque,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10pub use crate::sender::*;
11use futures_util::{Stream, StreamExt};
12
13/// A stream of text. This is automatically implemented for all streams of something that acts like a string (String, &str).
14pub trait TextStream<I: AsRef<str> = String>: Stream<Item = I> {
15    /// Split the stream into words.
16    fn words(self) -> WordStream<Self, I>
17    where
18        Self: Sized,
19    {
20        WordStream::new(self)
21    }
22
23    /// Split the stream into sentences.
24    fn sentences(self) -> SentenceStream<Self, I>
25    where
26        Self: Sized,
27    {
28        SentenceStream::new(self)
29    }
30
31    /// Split the stream into paragraphs.
32    fn paragraphs(self) -> ParagraphStream<Self, I>
33    where
34        Self: Sized,
35    {
36        ParagraphStream::new(self)
37    }
38
39    /// Write the stream to a writer.
40    fn write_to<'a, W: std::io::Write + Send + 'a>(
41        &'a mut self,
42        mut writer: W,
43    ) -> impl std::future::Future<Output = std::io::Result<()>> + Send + 'a
44    where
45        Self: Sized + Unpin + Send,
46    {
47        async move {
48            while let Some(text) = self.next().await {
49                writer.write_all(text.as_ref().as_bytes())?;
50                writer.flush()?;
51            }
52            Ok(())
53        }
54    }
55
56    /// Get all the text from the stream.
57    fn all_text(&mut self) -> impl std::future::Future<Output = String> + Send + '_
58    where
59        Self: Sized + Unpin + Send,
60    {
61        async move {
62            let mut all_text = String::new();
63            while let Some(text) = self.next().await {
64                all_text.push_str(text.as_ref());
65            }
66            all_text
67        }
68    }
69
70    /// Write the stream to standard output.
71    fn to_std_out(&mut self) -> impl std::future::Future<Output = std::io::Result<()>> + Send + '_
72    where
73        Self: Sized + Unpin + Send,
74    {
75        self.write_to(std::io::stdout())
76    }
77}
78
79impl<S: Stream<Item = I>, I: AsRef<str>> TextStream<I> for S {}
80
/// A predicate over single characters that decides where a text stream is split.
pub trait Pattern {
    /// Returns `true` if `ch` matches the pattern.
    fn matches(&self, ch: char) -> bool;
}
86
87pin_project! {
88    /// A stream that output segments of text at a time.
89    pub struct SegmentedStream<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> {
90        #[pin]
91        backing: S,
92        queue: VecDeque<String>,
93        incomplete: String,
94        pattern: P,
95    }
96}
97
98impl<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> SegmentedStream<S, I, P> {
99    /// Create a new segmented stream from a stream of text and a pattern that separates segments
100    fn new(backing: S, pattern: P) -> Self {
101        Self {
102            backing,
103            queue: Default::default(),
104            incomplete: Default::default(),
105            pattern,
106        }
107    }
108}
109
110impl<S: Stream<Item = I>, I: AsRef<str>, P: Pattern> Stream for SegmentedStream<S, I, P> {
111    type Item = String;
112
113    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
114        let projected = self.project();
115        let mut backing = projected.backing;
116        let incomplete = projected.incomplete;
117        let queue = projected.queue;
118        if let Some(next) = queue.pop_front() {
119            return Poll::Ready(Some(next));
120        }
121        loop {
122            let poll = backing.as_mut().poll_next(cx);
123            match poll {
124                Poll::Ready(Some(item)) => {
125                    let item = item.as_ref();
126                    let mut completed = None;
127                    for char in item.chars() {
128                        if projected.pattern.matches(char) {
129                            incomplete.push(char);
130                            let full_sentence = std::mem::take(incomplete);
131                            if completed.is_some() {
132                                queue.push_back(full_sentence);
133                            } else {
134                                completed = Some(full_sentence);
135                            }
136                        } else {
137                            incomplete.push(char);
138                        }
139                    }
140                    if let Some(completed) = completed {
141                        return Poll::Ready(Some(completed));
142                    }
143                }
144                Poll::Ready(None) => {
145                    if !incomplete.is_empty() {
146                        return Poll::Ready(Some(std::mem::take(incomplete)));
147                    } else {
148                        return Poll::Ready(None);
149                    }
150                }
151                _ => {
152                    return Poll::Pending;
153                }
154            }
155        }
156    }
157}
158
159struct SentencePattern;
160
161impl Pattern for SentencePattern {
162    fn matches(&self, char: char) -> bool {
163        char == '.' || char == '?' || char == '!'
164    }
165}
166
167pin_project! {
168    /// A stream that output sentences of text at a time.
169    pub struct SentenceStream<S: Stream<Item = I>, I: AsRef<str>> {
170        #[pin]
171        segmented: SegmentedStream<S, I, SentencePattern>,
172    }
173}
174
175impl<S: Stream<Item = I>, I: AsRef<str>> SentenceStream<S, I> {
176    /// Create a new sentence stream from a stream of text
177    fn new(backing: S) -> Self {
178        Self {
179            segmented: SegmentedStream::new(backing, SentencePattern),
180        }
181    }
182}
183
184impl<S: Stream<Item = I>, I: AsRef<str>> Stream for SentenceStream<S, I> {
185    type Item = String;
186
187    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
188        self.project().segmented.poll_next(cx)
189    }
190}
191
192pin_project! {
193    /// A stream that output words of text at a time.
194    pub struct WordStream<S: Stream<Item = I>, I: AsRef<str>> {
195        #[pin]
196        segmented: SegmentedStream<S, I, WordPattern>,
197    }
198}
199
200impl<S: Stream<Item = I>, I: AsRef<str>> WordStream<S, I> {
201    /// Create a new word stream from a stream of text
202    fn new(backing: S) -> Self {
203        Self {
204            segmented: SegmentedStream::new(backing, WordPattern),
205        }
206    }
207}
208
209impl<S: Stream<Item = I>, I: AsRef<str>> Stream for WordStream<S, I> {
210    type Item = String;
211
212    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
213        self.project().segmented.poll_next(cx)
214    }
215}
216
217struct WordPattern;
218
219impl Pattern for WordPattern {
220    fn matches(&self, char: char) -> bool {
221        char.is_whitespace()
222    }
223}
224
225pin_project! {
226    /// A stream that output paragraphs of text at a time.
227    pub struct ParagraphStream<S: Stream<Item = I>, I: AsRef<str>> {
228        #[pin]
229        segmented: SegmentedStream<S, I, ParagraphPattern>,
230    }
231}
232
233impl<S: Stream<Item = I>, I: AsRef<str>> ParagraphStream<S, I> {
234    /// Create a new paragraph stream from a stream of text
235    pub fn new(backing: S) -> Self {
236        Self {
237            segmented: SegmentedStream::new(backing, ParagraphPattern),
238        }
239    }
240}
241
242impl<S: Stream<Item = I>, I: AsRef<str>> Stream for ParagraphStream<S, I> {
243    type Item = String;
244
245    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
246        self.project().segmented.poll_next(cx)
247    }
248}
249
250struct ParagraphPattern;
251
252impl Pattern for ParagraphPattern {
253    fn matches(&self, char: char) -> bool {
254        char == '\n'
255    }
256}