embedded_mqttc/state/connection/
mod.rs

1use core::{cell::RefCell, future::Future, net::SocketAddr, ops::Deref};
2
3use embassy_futures::poll_once;
4use embassy_sync::{blocking_mutex::raw::RawMutex, watch::Watch};
5use embedded_nal_async::{Dns, TcpConnect};
6use embedded_io_async::{Read, Write, Error};
7use mqttrs2::{Connack, Connect, LastWill, Packet, Protocol, Suback, Subscribe, decode_slice_with_len, encode_slice};
8
9use crate::{AutoSubscribe, ClientConfig, MqttError, buffer::{MappedBufferRef, StackBufferCell}, state::pid::next_pid};
10
11const MQTT_DEFAULT_PORT: u16 = 1883;
12
/// Performs a single `write` of `$bytes` on `$socket` and evaluates to the number of bytes written.
///
/// Only usable inside an `async` context whose return type is `Result<_, MqttError>`:
/// an I/O error is mapped to `MqttError::ConnectionFailed2` and propagated with `?`.
macro_rules! network_write {
    ($bytes:expr, $socket:expr) => {
        $socket
            .write($bytes)
            .await
            .map_err(|io_err| MqttError::ConnectionFailed2(io_err.kind()))?
    };
}
19
/// Performs a single `read` from `$socket` into `$bytes` and evaluates to the number of bytes read.
///
/// Only usable inside an `async` context whose return type is `Result<_, MqttError>`:
/// an I/O error is mapped to `MqttError::ConnectionFailed2` and propagated with `?`.
macro_rules! network_read {
    ($bytes:expr, $socket:expr) => {
        $socket
            .read($bytes)
            .await
            .map_err(|io_err| MqttError::ConnectionFailed2(io_err.kind()))?
    };
}
26
/// Flushes `$socket`.
///
/// Only usable inside an `async` context whose return type is `Result<_, MqttError>`:
/// an I/O error is mapped to `MqttError::ConnectionFailed2` and propagated with `?`.
macro_rules! network_flush {
    ($socket:expr) => {
        $socket
            .flush()
            .await
            .map_err(|io_err| MqttError::ConnectionFailed2(io_err.kind()))?
    };
}
33
34fn network_try_read(connection: &mut impl Read, buf: &mut [u8]) -> Result<usize, MqttError> {
35    let read_fut = connection.read(buf);
36    match poll_once(read_fut) {
37        core::task::Poll::Ready(Ok(n)) => {
38            trace!("network_try_read: read {} bytes", n);
39            Ok(n)
40        },
41        core::task::Poll::Ready(Err(err)) => {
42            trace!("network_try_read err: {}", &err);
43            Err(MqttError::ConnectionFailed2(err.kind()))
44        },
45        core::task::Poll::Pending => {
46            trace!("try_network_read did not reaad anything");
47            Ok(0)
48        },
49    }
50}
51
52#[cfg(test)]
53pub mod test;
54
/// Lifecycle states published by a connection.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionStateValue {
    /// The CONNECT packet has been sent; the CONNACK has not been processed yet.
    ConnectSent,
    /// The broker accepted the connection and the auto subscribes completed.
    Connected,
    /// The connection was closed by `disconnect`.
    Disconnected,
    /// The broker refused the connection, or `set_error` was called.
    Error
}
62
/// Connection to an MQTT broker: lifecycle control, state observation and packet I/O.
pub trait ConnectionState {

    /// Establishes the connection to the broker.
    fn connect(&self) -> impl Future<Output = Result<(), MqttError>>;

    /// Closes the connection to the broker.
    fn disconnect(&self) -> impl Future<Output = Result<(), MqttError>>;

    /// Returns the current connection state, or `None` if no state is available.
    fn get_state(&self) -> Option<ConnectionStateValue>;

    /// Resolves with a connection state once a state change is observed.
    fn on_state_change(&self) -> impl Future<Output = ConnectionStateValue>;

    /// Resolves once the connection state is `Connected`.
    fn await_connected(&self) -> impl Future<Output = ()>;

    /// Marks the connection as failed (state `Error`).
    fn set_error(&self);

    /// Sends an MQTT packet to the broker without waiting.
    ///
    /// This method may return without accepting the packet, returning `Ok(false)`.
    /// This indicates that the send should be tried again later.
    fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError>;

