1use std::{io, marker::PhantomData};
2
3use ntex_bytes::ByteString;
4
5use super::codec::{self, DisconnectReasonCode, QoS, UserProperties};
6use crate::error;
7
8#[derive(Debug)]
10pub enum Control<E> {
11 Auth(Auth),
13 PublishRelease(PublishRelease),
15 Ping(Ping),
17 Disconnect(Disconnect),
19 Subscribe(Subscribe),
21 Unsubscribe(Unsubscribe),
23 WrBackpressure(WrBackpressure),
25 Closed(Closed),
27 Error(Error<E>),
29 ProtocolError(ProtocolError),
31 PeerGone(PeerGone),
33}
34
35#[derive(Debug)]
37pub struct ControlAck {
38 pub(crate) packet: Option<codec::Packet>,
39 pub(crate) disconnect: bool,
40}
41
42impl<E> Control<E> {
43 #[doc(hidden)]
45 pub fn auth(pkt: codec::Auth, size: u32) -> Self {
46 Control::Auth(Auth { pkt, size })
47 }
48
49 pub(crate) fn pubrel(pkt: codec::PublishAck2, size: u32) -> Self {
50 Control::PublishRelease(PublishRelease::new(pkt, size))
51 }
52
53 #[doc(hidden)]
55 pub fn subscribe(pkt: codec::Subscribe, size: u32) -> Self {
56 Control::Subscribe(Subscribe::new(pkt, size))
57 }
58
59 #[doc(hidden)]
61 pub fn unsubscribe(pkt: codec::Unsubscribe, size: u32) -> Self {
62 Control::Unsubscribe(Unsubscribe::new(pkt, size))
63 }
64
65 #[doc(hidden)]
67 pub fn ping() -> Self {
68 Control::Ping(Ping)
69 }
70
71 #[doc(hidden)]
73 pub fn remote_disconnect(pkt: codec::Disconnect, size: u32) -> Self {
74 Control::Disconnect(Disconnect(pkt, size))
75 }
76
77 pub(super) const fn closed() -> Self {
78 Control::Closed(Closed)
79 }
80
81 pub(super) const fn wr_backpressure(enabled: bool) -> Self {
82 Control::WrBackpressure(WrBackpressure(enabled))
83 }
84
85 pub(super) fn error(err: E) -> Self {
86 Control::Error(Error::new(err))
87 }
88
89 pub(super) fn peer_gone(err: Option<io::Error>) -> Self {
90 Control::PeerGone(PeerGone(err))
91 }
92
93 pub(super) fn proto_error(err: error::ProtocolError) -> Self {
94 Control::ProtocolError(ProtocolError::new(err))
95 }
96
97 pub fn disconnect(&self) -> ControlAck {
100 let pkt = codec::Disconnect {
101 reason_code: codec::DisconnectReasonCode::NormalDisconnection,
102 session_expiry_interval_secs: None,
103 server_reference: None,
104 reason_string: None,
105 user_properties: Default::default(),
106 };
107 ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
108 }
109
110 pub fn disconnect_with(&self, pkt: codec::Disconnect) -> ControlAck {
113 ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
114 }
115
116 pub fn ack(self) -> ControlAck {
118 match self {
119 Control::Auth(_) => super::disconnect("Auth control message is not supported"),
120 Control::PublishRelease(msg) => msg.ack(),
121 Control::Ping(msg) => msg.ack(),
122 Control::Disconnect(msg) => msg.ack(),
123 Control::Subscribe(msg) => msg.ack(),
124 Control::Unsubscribe(msg) => msg.ack(),
125 Control::WrBackpressure(msg) => msg.ack(),
126 Control::Closed(msg) => msg.ack(),
127 Control::Error(_) => super::disconnect("Error control message is not supported"),
128 Control::ProtocolError(msg) => msg.ack(),
129 Control::PeerGone(msg) => msg.ack(),
130 }
131 }
132}
133
134#[derive(Debug)]
135pub struct Auth {
136 pkt: codec::Auth,
137 size: u32,
138}
139
140impl Auth {
141 pub fn packet(&self) -> &codec::Auth {
143 &self.pkt
144 }
145
146 pub fn packet_size(&self) -> u32 {
148 self.size
149 }
150
151 pub fn ack(self, response: codec::Auth) -> ControlAck {
152 ControlAck { packet: Some(codec::Packet::Auth(response)), disconnect: false }
153 }
154}
155
156#[derive(Debug)]
157pub struct PublishRelease {
158 pkt: codec::PublishAck2,
159 result: codec::PublishAck2,
160 size: u32,
161}
162
163impl PublishRelease {
164 pub(crate) fn new(pkt: codec::PublishAck2, size: u32) -> Self {
165 let packet_id = pkt.packet_id;
166 Self {
167 pkt,
168 size,
169 result: codec::PublishAck2 {
170 packet_id,
171 reason_code: codec::PublishAck2Reason::Success,
172 properties: codec::UserProperties::default(),
173 reason_string: None,
174 },
175 }
176 }
177
178 pub fn packet(&self) -> &codec::PublishAck2 {
180 &self.pkt
181 }
182
183 pub fn packet_size(&self) -> u32 {
185 self.size
186 }
187
188 #[inline]
190 pub fn properties<F>(mut self, f: F) -> Self
191 where
192 F: FnOnce(&mut codec::UserProperties),
193 {
194 f(&mut self.result.properties);
195 self
196 }
197
198 #[inline]
200 pub fn reason(mut self, reason: ByteString) -> Self {
201 self.result.reason_string = Some(reason);
202 self
203 }
204
205 pub fn ack(self) -> ControlAck {
207 ControlAck {
208 packet: Some(codec::Packet::PublishComplete(self.result)),
209 disconnect: false,
210 }
211 }
212}
213
214#[derive(Debug)]
215pub struct Ping;
216
217impl Ping {
218 pub fn ack(self) -> ControlAck {
219 ControlAck { packet: Some(codec::Packet::PingResponse), disconnect: false }
220 }
221}
222
223#[derive(Debug)]
224pub struct Disconnect(pub(crate) codec::Disconnect, pub(crate) u32);
225
226impl Disconnect {
227 pub fn packet(&self) -> &codec::Disconnect {
229 &self.0
230 }
231
232 pub fn packet_size(&self) -> u32 {
234 self.1
235 }
236
237 pub fn ack(self) -> ControlAck {
239 ControlAck { packet: None, disconnect: true }
240 }
241}
242
243#[derive(Debug)]
245pub struct Subscribe {
246 packet: codec::Subscribe,
247 result: codec::SubscribeAck,
248 size: u32,
249}
250
251impl Subscribe {
252 pub fn new(packet: codec::Subscribe, size: u32) -> Self {
255 let mut status = Vec::with_capacity(packet.topic_filters.len());
256 (0..packet.topic_filters.len())
257 .for_each(|_| status.push(codec::SubscribeAckReason::UnspecifiedError));
258
259 let result = codec::SubscribeAck {
260 status,
261 packet_id: packet.packet_id,
262 properties: codec::UserProperties::default(),
263 reason_string: None,
264 };
265
266 Self { packet, result, size }
267 }
268
269 #[inline]
270 pub fn iter_mut(&mut self) -> SubscribeIter<'_> {
272 SubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
273 }
274
275 #[inline]
276 pub fn ack_reason(mut self, reason: ByteString) -> Self {
278 self.result.reason_string = Some(reason);
279 self
280 }
281
282 #[inline]
283 pub fn ack_properties<F>(mut self, f: F) -> Self
285 where
286 F: FnOnce(&mut codec::UserProperties),
287 {
288 f(&mut self.result.properties);
289 self
290 }
291
292 #[inline]
293 pub fn ack(self) -> ControlAck {
295 ControlAck { packet: Some(codec::Packet::SubscribeAck(self.result)), disconnect: false }
296 }
297
298 pub fn packet(&self) -> &codec::Subscribe {
300 &self.packet
301 }
302
303 pub fn packet_size(&self) -> u32 {
305 self.size
306 }
307}
308
309impl<'a> IntoIterator for &'a mut Subscribe {
310 type Item = Subscription<'a>;
311 type IntoIter = SubscribeIter<'a>;
312
313 fn into_iter(self) -> SubscribeIter<'a> {
314 self.iter_mut()
315 }
316}
317
318pub struct SubscribeIter<'a> {
320 subs: *mut Subscribe,
321 entry: usize,
322 lt: PhantomData<&'a mut Subscribe>,
323}
324
325impl<'a> SubscribeIter<'a> {
326 fn next_unsafe(&mut self) -> Option<Subscription<'a>> {
327 let subs = unsafe { &mut *self.subs };
328
329 if self.entry < subs.packet.topic_filters.len() {
330 let s = Subscription {
331 topic: &subs.packet.topic_filters[self.entry].0,
332 options: &subs.packet.topic_filters[self.entry].1,
333 status: &mut subs.result.status[self.entry],
334 };
335 self.entry += 1;
336 Some(s)
337 } else {
338 None
339 }
340 }
341}
342
343impl<'a> Iterator for SubscribeIter<'a> {
344 type Item = Subscription<'a>;
345
346 #[inline]
347 fn next(&mut self) -> Option<Subscription<'a>> {
348 self.next_unsafe()
349 }
350}
351
352#[derive(Debug)]
354pub struct Subscription<'a> {
355 topic: &'a ByteString,
356 options: &'a codec::SubscriptionOptions,
357 status: &'a mut codec::SubscribeAckReason,
358}
359
360impl<'a> Subscription<'a> {
361 #[inline]
362 pub fn topic(&self) -> &'a ByteString {
364 self.topic
365 }
366
367 #[inline]
368 pub fn options(&self) -> &codec::SubscriptionOptions {
370 self.options
371 }
372
373 #[inline]
374 pub fn fail(&mut self, status: codec::SubscribeAckReason) {
376 *self.status = status
377 }
378
379 #[inline]
380 pub fn confirm(&mut self, qos: QoS) {
382 match qos {
383 QoS::AtMostOnce => *self.status = codec::SubscribeAckReason::GrantedQos0,
384 QoS::AtLeastOnce => *self.status = codec::SubscribeAckReason::GrantedQos1,
385 QoS::ExactlyOnce => *self.status = codec::SubscribeAckReason::GrantedQos2,
386 }
387 }
388
389 #[inline]
390 #[doc(hidden)]
391 pub fn subscribe(&mut self, qos: QoS) {
393 self.confirm(qos)
394 }
395}
396
397#[derive(Debug)]
399pub struct Unsubscribe {
400 packet: codec::Unsubscribe,
401 result: codec::UnsubscribeAck,
402 size: u32,
403}
404
405impl Unsubscribe {
406 pub fn new(packet: codec::Unsubscribe, size: u32) -> Self {
409 let mut status = Vec::with_capacity(packet.topic_filters.len());
410 (0..packet.topic_filters.len())
411 .for_each(|_| status.push(codec::UnsubscribeAckReason::Success));
412
413 let result = codec::UnsubscribeAck {
414 status,
415 packet_id: packet.packet_id,
416 properties: codec::UserProperties::default(),
417 reason_string: None,
418 };
419
420 Self { packet, result, size }
421 }
422
423 pub fn properties(&self) -> &codec::UserProperties {
425 &self.packet.user_properties
426 }
427
428 pub fn iter(&self) -> impl Iterator<Item = &ByteString> {
430 self.packet.topic_filters.iter()
431 }
432
433 #[inline]
434 pub fn iter_mut(&mut self) -> UnsubscribeIter<'_> {
436 UnsubscribeIter { subs: self as *const _ as *mut _, entry: 0, lt: PhantomData }
437 }
438
439 #[inline]
440 pub fn ack_reason(mut self, reason: ByteString) -> Self {
442 self.result.reason_string = Some(reason);
443 self
444 }
445
446 #[inline]
447 pub fn ack_properties<F>(mut self, f: F) -> Self
449 where
450 F: FnOnce(&mut codec::UserProperties),
451 {
452 f(&mut self.result.properties);
453 self
454 }
455
456 #[inline]
457 pub fn ack(self) -> ControlAck {
459 ControlAck {
460 packet: Some(codec::Packet::UnsubscribeAck(self.result)),
461 disconnect: false,
462 }
463 }
464
465 pub fn packet(&self) -> &codec::Unsubscribe {
467 &self.packet
468 }
469
470 pub fn packet_size(&self) -> u32 {
472 self.size
473 }
474}
475
476impl<'a> IntoIterator for &'a mut Unsubscribe {
477 type Item = UnsubscribeItem<'a>;
478 type IntoIter = UnsubscribeIter<'a>;
479
480 fn into_iter(self) -> UnsubscribeIter<'a> {
481 self.iter_mut()
482 }
483}
484
485pub struct UnsubscribeIter<'a> {
487 subs: *mut Unsubscribe,
488 entry: usize,
489 lt: PhantomData<&'a mut Unsubscribe>,
490}
491
492impl<'a> UnsubscribeIter<'a> {
493 fn next_unsafe(&mut self) -> Option<UnsubscribeItem<'a>> {
494 let subs = unsafe { &mut *self.subs };
495
496 if self.entry < subs.packet.topic_filters.len() {
497 let s = UnsubscribeItem {
498 topic: &subs.packet.topic_filters[self.entry],
499 status: &mut subs.result.status[self.entry],
500 };
501 self.entry += 1;
502 Some(s)
503 } else {
504 None
505 }
506 }
507}
508
509impl<'a> Iterator for UnsubscribeIter<'a> {
510 type Item = UnsubscribeItem<'a>;
511
512 #[inline]
513 fn next(&mut self) -> Option<UnsubscribeItem<'a>> {
514 self.next_unsafe()
515 }
516}
517
518#[derive(Debug)]
520pub struct UnsubscribeItem<'a> {
521 topic: &'a ByteString,
522 status: &'a mut codec::UnsubscribeAckReason,
523}
524
525impl<'a> UnsubscribeItem<'a> {
526 #[inline]
527 pub fn topic(&self) -> &'a ByteString {
529 self.topic
530 }
531
532 #[inline]
533 pub fn fail(&mut self, status: codec::UnsubscribeAckReason) {
535 *self.status = status;
536 }
537
538 #[inline]
539 pub fn success(&mut self) {
541 *self.status = codec::UnsubscribeAckReason::Success;
542 }
543}
544
545#[derive(Debug)]
547pub struct WrBackpressure(bool);
548
549impl WrBackpressure {
550 #[inline]
551 pub fn enabled(&self) -> bool {
553 self.0
554 }
555
556 #[inline]
557 pub fn ack(self) -> ControlAck {
559 ControlAck { packet: None, disconnect: false }
560 }
561}
562
563#[derive(Debug)]
565pub struct Closed;
566
567impl Closed {
568 #[inline]
569 pub fn ack(self) -> ControlAck {
571 ControlAck { packet: None, disconnect: false }
572 }
573}
574
575#[derive(Debug)]
577pub struct Error<E> {
578 err: E,
579 pkt: codec::Disconnect,
580}
581
582impl<E> Error<E> {
583 pub fn new(err: E) -> Self {
584 Self {
585 err,
586 pkt: codec::Disconnect {
587 session_expiry_interval_secs: None,
588 server_reference: None,
589 reason_string: None,
590 user_properties: UserProperties::default(),
591 reason_code: DisconnectReasonCode::ImplementationSpecificError,
592 },
593 }
594 }
595
596 #[inline]
597 pub fn get_ref(&self) -> &E {
599 &self.err
600 }
601
602 #[inline]
603 pub fn reason_string(mut self, reason: ByteString) -> Self {
605 self.pkt.reason_string = Some(reason);
606 self
607 }
608
609 #[inline]
610 pub fn server_reference(mut self, reference: ByteString) -> Self {
612 self.pkt.server_reference = Some(reference);
613 self
614 }
615
616 #[inline]
617 pub fn properties<F>(mut self, f: F) -> Self
619 where
620 F: FnOnce(&mut codec::UserProperties),
621 {
622 f(&mut self.pkt.user_properties);
623 self
624 }
625
626 #[inline]
627 pub fn ack(mut self, reason: DisconnectReasonCode) -> ControlAck {
629 self.pkt.reason_code = reason;
630 ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true }
631 }
632
633 #[inline]
634 pub fn ack_with<F>(self, f: F) -> ControlAck
636 where
637 F: FnOnce(E, codec::Disconnect) -> codec::Disconnect,
638 {
639 let pkt = f(self.err, self.pkt);
640 ControlAck { packet: Some(codec::Packet::Disconnect(pkt)), disconnect: true }
641 }
642}
643
644#[derive(Debug)]
646pub struct ProtocolError {
647 err: error::ProtocolError,
648 pkt: codec::Disconnect,
649}
650
651impl ProtocolError {
652 pub fn new(err: error::ProtocolError) -> Self {
653 Self {
654 pkt: codec::Disconnect {
655 session_expiry_interval_secs: None,
656 server_reference: None,
657 reason_string: None,
658 user_properties: UserProperties::default(),
659 reason_code: match err {
660 error::ProtocolError::Decode(error::DecodeError::InvalidLength) => {
661 DisconnectReasonCode::MalformedPacket
662 }
663 error::ProtocolError::Decode(error::DecodeError::MaxSizeExceeded) => {
664 DisconnectReasonCode::PacketTooLarge
665 }
666 error::ProtocolError::KeepAliveTimeout => {
667 DisconnectReasonCode::KeepAliveTimeout
668 }
669 error::ProtocolError::ProtocolViolation(ref e) => e.reason(),
670 error::ProtocolError::Encode(_) => {
671 DisconnectReasonCode::ImplementationSpecificError
672 }
673 _ => DisconnectReasonCode::ImplementationSpecificError,
674 },
675 },
676 err,
677 }
678 }
679
680 #[inline]
681 pub fn get_ref(&self) -> &error::ProtocolError {
683 &self.err
684 }
685
686 #[inline]
687 pub fn reason_code(mut self, reason: DisconnectReasonCode) -> Self {
689 self.pkt.reason_code = reason;
690 self
691 }
692
693 #[inline]
694 pub fn reason_string(mut self, reason: ByteString) -> Self {
696 self.pkt.reason_string = Some(reason);
697 self
698 }
699
700 #[inline]
701 pub fn server_reference(mut self, reference: ByteString) -> Self {
703 self.pkt.server_reference = Some(reference);
704 self
705 }
706
707 #[inline]
708 pub fn properties<F>(mut self, f: F) -> Self
710 where
711 F: FnOnce(&mut codec::UserProperties),
712 {
713 f(&mut self.pkt.user_properties);
714 self
715 }
716
717 #[inline]
718 pub fn ack(self) -> ControlAck {
720 ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true }
721 }
722
723 #[inline]
724 pub fn ack_and_error(self) -> (ControlAck, error::ProtocolError) {
726 (
727 ControlAck { packet: Some(codec::Packet::Disconnect(self.pkt)), disconnect: true },
728 self.err,
729 )
730 }
731}
732
733#[derive(Debug)]
734pub struct PeerGone(Option<io::Error>);
735
736impl PeerGone {
737 pub fn err(&self) -> Option<&io::Error> {
739 self.0.as_ref()
740 }
741
742 pub fn take(&mut self) -> Option<io::Error> {
744 self.0.take()
745 }
746
747 pub fn ack(self) -> ControlAck {
749 ControlAck { packet: None, disconnect: true }
750 }
751}