1use crate::cfdp::user::TransactionFinishedParams;
2use core::str::{from_utf8, Utf8Error};
3use std::path::{Path, PathBuf};
4
5use super::{
6 filestore::{FilestoreError, VirtualFilestore},
7 user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
8 CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget,
9 RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TransactionId,
10 TransactionStep,
11};
12use alloc::boxed::Box;
13use smallvec::SmallVec;
14use spacepackets::{
15 cfdp::{
16 pdu::{
17 eof::EofPdu,
18 file_data::FileDataPdu,
19 finished::{DeliveryCode, FileStatus, FinishedPduCreator},
20 metadata::{MetadataGenericParams, MetadataPduReader},
21 CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
22 },
23 tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, TlvType},
24 ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
25 },
26 util::UnsignedByteField,
27};
28use thiserror::Error;
29
30#[derive(Debug)]
31struct FileProperties {
32 src_file_name: [u8; u8::MAX as usize],
33 src_file_name_len: usize,
34 dest_file_name: [u8; u8::MAX as usize],
35 dest_file_name_len: usize,
36 dest_path_buf: PathBuf,
37}
38
39#[derive(Debug, PartialEq, Eq, Copy, Clone)]
40enum CompletionDisposition {
41 Completed = 0,
42 Cancelled = 1,
43}
44
45#[derive(Debug)]
46struct TransferState {
47 transaction_id: Option<TransactionId>,
48 metadata_params: MetadataGenericParams,
49 progress: u64,
50 metadata_only: bool,
51 condition_code: ConditionCode,
52 delivery_code: DeliveryCode,
53 file_status: FileStatus,
54 completion_disposition: CompletionDisposition,
55 checksum: u32,
56 current_check_count: u32,
57 current_check_timer: Option<Box<dyn CheckTimer>>,
58}
59
60impl Default for TransferState {
61 fn default() -> Self {
62 Self {
63 transaction_id: None,
64 metadata_params: Default::default(),
65 progress: Default::default(),
66 metadata_only: false,
67 condition_code: ConditionCode::NoError,
68 delivery_code: DeliveryCode::Incomplete,
69 file_status: FileStatus::Unreported,
70 completion_disposition: CompletionDisposition::Completed,
71 checksum: 0,
72 current_check_count: 0,
73 current_check_timer: None,
74 }
75 }
76}
77
78#[derive(Debug)]
79struct TransactionParams {
80 tstate: TransferState,
81 pdu_conf: CommonPduConfig,
82 file_properties: FileProperties,
83 cksum_buf: [u8; 1024],
84 msgs_to_user_size: usize,
85 msgs_to_user_buf: [u8; 1024],
86 remote_cfg: Option<RemoteEntityConfig>,
87}
88
89impl TransactionParams {
90 fn transmission_mode(&self) -> TransmissionMode {
91 self.pdu_conf.trans_mode
92 }
93}
94
95impl Default for FileProperties {
96 fn default() -> Self {
97 Self {
98 src_file_name: [0; u8::MAX as usize],
99 src_file_name_len: Default::default(),
100 dest_file_name: [0; u8::MAX as usize],
101 dest_file_name_len: Default::default(),
102 dest_path_buf: Default::default(),
103 }
104 }
105}
106
107impl TransactionParams {
108 fn file_size(&self) -> u64 {
109 self.tstate.metadata_params.file_size
110 }
111
112 fn metadata_params(&self) -> &MetadataGenericParams {
113 &self.tstate.metadata_params
114 }
115}
116
117impl Default for TransactionParams {
118 fn default() -> Self {
119 Self {
120 pdu_conf: Default::default(),
121 cksum_buf: [0; 1024],
122 msgs_to_user_size: 0,
123 msgs_to_user_buf: [0; 1024],
124 tstate: Default::default(),
125 file_properties: Default::default(),
126 remote_cfg: None,
127 }
128 }
129}
130
131impl TransactionParams {
132 fn reset(&mut self) {
133 self.tstate.condition_code = ConditionCode::NoError;
134 self.tstate.delivery_code = DeliveryCode::Incomplete;
135 self.tstate.file_status = FileStatus::Unreported;
136 }
137}
138
139#[derive(Debug, Error)]
140pub enum DestError {
141 #[error("expected file directive")]
143 DirectiveExpected,
144 #[error("can not process packet type {0:?}")]
145 CantProcessPacketType(FileDirectiveType),
146 #[error("can not process file data PDUs in current state")]
147 WrongStateForFileDataAndEof,
148 #[error("busy with transfer")]
150 RecvdMetadataButIsBusy,
151 #[error("empty source file field")]
152 EmptySrcFileField,
153 #[error("empty dest file field")]
154 EmptyDestFileField,
155 #[error("packets to be sent are still left")]
156 PacketToSendLeft,
157 #[error("pdu error {0}")]
158 Pdu(#[from] PduError),
159 #[error("io error {0}")]
160 Io(#[from] std::io::Error),
161 #[error("file store error {0}")]
162 Filestore(#[from] FilestoreError),
163 #[error("path conversion error {0}")]
164 PathConversion(#[from] Utf8Error),
165 #[error("error building dest path from source file name and dest folder")]
166 PathConcat,
167 #[error("no remote entity configuration found for {0:?}")]
168 NoRemoteCfgFound(UnsignedByteField),
169}
170
171pub trait CfdpPacketSender: Send {
172 fn send_pdu(
173 &mut self,
174 pdu_type: PduType,
175 file_directive_type: Option<FileDirectiveType>,
176 raw_pdu: &[u8],
177 ) -> Result<(), PduError>;
178}
179
180pub struct DestinationHandler {
195 local_cfg: LocalEntityConfig,
196 step: TransactionStep,
197 state: State,
198 tparams: TransactionParams,
199 packet_buf: alloc::vec::Vec<u8>,
200 packet_sender: Box<dyn CfdpPacketSender>,
201 vfs: Box<dyn VirtualFilestore>,
202 remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
203 check_timer_creator: Box<dyn CheckTimerCreator>,
204}
205
206impl DestinationHandler {
207 pub fn new(
227 local_cfg: LocalEntityConfig,
228 max_packet_len: usize,
229 packet_sender: Box<dyn CfdpPacketSender>,
230 vfs: Box<dyn VirtualFilestore>,
231 remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
232 check_timer_creator: Box<dyn CheckTimerCreator>,
233 ) -> Self {
234 Self {
235 local_cfg,
236 step: TransactionStep::Idle,
237 state: State::Idle,
238 tparams: Default::default(),
239 packet_buf: alloc::vec![0; max_packet_len],
240 packet_sender,
241 vfs,
242 remote_cfg_table,
243 check_timer_creator,
244 }
245 }
246
247 pub fn state_machine(
254 &mut self,
255 cfdp_user: &mut impl CfdpUser,
256 packet_to_insert: Option<&PacketInfo>,
257 ) -> Result<u32, DestError> {
258 if let Some(packet) = packet_to_insert {
259 self.insert_packet(cfdp_user, packet)?;
260 }
261 match self.state {
262 State::Idle => todo!(),
263 State::Busy => self.fsm_busy(cfdp_user),
264 State::Suspended => todo!(),
265 }
266 }
267
268 pub fn transmission_mode(&self) -> Option<TransmissionMode> {
271 if self.state == State::Idle {
272 return None;
273 }
274 Some(self.tparams.transmission_mode())
275 }
276
277 pub fn transaction_id(&self) -> Option<TransactionId> {
278 self.tstate().transaction_id
279 }
280
281 fn insert_packet(
282 &mut self,
283 cfdp_user: &mut impl CfdpUser,
284 packet_info: &PacketInfo,
285 ) -> Result<(), DestError> {
286 if packet_info.target() != PacketTarget::DestEntity {
287 return Err(DestError::CantProcessPacketType(
290 packet_info.pdu_directive().unwrap(),
291 ));
292 }
293 match packet_info.pdu_type {
294 PduType::FileDirective => {
295 if packet_info.pdu_directive.is_none() {
296 return Err(DestError::DirectiveExpected);
297 }
298 self.handle_file_directive(
299 cfdp_user,
300 packet_info.pdu_directive.unwrap(),
301 packet_info.raw_packet,
302 )
303 }
304 PduType::FileData => self.handle_file_data(cfdp_user, packet_info.raw_packet),
305 }
306 }
307
308 fn handle_file_directive(
309 &mut self,
310 cfdp_user: &mut impl CfdpUser,
311 pdu_directive: FileDirectiveType,
312 raw_packet: &[u8],
313 ) -> Result<(), DestError> {
314 match pdu_directive {
315 FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?,
316 FileDirectiveType::FinishedPdu
317 | FileDirectiveType::NakPdu
318 | FileDirectiveType::KeepAlivePdu => {
319 return Err(DestError::CantProcessPacketType(pdu_directive));
320 }
321 FileDirectiveType::AckPdu => {
322 todo!(
323 "check whether ACK pdu handling is applicable by checking the acked directive field"
324 )
325 }
326 FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?,
327 FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
328 };
329 Ok(())
330 }
331
332 fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
333 if self.state != State::Idle {
334 return Err(DestError::RecvdMetadataButIsBusy);
335 }
336 let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
337 self.tparams.reset();
338 self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
339 let remote_cfg = self
340 .remote_cfg_table
341 .get_remote_config(metadata_pdu.source_id().value());
342 if remote_cfg.is_none() {
343 return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id()));
344 }
345 self.tparams.remote_cfg = Some(*remote_cfg.unwrap());
346
347 let src_name = metadata_pdu.src_file_name();
349 let dest_name = metadata_pdu.dest_file_name();
350 if src_name.is_empty() && dest_name.is_empty() {
351 self.tparams.tstate.metadata_only = true;
352 }
353 if !self.tparams.tstate.metadata_only && src_name.is_empty() {
354 return Err(DestError::EmptySrcFileField);
355 }
356 if !self.tparams.tstate.metadata_only && dest_name.is_empty() {
357 return Err(DestError::EmptyDestFileField);
358 }
359 if !self.tparams.tstate.metadata_only {
360 self.tparams.file_properties.src_file_name[..src_name.len_value()]
361 .copy_from_slice(src_name.value());
362 self.tparams.file_properties.src_file_name_len = src_name.len_value();
363 if dest_name.is_empty() {
364 return Err(DestError::EmptyDestFileField);
365 }
366 self.tparams.file_properties.dest_file_name[..dest_name.len_value()]
367 .copy_from_slice(dest_name.value());
368 self.tparams.file_properties.dest_file_name_len = dest_name.len_value();
369 self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf();
370 self.tparams.msgs_to_user_size = 0;
371 }
372 if !metadata_pdu.options().is_empty() {
373 for option_tlv in metadata_pdu.options_iter().unwrap() {
374 if option_tlv.is_standard_tlv()
375 && option_tlv.tlv_type().unwrap() == TlvType::MsgToUser
376 {
377 self.tparams
378 .msgs_to_user_buf
379 .copy_from_slice(option_tlv.raw_data().unwrap());
380 self.tparams.msgs_to_user_size += option_tlv.len_full();
381 }
382 }
383 }
384 self.state = State::Busy;
385 self.step = TransactionStep::TransactionStart;
386 Ok(())
387 }
388
389 fn handle_file_data(
390 &mut self,
391 user: &mut impl CfdpUser,
392 raw_packet: &[u8],
393 ) -> Result<(), DestError> {
394 if self.state == State::Idle
395 || (self.step != TransactionStep::ReceivingFileDataPdus
396 && self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling)
397 {
398 return Err(DestError::WrongStateForFileDataAndEof);
399 }
400 let fd_pdu = FileDataPdu::from_bytes(raw_packet)?;
401 if self.local_cfg.indication_cfg.file_segment_recv {
402 user.file_segment_recvd_indication(&FileSegmentRecvdParams {
403 id: self.tstate().transaction_id.unwrap(),
404 offset: fd_pdu.offset(),
405 length: fd_pdu.file_data().len(),
406 segment_metadata: fd_pdu.segment_metadata(),
407 });
408 }
409 if let Err(e) = self.vfs.write_data(
410 self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
411 fd_pdu.offset(),
412 fd_pdu.file_data(),
413 ) {
414 self.declare_fault(ConditionCode::FilestoreRejection);
415 return Err(e.into());
416 }
417 self.tstate_mut().progress += fd_pdu.file_data().len() as u64;
418 Ok(())
419 }
420
421 fn handle_eof_pdu(
422 &mut self,
423 cfdp_user: &mut impl CfdpUser,
424 raw_packet: &[u8],
425 ) -> Result<(), DestError> {
426 if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
427 return Err(DestError::WrongStateForFileDataAndEof);
428 }
429 let eof_pdu = EofPdu::from_bytes(raw_packet)?;
430 if self.local_cfg.indication_cfg.eof_recv {
431 cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap());
433 }
434 let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError {
435 self.handle_no_error_eof_pdu(&eof_pdu)?
436 } else {
437 todo!("implement cancel request handling");
438 };
439 if regular_transfer_finish {
440 self.file_transfer_complete_transition();
441 }
442 Ok(())
443 }
444
445 fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
447 if self.tparams.tstate.progress > eof_pdu.file_size()
449 && self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError
450 {
451 return Ok(false);
452 } else if (self.tparams.tstate.progress < eof_pdu.file_size())
453 && self.tparams.transmission_mode() == TransmissionMode::Acknowledged
454 {
455 }
463
464 self.tparams.tstate.checksum = eof_pdu.file_checksum();
465 if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
466 && !self.checksum_verify(self.tparams.tstate.checksum)
467 {
468 if self.declare_fault(ConditionCode::FileChecksumFailure)
469 != FaultHandlerCode::IgnoreError
470 {
471 return Ok(false);
472 }
473 self.start_check_limit_handling();
474 return Ok(false);
475 }
476 Ok(true)
477 }
478
479 fn file_transfer_complete_transition(&mut self) {
480 if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
481 self.step = TransactionStep::TransferCompletion;
482 } else {
483 self.step = TransactionStep::SendingAckPdu;
485 }
486 }
487
488 fn checksum_verify(&mut self, checksum: u32) -> bool {
489 let mut file_delivery_complete = false;
490 if self.tparams.metadata_params().checksum_type == ChecksumType::NullChecksum
491 || self.tparams.tstate.metadata_only
492 {
493 file_delivery_complete = true;
494 self.tparams.tstate.delivery_code = DeliveryCode::Complete;
495 self.tparams.tstate.condition_code = ConditionCode::NoError;
496 } else {
497 match self.vfs.checksum_verify(
498 self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
499 self.tparams.metadata_params().checksum_type,
500 checksum,
501 &mut self.tparams.cksum_buf,
502 ) {
503 Ok(checksum_success) => {
504 file_delivery_complete = checksum_success;
505 }
506 Err(e) => match e {
507 FilestoreError::ChecksumTypeNotImplemented(_) => {
508 self.declare_fault(ConditionCode::UnsupportedChecksumType);
509 file_delivery_complete = true;
512 }
513 _ => {
514 self.declare_fault(ConditionCode::FilestoreRejection);
515 }
517 },
518 };
519 }
520 file_delivery_complete
521 }
522
523 fn start_check_limit_handling(&mut self) {
524 self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling;
525 self.tparams.tstate.current_check_timer = Some(
526 self.check_timer_creator
527 .get_check_timer_provider(TimerContext::CheckLimit {
528 local_id: self.local_cfg.id,
529 remote_id: self.tparams.remote_cfg.unwrap().entity_id,
530 entity_type: EntityType::Receiving,
531 }),
532 );
533 self.tparams.tstate.current_check_count = 0;
534 }
535
536 fn check_limit_handling(&mut self) {
537 if self.tparams.tstate.current_check_timer.is_none() {
538 return;
539 }
540 let check_timer = self.tparams.tstate.current_check_timer.as_ref().unwrap();
541 if check_timer.has_expired() {
542 if self.checksum_verify(self.tparams.tstate.checksum) {
543 self.file_transfer_complete_transition();
544 return;
545 }
546 if self.tparams.tstate.current_check_count + 1
547 >= self.tparams.remote_cfg.unwrap().check_limit
548 {
549 self.declare_fault(ConditionCode::CheckLimitReached);
550 } else {
551 self.tparams.tstate.current_check_count += 1;
552 self.tparams
553 .tstate
554 .current_check_timer
555 .as_mut()
556 .unwrap()
557 .reset();
558 }
559 }
560 }
561
562 pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
563 todo!();
564 }
565
566 fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
567 let mut sent_packets = 0;
568 if self.step == TransactionStep::TransactionStart {
569 self.transaction_start(cfdp_user)?;
570 }
571 if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
572 self.check_limit_handling();
573 }
574 if self.step == TransactionStep::TransferCompletion {
575 sent_packets += self.transfer_completion(cfdp_user)?;
576 }
577 if self.step == TransactionStep::SendingAckPdu {
578 todo!("no support for acknowledged mode yet");
579 }
580 if self.step == TransactionStep::SendingFinishedPdu {
581 self.reset();
582 }
583 Ok(sent_packets)
584 }
585
586 pub fn step(&self) -> TransactionStep {
588 self.step
589 }
590
591 pub fn state(&self) -> State {
594 self.state
595 }
596
597 fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
598 let dest_name = from_utf8(
599 &self.tparams.file_properties.dest_file_name
600 [..self.tparams.file_properties.dest_file_name_len],
601 )?;
602 let dest_path = Path::new(dest_name);
603 self.tparams.file_properties.dest_path_buf = dest_path.to_path_buf();
604 let source_id = self.tparams.pdu_conf.source_id();
605 let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num);
606 let src_name = from_utf8(
607 &self.tparams.file_properties.src_file_name
608 [0..self.tparams.file_properties.src_file_name_len],
609 )?;
610 let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new();
611 let mut num_msgs_to_user = 0;
612 if self.tparams.msgs_to_user_size > 0 {
613 let mut index = 0;
614 while index < self.tparams.msgs_to_user_size {
615 let msgs_to_user_tlv =
617 MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..])
618 .expect("message to user creation failed unexpectedly");
619 msgs_to_user.push(msgs_to_user_tlv);
620 index += msgs_to_user_tlv.len_full();
621 num_msgs_to_user += 1;
622 }
623 }
624 let metadata_recvd_params = MetadataReceivedParams {
625 id,
626 source_id,
627 file_size: self.tparams.file_size(),
628 src_file_name: src_name,
629 dest_file_name: dest_name,
630 msgs_to_user: &msgs_to_user[..num_msgs_to_user],
631 };
632 self.tparams.tstate.transaction_id = Some(id);
633 cfdp_user.metadata_recvd_indication(&metadata_recvd_params);
634
635 if dest_path.exists() && self.vfs.is_dir(dest_path.to_str().unwrap()) {
639 let source_path = Path::new(from_utf8(
644 &self.tparams.file_properties.src_file_name
645 [..self.tparams.file_properties.src_file_name_len],
646 )?);
647 let source_name = source_path.file_name();
648 if source_name.is_none() {
649 return Err(DestError::PathConcat);
650 }
651 let source_name = source_name.unwrap();
652 self.tparams.file_properties.dest_path_buf.push(source_name);
653 }
654 let dest_path_str = self.tparams.file_properties.dest_path_buf.to_str().unwrap();
655 if self.vfs.exists(dest_path_str) {
656 self.vfs.truncate_file(dest_path_str)?;
657 } else {
658 self.vfs.create_file(dest_path_str)?;
659 }
660 self.tparams.tstate.file_status = FileStatus::Retained;
661 self.step = TransactionStep::ReceivingFileDataPdus;
662 Ok(())
663 }
664
665 fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
666 let mut sent_packets = 0;
667 self.notice_of_completion(cfdp_user)?;
668 if self.tparams.transmission_mode() == TransmissionMode::Acknowledged
669 || self.tparams.metadata_params().closure_requested
670 {
671 sent_packets += self.send_finished_pdu()?;
672 self.step = TransactionStep::SendingFinishedPdu;
673 } else {
674 self.reset();
675 }
676 Ok(sent_packets)
677 }
678
679 fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
680 if self.tstate().completion_disposition == CompletionDisposition::Completed {
681 } else if self
683 .tparams
684 .remote_cfg
685 .as_ref()
686 .unwrap()
687 .disposition_on_cancellation
688 && self.tstate().delivery_code == DeliveryCode::Incomplete
689 {
690 self.vfs
691 .remove_file(self.tparams.file_properties.dest_path_buf.to_str().unwrap())?;
692 self.tstate_mut().file_status = FileStatus::DiscardDeliberately;
693 }
694 let tstate = self.tstate();
695 let transaction_finished_params = TransactionFinishedParams {
696 id: tstate.transaction_id.unwrap(),
697 condition_code: tstate.condition_code,
698 delivery_code: tstate.delivery_code,
699 file_status: tstate.file_status,
700 };
701 cfdp_user.transaction_finished_indication(&transaction_finished_params);
702 Ok(())
703 }
704
705 fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode {
706 let transaction_id = self.tstate().transaction_id.unwrap();
708 let progress = self.tstate().progress;
709 let fh_code = self
710 .local_cfg
711 .default_fault_handler
712 .get_fault_handler(condition_code);
713 match fh_code {
714 FaultHandlerCode::NoticeOfCancellation => {
715 self.notice_of_cancellation(condition_code);
716 }
717 FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(),
718 FaultHandlerCode::IgnoreError => (),
719 FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
720 }
721 self.local_cfg
722 .default_fault_handler
723 .report_fault(transaction_id, condition_code, progress)
724 }
725
726 fn notice_of_cancellation(&mut self, condition_code: ConditionCode) {
727 self.step = TransactionStep::TransferCompletion;
728 self.tstate_mut().condition_code = condition_code;
729 self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled;
730 }
731
732 fn notice_of_suspension(&mut self) {
733 }
735 fn abandon_transaction(&mut self) {
736 self.reset();
737 }
738
739 fn reset(&mut self) {
740 self.step = TransactionStep::Idle;
741 self.state = State::Idle;
742 self.tparams.reset();
744 }
745
746 fn send_finished_pdu(&mut self) -> Result<u32, DestError> {
747 let tstate = self.tstate();
748
749 let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
750 let finished_pdu = if tstate.condition_code == ConditionCode::NoError
751 || tstate.condition_code == ConditionCode::UnsupportedChecksumType
752 {
753 FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status)
754 } else {
755 let entity_id = EntityIdTlv::new(self.local_cfg.id);
757 FinishedPduCreator::new_with_error(
758 pdu_header,
759 tstate.condition_code,
760 tstate.delivery_code,
761 tstate.file_status,
762 entity_id,
763 )
764 };
765 finished_pdu.write_to_bytes(&mut self.packet_buf)?;
766 self.packet_sender.send_pdu(
767 finished_pdu.pdu_type(),
768 finished_pdu.file_directive_type(),
769 &self.packet_buf[0..finished_pdu.len_written()],
770 )?;
771 Ok(1)
772 }
773
774 fn tstate(&self) -> &TransferState {
775 &self.tparams.tstate
776 }
777
778 fn tstate_mut(&mut self) -> &mut TransferState {
779 &mut self.tparams.tstate
780 }
781}
782
783#[cfg(test)]
784mod tests {
785 use core::{cell::Cell, sync::atomic::AtomicBool};
786 #[allow(unused_imports)]
787 use std::println;
788 use std::{fs, sync::Mutex};
789
790 use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec};
791 use rand::Rng;
792 use spacepackets::{
793 cfdp::{
794 lv::Lv,
795 pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket},
796 ChecksumType, TransmissionMode,
797 },
798 util::{UbfU16, UnsignedByteFieldU16},
799 };
800
801 use crate::cfdp::{
802 filestore::NativeFilestore, user::OwnedMetadataRecvdParams, CheckTimer, CheckTimerCreator,
803 DefaultFaultHandler, IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider,
804 UserFaultHandler, CRC_32,
805 };
806
807 use super::*;
808
809 const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
810 const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
811
812 pub struct FileSegmentRecvdParamsNoSegMetadata {
813 pub id: TransactionId,
814 pub offset: u64,
815 pub length: usize,
816 }
817
818 struct SentPdu {
819 pdu_type: PduType,
820 file_directive_type: Option<FileDirectiveType>,
821 raw_pdu: Vec<u8>,
822 }
823 type SharedPduPacketQueue = Arc<Mutex<VecDeque<SentPdu>>>;
824 #[derive(Default, Clone)]
825 struct TestCfdpSender {
826 packet_queue: SharedPduPacketQueue,
827 }
828
829 impl CfdpPacketSender for TestCfdpSender {
830 fn send_pdu(
831 &mut self,
832 pdu_type: PduType,
833 file_directive_type: Option<FileDirectiveType>,
834 raw_pdu: &[u8],
835 ) -> Result<(), PduError> {
836 self.packet_queue.lock().unwrap().push_back(SentPdu {
837 pdu_type,
838 file_directive_type,
839 raw_pdu: raw_pdu.to_vec(),
840 });
841 Ok(())
842 }
843 }
844
845 impl TestCfdpSender {
846 pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
847 self.packet_queue.lock().unwrap().pop_front()
848 }
849 pub fn queue_empty(&self) -> bool {
850 self.packet_queue.lock().unwrap().is_empty()
851 }
852 }
853
854 #[derive(Default)]
855 struct TestCfdpUser {
856 next_expected_seq_num: u64,
857 expected_full_src_name: String,
858 expected_full_dest_name: String,
859 expected_file_size: u64,
860 transaction_indication_call_count: u32,
861 eof_recvd_call_count: u32,
862 finished_indic_queue: VecDeque<TransactionFinishedParams>,
863 metadata_recv_queue: VecDeque<OwnedMetadataRecvdParams>,
864 file_seg_recvd_queue: VecDeque<FileSegmentRecvdParamsNoSegMetadata>,
865 }
866
867 impl TestCfdpUser {
868 fn new(
869 next_expected_seq_num: u64,
870 expected_full_src_name: String,
871 expected_full_dest_name: String,
872 expected_file_size: u64,
873 ) -> Self {
874 Self {
875 next_expected_seq_num,
876 expected_full_src_name,
877 expected_full_dest_name,
878 expected_file_size,
879 transaction_indication_call_count: 0,
880 eof_recvd_call_count: 0,
881 finished_indic_queue: VecDeque::new(),
882 metadata_recv_queue: VecDeque::new(),
883 file_seg_recvd_queue: VecDeque::new(),
884 }
885 }
886
887 fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
888 assert_eq!(id.source_id, LOCAL_ID.into());
889 assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
890 }
891 }
892
893 impl CfdpUser for TestCfdpUser {
894 fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
895 self.generic_id_check(id);
896 self.transaction_indication_call_count += 1;
897 }
898
899 fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
900 self.generic_id_check(id);
901 }
902
903 fn transaction_finished_indication(
904 &mut self,
905 finished_params: &crate::cfdp::user::TransactionFinishedParams,
906 ) {
907 self.generic_id_check(&finished_params.id);
908 self.finished_indic_queue.push_back(*finished_params);
909 }
910
911 fn metadata_recvd_indication(
912 &mut self,
913 md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
914 ) {
915 self.generic_id_check(&md_recvd_params.id);
916 assert_eq!(
917 String::from(md_recvd_params.src_file_name),
918 self.expected_full_src_name
919 );
920 assert_eq!(
921 String::from(md_recvd_params.dest_file_name),
922 self.expected_full_dest_name
923 );
924 assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
925 assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
926 assert_eq!(md_recvd_params.file_size, self.expected_file_size);
927 self.metadata_recv_queue.push_back(md_recvd_params.into());
928 }
929
930 fn file_segment_recvd_indication(
931 &mut self,
932 segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
933 ) {
934 self.generic_id_check(&segment_recvd_params.id);
935 self.file_seg_recvd_queue
936 .push_back(FileSegmentRecvdParamsNoSegMetadata {
937 id: segment_recvd_params.id,
938 offset: segment_recvd_params.offset,
939 length: segment_recvd_params.length,
940 })
941 }
942
943 fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
944
945 fn suspended_indication(
946 &mut self,
947 _id: &crate::cfdp::TransactionId,
948 _condition_code: ConditionCode,
949 ) {
950 panic!("unexpected suspended indication");
951 }
952
953 fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
954
955 fn fault_indication(
956 &mut self,
957 _id: &crate::cfdp::TransactionId,
958 _condition_code: ConditionCode,
959 _progress: u64,
960 ) {
961 panic!("unexpected fault indication");
962 }
963
964 fn abandoned_indication(
965 &mut self,
966 _id: &crate::cfdp::TransactionId,
967 _condition_code: ConditionCode,
968 _progress: u64,
969 ) {
970 panic!("unexpected abandoned indication");
971 }
972
973 fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
974 self.generic_id_check(id);
975 self.eof_recvd_call_count += 1;
976 }
977 }
978
979 #[derive(Default, Clone)]
980 struct TestFaultHandler {
981 notice_of_suspension_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
982 notice_of_cancellation_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
983 abandoned_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
984 ignored_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
985 }
986
987 impl UserFaultHandler for TestFaultHandler {
988 fn notice_of_suspension_cb(
989 &mut self,
990 transaction_id: TransactionId,
991 cond: ConditionCode,
992 progress: u64,
993 ) {
994 self.notice_of_suspension_queue.lock().unwrap().push_back((
995 transaction_id,
996 cond,
997 progress,
998 ))
999 }
1000
1001 fn notice_of_cancellation_cb(
1002 &mut self,
1003 transaction_id: TransactionId,
1004 cond: ConditionCode,
1005 progress: u64,
1006 ) {
1007 self.notice_of_cancellation_queue
1008 .lock()
1009 .unwrap()
1010 .push_back((transaction_id, cond, progress))
1011 }
1012
1013 fn abandoned_cb(
1014 &mut self,
1015 transaction_id: TransactionId,
1016 cond: ConditionCode,
1017 progress: u64,
1018 ) {
1019 self.abandoned_queue
1020 .lock()
1021 .unwrap()
1022 .push_back((transaction_id, cond, progress))
1023 }
1024
1025 fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
1026 self.ignored_queue
1027 .lock()
1028 .unwrap()
1029 .push_back((transaction_id, cond, progress))
1030 }
1031 }
1032
1033 impl TestFaultHandler {
1034 fn suspension_queue_empty(&self) -> bool {
1035 self.notice_of_suspension_queue.lock().unwrap().is_empty()
1036 }
1037 fn cancellation_queue_empty(&self) -> bool {
1038 self.notice_of_cancellation_queue.lock().unwrap().is_empty()
1039 }
1040 fn ignored_queue_empty(&self) -> bool {
1041 self.ignored_queue.lock().unwrap().is_empty()
1042 }
1043 fn abandoned_queue_empty(&self) -> bool {
1044 self.abandoned_queue.lock().unwrap().is_empty()
1045 }
1046 fn all_queues_empty(&self) -> bool {
1047 self.suspension_queue_empty()
1048 && self.cancellation_queue_empty()
1049 && self.ignored_queue_empty()
1050 && self.abandoned_queue_empty()
1051 }
1052 }
1053
1054 #[derive(Debug)]
1055 struct TestCheckTimer {
1056 counter: Cell<u32>,
1057 expired: Arc<AtomicBool>,
1058 }
1059
1060 impl CheckTimer for TestCheckTimer {
1061 fn has_expired(&self) -> bool {
1062 self.expired.load(core::sync::atomic::Ordering::Relaxed)
1063 }
1064 fn reset(&mut self) {
1065 self.counter.set(0);
1066 }
1067 }
1068
1069 impl TestCheckTimer {
1070 pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
1071 Self {
1072 counter: Cell::new(0),
1073 expired: expired_flag,
1074 }
1075 }
1076 }
1077
1078 struct TestCheckTimerCreator {
1079 check_limit_expired_flag: Arc<AtomicBool>,
1080 }
1081
1082 impl TestCheckTimerCreator {
1083 pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
1084 Self {
1085 check_limit_expired_flag: expired_flag,
1086 }
1087 }
1088 }
1089
1090 impl CheckTimerCreator for TestCheckTimerCreator {
1091 fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box<dyn CheckTimer> {
1092 match timer_context {
1093 TimerContext::CheckLimit { .. } => {
1094 Box::new(TestCheckTimer::new(self.check_limit_expired_flag.clone()))
1095 }
1096 _ => {
1097 panic!("invalid check timer creator, can only be used for check limit handling")
1098 }
1099 }
1100 }
1101 }
1102
1103 struct DestHandlerTester {
1104 check_timer_expired: Arc<AtomicBool>,
1105 pdu_sender: TestCfdpSender,
1106 handler: DestinationHandler,
1107 src_path: PathBuf,
1108 dest_path: PathBuf,
1109 check_dest_file: bool,
1110 check_handler_idle_at_drop: bool,
1111 expected_file_size: u64,
1112 closure_requested: bool,
1113 pdu_header: PduHeader,
1114 expected_full_data: Vec<u8>,
1115 buf: [u8; 512],
1116 }
1117
1118 impl DestHandlerTester {
1119 fn new(fault_handler: TestFaultHandler, closure_requested: bool) -> Self {
1120 let check_timer_expired = Arc::new(AtomicBool::new(false));
1121 let test_sender = TestCfdpSender::default();
1122 let dest_handler = default_dest_handler(
1123 fault_handler,
1124 test_sender.clone(),
1125 check_timer_expired.clone(),
1126 );
1127 let (src_path, dest_path) = init_full_filenames();
1128 assert!(!Path::exists(&dest_path));
1129 let handler = Self {
1130 check_timer_expired,
1131 pdu_sender: test_sender,
1132 handler: dest_handler,
1133 src_path,
1134 closure_requested,
1135 dest_path,
1136 check_dest_file: false,
1137 check_handler_idle_at_drop: false,
1138 expected_file_size: 0,
1139 pdu_header: create_pdu_header(UbfU16::new(0)),
1140 expected_full_data: Vec::new(),
1141 buf: [0; 512],
1142 };
1143 handler.state_check(State::Idle, TransactionStep::Idle);
1144 handler
1145 }
1146
1147 fn dest_path(&self) -> &PathBuf {
1148 &self.dest_path
1149 }
1150
1151 #[allow(dead_code)]
1152 fn indication_cfg_mut(&mut self) -> &mut IndicationConfig {
1153 &mut self.handler.local_cfg.indication_cfg
1154 }
1155
1156 fn indication_cfg(&mut self) -> &IndicationConfig {
1157 &self.handler.local_cfg.indication_cfg
1158 }
1159
1160 fn set_check_timer_expired(&mut self) {
1161 self.check_timer_expired
1162 .store(true, core::sync::atomic::Ordering::Relaxed);
1163 }
1164
1165 fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser {
1166 TestCfdpUser::new(
1167 0,
1168 self.src_path.to_string_lossy().into(),
1169 self.dest_path.to_string_lossy().into(),
1170 expected_file_size,
1171 )
1172 }
1173
1174 fn generic_transfer_init(
1175 &mut self,
1176 user: &mut TestCfdpUser,
1177 file_size: u64,
1178 ) -> Result<TransactionId, DestError> {
1179 self.expected_file_size = file_size;
1180 let metadata_pdu = create_metadata_pdu(
1181 &self.pdu_header,
1182 self.src_path.as_path(),
1183 self.dest_path.as_path(),
1184 file_size,
1185 self.closure_requested,
1186 );
1187 let packet_info = create_packet_info(&metadata_pdu, &mut self.buf);
1188 self.handler.state_machine(user, Some(&packet_info))?;
1189 assert_eq!(user.metadata_recv_queue.len(), 1);
1190 assert_eq!(
1191 self.handler.transmission_mode().unwrap(),
1192 TransmissionMode::Unacknowledged
1193 );
1194 Ok(self.handler.transaction_id().unwrap())
1195 }
1196
1197 fn generic_file_data_insert(
1198 &mut self,
1199 user: &mut TestCfdpUser,
1200 offset: u64,
1201 file_data_chunk: &[u8],
1202 ) -> Result<u32, DestError> {
1203 let filedata_pdu =
1204 FileDataPdu::new_no_seg_metadata(self.pdu_header, offset, file_data_chunk);
1205 filedata_pdu
1206 .write_to_bytes(&mut self.buf)
1207 .expect("writing file data PDU failed");
1208 let packet_info = PacketInfo::new(&self.buf).expect("creating packet info failed");
1209 let result = self.handler.state_machine(user, Some(&packet_info));
1210 if self.indication_cfg().file_segment_recv {
1211 assert!(!user.file_seg_recvd_queue.is_empty());
1212 assert_eq!(user.file_seg_recvd_queue.back().unwrap().offset, offset);
1213 assert_eq!(
1214 user.file_seg_recvd_queue.back().unwrap().length,
1215 file_data_chunk.len()
1216 );
1217 }
1218 result
1219 }
1220
1221 fn generic_eof_no_error(
1222 &mut self,
1223 user: &mut TestCfdpUser,
1224 expected_full_data: Vec<u8>,
1225 ) -> Result<u32, DestError> {
1226 self.expected_full_data = expected_full_data;
1227 let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header);
1228 let packet_info = create_packet_info(&eof_pdu, &mut self.buf);
1229 self.check_handler_idle_at_drop = true;
1230 self.check_dest_file = true;
1231 let result = self.handler.state_machine(user, Some(&packet_info));
1232 if self.indication_cfg().eof_recv {
1233 assert_eq!(user.eof_recvd_call_count, 1);
1234 }
1235 result
1236 }
1237
1238 fn state_check(&self, state: State, step: TransactionStep) {
1239 assert_eq!(self.handler.state(), state);
1240 assert_eq!(self.handler.step(), step);
1241 }
1242 }
1243
1244 impl Drop for DestHandlerTester {
1245 fn drop(&mut self) {
1246 if self.check_handler_idle_at_drop {
1247 self.state_check(State::Idle, TransactionStep::Idle);
1248 }
1249 if self.check_dest_file {
1250 assert!(Path::exists(&self.dest_path));
1251 let read_content = fs::read(&self.dest_path).expect("reading back string failed");
1252 assert_eq!(read_content.len() as u64, self.expected_file_size);
1253 assert_eq!(read_content, self.expected_full_data);
1254 assert!(fs::remove_file(self.dest_path.as_path()).is_ok());
1255 }
1256 }
1257 }
1258
1259 fn init_full_filenames() -> (PathBuf, PathBuf) {
1260 (
1261 tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1262 tempfile::NamedTempFile::new()
1263 .unwrap()
1264 .into_temp_path()
1265 .to_path_buf(),
1266 )
1267 }
1268
1269 fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider {
1270 let mut table = StdRemoteEntityConfigProvider::default();
1271 let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
1272 UnsignedByteFieldU16::new(1).into(),
1273 1024,
1274 1024,
1275 true,
1276 true,
1277 TransmissionMode::Unacknowledged,
1278 ChecksumType::Crc32,
1279 );
1280 table.add_config(&remote_entity_cfg);
1281 table
1282 }
1283
1284 fn default_dest_handler(
1285 test_fault_handler: TestFaultHandler,
1286 test_packet_sender: TestCfdpSender,
1287 check_timer_expired: Arc<AtomicBool>,
1288 ) -> DestinationHandler {
1289 let local_entity_cfg = LocalEntityConfig {
1290 id: REMOTE_ID.into(),
1291 indication_cfg: IndicationConfig::default(),
1292 default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)),
1293 };
1294 DestinationHandler::new(
1295 local_entity_cfg,
1296 2048,
1297 Box::new(test_packet_sender),
1298 Box::<NativeFilestore>::default(),
1299 Box::new(basic_remote_cfg_table()),
1300 Box::new(TestCheckTimerCreator::new(check_timer_expired)),
1301 )
1302 }
1303
1304 fn create_pdu_header(seq_num: impl Into<UnsignedByteField>) -> PduHeader {
1305 let mut pdu_conf =
1306 CommonPduConfig::new_with_byte_fields(LOCAL_ID, REMOTE_ID, seq_num).unwrap();
1307 pdu_conf.trans_mode = TransmissionMode::Unacknowledged;
1308 PduHeader::new_no_file_data(pdu_conf, 0)
1309 }
1310
1311 fn create_metadata_pdu<'filename>(
1312 pdu_header: &PduHeader,
1313 src_name: &'filename Path,
1314 dest_name: &'filename Path,
1315 file_size: u64,
1316 closure_requested: bool,
1317 ) -> MetadataPduCreator<'filename, 'filename, 'static> {
1318 let checksum_type = if file_size == 0 {
1319 ChecksumType::NullChecksum
1320 } else {
1321 ChecksumType::Crc32
1322 };
1323 let metadata_params =
1324 MetadataGenericParams::new(closure_requested, checksum_type, file_size);
1325 MetadataPduCreator::new_no_opts(
1326 *pdu_header,
1327 metadata_params,
1328 Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(),
1329 Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(),
1330 )
1331 }
1332
1333 fn create_packet_info<'a>(
1334 pdu: &'a impl WritablePduPacket,
1335 buf: &'a mut [u8],
1336 ) -> PacketInfo<'a> {
1337 let written_len = pdu
1338 .write_to_bytes(buf)
1339 .expect("writing metadata PDU failed");
1340 PacketInfo::new(&buf[..written_len]).expect("generating packet info failed")
1341 }
1342
1343 fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader) -> EofPdu {
1344 let crc32 = if !file_data.is_empty() {
1345 let mut digest = CRC_32.digest();
1346 digest.update(file_data);
1347 digest.finalize()
1348 } else {
1349 0
1350 };
1351 EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64)
1352 }
1353
1354 #[test]
1355 fn test_basic() {
1356 let fault_handler = TestFaultHandler::default();
1357 let test_sender = TestCfdpSender::default();
1358 let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default());
1359 assert!(dest_handler.transmission_mode().is_none());
1360 assert!(fault_handler.all_queues_empty());
1361 }
1362
1363 #[test]
1364 fn test_empty_file_transfer_not_acked_no_closure() {
1365 let fault_handler = TestFaultHandler::default();
1366 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1367 let mut test_user = test_obj.test_user_from_cached_paths(0);
1368 test_obj
1369 .generic_transfer_init(&mut test_user, 0)
1370 .expect("transfer init failed");
1371 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1372 test_obj
1373 .generic_eof_no_error(&mut test_user, Vec::new())
1374 .expect("EOF no error insertion failed");
1375 assert!(fault_handler.all_queues_empty());
1376 assert!(test_obj.pdu_sender.queue_empty());
1377 test_obj.state_check(State::Idle, TransactionStep::Idle);
1378 }
1379
1380 #[test]
1381 fn test_small_file_transfer_not_acked() {
1382 let file_data_str = "Hello World!";
1383 let file_data = file_data_str.as_bytes();
1384 let file_size = file_data.len() as u64;
1385 let fault_handler = TestFaultHandler::default();
1386
1387 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1388 let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1389 test_obj
1390 .generic_transfer_init(&mut test_user, file_size)
1391 .expect("transfer init failed");
1392 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1393 test_obj
1394 .generic_file_data_insert(&mut test_user, 0, file_data)
1395 .expect("file data insertion failed");
1396 test_obj
1397 .generic_eof_no_error(&mut test_user, file_data.to_vec())
1398 .expect("EOF no error insertion failed");
1399 assert!(fault_handler.all_queues_empty());
1400 assert!(test_obj.pdu_sender.queue_empty());
1401 test_obj.state_check(State::Idle, TransactionStep::Idle);
1402 }
1403
1404 #[test]
1405 fn test_segmented_file_transfer_not_acked() {
1406 let mut rng = rand::thread_rng();
1407 let mut random_data = [0u8; 512];
1408 rng.fill(&mut random_data);
1409 let file_size = random_data.len() as u64;
1410 let segment_len = 256;
1411 let fault_handler = TestFaultHandler::default();
1412
1413 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1414 let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1415 test_obj
1416 .generic_transfer_init(&mut test_user, file_size)
1417 .expect("transfer init failed");
1418 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1419 test_obj
1420 .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1421 .expect("file data insertion failed");
1422 test_obj
1423 .generic_file_data_insert(
1424 &mut test_user,
1425 segment_len as u64,
1426 &random_data[segment_len..],
1427 )
1428 .expect("file data insertion failed");
1429 test_obj
1430 .generic_eof_no_error(&mut test_user, random_data.to_vec())
1431 .expect("EOF no error insertion failed");
1432 assert!(fault_handler.all_queues_empty());
1433 assert!(test_obj.pdu_sender.queue_empty());
1434 test_obj.state_check(State::Idle, TransactionStep::Idle);
1435 }
1436
1437 #[test]
1438 fn test_check_limit_handling_transfer_success() {
1439 let mut rng = rand::thread_rng();
1440 let mut random_data = [0u8; 512];
1441 rng.fill(&mut random_data);
1442 let file_size = random_data.len() as u64;
1443 let segment_len = 256;
1444 let fault_handler = TestFaultHandler::default();
1445
1446 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1447 let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1448 let transaction_id = test_obj
1449 .generic_transfer_init(&mut test_user, file_size)
1450 .expect("transfer init failed");
1451
1452 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1453 test_obj
1454 .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1455 .expect("file data insertion 0 failed");
1456 test_obj
1457 .generic_eof_no_error(&mut test_user, random_data.to_vec())
1458 .expect("EOF no error insertion failed");
1459 test_obj.state_check(
1460 State::Busy,
1461 TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1462 );
1463 test_obj
1464 .generic_file_data_insert(
1465 &mut test_user,
1466 segment_len as u64,
1467 &random_data[segment_len..],
1468 )
1469 .expect("file data insertion 1 failed");
1470 test_obj.set_check_timer_expired();
1471 test_obj
1472 .handler
1473 .state_machine(&mut test_user, None)
1474 .expect("fsm failure");
1475
1476 let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
1477 assert_eq!(ignored_queue.len(), 1);
1478 let cancelled = *ignored_queue.front().unwrap();
1479 assert_eq!(cancelled.0, transaction_id);
1480 assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
1481 assert_eq!(cancelled.2, segment_len as u64);
1482 assert!(test_obj.pdu_sender.queue_empty());
1483 test_obj.state_check(State::Idle, TransactionStep::Idle);
1484 }
1485
1486 #[test]
1487 fn test_check_limit_handling_limit_reached() {
1488 let mut rng = rand::thread_rng();
1489 let mut random_data = [0u8; 512];
1490 rng.fill(&mut random_data);
1491 let file_size = random_data.len() as u64;
1492 let segment_len = 256;
1493
1494 let fault_handler = TestFaultHandler::default();
1495 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1496 let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1497 let transaction_id = test_obj
1498 .generic_transfer_init(&mut test_user, file_size)
1499 .expect("transfer init failed");
1500
1501 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1502 test_obj
1503 .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1504 .expect("file data insertion 0 failed");
1505 test_obj
1506 .generic_eof_no_error(&mut test_user, random_data.to_vec())
1507 .expect("EOF no error insertion failed");
1508 test_obj.state_check(
1509 State::Busy,
1510 TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1511 );
1512 test_obj.set_check_timer_expired();
1513 test_obj
1514 .handler
1515 .state_machine(&mut test_user, None)
1516 .expect("fsm error");
1517 test_obj.state_check(
1518 State::Busy,
1519 TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1520 );
1521 test_obj.set_check_timer_expired();
1522 test_obj
1523 .handler
1524 .state_machine(&mut test_user, None)
1525 .expect("fsm error");
1526 test_obj.state_check(State::Idle, TransactionStep::Idle);
1527
1528 assert!(fault_handler
1529 .notice_of_suspension_queue
1530 .lock()
1531 .unwrap()
1532 .is_empty());
1533
1534 let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
1535 assert_eq!(ignored_queue.len(), 1);
1536 let cancelled = *ignored_queue.front().unwrap();
1537 assert_eq!(cancelled.0, transaction_id);
1538 assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
1539 assert_eq!(cancelled.2, segment_len as u64);
1540
1541 let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap();
1542 assert_eq!(cancelled_queue.len(), 1);
1543 let cancelled = *cancelled_queue.front().unwrap();
1544 assert_eq!(cancelled.0, transaction_id);
1545 assert_eq!(cancelled.1, ConditionCode::CheckLimitReached);
1546 assert_eq!(cancelled.2, segment_len as u64);
1547
1548 drop(cancelled_queue);
1549
1550 assert!(test_obj.pdu_sender.queue_empty());
1551
1552 test_obj.check_dest_file = false;
1554 assert!(Path::exists(test_obj.dest_path()));
1555 let read_content = fs::read(test_obj.dest_path()).expect("reading back string failed");
1556 assert_eq!(read_content.len(), segment_len);
1557 assert_eq!(read_content, &random_data[0..segment_len]);
1558 assert!(fs::remove_file(test_obj.dest_path().as_path()).is_ok());
1559 }
1560
1561 fn check_finished_pdu_success(sent_pdu: &SentPdu) {
1562 assert_eq!(sent_pdu.pdu_type, PduType::FileDirective);
1563 assert_eq!(
1564 sent_pdu.file_directive_type,
1565 Some(FileDirectiveType::FinishedPdu)
1566 );
1567 let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap();
1568 assert_eq!(finished_pdu.file_status(), FileStatus::Retained);
1569 assert_eq!(finished_pdu.condition_code(), ConditionCode::NoError);
1570 assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete);
1571 assert!(finished_pdu.fault_location().is_none());
1572 assert_eq!(finished_pdu.fs_responses_raw(), &[]);
1573 }
1574
1575 #[test]
1576 fn test_file_transfer_with_closure() {
1577 let fault_handler = TestFaultHandler::default();
1578 let mut test_obj = DestHandlerTester::new(fault_handler.clone(), true);
1579 let mut test_user = test_obj.test_user_from_cached_paths(0);
1580 test_obj
1581 .generic_transfer_init(&mut test_user, 0)
1582 .expect("transfer init failed");
1583 test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1584 let sent_packets = test_obj
1585 .generic_eof_no_error(&mut test_user, Vec::new())
1586 .expect("EOF no error insertion failed");
1587 assert_eq!(sent_packets, 1);
1588 assert!(fault_handler.all_queues_empty());
1589 test_obj.state_check(State::Idle, TransactionStep::Idle);
1591 assert!(!test_obj.pdu_sender.queue_empty());
1592 let sent_pdu = test_obj.pdu_sender.retrieve_next_pdu().unwrap();
1593 check_finished_pdu_success(&sent_pdu);
1594 }
1595
1596 #[test]
1597 fn test_file_transfer_with_closure_check_limit_reached() {
1598 }
1600}