mosaik 0.3.13

A Rust runtime for building self-organizing, leaderless distributed systems.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
use {
	super::super::{
		Streams,
		accept::{ConsumerHandshake, StartStream},
		consumer::builder::ConsumerConfig,
		status::{State, Stats},
	},
	crate::{
		Datum,
		discovery::{Discovery, PeerEntry},
		network::{LocalNode, error::*, link::*},
		primitives::Short,
		streams::{NotAllowed, StreamNotFound, status::ChannelInfo},
	},
	backoff::backoff::Backoff,
	core::{future::pending, ops::ControlFlow, time::Duration},
	futures::{FutureExt, TryFutureExt},
	iroh::EndpointAddr,
	std::sync::Arc,
	tokio::sync::{mpsc, watch},
	tokio_util::sync::{CancellationToken, ReusableBoxFuture},
};

/// Worker task that manages receiving data from one remote producer peer.
///
/// Notes:
///
/// - Individual receiver workers are spawned for each connected producer peer.
///
/// - A Producer will reject connections from consumers that are not known in
///   its discovery catalog. So if a consumer attempts to subscribe to a stream
///   from a producer and the connection is rejected with
///   [`CloseReason::UnknownPeer`] the consumer should trigger a catalog sync
///   with the producer and retry the subscription again.
pub(super) struct Receiver<D: Datum> {
	/// Configuration for this consumer.
	config: Arc<ConsumerConfig>,

	/// Discovery system handle used to trigger catalog syncs with producers that
	/// are not recognizing this consumer.
	discovery: Discovery,

	/// Local socket, used to initiate connections to remote peers on the Streams
	/// protocol.
	local: LocalNode,

	/// The remote producer peer entry snapshot as known at the time of worker
	/// creation.
	peer: Arc<PeerEntry>,

	/// Channel for sending received data to the consumer handle for the public
	/// api to consume. The `usize` is the encoded byte length of the datum,
	/// as reported by the link receive.
	data_tx: mpsc::UnboundedSender<(D, usize)>,

	/// Watch channel for reporting the current state of this receiver worker
	/// connection with the remote producer. The worker's own event loop also
	/// subscribes to this channel to observe the `Terminated` transition.
	state_tx: watch::Sender<State>,

	/// Triggered when the receiver worker should shut down. A child of the
	/// consumer-level token, so the worker can be cancelled individually or
	/// via its parent.
	cancel: CancellationToken,

	/// Reusable future for receiving the next datum from the remote producer.
	///
	/// The receiver future always carries the current physical link along with
	/// it to enable repairing dropped connections according to the backoff
	/// policy.
	next_recv: ReusableBoxFuture<'static, (DatumRecvResult<D>, Link<Streams>)>,

	/// Reusable future for connecting to the remote producer.
	///
	/// This is used to keep track of the ongoing connection attempt when
	/// reconnections are needed on recoverable errors.
	next_connect: ReusableBoxFuture<'static, Result<Link<Streams>, LinkError>>,

	/// Backoff policy for reconnecting to the remote producer on recoverable
	/// errors that is currently being applied since the last successful receive.
	/// `None` until the first connection attempt initializes it from config.
	backoff: Option<Box<dyn Backoff + Send + Sync + 'static>>,

	/// Statistics about this receiver worker.
	stats: Arc<Stats>,
}