    /// Writes a packet to the send buffer, sending buffered data until the packet fits.
    fn write_packet(&self, packet: &Packet<'_>) -> impl Future<Output = Result<(), MqttError>>;


    /// Runs pending I/O tasks until there is a new packet.
    fn run_io(&self) -> impl Future<Output = Result<impl Deref<Target = Packet<'_>>, MqttError>>;

    /// Runs I/O once without waiting for incoming data; yields `Some(packet)` if a packet is available.
    fn run_io_nonblocking(&self) -> impl Future<Output = Result<Option<impl Deref<Target = Packet<'_>>>, MqttError>>;
}
92
/// [`ConnectionState`] implementation on top of a `TcpConnect` network stack and a `Dns` resolver.
///
/// `BUFFER_SIZE` is the capacity parameter of both the send and the receive buffer.
pub struct TcpConnectionState<'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize> 
where NETWORK: TcpConnect, DNS: Dns {
    /// Current connection state; observers are notified through the `Watch`.
    inner: Watch<M, ConnectionStateValue, 8>,
    /// Network stack used to open the TCP connection.
    network: &'a NETWORK,
    /// Resolver used to look up the configured broker host.
    dns: DNS,
    /// The open TCP connection, `None` while not connected.
    connection: RefCell<Option<<NETWORK as TcpConnect>::Connection<'a>>>,

    /// Encoded packets waiting to be written to the network.
    send_buffer: StackBufferCell<BUFFER_SIZE>, 
    /// Bytes received from the network that have not been decoded yet.
    recv_buffer: StackBufferCell<BUFFER_SIZE>, 

    /// Last will announced in the CONNECT packet, if any.
    last_will: Option<LastWill<'l>>,
    /// Client configuration (host, port, client id, credentials, auto subscribes).
    config: ClientConfig<'l>
}
106
107impl<'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize> TcpConnectionState<'a, 'l, M, NETWORK, DNS, BUFFER_SIZE> 
108where NETWORK: TcpConnect, DNS: Dns {
109
110    pub fn new(network: &'a NETWORK, dns: DNS, last_will: Option<LastWill<'l>>, config: ClientConfig<'l>) -> Self {
111        Self {
112            inner: Watch::new(),
113            network,
114            dns,
115            connection: RefCell::new(None),
116
117            send_buffer: StackBufferCell::new(),
118            recv_buffer: StackBufferCell::new(),
119
120            last_will,
121            config
122        }
123    }
124
125
126    /// Read data from network until a packet can be received
127    async fn read_packet(&self) -> Result<MappedBufferRef<'_, Packet<'_>, BUFFER_SIZE>, MqttError> {
128        let mut connection = self.connection.borrow_mut();
129        let connection = connection.as_mut().unwrap();
130        
131        loop {
132            if let Some(packet) = self.try_read_packet()? {
133                return Ok(packet);
134            }
135
136            trace!("could not read packet from network, wait for new data");
137            let mut buffer = self.recv_buffer.borrow();
138            let bytes_received = network_read!(buffer.writeable_data(), connection);
139            buffer.commit_bytes_written(bytes_received).unwrap();
140        }
141    }
142
143    fn try_read_packet(&self) -> Result<Option<MappedBufferRef<'_, Packet<'_>, BUFFER_SIZE>>, MqttError> {
144        let buffer = self.recv_buffer.borrow();
145        let is_max_len = buffer.is_max_len();
146
147        let result = buffer.try_map_maybe(|buf| decode_slice_with_len(buf))
148            .map_err(|err| MqttError::CodecError(err));
149
150        match result {
151            Ok(None) if is_max_len => {
152                error!("recv buffer too small to receive packet");
153                Err(MqttError::BufferTooSmall)
154            },
155            r => r
156        }
157    }
158
159
160    /// Write bytes to network without blocking
161    // fn try_send(&self) -> Result<(), MqttError> {
162    //     let mut send_buffer = self.send_buffer.borrow();
163    //     let mut connection = self.connection.borrow_mut();
164    //     let connection = connection.as_mut().unwrap();
165
166    //     let bytes_sent = NETWORK::try_write(send_buffer.reaable_data(), connection)?;
167    //     send_buffer.add_bytes_read(bytes_sent).unwrap();
168
169    //     Ok(())
170    // }
171
172    async fn send_all_intern(&self, connection: &mut NETWORK::Connection<'_>) -> Result<(), MqttError> {
173        let mut send_buffer = self.send_buffer.borrow();
174        while send_buffer.has_remaining_len() {
175            let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
176            send_buffer.add_bytes_read(bytes_sent).unwrap();
177        }
178
179        network_flush!(connection);
180
181        Ok(())
182    }
183
184    /// Sends data until the send buffer is empty
185    async fn send_all(&self) -> Result<(), MqttError> {
186        let mut connection = self.connection.borrow_mut();
187        let connection = connection.as_mut().unwrap();
188
189        self.send_all_intern(connection).await
190    }
191
192    // async fn receive(&self) -> Result<(), MqttError> {
193    //     let mut recv_buffer = self.recv_buffer.borrow();
194    //     let mut connection = self.connection.borrow_mut();
195    //     let connection = connection.as_mut().unwrap();
196
197    //     let bytes_received = NETWORK::read(recv_buffer.writeable_data(), connection).await?;
198    //     recv_buffer.commit_bytes_written(bytes_received).unwrap();
199    //     Ok(())
200    // }
201
202    /// Write bytes to network and block until at least one byte is written
203    /// Returns arly if there is nothing to send
204    async fn send(&self) -> Result<(), MqttError> {
205        let mut send_buffer = self.send_buffer.borrow();
206        // early return if there is nothing to send
207        if ! send_buffer.has_remaining_len() {
208            trace!("network send: nothing to send");
209            return Ok(());
210        }
211
212        let mut connection = self.connection.borrow_mut();
213        let connection = connection.as_mut().unwrap();
214
215        let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
216        debug!("sent {} bytes to network", bytes_sent);
217        send_buffer.add_bytes_read(bytes_sent).unwrap();
218        connection.flush().await
219            .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?;
220
221        Ok(())
222    }
223
224    /// Writes a packet to the network and waits until there is enaugh space
225    async fn write_packet_async(&self, packet: &Packet<'_>) -> Result<(), MqttError> {
226        loop {
227            if self.try_write_packet(packet)? {
228                return Ok(())
229            }
230
231            self.send().await?;
232        }
233    }
234
235    /// Tries to write a packet to the send buffer
236    /// Returns an error if the packet does not fit in the send buffer
237    fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError> {
238        let mut send_buffer = self.send_buffer.borrow();
239        send_buffer.flip();
240        let buf = send_buffer.writeable_data();
241
242        match encode_slice(packet, buf) {
243            Ok(bytes_written) => {
244                send_buffer.commit_bytes_written(bytes_written).unwrap();
245                Ok(true)
246            },
247            Err(mqttrs2::Error::WriteZero) => {
248                if send_buffer.is_max_capacity() {
249                    error!("send buffer too small to write packet {}", packet);
250                    Err(MqttError::BufferTooSmall)
251                } else {
252                    trace!("could not write packet: no space in buffer");
253                    Ok(false)
254                }
255            },
256            Err(err) => Err(err.into()),
257        }
258    }
259
260    async fn subscribe_auto_subscribes(&self) -> Result<(), MqttError> {
261        for chunk in self.config.auto_subscribes.chunks(5) {
262            debug!("autosubscribe chunk of {} topics", chunk.len());
263            let pid = next_pid();
264            
265            let topics = chunk.iter()
266                .map(|el| el.try_into())
267                .collect::<Result<_, _>>()?;
268            
269            let request = Subscribe{
270                pid,
271                topics
272            };
273            let request = Packet::Subscribe(request);
274
275            self.write_packet_async(&request).await?;
276            
277            // Send everything that is in the send buffer
278            self.send_all().await?;
279
280
281            // let suback = self.send_receive_until(connection, |packet| match packet {
282            //     Packet::Suback(suback) if suback.pid == pid => Ok(suback),
283            //     unexpected_packet => {
284            //         error!("got unexpected packet {} while waiting for suback", unexpected_packet.get_type());
285            //         return Err(MqttError::ConnackError)
286            //     }
287            // }).await??;
288
289            let next_packet = self.read_packet().await?;
290            let suback = match next_packet.deref() {
291                Packet::Suback(suback) => suback,
292                unexpected_packet => {
293                    error!("got unexpected packet {} while waiting for suback", unexpected_packet.get_type());
294                    return Err(MqttError::ConnackError)
295                }
296            };
297
298            Self::process_suback(suback, chunk)?;
299        }
300
301        info!("auto subscribes done");
302
303        Ok(())
304    }
305
306    fn process_suback(suback: &Suback, auto_subscribes: &[AutoSubscribe]) -> Result<(), MqttError> {
307
308        for (suback, requestes_topic) in suback.return_codes.iter().zip(auto_subscribes.iter()) {
309            match suback {
310                mqttrs2::SubscribeReturnCodes::Success(qos) if *qos == requestes_topic.qos => {
311                    info!("successfully auto subscribes to {} with {}", &requestes_topic.topic, qos);
312                },
313                mqttrs2::SubscribeReturnCodes::Success(qos) => {
314                    warn!("autosubscribes to {} with different qos: requested {} but got {}", &requestes_topic.topic, requestes_topic.qos, qos);
315                },
316                mqttrs2::SubscribeReturnCodes::Failure => {
317                    error!("could not auto subscribe t {}", &requestes_topic.topic);
318                    return Err(MqttError::SubscribeOrUnsubscribeFailed);
319                },
320            }
321        }
322
323        Ok(())
324    }
325
326    async fn process_connack(&self, connack: &Connack) -> Result<(), MqttError> {
327
328        match connack.code {
329            mqttrs2::ConnectReturnCode::Accepted => {
330                info!("connction to broker established");
331
332                // Add autosubscribe requests
333                self.subscribe_auto_subscribes().await?;
334
335                self.inner.sender().send(ConnectionStateValue::Connected);
336
337                Ok(())
338            },
339            mqttrs2::ConnectReturnCode::RefusedProtocolVersion | mqttrs2::ConnectReturnCode::RefusedIdentifierRejected | mqttrs2::ConnectReturnCode::ServerUnavailable => {
340                error!("connack returned error: {}", connack.code);
341                self.inner.sender().send(ConnectionStateValue::Error);
342                Err(MqttError::ConnackError)
343            },
344            mqttrs2::ConnectReturnCode::BadUsernamePassword | mqttrs2::ConnectReturnCode::NotAuthorized => {
345                error!("connack: authentication failed: {}", connack.code);
346                self.inner.sender().send(ConnectionStateValue::Error);
347                Err(MqttError::AuthenticationError)
348            }
349        }
350    }
351
352    async fn send_receive(&self) -> Result<(), MqttError> {
353        let mut send_buffer = self.send_buffer.borrow();
354        let mut recv_buffer = self.recv_buffer.borrow();
355        recv_buffer.flip();
356
357        let mut connection = self.connection.borrow_mut();
358        let connection = connection.as_mut().unwrap();
359
360        while send_buffer.has_remaining_len() {
361            let bytes_sent = network_write!(send_buffer.reaable_data(), connection);
362            send_buffer.add_bytes_read(bytes_sent).unwrap();
363            network_flush!(connection);
364            debug!("write {} bytes to network, {} remaining", bytes_sent, send_buffer.reaable_data().len());
365
366            let bytes_received = network_try_read(connection, recv_buffer.writeable_data())?;
367            recv_buffer.commit_bytes_written(bytes_received).unwrap();
368            debug!("try_read {} bytes from network", bytes_received);
369
370            if bytes_received > 0 {
371                return Ok(())
372            }
373        }
374
375        let bytes_received = network_read!(recv_buffer.writeable_data(), connection);
376        recv_buffer.commit_bytes_written(bytes_received).unwrap();
377        debug!("read {} bytes from network", bytes_received);
378
379        Ok(())
380    }
381
382}
383
384impl <'a, 'l, M: RawMutex, NETWORK, DNS, const BUFFER_SIZE: usize> ConnectionState for TcpConnectionState<'a, 'l, M, NETWORK, DNS, BUFFER_SIZE> 
385where NETWORK: TcpConnect, DNS: Dns {
386    
    /// Returns the most recently published state, or `None` if none is available.
    fn get_state(&self) -> Option<ConnectionStateValue> {
        self.inner.try_get()
    }
390
391    async fn on_state_change(&self) -> ConnectionStateValue {
392        self.inner.dyn_receiver().unwrap().changed().await
393    }
394
395    async fn disconnect(&self) -> Result<(), MqttError> {
396        assert!(self.inner.try_get() == Some(ConnectionStateValue::Connected));
397        info!("disconnect started");
398        
399        // Empty send buffer
400        self.send_all().await?;
401
402        let sent = self.try_write_packet(&Packet::Disconnect)?;
403        if ! sent {
404            panic!("could not write disconnect to send buffer");
405        }
406
407        self.send_all().await?;
408
409        // Drop connection
410        let connection = self.connection.borrow_mut().take().unwrap();
411        drop(connection);
412
413        self.inner.sender().send(ConnectionStateValue::Disconnected);
414
415        Ok(())
416    }
417    
418    async fn connect(&self) -> Result<(), MqttError> {
419        assert!(self.inner.try_get() == None || self.inner.try_get() == Some(ConnectionStateValue::Error));
420
421        self.recv_buffer.borrow().reset();
422        self.send_buffer.borrow().reset();
423
424        {       
425            let port = self.config.port.unwrap_or(MQTT_DEFAULT_PORT);
426            debug!("connect using port {}", port);
427            let ip = self.config.host.resolve(&self.dns).await?;
428            let addr = SocketAddr::new(ip, port);
429
430            trace!("start connecting to socket addr {}", &addr);
431            let connection = self.network.connect(addr).await
432                .map_err(|err| MqttError::ConnectionFailed2(err.kind()))?;
433            trace!("successfully established tcp connection to broker");
434
435
436            let mut connection_lock = self.connection.borrow_mut();
437            *connection_lock = Some(connection);
438        }
439
440        info!("tcp connection to broker established");
441
442        let mut connect = Connect{
443            protocol: Protocol::MQTT311,
444            keep_alive: super::KEEP_ALIVE as u16,
445            client_id: &self.config.client_id,
446            clean_session: false,
447            last_will: self.last_will.clone(),
448            username: None,
449            password: None
450        };
451
452        if let Some(cred) = &self.config.credentials {
453            connect.username = Some(&cred.username);
454            connect.password = Some(cred.password.as_bytes());
455        }
456
457        let connect = Packet::Connect(connect);
458
459        let sent = self.try_write_packet(&connect)?;
460        if ! sent {
461            error!("connect packet does not fit in send buffer");
462            return Err(MqttError::BufferTooSmall);
463        }
464
465        // Send everything that is in the send buffer
466        self.send().await?;
467
468        self.inner.sender().send(ConnectionStateValue::ConnectSent);
469
470        let next_packet = self.read_packet().await?;
471        let connack = match next_packet.deref() {
472            Packet::Connack(connack) => connack,
473            unexpected_packet => {
474                error!("got unexpected packet {} while waiting for connack", unexpected_packet.get_type());
475                return Err(MqttError::ConnackError)
476            }
477        };
478
479        let connack = connack.clone();
480        drop(next_packet); // Frees borrow of recv_buffer
481
482        
483        self.process_connack(&connack).await?;
484        
485        Ok(())
486    }
487
488    async fn await_connected(&self) {
489        self.inner.receiver().unwrap().get_and(|value| *value == ConnectionStateValue::Connected).await;
490    }
491
    /// Publishes the `Error` state.
    fn set_error(&self) {
        self.inner.sender().send(ConnectionStateValue::Error);
    }
495
496    fn try_write_packet(&self, packet: &Packet<'_>) -> Result<bool, MqttError> {
497        let mut send_buffer = self.send_buffer.borrow();
498
499        match encode_slice(packet, send_buffer.writeable_data()) {
500            Err(mqttrs2::Error::WriteZero) => Ok(false),
501            Ok(n) => {
502                send_buffer.commit_bytes_written(n).unwrap();
503                Ok(true)
504            }
505            Err(err) => Err(MqttError::CodecError(err))
506        }
507    }
508
    /// Queues `packet` in the send buffer; see `write_packet_async`, to which this delegates.
    async fn write_packet(&self, packet: &Packet<'_>) -> Result<(), MqttError> {
        self.write_packet_async(packet).await
    }
512
513    async fn run_io(&self) -> Result<impl Deref<Target = Packet<'_>>, MqttError> {
514        assert_eq!(self.inner.try_get(), Some(ConnectionStateValue::Connected));
515
516        loop {
517            self.send_receive().await?;
518
519            if let Some(packet) = self.try_read_packet()? {
520                return Ok(packet)
521            }
522        }
523    }
524
525    async fn run_io_nonblocking(&self) -> Result<Option<impl Deref<Target = Packet<'_>>>, MqttError> {
526        assert_eq!(self.inner.try_get(), Some(ConnectionStateValue::Connected));
527
528        self.send().await?;
529        
530        {
531            let mut connection = self.connection.borrow_mut();
532            let connection = connection.as_mut().unwrap();
533            let mut recv_buffer = self.recv_buffer.borrow();
534            let bytes_received = network_try_read(connection, recv_buffer.writeable_data())?;
535            recv_buffer.commit_bytes_written(bytes_received).unwrap();
536        }
537
538
539        self.try_read_packet()
540    }
541}
542
543#[cfg(test)]
544mod tests {
545    use core::{net::{IpAddr, Ipv4Addr}, pin::{Pin, pin}};
546
547    use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex};
548    use heapless::Vec;
549    use mqttrs2::{Connack, ConnectReturnCode, Packet, PacketType};
550
551    use crate::{ClientConfig, Host, state::connection::{ConnectionState, ConnectionStateValue, TcpConnectionState, test::{TestDns, TestTcpConnect}}};
552    use crate::testutils::*;
553
554    #[test]
555    fn test_connect() {
556        let tcp = TestTcpConnect::new(); 
557        let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
558
559        let client_config = ClientConfig {
560            host: Host::Hostname("my-mqtt-test-broker"),
561            port: None,
562            client_id: "clien-12345", 
563            credentials: None, 
564            auto_subscribes: Vec::new(),
565        };
566
567        let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
568        let mut connection_state_receiver = connection_state.inner.dyn_receiver().unwrap();
569
570        let mut connect_future = connection_state.connect();
571        let mut connect_future = unsafe {
572            Pin::new_unchecked(&mut connect_future)
573        };
574
575        // Send connect packet to network, now waiting for connack
576        assert_pending(connect_future.as_mut());
577        assert_eq!(connection_state_receiver.try_changed(), Some(ConnectionStateValue::ConnectSent));
578
579        // test if the connect packet was written
580        tcp.assert_packet_written(|packet| match packet {
581            Packet::Connect(connect) => {
582                assert_eq!(connect.client_id, client_config.client_id);
583                // TODO add more asserts
584            },
585            p => panic!("unexpected packet: {:?}", p)
586        });
587
588        // "receive" a conack
589        let connack = Connack{
590            session_present: false,
591            code: ConnectReturnCode::Accepted
592        };
593
594        tcp.add_packet_to_receive(&Packet::Connack(connack));
595
596        // process connack
597        // Ready because there are no autosubscribes
598        assert_ready(connect_future.as_mut()).unwrap();
599        assert_eq!(connection_state_receiver.try_changed(), Some(ConnectionStateValue::Connected));
600    }
601
    /// Test helper: drives `state.connect()` to completion by answering the
    /// CONNECT packet with an accepting CONNACK.
    fn connect<M: RawMutex, const BUFFER: usize>(state: &TcpConnectionState<'_, '_, M, TestTcpConnect, TestDns, BUFFER>, tcp: &TestTcpConnect) {
        let connect_future = state.connect();
        let mut connect_future = pin!(connect_future);

        // Send connect packet to network, now waiting for connack
        assert_pending(connect_future.as_mut());

        tcp.assert_packet_written(|p| {
            assert_eq!(p.get_type(), PacketType::Connect);
        });

        // "receive" a connack
        let connack = Connack{
            session_present: false,
            code: ConnectReturnCode::Accepted
        };

        tcp.add_packet_to_receive(&Packet::Connack(connack));

        // The connack is accepted, so connecting completes
        assert_ready(connect_future.as_mut()).unwrap();
    }
