// fluvio_socket/multiplexing.rs

use core::task::{Context, Poll};
use std::collections::HashMap;
use std::io::Cursor;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::{SeqCst, Relaxed};
use std::sync::atomic::AtomicI32;
use std::time::Duration;
use std::fmt;
use std::future::Future;

use async_channel::bounded;
use async_channel::Receiver;
use async_channel::Sender;
use async_lock::Mutex;
use bytes::Bytes;
use event_listener::Event;
use futures_util::ready;
use futures_util::stream::{Stream, StreamExt};
use pin_project::pin_project;
use pin_project::pinned_drop;
use tokio::select;
use tracing::{info, warn};
use tracing::{debug, error, trace, instrument};

use fluvio_future::net::ConnectionFd;
use fluvio_future::timer::sleep;
use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestHeader;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::Decoder;

use crate::SocketError;
use crate::ExclusiveFlvSink;
use crate::FluvioSocket;
use crate::FluvioStream;

pub type SharedMultiplexerSocket = Arc<MultiplexerSocket>;

#[derive(Clone)]
struct SharedMsg(Arc<Mutex<Option<Bytes>>>, Arc<Event>);

/// Handles the different ways a response can be multiplexed
enum SharedSender {
    /// Serial socket
    Serial(SharedMsg),
    /// Batch Socket
    Queue(Sender<Option<Bytes>>),
}

type Senders = Arc<Mutex<HashMap<i32, SharedSender>>>;

/// Socket that can multiplex connections
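///
/// A single [`FluvioSocket`] is split into a sink and a stream; a background
/// dispatcher routes every response frame to the waiter registered under its
/// correlation id, so many in-flight requests can share one connection.
///
/// A minimal usage sketch (the connection setup and the request value are
/// illustrative assumptions, not part of this module):
///
/// ```ignore
/// // any already-established connection can be converted into a FluvioSocket
/// let socket: FluvioSocket = tcp_stream.into();
/// let multiplexer = MultiplexerSocket::shared(socket); // Arc<MultiplexerSocket>
/// let response = multiplexer
///     .send_and_receive(RequestMessage::new_request(my_request))
///     .await?;
/// ```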
pub struct MultiplexerSocket {
    correlation_id_counter: AtomicI32,
    senders: Senders,
    sink: ExclusiveFlvSink,
    stale: Arc<AtomicBool>,
    terminate: Arc<Event>,
}

impl fmt::Debug for MultiplexerSocket {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "MultiplexerSocket {}", self.sink.id())
    }
}

impl Drop for MultiplexerSocket {
    fn drop(&mut self) {
        // notify dispatcher
        self.terminate.notify(usize::MAX);
    }
}

impl MultiplexerSocket {
    pub fn shared(socket: FluvioSocket) -> Arc<Self> {
        Arc::new(Self::new(socket))
    }

    /// create a new multiplexer socket; correlation ids always start at 1,
    /// a correlation id of 0 means shared
    #[allow(clippy::clone_on_copy)]
    pub fn new(socket: FluvioSocket) -> Self {
        let id = socket.id().clone();
        debug!(socket = %id, "spawning dispatcher");

        let (sink, stream) = socket.split();
        let stale = Arc::new(AtomicBool::new(false));

        let multiplexer = Self {
            correlation_id_counter: AtomicI32::new(1),
            senders: Arc::new(Mutex::new(HashMap::new())),
            sink: ExclusiveFlvSink::new(sink),
            terminate: Arc::new(Event::new()),
            stale: stale.clone(),
        };

        MultiPlexingResponseDispatcher::run(
            id,
            stream,
            multiplexer.senders.clone(),
            multiplexer.terminate.clone(),
            stale,
        );

        multiplexer
    }

    pub fn set_stale(&self) {
        self.stale.store(true, SeqCst);
    }

    pub fn is_stale(&self) -> bool {
        self.stale.load(SeqCst)
    }

    /// get the next available correlation id to use
    fn next_correlation_id(&self) -> i32 {
        self.correlation_id_counter.fetch_add(1, Relaxed)
    }

    /// send a single request and wait for its response (serial request/response)
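    ///
    /// Registers a one-shot slot under a fresh correlation id, writes the request,
    /// and then waits for either the dispatcher's notification or a timeout
    /// (`FLV_SOCKET_WAIT` seconds, 60 by default). On timeout the socket is marked
    /// stale and a `TimedOut` error is returned.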
    #[instrument(skip(req_msg))]
    pub async fn send_and_receive<R>(
        &self,
        mut req_msg: RequestMessage<R>,
    ) -> Result<R::Response, SocketError>
    where
        R: Request,
    {
        use once_cell::sync::Lazy;

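        // maximum number of seconds to wait for a serial response; can be
        // overridden with the FLV_SOCKET_WAIT env var (defaults to 60)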
        static MAX_WAIT_TIME: Lazy<u64> = Lazy::new(|| {
            use std::env;

            let var_value = env::var("FLV_SOCKET_WAIT").unwrap_or_default();
            let wait_time: u64 = var_value.parse().unwrap_or(60);
            wait_time
        });

        let correlation_id = self.next_correlation_id();
        let bytes_lock = SharedMsg(Arc::new(Mutex::new(None)), Arc::new(Event::new()));

        req_msg.header.set_correlation_id(correlation_id);

        trace!(correlation_id, "senders trying lock");
        let mut senders = self.senders.lock().await;
        senders.insert(correlation_id, SharedSender::Serial(bytes_lock.clone()));
        drop(senders);

        let SharedMsg(msg, msg_event) = bytes_lock;
        // make sure we set up the listener before sending, otherwise the dispatcher may notify before we are listening
        let listener = msg_event.listen();

        debug!(api = R::API_KEY, correlation_id, "sending request");
        self.sink.send_request(&req_msg).await?;
        trace!(correlation_id, "waiting");

        select! {

            _ = sleep(Duration::from_secs(*MAX_WAIT_TIME)) => {

                trace!("serial socket for: {} timed out, id: {}", R::API_KEY, correlation_id);
                // clean channel
                let mut senders = self.senders.lock().await;
                senders.remove(&correlation_id);
                drop(senders);
                self.set_stale();

                Err(IoError::new(
                    ErrorKind::TimedOut,
                    format!("Timed out: {} secs waiting for response. API_KEY={}, CorrelationId={}", *MAX_WAIT_TIME, R::API_KEY, correlation_id),
                ).into())
            },

            _ = listener => {

                // clean channel
                trace!(correlation_id, "msg event");
                let mut senders = self.senders.lock().await;
                senders.remove(&correlation_id);
                drop(senders);

                match msg.try_lock() {
                    Some(guard) => {

                        if let Some(response_bytes) = &*guard {

                            debug!(correlation_id, len = response_bytes.len(), "received serial message");
                            let response = R::Response::decode_from(
                                &mut Cursor::new(&response_bytes),
                                req_msg.header.api_version(),
                            )?;
                            trace!("receive serial socket id: {}, response: {:#?}", correlation_id, response);
                            Ok(response)
                        } else {
                            debug!("serial socket: {}, id: {}, value is empty, something bad happened", R::API_KEY, correlation_id);
                            Err(IoError::new(
                                ErrorKind::UnexpectedEof,
                                "connection is closed".to_string(),
                            ).into())
                        }

                    },
                    None => Err(IoError::new(
                        ErrorKind::BrokenPipe,
                        format!("lock failed: {correlation_id}, serial socket is in bad state")
                    ).into())
                }
            },
        }
    }

    /// send a request and get the response asynchronously
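    ///
    /// This is a thin wrapper over [`Self::create_stream`] with a queue length of 1;
    /// the returned [`AsyncResponse`] implements `Future`, so it can be awaited
    /// directly for a single response.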
    #[instrument(skip(req_msg))]
    pub async fn send_async<R>(
        &self,
        req_msg: RequestMessage<R>,
    ) -> Result<AsyncResponse<R>, SocketError>
    where
        R: Request,
    {
        self.create_stream(req_msg, 1).await
    }

    /// create a stream of responses for a single request
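    ///
    /// Responses arriving under the same correlation id are queued (up to
    /// `queue_len` items) and yielded through the returned [`AsyncResponse`]
    /// stream. A minimal sketch, assuming `req` is some value implementing
    /// [`Request`]:
    ///
    /// ```ignore
    /// use futures_util::StreamExt;
    ///
    /// let mut responses = multiplexer
    ///     .create_stream(RequestMessage::new_request(req), 10)
    ///     .await?;
    /// while let Some(item) = responses.next().await {
    ///     let response = item?;
    ///     // handle each response as it arrives
    /// }
    /// ```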
    #[instrument(skip(self, req_msg), fields(api = R::API_KEY))]
    pub async fn create_stream<R>(
        &self,
        mut req_msg: RequestMessage<R>,
        queue_len: usize,
    ) -> Result<AsyncResponse<R>, SocketError>
    where
        R: Request,
    {
        let correlation_id = self.next_correlation_id();

        req_msg.header.set_correlation_id(correlation_id);

        trace!(correlation_id, request = ?req_msg, "new correlation id");

        // set up new channel
        let (sender, receiver) = bounded(queue_len);
        let mut senders = self.senders.lock().await;

        // remove any closed channels; this is not optimal but should do the trick for now

        senders.retain(|_, shared_sender| match shared_sender {
            SharedSender::Serial(_) => true,
            SharedSender::Queue(sender) => !sender.is_closed(),
        });

        senders.insert(correlation_id, SharedSender::Queue(sender));
        drop(senders);

        trace!(correlation_id, "created new channel");

        self.sink.send_request(&req_msg).await?;

        trace!(correlation_id, "request sent");

        // it is possible that a msg was received by the dispatcher before the channel was inserted into senders,
        // but it is easier to clean up this way

        Ok(AsyncResponse {
            receiver: Box::pin(receiver),
            header: req_msg.header,
            correlation_id,
            data: PhantomData,
        })
    }
}

/// Async response handle where responses are sent back asynchronously;
/// they are queued using a channel.
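///
/// `AsyncResponse` implements both [`Stream`] (yielding every response received
/// under its correlation id) and [`Future`] (resolving with the next response),
/// so it can be iterated or awaited directly. Dropping it closes the underlying
/// channel, and the dispatcher stops routing to it.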
#[pin_project(PinnedDrop)]
pub struct AsyncResponse<R> {
    #[pin]
    receiver: Pin<Box<Receiver<Option<Bytes>>>>,
    header: RequestHeader,
    correlation_id: i32,
    data: PhantomData<R>,
}

#[pinned_drop]
impl<R> PinnedDrop for AsyncResponse<R> {
    fn drop(self: Pin<&mut Self>) {
        self.receiver.close();
        debug!("multiplexer stream: {} closed", self.correlation_id);
    }
}

impl<R: Request> Stream for AsyncResponse<R> {
    type Item = Result<R::Response, SocketError>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();
        match ready!(this.receiver.poll_next(cx)) {
            Some(Some(bytes)) => {
                use bytes::Buf;
                let response_len = bytes.len();
                debug!(
                    response_len,
                    remaining = bytes.remaining(),
                    version = this.header.api_version(),
                    "response len>>>"
                );

                let mut cursor = Cursor::new(bytes);
                let response = R::Response::decode_from(&mut cursor, this.header.api_version());
                match response {
                    Ok(value) => {
                        trace!("Received response bytes: {},  {:#?}", response_len, &value,);
                        Poll::Ready(Some(Ok(value)))
                    }
                    Err(e) => Poll::Ready(Some(Err(e.into()))),
                }
            }
            Some(None) => Poll::Ready(Some(Err(SocketError::SocketClosed))),
            None => Poll::Ready(None),
        }
    }
}

impl<R: Request> Future for AsyncResponse<R> {
    type Output = Result<R::Response, SocketError>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        match ready!(this.receiver.poll_next(cx)) {
            Some(Some(bytes)) => {
                use bytes::Buf;
                let response_len = bytes.len();
                debug!(
                    response_len,
                    remaining = bytes.remaining(),
                    version = this.header.api_version(),
                    "response len>>>"
                );

                let mut cursor = Cursor::new(bytes);
                let response = R::Response::decode_from(&mut cursor, this.header.api_version());
                match response {
                    Ok(value) => {
                        trace!("Received response bytes: {},  {:#?}", response_len, &value,);
                        Poll::Ready(Ok(value))
                    }
                    Err(e) => Poll::Ready(Err(e.into())),
                }
            }
            Some(None) => Poll::Ready(Err(SocketError::SocketClosed)),
            None => Poll::Ready(Err(SocketError::SocketClosed)),
        }
    }
}

/// Decodes fluvio protocol based streams and multiplexes responses into different slots
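///
/// Each incoming frame starts with an `i32` correlation id; the dispatcher decodes
/// that prefix and forwards the remaining bytes to the matching `SharedSender`:
/// a one-shot slot for serial requests or a bounded queue for streams.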
struct MultiPlexingResponseDispatcher {
    id: ConnectionFd,
    senders: Senders,
    terminate: Arc<Event>,
    stale: Arc<AtomicBool>,
}

impl fmt::Debug for MultiPlexingResponseDispatcher {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "MultiplexDisp({})", self.id)
    }
}

impl MultiPlexingResponseDispatcher {
    pub fn run(
        id: ConnectionFd,
        stream: FluvioStream,
        senders: Senders,
        terminate: Arc<Event>,
        stale: Arc<AtomicBool>,
    ) {
        use fluvio_future::task::spawn;

        let dispatcher = Self {
            id,
            senders,
            terminate,
            stale,
        };

        spawn(dispatcher.dispatcher_loop(stream));
    }

    #[instrument(skip(stream))]
    async fn dispatcher_loop(mut self, mut stream: FluvioStream) {
        let frame_stream = stream.get_mut_tcp_stream();

        loop {
            trace!("waiting");

            select! {
                frame = frame_stream.next() => {
                    match frame {
                        Some(Ok(mut msg)) => {
                            let mut correlation_id: i32 = 0;
                            match correlation_id.decode(&mut msg, 0) {
                                Ok(_) => {
                                    use bytes::Buf;
                                    debug!(correlation_id, len = msg.len(), remaining = msg.remaining(), "received frame");

                                    if let Err(err) = self.send(correlation_id, msg.freeze()).await {
                                        error!("error sending to socket, {}", err)
                                    }
                                }
                                Err(err) => error!("error decoding response, {}", err),
                            }
                        },
                        Some(Err(err)) => {
                            warn!("problem getting frame from stream: {err}. terminating");
                            self.close().await;
                            break;
                        },
                        None => {
                            info!("inner stream has terminated");
                            self.close().await;
                            break;
                        }
                    }
                },

                _ = self.terminate.listen() => {
                    // terminate all channels

                    let guard = self.senders.lock().await;
                    for sender in guard.values() {
                        match sender {
                            SharedSender::Serial(msg) => msg.close().await,
                            SharedSender::Queue(stream_sender) => {
                                stream_sender.close();
                            }
                        }
                    }

                    info!("multiplexer terminated");
                    break;

                }
            }
        }
    }

    /// send message to correct receiver
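    ///
    /// If no receiver is registered under the correlation id (the caller timed out
    /// or dropped its stream), the message is logged and silently discarded.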
    #[instrument(skip(self, msg), fields(msg = msg.len()))]
    async fn send(&mut self, correlation_id: i32, msg: Bytes) -> Result<(), SocketError> {
        let mut senders = self.senders.lock().await;
        if let Some(sender) = senders.get_mut(&correlation_id) {
            match sender {
                SharedSender::Serial(serial_sender) => {
                    trace!("found serial");
                    // this should always succeed since nobody else should be holding the lock
                    match serial_sender.0.try_lock() {
                        Some(mut guard) => {
                            *guard = Some(msg);
                            drop(guard); // unlock
                            serial_sender.1.notify(1);
                            trace!("serial notified");
                            Ok(())
                        }
                        None => Err(IoError::new(
                            ErrorKind::BrokenPipe,
                            format!(
                                "failed locking, abandoning sending to socket: {correlation_id}"
                            ),
                        )
                        .into()),
                    }
                }
                SharedSender::Queue(queue_sender) => {
                    trace!("found stream");
                    // the receiver side was dropped before the response arrived
                    if queue_sender.is_closed() {
                        debug!(correlation_id, "attempt to send data to closed socket");
                        Ok(())
                    } else {
                        queue_sender.send(Some(msg)).await.map_err(|err| {
                            IoError::new(
                                ErrorKind::BrokenPipe,
                                format!(
                                    "problem sending to queue socket: {correlation_id}, err: {err}"
                                ),
                            )
                            .into()
                        })
                    }
                }
            }
        } else {
            // the sender was dropped and unregistered before the response arrived
            debug!(
                correlation_id,
                "no socket receiver found, abandoning sending",
            );
            Ok(())
        }
    }

    async fn close(&self) {
        self.stale.store(true, SeqCst);

        let guard = self.senders.lock().await;
        for sender in guard.values() {
            match sender {
                SharedSender::Serial(msg) => msg.close().await,
                SharedSender::Queue(stream_sender) => {
                    let _ = stream_sender.send(None).await;
                }
            }
        }

        info!("multiplexer closed")
    }
}

impl SharedMsg {
    async fn close(&self) {
        let mut guard = self.0.lock().await;
        *guard = None;
        drop(guard);
        self.1.notify(1);
    }
}

#[cfg(test)]
mod tests {

    use std::time::Duration;
    use std::io::ErrorKind;

    use async_trait::async_trait;
    use futures_util::future::{join, join3};
    use futures_util::io::{AsyncRead, AsyncWrite};
    use futures_util::StreamExt;
    use tracing::debug;

    use fluvio_future::net::TcpListener;
    use fluvio_future::net::TcpStream;
    use fluvio_future::task::spawn;
    use fluvio_future::timer::sleep;
    use fluvio_protocol::api::RequestMessage;

    use super::MultiplexerSocket;
    use super::SocketError;
    use crate::test_request::*;
    use crate::ExclusiveFlvSink;
    use crate::FluvioSocket;

    #[allow(unused)]
    const CA_PATH: &str = "certs/certs/ca.crt";
    #[allow(unused)]
    const X509_SERVER: &str = "certs/certs/server.crt";
    #[allow(unused)]
    const X509_SERVER_KEY: &str = "certs/certs/server.key";
    #[allow(unused)]
    const X509_CLIENT: &str = "certs/certs/client.crt";
    #[allow(unused)]
    const X509_CLIENT_KEY: &str = "certs/certs/client.key";

    #[allow(unused)]
    const SLEEP_MS: u64 = 10;

    #[async_trait]
    trait AcceptorHandler {
        type Stream: AsyncRead + AsyncWrite + Unpin + Send;
        async fn accept(&mut self, stream: TcpStream) -> FluvioSocket;
    }

    #[derive(Clone)]
    struct TcpStreamHandler {}

    #[async_trait]
    impl AcceptorHandler for TcpStreamHandler {
        type Stream = TcpStream;

        async fn accept(&mut self, stream: TcpStream) -> FluvioSocket {
            stream.into()
        }
    }

    fn get_error_kind<T: std::fmt::Debug>(
        result: Result<T, SocketError>,
    ) -> Option<std::io::ErrorKind> {
        match result {
            Err(SocketError::Io { source, .. }) => Some(source.kind()),
            _ => None,
        }
    }

    async fn test_server<A: AcceptorHandler + 'static>(
        addr: &str,
        mut handler: A,
        nb_iter: usize,
        timeout: u64,
    ) {
        let listener = TcpListener::bind(addr).await.expect("binding");
        debug!("server is running");
        let mut incoming = listener.incoming();
        let incoming_stream = incoming.next().await;
        debug!("server: got connection");
        let incoming_stream = incoming_stream.expect("next").expect("unwrap again");
        let socket: FluvioSocket = handler.accept(incoming_stream).await;

        let (sink, mut stream) = socket.split();

        let shared_sink = ExclusiveFlvSink::new(sink);

        let mut api_stream = stream.api_stream::<TestApiRequest, TestKafkaApiEnum>();

        for i in 0..nb_iter {
            debug!("server: waiting for next msg: {}", i);
            let msg = api_stream.next().await.expect("msg").expect("unwrap");
            debug!("server: msg received: {:#?}", msg);

            match msg {
                TestApiRequest::EchoRequest(echo_request) => {
                    let mut reply_sink = shared_sink.clone();
                    // depending on the request, delay the response
                    if echo_request.request().msg == "slow" {
                        debug!("server: received slow msg");
                        spawn(async move {
                            sleep(Duration::from_millis(SLEEP_MS * 50)).await;
                            sleep(Duration::from_secs(timeout)).await; // simulate additional server-side waiting time for the slow msg
                            let resp =
                                echo_request.new_response(EchoResponse::new("slow".to_owned()));
                            debug!("server: send slow response");
                            reply_sink
                                .send_response(&resp, 0)
                                .await
                                .expect("send succeed");
                        });
                    } else {
                        debug!("server: received fast msg");
                        spawn(async move {
                            let resp =
                                echo_request.new_response(EchoResponse::new("hello".to_owned()));
                            debug!("server: send fast response");
                            reply_sink
                                .send_response(&resp, 0)
                                .await
                                .expect("send succeed");
                        });
                    }
                }
                TestApiRequest::AsyncStatusRequest(status_request) => {
                    debug!("server: received async status msg");
                    let mut reply_sink = shared_sink.clone();
                    spawn(async move {
                        sleep(Duration::from_millis(SLEEP_MS * 3)).await;
                        let resp = status_request.new_response(AsyncStatusResponse {
                            status: status_request.request.count * 2,
                        });
                        reply_sink
                            .send_response(&resp, 0)
                            .await
                            .expect("send succeed");
                        debug!("server: send back status first");
                        sleep(Duration::from_millis(SLEEP_MS * 10)).await;
                        let resp = status_request.new_response(AsyncStatusResponse {
                            status: status_request.request.count * 4,
                        });
                        reply_sink
                            .send_response(&resp, 0)
                            .await
                            .expect("send succeed");
                        debug!("server: send back status second");
                    });
                }
                _ => panic!("no echo request"),
            }
        }

        debug!("server: finished sending out"); // finish ok
    }

    #[async_trait]
    trait ConnectorHandler {
        type Stream: AsyncRead + AsyncWrite + Unpin + Send + Sync;
        async fn connect(&mut self, stream: TcpStream) -> FluvioSocket;
    }

    #[async_trait]
    impl ConnectorHandler for TcpStreamHandler {
        type Stream = TcpStream;

        async fn connect(&mut self, stream: TcpStream) -> FluvioSocket {
            stream.into()
        }
    }

    async fn test_client<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
        use std::time::SystemTime;

        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        debug!("client: trying to connect");
        let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
        let socket = handler.connect(tcp_stream).await;
        debug!("client: connected to test server and waiting...");
        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        let multiplexer = MultiplexerSocket::shared(socket);

        // create async status
        let async_status_request = RequestMessage::new_request(AsyncStatusRequest { count: 2 });
        let mut status_response = multiplexer
            .create_stream(async_status_request, 10)
            .await
            .expect("response");

        let multiplexor2 = multiplexer.clone();
        let multiplexor3 = multiplexer.clone();

        let (slow, fast, _) = join3(
            async move {
                debug!("trying to send slow");
                // this message is sent first, but since there is a delay of 500ms, it returns after the fast one
                let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
                let response = multiplexer
                    .send_and_receive(request)
                    .await
                    .expect("send success");
                debug!("received slow response");
                assert_eq!(response.msg, "slow");
                SystemTime::now()
            },
            async move {
                // this message is sent after the slow one, but since there is no delay, its response should arrive first
                sleep(Duration::from_millis(SLEEP_MS * 2)).await;
                debug!("trying to send fast");
                let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
                let response = multiplexor2
                    .send_and_receive(request)
                    .await
                    .expect("send success");
                debug!("received fast response");
                assert_eq!(response.msg, "hello");
                SystemTime::now()
            },
            async move {
                sleep(Duration::from_millis(SLEEP_MS * 10)).await;
                let response = status_response
                    .next()
                    .await
                    .expect("stream yields value")
                    .expect("async response");
                debug!("received async response");
                assert_eq!(response.status, 4); // count multiplied by 2
                let response = status_response
                    .next()
                    .await
                    .expect("stream yields value")
                    .expect("async response");
                debug!("received async response");
                assert_eq!(response.status, 8);
                SystemTime::now()
            },
        )
        .await;

        assert!(slow > fast);

        // create async echo response
        let echo_request = RequestMessage::new_request(EchoRequest {
            msg: "fast".to_string(),
        });
        let echo_async_response = multiplexor3
            .send_async(echo_request)
            .await
            .expect("async response future");
        let response = echo_async_response.await.expect("async response");
        assert_eq!(response.msg, "hello");
    }

    async fn test_client_closed_socket<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
        use std::time::SystemTime;

        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        debug!("client: trying to connect");
        let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
        let socket = handler.connect(tcp_stream).await;
        debug!("client: connected to test server and waiting...");
        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

        let multiplexor2 = multiplexer.clone();

        let (slow, fast) = join(
            async move {
                debug!("trying to send slow");
                // this message is sent first, but since there is a delay of 500ms, it completes after the fast one
                let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
                let response = multiplexer.send_and_receive(request).await;
                assert!(response.is_err());

                let err_kind = get_error_kind(response).expect("Get right Error Kind");
                let expected = ErrorKind::UnexpectedEof;
                assert_eq!(expected, err_kind);
                debug!("client: socket was closed");

                SystemTime::now()
            },
            async move {
                // this message is sent after the slow one, but since there is no delay, its response should arrive first
                sleep(Duration::from_millis(SLEEP_MS * 2)).await;
                debug!("trying to send fast");
                let request = RequestMessage::new_request(EchoRequest::new("fast".to_owned()));
                let response = multiplexor2
                    .send_and_receive(request)
                    .await
                    .expect("send success");
                debug!("received fast response");
                assert_eq!(response.msg, "hello");
                multiplexor2.terminate.notify(usize::MAX); // close multiplexor2
                SystemTime::now()
            },
        )
        .await;
        assert!(slow > fast);
    }

    async fn test_client_time_out<C: ConnectorHandler + 'static>(addr: &str, mut handler: C) {
        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        debug!("client: trying to connect");
        let tcp_stream = TcpStream::connect(&addr).await.expect("connection fail");
        let socket = handler.connect(tcp_stream).await;
        debug!("client: connected to test server and waiting...");
        sleep(Duration::from_millis(SLEEP_MS * 2)).await;
        let multiplexer: std::sync::Arc<MultiplexerSocket> = MultiplexerSocket::shared(socket);

        let expected: ErrorKind = ErrorKind::TimedOut;

        debug!("trying to send slow");

        let request = RequestMessage::new_request(EchoRequest::new("slow".to_owned()));
        let response = multiplexer.send_and_receive(request).await;
        assert!(response.is_err());

        let err_kind = get_error_kind(response).expect("Get right Error Kind");

        assert_eq!(expected, err_kind);
        debug!("client: socket timed out");
    }

    #[fluvio_future::test(ignore)]
    async fn test_multiplexing() {
        debug!("start testing");
        let addr = "127.0.0.1:6000";

        let _r = join(
            test_client(addr, TcpStreamHandler {}),
            test_server(addr, TcpStreamHandler {}, 4, 0),
        )
        .await;
    }

    #[fluvio_future::test(ignore)]
    async fn test_multiplexing_close_socket() {
        debug!("start test_multiplexing_close_socket");
        let addr = "127.0.0.1:6000";

        let _r = join(
            test_client_closed_socket(addr, TcpStreamHandler {}),
            test_server(addr, TcpStreamHandler {}, 2, 0),
        )
        .await;
    }

    #[fluvio_future::test(ignore)]
    async fn test_multiplexing_time_out() {
        debug!("start test_multiplexing_timeout");
        let addr = "127.0.0.1:6000";

        let _r = join(
            test_client_time_out(addr, TcpStreamHandler {}),
            test_server(addr, TcpStreamHandler {}, 1, 60), // MAX_WAIT_TIME is 60 seconds
        )
        .await;
    }
    #[cfg(unix)]
    mod tls_test {
        use std::os::unix::io::AsRawFd;

        use fluvio_future::{
            native_tls::{
                AcceptorBuilder, CertBuilder, ConnectorBuilder, DefaultClientTlsStream,
                DefaultServerTlsStream, IdentityBuilder, PrivateKeyBuilder, TlsAcceptor,
                TlsConnector, X509PemBuilder,
            },
            net::SplitConnection,
        };

        use super::*;

        struct TlsAcceptorHandler(TlsAcceptor);

        impl TlsAcceptorHandler {
            fn new() -> Self {
                let acceptor = AcceptorBuilder::identity(
                    IdentityBuilder::from_x509(
                        X509PemBuilder::from_path(X509_SERVER).expect("read"),
                        PrivateKeyBuilder::from_path(X509_SERVER_KEY).expect("file"),
                    )
                    .expect("identity"),
                )
                .expect("identity:")
                .build()
                .expect("acceptor");
                Self(acceptor)
            }
        }

        #[async_trait]
        impl AcceptorHandler for TlsAcceptorHandler {
            type Stream = DefaultServerTlsStream;

            async fn accept(&mut self, stream: TcpStream) -> FluvioSocket {
                let fd = stream.as_raw_fd();
                let handshake = self.0.accept(stream);
                let tls_stream = handshake.await.expect("handshake failed");
                let (write, read) = tls_stream.split_connection();
                FluvioSocket::from_stream(write, read, fd)
            }
        }

        struct TlsConnectorHandler(TlsConnector);

        impl TlsConnectorHandler {
            fn new() -> Self {
                let connector = ConnectorBuilder::identity(
                    IdentityBuilder::from_x509(
                        X509PemBuilder::from_path(X509_CLIENT).expect("read"),
                        PrivateKeyBuilder::from_path(X509_CLIENT_KEY).expect("read"),
                    )
                    .expect("509"),
                )
                .expect("connector")
                .danger_accept_invalid_hostnames()
                .no_cert_verification()
                .build();
                Self(connector)
            }
        }

        #[async_trait]
        impl ConnectorHandler for TlsConnectorHandler {
            type Stream = DefaultClientTlsStream;

            async fn connect(&mut self, stream: TcpStream) -> FluvioSocket {
                let fd = stream.as_raw_fd();
                let (write, read) = self
                    .0
                    .connect("localhost", stream)
                    .await
                    .expect("handshake failed")
                    .split_connection();

                FluvioSocket::from_stream(write, read, fd)
            }
        }

        #[fluvio_future::test(ignore)]
        async fn test_multiplexing_native_tls() {
            debug!("start testing");
            let addr = "127.0.0.1:6001";

            let _r = join(
                test_client(addr, TlsConnectorHandler::new()),
                test_server(addr, TlsAcceptorHandler::new(), 4, 0),
            )
            .await;
        }
    }
}