impl<D: Datum> Receiver<D> {
	/// Spawns a new receiver worker for the specified remote producer peer.
	///
	/// An instance of this worker should be created for each connected producer
	/// peer for a consumer instance.
	///
	/// Returns the [`ChannelInfo`] snapshot exposing this worker's identity,
	/// stats and state watch to the status reporting layer.
	pub fn spawn(
		peer: PeerEntry,
		local: &LocalNode,
		discovery: &Discovery,
		cancel: &CancellationToken,
		data_tx: &mpsc::UnboundedSender<(D, usize)>,
		config: Arc<ConsumerConfig>,
	) -> ChannelInfo {
		let local = local.clone();
		// Derive a child token so this worker can be cancelled individually
		// while still observing cancellation of the parent (consumer) token.
		let cancel = cancel.child_token();
		let data_tx = data_tx.clone();
		let discovery = discovery.clone();
		let peer = Arc::new(peer);
		let stats = Arc::new(Stats::default());
		// Both reusable futures start out pending; `run` installs the real
		// connect future before the first poll.
		let next_recv = ReusableBoxFuture::new(pending());
		let next_connect = ReusableBoxFuture::new(pending());
		let (state_tx, state) = watch::channel(State::Connecting);

		// Construct the status signaling info for this receiver worker.
		let channel_info = ChannelInfo {
			stream_id: config.stream_id,
			criteria: config.criteria.clone(),
			producer_id: *peer.id(),
			consumer_id: local.id(),
			stats: Arc::clone(&stats),
			peer: Arc::clone(&peer),
			state,
		};

		let worker = Self {
			local,
			discovery,
			data_tx,
			state_tx,
			next_recv,
			next_connect,
			backoff: None,
			// Move the child token in directly. Previously this was
			// `cancel.child_token()`, which minted a redundant grandchild
			// token whose intermediate parent was dropped at the end of
			// `spawn`; one level of nesting suffices.
			cancel,
			peer: Arc::clone(&peer),
			stats: Arc::clone(&stats),
			config,
		};

		tokio::spawn(worker.run());

		channel_info
	}
}

impl<D: Datum> Receiver<D> {
	/// Main event loop of the receiver worker.
	///
	/// Drives three concerns concurrently: completing connection attempts
	/// ([`next_connect`]), receiving datums over an established link
	/// ([`next_recv`]), and observing the state watch channel so the loop
	/// exits once the state transitions to [`State::Terminated`].
	pub async fn run(mut self) {
		// initial connection attempt
		let connect_fut = self.connect();
		self.next_connect.set(connect_fut);
		let mut conn_state = self.state_tx.subscribe();

		loop {
			tokio::select! {
				// Triggered when the next datum read future is ready.
				// This could be either a successfully received datum or an error.
				(result, link) = &mut self.next_recv => {
					self.on_next_recv(result, link);
				}

				// Triggered when a connection attempt to the remote producer
				// completes.
				result = &mut self.next_connect => {
					self.handle_connect_result(result);
				}

				// Triggered when the receiver worker state changes to terminated.
				// Used to break out of the loop when an unrecoverable error occurs.
				_ = conn_state.wait_for(|s| *s == State::Terminated) => {
					tracing::debug!(
						stream_id = %Short(self.config.stream_id),
						producer_id = %Short(&self.peer.id()),
						"stream subscription terminated",
					);
					break;
				}
			}
		}
	}

	/// Handles the result of receiving the next datum from the remote producer.
	///
	/// Ensures that the next receive future is properly set up for the next datum
	/// and honors the cancellation signal.
	fn on_next_recv(&mut self, result: DatumRecvResult<D>, link: Link<Streams>) {
		match result {
			// a datum was successfully received
			Ok((datum, bytes_len)) => {
				// forward the received datum to the consumer worker
				// for delivery to public api consumer handle.
				// Send failure (receiver dropped) is deliberately ignored.
				self.data_tx.send((datum, bytes_len)).ok();

				// update stats
				self.stats.increment_datums();
				self.stats.increment_bytes(bytes_len);

				// if not cancelled, prepare to receive the next datum
				if !self.cancel.is_cancelled() {
					// reset the global backoff policy on successful receive
					if let Some(ref mut backoff) = self.backoff {
						backoff.reset();
					}

					self.next_recv.set(self.make_next_recv_future(link));
				}
			}
			// an error occurred while receiving the datum,
			Err(error) => {
				// kick off the sad path handling for the current link.
				self.handle_recv_error(error, Some(link));
			}
		}
	}

