ddp_connection/
connection.rs1use crate::error::DDPError;
2use crate::packet::Packet;
3use crate::protocol;
4use crossbeam::channel::{unbounded, Receiver, TryRecvError};
5use std::net::{SocketAddr, UdpSocket};
6use crate::error::DDPError::CrossBeamError;
7
8
/// Largest number of payload bytes placed in a single packet (480 * 3 = 1440).
///
/// Buffers longer than this are split into several packets when sent.
/// Presumably this corresponds to 480 three-byte pixels — confirm against the
/// protocol documentation.
const MAX_DATA_LENGTH: usize = 480 * 3;
10
11#[derive(Debug)]
13pub struct DDPConnection {
14 pub pixel_config: protocol::PixelConfig,
15 pub id: protocol::ID,
16
17 sequence_number: u8,
18 socket: UdpSocket,
19 addr: SocketAddr,
20
21 pub receiver_packet: Receiver<Packet>,
22
23 buffer: [u8; 1500],
25}
26
27
28impl DDPConnection {
29 pub fn write(&mut self, data: &[u8]) -> Result<usize, DDPError> {
33
34 let mut h = protocol::Header::default();
35
36 h.packet_type.push(false);
37 h.pixel_config = self.pixel_config;
38 h.id = self.id;
39 h.length = data.len() as u16;
40
41 self.slice_send(&mut h, data)
42 }
43
44 pub fn write_message(&mut self, msg: protocol::message::Message) -> Result<usize, DDPError>
50 {
51 let mut h = protocol::Header::default();
52 h.packet_type.push(false);
53 h.id = msg.get_id();
54 let msg_data: Vec<u8> = msg.try_into()?;
55 h.length = msg_data.len() as u16;
56
57 self.slice_send(&mut h, &msg_data)
58 }
59
60 fn slice_send(
61 &mut self,
62 header: &mut protocol::Header,
63 data: &[u8],
64 ) -> Result<usize, DDPError> {
65 let mut offset = 0;
66 let mut sent = 0;
67
68 let num_iterations = (data.len() + MAX_DATA_LENGTH - 1) / MAX_DATA_LENGTH;
69 let mut iter = 0;
70
71 while offset < data.len() {
72 iter += 1;
73
74 if iter == num_iterations {
75 header.packet_type.push(true);
76 }
77
78 header.sequence_number = self.sequence_number;
79
80 let chunk_end = std::cmp::min(offset + MAX_DATA_LENGTH, data.len());
81 let chunk = &data[offset..chunk_end];
82 let len = self.assemble_packet(*header, chunk);
83
84 sent += self.socket.send_to(&self.buffer[0..len], self.addr)?;
86
87
88 if self.sequence_number > 15 {
90 self.sequence_number = 1;
91 } else {
92 self.sequence_number += 1;
93 }
94 offset += MAX_DATA_LENGTH;
95 }
96
97 Ok(sent)
98 }
99
100 pub fn get_incoming(&self) -> Result<Packet, DDPError>{
101 match self.receiver_packet.try_recv() {
102 Ok(packet) => {
103 Ok(packet)
104 },
105 Err(e) if e == TryRecvError::Empty => {
106 Err(DDPError::NothingToReceive)
107 },
108 Err(e2) => Err(CrossBeamError(e2))
109 }
110 }
111
112 pub fn try_new<A>(
113 addr: A,
114 pixel_config: protocol::PixelConfig,
115 id: protocol::ID,
116 socket: UdpSocket
117 ) -> Result<DDPConnection, DDPError>
118 where
119 A: std::net::ToSocketAddrs,
120 {
121 let socket_addr: SocketAddr = addr
122 .to_socket_addrs()?
123 .next()
124 .ok_or(DDPError::NoValidSocketAddr)?;
125 let (_s, recv) = unbounded();
126
127 Ok(
128 DDPConnection {
129 addr: socket_addr,
130 pixel_config,
131 id,
132 socket,
133 receiver_packet: recv,
134 sequence_number: 1,
135 buffer: [0u8; 1500],
136 }
137 )
138 }
139
140 #[inline(always)]
144 fn assemble_packet(&mut self, header: protocol::Header, data: &[u8]) -> usize {
145 let header_bytes: usize = if header.packet_type.timecode {
146 let header_bytes: [u8; 14] = header.into();
147 self.buffer[0..14].copy_from_slice(&header_bytes);
148 14usize
149
150 } else {
151 let header_bytes: [u8; 10] = header.into();
152 self.buffer[0..10].copy_from_slice(&header_bytes);
153 10usize
154 };
155 self.buffer[header_bytes..(header_bytes + data.len())].copy_from_slice(data);
156
157 return header_bytes + data.len();
158 }
159}
160
161
162
163
164
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::{PixelConfig, ID};

    /// Constructing a connection from a resolvable address and a bound socket
    /// succeeds.
    #[test]
    fn test_conn() {
        // Bind to an OS-assigned port: a fixed port makes the test fail
        // whenever that port is already in use on the machine.
        let socket = UdpSocket::bind("0.0.0.0:0").expect("binding an ephemeral UDP port");

        let conn = DDPConnection::try_new(
            "192.168.1.40:4048",
            PixelConfig::default(),
            ID::Default,
            socket,
        );
        assert!(conn.is_ok());
    }
}