1use core::{cell::RefCell, future::Future, net::SocketAddr, ops::Deref};
2
3use embassy_futures::poll_once;
4use embassy_sync::{blocking_mutex::raw::RawMutex, watch::Watch};
5use embedded_nal_async::{Dns, TcpConnect};
6use embedded_io_async::{Read, Write, Error};
7use mqttrs2::{Connack, Connect, LastWill, Packet, Protocol, Suback, Subscribe, decode_slice_with_len, encode_slice};
8
9use crate::{AutoSubscribe, ClientConfig, MqttError, buffer::{MappedBufferRef, StackBufferCell}, state::pid::next_pid};
10
11const MQTT_DEFAULT_PORT: u16 = 1883;
12
13macro_rules! network_write {
14 ($data:expr, $conn:expr) => {
15 $conn.write($data).await
16 .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?
17 };
18}
19
20macro_rules! network_read {
21 ($data:expr, $conn:expr) => {
22 $conn.read($data).await
23 .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?
24 };
25}
26
27macro_rules! network_flush {
28 ($conn:expr) => {
29 $conn.flush().await
30 .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?
31 };
32}
33
34fn network_try_read(connection: &mut impl Read, buf: &mut [u8]) -> Result<usize, MqttError> {
35 let read_fut = connection.read(buf);
36 match poll_once(read_fut) {
37 core::task::Poll::Ready(Ok(n)) => {
38 trace!("network_try_read: read {} bytes", n);
39 Ok(n)
40 },
41 core::task::Poll::Ready(Err(err)) => {
42 trace!("network_try_read err: {}", &err);
43 Err(MqttError::ConnectionFailed2(err.kind()))
44 },
45 core::task::Poll::Pending => {
46 trace!("try_network_read did not reaad anything");
47 Ok(0)
48 },
49 }
50}
51
52#[cfg(test)]
53pub mod test;
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
56pub enum ConnectionStateValue {
57 ConnectSent,
58 Connected,
59 Disconnected,
60 Error
61}
62
63pub trait ConnectionState {
64
65 fn connect(&self) -> impl Future<Output = Result<(), MqttError>>;
67
68 fn disconnect(&self) -> impl Future<Output = Result<(), MqttError>>;
69
70 fn get_state(&self) -> Option<ConnectionStateValue>;
71
72 fn on_state_change(&self) -> impl Future<Output = ConnectionStateValue>;
73
74 fn await_connected(&self) -> impl Future<Output = ()>;
75
76 fn set_error(&self);
77
78 fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError>;
81
82 fn write_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttError>>;
84
85
86 fn run_io(&self) -> impl Future<Output = Result<impl Deref<Target = Packet<'_>>, MqttError>>;
88
89 fn run_io_nonblocking(&self) -> impl Future<Output = Result<Option<impl Deref<Target = Packet<'_>>>, MqttError>>;
91}
92
93pub struct TcpConnectionState<'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize>
94where NETWORK: TcpConnect, DNS: Dns {
95 inner: Watch<M, ConnectionStateValue, 8>,
96 network: &'a NETWORK,
97 dns: DNS,
98 connection: RefCell<Option<<NETWORK as TcpConnect>::Connection<'a>>>,
99
100 send_buffer: StackBufferCell<BUFFER_SIZE>,
101 recv_buffer: StackBufferCell<BUFFER_SIZE>,
102
103 last_will: Option<LastWill<'l>>,
104 config: ClientConfig<'l>
105}
106
107impl<'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize> TcpConnectionState<'a, 'l, M, NETWORK, DNS, BUFFER_SIZE>
108where NETWORK: TcpConnect, DNS: Dns {
109
110 pub fn new(network: &'a NETWORK, dns: DNS, last_will: Option<LastWill<'l>>, config: ClientConfig<'l>) -> Self {
111 Self {
112 inner: Watch::new(),
113 network,
114 dns,
115 connection: RefCell::new(None),
116
117 send_buffer: StackBufferCell::new(),
118 recv_buffer: StackBufferCell::new(),
119
120 last_will,
121 config
122 }
123 }
124
125
126 async fn read_packet(&self) -> Result<MappedBufferRef<'_, Packet<'_>, BUFFER_SIZE>, MqttError> {
128 let mut connection = self.connection.borrow_mut();
129 let connection = connection.as_mut().unwrap();
130
131 loop {
132 if let Some(packet) = self.try_read_packet()? {
133 return Ok(packet);
134 }
135
136 trace!("could not read packet from network, wait for new data");
137 let mut buffer = self.recv_buffer.borrow();
138 let bytes_received = network_read!(buffer.writeable_data(), connection);
139 buffer.commit_bytes_written(bytes_received).unwrap();
140 }
141 }
142
143 fn try_read_packet(&self) -> Result<Option<MappedBufferRef<'_, Packet<'_>, BUFFER_SIZE>>, MqttError> {
144 let buffer = self.recv_buffer.borrow();
145 let is_max_len = buffer.is_max_len();
146
147 let result = buffer.try_map_maybe(|buf| decode_slice_with_len(buf))
148 .map_err(|err| MqttError::CodecError(err));
149
150 match result {
151 Ok(None) if is_max_len => {
152 error!("recv buffer too small to receive packet");
153 Err(MqttError::BufferTooSmall)
154 },
155 r => r
156 }
157 }
158
159
160 async fn send_all_intern(&self, connection: &mut NETWORK::Connection<'_>) -> Result<(), MqttError> {
173 let mut send_buffer = self.send_buffer.borrow();
174 while send_buffer.has_remaining_len() {
175 let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
176 send_buffer.add_bytes_read(bytes_sent).unwrap();
177 }
178
179 network_flush!(connection);
180
181 Ok(())
182 }
183
184 async fn send_all(&self) -> Result<(), MqttError> {
186 let mut connection = self.connection.borrow_mut();
187 let connection = connection.as_mut().unwrap();
188
189 self.send_all_intern(connection).await
190 }
191
192 async fn send(&self) -> Result<(), MqttError> {
205 let mut send_buffer = self.send_buffer.borrow();
206 if ! send_buffer.has_remaining_len() {
208 trace!("network send: nothing to send");
209 return Ok(());
210 }
211
212 let mut connection = self.connection.borrow_mut();
213 let connection = connection.as_mut().unwrap();
214
215 let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
216 debug!("sent {} bytes to network", bytes_sent);
217 send_buffer.add_bytes_read(bytes_sent).unwrap();
218 connection.flush().await
219 .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?;
220
221 Ok(())
222 }
223
224 async fn write_packet_async(&self, packet: &Packet<'_>) -> Result<(), MqttError> {
226 loop {
227 if self.try_write_packet(packet)? {
228 return Ok(())
229 }
230
231 self.send().await?;
232 }
233 }
234
235 fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError> {
238 let mut send_buffer = self.send_buffer.borrow();
239 send_buffer.flip();
240 let buf = send_buffer.writeable_data();
241
242 match encode_slice(packet, buf) {
243 Ok(bytes_written) => {
244 send_buffer.commit_bytes_written(bytes_written).unwrap();
245 Ok(true)
246 },
247 Err(mqttrs2::Error::WriteZero) => {
248 if send_buffer.is_max_capacity() {
249 error!("send buffer too small to write packet {}", packet);
250 Err(MqttError::BufferTooSmall)
251 } else {
252 trace!("could not write packet: no space in buffer");
253 Ok(false)
254 }
255 },
256 Err(err) => Err(err.into()),
257 }
258 }
259
260 async fn subscribe_auto_subscribes(&self) -> Result<(), MqttError> {
261 for chunk in self.config.auto_subscribes.chunks(5) {
262 debug!("autosubscribe chunk of {} topics", chunk.len());
263 let pid = next_pid();
264
265 let topics = chunk.iter()
266 .map(|el| el.try_into())
267 .collect::<Result<_, _>>()?;
268
269 let request = Subscribe{
270 pid,
271 topics
272 };
273 let request = Packet::Subscribe(request);
274
275 self.write_packet_async(&request).await?;
276
277 self.send_all().await?;
279
280
281 let next_packet = self.read_packet().await?;
290 let suback = match next_packet.deref() {
291 Packet::Suback(suback) => suback,
292 unexpected_packet => {
293 error!("got unexpected packet {} while waiting for suback", unexpected_packet.get_type());
294 return Err(MqttError::ConnackError)
295 }
296 };
297
298 Self::process_suback(suback, chunk)?;
299 }
300
301 info!("auto subscribes done");
302
303 Ok(())
304 }
305
306 fn process_suback(suback: &Suback, auto_subscribes: &[AutoSubscribe]) -> Result<(), MqttError> {
307
308 for (suback, requestes_topic) in suback.return_codes.iter().zip(auto_subscribes.iter()) {
309 match suback {
310 mqttrs2::SubscribeReturnCodes::Success(qos) if *qos == requestes_topic.qos => {
311 info!("successfully auto subscribes to {} with {}", &requestes_topic.topic, qos);
312 },
313 mqttrs2::SubscribeReturnCodes::Success(qos) => {
314 warn!("autosubscribes to {} with different qos: requested {} but got {}", &requestes_topic.topic, requestes_topic.qos, qos);
315 },
316 mqttrs2::SubscribeReturnCodes::Failure => {
317 error!("could not auto subscribe t {}", &requestes_topic.topic);
318 return Err(MqttError::SubscribeOrUnsubscribeFailed);
319 },
320 }
321 }
322
323 Ok(())
324 }
325
326 async fn process_connack(&self, connack: &Connack) -> Result<(), MqttError> {
327
328 match connack.code {
329 mqttrs2::ConnectReturnCode::Accepted => {
330 info!("connction to broker established");
331
332 self.subscribe_auto_subscribes().await?;
334
335 self.inner.sender().send(ConnectionStateValue::Connected);
336
337 Ok(())
338 },
339 mqttrs2::ConnectReturnCode::RefusedProtocolVersion | mqttrs2::ConnectReturnCode::RefusedIdentifierRejected | mqttrs2::ConnectReturnCode::ServerUnavailable => {
340 error!("connack returned error: {}", connack.code);
341 self.inner.sender().send(ConnectionStateValue::Error);
342 Err(MqttError::ConnackError)
343 },
344 mqttrs2::ConnectReturnCode::BadUsernamePassword | mqttrs2::ConnectReturnCode::NotAuthorized => {
345 error!("connack: authentication failed: {}", connack.code);
346 self.inner.sender().send(ConnectionStateValue::Error);
347 Err(MqttError::AuthenticationError)
348 }
349 }
350 }
351
352 async fn send_receive(&self) -> Result<(), MqttError> {
353 let mut send_buffer = self.send_buffer.borrow();
354 let mut recv_buffer = self.recv_buffer.borrow();
355 recv_buffer.flip();
356
357 let mut connection = self.connection.borrow_mut();
358 let connection = connection.as_mut().unwrap();
359
360 while send_buffer.has_remaining_len() {
361 let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
362 send_buffer.add_bytes_read(bytes_sent).unwrap();
363 network_flush!(connection);
364 debug!("write {} bytes to network, {} remaining", bytes_sent, send_buffer.reaable_data().len());
365
366 let bytes_received = network_try_read(connection, recv_buffer.writeable_data())?;
367 recv_buffer.commit_bytes_written(bytes_received).unwrap();
368 debug!("try_read {} bytes from network", bytes_received);
369
370 if bytes_received > 0 {
371 return Ok(())
372 }
373 }
374
375 let bytes_received = network_read!(recv_buffer.writeable_data(), connection);
376 recv_buffer.commit_bytes_written(bytes_received).unwrap();
377 debug!("read {} bytes from network", bytes_received);
378
379 Ok(())
380 }
381
382}
383
384impl <'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize> ConnectionState for TcpConnectionState<'a, 'l, M, NETWORK, DNS, BUFFER_SIZE>
385where NETWORK: TcpConnect, DNS: Dns {
386
387 fn get_state(&self) -> Option<ConnectionStateValue> {
388 self.inner.try_get()
389 }
390
391 async fn on_state_change(&self) -> ConnectionStateValue {
392 self.inner.dyn_receiver().unwrap().changed().await
393 }
394
395 async fn disconnect(&self) -> Result<(), MqttError> {
396 assert!(self.inner.try_get() == Some(ConnectionStateValue::Connected));
397 info!("disconnect started");
398
399 self.send_all().await?;
401
402 let sent = self.try_write_packet(&Packet::Disconnect)?;
403 if ! sent {
404 panic!("could not write disconnect to send buffer");
405 }
406
407 self.send_all().await?;
408
409 let connection = self.connection.borrow_mut().take().unwrap();
411 drop(connection);
412
413 self.inner.sender().send(ConnectionStateValue::Disconnected);
414
415 Ok(())
416 }
417
418 async fn connect(&self) -> Result<(), MqttError> {
419 assert!(self.inner.try_get() == None || self.inner.try_get() == Some(ConnectionStateValue::Error));
420
421 self.recv_buffer.borrow().reset();
422 self.send_buffer.borrow().reset();
423
424 {
425 let port = self.config.port.unwrap_or(MQTT_DEFAULT_PORT);
426 debug!("connect using port {}", port);
427 let ip = self.config.host.resolve(&self.dns).await?;
428 let addr = SocketAddr::new(ip, port);
429
430 trace!("start connecting to socket addr {}", &addr);
431 let connection = self.network.connect(addr).await
432 .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?;
433 trace!("successfully established tcp connection to broker");
434
435
436 let mut connection_lock = self.connection.borrow_mut();
437 *connection_lock = Some(connection);
438 }
439
440 info!("tcp connection to broker established");
441
442 let mut connect = Connect{
443 protocol: Protocol::MQTT311,
444 keep_alive: super::KEEP_ALIVE as u16,
445 client_id: &self.config.client_id,
446 clean_session: false,
447 last_will: self.last_will.clone(),
448 username: None,
449 password: None
450 };
451
452 if let Some(cred) = &self.config.credentials {
453 connect.username = Some(&cred.username);
454 connect.password = Some(cred.password.as_bytes());
455 }
456
457 let connect = Packet::Connect(connect);
458
459 let sent = self.try_write_packet(&connect)?;
460 if ! sent {
461 error!("connect packet does not fit in send buffer");
462 return Err(MqttError::BufferTooSmall);
463 }
464
465 self.send().await?;
467
468 self.inner.sender().send(ConnectionStateValue::ConnectSent);
469
470 let next_packet = self.read_packet().await?;
471 let connack = match next_packet.deref() {
472 Packet::Connack(connack) => connack,
473 unexpected_packet => {
474 error!("got unexpected packet {} while waiting for connack", unexpected_packet.get_type());
475 return Err(MqttError::ConnackError)
476 }
477 };
478
479 let connack = connack.clone();
480 drop(next_packet); self.process_connack(&connack).await?;
484
485 Ok(())
486 }
487
488 async fn await_connected(&self) {
489 self.inner.receiver().unwrap().get_and(|value| *value == ConnectionStateValue::Connected).await;
490 }
491
492 fn set_error(&self) {
493 self.inner.sender().send(ConnectionStateValue::Error);
494 }
495
496 fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError> {
497 let mut send_buffer = self.send_buffer.borrow();
498
499 match encode_slice(packet, send_buffer.writeable_data()) {
500 Err(mqttrs2::Error::WriteZero) => Ok(false),
501 Ok(n) => {
502 send_buffer.commit_bytes_written(n).unwrap();
503 Ok(true)
504 }
505 Err(err) => Err(MqttError::CodecError(err))
506 }
507 }
508
509 async fn write_packet(&self, packet: &Packet<'_>) -> Result<(), MqttError> {
510 self.write_packet_async(packet).await
511 }
512
513 async fn run_io(&self) -> Result<impl Deref<Target = Packet<'_>>, MqttError> {
514 assert_eq!(self.inner.try_get(), Some(ConnectionStateValue::Connected));
515
516 loop {
517 self.send_receive().await?;
518
519 if let Some(packet) = self.try_read_packet()? {
520 return Ok(packet)
521 }
522 }
523 }
524
525 async fn run_io_nonblocking(&self) -> Result<Option<impl Deref<Target = Packet<'_>>>, MqttError> {
526 assert_eq!(self.inner.try_get(), Some(ConnectionStateValue::Connected));
527
528 self.send().await?;
529
530 {
531 let mut connection = self.connection.borrow_mut();
532 let connection = connection.as_mut().unwrap();
533 let mut recv_buffer = self.recv_buffer.borrow();
534 let bytes_received = network_try_read(connection, recv_buffer.writeable_data())?;
535 recv_buffer.commit_bytes_written(bytes_received).unwrap();
536 }
537
538
539 self.try_read_packet()
540 }
541}
542
543#[cfg(test)]
544mod tests {
545 use core::{net::{IpAddr, Ipv4Addr}, pin::{Pin, pin}};
546
547 use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
548 use heapless::Vec;
549 use mqttrs2::{Connack, ConnectReturnCode, Packet, PacketType};
550
551 use crate::{ClientConfig, Host, state::connection::{ConnectionState, ConnectionStateValue, TcpConnectionState, test::{TestDns, TestTcpConnect}}};
552 use crate::testutils::*;
553
554 #[test]
555 fn test_connect() {
556 let tcp = TestTcpConnect::new();
557 let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
558
559 let client_config = ClientConfig {
560 host: Host::Hostname("my-mqtt-test-broker"),
561 port: None,
562 client_id: "clien-12345",
563 credentials: None,
564 auto_subscribes: Vec::new(),
565 };
566
567 let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
568 let mut connection_state_receiver = connection_state.inner.dyn_receiver().unwrap();
569
570 let mut connect_future = connection_state.connect();
571 let mut connect_future = unsafe {
572 Pin::new_unchecked(&mut connect_future)
573 };
574
575 assert_pending(connect_future.as_mut());
577 assert_eq!(connection_state_receiver.try_changed(), Some(ConnectionStateValue::ConnectSent));
578
579 tcp.assert_packet_written(|packet| match packet {
581 Packet::Connect(connect) => {
582 assert_eq!(connect.client_id, client_config.client_id);
583 },
585 p => panic!("unexpected packet: {:?}", p)
586 });
587
588 let connack = Connack{
590 session_present: false,
591 code: ConnectReturnCode::Accepted
592 };
593
594 tcp.add_packet_to_receive(&Packet::Connack(connack));
595
596 assert_ready(connect_future.as_mut()).unwrap();
599 assert_eq!(connection_state_receiver.try_changed(), Some(ConnectionStateValue::Connected));
600 }
601
602 fn connect<M: RawMutex, const BUFFER: usize>(state: &TcpConnectionState<'_, '_, M, TestTcpConnect, TestDns, BUFFER>, tcp: &TestTcpConnect) {
603 let connect_future = state.connect();
604 let mut connect_future = pin!(connect_future);
605
606 assert_pending(connect_future.as_mut());
608
609 tcp.assert_packet_written(|p| {
610 assert_eq!(p.get_type(), PacketType::Connect);
611 });
612
613 let connack = Connack{
615 session_present: false,
616 code: ConnectReturnCode::Accepted
617 };
618
619 tcp.add_packet_to_receive(&Packet::Connack(connack));
620
621 assert_ready(connect_future.as_mut()).unwrap();
622 }
623
624 #[test]
625 fn test_run_io_read() {
626 let tcp = TestTcpConnect::new();
627 let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
628
629 let client_config = ClientConfig {
630 host: Host::Hostname("my-mqtt-test-broker"),
631 port: None,
632 client_id: "clien-12345",
633 credentials: None,
634 auto_subscribes: Vec::new(),
635 };
636
637 let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
638
639 connect(&connection_state, &tcp);
640
641 let run_io_future = connection_state.run_io();
642 let mut run_io_future = pin!(run_io_future);
643
644 assert_pending(run_io_future.as_mut());
646 assert_pending(run_io_future.as_mut());
647 assert_pending(run_io_future.as_mut());
648 assert_pending(run_io_future.as_mut());
649
650 tcp.add_packet_to_receive(&Packet::Pingresp);
651
652 assert_ready(run_io_future).unwrap();
654 }
655
656 #[test]
657 fn test_run_io_read_write() {
658 let tcp = TestTcpConnect::new();
659 let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
660
661 let client_config = ClientConfig {
662 host: Host::Hostname("my-mqtt-test-broker"),
663 port: None,
664 client_id: "clien-12345",
665 credentials: None,
666 auto_subscribes: Vec::new(),
667 };
668
669 let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
670 connect(&connection_state, &tcp);
671
672 assert!(connection_state.try_write_packet(&Packet::Pingreq).unwrap());
673
674 let run_io_future = connection_state.run_io();
675 let mut run_io_future = pin!(run_io_future);
676
677 assert_pending(run_io_future.as_mut());
679 tcp.assert_packet_written(|p| {
680 assert_eq!(*p, Packet::Pingreq);
681 });
682
683 tcp.add_packet_to_receive(&Packet::Pingresp);
684
685 assert_ready(run_io_future).unwrap();
687 }
688
689 #[test]
690 fn test_run_io_nonblocking_read_write() {
691 let tcp = TestTcpConnect::new();
692 let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
693
694 let client_config = ClientConfig {
695 host: Host::Hostname("my-mqtt-test-broker"),
696 port: None,
697 client_id: "clien-12345",
698 credentials: None,
699 auto_subscribes: Vec::new(),
700 };
701
702 let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
703 connect(&connection_state, &tcp);
704
705 let run_io_future = connection_state.run_io_nonblocking();
707
708 assert_ready_pin(run_io_future).unwrap();
710 tcp.assert_nothing_written();
711
712
713 assert!(connection_state.try_write_packet(&Packet::Pingreq).unwrap());
715 let run_io_future = connection_state.run_io_nonblocking();
716 let result = assert_ready_pin(run_io_future).unwrap();
717 assert!(result.is_none());
718 }
719
720 #[test]
721 fn test_disconnect() {
722 let tcp = TestTcpConnect::new();
723 let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
724
725 let client_config = ClientConfig {
726 host: Host::Hostname("my-mqtt-test-broker"),
727 port: None,
728 client_id: "clien-12345",
729 credentials: None,
730 auto_subscribes: Vec::new(),
731 };
732
733 let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
734 connect(&connection_state, &tcp);
735
736 let disconnect_future = connection_state.disconnect();
737 assert_ready_pin(disconnect_future).unwrap();
738
739 tcp.assert_packet_written(|p| {
740 assert_eq!(p.get_type(), PacketType::Disconnect);
741 });
742
743 assert_eq!(connection_state.inner.dyn_receiver().unwrap().try_get(), Some(ConnectionStateValue::Disconnected));
745
746 }
747
748
749}
750