1use std::{
2 cell::{Cell, RefCell},
3 fmt, format,
4 future::Future,
5 pin::Pin,
6 rc::Rc,
7 sync::atomic::{AtomicU32, Ordering},
8 task::{Context as TaskContext, Poll, Waker},
9 time::Duration,
10};
11
12use actix::{prelude::*, Message as ActixMessage};
13use ahash::{AHashMap as HashMap, AHashSet as HashSet};
14use anyhow::Error as AnyError;
15use fastwebsockets::{
16 handshake, CloseCode, FragmentCollectorRead, Frame, OpCode, Payload, WebSocketError,
17 WebSocketWrite,
18};
19use futures_intrusive::sync::LocalManualResetEvent;
20use hyper::{
21 header::{CONNECTION, UPGRADE},
22 upgrade::Upgraded,
23 Body, Request as HyperRequest,
24};
25use maxwell_protocol::{self, HandleError, ProtocolMsg, *};
26use tokio::{
27 io::{split as tokio_split, ReadHalf, WriteHalf},
28 net::TcpStream,
29 task::spawn as tokio_spawn,
30 time::{sleep, timeout},
31};
32
33use super::*;
34use crate::arbiter_pool::ArbiterPool;
35
36static ID_SEED: AtomicU32 = AtomicU32::new(0);
37
38struct Attachment {
39 response: Option<ProtocolMsg>,
40 waker: Option<Waker>,
41}
42
43struct Completer {
44 msg_ref: u32,
45 connection_inner: Rc<FutureStyleConnectionInner>,
46}
47
48impl Completer {
49 fn new(msg_ref: u32, connection_inner: Rc<FutureStyleConnectionInner>) -> Self {
50 connection_inner
51 .attachments
52 .borrow_mut()
53 .insert(msg_ref, Attachment { response: None, waker: None });
54 Completer { msg_ref, connection_inner }
55 }
56}
57
58impl Drop for Completer {
59 fn drop(&mut self) {
60 self.connection_inner.attachments.borrow_mut().remove(&self.msg_ref);
61 }
62}
63
64impl Future for Completer {
65 type Output = ProtocolMsg;
66
67 fn poll(self: Pin<&mut Self>, ctx: &mut TaskContext<'_>) -> Poll<ProtocolMsg> {
68 let mut attachments = self.connection_inner.attachments.borrow_mut();
69 let attachment = attachments.get_mut(&self.msg_ref).unwrap();
70 if let Some(msg) = attachment.response.take() {
71 Poll::Ready(msg)
72 } else {
73 match attachment.waker.as_ref() {
74 None => {
75 attachment.waker = Some(ctx.waker().clone());
76 }
77 Some(waker) => {
78 if !waker.will_wake(ctx.waker()) {
79 attachment.waker = Some(ctx.waker().clone());
80 }
81 }
82 }
83 Poll::Pending
84 }
85 }
86}
87
88struct SpawnExecutor;
90impl<Fut> hyper::rt::Executor<Fut> for SpawnExecutor
91where
92 Fut: Future + Send + 'static,
93 Fut::Output: Send + 'static,
94{
95 fn execute(&self, fut: Fut) {
96 tokio_spawn(fut);
97 }
98}
99
100type Sink = WebSocketWrite<WriteHalf<Upgraded>>;
101type Stream = FragmentCollectorRead<ReadHalf<Upgraded>>;
102
103struct FutureStyleConnectionInner {
104 id: u32,
105 addr: RefCell<Option<Addr<FutureStyleConnection>>>,
106 endpoint_index: Cell<usize>,
107 endpoints: Vec<String>,
108 options: ConnectionOptions,
109 sink: RefCell<Option<Sink>>,
110 stream: RefCell<Option<Stream>>,
111 connected_event: LocalManualResetEvent,
112 disconnected_event: LocalManualResetEvent,
113 is_connected: Cell<bool>,
114 attachments: RefCell<HashMap<u32, Attachment>>,
115 msg_ref: Cell<u32>,
116 observable_event_handlers: RefCell<HashMap<u64, Box<dyn ObservableEventHandler>>>,
117 observable_event_actors: RefCell<HashSet<Recipient<ObservableEvent>>>,
118 is_stopping: Cell<bool>,
119}
120
121impl FutureStyleConnectionInner {
122 #[inline]
123 pub fn new(endpoint: String, options: ConnectionOptions) -> Self {
124 Self::with_alt_endpoints(vec![endpoint], options)
125 }
126
127 #[inline]
128 pub fn with_alt_endpoints(endpoints: Vec<String>, options: ConnectionOptions) -> Self {
129 FutureStyleConnectionInner {
130 id: ID_SEED.fetch_add(1, Ordering::Relaxed),
131 addr: RefCell::new(None),
132 endpoint_index: Cell::new(endpoints.len() - 1),
133 endpoints,
134 options,
135 sink: RefCell::new(None),
136 stream: RefCell::new(None),
137 connected_event: LocalManualResetEvent::new(false),
138 disconnected_event: LocalManualResetEvent::new(true),
139 is_connected: Cell::new(false),
140 attachments: RefCell::new(HashMap::new()),
141 msg_ref: Cell::new(1),
142 observable_event_handlers: RefCell::new(HashMap::default()),
143 observable_event_actors: RefCell::new(HashSet::new()),
144 is_stopping: Cell::new(false),
145 }
146 }
147
148 pub async fn connect_repeatedly(self: Rc<Self>) {
149 loop {
150 if self.is_stopping() {
151 break;
152 }
153
154 self.disconnected_event.wait().await;
155 self.close_sink().await.unwrap_or_else(|err| {
156 log::error!(
157 "Failed to close sink: actor: {}<{}>, err: {}",
158 &self.curr_endpoint(),
159 &self.id,
160 err
161 );
162 });
163 let endpoint = self.next_endpoint();
164 log::info!("Connecting: actor: {}<{}>", &endpoint, &self.id);
165 match self.connect(&endpoint).await {
166 Ok((sink, stream)) => {
167 log::info!("Connected: actor: {}<{}>", endpoint, &self.id);
168 self.set_socket_pair(Some(sink), Some(stream));
169 self.toggle_to_connected();
170 }
171 Err(err) => {
172 log::error!("Failed to connect: actor: {}<{}>, err: {}", endpoint, &self.id, err);
173 self.set_socket_pair(None, None);
174 self.toggle_to_disconnected();
175 sleep(Duration::from_millis(self.options.reconnect_delay as u64)).await;
176 }
177 }
178 }
179 }
180
181 #[inline]
182 pub async fn send(
183 self: Rc<Self>, mut msg: ProtocolMsg,
184 ) -> Result<ProtocolMsg, HandleError<ProtocolMsg>> {
185 let mut msg_ref = maxwell_protocol::get_ref(&msg);
186 if msg_ref == 0 {
187 msg_ref = self.next_msg_ref();
188 maxwell_protocol::set_ref(&mut msg, msg_ref);
189 } else {
190 self.try_set_msg_ref(msg_ref);
191 }
192
193 let completer = Completer::new(msg_ref, Rc::clone(&self));
194
195 if !self.is_connected() {
196 for i in 0..3 {
197 if let Err(_) =
198 timeout(Duration::from_millis(i * 500 + 500), self.connected_event.wait()).await
199 {
200 continue;
201 } else {
202 break;
203 }
204 }
205 if !self.is_connected() {
206 let desc = format!("Timeout to send msg: actor: {}<{}>", &self.curr_endpoint(), &self.id);
207 log::error!("{:?}", desc);
208 return Err(HandleError::Any { code: 1, desc, msg });
209 }
210 }
211
212 if let Err(err) = self
213 .sink
214 .borrow_mut()
215 .as_mut()
216 .unwrap()
217 .write_frame(Frame::binary(encode(&msg).as_ref().into()))
218 .await
219 {
220 let curr_endpoint = self.curr_endpoint();
221 let desc =
222 format!("Failed to send msg: actor: {}<{}>, err: {}", &curr_endpoint, &self.id, &err);
223 log::error!("{:?}", desc);
224 log::warn!(
225 "The connection maybe broken, try to reconnect: actor: {}<{}>",
226 &curr_endpoint,
227 &self.id
228 );
229 self.toggle_to_disconnected();
230 return Err(HandleError::Any { code: 1, desc, msg });
231 }
232
233 Ok(completer.await)
234 }
235
236 pub async fn receive_repeatedly(self: Rc<Self>) {
237 loop {
238 if self.is_stopping() {
239 break;
240 }
241
242 if !self.is_connected() {
243 self.connected_event.wait().await;
244 }
245
246 match self
247 .stream
248 .borrow_mut()
249 .as_mut()
250 .unwrap() .read_frame(&mut move |_| async { Ok::<_, WebSocketError>(()) })
252 .await
253 {
254 Ok(frame) => match frame.opcode {
255 OpCode::Ping => {}
256 OpCode::Pong => {}
257 OpCode::Binary => {
258 let response = decode_bytes(&frame.payload).unwrap();
259 let msg_ref = maxwell_protocol::get_ref(&response);
260 let mut attachments = self.attachments.borrow_mut();
261 if let Some(attachment) = attachments.get_mut(&msg_ref) {
262 attachment.response = Some(response);
263 attachment.waker.as_ref().unwrap().wake_by_ref();
264 }
265 }
266 OpCode::Close => {
267 log::error!(
268 "Disconnected: actor: {}<{}>, reason: {}",
269 &self.curr_endpoint(),
270 &self.id,
271 Self::stringify(&frame.payload)
272 );
273 self.toggle_to_disconnected();
274 }
275 other => {
276 log::warn!("Received unknown msg: {:?}({:?})", &frame.payload, &other);
277 }
278 },
279 Err(err) => {
280 log::error!("Protocol error occured: err: {}", &err);
281 self.toggle_to_disconnected();
282 }
283 }
284 }
285 }
286
287 #[inline]
288 pub fn stop(&self) {
289 self.is_stopping.set(true);
290 self.notify_stopped_event();
291 }
292
293 #[inline]
294 pub fn observe_observable_event_with_handler<OEH: ObservableEventHandler>(
295 &self, msg: ObserveObservableEventWithHandlerMsg<OEH>,
296 ) {
297 if self.is_connected() {
298 msg.handler.handle(ObservableEvent::Connected(self.addr.borrow().as_ref().unwrap().clone()));
299 }
300 self.observable_event_handlers.borrow_mut().insert(msg.handler.id(), Box::new(msg.handler));
301 }
302
303 #[inline]
304 pub fn unobserve_observable_event_with_handler(
305 &self, msg: UnobserveObservableEventWithHandlerMsg,
306 ) {
307 self.observable_event_handlers.borrow_mut().remove(&msg.handler_id);
308 }
309
310 #[inline]
311 pub fn observe_observable_event_with_actor(&self, recip: Recipient<ObservableEvent>) {
312 if self.is_connected() {
313 recip.do_send(ObservableEvent::Connected(self.addr.borrow().as_ref().unwrap().clone()));
314 }
315 self.observable_event_actors.borrow_mut().insert(recip);
316 }
317
318 #[inline]
319 pub fn unobserve_observable_event_with_actor(&self, recip: Recipient<ObservableEvent>) {
320 self.observable_event_actors.borrow_mut().remove(&recip);
321 }
322
323 #[inline]
324 fn notify_connected_event(&self) {
325 self.notify_observable_event_for_handlers(ObservableEvent::Connected(
326 self.addr.borrow().as_ref().unwrap().clone(),
327 ));
328 self.notify_observable_event_for_actors(ObservableEvent::Connected(
329 self.addr.borrow().as_ref().unwrap().clone(),
330 ))
331 }
332
333 #[inline]
334 fn notify_disconnected_event(&self) {
335 self.notify_observable_event_for_handlers(ObservableEvent::Disconnected(
336 self.addr.borrow().as_ref().unwrap().clone(),
337 ));
338 self.notify_observable_event_for_actors(ObservableEvent::Disconnected(
339 self.addr.borrow().as_ref().unwrap().clone(),
340 ));
341 }
342
343 #[inline]
344 fn notify_stopped_event(&self) {
345 self.notify_observable_event_for_handlers(ObservableEvent::Stopped(
346 self.addr.borrow().as_ref().unwrap().clone(),
347 ));
348 self.notify_observable_event_for_actors(ObservableEvent::Stopped(
349 self.addr.borrow().as_ref().unwrap().clone(),
350 ));
351 }
352
353 #[inline]
354 fn notify_observable_event_for_handlers(&self, event: ObservableEvent) {
355 let mut unavailables = Vec::new();
356 let binding = self.observable_event_handlers.borrow();
357 for (id, handler) in binding.iter() {
358 if handler.is_available() {
359 handler.handle(event.clone());
360 } else {
361 unavailables.push(id);
362 }
363 }
364 for handler in &unavailables {
365 self.observable_event_handlers.borrow_mut().remove(handler);
366 }
367 }
368
369 #[inline]
370 fn notify_observable_event_for_actors(&self, event: ObservableEvent) {
371 let mut unavailables = Vec::new();
372 for actor in &*self.observable_event_actors.borrow() {
373 if actor.connected() {
374 actor.do_send(event.clone());
375 } else {
376 unavailables.push(actor.clone());
377 }
378 }
379 for actor in &unavailables {
380 self.observable_event_actors.borrow_mut().remove(actor);
381 }
382 }
383
384 #[inline]
385 pub fn next_msg_ref(&self) -> u32 {
386 let prev_msg_ref = self.msg_ref.get();
387 if prev_msg_ref < MAX_MSG_REF {
388 let curr_msg_ref = prev_msg_ref + 1;
389 self.msg_ref.set(curr_msg_ref);
390 curr_msg_ref
391 } else {
392 self.msg_ref.set(1);
393 1
394 }
395 }
396
397 #[inline]
398 fn try_set_msg_ref(&self, msg_ref: u32) {
399 if msg_ref > self.msg_ref.get() {
400 self.msg_ref.set(msg_ref);
401 }
402 }
403
404 #[inline]
405 fn next_endpoint(&self) -> &String {
406 let curr_endpoint_index = self.endpoint_index.get();
407 let next_endpoint_index =
408 if curr_endpoint_index >= self.endpoints.len() - 1 { 0 } else { curr_endpoint_index + 1 };
409 self.endpoint_index.set(next_endpoint_index);
410 &self.endpoints[next_endpoint_index]
411 }
412
413 #[inline]
414 fn curr_endpoint(&self) -> &String {
415 &self.endpoints[self.endpoint_index.get()]
416 }
417
418 #[inline]
419 fn stringify(payload: &Payload) -> String {
420 match Self::decode_error_payload(payload) {
421 Ok(code) => format!("{:?}({})", code, u16::from(code)),
422 Err(err) => format!("{:?}", err),
423 }
424 }
425
426 #[inline]
427 fn decode_error_payload(payload: &Payload) -> Result<CloseCode, WebSocketError> {
428 match payload.len() {
429 0 => Ok(CloseCode::Normal),
430 1 => return Err(WebSocketError::InvalidCloseFrame),
431 _ => {
432 let code = CloseCode::from(u16::from_be_bytes(payload[0..2].try_into().unwrap()));
433 if !code.is_allowed() {
434 return Err(WebSocketError::InvalidCloseCode);
435 }
436 Ok(code)
437 }
438 }
439 }
440
441 #[inline]
442 async fn connect(&self, endpoint: &String) -> Result<(Sink, Stream), AnyError> {
443 let stream = TcpStream::connect(endpoint).await?;
444 let req = HyperRequest::builder()
445 .method("GET")
446 .uri("/$ws")
447 .header("Host", endpoint)
448 .header(UPGRADE, "websocket")
449 .header(CONNECTION, "upgrade")
450 .header("CLIENT-ID", &format!("{}", self.id))
451 .header("Sec-WebSocket-Key", handshake::generate_key())
452 .header("Sec-WebSocket-Version", "13")
453 .body(Body::empty())?;
454
455 let (mut ws, _) = handshake::client(&SpawnExecutor, req, stream).await?;
456 ws.set_auto_close(false);
457 ws.set_auto_pong(false);
458 ws.set_max_message_size(self.options.max_frame_size as usize);
459 let (stream, sink) = ws.split(|s| tokio_split(s));
460 Ok((sink, FragmentCollectorRead::new(stream)))
461 }
462
463 #[inline]
464 fn set_socket_pair(&self, sink: Option<Sink>, stream: Option<Stream>) {
465 *self.sink.borrow_mut() = sink;
466 *self.stream.borrow_mut() = stream;
467 }
468
469 #[inline]
470 fn toggle_to_connected(&self) {
471 self.is_connected.set(true);
472 self.connected_event.set();
473 self.disconnected_event.reset();
474 self.notify_connected_event();
475 }
476
477 #[inline]
478 fn toggle_to_disconnected(&self) {
479 self.is_connected.set(false);
480 self.connected_event.reset();
481 self.disconnected_event.set();
482 self.notify_disconnected_event()
483 }
484
485 #[inline]
486 async fn close_sink(&self) -> Result<(), AnyError> {
487 if let Some(sink) = self.sink.try_borrow_mut()?.as_mut() {
488 Ok(sink.write_frame(Frame::close_raw(vec![].into())).await?)
489 } else {
490 Ok(())
491 }
492 }
493
494 #[inline]
495 fn is_connected(&self) -> bool {
496 self.is_connected.get()
497 }
498
499 #[inline]
500 fn is_stopping(&self) -> bool {
501 self.is_stopping.get()
502 }
503}
504
505pub struct FutureStyleConnection {
506 inner: Rc<FutureStyleConnectionInner>,
507}
508
509impl Connection for FutureStyleConnection {}
510
511impl Actor for FutureStyleConnection {
512 type Context = Context<Self>;
513
514 #[inline]
515 fn started(&mut self, ctx: &mut Self::Context) {
516 *self.inner.addr.borrow_mut() = Some(ctx.address());
517
518 ctx.set_mailbox_capacity(self.inner.options.mailbox_capacity as usize);
519
520 Box::pin(Rc::clone(&self.inner).connect_repeatedly().into_actor(self)).spawn(ctx);
521 Box::pin(Rc::clone(&self.inner).receive_repeatedly().into_actor(self)).spawn(ctx);
522
523 log::info!("Started: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
524 }
525
526 #[inline]
527 fn stopping(&mut self, _: &mut Self::Context) -> Running {
528 log::info!("Stopping: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
529 self.inner.stop();
530 Running::Stop
531 }
532
533 #[inline]
534 fn stopped(&mut self, _: &mut Self::Context) {
535 log::info!("Stopped: actor: {}<{}>", &self.inner.curr_endpoint(), &self.inner.id);
536 }
537}
538
539#[derive(Clone, ActixMessage, PartialEq, Eq)]
540#[rtype(result = "()")]
541pub enum ObservableEvent {
542 Connected(Addr<FutureStyleConnection>),
543 Disconnected(Addr<FutureStyleConnection>),
544 Stopped(Addr<FutureStyleConnection>),
545}
546
547impl fmt::Debug for ObservableEvent {
548 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
549 match self {
550 Self::Connected(_) => f.debug_tuple("Connected").finish(),
551 Self::Disconnected(_) => f.debug_tuple("Disconnected").finish(),
552 Self::Stopped(_) => f.debug_tuple("Closed").finish(),
553 }
554 }
555}
556
557pub trait ObservableEventHandler: Send + Sync + 'static {
558 fn id(&self) -> u64;
559 fn is_available(&self) -> bool;
560 fn handle(&self, event: ObservableEvent);
561}
562
563#[derive(Debug, ActixMessage)]
564#[rtype(result = "()")]
565pub struct ObserveObservableEventWithHandlerMsg<OEH: ObservableEventHandler> {
566 pub handler: OEH,
567}
568
569impl<OEH: ObservableEventHandler> Handler<ObserveObservableEventWithHandlerMsg<OEH>>
570 for FutureStyleConnection
571{
572 type Result = ();
573
574 #[inline]
575 fn handle(
576 &mut self, msg: ObserveObservableEventWithHandlerMsg<OEH>, _ctx: &mut Context<Self>,
577 ) -> Self::Result {
578 self.inner.observe_observable_event_with_handler(msg);
579 }
580}
581
582#[derive(Debug, ActixMessage)]
583#[rtype(result = "()")]
584pub struct UnobserveObservableEventWithHandlerMsg {
585 pub handler_id: u64,
586}
587
588impl Handler<UnobserveObservableEventWithHandlerMsg> for FutureStyleConnection {
589 type Result = ();
590
591 #[inline]
592 fn handle(
593 &mut self, msg: UnobserveObservableEventWithHandlerMsg, _ctx: &mut Context<Self>,
594 ) -> Self::Result {
595 self.inner.unobserve_observable_event_with_handler(msg);
596 }
597}
598
599#[derive(Debug, ActixMessage)]
600#[rtype(result = "()")]
601pub struct ObserveObservableEventWithActorMsg {
602 pub recip: Recipient<ObservableEvent>,
603}
604
605impl Handler<ObserveObservableEventWithActorMsg> for FutureStyleConnection {
606 type Result = ();
607
608 fn handle(
609 &mut self, msg: ObserveObservableEventWithActorMsg, _ctx: &mut Context<Self>,
610 ) -> Self::Result {
611 self.inner.observe_observable_event_with_actor(msg.recip);
612 }
613}
614
615#[derive(Debug, ActixMessage)]
616#[rtype(result = "()")]
617pub struct UnobserveObservableEventWithActorMsg {
618 pub recip: Recipient<ObservableEvent>,
619}
620
621impl Handler<UnobserveObservableEventWithActorMsg> for FutureStyleConnection {
622 type Result = ();
623
624 fn handle(
625 &mut self, msg: UnobserveObservableEventWithActorMsg, _ctx: &mut Context<Self>,
626 ) -> Self::Result {
627 self.inner.unobserve_observable_event_with_actor(msg.recip);
628 }
629}
630
631impl Handler<ProtocolMsg> for FutureStyleConnection {
632 type Result = ResponseFuture<Result<ProtocolMsg, HandleError<ProtocolMsg>>>;
633
634 #[inline]
635 fn handle(&mut self, msg: ProtocolMsg, _ctx: &mut Context<Self>) -> Self::Result {
636 Box::pin(Rc::clone(&self.inner).send(msg))
637 }
638}
639
640impl Handler<StopMsg> for FutureStyleConnection {
641 type Result = ();
642
643 #[inline]
644 fn handle(&mut self, _msg: StopMsg, ctx: &mut Context<Self>) -> Self::Result {
645 ctx.stop();
646 }
647}
648
649impl FutureStyleConnection {
650 #[inline]
651 pub fn new(endpoint: String, options: ConnectionOptions) -> Self {
652 FutureStyleConnection { inner: Rc::new(FutureStyleConnectionInner::new(endpoint, options)) }
653 }
654
655 #[inline]
656 pub fn with_alt_endpoints(endpoints: Vec<String>, options: ConnectionOptions) -> Self {
657 FutureStyleConnection {
658 inner: Rc::new(FutureStyleConnectionInner::with_alt_endpoints(endpoints, options)),
659 }
660 }
661
662 #[inline]
663 pub fn start2(endpoint: String, options: ConnectionOptions) -> Addr<Self> {
664 FutureStyleConnection::start_in_arbiter(
665 &ArbiterPool::singleton().fetch_arbiter(),
666 move |_ctx| FutureStyleConnection::new(endpoint, options),
667 )
668 }
669
670 #[inline]
671 pub fn start_with_alt_endpoints2(
672 endpoints: Vec<String>, options: ConnectionOptions,
673 ) -> Addr<Self> {
674 FutureStyleConnection::start_in_arbiter(
675 &ArbiterPool::singleton().fetch_arbiter(),
676 move |_ctx| FutureStyleConnection::with_alt_endpoints(endpoints, options),
677 )
678 }
679
680 #[inline]
681 pub fn start3(
682 endpoint: String, options: ConnectionOptions, arbiter: ArbiterHandle,
683 ) -> Addr<Self> {
684 FutureStyleConnection::start_in_arbiter(&arbiter, move |_ctx| {
685 FutureStyleConnection::new(endpoint, options)
686 })
687 }
688
689 #[inline]
690 pub fn start_with_alt_endpoints3(
691 endpoints: Vec<String>, options: ConnectionOptions, arbiter: ArbiterHandle,
692 ) -> Addr<Self> {
693 FutureStyleConnection::start_in_arbiter(&arbiter, move |_ctx| {
694 FutureStyleConnection::with_alt_endpoints(endpoints, options)
695 })
696 }
697
698 #[inline]
699 pub async fn stop(addr: Addr<Self>) -> Result<(), HandleError<StopMsg>> {
700 match addr.send(StopMsg).await {
701 Ok(ok) => Ok(ok),
702 Err(err) => match err {
703 MailboxError::Closed => Err(HandleError::MailboxClosed),
704 MailboxError::Timeout => Err(HandleError::Timeout),
705 },
706 }
707 }
708}
709
710#[cfg(test)]
714mod tests {
715 use std::time::Duration;
716
717 use actix::prelude::*;
718 use maxwell_protocol::IntoEnum;
719
720 use super::*;
721
722 #[actix::test]
723 async fn test_send_msg() {
724 let conn =
725 FutureStyleConnection::new(String::from("localhost:8081"), ConnectionOptions::default())
726 .start();
727 for _ in 1..2 {
728 let msg = maxwell_protocol::PingReq { r#ref: 0 }.into_enum();
729 let res = conn.send(msg).timeout_ext(Duration::from_millis(1000)).await;
730 println!("with_connection_full result: {:?}", res);
731 }
732 }
733}