cfdp/
source.rs

1//! # CFDP Source Entity Module
2//!
3//! The [SourceHandler] is the primary component of this module which converts a
4//! [ReadablePutRequest] into all packet data units (PDUs) which need to be sent to a remote
5//! CFDP entity to perform a File Copy operation to a remote entity.
6//!
//! The source entity allows freedom of communication by using a user-provided [PduSender] instance
8//! to send all generated PDUs. It should be noted that for regular file transfers, each
9//! [SourceHandler::state_machine] call will map to one generated file data PDU. This allows
10//! flow control for the user of the state machine.
11//!
12//! The [SourceHandler::state_machine] will generally perform the following steps after a valid
13//! put request was received through the [SourceHandler::put_request] method:
14//!
15//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the
16//!    [spacepackets::cfdp::pdu::metadata::MetadataPduReader] to inspect the generated PDU.
17//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not
18//!    empty). The PDU(s) can be inspected using the [spacepackets::cfdp::pdu::file_data::FileDataPdu] reader.
19//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using
20//!    the [spacepackets::cfdp::pdu::eof::EofPdu] reader.
21//!
22//! If this is an unacknowledged transfer with no transaction closure, the file transfer will be
23//! done after these steps. In any other case:
24//!
25//! ### Unacknowledged transfer with requested closure
26//!
27//! 4. A Finished PDU will be awaited, for example one generated using
28//!    [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
29//!
30//! ### Acknowledged transfer
31//!
//! 4. An EOF ACK packet will be awaited, for example one generated using
33//!    [spacepackets::cfdp::pdu::ack::AckPdu].
34//! 5. A Finished PDU will be awaited, for example one generated using
35//!    [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
36//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity.
37//!    The [spacepackets::cfdp::pdu::finished::FinishedPduReader] can be used to inspect the
38//!    generated PDU.
39use core::{
40    cell::{Cell, RefCell},
41    ops::ControlFlow,
42    str::Utf8Error,
43};
44
45use spacepackets::{
46    ByteConversionError,
47    cfdp::{
48        ConditionCode, Direction, FaultHandlerCode, LargeFileFlag, PduType, SegmentMetadataFlag,
49        SegmentationControl, TransactionStatus, TransmissionMode,
50        lv::Lv,
51        pdu::{
52            CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
53            ack::AckPdu,
54            eof::EofPdu,
55            file_data::{
56                FileDataPduCreatorWithReservedDatafield,
57                calculate_max_file_seg_len_for_max_packet_len_and_pdu_header,
58            },
59            finished::{DeliveryCode, FileStatus, FinishedPduReader},
60            metadata::{MetadataGenericParams, MetadataPduCreator},
61            nak::NakPduReader,
62        },
63    },
64    util::{UnsignedByteField, UnsignedEnum},
65};
66
67use spacepackets::seq_count::SequenceCounter;
68
69use crate::{
70    DummyPduProvider, EntityType, FaultInfo, GenericSendError, PduProvider, PositiveAckParams,
71    TimerCreator, time::Countdown,
72};
73
74use super::{
75    LocalEntityConfig, PacketTarget, PduSender, RemoteConfigStore, RemoteEntityConfig, State,
76    TransactionId, UserFaultHook,
77    filestore::{FilestoreError, VirtualFilestore},
78    request::{ReadablePutRequest, StaticPutRequestCacher},
79    user::{CfdpUser, TransactionFinishedParams},
80};
81
/// Models the individual steps a transaction passes through inside the source entity handler.
///
/// The discriminants are assigned explicitly; the values 2 and 9 are not used.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
    /// No transaction step is active. A busy handler moves on to
    /// [Self::TransactionStart] from here.
    Idle = 0,
    /// The transaction start is handled, followed by [Self::SendingMetadata].
    TransactionStart = 1,
    /// The Metadata PDU is generated and sent, followed by [Self::SendingFileData].
    SendingMetadata = 3,
    /// File Data PDUs are generated and sent.
    SendingFileData = 4,
    /// Re-transmitting missing packets in acknowledged mode
    Retransmitting = 5,
    /// The EOF PDU is generated and sent.
    SendingEof = 6,
    /// The positive acknowledgement procedures for the EOF PDU are performed.
    WaitingForEofAck = 7,
    /// The Finished PDU of the remote entity is awaited.
    WaitingForFinished = 8,
    /// The notice of completion is issued and the handler is reset afterwards.
    NoticeOfCompletion = 10,
}
98
/// File related parameters of the transaction which is currently handled.
#[derive(Debug, Default, Clone, Copy)]
pub struct FileParams {
    /// Current transfer progress. NAK segment requests which start beyond this value are
    /// rejected as invalid. Presumably the number of file bytes sent so far - TODO confirm.
    pub progress: u64,
    /// Maximum length of a single file segment. It is calculated when a put request is accepted
    /// and is also used to split re-transmitted chunks.
    pub segment_len: u64,
    // NOTE(review): how this field is used is not visible in this part of the file.
    pub crc32: u32,
    // NOTE(review): presumably set for put requests without a source file - confirm.
    pub metadata_only: bool,
    /// Size of the source file. It is passed to the filestore for the checksum calculation.
    pub file_size: u64,
    // NOTE(review): presumably set when the source file has a size of 0 - confirm.
    pub empty_file: bool,
    /// The checksum is cached to avoid expensive re-calculation when the EOF PDU needs to be
    /// re-sent.
    pub checksum_completed_file: Option<u32>,
}
111
112// Explicit choice to put all simple internal fields into Cells.
113// I think this is more efficient than wrapping the whole helper into a RefCell, especially
114// because some of the individual fields are used frequently.
115struct StateHelper {
116    step: Cell<TransactionStep>,
117    state: Cell<super::State>,
118    num_packets_ready: Cell<u32>,
119}
120
121impl Default for StateHelper {
122    fn default() -> Self {
123        Self {
124            state: Cell::new(super::State::Idle),
125            step: Cell::new(TransactionStep::Idle),
126            num_packets_ready: Cell::new(0),
127        }
128    }
129}
130
131impl StateHelper {
132    #[allow(dead_code)]
133    pub fn reset(&self) {
134        self.step.set(TransactionStep::Idle);
135        self.state.set(super::State::Idle);
136        self.num_packets_ready.set(0);
137    }
138}
139
140#[derive(Debug, Copy, Clone)]
141pub struct FinishedParams {
142    condition_code: ConditionCode,
143    delivery_code: DeliveryCode,
144    file_status: FileStatus,
145}
146
147#[derive(Debug, thiserror::Error)]
148pub enum SourceError {
149    #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
150    CantProcessPacketType {
151        pdu_type: PduType,
152        directive_type: Option<FileDirectiveType>,
153    },
154    #[error("unexpected PDU")]
155    UnexpectedPdu {
156        pdu_type: PduType,
157        directive_type: Option<FileDirectiveType>,
158    },
159    #[error("source handler is already busy with put request")]
160    PutRequestAlreadyActive,
161    #[error("error caching put request")]
162    PutRequestCaching(ByteConversionError),
163    #[error("filestore error: {0}")]
164    FilestoreError(#[from] FilestoreError),
165    #[error("source file does not have valid UTF8 format: {0}")]
166    SourceFileNotValidUtf8(Utf8Error),
167    #[error("destination file does not have valid UTF8 format: {0}")]
168    DestFileNotValidUtf8(Utf8Error),
169    #[error("invalid NAK PDU received")]
170    InvalidNakPdu,
171    #[error("error related to PDU creation: {0}")]
172    Pdu(#[from] PduError),
173    #[error("cfdp feature not implemented")]
174    NotImplemented,
175    #[error("issue sending PDU: {0}")]
176    SendError(#[from] GenericSendError),
177}
178
179#[derive(Debug, thiserror::Error)]
180pub enum PutRequestError {
181    #[error("error caching put request: {0}")]
182    Storage(#[from] ByteConversionError),
183    #[error("already busy with put request")]
184    AlreadyBusy,
185    #[error("no remote entity configuration found for {0:?}")]
186    NoRemoteCfgFound(UnsignedByteField),
187    #[error("source file does not have valid UTF8 format: {0}")]
188    SourceFileNotValidUtf8(#[from] Utf8Error),
189    #[error("source file does not exist")]
190    FileDoesNotExist,
191    #[error("filestore error: {0}")]
192    FilestoreError(#[from] FilestoreError),
193}
194
/// Tracks anomalies detected by the source handler.
#[derive(Debug, Default, Clone, Copy)]
pub struct AnomalyTracker {
    // NOTE(review): presumably counts received ACK PDUs with an invalid directive code; the
    // code updating this counter is not visible in this part of the file - confirm.
    invalid_ack_directive_code: u8,
}
199
/// Additional context passed back to the state machine, for example after a fault was declared.
#[derive(Debug, Default, PartialEq, Eq)]
pub enum FsmContext {
    /// No special handling is required.
    #[default]
    None,
    /// The caller resets the handler when it receives this value.
    ResetWhenPossible,
}
206
207#[derive(Debug)]
208pub struct TransactionParams<CountdownInstance: Countdown> {
209    transaction_id: Option<TransactionId>,
210    remote_cfg: Option<RemoteEntityConfig>,
211    transmission_mode: Option<super::TransmissionMode>,
212    closure_requested: bool,
213    cond_code_eof: Cell<Option<ConditionCode>>,
214    finished_params: Option<FinishedParams>,
215    // File specific transfer fields
216    file_params: FileParams,
217    // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
218    pdu_conf: CommonPduConfig,
219    check_timer: Option<CountdownInstance>,
220    positive_ack_params: Cell<Option<PositiveAckParams>>,
221    ack_timer: RefCell<Option<CountdownInstance>>,
222}
223
224impl<CountdownInstance: Countdown> Default for TransactionParams<CountdownInstance> {
225    fn default() -> Self {
226        Self {
227            transaction_id: Default::default(),
228            remote_cfg: Default::default(),
229            transmission_mode: Default::default(),
230            closure_requested: Default::default(),
231            cond_code_eof: Default::default(),
232            finished_params: Default::default(),
233            file_params: Default::default(),
234            pdu_conf: Default::default(),
235            check_timer: Default::default(),
236            positive_ack_params: Default::default(),
237            ack_timer: Default::default(),
238        }
239    }
240}
241
242impl<CountdownInstance: Countdown> TransactionParams<CountdownInstance> {
243    #[inline]
244    fn reset(&mut self) {
245        self.transaction_id = None;
246        self.transmission_mode = None;
247    }
248}
249
250/// This is the primary CFDP source handler. It models the CFDP source entity, which is
251/// primarily responsible for handling put requests to send files to another CFDP destination
252/// entity.
253///
254/// As such, it contains a state machine to perform all operations necessary to perform a
255/// source-to-destination file transfer. This class uses the user provides [PduSender] to
256/// send the CFDP PDU packets generated by the state machine.
257///
258/// The following core functions are the primary interface:
259///
260/// 1. [Self::put_request] can be used to start transactions, most notably to start
261///    and perform a Copy File procedure to send a file or to send a Proxy Put Request to request
262///    a file.
263/// 2. [Self::state_machine] is the primary interface to execute an
264///    active file transfer. It generates the necessary CFDP PDUs for this process.
265///    This method is also used to insert received packets with the appropriate destination ID
266///    and target handler type into the state machine.
267///
268/// A put request will only be accepted if the handler is in the idle state.
269///
270/// The handler has an internal buffer for PDU generation and checksum generation. The size of this
271/// buffer is select via Cargo features and defaults to 2048 bytes. It does not allocate
272/// memory during run-time and thus is suitable for embedded systems where  allocation is
273/// not possible. Furthermore, it uses the [VirtualFilestore] abstraction to allow
274/// usage on systems without a [std] filesystem.
275/// This handler does not support concurrency out of the box. Instead, if concurrent handling
276/// is required, it is recommended to create a new handler and run all active handlers inside a
277/// thread pool, or move the newly created handler to a new thread.
278pub struct SourceHandler<
279    PduSenderInstance: PduSender,
280    UserFaultHookInstance: UserFaultHook,
281    Vfs: VirtualFilestore,
282    RemoteConfigStoreInstance: RemoteConfigStore,
283    TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
284    CountdownInstance: Countdown,
285    SequenceCounterInstance: SequenceCounter,
286> {
287    local_cfg: LocalEntityConfig<UserFaultHookInstance>,
288    pdu_sender: PduSenderInstance,
289    pdu_and_cksum_buffer: RefCell<[u8; crate::buf_len::PACKET_BUF_LEN]>,
290    put_request_cacher: StaticPutRequestCacher<{ crate::buf_len::PACKET_BUF_LEN }>,
291    remote_cfg_table: RemoteConfigStoreInstance,
292    vfs: Vfs,
293    state_helper: StateHelper,
294    transaction_params: TransactionParams<CountdownInstance>,
295    timer_creator: TimerCreatorInstance,
296    seq_count_provider: SequenceCounterInstance,
297    anomalies: AnomalyTracker,
298}
299
300impl<
301    PduSenderInstance: PduSender,
302    UserFaultHookInstance: UserFaultHook,
303    Vfs: VirtualFilestore,
304    RemoteConfigStoreInstance: RemoteConfigStore,
305    TimerCreatorInstance: TimerCreator<Countdown = CountdownInstance>,
306    CountdownInstance: Countdown,
307    SequenceCounterInstance: SequenceCounter,
308>
309    SourceHandler<
310        PduSenderInstance,
311        UserFaultHookInstance,
312        Vfs,
313        RemoteConfigStoreInstance,
314        TimerCreatorInstance,
315        CountdownInstance,
316        SequenceCounterInstance,
317    >
318{
319    /// Creates a new instance of a source handler.
320    ///
321    /// # Arguments
322    ///
323    /// * `cfg` - The local entity configuration for this source handler.
324    /// * `pdu_sender` - [PduSender] used to send CFDP PDUs generated by the handler.
325    /// * `vfs` - [VirtualFilestore] implementation used by the handler, which decouples the CFDP
326    ///   implementation from the underlying filestore/filesystem. This allows to use this handler
327    ///   for embedded systems where a standard runtime might not be available.
328    /// * `put_request_cacher` - The put request cacher is used cache put requests without
329    ///   requiring run-time allocation.
330    /// * `remote_cfg_table` - The [RemoteEntityConfig] used to look up remote
331    ///   entities and target specific configuration for file copy operations.
332    /// * `timer_creator` - [TimerCreator] used by the CFDP handler to generate
333    ///   timers required by various tasks. This allows to use this handler for embedded systems
334    ///   where the standard time APIs might not be available.
335    /// * `seq_count_provider` - The [SequenceCounter] used to generate the [TransactionId]
336    ///   which contains an incrementing counter.
337    #[allow(clippy::too_many_arguments)]
338    pub fn new(
339        cfg: LocalEntityConfig<UserFaultHookInstance>,
340        pdu_sender: PduSenderInstance,
341        vfs: Vfs,
342        remote_cfg_table: RemoteConfigStoreInstance,
343        timer_creator: TimerCreatorInstance,
344        seq_count_provider: SequenceCounterInstance,
345    ) -> Self {
346        Self {
347            local_cfg: cfg,
348            remote_cfg_table,
349            pdu_sender,
350            pdu_and_cksum_buffer: RefCell::new([0; crate::buf_len::PACKET_BUF_LEN]),
351            vfs,
352            put_request_cacher: StaticPutRequestCacher::<{ crate::buf_len::PACKET_BUF_LEN }>::new(),
353            state_helper: Default::default(),
354            transaction_params: Default::default(),
355            anomalies: Default::default(),
356            timer_creator,
357            seq_count_provider,
358        }
359    }
360
361    /// Calls [Self::state_machine], without inserting a packet.
362    pub fn state_machine_no_packet(
363        &mut self,
364        cfdp_user: &mut impl CfdpUser,
365    ) -> Result<u32, SourceError> {
366        self.state_machine(cfdp_user, None::<&DummyPduProvider>)
367    }
368
369    /// This is the core function to drive the source handler. It is also used to insert
370    /// packets into the source handler.
371    ///
372    /// The state machine should either be called if a packet with the appropriate destination ID
373    /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example
374    /// checking for timeouts or missed file segments.
375    ///
376    /// The function returns the number of sent PDU packets on success.
377    pub fn state_machine(
378        &mut self,
379        cfdp_user: &mut impl CfdpUser,
380        pdu: Option<&impl PduProvider>,
381    ) -> Result<u32, SourceError> {
382        let mut sent_packets = 0;
383        if let Some(packet) = pdu {
384            sent_packets += self.insert_packet(cfdp_user, packet)?;
385        }
386        match self.state() {
387            super::State::Idle => {
388                // TODO: In acknowledged mode, add timer handling.
389                Ok(0)
390            }
391            super::State::Busy => {
392                sent_packets += self.fsm_busy(cfdp_user, pdu)?;
393                Ok(sent_packets)
394            }
395            super::State::Suspended => {
396                // There is now way to suspend the handler currently anyway.
397                Ok(0)
398            }
399        }
400    }
401
402    #[inline]
403    pub fn transaction_id(&self) -> Option<TransactionId> {
404        self.transaction_params.transaction_id
405    }
406
407    /// Returns the [TransmissionMode] for the active file operation.
408    #[inline]
409    pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
410        self.transaction_params.transmission_mode
411    }
412
413    /// Get the [TransactionStep], which denotes the exact step of a pending CFDP transaction when
414    /// applicable.
415    #[inline]
416    pub fn step(&self) -> TransactionStep {
417        self.state_helper.step.get()
418    }
419
420    #[inline]
421    pub fn state(&self) -> State {
422        self.state_helper.state.get()
423    }
424
425    #[inline]
426    pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHookInstance> {
427        &self.local_cfg
428    }
429
430    /// This function is used to pass a put request to the source handler, which is
431    /// also used to start a file copy operation. As such, this function models the Put.request
432    /// CFDP primtiive.
433    ///
434    /// Please note that the source handler can also process one put request at a time.
435    /// The caller is responsible of creating a new source handler, one handler can only handle
436    /// one file copy request at a time.
437    pub fn put_request(
438        &mut self,
439        put_request: &impl ReadablePutRequest,
440    ) -> Result<(), PutRequestError> {
441        if self.state() != super::State::Idle {
442            return Err(PutRequestError::AlreadyBusy);
443        }
444        self.put_request_cacher.set(put_request)?;
445        let remote_cfg = self.remote_cfg_table.get(
446            self.put_request_cacher
447                .static_fields
448                .destination_id
449                .value_const(),
450        );
451        if remote_cfg.is_none() {
452            return Err(PutRequestError::NoRemoteCfgFound(
453                self.put_request_cacher.static_fields.destination_id,
454            ));
455        }
456        let remote_cfg = remote_cfg.unwrap();
457        self.state_helper.num_packets_ready.set(0);
458        let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
459            self.put_request_cacher.static_fields.trans_mode.unwrap()
460        } else {
461            remote_cfg.default_transmission_mode
462        };
463        let closure_requested = if self
464            .put_request_cacher
465            .static_fields
466            .closure_requested
467            .is_some()
468        {
469            self.put_request_cacher
470                .static_fields
471                .closure_requested
472                .unwrap()
473        } else {
474            remote_cfg.closure_requested_by_default
475        };
476        if self.put_request_cacher.has_source_file()
477            && !self.vfs.exists(self.put_request_cacher.source_file()?)?
478        {
479            return Err(PutRequestError::FileDoesNotExist);
480        }
481
482        let transaction_id = TransactionId::new(
483            self.local_cfg().id,
484            UnsignedByteField::new(
485                SequenceCounterInstance::MAX_BIT_WIDTH / 8,
486                self.seq_count_provider.get_and_increment().into(),
487            ),
488        );
489        // Both the source entity and destination entity ID field must have the same size.
490        // We use the larger of either the Put Request destination ID or the local entity ID
491        // as the size for the new entity IDs.
492        let larger_entity_width = core::cmp::max(
493            self.local_cfg.id.size(),
494            self.put_request_cacher.static_fields.destination_id.size(),
495        );
496        let create_id = |cached_id: &UnsignedByteField| {
497            if larger_entity_width != cached_id.size() {
498                UnsignedByteField::new(larger_entity_width, cached_id.value_const())
499            } else {
500                *cached_id
501            }
502        };
503
504        // Set PDU configuration fields which are important for generating PDUs.
505        self.transaction_params
506            .pdu_conf
507            .set_source_and_dest_id(
508                create_id(&self.local_cfg.id),
509                create_id(&self.put_request_cacher.static_fields.destination_id),
510            )
511            .unwrap();
512        // Set up other PDU configuration fields.
513        self.transaction_params.pdu_conf.direction = Direction::TowardsReceiver;
514        self.transaction_params.pdu_conf.crc_flag =
515            remote_cfg.crc_on_transmission_by_default.into();
516        self.transaction_params.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
517        self.transaction_params.pdu_conf.trans_mode = transmission_mode;
518        self.transaction_params.file_params.segment_len =
519            self.calculate_max_file_seg_len(remote_cfg);
520
521        self.transaction_params.transaction_id = Some(transaction_id);
522        self.transaction_params.remote_cfg = Some(*remote_cfg);
523        self.transaction_params.transmission_mode = Some(transmission_mode);
524        self.transaction_params.closure_requested = closure_requested;
525        self.transaction_params.cond_code_eof.set(None);
526        self.transaction_params.finished_params = None;
527
528        self.state_helper.state.set(super::State::Busy);
529        Ok(())
530    }
531
532    fn insert_packet(
533        &mut self,
534        _cfdp_user: &mut impl CfdpUser,
535        packet_to_insert: &impl PduProvider,
536    ) -> Result<u32, SourceError> {
537        if packet_to_insert.packet_target()? != PacketTarget::SourceEntity {
538            // Unwrap is okay here, a PacketInfo for a file data PDU should always have the
539            // destination as the target.
540            return Err(SourceError::CantProcessPacketType {
541                pdu_type: packet_to_insert.pdu_type(),
542                directive_type: packet_to_insert.file_directive_type(),
543            });
544        }
545        if packet_to_insert.pdu_type() == PduType::FileData {
546            // The [PacketInfo] API should ensure that file data PDUs can not be passed
547            // into a source entity, so this should never happen.
548            return Err(SourceError::UnexpectedPdu {
549                pdu_type: PduType::FileData,
550                directive_type: None,
551            });
552        }
553        let mut sent_packets = 0;
554
555        // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is
556        // always a valid value.
557        match packet_to_insert
558            .file_directive_type()
559            .expect("PDU directive type unexpectedly not set")
560        {
561            FileDirectiveType::FinishedPdu => {
562                let finished_pdu = FinishedPduReader::new(packet_to_insert.raw_pdu())?;
563                self.handle_finished_pdu(&finished_pdu)?
564            }
565            FileDirectiveType::NakPdu => {
566                let nak_pdu = NakPduReader::new(packet_to_insert.raw_pdu())?;
567                sent_packets += self.handle_nak_pdu(&nak_pdu)?;
568            }
569            FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
570            FileDirectiveType::AckPdu => {
571                let ack_pdu = AckPdu::from_bytes(packet_to_insert.raw_pdu())?;
572                self.handle_ack_pdu(&ack_pdu)?
573            }
574            FileDirectiveType::EofPdu
575            | FileDirectiveType::PromptPdu
576            | FileDirectiveType::MetadataPdu => {
577                return Err(SourceError::CantProcessPacketType {
578                    pdu_type: packet_to_insert.pdu_type(),
579                    directive_type: packet_to_insert.file_directive_type(),
580                });
581            }
582        }
583        Ok(sent_packets)
584    }
585
586    /// This functions models the Cancel.request CFDP primitive and is the recommended way to
587    /// cancel a transaction.
588    ///
589    /// This method will cause a Notice of Cancellation at this entity if a transaction is active
590    /// and the passed transaction ID matches the currently active transaction ID. Please note
591    /// that the state machine might still be active because a cancelled transfer might still
592    /// require some packets to be sent to the remote receiver entity.
593    ///
594    /// If not unexpected errors occur, this method returns [true] if the transfer was cancelled
595    /// propery and [false] if there is no transaction active or the passed transaction ID and the
596    /// active ID do not match.
597    pub fn cancel_request(
598        &mut self,
599        user: &mut impl CfdpUser,
600        transaction_id: &TransactionId,
601    ) -> Result<bool, SourceError> {
602        if self.state() == super::State::Idle {
603            return Ok(false);
604        }
605        if let Some(active_id) = self.transaction_id() {
606            if active_id == *transaction_id {
607                // Control flow result can be ignored here for the cancel request.
608                self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?;
609                return Ok(true);
610            }
611        }
612        Ok(false)
613    }
614
615    /// This function is public to allow completely resetting the handler, but it is explicitely
616    /// discouraged to do this. CFDP has mechanism to detect issues and errors on itself.
617    /// Resetting the handler might interfere with these mechanisms and lead to unexpected
618    /// behaviour.
619    pub fn reset(&mut self) {
620        self.state_helper = Default::default();
621        self.transaction_params.reset();
622    }
623
624    #[inline]
625    fn set_step(&mut self, step: TransactionStep) {
626        self.set_step_internal(step);
627    }
628
629    #[inline]
630    fn set_step_internal(&self, step: TransactionStep) {
631        self.state_helper.step.set(step);
632    }
633
634    fn fsm_busy(
635        &mut self,
636        user: &mut impl CfdpUser,
637        _pdu: Option<&impl PduProvider>,
638    ) -> Result<u32, SourceError> {
639        let mut sent_packets = 0;
640        if self.step() == TransactionStep::Idle {
641            self.set_step(TransactionStep::TransactionStart);
642        }
643        if self.step() == TransactionStep::TransactionStart {
644            self.handle_transaction_start(user)?;
645            self.set_step(TransactionStep::SendingMetadata);
646        }
647        if self.step() == TransactionStep::SendingMetadata {
648            self.prepare_and_send_metadata_pdu()?;
649            self.set_step(TransactionStep::SendingFileData);
650            sent_packets += 1;
651        }
652        if self.step() == TransactionStep::SendingFileData {
653            if let ControlFlow::Break(packets) = self.file_data_fsm()? {
654                sent_packets += packets;
655                // Exit for each file data PDU to allow flow control.
656                return Ok(sent_packets);
657            }
658        }
659        if self.step() == TransactionStep::SendingEof {
660            self.eof_fsm(user)?;
661            sent_packets += 1;
662        }
663        if self.step() == TransactionStep::WaitingForEofAck {
664            sent_packets += self.handle_waiting_for_ack_pdu(user)?;
665        }
666        if self.step() == TransactionStep::WaitingForFinished {
667            sent_packets += self.handle_waiting_for_finished_pdu(user)?;
668        }
669        if self.step() == TransactionStep::NoticeOfCompletion {
670            self.notice_of_completion(user);
671            self.reset();
672        }
673        Ok(sent_packets)
674    }
675
676    fn handle_waiting_for_ack_pdu(&mut self, user: &mut impl CfdpUser) -> Result<u32, SourceError> {
677        self.handle_positive_ack_procedures(user)
678    }
679
680    fn handle_positive_ack_procedures(
681        &mut self,
682        user: &mut impl CfdpUser,
683    ) -> Result<u32, SourceError> {
684        let mut sent_packets = 0;
685        let current_params = self.transaction_params.positive_ack_params.get();
686        if let Some(mut positive_ack_params) = current_params {
687            if self
688                .transaction_params
689                .ack_timer
690                .borrow_mut()
691                .as_ref()
692                .unwrap()
693                .has_expired()
694            {
695                let ack_timer_exp_limit = self
696                    .transaction_params
697                    .remote_cfg
698                    .as_ref()
699                    .unwrap()
700                    .positive_ack_timer_expiration_limit;
701                if positive_ack_params.ack_counter + 1 >= ack_timer_exp_limit {
702                    let (fault_packets_sent, ctx) =
703                        self.declare_fault(user, ConditionCode::PositiveAckLimitReached)?;
704                    sent_packets += fault_packets_sent;
705                    if ctx == FsmContext::ResetWhenPossible {
706                        self.reset();
707                    } else {
708                        positive_ack_params.ack_counter = 0;
709                        positive_ack_params.positive_ack_of_cancellation = true;
710                    }
711                } else {
712                    self.transaction_params
713                        .ack_timer
714                        .borrow_mut()
715                        .as_mut()
716                        .unwrap()
717                        .reset();
718                    positive_ack_params.ack_counter += 1;
719                    self.prepare_and_send_eof_pdu(
720                        user,
721                        self.transaction_params
722                            .file_params
723                            .checksum_completed_file
724                            .unwrap(),
725                    )?;
726                    sent_packets += 1;
727                }
728            }
729
730            self.transaction_params
731                .positive_ack_params
732                .set(Some(positive_ack_params));
733        }
734        Ok(sent_packets)
735    }
736
737    fn handle_retransmission(&mut self, nak_pdu: &NakPduReader) -> Result<u32, SourceError> {
738        let mut sent_packets = 0;
739        let segment_req_iter = nak_pdu.get_segment_requests_iterator().unwrap();
740        for segment_req in segment_req_iter {
741            // Special case: Metadata PDU is re-requested.
742            if segment_req.0 == 0 && segment_req.1 == 0 {
743                self.prepare_and_send_metadata_pdu()?;
744                sent_packets += 1;
745                continue;
746            } else {
747                if (segment_req.1 < segment_req.0)
748                    || (segment_req.0 > self.transaction_params.file_params.progress)
749                {
750                    return Err(SourceError::InvalidNakPdu);
751                }
752                let mut missing_chunk_len = segment_req.1 - segment_req.0;
753                let current_offset = segment_req.0;
754                while missing_chunk_len > 0 {
755                    let chunk_size = core::cmp::min(
756                        missing_chunk_len,
757                        self.transaction_params.file_params.segment_len,
758                    );
759                    self.prepare_and_send_file_data_pdu(current_offset, chunk_size)?;
760                    sent_packets += 1;
761                    missing_chunk_len -= missing_chunk_len;
762                }
763            }
764        }
765        Ok(sent_packets)
766    }
767
768    fn handle_waiting_for_finished_pdu(
769        &mut self,
770        user: &mut impl CfdpUser,
771    ) -> Result<u32, SourceError> {
772        // If we reach this state, countdown definitely is set.
773        #[allow(clippy::collapsible_if)]
774        if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged
775            && self
776                .transaction_params
777                .check_timer
778                .as_ref()
779                .unwrap()
780                .has_expired()
781        {
782            let (sent_packets, ctx) = self.declare_fault(user, ConditionCode::CheckLimitReached)?;
783            if ctx == FsmContext::ResetWhenPossible {
784                self.reset();
785            }
786            return Ok(sent_packets);
787        }
788        Ok(0)
789    }
790
791    fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
792        let checksum = self.vfs.calculate_checksum(
793            self.put_request_cacher.source_file().unwrap(),
794            self.transaction_params
795                .remote_cfg
796                .as_ref()
797                .unwrap()
798                .default_crc_type,
799            self.transaction_params.file_params.file_size,
800            self.pdu_and_cksum_buffer.borrow_mut().as_mut_slice(),
801        )?;
802        self.transaction_params.file_params.checksum_completed_file = Some(checksum);
803        self.prepare_and_send_eof_pdu(user, checksum)?;
804        if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
805            if self.transaction_params.closure_requested {
806                self.transaction_params.check_timer = Some(
807                    self.timer_creator
808                        .create_countdown(crate::TimerContext::CheckLimit {
809                            local_id: self.local_cfg.id,
810                            remote_id: self
811                                .transaction_params
812                                .remote_cfg
813                                .as_ref()
814                                .unwrap()
815                                .entity_id,
816                            entity_type: EntityType::Sending,
817                        }),
818                );
819                self.set_step(TransactionStep::WaitingForFinished);
820            } else {
821                self.set_step(TransactionStep::NoticeOfCompletion);
822            }
823        } else {
824            self.start_positive_ack_procedure();
825        }
826        Ok(())
827    }
828
829    fn start_positive_ack_procedure(&self) {
830        self.set_step_internal(TransactionStep::WaitingForEofAck);
831        match self.transaction_params.positive_ack_params.get() {
832            Some(mut current) => {
833                current.ack_counter = 0;
834                self.transaction_params
835                    .positive_ack_params
836                    .set(Some(current));
837            }
838            None => self
839                .transaction_params
840                .positive_ack_params
841                .set(Some(PositiveAckParams {
842                    ack_counter: 0,
843                    positive_ack_of_cancellation: false,
844                })),
845        }
846
847        *self.transaction_params.ack_timer.borrow_mut() = Some(
848            self.timer_creator
849                .create_countdown(crate::TimerContext::PositiveAck {
850                    expiry_time: self
851                        .transaction_params
852                        .remote_cfg
853                        .as_ref()
854                        .unwrap()
855                        .positive_ack_timer_interval,
856                }),
857        );
858    }
859
860    fn handle_transaction_start(
861        &mut self,
862        cfdp_user: &mut impl CfdpUser,
863    ) -> Result<(), SourceError> {
864        if !self.put_request_cacher.has_source_file() {
865            self.transaction_params.file_params.metadata_only = true;
866        } else {
867            let source_file = self
868                .put_request_cacher
869                .source_file()
870                .map_err(SourceError::SourceFileNotValidUtf8)?;
871            if !self.vfs.exists(source_file)? {
872                return Err(SourceError::FilestoreError(
873                    FilestoreError::FileDoesNotExist,
874                ));
875            }
876            // We expect the destination file path to consist of valid UTF-8 characters as well.
877            self.put_request_cacher
878                .dest_file()
879                .map_err(SourceError::DestFileNotValidUtf8)?;
880            self.transaction_params.file_params.file_size = self.vfs.file_size(source_file)?;
881            if self.transaction_params.file_params.file_size > u32::MAX as u64 {
882                self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Large
883            } else {
884                if self.transaction_params.file_params.file_size == 0 {
885                    self.transaction_params.file_params.empty_file = true;
886                }
887                self.transaction_params.pdu_conf.file_flag = LargeFileFlag::Normal
888            }
889        }
890        cfdp_user.transaction_indication(&self.transaction_id().unwrap());
891        Ok(())
892    }
893
894    fn prepare_and_send_ack_pdu(
895        &mut self,
896        condition_code: ConditionCode,
897        transaction_status: TransactionStatus,
898    ) -> Result<(), SourceError> {
899        let ack_pdu = AckPdu::new(
900            PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
901            FileDirectiveType::FinishedPdu,
902            condition_code,
903            transaction_status,
904        )
905        .map_err(PduError::from)?;
906        self.pdu_send_helper(&ack_pdu)?;
907        Ok(())
908    }
909
910    fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> {
911        let metadata_params = MetadataGenericParams::new(
912            self.transaction_params.closure_requested,
913            self.transaction_params
914                .remote_cfg
915                .as_ref()
916                .unwrap()
917                .default_crc_type,
918            self.transaction_params.file_params.file_size,
919        );
920        if self.transaction_params.file_params.metadata_only {
921            let metadata_pdu = MetadataPduCreator::new(
922                PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
923                metadata_params,
924                Lv::new_empty(),
925                Lv::new_empty(),
926                self.put_request_cacher.opts_slice(),
927            );
928            return self.pdu_send_helper(&metadata_pdu);
929        }
930        let metadata_pdu = MetadataPduCreator::new(
931            PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
932            metadata_params,
933            Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(),
934            Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(),
935            self.put_request_cacher.opts_slice(),
936        );
937        self.pdu_send_helper(&metadata_pdu)
938    }
939
940    fn file_data_fsm(&mut self) -> Result<ControlFlow<u32>, SourceError> {
941        if !self.transaction_params.file_params.metadata_only
942            && self.transaction_params.file_params.progress
943                < self.transaction_params.file_params.file_size
944            && self.send_progressing_file_data_pdu()?
945        {
946            return Ok(ControlFlow::Break(1));
947        }
948        if self.transaction_params.file_params.empty_file
949            || self.transaction_params.file_params.progress
950                >= self.transaction_params.file_params.file_size
951        {
952            // EOF is still expected.
953            self.set_step(TransactionStep::SendingEof);
954            self.transaction_params
955                .cond_code_eof
956                .set(Some(ConditionCode::NoError));
957        } else if self.transaction_params.file_params.metadata_only {
958            // Special case: Metadata Only, no EOF required.
959            if self.transaction_params.closure_requested {
960                self.set_step(TransactionStep::WaitingForFinished);
961            } else {
962                self.set_step(TransactionStep::NoticeOfCompletion);
963            }
964        }
965        Ok(ControlFlow::Continue(()))
966    }
967
968    fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
969        if self.local_cfg.indication_cfg.transaction_finished {
970            // The first case happens for unacknowledged file copy operation with no closure.
971            let finished_params = match self.transaction_params.finished_params {
972                Some(finished_params) => TransactionFinishedParams {
973                    id: self.transaction_id().unwrap(),
974                    condition_code: finished_params.condition_code,
975                    delivery_code: finished_params.delivery_code,
976                    file_status: finished_params.file_status,
977                },
978                None => TransactionFinishedParams {
979                    id: self.transaction_id().unwrap(),
980                    condition_code: ConditionCode::NoError,
981                    delivery_code: DeliveryCode::Complete,
982                    file_status: FileStatus::Unreported,
983                },
984            };
985            cfdp_user.transaction_finished_indication(&finished_params);
986        }
987    }
988
989    fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
990        let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
991            &PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
992            remote_cfg.max_packet_len,
993            None,
994        );
995        if remote_cfg.max_file_segment_len.is_some() {
996            derived_max_seg_len = core::cmp::min(
997                remote_cfg.max_file_segment_len.unwrap(),
998                derived_max_seg_len,
999            );
1000        }
1001        derived_max_seg_len as u64
1002    }
1003
1004    fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
1005        // Should never be called, but use defensive programming here.
1006        if self.transaction_params.file_params.progress
1007            >= self.transaction_params.file_params.file_size
1008        {
1009            return Ok(false);
1010        }
1011        let read_len = self.transaction_params.file_params.segment_len.min(
1012            self.transaction_params.file_params.file_size
1013                - self.transaction_params.file_params.progress,
1014        );
1015        self.prepare_and_send_file_data_pdu(
1016            self.transaction_params.file_params.progress,
1017            read_len,
1018        )?;
1019        Ok(true)
1020    }
1021
1022    fn prepare_and_send_file_data_pdu(
1023        &mut self,
1024        offset: u64,
1025        size: u64,
1026    ) -> Result<(), SourceError> {
1027        let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
1028            PduHeader::new_for_file_data(
1029                self.transaction_params.pdu_conf,
1030                0,
1031                SegmentMetadataFlag::NotPresent,
1032                SegmentationControl::NoRecordBoundaryPreservation,
1033            ),
1034            offset,
1035            size,
1036        );
1037        let mut unwritten_pdu =
1038            pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?;
1039        self.vfs.read_data(
1040            self.put_request_cacher.source_file().unwrap(),
1041            offset,
1042            size,
1043            unwritten_pdu.file_data_field_mut(),
1044        )?;
1045        let written_len = unwritten_pdu.finish();
1046        self.pdu_sender.send_pdu(
1047            PduType::FileData,
1048            None,
1049            &self.pdu_and_cksum_buffer.borrow()[0..written_len],
1050        )?;
1051        self.transaction_params.file_params.progress += size;
1052        Ok(())
1053    }
1054
1055    fn prepare_and_send_eof_pdu(
1056        &self,
1057        cfdp_user: &mut impl CfdpUser,
1058        checksum: u32,
1059    ) -> Result<(), SourceError> {
1060        let eof_pdu = EofPdu::new(
1061            PduHeader::new_for_file_directive(self.transaction_params.pdu_conf, 0),
1062            self.transaction_params
1063                .cond_code_eof
1064                .get()
1065                .unwrap_or(ConditionCode::NoError),
1066            checksum,
1067            self.transaction_params.file_params.progress,
1068            None,
1069        );
1070        self.pdu_send_helper(&eof_pdu)?;
1071        if self.local_cfg.indication_cfg.eof_sent {
1072            cfdp_user.eof_sent_indication(&self.transaction_id().unwrap());
1073        }
1074        Ok(())
1075    }
1076
1077    fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), SourceError> {
1078        let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
1079        let written_len = pdu.write_to_bytes(pdu_buffer_mut.as_mut_slice())?;
1080        self.pdu_sender.send_pdu(
1081            pdu.pdu_type(),
1082            pdu.file_directive_type(),
1083            &pdu_buffer_mut[0..written_len],
1084        )?;
1085        Ok(())
1086    }
1087
1088    fn handle_finished_pdu(&mut self, finished_pdu: &FinishedPduReader) -> Result<(), SourceError> {
1089        // Ignore this packet when we are idle.
1090        if self.state() == State::Idle {
1091            return Ok(());
1092        }
1093        if self.step() != TransactionStep::WaitingForFinished {
1094            return Err(SourceError::UnexpectedPdu {
1095                pdu_type: PduType::FileDirective,
1096                directive_type: Some(FileDirectiveType::FinishedPdu),
1097            });
1098        }
1099        // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE
1100        // mode.
1101        self.transaction_params.finished_params = Some(FinishedParams {
1102            condition_code: finished_pdu.condition_code(),
1103            delivery_code: finished_pdu.delivery_code(),
1104            file_status: finished_pdu.file_status(),
1105        });
1106        if let Some(TransmissionMode::Acknowledged) = self.transmission_mode() {
1107            self.prepare_and_send_ack_pdu(
1108                finished_pdu.condition_code(),
1109                TransactionStatus::Active,
1110            )?;
1111        }
1112        self.set_step(TransactionStep::NoticeOfCompletion);
1113        Ok(())
1114    }
1115
    /// Handles a received NAK PDU by re-sending the requested PDUs.
    ///
    /// Delegates to [Self::handle_retransmission] and returns its result, which is the number
    /// of PDUs sent.
    fn handle_nak_pdu(&mut self, nak_pdu: &NakPduReader) -> Result<u32, SourceError> {
        self.handle_retransmission(nak_pdu)
    }
