maxwell_utils/connection/
connection.rs1use std::{fmt::Debug, future::Future, pin::Pin, time::Duration};
2
3use actix::{dev::ToEnvelope, prelude::*, Message as ActixMessage};
4use futures::future::{AbortHandle, Abortable};
5use maxwell_protocol::{self, HandleError, ProtocolMsg};
6use tokio::time::timeout;
7
/// Marker trait for connection actors.
///
/// It declares no methods of its own: an implementor only has to be an actix
/// `Actor` that also provides a `Handler<StopMsg>`.
pub trait Connection: Actor + Handler<StopMsg> {}
9
/// Tunable settings for a connection.
///
/// Construct it with [`Default::default`] to get the stock values, or set the
/// public fields directly.
#[derive(Debug, Clone)]
pub struct ConnectionOptions {
    /// Delay before a reconnect attempt; defaults to `1000`.
    /// NOTE(review): presumably milliseconds — confirm against the code that reads it.
    pub reconnect_delay: u32,
    /// Capacity of the mailbox; defaults to `128`.
    /// NOTE(review): presumably the actor mailbox size in messages — confirm.
    pub mailbox_capacity: u32,
    /// Maximum frame size; defaults to `134217728` (2^27).
    /// NOTE(review): presumably bytes, i.e. 128 MiB — confirm.
    pub max_frame_size: u32,
}

impl Default for ConnectionOptions {
    /// Returns the stock settings: `reconnect_delay = 1000`,
    /// `mailbox_capacity = 128`, `max_frame_size = 134217728`.
    fn default() -> Self {
        Self {
            reconnect_delay: 1000,
            mailbox_capacity: 128,
            // 128 * 1024 * 1024 == 134_217_728
            max_frame_size: 128 * 1024 * 1024,
        }
    }
}
22
23pub struct ConnectionOptionsBuilder {
24 options: ConnectionOptions,
25}
26
27impl ConnectionOptionsBuilder {
28 pub fn new() -> Self {
29 ConnectionOptionsBuilder { options: ConnectionOptions::default() }
30 }
31
32 pub fn reconnect_delay(mut self, reconnect_delay: u32) -> Self {
33 self.options.reconnect_delay = reconnect_delay;
34 self
35 }
36
37 pub fn mailbox_capacity(mut self, mailbox_capacity: u32) -> Self {
38 self.options.mailbox_capacity = mailbox_capacity;
39 self
40 }
41
42 pub fn max_frame_size(mut self, max_frame_size: u32) -> Self {
43 self.options.max_frame_size = max_frame_size;
44 self
45 }
46
47 pub fn build(&self) -> ConnectionOptions {
48 self.options.clone()
49 }
50}
51
/// Payload-free actix message with a unit (`()`) result.
///
/// Every [`Connection`] is required to provide a handler for it.
/// NOTE(review): presumably asks the receiving connection to shut down —
/// confirm against the handler implementations.
#[derive(Debug, ActixMessage)]
#[rtype(result = "()")]
pub struct StopMsg;
55
/// Payload-free actix message with a unit (`()`) result.
///
/// NOTE(review): presumably asks the receiving actor to dump its diagnostic
/// state — no handler is visible in this file, confirm against the implementors.
#[derive(Debug, ActixMessage)]
#[rtype(result = "()")]
pub struct DumpInfoMsg;
59
/// Extension trait that attaches a deadline to a value.
pub trait TimeoutExt {
    /// What `timeout_ext` hands back; chosen by each implementor.
    type Result;

    /// Consumes `self` and returns it bounded by the duration `dur`.
    fn timeout_ext(self, dur: Duration) -> Self::Result;
}
65
66impl<C: Connection + Handler<ProtocolMsg>> TimeoutExt for Request<C, ProtocolMsg>
67where <C as Actor>::Context: ToEnvelope<C, ProtocolMsg>
68{
69 type Result = Pin<Box<dyn Future<Output = Result<ProtocolMsg, HandleError<ProtocolMsg>>> + Send>>;
70
71 fn timeout_ext(self, dur: Duration) -> Self::Result {
72 Box::pin(async move {
73 let (abort_handle, abort_registration) = AbortHandle::new_pair();
74 let res = timeout(dur, Abortable::new(self, abort_registration)).await;
75 match res {
76 Ok(res) => match res {
77 Ok(res) => match res {
78 Ok(res) => res,
79 Err(err) => match err {
80 MailboxError::Timeout => Err(HandleError::Timeout),
81 MailboxError::Closed => Err(HandleError::MailboxClosed),
82 },
83 },
84 Err(_err) => Err(HandleError::Timeout),
86 },
87 Err(_err) => {
89 abort_handle.abort();
90 Err(HandleError::Timeout)
91 }
92 }
93 })
94 }
95}
96
/// Ceiling for message reference numbers: 2.1 billion, which is below
/// `i32::MAX` (2_147_483_647).
///
/// NOTE(review): presumably the point at which a ref counter wraps back
/// around — confirm against the code that reads this constant.
pub const MAX_MSG_REF: u32 = 2_100_000_000;