623
    /// `run_io` stays pending while nothing can be received and completes once
    /// a packet arrives.
    #[test]
    fn test_run_io_read() {
        let tcp = TestTcpConnect::new(); 
        let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

        let client_config = ClientConfig {
            host: Host::Hostname("my-mqtt-test-broker"),
            port: None,
            client_id: "clien-12345", 
            credentials: None, 
            auto_subscribes: Vec::new(),
        };

        let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());

        connect(&connection_state, &tcp);

        let run_io_future = connection_state.run_io();
        let mut run_io_future = pin!(run_io_future);

        // Nothing to send and nothing to receive: repeated polls stay pending
        assert_pending(run_io_future.as_mut());
        assert_pending(run_io_future.as_mut());
        assert_pending(run_io_future.as_mut());
        assert_pending(run_io_future.as_mut());

        tcp.add_packet_to_receive(&Packet::Pingresp);

        // There is a packet to receive, return now
        assert_ready(run_io_future).unwrap();
    }
655
    /// `run_io` writes a queued packet to the network, then completes once a
    /// packet is received.
    #[test]
    fn test_run_io_read_write() {
        let tcp = TestTcpConnect::new(); 
        let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

        let client_config = ClientConfig {
            host: Host::Hostname("my-mqtt-test-broker"),
            port: None,
            client_id: "clien-12345", 
            credentials: None, 
            auto_subscribes: Vec::new(),
        };

        let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
        connect(&connection_state, &tcp);

        // Queue a packet so that run_io has something to send
        assert!(connection_state.try_write_packet(&Packet::Pingreq).unwrap());

        let run_io_future = connection_state.run_io();
        let mut run_io_future = pin!(run_io_future);

        // The queued Pingreq is sent; nothing to receive yet, so the future stays pending
        assert_pending(run_io_future.as_mut());
        tcp.assert_packet_written(|p| {
            assert_eq!(*p, Packet::Pingreq);
        });

        tcp.add_packet_to_receive(&Packet::Pingresp);

        // There is a packet to receive, return now
        assert_ready(run_io_future).unwrap();
    }
