tansu_service/lib.rs
1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Common service layers used in other Tansu crates.
16//!
17//! ## Overview
18//!
19//! This crate provides [Layer][`rama::Layer`] and [Service][`rama::Service`]
20//! implementations for operating on [`Frame`], [`Body`], [Request][`tansu_sans_io::Request`]
21//! and [Response][`tansu_sans_io::Response`].
22//!
23//! The following transports are provided:
24//!
25//! - TCP with [`TcpBytesLayer`] and [`BytesTcpService`].
26//! - [`MPSC channel`][`tokio::sync::mpsc`] with [`ChannelFrameLayer`] and [`ChannelFrameService`].
27//! - [`Bytes`][`bytes::Bytes`] with [`BytesLayer`] (designed primarily for protocol testing)
28//!
29//! ### Routing
30//!
31//! Route [`Frame`] to services using [`FrameRouteService`] to automatically
32//! implement [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]
33//! with valid protocol ranges:
34//!
35//! ```
36//! # use tansu_service::Error;
37//! # #[tokio::main]
38//! # async fn main() -> Result<(), Error> {
39//! # use rama::{Context, Layer as _, Service as _};
40//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
41//! # use tansu_service::{
42//! # BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
43//! # FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
44//! # ResponseService,
45//! # };
46//! let frame_route = FrameRouteService::<(), Error>::builder()
47//! .with_service(
48//! RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
49//! Ok(MetadataResponse::default()
50//! .brokers(Some([].into()))
51//! .topics(Some([].into()))
52//! .cluster_id(Some("tansu".into()))
53//! .controller_id(Some(111))
54//! .throttle_time_ms(Some(0))
55//! .cluster_authorized_operations(Some(-1)))
56//! })),
57//! )
58//! .and_then(|builder| builder.build())?;
59//! # Ok(())
60//! # }
61//! ```
62//!
63//! ### Layering
64//!
65//! Composing [`RequestFrameLayer`], [`FrameBytesLayer`], [`BytesLayer`],
66//! [`BytesFrameLayer`] together into `frame_route` to implement a test protocol stack.
67//!
68//! A "client" [`Frame`] is marshalled into bytes using [`FrameBytesLayer`], with [`BytesLayer`] connecting
69//! to a "server" that demarshalls using [`BytesFrameLayer`] back into frames,
70//! routing into `frame_route` (above) to [`MetadataRequest`][`tansu_sans_io::MetadataRequest`]
71//! or [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`] depending on the
72//! [API key][`Frame#method.api_key`]:
73//!
74//! ```
75//! # use tansu_service::Error;
76//! # #[tokio::main]
77//! # async fn main() -> Result<(), Error> {
78//! # use rama::{Context, Layer as _, Service as _};
79//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
80//! # use tansu_service::{
81//! # BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
82//! # FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
83//! # ResponseService,
84//! # };
85//! # let frame_route = FrameRouteService::<(), Error>::builder()
86//! # .with_service(
87//! # RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
88//! # Ok(MetadataResponse::default()
89//! # .brokers(Some([].into()))
90//! # .topics(Some([].into()))
91//! # .cluster_id(Some("tansu".into()))
92//! # .controller_id(Some(111))
93//! # .throttle_time_ms(Some(0))
94//! # .cluster_authorized_operations(Some(-1)))
95//! # })),
96//! # )
97//! # .and_then(|builder| builder.build())?;
98//! let service = (
99//! // "client" initiator side:
100//! RequestFrameLayer,
101//! FrameBytesLayer,
102//!
103//! // transport
104//! BytesLayer,
105//!
106//! // "server" side:
107//! BytesFrameLayer::default(),
108//! )
109//! .into_layer(frame_route);
110//! # Ok(())
111//! # }
112//! ```
113//!
114//! In the broker, proxy and CLI clients, [`BytesLayer`] is replaced with
115//! [`TcpBytesLayer`] (server side) or [`BytesTcpService`] (client/initiator side).
116//!
117//! ### Servicing
118//!
119//! We construct a default [`service context`][`rama::Context`] and a
120//! [`MetadataRequest`][`tansu_sans_io::MetadataRequest`] to initiate a request
121//! on the `service`. The request passes through the protocol stack
122//! and routed into our service. The service responds with a
123//! [`MetadataResponse`][`tansu_sans_io::MetadataResponse`], so that we can
124//! verify the expected `response.cluster_id`:
125//!
126//! ```
127//! # use tansu_service::Error;
128//! # #[tokio::main]
129//! # async fn main() -> Result<(), Error> {
130//! # use rama::{Context, Layer as _, Service as _};
131//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
132//! # use tansu_service::{
133//! # BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
134//! # FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
135//! # ResponseService,
136//! # };
137//! # let frame_route = FrameRouteService::<(), Error>::builder()
138//! # .with_service(
139//! # RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
140//! # Ok(MetadataResponse::default()
141//! # .brokers(Some([].into()))
142//! # .topics(Some([].into()))
143//! # .cluster_id(Some("tansu".into()))
144//! # .controller_id(Some(111))
145//! # .throttle_time_ms(Some(0))
146//! # .cluster_authorized_operations(Some(-1)))
147//! # })),
148//! # )
149//! # .and_then(|builder| builder.build())?;
150//! # let service = (
151//! # RequestFrameLayer,
152//! # FrameBytesLayer,
153//! # BytesLayer,
154//! # BytesFrameLayer::default(),
155//! # )
156//! # .into_layer(frame_route);
157//! let request = MetadataRequest::default()
158//! .topics(Some([].into()))
159//! .allow_auto_topic_creation(Some(false))
160//! .include_cluster_authorized_operations(Some(false))
161//! .include_topic_authorized_operations(Some(false));
162//!
163//! let response = service.serve(Context::default(), request).await?;
164//!
165//! assert_eq!(Some("tansu".into()), response.cluster_id);
166//! # Ok(())
167//! # }
168//! ```
169//!
170//! The [`FrameRouteService`] automatically implements
171//! [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]
172//! with valid protocol ranges for all defined services. An
173//! [`ApiVersionsResponse`][`tansu_sans_io::ApiVersionsResponse`] contains
174//! version information for both [`MetadataRequest`][`tansu_sans_io::MetadataRequest`]
175//! and [`ApiVersionsRequest`][`tansu_sans_io::ApiVersionsRequest`]:
176//!
177//! ```
178//! # use tansu_service::Error;
179//! # #[tokio::main]
180//! # async fn main() -> Result<(), Error> {
181//! # use rama::{Context, Layer as _, Service as _};
182//! # use tansu_sans_io::{ApiKey as _, ApiVersionsRequest, MetadataRequest, MetadataResponse};
183//! # use tansu_service::{
184//! # BytesFrameLayer, BytesFrameService, BytesLayer, BytesService, FrameBytesLayer,
185//! # FrameBytesService, FrameRouteService, RequestFrameLayer, RequestFrameService, RequestLayer,
186//! # ResponseService,
187//! # };
188//! # let frame_route = FrameRouteService::<(), Error>::builder()
189//! # .with_service(
190//! # RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(|_, _| {
191//! # Ok(MetadataResponse::default()
192//! # .brokers(Some([].into()))
193//! # .topics(Some([].into()))
194//! # .cluster_id(Some("tansu".into()))
195//! # .controller_id(Some(111))
196//! # .throttle_time_ms(Some(0))
197//! # .cluster_authorized_operations(Some(-1)))
198//! # })),
199//! # )
200//! # .and_then(|builder| builder.build())?;
201//! # let service = (
202//! # RequestFrameLayer,
203//! # FrameBytesLayer,
204//! # BytesLayer,
205//! # BytesFrameLayer::default(),
206//! # )
207//! # .into_layer(frame_route);
208//! let response = service
209//! .serve(
210//! Context::default(),
211//! ApiVersionsRequest::default()
212//! .client_software_name(Some("abcba".into()))
213//! .client_software_version(Some("1.2321".into())),
214//! )
215//! .await?;
216//!
217//! let api_versions = response
218//! .api_keys
219//! .unwrap_or_default()
220//! .into_iter()
221//! .map(|api_version| api_version.api_key)
222//! .collect::<Vec<_>>();
223//!
224//! assert_eq!(2, api_versions.len());
225//! assert!(api_versions.contains(&ApiVersionsRequest::KEY));
226//! assert!(api_versions.contains(&MetadataRequest::KEY));
227//! # Ok(())
228//! # }
229//! ```
230
231use std::{
232 fmt, io,
233 net::SocketAddr,
234 sync::{Arc, LazyLock},
235 time::SystemTime,
236};
237
238use opentelemetry::{
239 InstrumentationScope, KeyValue, global,
240 metrics::{Counter, Histogram, Meter},
241};
242use opentelemetry_semantic_conventions::SCHEMA_URL;
243use tansu_sans_io::{Body, Frame};
244use tokio::{net::lookup_host, sync::oneshot, task::JoinError};
245use tracing::debug;
246use url::Url;
247
248mod api;
249mod channel;
250mod frame;
251mod stream;
252
253pub use api::{ApiVersionsService, FrameRouteBuilder, FrameRouteService};
254
255pub use channel::{
256 ChannelFrameLayer, ChannelFrameService, FrameChannelService, FrameReceiver, FrameSender,
257 bounded_channel,
258};
259
260pub use frame::{
261 BodyRequestLayer, BytesFrameLayer, BytesFrameService, FrameApiKeyMatcher, FrameBodyLayer,
262 FrameBytesLayer, FrameBytesService, FrameRequestLayer, FrameService, RequestApiKeyMatcher,
263 RequestFrameLayer, RequestFrameService, RequestLayer, ResponseService,
264};
265
266pub use stream::{
267 BytesLayer, BytesService, BytesTcpService, TcpBytesLayer, TcpBytesService, TcpContext,
268 TcpContextLayer, TcpContextService, TcpListenerLayer,
269};
270
271#[derive(Clone, Debug, thiserror::Error)]
272pub enum Error {
273 Auth(#[from] tansu_auth::Error),
274 DuplicateRoute(i16),
275 FrameTooBig(usize),
276 Io(Arc<io::Error>),
277 Join(Arc<JoinError>),
278 Message(String),
279 OneshotRecv(oneshot::error::RecvError),
280 Parse(#[from] url::ParseError),
281 Protocol(#[from] tansu_sans_io::Error),
282 UnableToSend(Box<Frame>),
283 UnknownHost(Url),
284 UnknownServiceBody(Box<Body>),
285 UnknownServiceFrame(Box<Frame>),
286}
287
288impl fmt::Display for Error {
289 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
290 write!(f, "{self:?}")
291 }
292}
293
294impl From<JoinError> for Error {
295 fn from(value: JoinError) -> Self {
296 Self::Join(Arc::new(value))
297 }
298}
299
300impl From<io::Error> for Error {
301 fn from(value: io::Error) -> Self {
302 Self::Io(Arc::new(value))
303 }
304}
305
306fn frame_length(encoded: [u8; 4]) -> usize {
307 i32::from_be_bytes(encoded) as usize + encoded.len()
308}
309
310pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
311 global::meter_with_scope(
312 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
313 .with_version(env!("CARGO_PKG_VERSION"))
314 .with_schema_url(SCHEMA_URL)
315 .build(),
316 )
317});
318
319/// Return the socket address for a given URL
320///
321/// Recording DNS lookup timings in the `DNS_LOOKUP_DURATION` histogram.
322///
323/// ```rust
324/// # use tansu_service::{Error, host_port};
325/// # use url::Url;
326/// # use std::net::Ipv4Addr;
327/// # #[tokio::main]
328/// # async fn main() -> Result<(), Error> {
329/// let earl = Url::parse("tcp://localhost:9092")?;
330/// let sock_addr = host_port(earl).await?;
331/// assert_eq!(sock_addr.ip(), Ipv4Addr::new(127, 0, 0, 1));
332/// assert_eq!(sock_addr.port(), 9092);
333/// # Ok(())
334/// # }
335/// ```
336pub async fn host_port(url: Url) -> Result<SocketAddr, Error> {
337 if let Some(host) = url.host_str()
338 && let Some(port) = url.port()
339 {
340 let attributes = [KeyValue::new("url", url.to_string())];
341 let start = SystemTime::now();
342
343 let mut addresses = lookup_host(format!("{host}:{port}"))
344 .await
345 .inspect(|_| {
346 DNS_LOOKUP_DURATION.record(
347 start
348 .elapsed()
349 .map_or(0, |duration| duration.as_millis() as u64),
350 &attributes,
351 )
352 })?
353 .filter(|socket_addr| matches!(socket_addr, SocketAddr::V4(_)));
354
355 if let Some(socket_addr) = addresses.next().inspect(|socket_addr| debug!(?socket_addr)) {
356 return Ok(socket_addr);
357 }
358 }
359
360 Err(Error::UnknownHost(url))
361}
362
363pub(crate) static DNS_LOOKUP_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
364 METER
365 .u64_histogram("dns_lookup_duration")
366 .with_unit("ms")
367 .with_description("DNS lookup latencies")
368 .build()
369});
370
371pub(crate) static REQUEST_SIZE: LazyLock<Histogram<u64>> = LazyLock::new(|| {
372 METER
373 .u64_histogram("tansu_request_size")
374 .with_unit("By")
375 .with_description("The API request size in bytes")
376 .build()
377});
378
379pub(crate) static RESPONSE_SIZE: LazyLock<Histogram<u64>> = LazyLock::new(|| {
380 METER
381 .u64_histogram("tansu_response_size")
382 .with_unit("By")
383 .with_description("The API response size in bytes")
384 .build()
385});
386
387pub(crate) static REQUEST_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
388 METER
389 .u64_histogram("tansu_request_duration")
390 .with_unit("ms")
391 .with_description("The API request latencies in milliseconds")
392 .build()
393});
394
395pub(crate) static API_REQUESTS: LazyLock<Counter<u64>> = LazyLock::new(|| {
396 METER
397 .u64_counter("tansu_api_requests")
398 .with_description("The number of API requests made")
399 .build()
400});
401
402pub(crate) static API_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
403 METER
404 .u64_counter("tansu_api_errors")
405 .with_description("The number of API errors")
406 .build()
407});
408
409pub(crate) static BYTES_SENT: LazyLock<Counter<u64>> = LazyLock::new(|| {
410 METER
411 .u64_counter("tansu_bytes_sent")
412 .with_description("The number of bytes sent")
413 .build()
414});
415
416pub(crate) static BYTES_RECEIVED: LazyLock<Counter<u64>> = LazyLock::new(|| {
417 METER
418 .u64_counter("tansu_bytes_received")
419 .with_description("The number of bytes received")
420 .build()
421});