1use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error};
40
41use spacepackets::{
42 cfdp::{
43 lv::Lv,
44 pdu::{
45 eof::EofPdu,
46 file_data::{
47 calculate_max_file_seg_len_for_max_packet_len_and_pdu_header,
48 FileDataPduCreatorWithReservedDatafield,
49 },
50 finished::{DeliveryCode, FileStatus, FinishedPduReader},
51 metadata::{MetadataGenericParams, MetadataPduCreator},
52 CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
53 },
54 ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl,
55 TransmissionMode,
56 },
57 util::{UnsignedByteField, UnsignedEnum},
58 ByteConversionError,
59};
60
61use spacepackets::seq_count::SequenceCountProvider;
62
63use crate::{
64 time::CountdownProvider, DummyPduProvider, EntityType, GenericSendError, PduProvider,
65 TimerCreatorProvider,
66};
67
68use super::{
69 filestore::{FilestoreError, VirtualFilestore},
70 request::{ReadablePutRequest, StaticPutRequestCacher},
71 user::{CfdpUser, TransactionFinishedParams},
72 LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig,
73 RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider,
74};
75
/// Fine-grained step of a source-side transaction.
///
/// The source handler keeps the current step in its state helper and advances it while it is
/// in the busy state. The discriminant values are explicit and not contiguous.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
    /// No step is active. This is the default step and the one restored by a reset.
    Idle = 0,
    /// The transaction start handling (file checks, transaction indication) is performed.
    TransactionStart = 1,
    /// The Metadata PDU is prepared and sent.
    SendingMetadata = 3,
    /// File data PDUs are sent, one per state machine call.
    SendingFileData = 4,
    /// NOTE(review): no code visible in this file transitions into this step.
    Retransmitting = 5,
    /// The checksum is calculated and the EOF PDU is sent.
    SendingEof = 6,
    /// Entered after a cancellation EOF PDU was sent in a transmission mode other than
    /// unacknowledged.
    /// NOTE(review): no code visible in this file handles or leaves this step.
    WaitingForEofAck = 7,
    /// The handler waits for a Finished PDU while a check limit countdown is running.
    WaitingForFinished = 8,
    /// The user is notified about the finished transaction and the handler is reset.
    NoticeOfCompletion = 10,
}
93
/// Bookkeeping for the file which belongs to the active transaction.
///
/// All fields default to zero or `false`.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FileParams {
    /// Number of file bytes which were already sent. Also used as the offset of the next
    /// file data segment.
    pub progress: u64,
    /// Maximum number of file bytes placed into a single file data PDU.
    pub segment_len: u64,
    /// NOTE(review): neither read nor written by the code visible in this file; confirm
    /// whether this field is still required.
    pub crc32: u32,
    /// Set when the put request does not have a source file.
    pub metadata_only: bool,
    /// Size of the source file in bytes.
    pub file_size: u64,
    /// Set when the source file has a size of zero.
    pub empty_file: bool,
}
103
/// Groups the coarse handler state with the fine-grained transaction step.
pub struct StateHelper {
    /// Coarse state of the handler (idle, busy or suspended).
    state: super::State,
    /// Step of the active transaction.
    step: TransactionStep,
    /// Reset to 0 when a put request is accepted.
    /// NOTE(review): not read by the code visible in this file.
    num_packets_ready: u32,
}
109
/// Values extracted from a received Finished PDU.
#[derive(Debug)]
pub struct FinishedParams {
    /// Condition code reported by the Finished PDU.
    condition_code: ConditionCode,
    /// Delivery code reported by the Finished PDU.
    delivery_code: DeliveryCode,
    /// File status reported by the Finished PDU.
    file_status: FileStatus,
}
116
/// Per-transaction state which is created when a put request is accepted and dropped when
/// the handler is reset.
#[derive(Debug, derive_new::new)]
pub struct TransferState {
    /// Identifier of the active transaction.
    transaction_id: TransactionId,
    /// Copy of the configuration of the remote (destination) entity.
    remote_cfg: RemoteEntityConfig,
    /// Transmission mode taken from the put request, or the remote default if the request
    /// does not specify one.
    transmission_mode: super::TransmissionMode,
    /// Closure request flag taken from the put request, or the remote default if the request
    /// does not specify one.
    closure_requested: bool,
    /// Condition code placed into the EOF PDU. `None` until it is set by the file data or
    /// cancellation handling.
    cond_code_eof: Option<ConditionCode>,
    /// Content of the received Finished PDU, if there was one.
    finished_params: Option<FinishedParams>,
}
126
127impl Default for StateHelper {
128 fn default() -> Self {
129 Self {
130 state: super::State::Idle,
131 step: TransactionStep::Idle,
132 num_packets_ready: 0,
133 }
134 }
135}
136
/// Errors which can occur while the source handler processes packets or runs its state
/// machine.
#[derive(Debug, thiserror::Error)]
pub enum SourceError {
    /// The inserted packet is not targeted at the source entity, or it is an EOF, Prompt or
    /// Metadata directive.
    #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
    CantProcessPacketType {
        pdu_type: PduType,
        directive_type: Option<FileDirectiveType>,
    },
    /// A file data PDU was inserted, or a Finished PDU arrived while the handler was busy
    /// but not waiting for one.
    #[error("unexpected PDU")]
    UnexpectedPdu {
        pdu_type: PduType,
        directive_type: Option<FileDirectiveType>,
    },
    /// A put request is already being handled.
    #[error("source handler is already busy with put request")]
    PutRequestAlreadyActive,
    /// The put request could not be cached.
    #[error("error caching put request")]
    PutRequestCaching(ByteConversionError),
    /// An operation on the virtual filestore failed.
    #[error("filestore error: {0}")]
    FilestoreError(#[from] FilestoreError),
    /// The cached source file name is not valid UTF-8.
    #[error("source file does not have valid UTF8 format: {0}")]
    SourceFileNotValidUtf8(Utf8Error),
    /// The cached destination file name is not valid UTF-8.
    #[error("destination file does not have valid UTF8 format: {0}")]
    DestFileNotValidUtf8(Utf8Error),
    /// Creating or reading a PDU failed.
    #[error("error related to PDU creation: {0}")]
    Pdu(#[from] PduError),
    /// The requested feature is not implemented. Returned for inserted ACK PDUs.
    #[error("cfdp feature not implemented")]
    NotImplemented,
    /// The PDU sender reported an error.
    #[error("issue sending PDU: {0}")]
    SendError(#[from] GenericSendError),
}
166
/// Errors which can occur when a put request is passed to the source handler.
#[derive(Debug, thiserror::Error)]
pub enum PutRequestError {
    /// The put request could not be stored in the put request cacher.
    #[error("error caching put request: {0}")]
    Storage(#[from] ByteConversionError),
    /// The handler is not idle and can not accept another put request.
    #[error("already busy with put request")]
    AlreadyBusy,
    /// The remote configuration table has no entry for the destination entity ID.
    #[error("no remote entity configuration found for {0:?}")]
    NoRemoteCfgFound(UnsignedByteField),
    /// The source file name of the request is not valid UTF-8.
    #[error("source file does not have valid UTF8 format: {0}")]
    SourceFileNotValidUtf8(#[from] Utf8Error),
    /// The request has a source file, but the filestore reports that it does not exist.
    #[error("source file does not exist")]
    FileDoesNotExist,
    /// An operation on the virtual filestore failed.
    #[error("filestore error: {0}")]
    FilestoreError(#[from] FilestoreError),
}
182
/// Source (sending) side handler of a CFDP file transfer.
///
/// The handler accepts a put request, then sends the Metadata PDU, the file data PDUs and the
/// EOF PDU through the PDU sender while its state machine is called repeatedly.
///
/// # Generics
///
/// * `PduSender` - Sends the generated PDUs.
/// * `UserFaultHook` - User hook of the fault handler inside the local entity configuration.
/// * `Vfs` - Filestore used to check, measure, read and checksum the source file.
/// * `RemoteCfgTable` - Lookup of remote entity configurations by entity ID.
/// * `TimerCreator` - Creates `Countdown` instances.
/// * `Countdown` - Countdown type used for the check limit.
/// * `SeqCountProvider` - Provides the transaction sequence numbers.
pub struct SourceHandler<
    PduSender: PduSendProvider,
    UserFaultHook: UserFaultHookProvider,
    Vfs: VirtualFilestore,
    RemoteCfgTable: RemoteEntityConfigProvider,
    TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
    Countdown: CountdownProvider,
    SeqCountProvider: SequenceCountProvider,
> {
    /// Configuration of the local entity, including the fault handler.
    local_cfg: LocalEntityConfig<UserFaultHook>,
    /// Sink for all PDUs generated by this handler.
    pdu_sender: PduSender,
    /// Scratch buffer shared between PDU serialization and checksum calculation.
    pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>,
    /// Cached copy of the put request which is currently handled.
    put_request_cacher: StaticPutRequestCacher,
    /// Table of known remote entities.
    remote_cfg_table: RemoteCfgTable,
    /// Filestore which holds the source file.
    vfs: Vfs,
    /// Handler state and transaction step.
    state_helper: StateHelper,
    /// State of the active transaction, `None` while no transaction is active.
    tstate: Option<TransferState>,
    /// File related parameters of the active transaction.
    fparams: FileParams,
    /// Common PDU configuration used for the header of every PDU of the transaction.
    pdu_conf: CommonPduConfig,
    /// Check limit countdown, created when the handler starts waiting for a Finished PDU.
    countdown: Option<Countdown>,
    /// Factory for countdowns.
    timer_creator: TimerCreator,
    /// Source of transaction sequence numbers.
    seq_count_provider: SeqCountProvider,
}
236
237impl<
238 PduSender: PduSendProvider,
239 UserFaultHook: UserFaultHookProvider,
240 Vfs: VirtualFilestore,
241 RemoteCfgTable: RemoteEntityConfigProvider,
242 TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
243 Countdown: CountdownProvider,
244 SeqCountProvider: SequenceCountProvider,
245 >
246 SourceHandler<
247 PduSender,
248 UserFaultHook,
249 Vfs,
250 RemoteCfgTable,
251 TimerCreator,
252 Countdown,
253 SeqCountProvider,
254 >
255{
256 #[allow(clippy::too_many_arguments)]
279 pub fn new(
280 cfg: LocalEntityConfig<UserFaultHook>,
281 pdu_sender: PduSender,
282 vfs: Vfs,
283 put_request_cacher: StaticPutRequestCacher,
284 pdu_and_cksum_buf_size: usize,
285 remote_cfg_table: RemoteCfgTable,
286 timer_creator: TimerCreator,
287 seq_count_provider: SeqCountProvider,
288 ) -> Self {
289 Self {
290 local_cfg: cfg,
291 remote_cfg_table,
292 pdu_sender,
293 pdu_and_cksum_buffer: RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]),
294 vfs,
295 put_request_cacher,
296 state_helper: Default::default(),
297 tstate: Default::default(),
298 fparams: Default::default(),
299 pdu_conf: Default::default(),
300 countdown: None,
301 timer_creator,
302 seq_count_provider,
303 }
304 }
305
306 pub fn state_machine_no_packet(
308 &mut self,
309 cfdp_user: &mut impl CfdpUser,
310 ) -> Result<u32, SourceError> {
311 self.state_machine(cfdp_user, None::<&DummyPduProvider>)
312 }
313
314 pub fn state_machine(
323 &mut self,
324 cfdp_user: &mut impl CfdpUser,
325 pdu: Option<&impl PduProvider>,
326 ) -> Result<u32, SourceError> {
327 if let Some(packet) = pdu {
328 self.insert_packet(cfdp_user, packet)?;
329 }
330 match self.state_helper.state {
331 super::State::Idle => {
332 Ok(0)
334 }
335 super::State::Busy => self.fsm_busy(cfdp_user, pdu),
336 super::State::Suspended => {
337 Ok(0)
339 }
340 }
341 }
342
343 fn insert_packet(
344 &mut self,
345 _cfdp_user: &mut impl CfdpUser,
346 packet_to_insert: &impl PduProvider,
347 ) -> Result<(), SourceError> {
348 if packet_to_insert.packet_target()? != PacketTarget::SourceEntity {
349 return Err(SourceError::CantProcessPacketType {
352 pdu_type: packet_to_insert.pdu_type(),
353 directive_type: packet_to_insert.file_directive_type(),
354 });
355 }
356 if packet_to_insert.pdu_type() == PduType::FileData {
357 return Err(SourceError::UnexpectedPdu {
360 pdu_type: PduType::FileData,
361 directive_type: None,
362 });
363 }
364 match packet_to_insert
367 .file_directive_type()
368 .expect("PDU directive type unexpectedly not set")
369 {
370 FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_to_insert)?,
371 FileDirectiveType::NakPdu => self.handle_nak_pdu(),
372 FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
373 FileDirectiveType::AckPdu => return Err(SourceError::NotImplemented),
374 FileDirectiveType::EofPdu
375 | FileDirectiveType::PromptPdu
376 | FileDirectiveType::MetadataPdu => {
377 return Err(SourceError::CantProcessPacketType {
378 pdu_type: packet_to_insert.pdu_type(),
379 directive_type: packet_to_insert.file_directive_type(),
380 });
381 }
382 }
383 Ok(())
384 }
385
386 pub fn put_request(
394 &mut self,
395 put_request: &impl ReadablePutRequest,
396 ) -> Result<(), PutRequestError> {
397 if self.state_helper.state != super::State::Idle {
398 return Err(PutRequestError::AlreadyBusy);
399 }
400 self.put_request_cacher.set(put_request)?;
401 let remote_cfg = self.remote_cfg_table.get(
402 self.put_request_cacher
403 .static_fields
404 .destination_id
405 .value_const(),
406 );
407 if remote_cfg.is_none() {
408 return Err(PutRequestError::NoRemoteCfgFound(
409 self.put_request_cacher.static_fields.destination_id,
410 ));
411 }
412 let remote_cfg = remote_cfg.unwrap();
413 self.state_helper.num_packets_ready = 0;
414 let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
415 self.put_request_cacher.static_fields.trans_mode.unwrap()
416 } else {
417 remote_cfg.default_transmission_mode
418 };
419 let closure_requested = if self
420 .put_request_cacher
421 .static_fields
422 .closure_requested
423 .is_some()
424 {
425 self.put_request_cacher
426 .static_fields
427 .closure_requested
428 .unwrap()
429 } else {
430 remote_cfg.closure_requested_by_default
431 };
432 if self.put_request_cacher.has_source_file()
433 && !self.vfs.exists(self.put_request_cacher.source_file()?)?
434 {
435 return Err(PutRequestError::FileDoesNotExist);
436 }
437
438 let transaction_id = TransactionId::new(
439 self.local_cfg().id,
440 UnsignedByteField::new(
441 SeqCountProvider::MAX_BIT_WIDTH / 8,
442 self.seq_count_provider.get_and_increment().into(),
443 ),
444 );
445 let larger_entity_width = core::cmp::max(
449 self.local_cfg.id.size(),
450 self.put_request_cacher.static_fields.destination_id.size(),
451 );
452 let create_id = |cached_id: &UnsignedByteField| {
453 if larger_entity_width != cached_id.size() {
454 UnsignedByteField::new(larger_entity_width, cached_id.value_const())
455 } else {
456 *cached_id
457 }
458 };
459
460 self.pdu_conf
462 .set_source_and_dest_id(
463 create_id(&self.local_cfg.id),
464 create_id(&self.put_request_cacher.static_fields.destination_id),
465 )
466 .unwrap();
467 self.pdu_conf.direction = Direction::TowardsReceiver;
469 self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into();
470 self.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
471 self.pdu_conf.trans_mode = transmission_mode;
472 self.fparams.segment_len = self.calculate_max_file_seg_len(remote_cfg);
473
474 self.tstate = Some(TransferState {
476 transaction_id,
477 remote_cfg: *remote_cfg,
478 transmission_mode,
479 closure_requested,
480 cond_code_eof: None,
481 finished_params: None,
482 });
483 self.state_helper.state = super::State::Busy;
484 Ok(())
485 }
486
487 pub fn cancel_request(
499 &mut self,
500 user: &mut impl CfdpUser,
501 transaction_id: &TransactionId,
502 ) -> Result<bool, SourceError> {
503 if self.state_helper.state == super::State::Idle {
504 return Ok(false);
505 }
506 if let Some(active_id) = self.transaction_id() {
507 if active_id == *transaction_id {
508 self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?;
509 return Ok(true);
510 }
511 }
512 Ok(false)
513 }
514
515 fn fsm_busy(
516 &mut self,
517 user: &mut impl CfdpUser,
518 pdu: Option<&impl PduProvider>,
519 ) -> Result<u32, SourceError> {
520 let mut sent_packets = 0;
521 if self.state_helper.step == TransactionStep::Idle {
522 self.state_helper.step = TransactionStep::TransactionStart;
523 }
524 if self.state_helper.step == TransactionStep::TransactionStart {
525 self.handle_transaction_start(user)?;
526 self.state_helper.step = TransactionStep::SendingMetadata;
527 }
528 if self.state_helper.step == TransactionStep::SendingMetadata {
529 self.prepare_and_send_metadata_pdu()?;
530 self.state_helper.step = TransactionStep::SendingFileData;
531 sent_packets += 1;
532 }
533 if self.state_helper.step == TransactionStep::SendingFileData {
534 if let ControlFlow::Break(packets) = self.file_data_fsm()? {
535 sent_packets += packets;
536 return Ok(sent_packets);
538 }
539 }
540 if self.state_helper.step == TransactionStep::SendingEof {
541 self.eof_fsm(user)?;
542 sent_packets += 1;
543 }
544 if self.state_helper.step == TransactionStep::WaitingForFinished {
545 self.handle_wait_for_finished_pdu(user, pdu)?;
546 }
547 if self.state_helper.step == TransactionStep::NoticeOfCompletion {
548 self.notice_of_completion(user);
549 self.reset();
550 }
551 Ok(sent_packets)
552 }
553
554 fn handle_wait_for_finished_pdu(
555 &mut self,
556 user: &mut impl CfdpUser,
557 packet: Option<&impl PduProvider>,
558 ) -> Result<(), SourceError> {
559 if let Some(packet) = packet {
560 if let Some(FileDirectiveType::FinishedPdu) = packet.file_directive_type() {
561 let finished_pdu = FinishedPduReader::new(packet.pdu())?;
562 self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
563 condition_code: finished_pdu.condition_code(),
564 delivery_code: finished_pdu.delivery_code(),
565 file_status: finished_pdu.file_status(),
566 });
567 if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged {
568 self.state_helper.step = TransactionStep::NoticeOfCompletion;
570 } else {
571 self.state_helper.step = TransactionStep::NoticeOfCompletion;
572 }
573 return Ok(());
574 }
575 }
576 if self.countdown.as_ref().unwrap().has_expired() {
578 self.declare_fault(user, ConditionCode::CheckLimitReached)?;
579 }
580 Ok(())
606 }
607
608 fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
609 let tstate = self.tstate.as_ref().unwrap();
610 let checksum = self.vfs.calculate_checksum(
611 self.put_request_cacher.source_file().unwrap(),
612 tstate.remote_cfg.default_crc_type,
613 self.fparams.file_size,
614 self.pdu_and_cksum_buffer.get_mut(),
615 )?;
616 self.prepare_and_send_eof_pdu(user, checksum)?;
617 let tstate = self.tstate.as_ref().unwrap();
618 if tstate.transmission_mode == TransmissionMode::Unacknowledged {
619 if tstate.closure_requested {
620 self.countdown = Some(self.timer_creator.create_countdown(
621 crate::TimerContext::CheckLimit {
622 local_id: self.local_cfg.id,
623 remote_id: tstate.remote_cfg.entity_id,
624 entity_type: EntityType::Sending,
625 },
626 ));
627 self.state_helper.step = TransactionStep::WaitingForFinished;
628 } else {
629 self.state_helper.step = TransactionStep::NoticeOfCompletion;
630 }
631 } else {
632 }
634 Ok(())
655 }
656
657 fn handle_transaction_start(
658 &mut self,
659 cfdp_user: &mut impl CfdpUser,
660 ) -> Result<(), SourceError> {
661 let tstate = self
662 .tstate
663 .as_ref()
664 .expect("transfer state unexpectedly empty");
665 if !self.put_request_cacher.has_source_file() {
666 self.fparams.metadata_only = true;
667 } else {
668 let source_file = self
669 .put_request_cacher
670 .source_file()
671 .map_err(SourceError::SourceFileNotValidUtf8)?;
672 if !self.vfs.exists(source_file)? {
673 return Err(SourceError::FilestoreError(
674 FilestoreError::FileDoesNotExist,
675 ));
676 }
677 self.put_request_cacher
679 .dest_file()
680 .map_err(SourceError::DestFileNotValidUtf8)?;
681 self.fparams.file_size = self.vfs.file_size(source_file)?;
682 if self.fparams.file_size > u32::MAX as u64 {
683 self.pdu_conf.file_flag = LargeFileFlag::Large
684 } else {
685 if self.fparams.file_size == 0 {
686 self.fparams.empty_file = true;
687 }
688 self.pdu_conf.file_flag = LargeFileFlag::Normal
689 }
690 }
691 cfdp_user.transaction_indication(&tstate.transaction_id);
692 Ok(())
693 }
694
695 fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> {
696 let tstate = self
697 .tstate
698 .as_ref()
699 .expect("transfer state unexpectedly empty");
700 let metadata_params = MetadataGenericParams::new(
701 tstate.closure_requested,
702 tstate.remote_cfg.default_crc_type,
703 self.fparams.file_size,
704 );
705 if self.fparams.metadata_only {
706 let metadata_pdu = MetadataPduCreator::new(
707 PduHeader::new_no_file_data(self.pdu_conf, 0),
708 metadata_params,
709 Lv::new_empty(),
710 Lv::new_empty(),
711 self.put_request_cacher.opts_slice(),
712 );
713 return self.pdu_send_helper(&metadata_pdu);
714 }
715 let metadata_pdu = MetadataPduCreator::new(
716 PduHeader::new_no_file_data(self.pdu_conf, 0),
717 metadata_params,
718 Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(),
719 Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(),
720 self.put_request_cacher.opts_slice(),
721 );
722 self.pdu_send_helper(&metadata_pdu)
723 }
724
/// Runs the file data step of the transaction.
///
/// At most one file data PDU is sent per call. `ControlFlow::Break(1)` is returned when a
/// file data PDU was sent, which makes the busy state machine return to its caller.
/// `ControlFlow::Continue(())` is returned once there is nothing left to send; the next step
/// was selected in that case.
///
/// # Panics
///
/// Panics if no transfer state is set.
fn file_data_fsm(&mut self) -> Result<ControlFlow<u32>, SourceError> {
    if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged {
        // Placeholder: no handling specific to the acknowledged mode is implemented here.
    }
    // Send the next segment as long as the file was not sent completely.
    if !self.fparams.metadata_only
        && self.fparams.progress < self.fparams.file_size
        && self.send_progressing_file_data_pdu()?
    {
        return Ok(ControlFlow::Break(1));
    }
    if self.fparams.empty_file || self.fparams.progress >= self.fparams.file_size {
        // All file data was sent (or there is none), continue with the EOF PDU.
        self.state_helper.step = TransactionStep::SendingEof;
        self.tstate.as_mut().unwrap().cond_code_eof = Some(ConditionCode::NoError);
    } else if self.fparams.metadata_only {
        // NOTE(review): this branch looks unreachable. In the code visible in this file,
        // the file size stays 0 for metadata only transactions, so the condition above
        // (`progress >= file_size`) always holds first. Also note that no check limit
        // countdown is created on this path before waiting for the Finished PDU. Confirm
        // the intended behavior for metadata only transactions.
        if self.tstate.as_ref().unwrap().closure_requested {
            self.state_helper.step = TransactionStep::WaitingForFinished;
        } else {
            self.state_helper.step = TransactionStep::NoticeOfCompletion;
        }
    }
    Ok(ControlFlow::Continue(()))
}
749
750 fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
751 let tstate = self.tstate.as_ref().unwrap();
752 if self.local_cfg.indication_cfg.transaction_finished {
753 let finished_params = if tstate.finished_params.is_none() {
755 TransactionFinishedParams {
756 id: tstate.transaction_id,
757 condition_code: ConditionCode::NoError,
758 delivery_code: DeliveryCode::Complete,
759 file_status: FileStatus::Unreported,
760 }
761 } else {
762 let finished_params = tstate.finished_params.as_ref().unwrap();
763 TransactionFinishedParams {
764 id: tstate.transaction_id,
765 condition_code: finished_params.condition_code,
766 delivery_code: finished_params.delivery_code,
767 file_status: finished_params.file_status,
768 }
769 };
770 cfdp_user.transaction_finished_indication(&finished_params);
771 }
772 }
773
774 fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
775 let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
776 &PduHeader::new_no_file_data(self.pdu_conf, 0),
777 remote_cfg.max_packet_len,
778 None,
779 );
780 if remote_cfg.max_file_segment_len.is_some() {
781 derived_max_seg_len = core::cmp::min(
782 remote_cfg.max_file_segment_len.unwrap(),
783 derived_max_seg_len,
784 );
785 }
786 derived_max_seg_len as u64
787 }
788
789 fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
790 if self.fparams.progress >= self.fparams.file_size {
792 return Ok(false);
793 }
794 let read_len = if self.fparams.file_size < self.fparams.segment_len {
795 self.fparams.file_size
796 } else if self.fparams.progress + self.fparams.segment_len > self.fparams.file_size {
797 self.fparams.file_size - self.fparams.progress
798 } else {
799 self.fparams.segment_len
800 };
801 let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
802 PduHeader::new_for_file_data(
803 self.pdu_conf,
804 0,
805 SegmentMetadataFlag::NotPresent,
806 SegmentationControl::NoRecordBoundaryPreservation,
807 ),
808 self.fparams.progress,
809 read_len,
810 );
811 let mut unwritten_pdu =
812 pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?;
813 self.vfs.read_data(
814 self.put_request_cacher.source_file().unwrap(),
815 self.fparams.progress,
816 read_len,
817 unwritten_pdu.file_data_field_mut(),
818 )?;
819 let written_len = unwritten_pdu.finish();
820 self.pdu_sender.send_pdu(
821 PduType::FileData,
822 None,
823 &self.pdu_and_cksum_buffer.borrow()[0..written_len],
824 )?;
825 self.fparams.progress += read_len;
826 Ok(true)
868 }
869
870 fn prepare_and_send_eof_pdu(
871 &mut self,
872 cfdp_user: &mut impl CfdpUser,
873 checksum: u32,
874 ) -> Result<(), SourceError> {
875 let tstate = self
876 .tstate
877 .as_ref()
878 .expect("transfer state unexpectedly empty");
879 let eof_pdu = EofPdu::new(
880 PduHeader::new_no_file_data(self.pdu_conf, 0),
881 tstate.cond_code_eof.unwrap_or(ConditionCode::NoError),
882 checksum,
883 self.fparams.progress,
884 None,
885 );
886 self.pdu_send_helper(&eof_pdu)?;
887 if self.local_cfg.indication_cfg.eof_sent {
888 cfdp_user.eof_sent_indication(&tstate.transaction_id);
889 }
890 Ok(())
891 }
892
893 fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), SourceError> {
894 let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
895 let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?;
896 self.pdu_sender.send_pdu(
897 pdu.pdu_type(),
898 pdu.file_directive_type(),
899 &pdu_buffer_mut[0..written_len],
900 )?;
901 Ok(())
902 }
903
904 fn handle_finished_pdu(&mut self, pdu_provider: &impl PduProvider) -> Result<(), SourceError> {
905 if self.state_helper.state == State::Idle {
907 return Ok(());
908 }
909 if self.state_helper.step != TransactionStep::WaitingForFinished {
910 return Err(SourceError::UnexpectedPdu {
911 pdu_type: PduType::FileDirective,
912 directive_type: Some(FileDirectiveType::FinishedPdu),
913 });
914 }
915 let finished_pdu = FinishedPduReader::new(pdu_provider.pdu())?;
916 self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
919 condition_code: finished_pdu.condition_code(),
920 delivery_code: finished_pdu.delivery_code(),
921 file_status: finished_pdu.file_status(),
922 });
923 if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged {
924 }
927 self.state_helper.step = TransactionStep::NoticeOfCompletion;
928
929 Ok(())
937 }
938
/// Handler for inserted NAK PDUs. Currently an empty stub, so NAK PDUs are accepted and
/// ignored.
fn handle_nak_pdu(&mut self) {}
940
/// Handler for inserted Keep Alive PDUs. Currently an empty stub, so Keep Alive PDUs are
/// accepted and ignored.
fn handle_keep_alive_pdu(&mut self) {}
942
943 pub fn transaction_id(&self) -> Option<TransactionId> {
944 self.tstate.as_ref().map(|v| v.transaction_id)
945 }
946
947 #[inline]
949 pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
950 self.tstate.as_ref().map(|v| v.transmission_mode)
951 }
952
/// Returns the current step of the transaction.
pub fn step(&self) -> TransactionStep {
    self.state_helper.step
}
958
/// Returns the current state of the handler.
pub fn state(&self) -> State {
    self.state_helper.state
}
962
/// Returns a reference to the local entity configuration.
pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> {
    &self.local_cfg
}
966
967 fn declare_fault(
968 &mut self,
969 user: &mut impl CfdpUser,
970 cond: ConditionCode,
971 ) -> Result<(), SourceError> {
972 let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
974 let progress = self.fparams.progress;
975 let fh = self.local_cfg.fault_handler.get_fault_handler(cond);
976 match fh {
977 spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => {
978 if let ControlFlow::Break(_) = self.notice_of_cancellation(user, cond)? {
979 return Ok(());
980 }
981 }
982 spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => {
983 self.notice_of_suspension();
984 }
985 spacepackets::cfdp::FaultHandlerCode::IgnoreError => (),
986 spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
987 }
988 self.local_cfg
989 .fault_handler
990 .report_fault(transaction_id, cond, progress);
991 Ok(())
992 }
993
994 fn notice_of_cancellation(
995 &mut self,
996 user: &mut impl CfdpUser,
997 condition_code: ConditionCode,
998 ) -> Result<ControlFlow<()>, SourceError> {
999 let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
1000 if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof {
1003 if cond_code_eof != ConditionCode::NoError {
1004 self.local_cfg
1006 .fault_handler
1007 .user_hook
1008 .get_mut()
1009 .abandoned_cb(transaction_id, cond_code_eof, self.fparams.progress);
1010 self.abandon_transaction();
1011 return Ok(ControlFlow::Break(()));
1012 }
1013 }
1014
1015 let tstate = self.tstate.as_mut().unwrap();
1016 tstate.cond_code_eof = Some(condition_code);
1017 let checksum = self.vfs.calculate_checksum(
1020 self.put_request_cacher.source_file().unwrap(),
1021 tstate.remote_cfg.default_crc_type,
1022 self.fparams.progress,
1023 self.pdu_and_cksum_buffer.get_mut(),
1024 )?;
1025 self.prepare_and_send_eof_pdu(user, checksum)?;
1026 if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
1027 self.reset();
1029 } else {
1030 self.state_helper.step = TransactionStep::WaitingForEofAck;
1031 }
1032 Ok(ControlFlow::Continue(()))
1033 }
1034
/// Notice of suspension procedure. Currently an empty stub which does not change any state.
fn notice_of_suspension(&mut self) {}
1036
/// Abandons the active transaction by resetting the handler.
fn abandon_transaction(&mut self) {
    self.reset();
}
1042
1043 pub fn reset(&mut self) {
1074 self.state_helper = Default::default();
1075 self.tstate = None;
1076 self.fparams = Default::default();
1077 self.countdown = None;
1078 }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083 use core::time::Duration;
1084 use std::{fs::OpenOptions, io::Write, path::PathBuf, thread, vec::Vec};
1085
1086 use alloc::string::String;
1087 use rand::Rng;
1088 use spacepackets::{
1089 cfdp::{
1090 pdu::{
1091 file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader,
1092 },
1093 ChecksumType, CrcFlag,
1094 },
1095 util::UnsignedByteFieldU16,
1096 };
1097 use tempfile::TempPath;
1098
1099 use super::*;
1100 use crate::{
1101 filestore::NativeFilestore,
1102 request::PutRequestOwned,
1103 source::TransactionStep,
1104 tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler},
1105 FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown,
1106 StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32,
1107 };
1108 use spacepackets::seq_count::SeqCountProviderSimple;
1109
// Entity ID of the local entity which hosts the source handler under test.
const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
// Entity ID of the remote entity used as the destination of the transfers.
const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
// Entity ID which differs from both the local and the remote ID.
const INVALID_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(5);
1113
1114 fn init_full_filepaths_textfile() -> (TempPath, PathBuf) {
1115 (
1116 tempfile::NamedTempFile::new().unwrap().into_temp_path(),
1117 tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1118 )
1119 }
1120
/// Source handler instantiated with the test sender, the test fault handler, the native
/// filestore and the standard remote configuration, timer and sequence count providers.
type TestSourceHandler = SourceHandler<
    TestCfdpSender,
    TestFaultHandler,
    NativeFilestore,
    StdRemoteEntityConfigProvider,
    StdTimerCreator,
    StdCountdown,
    SeqCountProviderSimple<u16>,