688
    /// `run_io_nonblocking` completes on the first poll, both when there is
    /// nothing to do and when there is only data to send.
    #[test]
    fn test_run_io_nonblocking_read_write() {
        let tcp = TestTcpConnect::new(); 
        let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

        let client_config = ClientConfig {
            host: Host::Hostname("my-mqtt-test-broker"),
            port: None,
            client_id: "clien-12345", 
            credentials: None, 
            auto_subscribes: Vec::new(),
        };

        let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
        connect(&connection_state, &tcp);

        // First time: nothing queued to send and no packet made available to receive
        let run_io_future = connection_state.run_io_nonblocking();

        // Completes immediately without writing anything
        assert_ready_pin(run_io_future).unwrap();
        tcp.assert_nothing_written();
        

        // Second time: something to send and nothing to retrieve
        assert!(connection_state.try_write_packet(&Packet::Pingreq).unwrap());
        let run_io_future = connection_state.run_io_nonblocking();
        let result = assert_ready_pin(run_io_future).unwrap();
        assert!(result.is_none());
    }
719
    /// `disconnect` writes a DISCONNECT packet and publishes the `Disconnected` state.
    #[test]
    fn test_disconnect() {
        let tcp = TestTcpConnect::new(); 
        let dns = TestDns::new_single("my-mqtt-test-broker", IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));

        let client_config = ClientConfig {
            host: Host::Hostname("my-mqtt-test-broker"),
            port: None,
            client_id: "clien-12345", 
            credentials: None, 
            auto_subscribes: Vec::new(),
        };

        let connection_state: TcpConnectionState<'_, '_, CriticalSectionRawMutex, _, _, 1024> = TcpConnectionState::new(&tcp, dns, None, client_config.clone());
        connect(&connection_state, &tcp);

        // Nothing blocks the writes, so disconnect completes on the first poll
        let disconnect_future = connection_state.disconnect();
        assert_ready_pin(disconnect_future).unwrap();

        tcp.assert_packet_written(|p| {
            assert_eq!(p.get_type(), PacketType::Disconnect);
        });

        // assert_eq!(tcp.connections_closed.load(core::sync::atomic::Ordering::Acquire), 1);
        assert_eq!(connection_state.inner.dyn_receiver().unwrap().try_get(), Some(ConnectionStateValue::Disconnected));

    }
747
748
749}
750