1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
//! Subject implementations
//!
//! This module implements the Subject pattern for rxRust v1.0.
//!
//! # Architecture
//!
//! The Subject implementation in rxRust v1.0 is designed with the following key
//! goals:
//!
//! 1. **Pointer-Based Design**: The Subject is parameterized by a single smart
//! pointer type `P`, which points to the subscribers container. This
//! decouples the Subject from the `Context` trait and allows for greater
//! flexibility.
//! 2. **Re-Entrancy Safety**: Emissions (`next`/`error`/`complete`) are not
//! re-entrant to keep ordering predictable and prevent accidental feedback
//! loops.
//!
//! # Core Concepts
//!
//! ## Pointer Abstraction
//!
//! Instead of binding the Subject to a `Context` trait directly, we use a
//! single `Subject<P>` struct where `P` is a smart pointer type (e.g.,
//! `Rc<RefCell<...>>` or `Arc<Mutex<...>>`). This allows the same `Subject`
//! struct to be used for both single-threaded (`Local`) and multi-threaded
//! (`Shared`) scenarios by simply changing the pointer type.
//!
//! ## Subscribers Container
//!
//! The `Subscribers` container uses `SmallVec` to optimize for the common case
//! of having few subscribers (<= 2), avoiding heap allocations for these
//! scenarios.
//!
//! ## Re-borrowing Support
//!
//! The Subject implementation supports broadcasting `&mut Item` via
//! re-borrowing, allowing for sequential modification of data in a subscription
//! chain.
//!
//! # Usage
//!
//! Subjects can be created using the `Local` or `Shared` context factories:
//!
//! ```rust
//! use rxrust::prelude::*;
//!
//! // Local Subject
//! let subject = Local::subject();
//! subject
//! .clone()
//! .subscribe(|v: i32| println!("Local: {}", v));
//! subject.clone().next(1);
//!
//! // Shared Subject
//! let shared_subject = Shared::subject();
//! shared_subject
//! .clone()
//! .subscribe(|v: i32| println!("Shared: {}", v));
//! shared_subject.clone().next(1);
//! ```
//!
//! ## Re-Entrancy Policy
//!
//! - **Emissions (`next`/`error`/`complete`) are not re-entrant**. Calling
//! `next`/`error`/`complete` on the same Subject from within one of its own
//! callbacks will **panic**.
//! - **Subscription mutations are allowed** (`subscribe`/`unsubscribe`) inside
//! callbacks. They may be applied after the current emission finishes.
//!
//! If you intentionally need feedback loops (emitting values from within a
//! callback), insert an explicit async boundary using `delay`:
//!
//! ```rust,no_run
//! # #[cfg(not(target_arch = "wasm32"))]
//! # {
//! use std::convert::Infallible;
//!
//! use rxrust::prelude::*;
//!
//! # #[tokio::main]
//! # async fn main() {
//! let mut subject = Shared::subject::<i32, Infallible>();
//! let mut emitter = subject.clone();
//!
//! // Apply delay(0) to the observable chain to create an async boundary.
//! // This schedules the callback on the next scheduler tick, after the
//! // original emission has completed and the Subject's internal borrow
//! // has been released.
//! subject
//! .clone()
//! .delay(Duration::from_millis(0))
//! .subscribe(move |n| {
//! println!("Received: {}", n);
//! if n < 3 {
//! // Safe: delay(0) ensures we're on a new scheduler tick
//! emitter.next(n + 1);
//! }
//! });
//!
//! subject.next(0);
//! tokio::time::sleep(Duration::from_millis(100)).await;
//! # }
//! # }
//! ```
// Re-export the main types for convenience
pub use *;
pub use *;
pub use *;
pub use *;
// Re-export BoxedObserver types from observer module for backward compatibility
pub use crate;