Crate tansu_client

Crate tansu_client 

Source
Expand description

Tansu Client

Tansu API client.

§Simple Request client

use tansu_client::{Client, ConnectionManager, Error};
use tansu_sans_io::MetadataRequest;
use rama::{Service as _, Context};
use url::Url;

let origin = ConnectionManager::builder(Url::parse("tcp://localhost:9092")?)
    .client_id(Some(env!("CARGO_PKG_NAME").into()))
    .build()
    .await
    .map(Client::new)?;

let response = origin
    .call(
        MetadataRequest::default()
            .topics(Some([].into()))
            .allow_auto_topic_creation(Some(false))
            .include_cluster_authorized_operations(Some(false))
            .include_topic_authorized_operations(Some(false)),
    )
    .await?;

§Proxy: Layer Composition

An example API proxy listening for requests on tcp://localhost:9092 that forwards each Frame to an origin broker on tcp://example.com:9092:

use rama::{Context, Layer as _, Service as _};
use tansu_client::{
    BytesConnectionService, ConnectionManager, Error, FrameConnectionLayer,
    FramePoolLayer,
};
use tansu_service::{
    BytesFrameLayer, FrameBytesLayer, TcpBytesLayer, TcpContextLayer, TcpListenerLayer,
    host_port,
};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use url::Url;

// forward protocol frames to the origin using a connection pool:
let origin = ConnectionManager::builder(Url::parse("tcp://example.com:9092")?)
    .client_id(Some(env!("CARGO_PKG_NAME").into()))
    .build()
    .await?;

// a tcp listener used by the proxy
let listener =
    TcpListener::bind(host_port(Url::parse("tcp://localhost:9092")?).await?).await?;

// listen for requests until cancelled
let token = CancellationToken::new();

let stack = (
    // server layers: reading tcp -> bytes -> frames:
    TcpListenerLayer::new(token),
    TcpContextLayer::default(),
    TcpBytesLayer::<()>::default(),
    BytesFrameLayer,

    // client layers: writing frames -> connection pool -> bytes -> origin:
    FramePoolLayer::new(origin),
    FrameConnectionLayer,
    FrameBytesLayer,
)
    .into_layer(BytesConnectionService);

stack.serve(Context::default(), listener).await?;

Structs§

Builder
Build a Connection Pool to a broker
BytesConnectionService
A Service that writes a frame represented by Bytes to a Connection Context, returning the Bytes frame response.
Client
API client using a Connection Pool
Connection
Broker connection stream with correlation id
ConnectionManager
Manager of supported API versions for a broker
FrameConnectionLayer
A Layer that takes a Connection from the Pool calling an inner Service with that Connection as Context
FrameConnectionService
A Service that takes a Connection from the Pool calling an inner Service with that Connection as Context
FramePoolLayer
Inject the Pool into the Service Context of this Layer using FramePoolService
FramePoolService
Inject the Pool into the Service Context of the inner Service
RequestConnectionLayer
A Layer of RequestConnectionService
RequestConnectionService
Take a Connection from the Pool. Enclose the Request in a Frame using latest API version supported by the broker. Call the inner service with the Frame using the Connection as Context.
RequestPoolLayer
Inject the Pool into the Service Context of this Layer using RequestPoolService
RequestPoolService
Inject the Pool into the Service Context of the inner Service

Enums§

Error
Client Errors

Type Aliases§

Pool
A managed Pool of broker Connections