1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
use {
super::{
super::{Datum, Streams},
Producer,
},
crate::{
NetworkId,
StreamId,
discovery::PeerInfo,
streams::status::ChannelConditions,
tickets::TicketValidator,
},
core::{any::Any, marker::PhantomData},
std::sync::Arc,
tokio::sync::mpsc::UnboundedSender,
};
/// Errors reported when building a producer through the builder.
#[derive(Debug, thiserror::Error)]
pub enum Error<D: Datum> {
    /// The stream id is already served by a producer.
    ///
    /// The builder returns this when asked to create a new producer while
    /// another one exists for the same stream id. The variant carries the
    /// already-existing producer.
    ///
    /// When several producers for the same datum type are needed, consider
    /// the default `produce` method instead: it lets multiple instances
    /// share the same underlying stream.
    #[error("Producer for this stream id already exists")]
    AlreadyExists(Producer<D>),
}
/// The set of options that configure a stream producer.
pub struct ProducerConfig {
    /// Id of the stream this producer publishes to.
    ///
    /// A network node can host only one producer per stream id.
    pub stream_id: StreamId,

    /// Capacity of the producer's internal channel, which holds datums
    /// until they are sent to connected consumers.
    ///
    /// While the buffer is full, calls to `send` on the producer await
    /// until space becomes available.
    pub buffer_size: usize,

    /// Whether consumers that cannot keep up with the production rate are
    /// disconnected.
    ///
    /// When enabled, a consumer whose backlog of in-flight datums grows
    /// beyond `buffer_size` is disconnected. Defaults to `true`.
    pub disconnect_lagging: bool,

    /// Predicate deciding whether an incoming consumer connection is
    /// accepted (`true`) or rejected (`false`).
    pub require: Box<dyn Fn(&PeerInfo) -> bool + Send + Sync>,

    /// Function describing the conditions under which the channel counts
    /// as online and may publish data to consumers.
    ///
    /// Conditions may include a minimum number of subscribers, required
    /// tags, or custom predicates. Unless they are met, sending data
    /// through the producer fails.
    ///
    /// The API mirrors the `producer.when().subscribed()` method. By
    /// default publishing is allowed once there is at least one subscriber.
    pub online_when: Box<dyn Fn(ChannelConditions) -> ChannelConditions + Send + Sync>,

    /// Id of the network this producer belongs to.
    pub network_id: NetworkId,

    /// Upper bound on the number of subscribers of this producer.
    ///
    /// Unlimited by default.
    pub max_consumers: usize,

    /// Validators used to authenticate consumer connections via tickets.
    ///
    /// If this list is non-empty, a consumer is accepted only when it
    /// presents valid tickets that pass every configured validator.
    /// Tickets carrying an expiration are tracked, and a consumer is
    /// disconnected automatically once its earliest ticket expires.
    pub ticket_validators: Vec<Arc<dyn TicketValidator>>,

