Module aldrin_core::channel

Channels-based transports.

Channels-based transports can be used to efficiently connect client and broker within the same process.

The transports come in two flavors, Bounded and Unbounded. Bounded applies back-pressure to the sender when its internal FIFO is full, whereas Unbounded never blocks (asynchronously).
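The difference between the two flavors can be illustrated with a minimal sketch that uses the standard library's `std::sync::mpsc` channels as a stand-in. This is not how the transports in this module are implemented, and the helper names `fill_bounded` and `fill_unbounded` are hypothetical. The sketch only shows the general idea: a bounded queue stops accepting messages once it is full, while an unbounded queue keeps accepting them.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Hypothetical helper: tries to queue `attempts` messages into a bounded
/// channel of size `capacity` while nothing drains it, and returns how many
/// messages were accepted before the queue ran full.
fn fill_bounded(capacity: usize, attempts: usize) -> usize {
    // `_rx` is kept alive so the channel stays connected.
    let (tx, _rx) = mpsc::sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue is full. A blocking `send` would wait here until
            // the receiver makes room; this is the back-pressure.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}

/// Hypothetical helper: queues `attempts` messages into an unbounded channel
/// and returns how many were accepted. Sending never waits for the receiver.
fn fill_unbounded(attempts: usize) -> usize {
    let (tx, _rx) = mpsc::channel::<usize>();
    let mut accepted = 0;
    for i in 0..attempts {
        if tx.send(i).is_ok() {
            accepted += 1;
        }
    }
    accepted
}

fn main() {
    println!("bounded accepted: {}", fill_bounded(2, 5));
    println!("unbounded accepted: {}", fill_unbounded(5));
}
```

The trade-off is the usual one: a bounded queue caps memory use at the cost of slowing down a fast sender, whereas an unbounded queue can grow without limit if the receiving side falls behind.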

§Examples

use aldrin_broker::Broker;
use aldrin::Client;
use aldrin_core::channel;
use futures::future;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create a broker:
    let broker = Broker::new();
    let mut broker_handle = broker.handle().clone();
    let broker_join = tokio::spawn(broker.run());

    // Connect a client with the Bounded transport:
    let (t1, t2) = channel::bounded(16);
    let (connection1, client1) =
        future::join(broker_handle.connect(t1), Client::connect(t2)).await;
    let connection1 = connection1?;
    let client1 = client1?;
    tokio::spawn(connection1.run());
    let client1_handle = client1.handle().clone();
    let client1_join = tokio::spawn(client1.run());

    // Connect a client with the Unbounded transport:
    let (t1, t2) = channel::unbounded();
    let (connection2, client2) =
        future::join(broker_handle.connect(t1), Client::connect(t2)).await;
    let connection2 = connection2?;
    let client2 = client2?;
    tokio::spawn(connection2.run());
    let client2_handle = client2.handle().clone();
    let client2_join = tokio::spawn(client2.run());

    // Shut everything down again:
    broker_handle.shutdown_idle().await;
    client1_handle.shutdown();
    client1_join.await??;
    client2_handle.shutdown();
    client2_join.await??;
    broker_join.await?;

    Ok(())
}

Structs§

  • Bounded: A bounded channels-based transport for connecting a broker and a client in the same process.
  • Error type when using channels as a transport.
  • Unbounded: An unbounded channels-based transport for connecting a broker and a client in the same process.

Functions§

  • bounded: Creates a pair of bounded channel transports.
  • unbounded: Creates a pair of unbounded channel transports.