1use delegate::delegate;
2use std::{
3 io::Write,
4 net::{SocketAddr, TcpListener, TcpStream},
5};
6
7use alloc::boxed::Box;
8
9use crate::{
10 encoding::{ccsds::PacketIdLookup, parse_buffer_for_ccsds_space_packets},
11 tmtc::{ReceivesTc, TmPacketSource},
12};
13
14use super::tcp_server::{
15 ConnectionResult, ServerConfig, TcpTcParser, TcpTmSender, TcpTmtcError, TcpTmtcGenericServer,
16};
17
18pub struct SpacepacketsTcParser {
20 packet_id_lookup: Box<dyn PacketIdLookup + Send>,
21}
22
23impl SpacepacketsTcParser {
24 pub fn new(packet_id_lookup: Box<dyn PacketIdLookup + Send>) -> Self {
25 Self { packet_id_lookup }
26 }
27}
28
29impl<TmError, TcError: 'static> TcpTcParser<TmError, TcError> for SpacepacketsTcParser {
30 fn handle_tc_parsing(
31 &mut self,
32 tc_buffer: &mut [u8],
33 tc_receiver: &mut (impl ReceivesTc<Error = TcError> + ?Sized),
34 conn_result: &mut ConnectionResult,
35 current_write_idx: usize,
36 next_write_idx: &mut usize,
37 ) -> Result<(), TcpTmtcError<TmError, TcError>> {
38 conn_result.num_received_tcs += parse_buffer_for_ccsds_space_packets(
40 &mut tc_buffer[..current_write_idx],
41 self.packet_id_lookup.as_ref(),
42 tc_receiver.upcast_mut(),
43 next_write_idx,
44 )
45 .map_err(|e| TcpTmtcError::TcError(e))?;
46 Ok(())
47 }
48}
49
50#[derive(Default)]
52pub struct SpacepacketsTmSender {}
53
54impl<TmError, TcError> TcpTmSender<TmError, TcError> for SpacepacketsTmSender {
55 fn handle_tm_sending(
56 &mut self,
57 tm_buffer: &mut [u8],
58 tm_source: &mut (impl TmPacketSource<Error = TmError> + ?Sized),
59 conn_result: &mut ConnectionResult,
60 stream: &mut TcpStream,
61 ) -> Result<bool, TcpTmtcError<TmError, TcError>> {
62 let mut tm_was_sent = false;
63 loop {
64 let read_tm_len = tm_source
67 .retrieve_packet(tm_buffer)
68 .map_err(|e| TcpTmtcError::TmError(e))?;
69
70 if read_tm_len == 0 {
71 return Ok(tm_was_sent);
72 }
73 tm_was_sent = true;
74 conn_result.num_sent_tms += 1;
75
76 stream.write_all(&tm_buffer[..read_tm_len])?;
77 }
78 }
79}
80
81pub struct TcpSpacepacketsServer<
94 TmError,
95 TcError: 'static,
96 TmSource: TmPacketSource<Error = TmError>,
97 TcReceiver: ReceivesTc<Error = TcError>,
98> {
99 generic_server: TcpTmtcGenericServer<
100 TmError,
101 TcError,
102 TmSource,
103 TcReceiver,
104 SpacepacketsTmSender,
105 SpacepacketsTcParser,
106 >,
107}
108
109impl<
110 TmError: 'static,
111 TcError: 'static,
112 TmSource: TmPacketSource<Error = TmError>,
113 TcReceiver: ReceivesTc<Error = TcError>,
114 > TcpSpacepacketsServer<TmError, TcError, TmSource, TcReceiver>
115{
116 pub fn new(
127 cfg: ServerConfig,
128 tm_source: TmSource,
129 tc_receiver: TcReceiver,
130 packet_id_lookup: Box<dyn PacketIdLookup + Send>,
131 ) -> Result<Self, std::io::Error> {
132 Ok(Self {
133 generic_server: TcpTmtcGenericServer::new(
134 cfg,
135 SpacepacketsTcParser::new(packet_id_lookup),
136 SpacepacketsTmSender::default(),
137 tm_source,
138 tc_receiver,
139 )?,
140 })
141 }
142
143 delegate! {
144 to self.generic_server {
145 pub fn listener(&mut self) -> &mut TcpListener;
146
147 pub fn local_addr(&self) -> std::io::Result<SocketAddr>;
150
151 pub fn handle_next_connection(
153 &mut self,
154 ) -> Result<ConnectionResult, TcpTmtcError<TmError, TcError>>;
155 }
156 }
157}
158
159#[cfg(test)]
160mod tests {
161 use core::{
162 sync::atomic::{AtomicBool, Ordering},
163 time::Duration,
164 };
165 #[allow(unused_imports)]
166 use std::println;
167 use std::{
168 io::{Read, Write},
169 net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream},
170 thread,
171 };
172
173 use alloc::{boxed::Box, sync::Arc};
174 use hashbrown::HashSet;
175 use spacepackets::{
176 ecss::{tc::PusTcCreator, WritablePusPacket},
177 PacketId, SpHeader,
178 };
179
180 use crate::hal::std::tcp_server::{
181 tests::{SyncTcCacher, SyncTmSource},
182 ServerConfig,
183 };
184
185 use super::TcpSpacepacketsServer;
186
187 const TEST_APID_0: u16 = 0x02;
188 const TEST_PACKET_ID_0: PacketId = PacketId::const_tc(true, TEST_APID_0);
189 const TEST_APID_1: u16 = 0x10;
190 const TEST_PACKET_ID_1: PacketId = PacketId::const_tc(true, TEST_APID_1);
191
192 fn generic_tmtc_server(
193 addr: &SocketAddr,
194 tc_receiver: SyncTcCacher,
195 tm_source: SyncTmSource,
196 packet_id_lookup: HashSet<PacketId>,
197 ) -> TcpSpacepacketsServer<(), (), SyncTmSource, SyncTcCacher> {
198 TcpSpacepacketsServer::new(
199 ServerConfig::new(*addr, Duration::from_millis(2), 1024, 1024),
200 tm_source,
201 tc_receiver,
202 Box::new(packet_id_lookup),
203 )
204 .expect("TCP server generation failed")
205 }
206
207 #[test]
208 fn test_basic_tc_only() {
209 let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
210 let tc_receiver = SyncTcCacher::default();
211 let tm_source = SyncTmSource::default();
212 let mut packet_id_lookup = HashSet::new();
213 packet_id_lookup.insert(TEST_PACKET_ID_0);
214 let mut tcp_server = generic_tmtc_server(
215 &auto_port_addr,
216 tc_receiver.clone(),
217 tm_source,
218 packet_id_lookup,
219 );
220 let dest_addr = tcp_server
221 .local_addr()
222 .expect("retrieving dest addr failed");
223 let conn_handled: Arc<AtomicBool> = Default::default();
224 let set_if_done = conn_handled.clone();
225 thread::spawn(move || {
227 let result = tcp_server.handle_next_connection();
228 if result.is_err() {
229 panic!("handling connection failed: {:?}", result.unwrap_err());
230 }
231 let conn_result = result.unwrap();
232 assert_eq!(conn_result.num_received_tcs, 1);
233 assert_eq!(conn_result.num_sent_tms, 0);
234 set_if_done.store(true, Ordering::Relaxed);
235 });
236 let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
237 let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
238 let tc_0 = ping_tc.to_vec().expect("packet generation failed");
239 let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
240 stream
241 .write_all(&tc_0)
242 .expect("writing to TCP server failed");
243 drop(stream);
244
245 for _ in 0..3 {
247 if !conn_handled.load(Ordering::Relaxed) {
248 thread::sleep(Duration::from_millis(5));
249 }
250 }
251 if !conn_handled.load(Ordering::Relaxed) {
252 panic!("connection was not handled properly");
253 }
254 let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
256 assert_eq!(tc_queue.len(), 1);
257 assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
258 }
259
260 #[test]
261 fn test_multi_tc_multi_tm() {
262 let auto_port_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0);
263 let tc_receiver = SyncTcCacher::default();
264 let mut tm_source = SyncTmSource::default();
265
266 let mut total_tm_len = 0;
268 let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
269 let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 1, None, true);
270 let tm_0 = verif_tm.to_vec().expect("writing packet failed");
271 total_tm_len += tm_0.len();
272 tm_source.add_tm(&tm_0);
273 let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
274 let verif_tm = PusTcCreator::new_simple(&mut sph, 1, 3, None, true);
275 let tm_1 = verif_tm.to_vec().expect("writing packet failed");
276 total_tm_len += tm_1.len();
277 tm_source.add_tm(&tm_1);
278
279 let mut packet_id_lookup = HashSet::new();
281 packet_id_lookup.insert(TEST_PACKET_ID_0);
282 packet_id_lookup.insert(TEST_PACKET_ID_1);
283 let mut tcp_server = generic_tmtc_server(
284 &auto_port_addr,
285 tc_receiver.clone(),
286 tm_source,
287 packet_id_lookup,
288 );
289 let dest_addr = tcp_server
290 .local_addr()
291 .expect("retrieving dest addr failed");
292 let conn_handled: Arc<AtomicBool> = Default::default();
293 let set_if_done = conn_handled.clone();
294
295 thread::spawn(move || {
297 let result = tcp_server.handle_next_connection();
298 if result.is_err() {
299 panic!("handling connection failed: {:?}", result.unwrap_err());
300 }
301 let conn_result = result.unwrap();
302 assert_eq!(
303 conn_result.num_received_tcs, 2,
304 "wrong number of received TCs"
305 );
306 assert_eq!(conn_result.num_sent_tms, 2, "wrong number of sent TMs");
307 set_if_done.store(true, Ordering::Relaxed);
308 });
309 let mut stream = TcpStream::connect(dest_addr).expect("connecting to TCP server failed");
310 stream
311 .set_read_timeout(Some(Duration::from_millis(10)))
312 .expect("setting reas timeout failed");
313
314 let mut sph = SpHeader::tc_unseg(TEST_APID_0, 0, 0).unwrap();
316 let ping_tc = PusTcCreator::new_simple(&mut sph, 17, 1, None, true);
317 let tc_0 = ping_tc.to_vec().expect("ping tc creation failed");
318 stream
319 .write_all(&tc_0)
320 .expect("writing to TCP server failed");
321 let mut sph = SpHeader::tc_unseg(TEST_APID_1, 0, 0).unwrap();
322 let action_tc = PusTcCreator::new_simple(&mut sph, 8, 0, None, true);
323 let tc_1 = action_tc.to_vec().expect("action tc creation failed");
324 stream
325 .write_all(&tc_1)
326 .expect("writing to TCP server failed");
327
328 stream
330 .shutdown(std::net::Shutdown::Write)
331 .expect("shutting down write failed");
332 let mut read_buf: [u8; 32] = [0; 32];
333 let mut current_idx = 0;
334 let mut read_len_total = 0;
335 while read_len_total < total_tm_len {
337 let read_len = stream
338 .read(&mut read_buf[current_idx..])
339 .expect("read failed");
340 current_idx += read_len;
341 read_len_total += read_len;
342 }
343 drop(stream);
344 assert_eq!(read_buf[..tm_0.len()], tm_0);
345 assert_eq!(read_buf[tm_0.len()..tm_0.len() + tm_1.len()], tm_1);
346
347 for _ in 0..3 {
349 if !conn_handled.load(Ordering::Relaxed) {
350 thread::sleep(Duration::from_millis(5));
351 }
352 }
353 if !conn_handled.load(Ordering::Relaxed) {
354 panic!("connection was not handled properly");
355 }
356 let mut tc_queue = tc_receiver.tc_queue.lock().unwrap();
358 assert_eq!(tc_queue.len(), 2);
359 assert_eq!(tc_queue.pop_front().unwrap(), tc_0);
360 assert_eq!(tc_queue.pop_front().unwrap(), tc_1);
361 }
362}