    /// Optional sink receiving datums that reached no consumer because
    /// they matched no subscription criteria of any active consumer.
    ///
    /// The value is type-erased so the config can be stored without
    /// knowing the datum type; it is expected to be a
    /// [`tokio::sync::mpsc::UnboundedSender<D>`].
    pub(crate) undelivered: Option<Box<dyn Any + Send + Sync>>,
}
/// Builder that assembles a new producer instance for a specific datum
/// type `D`, one configuration option at a time.
pub struct Builder<'s, D: Datum> {
    // Options accumulated so far; consumed when the producer is built.
    config: ProducerConfig,
    // Streams instance whose sinks are consulted when the producer is built.
    streams: &'s Streams,
    // Binds the builder to the datum type without storing a `D`.
    _marker: PhantomData<D>,
}
/// Public API
impl<D: Datum> Builder<'_, D> {
/// Adds a ticket validator for authenticating consumer connections.
///
/// Each consumer attempting to subscribe must present valid tickets that
/// pass all configured validators. If a ticket carries an expiration,
/// the producer will automatically disconnect the consumer when the
/// earliest ticket expires. Can be called multiple times to require
/// multiple types of tickets.
#[must_use]
pub fn require_ticket(mut self, validator: impl TicketValidator) -> Self {
self.config.ticket_validators.push(Arc::new(validator));
self
}
/// Adds a peer eligibility requirement for incoming consumer connections.
///
/// The predicate receives a [`PeerInfo`] combining the peer's
/// self-reported [`PeerEntry`](crate::discovery::PeerEntry) with
/// locally-observed metrics like RTT. Since `PeerInfo` implements
/// `Deref<Target = PeerEntry>`, existing predicates that call methods
/// like `peer.tags()` or `peer.has_valid_ticket()` continue to work
/// unchanged.
///
/// When called multiple times, all predicates must pass — AND
/// composition — so a consumer is accepted only if every requirement
/// is satisfied.
///
/// The default requirement accepts all consumers.
#[must_use]
pub fn require<F>(mut self, pred: F) -> Self
where
F: Fn(&PeerInfo) -> bool + Send + Sync + 'static,
{
let prev = self.config.require;
self.config.require = Box::new(move |peer| prev(peer) && pred(peer));
self
}
/// A function that produces channel conditions under which a datum can be
/// considered publishable to a consumer. Here you can specify conditions
/// such as minimum number of subscribers, required tags, or custom
/// predicates, that must be met before publishing is allowed otherwise
/// sending data through the producer will fail.
///
/// This follows the same API as the `producer.when().subscribed()` method.
/// By default this is set to allow publishing if there is at least one
/// subscriber.
#[must_use]
pub fn online_when<F>(mut self, f: F) -> Self
where
F: Fn(ChannelConditions) -> ChannelConditions + Send + Sync + 'static,
{
self.config.online_when = Box::new(f);
self
}
/// If set to true, the producer will disconnect slow consumers that are
/// unable to keep up with the data production rate. If the backlog of a
/// consumer inflight datums grows beyond `buffer_size` it will be
/// disconnected.
#[must_use]
pub const fn disconnect_lagging(mut self, disconnect: bool) -> Self {
self.config.disconnect_lagging = disconnect;
self
}
/// Sets the buffer size for the producer's internal channel that holds datum
/// before they are sent to connected consumers. If the buffer is full, calls
/// to `send` on the producer will await until there is space available.
#[must_use]
pub const fn with_buffer_size(mut self, size: usize) -> Self {
self.config.buffer_size = size;
self
}
/// Sets the maximum number of subscribers allowed for this producer.
/// Defaults to unlimited if not set.
#[must_use]
pub const fn with_max_consumers(mut self, max: usize) -> Self {
self.config.max_consumers = max;
self
}
/// Sets the stream id this producer is associated is producing for.
/// There can only be one producer per stream id on one network node.
///
/// If not set, defaults to the stream id of datum type `D`.
#[must_use]
pub fn with_stream_id(mut self, stream_id: impl Into<StreamId>) -> Self {
self.config.stream_id = stream_id.into();
self
}
/// Sets an optional sink for undelivered datum that were not delivered to
/// any consumers because they did not meet any subscription criteria of
/// active subscriptions or because there were no active subscribers.
///
/// Note that in default configuration, when there are not active
/// subscribers, the producer is considered offline and will not accept any
/// new datum to be sent. However, if the `online_when` condition is
/// customized to allow publishing even when there are no subscribers, this
/// sink can be used to capture datum that would otherwise be dropped.
#[must_use]
pub fn with_undelivered_sink(mut self, sink: UnboundedSender<D>) -> Self {
self.config.undelivered = Some(Box::new(sink));
self
}
/// Builds a new producer with the given configuration for this stream id.
/// If there is already an existing producer for this stream id, an error
/// is returned containing the existing producer created using the original
/// configuration.
pub fn build(self) -> Result<Producer<D>, Error<D>> {
if let Some(existing) = self.streams.sinks.open(self.config.stream_id) {
return Err(Error::AlreadyExists(existing.sender()));
}
self
.streams
.sinks
.create::<D>(self.config)
.map(|handle| handle.sender())
.map_err(|existing| Error::AlreadyExists(existing.sender()))
}
}
impl<'s, D: Datum> Builder<'s, D> {
    /// Creates a builder bound to `streams` and seeded with the default
    /// configuration.
    ///
    /// Defaults: a buffer of 1024 datums, lagging consumers disconnected,
    /// the stream id derived from `D`, every consumer accepted, the channel
    /// online once at least one subscriber is present, no limit on the
    /// number of consumers, the network id of the local node, no ticket
    /// validators and no sink for undelivered datums.
    pub(in crate::streams) fn new(streams: &'s Streams) -> Self {
        let stream_id = D::derived_stream_id();
        let network_id = *streams.local.network_id();

        let config = ProducerConfig {
            stream_id,
            network_id,
            buffer_size: 1024,
            disconnect_lagging: true,
            max_consumers: usize::MAX,
            require: Box::new(|_| true),
            online_when: Box::new(|c| c.minimum_of(1)),
            ticket_validators: Vec::new(),
            undelivered: None,
        };

        Self {
            config,
            streams,
            _marker: PhantomData,
        }
    }
}