ddp_rs/
connection.rs

1//! DDP connection handling for sending and receiving pixel data.
2//!
3//! This module provides the main [`DDPConnection`] type for communicating with
4//! DDP-compatible LED displays.
5
6use crate::error::DDPError;
7use crate::error::DDPError::CrossBeamError;
8use crate::packet::Packet;
9use crate::protocol;
10use crossbeam::channel::{unbounded, Receiver, TryRecvError};
11use std::net::{SocketAddr, UdpSocket};
12
/// Maximum pixel data size per DDP packet (480 pixels × 3 bytes RGB = 1440 bytes).
///
/// `slice_send` never puts more than this many payload bytes into one packet, so a
/// payload plus its 10- or 14-byte header always fits in the connection's 1500-byte
/// packet buffer.
const MAX_DATA_LENGTH: usize = 480 * 3;
15
16/// A connection to a DDP display device.
17///
18/// This is the main type for sending pixel data to LED strips and other DDP-compatible
19/// displays. It handles packet assembly, sequencing, and automatic chunking of large
20/// data arrays.
21///
22/// # Examples
23///
24/// ## Basic usage
25///
26/// ```no_run
27/// use ddp_rs::connection::DDPConnection;
28/// use ddp_rs::protocol::{PixelConfig, ID};
29/// use std::net::UdpSocket;
30///
31/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
32/// let mut conn = DDPConnection::try_new(
33///     "192.168.1.40:4048",
34///     PixelConfig::default(),
35///     ID::Default,
36///     UdpSocket::bind("0.0.0.0:4048")?
37/// )?;
38///
39/// // Send RGB data for 3 pixels
40/// conn.write(&[
41///     255, 0, 0,    // Red
42///     0, 255, 0,    // Green
43///     0, 0, 255,    // Blue
44/// ])?;
45/// # Ok(())
46/// # }
47/// ```
48///
49/// ## Using offsets to update part of the strip
50///
51/// ```no_run
52/// # use ddp_rs::connection::DDPConnection;
53/// # use ddp_rs::protocol::{PixelConfig, ID};
54/// # use std::net::UdpSocket;
55/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
56/// # let mut conn = DDPConnection::try_new(
57/// #     "192.168.1.40:4048",
58/// #     PixelConfig::default(),
59/// #     ID::Default,
60/// #     UdpSocket::bind("0.0.0.0:4048")?
61/// # )?;
62/// // Update pixels starting at byte offset 300 (pixel 100 in RGB)
63/// conn.write_offset(&[255, 128, 64], 300)?;
64/// # Ok(())
65/// # }
66/// ```
#[derive(Debug)]
pub struct DDPConnection {
    /// Pixel format configuration (RGB, RGBW, etc.)
    ///
    /// Copied into the header of every packet produced by `write` and `write_offset`.
    pub pixel_config: protocol::PixelConfig,

    /// Protocol ID for this connection
    ///
    /// Copied into the header of every packet produced by `write` and `write_offset`;
    /// `write_message` uses the ID carried by the message instead.
    pub id: protocol::ID,

    /// Sequence number stamped on the next outgoing packet. Initialised to 1 by
    /// `try_new` and advanced by `slice_send` after every packet that is sent.
    sequence_number: u8,
    /// UDP socket through which all packets are sent.
    socket: UdpSocket,
    /// Resolved address of the display; the destination of every `send_to` call.
    addr: SocketAddr,

    /// Receiver for packets coming from the display (responses)
    ///
    /// NOTE(review): `try_new` creates this from a fresh channel and does not keep the
    /// sending half, so nothing in this file ever feeds it. Confirm where incoming
    /// packets are supposed to be produced.
    pub receiver_packet: Receiver<Packet>,

