1use core::task::{Context, Poll};
2use std::collections::HashMap;
3use std::io::Cursor;
4use std::io::Error as IoError;
5use std::io::ErrorKind;
6use std::marker::PhantomData;
7use std::pin::Pin;
8use std::sync::Arc;
9use std::sync::atomic::AtomicBool;
10use std::sync::atomic::Ordering::{SeqCst, Relaxed};
11use std::sync::atomic::AtomicI32;
12use std::time::Duration;
13use std::fmt;
14use std::future::Future;
15
16use async_channel::bounded;
17use async_channel::Receiver;
18use async_channel::Sender;
19use async_lock::Mutex;
20use bytes::Bytes;
21use event_listener::Event;
22use futures_util::ready;
23use futures_util::stream::{Stream, StreamExt};
24use pin_project::pin_project;
25use pin_project::pinned_drop;
26use tokio::select;
27use tracing::{info, warn};
28use tracing::{debug, error, trace, instrument};
29
30use fluvio_future::net::ConnectionFd;
31use fluvio_future::timer::sleep;
32use fluvio_protocol::api::Request;
33use fluvio_protocol::api::RequestHeader;
34use fluvio_protocol::api::RequestMessage;
35use fluvio_protocol::Decoder;
36
37use crate::SocketError;
38use crate::ExclusiveFlvSink;
39use crate::FluvioSocket;
40use crate::FluvioStream;
41
42pub type SharedMultiplexerSocket = Arc<MultiplexerSocket>;
43
44#[derive(Clone)]
45struct SharedMsg(Arc<Mutex<Option<Bytes>>>, Arc<Event>);
46
47enum SharedSender {
49 Serial(SharedMsg),
51 Queue(Sender<Option<Bytes>>),
53}
54
55type Senders = Arc<Mutex<HashMap<i32, SharedSender>>>;
56
57pub struct MultiplexerSocket {
59 correlation_id_counter: AtomicI32,
60 senders: Senders,
61 sink: ExclusiveFlvSink,
62 stale: Arc<AtomicBool>,
63 terminate: Arc<Event>,
64}
65
66impl fmt::Debug for MultiplexerSocket {
67 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
68 write!(f, "MultiplexerSocket {}", self.sink.id())
69 }
70}
71
72impl Drop for MultiplexerSocket {
73 fn drop(&mut self) {
74 self.terminate.notify(usize::MAX);
76 }
77}
78
79impl MultiplexerSocket {
80 pub fn shared(socket: FluvioSocket) -> Arc<Self> {
81 Arc::new(Self::new(socket))
82 }
83
84 #[allow(clippy::clone_on_copy)]
87 pub fn new(socket: FluvioSocket) -> Self {
88 let id = socket.id().clone();
89 debug!(socket = %id, "spawning dispatcher");
90
91 let (sink, stream) = socket.split();
92 let stale = Arc::new(AtomicBool::new(false));
93
94 let multiplexer = Self {
95 correlation_id_counter: AtomicI32::new(1),
96 senders: Arc::new(Mutex::new(HashMap::new())),
97 sink: ExclusiveFlvSink::new(sink),
98 terminate: Arc::new(Event::new()),
99 stale: stale.clone(),
100 };
101
102 MultiPlexingResponseDispatcher::run(
103 id,
104 stream,
105 multiplexer.senders.clone(),
106 multiplexer.terminate.clone(),
107 stale,
108 );
109
110 multiplexer
111 }
112
113 pub fn set_stale(&self) {
114 self.stale.store(true, SeqCst);
115 }
116
117 pub fn is_stale(&self) -> bool {
118 self.stale.load(SeqCst)
119 }
120
121 fn next_correlation_id(&self) -> i32 {
123 self.correlation_id_counter.fetch_add(1, Relaxed)
124 }
125
126 #[instrument(skip(req_msg))]
128 pub async fn send_and_receive<R>(
129 &self,
130 mut req_msg: RequestMessage<R>,
131 ) -> Result<R::Response, SocketError>
132 where
133 R: Request,
134 {
135 use once_cell::sync::Lazy;
136
137 static MAX_WAIT_TIME: Lazy<u64> = Lazy::new(|| {
138 use std::env;
139
140 let var_value = env::var("FLV_SOCKET_WAIT").unwrap_or_default();
141 let wait_time: u64 = var_value.parse().unwrap_or(60);
142 wait_time
143 });
144
145 let correlation_id = self.next_correlation_id();
146 let bytes_lock = SharedMsg(Arc::new(Mutex::new(None)), Arc::new(Event::new()));
147
148 req_msg.header.set_correlation_id(correlation_id);
149
150 trace!(correlation_id, "senders trying lock");
151 let mut senders = self.senders.lock().await;
152 senders.insert(correlation_id, SharedSender::Serial(bytes_lock.clone()));
153 drop(senders);
154
155 let SharedMsg(msg, msg_event) = bytes_lock;
156 let listener = msg_event.listen();
158
159 debug!(api = R::API_KEY, correlation_id, "sending request");
160 self.sink.send_request(&req_msg).await?;
161 trace!(correlation_id, "waiting");
162
163 select! {
164
165 _ = sleep(Duration::from_secs(*MAX_WAIT_TIME)) => {
166
167 trace!("serial socket for: {} timeout happen, id: {}", R::API_KEY, correlation_id);
168 let mut senders = self.senders.lock().await;
170 senders.remove(&correlation_id);
171 drop(senders);
172 self.set_stale();
173
174
175 Err(IoError::new(
176 ErrorKind::TimedOut,
177 format!("Timed out: {} secs waiting for response. API_KEY={}, CorrelationId={}", *MAX_WAIT_TIME,R::API_KEY, correlation_id),
178 ).into())
179 },
180
181 _ = listener => {
182
183 trace!(correlation_id,"msg event");
185 let mut senders = self.senders.lock().await;
186 senders.remove(&correlation_id);
187 drop(senders);
188
189 match msg.try_lock() {
190 Some(guard) => {
191
192 if let Some(response_bytes) = &*guard {
193
194 debug!(correlation_id, len = response_bytes.len(),"receive serial message");
195 let response = R::Response::decode_from(
196 &mut Cursor::new(&response_bytes),
197 req_msg.header.api_version(),
198 )?;
199 trace!("receive serial socket id: {}, response: {:#?}", correlation_id, response);
200 Ok(response)
201 } else {
202 debug!("serial socket: {}, id: {}, value is empty, something bad happened",R::API_KEY,correlation_id);
203 Err(IoError::new(
204 ErrorKind::UnexpectedEof,
205 "connection is closed".to_string(),
206 ).into())
207 }
208
209 },
210 None => Err(IoError::new(
211 ErrorKind::BrokenPipe,
212 format!("locked failed: {correlation_id}, serial socket is in bad state")
213 ).into())
214 }
215 },
216 }
217 }
218 #[instrument(skip(req_msg))]
220 pub async fn send_async<R>(
221 &self,
222 req_msg: RequestMessage<R>,
223 ) -> Result<AsyncResponse<R>, SocketError>
224 where
225 R: Request,
226 {
227 self.create_stream(req_msg, 1).await
228 }
229
230 #[instrument(skip(self,req_msg), fields(api = R::API_KEY))]
232 pub async fn create_stream<R>(
233 &self,
234 mut req_msg: RequestMessage<R>,
235 queue_len: usize,
236 ) -> Result<AsyncResponse<R>, SocketError>
237 where
238 R: Request,
239 {
240 let correlation_id = self.next_correlation_id();
241
242 req_msg.header.set_correlation_id(correlation_id);
243
244 trace!(correlation_id,request = ?req_msg, "new correlation id");
245
246 let (sender, receiver) = bounded(queue_len);
248 let mut senders = self.senders.lock().await;
249
250 senders.retain(|_, shared_sender| match shared_sender {
253 SharedSender::Serial(_) => true,
254 SharedSender::Queue(sender) => !sender.is_closed(),
255 });
256
257 senders.insert(correlation_id, SharedSender::Queue(sender));
258 drop(senders);
259
260 trace!(correlation_id, "created new channel");
261
262 self.sink.send_request(&req_msg).await?;
263
264 trace!(correlation_id, "request send");
265
266 Ok(AsyncResponse {
270 receiver: Box::pin(receiver),
271 header: req_msg.header,
272 correlation_id,
273 data: PhantomData,
274 })
275 }
276}
277
278#[pin_project(PinnedDrop)]
281pub struct AsyncResponse<R> {
282 #[pin]
283 receiver: Pin<Box<Receiver<Option<Bytes>>>>,
284 header: RequestHeader,
285 correlation_id: i32,
286 data: PhantomData<R>,
287}
288
289#[pinned_drop]
290impl<R> PinnedDrop for AsyncResponse<R> {
291 fn drop(self: Pin<&mut Self>) {
292 self.receiver.close();
293 debug!("multiplexer stream: {} closed", self.correlation_id);
294 }
295}
296
297impl<R: Request> Stream for AsyncResponse<R> {
298 type Item = Result<R::Response, SocketError>;
299
300 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
301 let this = self.project();
302 match ready!(this.receiver.poll_next(cx)) {
303 Some(Some(bytes)) => {
304 use bytes::Buf;
305 let response_len = bytes.len();
306 debug!(
307 response_len,
308 remaining = bytes.remaining(),
309 version = this.header.api_version(),
310 "response len>>>"
311 );
312
313 let mut cursor = Cursor::new(bytes);
314 let response = R::Response::decode_from(&mut cursor, this.header.api_version());
315 match response {
316 Ok(value) => {
317 trace!("Received response bytes: {}, {:#?}", response_len, &value,);
318 Poll::Ready(Some(Ok(value)))
319 }
320 Err(e) => Poll::Ready(Some(Err(e.into()))),
321 }
322 }
323 Some(None) => Poll::Ready(Some(Err(SocketError::SocketClosed))),
324 None => Poll::Ready(None),
325 }
326 }
327}
328
329impl<R: Request> Future for AsyncResponse<R> {
330 type Output = Result<R::Response, SocketError>;
331
332 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
333 let this = self.project();
334 match ready!(this.receiver.poll_next(cx)) {
335 Some(Some(bytes)) => {
336 use bytes::Buf;
337 let response_len = bytes.len();
338 debug!(
339 response_len,
340 remaining = bytes.remaining(),
341 version = this.header.api_version(),
342 "response len>>>"
343 );
344
345 let mut cursor = Cursor::new(bytes);
346 let response = R::Response::decode_from(&mut cursor, this.header.api_version());
347 match response {
348 Ok(value) => {
349 trace!("Received response bytes: {}, {:#?}", response_len, &value,);
350 Poll::Ready(Ok(value))
351 }
352 Err(e) => Poll::Ready(Err(e.into())),
353 }
354 }
355 Some(None) => Poll::Ready(Err(SocketError::SocketClosed)),
356 None => Poll::Ready(Err(SocketError::SocketClosed)),
357 }
358 }
359}
360
361struct MultiPlexingResponseDispatcher {
363 id: ConnectionFd,
364 senders: Senders,
365 terminate: Arc<Event>,
366 stale: Arc<AtomicBool>,
367}
368
369impl fmt::Debug for MultiPlexingResponseDispatcher {
370 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371 write!(f, "MultiplexDisp({})", self.id)
372 }
373}
374
375impl MultiPlexingResponseDispatcher {
376 pub fn run(
377 id: ConnectionFd,
378 stream: FluvioStream,
379 senders: Senders,
380 terminate: Arc<Event>,
381 stale: Arc<AtomicBool>,
382 ) {
383 use fluvio_future::task::spawn;
384
385 let dispatcher = Self {
386 id,
387 senders,
388 terminate,
389 stale,
390 };
391
392 spawn(dispatcher.dispatcher_loop(stream));
393 }
394
395 #[instrument(skip(stream))]
396 async fn dispatcher_loop(mut self, mut stream: FluvioStream) {
397 let frame_stream = stream.get_mut_tcp_stream();
398
399 loop {
400 trace!("waiting");
401
402 select! {
403 frame = frame_stream.next() => {
404 match frame {
405 Some(Ok(mut msg)) => {
406 let mut correlation_id: i32 = 0;
407 match correlation_id.decode(&mut msg, 0) {
408 Ok(_) => {
409 use bytes::Buf;
410 debug!(correlation_id,len = msg.len(), remaining = msg.remaining(), "received frame");
411
412 if let Err(err) = self.send(correlation_id, msg.freeze()).await {
413 error!("error sending to socket, {}", err)
414 }
415 }
416 Err(err) => error!("error decoding response, {}", err),
417 }
418 },
419 Some(Err(err)) => {
420 warn!("problem getting frame from stream: {err}. terminating");
421 self.close().await;
422 break;
423 },
424 None => {
425 info!("inner stream has terminated ");
426 self.close().await;
427 break;
428 }
429 }
430 },
431
432 _ = self.terminate.listen() => {
433 let guard = self.senders.lock().await;
436 for sender in guard.values() {
437 match sender {
438 SharedSender::Serial(msg) => msg.close().await,
439 SharedSender::Queue(stream_sender) => {
440 stream_sender.close();
441 }
442 }
443 }
444
445 info!("multiplexer terminated");
446 break;
447
448 }
449 }
450 }
451 }
452
453 #[instrument(skip(self, msg),fields( msg = msg.len()))]
455 async fn send(&mut self, correlation_id: i32, msg: Bytes) -> Result<(), SocketError> {
456 let mut senders = self.senders.lock().await;
457 if let Some(sender) = senders.get_mut(&correlation_id) {
458 match sender {
459 SharedSender::Serial(serial_sender) => {
460 trace!("found serial");
461 match serial_sender.0.try_lock() {
463 Some(mut guard) => {
464 *guard = Some(msg);
465 drop(guard); serial_sender.1.notify(1);
467 trace!("found serial");
468 Ok(())
469 }
470 None => Err(IoError::new(
471 ErrorKind::BrokenPipe,
472 format!(
473 "failed locking, abandoning sending to socket: {correlation_id}"
474 ),
475 )
476 .into()),
477 }
478 }
479 SharedSender::Queue(queue_sender) => {
480 trace!("found stream");
481 if queue_sender.is_closed() {
483 debug!(correlation_id, "attempt to send data to closed socket");
484 Ok(())
485 } else {
486 queue_sender.send(Some(msg)).await.map_err(|err| {
487 IoError::new(
488 ErrorKind::BrokenPipe,
489 format!(
490 "problem sending to queue socket: {correlation_id}, err: {err}"
491 ),
492 )
493 .into()
494 })
495 }
496 }
497 }
498 } else {
499 debug!(
501 correlation_id,
502 "no socket receiver found, abandoning sending",
503 );
504 Ok(())
505 }
506 }
507
508 async fn close(&self) {
509 self.stale.store(true, SeqCst);
510
511 let guard = self.senders.lock().await;
512 for sender in guard.values() {
513 match sender {
514 SharedSender::Serial(msg) => msg.close().await,
515 SharedSender::Queue(stream_sender) => {
516 let _ = stream_sender.send(None).await;
517 }
518 }
519 }
520
521 info!("multiplexer closed")
522 }
523}
524
525impl SharedMsg {
526 async fn close(&self) {
527 let mut guard = self.0.lock().await;
528 *guard = None;
529 drop(guard);
530 self.1.notify(1);
531 }
532}
533
534#[cfg(test)]
535mod tests {
536
537 use std::time::Duration;
538 use std::io::ErrorKind;
539
540 use async_trait::async_trait;
541 use futures_util::future::{join, join3};
542 use futures_util::io::{AsyncRead, AsyncWrite};
543 use futures_util::StreamExt;
544 use tracing::debug;
545
546 use fluvio_future::net::TcpListener;
547 use fluvio_future::net::TcpStream;
548 use fluvio_future::task::spawn;
549 use fluvio_future::timer::sleep;
550 use fluvio_protocol::api::RequestMessage;
551
552 use super::MultiplexerSocket;
553 use super::SocketError;
554 use crate::test_request::*;
555 use crate::ExclusiveFlvSink;
556 use crate::FluvioSocket;
557
558 #[allow(unused)]
559 const CA_PATH: &str = "certs/certs/ca.crt";
560 #[allow(unused)]
561 const X509_SERVER: &str = "certs/certs/server.crt";
562 #[allow(unused)]
563 const X509_SERVER_KEY: &str = "certs/certs/server.key";
564 #[allow(unused)]
565 const X509_CLIENT: &str = "certs/certs/client.crt";
566 #[allow(unused)]
567 const X509_CLIENT_KEY: &str = "certs/certs/client.key";
568
569 #[allow(unused)]
570 const SLEEP_MS: u64 = 10;
571
572 #[async_trait]
573 trait AcceptorHandler {
574 type Stream: AsyncRead + AsyncWrite + Unpin + Send;
575 async fn accept(&mut self, stream: TcpStream) -> FluvioSocket;
576 }
577
578 #[derive(Clone)]
579 struct TcpStreamHandler {}
580
581 #[async_trait]
582 impl AcceptorHandler for TcpStreamHandler {
583 type Stream = TcpStream;
584
585 async fn accept(&mut self, stream: TcpStream) -> FluvioSocket {
586 stream.into()
587 }
588 }
589
590 fn get_error_kind<T: std::fmt::Debug>(
591 result: Result<T, SocketError>,
592 ) -> Option<std::io::ErrorKind> {
593 match result {
594 Err(SocketError::Io { source, .. }) => Some(source.kind()),
595 _ => None,
596 }
597 }
598
599 async fn test_server<A: AcceptorHandler + 'static>(
600 addr: &str,
601 mut handler: A,
602 nb_iter: usize,
603 timeout: u64,
604 ) {
605 let listener = TcpListener::bind(addr).await.expect("binding");
606 debug!("server is running");
607 let mut incoming = listener.incoming();
608 let incoming_stream = incoming.next().await;
609 debug!("server: got connection");
610 let incoming_stream = incoming_stream.expect("next").expect("unwrap again");
611 let socket: FluvioSocket = handler.accept(incoming_stream).await;
612
613 let (sink, mut stream) = socket.split();
614
615 let shared_sink = ExclusiveFlvSink::new(sink);
616
617 let mut api_stream = stream.api_stream::<TestApiRequest, TestKafkaApiEnum>();
618
619 for i in 0..nb_iter {
620 debug!("server: waiting for next msg: {}", i);
621 let msg = api_stream.next().await.expect("msg").expect("unwrap");
622 debug!("server: msg received: {:#?}", msg);
623
624 match msg {
625 TestApiRequest::EchoRequest(echo_request) => {
626 let mut reply_sink = shared_sink.clone();
627 if echo_request.request().msg == "slow" {
629 debug!("server: received slow msg");
630 spawn(async move {
631 sleep(Duration::from_millis(SLEEP_MS * 50)).await;
632 sleep(Duration::from_secs(timeout)).await; let resp =
634 echo_request.new_response(EchoResponse::new("slow".to_owned()));
635 debug!("server send slow response");
636 reply_sink
637 .send_response(&resp, 0)
638 .await
639 .expect("send succeed");
640 });
641 } else {
642 debug!("server: received fast msg");
643 spawn(async move {
644 let resp =
645 echo_request.new_response(EchoResponse::new("hello".to_owned()));
646 debug!("server: send fast response");
647 reply_sink
648 .send_response(&resp, 0)
649 .await
650 .expect("send succeed");
651 });
652 }
653 }
654 TestApiRequest::AsyncStatusRequest(status_request) => {
655 debug!("server: received async status msg");
656 let mut reply_sink = shared_sink.clone();
657 spawn(async move {
658 sleep(Duration::from_millis(SLEEP_MS * 3)).await;
659 let resp = status_request.new_response(AsyncStatusResponse {
660 status: status_request.request.count * 2,
661 });
662 reply_sink
663 .send_response(&resp, 0)
664 .await
665 .expect("send succeed");
666 debug!("server: send back status first");
667 sleep(Duration::from_millis(SLEEP_MS * 10)).await;
668 let resp = status_request.new_response(AsyncStatusResponse {
669 status: status_request.request.count * 4,
670 });
671 reply_sink
672 .send_response(&resp, 0)
673 .await
674 .expect("send succeed");
675 debug!("server: send back status second");
676 });
677 }
678 _ => panic!("no echo request"),
679 }
680 }
681
682 debug!("server: finish sending out"); }
684
685 #[async_trait]
686 trait ConnectorHandler {
687 type Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync;
688 async fn connect(&mut self, stream: TcpStream) -> FluvioSocket;
689 }
690
691 #[async_trait]
692 impl ConnectorHandler for TcpStreamHandler {
693 type Stream = TcpStream;
694
695 async fn connect(&mut self, stream: TcpStream) -> FluvioSocket {
696 stream.into()
697 }
698 }
699
700 async fn test_client<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
701 use std::time::SystemTime;
702
703 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
704 debug!("client: trying to connect");
705 let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
706 let socket = handler.connect(tcp_stream).await;
707 debug!("client: connected to test server and waiting...");
708 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
709 let multiplexer = MultiplexerSocket::shared(socket);
710
711 let async_status_request = RequestMessage::new_request(AsyncStatusRequest { count: 2 });
713 let mut status_response = multiplexer
714 .create_stream(async_status_request, 10)
715 .await
716 .expect("response");
717
718 let multiplexor2 = multiplexer.clone();
719 let multiplexor3 = multiplexer.clone();
720
721 let (slow, fast, _) = join3(
722 async move {
723 debug!("trying to send slow");
724 let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
726 let response = multiplexer
727 .send_and_receive(request)
728 .await
729 .expect("send success");
730 debug!("received slow response");
731 assert_eq!(response.msg, "slow");
732 SystemTime::now()
733 },
734 async move {
735 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
737 debug!("trying to send fast");
738 let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
739 let response = multiplexor2
740 .send_and_receive(request)
741 .await
742 .expect("send success");
743 debug!("received fast response");
744 assert_eq!(response.msg, "hello");
745 SystemTime::now()
746 },
747 async move {
748 sleep(Duration::from_millis(SLEEP_MS * 10)).await;
749 let response = status_response
750 .next()
751 .await
752 .expect("stream yields value")
753 .expect("async response");
754 debug!("received async response");
755 assert_eq!(response.status, 4); let response = status_response
757 .next()
758 .await
759 .expect("stream yields value")
760 .expect("async response");
761 debug!("received async response");
762 assert_eq!(response.status, 8);
763 SystemTime::now()
764 },
765 )
766 .await;
767
768 assert!(slow > fast);
769
770 let echo_request = RequestMessage::new_request(EchoRequest {
772 msg: "fast".to_string(),
773 });
774 let echo_async_response = multiplexor3
775 .send_async(echo_request)
776 .await
777 .expect("async response future");
778 let response = echo_async_response.await.expect("async response");
779 assert_eq!(response.msg, "hello");
780 }
781
782 async fn test_client_closed_socket<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
783 use std::time::SystemTime;
784
785 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
786 debug!("client: trying to connect");
787 let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
788 let socket = handler.connect(tcp_stream).await;
789 debug!("client: connected to test server and waiting...");
790 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
791 let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);
792
793 let multiplexor2 = multiplexer.clone();
794
795 let (slow, fast) = join(
796 async move {
797 debug!("trying to send slow");
798 let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
800 let response = multiplexer.send_and_receive(request).await;
801 assert!(response.is_err());
802
803 let err_kind = get_error_kind(response).expect("Get right Error Kind");
804 let expected = ErrorKind::UnexpectedEof;
805 assert_eq!(expected, err_kind);
806 debug!("client: socket was closed");
807
808 SystemTime::now()
809 },
810 async move {
811 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
813 debug!("trying to send fast");
814 let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
815 let response = multiplexor2
816 .send_and_receive(request)
817 .await
818 .expect("send success");
819 debug!("received fast response");
820 assert_eq!(response.msg, "hello");
821 multiplexor2.terminate.notify(usize::MAX); SystemTime::now()
823 },
824 )
825 .await;
826 assert!(slow > fast);
827 }
828
829 async fn test_client_time_out<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
830 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
831 debug!("client: trying to connect");
832 let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
833 let socket = handler.connect(tcp_stream).await;
834 debug!("client: connected to test server and waiting...");
835 sleep(Duration::from_millis(SLEEP_MS * 2)).await;
836 let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);
837
838 let expected: ErrorKind = ErrorKind::TimedOut;
839
840 debug!("trying to send slow");
841
842 let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
843 let response = multiplexer.send_and_receive(request).await;
844 assert!(response.is_err());
845
846 let err_kind = get_error_kind(response).expect("Get right Error Kind");
847
848 assert_eq!(expected, err_kind);
849 debug!("client: socket was timeout");
850 }
851
852 #[fluvio_future::test(ignore)]
853 async fn test_multiplexing() {
854 debug!("start testing");
855 let addr = "127.0.0.1:6000";
856
857 let _r = join(
858 test_client(addr, TcpStreamHandler {}),
859 test_server(addr, TcpStreamHandler {}, 4, 0),
860 )
861 .await;
862 }
863
864 #[fluvio_future::test(ignore)]
865 async fn test_multiplexing_close_socket() {
866 debug!("start test_multiplexing_close_socket");
867 let addr = "127.0.0.1:6000";
868
869 let _r = join(
870 test_client_closed_socket(addr, TcpStreamHandler {}),
871 test_server(addr, TcpStreamHandler {}, 2, 0),
872 )
873 .await;
874 }
875
876 #[fluvio_future::test(ignore)]
877 async fn test_multiplexing_time_out() {
878 debug!("start test_multiplexing_timeout");
879 let addr = "127.0.0.1:6000";
880
881 let _r = join(
882 test_client_time_out(addr, TcpStreamHandler {}),
883 test_server(addr, TcpStreamHandler {}, 1, 60), )
885 .await;
886 }
887 #[cfg(unix)]
888 mod tls_test {
889 use std::os::unix::io::AsRawFd;
890
891 use fluvio_future::{
892 native_tls::{
893 AcceptorBuilder, CertBuilder, ConnectorBuilder, DefaultClientTlsStream,
894 DefaultServerTlsStream, IdentityBuilder, PrivateKeyBuilder, TlsAcceptor,
895 TlsConnector, X509PemBuilder,
896 },
897 net::SplitConnection,
898 };
899
900 use super::*;
901
902 struct TlsAcceptorHandler(TlsAcceptor);
903
904 impl TlsAcceptorHandler {
905 fn new() -> Self {
906 let acceptor = AcceptorBuilder::identity(
907 IdentityBuilder::from_x509(
908 X509PemBuilder::from_path(X509_SERVER).expect("read"),
909 PrivateKeyBuilder::from_path(X509_SERVER_KEY).expect("file"),
910 )
911 .expect("identity"),
912 )
913 .expect("identity:")
914 .build()
915 .expect("acceptor");
916 Self(acceptor)
917 }
918 }
919
920 #[async_trait]
921 impl AcceptorHandler for TlsAcceptorHandler {
922 type Stream = DefaultServerTlsStream;
923
924 async fn accept(&mut self, stream: TcpStream) -> FluvioSocket {
925 let fd = stream.as_raw_fd();
926 let handshake = self.0.accept(stream);
927 let tls_stream = handshake.await.expect("hand shake failed");
928 let (write, read) = tls_stream.split_connection();
929 FluvioSocket::from_stream(write, read, fd)
930 }
931 }
932
933 struct TlsConnectorHandler(TlsConnector);
934
935 impl TlsConnectorHandler {
936 fn new() -> Self {
937 let connector = ConnectorBuilder::identity(
938 IdentityBuilder::from_x509(
939 X509PemBuilder::from_path(X509_CLIENT).expect("read"),
940 PrivateKeyBuilder::from_path(X509_CLIENT_KEY).expect("read"),
941 )
942 .expect("509"),
943 )
944 .expect("connector")
945 .danger_accept_invalid_hostnames()
946 .no_cert_verification()
947 .build();
948 Self(connector)
949 }
950 }
951
952 #[async_trait]
953 impl ConnectorHandler for TlsConnectorHandler {
954 type Stream = DefaultClientTlsStream;
955
956 async fn connect(&mut self, stream: TcpStream) -> FluvioSocket {
957 let fd = stream.as_raw_fd();
958 let (write, read) = self
959 .0
960 .connect("localhost", stream)
961 .await
962 .expect("hand shakefailed")
963 .split_connection();
964
965 FluvioSocket::from_stream(write, read, fd)
966 }
967 }
968
969 #[fluvio_future::test(ignore)]
970 async fn test_multiplexing_native_tls() {
971 debug!("start testing");
972 let addr = "127.0.0.1:6001";
973
974 let _r = join(
975 test_client(addr, TlsConnectorHandler::new()),
976 test_server(addr, TlsAcceptorHandler::new(), 4, 0),
977 )
978 .await;
979 }
980 }
981}