	/// Creates a future that receives the next datum from the remote producer
	/// over the specified link.
	///
	/// The future is cancellable using the worker's cancellation token and
	/// carries the link along with it for further receives or reconnections.
	///
	/// The first instance of this future is created by [`connect`].
	#[expect(clippy::unused_self)]
	fn make_next_recv_future(
		&self,
		mut link: Link<Streams>,
	) -> impl Future<Output = (Result<(D, usize), LinkError>, Link<Streams>)> + 'static
	{
		// bind the receive future along with the transport link it is using so
		// we can repair the connection if needed.
		async move {
			let fut = link.recv_with_size::<D>().map_err(LinkError::Recv);
			(fut.await, link)
		}
		.fuse()
	}

	/// Attempts to connect to the remote producer and perform the stream
	/// subscription handshake.
	///
	/// A single backoff delay (per the configured policy) is awaited before
	/// the attempt; repeated retries happen because [`handle_recv_error`]
	/// schedules this future again on recoverable failures.
	fn connect(
		&mut self,
	) -> impl Future<Output = Result<Link<Streams>, LinkError>> + 'static {
		self.state_tx.send(State::Connecting).ok();

		let net_id = *self.local.network_id();
		let cancel = self.cancel.clone();
		let config = Arc::clone(&self.config);
		let peer_id = *self.peer.id();
		let peer_addr = self.peer.address().clone();
		let backoff_fut = self.apply_backoff();
		let connect_fut =
			self.local.connect_with_cancel::<Streams>(peer_addr, cancel);

