Skip to main content

nisshi_client/
lib.rs

1// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Nisshi Client
16//!
17//! Nisshi API client.
18//!
19//! # Simple [`Request`] client
20//!
21//! ```no_run
22//! use nisshi_client::{Client, ConnectionManager, Error};
23//! use nisshi_sans_io::MetadataRequest;
24//! use rama::{Service as _, Context};
25//! use url::Url;
26//!
27//! # #[tokio::main]
28//! # async fn main() -> Result<(), Error> {
29//! let origin = ConnectionManager::builder(Url::parse("tcp://localhost:9092")?)
30//!     .client_id(Some(env!("CARGO_PKG_NAME").into()))
31//!     .build()
32//!     .await
33//!     .map(Client::new)?;
34//!
35//! let response = origin
36//!     .call(
37//!         MetadataRequest::default()
38//!             .topics(Some([].into()))
39//!             .allow_auto_topic_creation(Some(false))
40//!             .include_cluster_authorized_operations(Some(false))
41//!             .include_topic_authorized_operations(Some(false)),
42//!     )
43//!     .await?;
44//! # Ok(())
45//! # }
46//! ```
47//!
48//! # Proxy: [`Layer`] Composition
49//!
50//! An example API proxy listening for requests on `tcp://localhost:9092` that
51//! forwards each [`Frame`] to an origin broker on `tcp://example.com:9092`:
52//!
53//! ```no_run
54//! use rama::{Context, Layer as _, Service as _};
55//! use nisshi_client::{
56//!     BytesConnectionService, ConnectionManager, Error, FrameConnectionLayer,
57//!     FramePoolLayer,
58//! };
59//! use nisshi_service::{
60//!     BytesFrameLayer, FrameBytesLayer, TcpBytesLayer, TcpContextLayer, TcpListenerLayer,
61//!     host_port,
62//! };
63//! use tokio::net::TcpListener;
64//! use tokio_util::sync::CancellationToken;
65//! use url::Url;
66//!
67//! # #[tokio::main]
68//! # async fn main() -> Result<(), Error> {
69//! // forward protocol frames to the origin using a connection pool:
70//! let origin = ConnectionManager::builder(Url::parse("tcp://example.com:9092")?)
71//!     .client_id(Some(env!("CARGO_PKG_NAME").into()))
72//!     .build()
73//!     .await?;
74//!
75//! // a tcp listener used by the proxy
76//! let listener =
77//!     TcpListener::bind(host_port(Url::parse("tcp://localhost:9092")?).await?).await?;
78//!
79//! // listen for requests until cancelled
80//! let token = CancellationToken::new();
81//!
82//! let stack = (
83//!     // server layers: reading tcp -> bytes -> frames:
84//!     TcpListenerLayer::new(token),
85//!     TcpContextLayer::default(),
86//!     TcpBytesLayer::<()>::default(),
87//!     BytesFrameLayer::default(),
88//!
89//!     // client layers: writing frames -> connection pool -> bytes -> origin:
90//!     FramePoolLayer::new(origin),
91//!     FrameConnectionLayer,
92//!     FrameBytesLayer,
93//! )
94//!     .into_layer(BytesConnectionService);
95//!
96//! stack.serve(Context::default(), listener).await?;
97//!
98//! # Ok(())
99//! # }
100//! ```
101
102use std::{
103    collections::BTreeMap,
104    error, fmt, io,
105    sync::{Arc, LazyLock, PoisonError},
106    time::SystemTime,
107};
108
109use backoff::{ExponentialBackoffBuilder, future::retry};
110use bytes::Bytes;
111use deadpool::managed::{self, BuildError, Object, PoolError};
112use nisshi_sans_io::{ApiKey, ApiVersionsRequest, Body, Frame, Header, Request, RootMessageMeta};
113use nisshi_service::{FrameBytesLayer, FrameBytesService, host_port};
114use opentelemetry::{
115    InstrumentationScope, KeyValue, global,
116    metrics::{Counter, Gauge, Histogram, Meter},
117};
118use opentelemetry_semantic_conventions::SCHEMA_URL;
119use rama::{Context, Layer, Service};
120use tokio::{
121    io::{AsyncReadExt as _, AsyncWriteExt as _},
122    net::TcpStream,
123    task::JoinError,
124    time::Duration,
125};
126use tracing::{Instrument, Level, debug, span};
127use tracing_subscriber::filter::ParseError;
128use url::Url;
129
130mod consumer;
131
132pub use consumer::{ConsumerGroupLayer, ConsumerGroupService};
133
134/// Client Errors
135#[derive(thiserror::Error, Clone, Debug)]
136pub enum Error {
137    DeadPoolBuild(#[from] BuildError),
138    Io(Arc<io::Error>),
139    Join(Arc<JoinError>),
140    Message(String),
141    ParseFilter(Arc<ParseError>),
142    ParseUrl(#[from] url::ParseError),
143    Poison,
144    Pool(Arc<Box<dyn error::Error + Send + Sync>>),
145    Protocol(#[from] nisshi_sans_io::Error),
146    Service(#[from] nisshi_service::Error),
147    UnknownApiKey(i16),
148    UnknownHost(Url),
149}
150
151impl<T> From<PoisonError<T>> for Error {
152    fn from(_value: PoisonError<T>) -> Self {
153        Self::Poison
154    }
155}
156
157impl fmt::Display for Error {
158    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159        write!(f, "{self:?}")
160    }
161}
162
163impl From<JoinError> for Error {
164    fn from(value: JoinError) -> Self {
165        Self::Join(Arc::new(value))
166    }
167}
168
169impl<E> From<PoolError<E>> for Error
170where
171    E: error::Error + Send + Sync + 'static,
172{
173    fn from(value: PoolError<E>) -> Self {
174        Self::Pool(Arc::new(Box::new(value)))
175    }
176}
177
178impl From<io::Error> for Error {
179    fn from(value: io::Error) -> Self {
180        Self::Io(Arc::new(value))
181    }
182}
183
184impl From<ParseError> for Error {
185    fn from(value: ParseError) -> Self {
186        Self::ParseFilter(Arc::new(value))
187    }
188}
189
190pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
191    global::meter_with_scope(
192        InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
193            .with_version(env!("CARGO_PKG_VERSION"))
194            .with_schema_url(SCHEMA_URL)
195            .build(),
196    )
197});
198
199///  Broker connection stream with [`correlation id`][`Header#variant.Request.field.correlation_id`]
200#[derive(Debug)]
201pub struct Connection {
202    stream: TcpStream,
203    correlation_id: i32,
204}
205
206/// Manager of supported API versions for a broker
207#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
208pub struct ConnectionManager {
209    broker: Url,
210    client_id: Option<String>,
211    versions: BTreeMap<i16, i16>,
212}
213
214impl ConnectionManager {
215    /// Build a manager with a broker endpoint
216    pub fn builder(broker: Url) -> Builder {
217        Builder::broker(broker)
218    }
219
220    /// Client id used in requests to the broker
221    pub fn client_id(&self) -> Option<String> {
222        self.client_id.clone()
223    }
224
225    /// The version supported by the broker for a given api key
226    pub fn api_version(&self, api_key: i16) -> Result<i16, Error> {
227        self.versions
228            .get(&api_key)
229            .copied()
230            .ok_or(Error::UnknownApiKey(api_key))
231    }
232}
233
234const INITIAL_CONNECTION_TIMEOUT_MILLIS: u64 = 30_000;
235
236impl managed::Manager for ConnectionManager {
237    type Type = Connection;
238    type Error = Error;
239
240    async fn create(&self) -> Result<Self::Type, Self::Error> {
241        debug!(%self.broker);
242
243        let attributes = [KeyValue::new("broker", self.broker.to_string())];
244        let start = SystemTime::now();
245
246        let addr = host_port(self.broker.clone()).await?;
247
248        let backoff = ExponentialBackoffBuilder::new()
249            .with_max_elapsed_time(Some(Duration::from_millis(
250                INITIAL_CONNECTION_TIMEOUT_MILLIS,
251            )))
252            .build();
253        retry(backoff, || async {
254            Ok(TcpStream::connect(addr)
255                .await
256                .inspect(|_| {
257                    TCP_CONNECT_DURATION.record(
258                        start
259                            .elapsed()
260                            .map_or(0, |duration| duration.as_millis() as u64),
261                        &attributes,
262                    )
263                })
264                .inspect_err(|err| {
265                    debug!(broker = %self.broker, ?err, elapsed = start.elapsed().map_or(0, |duration| duration.as_millis() as u64));
266                    TCP_CONNECT_ERRORS.add(1, &attributes);
267                })
268                .map(|stream| Connection {
269                    stream,
270                    correlation_id: 0,
271                })?)
272        })
273        .await
274        .map_err(Into::into)
275    }
276
277    async fn recycle(
278        &self,
279        obj: &mut Self::Type,
280        metrics: &managed::Metrics,
281    ) -> managed::RecycleResult<Self::Error> {
282        debug!(obj.correlation_id, metrics.recycle_count);
283        Ok(())
284    }
285}
286
287/// A managed [`Pool`] of broker [`Connection`]s
288pub type Pool = managed::Pool<ConnectionManager>;
289
290fn status_update(pool: &Pool) {
291    let status = pool.status();
292    POOL_AVAILABLE.record(status.available as u64, &[]);
293    POOL_CURRENT_SIZE.record(status.size as u64, &[]);
294    POOL_MAX_SIZE.record(status.max_size as u64, &[]);
295    POOL_WAITING.record(status.waiting as u64, &[]);
296}
297
298/// [Build][`Builder#method.build`] a [`Connection`] [`Pool`] to a [broker][`Builder#method.broker`]
299#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
300pub struct Builder {
301    broker: Url,
302    client_id: Option<String>,
303}
304
305impl Builder {
306    /// Broker URL
307    pub fn broker(broker: Url) -> Self {
308        Self {
309            broker,
310            client_id: None,
311        }
312    }
313
314    /// Client id used when making requests to the broker
315    pub fn client_id(self, client_id: Option<String>) -> Self {
316        Self { client_id, ..self }
317    }
318
319    /// Inquire with the broker supported api versions
320    async fn bootstrap(&self) -> Result<BTreeMap<i16, i16>, Error> {
321        // Create a temporary pool to establish the API requests
322        // and versions supported by the broker
323        let versions = BTreeMap::from([(ApiVersionsRequest::KEY, 0)]);
324
325        let req = ApiVersionsRequest::default()
326            .client_software_name(Some(env!("CARGO_PKG_NAME").into()))
327            .client_software_version(Some(env!("CARGO_PKG_VERSION").into()));
328
329        let client = Pool::builder(ConnectionManager {
330            broker: self.broker.clone(),
331            client_id: self.client_id.clone(),
332            versions,
333        })
334        .build()
335        .map(Client::new)?;
336
337        let supported = RootMessageMeta::messages().requests();
338
339        client.call(req).await.map(|response| {
340            response
341                .api_keys
342                .unwrap_or_default()
343                .into_iter()
344                .filter_map(|api| {
345                    supported.get(&api.api_key).and_then(|supported| {
346                        if api.min_version >= supported.version.valid.start {
347                            Some((
348                                api.api_key,
349                                api.max_version.min(supported.version.valid.end),
350                            ))
351                        } else {
352                            None
353                        }
354                    })
355                })
356                .collect()
357        })
358    }
359
360    /// Establish the API versions supported by the broker returning a [`Pool`]
361    pub async fn build(self) -> Result<Pool, Error> {
362        self.bootstrap().await.and_then(|versions| {
363            Pool::builder(ConnectionManager {
364                broker: self.broker,
365                client_id: self.client_id,
366                versions,
367            })
368            .build()
369            .map_err(Into::into)
370        })
371    }
372}
373
374/// Inject the [`Pool`][`Pool`] into the [`Service`] [`Context`] of this [`Layer`] using [`FramePoolService`]
375#[derive(Clone, Debug)]
376pub struct FramePoolLayer {
377    pool: Pool,
378}
379
380impl FramePoolLayer {
381    pub fn new(pool: Pool) -> Self {
382        Self { pool }
383    }
384}
385
386impl<S> Layer<S> for FramePoolLayer {
387    type Service = FramePoolService<S>;
388
389    fn layer(&self, inner: S) -> Self::Service {
390        FramePoolService {
391            pool: self.pool.clone(),
392            inner,
393        }
394    }
395}
396
397/// Inject the [`Pool`][`Pool`] into the [`Service`] [`Context`] of the inner [`Service`]
398#[derive(Clone, Debug)]
399pub struct FramePoolService<S> {
400    pool: Pool,
401    inner: S,
402}
403
404impl<State, S> Service<State, Frame> for FramePoolService<S>
405where
406    S: Service<Pool, Frame, Response = Frame>,
407    State: Send + Sync + 'static,
408{
409    type Response = Frame;
410    type Error = S::Error;
411
412    async fn serve(&self, ctx: Context<State>, req: Frame) -> Result<Self::Response, Self::Error> {
413        let (ctx, _) = ctx.swap_state(self.pool.clone());
414        self.inner.serve(ctx, req).await
415    }
416}
417
418/// Inject the [`Pool`][`Pool`] into the [`Service`] [`Context`] of this [`Layer`] using [`RequestPoolService`]
419#[derive(Clone, Debug)]
420pub struct RequestPoolLayer {
421    pool: Pool,
422}
423
424impl RequestPoolLayer {
425    pub fn new(pool: Pool) -> Self {
426        Self { pool }
427    }
428}
429
430impl<S> Layer<S> for RequestPoolLayer {
431    type Service = RequestPoolService<S>;
432
433    fn layer(&self, inner: S) -> Self::Service {
434        RequestPoolService {
435            pool: self.pool.clone(),
436            inner,
437        }
438    }
439}
440
441/// Inject the [`Pool`][`Pool`] into the [`Service`] [`Context`] of the inner [`Service`]
442#[derive(Clone, Debug)]
443pub struct RequestPoolService<S> {
444    pool: Pool,
445    inner: S,
446}
447
448impl<State, S, Q> Service<State, Q> for RequestPoolService<S>
449where
450    Q: Request,
451    S: Service<Pool, Q>,
452    State: Send + Sync + 'static,
453{
454    type Response = S::Response;
455    type Error = S::Error;
456
457    /// serve the request, injecting the pool into the context of the inner service
458    async fn serve(&self, ctx: Context<State>, req: Q) -> Result<Self::Response, Self::Error> {
459        let (ctx, _) = ctx.swap_state(self.pool.clone());
460        self.inner.serve(ctx, req).await
461    }
462}
463
464/// API client using a [`Connection`] [`Pool`]
465#[derive(Clone, Debug)]
466pub struct Client {
467    service:
468        RequestPoolService<RequestConnectionService<FrameBytesService<BytesConnectionService>>>,
469}
470
471impl Client {
472    /// Create a new client using the supplied pool
473    pub fn new(pool: Pool) -> Self {
474        let service = (
475            RequestPoolLayer::new(pool),
476            RequestConnectionLayer,
477            FrameBytesLayer,
478        )
479            .into_layer(BytesConnectionService);
480
481        Self { service }
482    }
483
484    /// Make an API request using the connection from the pool
485    pub async fn call<Q>(&self, req: Q) -> Result<Q::Response, Error>
486    where
487        Q: Request,
488        Error: From<<<Q as Request>::Response as TryFrom<Body>>::Error>,
489    {
490        self.service.serve(Context::default(), req).await
491    }
492}
493
494/// A [`Layer`] that takes a [`Connection`] from the [`Pool`] calling an inner [`Service`] with that [`Connection`] as [`Context`]
495#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
496pub struct FrameConnectionLayer;
497
498impl<S> Layer<S> for FrameConnectionLayer {
499    type Service = FrameConnectionService<S>;
500
501    fn layer(&self, inner: S) -> Self::Service {
502        Self::Service { inner }
503    }
504}
505
506/// A [`Service`] that takes a [`Connection`] from the [`Pool`] calling an inner [`Service`] with that [`Connection`] as [`Context`]
507#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
508pub struct FrameConnectionService<S> {
509    inner: S,
510}
511
512impl<S> Service<Pool, Frame> for FrameConnectionService<S>
513where
514    S: Service<Object<ConnectionManager>, Frame, Response = Frame>,
515    S::Error: From<Error> + From<PoolError<Error>> + From<nisshi_sans_io::Error>,
516{
517    type Response = Frame;
518    type Error = S::Error;
519
520    async fn serve(&self, ctx: Context<Pool>, req: Frame) -> Result<Self::Response, Self::Error> {
521        debug!(?req);
522
523        let api_key = req.api_key()?;
524        let api_version = req.api_version()?;
525        let client_id = req
526            .client_id()
527            .map(|client_id| client_id.map(|client_id| client_id.to_string()))?;
528
529        let pool = ctx.state();
530        status_update(pool);
531
532        let connection = {
533            let start = SystemTime::now();
534            pool.get().await.inspect(|_| {
535                POOL_GET_DURATION.record(
536                    start
537                        .elapsed()
538                        .map_or(0, |duration| duration.as_millis() as u64),
539                    &[],
540                );
541            })?
542        };
543
544        let correlation_id = connection.correlation_id;
545
546        let frame = Frame {
547            size: 0,
548            header: Header::Request {
549                api_key,
550                api_version,
551                correlation_id,
552                client_id,
553            },
554            body: req.body,
555        };
556
557        let (ctx, _) = ctx.swap_state(connection);
558
559        self.inner.serve(ctx, frame).await
560    }
561}
562
563/// A [`Layer`] of [`RequestConnectionService`]
564#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
565pub struct RequestConnectionLayer;
566
567impl<S> Layer<S> for RequestConnectionLayer {
568    type Service = RequestConnectionService<S>;
569
570    fn layer(&self, inner: S) -> Self::Service {
571        Self::Service { inner }
572    }
573}
574
575/// Take a [`Connection`] from the [`Pool`]. Enclose the [`Request`]
576/// in a [`Frame`] using latest API version supported by the broker. Call the
577/// inner service with the [`Frame`] using the [`Connection`] as [`Context`].
578#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
579pub struct RequestConnectionService<S> {
580    inner: S,
581}
582
583impl<Q, S> Service<Pool, Q> for RequestConnectionService<S>
584where
585    Q: Request,
586    S: Service<Object<ConnectionManager>, Frame, Response = Frame>,
587    S::Error: From<Error>
588        + From<PoolError<Error>>
589        + From<nisshi_sans_io::Error>
590        + From<<Q::Response as TryFrom<Body>>::Error>,
591{
592    type Response = Q::Response;
593    type Error = S::Error;
594
595    async fn serve(&self, ctx: Context<Pool>, req: Q) -> Result<Self::Response, Self::Error> {
596        debug!(?req);
597        let pool = ctx.state();
598        status_update(pool);
599
600        let api_key = Q::KEY;
601        let api_version = pool.manager().api_version(api_key)?;
602        let client_id = pool.manager().client_id();
603        let connection = {
604            let start = SystemTime::now();
605            pool.get().await.inspect(|_| {
606                POOL_GET_DURATION.record(
607                    start
608                        .elapsed()
609                        .map_or(0, |duration| duration.as_millis() as u64),
610                    &[],
611                );
612            })?
613        };
614
615        let correlation_id = connection.correlation_id;
616
617        let frame = Frame {
618            size: 0,
619            header: Header::Request {
620                api_key,
621                api_version,
622                correlation_id,
623                client_id,
624            },
625            body: req.into(),
626        };
627
628        let (ctx, _) = ctx.swap_state(connection);
629
630        let frame = self.inner.serve(ctx, frame).await?;
631
632        Q::Response::try_from(frame.body)
633            .inspect(|response| debug!(?response))
634            .map_err(Into::into)
635    }
636}
637
638/// A [`Service`] that writes a frame represented by [`Bytes`] to a [`Connection`] [`Context`], returning the [`Bytes`] frame response.
639#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
640pub struct BytesConnectionService;
641
642impl BytesConnectionService {
643    async fn write(
644        &self,
645        stream: &mut TcpStream,
646        frame: Bytes,
647        attributes: &[KeyValue],
648    ) -> Result<(), Error> {
649        debug!(frame = ?&frame[..]);
650
651        let start = SystemTime::now();
652
653        stream
654            .write_all(&frame[..])
655            .await
656            .inspect(|_| {
657                TCP_SEND_DURATION.record(
658                    start
659                        .elapsed()
660                        .map_or(0, |duration| duration.as_millis() as u64),
661                    attributes,
662                );
663
664                TCP_BYTES_SENT.add(frame.len() as u64, attributes);
665            })
666            .inspect_err(|_| {
667                TCP_SEND_ERRORS.add(1, attributes);
668            })
669            .map_err(Into::into)
670    }
671
672    async fn read(&self, stream: &mut TcpStream, attributes: &[KeyValue]) -> Result<Bytes, Error> {
673        let start = SystemTime::now();
674
675        let mut size = [0u8; 4];
676        _ = stream.read_exact(&mut size).await?;
677
678        let mut buffer: Vec<u8> = vec![0u8; frame_length(size)];
679        buffer[0..size.len()].copy_from_slice(&size[..]);
680        _ = stream
681            .read_exact(&mut buffer[4..])
682            .await
683            .inspect(|_| {
684                TCP_RECEIVE_DURATION.record(
685                    start
686                        .elapsed()
687                        .map_or(0, |duration| duration.as_millis() as u64),
688                    attributes,
689                );
690
691                TCP_BYTES_RECEIVED.add(buffer.len() as u64, attributes);
692            })
693            .inspect_err(|_| {
694                TCP_RECEIVE_ERRORS.add(1, attributes);
695            })?;
696
697        Ok(Bytes::from(buffer)).inspect(|frame| debug!(frame = ?&frame[..]))
698    }
699}
700
701impl Service<Object<ConnectionManager>, Bytes> for BytesConnectionService {
702    type Response = Bytes;
703    type Error = Error;
704
705    async fn serve(
706        &self,
707        mut ctx: Context<Object<ConnectionManager>>,
708        req: Bytes,
709    ) -> Result<Self::Response, Self::Error> {
710        let c = ctx.state_mut();
711
712        let local = c.stream.local_addr()?;
713        let peer = c.stream.peer_addr()?;
714
715        let attributes = [KeyValue::new("peer", peer.to_string())];
716
717        let span = span!(Level::DEBUG, "client", local = %local, peer = %peer);
718
719        async move {
720            self.write(&mut c.stream, req, &attributes).await?;
721
722            c.correlation_id += 1;
723
724            self.read(&mut c.stream, &attributes).await
725        }
726        .instrument(span)
727        .await
728    }
729}
730
/// Total length in bytes of a frame, including its 4-byte size prefix.
///
/// `encoded` is the big-endian signed 32-bit size that precedes a frame. The
/// size comes from the peer, so a negative value is treated as an empty frame
/// rather than being cast to an enormous length, and the addition saturates
/// instead of overflowing.
fn frame_length(encoded: [u8; 4]) -> usize {
    let payload = usize::try_from(i32::from_be_bytes(encoded)).unwrap_or(0);
    payload.saturating_add(encoded.len())
}
734
735static TCP_CONNECT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
736    METER
737        .u64_histogram("tcp_connect_duration")
738        .with_unit("ms")
739        .with_description("The TCP connect latencies in milliseconds")
740        .build()
741});
742
743static TCP_CONNECT_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
744    METER
745        .u64_counter("tcp_connect_errors")
746        .with_description("TCP connect errors")
747        .build()
748});
749
750static TCP_SEND_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
751    METER
752        .u64_histogram("tcp_send_duration")
753        .with_unit("ms")
754        .with_description("The TCP send latencies in milliseconds")
755        .build()
756});
757
758static TCP_SEND_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
759    METER
760        .u64_counter("tcp_send_errors")
761        .with_description("TCP send errors")
762        .build()
763});
764
765static TCP_RECEIVE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
766    METER
767        .u64_histogram("tcp_receive_duration")
768        .with_unit("ms")
769        .with_description("The TCP receive latencies in milliseconds")
770        .build()
771});
772
773static TCP_RECEIVE_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
774    METER
775        .u64_counter("tcp_receive_errors")
776        .with_description("TCP receive errors")
777        .build()
778});
779
780static TCP_BYTES_SENT: LazyLock<Counter<u64>> = LazyLock::new(|| {
781    METER
782        .u64_counter("tcp_bytes_sent")
783        .with_description("TCP bytes sent")
784        .build()
785});
786
787static TCP_BYTES_RECEIVED: LazyLock<Counter<u64>> = LazyLock::new(|| {
788    METER
789        .u64_counter("tcp_bytes_received")
790        .with_description("TCP bytes received")
791        .build()
792});
793
794static POOL_GET_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
795    METER
796        .u64_histogram("pool_get_duration")
797        .with_unit("ms")
798        .with_description("The Pool Get latencies in milliseconds")
799        .build()
800});
801
802static POOL_MAX_SIZE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
803    METER
804        .u64_gauge("pool_max_size")
805        .with_description("The maximum size of the pool")
806        .build()
807});
808
809static POOL_CURRENT_SIZE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
810    METER
811        .u64_gauge("pool_current_size")
812        .with_description("The current size of the pool")
813        .build()
814});
815
816static POOL_AVAILABLE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
817    METER
818        .u64_gauge("pool_available")
819        .with_description("The number of available objects in the pool")
820        .build()
821});
822
823static POOL_WAITING: LazyLock<Gauge<u64>> = LazyLock::new(|| {
824    METER
825        .u64_gauge("pool_waiting")
826        .with_description("The number of waiting objects in the pool")
827        .build()
828});
829
#[cfg(test)]
mod tests {
    use std::{fs::File, thread};

