Skip to main content

tansu_service/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common service layers used in other Tansu crates.
16//!
17//! ## Overview
18//!
19//! This crate provides [Layer][`rama::Layer`] and [Service][`rama::Service`]
20//! implementations for operating on [`Frame`], [`Body`], [Request][`tansu_sans_io::Request`]
21//! and [Response][`tansu_sans_io::Response`].
22//!
23//! The following transports are provided:
24//!
25//! - TCP with [`TcpBytesLayer`] and [`BytesTcpService`].
26//! - [`MPSC channel`][`tokio::sync::mpsc`] with [`ChannelFrameLayer`] and [`ChannelFrameService`].
27//! - [`Bytes`][`bytes::Bytes`] with [`BytesLayer`] (designed primarily for protocol testing)
28//!
29//! ### Routing
30//!
31//! Route [`Frame`] to services using [`FrameRouteService`] to automatically
32//! implement [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]
33//! with valid protocol ranges:
34//!
35//! ```
36//! # use tansu_service::Error;
37//! # #[tokio::main]
38//! # async fn main() -> Result<(), Error> {
39//! # use rama::{Context, Layer as _, Service as _};
40//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
41//! # use tansu_service::{
42//! #     BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
43//! #     FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
44//! #     ResponseService,
45//! # };
46//! let frame_route = FrameRouteService::<(), Error>::builder()
47//!     .with_service(
48//!         RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
49//!             Ok(MetadataResponse::default()
50//!                 .brokers(Some([].into()))
51//!                 .topics(Some([].into()))
52//!                 .cluster_id(Some("tansu".into()))
53//!                 .controller_id(Some(111))
54//!                 .throttle_time_ms(Some(0))
55//!                 .cluster_authorized_operations(Some(-1)))
56//!         })),
57//!     )
58//!     .and_then(|builder| builder.build())?;
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ### Layering
64//!
65//! Composing [`RequestFrameLayer`], [`FrameBytesLayer`], [`BytesLayer`],
66//! [`BytesFrameLayer`] together into `frame_route` to implement a test protocol stack.
67//!
68//! A "client" [`Frame`] is marshalled into bytes using [`FrameBytesLayer`], with [`BytesLayer`] connecting
69//! to a "server" that demarshalls using [`BytesFrameLayer`] back into frames,
70//! routing into `frame_route` (above) to [`MetadataRequest`][`tansu_sans_io::MetadataRequest`]
71//! or [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`] depending on the
72//! [API key][`Frame#method.api_key`]:
73//!
74//! ```
75//! # use tansu_service::Error;
76//! # #[tokio::main]
77//! # async fn main() -> Result<(), Error> {
78//! # use rama::{Context, Layer as _, Service as _};
79//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
80//! # use tansu_service::{
81//! #     BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
82//! #     FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
83//! #     ResponseService,
84//! # };
85//! # let frame_route = FrameRouteService::<(), Error>::builder()
86//! #     .with_service(
87//! #         RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
88//! #             Ok(MetadataResponse::default()
89//! #                 .brokers(Some([].into()))
90//! #                 .topics(Some([].into()))
91//! #                 .cluster_id(Some("tansu".into()))
92//! #                 .controller_id(Some(111))
93//! #                 .throttle_time_ms(Some(0))
94//! #                 .cluster_authorized_operations(Some(-1)))
95//! #         })),
96//! #     )
97//! #     .and_then(|builder| builder.build())?;
98//!   let service = (
99//!       // "client" initiator side:
100//!       RequestFrameLayer,
101//!       FrameBytesLayer,
102//!
103//!       // transport
104//!       BytesLayer,
105//!
106//!       // "server" side:
107//!       BytesFrameLayer::default(),
108//!   )
109//!       .into_layer(frame_route);
110//! # Ok(())
111//! # }
112//! ```
113//!
114//! In the broker, proxy and CLI clients, [`BytesLayer`] is replaced with
115//! [`TcpBytesLayer`] (server side) or [`BytesTcpService`] (client/initiator side).
116//!
117//! ### Servicing
118//!
119//! We construct a default [`service context`][`rama::Context`] and a
120//! [`MetadataRequest`][`tansu_sans_io::MetadataRequest`] to initiate a request
121//! on the `service`. The request passes through the protocol stack
122//! and is routed into our service. The service responds with a
123//! [`MetadataResponse`][`tansu_sans_io::MetadataResponse`], so that we can
124//! verify the expected `response.cluster_id`:
125//!
126//! ```
127//! # use tansu_service::Error;
128//! # #[tokio::main]
129//! # async fn main() -> Result<(), Error> {
130//! # use rama::{Context, Layer as _, Service as _};
131//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
132//! # use tansu_service::{
133//! #     BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
134//! #     FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
135//! #     ResponseService,
136//! # };
137//! # let frame_route = FrameRouteService::<(), Error>::builder()
138//! #     .with_service(
139//! #         RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
140//! #             Ok(MetadataResponse::default()
141//! #                 .brokers(Some([].into()))
142//! #                 .topics(Some([].into()))
143//! #                 .cluster_id(Some("tansu".into()))
144//! #                 .controller_id(Some(111))
145//! #                 .throttle_time_ms(Some(0))
146//! #                 .cluster_authorized_operations(Some(-1)))
147//! #         })),
148//! #     )
149//! #     .and_then(|builder| builder.build())?;
150//! # let service = (
151//! #      RequestFrameLayer,
152//! #      FrameBytesLayer,
153//! #      BytesLayer,
154//! #      BytesFrameLayer::default(),
155//! #  )
156//! #      .into_layer(frame_route);
157//!   let request = MetadataRequest::default()
158//!       .topics(Some([].into()))
159//!       .allow_auto_topic_creation(Some(false))
160//!       .include_cluster_authorized_operations(Some(false))
161//!       .include_topic_authorized_operations(Some(false));
162//!
163//!   let response = service.serve(Context::default(), request).await?;
164//!
165//!   assert_eq!(Some("tansu".into()), response.cluster_id);
166//! # Ok(())
167//! # }
168//! ```
169//!
170//! The [`FrameRouteService`] automatically implements
171//! [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]
172//! with valid protocol ranges for all defined services. An
173//! [`ApiVersionsResponse`][`tansu_sans_io::ApiVersionsResponse`] contains
174//! version information for both [`MetadataRequest`][`tansu_sans_io::MetadataRequest`]
175//! and [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]:
176//!
177//! ```
178//! # use tansu_service::Error;
179//! # #[tokio::main]
180//! # async fn main() -> Result<(), Error> {
181//! # use rama::{Context, Layer as _, Service as _};
182//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
183//! # use tansu_service::{
184//! #     BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
185//! #     FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
186//! #     ResponseService,
187//! # };
188//! # let frame_route = FrameRouteService::<(), Error>::builder()
189//! #     .with_service(
190//! #         RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
191//! #             Ok(MetadataResponse::default()
192//! #                 .brokers(Some([].into()))
193//! #                 .topics(Some([].into()))
194//! #                 .cluster_id(Some("tansu".into()))
195//! #                 .controller_id(Some(111))
196//! #                 .throttle_time_ms(Some(0))
197//! #                 .cluster_authorized_operations(Some(-1)))
198//! #         })),
199//! #     )
200//! #     .and_then(|builder| builder.build())?;
201//! # let service = (
202//! #      RequestFrameLayer,
203//! #      FrameBytesLayer,
204//! #      BytesLayer,
205//! #      BytesFrameLayer::default(),
206//! #  )
207//! #      .into_layer(frame_route);
208//! let response = service
209//!     .serve(
210//!         Context::default(),
211//!         ApiVersionsRequest::default()
212//!             .client_software_name(Some("abcba".into()))
213//!             .client_software_version(Some("1.2321".into())),
214//!     )
215//!     .await?;
216//!
217//! let api_versions = response
218//!     .api_keys
219//!     .unwrap_or_default()
220//!     .into_iter()
221//!     .map(|api_version| api_version.api_key)
222//!     .collect::<Vec<_>>();
223//!
224//! assert_eq!(2, api_versions.len());
225//! assert!(api_versions.contains(&ApiVersionsRequest::KEY));
226//! assert!(api_versions.contains(&MetadataRequest::KEY));
227//! # Ok(())
228//! # }
229//! ```
230
231use std::{
232    fmt, io,
233    net::SocketAddr,
234    sync::{Arc, LazyLock},
235    time::SystemTime,
236};
237
238use opentelemetry::{
239    InstrumentationScope, KeyValue, global,
240    metrics::{Counter, Histogram, Meter},
241};
242use opentelemetry_semantic_conventions::SCHEMA_URL;
243use tansu_sans_io::{Body, Frame};
244use tokio::{net::lookup_host, sync::oneshot, task::JoinError};
245use tracing::debug;
246use url::Url;
247
248mod api;
249mod channel;
250mod frame;
251mod stream;
252
253pub use api::{ApiVersionsService, FrameRouteBuilder, FrameRouteService};
254
255pub use channel::{
256    ChannelFrameLayer, ChannelFrameService, FrameChannelService, FrameReceiver, FrameSender,
257    bounded_channel,
258};
259
260pub use frame::{
261    BodyRequestLayer, BytesFrameLayer, BytesFrameService, FrameApiKeyMatcher, FrameBodyLayer,
262    FrameBytesLayer, FrameBytesService, FrameRequestLayer, FrameService, RequestApiKeyMatcher,
263    RequestFrameLayer, RequestFrameService, RequestLayer, ResponseService,
264};
265
266pub use stream::{
267    BytesLayer, BytesService, BytesTcpService, TcpBytesLayer, TcpBytesService, TcpContext,
268    TcpContextLayer, TcpContextService, TcpListenerLayer,
269};
270
271#[derive(Clone, Debug, thiserror::Error)]
272pub enum Error {
273    Auth(#[from] tansu_auth::Error),
274    DuplicateRoute(i16),
275    FrameTooBig(usize),
276    Io(Arc<io::Error>),
277    Join(Arc<JoinError>),
278    Message(String),
279    OneshotRecv(oneshot::error::RecvError),
280    Parse(#[from] url::ParseError),
281    Protocol(#[from] tansu_sans_io::Error),
282    UnableToSend(Box<Frame>),
283    UnknownHost(Url),
284    UnknownServiceBody(Box<Body>),
285    UnknownServiceFrame(Box<Frame>),
286}
287
288impl fmt::Display for Error {
289    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290        write!(f, "{self:?}")
291    }
292}
293
294impl From<JoinError> for Error {
295    fn from(value: JoinError) -> Self {
296        Self::Join(Arc::new(value))
297    }
298}
299
300impl From<io::Error> for Error {
301    fn from(value: io::Error) -> Self {
302        Self::Io(Arc::new(value))
303    }
304}
305
/// Total length in bytes of a frame, given its 4-byte length prefix.
///
/// `encoded` is decoded as a big-endian signed 32-bit integer, and the size
/// of the prefix itself (4 bytes) is added to it.
///
/// A negative prefix is not a valid length. Rather than letting the
/// sign-extended value overflow (a panic in debug builds, a wrap to a tiny
/// number in release builds), this returns `usize::MAX`.
/// NOTE(review): presumably callers compare the result against a maximum
/// frame size, so `usize::MAX` is rejected there; confirm against callers.
fn frame_length(encoded: [u8; 4]) -> usize {
    usize::try_from(i32::from_be_bytes(encoded))
        // Saturating keeps the addition total even where `usize` is 32 bits.
        .map_or(usize::MAX, |payload| payload.saturating_add(encoded.len()))
}
309
310pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
311    global::meter_with_scope(
312        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
313            .with_version(env!("CARGO_PKG_VERSION"))
314            .with_schema_url(SCHEMA_URL)
315            .build(),
316    )
317});
318
319/// Return the socket address for a given URL
320///
321/// Recording DNS lookup timings in the `DNS_LOOKUP_DURATION` histogram.
322///
323/// ```rust
324/// # use tansu_service::{Error, host_port};
325/// # use url::Url;
326/// # use std::net::Ipv4Addr;
327/// # #[tokio::main]
328/// # async fn main() -> Result<(), Error> {
329/// let earl = Url::parse("tcp://localhost:9092")?;
330/// let sock_addr = host_port(earl).await?;
331/// assert_eq!(sock_addr.ip(), Ipv4Addr::new(127, 0, 0, 1));
332/// assert_eq!(sock_addr.port(), 9092);
333/// # Ok(())
334/// # }
335/// ```
336pub async fn host_port(url: Url) -> Result<SocketAddr, Error> {
337    if let Some(host) = url.host_str()
338        && let Some(port) = url.port()
339    {
340        let attributes = [KeyValue::new("url", url.to_string())];
341        let start = SystemTime::now();
342
343        let mut addresses = lookup_host(format!("{host}:{port}"))
344            .await
345            .inspect(|_| {
346                DNS_LOOKUP_DURATION.record(
347                    start
348                        .elapsed()
349                        .map_or(0, |duration| duration.as_millis() as u64),
350                    &attributes,
351                )
352            })?
353            .filter(|socket_addr| matches!(socket_addr, SocketAddr::V4(_)));
354
355        if let Some(socket_addr) = addresses.next().inspect(|socket_addr| debug!(?socket_addr)) {
356            return Ok(socket_addr);
357        }
358    }
359
360    Err(Error::UnknownHost(url))
361}
362
363pub(crate) static DNS_LOOKUP_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
364    METER
365        .u64_histogram("dns_lookup_duration")
366        .with_unit("ms")
367        .with_description("DNS lookup latencies")
368        .build()
369});
370
371pub(crate) static REQUEST_SIZE: LazyLock<Histogram<u64>> = LazyLock::new(|| {
372    METER
373        .u64_histogram("tansu_request_size")
374        .with_unit("By")
375        .with_description("The API request size in bytes")
376        .build()
377});
378
379pub(crate) static RESPONSE_SIZE: LazyLock<Histogram<u64>> = LazyLock::new(|| {
380    METER
381        .u64_histogram("tansu_response_size")
382        .with_unit("By")
383        .with_description("The API response size in bytes")
384        .build()
385});
386
387pub(crate) static REQUEST_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
388    METER
389        .u64_histogram("tansu_request_duration")
390        .with_unit("ms")
391        .with_description("The API request latencies in milliseconds")
392        .build()
393});
394
395pub(crate) static API_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
396    METER
397        .u64_counter("tansu_api_requests")
398        .with_description("The number of API requests made")
399        .build()
400});
401
402pub(crate) static API_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
403    METER
404        .u64_counter("tansu_api_errors")
405        .with_description("The number of API errors")
406        .build()
407});
408
409pub(crate) static BYTES_SENT: LazyLock<Counter<u64>> = LazyLock::new(|| {
410    METER
411        .u64_counter("tansu_bytes_sent")
412        .with_description("The number of bytes sent")
413        .build()
414});
415
416pub(crate) static BYTES_RECEIVED: LazyLock<Counter<u64>> = LazyLock::new(|| {
417    METER
418        .u64_counter("tansu_bytes_received")
419        .with_description("The number of bytes received")
420        .build()
421});