1#![expect(clippy::module_name_repetitions, reason = "needs refactoring")]
5
6pub mod cell;
7pub mod progressable_cell;
8
9use std::{
10 cell::RefCell,
11 fmt,
12 ops::{Deref, DerefMut},
13};
14
15use futures::{
16 channel::{mpsc, oneshot},
17 future::{self, LocalBoxFuture},
18 stream::{self, LocalBoxStream, StreamExt as _},
19};
20
21#[doc(inline)]
22pub use self::{cell::ObservableCell, progressable_cell::ProgressableCell};
23use crate::subscribers_store::{
24 SubscribersStore as _, progressable, progressable::Processed,
25};
26
27type DefaultSubscribers<D> = RefCell<Vec<UniversalSubscriber<D>>>;
29
30pub type Observable<D> = ObservableField<D, DefaultSubscribers<D>>;
34
35pub type Progressable<D> = ObservableField<D, progressable::SubStore<D>>;
41
42#[derive(Debug)]
51pub struct ObservableField<D, S> {
52 data: D,
54
55 subs: S,
57}
58
59impl<D> ObservableField<D, RefCell<Vec<UniversalSubscriber<D>>>>
60where
61 D: 'static,
62{
63 #[must_use]
68 pub const fn new(data: D) -> Self {
69 Self { data, subs: RefCell::new(Vec::new()) }
70 }
71}
72
73impl<D, S> ObservableField<D, S>
74where
75 D: 'static,
76 S: Whenable<D>,
77{
78 pub fn when<F>(
81 &self,
82 assert_fn: F,
83 ) -> LocalBoxFuture<'static, Result<(), DroppedError>>
84 where
85 F: Fn(&D) -> bool + 'static,
86 {
87 if (assert_fn)(&self.data) {
90 Box::pin(future::ok(()))
91 } else {
92 self.subs.when(Box::new(assert_fn))
93 }
94 }
95}
96
97impl<D: 'static> Progressable<D> {
98 #[must_use]
103 pub fn new(data: D) -> Self {
104 Self { data, subs: progressable::SubStore::default() }
105 }
106}
107
108impl<D> Progressable<D>
109where
110 D: Clone + 'static,
111{
112 pub fn subscribe(
117 &self,
118 ) -> LocalBoxStream<'static, progressable::Guarded<D>> {
119 let data = self.subs.wrap(self.data.clone());
120 Box::pin(stream::once(async move { data }).chain(self.subs.subscribe()))
121 }
122
123 pub fn when_all_processed(&self) -> Processed<'static> {
126 self.subs.when_all_processed()
127 }
128}
129
130impl<D> Observable<D>
131where
132 D: Clone + 'static,
133{
134 pub fn subscribe(&self) -> LocalBoxStream<'static, D> {
138 let data = self.data.clone();
139 let (tx, rx) = mpsc::unbounded();
140 self.subs.borrow_mut().push(UniversalSubscriber::Subscribe(tx));
141
142 Box::pin(stream::once(async move { data }).chain(Box::pin(rx)))
143 }
144}
145
146impl<D, S> ObservableField<D, S>
147where
148 D: PartialEq + 'static,
149 S: Whenable<D>,
150{
151 pub fn when_eq(
157 &self,
158 should_be: D,
159 ) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
160 self.when(move |data| data == &should_be)
161 }
162}
163
164impl<D, S> ObservableField<D, S>
165where
166 S: OnObservableFieldModification<D>,
167 D: Clone + PartialEq,
168{
169 pub fn borrow_mut(&mut self) -> MutObservableFieldGuard<'_, D, S> {
181 MutObservableFieldGuard {
182 value_before_mutation: self.data.clone(),
183 data: &mut self.data,
184 subs: &mut self.subs,
185 }
186 }
187}
188
189pub trait OnObservableFieldModification<D> {
192 fn on_modify(&mut self, data: &D);
200}
201
202pub enum UniversalSubscriber<D> {
206 When {
208 sender: RefCell<Option<oneshot::Sender<()>>>,
211
212 assert_fn: Box<dyn Fn(&D) -> bool>,
215 },
216
217 Subscribe(mpsc::UnboundedSender<D>),
219}
220
221impl<D> fmt::Debug for UniversalSubscriber<D> {
222 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
223 match self {
224 Self::When { .. } => {
225 write!(f, "UniversalSubscriber::When")
226 }
227 Self::Subscribe(_) => {
228 write!(f, "UniversalSubscriber::Subscribe")
229 }
230 }
231 }
232}
233
234#[derive(Clone, Copy, Debug)]
237pub struct DroppedError;
238
239impl fmt::Display for DroppedError {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 write!(f, "Observable value has been dropped")
242 }
243}
244
245impl From<oneshot::Canceled> for DroppedError {
246 fn from(_: oneshot::Canceled) -> Self {
247 Self
248 }
249}
250
251pub trait Whenable<D: 'static> {
254 fn when(
259 &self,
260 assert_fn: Box<dyn Fn(&D) -> bool>,
261 ) -> LocalBoxFuture<'static, Result<(), DroppedError>>;
262}
263
264impl<D: 'static> Whenable<D> for RefCell<Vec<UniversalSubscriber<D>>> {
265 fn when(
266 &self,
267 assert_fn: Box<dyn Fn(&D) -> bool>,
268 ) -> LocalBoxFuture<'static, Result<(), DroppedError>> {
269 let (tx, rx) = oneshot::channel();
270 self.borrow_mut().push(UniversalSubscriber::When {
271 sender: RefCell::new(Some(tx)),
272 assert_fn,
273 });
274 Box::pin(async move { Ok(rx.await?) })
275 }
276}
277
278impl<D: Clone + 'static> OnObservableFieldModification<D>
279 for progressable::SubStore<D>
280{
281 fn on_modify(&mut self, data: &D) {
282 self.send_update(data.clone());
283 }
284}
285
286impl<D: Clone> OnObservableFieldModification<D>
287 for RefCell<Vec<UniversalSubscriber<D>>>
288{
289 fn on_modify(&mut self, data: &D) {
290 self.borrow_mut().retain(|sub| match sub {
291 UniversalSubscriber::When { assert_fn, sender } => {
292 #[expect(clippy::expect_used, reason = "single use expected")]
293 if (assert_fn)(data) {
294 sender
295 .borrow_mut()
296 .take()
297 .expect("`UniversalSubscriber::When` used already")
298 .send(())
299 .is_ok_and(|()| false)
300 } else {
301 true
302 }
303 }
304 UniversalSubscriber::Subscribe(sender) => {
305 sender.unbounded_send(data.clone()).is_ok()
306 }
307 });
308 }
309}
310
311impl<D, S> Deref for ObservableField<D, S> {
312 type Target = D;
313
314 fn deref(&self) -> &Self::Target {
315 &self.data
316 }
317}
318
319impl<D, S> fmt::Display for ObservableField<D, S>
320where
321 D: fmt::Display,
322{
323 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
324 fmt::Display::fmt(&self.data, f)
325 }
326}
327
328#[derive(Debug)]
335pub struct MutObservableFieldGuard<'a, D, S>
336where
337 S: OnObservableFieldModification<D>,
338 D: PartialEq,
339{
340 data: &'a mut D,
342
343 subs: &'a mut S,
345
346 value_before_mutation: D,
348}
349
350impl<D, S> Deref for MutObservableFieldGuard<'_, D, S>
351where
352 S: OnObservableFieldModification<D>,
353 D: PartialEq,
354{
355 type Target = D;
356
357 fn deref(&self) -> &Self::Target {
358 self.data
359 }
360}
361
362impl<D, S> DerefMut for MutObservableFieldGuard<'_, D, S>
363where
364 S: OnObservableFieldModification<D>,
365 D: PartialEq,
366{
367 fn deref_mut(&mut self) -> &mut Self::Target {
368 self.data
369 }
370}
371
372impl<D, S> Drop for MutObservableFieldGuard<'_, D, S>
373where
374 S: OnObservableFieldModification<D>,
375 D: PartialEq,
376{
377 fn drop(&mut self) {
378 if self.data != &self.value_before_mutation {
379 self.subs.on_modify(self.data);
380 }
381 }
382}
383
384#[cfg(test)]
385mod tests {
386 use std::{cell::RefCell, time::Duration};
387
388 use futures::{StreamExt as _, poll, task::Poll};
389 use tokio::time::timeout;
390
391 use crate::{Observable, Progressable};
392
393 #[tokio::test]
394 async fn subscriber_receives_current_data() {
395 let field = Observable::new(9);
396 let current_data = field.subscribe().next().await.unwrap();
397 assert_eq!(current_data, 9);
398 }
399
400 #[tokio::test]
401 async fn when_eq_resolves_if_value_eq_already() {
402 let field = Observable::new(9);
403 field.when_eq(9).await.unwrap();
404 }
405
406 #[tokio::test]
407 async fn when_eq_doesnt_resolve_if_value_is_not_eq() {
408 let field = Observable::new(9);
409 let _ = timeout(Duration::from_millis(50), field.when_eq(0))
410 .await
411 .unwrap_err();
412 }
413
414 #[tokio::test]
415 async fn current_value_is_provided_into_assert_fn_on_when_call() {
416 let field = Observable::new(9);
417
418 timeout(Duration::from_millis(50), field.when(|val| val == &9))
419 .await
420 .unwrap()
421 .unwrap();
422 }
423
424 #[tokio::test]
425 async fn value_updates_are_sent_to_subs() {
426 let mut field = Observable::new(0);
427 let mut subscription_on_changes = field.subscribe();
428
429 for _ in 0..100 {
430 *field.borrow_mut() += 1;
431 }
432 loop {
433 if let Some(change) = subscription_on_changes.next().await {
434 if change == 100 {
435 break;
436 }
437 } else {
438 panic!("Stream ended too early!");
439 }
440 }
441 }
442
443 #[tokio::test]
444 async fn when_resolves_on_value_update() {
445 let mut field = Observable::new(0);
446 let subscription = field.when(|change| change == &100);
447
448 for _ in 0..100 {
449 *field.borrow_mut() += 1;
450 }
451
452 timeout(Duration::from_millis(50), subscription)
453 .await
454 .unwrap()
455 .unwrap();
456 }
457
458 #[tokio::test]
459 async fn when_eq_resolves_on_value_update() {
460 let mut field = Observable::new(0);
461 let subscription = field.when_eq(100);
462
463 for _ in 0..100 {
464 *field.borrow_mut() += 1;
465 }
466
467 timeout(Duration::from_millis(50), subscription)
468 .await
469 .unwrap()
470 .unwrap();
471 }
472
473 #[tokio::test]
474 async fn when_returns_dropped_error_on_drop() {
475 let field = Observable::new(0);
476 let subscription = field.when(|change| change == &100);
477 drop(field);
478 let _ = subscription.await.unwrap_err();
479 }
480
481 #[tokio::test]
482 async fn when_eq_returns_dropped_error_on_drop() {
483 let field = Observable::new(0);
484 let subscription = field.when_eq(100);
485 drop(field);
486 let _ = subscription.await.unwrap_err();
487 }
488
489 #[tokio::test]
490 async fn stream_ends_when_reactive_field_is_dropped() {
491 let field = Observable::new(0);
492 let subscription = field.subscribe();
493 drop(field);
494 assert!(subscription.skip(1).next().await.is_none());
495 }
496
497 #[tokio::test]
498 async fn no_update_should_be_emitted_on_field_mutation() {
499 let mut field = Observable::new(0);
500 let subscription = field.subscribe();
501 *field.borrow_mut() = 0;
502 let _ = timeout(
503 Duration::from_millis(50),
504 Box::pin(subscription.skip(1).next()),
505 )
506 .await
507 .unwrap_err();
508 }
509
510 #[tokio::test]
511 async fn only_last_update_should_be_sent_to_subscribers() {
512 let mut field = Observable::new(0);
513 let subscription = field.subscribe();
514 let mut field_mut_guard = field.borrow_mut();
515 *field_mut_guard = 100;
516 *field_mut_guard = 200;
517 *field_mut_guard = 300;
518 drop(field_mut_guard);
519 assert_eq!(subscription.skip(1).next().await.unwrap(), 300);
520 }
521
522 #[tokio::test]
523 async fn reactive_with_refcell_inside() {
524 let field = RefCell::new(Observable::new(0));
525 let subscription = field.borrow().when_eq(1);
526 *field.borrow_mut().borrow_mut() = 1;
527 timeout(Duration::from_millis(50), Box::pin(subscription))
528 .await
529 .unwrap()
530 .unwrap();
531 }
532
533 #[tokio::test]
534 async fn when_all_processed_works() {
535 let mut field = Progressable::new(1);
536 assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
537 *field.borrow_mut() = 2;
538 assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
539
540 let mut subscribe = field.subscribe();
541 assert_eq!(poll!(field.when_all_processed()), Poll::Pending);
542
543 assert_eq!(subscribe.next().await.unwrap().into_inner(), 2);
544 *field.borrow_mut() = 3;
545 assert_eq!(poll!(field.when_all_processed()), Poll::Pending);
546 assert_eq!(subscribe.next().await.unwrap().into_inner(), 3);
547 assert_eq!(poll!(field.when_all_processed()), Poll::Ready(()));
548 }
549}