1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
//! `with_latest_from(other)` operator — pair every emission with the latest
//! value of another `Gettable` cell.
//!
//! Returns a [`Pipeline`] (specifically a [`MapPipeline`] over self).
//! Materialize to subscribe; further operators can also chain off it.
use super::{super::operators::MapExt, CellValue, Gettable, MapPipeline};
use crate::pipeline::{Pipeline, PipelineSeed};
#[allow(private_bounds)]
pub trait WithLatestFromExt<T: CellValue>: Pipeline<T> + PipelineSeed<T> {
/// On each `self` emission, pair the value with the latest from `other`.
///
/// Unlike `join()` (which emits when either source changes), this only
/// emits when the primary source changes; `other.get()` is read at
/// emission time.
///
/// # Example
///
/// ```
/// use hyphae::{Cell, Gettable, MaterializeDefinite, Mutable, WithLatestFromExt};
///
/// let clicks = Cell::new(0u32);
/// let mouse_pos = Cell::new((0, 0));
///
/// let click_positions = clicks.with_latest_from(&mouse_pos).materialize();
///
/// mouse_pos.set((10, 20)); // no emission
/// mouse_pos.set((30, 40)); // no emission
/// clicks.set(1); // emits (1, (30, 40))
/// ```
#[track_caller]
fn with_latest_from<U, W2>(
&self,
other: &W2,
) -> MapPipeline<Self, T, (T, U), impl Fn(&T) -> (T, U) + Send + Sync + 'static>
where
T: CellValue,
U: CellValue,
W2: Gettable<U> + Clone + Send + Sync + 'static,
Self: Clone + Send + Sync + 'static,
{
let other = other.clone();
self.clone().map(move |t| (t.clone(), other.get()))
}
}
impl<T: CellValue, P: Pipeline<T> + PipelineSeed<T>> WithLatestFromExt<T> for P {}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
Cell, MaterializeDefinite, Mutable, Signal, cell::CellImmutable, traits::Watchable,
};
#[test]
fn test_with_latest_from() {
let source = Cell::new(0);
let other = Cell::new("a".to_string());
let combined: Cell<(i32, String), CellImmutable> =
source.with_latest_from(&other).materialize();
let (tx, rx) = std::sync::mpsc::channel::<(i32, String)>();
let _guard = combined.subscribe(move |signal| {
if let Signal::Value(v) = signal {
let _ = tx.send((**v).clone());
}
});
// Initial combined value
assert_eq!(rx.recv().ok(), Some((0, "a".to_string())));
// Other changes - no emission
other.set("b".to_string());
other.set("c".to_string());
assert!(rx.try_recv().is_err());
// Source changes - emits with latest other
source.set(1);
assert_eq!(rx.recv().ok(), Some((1, "c".to_string())));
// Other changes again - no emission
other.set("d".to_string());
assert!(rx.try_recv().is_err());
// Source changes - emits with latest other
source.set(2);
assert_eq!(rx.recv().ok(), Some((2, "d".to_string())));
}
}