1119
1120    fn handle_ack_pdu(&mut self, ack_pdu: &AckPdu) -> Result<(), SourceError> {
1121        if self.step() != TransactionStep::WaitingForEofAck {
1122            // Drop the packet, wrong state to handle it..
1123            return Err(SourceError::UnexpectedPdu {
1124                pdu_type: PduType::FileDirective,
1125                directive_type: Some(FileDirectiveType::AckPdu),
1126            });
1127        }
1128        if ack_pdu.directive_code_of_acked_pdu() == FileDirectiveType::EofPdu {
1129            self.set_step(TransactionStep::WaitingForFinished);
1130        } else {
1131            self.anomalies.invalid_ack_directive_code =
1132                self.anomalies.invalid_ack_directive_code.wrapping_add(1);
1133        }
1134        Ok(())
1135    }
1136
1137    pub fn notice_of_cancellation(
1138        &mut self,
1139        user: &mut impl CfdpUser,
1140        condition_code: ConditionCode,
1141    ) -> Result<u32, SourceError> {
1142        let mut sent_packets = 0;
1143        let ctx = self.notice_of_cancellation_internal(user, condition_code, &mut sent_packets)?;
1144        if ctx == FsmContext::ResetWhenPossible {
1145            self.reset();
1146        }
1147        Ok(sent_packets)
1148    }
1149
1150    fn notice_of_cancellation_internal(
1151        &self,
1152        user: &mut impl CfdpUser,
1153        condition_code: ConditionCode,
1154        sent_packets: &mut u32,
1155    ) -> Result<FsmContext, SourceError> {
1156        self.transaction_params
1157            .cond_code_eof
1158            .set(Some(condition_code));
1159        // As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
1160        // the checksum for the file copy progress sent so far.
1161        let checksum = self.vfs.calculate_checksum(
1162            self.put_request_cacher.source_file().unwrap(),
1163            self.transaction_params
1164                .remote_cfg
1165                .as_ref()
1166                .unwrap()
1167                .default_crc_type,
1168            self.transaction_params.file_params.progress,
1169            self.pdu_and_cksum_buffer.borrow_mut().as_mut_slice(),
1170        )?;
1171        self.prepare_and_send_eof_pdu(user, checksum)?;
1172        *sent_packets += 1;
1173        if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
1174            // We are done.
1175            Ok(FsmContext::ResetWhenPossible)
1176        } else {
1177            self.start_positive_ack_procedure();
1178            Ok(FsmContext::default())
1179        }
1180    }
1181
    /// Public entry point for a notice of suspension.
    ///
    /// Forwards to the internal suspension handler, which currently has an empty body, so this
    /// call has no effect.
    pub fn notice_of_suspension(&mut self) {
        self.notice_of_suspension_internal();
    }
