lapin/
connection_status.rs

1use crate::{
2    auth::{Credentials, SASLMechanism},
3    Connection, ConnectionProperties, PromiseResolver,
4};
5use parking_lot::Mutex;
6use std::{fmt, sync::Arc};
7
8#[derive(Clone, Default)]
9pub struct ConnectionStatus(Arc<Mutex<Inner>>);
10
11impl ConnectionStatus {
12    pub fn state(&self) -> ConnectionState {
13        self.0.lock().state.clone()
14    }
15
16    pub(crate) fn set_state(&self, state: ConnectionState) -> ConnectionState {
17        let mut inner = self.0.lock();
18        std::mem::replace(&mut inner.state, state)
19    }
20
21    pub(crate) fn connection_step(&self) -> Option<ConnectionStep> {
22        self.0.lock().connection_step.take()
23    }
24
25    pub(crate) fn set_connection_step(&self, connection_step: ConnectionStep) {
26        self.0.lock().connection_step = Some(connection_step);
27    }
28
29    pub(crate) fn connection_resolver(&self) -> Option<PromiseResolver<Connection>> {
30        let resolver = self.0.lock().connection_resolver();
31        // We carry the Connection here to drop the lock() above before dropping the Connection
32        resolver.map(|(resolver, _connection)| resolver)
33    }
34
35    pub(crate) fn connection_step_name(&self) -> Option<&'static str> {
36        self.0.lock().connection_step_name()
37    }
38
39    pub fn vhost(&self) -> String {
40        self.0.lock().vhost.clone()
41    }
42
43    pub(crate) fn set_vhost(&self, vhost: &str) {
44        self.0.lock().vhost = vhost.into();
45    }
46
47    pub fn username(&self) -> String {
48        self.0.lock().username.clone()
49    }
50
51    pub(crate) fn set_username(&self, username: &str) {
52        self.0.lock().username = username.into();
53    }
54
55    pub(crate) fn block(&self) {
56        self.0.lock().blocked = true;
57    }
58
59    pub(crate) fn unblock(&self) {
60        self.0.lock().blocked = false;
61    }
62
63    pub fn blocked(&self) -> bool {
64        self.0.lock().blocked
65    }
66
67    pub fn connected(&self) -> bool {
68        self.0.lock().state == ConnectionState::Connected
69    }
70
71    pub fn closing(&self) -> bool {
72        self.0.lock().state == ConnectionState::Closing
73    }
74
75    pub fn closed(&self) -> bool {
76        self.0.lock().state == ConnectionState::Closed
77    }
78
79    pub fn errored(&self) -> bool {
80        self.0.lock().state == ConnectionState::Error
81    }
82
83    pub(crate) fn auto_close(&self) -> bool {
84        [ConnectionState::Connecting, ConnectionState::Connected].contains(&self.0.lock().state)
85    }
86}
87
88pub(crate) enum ConnectionStep {
89    ProtocolHeader(
90        PromiseResolver<Connection>,
91        Connection,
92        Credentials,
93        SASLMechanism,
94        ConnectionProperties,
95    ),
96    StartOk(PromiseResolver<Connection>, Connection, Credentials),
97    Open(PromiseResolver<Connection>),
98}
99
/// Lifecycle state of a connection, as stored in [`ConnectionStatus`].
///
/// Equality on this fieldless enum is total, so it implements `Eq` and `Hash`
/// in addition to `PartialEq`; values can therefore be used as set members or
/// map keys.
#[derive(Clone, Debug, Default, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// Default state of a freshly created status.
    #[default]
    Initial,
    /// The only state in which a pending connection step name is reported;
    /// together with `Connected` it makes `auto_close` return `true`.
    Connecting,
    /// State reported by [`ConnectionStatus::connected`].
    Connected,
    /// State reported by [`ConnectionStatus::closing`].
    Closing,
    /// State reported by [`ConnectionStatus::closed`].
    Closed,
    /// State reported by [`ConnectionStatus::errored`].
    Error,
}
110
111impl fmt::Debug for ConnectionStatus {
112    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113        let mut debug = f.debug_struct("ConnectionStatus");
114        if let Some(inner) = self.0.try_lock() {
115            debug
116                .field("state", &inner.state)
117                .field("vhost", &inner.vhost)
118                .field("username", &inner.username)
119                .field("blocked", &inner.blocked);
120        }
121        debug.finish()
122    }
123}
124
125struct Inner {
126    connection_step: Option<ConnectionStep>,
127    state: ConnectionState,
128    vhost: String,
129    username: String,
130    blocked: bool,
131}
132
133impl Default for Inner {
134    fn default() -> Self {
135        Self {
136            connection_step: None,
137            state: ConnectionState::default(),
138            vhost: "/".into(),
139            username: "guest".into(),
140            blocked: false,
141        }
142    }
143}
144
145impl Inner {
146    fn connection_resolver(&mut self) -> Option<(PromiseResolver<Connection>, Option<Connection>)> {
147        self.connection_step
148            .take()
149            .map(|connection_step| match connection_step {
150                ConnectionStep::ProtocolHeader(resolver, connection, ..) => {
151                    (resolver, Some(connection))
152                }
153                ConnectionStep::StartOk(resolver, connection, ..) => (resolver, Some(connection)),
154                ConnectionStep::Open(resolver, ..) => (resolver, None),
155            })
156    }
157
158    fn connection_step_name(&self) -> Option<&'static str> {
159        if let ConnectionState::Connecting = self.state {
160            self.connection_step.as_ref().map(|step| match step {
161                ConnectionStep::ProtocolHeader(..) => "ProtocolHeader",
162                ConnectionStep::StartOk(..) => "StartOk",
163                ConnectionStep::Open(..) => "Open",
164            })
165        } else {
166            None
167        }
168    }
169}