1use core::{
40 cell::{Cell, RefCell},
41 ops::ControlFlow,
42 str::Utf8Error,
43};
44
45use spacepackets::{
46 ByteConversionError,
47 cfdp::{
48 ConditionCode, Direction, FaultHandlerCode, LargeFileFlag, PduType, SegmentMetadataFlag,
49 SegmentationControl, TransactionStatus, TransmissionMode,
50 lv::Lv,
51 pdu::{
52 CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
53 ack::AckPdu,
54 eof::EofPdu,
55 file_data::{
56 FileDataPduCreatorWithReservedDatafield,
57 calculate_max_file_seg_len_for_max_packet_len_and_pdu_header,
58 },
59 finished::{DeliveryCode, FileStatus, FinishedPduReader},
60 metadata::{MetadataGenericParams, MetadataPduCreator},
61 nak::NakPduReader,
62 },
63 },
64 util::{UnsignedByteField, UnsignedEnum},
65};
66
67use spacepackets::seq_count::SequenceCounter;
68
69use crate::{
70 DummyPduProvider, EntityType, FaultInfo, GenericSendError, PduProvider, PositiveAckParams,
71 TimerCreator, time::Countdown,
72};
73
74use super::{
75 LocalEntityConfig, PacketTarget, PduSender, RemoteConfigStore, RemoteEntityConfig, State,
76 TransactionId, UserFaultHook,
77 filestore::{FilestoreError, VirtualFilestore},
78 request::{ReadablePutRequest, StaticPutRequestCacher},
79 user::{CfdpUser, TransactionFinishedParams},
80};
81
/// Fine-grained step of the source handler while it is busy with a transaction.
///
/// The discriminants are assigned explicitly and are not contiguous (the values 2 and 9 are
/// not used by any variant).
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
    /// No step is active. A busy state machine moves on to [`Self::TransactionStart`].
    Idle = 0,
    /// The put request is evaluated and the user is informed about the new transaction.
    TransactionStart = 1,
    /// The metadata PDU is generated and sent.
    SendingMetadata = 3,
    /// File data PDUs are sent, at most one per state machine call.
    SendingFileData = 4,
    /// NOTE(review): no visible code of this handler sets this step.
    Retransmitting = 5,
    /// The EOF PDU is generated and sent.
    SendingEof = 6,
    /// The EOF PDU was sent and the handler waits for its acknowledgement.
    WaitingForEofAck = 7,
    /// The handler waits for the Finished PDU.
    WaitingForFinished = 8,
    /// The user is informed about the completion and the handler is reset.
    NoticeOfCompletion = 10,
}
98
/// Bookkeeping for the file which is transferred by the active transaction.
#[derive(Default, Debug, Copy, Clone)]
pub struct FileParams {
    /// Advanced by the size of every file data PDU which was sent. Also used as the file size
    /// field of the EOF PDU.
    pub progress: u64,
    /// Maximum length of the file data inside one file data PDU.
    pub segment_len: u64,
    /// NOTE(review): not read or written by the handler code visible here.
    pub crc32: u32,
    /// Set when the put request does not name a source file.
    pub metadata_only: bool,
    /// Size of the source file as reported by the virtual filestore.
    pub file_size: u64,
    /// Set when the source file has a size of 0.
    pub empty_file: bool,
    /// Checksum calculated when the EOF PDU was sent first; re-used when the EOF PDU has to
    /// be sent again.
    pub checksum_completed_file: Option<u32>,
}
111
/// State of the handler state machine.
///
/// All fields are [`Cell`]s so they can be updated through a shared reference.
struct StateHelper {
    /// Current step inside the busy state.
    step: Cell<TransactionStep>,
    /// Coarse state of the handler.
    state: Cell<super::State>,
    /// Set to 0 on every put request and reset; not evaluated by the visible code.
    num_packets_ready: Cell<u32>,
}
120
121impl Default for StateHelper {
122 fn default() -> Self {
123 Self {
124 state: Cell::new(super::State::Idle),
125 step: Cell::new(TransactionStep::Idle),
126 num_packets_ready: Cell::new(0),
127 }
128 }
129}
130
131impl StateHelper {
132 #[allow(dead_code)]
133 pub fn reset(&self) {
134 self.step.set(TransactionStep::Idle);
135 self.state.set(super::State::Idle);
136 self.num_packets_ready.set(0);
137 }
138}
139
/// Fields of a received Finished PDU which are kept for the transaction finished indication.
#[derive(Debug, Copy, Clone)]
pub struct FinishedParams {
    /// Condition code of the Finished PDU.
    condition_code: ConditionCode,
    /// Delivery code of the Finished PDU.
    delivery_code: DeliveryCode,
    /// File status of the Finished PDU.
    file_status: FileStatus,
}
146
/// Errors returned by the state machine and the packet handling of the source handler.
#[derive(Debug, thiserror::Error)]
pub enum SourceError {
    /// The packet is not targeted at a source entity, or it is a directive a source entity
    /// never processes (EOF, Prompt, Metadata).
    #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
    CantProcessPacketType {
        pdu_type: PduType,
        directive_type: Option<FileDirectiveType>,
    },
    /// The packet can generally be handled, but not in the current step (or it is a file data
    /// PDU).
    #[error("unexpected PDU")]
    UnexpectedPdu {
        pdu_type: PduType,
        directive_type: Option<FileDirectiveType>,
    },
    #[error("source handler is already busy with put request")]
    PutRequestAlreadyActive,
    #[error("error caching put request")]
    PutRequestCaching(ByteConversionError),
    /// Error reported by the virtual filestore.
    #[error("filestore error: {0}")]
    FilestoreError(#[from] FilestoreError),
    #[error("source file does not have valid UTF8 format: {0}")]
    SourceFileNotValidUtf8(Utf8Error),
    #[error("destination file does not have valid UTF8 format: {0}")]
    DestFileNotValidUtf8(Utf8Error),
    /// A segment request of a NAK PDU has an end offset before its start offset, or starts
    /// beyond the current transfer progress.
    #[error("invalid NAK PDU received")]
    InvalidNakPdu,
    /// Error while creating or reading a PDU.
    #[error("error related to PDU creation: {0}")]
    Pdu(#[from] PduError),
    #[error("cfdp feature not implemented")]
    NotImplemented,
    /// The PDU sender failed to send a generated PDU.
    #[error("issue sending PDU: {0}")]
    SendError(#[from] GenericSendError),
}
178
/// Errors returned when a put request is passed to the source handler.
#[derive(Debug, thiserror::Error)]
pub enum PutRequestError {
    /// The put request could not be copied into the internal cache.
    #[error("error caching put request: {0}")]
    Storage(#[from] ByteConversionError),
    /// The handler is not idle.
    #[error("already busy with put request")]
    AlreadyBusy,
    /// The remote configuration table has no entry for the destination entity ID.
    #[error("no remote entity configuration found for {0:?}")]
    NoRemoteCfgFound(UnsignedByteField),
    #[error("source file does not have valid UTF8 format: {0}")]
    SourceFileNotValidUtf8(#[from] Utf8Error),
    /// The put request names a source file which the virtual filestore does not know.
    #[error("source file does not exist")]
    FileDoesNotExist,
    /// Error reported by the virtual filestore.
    #[error("filestore error: {0}")]
    FilestoreError(#[from] FilestoreError),
}
194
/// Counters for anomalies which are tolerated instead of being reported as errors.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnomalyTracker {
    /// Incremented (wrapping) when an ACK PDU acknowledges a directive other than EOF while
    /// the handler waits for the EOF acknowledgement.
    invalid_ack_directive_code: u8,
}
199
/// Result of helpers which only have shared access to the handler and therefore can not
/// reset it themselves.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum FsmContext {
    /// Nothing has to be done by the caller.
    #[default]
    None,
    /// The caller has to reset the handler.
    ResetWhenPossible,
}
206
/// Parameters of the transaction which is currently handled.
#[derive(Debug)]
pub struct TransactionParams<CountdownInstance: Countdown> {
    /// ID of the active transaction, set by a put request.
    transaction_id: Option<TransactionId>,
    /// Copy of the remote entity configuration which was looked up for the put request.
    remote_cfg: Option<RemoteEntityConfig>,
    /// Transmission mode of the put request or the default of the remote configuration.
    transmission_mode: Option<super::TransmissionMode>,
    /// Closure request of the put request or the default of the remote configuration.
    closure_requested: bool,
    /// Condition code for the EOF PDU. `NoError` is used for the PDU as long as it is unset.
    cond_code_eof: Cell<Option<ConditionCode>>,
    /// Content of the received Finished PDU, if any.
    finished_params: Option<FinishedParams>,
    /// Bookkeeping for the transferred file.
    file_params: FileParams,
    /// Common PDU configuration used for every PDU header of the transaction.
    pdu_conf: CommonPduConfig,
    /// Created after the EOF PDU was sent in unacknowledged mode with closure requested.
    check_timer: Option<CountdownInstance>,
    /// Set as soon as the positive acknowledgement procedure was started.
    positive_ack_params: Cell<Option<PositiveAckParams>>,
    /// Timer of the positive acknowledgement procedure.
    ack_timer: RefCell<Option<CountdownInstance>>,
}
223
224impl<CountdownInstance: Countdown> Default for TransactionParams<CountdownInstance> {
225 fn default() -> Self {
226 Self {
227 transaction_id: Default::default(),
228 remote_cfg: Default::default(),
229 transmission_mode: Default::default(),
230 closure_requested: Default::default(),
231 cond_code_eof: Default::default(),
232 finished_params: Default::default(),
233 file_params: Default::default(),
234 pdu_conf: Default::default(),
235 check_timer: Default::default(),
236 positive_ack_params: Default::default(),
237 ack_timer: Default::default(),
238 }
239 }
240}
241
242impl<CountdownInstance: Countdown> TransactionParams<CountdownInstance> {
243 #[inline]
244 fn reset(&mut self) {
245 self.transaction_id = None;
246 self.transmission_mode = None;
247 }
248}
249
/// Handler for the sending (source) side of CFDP file transfers.
///
/// A transfer is started with a put request and driven by calling the state machine
/// periodically. One instance handles one transaction at a time.
///
/// Generic parameters: the sender for generated PDUs, the user hook for faults, the virtual
/// filestore the source file is read from, the store for remote entity configurations, the
/// creator for the check limit and positive acknowledgement timers (and the timer type it
/// creates) and the provider for transaction sequence numbers.
pub struct SourceHandler<
    PduSenderInstance: PduSender,
    UserFaultHookInstance: UserFaultHook,
    Vfs: VirtualFilestore,
    RemoteConfigStoreInstance: RemoteConfigStore,
    TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
    CountdownInstance: Countdown,
    SequenceCounterInstance: SequenceCounter,
> {
    /// Configuration of the local entity, including the fault handler.
    local_cfg: LocalEntityConfig<UserFaultHookInstance>,
    /// All generated PDUs are passed to this sender.
    pdu_sender: PduSenderInstance,
    /// Shared buffer: PDUs are written into it before they are sent and it is handed to the
    /// filestore as working buffer for checksum calculations.
    pdu_and_cksum_buffer: RefCell<[u8; crate::buf_len::PACKET_BUF_LEN]>,
    /// Copy of the active put request.
    put_request_cacher: StaticPutRequestCacher<{ crate::buf_len::PACKET_BUF_LEN }>,
    /// Source of the configuration for the destination entity of a put request.
    remote_cfg_table: RemoteConfigStoreInstance,
    /// Filestore which provides the source file.
    vfs: Vfs,
    /// State and step of the state machine.
    state_helper: StateHelper,
    /// Parameters of the active transaction.
    transaction_params: TransactionParams<CountdownInstance>,
    /// Creates the check limit and positive acknowledgement timers.
    timer_creator: TimerCreatorInstance,
    /// Provides the sequence number for new transaction IDs.
    seq_count_provider: SequenceCounterInstance,
    /// Counters for tolerated anomalies.
    anomalies: AnomalyTracker,
}
299
300impl<
301 PduSenderInstance: PduSender,
302 UserFaultHookInstance: UserFaultHook,
303 Vfs: VirtualFilestore,
304 RemoteConfigStoreInstance: RemoteConfigStore,
305 TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
306 CountdownInstance: Countdown,
307 SequenceCounterInstance: SequenceCounter,
308>
309 SourceHandler<
310 PduSenderInstance,
311 UserFaultHookInstance,
312 Vfs,
313 RemoteConfigStoreInstance,
314 TimerCreatorInstance,
315 CountdownInstance,
316 SequenceCounterInstance,
317 >
318{
319 #[allow(clippy::too_many_arguments)]
338 pub fn new(
339 cfg: LocalEntityConfig<UserFaultHookInstance>,
340 pdu_sender: PduSenderInstance,
341 vfs: Vfs,
342 remote_cfg_table: RemoteConfigStoreInstance,
343 timer_creator: TimerCreatorInstance,
344 seq_count_provider: SequenceCounterInstance,
345 ) -> Self {
346 Self {
347 local_cfg: cfg,
348 remote_cfg_table,
349 pdu_sender,
350 pdu_and_cksum_buffer: RefCell::new([0; crate::buf_len::PACKET_BUF_LEN]),
351 vfs,
352 put_request_cacher: StaticPutRequestCacher::<{ crate::buf_len::PACKET_BUF_LEN }>::new(),
353 state_helper: Default::default(),
354 transaction_params: Default::default(),
355 anomalies: Default::default(),
356 timer_creator,
357 seq_count_provider,
358 }
359 }
360
361 pub fn state_machine_no_packet(
363 &mut self,
364 cfdp_user: &mut impl CfdpUser,
365 ) -> Result<u32, SourceError> {
366 self.state_machine(cfdp_user, None::<&DummyPduProvider>)
367 }
368
369 pub fn state_machine(
378 &mut self,
379 cfdp_user: &mut impl CfdpUser,
380 pdu: Option<&impl PduProvider>,
381 ) -> Result<u32, SourceError> {
382 let mut sent_packets = 0;
383 if let Some(packet) = pdu {
384 sent_packets += self.insert_packet(cfdp_user, packet)?;
385 }
386 match self.state() {
387 super::State::Idle => {
388 Ok(0)
390 }
391 super::State::Busy => {
392 sent_packets += self.fsm_busy(cfdp_user, pdu)?;
393 Ok(sent_packets)
394 }
395 super::State::Suspended => {
396 Ok(0)
398 }
399 }
400 }
401
402 #[inline]
403 pub fn transaction_id(&self) -> Option<TransactionId> {
404 self.transaction_params.transaction_id
405 }
406
407 #[inline]
409 pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
410 self.transaction_params.transmission_mode
411 }
412
413 #[inline]
416 pub fn step(&self) -> TransactionStep {
417 self.state_helper.step.get()
418 }
419
420 #[inline]
421 pub fn state(&self) -> State {
422 self.state_helper.state.get()
423 }
424
425 #[inline]
426 pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHookInstance> {
427 &self.local_cfg
428 }
429
430 pub fn put_request(
438 &mut self,
439 put_request: &impl ReadablePutRequest,
440 ) -> Result<(), PutRequestError> {
441 if self.state() != super::State::Idle {
442 return Err(PutRequestError::AlreadyBusy);
443 }
444 self.put_request_cacher.set(put_request)?;
445 let remote_cfg = self.remote_cfg_table.get(
446 self.put_request_cacher
447 .static_fields
448 .destination_id
449 .value_const(),
450 );
451 if remote_cfg.is_none() {
452 return Err(PutRequestError::NoRemoteCfgFound(
453 self.put_request_cacher.static_fields.destination_id,
454 ));
455 }
456 let remote_cfg = remote_cfg.unwrap();
457 self.state_helper.num_packets_ready.set(0);
458 let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
459 self.put_request_cacher.static_fields.trans_mode.unwrap()
460 } else {
461 remote_cfg.default_transmission_mode
462 };
463 let closure_requested = if self
464 .put_request_cacher
465 .static_fields
466 .closure_requested
467 .is_some()
468 {
469 self.put_request_cacher
470 .static_fields
471 .closure_requested
472 .unwrap()
473 } else {
474 remote_cfg.closure_requested_by_default
475 };
476 if self.put_request_cacher.has_source_file()
477 && !self.vfs.exists(self.put_request_cacher.source_file()?)?
478 {
479 return Err(PutRequestError::FileDoesNotExist);
480 }
481
482 let transaction_id = TransactionId::new(
483 self.local_cfg().id,
484 UnsignedByteField::new(
485 SequenceCounterInstance::MAX_BIT_WIDTH / 8,
486 self.seq_count_provider.get_and_increment().into(),
487 ),
488 );
489 let larger_entity_width = core::cmp::max(
493 self.local_cfg.id.size(),
494 self.put_request_cacher.static_fields.destination_id.size(),
495 );
496 let create_id = |cached_id: &UnsignedByteField| {
497 if larger_entity_width != cached_id.size() {
498 UnsignedByteField::new(larger_entity_width, cached_id.value_const())
499 } else {
500 *cached_id
501 }
502 };
503
504 self.transaction_params
506 .pdu_conf
507 .set_source_and_dest_id(
508 create_id(&self.local_cfg.id),
509 create_id(&self.put_request_cacher.static_fields.destination_id),
510 )
511 .unwrap();
512 self.transaction_params.pdu_conf.direction = Direction::TowardsReceiver;
514 self.transaction_params.pdu_conf.crc_flag =
515 remote_cfg.crc_on_transmission_by_default.into();
516 self.transaction_params.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
517 self.transaction_params.pdu_conf.trans_mode = transmission_mode;
518 self.transaction_params.file_params.segment_len =
519 self.calculate_max_file_seg_len(remote_cfg);
520
521 self.transaction_params.transaction_id = Some(transaction_id);
522 self.transaction_params.remote_cfg = Some(*remote_cfg);
523 self.transaction_params.transmission_mode = Some(transmission_mode);
524 self.transaction_params.closure_requested = closure_requested;
525 self.transaction_params.cond_code_eof.set(None);
526 self.transaction_params.finished_params = None;
527
528 self.state_helper.state.set(super::State::Busy);
529 Ok(())
530 }
531
532 fn insert_packet(
533 &mut self,
534 _cfdp_user: &mut impl CfdpUser,
535 packet_to_insert: &impl PduProvider,
536 ) -> Result<u32, SourceError> {
537 if packet_to_insert.packet_target()? != PacketTarget::SourceEntity {
538 return Err(SourceError::CantProcessPacketType {
541 pdu_type: packet_to_insert.pdu_type(),
542 directive_type: packet_to_insert.file_directive_type(),
543 });
544 }
545 if packet_to_insert.pdu_type() == PduType::FileData {
546 return Err(SourceError::UnexpectedPdu {
549 pdu_type: PduType::FileData,
550 directive_type: None,
551 });
552 }
553 let mut sent_packets = 0;
554
555 match packet_to_insert
558 .file_directive_type()
559 .expect("PDU directive type unexpectedly not set")
560 {
561 FileDirectiveType::FinishedPdu => {
562 let finished_pdu = FinishedPduReader::new(packet_to_insert.raw_pdu())?;
563 self.handle_finished_pdu(&finished_pdu)?
564 }
565 FileDirectiveType::NakPdu => {
566 let nak_pdu = NakPduReader::new(packet_to_insert.raw_pdu())?;
567 sent_packets += self.handle_nak_pdu(&nak_pdu)?;
568 }
569 FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
570 FileDirectiveType::AckPdu => {
571 let ack_pdu = AckPdu::from_bytes(packet_to_insert.raw_pdu())?;
572 self.handle_ack_pdu(&ack_pdu)?
573 }
574 FileDirectiveType::EofPdu
575 | FileDirectiveType::PromptPdu
576 | FileDirectiveType::MetadataPdu => {
577 return Err(SourceError::CantProcessPacketType {
578 pdu_type: packet_to_insert.pdu_type(),
579 directive_type: packet_to_insert.file_directive_type(),
580 });
581 }
582 }
583 Ok(sent_packets)
584 }
585
586 pub fn cancel_request(
598 &mut self,
599 user: &mut impl CfdpUser,
600 transaction_id: &TransactionId,
601 ) -> Result<bool, SourceError> {
602 if self.state() == super::State::Idle {
603 return Ok(false);
604 }
605 if let Some(active_id) = self.transaction_id() {
606 if active_id == *transaction_id {
607 self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?;
609 return Ok(true);
610 }
611 }
612 Ok(false)
613 }
614
615 pub fn reset(&mut self) {
620 self.state_helper = Default::default();
621 self.transaction_params.reset();
622 }
623
624 #[inline]
625 fn set_step(&mut self, step: TransactionStep) {
626 self.set_step_internal(step);
627 }
628
629 #[inline]
630 fn set_step_internal(&self, step: TransactionStep) {
631 self.state_helper.step.set(step);
632 }
633
634 fn fsm_busy(
635 &mut self,
636 user: &mut impl CfdpUser,
637 _pdu: Option<&impl PduProvider>,
638 ) -> Result<u32, SourceError> {
639 let mut sent_packets = 0;
640 if self.step() == TransactionStep::Idle {
641 self.set_step(TransactionStep::TransactionStart);
642 }
643 if self.step() == TransactionStep::TransactionStart {
644 self.handle_transaction_start(user)?;
645 self.set_step(TransactionStep::SendingMetadata);
646 }
647 if self.step() == TransactionStep::SendingMetadata {
648 self.prepare_and_send_metadata_pdu()?;
649 self.set_step(TransactionStep::SendingFileData);
650 sent_packets += 1;
651 }
652 if self.step() == TransactionStep::SendingFileData {
653 if let ControlFlow::Break(packets) = self.file_data_fsm()? {
654 sent_packets += packets;
655 return Ok(sent_packets);
657 }
658 }
659 if self.step() == TransactionStep::SendingEof {
660 self.eof_fsm(user)?;
661 sent_packets += 1;
662 }
663 if self.step() == TransactionStep::WaitingForEofAck {
664 sent_packets += self.handle_waiting_for_ack_pdu(user)?;
665 }
666 if self.step() == TransactionStep::WaitingForFinished {
667 sent_packets += self.handle_waiting_for_finished_pdu(user)?;
668 }
669 if self.step() == TransactionStep::NoticeOfCompletion {
670 self.notice_of_completion(user);
671 self.reset();
672 }
673 Ok(sent_packets)
674 }
675
676 fn handle_waiting_for_ack_pdu(&mut self, user: &mut impl CfdpUser) -> Result<u32, SourceError> {
677 self.handle_positive_ack_procedures(user)
678 }
679
680 fn handle_positive_ack_procedures(
681 &mut self,
682 user: &mut impl CfdpUser,
683 ) -> Result<u32, SourceError> {
684 let mut sent_packets = 0;
685 let current_params = self.transaction_params.positive_ack_params.get();
686 if let Some(mut positive_ack_params) = current_params {
687 if self
688 .transaction_params
689 .ack_timer
690 .borrow_mut()
691 .as_ref()
692 .unwrap()
693 .has_expired()
694 {
695 let ack_timer_exp_limit = self
696 .transaction_params
697 .remote_cfg
698 .as_ref()
699 .unwrap()
700 .positive_ack_timer_expiration_limit;
701 if positive_ack_params.ack_counter + 1 >= ack_timer_exp_limit {
702 let (fault_packets_sent, ctx) =
703 self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?;
704 sent_packets += fault_packets_sent;
705 if ctx == FsmContext::ResetWhenPossible {
706 self.reset();
707 } else {
708 positive_ack_params.ack_counter = 0;
709 positive_ack_params.positive_ack_of_cancellation = true;
710 }
711 } else {
712 self.transaction_params
713 .ack_timer
714 .borrow_mut()
715 .as_mut()
716 .unwrap()
717 .reset();
718 positive_ack_params.ack_counter += 1;
719 self.prepare_and_send_eof_pdu(
720 user,
721 self.transaction_params
722 .file_params
723 .checksum_completed_file
724 .unwrap(),
725 )?;
726 sent_packets += 1;
727 }
728 }
729
730 self.transaction_params
731 .positive_ack_params
732 .set(Some(positive_ack_params));
733 }
734 Ok(sent_packets)
735 }
736
737 fn handle_retransmission(&mut self, nak_pdu: &NakPduReader) -> Result<u32, SourceError> {
738 let mut sent_packets = 0;
739 let segment_req_iter = nak_pdu.get_segment_requests_iterator().unwrap();
740 for segment_req in segment_req_iter {
741 if segment_req.0 == 0 && segment_req.1 == 0 {
743 self.prepare_and_send_metadata_pdu()?;
744 sent_packets += 1;
745 continue;
746 } else {
747 if (segment_req.1 < segment_req.0)
748 || (segment_req.0 > self.transaction_params.file_params.progress)
749 {
750 return Err(SourceError::InvalidNakPdu);
751 }
752 let mut missing_chunk_len = segment_req.1 - segment_req.0;
753 let current_offset = segment_req.0;
754 while missing_chunk_len > 0 {
755 let chunk_size = core::cmp::min(
756 missing_chunk_len,
757 self.transaction_params.file_params.segment_len,
758 );
759 self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?;
760 sent_packets += 1;
761 missing_chunk_len -= missing_chunk_len;
762 }
763 }
764 }
765 Ok(sent_packets)
766 }
767
768 fn handle_waiting_for_finished_pdu(
769 &mut self,
770 user: &mut impl CfdpUser,
771 ) -> Result<u32, SourceError> {
772 #[allow(clippy::collapsible_if)]
774 if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged
775 && self
776 .transaction_params
777 .check_timer
778 .as_ref()
779 .unwrap()
780 .has_expired()
781 {
782 let (sent_packets, ctx) = self.declare_fault(user, ConditionCode::CheckLimitReached)?;
783 if ctx == FsmContext::ResetWhenPossible {
784 self.reset();
785 }
786 return Ok(sent_packets);
787 }
788 Ok(0)
789 }
790
791 fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
792 let checksum = self.vfs.calculate_checksum(
793 self.put_request_cacher.source_file().unwrap(),
794 self.transaction_params
795 .remote_cfg
796 .as_ref()
797 .unwrap()
798 .default_crc_type,
799 self.transaction_params.file_params.file_size,
800 self.pdu_and_cksum_buffer.borrow_mut().as_mut_slice(),
801 )?;
802 self.transaction_params.file_params.checksum_completed_file = Some(checksum);
803 self.prepare_and_send_eof_pdu(user, checksum)?;
804 if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
805 if self.transaction_params.closure_requested {
806 self.transaction_params.check_timer = Some(
807 self.timer_creator
808 .create_countdown(crate::TimerContext::CheckLimit {
809 local_id: self.local_cfg.id,
810 remote_id: self
811 .transaction_params
812 .remote_cfg
813 .as_ref()
814 .unwrap()
815 .entity_id,
816 entity_type: EntityType::Sending,
817 }),
818 );
819 self.set_step(TransactionStep::WaitingForFinished);
820 } else {
821 self.set_step(TransactionStep::NoticeOfCompletion);
822 }
823 } else {
824 self.start_positive_ack_procedure();
825 }
826 Ok(())
827 }
828
829 fn start_positive_ack_procedure(&self) {
830 self.set_step_internal(TransactionStep::WaitingForEofAck);
831 match self.transaction_params.positive_ack_params.get() {
832 Some(mut current) => {
833 current.ack_counter = 0;
834 self.transaction_params
835 .positive_ack_params
836 .set(Some(current));
837 }
838 None => self
839 .transaction_params
840 .positive_ack_params
841 .set(Some(PositiveAckParams {
842 ack_counter: 0,
843 positive_ack_of_cancellation: false,
844 })),
845 }
846
847 *self.transaction_params.ack_timer.borrow_mut() = Some(
848 self.timer_creator
849 .create_countdown(crate::TimerContext::PositiveAck {
850 expiry_time: self
851 .transaction_params
852 .remote_cfg
853 .as_ref()
854 .unwrap()
855 .positive_ack_timer_interval,
856 }),
857 );
858 }
859
860 fn handle_transaction_start(
861 &mut self,
862 cfdp_user: &mut impl CfdpUser,
863 ) -> Result<(), SourceError> {
864 if !self.put_request_cacher.has_source_file() {
865 self.transaction_params.file_params.metadata_only = true;
866 } else {
867 let source_file = self
868 .put_request_cacher
869 .source_file()
870 .map_err(SourceError::SourceFileNotValidUtf8)?;
871 if !self.vfs.exists(source_file)? {
872 return Err(SourceError::FilestoreError(
873 FilestoreError::FileDoesNotExist,
874 ));
875 }
876 self.put_request_cacher
878 .dest_file()
879 .map_err(SourceError::DestFileNotValidUtf8)?;
880 self.transaction_params.file_params.file_size = self.vfs.file_size(source_file)?;
881 if self.transaction_params.file_params.file_size > u32::MAX as u64 {
882 self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Large
883 } else {
884 if self.transaction_params.file_params.file_size == 0 {
885 self.transaction_params.file_params.empty_file = true;
886 }
887 self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Normal
888 }
889 }
890 cfdp_user.transaction_indication(&self.transaction_id().unwrap());
891 Ok(())
892 }
893
894 fn prepare_and_send_ack_pdu(
895 &mut self,
896 condition_code: ConditionCode,
897 transaction_status: TransactionStatus,
898 ) -> Result<(), SourceError> {
899 let ack_pdu = AckPdu::new(
900 PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
901 FileDirectiveType::FinishedPdu,
902 condition_code,
903 transaction_status,
904 )
905 .map_err(PduError::from)?;
906 self.pdu_send_helper(&ack_pdu)?;
907 Ok(())
908 }
909
910 fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> {
911 let metadata_params = MetadataGenericParams::new(
912 self.transaction_params.closure_requested,
913 self.transaction_params
914 .remote_cfg
915 .as_ref()
916 .unwrap()
917 .default_crc_type,
918 self.transaction_params.file_params.file_size,
919 );
920 if self.transaction_params.file_params.metadata_only {
921 let metadata_pdu = MetadataPduCreator::new(
922 PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
923 metadata_params,
924 Lv::new_empty(),
925 Lv::new_empty(),
926 self.put_request_cacher.opts_slice(),
927 );
928 return self.pdu_send_helper(&metadata_pdu);
929 }
930 let metadata_pdu = MetadataPduCreator::new(
931 PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
932 metadata_params,
933 Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(),
934 Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(),
935 self.put_request_cacher.opts_slice(),
936 );
937 self.pdu_send_helper(&metadata_pdu)
938 }
939
940 fn file_data_fsm(&mut self) -> Result<ControlFlow<u32>, SourceError> {
941 if !self.transaction_params.file_params.metadata_only
942 && self.transaction_params.file_params.progress
943 < self.transaction_params.file_params.file_size
944 && self.send_progressing_file_data_pdu()?
945 {
946 return Ok(ControlFlow::Break(1));
947 }
948 if self.transaction_params.file_params.empty_file
949 || self.transaction_params.file_params.progress
950 >= self.transaction_params.file_params.file_size
951 {
952 self.set_step(TransactionStep::SendingEof);
954 self.transaction_params
955 .cond_code_eof
956 .set(Some(ConditionCode::NoError));
957 } else if self.transaction_params.file_params.metadata_only {
958 if self.transaction_params.closure_requested {
960 self.set_step(TransactionStep::WaitingForFinished);
961 } else {
962 self.set_step(TransactionStep::NoticeOfCompletion);
963 }
964 }
965 Ok(ControlFlow::Continue(()))
966 }
967
968 fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
969 if self.local_cfg.indication_cfg.transaction_finished {
970 let finished_params = match self.transaction_params.finished_params {
972 Some(finished_params) => TransactionFinishedParams {
973 id: self.transaction_id().unwrap(),
974 condition_code: finished_params.condition_code,
975 delivery_code: finished_params.delivery_code,
976 file_status: finished_params.file_status,
977 },
978 None => TransactionFinishedParams {
979 id: self.transaction_id().unwrap(),
980 condition_code: ConditionCode::NoError,
981 delivery_code: DeliveryCode::Complete,
982 file_status: FileStatus::Unreported,
983 },
984 };
985 cfdp_user.transaction_finished_indication(&finished_params);
986 }
987 }
988
989 fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
990 let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
991 &PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
992 remote_cfg.max_packet_len,
993 None,
994 );
995 if remote_cfg.max_file_segment_len.is_some() {
996 derived_max_seg_len = core::cmp::min(
997 remote_cfg.max_file_segment_len.unwrap(),
998 derived_max_seg_len,
999 );
1000 }
1001 derived_max_seg_len as u64
1002 }
1003
1004 fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
1005 if self.transaction_params.file_params.progress
1007 >= self.transaction_params.file_params.file_size
1008 {
1009 return Ok(false);
1010 }
1011 let read_len = self.transaction_params.file_params.segment_len.min(
1012 self.transaction_params.file_params.file_size
1013 - self.transaction_params.file_params.progress,
1014 );
1015 self.prepare_and_send_file_data_pdu(
1016 self.transaction_params.file_params.progress,
1017 read_len,
1018 )?;
1019 Ok(true)
1020 }
1021
/// Generates and sends one file data PDU.
///
/// The PDU is written without its file data first. The file data is then read from the
/// filestore directly into the reserved data field, so no intermediate buffer is required.
///
/// # Arguments
///
/// * `offset` - Offset of the segment inside the source file.
/// * `size` - Length of the segment.
///
/// The transfer progress is advanced by `size` after the PDU was sent.
fn prepare_and_send_file_data_pdu(
    &mut self,
    offset: u64,
    size: u64,
) -> Result<(), SourceError> {
    let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
        PduHeader::new_for_file_data(
            self.transaction_params.pdu_conf,
            0,
            SegmentMetadataFlag::NotPresent,
            SegmentationControl::NoRecordBoundaryPreservation,
        ),
        offset,
        size,
    );
    // Exclusive access to the handler allows to use the buffer without a runtime borrow.
    let mut unwritten_pdu =
        pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?;
    self.vfs.read_data(
        self.put_request_cacher.source_file().unwrap(),
        offset,
        size,
        unwritten_pdu.file_data_field_mut(),
    )?;
    // Completes the PDU and yields its length inside the buffer.
    let written_len = unwritten_pdu.finish();
    self.pdu_sender.send_pdu(
        PduType::FileData,
        None,
        &self.pdu_and_cksum_buffer.borrow()[0..written_len],
    )?;
    self.transaction_params.file_params.progress += size;
    Ok(())
}
1054
1055 fn prepare_and_send_eof_pdu(
1056 &self,
1057 cfdp_user: &mut impl CfdpUser,
1058 checksum: u32,
1059 ) -> Result<(), SourceError> {
1060 let eof_pdu = EofPdu::new(
1061 PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
1062 self.transaction_params
1063 .cond_code_eof
1064 .get()
1065 .unwrap_or(ConditionCode::NoError),
1066 checksum,
1067 self.transaction_params.file_params.progress,
1068 None,
1069 );
1070 self.pdu_send_helper(&eof_pdu)?;
1071 if self.local_cfg.indication_cfg.eof_sent {
1072 cfdp_user.eof_sent_indication(&self.transaction_id().unwrap());
1073 }
1074 Ok(())
1075 }
1076
1077 fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), SourceError> {
1078 let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
1079 let written_len = pdu.write_to_bytes(pdu_buffer_mut.as_mut_slice())?;
1080 self.pdu_sender.send_pdu(
1081 pdu.pdu_type(),
1082 pdu.file_directive_type(),
1083 &pdu_buffer_mut[0..written_len],
1084 )?;
1085 Ok(())
1086 }
1087
1088 fn handle_finished_pdu(&mut self, finished_pdu: &FinishedPduReader) -> Result<(), SourceError> {
1089 if self.state() == State::Idle {
1091 return Ok(());
1092 }
1093 if self.step() != TransactionStep::WaitingForFinished {
1094 return Err(SourceError::UnexpectedPdu {
1095 pdu_type: PduType::FileDirective,
1096 directive_type: Some(FileDirectiveType::FinishedPdu),
1097 });
1098 }
1099 self.transaction_params.finished_params = Some(FinishedParams {
1102 condition_code: finished_pdu.condition_code(),
1103 delivery_code: finished_pdu.delivery_code(),
1104 file_status: finished_pdu.file_status(),
1105 });
1106 if let Some(TransmissionMode::Acknowledged) = self.transmission_mode() {
1107 self.prepare_and_send_ack_pdu(
1108 finished_pdu.condition_code(),
1109 TransactionStatus::Active,
1110 )?;
1111 }
1112 self.set_step(TransactionStep::NoticeOfCompletion);
1113 Ok(())
1114 }
1115
1116 fn handle_nak_pdu(&mut self, nak_pdu: &NakPduReader) -> Result<u32, SourceError> {
1117 self.handle_retransmission(nak_pdu)
1118 }
1119
1120 fn handle_ack_pdu(&mut self, ack_pdu: &AckPdu) -> Result<(), SourceError> {
1121 if self.step() != TransactionStep::WaitingForEofAck {
1122 return Err(SourceError::UnexpectedPdu {
1124 pdu_type: PduType::FileDirective,
1125 directive_type: Some(FileDirectiveType::AckPdu),
1126 });
1127 }
1128 if ack_pdu.directive_code_of_acked_pdu() == FileDirectiveType::EofPdu {
1129 self.set_step(TransactionStep::WaitingForFinished);
1130 } else {
1131 self.anomalies.invalid_ack_directive_code =
1132 self.anomalies.invalid_ack_directive_code.wrapping_add(1);
1133 }
1134 Ok(())
1135 }
1136
1137 pub fn notice_of_cancellation(
1138 &mut self,
1139 user: &mut impl CfdpUser,
1140 condition_code: ConditionCode,
1141 ) -> Result<u32, SourceError> {
1142 let mut sent_packets = 0;
1143 let ctx = self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)?;
1144 if ctx == FsmContext::ResetWhenPossible {
1145 self.reset();
1146 }
1147 Ok(sent_packets)
1148 }
1149
1150 fn notice_of_cancellation_internal(
1151 &self,
1152 user: &mut impl CfdpUser,
1153 condition_code: ConditionCode,
1154 sent_packets: &mut u32,
1155 ) -> Result<FsmContext, SourceError> {
1156 self.transaction_params
1157 .cond_code_eof
1158 .set(Some(condition_code));
1159 let checksum = self.vfs.calculate_checksum(
1162 self.put_request_cacher.source_file().unwrap(),
1163 self.transaction_params
1164 .remote_cfg
1165 .as_ref()
1166 .unwrap()
1167 .default_crc_type,
1168 self.transaction_params.file_params.progress,
1169 self.pdu_and_cksum_buffer.borrow_mut().as_mut_slice(),
1170 )?;
1171 self.prepare_and_send_eof_pdu(user, checksum)?;
1172 *sent_packets += 1;
1173 if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
1174 Ok(FsmContext::ResetWhenPossible)
1176 } else {
1177 self.start_positive_ack_procedure();
1178 Ok(FsmContext::default())
1179 }
1180 }
1181
/// Notice of suspension.
///
/// Delegates to the internal procedure, which is empty at the moment, so calling this
/// function has no effect.
pub fn notice_of_suspension(&mut self) {
    self.notice_of_suspension_internal();
}
1185
/// Notice of suspension with shared access to the handler. The body is empty: suspending a
/// transaction has no effect at the moment.
fn notice_of_suspension_internal(&self) {}
1187
/// Abandons the active transaction by resetting the handler. No PDU is sent.
pub fn abandon_transaction(&mut self) {
    self.reset();
}
1193
1194 fn declare_fault(
1196 &self,
1197 user: &mut impl CfdpUser,
1198 cond: ConditionCode,
1199 ) -> Result<(u32, FsmContext), SourceError> {
1200 let mut sent_packets = 0;
1201 let mut fh = self.local_cfg.fault_handler.get_fault_handler(cond);
1202 if let Some(positive_ack_params) = self.transaction_params.positive_ack_params.get() {
1205 if positive_ack_params.positive_ack_of_cancellation {
1206 fh = FaultHandlerCode::AbandonTransaction;
1207 }
1208 }
1209 let mut ctx = FsmContext::default();
1210 match fh {
1211 FaultHandlerCode::NoticeOfCancellation => {
1212 ctx = self.notice_of_cancellation_internal(user, cond, &mut sent_packets)?;
1213 }
1214 FaultHandlerCode::NoticeOfSuspension => {
1215 self.notice_of_suspension_internal();
1216 }
1217 FaultHandlerCode::IgnoreError => (),
1218 FaultHandlerCode::AbandonTransaction => {
1219 ctx = FsmContext::ResetWhenPossible;
1220 }
1221 }
1222 self.local_cfg.fault_handler.report_fault(
1223 fh,
1224 FaultInfo::new(
1225 self.transaction_id().unwrap(),
1226 cond,
1227 self.transaction_params.file_params.progress,
1228 ),
1229 );
1230 Ok((sent_packets, ctx))
1231 }
1232
1233 fn handle_keep_alive_pdu(&mut self) {}
1234}
1235
1236#[cfg(test)]
1237mod tests {
1238 use std::{fs::OpenOptions, io::Write, path::PathBuf, vec::Vec};
1239
1240 use alloc::string::String;
1241 use rand::Rng;
1242 use spacepackets::{
1243 cfdp::{
1244 ChecksumType, CrcFlag,
1245 pdu::{
1246 file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader,
1247 nak::NakPduCreator,
1248 },
1249 },
1250 util::UnsignedByteFieldU16,
1251 };
1252 use tempfile::TempPath;
1253
1254 use super::*;
1255 use crate::{
1256 CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd,
1257 filestore::NativeFilestore,
1258 request::PutRequestOwned,
1259 source::TransactionStep,
1260 tests::{
1261 SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, TestCheckTimerCreator,
1262 TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table,
1263 },
1264 };
1265 use spacepackets::seq_count::SequenceCounterSimple;
1266
1267 const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
1268 const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
1269 const INVALID_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(5);
1270
1271 fn init_full_filepaths_textfile() -> (TempPath, PathBuf) {
1272 (
1273 tempfile::NamedTempFile::new().unwrap().into_temp_path(),
1274 tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1275 )
1276 }
1277
1278 type TestSourceHandler = SourceHandler<
1279 TestCfdpSender,
1280 TestFaultHandler,
1281 NativeFilestore,
1282 RemoteConfigStoreStd,
1283 TestCheckTimerCreator,
1284 TestCheckTimer,
1285 SequenceCounterSimple<u16>,
1286 >;
1287
1288 struct SourceHandlerTestbench {
1289 handler: TestSourceHandler,
1290 expiry_control: TimerExpiryControl,
1291 transmission_mode: TransmissionMode,
1292 #[allow(dead_code)]
1293 srcfile_handle: TempPath,
1294 srcfile: String,
1295 destfile: String,
1296 max_packet_len: usize,
1297 check_idle_on_drop: bool,
1298 }
1299
1300 #[allow(dead_code)]
1301 struct TransferInfo {
1302 id: TransactionId,
1303 file_size: u64,
1304 closure_requested: bool,
1305 pdu_header: PduHeader,
1306 }
1307
1308 #[derive(Debug, Clone, Copy)]
1309 struct EofParams {
1310 file_size: u64,
1311 file_checksum: u32,
1312 condition_code: ConditionCode,
1313 }
1314
1315 impl EofParams {
1316 pub const fn new_success(file_size: u64, file_checksum: u32) -> Self {
1317 Self {
1318 file_size,
1319 file_checksum,
1320 condition_code: ConditionCode::NoError,
1321 }
1322 }
1323 }
1324
1325 impl SourceHandlerTestbench {
1326 fn new(
1327 transmission_mode: TransmissionMode,
1328 crc_on_transmission_by_default: bool,
1329 max_packet_len: usize,
1330 ) -> Self {
1331 let local_entity_cfg = LocalEntityConfig {
1332 id: LOCAL_ID.into(),
1333 indication_cfg: IndicationConfig::default(),
1334 fault_handler: FaultHandler::new(TestFaultHandler::default()),
1335 };
1336 let (srcfile_handle, destfile) = init_full_filepaths_textfile();
1337 let srcfile = String::from(srcfile_handle.to_path_buf().to_str().unwrap());
1338 let expiry_control = TimerExpiryControl::default();
1339 let sender = TestCfdpSender::default();
1340 Self {
1341 handler: SourceHandler::new(
1342 local_entity_cfg,
1343 sender,
1344 NativeFilestore::default(),
1345 basic_remote_cfg_table(
1346 REMOTE_ID,
1347 max_packet_len,
1348 crc_on_transmission_by_default,
1349 ),
1350 TestCheckTimerCreator::new(&expiry_control),
1351 SequenceCounterSimple::default(),
1352 ),
1353 transmission_mode,
1354 expiry_control,
1355 srcfile_handle,
1356 srcfile,
1357 destfile: String::from(destfile.to_path_buf().to_str().unwrap()),
1358 max_packet_len,
1359 check_idle_on_drop: true,
1360 }
1361 }
1362
1363 fn create_user(&self, next_expected_seq_num: u64, filesize: u64) -> TestCfdpUser {
1364 TestCfdpUser::new(
1365 next_expected_seq_num,
1366 self.srcfile.clone(),
1367 self.destfile.clone(),
1368 filesize,
1369 )
1370 }
1371
1372 fn put_request(
1373 &mut self,
1374 put_request: &impl ReadablePutRequest,
1375 ) -> Result<(), PutRequestError> {
1376 self.handler.put_request(put_request)
1377 }
1378
1379 fn all_fault_queues_empty(&self) -> bool {
1380 self.handler
1381 .local_cfg
1382 .user_fault_hook()
1383 .borrow()
1384 .all_queues_empty()
1385 }
1386
1387 #[allow(dead_code)]
1388 fn test_fault_handler(&self) -> &RefCell<TestFaultHandler> {
1389 self.handler.local_cfg.user_fault_hook()
1390 }
1391
1392 fn test_fault_handler_mut(&mut self) -> &mut RefCell<TestFaultHandler> {
1393 self.handler.local_cfg.user_fault_hook_mut()
1394 }
1395
1396 fn pdu_queue_empty(&self) -> bool {
1397 self.handler.pdu_sender.queue_empty()
1398 }
1399
1400 fn get_next_sent_pdu(&self) -> Option<SentPdu> {
1401 self.handler.pdu_sender.retrieve_next_pdu()
1402 }
1403
1404 fn common_pdu_check_for_file_transfer(&self, pdu_header: &PduHeader, crc_flag: CrcFlag) {
1405 assert_eq!(
1406 pdu_header.seg_ctrl(),
1407 SegmentationControl::NoRecordBoundaryPreservation
1408 );
1409 assert_eq!(
1410 pdu_header.seg_metadata_flag(),
1411 SegmentMetadataFlag::NotPresent
1412 );
1413 assert_eq!(pdu_header.common_pdu_conf().source_id(), LOCAL_ID.into());
1414 assert_eq!(pdu_header.common_pdu_conf().dest_id(), REMOTE_ID.into());
1415 assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag);
1416 assert_eq!(
1417 pdu_header.common_pdu_conf().trans_mode,
1418 self.transmission_mode
1419 );
1420 assert_eq!(
1421 pdu_header.common_pdu_conf().direction,
1422 Direction::TowardsReceiver
1423 );
1424 assert_eq!(
1425 pdu_header.common_pdu_conf().file_flag,
1426 LargeFileFlag::Normal
1427 );
1428 assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2);
1429 }
1430
1431 fn nak_for_file_segments(
1432 &mut self,
1433 cfdp_user: &mut TestCfdpUser,
1434 transfer_info: &TransferInfo,
1435 seg_reqs: &[(u32, u32)],
1436 ) {
1437 let nak_pdu = NakPduCreator::new_normal_file_size(
1438 transfer_info.pdu_header,
1439 0,
1440 transfer_info.file_size as u32,
1441 seg_reqs,
1442 )
1443 .unwrap();
1444 let nak_pdu_vec = nak_pdu.to_vec().unwrap();
1445 let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap();
1446 self.handler
1447 .state_machine(cfdp_user, Some(&packet_info))
1448 .unwrap();
1449 }
1450
1451 fn generic_file_transfer(
1452 &mut self,
1453 cfdp_user: &mut TestCfdpUser,
1454 with_closure: bool,
1455 file_data: Vec<u8>,
1456 ) -> (TransferInfo, u32) {
1457 let mut digest = CRC_32.digest();
1458 digest.update(&file_data);
1459 let checksum = digest.finalize();
1460 cfdp_user.expected_full_src_name = self.srcfile.clone();
1461 cfdp_user.expected_full_dest_name = self.destfile.clone();
1462 cfdp_user.expected_file_size = file_data.len() as u64;
1463 let put_request = PutRequestOwned::new_regular_request(
1464 REMOTE_ID.into(),
1465 &self.srcfile,
1466 &self.destfile,
1467 Some(self.transmission_mode),
1468 Some(with_closure),
1469 )
1470 .expect("creating put request failed");
1471 let transaction_info = self.common_file_transfer_init_with_metadata_check(
1472 cfdp_user,
1473 put_request,
1474 cfdp_user.expected_file_size,
1475 );
1476 let mut current_offset = 0;
1477 let chunks = file_data.chunks(
1478 calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
1479 &transaction_info.pdu_header,
1480 self.max_packet_len,
1481 None,
1482 ),
1483 );
1484 let mut fd_pdus = 0;
1485 for segment in chunks {
1486 self.check_next_file_pdu(current_offset, segment);
1487 self.handler.state_machine_no_packet(cfdp_user).unwrap();
1488 fd_pdus += 1;
1489 current_offset += segment.len() as u64;
1490 }
1491 self.common_eof_pdu_check(
1492 cfdp_user,
1493 transaction_info.closure_requested,
1494 EofParams {
1495 file_size: cfdp_user.expected_file_size,
1496 file_checksum: checksum,
1497 condition_code: ConditionCode::NoError,
1498 },
1499 1,
1500 );
1501 (transaction_info, fd_pdus)
1502 }
1503
1504 fn common_file_transfer_init_with_metadata_check(
1505 &mut self,
1506 cfdp_user: &mut TestCfdpUser,
1507 put_request: PutRequestOwned,
1508 file_size: u64,
1509 ) -> TransferInfo {
1510 assert_eq!(cfdp_user.transaction_indication_call_count, 0);
1511 assert_eq!(cfdp_user.eof_sent_call_count, 0);
1512
1513 self.put_request(&put_request)
1514 .expect("put_request call failed");
1515 assert_eq!(self.handler.state(), State::Busy);
1516 assert_eq!(self.handler.step(), TransactionStep::Idle);
1517 let transaction_id = self.handler.transaction_id().unwrap();
1518 let sent_packets = self
1519 .handler
1520 .state_machine_no_packet(cfdp_user)
1521 .expect("source handler FSM failure");
1522 assert_eq!(sent_packets, 2);
1523 assert!(!self.pdu_queue_empty());
1524 let next_pdu = self.get_next_sent_pdu().unwrap();
1525 assert!(!self.pdu_queue_empty());
1526 let metadata_pdu_reader = self.metadata_check(&next_pdu, file_size);
1527 let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
1528 assert_eq!(
1529 metadata_pdu_reader.metadata_params().closure_requested,
1530 closure_requested
1531 );
1532 closure_requested
1533 } else {
1534 assert!(metadata_pdu_reader.metadata_params().closure_requested);
1535 metadata_pdu_reader.metadata_params().closure_requested
1536 };
1537 TransferInfo {
1538 pdu_header: *metadata_pdu_reader.pdu_header(),
1539 closure_requested,
1540 file_size,
1541 id: transaction_id,
1542 }
1543 }
1544
1545 fn metadata_check<'a>(
1546 &self,
1547 next_pdu: &'a SentPdu,
1548 file_size: u64,
1549 ) -> MetadataPduReader<'a> {
1550 assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1551 assert_eq!(
1552 next_pdu.file_directive_type,
1553 Some(FileDirectiveType::MetadataPdu)
1554 );
1555 let metadata_pdu =
1556 MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
1557 self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
1558 assert_eq!(
1559 metadata_pdu
1560 .src_file_name()
1561 .value_as_str()
1562 .unwrap()
1563 .unwrap(),
1564 self.srcfile
1565 );
1566 assert_eq!(
1567 metadata_pdu
1568 .dest_file_name()
1569 .value_as_str()
1570 .unwrap()
1571 .unwrap(),
1572 self.destfile
1573 );
1574 assert_eq!(metadata_pdu.metadata_params().file_size, file_size);
1575 assert_eq!(
1576 metadata_pdu.metadata_params().checksum_type,
1577 ChecksumType::Crc32
1578 );
1579 assert_eq!(metadata_pdu.transmission_mode(), self.transmission_mode);
1580 assert_eq!(metadata_pdu.options(), &[]);
1581 metadata_pdu
1582 }
1583
1584 fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
1585 let next_pdu = self.get_next_sent_pdu().unwrap();
1586 assert_eq!(next_pdu.pdu_type, PduType::FileData);
1587 assert!(next_pdu.file_directive_type.is_none());
1588 let fd_pdu =
1589 FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
1590 assert_eq!(fd_pdu.offset(), expected_offset);
1591 assert_eq!(fd_pdu.file_data(), expected_data);
1592 assert!(fd_pdu.segment_metadata().is_none());
1593 }
1594
1595 fn acknowledge_eof_pdu(
1596 &mut self,
1597 cfdp_user: &mut impl CfdpUser,
1598 transaction_info: &TransferInfo,
1599 ) {
1600 let ack_pdu = AckPdu::new(
1601 transaction_info.pdu_header,
1602 FileDirectiveType::EofPdu,
1603 ConditionCode::NoError,
1604 TransactionStatus::Active,
1605 )
1606 .expect("creating ACK PDU failed");
1607 let ack_pdu_vec = ack_pdu.to_vec().unwrap();
1608 let packet_info = PduRawWithInfo::new(&ack_pdu_vec).unwrap();
1609 self.handler
1610 .state_machine(cfdp_user, Some(&packet_info))
1611 .expect("state machine failed");
1612 }
1613
1614 fn common_finished_pdu_ack_check(&mut self) {
1615 assert!(!self.pdu_queue_empty());
1616 let next_pdu = self.get_next_sent_pdu().unwrap();
1617 assert!(self.pdu_queue_empty());
1618 assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1619 assert_eq!(
1620 next_pdu.file_directive_type,
1621 Some(FileDirectiveType::AckPdu)
1622 );
1623 let ack_pdu = AckPdu::from_bytes(&next_pdu.raw_pdu).unwrap();
1624 self.common_pdu_check_for_file_transfer(ack_pdu.pdu_header(), CrcFlag::NoCrc);
1625 assert_eq!(ack_pdu.condition_code(), ConditionCode::NoError);
1626 assert_eq!(
1627 ack_pdu.directive_code_of_acked_pdu(),
1628 FileDirectiveType::FinishedPdu
1629 );
1630 assert_eq!(ack_pdu.transaction_status(), TransactionStatus::Active);
1631 }
1632
1633 fn common_eof_pdu_check(
1634 &mut self,
1635 cfdp_user: &mut TestCfdpUser,
1636 closure_requested: bool,
1637 eof_params: EofParams,
1638 eof_sent_call_count: u32,
1639 ) {
1640 let next_pdu = self.get_next_sent_pdu().unwrap();
1641 assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1642 assert_eq!(
1643 next_pdu.file_directive_type,
1644 Some(FileDirectiveType::EofPdu)
1645 );
1646 let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1647 self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1648 assert_eq!(eof_pdu.condition_code(), eof_params.condition_code);
1649 assert_eq!(eof_pdu.file_size(), eof_params.file_size);
1650 assert_eq!(eof_pdu.file_checksum(), eof_params.file_checksum);
1651 assert_eq!(
1652 eof_pdu
1653 .pdu_header()
1654 .common_pdu_conf()
1655 .transaction_seq_num
1656 .value_const(),
1657 0
1658 );
1659 if self.transmission_mode == TransmissionMode::Unacknowledged {
1660 if !closure_requested {
1661 assert_eq!(self.handler.state(), State::Idle);
1662 assert_eq!(self.handler.step(), TransactionStep::Idle);
1663 } else {
1664 assert_eq!(self.handler.state(), State::Busy);
1665 assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished);
1666 }
1667 } else {
1668 assert_eq!(self.handler.state(), State::Busy);
1669 assert_eq!(self.handler.step(), TransactionStep::WaitingForEofAck);
1670 }
1671
1672 assert_eq!(cfdp_user.transaction_indication_call_count, 1);
1673 assert_eq!(cfdp_user.eof_sent_call_count, eof_sent_call_count);
1674 self.all_fault_queues_empty();
1675 }
1676
1677 fn common_tiny_file_transfer(
1678 &mut self,
1679 cfdp_user: &mut TestCfdpUser,
1680 with_closure: bool,
1681 ) -> (&'static str, TransferInfo) {
1682 let mut file = OpenOptions::new()
1683 .write(true)
1684 .open(&self.srcfile)
1685 .expect("opening file failed");
1686 let content_str = "Hello World!";
1687 file.write_all(content_str.as_bytes())
1688 .expect("writing file content failed");
1689 drop(file);
1690 let (transfer_info, fd_pdus) = self.generic_file_transfer(
1691 cfdp_user,
1692 with_closure,
1693 content_str.as_bytes().to_vec(),
1694 );
1695 assert_eq!(fd_pdus, 1);
1696 (content_str, transfer_info)
1697 }
1698
1699 fn finish_handling(&mut self, user: &mut TestCfdpUser, transfer_info: &TransferInfo) {
1701 let finished_pdu = FinishedPduCreator::new_no_error(
1702 transfer_info.pdu_header,
1703 DeliveryCode::Complete,
1704 FileStatus::Retained,
1705 );
1706 let finished_pdu_vec = finished_pdu.to_vec().unwrap();
1707 let packet_info = PduRawWithInfo::new(&finished_pdu_vec).unwrap();
1708 self.handler
1709 .state_machine(user, Some(&packet_info))
1710 .unwrap();
1711 }
1712 }
1713
1714 impl Drop for SourceHandlerTestbench {
1715 fn drop(&mut self) {
1716 self.all_fault_queues_empty();
1717 if self.check_idle_on_drop {
1718 assert_eq!(self.handler.state(), State::Idle);
1719 assert_eq!(self.handler.step(), TransactionStep::Idle);
1720 }
1721 }
1722 }
1723
1724 #[test]
1725 fn test_basic() {
1726 let tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1727 assert!(tb.handler.transmission_mode().is_none());
1728 assert!(tb.pdu_queue_empty());
1729 }
1730
1731 #[test]
1732 fn test_empty_file_transfer_not_acked_no_closure() {
1733 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1734 let file_size = 0;
1735 let put_request = PutRequestOwned::new_regular_request(
1736 REMOTE_ID.into(),
1737 &tb.srcfile,
1738 &tb.destfile,
1739 Some(TransmissionMode::Unacknowledged),
1740 Some(false),
1741 )
1742 .expect("creating put request failed");
1743 let mut user = tb.create_user(0, file_size);
1744 let transfer_info =
1745 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1746 tb.common_eof_pdu_check(
1747 &mut user,
1748 transfer_info.closure_requested,
1749 EofParams::new_success(file_size, CRC_32.digest().finalize()),
1750 1,
1751 );
1752 user.verify_finished_indication(
1753 DeliveryCode::Complete,
1754 ConditionCode::NoError,
1755 transfer_info.id,
1756 FileStatus::Unreported,
1757 );
1758 }
1759
1760 #[test]
1761 fn test_empty_file_transfer_acked() {
1762 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
1763 let file_size = 0;
1764 let put_request = PutRequestOwned::new_regular_request(
1765 REMOTE_ID.into(),
1766 &tb.srcfile,
1767 &tb.destfile,
1768 Some(TransmissionMode::Acknowledged),
1769 Some(false),
1770 )
1771 .expect("creating put request failed");
1772 let mut user = tb.create_user(0, file_size);
1773 let transaction_info =
1774 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1775 tb.common_eof_pdu_check(
1776 &mut user,
1777 transaction_info.closure_requested,
1778 EofParams::new_success(file_size, CRC_32.digest().finalize()),
1779 1,
1780 );
1781
1782 tb.acknowledge_eof_pdu(&mut user, &transaction_info);
1783 tb.finish_handling(&mut user, &transaction_info);
1784 tb.common_finished_pdu_ack_check();
1785 user.verify_finished_indication_retained(
1786 DeliveryCode::Complete,
1787 ConditionCode::NoError,
1788 transaction_info.id,
1789 );
1790 }
1791
1792 #[test]
1793 fn test_tiny_file_transfer_not_acked_no_closure() {
1794 let mut user = TestCfdpUser::default();
1795 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1796 tb.common_tiny_file_transfer(&mut user, false);
1797 }
1798
1799 #[test]
1800 fn test_tiny_file_transfer_acked() {
1801 let mut user = TestCfdpUser::default();
1802 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
1803 let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, false);
1804 tb.acknowledge_eof_pdu(&mut user, &transfer_info);
1805 tb.finish_handling(&mut user, &transfer_info);
1806 tb.common_finished_pdu_ack_check();
1807 }
1808
1809 #[test]
1810 fn test_tiny_file_transfer_not_acked_with_closure() {
1811 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1812 let mut user = TestCfdpUser::default();
1813 let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true);
1814 tb.finish_handling(&mut user, &transfer_info)
1815 }
1816
1817 #[test]
1818 fn test_two_segment_file_transfer_not_acked_no_closure() {
1819 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
1820 let mut user = TestCfdpUser::default();
1821 let mut file = OpenOptions::new()
1822 .write(true)
1823 .open(&tb.srcfile)
1824 .expect("opening file failed");
1825 let mut rand_data = [0u8; 140];
1826 rand::rng().fill(&mut rand_data[..]);
1827 file.write_all(&rand_data)
1828 .expect("writing file content failed");
1829 drop(file);
1830 let (_, fd_pdus) = tb.generic_file_transfer(&mut user, false, rand_data.to_vec());
1831 assert_eq!(fd_pdus, 2);
1832 }
1833
1834 #[test]
1835 fn test_two_segment_file_transfer_not_acked_with_closure() {
1836 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
1837 let mut user = TestCfdpUser::default();
1838 let mut file = OpenOptions::new()
1839 .write(true)
1840 .open(&tb.srcfile)
1841 .expect("opening file failed");
1842 let mut rand_data = [0u8; 140];
1843 rand::rng().fill(&mut rand_data[..]);
1844 file.write_all(&rand_data)
1845 .expect("writing file content failed");
1846 drop(file);
1847 let (transfer_info, fd_pdus) =
1848 tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
1849 assert_eq!(fd_pdus, 2);
1850 tb.finish_handling(&mut user, &transfer_info)
1851 }
1852
1853 #[test]
1854 fn test_two_segment_file_transfer_acked() {
1855 let mut user = TestCfdpUser::default();
1856 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);
1857 let mut file = OpenOptions::new()
1858 .write(true)
1859 .open(&tb.srcfile)
1860 .expect("opening file failed");
1861 let mut rand_data = [0u8; 140];
1862 rand::rng().fill(&mut rand_data[..]);
1863 file.write_all(&rand_data)
1864 .expect("writing file content failed");
1865 drop(file);
1866 let (transfer_info, fd_pdus) =
1867 tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
1868 assert_eq!(fd_pdus, 2);
1869 tb.acknowledge_eof_pdu(&mut user, &transfer_info);
1870 tb.finish_handling(&mut user, &transfer_info);
1871 tb.common_finished_pdu_ack_check();
1872 }
1873
1874 #[test]
1875 fn test_empty_file_transfer_not_acked_with_closure() {
1876 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1877 let file_size = 0;
1878 let put_request = PutRequestOwned::new_regular_request(
1879 REMOTE_ID.into(),
1880 &tb.srcfile,
1881 &tb.destfile,
1882 Some(TransmissionMode::Unacknowledged),
1883 Some(true),
1884 )
1885 .expect("creating put request failed");
1886 let mut user = tb.create_user(0, file_size);
1887 let transaction_info =
1888 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1889 tb.common_eof_pdu_check(
1890 &mut user,
1891 transaction_info.closure_requested,
1892 EofParams::new_success(file_size, CRC_32.digest().finalize()),
1893 1,
1894 );
1895 tb.finish_handling(&mut user, &transaction_info);
1896 user.verify_finished_indication_retained(
1897 DeliveryCode::Complete,
1898 ConditionCode::NoError,
1899 transaction_info.id,
1900 );
1901 }
1902
1903 #[test]
1904 fn test_put_request_no_remote_cfg() {
1905 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1906
1907 let (srcfile, destfile) = init_full_filepaths_textfile();
1908 let srcfile_str = String::from(srcfile.to_str().unwrap());
1909 let destfile_str = String::from(destfile.to_str().unwrap());
1910 let put_request = PutRequestOwned::new_regular_request(
1911 INVALID_ID.into(),
1912 &srcfile_str,
1913 &destfile_str,
1914 Some(TransmissionMode::Unacknowledged),
1915 Some(true),
1916 )
1917 .expect("creating put request failed");
1918 let error = tb.handler.put_request(&put_request);
1919 assert!(error.is_err());
1920 let error = error.unwrap_err();
1921 if let PutRequestError::NoRemoteCfgFound(id) = error {
1922 assert_eq!(id, INVALID_ID.into());
1923 } else {
1924 panic!("unexpected error type: {:?}", error);
1925 }
1926 }
1927
1928 #[test]
1929 fn test_put_request_file_does_not_exist() {
1930 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1931
1932 let file_which_does_not_exist = "/tmp/this_file_does_not_exist.txt";
1933 let destfile = "/tmp/tmp.txt";
1934 let put_request = PutRequestOwned::new_regular_request(
1935 REMOTE_ID.into(),
1936 file_which_does_not_exist,
1937 destfile,
1938 Some(TransmissionMode::Unacknowledged),
1939 Some(true),
1940 )
1941 .expect("creating put request failed");
1942 let error = tb.put_request(&put_request);
1943 assert!(error.is_err());
1944 let error = error.unwrap_err();
1945 if !matches!(error, PutRequestError::FileDoesNotExist) {
1946 panic!("unexpected error type: {:?}", error);
1947 }
1948 }
1949
1950 #[test]
1951 fn test_finished_pdu_check_timeout() {
1952 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1953 let file_size = 0;
1954 let put_request = PutRequestOwned::new_regular_request(
1955 REMOTE_ID.into(),
1956 &tb.srcfile,
1957 &tb.destfile,
1958 Some(TransmissionMode::Unacknowledged),
1959 Some(true),
1960 )
1961 .expect("creating put request failed");
1962 let mut user = tb.create_user(0, file_size);
1963 let transaction_info =
1964 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1965 let expected_id = tb.handler.transaction_id().unwrap();
1966 tb.common_eof_pdu_check(
1967 &mut user,
1968 transaction_info.closure_requested,
1969 EofParams::new_success(file_size, CRC_32.digest().finalize()),
1970 1,
1971 );
1972 assert!(tb.pdu_queue_empty());
1973
1974 tb.expiry_control.set_check_limit_expired();
1977
1978 assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1);
1979 assert!(!tb.pdu_queue_empty());
1980 let next_pdu = tb.get_next_sent_pdu().unwrap();
1981 let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1982 tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1983 assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached);
1984 assert_eq!(eof_pdu.file_size(), 0);
1985 assert_eq!(eof_pdu.file_checksum(), 0);
1986
1987 let fault_handler = tb.test_fault_handler_mut();
1989 let fh_ref_mut = fault_handler.get_mut();
1990 assert!(!fh_ref_mut.cancellation_queue_empty());
1991 assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
1992 let FaultInfo {
1993 transaction_id,
1994 condition_code,
1995 progress,
1996 } = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
1997 assert_eq!(transaction_id, expected_id);
1998 assert_eq!(condition_code, ConditionCode::CheckLimitReached);
1999 assert_eq!(progress, 0);
2000 fh_ref_mut.all_queues_empty();
2001 }
2002
2003 #[test]
2004 fn test_cancelled_transfer_empty_file() {
2005 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
2006 let filesize = 0;
2007 let put_request = PutRequestOwned::new_regular_request(
2008 REMOTE_ID.into(),
2009 &tb.srcfile,
2010 &tb.destfile,
2011 Some(TransmissionMode::Unacknowledged),
2012 Some(false),
2013 )
2014 .expect("creating put request failed");
2015 let mut user = tb.create_user(0, filesize);
2016 assert_eq!(user.transaction_indication_call_count, 0);
2017 assert_eq!(user.eof_sent_call_count, 0);
2018
2019 tb.put_request(&put_request)
2020 .expect("put_request call failed");
2021 assert_eq!(tb.handler.state(), State::Busy);
2022 assert_eq!(tb.handler.step(), TransactionStep::Idle);
2023 assert!(tb.get_next_sent_pdu().is_none());
2024 let id = tb.handler.transaction_id().unwrap();
2025 tb.handler
2026 .cancel_request(&mut user, &id)
2027 .expect("transaction cancellation failed");
2028 assert_eq!(tb.handler.state(), State::Idle);
2029 assert_eq!(tb.handler.step(), TransactionStep::Idle);
2030 let eof_pdu = tb
2032 .get_next_sent_pdu()
2033 .expect("no EOF PDU generated like expected");
2034 assert_eq!(
2035 eof_pdu.file_directive_type.unwrap(),
2036 FileDirectiveType::EofPdu
2037 );
2038 let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap();
2039 assert_eq!(
2040 eof_pdu.condition_code(),
2041 ConditionCode::CancelRequestReceived
2042 );
2043 assert_eq!(eof_pdu.file_checksum(), 0);
2044 assert_eq!(eof_pdu.file_size(), 0);
2045 tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
2046 }
2047
2048 #[test]
2049 fn test_cancelled_transfer_mid_transfer() {
2050 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
2051 let mut file = OpenOptions::new()
2052 .write(true)
2053 .open(&tb.srcfile)
2054 .expect("opening file failed");
2055 let mut rand_data = [0u8; 140];
2056 rand::rng().fill(&mut rand_data[..]);
2057 file.write_all(&rand_data)
2058 .expect("writing file content failed");
2059 drop(file);
2060 let put_request = PutRequestOwned::new_regular_request(
2061 REMOTE_ID.into(),
2062 &tb.srcfile,
2063 &tb.destfile,
2064 Some(TransmissionMode::Unacknowledged),
2065 Some(false),
2066 )
2067 .expect("creating put request failed");
2068 let file_size = rand_data.len() as u64;
2069 let mut user = tb.create_user(0, file_size);
2070 let transaction_info =
2071 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2072 let mut chunks = rand_data.chunks(
2073 calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
2074 &transaction_info.pdu_header,
2075 tb.max_packet_len,
2076 None,
2077 ),
2078 );
2079 let mut digest = CRC_32.digest();
2080 let first_chunk = chunks.next().expect("no chunk found");
2081 digest.update(first_chunk);
2082 let checksum = digest.finalize();
2083 let next_packet = tb.get_next_sent_pdu().unwrap();
2084 assert_eq!(next_packet.pdu_type, PduType::FileData);
2085 let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap();
2086 assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]);
2087 let expected_id = tb.handler.transaction_id().unwrap();
2088 assert!(
2089 tb.handler
2090 .cancel_request(&mut user, &expected_id)
2091 .expect("cancellation failed")
2092 );
2093 assert_eq!(tb.handler.state(), State::Idle);
2094 assert_eq!(tb.handler.step(), TransactionStep::Idle);
2095 let next_packet = tb.get_next_sent_pdu().unwrap();
2096 assert_eq!(next_packet.pdu_type, PduType::FileDirective);
2097 assert_eq!(
2098 next_packet.file_directive_type.unwrap(),
2099 FileDirectiveType::EofPdu
2100 );
2101 let eof_pdu = EofPdu::from_bytes(&next_packet.raw_pdu).expect("EOF PDU creation failed");
2104 assert_eq!(eof_pdu.file_size(), first_chunk.len() as u64);
2105 assert_eq!(eof_pdu.file_checksum(), checksum);
2106 assert_eq!(
2107 eof_pdu.condition_code(),
2108 ConditionCode::CancelRequestReceived
2109 );
2110 }
2111
2112 #[test]
2113 fn test_positive_ack_procedure() {
2114 let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2115 let file_size = 0;
2116 let eof_params = EofParams {
2117 file_size,
2118 file_checksum: CRC_32.digest().finalize(),
2119 condition_code: ConditionCode::NoError,
2120 };
2121 let put_request = PutRequestOwned::new_regular_request(
2122 REMOTE_ID.into(),
2123 &tb.srcfile,
2124 &tb.destfile,
2125 Some(TransmissionMode::Acknowledged),
2126 Some(false),
2127 )
2128 .expect("creating put request failed");
2129 let mut user = tb.create_user(0, file_size);
2130 let transfer_info =
2131 tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2132 tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
2133
2134 assert!(tb.pdu_queue_empty());
2135
2136 tb.expiry_control.set_positive_ack_expired();
2138 let sent_packets = tb
2139 .handler
2140 .state_machine_no_packet(&mut user)
2141 .expect("source handler FSM failure");
2142 assert_eq!(sent_packets, 1);
2143 tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
2144
2145 tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2146 tb.finish_handling(&mut user, &transfer_info);
2147 tb.common_finished_pdu_ack_check();
2148 user.verify_finished_indication_retained(
2149 DeliveryCode::Complete,
2150 ConditionCode::NoError,
2151 transfer_info.id,
2152 );
2153 }
2154
/// Acknowledged-mode transfer of an empty file where the positive ACK timer is
/// flagged as expired twice before the EOF PDU is acknowledged.
///
/// The first expiry must yield another EOF PDU with the original parameters.
/// The second expiry must yield an EOF PDU carrying
/// `ConditionCode::PositiveAckLimitReached`. The EOF is then acknowledged and
/// the transaction is finished through the regular helpers.
#[test]
fn test_positive_ack_procedure_ack_limit_reached() {
    let mut testbench = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
    let empty_file_len = 0;
    let success_eof = EofParams::new_success(empty_file_len, CRC_32.digest().finalize());
    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &testbench.srcfile,
        &testbench.destfile,
        Some(TransmissionMode::Acknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let mut cfdp_user = testbench.create_user(0, empty_file_len);
    let info = testbench.common_file_transfer_init_with_metadata_check(
        &mut cfdp_user,
        request,
        empty_file_len,
    );
    let closure_requested = info.closure_requested;

    // Initial EOF PDU, after which no further PDU may be queued.
    testbench.common_eof_pdu_check(&mut cfdp_user, closure_requested, success_eof, 1);
    assert!(testbench.pdu_queue_empty());

    // First expiry: one packet, checked as EOF with unchanged parameters.
    testbench.expiry_control.set_positive_ack_expired();
    assert_eq!(
        testbench
            .handler
            .state_machine_no_packet(&mut cfdp_user)
            .expect("source handler FSM failure"),
        1
    );
    testbench.common_eof_pdu_check(&mut cfdp_user, closure_requested, success_eof, 2);

    // Second expiry: one packet again, but the EOF now has to report that the
    // positive ACK limit was reached.
    testbench.expiry_control.set_positive_ack_expired();
    assert_eq!(
        testbench
            .handler
            .state_machine_no_packet(&mut cfdp_user)
            .expect("source handler FSM failure"),
        1
    );
    let limit_reached_eof = EofParams {
        condition_code: ConditionCode::PositiveAckLimitReached,
        ..success_eof
    };
    testbench.common_eof_pdu_check(&mut cfdp_user, closure_requested, limit_reached_eof, 3);

    // Acknowledge the EOF and run the remaining finish sequence.
    testbench.acknowledge_eof_pdu(&mut cfdp_user, &info);
    testbench.finish_handling(&mut cfdp_user, &info);
    testbench.common_finished_pdu_ack_check();
    cfdp_user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::NoError,
        info.id,
    );
}
2203
/// Acknowledged-mode transfer of an empty file where the EOF PDU is never
/// acknowledged and the positive ACK timer is flagged as expired four times.
///
/// Expected sequence, as checked below:
///
/// 1. First expiry: the EOF PDU is sent again with the original parameters.
/// 2. Second expiry: an EOF PDU with `ConditionCode::PositiveAckLimitReached`
///    is sent and exactly one notice-of-cancellation fault is recorded.
/// 3. Third expiry: the limit-reached EOF PDU is sent once more.
/// 4. Fourth expiry: no packet is sent and exactly one abandonment fault is
///    recorded.
///
/// After each fault entry is popped, all fault queues must be empty.
#[test]
fn test_positive_ack_procedure_ack_limit_reached_abandonment() {
    let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
    let file_size = 0;
    let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize());
    let put_request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &tb.srcfile,
        &tb.destfile,
        Some(TransmissionMode::Acknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let mut user = tb.create_user(0, file_size);
    let transfer_info =
        tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
    tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);

    assert!(tb.pdu_queue_empty());

    // First expiry: one packet, checked as EOF with unchanged parameters.
    tb.expiry_control.set_positive_ack_expired();
    let sent_packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("source handler FSM failure");
    assert_eq!(sent_packets, 1);
    tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);

    // Second expiry: the EOF PDU now has to carry the limit-reached condition.
    tb.expiry_control.set_positive_ack_expired();
    let sent_packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("source handler FSM failure");
    assert_eq!(sent_packets, 1);
    eof_params.condition_code = ConditionCode::PositiveAckLimitReached;
    tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3);

    // Exactly one cancellation fault must have been recorded for this transaction.
    let fault_handler = tb.test_fault_handler_mut();
    let fh_ref_mut = fault_handler.get_mut();
    assert!(!fh_ref_mut.cancellation_queue_empty());
    assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
    let FaultInfo {
        transaction_id,
        condition_code,
        progress,
    } = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
    assert_eq!(transaction_id, transfer_info.id);
    assert_eq!(condition_code, ConditionCode::PositiveAckLimitReached);
    assert_eq!(progress, file_size);
    // The predicate only reports the queue state; wrap it in an assertion so a
    // stray fault entry fails the test instead of being silently ignored.
    assert!(fh_ref_mut.all_queues_empty());

    // Third expiry: the limit-reached EOF PDU is sent once more.
    tb.expiry_control.set_positive_ack_expired();
    let sent_packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("source handler FSM failure");
    assert_eq!(sent_packets, 1);
    tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 4);

    // Fourth expiry: nothing is sent any more, an abandonment fault is recorded.
    tb.expiry_control.set_positive_ack_expired();
    let sent_packets = tb
        .handler
        .state_machine_no_packet(&mut user)
        .expect("source handler FSM failure");
    assert_eq!(sent_packets, 0);
    let fault_handler = tb.test_fault_handler_mut();
    let fh_ref_mut = fault_handler.get_mut();
    assert!(!fh_ref_mut.abandoned_queue_empty());
    assert_eq!(fh_ref_mut.abandoned_queue.len(), 1);
    let FaultInfo {
        transaction_id,
        condition_code,
        progress,
    } = fh_ref_mut.abandoned_queue.pop_back().unwrap();
    assert_eq!(transaction_id, transfer_info.id);
    assert_eq!(condition_code, ConditionCode::PositiveAckLimitReached);
    assert_eq!(progress, file_size);
    // Same as above: assert the predicate instead of discarding its result.
    assert!(fh_ref_mut.all_queues_empty());
}
2288
/// Runs a tiny acknowledged-mode file transfer and then submits a NAK whose
/// single segment request spans from offset 0 to the file size.
///
/// The next file data PDU must start at offset 0 and contain the complete file
/// content. Afterwards the EOF is acknowledged and the transaction finished.
#[test]
fn test_nak_for_whole_file() {
    let mut testbench = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
    let mut cfdp_user = TestCfdpUser::default();
    let (content, info) = testbench.common_tiny_file_transfer(&mut cfdp_user, true);

    // Single segment request covering the whole file.
    let end_of_file = info.file_size as u32;
    testbench.nak_for_file_segments(&mut cfdp_user, &info, &[(0, end_of_file)]);
    testbench.check_next_file_pdu(0, content.as_bytes());
    testbench.all_fault_queues_empty();

    // Complete the transaction.
    testbench.acknowledge_eof_pdu(&mut cfdp_user, &info);
    testbench.finish_handling(&mut cfdp_user, &info);
    testbench.common_finished_pdu_ack_check();
}
2303
/// Transfers 140 random bytes, which is expected to take two file data PDUs,
/// and then submits a NAK requesting the byte range 0..90.
///
/// The next file data PDU must start at offset 0 and contain exactly the first
/// 90 bytes of the file. Afterwards the EOF is acknowledged and the transaction
/// finished.
#[test]
fn test_nak_for_file_segment() {
    let mut cfdp_user = TestCfdpUser::default();
    let mut testbench = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);

    // Fill the source file with random content. The file handle is confined to
    // its own scope so it is closed before the transfer starts.
    let mut payload = [0u8; 140];
    {
        let mut src = OpenOptions::new()
            .write(true)
            .open(&testbench.srcfile)
            .expect("opening file failed");
        rand::rng().fill(&mut payload[..]);
        src.write_all(&payload)
            .expect("writing file content failed");
    }

    let (info, file_data_pdu_count) =
        testbench.generic_file_transfer(&mut cfdp_user, false, payload.to_vec());
    assert_eq!(file_data_pdu_count, 2);

    // Request the first 90 bytes again.
    testbench.nak_for_file_segments(&mut cfdp_user, &info, &[(0, 90)]);
    testbench.check_next_file_pdu(0, &payload[..90]);
    testbench.all_fault_queues_empty();

    // Complete the transaction.
    testbench.acknowledge_eof_pdu(&mut cfdp_user, &info);
    testbench.finish_handling(&mut cfdp_user, &info);
    testbench.common_finished_pdu_ack_check();
}
2328
/// Acknowledged-mode transfer of an empty file where, after the EOF PDU, a NAK
/// PDU with the single segment request `(0, 0)` is passed to the handler.
///
/// The handler must answer with exactly one packet, which has to pass the
/// metadata check. Afterwards the EOF is acknowledged, the transaction is
/// finished, and the finished indication must report a complete delivery
/// without error.
#[test]
fn test_nak_for_metadata() {
    let mut testbench = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
    let empty_file_len = 0;
    let request = PutRequestOwned::new_regular_request(
        REMOTE_ID.into(),
        &testbench.srcfile,
        &testbench.destfile,
        Some(TransmissionMode::Acknowledged),
        Some(false),
    )
    .expect("creating put request failed");
    let mut cfdp_user = testbench.create_user(0, empty_file_len);
    let info = testbench.common_file_transfer_init_with_metadata_check(
        &mut cfdp_user,
        request,
        empty_file_len,
    );
    let success_eof = EofParams::new_success(empty_file_len, CRC_32.digest().finalize());
    testbench.common_eof_pdu_check(&mut cfdp_user, info.closure_requested, success_eof, 1);

    // Build and serialize a NAK PDU whose only segment request is (0, 0).
    let nak_raw = NakPduCreator::new_normal_file_size(
        info.pdu_header,
        0,
        info.file_size as u32,
        &[(0, 0)],
    )
    .unwrap()
    .to_vec()
    .unwrap();
    let nak_packet = PduRawWithInfo::new(&nak_raw).unwrap();

    // Feeding the NAK must produce exactly one packet that passes the metadata check.
    assert_eq!(
        testbench
            .handler
            .state_machine(&mut cfdp_user, Some(&nak_packet))
            .unwrap(),
        1
    );
    let resent_pdu = testbench.get_next_sent_pdu().unwrap();
    testbench.metadata_check(&resent_pdu, empty_file_len);
    testbench.all_fault_queues_empty();

    // Complete the transaction.
    testbench.acknowledge_eof_pdu(&mut cfdp_user, &info);
    testbench.finish_handling(&mut cfdp_user, &info);
    testbench.common_finished_pdu_ack_check();
    cfdp_user.verify_finished_indication_retained(
        DeliveryCode::Complete,
        ConditionCode::NoError,
        info.id,
    );
}
2380}