>;
1130
/// Test bench which bundles a source handler with the file paths used by the tests.
struct SourceHandlerTestbench {
    /// Source handler under test.
    handler: TestSourceHandler,
    /// Handle of the temporary source file, stored so that it lives as long as the test
    /// bench.
    #[allow(dead_code)]
    srcfile_handle: TempPath,
    /// Path of the temporary source file.
    srcfile: String,
    /// Path of the destination file.
    destfile: String,
    /// Maximum packet length configured for the remote entity.
    max_packet_len: usize,
    /// Initialized to `true`.
    /// NOTE(review): presumably evaluated by a `Drop` implementation outside of this part
    /// of the file; confirm.
    check_idle_on_drop: bool,
}
1140
/// Information about a started transfer, collected while checking the Metadata PDU.
#[allow(dead_code)]
struct TransferInfo {
    /// ID of the transaction.
    id: TransactionId,
    /// Closure requested flag found in the Metadata PDU.
    closure_requested: bool,
    /// Header of the Metadata PDU.
    pdu_header: PduHeader,
}
1147
1148 impl SourceHandlerTestbench {
1149 fn new(
1150 crc_on_transmission_by_default: bool,
1151 test_fault_handler: TestFaultHandler,
1152 test_packet_sender: TestCfdpSender,
1153 max_packet_len: usize,
1154 ) -> Self {
1155 let local_entity_cfg = LocalEntityConfig {
1156 id: LOCAL_ID.into(),
1157 indication_cfg: IndicationConfig::default(),
1158 fault_handler: FaultHandler::new(test_fault_handler),
1159 };
1160 let static_put_request_cacher = StaticPutRequestCacher::new(2048);
1161 let (srcfile_handle, destfile) = init_full_filepaths_textfile();
1162 let srcfile = String::from(srcfile_handle.to_path_buf().to_str().unwrap());
1163 Self {
1164 handler: SourceHandler::new(
1165 local_entity_cfg,
1166 test_packet_sender,
1167 NativeFilestore::default(),
1168 static_put_request_cacher,
1169 1024,
1170 basic_remote_cfg_table(
1171 REMOTE_ID,
1172 max_packet_len,
1173 crc_on_transmission_by_default,
1174 ),
1175 StdTimerCreator::new(core::time::Duration::from_millis(100)),
1176 SeqCountProviderSimple::default(),
1177 ),
1178 srcfile_handle,
1179 srcfile,
1180 destfile: String::from(destfile.to_path_buf().to_str().unwrap()),
1181 max_packet_len,
1182 check_idle_on_drop: true,
1183 }
1184 }
1185
/// Creates a test user from the given sequence number and file size and from the source and
/// destination file paths of this test bench.
fn create_user(&self, next_expected_seq_num: u64, filesize: u64) -> TestCfdpUser {
    TestCfdpUser::new(
        next_expected_seq_num,
        self.srcfile.clone(),
        self.destfile.clone(),
        filesize,
    )
}
1194
/// Overrides the check limit timeout of the timer creator used by the handler.
fn set_check_limit_timeout(&mut self, timeout: Duration) {
    self.handler.timer_creator.check_limit_timeout = timeout;
}
1198
/// Forwards the put request to the source handler under test.
fn put_request(
    &mut self,
    put_request: &impl ReadablePutRequest,
) -> Result<(), PutRequestError> {
    self.handler.put_request(put_request)
}
1205
/// Returns the result of the `all_queues_empty` query of the test fault handler.
fn all_fault_queues_empty(&self) -> bool {
    self.handler
        .local_cfg
        .user_fault_hook()
        .borrow()
        .all_queues_empty()
}
1213
/// Returns the cell which holds the test fault handler of the handler under test.
#[allow(dead_code)]
fn test_fault_handler(&self) -> &RefCell<TestFaultHandler> {
    self.handler.local_cfg.user_fault_hook()
}
1218
/// Returns mutable access to the cell which holds the test fault handler.
fn test_fault_handler_mut(&mut self) -> &mut RefCell<TestFaultHandler> {
    self.handler.local_cfg.user_fault_hook_mut()
}
1222
/// Returns the result of the `queue_empty` query of the test PDU sender.
fn pdu_queue_empty(&self) -> bool {
    self.handler.pdu_sender.queue_empty()
}
1226
/// Retrieves the next PDU from the test PDU sender, if there is one.
fn get_next_sent_pdu(&self) -> Option<SentPdu> {
    self.handler.pdu_sender.retrieve_next_pdu()
}
1230
1231 fn common_pdu_check_for_file_transfer(&self, pdu_header: &PduHeader, crc_flag: CrcFlag) {
1232 assert_eq!(
1233 pdu_header.seg_ctrl(),
1234 SegmentationControl::NoRecordBoundaryPreservation
1235 );
1236 assert_eq!(
1237 pdu_header.seg_metadata_flag(),
1238 SegmentMetadataFlag::NotPresent
1239 );
1240 assert_eq!(pdu_header.common_pdu_conf().source_id(), LOCAL_ID.into());
1241 assert_eq!(pdu_header.common_pdu_conf().dest_id(), REMOTE_ID.into());
1242 assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag);
1243 assert_eq!(
1244 pdu_header.common_pdu_conf().trans_mode,
1245 TransmissionMode::Unacknowledged
1246 );
1247 assert_eq!(
1248 pdu_header.common_pdu_conf().direction,
1249 Direction::TowardsReceiver
1250 );
1251 assert_eq!(
1252 pdu_header.common_pdu_conf().file_flag,
1253 LargeFileFlag::Normal
1254 );
1255 assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2);
1256 }
1257
1258 fn generic_file_transfer(
1259 &mut self,
1260 cfdp_user: &mut TestCfdpUser,
1261 with_closure: bool,
1262 file_data: Vec<u8>,
1263 ) -> (PduHeader, u32) {
1264 let mut digest = CRC_32.digest();
1265 digest.update(&file_data);
1266 let checksum = digest.finalize();
1267 cfdp_user.expected_full_src_name = self.srcfile.clone();
1268 cfdp_user.expected_full_dest_name = self.destfile.clone();
1269 cfdp_user.expected_file_size = file_data.len() as u64;
1270 let put_request = PutRequestOwned::new_regular_request(
1271 REMOTE_ID.into(),
1272 &self.srcfile,
1273 &self.destfile,
1274 Some(TransmissionMode::Unacknowledged),
1275 Some(with_closure),
1276 )
1277 .expect("creating put request failed");
1278 let transaction_info = self.common_no_acked_file_transfer(
1279 cfdp_user,
1280 put_request,
1281 cfdp_user.expected_file_size,
1282 );
1283 let mut current_offset = 0;
1284 let chunks = file_data.chunks(
1285 calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
1286 &transaction_info.pdu_header,
1287 self.max_packet_len,
1288 None,
1289 ),
1290 );
1291 let mut fd_pdus = 0;
1292 for segment in chunks {
1293 self.check_next_file_pdu(current_offset, segment);
1294 self.handler.state_machine_no_packet(cfdp_user).unwrap();
1295 fd_pdus += 1;
1296 current_offset += segment.len() as u64;
1297 }
1298 self.common_eof_pdu_check(
1299 cfdp_user,
1300 transaction_info.closure_requested,
1301 cfdp_user.expected_file_size,
1302 checksum,
1303 );
1304 (transaction_info.pdu_header, fd_pdus)
1305 }
1306
/// Starts a transfer with the given put request and checks the resulting Metadata PDU.
///
/// The first state machine call is expected to send two PDUs. The first one is retrieved
/// and checked as the Metadata PDU, the second one is left in the queue of the sender.
///
/// Returns the transaction ID, the closure requested flag and the header of the Metadata
/// PDU.
fn common_no_acked_file_transfer(
    &mut self,
    cfdp_user: &mut TestCfdpUser,
    put_request: PutRequestOwned,
    filesize: u64,
) -> TransferInfo {
    // No indications may have been issued before the transfer starts.
    assert_eq!(cfdp_user.transaction_indication_call_count, 0);
    assert_eq!(cfdp_user.eof_sent_call_count, 0);

    self.put_request(&put_request)
        .expect("put_request call failed");
    // The put request only switches the state, the step is advanced by the state machine.
    assert_eq!(self.handler.state(), State::Busy);
    assert_eq!(self.handler.step(), TransactionStep::Idle);
    let id = self.handler.transaction_id().unwrap();
    let sent_packets = self
        .handler
        .state_machine_no_packet(cfdp_user)
        .expect("source handler FSM failure");
    assert_eq!(sent_packets, 2);
    assert!(!self.pdu_queue_empty());
    let next_pdu = self.get_next_sent_pdu().unwrap();
    // The second PDU is still in the queue.
    assert!(!self.pdu_queue_empty());
    assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
    assert_eq!(
        next_pdu.file_directive_type,
        Some(FileDirectiveType::MetadataPdu)
    );
    let metadata_pdu =
        MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
    let pdu_header = metadata_pdu.pdu_header();
    self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
    assert_eq!(
        metadata_pdu
            .src_file_name()
            .value_as_str()
            .unwrap()
            .unwrap(),
        self.srcfile
    );
    assert_eq!(
        metadata_pdu
            .dest_file_name()
            .value_as_str()
            .unwrap()
            .unwrap(),
        self.destfile
    );
    assert_eq!(metadata_pdu.metadata_params().file_size, filesize);
    assert_eq!(
        metadata_pdu.metadata_params().checksum_type,
        ChecksumType::Crc32
    );
    // If the put request does not specify the closure flag, closure is expected to be
    // requested.
    let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
        assert_eq!(
            metadata_pdu.metadata_params().closure_requested,
            closure_requested
        );
        closure_requested
    } else {
        assert!(metadata_pdu.metadata_params().closure_requested);
        metadata_pdu.metadata_params().closure_requested
    };
    assert_eq!(metadata_pdu.options(), &[]);
    TransferInfo {
        pdu_header: *pdu_header,
        closure_requested,
        id,
    }
}
1376
1377 fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
1378 let next_pdu = self.get_next_sent_pdu().unwrap();
1379 assert_eq!(next_pdu.pdu_type, PduType::FileData);
1380 assert!(next_pdu.file_directive_type.is_none());
1381 let fd_pdu =
1382 FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
1383 assert_eq!(fd_pdu.offset(), expected_offset);
1384 assert_eq!(fd_pdu.file_data(), expected_data);
1385 assert!(fd_pdu.segment_metadata().is_none());
1386 }
1387
1388 fn common_eof_pdu_check(
1389 &mut self,
1390 cfdp_user: &mut TestCfdpUser,
1391 closure_requested: bool,
1392 filesize: u64,
1393 checksum: u32,
1394 ) {
1395 let next_pdu = self.get_next_sent_pdu().unwrap();
1396 assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1397 assert_eq!(
1398 next_pdu.file_directive_type,
1399 Some(FileDirectiveType::EofPdu)
1400 );
1401 let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1402 self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1403 assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError);
1404 assert_eq!(eof_pdu.file_size(), filesize);
1405 assert_eq!(eof_pdu.file_checksum(), checksum);
1406 assert_eq!(
1407 eof_pdu
1408 .pdu_header()
1409 .common_pdu_conf()
1410 .transaction_seq_num
1411 .value_const(),
1412 0
1413 );
1414 if !closure_requested {
1415 assert_eq!(self.handler.state(), State::Idle);
1416 assert_eq!(self.handler.step(), TransactionStep::Idle);
1417 } else {
1418 assert_eq!(self.handler.state(), State::Busy);
1419 assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished);
1420 }
1421 assert_eq!(cfdp_user.transaction_indication_call_count, 1);
1422 assert_eq!(cfdp_user.eof_sent_call_count, 1);
1423 self.all_fault_queues_empty();
1424 }
1425
1426 fn common_tiny_file_transfer(
1427 &mut self,
1428 cfdp_user: &mut TestCfdpUser,
1429 with_closure: bool,
1430 ) -> PduHeader {
1431 let mut file = OpenOptions::new()
1432 .write(true)
1433 .open(&self.srcfile)
1434 .expect("opening file failed");
1435 let content_str = "Hello World!";
1436 file.write_all(content_str.as_bytes())
1437 .expect("writing file content failed");
1438 drop(file);
1439 let (pdu_header, fd_pdus) = self.generic_file_transfer(
1440 cfdp_user,
1441 with_closure,
1442 content_str.as_bytes().to_vec(),
1443 );
1444 assert_eq!(fd_pdus, 1);
1445 pdu_header
1446 }
1447
1448 fn finish_handling(&mut self, user: &mut TestCfdpUser, pdu_header: PduHeader) {
1449 let finished_pdu = FinishedPduCreator::new_default(
1450 pdu_header,
1451 DeliveryCode::Complete,
1452 FileStatus::Retained,
1453 );
1454 let finished_pdu_vec = finished_pdu.to_vec().unwrap();
1455 let packet_info = PduRawWithInfo::new(&finished_pdu_vec).unwrap();
1456 self.handler
1457 .state_machine(user, Some(&packet_info))
1458 .unwrap();
1459 }
1460 }
1461
1462 impl Drop for SourceHandlerTestbench {
1463 fn drop(&mut self) {
1464 self.all_fault_queues_empty();
1465 if self.check_idle_on_drop {
1466 assert_eq!(self.handler.state(), State::Idle);
1467 assert_eq!(self.handler.step(), TransactionStep::Idle);
1468 }
1469 }
1470 }
1471
/// A freshly created test bench has no transmission mode set and has not sent any PDU.
#[test]
fn test_basic() {
    let tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    assert!(tb.handler.transmission_mode().is_none());
    assert!(tb.pdu_queue_empty());
}
1480
/// Unacknowledged transfer of an empty file without closure: only the metadata PDU and the
/// EOF PDU are checked, and the EOF checksum is the CRC32 of no data.
#[test]
fn test_empty_file_transfer_not_acked_no_closure() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    let filesize: u64 = 0;
    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Unacknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let mut cfdp_user = tb.create_user(0, filesize);
    let info = tb.common_no_acked_file_transfer(&mut cfdp_user, request, filesize);
    let empty_checksum = CRC_32.digest().finalize();
    tb.common_eof_pdu_check(
        &mut cfdp_user,
        info.closure_requested,
        filesize,
        empty_checksum,
    );
}
1505
/// Unacknowledged transfer of a tiny file without closure.
#[test]
fn test_tiny_file_transfer_not_acked_no_closure() {
    let mut cfdp_user = TestCfdpUser::default();
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    // The returned PDU header is not needed because no Finished PDU has to be built.
    let _ = tb.common_tiny_file_transfer(&mut cfdp_user, false);
}
1514
/// Unacknowledged transfer of a tiny file with closure: after the EOF PDU, a Finished PDU
/// is passed to the handler.
#[test]
fn test_tiny_file_transfer_not_acked_with_closure() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    let mut cfdp_user = TestCfdpUser::default();
    let header = tb.common_tiny_file_transfer(&mut cfdp_user, true);
    tb.finish_handling(&mut cfdp_user, header);
}
1524
/// Unacknowledged transfer without closure of 140 random bytes using a maximum packet
/// length of 128, which is expected to result in two file data PDUs.
#[test]
fn test_two_segment_file_transfer_not_acked_no_closure() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        128,
    );
    let mut cfdp_user = TestCfdpUser::default();

    let mut payload = [0u8; 140];
    {
        let mut src_file = OpenOptions::new()
            .write(true)
            .open(&tb.srcfile)
            .expect("opening file failed");
        rand::thread_rng().fill(&mut payload[..]);
        src_file
            .write_all(&payload)
            .expect("writing file content failed");
        // The file handle is closed at the end of this scope, before the transfer starts.
    }

    let (_, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, false, payload.to_vec());
    assert_eq!(fd_pdus, 2);
}
1543
/// Unacknowledged transfer with closure of 140 random bytes using a maximum packet length
/// of 128: two file data PDUs are expected, followed by the Finished PDU handling.
#[test]
fn test_two_segment_file_transfer_not_acked_with_closure() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        128,
    );
    let mut cfdp_user = TestCfdpUser::default();

    let mut payload = [0u8; 140];
    {
        let mut src_file = OpenOptions::new()
            .write(true)
            .open(&tb.srcfile)
            .expect("opening file failed");
        rand::thread_rng().fill(&mut payload[..]);
        src_file
            .write_all(&payload)
            .expect("writing file content failed");
        // The file handle is closed at the end of this scope, before the transfer starts.
    }

    let (header, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, true, payload.to_vec());
    assert_eq!(fd_pdus, 2);
    tb.finish_handling(&mut cfdp_user, header);
}
1564
/// Unacknowledged transfer of an empty file with closure: metadata and EOF PDU are checked,
/// then a Finished PDU is passed to the handler.
#[test]
fn test_empty_file_transfer_not_acked_with_closure() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    let filesize: u64 = 0;
    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Unacknowledged),
        Some(true),
    )
    .expect("creating put request failed");
    let mut cfdp_user = tb.create_user(0, filesize);
    let info = tb.common_no_acked_file_transfer(&mut cfdp_user, request, filesize);
    let empty_checksum = CRC_32.digest().finalize();
    tb.common_eof_pdu_check(
        &mut cfdp_user,
        info.closure_requested,
        filesize,
        empty_checksum,
    );
    tb.finish_handling(&mut cfdp_user, info.pdu_header);
}
1590
/// A put request addressed to `INVALID_ID` must be rejected with
/// `PutRequestError::NoRemoteCfgFound` carrying that ID.
#[test]
fn test_put_request_no_remote_cfg() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );

    let (srcfile, destfile) = init_full_filepaths_textfile();
    let src_path = String::from(srcfile.to_str().unwrap());
    let dest_path = String::from(destfile.to_str().unwrap());
    let request = PutRequestOwned::new_regular_request(
        INVALID_ID.into(),
        &src_path,
        &dest_path,
        Some(TransmissionMode::Unacknowledged),
        Some(true),
    )
    .expect("creating put request failed");

    let error = tb.handler.put_request(&request);
    assert!(error.is_err());
    match error.unwrap_err() {
        PutRequestError::NoRemoteCfgFound(id) => assert_eq!(id, INVALID_ID.into()),
        other => panic!("unexpected error type: {:?}", other),
    }
}
1617
/// A put request for a source file which does not exist must be rejected with
/// `PutRequestError::FileDoesNotExist`.
#[test]
fn test_put_request_file_does_not_exist() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );

    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        "/tmp/this_file_does_not_exist.txt",
        "/tmp/tmp.txt",
        Some(TransmissionMode::Unacknowledged),
        Some(true),
    )
    .expect("creating put request failed");

    let error = tb.put_request(&request);
    assert!(error.is_err());
    match error.unwrap_err() {
        PutRequestError::FileDoesNotExist => (),
        other => panic!("unexpected error type: {:?}", other),
    }
}
1641
/// Empty file transfer with closure where no Finished PDU ever arrives.
///
/// After sleeping longer than the configured check limit timeout, the next state machine
/// call is expected to queue an EOF PDU with [`ConditionCode::CheckLimitReached`] and to
/// report exactly one notice of cancellation to the fault handler.
#[test]
fn test_finished_pdu_check_timeout() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        512,
    );
    tb.set_check_limit_timeout(Duration::from_millis(45));
    let filesize: u64 = 0;
    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Unacknowledged),
        Some(true),
    )
    .expect("creating put request failed");
    let mut cfdp_user = tb.create_user(0, filesize);
    let info = tb.common_no_acked_file_transfer(&mut cfdp_user, request, filesize);
    let expected_id = tb.handler.transaction_id().unwrap();
    let empty_checksum = CRC_32.digest().finalize();
    tb.common_eof_pdu_check(
        &mut cfdp_user,
        info.closure_requested,
        filesize,
        empty_checksum,
    );

    // Sleep past the 45 ms timeout configured above.
    thread::sleep(Duration::from_millis(50));
    // The call itself reports 0 sent packets, but an EOF PDU is expected in the queue.
    let sent_packets = tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap();
    assert_eq!(sent_packets, 0);
    let sent_pdu = tb.get_next_sent_pdu().unwrap();
    let eof = EofPdu::from_bytes(&sent_pdu.raw_pdu).expect("invalid EOF PDU format");
    tb.common_pdu_check_for_file_transfer(eof.pdu_header(), CrcFlag::NoCrc);
    assert_eq!(eof.condition_code(), ConditionCode::CheckLimitReached);
    assert_eq!(eof.file_size(), 0);
    assert_eq!(eof.file_checksum(), 0);

    // Exactly one notice of cancellation must have been reported for this transaction.
    let fault_hook_cell = tb.test_fault_handler_mut();
    let fault_hook = fault_hook_cell.get_mut();
    assert!(!fault_hook.cancellation_queue_empty());
    assert_eq!(fault_hook.notice_of_cancellation_queue.len(), 1);
    let (id, cond_code, progress) = fault_hook
        .notice_of_cancellation_queue
        .pop_back()
        .unwrap();
    assert_eq!(id, expected_id);
    assert_eq!(cond_code, ConditionCode::CheckLimitReached);
    assert_eq!(progress, 0);
    // NOTE(review): the result of this call is discarded. If it returns a `bool` like
    // `cancellation_queue_empty` does, this line checks nothing - confirm and wrap it in
    // `assert!` if so.
    fault_hook.all_queues_empty();
}
1693
/// Cancels an empty file transfer before the state machine was run for the first time.
///
/// The cancel request must be accepted, put the handler back into the idle state and
/// produce an EOF PDU with [`ConditionCode::CancelRequestReceived`], a checksum of 0 and
/// a file size of 0.
#[test]
fn test_cancelled_transfer_empty_file() {
    let fault_handler = TestFaultHandler::default();
    let test_sender = TestCfdpSender::default();
    let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
    let filesize = 0;
    let put_request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Unacknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let mut cfdp_user = tb.create_user(0, filesize);
    assert_eq!(cfdp_user.transaction_indication_call_count, 0);
    assert_eq!(cfdp_user.eof_sent_call_count, 0);

    tb.put_request(&put_request)
        .expect("put_request call failed");
    assert_eq!(tb.handler.state(), State::Busy);
    assert_eq!(tb.handler.step(), TransactionStep::Idle);
    // Nothing may be sent before the state machine was called.
    assert!(tb.get_next_sent_pdu().is_none());
    let id = tb.handler.transaction_id().unwrap();
    // The returned flag used to be dropped silently. Assert it like the mid-transfer
    // cancellation test does, so an ignored cancel request is reported right here.
    let cancelled = tb
        .handler
        .cancel_request(&mut cfdp_user, &id)
        .expect("transaction cancellation failed");
    assert!(cancelled);
    assert_eq!(tb.handler.state(), State::Idle);
    assert_eq!(tb.handler.step(), TransactionStep::Idle);
    let eof_pdu = tb
        .get_next_sent_pdu()
        .expect("no EOF PDU generated like expected");
    assert_eq!(
        eof_pdu.file_directive_type.unwrap(),
        FileDirectiveType::EofPdu
    );
    let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap();
    assert_eq!(
        eof_pdu.condition_code(),
        ConditionCode::CancelRequestReceived
    );
    assert_eq!(eof_pdu.file_checksum(), 0);
    assert_eq!(eof_pdu.file_size(), 0);
    tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
}
1740
/// Cancels a transfer of 140 random bytes after the first file data PDU was sent.
///
/// The cancel request must be accepted, put the handler back into the idle state and
/// produce an EOF PDU with [`ConditionCode::CancelRequestReceived`] whose file size and
/// checksum cover only the first segment.
#[test]
fn test_cancelled_transfer_mid_transfer() {
    let mut tb = SourceHandlerTestbench::new(
        false,
        TestFaultHandler::default(),
        TestCfdpSender::default(),
        128,
    );

    let mut payload = [0u8; 140];
    {
        let mut src_file = OpenOptions::new()
            .write(true)
            .open(&tb.srcfile)
            .expect("opening file failed");
        rand::thread_rng().fill(&mut payload[..]);
        src_file
            .write_all(&payload)
            .expect("writing file content failed");
        // The file handle is closed at the end of this scope, before the transfer starts.
    }

    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Unacknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let file_size = payload.len() as u64;
    let mut cfdp_user = tb.create_user(0, file_size);
    let info = tb.common_no_acked_file_transfer(&mut cfdp_user, request, file_size);

    // Only the first segment is sent before the cancellation, so the reference checksum
    // covers that segment alone.
    let max_segment_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
        &info.pdu_header,
        tb.max_packet_len,
        None,
    );
    let first_segment = payload
        .chunks(max_segment_len)
        .next()
        .expect("no chunk found");
    let expected_checksum = {
        let mut hasher = CRC_32.digest();
        hasher.update(first_segment);
        hasher.finalize()
    };

    let file_data_packet = tb.get_next_sent_pdu().unwrap();
    assert_eq!(file_data_packet.pdu_type, PduType::FileData);
    let file_data_pdu = FileDataPdu::from_bytes(&file_data_packet.raw_pdu).unwrap();
    assert_eq!(file_data_pdu.file_data(), &payload[0..first_segment.len()]);

    let expected_id = tb.handler.transaction_id().unwrap();
    let cancelled = tb
        .handler
        .cancel_request(&mut cfdp_user, &expected_id)
        .expect("cancellation failed");
    assert!(cancelled);
    assert_eq!(tb.handler.state(), State::Idle);
    assert_eq!(tb.handler.step(), TransactionStep::Idle);

    let eof_packet = tb.get_next_sent_pdu().unwrap();
    assert_eq!(eof_packet.pdu_type, PduType::FileDirective);
    assert_eq!(
        eof_packet.file_directive_type.unwrap(),
        FileDirectiveType::EofPdu
    );
    let eof = EofPdu::from_bytes(&eof_packet.raw_pdu).expect("EOF PDU creation failed");
    assert_eq!(eof.file_size(), first_segment.len() as u64);
    assert_eq!(eof.file_checksum(), expected_checksum);
    assert_eq!(eof.condition_code(), ConditionCode::CancelRequestReceived);
}
1805}