1use std::cell::RefCell;
6use std::rc::Rc;
7use std::sync::mpsc::{self, Receiver, Sender};
8use std::thread;
9
/// Lifecycle of a stream as seen by the code that polls it.
///
/// `PartialEq`/`Eq` are derived so states can be compared directly
/// (for example in `assert_eq!` or `if handle.state() == StreamState::Done`).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum StreamState {
    /// No producer has been started yet (initial state, and the state after a reset).
    Idle,
    /// A producer has been started and has not yet reported completion or failure.
    Streaming,
    /// The producer exhausted its iterator and signalled completion.
    Done,
    /// The stream failed; the payload is a human-readable error message.
    Error(String),
}
22
/// Handle to a stream whose items are folded into a single accumulated value of type `T`.
///
/// The state lives behind `Rc<RefCell<..>>`, so clones of a handle observe and
/// mutate the same stream. Because `Rc` is not `Send`, a handle stays on the
/// thread that created it.
pub struct StreamHandle<T> {
    /// Shared, interior-mutable stream state.
    inner: Rc<RefCell<StreamInner<T>>>,
}
27
/// State shared by every clone of a `StreamHandle`.
struct StreamInner<T> {
    /// Value built up from the items received so far.
    accumulated: T,
    /// Current lifecycle state of the stream.
    state: StreamState,
    /// Set once a producer has been launched; prevents starting a second one.
    started: bool,
    /// Consumer end of the channel fed by the producer thread.
    /// `None` before a stream is started, after it reaches a terminal state, and after a reset.
    receiver: Option<Receiver<StreamItem<T>>>,
}
38
/// Message sent from the producer thread to the polling side.
enum StreamItem<T> {
    /// One item yielded by the producer's iterator.
    Value(T),
    /// The producer's iterator was exhausted normally.
    Done,
    /// The producer could not create its iterator; carries the error message.
    Error(String),
}
48
49impl<T> Clone for StreamHandle<T>
50where
51 T: Clone,
52{
53 fn clone(&self) -> Self {
54 Self {
55 inner: Rc::clone(&self.inner),
56 }
57 }
58}
59
60impl<T: Clone + Default + 'static> StreamHandle<T> {
61 pub fn new() -> Self {
63 Self {
64 inner: Rc::new(RefCell::new(StreamInner {
65 accumulated: T::default(),
66 state: StreamState::Idle,
67 started: false,
68 receiver: None,
69 })),
70 }
71 }
72
73 pub fn with_initial(initial: T) -> Self {
75 Self {
76 inner: Rc::new(RefCell::new(StreamInner {
77 accumulated: initial,
78 state: StreamState::Idle,
79 started: false,
80 receiver: None,
81 })),
82 }
83 }
84
85 pub fn get(&self) -> T {
87 self.inner.borrow().accumulated.clone()
88 }
89
90 pub fn is_loading(&self) -> bool {
92 matches!(
93 self.inner.borrow().state,
94 StreamState::Idle | StreamState::Streaming
95 )
96 }
97
98 pub fn is_streaming(&self) -> bool {
100 matches!(self.inner.borrow().state, StreamState::Streaming)
101 }
102
103 pub fn is_done(&self) -> bool {
105 matches!(self.inner.borrow().state, StreamState::Done)
106 }
107
108 pub fn is_error(&self) -> bool {
110 matches!(self.inner.borrow().state, StreamState::Error(_))
111 }
112
113 pub fn error(&self) -> Option<String> {
115 match &self.inner.borrow().state {
116 StreamState::Error(e) => Some(e.clone()),
117 _ => None,
118 }
119 }
120
121 pub fn state(&self) -> StreamState {
123 self.inner.borrow().state.clone()
124 }
125}
126
127impl<T: Clone + Send + 'static> StreamHandle<T> {
128 pub fn start<F, I>(&self, stream_fn: F)
133 where
134 F: FnOnce() -> I + Send + 'static,
135 I: Iterator<Item = T> + Send + 'static,
136 {
137 let mut inner = self.inner.borrow_mut();
138 if inner.started {
139 return;
140 }
141
142 inner.started = true;
143 inner.state = StreamState::Streaming;
144
145 let (tx, rx): (Sender<StreamItem<T>>, Receiver<StreamItem<T>>) = mpsc::channel();
147 inner.receiver = Some(rx);
148
149 thread::spawn(move || {
151 let iter = stream_fn();
152 for item in iter {
153 if tx.send(StreamItem::Value(item)).is_err() {
154 return;
156 }
157 }
158 let _ = tx.send(StreamItem::Done);
159 });
160 }
161
162 pub fn start_with_result<F, I>(&self, stream_fn: F)
164 where
165 F: FnOnce() -> Result<I, String> + Send + 'static,
166 I: Iterator<Item = T> + Send + 'static,
167 {
168 let mut inner = self.inner.borrow_mut();
169 if inner.started {
170 return;
171 }
172
173 inner.started = true;
174 inner.state = StreamState::Streaming;
175
176 let (tx, rx): (Sender<StreamItem<T>>, Receiver<StreamItem<T>>) = mpsc::channel();
177 inner.receiver = Some(rx);
178
179 thread::spawn(move || match stream_fn() {
180 Ok(iter) => {
181 for item in iter {
182 if tx.send(StreamItem::Value(item)).is_err() {
183 return;
184 }
185 }
186 let _ = tx.send(StreamItem::Done);
187 }
188 Err(e) => {
189 let _ = tx.send(StreamItem::Error(e));
190 }
191 });
192 }
193
194 pub fn poll(&self, accumulate: impl Fn(&mut T, T)) -> bool {
197 let mut inner = self.inner.borrow_mut();
198 let mut updated = false;
199
200 if let Some(receiver) = inner.receiver.take() {
202 let mut new_state = None;
204 loop {
205 match receiver.try_recv() {
206 Ok(StreamItem::Value(item)) => {
207 accumulate(&mut inner.accumulated, item);
208 updated = true;
209 }
210 Ok(StreamItem::Done) => {
211 new_state = Some(StreamState::Done);
212 break;
213 }
214 Ok(StreamItem::Error(e)) => {
215 new_state = Some(StreamState::Error(e));
216 break;
217 }
218 Err(mpsc::TryRecvError::Empty) => {
219 break;
220 }
221 Err(mpsc::TryRecvError::Disconnected) => {
222 if !matches!(inner.state, StreamState::Done | StreamState::Error(_)) {
223 new_state = Some(StreamState::Error(
224 "Stream disconnected unexpectedly".to_string(),
225 ));
226 }
227 break;
228 }
229 }
230 }
231
232 if new_state.is_none() || matches!(new_state, Some(StreamState::Streaming)) {
234 inner.receiver = Some(receiver);
235 }
236
237 if let Some(state) = new_state {
238 inner.state = state;
239 }
240 }
241
242 updated
243 }
244
245 pub fn reset(&self)
247 where
248 T: Default,
249 {
250 let mut inner = self.inner.borrow_mut();
251 inner.accumulated = T::default();
252 inner.state = StreamState::Idle;
253 inner.started = false;
254 inner.receiver = None;
255 }
256
257 pub fn reset_with(&self, initial: T) {
259 let mut inner = self.inner.borrow_mut();
260 inner.accumulated = initial;
261 inner.state = StreamState::Idle;
262 inner.started = false;
263 inner.receiver = None;
264 }
265}
266
267impl<T: Clone + Default + 'static> Default for StreamHandle<T> {
268 fn default() -> Self {
269 Self::new()
270 }
271}
272
/// A `StreamHandle` that accumulates into a `String`.
pub type TextStreamHandle = StreamHandle<String>;
276
277impl TextStreamHandle {
278 pub fn poll_text(&self) -> bool {
280 self.poll(|acc, item| acc.push_str(&item))
281 }
282}