Crate splaycast


A specialized version of Broadcast, for 1 Stream in -> many Streams out.

§About

Splaycast is a Broadcast-adjacent tool for connecting streams. It greedily consumes the upstream and tries to send to all of the downstreams as quickly as possible.

This project does not directly use unsafe code. Reputable dependencies like arc-swap and crossbeam-queue are used here, which internally do have unsafe code.

Direct dependencies are fairly trim, and while Splaycast is not tested on other runtimes, it is expected that you can use something other than Tokio.

§Details

Splaycast does not explicitly synchronize for publishing from upstream or for vending to downstream receiver streams. A normal channel() usually only has 2 main components - a sender and a receiver. Splaycast has 3:

  1. The [splaycast::Engine]. This drives the streams and notifies receivers. You spawn this on your runtime.
  2. The [splaycast::Receiver]. It’s just a stream. You use it or compose it how you need to.
  3. The Splaycast itself. This is how you subscribe new receivers. It is not a sender, and you cannot send to it.

If you drop the upstream Stream, the Splaycast, or the Engine, the splaycast is terminated and everything is dropped. Your Receivers are promptly notified when any of these critical upstream resources goes away.

§Engine

The splaycast::Engine is a broadcast bridge. It is a raw Future which does its work inside of poll(). Because poll() gives it exclusive access through &mut self, it can mutate the data on its own struct freely and safely. No lock is shared with Receivers, not even a brief one.
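To make "a raw Future which does its work inside of poll()" concrete, here is a minimal std-only sketch. ToyEngine, NoopWake, and run_to_completion are hypothetical names for illustration, not part of splaycast, and a VecDeque stands in for the upstream Stream. The point is only that poll() receives exclusive access to the struct, so its state needs no lock.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for an engine: it owns all of its state outright.
struct ToyEngine {
    upstream: VecDeque<u32>, // pretend upstream, consumed front to back
    forwarded: Vec<u32>,     // state that only poll() ever touches
}

impl Future for ToyEngine {
    type Output = Vec<u32>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // ToyEngine is Unpin, so the pin yields a plain exclusive reference.
        let this = self.get_mut();
        match this.upstream.pop_front() {
            Some(item) => {
                // No lock needed: &mut guarantees nobody else sees this Vec.
                this.forwarded.push(item);
                // More work may be ready, so ask to be polled again.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            // Upstream is exhausted: finish and hand back what was forwarded.
            None => Poll::Ready(std::mem::take(&mut this.forwarded)),
        }
    }
}

/// A waker that does nothing; enough for polling by hand in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drive the toy engine by hand, the way a runtime would after spawning it.
fn run_to_completion(upstream: Vec<u32>) -> Vec<u32> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut engine = ToyEngine {
        upstream: upstream.into(),
        forwarded: Vec::new(),
    };
    let mut pinned = Pin::new(&mut engine);
    loop {
        if let Poll::Ready(out) = pinned.as_mut().poll(&mut cx) {
            return out;
        }
    }
}
```

Calling run_to_completion(vec![1, 2, 3]) polls the future until the pretend upstream is empty. In real use you would hand the Engine to your runtime (for example with tokio::spawn) instead of polling it yourself.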

There are some easy optimizations available on the publish (upstream) end, but Splaycast is intended to help most with high subscriber (downstream) counts so it hasn’t been a priority up to now.

Receivers’ wake handles are held in 1 of 2 places: The Wake Queue, or the Park List.

The Wake Queue is a crossbeam queue, which efficiently handles multithreaded registration of Wakers, e.g., on a multithreaded Tokio runtime. The wakers push themselves into the wake queue, and the Engine pops them out when it runs.

The Park List is where a wake handle goes when the upstream hasn’t yielded the next item that the wake handle’s Receiver needs. It is a plain, owned Vec of wake handles. Upon receipt of a new item from the upstream, the park list is unconditionally drained and woken in one shot. There is no synchronization with Receivers, so the expectation is that those Receivers may immediately begin consuming on another thread. Being a plain Vec, the Park List will grow to match your subscriber count, and amortize to no-allocation-cost over time.
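The two holding places can be sketched with the standard library alone. This is an illustration under stated assumptions, not the crate's code: ToyWakeState, CountingWake, and demo_wake_counts are hypothetical names, a Mutex<VecDeque<Waker>> stands in for the crossbeam queue, and every registered receiver is assumed to be fully caught up (so every waker gets parked).

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Test waker that counts how many times it has been woken.
struct CountingWake(AtomicUsize);

impl Wake for CountingWake {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Hypothetical engine-side bookkeeping for wake handles.
struct ToyWakeState {
    // Stand-in for the concurrent wake queue: receivers may push from any thread.
    wake_queue: Mutex<VecDeque<Waker>>,
    // Plain owned Vec: only the engine touches it, so it needs no lock.
    park_list: Vec<Waker>,
}

impl ToyWakeState {
    /// Engine run: pop every registered waker and park it until the next item.
    fn park_registered(&mut self) {
        let mut queue = self.wake_queue.lock().unwrap();
        while let Some(waker) = queue.pop_front() {
            self.park_list.push(waker);
        }
    }

    /// New upstream item: drain and wake the whole park list in one shot.
    /// `drain(..)` keeps the Vec's capacity, so later rounds reuse the allocation.
    fn wake_parked(&mut self) -> usize {
        let woken = self.park_list.len();
        for waker in self.park_list.drain(..) {
            waker.wake();
        }
        woken
    }
}

/// Returns (wakers woken, wake calls observed, park list length afterwards).
fn demo_wake_counts() -> (usize, usize, usize) {
    let counter = Arc::new(CountingWake(AtomicUsize::new(0)));
    let mut state = ToyWakeState {
        wake_queue: Mutex::new(VecDeque::new()),
        park_list: Vec::new(),
    };
    // Three receivers register themselves for wake.
    for _ in 0..3 {
        let waker = Waker::from(Arc::clone(&counter));
        state.wake_queue.lock().unwrap().push_back(waker);
    }
    state.park_registered();
    let woken = state.wake_parked();
    (woken, counter.0.load(Ordering::SeqCst), state.park_list.len())
}
```

After one round, demo_wake_counts() reports that all three wakers were woken and the park list is empty again, while its backing allocation stays around for the next item.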

§Receiver

The buffer (buffet-style, if you like) is shared with the Receivers via an ArcSwap. Each receiver is woken when there may be something new for them to consume. The Receiver consumes from the then-current “buffet” Arc item-by-item until it reaches the end of the buffer. Only then does it register itself for wake with the Engine.
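The consume-until-the-end behavior can be sketched as follows. Everything here is a hypothetical stand-in: SharedBuffer, ToyReceiver, and the (sequence id, item) layout are assumptions for illustration, and an RwLock<Arc<..>> replaces ArcSwap only to keep the example std-only. The sketch also ignores buffer limits and lag.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for the shared slot holding the current "buffet" snapshot.
struct SharedBuffer<T> {
    current: RwLock<Arc<Vec<(u64, T)>>>, // (sequence id, item)
}

impl<T: Clone> SharedBuffer<T> {
    fn new() -> Self {
        SharedBuffer {
            current: RwLock::new(Arc::new(Vec::new())),
        }
    }

    /// Engine side: build the next snapshot and swap it in as a whole.
    fn publish(&self, item: T) {
        let mut slot = self.current.write().unwrap();
        let mut next: Vec<(u64, T)> = (**slot).clone();
        let id = next.last().map_or(0, |(id, _)| id + 1);
        next.push((id, item));
        *slot = Arc::new(next);
    }

    /// Receiver side: grab the then-current snapshot; the lock is held only
    /// long enough to clone the Arc.
    fn load(&self) -> Arc<Vec<(u64, T)>> {
        let guard = self.current.read().unwrap();
        Arc::clone(&*guard)
    }
}

/// Hypothetical receiver: just a cursor into the sequence of items.
struct ToyReceiver {
    next_id: u64,
}

impl ToyReceiver {
    /// Consume item by item until the end of the snapshot. An empty result is
    /// the point at which a real receiver would register itself for wake.
    fn drain_available<T: Clone>(&mut self, shared: &SharedBuffer<T>) -> Vec<T> {
        let snapshot = shared.load();
        let mut out = Vec::new();
        for (id, item) in snapshot.iter() {
            if *id >= self.next_id {
                out.push(item.clone());
                self.next_id = *id + 1;
            }
        }
        out
    }
}

/// Publish two items, drain twice, publish one more, drain again.
fn demo_receiver() -> (Vec<&'static str>, Vec<&'static str>, Vec<&'static str>) {
    let shared = SharedBuffer::new();
    let mut receiver = ToyReceiver { next_id: 0 };
    shared.publish("a");
    shared.publish("b");
    let first = receiver.drain_available(&shared);
    let second = receiver.drain_available(&shared);
    shared.publish("c");
    let third = receiver.drain_available(&shared);
    (first, second, third)
}
```

In demo_receiver(), the first drain sees both published items, the second finds nothing new, and the third picks up only the item published in between.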

§Splaycast

You register new Receivers through this component. It carries a reference to the Shared structure that the 3 components of a Splaycast all share, which is all that is needed to hook up a new Receiver.

§Examples

The most basic usage of splaycast which approximates a normal broadcast channel:

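use futures::StreamExt; // brings `.next()` into scope for the Receiver stream
use splaycast::Message;

// Run this inside an async context on a Tokio runtime.
let (sender, engine, splaycast) = splaycast::channel(128);
tokio::spawn(engine); // nothing flows unless the Engine is being polled

let mut receiver = splaycast.subscribe();
sender.send("hello");

// Items arrive wrapped in a Message envelope.
let hello = receiver.next().await;
assert_eq!(Some(Message::Entry { item: "hello" }), hello);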

Some basic examples can be found under src/benches.

§Feature Flags

Modules§

buffer_policy

Structs§

Engine
An Engine is an api-less plugin to an event loop. It is an adapter between an upstream Stream and downstream subscriber Streams.
Receiver
This is a cloned view of the upstream Stream you wrapped with a Splaycast. You receive crate::Messages on this stream. If you’d like to get back to your Item type, you can .map() this stream and handle Message::Lagged however it makes sense for your use.
Sender
A single-producer sender, for a splaycast.
SenderStream
Splaycast
The handle for attaching new subscribers to and inspecting the state of a splaycast.
SubscriberCountHandle
A handle for inspecting the current subscriber count. Subscriber counts are updated asynchronously, so values may be stale.

Enums§

Message
Messages on a Splaycast Receiver are either an Entry or a Lagged. If you lag, you’ll get a count of how many messages were skipped, and then you’ll resume Entries from that point on.
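One way to get back to plain items is to match on each message. The sketch below uses a local stand-in enum so it runs without the crate: the Entry { item } shape comes from the example above, while the Lagged { count } field name and the unwrap_entries helper are assumptions for illustration. A plain iterator stands in for the Receiver stream; on a real stream the same match could go inside a .map() or .filter_map().

```rust
/// Local stand-in mirroring the described shape of splaycast's Message.
/// The `count` field name is an assumption.
#[derive(Debug, PartialEq)]
enum Message<T> {
    Entry { item: T },
    Lagged { count: usize },
}

/// Strip the envelope from each message and tally how many items were skipped.
fn unwrap_entries<T>(messages: impl IntoIterator<Item = Message<T>>) -> (Vec<T>, usize) {
    let mut items = Vec::new();
    let mut skipped = 0;
    for message in messages {
        match message {
            Message::Entry { item } => items.push(item),
            // A lag is not an error here: record it and keep consuming.
            Message::Lagged { count } => skipped += count,
        }
    }
    (items, skipped)
}
```

Whether to count, log, or treat a lag as fatal depends on the consumer; this sketch only counts.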

Functions§

channel
Get a channel to splay out to streaming receivers.
channel_with_policy
Get a channel to splay out to streaming receivers.
wrap
Wrap a stream with a Splaycast - a broadcast channel for streams.
wrap_with_policy
Wrap a stream with a Splaycast - a broadcast channel for streams.