Skip to main content

Crate tansu_service

Crate tansu_service 

Source
Expand description

Common service layers used in other Tansu crates.

§Overview

This crate provides Layer and Service implementations for operating on Frame, Body, Request and Response.

The following transports are provided:

§Routing

Route Frame to services using FrameRouteService to automatically implement ApiVersionsRequest with valid protocol ranges:

let frame_route = FrameRouteService::<(), Error>::builder()
    .with_service(
        RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
            Ok(MetadataResponse::default()
                .brokers(Some([].into()))
                .topics(Some([].into()))
                .cluster_id(Some("tansu".into()))
                .controller_id(Some(111))
                .throttle_time_ms(Some(0))
                .cluster_authorized_operations(Some(-1)))
        })),
    )
    .and_then(|builder| builder.build())?;

§Layering

Composing RequestFrameLayer, FrameBytesLayer, BytesLayer, BytesFrameLayer together into frame_route to implement a test protocol stack.

A “client” Frame is marshalled into bytes using FrameBytesLayer, with BytesLayer connecting to a “server” that demarshalls using BytesFrameLayer back into frames, routing into frame_route (above) to MetadataRequest or ApiVersionsRequest depending on the API key:

  let service = (
      // "client" initiator side:
      RequestFrameLayer,
      FrameBytesLayer,

      // transport
      BytesLayer,

      // "server" side:
      BytesFrameLayer::default(),
  )
      .into_layer(frame_route);

In the broker, proxy and CLI clients, BytesLayer is replaced with TcpBytesLayer (server side) or BytesTcpService (client/initiator side).

§Servicing

We construct a default service context and a MetadataRequest to initiate a request on the service. The request passes through the protocol stack and routed into our service. The service responds with a MetadataResponse, so that we can verify the expected response.cluster_id:

  let request = MetadataRequest::default()
      .topics(Some([].into()))
      .allow_auto_topic_creation(Some(false))
      .include_cluster_authorized_operations(Some(false))
      .include_topic_authorized_operations(Some(false));

  let response = service.serve(Context::default(), request).await?;

  assert_eq!(Some("tansu".into()), response.cluster_id);

The FrameRouteService automatically implements ApiVersionsRequest with valid protocol ranges for all defined services. An ApiVersionsResponse contains version information for both MetadataRequest and ApiVersionsRequest:

let response = service
    .serve(
        Context::default(),
        ApiVersionsRequest::default()
            .client_software_name(Some("abcba".into()))
            .client_software_version(Some("1.2321".into())),
    )
    .await?;

let api_versions = response
    .api_keys
    .unwrap_or_default()
    .into_iter()
    .map(|api_version| api_version.api_key)
    .collect::<Vec<_>>();

assert_eq!(2, api_versions.len());
assert!(api_versions.contains(&ApiVersionsRequest::KEY));
assert!(api_versions.contains(&MetadataRequest::KEY));

Structs§

ApiVersionsService
An ApiVersionsResponse Service with a supported set of API and versions from RootMessageMeta.
BodyRequestLayer
A Layer that transforms Body into Request
BytesFrameLayer
A Layer that transforms Bytes into Frames
BytesFrameService
A Service transforming Bytess into Frames
BytesLayer
A Layer that handles and responds with Bytes
BytesService
A Service that handles and responds with Bytes
BytesTcpService
A Service writing Bytes into a TcpStream, responding with a length delimited frame of Bytes
ChannelFrameLayer
A Layer receiving Frames from a FrameReceiver channel
ChannelFrameService
A Service receiving Frames from a FrameReceiver channel
FrameApiKeyMatcher
A Matcher of Frames using their API key
FrameBodyLayer
A Layer that transforms Frame into Body
FrameBytesLayer
A Layer that transforms Frames into Bytes
FrameBytesService
A Service that transforms Frames into Bytes
FrameChannelService
A Service sending Frames over a FrameSender channel
FrameRequestLayer
A Layer that transforms Frame into Request
FrameRouteBuilder
A Frame route builder providing an ApiVersionsResponse for all available routes
FrameRouteService
Route Frame to a Service via API key
FrameService
A Service that transforms Frame into a Frame using a closure.
RequestApiKeyMatcher
A Matcher of Requests using their API key.
RequestFrameLayer
A Layer that transforms Request into Frame
RequestFrameService
A Service that transforms Request into Frame
RequestLayer
A Layer for handling API Requests responding with an API Response
ResponseService
A Service that transforms Request into a Response using a closure.
TcpBytesLayer
A Layer receiving Bytes from a TcpStream
TcpBytesService
A Service receiving Bytes from a TcpStream, calling an inner Service and sending Bytes into the TcpStream
TcpContext
A context state state used by TcpContextLayer and TcpContextService
TcpContextLayer
A Layer that injects the TcpContext into the service Context state
TcpContextService
A Service that requires the TcpContext as the service Context state
TcpListenerLayer
A Layer that listens for TCP connections

Enums§

Error

Functions§

bounded_channel
A bounded channel for sending and receiving frames
host_port
Return the socket address for a given URL

Type Aliases§

FrameReceiver
A channel frame receiver
FrameSender
A channel frame sender