1185
    /// Internal suspension handler. The body is empty, so calling it has no effect.
    fn notice_of_suspension_internal(&self) {}
1187
    /// Abandons the current transaction by resetting the handler.
    ///
    /// Nothing else is done here. In particular, this method does not send any PDU itself.
    pub fn abandon_transaction(&mut self) {
        // I guess an abandoned transaction just stops whatever the handler is doing and resets
        // it to a clean state.. The implementation for this is quite easy.
        self.reset();
    }
1193
1194    // Returns the number of packets sent and a FSM context structure.
1195    fn declare_fault(
1196        &self,
1197        user: &mut impl CfdpUser,
1198        cond: ConditionCode,
1199    ) -> Result<(u32, FsmContext), SourceError> {
1200        let mut sent_packets = 0;
1201        let mut fh = self.local_cfg.fault_handler.get_fault_handler(cond);
1202        // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
1203        // the EOF (cancel) PDU must result in abandonment of the transaction.
1204        if let Some(positive_ack_params) = self.transaction_params.positive_ack_params.get() {
1205            if positive_ack_params.positive_ack_of_cancellation {
1206                fh = FaultHandlerCode::AbandonTransaction;
1207            }
1208        }
1209        let mut ctx = FsmContext::default();
1210        match fh {
1211            FaultHandlerCode::NoticeOfCancellation => {
1212                ctx = self.notice_of_cancellation_internal(user, cond, &mut sent_packets)?;
1213            }
1214            FaultHandlerCode::NoticeOfSuspension => {
1215                self.notice_of_suspension_internal();
1216            }
1217            FaultHandlerCode::IgnoreError => (),
1218            FaultHandlerCode::AbandonTransaction => {
1219                ctx = FsmContext::ResetWhenPossible;
1220            }
1221        }
1222        self.local_cfg.fault_handler.report_fault(
1223            fh,
1224            FaultInfo::new(
1225                self.transaction_id().unwrap(),
1226                cond,
1227                self.transaction_params.file_params.progress,
1228            ),
1229        );
1230        Ok((sent_packets, ctx))
1231    }
1232
    /// Handler for Keep Alive PDUs. The body is empty, so calling it has no effect.
    fn handle_keep_alive_pdu(&mut self) {}
