Expand description
Common service layers used in other Tansu crates.
§Overview
This crate provides Layer and Service
implementations for operating on Frame, Body, Request
and Response.
The following transports are provided:
- TCP with
TcpBytesLayerandBytesTcpService. MPSC channelwithChannelFrameLayerandChannelFrameService.ByteswithBytesLayer(designed primarily for protocol testing)
§Routing
Route Frame to services using FrameRouteService to automatically
implement ApiVersionsRequest
with valid protocol ranges:
let frame_route = FrameRouteService::<(), Error>::builder()
.with_service(
RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
Ok(MetadataResponse::default()
.brokers(Some([].into()))
.topics(Some([].into()))
.cluster_id(Some("tansu".into()))
.controller_id(Some(111))
.throttle_time_ms(Some(0))
.cluster_authorized_operations(Some(-1)))
})),
)
.and_then(|builder| builder.build())?;§Layering
Composing RequestFrameLayer, FrameBytesLayer, BytesLayer,
BytesFrameLayer together into frame_route to implement a test protocol stack.
A “client” Frame is marshalled into bytes using FrameBytesLayer, with BytesLayer connecting
to a “server” that demarshalls using BytesFrameLayer back into frames,
routing into frame_route (above) to MetadataRequest
or ApiVersionsRequest depending on the
API key:
let service = (
// "client" initiator side:
RequestFrameLayer,
FrameBytesLayer,
// transport
BytesLayer,
// "server" side:
BytesFrameLayer::default(),
)
.into_layer(frame_route);In the broker, proxy and CLI clients, BytesLayer is replaced with
TcpBytesLayer (server side) or BytesTcpService (client/initiator side).
§Servicing
We construct a default service context and a
MetadataRequest to initiate a request
on the service. The request passes through the protocol stack
and routed into our service. The service responds with a
MetadataResponse, so that we can
verify the expected response.cluster_id:
let request = MetadataRequest::default()
.topics(Some([].into()))
.allow_auto_topic_creation(Some(false))
.include_cluster_authorized_operations(Some(false))
.include_topic_authorized_operations(Some(false));
let response = service.serve(Context::default(), request).await?;
assert_eq!(Some("tansu".into()), response.cluster_id);The FrameRouteService automatically implements
ApiVersionsRequest
with valid protocol ranges for all defined services. An
ApiVersionsResponse contains
version information for both MetadataRequest
and ApiVersionsRequest:
let response = service
.serve(
Context::default(),
ApiVersionsRequest::default()
.client_software_name(Some("abcba".into()))
.client_software_version(Some("1.2321".into())),
)
.await?;
let api_versions = response
.api_keys
.unwrap_or_default()
.into_iter()
.map(|api_version| api_version.api_key)
.collect::<Vec<_>>();
assert_eq!(2, api_versions.len());
assert!(api_versions.contains(&ApiVersionsRequest::KEY));
assert!(api_versions.contains(&MetadataRequest::KEY));Structs§
- ApiVersions
Service - An
ApiVersionsResponseServicewith a supported set of API and versions fromRootMessageMeta. - Body
Request Layer - A
Layerthat transformsBodyintoRequest - Bytes
Frame Layer - A
Layerthat transformsBytesintoFrames - Bytes
Frame Service - A
ServicetransformingBytess intoFrames - Bytes
Layer - A
Layerthat handles and responds withBytes - Bytes
Service - A
Servicethat handles and responds withBytes - Bytes
TcpService - A
ServicewritingBytesinto aTcpStream, responding with a length delimited frame ofBytes - Channel
Frame Layer - A
LayerreceivingFrames from aFrameReceiverchannel - Channel
Frame Service - A
ServicereceivingFrames from aFrameReceiverchannel - Frame
ApiKey Matcher - A
MatcherofFrames using their API key - Frame
Body Layer - A
Layerthat transformsFrameintoBody - Frame
Bytes Layer - A
Layerthat transformsFrames intoBytes - Frame
Bytes Service - A
Servicethat transformsFrames intoBytes - Frame
Channel Service - A
ServicesendingFrames over aFrameSenderchannel - Frame
Request Layer - A
Layerthat transformsFrameintoRequest - Frame
Route Builder - A
Frameroute builder providing anApiVersionsResponsefor all available routes - Frame
Route Service - Route
Frameto aServicevia API key - Frame
Service - A
Servicethat transformsFrameinto aFrameusing a closure. - Request
ApiKey Matcher - A Matcher of
Requests using their API key. - Request
Frame Layer - A
Layerthat transformsRequestintoFrame - Request
Frame Service - A
Servicethat transformsRequestintoFrame - Request
Layer - A
Layerfor handling APIRequests responding with an APIResponse - Response
Service - A
Servicethat transformsRequestinto aResponseusing a closure. - TcpBytes
Layer - A
LayerreceivingBytesfrom aTcpStream - TcpBytes
Service - A
ServicereceivingBytesfrom aTcpStream, calling an innerServiceand sendingBytesinto theTcpStream - TcpContext
- A context state state used by
TcpContextLayerandTcpContextService - TcpContext
Layer - A
Layerthat injects theTcpContextinto the serviceContextstate - TcpContext
Service - A
Servicethat requires theTcpContextas the serviceContextstate - TcpListener
Layer - A
Layerthat listens for TCP connections
Enums§
Functions§
- bounded_
channel - A bounded channel for sending and receiving frames
- host_
port - Return the socket address for a given URL
Type Aliases§
- Frame
Receiver - A channel frame receiver
- Frame
Sender - A channel frame sender