Expand description
Tansu Client
Tansu API client.
§Simple Request client
use tansu_client::{Client, ConnectionManager, Error};
use tansu_sans_io::MetadataRequest;
use rama::{Service as _, Context};
use url::Url;
let origin = ConnectionManager::builder(Url::parse("tcp://localhost:9092")?)
.client_id(Some(env!("CARGO_PKG_NAME").into()))
.build()
.await
.map(Client::new)?;
let response = origin
.call(
MetadataRequest::default()
.topics(Some([].into()))
.allow_auto_topic_creation(Some(false))
.include_cluster_authorized_operations(Some(false))
.include_topic_authorized_operations(Some(false)),
)
.await?;
§Proxy: Layer Composition
An example API proxy listening for requests on tcp://localhost:9092 that
forwards each Frame to an origin broker on tcp://example.com:9092:
use rama::{Context, Layer as _, Service as _};
use tansu_client::{
BytesConnectionService, ConnectionManager, Error, FrameConnectionLayer,
FramePoolLayer,
};
use tansu_service::{
BytesFrameLayer, FrameBytesLayer, TcpBytesLayer, TcpContextLayer, TcpListenerLayer,
host_port,
};
use tokio::net::TcpListener;
use tokio_util::sync::CancellationToken;
use url::Url;
// forward protocol frames to the origin using a connection pool:
let origin = ConnectionManager::builder(Url::parse("tcp://example.com:9092")?)
.client_id(Some(env!("CARGO_PKG_NAME").into()))
.build()
.await?;
// a tcp listener used by the proxy
let listener =
TcpListener::bind(host_port(Url::parse("tcp://localhost:9092")?).await?).await?;
// listen for requests until cancelled
let token = CancellationToken::new();
let stack = (
// server layers: reading tcp -> bytes -> frames:
TcpListenerLayer::new(token),
TcpContextLayer::default(),
TcpBytesLayer::<()>::default(),
BytesFrameLayer,
// client layers: writing frames -> connection pool -> bytes -> origin:
FramePoolLayer::new(origin),
FrameConnectionLayer,
FrameBytesLayer,
)
.into_layer(BytesConnectionService);
stack.serve(Context::default(), listener).await?;
Structs§
- Builder - Build a Connection Pool to a broker
- BytesConnectionService - A Service that writes a frame represented by Bytes to a Connection Context, returning the Bytes frame response.
- Client - API client using a Connection Pool
- Connection - Broker connection stream with correlation id
- ConnectionManager - Manager of supported API versions for a broker
- FrameConnectionLayer - A Layer that takes a Connection from the Pool calling an inner Service with that Connection as Context
- FrameConnectionService - A Service that takes a Connection from the Pool calling an inner Service with that Connection as Context
- FramePoolLayer - Inject the Pool into the Service Context of this Layer using FramePoolService
- FramePoolService - Inject the Pool into the Service Context of the inner Service
- RequestConnectionLayer - A Layer of RequestConnectionService
- RequestConnectionService - Take a Connection from the Pool. Enclose the Request in a Frame using latest API version supported by the broker. Call the inner service with the Frame using the Connection as Context.
- RequestPoolLayer - Inject the Pool into the Service Context of this Layer using RequestPoolService
- RequestPoolService - Inject the Pool into the Service Context of the inner Service
Enums§
- Error
- Client Errors
Type Aliases§
- Pool - A managed Pool of broker Connections