rxrust/subscription/
tuple.rs

1use super::Subscription;
2
3/// TupleSubscription that combines two subscriptions
4///
5/// This is a simple tuple-based subscription designed specifically for merge
6/// operators and other multi-source scenarios in the v1 architecture. It
7/// provides a straightforward way to manage two concurrent subscriptions
8/// without the complexity of reference counting.
9///
10/// # Examples
11///
12/// ```rust
13/// use rxrust::prelude::*;
14///
15/// // Merge operator usage
16/// let obs1 = Local::from_iter([1, 3, 5]);
17/// let obs2 = Local::from_iter([2, 4, 6]);
18/// let merged = obs1.merge(obs2);
19/// let subscription = merged.subscribe(|_| {});
20///
21/// // The TupleSubscription handles cleanup automatically
22/// drop(subscription);
23/// ```
24pub struct TupleSubscription<U1, U2> {
25  unsub1: U1,
26  unsub2: U2,
27}
28
29impl<U1, U2> TupleSubscription<U1, U2> {
30  /// Create a new tuple subscription for multi-source operators
31  pub fn new(unsub1: U1, unsub2: U2) -> Self { TupleSubscription { unsub1, unsub2 } }
32}
33
34impl<U1, U2> Subscription for TupleSubscription<U1, U2>
35where
36  U1: Subscription,
37  U2: Subscription,
38{
39  fn unsubscribe(self) {
40    // Unsubscribe from both source subscriptions
41    self.unsub1.unsubscribe();
42    self.unsub2.unsubscribe();
43  }
44
45  fn is_closed(&self) -> bool {
46    // Return true only when both subscriptions are closed
47    self.unsub1.is_closed() && self.unsub2.is_closed()
48  }
49}
50
51#[cfg(test)]
52mod tests {
53  use std::{cell::RefCell, rc::Rc};
54
55  use super::*;
56
57  /// A mock subscription for testing
58  struct MockSubscription {
59    closed: Rc<RefCell<bool>>,
60  }
61
62  impl MockSubscription {
63    fn new() -> (Self, Rc<RefCell<bool>>) {
64      let closed = Rc::new(RefCell::new(false));
65      (Self { closed: closed.clone() }, closed)
66    }
67  }
68
69  impl Subscription for MockSubscription {
70    fn unsubscribe(self) { *self.closed.borrow_mut() = true; }
71
72    fn is_closed(&self) -> bool { *self.closed.borrow() }
73  }
74
75  #[rxrust_macro::test]
76  fn test_tuple_subscription() {
77    let (mock1, closed1) = MockSubscription::new();
78    let (mock2, closed2) = MockSubscription::new();
79
80    let tuple_sub = TupleSubscription::new(mock1, mock2);
81
82    assert!(!tuple_sub.is_closed());
83    tuple_sub.unsubscribe();
84
85    assert!(*closed1.borrow());
86    assert!(*closed2.borrow());
87  }
88}