Skip to main content

ntex_h2/
control.rs

1use std::io;
2
3use crate::frame::Frame;
4use crate::{error, frame, stream::StreamRef};
5
6#[derive(Debug)]
7pub enum Control<E> {
8    /// Connection is prepared to disconnect
9    Disconnect(Reason<E>),
10    /// Protocol dispatcher is terminated
11    Terminated(Terminated),
12}
13
14#[derive(Debug)]
15/// Disconnect reason
16pub enum Reason<E> {
17    /// Application level error from publish service
18    Error(Error<E>),
19    /// Protocol level error
20    ProtocolError(ConnectionError),
21    /// Remote GoAway is received
22    GoAway(GoAway),
23    /// Peer is gone
24    PeerGone(PeerGone),
25}
26
27#[derive(Clone, Debug)]
28pub struct ControlAck {
29    pub(crate) frame: Option<Frame>,
30}
31
32impl<E> Control<E> {
33    /// Create a new `Control` message for app level errors
34    pub(super) fn error(err: E, stream: Option<StreamRef>) -> Self {
35        Control::Disconnect(Reason::Error(Error::new(err, stream)))
36    }
37
38    /// Create a new `Control` message from GOAWAY packet.
39    pub(super) fn go_away(frm: frame::GoAway) -> Self {
40        Control::Disconnect(Reason::GoAway(GoAway(frm)))
41    }
42
43    /// Create a new `Control` message from DISCONNECT packet.
44    pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
45        Control::Disconnect(Reason::PeerGone(PeerGone(err)))
46    }
47
48    /// Create a new `Control` message for protocol level errors
49    pub(super) fn proto_error(err: error::ConnectionError) -> Self {
50        Control::Disconnect(Reason::ProtocolError(ConnectionError::new(err)))
51    }
52
53    pub(super) fn terminated() -> Self {
54        Control::Terminated(Terminated)
55    }
56
57    /// Default ack impl
58    pub fn ack(self) -> ControlAck {
59        match self {
60            Control::Disconnect(item) => item.ack(),
61            Control::Terminated(item) => item.ack(),
62        }
63    }
64}
65
66impl<E> Reason<E> {
67    /// Default ack impl
68    pub fn ack(self) -> ControlAck {
69        match self {
70            Reason::Error(item) => item.ack(),
71            Reason::ProtocolError(item) => item.ack(),
72            Reason::GoAway(item) => item.ack(),
73            Reason::PeerGone(item) => item.ack(),
74        }
75    }
76}
77
78/// Application level error
79#[derive(Debug)]
80pub struct Error<E> {
81    err: E,
82    goaway: frame::GoAway,
83}
84
85impl<E> Error<E> {
86    fn new(err: E, stream: Option<StreamRef>) -> Self {
87        let goaway = if let Some(ref stream) = stream {
88            frame::GoAway::new(frame::Reason::INTERNAL_ERROR).set_last_stream_id(stream.id())
89        } else {
90            frame::GoAway::new(frame::Reason::INTERNAL_ERROR)
91        };
92
93        Self { err, goaway }
94    }
95
96    #[inline]
97    /// Returns reference to mqtt error
98    pub fn get_ref(&self) -> &E {
99        &self.err
100    }
101
102    #[inline]
103    /// Set reason code for go away packet
104    pub fn reason(mut self, reason: frame::Reason) -> Self {
105        self.goaway = self.goaway.set_reason(reason);
106        self
107    }
108
109    #[inline]
110    /// Ack service error, return disconnect packet and close connection.
111    pub fn ack(self) -> ControlAck {
112        ControlAck {
113            frame: Some(self.goaway.into()),
114        }
115    }
116}
117
118/// Dispatcher has been terminated
119#[derive(Debug)]
120pub struct Terminated;
121
122impl Terminated {
123    #[inline]
124    /// convert packet to a result
125    pub fn ack(self) -> ControlAck {
126        ControlAck { frame: None }
127    }
128}
129
130/// Protocol level error
131#[derive(Debug)]
132pub struct ConnectionError {
133    err: error::ConnectionError,
134    frm: frame::GoAway,
135}
136
137impl ConnectionError {
138    pub fn new(err: error::ConnectionError) -> Self {
139        Self {
140            frm: err.to_goaway(),
141            err,
142        }
143    }
144
145    #[inline]
146    /// Returns reference to a protocol error
147    pub fn get_ref(&self) -> &error::ConnectionError {
148        &self.err
149    }
150
151    #[inline]
152    /// Set reason code for go away packet
153    pub fn reason(mut self, reason: frame::Reason) -> Self {
154        self.frm = self.frm.set_reason(reason);
155        self
156    }
157
158    #[inline]
159    /// Ack protocol error, return disconnect packet and close connection.
160    pub fn ack(self) -> ControlAck {
161        ControlAck {
162            frame: Some(self.frm.into()),
163        }
164    }
165}
166
167#[derive(Debug)]
168pub struct PeerGone(pub(super) Option<io::Error>);
169
170impl PeerGone {
171    /// Returns error reference
172    pub fn err(&self) -> Option<&io::Error> {
173        self.0.as_ref()
174    }
175
176    /// Take error
177    pub fn take(&mut self) -> Option<io::Error> {
178        self.0.take()
179    }
180
181    pub fn ack(self) -> ControlAck {
182        ControlAck { frame: None }
183    }
184}
185
186#[derive(Debug)]
187pub struct GoAway(frame::GoAway);
188
189impl GoAway {
190    /// Returns error reference
191    pub fn frame(&self) -> &frame::GoAway {
192        &self.0
193    }
194
195    pub fn ack(self) -> ControlAck {
196        ControlAck { frame: None }
197    }
198}