    // Scratch space in which each outgoing packet (header followed by payload) is
    // assembled. Sending is a hot path, so the buffer is reused to avoid one
    // allocation per packet.
    buffer: [u8; 1500],
}
85
86impl DDPConnection {
87    /// Writes pixel data to the display starting at offset 0.
88    ///
89    /// Large data arrays are automatically split into multiple packets. Each packet
90    /// can contain up to 1440 bytes (480 RGB pixels).
91    ///
92    /// # Arguments
93    ///
94    /// * `data` - Raw pixel data bytes. For RGB, this should be groups of 3 bytes (R,G,B).
95    ///            For RGBW, groups of 4 bytes (R,G,B,W).
96    ///
97    /// # Returns
98    ///
99    /// The total number of bytes sent across all packets.
100    ///
101    /// # Examples
102    ///
103    /// ```no_run
104    /// # use ddp_rs::connection::DDPConnection;
105    /// # use ddp_rs::protocol::{PixelConfig, ID};
106    /// # use std::net::UdpSocket;
107    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
108    /// # let mut conn = DDPConnection::try_new("192.168.1.40:4048", PixelConfig::default(), ID::Default, UdpSocket::bind("0.0.0.0:4048")?)?;
109    /// // Set first 3 pixels to red, green, blue
110    /// conn.write(&[255, 0, 0, 0, 255, 0, 0, 0, 255])?;
111    /// # Ok(())
112    /// # }
113    /// ```
114    pub fn write(&mut self, data: &[u8]) -> Result<usize, DDPError> {
115        let mut h = protocol::Header::default();
116
117        h.packet_type.push(false);
118        h.pixel_config = self.pixel_config;
119        h.id = self.id;
120
121        self.slice_send(&mut h, data)
122    }
123
124    /// Writes pixel data to the display starting at a specific byte offset.
125    ///
126    /// This is useful for updating only a portion of your LED strip without
127    /// resending all the data.
128    ///
129    /// # Arguments
130    ///
131    /// * `data` - Raw pixel data bytes to send
132    /// * `offset` - Starting byte offset (not pixel offset). For RGB, offset 3 = pixel 1.
133    ///
134    /// # Examples
135    ///
136    /// ```no_run
137    /// # use ddp_rs::connection::DDPConnection;
138    /// # use ddp_rs::protocol::{PixelConfig, ID};
139    /// # use std::net::UdpSocket;
140    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
141    /// # let mut conn = DDPConnection::try_new("192.168.1.40:4048", PixelConfig::default(), ID::Default, UdpSocket::bind("0.0.0.0:4048")?)?;
142    /// // Update pixel 10 (offset = 10 * 3 = 30) to white
143    /// conn.write_offset(&[255, 255, 255], 30)?;
144    /// # Ok(())
145    /// # }
146    /// ```
147    pub fn write_offset(&mut self, data: &[u8], offset: u32) -> Result<usize, DDPError> {
148        let mut h = protocol::Header::default();
149
150        h.packet_type.push(false);
151        h.pixel_config = self.pixel_config;
152        h.id = self.id;
153        h.offset = offset;
154
155        self.slice_send(&mut h, data)
156    }
157
158    /// Sends a JSON control message to the display.
159    ///
160    /// This is useful for things like setting brightness, changing display modes,
161    /// or querying configuration.
162    ///
163    /// # Arguments
164    ///
165    /// * `msg` - A [`protocol::message::Message`] (typed or untyped JSON)
166    ///
167    /// # Examples
168    ///
169    /// ```no_run
170    /// # use ddp_rs::connection::DDPConnection;
171    /// # use ddp_rs::protocol::{PixelConfig, ID, message::Message};
172    /// # use std::net::UdpSocket;
173    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
174    /// # let mut conn = DDPConnection::try_new("192.168.1.40:4048", PixelConfig::default(), ID::Default, UdpSocket::bind("0.0.0.0:4048")?)?;
175    /// // Send a control message
176    /// let json_value = serde_json::json!({"brightness": 128});
177    /// conn.write_message(Message::Parsed((ID::Control, json_value)))?;
178    /// # Ok(())
179    /// # }
180    /// ```
181    pub fn write_message(&mut self, msg: protocol::message::Message) -> Result<usize, DDPError> {
182        let mut h = protocol::Header::default();
183        h.packet_type.push(false);
184        h.id = msg.get_id();
185        let msg_data: Vec<u8> = msg.try_into()?;
186        h.length = msg_data.len() as u16;
187
188        self.slice_send(&mut h, &msg_data)
189    }
190
191    fn slice_send(
192        &mut self,
193        header: &mut protocol::Header,
194        data: &[u8],
195    ) -> Result<usize, DDPError> {
196        let mut offset = header.offset as usize;
197        let mut sent = 0;
198
199        let num_iterations = (data.len() + MAX_DATA_LENGTH - 1) / MAX_DATA_LENGTH;
200        let mut iter = 0;
201
202        while offset < data.len() {
203            iter += 1;
204
205            if iter == num_iterations {
206                header.packet_type.push(true);
207            }
208
209            header.sequence_number = self.sequence_number;
210
211            let chunk_end = std::cmp::min(offset + MAX_DATA_LENGTH, data.len());
212            let chunk = &data[offset..chunk_end];
213            header.length = chunk.len() as u16;
214            let len = self.assemble_packet(*header, chunk);
215
216            // Send to socket
217            sent += self.socket.send_to(&self.buffer[0..len], self.addr)?;
218
219            // Increment sequence number
220            if self.sequence_number > 15 {
221                self.sequence_number = 1;
222            } else {
223                self.sequence_number += 1;
224            }
225            offset += MAX_DATA_LENGTH;
226            header.offset = offset as u32;
227        }
228
229        Ok(sent)
230    }
231
232    /// Attempts to retrieve a packet from the display (non-blocking).
233    ///
234    /// Checks if any response packets have been received from the display.
235    ///
236    /// # Returns
237    ///
238    /// * `Ok(Packet)` - A packet was available
239    /// * `Err(DDPError::NothingToReceive)` - No packets waiting
240    /// * `Err(DDPError::CrossBeamError)` - Channel error
241    pub fn get_incoming(&self) -> Result<Packet, DDPError> {
242        match self.receiver_packet.try_recv() {
243            Ok(packet) => Ok(packet),
244            Err(TryRecvError::Empty) => Err(DDPError::NothingToReceive),
245            Err(e2) => Err(CrossBeamError(e2)),
246        }
247    }
248
249    /// Creates a new DDP connection to a display.
250    ///
251    /// # Arguments
252    ///
253    /// * `addr` - The display address (IP:port). DDP standard port is 4048.
254    /// * `pixel_config` - Pixel format configuration (RGB, RGBW, etc.)
255    /// * `id` - Protocol ID to use for this connection
256    /// * `socket` - A bound UDP socket for sending/receiving data
257    ///
258    /// # Returns
259    ///
260    /// * `Ok(DDPConnection)` - Connection created successfully
261    /// * `Err(DDPError)` - Failed to resolve address or create connection
262    ///
263    /// # Examples
264    ///
265    /// ```no_run
266    /// use ddp_rs::connection::DDPConnection;
267    /// use ddp_rs::protocol::{PixelConfig, ID};
268    /// use std::net::UdpSocket;
269    ///
270    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
271    /// let conn = DDPConnection::try_new(
272    ///     "192.168.1.40:4048",
273    ///     PixelConfig::default(),
274    ///     ID::Default,
275    ///     UdpSocket::bind("0.0.0.0:4048")?
276    /// )?;
277    /// # Ok(())
278    /// # }
279    /// ```
280    pub fn try_new<A>(
281        addr: A,
282        pixel_config: protocol::PixelConfig,
283        id: protocol::ID,
284        socket: UdpSocket,
285    ) -> Result<DDPConnection, DDPError>
286    where
287        A: std::net::ToSocketAddrs,
288    {
289        let socket_addr: SocketAddr = addr
290            .to_socket_addrs()?
291            .next()
292            .ok_or(DDPError::NoValidSocketAddr)?;
293        let (_s, recv) = unbounded();
294
295        Ok(DDPConnection {
296            addr: socket_addr,
297            pixel_config,
298            id,
299            socket,
300            receiver_packet: recv,
301            sequence_number: 1,
302            buffer: [0u8; 1500],
303        })
304    }
305
306    // doing this to avoid allocations per frame
307    // micro optimization, but it's a hot path
308    // esp running this embedded
309    #[inline(always)]
310    fn assemble_packet(&mut self, header: protocol::Header, data: &[u8]) -> usize {
311        let header_bytes: usize = if header.packet_type.timecode {
312            let header_bytes: [u8; 14] = header.into();
313            self.buffer[0..14].copy_from_slice(&header_bytes);
314            14usize
315        } else {
316            let header_bytes: [u8; 10] = header.into();
317            self.buffer[0..10].copy_from_slice(&header_bytes);
318            10usize
319        };
320        self.buffer[header_bytes..(header_bytes + data.len())].copy_from_slice(data);
321
322        header_bytes + data.len()
323    }
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329    use crate::protocol::{PixelConfig, ID};
330    use crossbeam::channel::unbounded;
331    use std::thread;
332
333    #[test]
334    // Test sending to a loopback device
335    fn test_conn() {
336        let data_to_send = &vec![255, 0, 0, 255, 0, 0, 255, 0, 0];
337        let (s, r) = unbounded();
338
339        thread::spawn(move || {
340            let socket = UdpSocket::bind("127.0.0.1:4048").unwrap();
341
342            let mut buf = [0; 1500];
343            let (amt, _) = socket.recv_from(&mut buf).unwrap();
344            let buf = &mut buf[..amt];
345
346            s.send(buf.to_vec()).unwrap();
347        });
348
349        let mut conn = DDPConnection::try_new(
350            "127.0.0.1:4048",
351            PixelConfig::default(),
352            ID::Default,
353            UdpSocket::bind("0.0.0.0:4049").unwrap(),
354        )
355        .unwrap();
356
357        // Test simple send
358        conn.write(data_to_send).unwrap();
359        std::thread::sleep(std::time::Duration::from_millis(10));
360        let recv_data = r.recv().unwrap();
361        assert_eq!(
362            &vec![
363                0x41, 0x01, 0x0D, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0xFF, 0x00, 0x00, 0xFF,
364                0x00, 0x00, 0xFF, 0x00, 0x00
365            ],
366            &recv_data
367        );
368    }
369
370    // Helper function for creating test connections
371    fn create_test_connection() -> (DDPConnection, UdpSocket) {
372        let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
373        let display_addr = display_socket.local_addr().unwrap();
374        let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
375
376        let conn = DDPConnection::try_new(
377            display_addr,
378            PixelConfig::default(),
379            ID::default(),
380            client_socket,
381        )
382        .expect("Failed to create connection");
383
384        (conn, display_socket)
385    }
386
387    #[test]
388    fn test_connection_creation() {
389        let (conn, _display_socket) = create_test_connection();
390        assert_eq!(conn.pixel_config, PixelConfig::default());
391        assert_eq!(conn.id, ID::default());
392    }
393
394    #[test]
395    fn test_connection_write_pixel_data() {
396        use std::time::Duration;
397
398        let (mut conn, display_socket) = create_test_connection();
399        display_socket
400            .set_read_timeout(Some(Duration::from_millis(100)))
401            .unwrap();
402
403        let pixel_data = vec![255, 0, 0, 0, 255, 0, 0, 0, 255]; // 3 RGB pixels
404        let result = conn.write(&pixel_data);
405
406        assert!(result.is_ok());
407        assert!(result.unwrap() > 0);
408
409        let mut buf = [0u8; 1500];
410        let recv_result = display_socket.recv_from(&mut buf);
411        assert!(recv_result.is_ok());
412    }
413
414    #[test]
415    fn test_connection_write_with_offset() {
416        use std::time::Duration;
417
418        let (mut conn, display_socket) = create_test_connection();
419        display_socket
420            .set_read_timeout(Some(Duration::from_millis(500)))
421            .unwrap();
422
423        let pixel_data = vec![128, 128, 128]; // 1 RGB pixel
424        let offset = 30; // Start at pixel 10 (30 bytes / 3)
425        let result = conn.write_offset(&pixel_data, offset);
426
427        assert!(result.is_ok());
428
429        let mut buf = [0u8; 1500];
430        match display_socket.recv_from(&mut buf) {
431            Ok((size, _)) => {
432                assert!(size > 10);
433                let received_offset = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
434                assert_eq!(received_offset, offset);
435            }
436            Err(e) => {
437                eprintln!("Warning: recv_from timed out: {}", e);
438            }
439        }
440    }
441
442    #[test]
443    fn test_connection_sequence_numbers() {
444        use std::time::Duration;
445
446        let (mut conn, display_socket) = create_test_connection();
447        display_socket
448            .set_read_timeout(Some(Duration::from_millis(100)))
449            .unwrap();
450
451        let pixel_data = vec![255, 0, 0];
452
453        for i in 0..5 {
454            conn.write(&pixel_data).unwrap();
455
456            let mut buf = [0u8; 1500];
457            display_socket.recv_from(&mut buf).unwrap();
458
459            let seq_num = buf[1];
460            assert_eq!(seq_num, (i + 1) as u8);
461        }
462    }
463
464    #[test]
465    fn test_connection_large_data_chunking() {
466        use std::time::Duration;
467
468        let (mut conn, display_socket) = create_test_connection();
469        display_socket
470            .set_read_timeout(Some(Duration::from_millis(500)))
471            .unwrap();
472
473        // Send data larger than MAX_DATA_LENGTH (480 * 3 = 1440 bytes)
474        let large_data = vec![128u8; 2000];
475        let result = conn.write(&large_data);
476
477        assert!(result.is_ok());
478
479        // Should receive multiple packets
480        let mut received_packets = 0;
481        let mut buf = [0u8; 1500];
482
483        loop {
484            match display_socket.recv_from(&mut buf) {
485                Ok(_) => received_packets += 1,
486                Err(_) => break,
487            }
488
489            if received_packets >= 2 {
490                break;
491            }
492        }
493
494        assert!(received_packets >= 2, "Expected multiple packets for large data");
495    }
496
497    #[test]
498    fn test_connection_empty_data() {
499        use std::time::Duration;
500
501        let (mut conn, display_socket) = create_test_connection();
502        display_socket
503            .set_read_timeout(Some(Duration::from_millis(100)))
504            .unwrap();
505
506        let empty_data: Vec<u8> = vec![];
507        let result = conn.write(&empty_data);
508
509        assert!(result.is_ok());
510    }
511
512    #[test]
513    fn test_pixel_config_preserved() {
514        let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
515        let display_addr = display_socket.local_addr().unwrap();
516        let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
517
518        let custom_config = PixelConfig::default();
519
520        let conn = DDPConnection::try_new(
521            display_addr,
522            custom_config,
523            ID::default(),
524            client_socket,
525        )
526        .expect("Failed to create connection");
527
528        assert_eq!(conn.pixel_config, custom_config);
529    }
530
531    #[test]
532    fn test_id_preserved() {
533        let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
534        let display_addr = display_socket.local_addr().unwrap();
535        let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
536
537        let custom_id = ID::Config;
538
539        let conn = DDPConnection::try_new(
540            display_addr,
541            PixelConfig::default(),
542            custom_id,
543            client_socket,
544        )
545        .expect("Failed to create connection");
546
547        assert_eq!(conn.id, custom_id);
548    }
549}