1use crate::error::DDPError;
7use crate::error::DDPError::CrossBeamError;
8use crate::packet::Packet;
9use crate::protocol;
10use crossbeam::channel::{unbounded, Receiver, TryRecvError};
11use std::net::{SocketAddr, UdpSocket};
12
13const MAX_DATA_LENGTH: usize = 480 * 3;
15
16#[derive(Debug)]
68pub struct DDPConnection {
69 pub pixel_config: protocol::PixelConfig,
71
72 pub id: protocol::ID,
74
75 sequence_number: u8,
76 socket: UdpSocket,
77 addr: SocketAddr,
78
79 pub receiver_packet: Receiver<Packet>,
81
82 buffer: [u8; 1500],
84}
85
86impl DDPConnection {
87 pub fn write(&mut self, data: &[u8]) -> Result<usize, DDPError> {
115 let mut h = protocol::Header::default();
116
117 h.packet_type.push(false);
118 h.pixel_config = self.pixel_config;
119 h.id = self.id;
120
121 self.slice_send(&mut h, data)
122 }
123
124 pub fn write_offset(&mut self, data: &[u8], offset: u32) -> Result<usize, DDPError> {
148 let mut h = protocol::Header::default();
149
150 h.packet_type.push(false);
151 h.pixel_config = self.pixel_config;
152 h.id = self.id;
153 h.offset = offset;
154
155 self.slice_send(&mut h, data)
156 }
157
158 pub fn write_message(&mut self, msg: protocol::message::Message) -> Result<usize, DDPError> {
182 let mut h = protocol::Header::default();
183 h.packet_type.push(false);
184 h.id = msg.get_id();
185 let msg_data: Vec<u8> = msg.try_into()?;
186 h.length = msg_data.len() as u16;
187
188 self.slice_send(&mut h, &msg_data)
189 }
190
191 fn slice_send(
192 &mut self,
193 header: &mut protocol::Header,
194 data: &[u8],
195 ) -> Result<usize, DDPError> {
196 let mut offset = header.offset as usize;
197 let mut sent = 0;
198
199 let num_iterations = (data.len() + MAX_DATA_LENGTH - 1) / MAX_DATA_LENGTH;
200 let mut iter = 0;
201
202 while offset < data.len() {
203 iter += 1;
204
205 if iter == num_iterations {
206 header.packet_type.push(true);
207 }
208
209 header.sequence_number = self.sequence_number;
210
211 let chunk_end = std::cmp::min(offset + MAX_DATA_LENGTH, data.len());
212 let chunk = &data[offset..chunk_end];
213 header.length = chunk.len() as u16;
214 let len = self.assemble_packet(*header, chunk);
215
216 sent += self.socket.send_to(&self.buffer[0..len], self.addr)?;
218
219 if self.sequence_number > 15 {
221 self.sequence_number = 1;
222 } else {
223 self.sequence_number += 1;
224 }
225 offset += MAX_DATA_LENGTH;
226 header.offset = offset as u32;
227 }
228
229 Ok(sent)
230 }
231
232 pub fn get_incoming(&self) -> Result<Packet, DDPError> {
242 match self.receiver_packet.try_recv() {
243 Ok(packet) => Ok(packet),
244 Err(TryRecvError::Empty) => Err(DDPError::NothingToReceive),
245 Err(e2) => Err(CrossBeamError(e2)),
246 }
247 }
248
249 pub fn try_new<A>(
281 addr: A,
282 pixel_config: protocol::PixelConfig,
283 id: protocol::ID,
284 socket: UdpSocket,
285 ) -> Result<DDPConnection, DDPError>
286 where
287 A: std::net::ToSocketAddrs,
288 {
289 let socket_addr: SocketAddr = addr
290 .to_socket_addrs()?
291 .next()
292 .ok_or(DDPError::NoValidSocketAddr)?;
293 let (_s, recv) = unbounded();
294
295 Ok(DDPConnection {
296 addr: socket_addr,
297 pixel_config,
298 id,
299 socket,
300 receiver_packet: recv,
301 sequence_number: 1,
302 buffer: [0u8; 1500],
303 })
304 }
305
306 #[inline(always)]
310 fn assemble_packet(&mut self, header: protocol::Header, data: &[u8]) -> usize {
311 let header_bytes: usize = if header.packet_type.timecode {
312 let header_bytes: [u8; 14] = header.into();
313 self.buffer[0..14].copy_from_slice(&header_bytes);
314 14usize
315 } else {
316 let header_bytes: [u8; 10] = header.into();
317 self.buffer[0..10].copy_from_slice(&header_bytes);
318 10usize
319 };
320 self.buffer[header_bytes..(header_bytes + data.len())].copy_from_slice(data);
321
322 header_bytes + data.len()
323 }
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329 use crate::protocol::{PixelConfig, ID};
330 use crossbeam::channel::unbounded;
331 use std::thread;
332
333 #[test]
334 fn test_conn() {
336 let data_to_send = &vec![255, 0, 0, 255, 0, 0, 255, 0, 0];
337 let (s, r) = unbounded();
338
339 thread::spawn(move || {
340 let socket = UdpSocket::bind("127.0.0.1:4048").unwrap();
341
342 let mut buf = [0; 1500];
343 let (amt, _) = socket.recv_from(&mut buf).unwrap();
344 let buf = &mut buf[..amt];
345
346 s.send(buf.to_vec()).unwrap();
347 });
348
349 let mut conn = DDPConnection::try_new(
350 "127.0.0.1:4048",
351 PixelConfig::default(),
352 ID::Default,
353 UdpSocket::bind("0.0.0.0:4049").unwrap(),
354 )
355 .unwrap();
356
357 conn.write(data_to_send).unwrap();
359 std::thread::sleep(std::time::Duration::from_millis(10));
360 let recv_data = r.recv().unwrap();
361 assert_eq!(
362 &vec![
363 0x41, 0x01, 0x0D, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x09, 0xFF, 0x00, 0x00, 0xFF,
364 0x00, 0x00, 0xFF, 0x00, 0x00
365 ],
366 &recv_data
367 );
368 }
369
370 fn create_test_connection() -> (DDPConnection, UdpSocket) {
372 let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
373 let display_addr = display_socket.local_addr().unwrap();
374 let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
375
376 let conn = DDPConnection::try_new(
377 display_addr,
378 PixelConfig::default(),
379 ID::default(),
380 client_socket,
381 )
382 .expect("Failed to create connection");
383
384 (conn, display_socket)
385 }
386
387 #[test]
388 fn test_connection_creation() {
389 let (conn, _display_socket) = create_test_connection();
390 assert_eq!(conn.pixel_config, PixelConfig::default());
391 assert_eq!(conn.id, ID::default());
392 }
393
394 #[test]
395 fn test_connection_write_pixel_data() {
396 use std::time::Duration;
397
398 let (mut conn, display_socket) = create_test_connection();
399 display_socket
400 .set_read_timeout(Some(Duration::from_millis(100)))
401 .unwrap();
402
403 let pixel_data = vec![255, 0, 0, 0, 255, 0, 0, 0, 255]; let result = conn.write(&pixel_data);
405
406 assert!(result.is_ok());
407 assert!(result.unwrap() > 0);
408
409 let mut buf = [0u8; 1500];
410 let recv_result = display_socket.recv_from(&mut buf);
411 assert!(recv_result.is_ok());
412 }
413
414 #[test]
415 fn test_connection_write_with_offset() {
416 use std::time::Duration;
417
418 let (mut conn, display_socket) = create_test_connection();
419 display_socket
420 .set_read_timeout(Some(Duration::from_millis(500)))
421 .unwrap();
422
423 let pixel_data = vec![128, 128, 128]; let offset = 30; let result = conn.write_offset(&pixel_data, offset);
426
427 assert!(result.is_ok());
428
429 let mut buf = [0u8; 1500];
430 match display_socket.recv_from(&mut buf) {
431 Ok((size, _)) => {
432 assert!(size > 10);
433 let received_offset = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
434 assert_eq!(received_offset, offset);
435 }
436 Err(e) => {
437 eprintln!("Warning: recv_from timed out: {}", e);
438 }
439 }
440 }
441
442 #[test]
443 fn test_connection_sequence_numbers() {
444 use std::time::Duration;
445
446 let (mut conn, display_socket) = create_test_connection();
447 display_socket
448 .set_read_timeout(Some(Duration::from_millis(100)))
449 .unwrap();
450
451 let pixel_data = vec![255, 0, 0];
452
453 for i in 0..5 {
454 conn.write(&pixel_data).unwrap();
455
456 let mut buf = [0u8; 1500];
457 display_socket.recv_from(&mut buf).unwrap();
458
459 let seq_num = buf[1];
460 assert_eq!(seq_num, (i + 1) as u8);
461 }
462 }
463
464 #[test]
465 fn test_connection_large_data_chunking() {
466 use std::time::Duration;
467
468 let (mut conn, display_socket) = create_test_connection();
469 display_socket
470 .set_read_timeout(Some(Duration::from_millis(500)))
471 .unwrap();
472
473 let large_data = vec![128u8; 2000];
475 let result = conn.write(&large_data);
476
477 assert!(result.is_ok());
478
479 let mut received_packets = 0;
481 let mut buf = [0u8; 1500];
482
483 loop {
484 match display_socket.recv_from(&mut buf) {
485 Ok(_) => received_packets += 1,
486 Err(_) => break,
487 }
488
489 if received_packets >= 2 {
490 break;
491 }
492 }
493
494 assert!(received_packets >= 2, "Expected multiple packets for large data");
495 }
496
497 #[test]
498 fn test_connection_empty_data() {
499 use std::time::Duration;
500
501 let (mut conn, display_socket) = create_test_connection();
502 display_socket
503 .set_read_timeout(Some(Duration::from_millis(100)))
504 .unwrap();
505
506 let empty_data: Vec<u8> = vec![];
507 let result = conn.write(&empty_data);
508
509 assert!(result.is_ok());
510 }
511
512 #[test]
513 fn test_pixel_config_preserved() {
514 let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
515 let display_addr = display_socket.local_addr().unwrap();
516 let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
517
518 let custom_config = PixelConfig::default();
519
520 let conn = DDPConnection::try_new(
521 display_addr,
522 custom_config,
523 ID::default(),
524 client_socket,
525 )
526 .expect("Failed to create connection");
527
528 assert_eq!(conn.pixel_config, custom_config);
529 }
530
531 #[test]
532 fn test_id_preserved() {
533 let display_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind display socket");
534 let display_addr = display_socket.local_addr().unwrap();
535 let client_socket = UdpSocket::bind("127.0.0.1:0").expect("Failed to bind client socket");
536
537 let custom_id = ID::Config;
538
539 let conn = DDPConnection::try_new(
540 display_addr,
541 PixelConfig::default(),
542 custom_id,
543 client_socket,
544 )
545 .expect("Failed to create connection");
546
547 assert_eq!(conn.id, custom_id);
548 }
549}