		async move {
			// apply backoff before attempting to reconnect
			if backoff_fut.await.is_break() {
				return Err(LinkError::Cancelled);
			}

			tracing::debug!(
				stream_id = %Short(config.stream_id),
				producer_id = %Short(&peer_id),
				criteria = ?config.criteria,
				"connecting to stream producer",
			);

			// attempt to establish a new connection to the remote producer
			let mut link = connect_fut.await?;

			// Send the consumer handshake to the producer
			link
				.send(&ConsumerHandshake::new(
					net_id,
					config.stream_id,
					config.criteria.clone(),
				))
				.await?;

			// await the producer's handshake response
			let start = link.recv::<StartStream>().await?;

			// confirm that the producer is on the correct network
			if start.network_id() != net_id {
				tracing::warn!(
					stream_id = %Short(config.stream_id),
					producer_id = %Short(&peer_id),
					expected_network = %Short(net_id),
					received_network = %Short(start.network_id()),
					"producer is on a different network",
				);

				link.close(DifferentNetwork).await.ok();
				return Err(LinkError::Recv(RecvError::closed(DifferentNetwork)));
			}

			// confirm that the producer is producing the requested stream
			if start.stream_id() != config.stream_id {
				tracing::warn!(
					stream_id = %Short(config.stream_id),
					producer_id = %Short(&peer_id),
					"producer is producing a different stream than requested",
				);
				link.close(StreamNotFound).await.ok();
				return Err(LinkError::Recv(RecvError::closed(StreamNotFound)));
			}

			Ok(link)
		}
		.fuse()
	}

	/// Handles errors that occur while receiving data from the remote producer or
	/// during initial connection setup.
	///
	/// Application-level errors such as being unknown to the producer are
	/// communicated by the producer by closing the connection with a specific
	/// [`CloseReason`].
	///
	/// Any error during receiving data will result in dropping the current link
	/// (if it was not already closed by the producer) and potentially repairing
	/// with a new connection according to the backoff policy.
	///
	/// Inside this method, if the error is unrecoverable, the worker's
	/// cancellation token is triggered to initiate shutdown.
	fn handle_recv_error(&mut self, error: LinkError, _: Option<Link<Streams>>) {
		let close_reason = error.close_reason().cloned();

		// indicates an unrecoverable error that should terminate the worker
		// and not attempt to repair the connection any further.
		// Note: the `return` inside expands into the enclosing function, so
		// any arm using this macro skips the reconnect scheduling below.
		macro_rules! unrecoverable {
			() => {
				self.cancel.cancel();
				self.state_tx.send(State::Terminated).ok();
				self.stats.disconnected();
				return;
			};

			($msg:expr, $e:expr) => {
				tracing::warn!(
					stream_id = %Short(self.config.stream_id),
					producer_id = %Short(&self.peer.id()),
					criteria = ?self.config.criteria,
					error = %$e,
					$msg,
				);

				self.cancel.cancel();
				self.state_tx.send(State::Terminated).ok();
				self.stats.disconnected();
				return;
			};
		}

		match (error, close_reason) {
			// Consumer or network is terminating
			(LinkError::Cancelled | LinkError::Recv(RecvError::Cancelled), _) => {
				// explicitly cancelled through the cancellation token, shut down
				unrecoverable!();
			}

			// Received datum could not be deserialized
			(LinkError::Recv(RecvError::Decode(err)), _) => {
				// High likelihood of malicious or buggy producer sending invalid data.
				unrecoverable!("producer sent invalid datum", err);
			}

			// The connection was closed by the producer because it does not have
			// this consumer in its discovery catalog.
			(_, Some(reason)) if reason == UnknownPeer => {
				// Trigger full catalog sync with the producer then reconnect.
				// Early return: the reconnect is scheduled via the sync future
				// instead of the plain `connect` at the bottom of this method.
				let addr = self.peer.address().clone();
				let reconnect_fut = self.sync_catalog_then_connect(addr);
				self.next_connect.set(reconnect_fut);
				return;
			}

			(_, Some(reason)) if reason == NotAllowed => {
				// The producer has refused the subscription request due to
				// not meeting its subscription criteria.
				unrecoverable!("not allowed", reason);
			}

			// the connection was closed gracefully by the producer because it is
			// shutting down. Don't attempt to reconnect.
			(_, Some(reason)) if reason == GracefulShutdown => {
				unrecoverable!("producer is shutting down", "none");
			}
			(e, Some(reason)) if reason == StreamNotFound => {
				// the reason why we are not reconnecting on this error is because
				// producers are discovered through the discovery catalog which
				// should only list producers that are actually producing the
				// requested stream. If we reach this point it indicates a bug
				// either in the discovery system or the producer's stream
				// registration logic.
				// todo: Revisit this decision later.
				unrecoverable!("producer does not have the requested stream", e);
			}
			(e, Some(reason)) if reason == DifferentNetwork => {
				// The producer is on a different network than this consumer.
				// This is unrecoverable and most likely indicates a corrupt or stale
				// discovery catalog.
				unrecoverable!("producer is on a different network", e);
			}
			(_, Some(reason)) => {
				// The producer closed the connection with an application-level
				// error that is not explicitly handled above and is not known
				// to be unrecoverable. Log and attempt to reconnect.
				tracing::warn!(
					stream_id = %Short(self.config.stream_id),
					producer_id = %Short(&self.peer.id()),
					reason = %reason,
					"subscription refused by producer",
				);
			}
			(e, _) => {
				// io error occurred, drop the current link and attempt to reconnect
				tracing::warn!(
					error = %e,
					stream_id = %Short(self.config.stream_id),
					producer_id = %Short(self.peer.id()),
				);
			}
		}

		// Recoverable paths fall through to here: schedule a fresh connection
		// attempt (which itself applies the backoff policy first).
		let fut = self.connect();
		self.next_connect.set(fut);
	}

	/// Handles the outcome of a connection + handshake attempt.
	///
	/// On success the worker transitions to [`State::Connected`] and starts
	/// listening for data on the new link; on failure the error is routed
	/// through [`Self::handle_recv_error`] (with no link left to drop).
	fn handle_connect_result(
		&mut self,
		result: Result<Link<Streams>, LinkError>,
	) {
		match result {
			Ok(link) => {
				// successfully connected and performed handshake
				tracing::info!(
					stream_id = %Short(self.config.stream_id),
					producer_id = %Short(&self.peer.id()),
					criteria = ?self.config.criteria,
					"connected to stream producer",
				);

				// set the receiver state to connected
				self.state_tx.send(State::Connected).ok();

				// update stats
				self.stats.connected();

				// begin listening for incoming data
				self.next_recv.set(self.make_next_recv_future(link));
			}
			Err(error) => {
				self.handle_recv_error(error, None);
			}
		}
	}

	/// Applies the backoff policy before attempting to reconnect to the
	/// remote producer. The backoff policy is reset on successful receives and
	/// initialized to the default starting state on the first connection attempt.
	///
	/// The synchronous part decides the next step (continue immediately,
	/// sleep for a duration, or break after exhaustion); the returned future
	/// performs the cancellable sleep if one is needed.
	fn apply_backoff(
		&mut self,
	) -> impl Future<Output = ControlFlow<()>> + use<D> {
		let producer_id = *self.peer.id();
		let next_step: Result<ControlFlow<()>, Duration> = match self.backoff {
			None => {
				// first attempt: initialize the policy and connect immediately
				self.backoff = Some((self.config.backoff)());
				Ok(ControlFlow::Continue(()))
			}
			Some(ref mut backoff) => {
				if let Some(duration) = backoff.next_backoff() {
					Err(duration)
				} else {
					// backoff policy has been exhausted, terminate the worker
					tracing::debug!(
						stream_id = %Short(self.config.stream_id),
						producer_id = %Short(producer_id),
						criteria = ?self.config.criteria,
						"exhausted all reconnection attempts, terminating",
					);

					self.cancel.cancel();
					self.state_tx.send(State::Terminated).ok();

					Ok(ControlFlow::Break(()))
				}
			}
		};

		let cancel = self.cancel.clone();
		let stream_id = self.config.stream_id;

		async move {
			match next_step {
				Ok(step) => step,
				Err(duration) => {
					tracing::debug!(
						stream_id = %Short(stream_id),
						producer_id = %Short(producer_id),
						"waiting {duration:?} before reconnecting",
					);

					// sleep, but abort the wait promptly on cancellation
					tokio::select! {
						() = tokio::time::sleep(duration) => {
							ControlFlow::Continue(())
						}
						() = cancel.cancelled() => {
							ControlFlow::Break(())
						}
					}
				}
			}
		}
		.fuse()
	}

	/// Triggered when a consumer attempts to connect to a producer that does not
	/// have its info in its discovery catalog. This method performs a full
	/// catalog sync with the producer and then attempts to reconnect.
	fn sync_catalog_then_connect(
		&mut self,
		peer_addr: EndpointAddr,
	) -> impl Future<Output = Result<Link<Streams>, LinkError>> + 'static {
		let stream_id = self.config.stream_id;
		let discovery = self.discovery.clone();
		let producer_id = *self.peer.id();
		let connect_fut = self.connect();

		tracing::trace!(
			stream_id = %Short(stream_id),
			producer_id = %Short(producer_id),
			"producer is not recognizing this consumer, will sync catalog then reconnect",
		);

		async move {
			// NOTE(review): a failed sync is mapped to `Cancelled`, which
			// `handle_recv_error` treats as unrecoverable — confirm this is
			// the intended terminal behavior for sync failures.
			discovery.sync_with(peer_addr).await.map_err(|e| {
				tracing::warn!(
					stream_id = %Short(stream_id),
					producer_id = %Short(&producer_id),
					error = %e,
					"failed to sync discovery catalog with producer",
				);
				LinkError::Cancelled
			})?;
			connect_fut.await
		}
		.fuse()
	}
}

/// Result of receiving a single datum: the decoded value and its byte length
/// on success, or the [`LinkError`] that interrupted the receive.
type DatumRecvResult<D> = Result<(D, usize), LinkError>;