maxwell_utils/connection/
future_style_connection.rs

1use std::{
2  cell::{Cell, RefCell},
3  fmt, format,
4  future::Future,
5  pin::Pin,
6  rc::Rc,
7  sync::atomic::{AtomicU32, Ordering},
8  task::{Context as TaskContext, Poll, Waker},
9  time::Duration,
10};
11
12use actix::{prelude::*, Message as ActixMessage};
13use ahash::{AHashMap as HashMap, AHashSet as HashSet};
14use anyhow::Error as AnyError;
15use fastwebsockets::{
16  handshake, CloseCode, FragmentCollectorRead, Frame, OpCode, Payload, WebSocketError,
17  WebSocketWrite,
18};
19use futures_intrusive::sync::LocalManualResetEvent;
20use hyper::{
21  header::{CONNECTION, UPGRADE},
22  upgrade::Upgraded,
23  Body, Request as HyperRequest,
24};
25use maxwell_protocol::{self, HandleError, ProtocolMsg, *};
26use tokio::{
27  io::{split as tokio_split, ReadHalf, WriteHalf},
28  net::TcpStream,
29  task::spawn as tokio_spawn,
30  time::{sleep, timeout},
31};
32
33use super::*;
34use crate::arbiter_pool::ArbiterPool;
35
36static ID_SEED: AtomicU32 = AtomicU32::new(0);
37
38struct Attachment {
39  response: Option<ProtocolMsg>,
40  waker: Option<Waker>,
41}
42
43struct Completer {
44  msg_ref: u32,
45  connection_inner: Rc<FutureStyleConnectionInner>,
46}
47
48impl Completer {
49  fn new(msg_ref: u32, connection_inner: Rc<FutureStyleConnectionInner>) -> Self {
50    connection_inner
51      .attachments
52      .borrow_mut()
53      .insert(msg_ref, Attachment { response: None, waker: None });
54    Completer { msg_ref, connection_inner }
55  }
56}
57
58impl Drop for Completer {
59  fn drop(&mut self) {
60    self.connection_inner.attachments.borrow_mut().remove(&self.msg_ref);
61  }
62}
63
64impl Future for Completer {
65  type Output = ProtocolMsg;
66
67  fn poll(self: Pin<&mut Self>, ctx: &mut TaskContext<'_>) -> Poll<ProtocolMsg> {
68    let mut attachments = self.connection_inner.attachments.borrow_mut();
69    let attachment = attachments.get_mut(&self.msg_ref).unwrap();
70    if let Some(msg) = attachment.response.take() {
71      Poll::Ready(msg)
72    } else {
73      match attachment.waker.as_ref() {
74        None => {
75          attachment.waker = Some(ctx.waker().clone());
76        }
77        Some(waker) => {
78          if !waker.will_wake(ctx.waker()) {
79            attachment.waker = Some(ctx.waker().clone());
80          }
81        }
82      }
83      Poll::Pending
84    }
85  }
86}
87
88// Tie hyper's executor to tokio runtime
89struct SpawnExecutor;
90impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
91where
92  Fut: Future + Send + 'static,
93  Fut::Output: Send + 'static,
94{
95  fn execute(&self, fut: Fut) {
96    tokio_spawn(fut);
97  }
98}
99
100type Sink = WebSocketWrite<WriteHalf<Upgraded>>;
101type Stream = FragmentCollectorRead<ReadHalf<Upgraded>>;
102
103struct FutureStyleConnectionInner {
104  id: u32,
105  addr: RefCell<Option<Addr<FutureStyleConnection>>>,
106  endpoint_index: Cell<usize>,
107  endpoints: Vec<String>,
108  options: ConnectionOptions,
109  sink: RefCell<Option<Sink>>,
110  stream: RefCell<Option<Stream>>,
111  connected_event: LocalManualResetEvent,
112  disconnected_event: LocalManualResetEvent,
113  is_connected: Cell<bool>,
114  attachments: RefCell<HashMap<u32, Attachment>>,
115  msg_ref: Cell<u32>,
116  observable_event_handlers: RefCell<HashMap<u64, Box<dyn ObservableEventHandler>>>,
117  observable_event_actors: RefCell<HashSet<Recipient<ObservableEvent>>>,
118  is_stopping: Cell<bool>,
119}
120
121impl FutureStyleConnectionInner {
122  #[inline]
123  pub fn new(endpoint: String, options: ConnectionOptions) -> Self {
124    Self::with_alt_endpoints(vec![endpoint], options)
125  }
126
127  #[inline]
128  pub fn with_alt_endpoints(endpoints: Vec<String>, options: ConnectionOptions) -> Self {
129    FutureStyleConnectionInner {
130      id: ID_SEED.fetch_add(1, Ordering::Relaxed),
131      addr: RefCell::new(None),
132      endpoint_index: Cell::new(endpoints.len() - 1),
133      endpoints,
134      options,
135      sink: RefCell::new(None),
136      stream: RefCell::new(None),
137      connected_event: LocalManualResetEvent::new(false),
138      disconnected_event: LocalManualResetEvent::new(true),
139      is_connected: Cell::new(false),
140      attachments: RefCell::new(HashMap::new()),
141      msg_ref: Cell::new(1),
142      observable_event_handlers: RefCell::new(HashMap::default()),
143      observable_event_actors: RefCell::new(HashSet::new()),
144      is_stopping: Cell::new(false),
145    }
146  }
147
148  pub async fn connect_repeatedly(self: Rc<Self>) {
149    loop {
150      if self.is_stopping() {
151        break;
152      }
153
154      self.disconnected_event.wait().await;
155      self.close_sink().await.unwrap_or_else(|err| {
156        log::error!(
157          "Failed to close sink: actor: {}<{}>, err: {}",
158          &self.curr_endpoint(),
159          &self.id,
160          err
161        );
162      });
163      let endpoint = self.next_endpoint();
164      log::info!("Connecting: actor: {}<{}>", &endpoint, &self.id);
165      match self.connect(&endpoint).await {
166        Ok((sink, stream)) => {
167          log::info!("Connected: actor: {}<{}>", endpoint, &self.id);
168          self.set_socket_pair(Some(sink), Some(stream));
169          self.toggle_to_connected();
170        }
171        Err(err) => {
172          log::error!("Failed to connect: actor: {}<{}>, err: {}", endpoint, &self.id, err);
173          self.set_socket_pair(None, None);
174          self.toggle_to_disconnected();
175          sleep(Duration::from_millis(self.options.reconnect_delay as u64)).await;
176        }
177      }
178    }
179  }
180
181  #[inline]
182  pub async fn send(
183    self: Rc<Self>, mut msg: ProtocolMsg,
184  ) -> Result<ProtocolMsg, HandleError<ProtocolMsg>> {
185    let mut msg_ref = maxwell_protocol::get_ref(&msg);
186    if msg_ref == 0 {
187      msg_ref = self.next_msg_ref();
188      maxwell_protocol::set_ref(&mut msg, msg_ref);
189    } else {
190      self.try_set_msg_ref(msg_ref);
191    }
192
193    let completer = Completer::new(msg_ref, Rc::clone(&self));
194
195    if !self.is_connected() {
196      for i in 0..3 {
197        if let Err(_) =
198          timeout(Duration::from_millis(i * 500 + 500), self.connected_event.wait()).await
199        {
200          continue;
201        } else {
202          break;
203        }
204      }
205      if !self.is_connected() {
206        let desc = format!("Timeout to send msg: actor: {}<{}>", &self.curr_endpoint(), &self.id);
207        log::error!("{:?}", desc);
208        return Err(HandleError::Any { code: 1, desc, msg });
209      }
210    }
211
212    if let Err(err) = self
213      .sink
214      .borrow_mut()
215      .as_mut()
216      .unwrap()
217      .write_frame(Frame::binary(encode(&msg).as_ref().into()))
218      .await
219    {
220      let curr_endpoint = self.curr_endpoint();
221      let desc =
222        format!("Failed to send msg: actor: {}<{}>, err: {}", &curr_endpoint, &self.id, &err);
223      log::error!("{:?}", desc);
224      log::warn!(
225        "The connection maybe broken, try to reconnect: actor: {}<{}>",
226        &curr_endpoint,
227        &self.id
228      );
229      self.toggle_to_disconnected();
230      return Err(HandleError::Any { code: 1, desc, msg });
231    }
232
233    Ok(completer.await)
234  }
235
236  pub async fn receive_repeatedly(self: Rc<Self>) {
237    loop {
238      if self.is_stopping() {
239        break;
240      }
241
242      if !self.is_connected() {
243        self.connected_event.wait().await;
244      }
245
246      match self
247        .stream
248        .borrow_mut()
249        .as_mut()
250        .unwrap() // send_fn is empty because we do not create obligated writes here.
251        .read_frame(&mut move |_| async { Ok::<_, WebSocketError>(()) })
252        .await
253      {
254        Ok(frame) => match frame.opcode {
255          OpCode::Ping => {}
256          OpCode::Pong => {}
257          OpCode::Binary => {
258            let response = decode_bytes(&frame.payload).unwrap();
259            let msg_ref = maxwell_protocol::get_ref(&response);
260            let mut attachments = self.attachments.borrow_mut();
261            if let Some(attachment) = attachments.get_mut(&msg_ref) {
262              attachment.response = Some(response);
263              attachment.waker.as_ref().unwrap().wake_by_ref();
264            }
265          }
266          OpCode::Close => {
267            log::error!(
268              "Disconnected: actor: {}<{}>, reason: {}",
269              &self.curr_endpoint(),
270              &self.id,
271              Self::stringify(&frame.payload)
272            );
273            self.toggle_to_disconnected();
274          }
275          other => {
276            log::warn!("Received unknown msg: {:?}({:?})", &frame.payload, &other);
277          }
278        },
279        Err(err) => {
280          log::error!("Protocol error occured: err: {}", &err);
281          self.toggle_to_disconnected();
282        }
283      }
284    }
285  }
286
287  #[inline]
288  pub fn stop(&self) {
289    self.is_stopping.set(true);
290    self.notify_stopped_event();
291  }
292
293  #[inline]
294  pub fn observe_observable_event_with_handler<OEH: ObservableEventHandler>(
295    &self, msg: ObserveObservableEventWithHandlerMsg<OEH>,
296  ) {
297    if self.is_connected() {
298      msg.handler.handle(ObservableEvent::Connected(self.addr.borrow().as_ref().unwrap().clone()));
299    }
300    self.observable_event_handlers.borrow_mut().insert(msg.handler.id(), Box::new(msg.handler));
301  }
302
303  #[inline]
304  pub fn unobserve_observable_event_with_handler(
305    &self, msg: UnobserveObservableEventWithHandlerMsg,
306  ) {
307    self.observable_event_handlers.borrow_mut().remove(&msg.handler_id);
308  }
309
310  #[inline]
311  pub fn observe_observable_event_with_actor(&self, recip: Recipient<ObservableEvent>) {
312    if self.is_connected() {
313      recip.do_send(ObservableEvent::Connected(self.addr.borrow().as_ref().unwrap().clone()));
314    }
315    self.observable_event_actors.borrow_mut().insert(recip);
316  }
317
318  #[inline]
319  pub fn unobserve_observable_event_with_actor(&self, recip: Recipient<ObservableEvent>) {
320    self.observable_event_actors.borrow_mut().remove(&recip);
321  }
322
323  #[inline]
324  fn notify_connected_event(&self) {
325    self.notify_observable_event_for_handlers(ObservableEvent::Connected(
326      self.addr.borrow().as_ref().unwrap().clone(),
327    ));
328    self.notify_observable_event_for_actors(ObservableEvent::Connected(
329      self.addr.borrow().as_ref().unwrap().clone(),
330    ))
331  }
332
333  #[inline]
334  fn notify_disconnected_event(&self) {
335    self.notify_observable_event_for_handlers(ObservableEvent::Disconnected(
336      self.addr.borrow().as_ref().unwrap().clone(),
337    ));
338    self.notify_observable_event_for_actors(ObservableEvent::Disconnected(
339      self.addr.borrow().as_ref().unwrap().clone(),
340    ));
341  }
342
343  #[inline]
344  fn notify_stopped_event(&self) {
345    self.notify_observable_event_for_handlers(ObservableEvent::Stopped(
346      self.addr.borrow().as_ref().unwrap().clone(),
347    ));
348    self.notify_observable_event_for_actors(ObservableEvent::Stopped(
349      self.addr.borrow().as_ref().unwrap().clone(),
350    ));
351  }
352
353  #[inline]
354  fn notify_observable_event_for_handlers(&self, event: ObservableEvent) {
355    let mut unavailables = Vec::new();
356    let binding = self.observable_event_handlers.borrow();
357    for (id, handler) in binding.iter() {
358      if handler.is_available() {
359        handler.handle(event.clone());
360      } else {
361        unavailables.push(id);
362      }
363    }
364    for handler in &unavailables {
365      self.observable_event_handlers.borrow_mut().remove(handler);
366    }
367  }
368
369  #[inline]
370  fn notify_observable_event_for_actors(&self, event: ObservableEvent) {
371    let mut unavailables = Vec::new();
372    for actor in &*self.observable_event_actors.borrow() {
373      if actor.connected() {
374        actor.do_send(event.clone());
375      } else {
376        unavailables.push(actor.clone());
377      }
378    }
379    for actor in &unavailables {
380      self.observable_event_actors.borrow_mut().remove(actor);
381    }
382  }
383
384  #[inline]
385  pub fn next_msg_ref(&self) -> u32 {
386    let prev_msg_ref = self.msg_ref.get();
387    if prev_msg_ref < MAX_MSG_REF {
388      let curr_msg_ref = prev_msg_ref + 1;
389      self.msg_ref.set(curr_msg_ref);
390      curr_msg_ref
391    } else {
392      self.msg_ref.set(1);
393      1
394    }
395  }
396
397  #[inline]
398  fn try_set_msg_ref(&self, msg_ref: u32) {
399    if msg_ref > self.msg_ref.get() {
400      self.msg_ref.set(msg_ref);
401    }
402  }
403
404  #[inline]
405  fn next_endpoint(&self) -> &String {
406    let curr_endpoint_index = self.endpoint_index.get();
407    let next_endpoint_index =
408      if curr_endpoint_index >= self.endpoints.len() - 1 { 0 } else { curr_endpoint_index + 1 };
409    self.endpoint_index.set(next_endpoint_index);
410    &self.endpoints[next_endpoint_index]
411  }
412
413  #[inline]
414  fn curr_endpoint(&self) -> &String {
415    &self.endpoints[self.endpoint_index.get()]
416  }
417
418  #[inline]
419  fn stringify(payload: &Payload) -> String {
420    match Self::decode_error_payload(payload) {
421      Ok(code) => format!("{:?}({})", code, u16::from(code)),
422      Err(err) => format!("{:?}", err),
423    }
424  }
425
426  #[inline]
427  fn decode_error_payload(payload: &Payload) -> Result<CloseCode, WebSocketError> {
428    match payload.len() {
429      0 => Ok(CloseCode::Normal),
430      1 => return Err(WebSocketError::InvalidCloseFrame),
431      _ => {
432        let code = CloseCode::from(u16::from_be_bytes(payload[0..2].try_into().unwrap()));
433        if !code.is_allowed() {
434          return Err(WebSocketError::InvalidCloseCode);
435        }
436        Ok(code)
437      }
438    }
439  }
440
441  #[inline]
442  async fn connect(&self, endpoint: &String) -> Result<(Sink, Stream), AnyError> {
443    let stream = TcpStream::connect(endpoint).await?;
444    let req = HyperRequest::builder()
445      .method("GET")
446      .uri("/$ws")
447      .header("Host", endpoint)
448      .header(UPGRADE, "websocket")
449      .header(CONNECTION, "upgrade")
450      .header("CLIENT-ID", &format!("{}", self.id))
451      .header("Sec-WebSocket-Key", handshake::generate_key())
452      .header("Sec-WebSocket-Version", "13")
453      .body(Body::empty())?;
454
455    let (mut ws, _) = handshake::client(&SpawnExecutor, req, stream).await?;
456    ws.set_auto_close(false);
457    ws.set_auto_pong(false);
458    ws.set_max_message_size(self.options.max_frame_size as usize);
459    let (stream, sink) = ws.split(|s| tokio_split(s));
460    Ok((sink, FragmentCollectorRead::new(stream)))
461  }
462
463  #[inline]
464  fn set_socket_pair(&self, sink: Option<Sink>, stream: Option<Stream>) {
465    *self.sink.borrow_mut() = sink;
466    *self.stream.borrow_mut() = stream;
467  }
468
469  #[inline]
470  fn toggle_to_connected(&self) {
471    self.is_connected.set(true);
472    self.connected_event.set();
473    self.disconnected_event.reset();
474    self.notify_connected_event();
475  }
476
477  #[inline]
478  fn toggle_to_disconnected(&self) {
479    self.is_connected.set(false);
480    self.connected_event.reset();
481    self.disconnected_event.set();
482    self.notify_disconnected_event()
483  }
484
485  #[inline]
486  async fn close_sink(&self) -> Result<(), AnyError> {
487    if let Some(sink) = self.sink.try_borrow_mut()?.as_mut() {
488      Ok(sink.write_frame(Frame::close_raw(vec![].into())).await?)
489    } else {
490      Ok(())
491    }
492  }
493
494  #[inline]
495  fn is_connected(&self) -> bool {
496    self.is_connected.get()
497  }
498
499  #[inline]
500  fn is_stopping(&self) -> bool {
501    self.is_stopping.get()
502  }
503}
504
505pub struct FutureStyleConnection {
506  inner: Rc<FutureStyleConnectionInner>,
507}
508
509impl Connection for FutureStyleConnection {}
510
511impl Actor for FutureStyleConnection {
512  type Context = Context<Self>;
513
514  #[inline]
515  fn started(&mut self, ctx: &mut Self::Context) {
516    *self.inner.addr.borrow_mut() = Some(ctx.address());
517
518    ctx.set_mailbox_capacity(self.inner.options.mailbox_capacity as usize);
519
520    Box::pin(Rc::clone(&self.inner).connect_repeatedly().into_actor(self)).spawn(ctx);
521    Box::pin(Rc::clone(&self.inner).receive_repeatedly().into_actor(self)).spawn(ctx);
522
523    log::info!("Started: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
524  }
525
526  #[inline]
527  fn stopping(&mut self, _: &mut Self::Context) -> Running {
528    log::info!("Stopping: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
529    self.inner.stop();
530    Running::Stop
531  }
532
533  #[inline]
534  fn stopped(&mut self, _: &mut Self::Context) {
535    log::info!("Stopped: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
536  }
537}
538
539#[derive(Clone, ActixMessage, PartialEq, Eq)]
540#[rtype(result = "()")]
541pub enum ObservableEvent {
542  Connected(Addr<FutureStyleConnection>),
543  Disconnected(Addr<FutureStyleConnection>),
544  Stopped(Addr<FutureStyleConnection>),
545}
546
547impl fmt::Debug for ObservableEvent {
548  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549    match self {
550      Self::Connected(_) => f.debug_tuple("Connected").finish(),
551      Self::Disconnected(_) => f.debug_tuple("Disconnected").finish(),
552      Self::Stopped(_) => f.debug_tuple("Closed").finish(),
553    }
554  }
555}
556
557pub trait ObservableEventHandler: Send + Sync + 'static {
558  fn id(&self) -> u64;
559  fn is_available(&self) -> bool;
560  fn handle(&self, event: ObservableEvent);
561}
562
563#[derive(Debug, ActixMessage)]
564#[rtype(result = "()")]
565pub struct ObserveObservableEventWithHandlerMsg<OEH: ObservableEventHandler> {
566  pub handler: OEH,
567}
568
569impl<OEH: ObservableEventHandler> Handler<ObserveObservableEventWithHandlerMsg<OEH>>
570  for FutureStyleConnection
571{
572  type Result = ();
573
574  #[inline]
575  fn handle(
576    &mut self, msg: ObserveObservableEventWithHandlerMsg<OEH>, _ctx: &mut Context<Self>,
577  ) -> Self::Result {
578    self.inner.observe_observable_event_with_handler(msg);
579  }
580}
581
582#[derive(Debug, ActixMessage)]
583#[rtype(result = "()")]
584pub struct UnobserveObservableEventWithHandlerMsg {
585  pub handler_id: u64,
586}
587
588impl Handler<UnobserveObservableEventWithHandlerMsg> for FutureStyleConnection {
589  type Result = ();
590
591  #[inline]
592  fn handle(
593    &mut self, msg: UnobserveObservableEventWithHandlerMsg, _ctx: &mut Context<Self>,
594  ) -> Self::Result {
595    self.inner.unobserve_observable_event_with_handler(msg);
596  }
597}
598
599#[derive(Debug, ActixMessage)]
600#[rtype(result = "()")]
601pub struct ObserveObservableEventWithActorMsg {
602  pub recip: Recipient<ObservableEvent>,
603}
604
605impl Handler<ObserveObservableEventWithActorMsg> for FutureStyleConnection {
606  type Result = ();
607
608  fn handle(
609    &mut self, msg: ObserveObservableEventWithActorMsg, _ctx: &mut Context<Self>,
610  ) -> Self::Result {
611    self.inner.observe_observable_event_with_actor(msg.recip);
612  }
613}
614
615#[derive(Debug, ActixMessage)]
616#[rtype(result = "()")]
617pub struct UnobserveObservableEventWithActorMsg {
618  pub recip: Recipient<ObservableEvent>,
619}
620
621impl Handler<UnobserveObservableEventWithActorMsg> for FutureStyleConnection {
622  type Result = ();
623
624  fn handle(
625    &mut self, msg: UnobserveObservableEventWithActorMsg, _ctx: &mut Context<Self>,
626  ) -> Self::Result {
627    self.inner.unobserve_observable_event_with_actor(msg.recip);
628  }
629}
630
631impl Handler<ProtocolMsg> for FutureStyleConnection {
632  type Result = ResponseFuture<Result<ProtocolMsg, HandleError<ProtocolMsg>>>;
633
634  #[inline]
635  fn handle(&mut self, msg: ProtocolMsg, _ctx: &mut Context<Self>) -> Self::Result {
636    Box::pin(Rc::clone(&self.inner).send(msg))
637  }
638}
639
640impl Handler<StopMsg> for FutureStyleConnection {
641  type Result = ();
642
643  #[inline]
644  fn handle(&mut self, _msg: StopMsg, ctx: &mut Context<Self>) -> Self::Result {
645    ctx.stop();
646  }
647}
648
649impl FutureStyleConnection {
650  #[inline]
651  pub fn new(endpoint: String, options: ConnectionOptions) -> Self {
652    FutureStyleConnection { inner: Rc::new(FutureStyleConnectionInner::new(endpoint, options)) }
653  }
654
655  #[inline]
656  pub fn with_alt_endpoints(endpoints: Vec<String>, options: ConnectionOptions) -> Self {
657    FutureStyleConnection {
658      inner: Rc::new(FutureStyleConnectionInner::with_alt_endpoints(endpoints, options)),
659    }
660  }
661
662  #[inline]
663  pub fn start2(endpoint: String, options: ConnectionOptions) -> Addr<Self> {
664    FutureStyleConnection::start_in_arbiter(
665      &ArbiterPool::singleton().fetch_arbiter(),
666      move |_ctx| FutureStyleConnection::new(endpoint, options),
667    )
668  }
669
670  #[inline]
671  pub fn start_with_alt_endpoints2(
672    endpoints: Vec<String>, options: ConnectionOptions,
673  ) -> Addr<Self> {
674    FutureStyleConnection::start_in_arbiter(
675      &ArbiterPool::singleton().fetch_arbiter(),
676      move |_ctx| FutureStyleConnection::with_alt_endpoints(endpoints, options),
677    )
678  }
679
680  #[inline]
681  pub fn start3(
682    endpoint: String, options: ConnectionOptions, arbiter: ArbiterHandle,
683  ) -> Addr<Self> {
684    FutureStyleConnection::start_in_arbiter(&arbiter, move |_ctx| {
685      FutureStyleConnection::new(endpoint, options)
686    })
687  }
688
689  #[inline]
690  pub fn start_with_alt_endpoints3(
691    endpoints: Vec<String>, options: ConnectionOptions, arbiter: ArbiterHandle,
692  ) -> Addr<Self> {
693    FutureStyleConnection::start_in_arbiter(&arbiter, move |_ctx| {
694      FutureStyleConnection::with_alt_endpoints(endpoints, options)
695    })
696  }
697
698  #[inline]
699  pub async fn stop(addr: Addr<Self>) -> Result<(), HandleError<StopMsg>> {
700    match addr.send(StopMsg).await {
701      Ok(ok) => Ok(ok),
702      Err(err) => match err {
703        MailboxError::Closed => Err(HandleError::MailboxClosed),
704        MailboxError::Timeout => Err(HandleError::Timeout),
705      },
706    }
707  }
708}
709
710////////////////////////////////////////////////////////////////////////////////
711/// test cases
712////////////////////////////////////////////////////////////////////////////////
713#[cfg(test)]
714mod tests {
715  use std::time::Duration;
716
717  use actix::prelude::*;
718  use maxwell_protocol::IntoEnum;
719
720  use super::*;
721
722  #[actix::test]
723  async fn test_send_msg() {
724    let conn =
725      FutureStyleConnection::new(String::from("localhost:8081"), ConnectionOptions::default())
726        .start();
727    for _ in 1..2 {
728      let msg = maxwell_protocol::PingReq { r#ref: 0 }.into_enum();
729      let res = conn.send(msg).timeout_ext(Duration::from_millis(1000)).await;
730      println!("with_connection_full result: {:?}", res);
731    }
732  }
733}