cfdp/
source.rs

1//! # CFDP Source Entity Module
2//!
3//! The [SourceHandler] is the primary component of this module which converts a
4//! [ReadablePutRequest] into all packet data units (PDUs) which need to be sent to a remote
5//! CFDP entity to perform a File Copy operation to a remote entity.
6//!
//! The source entity allows freedom of communication by using a user-provided [PduSendProvider]
8//! to send all generated PDUs. It should be noted that for regular file transfers, each
9//! [SourceHandler::state_machine] call will map to one generated file data PDU. This allows
10//! flow control for the user of the state machine.
11//!
12//! The [SourceHandler::state_machine] will generally perform the following steps after a valid
13//! put request was received through the [SourceHandler::put_request] method:
14//!
15//! 1. Generate the Metadata PDU to be sent to a remote CFDP entity. You can use the
16//!    [spacepackets::cfdp::pdu::metadata::MetadataPduReader] to inspect the generated PDU.
17//! 2. Generate all File Data PDUs to be sent to a remote CFDP entity if applicable (file not
18//!    empty). The PDU(s) can be inspected using the [spacepackets::cfdp::pdu::file_data::FileDataPdu] reader.
19//! 3. Generate an EOF PDU to be sent to a remote CFDP entity. The PDU can be inspected using
20//!    the [spacepackets::cfdp::pdu::eof::EofPdu] reader.
21//!
22//! If this is an unacknowledged transfer with no transaction closure, the file transfer will be
23//! done after these steps. In any other case:
24//!
25//! ### Unacknowledged transfer with requested closure
26//!
27//! 4. A Finished PDU will be awaited, for example one generated using
28//!    [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
29//!
30//! ### Acknowledged transfer (*not implemented yet*)
31//!
//! 4. An EOF ACK packet will be awaited, for example one generated using
33//!    [spacepackets::cfdp::pdu::ack::AckPdu].
34//! 5. A Finished PDU will be awaited, for example one generated using
35//!    [spacepackets::cfdp::pdu::finished::FinishedPduCreator].
36//! 6. A finished PDU ACK packet will be generated to be sent to the remote CFDP entity.
37//!    The [spacepackets::cfdp::pdu::finished::FinishedPduReader] can be used to inspect the
38//!    generated PDU.
39use core::{cell::RefCell, ops::ControlFlow, str::Utf8Error};
40
41use spacepackets::{
42    cfdp::{
43        lv::Lv,
44        pdu::{
45            eof::EofPdu,
46            file_data::{
47                calculate_max_file_seg_len_for_max_packet_len_and_pdu_header,
48                FileDataPduCreatorWithReservedDatafield,
49            },
50            finished::{DeliveryCode, FileStatus, FinishedPduReader},
51            metadata::{MetadataGenericParams, MetadataPduCreator},
52            CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
53        },
54        ConditionCode, Direction, LargeFileFlag, PduType, SegmentMetadataFlag, SegmentationControl,
55        TransmissionMode,
56    },
57    util::{UnsignedByteField, UnsignedEnum},
58    ByteConversionError,
59};
60
61use spacepackets::seq_count::SequenceCountProvider;
62
63use crate::{
64    time::CountdownProvider, DummyPduProvider, EntityType, GenericSendError, PduProvider,
65    TimerCreatorProvider,
66};
67
68use super::{
69    filestore::{FilestoreError, VirtualFilestore},
70    request::{ReadablePutRequest, StaticPutRequestCacher},
71    user::{CfdpUser, TransactionFinishedParams},
72    LocalEntityConfig, PacketTarget, PduSendProvider, RemoteEntityConfig,
73    RemoteEntityConfigProvider, State, TransactionId, UserFaultHookProvider,
74};
75
/// Steps a transaction passes through inside the source entity handler.
///
/// The explicit discriminants are kept stable. The values 2 and 9 are currently unassigned; 9 is
/// reserved for the commented-out `SendingAckOfFinished` step.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub enum TransactionStep {
    /// No transaction step is active.
    Idle = 0,
    /// A put request was accepted and the transaction is being set up.
    TransactionStart = 1,
    /// The Metadata PDU is generated and sent.
    SendingMetadata = 3,
    /// File Data PDUs are generated and sent.
    SendingFileData = 4,
    /// Missing packets are re-transmitted in acknowledged mode.
    Retransmitting = 5,
    /// The EOF PDU is generated and sent.
    SendingEof = 6,
    /// The ACK for the EOF PDU is awaited (acknowledged mode).
    WaitingForEofAck = 7,
    /// The Finished PDU of the remote entity is awaited.
    WaitingForFinished = 8,
    // SendingAckOfFinished = 9,
    /// The transaction is completed and the user is notified.
    NoticeOfCompletion = 10,
}
93
/// File specific parameters of the transfer which is currently handled.
///
/// All fields start out zeroed/`false` through the [Default] implementation.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct FileParams {
    /// Transfer progress; compared against [Self::file_size] to decide whether more File Data
    /// PDUs need to be sent.
    pub progress: u64,
    /// Maximum file segment length which is calculated when a put request is accepted.
    pub segment_len: u64,
    /// CRC32 value. NOTE(review): not written by the code visible here, presumably the file
    /// checksum - confirm against the rest of the module.
    pub crc32: u32,
    /// Set when the put request has no source file, so only a Metadata PDU is sent.
    pub metadata_only: bool,
    /// Size of the source file as reported by the virtual filestore.
    pub file_size: u64,
    /// Set when the source file exists but has a size of 0.
    pub empty_file: bool,
}
103
104pub struct StateHelper {
105    state: super::State,
106    step: TransactionStep,
107    num_packets_ready: u32,
108}
109
110#[derive(Debug)]
111pub struct FinishedParams {
112    condition_code: ConditionCode,
113    delivery_code: DeliveryCode,
114    file_status: FileStatus,
115}
116
117#[derive(Debug, derive_new::new)]
118pub struct TransferState {
119    transaction_id: TransactionId,
120    remote_cfg: RemoteEntityConfig,
121    transmission_mode: super::TransmissionMode,
122    closure_requested: bool,
123    cond_code_eof: Option<ConditionCode>,
124    finished_params: Option<FinishedParams>,
125}
126
127impl Default for StateHelper {
128    fn default() -> Self {
129        Self {
130            state: super::State::Idle,
131            step: TransactionStep::Idle,
132            num_packets_ready: 0,
133        }
134    }
135}
136
137#[derive(Debug, thiserror::Error)]
138pub enum SourceError {
139    #[error("can not process packet type {pdu_type:?} with directive type {directive_type:?}")]
140    CantProcessPacketType {
141        pdu_type: PduType,
142        directive_type: Option<FileDirectiveType>,
143    },
144    #[error("unexpected PDU")]
145    UnexpectedPdu {
146        pdu_type: PduType,
147        directive_type: Option<FileDirectiveType>,
148    },
149    #[error("source handler is already busy with put request")]
150    PutRequestAlreadyActive,
151    #[error("error caching put request")]
152    PutRequestCaching(ByteConversionError),
153    #[error("filestore error: {0}")]
154    FilestoreError(#[from] FilestoreError),
155    #[error("source file does not have valid UTF8 format: {0}")]
156    SourceFileNotValidUtf8(Utf8Error),
157    #[error("destination file does not have valid UTF8 format: {0}")]
158    DestFileNotValidUtf8(Utf8Error),
159    #[error("error related to PDU creation: {0}")]
160    Pdu(#[from] PduError),
161    #[error("cfdp feature not implemented")]
162    NotImplemented,
163    #[error("issue sending PDU: {0}")]
164    SendError(#[from] GenericSendError),
165}
166
167#[derive(Debug, thiserror::Error)]
168pub enum PutRequestError {
169    #[error("error caching put request: {0}")]
170    Storage(#[from] ByteConversionError),
171    #[error("already busy with put request")]
172    AlreadyBusy,
173    #[error("no remote entity configuration found for {0:?}")]
174    NoRemoteCfgFound(UnsignedByteField),
175    #[error("source file does not have valid UTF8 format: {0}")]
176    SourceFileNotValidUtf8(#[from] Utf8Error),
177    #[error("source file does not exist")]
178    FileDoesNotExist,
179    #[error("filestore error: {0}")]
180    FilestoreError(#[from] FilestoreError),
181}
182
183/// This is the primary CFDP source handler. It models the CFDP source entity, which is
184/// primarily responsible for handling put requests to send files to another CFDP destination
185/// entity.
186///
187/// As such, it contains a state machine to perform all operations necessary to perform a
188/// source-to-destination file transfer. This class uses the user provides [PduSendProvider] to
189/// send the CFDP PDU packets generated by the state machine.
190///
191/// The following core functions are the primary interface:
192///
193/// 1. [Self::put_request] can be used to start transactions, most notably to start
194///    and perform a Copy File procedure to send a file or to send a Proxy Put Request to request
195///    a file.
196/// 2. [Self::state_machine] is the primary interface to execute an
197///    active file transfer. It generates the necessary CFDP PDUs for this process.
198///    This method is also used to insert received packets with the appropriate destination ID
199///    and target handler type into the state machine.
200///
201/// A put request will only be accepted if the handler is in the idle state.
202///
203/// The handler requires the [alloc] feature but will allocated all required memory on construction
204/// time. This means that the handler is still suitable for embedded systems where run-time
205/// allocation is prohibited. Furthermore, it uses the [VirtualFilestore] abstraction to allow
206/// usage on systems without a [std] filesystem.
207/// This handler does not support concurrency out of the box. Instead, if concurrent handling
208/// is required, it is recommended to create a new handler and run all active handlers inside a
209/// thread pool, or move the newly created handler to a new thread.
210pub struct SourceHandler<
211    PduSender: PduSendProvider,
212    UserFaultHook: UserFaultHookProvider,
213    Vfs: VirtualFilestore,
214    RemoteCfgTable: RemoteEntityConfigProvider,
215    TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
216    Countdown: CountdownProvider,
217    SeqCountProvider: SequenceCountProvider,
218> {
219    local_cfg: LocalEntityConfig<UserFaultHook>,
220    pdu_sender: PduSender,
221    pdu_and_cksum_buffer: RefCell<alloc::vec::Vec<u8>>,
222    put_request_cacher: StaticPutRequestCacher,
223    remote_cfg_table: RemoteCfgTable,
224    vfs: Vfs,
225    state_helper: StateHelper,
226    // Transfer related state information
227    tstate: Option<TransferState>,
228    // File specific transfer fields
229    fparams: FileParams,
230    // PDU configuration is cached so it can be re-used for all PDUs generated for file transfers.
231    pdu_conf: CommonPduConfig,
232    countdown: Option<Countdown>,
233    timer_creator: TimerCreator,
234    seq_count_provider: SeqCountProvider,
235}
236
237impl<
238        PduSender: PduSendProvider,
239        UserFaultHook: UserFaultHookProvider,
240        Vfs: VirtualFilestore,
241        RemoteCfgTable: RemoteEntityConfigProvider,
242        TimerCreator: TimerCreatorProvider<Countdown = Countdown>,
243        Countdown: CountdownProvider,
244        SeqCountProvider: SequenceCountProvider,
245    >
246    SourceHandler<
247        PduSender,
248        UserFaultHook,
249        Vfs,
250        RemoteCfgTable,
251        TimerCreator,
252        Countdown,
253        SeqCountProvider,
254    >
255{
256    /// Creates a new instance of a source handler.
257    ///
258    /// # Arguments
259    ///
260    /// * `cfg` - The local entity configuration for this source handler.
261    /// * `pdu_sender` - [PduSendProvider] provider used to send CFDP PDUs generated by the handler.
262    /// * `vfs` - [VirtualFilestore] implementation used by the handler, which decouples the CFDP
263    ///    implementation from the underlying filestore/filesystem. This allows to use this handler
264    ///    for embedded systems where a standard runtime might not be available.
265    /// * `put_request_cacher` - The put request cacher is used cache put requests without
266    ///    requiring run-time allocation.
267    /// * `pdu_and_cksum_buf_size` - The handler requires a buffer to generate PDUs and perform
268    ///    checksum calculations. The user can specify the size of this buffer, so this should be
269    ///    set to the maximum expected PDU size or a conservative upper bound for this size, for
270    ///    example 2048 or 4096 bytes.
271    /// * `remote_cfg_table` - The [RemoteEntityConfigProvider] used to look up remote
272    ///    entities and target specific configuration for file copy operations.
273    /// * `timer_creator` - [TimerCreatorProvider] used by the CFDP handler to generate
274    ///    timers required by various tasks. This allows to use this handler for embedded systems
275    ///    where the standard time APIs might not be available.
276    /// * `seq_count_provider` - The [SequenceCountProvider] used to generate the [TransactionId]
277    ///    which contains an incrementing counter.
278    #[allow(clippy::too_many_arguments)]
279    pub fn new(
280        cfg: LocalEntityConfig<UserFaultHook>,
281        pdu_sender: PduSender,
282        vfs: Vfs,
283        put_request_cacher: StaticPutRequestCacher,
284        pdu_and_cksum_buf_size: usize,
285        remote_cfg_table: RemoteCfgTable,
286        timer_creator: TimerCreator,
287        seq_count_provider: SeqCountProvider,
288    ) -> Self {
289        Self {
290            local_cfg: cfg,
291            remote_cfg_table,
292            pdu_sender,
293            pdu_and_cksum_buffer: RefCell::new(alloc::vec![0; pdu_and_cksum_buf_size]),
294            vfs,
295            put_request_cacher,
296            state_helper: Default::default(),
297            tstate: Default::default(),
298            fparams: Default::default(),
299            pdu_conf: Default::default(),
300            countdown: None,
301            timer_creator,
302            seq_count_provider,
303        }
304    }
305
306    /// Calls [Self::state_machine], without inserting a packet.
307    pub fn state_machine_no_packet(
308        &mut self,
309        cfdp_user: &mut impl CfdpUser,
310    ) -> Result<u32, SourceError> {
311        self.state_machine(cfdp_user, None::<&DummyPduProvider>)
312    }
313
314    /// This is the core function to drive the source handler. It is also used to insert
315    /// packets into the source handler.
316    ///
317    /// The state machine should either be called if a packet with the appropriate destination ID
318    /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example
319    /// checking for timeouts or missed file segments.
320    ///
321    /// The function returns the number of sent PDU packets on success.
322    pub fn state_machine(
323        &mut self,
324        cfdp_user: &mut impl CfdpUser,
325        pdu: Option<&impl PduProvider>,
326    ) -> Result<u32, SourceError> {
327        if let Some(packet) = pdu {
328            self.insert_packet(cfdp_user, packet)?;
329        }
330        match self.state_helper.state {
331            super::State::Idle => {
332                // TODO: In acknowledged mode, add timer handling.
333                Ok(0)
334            }
335            super::State::Busy => self.fsm_busy(cfdp_user, pdu),
336            super::State::Suspended => {
337                // There is now way to suspend the handler currently anyway.
338                Ok(0)
339            }
340        }
341    }
342
343    fn insert_packet(
344        &mut self,
345        _cfdp_user: &mut impl CfdpUser,
346        packet_to_insert: &impl PduProvider,
347    ) -> Result<(), SourceError> {
348        if packet_to_insert.packet_target()? != PacketTarget::SourceEntity {
349            // Unwrap is okay here, a PacketInfo for a file data PDU should always have the
350            // destination as the target.
351            return Err(SourceError::CantProcessPacketType {
352                pdu_type: packet_to_insert.pdu_type(),
353                directive_type: packet_to_insert.file_directive_type(),
354            });
355        }
356        if packet_to_insert.pdu_type() == PduType::FileData {
357            // The [PacketInfo] API should ensure that file data PDUs can not be passed
358            // into a source entity, so this should never happen.
359            return Err(SourceError::UnexpectedPdu {
360                pdu_type: PduType::FileData,
361                directive_type: None,
362            });
363        }
364        // Unwrap is okay here, the [PacketInfo] API should ensure that the directive type is
365        // always a valid value.
366        match packet_to_insert
367            .file_directive_type()
368            .expect("PDU directive type unexpectedly not set")
369        {
370            FileDirectiveType::FinishedPdu => self.handle_finished_pdu(packet_to_insert)?,
371            FileDirectiveType::NakPdu => self.handle_nak_pdu(),
372            FileDirectiveType::KeepAlivePdu => self.handle_keep_alive_pdu(),
373            FileDirectiveType::AckPdu => return Err(SourceError::NotImplemented),
374            FileDirectiveType::EofPdu
375            | FileDirectiveType::PromptPdu
376            | FileDirectiveType::MetadataPdu => {
377                return Err(SourceError::CantProcessPacketType {
378                    pdu_type: packet_to_insert.pdu_type(),
379                    directive_type: packet_to_insert.file_directive_type(),
380                });
381            }
382        }
383        Ok(())
384    }
385
386    /// This function is used to pass a put request to the source handler, which is
387    /// also used to start a file copy operation. As such, this function models the Put.request
388    /// CFDP primtiive.
389
390    /// Please note that the source handler can also process one put request at a time.
391    /// The caller is responsible of creating a new source handler, one handler can only handle
392    /// one file copy request at a time.
393    pub fn put_request(
394        &mut self,
395        put_request: &impl ReadablePutRequest,
396    ) -> Result<(), PutRequestError> {
397        if self.state_helper.state != super::State::Idle {
398            return Err(PutRequestError::AlreadyBusy);
399        }
400        self.put_request_cacher.set(put_request)?;
401        let remote_cfg = self.remote_cfg_table.get(
402            self.put_request_cacher
403                .static_fields
404                .destination_id
405                .value_const(),
406        );
407        if remote_cfg.is_none() {
408            return Err(PutRequestError::NoRemoteCfgFound(
409                self.put_request_cacher.static_fields.destination_id,
410            ));
411        }
412        let remote_cfg = remote_cfg.unwrap();
413        self.state_helper.num_packets_ready = 0;
414        let transmission_mode = if self.put_request_cacher.static_fields.trans_mode.is_some() {
415            self.put_request_cacher.static_fields.trans_mode.unwrap()
416        } else {
417            remote_cfg.default_transmission_mode
418        };
419        let closure_requested = if self
420            .put_request_cacher
421            .static_fields
422            .closure_requested
423            .is_some()
424        {
425            self.put_request_cacher
426                .static_fields
427                .closure_requested
428                .unwrap()
429        } else {
430            remote_cfg.closure_requested_by_default
431        };
432        if self.put_request_cacher.has_source_file()
433            && !self.vfs.exists(self.put_request_cacher.source_file()?)?
434        {
435            return Err(PutRequestError::FileDoesNotExist);
436        }
437
438        let transaction_id = TransactionId::new(
439            self.local_cfg().id,
440            UnsignedByteField::new(
441                SeqCountProvider::MAX_BIT_WIDTH / 8,
442                self.seq_count_provider.get_and_increment().into(),
443            ),
444        );
445        // Both the source entity and destination entity ID field must have the same size.
446        // We use the larger of either the Put Request destination ID or the local entity ID
447        // as the size for the new entity IDs.
448        let larger_entity_width = core::cmp::max(
449            self.local_cfg.id.size(),
450            self.put_request_cacher.static_fields.destination_id.size(),
451        );
452        let create_id = |cached_id: &UnsignedByteField| {
453            if larger_entity_width != cached_id.size() {
454                UnsignedByteField::new(larger_entity_width, cached_id.value_const())
455            } else {
456                *cached_id
457            }
458        };
459
460        // Set PDU configuration fields which are important for generating PDUs.
461        self.pdu_conf
462            .set_source_and_dest_id(
463                create_id(&self.local_cfg.id),
464                create_id(&self.put_request_cacher.static_fields.destination_id),
465            )
466            .unwrap();
467        // Set up other PDU configuration fields.
468        self.pdu_conf.direction = Direction::TowardsReceiver;
469        self.pdu_conf.crc_flag = remote_cfg.crc_on_transmission_by_default.into();
470        self.pdu_conf.transaction_seq_num = *transaction_id.seq_num();
471        self.pdu_conf.trans_mode = transmission_mode;
472        self.fparams.segment_len = self.calculate_max_file_seg_len(remote_cfg);
473
474        // Set up the transfer context structure.
475        self.tstate = Some(TransferState {
476            transaction_id,
477            remote_cfg: *remote_cfg,
478            transmission_mode,
479            closure_requested,
480            cond_code_eof: None,
481            finished_params: None,
482        });
483        self.state_helper.state = super::State::Busy;
484        Ok(())
485    }
486
487    /// This functions models the Cancel.request CFDP primitive and is the recommended way to
488    /// cancel a transaction.
489    ///
490    /// This method will cause a Notice of Cancellation at this entity if a transaction is active
491    /// and the passed transaction ID matches the currently active transaction ID. Please note
492    /// that the state machine might still be active because a cancelled transfer might still
493    /// require some packets to be sent to the remote receiver entity.
494    ///
495    /// If not unexpected errors occur, this method returns [true] if the transfer was cancelled
496    /// propery and [false] if there is no transaction active or the passed transaction ID and the
497    /// active ID do not match.
498    pub fn cancel_request(
499        &mut self,
500        user: &mut impl CfdpUser,
501        transaction_id: &TransactionId,
502    ) -> Result<bool, SourceError> {
503        if self.state_helper.state == super::State::Idle {
504            return Ok(false);
505        }
506        if let Some(active_id) = self.transaction_id() {
507            if active_id == *transaction_id {
508                self.notice_of_cancellation(user, ConditionCode::CancelRequestReceived)?;
509                return Ok(true);
510            }
511        }
512        Ok(false)
513    }
514
515    fn fsm_busy(
516        &mut self,
517        user: &mut impl CfdpUser,
518        pdu: Option<&impl PduProvider>,
519    ) -> Result<u32, SourceError> {
520        let mut sent_packets = 0;
521        if self.state_helper.step == TransactionStep::Idle {
522            self.state_helper.step = TransactionStep::TransactionStart;
523        }
524        if self.state_helper.step == TransactionStep::TransactionStart {
525            self.handle_transaction_start(user)?;
526            self.state_helper.step = TransactionStep::SendingMetadata;
527        }
528        if self.state_helper.step == TransactionStep::SendingMetadata {
529            self.prepare_and_send_metadata_pdu()?;
530            self.state_helper.step = TransactionStep::SendingFileData;
531            sent_packets += 1;
532        }
533        if self.state_helper.step == TransactionStep::SendingFileData {
534            if let ControlFlow::Break(packets) = self.file_data_fsm()? {
535                sent_packets += packets;
536                // Exit for each file data PDU to allow flow control.
537                return Ok(sent_packets);
538            }
539        }
540        if self.state_helper.step == TransactionStep::SendingEof {
541            self.eof_fsm(user)?;
542            sent_packets += 1;
543        }
544        if self.state_helper.step == TransactionStep::WaitingForFinished {
545            self.handle_wait_for_finished_pdu(user, pdu)?;
546        }
547        if self.state_helper.step == TransactionStep::NoticeOfCompletion {
548            self.notice_of_completion(user);
549            self.reset();
550        }
551        Ok(sent_packets)
552    }
553
554    fn handle_wait_for_finished_pdu(
555        &mut self,
556        user: &mut impl CfdpUser,
557        packet: Option<&impl PduProvider>,
558    ) -> Result<(), SourceError> {
559        if let Some(packet) = packet {
560            if let Some(FileDirectiveType::FinishedPdu) = packet.file_directive_type() {
561                let finished_pdu = FinishedPduReader::new(packet.pdu())?;
562                self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
563                    condition_code: finished_pdu.condition_code(),
564                    delivery_code: finished_pdu.delivery_code(),
565                    file_status: finished_pdu.file_status(),
566                });
567                if self.transmission_mode().unwrap() == TransmissionMode::Acknowledged {
568                    // TODO: Ack packet handling
569                    self.state_helper.step = TransactionStep::NoticeOfCompletion;
570                } else {
571                    self.state_helper.step = TransactionStep::NoticeOfCompletion;
572                }
573                return Ok(());
574            }
575        }
576        // If we reach this state, countdown is definitely valid instance.
577        if self.countdown.as_ref().unwrap().has_expired() {
578            self.declare_fault(user, ConditionCode::CheckLimitReached)?;
579        }
580        /*
581        def _handle_wait_for_finish(self):
582            if (
583                self.transmission_mode == TransmissionMode.ACKNOWLEDGED
584                and self.__handle_retransmission()
585            ):
586                return
587            if (
588                self._inserted_pdu.pdu is None
589                or self._inserted_pdu.pdu_directive_type is None
590                or self._inserted_pdu.pdu_directive_type != DirectiveType.FINISHED_PDU
591            ):
592                if self._params.check_timer is not None:
593                    if self._params.check_timer.timed_out():
594                        self._declare_fault(ConditionCode.CHECK_LIMIT_REACHED)
595                return
596            finished_pdu = self._inserted_pdu.to_finished_pdu()
597            self._inserted_pdu.pdu = None
598            self._params.finished_params = finished_pdu.finished_params
599            if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
600                self._prepare_finished_ack_packet(finished_pdu.condition_code)
601                self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
602            else:
603                self.states.step = TransactionStep.NOTICE_OF_COMPLETION
604                */
605        Ok(())
606    }
607
608    fn eof_fsm(&mut self, user: &mut impl CfdpUser) -> Result<(), SourceError> {
609        let tstate = self.tstate.as_ref().unwrap();
610        let checksum = self.vfs.calculate_checksum(
611            self.put_request_cacher.source_file().unwrap(),
612            tstate.remote_cfg.default_crc_type,
613            self.fparams.file_size,
614            self.pdu_and_cksum_buffer.get_mut(),
615        )?;
616        self.prepare_and_send_eof_pdu(user, checksum)?;
617        let tstate = self.tstate.as_ref().unwrap();
618        if tstate.transmission_mode == TransmissionMode::Unacknowledged {
619            if tstate.closure_requested {
620                self.countdown = Some(self.timer_creator.create_countdown(
621                    crate::TimerContext::CheckLimit {
622                        local_id: self.local_cfg.id,
623                        remote_id: tstate.remote_cfg.entity_id,
624                        entity_type: EntityType::Sending,
625                    },
626                ));
627                self.state_helper.step = TransactionStep::WaitingForFinished;
628            } else {
629                self.state_helper.step = TransactionStep::NoticeOfCompletion;
630            }
631        } else {
632            // TODO: Start positive ACK procedure.
633        }
634        /*
635        if self.cfg.indication_cfg.eof_sent_indication_required:
636            assert self._params.transaction_id is not None
637            self.user.eof_sent_indication(self._params.transaction_id)
638        if self.transmission_mode == TransmissionMode.UNACKNOWLEDGED:
639            if self._params.closure_requested:
640                assert self._params.remote_cfg is not None
641                self._params.check_timer = (
642                    self.check_timer_provider.provide_check_timer(
643                        local_entity_id=self.cfg.local_entity_id,
644                        remote_entity_id=self._params.remote_cfg.entity_id,
645                        entity_type=EntityType.SENDING,
646                    )
647                )
648                self.states.step = TransactionStep.WAITING_FOR_FINISHED
649            else:
650                self.states.step = TransactionStep.NOTICE_OF_COMPLETION
651        else:
652            self._start_positive_ack_procedure()
653            */
654        Ok(())
655    }
656
657    fn handle_transaction_start(
658        &mut self,
659        cfdp_user: &mut impl CfdpUser,
660    ) -> Result<(), SourceError> {
661        let tstate = self
662            .tstate
663            .as_ref()
664            .expect("transfer state unexpectedly empty");
665        if !self.put_request_cacher.has_source_file() {
666            self.fparams.metadata_only = true;
667        } else {
668            let source_file = self
669                .put_request_cacher
670                .source_file()
671                .map_err(SourceError::SourceFileNotValidUtf8)?;
672            if !self.vfs.exists(source_file)? {
673                return Err(SourceError::FilestoreError(
674                    FilestoreError::FileDoesNotExist,
675                ));
676            }
677            // We expect the destination file path to consist of valid UTF-8 characters as well.
678            self.put_request_cacher
679                .dest_file()
680                .map_err(SourceError::DestFileNotValidUtf8)?;
681            self.fparams.file_size = self.vfs.file_size(source_file)?;
682            if self.fparams.file_size > u32::MAX as u64 {
683                self.pdu_conf.file_flag = LargeFileFlag::Large
684            } else {
685                if self.fparams.file_size == 0 {
686                    self.fparams.empty_file = true;
687                }
688                self.pdu_conf.file_flag = LargeFileFlag::Normal
689            }
690        }
691        cfdp_user.transaction_indication(&tstate.transaction_id);
692        Ok(())
693    }
694
695    fn prepare_and_send_metadata_pdu(&mut self) -> Result<(), SourceError> {
696        let tstate = self
697            .tstate
698            .as_ref()
699            .expect("transfer state unexpectedly empty");
700        let metadata_params = MetadataGenericParams::new(
701            tstate.closure_requested,
702            tstate.remote_cfg.default_crc_type,
703            self.fparams.file_size,
704        );
705        if self.fparams.metadata_only {
706            let metadata_pdu = MetadataPduCreator::new(
707                PduHeader::new_no_file_data(self.pdu_conf, 0),
708                metadata_params,
709                Lv::new_empty(),
710                Lv::new_empty(),
711                self.put_request_cacher.opts_slice(),
712            );
713            return self.pdu_send_helper(&metadata_pdu);
714        }
715        let metadata_pdu = MetadataPduCreator::new(
716            PduHeader::new_no_file_data(self.pdu_conf, 0),
717            metadata_params,
718            Lv::new_from_str(self.put_request_cacher.source_file().unwrap()).unwrap(),
719            Lv::new_from_str(self.put_request_cacher.dest_file().unwrap()).unwrap(),
720            self.put_request_cacher.opts_slice(),
721        );
722        self.pdu_send_helper(&metadata_pdu)
723    }
724
725    fn file_data_fsm(&mut self) -> Result<ControlFlow<u32>, SourceError> {
726        if self.transmission_mode().unwrap() == super::TransmissionMode::Acknowledged {
727            // TODO: Handle re-transmission
728        }
729        if !self.fparams.metadata_only
730            && self.fparams.progress < self.fparams.file_size
731            && self.send_progressing_file_data_pdu()?
732        {
733            return Ok(ControlFlow::Break(1));
734        }
735        if self.fparams.empty_file || self.fparams.progress >= self.fparams.file_size {
736            // EOF is still expected.
737            self.state_helper.step = TransactionStep::SendingEof;
738            self.tstate.as_mut().unwrap().cond_code_eof = Some(ConditionCode::NoError);
739        } else if self.fparams.metadata_only {
740            // Special case: Metadata Only, no EOF required.
741            if self.tstate.as_ref().unwrap().closure_requested {
742                self.state_helper.step = TransactionStep::WaitingForFinished;
743            } else {
744                self.state_helper.step = TransactionStep::NoticeOfCompletion;
745            }
746        }
747        Ok(ControlFlow::Continue(()))
748    }
749
750    fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) {
751        let tstate = self.tstate.as_ref().unwrap();
752        if self.local_cfg.indication_cfg.transaction_finished {
753            // The first case happens for unacknowledged file copy operation with no closure.
754            let finished_params = if tstate.finished_params.is_none() {
755                TransactionFinishedParams {
756                    id: tstate.transaction_id,
757                    condition_code: ConditionCode::NoError,
758                    delivery_code: DeliveryCode::Complete,
759                    file_status: FileStatus::Unreported,
760                }
761            } else {
762                let finished_params = tstate.finished_params.as_ref().unwrap();
763                TransactionFinishedParams {
764                    id: tstate.transaction_id,
765                    condition_code: finished_params.condition_code,
766                    delivery_code: finished_params.delivery_code,
767                    file_status: finished_params.file_status,
768                }
769            };
770            cfdp_user.transaction_finished_indication(&finished_params);
771        }
772    }
773
774    fn calculate_max_file_seg_len(&self, remote_cfg: &RemoteEntityConfig) -> u64 {
775        let mut derived_max_seg_len = calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
776            &PduHeader::new_no_file_data(self.pdu_conf, 0),
777            remote_cfg.max_packet_len,
778            None,
779        );
780        if remote_cfg.max_file_segment_len.is_some() {
781            derived_max_seg_len = core::cmp::min(
782                remote_cfg.max_file_segment_len.unwrap(),
783                derived_max_seg_len,
784            );
785        }
786        derived_max_seg_len as u64
787    }
788
789    fn send_progressing_file_data_pdu(&mut self) -> Result<bool, SourceError> {
790        // Should never be called, but use defensive programming here.
791        if self.fparams.progress >= self.fparams.file_size {
792            return Ok(false);
793        }
794        let read_len = if self.fparams.file_size < self.fparams.segment_len {
795            self.fparams.file_size
796        } else if self.fparams.progress + self.fparams.segment_len > self.fparams.file_size {
797            self.fparams.file_size - self.fparams.progress
798        } else {
799            self.fparams.segment_len
800        };
801        let pdu_creator = FileDataPduCreatorWithReservedDatafield::new_no_seg_metadata(
802            PduHeader::new_for_file_data(
803                self.pdu_conf,
804                0,
805                SegmentMetadataFlag::NotPresent,
806                SegmentationControl::NoRecordBoundaryPreservation,
807            ),
808            self.fparams.progress,
809            read_len,
810        );
811        let mut unwritten_pdu =
812            pdu_creator.write_to_bytes_partially(self.pdu_and_cksum_buffer.get_mut())?;
813        self.vfs.read_data(
814            self.put_request_cacher.source_file().unwrap(),
815            self.fparams.progress,
816            read_len,
817            unwritten_pdu.file_data_field_mut(),
818        )?;
819        let written_len = unwritten_pdu.finish();
820        self.pdu_sender.send_pdu(
821            PduType::FileData,
822            None,
823            &self.pdu_and_cksum_buffer.borrow()[0..written_len],
824        )?;
825        self.fparams.progress += read_len;
826        /*
827                """Generic function to prepare a file data PDU. This function can also be used to
828                re-transmit file data PDUs of segments which were already sent."""
829                assert self._put_req is not None
830                assert self._put_req.source_file is not None
831                with open(self._put_req.source_file, "rb") as of:
832                    file_data = self.user.vfs.read_from_opened_file(of, offset, read_len)
833                    # TODO: Support for record continuation state not implemented yet. Segment metadata
834                    #       flag is therefore always set to False. Segment metadata support also omitted
835                    #       for now. Implementing those generically could be done in form of a callback,
836                    #       e.g. abstractmethod of this handler as a first way, another one being
837                    #       to expect the user to supply some helper class to split up a file
838                    fd_params = FileDataParams(
839                        file_data=file_data, offset=offset, segment_metadata=None
840                    )
841                    file_data_pdu = FileDataPdu(
842                        pdu_conf=self._params.pdu_conf, params=fd_params
843                    )
844                    self._add_packet_to_be_sent(file_data_pdu)
845        */
846        /*
847        """Prepare the next file data PDU, which also progresses the file copy operation.
848
849        :return: True if a packet was prepared, False if PDU handling is done and the next steps
850            in the Copy File procedure can be performed
851        """
852        # This function should only be called if file segments still need to be sent.
853        assert self._params.fp.progress < self._params.fp.file_size
854        if self._params.fp.file_size < self._params.fp.segment_len:
855            read_len = self._params.fp.file_size
856        else:
857            if (
858                self._params.fp.progress + self._params.fp.segment_len
859                > self._params.fp.file_size
860            ):
861                read_len = self._params.fp.file_size - self._params.fp.progress
862            else:
863                read_len = self._params.fp.segment_len
864        self._prepare_file_data_pdu(self._params.fp.progress, read_len)
865        self._params.fp.progress += read_len
866            */
867        Ok(true)
868    }
869
870    fn prepare_and_send_eof_pdu(
871        &mut self,
872        cfdp_user: &mut impl CfdpUser,
873        checksum: u32,
874    ) -> Result<(), SourceError> {
875        let tstate = self
876            .tstate
877            .as_ref()
878            .expect("transfer state unexpectedly empty");
879        let eof_pdu = EofPdu::new(
880            PduHeader::new_no_file_data(self.pdu_conf, 0),
881            tstate.cond_code_eof.unwrap_or(ConditionCode::NoError),
882            checksum,
883            self.fparams.progress,
884            None,
885        );
886        self.pdu_send_helper(&eof_pdu)?;
887        if self.local_cfg.indication_cfg.eof_sent {
888            cfdp_user.eof_sent_indication(&tstate.transaction_id);
889        }
890        Ok(())
891    }
892
893    fn pdu_send_helper(&self, pdu: &(impl WritablePduPacket + CfdpPdu)) -> Result<(), SourceError> {
894        let mut pdu_buffer_mut = self.pdu_and_cksum_buffer.borrow_mut();
895        let written_len = pdu.write_to_bytes(&mut pdu_buffer_mut)?;
896        self.pdu_sender.send_pdu(
897            pdu.pdu_type(),
898            pdu.file_directive_type(),
899            &pdu_buffer_mut[0..written_len],
900        )?;
901        Ok(())
902    }
903
904    fn handle_finished_pdu(&mut self, pdu_provider: &impl PduProvider) -> Result<(), SourceError> {
905        // Ignore this packet when we are idle.
906        if self.state_helper.state == State::Idle {
907            return Ok(());
908        }
909        if self.state_helper.step != TransactionStep::WaitingForFinished {
910            return Err(SourceError::UnexpectedPdu {
911                pdu_type: PduType::FileDirective,
912                directive_type: Some(FileDirectiveType::FinishedPdu),
913            });
914        }
915        let finished_pdu = FinishedPduReader::new(pdu_provider.pdu())?;
916        // Unwrapping should be fine here, the transfer state is valid when we are not in IDLE
917        // mode.
918        self.tstate.as_mut().unwrap().finished_params = Some(FinishedParams {
919            condition_code: finished_pdu.condition_code(),
920            delivery_code: finished_pdu.delivery_code(),
921            file_status: finished_pdu.file_status(),
922        });
923        if self.tstate.as_ref().unwrap().transmission_mode == TransmissionMode::Acknowledged {
924            // TODO: Send ACK packet here immediately and continue.
925            //self.state_helper.step = TransactionStep::SendingAckOfFinished;
926        }
927        self.state_helper.step = TransactionStep::NoticeOfCompletion;
928
929        /*
930        if self.transmission_mode == TransmissionMode.ACKNOWLEDGED:
931            self._prepare_finished_ack_packet(finished_pdu.condition_code)
932            self.states.step = TransactionStep.SENDING_ACK_OF_FINISHED
933        else:
934            self.states.step = TransactionStep.NOTICE_OF_COMPLETION
935        */
936        Ok(())
937    }
938
    /// Placeholder for the handling of NAK PDUs. Currently does nothing.
    fn handle_nak_pdu(&mut self) {}
940
    /// Placeholder for the handling of Keep Alive PDUs. Currently does nothing.
    fn handle_keep_alive_pdu(&mut self) {}
942
943    pub fn transaction_id(&self) -> Option<TransactionId> {
944        self.tstate.as_ref().map(|v| v.transaction_id)
945    }
946
947    /// Returns the [TransmissionMode] for the active file operation.
948    #[inline]
949    pub fn transmission_mode(&self) -> Option<super::TransmissionMode> {
950        self.tstate.as_ref().map(|v| v.transmission_mode)
951    }
952
    /// Returns the current [TransactionStep], which denotes the exact step of a pending CFDP
    /// transaction when applicable.
    pub fn step(&self) -> TransactionStep {
        self.state_helper.step
    }
958
    /// Returns the current [State] of the handler.
    pub fn state(&self) -> State {
        self.state_helper.state
    }
962
    /// Returns a reference to the [LocalEntityConfig] used by this handler.
    pub fn local_cfg(&self) -> &LocalEntityConfig<UserFaultHook> {
        &self.local_cfg
    }
966
967    fn declare_fault(
968        &mut self,
969        user: &mut impl CfdpUser,
970        cond: ConditionCode,
971    ) -> Result<(), SourceError> {
972        // Need to cache those in advance, because a notice of cancellation can reset the handler.
973        let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
974        let progress = self.fparams.progress;
975        let fh = self.local_cfg.fault_handler.get_fault_handler(cond);
976        match fh {
977            spacepackets::cfdp::FaultHandlerCode::NoticeOfCancellation => {
978                if let ControlFlow::Break(_) = self.notice_of_cancellation(user, cond)? {
979                    return Ok(());
980                }
981            }
982            spacepackets::cfdp::FaultHandlerCode::NoticeOfSuspension => {
983                self.notice_of_suspension();
984            }
985            spacepackets::cfdp::FaultHandlerCode::IgnoreError => (),
986            spacepackets::cfdp::FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
987        }
988        self.local_cfg
989            .fault_handler
990            .report_fault(transaction_id, cond, progress);
991        Ok(())
992    }
993
994    fn notice_of_cancellation(
995        &mut self,
996        user: &mut impl CfdpUser,
997        condition_code: ConditionCode,
998    ) -> Result<ControlFlow<()>, SourceError> {
999        let transaction_id = self.tstate.as_ref().unwrap().transaction_id;
1000        // CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
1001        // the EOF (cancel) PDU must result in abandonment of the transaction.
1002        if let Some(cond_code_eof) = self.tstate.as_ref().unwrap().cond_code_eof {
1003            if cond_code_eof != ConditionCode::NoError {
1004                // Still call the abandonment callback to ensure the fault is logged.
1005                self.local_cfg
1006                    .fault_handler
1007                    .user_hook
1008                    .get_mut()
1009                    .abandoned_cb(transaction_id, cond_code_eof, self.fparams.progress);
1010                self.abandon_transaction();
1011                return Ok(ControlFlow::Break(()));
1012            }
1013        }
1014
1015        let tstate = self.tstate.as_mut().unwrap();
1016        tstate.cond_code_eof = Some(condition_code);
1017        // As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
1018        // the checksum for the file copy progress sent so far.
1019        let checksum = self.vfs.calculate_checksum(
1020            self.put_request_cacher.source_file().unwrap(),
1021            tstate.remote_cfg.default_crc_type,
1022            self.fparams.progress,
1023            self.pdu_and_cksum_buffer.get_mut(),
1024        )?;
1025        self.prepare_and_send_eof_pdu(user, checksum)?;
1026        if self.transmission_mode().unwrap() == TransmissionMode::Unacknowledged {
1027            // We are done.
1028            self.reset();
1029        } else {
1030            self.state_helper.step = TransactionStep::WaitingForEofAck;
1031        }
1032        Ok(ControlFlow::Continue(()))
1033    }
1034
    /// Placeholder for the notice of suspension procedure. Currently does nothing.
    fn notice_of_suspension(&mut self) {}
1036
    /// Abandons the active transaction by resetting the handler.
    fn abandon_transaction(&mut self) {
        // An abandoned transaction just stops whatever the handler is doing and resets it to
        // a clean state, so a plain reset is all that is done here.
        self.reset();
    }
1042
1043    /*
1044    def _notice_of_cancellation(self, condition_code: ConditionCode) -> bool:
        """Returns whether the fault declaration handler can return prematurely."""
1046        # CFDP standard 4.11.2.2.3: Any fault declared in the course of transferring
1047        # the EOF (cancel) PDU must result in abandonment of the transaction.
1048        if (
1049            self._params.cond_code_eof is not None
1050            and self._params.cond_code_eof != ConditionCode.NO_ERROR
1051        ):
1052            assert self._params.transaction_id is not None
1053            # We still call the abandonment callback to ensure the fault is logged.
1054            self.cfg.default_fault_handlers.abandoned_cb(
1055                self._params.transaction_id,
1056                self._params.cond_code_eof,
1057                self._params.fp.progress,
1058            )
1059            self._abandon_transaction()
1060            return False
1061        self._params.cond_code_eof = condition_code
1062        # As specified in 4.11.2.2, prepare an EOF PDU to be sent to the remote entity. Supply
1063        # the checksum for the file copy progress sent so far.
1064        self._prepare_eof_pdu(self._checksum_calculation(self._params.fp.progress))
1065        self.states.step = TransactionStep.SENDING_EOF
1066        return True
1067    */
1068
    /// This function is public to allow completely resetting the handler, but it is explicitly
    /// discouraged to do this. CFDP has mechanisms to detect issues and errors on its own.
    /// Resetting the handler might interfere with these mechanisms and lead to unexpected
    /// behaviour.
    pub fn reset(&mut self) {
        // Restore the state helper and file parameters to their defaults and drop the
        // transfer state and countdown.
        self.state_helper = Default::default();
        self.tstate = None;
        self.fparams = Default::default();
        self.countdown = None;
    }
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use core::time::Duration;
1084    use std::{fs::OpenOptions, io::Write, path::PathBuf, thread, vec::Vec};
1085
1086    use alloc::string::String;
1087    use rand::Rng;
1088    use spacepackets::{
1089        cfdp::{
1090            pdu::{
1091                file_data::FileDataPdu, finished::FinishedPduCreator, metadata::MetadataPduReader,
1092            },
1093            ChecksumType, CrcFlag,
1094        },
1095        util::UnsignedByteFieldU16,
1096    };
1097    use tempfile::TempPath;
1098
1099    use super::*;
1100    use crate::{
1101        filestore::NativeFilestore,
1102        request::PutRequestOwned,
1103        source::TransactionStep,
1104        tests::{basic_remote_cfg_table, SentPdu, TestCfdpSender, TestCfdpUser, TestFaultHandler},
1105        FaultHandler, IndicationConfig, PduRawWithInfo, StdCountdown,
1106        StdRemoteEntityConfigProvider, StdTimerCreator, CRC_32,
1107    };
1108    use spacepackets::seq_count::SeqCountProviderSimple;
1109
    /// Entity ID used for the local entity configuration of the testbench. Sent PDUs are
    /// expected to carry it as source ID.
    const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
    /// Entity ID used for the remote entity configuration table and as destination of the put
    /// requests. Sent PDUs are expected to carry it as destination ID.
    const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
    // NOTE(review): presumably an entity ID without a remote configuration entry, used for
    // negative tests further down in this module. Confirm against its users.
    const INVALID_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(5);
1113
1114    fn init_full_filepaths_textfile() -> (TempPath, PathBuf) {
1115        (
1116            tempfile::NamedTempFile::new().unwrap().into_temp_path(),
1117            tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1118        )
1119    }
1120
    /// Concrete [SourceHandler] type used by all tests in this module. It combines the test
    /// PDU sender and test fault handler with the native filestore and the standard remote
    /// configuration, timer and sequence count providers.
    type TestSourceHandler = SourceHandler<
        TestCfdpSender,
        TestFaultHandler,
        NativeFilestore,
        StdRemoteEntityConfigProvider,
        StdTimerCreator,
        StdCountdown,
        SeqCountProviderSimple<u16>,
    >;
1130
    /// Test fixture which bundles a [TestSourceHandler] with the file paths and settings
    /// shared by the tests in this module.
    struct SourceHandlerTestbench {
        /// The source handler under test.
        handler: TestSourceHandler,
        // NOTE(review): never read; presumably kept so the temporary source file lives as long
        // as the testbench. Confirm against the `tempfile::TempPath` drop behaviour.
        #[allow(dead_code)]
        srcfile_handle: TempPath,
        /// Path of the source file as string.
        srcfile: String,
        /// Path of the destination file as string.
        destfile: String,
        /// Maximum packet length which was passed to the remote entity configuration table.
        max_packet_len: usize,
        /// If set, dropping the testbench asserts that the handler is back in the idle state.
        check_idle_on_drop: bool,
    }
1140
    /// Information about a started file transfer, as returned by
    /// `SourceHandlerTestbench::common_no_acked_file_transfer`.
    #[allow(dead_code)]
    struct TransferInfo {
        /// Transaction ID reported by the handler after the put request was accepted.
        id: TransactionId,
        /// Closure requested flag which was verified against the Metadata PDU.
        closure_requested: bool,
        /// PDU header of the Metadata PDU sent by the handler.
        pdu_header: PduHeader,
    }
1147
1148    impl SourceHandlerTestbench {
1149        fn new(
1150            crc_on_transmission_by_default: bool,
1151            test_fault_handler: TestFaultHandler,
1152            test_packet_sender: TestCfdpSender,
1153            max_packet_len: usize,
1154        ) -> Self {
1155            let local_entity_cfg = LocalEntityConfig {
1156                id: LOCAL_ID.into(),
1157                indication_cfg: IndicationConfig::default(),
1158                fault_handler: FaultHandler::new(test_fault_handler),
1159            };
1160            let static_put_request_cacher = StaticPutRequestCacher::new(2048);
1161            let (srcfile_handle, destfile) = init_full_filepaths_textfile();
1162            let srcfile = String::from(srcfile_handle.to_path_buf().to_str().unwrap());
1163            Self {
1164                handler: SourceHandler::new(
1165                    local_entity_cfg,
1166                    test_packet_sender,
1167                    NativeFilestore::default(),
1168                    static_put_request_cacher,
1169                    1024,
1170                    basic_remote_cfg_table(
1171                        REMOTE_ID,
1172                        max_packet_len,
1173                        crc_on_transmission_by_default,
1174                    ),
1175                    StdTimerCreator::new(core::time::Duration::from_millis(100)),
1176                    SeqCountProviderSimple::default(),
1177                ),
1178                srcfile_handle,
1179                srcfile,
1180                destfile: String::from(destfile.to_path_buf().to_str().unwrap()),
1181                max_packet_len,
1182                check_idle_on_drop: true,
1183            }
1184        }
1185
1186        fn create_user(&self, next_expected_seq_num: u64, filesize: u64) -> TestCfdpUser {
1187            TestCfdpUser::new(
1188                next_expected_seq_num,
1189                self.srcfile.clone(),
1190                self.destfile.clone(),
1191                filesize,
1192            )
1193        }
1194
1195        fn set_check_limit_timeout(&mut self, timeout: Duration) {
1196            self.handler.timer_creator.check_limit_timeout = timeout;
1197        }
1198
1199        fn put_request(
1200            &mut self,
1201            put_request: &impl ReadablePutRequest,
1202        ) -> Result<(), PutRequestError> {
1203            self.handler.put_request(put_request)
1204        }
1205
1206        fn all_fault_queues_empty(&self) -> bool {
1207            self.handler
1208                .local_cfg
1209                .user_fault_hook()
1210                .borrow()
1211                .all_queues_empty()
1212        }
1213
1214        #[allow(dead_code)]
1215        fn test_fault_handler(&self) -> &RefCell<TestFaultHandler> {
1216            self.handler.local_cfg.user_fault_hook()
1217        }
1218
1219        fn test_fault_handler_mut(&mut self) -> &mut RefCell<TestFaultHandler> {
1220            self.handler.local_cfg.user_fault_hook_mut()
1221        }
1222
1223        fn pdu_queue_empty(&self) -> bool {
1224            self.handler.pdu_sender.queue_empty()
1225        }
1226
1227        fn get_next_sent_pdu(&self) -> Option<SentPdu> {
1228            self.handler.pdu_sender.retrieve_next_pdu()
1229        }
1230
1231        fn common_pdu_check_for_file_transfer(&self, pdu_header: &PduHeader, crc_flag: CrcFlag) {
1232            assert_eq!(
1233                pdu_header.seg_ctrl(),
1234                SegmentationControl::NoRecordBoundaryPreservation
1235            );
1236            assert_eq!(
1237                pdu_header.seg_metadata_flag(),
1238                SegmentMetadataFlag::NotPresent
1239            );
1240            assert_eq!(pdu_header.common_pdu_conf().source_id(), LOCAL_ID.into());
1241            assert_eq!(pdu_header.common_pdu_conf().dest_id(), REMOTE_ID.into());
1242            assert_eq!(pdu_header.common_pdu_conf().crc_flag, crc_flag);
1243            assert_eq!(
1244                pdu_header.common_pdu_conf().trans_mode,
1245                TransmissionMode::Unacknowledged
1246            );
1247            assert_eq!(
1248                pdu_header.common_pdu_conf().direction,
1249                Direction::TowardsReceiver
1250            );
1251            assert_eq!(
1252                pdu_header.common_pdu_conf().file_flag,
1253                LargeFileFlag::Normal
1254            );
1255            assert_eq!(pdu_header.common_pdu_conf().transaction_seq_num.size(), 2);
1256        }
1257
1258        fn generic_file_transfer(
1259            &mut self,
1260            cfdp_user: &mut TestCfdpUser,
1261            with_closure: bool,
1262            file_data: Vec<u8>,
1263        ) -> (PduHeader, u32) {
1264            let mut digest = CRC_32.digest();
1265            digest.update(&file_data);
1266            let checksum = digest.finalize();
1267            cfdp_user.expected_full_src_name = self.srcfile.clone();
1268            cfdp_user.expected_full_dest_name = self.destfile.clone();
1269            cfdp_user.expected_file_size = file_data.len() as u64;
1270            let put_request = PutRequestOwned::new_regular_request(
1271                REMOTE_ID.into(),
1272                &self.srcfile,
1273                &self.destfile,
1274                Some(TransmissionMode::Unacknowledged),
1275                Some(with_closure),
1276            )
1277            .expect("creating put request failed");
1278            let transaction_info = self.common_no_acked_file_transfer(
1279                cfdp_user,
1280                put_request,
1281                cfdp_user.expected_file_size,
1282            );
1283            let mut current_offset = 0;
1284            let chunks = file_data.chunks(
1285                calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
1286                    &transaction_info.pdu_header,
1287                    self.max_packet_len,
1288                    None,
1289                ),
1290            );
1291            let mut fd_pdus = 0;
1292            for segment in chunks {
1293                self.check_next_file_pdu(current_offset, segment);
1294                self.handler.state_machine_no_packet(cfdp_user).unwrap();
1295                fd_pdus += 1;
1296                current_offset += segment.len() as u64;
1297            }
1298            self.common_eof_pdu_check(
1299                cfdp_user,
1300                transaction_info.closure_requested,
1301                cfdp_user.expected_file_size,
1302                checksum,
1303            );
1304            (transaction_info.pdu_header, fd_pdus)
1305        }
1306
1307        fn common_no_acked_file_transfer(
1308            &mut self,
1309            cfdp_user: &mut TestCfdpUser,
1310            put_request: PutRequestOwned,
1311            filesize: u64,
1312        ) -> TransferInfo {
1313            assert_eq!(cfdp_user.transaction_indication_call_count, 0);
1314            assert_eq!(cfdp_user.eof_sent_call_count, 0);
1315
1316            self.put_request(&put_request)
1317                .expect("put_request call failed");
1318            assert_eq!(self.handler.state(), State::Busy);
1319            assert_eq!(self.handler.step(), TransactionStep::Idle);
1320            let id = self.handler.transaction_id().unwrap();
1321            let sent_packets = self
1322                .handler
1323                .state_machine_no_packet(cfdp_user)
1324                .expect("source handler FSM failure");
1325            assert_eq!(sent_packets, 2);
1326            assert!(!self.pdu_queue_empty());
1327            let next_pdu = self.get_next_sent_pdu().unwrap();
1328            assert!(!self.pdu_queue_empty());
1329            assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1330            assert_eq!(
1331                next_pdu.file_directive_type,
1332                Some(FileDirectiveType::MetadataPdu)
1333            );
1334            let metadata_pdu =
1335                MetadataPduReader::new(&next_pdu.raw_pdu).expect("invalid metadata PDU format");
1336            let pdu_header = metadata_pdu.pdu_header();
1337            self.common_pdu_check_for_file_transfer(metadata_pdu.pdu_header(), CrcFlag::NoCrc);
1338            assert_eq!(
1339                metadata_pdu
1340                    .src_file_name()
1341                    .value_as_str()
1342                    .unwrap()
1343                    .unwrap(),
1344                self.srcfile
1345            );
1346            assert_eq!(
1347                metadata_pdu
1348                    .dest_file_name()
1349                    .value_as_str()
1350                    .unwrap()
1351                    .unwrap(),
1352                self.destfile
1353            );
1354            assert_eq!(metadata_pdu.metadata_params().file_size, filesize);
1355            assert_eq!(
1356                metadata_pdu.metadata_params().checksum_type,
1357                ChecksumType::Crc32
1358            );
1359            let closure_requested = if let Some(closure_requested) = put_request.closure_requested {
1360                assert_eq!(
1361                    metadata_pdu.metadata_params().closure_requested,
1362                    closure_requested
1363                );
1364                closure_requested
1365            } else {
1366                assert!(metadata_pdu.metadata_params().closure_requested);
1367                metadata_pdu.metadata_params().closure_requested
1368            };
1369            assert_eq!(metadata_pdu.options(), &[]);
1370            TransferInfo {
1371                pdu_header: *pdu_header,
1372                closure_requested,
1373                id,
1374            }
1375        }
1376
1377        fn check_next_file_pdu(&mut self, expected_offset: u64, expected_data: &[u8]) {
1378            let next_pdu = self.get_next_sent_pdu().unwrap();
1379            assert_eq!(next_pdu.pdu_type, PduType::FileData);
1380            assert!(next_pdu.file_directive_type.is_none());
1381            let fd_pdu =
1382                FileDataPdu::from_bytes(&next_pdu.raw_pdu).expect("reading file data PDU failed");
1383            assert_eq!(fd_pdu.offset(), expected_offset);
1384            assert_eq!(fd_pdu.file_data(), expected_data);
1385            assert!(fd_pdu.segment_metadata().is_none());
1386        }
1387
1388        fn common_eof_pdu_check(
1389            &mut self,
1390            cfdp_user: &mut TestCfdpUser,
1391            closure_requested: bool,
1392            filesize: u64,
1393            checksum: u32,
1394        ) {
1395            let next_pdu = self.get_next_sent_pdu().unwrap();
1396            assert_eq!(next_pdu.pdu_type, PduType::FileDirective);
1397            assert_eq!(
1398                next_pdu.file_directive_type,
1399                Some(FileDirectiveType::EofPdu)
1400            );
1401            let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1402            self.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1403            assert_eq!(eof_pdu.condition_code(), ConditionCode::NoError);
1404            assert_eq!(eof_pdu.file_size(), filesize);
1405            assert_eq!(eof_pdu.file_checksum(), checksum);
1406            assert_eq!(
1407                eof_pdu
1408                    .pdu_header()
1409                    .common_pdu_conf()
1410                    .transaction_seq_num
1411                    .value_const(),
1412                0
1413            );
1414            if !closure_requested {
1415                assert_eq!(self.handler.state(), State::Idle);
1416                assert_eq!(self.handler.step(), TransactionStep::Idle);
1417            } else {
1418                assert_eq!(self.handler.state(), State::Busy);
1419                assert_eq!(self.handler.step(), TransactionStep::WaitingForFinished);
1420            }
1421            assert_eq!(cfdp_user.transaction_indication_call_count, 1);
1422            assert_eq!(cfdp_user.eof_sent_call_count, 1);
1423            self.all_fault_queues_empty();
1424        }
1425
1426        fn common_tiny_file_transfer(
1427            &mut self,
1428            cfdp_user: &mut TestCfdpUser,
1429            with_closure: bool,
1430        ) -> PduHeader {
1431            let mut file = OpenOptions::new()
1432                .write(true)
1433                .open(&self.srcfile)
1434                .expect("opening file failed");
1435            let content_str = "Hello World!";
1436            file.write_all(content_str.as_bytes())
1437                .expect("writing file content failed");
1438            drop(file);
1439            let (pdu_header, fd_pdus) = self.generic_file_transfer(
1440                cfdp_user,
1441                with_closure,
1442                content_str.as_bytes().to_vec(),
1443            );
1444            assert_eq!(fd_pdus, 1);
1445            pdu_header
1446        }
1447
1448        fn finish_handling(&mut self, user: &mut TestCfdpUser, pdu_header: PduHeader) {
1449            let finished_pdu = FinishedPduCreator::new_default(
1450                pdu_header,
1451                DeliveryCode::Complete,
1452                FileStatus::Retained,
1453            );
1454            let finished_pdu_vec = finished_pdu.to_vec().unwrap();
1455            let packet_info = PduRawWithInfo::new(&finished_pdu_vec).unwrap();
1456            self.handler
1457                .state_machine(user, Some(&packet_info))
1458                .unwrap();
1459        }
1460    }
1461
1462    impl Drop for SourceHandlerTestbench {
1463        fn drop(&mut self) {
1464            self.all_fault_queues_empty();
1465            if self.check_idle_on_drop {
1466                assert_eq!(self.handler.state(), State::Idle);
1467                assert_eq!(self.handler.step(), TransactionStep::Idle);
1468            }
1469        }
1470    }
1471
1472    #[test]
1473    fn test_basic() {
1474        let fault_handler = TestFaultHandler::default();
1475        let test_sender = TestCfdpSender::default();
1476        let tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1477        assert!(tb.handler.transmission_mode().is_none());
1478        assert!(tb.pdu_queue_empty());
1479    }
1480
1481    #[test]
1482    fn test_empty_file_transfer_not_acked_no_closure() {
1483        let fault_handler = TestFaultHandler::default();
1484        let test_sender = TestCfdpSender::default();
1485        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1486        let filesize = 0;
1487        let put_request = PutRequestOwned::new_regular_request(
1488            REMOTE_ID.into(),
1489            &tb.srcfile,
1490            &tb.destfile,
1491            Some(TransmissionMode::Unacknowledged),
1492            Some(false),
1493        )
1494        .expect("creating put request failed");
1495        let mut cfdp_user = tb.create_user(0, filesize);
1496        let transaction_info =
1497            tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
1498        tb.common_eof_pdu_check(
1499            &mut cfdp_user,
1500            transaction_info.closure_requested,
1501            filesize,
1502            CRC_32.digest().finalize(),
1503        )
1504    }
1505
1506    #[test]
1507    fn test_tiny_file_transfer_not_acked_no_closure() {
1508        let fault_handler = TestFaultHandler::default();
1509        let test_sender = TestCfdpSender::default();
1510        let mut cfdp_user = TestCfdpUser::default();
1511        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1512        tb.common_tiny_file_transfer(&mut cfdp_user, false);
1513    }
1514
1515    #[test]
1516    fn test_tiny_file_transfer_not_acked_with_closure() {
1517        let fault_handler = TestFaultHandler::default();
1518        let test_sender = TestCfdpSender::default();
1519        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1520        let mut cfdp_user = TestCfdpUser::default();
1521        let pdu_header = tb.common_tiny_file_transfer(&mut cfdp_user, true);
1522        tb.finish_handling(&mut cfdp_user, pdu_header)
1523    }
1524
1525    #[test]
1526    fn test_two_segment_file_transfer_not_acked_no_closure() {
1527        let fault_handler = TestFaultHandler::default();
1528        let test_sender = TestCfdpSender::default();
1529        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128);
1530        let mut cfdp_user = TestCfdpUser::default();
1531        let mut file = OpenOptions::new()
1532            .write(true)
1533            .open(&tb.srcfile)
1534            .expect("opening file failed");
1535        let mut rand_data = [0u8; 140];
1536        rand::thread_rng().fill(&mut rand_data[..]);
1537        file.write_all(&rand_data)
1538            .expect("writing file content failed");
1539        drop(file);
1540        let (_, fd_pdus) = tb.generic_file_transfer(&mut cfdp_user, false, rand_data.to_vec());
1541        assert_eq!(fd_pdus, 2);
1542    }
1543
1544    #[test]
1545    fn test_two_segment_file_transfer_not_acked_with_closure() {
1546        let fault_handler = TestFaultHandler::default();
1547        let test_sender = TestCfdpSender::default();
1548        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128);
1549        let mut cfdp_user = TestCfdpUser::default();
1550        let mut file = OpenOptions::new()
1551            .write(true)
1552            .open(&tb.srcfile)
1553            .expect("opening file failed");
1554        let mut rand_data = [0u8; 140];
1555        rand::thread_rng().fill(&mut rand_data[..]);
1556        file.write_all(&rand_data)
1557            .expect("writing file content failed");
1558        drop(file);
1559        let (pdu_header, fd_pdus) =
1560            tb.generic_file_transfer(&mut cfdp_user, true, rand_data.to_vec());
1561        assert_eq!(fd_pdus, 2);
1562        tb.finish_handling(&mut cfdp_user, pdu_header)
1563    }
1564
1565    #[test]
1566    fn test_empty_file_transfer_not_acked_with_closure() {
1567        let fault_handler = TestFaultHandler::default();
1568        let test_sender = TestCfdpSender::default();
1569        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1570        let filesize = 0;
1571        let put_request = PutRequestOwned::new_regular_request(
1572            REMOTE_ID.into(),
1573            &tb.srcfile,
1574            &tb.destfile,
1575            Some(TransmissionMode::Unacknowledged),
1576            Some(true),
1577        )
1578        .expect("creating put request failed");
1579        let mut cfdp_user = tb.create_user(0, filesize);
1580        let transaction_info =
1581            tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
1582        tb.common_eof_pdu_check(
1583            &mut cfdp_user,
1584            transaction_info.closure_requested,
1585            filesize,
1586            CRC_32.digest().finalize(),
1587        );
1588        tb.finish_handling(&mut cfdp_user, transaction_info.pdu_header)
1589    }
1590
1591    #[test]
1592    fn test_put_request_no_remote_cfg() {
1593        let fault_handler = TestFaultHandler::default();
1594        let test_sender = TestCfdpSender::default();
1595        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1596
1597        let (srcfile, destfile) = init_full_filepaths_textfile();
1598        let srcfile_str = String::from(srcfile.to_str().unwrap());
1599        let destfile_str = String::from(destfile.to_str().unwrap());
1600        let put_request = PutRequestOwned::new_regular_request(
1601            INVALID_ID.into(),
1602            &srcfile_str,
1603            &destfile_str,
1604            Some(TransmissionMode::Unacknowledged),
1605            Some(true),
1606        )
1607        .expect("creating put request failed");
1608        let error = tb.handler.put_request(&put_request);
1609        assert!(error.is_err());
1610        let error = error.unwrap_err();
1611        if let PutRequestError::NoRemoteCfgFound(id) = error {
1612            assert_eq!(id, INVALID_ID.into());
1613        } else {
1614            panic!("unexpected error type: {:?}", error);
1615        }
1616    }
1617
1618    #[test]
1619    fn test_put_request_file_does_not_exist() {
1620        let fault_handler = TestFaultHandler::default();
1621        let test_sender = TestCfdpSender::default();
1622        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1623
1624        let file_which_does_not_exist = "/tmp/this_file_does_not_exist.txt";
1625        let destfile = "/tmp/tmp.txt";
1626        let put_request = PutRequestOwned::new_regular_request(
1627            REMOTE_ID.into(),
1628            file_which_does_not_exist,
1629            destfile,
1630            Some(TransmissionMode::Unacknowledged),
1631            Some(true),
1632        )
1633        .expect("creating put request failed");
1634        let error = tb.put_request(&put_request);
1635        assert!(error.is_err());
1636        let error = error.unwrap_err();
1637        if !matches!(error, PutRequestError::FileDoesNotExist) {
1638            panic!("unexpected error type: {:?}", error);
1639        }
1640    }
1641
1642    #[test]
1643    fn test_finished_pdu_check_timeout() {
1644        let fault_handler = TestFaultHandler::default();
1645        let test_sender = TestCfdpSender::default();
1646        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1647        tb.set_check_limit_timeout(Duration::from_millis(45));
1648        let filesize = 0;
1649        let put_request = PutRequestOwned::new_regular_request(
1650            REMOTE_ID.into(),
1651            &tb.srcfile,
1652            &tb.destfile,
1653            Some(TransmissionMode::Unacknowledged),
1654            Some(true),
1655        )
1656        .expect("creating put request failed");
1657        let mut cfdp_user = tb.create_user(0, filesize);
1658        let transaction_info =
1659            tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, filesize);
1660        let expected_id = tb.handler.transaction_id().unwrap();
1661        tb.common_eof_pdu_check(
1662            &mut cfdp_user,
1663            transaction_info.closure_requested,
1664            filesize,
1665            CRC_32.digest().finalize(),
1666        );
1667        // After 50 ms delay, we run into a timeout, which leads to a check limit error
1668        // declaration -> leads to a notice of cancellation -> leads to an EOF PDU with the
1669        // appropriate error code.
1670        thread::sleep(Duration::from_millis(50));
1671        assert_eq!(
1672            tb.handler.state_machine_no_packet(&mut cfdp_user).unwrap(),
1673            0
1674        );
1675        let next_pdu = tb.get_next_sent_pdu().unwrap();
1676        let eof_pdu = EofPdu::from_bytes(&next_pdu.raw_pdu).expect("invalid EOF PDU format");
1677        tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1678        assert_eq!(eof_pdu.condition_code(), ConditionCode::CheckLimitReached);
1679        assert_eq!(eof_pdu.file_size(), 0);
1680        assert_eq!(eof_pdu.file_checksum(), 0);
1681
1682        // Cancellation fault should have been triggered.
1683        let fault_handler = tb.test_fault_handler_mut();
1684        let fh_ref_mut = fault_handler.get_mut();
1685        assert!(!fh_ref_mut.cancellation_queue_empty());
1686        assert_eq!(fh_ref_mut.notice_of_cancellation_queue.len(), 1);
1687        let (id, cond_code, progress) = fh_ref_mut.notice_of_cancellation_queue.pop_back().unwrap();
1688        assert_eq!(id, expected_id);
1689        assert_eq!(cond_code, ConditionCode::CheckLimitReached);
1690        assert_eq!(progress, 0);
1691        fh_ref_mut.all_queues_empty();
1692    }
1693
1694    #[test]
1695    fn test_cancelled_transfer_empty_file() {
1696        let fault_handler = TestFaultHandler::default();
1697        let test_sender = TestCfdpSender::default();
1698        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 512);
1699        let filesize = 0;
1700        let put_request = PutRequestOwned::new_regular_request(
1701            REMOTE_ID.into(),
1702            &tb.srcfile,
1703            &tb.destfile,
1704            Some(TransmissionMode::Unacknowledged),
1705            Some(false),
1706        )
1707        .expect("creating put request failed");
1708        let mut cfdp_user = tb.create_user(0, filesize);
1709        assert_eq!(cfdp_user.transaction_indication_call_count, 0);
1710        assert_eq!(cfdp_user.eof_sent_call_count, 0);
1711
1712        tb.put_request(&put_request)
1713            .expect("put_request call failed");
1714        assert_eq!(tb.handler.state(), State::Busy);
1715        assert_eq!(tb.handler.step(), TransactionStep::Idle);
1716        assert!(tb.get_next_sent_pdu().is_none());
1717        let id = tb.handler.transaction_id().unwrap();
1718        tb.handler
1719            .cancel_request(&mut cfdp_user, &id)
1720            .expect("transaction cancellation failed");
1721        assert_eq!(tb.handler.state(), State::Idle);
1722        assert_eq!(tb.handler.step(), TransactionStep::Idle);
1723        // EOF (Cancel) PDU will be generated
1724        let eof_pdu = tb
1725            .get_next_sent_pdu()
1726            .expect("no EOF PDU generated like expected");
1727        assert_eq!(
1728            eof_pdu.file_directive_type.unwrap(),
1729            FileDirectiveType::EofPdu
1730        );
1731        let eof_pdu = EofPdu::from_bytes(&eof_pdu.raw_pdu).unwrap();
1732        assert_eq!(
1733            eof_pdu.condition_code(),
1734            ConditionCode::CancelRequestReceived
1735        );
1736        assert_eq!(eof_pdu.file_checksum(), 0);
1737        assert_eq!(eof_pdu.file_size(), 0);
1738        tb.common_pdu_check_for_file_transfer(eof_pdu.pdu_header(), CrcFlag::NoCrc);
1739    }
1740
1741    #[test]
1742    fn test_cancelled_transfer_mid_transfer() {
1743        let fault_handler = TestFaultHandler::default();
1744        let test_sender = TestCfdpSender::default();
1745        let mut tb = SourceHandlerTestbench::new(false, fault_handler, test_sender, 128);
1746        let mut file = OpenOptions::new()
1747            .write(true)
1748            .open(&tb.srcfile)
1749            .expect("opening file failed");
1750        let mut rand_data = [0u8; 140];
1751        rand::thread_rng().fill(&mut rand_data[..]);
1752        file.write_all(&rand_data)
1753            .expect("writing file content failed");
1754        drop(file);
1755        let put_request = PutRequestOwned::new_regular_request(
1756            REMOTE_ID.into(),
1757            &tb.srcfile,
1758            &tb.destfile,
1759            Some(TransmissionMode::Unacknowledged),
1760            Some(false),
1761        )
1762        .expect("creating put request failed");
1763        let file_size = rand_data.len() as u64;
1764        let mut cfdp_user = tb.create_user(0, file_size);
1765        let transaction_info =
1766            tb.common_no_acked_file_transfer(&mut cfdp_user, put_request, file_size);
1767        let mut chunks = rand_data.chunks(
1768            calculate_max_file_seg_len_for_max_packet_len_and_pdu_header(
1769                &transaction_info.pdu_header,
1770                tb.max_packet_len,
1771                None,
1772            ),
1773        );
1774        let mut digest = CRC_32.digest();
1775        let first_chunk = chunks.next().expect("no chunk found");
1776        digest.update(first_chunk);
1777        let checksum = digest.finalize();
1778        let next_packet = tb.get_next_sent_pdu().unwrap();
1779        assert_eq!(next_packet.pdu_type, PduType::FileData);
1780        let fd_pdu = FileDataPdu::from_bytes(&next_packet.raw_pdu).unwrap();
1781        assert_eq!(fd_pdu.file_data(), &rand_data[0..first_chunk.len()]);
1782        let expected_id = tb.handler.transaction_id().unwrap();
1783        assert!(tb
1784            .handler
1785            .cancel_request(&mut cfdp_user, &expected_id)
1786            .expect("cancellation failed"));
1787        assert_eq!(tb.handler.state(), State::Idle);
1788        assert_eq!(tb.handler.step(), TransactionStep::Idle);
1789        let next_packet = tb.get_next_sent_pdu().unwrap();
1790        assert_eq!(next_packet.pdu_type, PduType::FileDirective);
1791        assert_eq!(
1792            next_packet.file_directive_type.unwrap(),
1793            FileDirectiveType::EofPdu
1794        );
1795        // As specified in 4.11.2.2 of the standard, the file size will be the progress of the
1796        // file copy operation so far, and the checksum is calculated for that progress.
1797        let eof_pdu = EofPdu::from_bytes(&next_packet.raw_pdu).expect("EOF PDU creation failed");
1798        assert_eq!(eof_pdu.file_size(), first_chunk.len() as u64);
1799        assert_eq!(eof_pdu.file_checksum(), checksum);
1800        assert_eq!(
1801            eof_pdu.condition_code(),
1802            ConditionCode::CancelRequestReceived
1803        );
1804    }
1805}