1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
// Licensed under the Apache License, Version 2.0
// http://www.apache.org/licenses/LICENSE-2.0
use crate::StreamItem;
use alloc::vec::Vec;
use async_channel::Sender;
pub(crate) struct SubjectState<T> {
pub(crate) closed: bool,
pub(crate) senders: Vec<Sender<StreamItem<T>>>,
}
macro_rules! define_subject_impl {
($($bounds:tt)*) => {
use crate::fluxion_mutex::Mutex;
use crate::{FluxionError, StreamItem, SubjectError};
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::pin::Pin;
use futures::stream::Stream;
use crate::fluxion_subject::implementation::SubjectState;
type SubjectBoxStream<T> = Pin<Box<dyn Stream<Item = StreamItem<T>> + $($bounds)* 'static>>;
/// A hot, unbounded subject that broadcasts items to all current subscribers.
///
/// `FluxionSubject` is the entry point for pushing values into a Fluxion stream pipeline.
/// It implements a publish-subscribe pattern where multiple subscribers can receive
/// the same items.
///
/// See the [module documentation](crate::fluxion_subject) for examples and more details.
pub struct FluxionSubject<T: Clone + $($bounds)* 'static> {
state: Arc<Mutex<SubjectState<T>>>,
}
impl<T: Clone + $($bounds)* 'static> FluxionSubject<T> {
/// Creates a new unbounded subject with no subscribers.
///
/// The subject starts in an open state and can immediately accept
/// subscriptions and items.
#[must_use]
pub fn new() -> Self {
Self {
state: Arc::new(Mutex::new(SubjectState {
closed: false,
senders: Vec::new(),
})),
}
}
/// Subscribe to this subject and receive a stream of `StreamItem<T>`.
/// Late subscribers do not receive previously sent items.
pub fn subscribe(&self) -> Result<SubjectBoxStream<T>, SubjectError> {
let mut state = self.state.lock();
if state.closed {
return Err(SubjectError::Closed);
}
let (tx, rx) = async_channel::unbounded();
state.senders.push(tx);
Ok(Box::pin(rx))
}
/// Send an item to all active subscribers.
///
/// Returns a subject-specific error if the operation fails:
/// - `SubjectError::Closed` if the subject has been closed
pub fn send(&self, item: StreamItem<T>) -> Result<(), SubjectError> {
let mut state = self.state.lock();
if state.closed {
return Err(SubjectError::Closed);
}
let mut next_senders = Vec::with_capacity(state.senders.len());
for tx in state.senders.drain(..) {
if tx.try_send(item.clone()).is_ok() {
next_senders.push(tx);
}
}
state.senders = next_senders;
Ok(())
}
/// Send a value to all active subscribers.
///
/// This is a convenience wrapper around `send(StreamItem::Value(value))`.
///
/// # Errors
///
/// Returns `SubjectError::Closed` if the subject has been closed.
pub fn next(&self, value: T) -> Result<(), SubjectError> {
self.send(StreamItem::Value(value))
}
/// Convenience helper to send a stream error to all subscribers and terminate the subject.
///
/// This bridges stream errors (FluxionError) with subject operations (SubjectError).
pub fn error(&self, err: FluxionError) -> Result<(), SubjectError> {
let result = self.send(StreamItem::Error(err));
self.close();
result
}
/// Closes the subject, completing all subscriber streams.
///
/// After closing:
/// - All existing subscribers will receive `None` on their next poll (stream ends).
/// - `send()` and `error()` will return `SubjectError::Closed`.
/// - `subscribe()` will return `SubjectError::Closed`.
///
/// Closing is idempotent—calling it multiple times has no additional effect.
pub fn close(&self) {
let mut state = self.state.lock();
state.closed = true;
state.senders.clear();
}
/// Returns `true` if the subject has been closed.
///
/// A closed subject cannot accept new items or subscribers.
#[must_use]
pub fn is_closed(&self) -> bool {
self.state.lock().closed
}
/// Returns the number of currently active subscribers.
///
/// Note: This count is updated lazily—dropped subscribers are removed
/// on the next `send()` call, not immediately when dropped.
#[must_use]
pub fn subscriber_count(&self) -> usize {
self.state.lock().senders.len()
}
}
impl<T: Clone + $($bounds)* 'static> Default for FluxionSubject<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Clone + $($bounds)* 'static> Clone for FluxionSubject<T> {
fn clone(&self) -> Self {
Self {
state: self.state.clone(),
}
}
}
};
}