1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use super::{CellValue, Watchable};
use crate::{
cell::{Cell, CellImmutable, CellMutable},
signal::Signal,
};
pub trait ConcatExt<T>: Watchable<T> {
/// Sequential stream composition - subscribe to second source after first completes.
///
/// Values from the first source are emitted until it completes, then values
/// from the second source are emitted.
///
/// # Example
///
/// ```
/// use hyphae::{Cell, Mutable, Gettable, ConcatExt};
///
/// let first = Cell::new(1);
/// let second = Cell::new(100);
/// let combined = first.concat(&second);
///
/// first.set(2); // Emits 2
/// first.set(3); // Emits 3
/// first.complete(); // Switches to second
/// second.set(200); // Emits 200
/// ```
#[track_caller]
fn concat<W2>(&self, other: &W2) -> Cell<T, CellImmutable>
where
T: CellValue,
W2: Watchable<T> + Clone + Send + Sync + 'static,
Self: Clone + Send + Sync + 'static,
{
let derived = Cell::<T, CellMutable>::new(self.get());
let weak = derived.downgrade();
let first_done = Arc::new(AtomicBool::new(false));
let first_skip = Arc::new(AtomicBool::new(true));
let other = other.clone();
// Subscribe to second source when first completes
let first_done2 = first_done.clone();
let guard1 = self.subscribe(move |signal| {
if let Some(d) = weak.upgrade() {
match signal {
Signal::Value(value) => {
if first_skip.swap(false, Ordering::SeqCst) {
return;
}
d.notify(Signal::Value(value.clone()));
}
Signal::Complete => {
first_done2.store(true, Ordering::SeqCst);
// Now subscribe to second source
let weak2 = d.downgrade();
let second_skip = Arc::new(AtomicBool::new(true));
let guard2 = other.subscribe(move |signal| {
if let Some(d2) = weak2.upgrade() {
match signal {
Signal::Value(value) => {
if second_skip.swap(false, Ordering::SeqCst) {
return;
}
d2.notify(Signal::Value(value.clone()));
}
Signal::Complete => d2.notify(Signal::Complete),
Signal::Error(e) => d2.notify(Signal::Error(e.clone())),
}
}
});
d.own(guard2);
}
Signal::Error(e) => d.notify(Signal::Error(e.clone())),
}
}
});
derived.own(guard1);
derived.lock()
}
}
/// Blanket implementation: every `Watchable<T>` gets the provided `concat` method.
impl<T, W> ConcatExt<T> for W
where
    W: Watchable<T>,
{
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Mutable;

    /// `concat` forwards the first source until it completes, then the second.
    #[test]
    fn test_concat() {
        let first = Cell::new(1);
        let second = Cell::new(100);
        let combined = first.concat(&second);

        // Funnel every value observed on the combined cell into a channel so
        // the emission order can be checked step by step.
        let (sender, receiver) = std::sync::mpsc::channel::<i32>();
        let _guard = combined.subscribe(move |signal| {
            if let Signal::Value(v) = signal {
                let _ = sender.send(**v);
            }
        });
        let next_value = || receiver.recv().ok();

        // The combined cell starts out with the first source's initial value.
        assert_eq!(next_value(), Some(1));

        // Updates to the first source pass through in order.
        first.set(2);
        first.set(3);
        assert_eq!(next_value(), Some(2));
        assert_eq!(next_value(), Some(3));

        // Completing the first source hands over to the second one. The value
        // the second source emits on subscribe is skipped by the callback, so
        // only later updates are observed.
        first.complete();

        second.set(200);
        assert_eq!(next_value(), Some(200));
        second.set(300);
        assert_eq!(next_value(), Some(300));
    }
}