maxwell_utils/connection/
connection.rs

1use std::{fmt::Debug, future::Future, pin::Pin, time::Duration};
2
3use actix::{dev::ToEnvelope, prelude::*, Message as ActixMessage};
4use futures::future::{AbortHandle, Abortable};
5use maxwell_protocol::{self, HandleError, ProtocolMsg};
6use tokio::time::timeout;
7
8pub trait Connection: Actor + Handler<StopMsg> {}
9
10#[derive(Debug, Clone)]
11pub struct ConnectionOptions {
12  pub reconnect_delay: u32,
13  pub mailbox_capacity: u32,
14  pub max_frame_size: u32,
15}
16
17impl Default for ConnectionOptions {
18  fn default() -> Self {
19    ConnectionOptions { reconnect_delay: 1000, mailbox_capacity: 128, max_frame_size: 134217728 }
20  }
21}
22
23pub struct ConnectionOptionsBuilder {
24  options: ConnectionOptions,
25}
26
27impl ConnectionOptionsBuilder {
28  pub fn new() -> Self {
29    ConnectionOptionsBuilder { options: ConnectionOptions::default() }
30  }
31
32  pub fn reconnect_delay(mut self, reconnect_delay: u32) -> Self {
33    self.options.reconnect_delay = reconnect_delay;
34    self
35  }
36
37  pub fn mailbox_capacity(mut self, mailbox_capacity: u32) -> Self {
38    self.options.mailbox_capacity = mailbox_capacity;
39    self
40  }
41
42  pub fn max_frame_size(mut self, max_frame_size: u32) -> Self {
43    self.options.max_frame_size = max_frame_size;
44    self
45  }
46
47  pub fn build(&self) -> ConnectionOptions {
48    self.options.clone()
49  }
50}
51
52#[derive(Debug, ActixMessage)]
53#[rtype(result = "()")]
54pub struct StopMsg;
55
56#[derive(Debug, ActixMessage)]
57#[rtype(result = "()")]
58pub struct DumpInfoMsg;
59
60pub trait TimeoutExt {
61  type Result;
62
63  fn timeout_ext(self, dur: Duration) -> Self::Result;
64}
65
66impl<C: Connection + Handler<ProtocolMsg>> TimeoutExt for Request<C, ProtocolMsg>
67where <C as Actor>::Context: ToEnvelope<C, ProtocolMsg>
68{
69  type Result = Pin<Box<dyn Future<Output = Result<ProtocolMsg, HandleError<ProtocolMsg>>> + Send>>;
70
71  fn timeout_ext(self, dur: Duration) -> Self::Result {
72    Box::pin(async move {
73      let (abort_handle, abort_registration) = AbortHandle::new_pair();
74      let res = timeout(dur, Abortable::new(self, abort_registration)).await;
75      match res {
76        Ok(res) => match res {
77          Ok(res) => match res {
78            Ok(res) => res,
79            Err(err) => match err {
80              MailboxError::Timeout => Err(HandleError::Timeout),
81              MailboxError::Closed => Err(HandleError::MailboxClosed),
82            },
83          },
84          // Aborted
85          Err(_err) => Err(HandleError::Timeout),
86        },
87        // Elapsed(())
88        Err(_err) => {
89          abort_handle.abort();
90          Err(HandleError::Timeout)
91        }
92      }
93    })
94  }
95}
96
97pub const MAX_MSG_REF: u32 = 2100000000;