    use nisshi_sans_io::{MetadataRequest, MetadataResponse};
    use nisshi_service::{
        BytesFrameLayer, FrameRouteService, RequestLayer, ResponseService, TcpBytesLayer,
        TcpContextLayer, TcpListenerLayer,
    };
    use tokio::{net::TcpListener, task::JoinSet};
    use tokio_util::sync::CancellationToken;
    use tracing::subscriber::DefaultGuard;
    use tracing_subscriber::EnvFilter;

    use super::*;

    /// Install a default subscriber that logs this crate at debug level to a
    /// file named after the current thread; dropping the guard removes it.
    fn init_tracing() -> Result<DefaultGuard, Error> {
        let filter = EnvFilter::from_default_env()
            .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?);

        let current = thread::current();
        let name = current
            .name()
            .ok_or(Error::Message(String::from("unnamed thread")))?;

        let log = File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
            .map(Arc::new)?;

        let subscriber = tracing_subscriber::fmt()
            .with_level(true)
            .with_line_number(true)
            .with_thread_names(false)
            .with_env_filter(filter)
            .with_writer(log)
            .finish();

        Ok(tracing::subscriber::set_default(subscriber))
    }

    /// Serve canned metadata responses on the listener until cancelled.
    async fn server(cancellation: CancellationToken, listener: TcpListener) -> Result<(), Error> {
        let metadata = RequestLayer::<MetadataRequest>::new().into_layer(ResponseService::new(
            |_ctx: Context<()>, _req: MetadataRequest| {
                Ok::<_, Error>(
                    MetadataResponse::default()
                        .brokers(Some([].into()))
                        .topics(Some([].into()))
                        .cluster_id(Some("abc".into()))
                        .controller_id(Some(111))
                        .throttle_time_ms(Some(0))
                        .cluster_authorized_operations(Some(-1)),
                )
            },
        ));

        let router = FrameRouteService::builder()
            .with_service(metadata)
            .and_then(|builder| builder.build())?;

        let layers = (
            TcpListenerLayer::new(cancellation),
            TcpContextLayer::default(),
            TcpBytesLayer::default(),
            BytesFrameLayer::default(),
        );

        layers
            .into_layer(router)
            .serve(Context::default(), listener)
            .await
    }

    /// Round trip a metadata request from a pooled client to a local server.
    #[tokio::test]
    async fn tcp_client_server() -> Result<(), Error> {
        let _guard = init_tracing()?;

        let cancellation = CancellationToken::new();
        let listener = TcpListener::bind("127.0.0.1:0").await?;
        let local_addr = listener.local_addr()?;

        let mut join = JoinSet::new();
        let _server = join.spawn(server(cancellation.clone(), listener));

        let broker = Url::parse(&format!("tcp://{local_addr}")).inspect(|url| debug!(%url))?;

        let pool = ConnectionManager::builder(broker)
            .client_id(Some(env!("CARGO_PKG_NAME").into()))
            .build()
            .await
            .inspect(|pool| debug!(?pool))?;

        let origin = (
            RequestPoolLayer::new(pool),
            RequestConnectionLayer,
            FrameBytesLayer,
        )
            .into_layer(BytesConnectionService);

        let request = MetadataRequest::default()
            .topics(Some([].into()))
            .allow_auto_topic_creation(Some(false))
            .include_cluster_authorized_operations(Some(false))
            .include_topic_authorized_operations(Some(false));

        let response = origin.serve(Context::default(), request).await?;

        assert_eq!(Some("abc"), response.cluster_id.as_deref());
        assert_eq!(Some(111), response.controller_id);

        cancellation.cancel();

        let joined = join.join_all().await;
        debug!(?joined);

        Ok(())
    }
}