1234}
1235
1236#[cfg(test)]
1237mod tests {
1238    use std::{fs::OpenOptions, io::Write, path::PathBuf, vec::Vec};
1239
1240    use alloc::string::String;
1241    use rand::Rng;
1242    use spacepackets::{
1243        cfdp::{
1244            ChecksumType, CrcFlag,
1245            pdu::{
1246                file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader,
1247                nak::NakPduCreator,
1248            },
1249        },
1250        util::UnsignedByteFieldU16,
1251    };
1252    use tempfile::TempPath;
1253
1254    use super::*;
1255    use crate::{
1256        CRC_32, FaultHandler, IndicationConfig, PduRawWithInfo, RemoteConfigStoreStd,
1257        filestore::NativeFilestore,
1258        request::PutRequestOwned,
1259        source::TransactionStep,
1260        tests::{
1261            SentPdu, TestCfdpSender, TestCfdpUser, TestCheckTimer, TestCheckTimerCreator,
1262            TestFaultHandler, TimerExpiryControl, basic_remote_cfg_table,
1263        },
1264    };
1265    use spacepackets::seq_count::SequenceCounterSimple;
1266
    /// Entity ID of the local entity, used for the local entity configuration of the testbench.
    const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
    /// Entity ID of the remote entity, used for the remote configuration table and as the
    /// destination of the put requests.
    const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
    /// Entity ID which differs from both IDs above.
    // NOTE(review): presumably used by tests further down to exercise unknown entity IDs -
    // confirm against its users.
    const INVALID_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(5);
1270
1271    fn init_full_filepaths_textfile() -> (TempPath, PathBuf) {
1272        (
1273            tempfile::NamedTempFile::new().unwrap().into_temp_path(),
1274            tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1275        )
1276    }
1277
    /// [SourceHandler] instantiated with the test doubles of this crate, the native filestore,
    /// the standard remote configuration store and a simple 16-bit sequence counter.
    type TestSourceHandler = SourceHandler<
        TestCfdpSender,
        TestFaultHandler,
        NativeFilestore,
        RemoteConfigStoreStd,
        TestCheckTimerCreator,
        TestCheckTimer,
        SequenceCounterSimple<u16>,
    >;
1287
    /// Test fixture which bundles a [TestSourceHandler] with the data required to drive and
    /// verify file transfers.
    struct SourceHandlerTestbench {
        /// The source handler under test.
        handler: TestSourceHandler,
        /// Expiry control which was passed to the timer creator of the handler.
        expiry_control: TimerExpiryControl,
        /// Transmission mode used for put requests and expected in the generated PDU headers.
        transmission_mode: TransmissionMode,
        // NOTE(review): presumably only kept so the temporary source file lives as long as the
        // testbench - confirm.
        #[allow(dead_code)]
        srcfile_handle: TempPath,
        /// Path of the source file as a string.
        srcfile: String,
        /// Path of the destination file as a string.
        destfile: String,
        /// Maximum packet length the remote configuration was created with.
        max_packet_len: usize,
        // NOTE(review): initialized to `true`; presumably evaluated by a `Drop` implementation
        // which is not visible here - confirm.
        check_idle_on_drop: bool,
    }
1299
    /// Information about a started transfer, collected while checking the Metadata PDU.
    #[allow(dead_code)]
    struct TransferInfo {
        /// Transaction ID reported by the handler after the put request.
        id: TransactionId,
        /// Size of the transferred file in bytes.
        file_size: u64,
        /// Closure requested flag as found in the Metadata PDU.
        closure_requested: bool,
        /// PDU header of the Metadata PDU.
        pdu_header: PduHeader,
    }
1307
    /// Expected contents of an EOF PDU.
    #[derive(Debug, Clone, Copy)]
    struct EofParams {
        /// Expected file size field.
        file_size: u64,
        /// Expected file checksum field.
        file_checksum: u32,
        /// Expected condition code.
        condition_code: ConditionCode,
    }
1314
1315    impl EofParams {
1316        pub const fn new_success(file_size: u64, file_checksum: u32) -> Self {
1317            Self {
1318                file_size,
1319                file_checksum,
1320                condition_code: ConditionCode::NoError,
1321            }
1322        }
1323    }
1324
1325    impl SourceHandlerTestbench {
1326        fn new(
1327            transmission_mode: TransmissionMode,
1328            crc_on_transmission_by_default: bool,
1329            max_packet_len: usize,
1330        ) -> Self {
1331            let local_entity_cfg = LocalEntityConfig {
1332                id: LOCAL_ID.into(),
1333                indication_cfg: IndicationConfig::default(),
1334                fault_handler: FaultHandler::new(TestFaultHandler::default()),
1335            };
1336            let (srcfile_handle, destfile) = init_full_filepaths_textfile();
1337            let srcfile = String::from(srcfile_handle.to_path_buf().to_str().unwrap());
1338            let expiry_control = TimerExpiryControl::default();
1339            let sender = TestCfdpSender::default();
1340            Self {
1341                handler: SourceHandler::new(
1342                    local_entity_cfg,
1343                    sender,
1344                    NativeFilestore::default(),
1345                    basic_remote_cfg_table(
1346                        REMOTE_ID,
1347                        max_packet_len,
1348                        crc_on_transmission_by_default,
1349                    ),
1350                    TestCheckTimerCreator::new(&expiry_control),
1351                    SequenceCounterSimple::default(),
1352                ),
1353                transmission_mode,
1354                expiry_control,
1355                srcfile_handle,
1356                srcfile,
1357                destfile: String::from(destfile.to_path_buf().to_str().unwrap()),
1358                max_packet_len,
1359                check_idle_on_drop: true,
1360            }
1361        }
1362
        /// Creates a [TestCfdpUser] from the given next expected sequence number, the source
        /// and destination file names of this testbench and the given file size.
        fn create_user(&self, next_expected_seq_num: u64, filesize: u64) -> TestCfdpUser {
            TestCfdpUser::new(
                next_expected_seq_num,
                self.srcfile.clone(),
                self.destfile.clone(),
                filesize,
            )
        }
1371
        /// Forwards the put request to the source handler under test and returns its result.
        fn put_request(
            &mut self,
            put_request: &impl ReadablePutRequest,
        ) -> Result<(), PutRequestError> {
            self.handler.put_request(put_request)
        }
1378
        /// Returns the result of `all_queues_empty` of the user fault hook of the handler.
        fn all_fault_queues_empty(&self) -> bool {
            self.handler
                .local_cfg
                .user_fault_hook()
                .borrow()
                .all_queues_empty()
        }
1386
        /// Shared access to the user fault hook of the handler.
        #[allow(dead_code)]
        fn test_fault_handler(&self) -> &RefCell<TestFaultHandler> {
            self.handler.local_cfg.user_fault_hook()
        }
1391
        /// Mutable access to the user fault hook of the handler.
        fn test_fault_handler_mut(&mut self) -> &mut RefCell<TestFaultHandler> {
            self.handler.local_cfg.user_fault_hook_mut()
        }
1395
        /// Returns the result of `queue_empty` of the test PDU sender of the handler.
        fn pdu_queue_empty(&self) -> bool {
            self.handler.pdu_sender.queue_empty()
        }
1399
        /// Retrieves the next PDU from the test PDU sender of the handler, if there is one.
        fn get_next_sent_pdu(&self) -> Option<SentPdu> {
            self.handler.pdu_sender.retrieve_next_pdu()
        }
1403
1404        fn common_pdu_check_for_file_transfer(&self, pdu_header: &PduHeader, crc_flag: CrcFlag) {
1405            assert_eq!(
1406                pdu_header.seg_ctrl(),
1407                SegmentationControl::NoRecordBoundaryPreservation
1408            );
1409            assert_eq!(
1410                pdu_header.seg_metadata_flag(),
1411                SegmentMetadataFlag::NotPresent
1412            );
1413            assert_eq!(pdu_header.common_pdu_conf().source_id(), LOCAL_ID.into());
1414            assert_eq!(pdu_header.common_pdu_conf().dest_id(), REMOTE_ID.into());
1415            assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag);
1416            assert_eq!(
1417                pdu_header.common_pdu_conf().trans_mode,
1418                self.transmission_mode
1419            );
1420            assert_eq!(
1421                pdu_header.common_pdu_conf().direction,
1422                Direction::TowardsReceiver
1423            );
1424            assert_eq!(
1425                pdu_header.common_pdu_conf().file_flag,
1426                LargeFileFlag::Normal
1427            );
1428            assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2);
1429        }
1430
1431        fn nak_for_file_segments(
1432            &mut self,
1433            cfdp_user: &mut TestCfdpUser,
1434            transfer_info: &TransferInfo,
1435            seg_reqs: &[(u32, u32)],
1436        ) {
1437            let nak_pdu = NakPduCreator::new_normal_file_size(
1438                transfer_info.pdu_header,
1439                0,
1440                transfer_info.file_size as u32,
1441                seg_reqs,
1442            )
1443            .unwrap();
1444            let nak_pdu_vec = nak_pdu.to_vec().unwrap();
1445            let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap();
1446            self.handler
1447                .state_machine(cfdp_user, Some(&packet_info))
1448                .unwrap();
1449        }
1450
1451        fn generic_file_transfer(
1452            &mut self,
1453            cfdp_user: &mut TestCfdpUser,
1454            with_closure: bool,
1455            file_data: Vec<u8>,
1456        ) -> (TransferInfo, u32) {
1457            let mut digest = CRC_32.digest();
1458            digest.update(&file_data);
1459            let checksum = digest.finalize();
1460            cfdp_user.expected_full_src_name = self.srcfile.clone();
1461            cfdp_user.expected_full_dest_name = self.destfile.clone();
1462            cfdp_user.expected_file_size = file_data.len() as u64;
1463            let put_request = PutRequestOwned::new_regular_request(
1464                REMOTE_ID.into(),
1465                &self.srcfile,
1466                &self.destfile,
1467                Some(self.transmission_mode),
1468                Some(with_closure),
1469            )
1470            .expect("creating put request failed");
1471            let transaction_info = self.common_file_transfer_init_with_metadata_check(
1472                cfdp_user,
1473                put_request,
1474                cfdp_user.expected_file_size,
1475            );
1476            let mut current_offset = 0;
1477            let chunks = file_data.chunks(
1478                calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
1479                    &transaction_info.pdu_header,
1480                    self.max_packet_len,
1481                    None,
1482                ),
1483            );
1484            let mut fd_pdus = 0;
1485            for segment in chunks {
1486                self.check_next_file_pdu(current_offset, segment);
1487                self.handler.state_machine_no_packet(cfdp_user).unwrap();
1488                fd_pdus += 1;
1489                current_offset += segment.len() as u64;
1490            }
1491            self.common_eof_pdu_check(
1492                cfdp_user,
1493                transaction_info.closure_requested,
1494                EofParams {
1495                    file_size: cfdp_user.expected_file_size,
1496                    file_checksum: checksum,
1497                    condition_code: ConditionCode::NoError,
1498                },
1499                1,
1500            );
1501            (transaction_info, fd_pdus)
1502        }
1503
1504        fn common_file_transfer_init_with_metadata_check(
1505            &mut self,
1506            cfdp_user: &mut TestCfdpUser,
1507            put_request: PutRequestOwned,
1508            file_size: u64,
1509        ) -> TransferInfo {
1510            assert_eq!(cfdp_user.transaction_indication_call_count, 0);
1511            assert_eq!(cfdp_user.eof_sent_call_count, 0);
1512
1513            self.put_request(&put_request)
1514                .expect("put_request call failed");
1515            assert_eq!(self.handler.state(), State::Busy);
1516            assert_eq!(self.handler.step(), TransactionStep::Idle);
1517            let transaction_id = self.handler.transaction_id().unwrap();
1518            let sent_packets = self
1519                .handler
1520                .state_machine_no_packet(cfdp_user)
1521                .expect("source handler FSM failure");
1522            assert_eq!(sent_packets, 2);
1523            assert!(!self.pdu_queue_empty());
1524            let next_pdu = self.get_next_sent_pdu().unwrap();
1525            assert!(!self.pdu_queue_empty());
1526            let metadata_pdu_reader = self.metadata_check(&next_pdu, file_size);
1527            let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
1528                assert_eq!(
1529                    metadata_pdu_reader.metadata_params().closure_requested,
1530                    closure_requested
1531                );
1532                closure_requested
1533            } else {
1534                assert!(metadata_pdu_reader.metadata_params().closure_requested);
1535                metadata_pdu_reader.metadata_params().closure_requested
1536            };
1537            TransferInfo {
1538                pdu_header: *metadata_pdu_reader.pdu_header(),
1539                closure_requested,
1540                file_size,
1541                id: transaction_id,
1542            }
1543        }
1544
1545        fn metadata_check<'a>(
1546            &self,
1547            next_pdu: &'a SentPdu,
1548            file_size: u64,
1549        ) -> MetadataPduReader<'a> {
1550            assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1551            assert_eq!(
1552                next_pdu.file_directive_type,
1553                Some(FileDirectiveType::MetadataPdu)
1554            );
1555            let metadata_pdu =
1556                MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
1557            self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
1558            assert_eq!(
1559                metadata_pdu
1560                    .src_file_name()
1561                    .value_as_str()
1562                    .unwrap()
1563                    .unwrap(),
1564                self.srcfile
1565            );
1566            assert_eq!(
1567                metadata_pdu
1568                    .dest_file_name()
1569                    .value_as_str()
1570                    .unwrap()
1571                    .unwrap(),
1572                self.destfile
1573            );
1574            assert_eq!(metadata_pdu.metadata_params().file_size, file_size);
1575            assert_eq!(
1576                metadata_pdu.metadata_params().checksum_type,
1577                ChecksumType::Crc32
1578            );
1579            assert_eq!(metadata_pdu.transmission_mode(), self.transmission_mode);
1580            assert_eq!(metadata_pdu.options(), &[]);
1581            metadata_pdu
1582        }
1583
1584        fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
1585            let next_pdu = self.get_next_sent_pdu().unwrap();
1586            assert_eq!(next_pdu.pdu_type, PduType::FileData);
1587            assert!(next_pdu.file_directive_type.is_none());
1588            let fd_pdu =
1589                FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
1590            assert_eq!(fd_pdu.offset(), expected_offset);
1591            assert_eq!(fd_pdu.file_data(), expected_data);
1592            assert!(fd_pdu.segment_metadata().is_none());
1593        }
1594
1595        fn acknowledge_eof_pdu(
1596            &mut self,
1597            cfdp_user: &mut impl CfdpUser,
1598            transaction_info: &TransferInfo,
1599        ) {
1600            let ack_pdu = AckPdu::new(
1601                transaction_info.pdu_header,
1602                FileDirectiveType::EofPdu,
1603                ConditionCode::NoError,
1604                TransactionStatus::Active,
1605            )
1606            .expect("creating ACK PDU failed");
1607            let ack_pdu_vec = ack_pdu.to_vec().unwrap();
1608            let packet_info = PduRawWithInfo::new(&ack_pdu_vec).unwrap();
1609            self.handler
1610                .state_machine(cfdp_user, Some(&packet_info))
1611                .expect("state machine failed");
1612        }
1613
1614        fn common_finished_pdu_ack_check(&mut self) {
1615            assert!(!self.pdu_queue_empty());
1616            let next_pdu = self.get_next_sent_pdu().unwrap();
1617            assert!(self.pdu_queue_empty());
1618            assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1619            assert_eq!(
1620                next_pdu.file_directive_type,
1621                Some(FileDirectiveType::AckPdu)
1622            );
1623            let ack_pdu = AckPdu::from_bytes(&next_pdu.raw_pdu).unwrap();
1624            self.common_pdu_check_for_file_transfer(ack_pdu.pdu_header(), CrcFlag::NoCrc);
1625            assert_eq!(ack_pdu.condition_code(), ConditionCode::NoError);
1626            assert_eq!(
1627                ack_pdu.directive_code_of_acked_pdu(),
1628                FileDirectiveType::FinishedPdu
1629            );
1630            assert_eq!(ack_pdu.transaction_status(), TransactionStatus::Active);
1631        }
1632
1633        fn common_eof_pdu_check(
1634            &mut self,
1635            cfdp_user: &mut TestCfdpUser,
1636            closure_requested: bool,
1637            eof_params: EofParams,
1638            eof_sent_call_count: u32,
1639        ) {
1640            let next_pdu = self.get_next_sent_pdu().unwrap();
1641            assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1642            assert_eq!(
1643                next_pdu.file_directive_type,
1644                Some(FileDirectiveType::EofPdu)
1645            );
1646            let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1647            self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1648            assert_eq!(eof_pdu.condition_code(), eof_params.condition_code);
1649            assert_eq!(eof_pdu.file_size(), eof_params.file_size);
1650            assert_eq!(eof_pdu.file_checksum(), eof_params.file_checksum);
1651            assert_eq!(
1652                eof_pdu
1653                    .pdu_header()
1654                    .common_pdu_conf()
1655                    .transaction_seq_num
1656                    .value_const(),
1657                0
1658            );
1659            if self.transmission_mode == TransmissionMode::Unacknowledged {
1660                if !closure_requested {
1661                    assert_eq!(self.handler.state(), State::Idle);
1662                    assert_eq!(self.handler.step(), TransactionStep::Idle);
1663                } else {
1664                    assert_eq!(self.handler.state(), State::Busy);
1665                    assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished);
1666                }
1667            } else {
1668                assert_eq!(self.handler.state(), State::Busy);
1669                assert_eq!(self.handler.step(), TransactionStep::WaitingForEofAck);
1670            }
1671
1672            assert_eq!(cfdp_user.transaction_indication_call_count, 1);
1673            assert_eq!(cfdp_user.eof_sent_call_count, eof_sent_call_count);
1674            self.all_fault_queues_empty();
1675        }
1676
1677        fn common_tiny_file_transfer(
1678            &mut self,
1679            cfdp_user: &mut TestCfdpUser,
1680            with_closure: bool,
1681        ) -> (&'static str, TransferInfo) {
1682            let mut file = OpenOptions::new()
1683                .write(true)
1684                .open(&self.srcfile)
1685                .expect("opening file failed");
1686            let content_str = "Hello World!";
1687            file.write_all(content_str.as_bytes())
1688                .expect("writing file content failed");
1689            drop(file);
1690            let (transfer_info, fd_pdus) = self.generic_file_transfer(
1691                cfdp_user,
1692                with_closure,
1693                content_str.as_bytes().to_vec(),
1694            );
1695            assert_eq!(fd_pdus, 1);
1696            (content_str, transfer_info)
1697        }
1698
1699        // Finish handling: Simulate completion from the destination side by insert finished PDU.
1700        fn finish_handling(&mut self, user: &mut TestCfdpUser, transfer_info: &TransferInfo) {
1701            let finished_pdu = FinishedPduCreator::new_no_error(
1702                transfer_info.pdu_header,
1703                DeliveryCode::Complete,
1704                FileStatus::Retained,
1705            );
1706            let finished_pdu_vec = finished_pdu.to_vec().unwrap();
1707            let packet_info = PduRawWithInfo::new(&finished_pdu_vec).unwrap();
1708            self.handler
1709                .state_machine(user, Some(&packet_info))
1710                .unwrap();
1711        }
1712    }
1713
1714    impl Drop for SourceHandlerTestbench {
1715        fn drop(&mut self) {
1716            self.all_fault_queues_empty();
1717            if self.check_idle_on_drop {
1718                assert_eq!(self.handler.state(), State::Idle);
1719                assert_eq!(self.handler.step(), TransactionStep::Idle);
1720            }
1721        }
1722    }
1723
1724    #[test]
1725    fn test_basic() {
1726        let tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1727        assert!(tb.handler.transmission_mode().is_none());
1728        assert!(tb.pdu_queue_empty());
1729    }
1730
1731    #[test]
1732    fn test_empty_file_transfer_not_acked_no_closure() {
1733        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1734        let file_size = 0;
1735        let put_request = PutRequestOwned::new_regular_request(
1736            REMOTE_ID.into(),
1737            &tb.srcfile,
1738            &tb.destfile,
1739            Some(TransmissionMode::Unacknowledged),
1740            Some(false),
1741        )
1742        .expect("creating put request failed");
1743        let mut user = tb.create_user(0, file_size);
1744        let transfer_info =
1745            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1746        tb.common_eof_pdu_check(
1747            &mut user,
1748            transfer_info.closure_requested,
1749            EofParams::new_success(file_size, CRC_32.digest().finalize()),
1750            1,
1751        );
1752        user.verify_finished_indication(
1753            DeliveryCode::Complete,
1754            ConditionCode::NoError,
1755            transfer_info.id,
1756            FileStatus::Unreported,
1757        );
1758    }
1759
1760    #[test]
1761    fn test_empty_file_transfer_acked() {
1762        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
1763        let file_size = 0;
1764        let put_request = PutRequestOwned::new_regular_request(
1765            REMOTE_ID.into(),
1766            &tb.srcfile,
1767            &tb.destfile,
1768            Some(TransmissionMode::Acknowledged),
1769            Some(false),
1770        )
1771        .expect("creating put request failed");
1772        let mut user = tb.create_user(0, file_size);
1773        let transaction_info =
1774            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1775        tb.common_eof_pdu_check(
1776            &mut user,
1777            transaction_info.closure_requested,
1778            EofParams::new_success(file_size, CRC_32.digest().finalize()),
1779            1,
1780        );
1781
1782        tb.acknowledge_eof_pdu(&mut user, &transaction_info);
1783        tb.finish_handling(&mut user, &transaction_info);
1784        tb.common_finished_pdu_ack_check();
1785        user.verify_finished_indication_retained(
1786            DeliveryCode::Complete,
1787            ConditionCode::NoError,
1788            transaction_info.id,
1789        );
1790    }
1791
1792    #[test]
1793    fn test_tiny_file_transfer_not_acked_no_closure() {
1794        let mut user = TestCfdpUser::default();
1795        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1796        tb.common_tiny_file_transfer(&mut user, false);
1797    }
1798
1799    #[test]
1800    fn test_tiny_file_transfer_acked() {
1801        let mut user = TestCfdpUser::default();
1802        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
1803        let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, false);
1804        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
1805        tb.finish_handling(&mut user, &transfer_info);
1806        tb.common_finished_pdu_ack_check();
1807    }
1808
1809    #[test]
1810    fn test_tiny_file_transfer_not_acked_with_closure() {
1811        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1812        let mut user = TestCfdpUser::default();
1813        let (_data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true);
1814        tb.finish_handling(&mut user, &transfer_info)
1815    }
1816
1817    #[test]
1818    fn test_two_segment_file_transfer_not_acked_no_closure() {
1819        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
1820        let mut user = TestCfdpUser::default();
1821        let mut file = OpenOptions::new()
1822            .write(true)
1823            .open(&tb.srcfile)
1824            .expect("opening file failed");
1825        let mut rand_data = [0u8; 140];
1826        rand::rng().fill(&mut rand_data[..]);
1827        file.write_all(&rand_data)
1828            .expect("writing file content failed");
1829        drop(file);
1830        let (_, fd_pdus) = tb.generic_file_transfer(&mut user, false, rand_data.to_vec());
1831        assert_eq!(fd_pdus, 2);
1832    }
1833
1834    #[test]
1835    fn test_two_segment_file_transfer_not_acked_with_closure() {
1836        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
1837        let mut user = TestCfdpUser::default();
1838        let mut file = OpenOptions::new()
1839            .write(true)
1840            .open(&tb.srcfile)
1841            .expect("opening file failed");
1842        let mut rand_data = [0u8; 140];
1843        rand::rng().fill(&mut rand_data[..]);
1844        file.write_all(&rand_data)
1845            .expect("writing file content failed");
1846        drop(file);
1847        let (transfer_info, fd_pdus) =
1848            tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
1849        assert_eq!(fd_pdus, 2);
1850        tb.finish_handling(&mut user, &transfer_info)
1851    }
1852
1853    #[test]
1854    fn test_two_segment_file_transfer_acked() {
1855        let mut user = TestCfdpUser::default();
1856        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);
1857        let mut file = OpenOptions::new()
1858            .write(true)
1859            .open(&tb.srcfile)
1860            .expect("opening file failed");
1861        let mut rand_data = [0u8; 140];
1862        rand::rng().fill(&mut rand_data[..]);
1863        file.write_all(&rand_data)
1864            .expect("writing file content failed");
1865        drop(file);
1866        let (transfer_info, fd_pdus) =
1867            tb.generic_file_transfer(&mut user, true, rand_data.to_vec());
1868        assert_eq!(fd_pdus, 2);
1869        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
1870        tb.finish_handling(&mut user, &transfer_info);
1871        tb.common_finished_pdu_ack_check();
1872    }
1873
1874    #[test]
1875    fn test_empty_file_transfer_not_acked_with_closure() {
1876        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1877        let file_size = 0;
1878        let put_request = PutRequestOwned::new_regular_request(
1879            REMOTE_ID.into(),
1880            &tb.srcfile,
1881            &tb.destfile,
1882            Some(TransmissionMode::Unacknowledged),
1883            Some(true),
1884        )
1885        .expect("creating put request failed");
1886        let mut user = tb.create_user(0, file_size);
1887        let transaction_info =
1888            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1889        tb.common_eof_pdu_check(
1890            &mut user,
1891            transaction_info.closure_requested,
1892            EofParams::new_success(file_size, CRC_32.digest().finalize()),
1893            1,
1894        );
1895        tb.finish_handling(&mut user, &transaction_info);
1896        user.verify_finished_indication_retained(
1897            DeliveryCode::Complete,
1898            ConditionCode::NoError,
1899            transaction_info.id,
1900        );
1901    }
1902
1903    #[test]
1904    fn test_put_request_no_remote_cfg() {
1905        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1906
1907        let (srcfile, destfile) = init_full_filepaths_textfile();
1908        let srcfile_str = String::from(srcfile.to_str().unwrap());
1909        let destfile_str = String::from(destfile.to_str().unwrap());
1910        let put_request = PutRequestOwned::new_regular_request(
1911            INVALID_ID.into(),
1912            &srcfile_str,
1913            &destfile_str,
1914            Some(TransmissionMode::Unacknowledged),
1915            Some(true),
1916        )
1917        .expect("creating put request failed");
1918        let error = tb.handler.put_request(&put_request);
1919        assert!(error.is_err());
1920        let error = error.unwrap_err();
1921        if let PutRequestError::NoRemoteCfgFound(id) = error {
1922            assert_eq!(id, INVALID_ID.into());
1923        } else {
1924            panic!("unexpected error type: {:?}", error);
1925        }
1926    }
1927
1928    #[test]
1929    fn test_put_request_file_does_not_exist() {
1930        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1931
1932        let file_which_does_not_exist = "/tmp/this_file_does_not_exist.txt";
1933        let destfile = "/tmp/tmp.txt";
1934        let put_request = PutRequestOwned::new_regular_request(
1935            REMOTE_ID.into(),
1936            file_which_does_not_exist,
1937            destfile,
1938            Some(TransmissionMode::Unacknowledged),
1939            Some(true),
1940        )
1941        .expect("creating put request failed");
1942        let error = tb.put_request(&put_request);
1943        assert!(error.is_err());
1944        let error = error.unwrap_err();
1945        if !matches!(error, PutRequestError::FileDoesNotExist) {
1946            panic!("unexpected error type: {:?}", error);
1947        }
1948    }
1949
1950    #[test]
1951    fn test_finished_pdu_check_timeout() {
1952        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
1953        let file_size = 0;
1954        let put_request = PutRequestOwned::new_regular_request(
1955            REMOTE_ID.into(),
1956            &tb.srcfile,
1957            &tb.destfile,
1958            Some(TransmissionMode::Unacknowledged),
1959            Some(true),
1960        )
1961        .expect("creating put request failed");
1962        let mut user = tb.create_user(0, file_size);
1963        let transaction_info =
1964            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
1965        let expected_id = tb.handler.transaction_id().unwrap();
1966        tb.common_eof_pdu_check(
1967            &mut user,
1968            transaction_info.closure_requested,
1969            EofParams::new_success(file_size, CRC_32.digest().finalize()),
1970            1,
1971        );
1972        assert!(tb.pdu_queue_empty());
1973
1974        // Enforce a check limit error by expiring the check limit timer -> leads to a notice of
1975        // cancellation -> leads to an EOF PDU with the appropriate error code.
1976        tb.expiry_control.set_check_limit_expired();
1977
1978        assert_eq!(tb.handler.state_machine_no_packet(&mut user).unwrap(), 1);
1979        assert!(!tb.pdu_queue_empty());
1980        let next_pdu = tb.get_next_sent_pdu().unwrap();
1981        let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1982        tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1983        assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached);
1984        assert_eq!(eof_pdu.file_size(), 0);
1985        assert_eq!(eof_pdu.file_checksum(), 0);
1986
1987        // Cancellation fault should have been triggered.
1988        let fault_handler = tb.test_fault_handler_mut();
1989        let fh_ref_mut = fault_handler.get_mut();
1990        assert!(!fh_ref_mut.cancellation_queue_empty());
1991        assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
1992        let FaultInfo {
1993            transaction_id,
1994            condition_code,
1995            progress,
1996        } = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
1997        assert_eq!(transaction_id, expected_id);
1998        assert_eq!(condition_code, ConditionCode::CheckLimitReached);
1999        assert_eq!(progress, 0);
2000        fh_ref_mut.all_queues_empty();
2001    }
2002
2003    #[test]
2004    fn test_cancelled_transfer_empty_file() {
2005        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 512);
2006        let filesize = 0;
2007        let put_request = PutRequestOwned::new_regular_request(
2008            REMOTE_ID.into(),
2009            &tb.srcfile,
2010            &tb.destfile,
2011            Some(TransmissionMode::Unacknowledged),
2012            Some(false),
2013        )
2014        .expect("creating put request failed");
2015        let mut user = tb.create_user(0, filesize);
2016        assert_eq!(user.transaction_indication_call_count, 0);
2017        assert_eq!(user.eof_sent_call_count, 0);
2018
2019        tb.put_request(&put_request)
2020            .expect("put_request call failed");
2021        assert_eq!(tb.handler.state(), State::Busy);
2022        assert_eq!(tb.handler.step(), TransactionStep::Idle);
2023        assert!(tb.get_next_sent_pdu().is_none());
2024        let id = tb.handler.transaction_id().unwrap();
2025        tb.handler
2026            .cancel_request(&mut user, &id)
2027            .expect("transaction cancellation failed");
2028        assert_eq!(tb.handler.state(), State::Idle);
2029        assert_eq!(tb.handler.step(), TransactionStep::Idle);
2030        // EOF (Cancel) PDU will be generated
2031        let eof_pdu = tb
2032            .get_next_sent_pdu()
2033            .expect("no EOF PDU generated like expected");
2034        assert_eq!(
2035            eof_pdu.file_directive_type.unwrap(),
2036            FileDirectiveType::EofPdu
2037        );
2038        let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap();
2039        assert_eq!(
2040            eof_pdu.condition_code(),
2041            ConditionCode::CancelRequestReceived
2042        );
2043        assert_eq!(eof_pdu.file_checksum(), 0);
2044        assert_eq!(eof_pdu.file_size(), 0);
2045        tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
2046    }
2047
2048    #[test]
2049    fn test_cancelled_transfer_mid_transfer() {
2050        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Unacknowledged, false, 128);
2051        let mut file = OpenOptions::new()
2052            .write(true)
2053            .open(&tb.srcfile)
2054            .expect("opening file failed");
2055        let mut rand_data = [0u8; 140];
2056        rand::rng().fill(&mut rand_data[..]);
2057        file.write_all(&rand_data)
2058            .expect("writing file content failed");
2059        drop(file);
2060        let put_request = PutRequestOwned::new_regular_request(
2061            REMOTE_ID.into(),
2062            &tb.srcfile,
2063            &tb.destfile,
2064            Some(TransmissionMode::Unacknowledged),
2065            Some(false),
2066        )
2067        .expect("creating put request failed");
2068        let file_size = rand_data.len() as u64;
2069        let mut user = tb.create_user(0, file_size);
2070        let transaction_info =
2071            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2072        let mut chunks = rand_data.chunks(
2073            calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
2074                &transaction_info.pdu_header,
2075                tb.max_packet_len,
2076                None,
2077            ),
2078        );
2079        let mut digest = CRC_32.digest();
2080        let first_chunk = chunks.next().expect("no chunk found");
2081        digest.update(first_chunk);
2082        let checksum = digest.finalize();
2083        let next_packet = tb.get_next_sent_pdu().unwrap();
2084        assert_eq!(next_packet.pdu_type, PduType::FileData);
2085        let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap();
2086        assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]);
2087        let expected_id = tb.handler.transaction_id().unwrap();
2088        assert!(
2089            tb.handler
2090                .cancel_request(&mut user, &expected_id)
2091                .expect("cancellation failed")
2092        );
2093        assert_eq!(tb.handler.state(), State::Idle);
2094        assert_eq!(tb.handler.step(), TransactionStep::Idle);
2095        let next_packet = tb.get_next_sent_pdu().unwrap();
2096        assert_eq!(next_packet.pdu_type, PduType::FileDirective);
2097        assert_eq!(
2098            next_packet.file_directive_type.unwrap(),
2099            FileDirectiveType::EofPdu
2100        );
2101        // As specified in 4.11.2.2 of the standard, the file size will be the progress of the
2102        // file copy operation so far, and the checksum is calculated for that progress.
2103        let eof_pdu = EofPdu::from_bytes(&next_packet.raw_pdu).expect("EOF PDU creation failed");
2104        assert_eq!(eof_pdu.file_size(), first_chunk.len() as u64);
2105        assert_eq!(eof_pdu.file_checksum(), checksum);
2106        assert_eq!(
2107            eof_pdu.condition_code(),
2108            ConditionCode::CancelRequestReceived
2109        );
2110    }
2111
2112    #[test]
2113    fn test_positive_ack_procedure() {
2114        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2115        let file_size = 0;
2116        let eof_params = EofParams {
2117            file_size,
2118            file_checksum: CRC_32.digest().finalize(),
2119            condition_code: ConditionCode::NoError,
2120        };
2121        let put_request = PutRequestOwned::new_regular_request(
2122            REMOTE_ID.into(),
2123            &tb.srcfile,
2124            &tb.destfile,
2125            Some(TransmissionMode::Acknowledged),
2126            Some(false),
2127        )
2128        .expect("creating put request failed");
2129        let mut user = tb.create_user(0, file_size);
2130        let transfer_info =
2131            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2132        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
2133
2134        assert!(tb.pdu_queue_empty());
2135
2136        // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU.
2137        tb.expiry_control.set_positive_ack_expired();
2138        let sent_packets = tb
2139            .handler
2140            .state_machine_no_packet(&mut user)
2141            .expect("source handler FSM failure");
2142        assert_eq!(sent_packets, 1);
2143        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
2144
2145        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2146        tb.finish_handling(&mut user, &transfer_info);
2147        tb.common_finished_pdu_ack_check();
2148        user.verify_finished_indication_retained(
2149            DeliveryCode::Complete,
2150            ConditionCode::NoError,
2151            transfer_info.id,
2152        );
2153    }
2154
2155    #[test]
2156    fn test_positive_ack_procedure_ack_limit_reached() {
2157        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2158        let file_size = 0;
2159        let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize());
2160        let put_request = PutRequestOwned::new_regular_request(
2161            REMOTE_ID.into(),
2162            &tb.srcfile,
2163            &tb.destfile,
2164            Some(TransmissionMode::Acknowledged),
2165            Some(false),
2166        )
2167        .expect("creating put request failed");
2168        let mut user = tb.create_user(0, file_size);
2169        let transfer_info =
2170            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2171        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
2172
2173        assert!(tb.pdu_queue_empty());
2174
2175        // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU.
2176        tb.expiry_control.set_positive_ack_expired();
2177        let sent_packets = tb
2178            .handler
2179            .state_machine_no_packet(&mut user)
2180            .expect("source handler FSM failure");
2181        assert_eq!(sent_packets, 1);
2182        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
2183        // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU.
2184        tb.expiry_control.set_positive_ack_expired();
2185        let sent_packets = tb
2186            .handler
2187            .state_machine_no_packet(&mut user)
2188            .expect("source handler FSM failure");
2189        assert_eq!(sent_packets, 1);
2190        eof_params.condition_code = ConditionCode::PositiveAckLimitReached;
2191        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3);
2192        // This boilerplate handling is still expected. In a real-life use-case I would expect
2193        // this to fail as well, leading to a transaction abandonment. This is tested separately.
2194        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2195        tb.finish_handling(&mut user, &transfer_info);
2196        tb.common_finished_pdu_ack_check();
2197        user.verify_finished_indication_retained(
2198            DeliveryCode::Complete,
2199            ConditionCode::NoError,
2200            transfer_info.id,
2201        );
2202    }
2203
2204    #[test]
2205    fn test_positive_ack_procedure_ack_limit_reached_abandonment() {
2206        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2207        let file_size = 0;
2208        let mut eof_params = EofParams::new_success(file_size, CRC_32.digest().finalize());
2209        let put_request = PutRequestOwned::new_regular_request(
2210            REMOTE_ID.into(),
2211            &tb.srcfile,
2212            &tb.destfile,
2213            Some(TransmissionMode::Acknowledged),
2214            Some(false),
2215        )
2216        .expect("creating put request failed");
2217        let mut user = tb.create_user(0, file_size);
2218        let transfer_info =
2219            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2220        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 1);
2221
2222        assert!(tb.pdu_queue_empty());
2223
2224        // Enforce a postive ack timer expiry -> leads to a re-send of the EOF PDU.
2225        tb.expiry_control.set_positive_ack_expired();
2226        let sent_packets = tb
2227            .handler
2228            .state_machine_no_packet(&mut user)
2229            .expect("source handler FSM failure");
2230        assert_eq!(sent_packets, 1);
2231        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 2);
2232        // Enforce a postive ack timer expiry -> positive ACK limit reached -> Cancel EOF sent.
2233        tb.expiry_control.set_positive_ack_expired();
2234        let sent_packets = tb
2235            .handler
2236            .state_machine_no_packet(&mut user)
2237            .expect("source handler FSM failure");
2238        assert_eq!(sent_packets, 1);
2239        eof_params.condition_code = ConditionCode::PositiveAckLimitReached;
2240        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 3);
2241        // Cancellation fault should have been triggered.
2242        let fault_handler = tb.test_fault_handler_mut();
2243        let fh_ref_mut = fault_handler.get_mut();
2244        assert!(!fh_ref_mut.cancellation_queue_empty());
2245        assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
2246        let FaultInfo {
2247            transaction_id,
2248            condition_code,
2249            progress,
2250        } = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
2251        assert_eq!(transaction_id, transfer_info.id);
2252        assert_eq!(condition_code, ConditionCode::PositiveAckLimitReached);
2253        assert_eq!(progress, file_size);
2254        fh_ref_mut.all_queues_empty();
2255
2256        // Enforce a postive ack timer expiry -> leads to a re-send of the EOF Cancel PDU.
2257        tb.expiry_control.set_positive_ack_expired();
2258        let sent_packets = tb
2259            .handler
2260            .state_machine_no_packet(&mut user)
2261            .expect("source handler FSM failure");
2262        assert_eq!(sent_packets, 1);
2263        tb.common_eof_pdu_check(&mut user, transfer_info.closure_requested, eof_params, 4);
2264
2265        // Enforce a postive ack timer expiry -> positive ACK limit reached -> Transaction
2266        // abandoned
2267        tb.expiry_control.set_positive_ack_expired();
2268        let sent_packets = tb
2269            .handler
2270            .state_machine_no_packet(&mut user)
2271            .expect("source handler FSM failure");
2272        assert_eq!(sent_packets, 0);
2273        // Abandonment fault should have been triggered.
2274        let fault_handler = tb.test_fault_handler_mut();
2275        let fh_ref_mut = fault_handler.get_mut();
2276        assert!(!fh_ref_mut.abandoned_queue_empty());
2277        assert_eq!(fh_ref_mut.abandoned_queue.len(), 1);
2278        let FaultInfo {
2279            transaction_id,
2280            condition_code,
2281            progress,
2282        } = fh_ref_mut.abandoned_queue.pop_back().unwrap();
2283        assert_eq!(transaction_id, transfer_info.id);
2284        assert_eq!(condition_code, ConditionCode::PositiveAckLimitReached);
2285        assert_eq!(progress, file_size);
2286        fh_ref_mut.all_queues_empty();
2287    }
2288
2289    #[test]
2290    fn test_nak_for_whole_file() {
2291        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2292        let mut user = TestCfdpUser::default();
2293        let (data, transfer_info) = tb.common_tiny_file_transfer(&mut user, true);
2294        let seg_reqs = &[(0, transfer_info.file_size as u32)];
2295        tb.nak_for_file_segments(&mut user, &transfer_info, seg_reqs);
2296        tb.check_next_file_pdu(0, data.as_bytes());
2297        tb.all_fault_queues_empty();
2298
2299        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2300        tb.finish_handling(&mut user, &transfer_info);
2301        tb.common_finished_pdu_ack_check();
2302    }
2303
2304    #[test]
2305    fn test_nak_for_file_segment() {
2306        let mut user = TestCfdpUser::default();
2307        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 128);
2308        let mut file = OpenOptions::new()
2309            .write(true)
2310            .open(&tb.srcfile)
2311            .expect("opening file failed");
2312        let mut rand_data = [0u8; 140];
2313        rand::rng().fill(&mut rand_data[..]);
2314        file.write_all(&rand_data)
2315            .expect("writing file content failed");
2316        drop(file);
2317        let (transfer_info, fd_pdus) =
2318            tb.generic_file_transfer(&mut user, false, rand_data.to_vec());
2319        assert_eq!(fd_pdus, 2);
2320        tb.nak_for_file_segments(&mut user, &transfer_info, &[(0, 90)]);
2321        tb.check_next_file_pdu(0, &rand_data[0..90]);
2322        tb.all_fault_queues_empty();
2323
2324        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2325        tb.finish_handling(&mut user, &transfer_info);
2326        tb.common_finished_pdu_ack_check();
2327    }
2328
2329    #[test]
2330    fn test_nak_for_metadata() {
2331        let mut tb = SourceHandlerTestbench::new(TransmissionMode::Acknowledged, false, 512);
2332        let file_size = 0;
2333        let put_request = PutRequestOwned::new_regular_request(
2334            REMOTE_ID.into(),
2335            &tb.srcfile,
2336            &tb.destfile,
2337            Some(TransmissionMode::Acknowledged),
2338            Some(false),
2339        )
2340        .expect("creating put request failed");
2341        let mut user = tb.create_user(0, file_size);
2342        let transfer_info =
2343            tb.common_file_transfer_init_with_metadata_check(&mut user, put_request, file_size);
2344        tb.common_eof_pdu_check(
2345            &mut user,
2346            transfer_info.closure_requested,
2347            EofParams::new_success(file_size, CRC_32.digest().finalize()),
2348            1,
2349        );
2350
2351        // NAK to cause re-transmission of metadata PDU.
2352        let nak_pdu = NakPduCreator::new_normal_file_size(
2353            transfer_info.pdu_header,
2354            0,
2355            transfer_info.file_size as u32,
2356            &[(0, 0)],
2357        )
2358        .unwrap();
2359        let nak_pdu_vec = nak_pdu.to_vec().unwrap();
2360        let packet_info = PduRawWithInfo::new(&nak_pdu_vec).unwrap();
2361        let sent_packets = tb
2362            .handler
2363            .state_machine(&mut user, Some(&packet_info))
2364            .unwrap();
2365        assert_eq!(sent_packets, 1);
2366        let next_pdu = tb.get_next_sent_pdu().unwrap();
2367        // Check the metadata PDU.
2368        tb.metadata_check(&next_pdu, file_size);
2369        tb.all_fault_queues_empty();
2370
2371        tb.acknowledge_eof_pdu(&mut user, &transfer_info);
2372        tb.finish_handling(&mut user, &transfer_info);
2373        tb.common_finished_pdu_ack_check();
2374        user.verify_finished_indication_retained(
2375            DeliveryCode::Complete,
2376            ConditionCode::NoError,
2377            transfer_info.id,
2378        );
2379    }
2380}