1use std::{
103 collections::BTreeMap,
104 error, fmt, io,
105 sync::{Arc, LazyLock, PoisonError},
106 time::SystemTime,
107};
108
109use backoff::{ExponentialBackoffBuilder, future::retry};
110use bytes::Bytes;
111use deadpool::managed::{self, BuildError, Object, PoolError};
112use nisshi_sans_io::{ApiKey, ApiVersionsRequest, Body, Frame, Header, Request, RootMessageMeta};
113use nisshi_service::{FrameBytesLayer, FrameBytesService, host_port};
114use opentelemetry::{
115 InstrumentationScope, KeyValue, global,
116 metrics::{Counter, Gauge, Histogram, Meter},
117};
118use opentelemetry_semantic_conventions::SCHEMA_URL;
119use rama::{Context, Layer, Service};
120use tokio::{
121 io::{AsyncReadExt as _, AsyncWriteExt as _},
122 net::TcpStream,
123 task::JoinError,
124 time::Duration,
125};
126use tracing::{Instrument, Level, debug, span};
127use tracing_subscriber::filter::ParseError;
128use url::Url;
129
130mod consumer;
131
132pub use consumer::{ConsumerGroupLayer, ConsumerGroupService};
133
134#[derive(thiserror::Error, Clone, Debug)]
136pub enum Error {
137 DeadPoolBuild(#[from] BuildError),
138 Io(Arc<io::Error>),
139 Join(Arc<JoinError>),
140 Message(String),
141 ParseFilter(Arc<ParseError>),
142 ParseUrl(#[from] url::ParseError),
143 Poison,
144 Pool(Arc<Box<dyn error::Error + Send + Sync>>),
145 Protocol(#[from] nisshi_sans_io::Error),
146 Service(#[from] nisshi_service::Error),
147 UnknownApiKey(i16),
148 UnknownHost(Url),
149}
150
151impl<T> From<PoisonError<T>> for Error {
152 fn from(_value: PoisonError<T>) -> Self {
153 Self::Poison
154 }
155}
156
157impl fmt::Display for Error {
158 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 write!(f, "{self:?}")
160 }
161}
162
163impl From<JoinError> for Error {
164 fn from(value: JoinError) -> Self {
165 Self::Join(Arc::new(value))
166 }
167}
168
169impl<E> From<PoolError<E>> for Error
170where
171 E: error::Error + Send + Sync + 'static,
172{
173 fn from(value: PoolError<E>) -> Self {
174 Self::Pool(Arc::new(Box::new(value)))
175 }
176}
177
178impl From<io::Error> for Error {
179 fn from(value: io::Error) -> Self {
180 Self::Io(Arc::new(value))
181 }
182}
183
184impl From<ParseError> for Error {
185 fn from(value: ParseError) -> Self {
186 Self::ParseFilter(Arc::new(value))
187 }
188}
189
190pub(crate) static METER: LazyLock<Meter> = LazyLock::new(|| {
191 global::meter_with_scope(
192 InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
193 .with_version(env!("CARGO_PKG_VERSION"))
194 .with_schema_url(SCHEMA_URL)
195 .build(),
196 )
197});
198
199#[derive(Debug)]
201pub struct Connection {
202 stream: TcpStream,
203 correlation_id: i32,
204}
205
206#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
208pub struct ConnectionManager {
209 broker: Url,
210 client_id: Option<String>,
211 versions: BTreeMap<i16, i16>,
212}
213
214impl ConnectionManager {
215 pub fn builder(broker: Url) -> Builder {
217 Builder::broker(broker)
218 }
219
220 pub fn client_id(&self) -> Option<String> {
222 self.client_id.clone()
223 }
224
225 pub fn api_version(&self, api_key: i16) -> Result<i16, Error> {
227 self.versions
228 .get(&api_key)
229 .copied()
230 .ok_or(Error::UnknownApiKey(api_key))
231 }
232}
233
234const INITIAL_CONNECTION_TIMEOUT_MILLIS: u64 = 30_000;
235
236impl managed::Manager for ConnectionManager {
237 type Type = Connection;
238 type Error = Error;
239
240 async fn create(&self) -> Result<Self::Type, Self::Error> {
241 debug!(%self.broker);
242
243 let attributes = [KeyValue::new("broker", self.broker.to_string())];
244 let start = SystemTime::now();
245
246 let addr = host_port(self.broker.clone()).await?;
247
248 let backoff = ExponentialBackoffBuilder::new()
249 .with_max_elapsed_time(Some(Duration::from_millis(
250 INITIAL_CONNECTION_TIMEOUT_MILLIS,
251 )))
252 .build();
253 retry(backoff, || async {
254 Ok(TcpStream::connect(addr)
255 .await
256 .inspect(|_| {
257 TCP_CONNECT_DURATION.record(
258 start
259 .elapsed()
260 .map_or(0, |duration| duration.as_millis() as u64),
261 &attributes,
262 )
263 })
264 .inspect_err(|err| {
265 debug!(broker = %self.broker, ?err, elapsed = start.elapsed().map_or(0, |duration| duration.as_millis() as u64));
266 TCP_CONNECT_ERRORS.add(1, &attributes);
267 })
268 .map(|stream| Connection {
269 stream,
270 correlation_id: 0,
271 })?)
272 })
273 .await
274 .map_err(Into::into)
275 }
276
277 async fn recycle(
278 &self,
279 obj: &mut Self::Type,
280 metrics: &managed::Metrics,
281 ) -> managed::RecycleResult<Self::Error> {
282 debug!(obj.correlation_id, metrics.recycle_count);
283 Ok(())
284 }
285}
286
287pub type Pool = managed::Pool<ConnectionManager>;
289
290fn status_update(pool: &Pool) {
291 let status = pool.status();
292 POOL_AVAILABLE.record(status.available as u64, &[]);
293 POOL_CURRENT_SIZE.record(status.size as u64, &[]);
294 POOL_MAX_SIZE.record(status.max_size as u64, &[]);
295 POOL_WAITING.record(status.waiting as u64, &[]);
296}
297
298#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
300pub struct Builder {
301 broker: Url,
302 client_id: Option<String>,
303}
304
305impl Builder {
306 pub fn broker(broker: Url) -> Self {
308 Self {
309 broker,
310 client_id: None,
311 }
312 }
313
314 pub fn client_id(self, client_id: Option<String>) -> Self {
316 Self { client_id, ..self }
317 }
318
319 async fn bootstrap(&self) -> Result<BTreeMap<i16, i16>, Error> {
321 let versions = BTreeMap::from([(ApiVersionsRequest::KEY, 0)]);
324
325 let req = ApiVersionsRequest::default()
326 .client_software_name(Some(env!("CARGO_PKG_NAME").into()))
327 .client_software_version(Some(env!("CARGO_PKG_VERSION").into()));
328
329 let client = Pool::builder(ConnectionManager {
330 broker: self.broker.clone(),
331 client_id: self.client_id.clone(),
332 versions,
333 })
334 .build()
335 .map(Client::new)?;
336
337 let supported = RootMessageMeta::messages().requests();
338
339 client.call(req).await.map(|response| {
340 response
341 .api_keys
342 .unwrap_or_default()
343 .into_iter()
344 .filter_map(|api| {
345 supported.get(&api.api_key).and_then(|supported| {
346 if api.min_version >= supported.version.valid.start {
347 Some((
348 api.api_key,
349 api.max_version.min(supported.version.valid.end),
350 ))
351 } else {
352 None
353 }
354 })
355 })
356 .collect()
357 })
358 }
359
360 pub async fn build(self) -> Result<Pool, Error> {
362 self.bootstrap().await.and_then(|versions| {
363 Pool::builder(ConnectionManager {
364 broker: self.broker,
365 client_id: self.client_id,
366 versions,
367 })
368 .build()
369 .map_err(Into::into)
370 })
371 }
372}
373
374#[derive(Clone, Debug)]
376pub struct FramePoolLayer {
377 pool: Pool,
378}
379
380impl FramePoolLayer {
381 pub fn new(pool: Pool) -> Self {
382 Self { pool }
383 }
384}
385
386impl<S> Layer<S> for FramePoolLayer {
387 type Service = FramePoolService<S>;
388
389 fn layer(&self, inner: S) -> Self::Service {
390 FramePoolService {
391 pool: self.pool.clone(),
392 inner,
393 }
394 }
395}
396
397#[derive(Clone, Debug)]
399pub struct FramePoolService<S> {
400 pool: Pool,
401 inner: S,
402}
403
404impl<State, S> Service<State, Frame> for FramePoolService<S>
405where
406 S: Service<Pool, Frame, Response = Frame>,
407 State: Send + Sync + 'static,
408{
409 type Response = Frame;
410 type Error = S::Error;
411
412 async fn serve(&self, ctx: Context<State>, req: Frame) -> Result<Self::Response, Self::Error> {
413 let (ctx, _) = ctx.swap_state(self.pool.clone());
414 self.inner.serve(ctx, req).await
415 }
416}
417
418#[derive(Clone, Debug)]
420pub struct RequestPoolLayer {
421 pool: Pool,
422}
423
424impl RequestPoolLayer {
425 pub fn new(pool: Pool) -> Self {
426 Self { pool }
427 }
428}
429
430impl<S> Layer<S> for RequestPoolLayer {
431 type Service = RequestPoolService<S>;
432
433 fn layer(&self, inner: S) -> Self::Service {
434 RequestPoolService {
435 pool: self.pool.clone(),
436 inner,
437 }
438 }
439}
440
441#[derive(Clone, Debug)]
443pub struct RequestPoolService<S> {
444 pool: Pool,
445 inner: S,
446}
447
448impl<State, S, Q> Service<State, Q> for RequestPoolService<S>
449where
450 Q: Request,
451 S: Service<Pool, Q>,
452 State: Send + Sync + 'static,
453{
454 type Response = S::Response;
455 type Error = S::Error;
456
457 async fn serve(&self, ctx: Context<State>, req: Q) -> Result<Self::Response, Self::Error> {
459 let (ctx, _) = ctx.swap_state(self.pool.clone());
460 self.inner.serve(ctx, req).await
461 }
462}
463
464#[derive(Clone, Debug)]
466pub struct Client {
467 service:
468 RequestPoolService<RequestConnectionService<FrameBytesService<BytesConnectionService>>>,
469}
470
471impl Client {
472 pub fn new(pool: Pool) -> Self {
474 let service = (
475 RequestPoolLayer::new(pool),
476 RequestConnectionLayer,
477 FrameBytesLayer,
478 )
479 .into_layer(BytesConnectionService);
480
481 Self { service }
482 }
483
484 pub async fn call<Q>(&self, req: Q) -> Result<Q::Response, Error>
486 where
487 Q: Request,
488 Error: From<<<Q as Request>::Response as TryFrom<Body>>::Error>,
489 {
490 self.service.serve(Context::default(), req).await
491 }
492}
493
494#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
496pub struct FrameConnectionLayer;
497
498impl<S> Layer<S> for FrameConnectionLayer {
499 type Service = FrameConnectionService<S>;
500
501 fn layer(&self, inner: S) -> Self::Service {
502 Self::Service { inner }
503 }
504}
505
506#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
508pub struct FrameConnectionService<S> {
509 inner: S,
510}
511
512impl<S> Service<Pool, Frame> for FrameConnectionService<S>
513where
514 S: Service<Object<ConnectionManager>, Frame, Response = Frame>,
515 S::Error: From<Error> + From<PoolError<Error>> + From<nisshi_sans_io::Error>,
516{
517 type Response = Frame;
518 type Error = S::Error;
519
520 async fn serve(&self, ctx: Context<Pool>, req: Frame) -> Result<Self::Response, Self::Error> {
521 debug!(?req);
522
523 let api_key = req.api_key()?;
524 let api_version = req.api_version()?;
525 let client_id = req
526 .client_id()
527 .map(|client_id| client_id.map(|client_id| client_id.to_string()))?;
528
529 let pool = ctx.state();
530 status_update(pool);
531
532 let connection = {
533 let start = SystemTime::now();
534 pool.get().await.inspect(|_| {
535 POOL_GET_DURATION.record(
536 start
537 .elapsed()
538 .map_or(0, |duration| duration.as_millis() as u64),
539 &[],
540 );
541 })?
542 };
543
544 let correlation_id = connection.correlation_id;
545
546 let frame = Frame {
547 size: 0,
548 header: Header::Request {
549 api_key,
550 api_version,
551 correlation_id,
552 client_id,
553 },
554 body: req.body,
555 };
556
557 let (ctx, _) = ctx.swap_state(connection);
558
559 self.inner.serve(ctx, frame).await
560 }
561}
562
563#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
565pub struct RequestConnectionLayer;
566
567impl<S> Layer<S> for RequestConnectionLayer {
568 type Service = RequestConnectionService<S>;
569
570 fn layer(&self, inner: S) -> Self::Service {
571 Self::Service { inner }
572 }
573}
574
575#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
579pub struct RequestConnectionService<S> {
580 inner: S,
581}
582
583impl<Q, S> Service<Pool, Q> for RequestConnectionService<S>
584where
585 Q: Request,
586 S: Service<Object<ConnectionManager>, Frame, Response = Frame>,
587 S::Error: From<Error>
588 + From<PoolError<Error>>
589 + From<nisshi_sans_io::Error>
590 + From<<Q::Response as TryFrom<Body>>::Error>,
591{
592 type Response = Q::Response;
593 type Error = S::Error;
594
595 async fn serve(&self, ctx: Context<Pool>, req: Q) -> Result<Self::Response, Self::Error> {
596 debug!(?req);
597 let pool = ctx.state();
598 status_update(pool);
599
600 let api_key = Q::KEY;
601 let api_version = pool.manager().api_version(api_key)?;
602 let client_id = pool.manager().client_id();
603 let connection = {
604 let start = SystemTime::now();
605 pool.get().await.inspect(|_| {
606 POOL_GET_DURATION.record(
607 start
608 .elapsed()
609 .map_or(0, |duration| duration.as_millis() as u64),
610 &[],
611 );
612 })?
613 };
614
615 let correlation_id = connection.correlation_id;
616
617 let frame = Frame {
618 size: 0,
619 header: Header::Request {
620 api_key,
621 api_version,
622 correlation_id,
623 client_id,
624 },
625 body: req.into(),
626 };
627
628 let (ctx, _) = ctx.swap_state(connection);
629
630 let frame = self.inner.serve(ctx, frame).await?;
631
632 Q::Response::try_from(frame.body)
633 .inspect(|response| debug!(?response))
634 .map_err(Into::into)
635 }
636}
637
638#[derive(Clone, Copy, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
640pub struct BytesConnectionService;
641
642impl BytesConnectionService {
643 async fn write(
644 &self,
645 stream: &mut TcpStream,
646 frame: Bytes,
647 attributes: &[KeyValue],
648 ) -> Result<(), Error> {
649 debug!(frame = ?&frame[..]);
650
651 let start = SystemTime::now();
652
653 stream
654 .write_all(&frame[..])
655 .await
656 .inspect(|_| {
657 TCP_SEND_DURATION.record(
658 start
659 .elapsed()
660 .map_or(0, |duration| duration.as_millis() as u64),
661 attributes,
662 );
663
664 TCP_BYTES_SENT.add(frame.len() as u64, attributes);
665 })
666 .inspect_err(|_| {
667 TCP_SEND_ERRORS.add(1, attributes);
668 })
669 .map_err(Into::into)
670 }
671
672 async fn read(&self, stream: &mut TcpStream, attributes: &[KeyValue]) -> Result<Bytes, Error> {
673 let start = SystemTime::now();
674
675 let mut size = [0u8; 4];
676 _ = stream.read_exact(&mut size).await?;
677
678 let mut buffer: Vec<u8> = vec![0u8; frame_length(size)];
679 buffer[0..size.len()].copy_from_slice(&size[..]);
680 _ = stream
681 .read_exact(&mut buffer[4..])
682 .await
683 .inspect(|_| {
684 TCP_RECEIVE_DURATION.record(
685 start
686 .elapsed()
687 .map_or(0, |duration| duration.as_millis() as u64),
688 attributes,
689 );
690
691 TCP_BYTES_RECEIVED.add(buffer.len() as u64, attributes);
692 })
693 .inspect_err(|_| {
694 TCP_RECEIVE_ERRORS.add(1, attributes);
695 })?;
696
697 Ok(Bytes::from(buffer)).inspect(|frame| debug!(frame = ?&frame[..]))
698 }
699}
700
701impl Service<Object<ConnectionManager>, Bytes> for BytesConnectionService {
702 type Response = Bytes;
703 type Error = Error;
704
705 async fn serve(
706 &self,
707 mut ctx: Context<Object<ConnectionManager>>,
708 req: Bytes,
709 ) -> Result<Self::Response, Self::Error> {
710 let c = ctx.state_mut();
711
712 let local = c.stream.local_addr()?;
713 let peer = c.stream.peer_addr()?;
714
715 let attributes = [KeyValue::new("peer", peer.to_string())];
716
717 let span = span!(Level::DEBUG, "client", local = %local, peer = %peer);
718
719 async move {
720 self.write(&mut c.stream, req, &attributes).await?;
721
722 c.correlation_id += 1;
723
724 self.read(&mut c.stream, &attributes).await
725 }
726 .instrument(span)
727 .await
728 }
729}
730
731fn frame_length(encoded: [u8; 4]) -> usize {
732 i32::from_be_bytes(encoded) as usize + encoded.len()
733}
734
735static TCP_CONNECT_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
736 METER
737 .u64_histogram("tcp_connect_duration")
738 .with_unit("ms")
739 .with_description("The TCP connect latencies in milliseconds")
740 .build()
741});
742
743static TCP_CONNECT_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
744 METER
745 .u64_counter("tcp_connect_errors")
746 .with_description("TCP connect errors")
747 .build()
748});
749
750static TCP_SEND_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
751 METER
752 .u64_histogram("tcp_send_duration")
753 .with_unit("ms")
754 .with_description("The TCP send latencies in milliseconds")
755 .build()
756});
757
758static TCP_SEND_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
759 METER
760 .u64_counter("tcp_send_errors")
761 .with_description("TCP send errors")
762 .build()
763});
764
765static TCP_RECEIVE_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
766 METER
767 .u64_histogram("tcp_receive_duration")
768 .with_unit("ms")
769 .with_description("The TCP receive latencies in milliseconds")
770 .build()
771});
772
773static TCP_RECEIVE_ERRORS: LazyLock<Counter<u64>> = LazyLock::new(|| {
774 METER
775 .u64_counter("tcp_receive_errors")
776 .with_description("TCP receive errors")
777 .build()
778});
779
780static TCP_BYTES_SENT: LazyLock<Counter<u64>> = LazyLock::new(|| {
781 METER
782 .u64_counter("tcp_bytes_sent")
783 .with_description("TCP bytes sent")
784 .build()
785});
786
787static TCP_BYTES_RECEIVED: LazyLock<Counter<u64>> = LazyLock::new(|| {
788 METER
789 .u64_counter("tcp_bytes_received")
790 .with_description("TCP bytes received")
791 .build()
792});
793
794static POOL_GET_DURATION: LazyLock<Histogram<u64>> = LazyLock::new(|| {
795 METER
796 .u64_histogram("pool_get_duration")
797 .with_unit("ms")
798 .with_description("The Pool Get latencies in milliseconds")
799 .build()
800});
801
802static POOL_MAX_SIZE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
803 METER
804 .u64_gauge("pool_max_size")
805 .with_description("The maximum size of the pool")
806 .build()
807});
808
809static POOL_CURRENT_SIZE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
810 METER
811 .u64_gauge("pool_current_size")
812 .with_description("The current size of the pool")
813 .build()
814});
815
816static POOL_AVAILABLE: LazyLock<Gauge<u64>> = LazyLock::new(|| {
817 METER
818 .u64_gauge("pool_available")
819 .with_description("The number of available objects in the pool")
820 .build()
821});
822
823static POOL_WAITING: LazyLock<Gauge<u64>> = LazyLock::new(|| {
824 METER
825 .u64_gauge("pool_waiting")
826 .with_description("The number of waiting objects in the pool")
827 .build()
828});
829
830#[cfg(test)]
831mod tests {
832 use std::{fs::File, thread};
833
834 use nisshi_sans_io::{MetadataRequest, MetadataResponse};
835 use nisshi_service::{
836 BytesFrameLayer, FrameRouteService, RequestLayer, ResponseService, TcpBytesLayer,
837 TcpContextLayer, TcpListenerLayer,
838 };
839 use tokio::{net::TcpListener, task::JoinSet};
840 use tokio_util::sync::CancellationToken;
841 use tracing::subscriber::DefaultGuard;
842 use tracing_subscriber::EnvFilter;
843
844 use super::*;
845
846 fn init_tracing() -> Result<DefaultGuard, Error> {
847 Ok(tracing::subscriber::set_default(
848 tracing_subscriber::fmt()
849 .with_level(true)
850 .with_line_number(true)
851 .with_thread_names(false)
852 .with_env_filter(
853 EnvFilter::from_default_env()
854 .add_directive(format!("{}=debug", env!("CARGO_CRATE_NAME")).parse()?),
855 )
856 .with_writer(
857 thread::current()
858 .name()
859 .ok_or(Error::Message(String::from("unnamed thread")))
860 .and_then(|name| {
861 File::create(format!("../logs/{}/{name}.log", env!("CARGO_PKG_NAME"),))
862 .map_err(Into::into)
863 })
864 .map(Arc::new)?,
865 )
866 .finish(),
867 ))
868 }
869
870 async fn server(cancellation: CancellationToken, listener: TcpListener) -> Result<(), Error> {
871 let server = (
872 TcpListenerLayer::new(cancellation),
873 TcpContextLayer::default(),
874 TcpBytesLayer::default(),
875 BytesFrameLayer::default(),
876 )
877 .into_layer(
878 FrameRouteService::builder()
879 .with_service(RequestLayer::<MetadataRequest>::new().into_layer(
880 ResponseService::new(|_ctx: Context<()>, _req: MetadataRequest| {
881 Ok::<_, Error>(
882 MetadataResponse::default()
883 .brokers(Some([].into()))
884 .topics(Some([].into()))
885 .cluster_id(Some("abc".into()))
886 .controller_id(Some(111))
887 .throttle_time_ms(Some(0))
888 .cluster_authorized_operations(Some(-1)),
889 )
890 }),
891 ))
892 .and_then(|builder| builder.build())?,
893 );
894
895 server.serve(Context::default(), listener).await
896 }
897
898 #[tokio::test]
899 async fn tcp_client_server() -> Result<(), Error> {
900 let _guard = init_tracing()?;
901
902 let cancellation = CancellationToken::new();
903 let listener = TcpListener::bind("127.0.0.1:0").await?;
904 let local_addr = listener.local_addr()?;
905
906 let mut join = JoinSet::new();
907
908 let _server = {
909 let cancellation = cancellation.clone();
910 join.spawn(async move { server(cancellation, listener).await })
911 };
912
913 let origin = (
914 RequestPoolLayer::new(
915 ConnectionManager::builder(
916 Url::parse(&format!("tcp://{local_addr}")).inspect(|url| debug!(%url))?,
917 )
918 .client_id(Some(env!("CARGO_PKG_NAME").into()))
919 .build()
920 .await
921 .inspect(|pool| debug!(?pool))?,
922 ),
923 RequestConnectionLayer,
924 FrameBytesLayer,
925 )
926 .into_layer(BytesConnectionService);
927
928 let response = origin
929 .serve(
930 Context::default(),
931 MetadataRequest::default()
932 .topics(Some([].into()))
933 .allow_auto_topic_creation(Some(false))
934 .include_cluster_authorized_operations(Some(false))
935 .include_topic_authorized_operations(Some(false)),
936 )
937 .await?;
938
939 assert_eq!(Some("abc"), response.cluster_id.as_deref());
940 assert_eq!(Some(111), response.controller_id);
941
942 cancellation.cancel();
943
944 let joined = join.join_all().await;
945 debug!(?joined);
946
947 Ok(())
948 }
949}