satrs_core/cfdp/
dest.rs

1use crate::cfdp::user::TransactionFinishedParams;
2use core::str::{from_utf8, Utf8Error};
3use std::path::{Path, PathBuf};
4
5use super::{
6    filestore::{FilestoreError, VirtualFilestore},
7    user::{CfdpUser, FileSegmentRecvdParams, MetadataReceivedParams},
8    CheckTimer, CheckTimerCreator, EntityType, LocalEntityConfig, PacketInfo, PacketTarget,
9    RemoteEntityConfig, RemoteEntityConfigProvider, State, TimerContext, TransactionId,
10    TransactionStep,
11};
12use alloc::boxed::Box;
13use smallvec::SmallVec;
14use spacepackets::{
15    cfdp::{
16        pdu::{
17            eof::EofPdu,
18            file_data::FileDataPdu,
19            finished::{DeliveryCode, FileStatus, FinishedPduCreator},
20            metadata::{MetadataGenericParams, MetadataPduReader},
21            CfdpPdu, CommonPduConfig, FileDirectiveType, PduError, PduHeader, WritablePduPacket,
22        },
23        tlv::{msg_to_user::MsgToUserTlv, EntityIdTlv, GenericTlv, TlvType},
24        ChecksumType, ConditionCode, FaultHandlerCode, PduType, TransmissionMode,
25    },
26    util::UnsignedByteField,
27};
28use thiserror::Error;
29
/// Source and destination file names of the transfer which is currently being handled.
///
/// The names are kept as raw bytes in fixed-size buffers, accompanied by the number of bytes
/// which are actually valid.
#[derive(Debug)]
struct FileProperties {
    /// Raw source file name; only the first `src_file_name_len` bytes are valid.
    src_file_name: [u8; u8::MAX as usize],
    /// Number of valid bytes in `src_file_name`.
    src_file_name_len: usize,
    /// Raw destination file name; only the first `dest_file_name_len` bytes are valid.
    dest_file_name: [u8; u8::MAX as usize],
    /// Number of valid bytes in `dest_file_name`.
    dest_file_name_len: usize,
    /// Destination path which is passed to the filestore. It is assembled when the transaction
    /// is started.
    dest_path_buf: PathBuf,
}
38
/// Records whether a transaction ran to completion or was cancelled.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
enum CompletionDisposition {
    /// Default value: no notice of cancellation was issued for the transaction.
    Completed = 0,
    /// A notice of cancellation was issued for the transaction.
    Cancelled = 1,
}
44
/// Mutable bookkeeping of the transfer which is currently being processed.
#[derive(Debug)]
struct TransferState {
    /// ID of the active transaction. It is assigned when the transaction is started.
    transaction_id: Option<TransactionId>,
    /// Generic parameters copied from the received Metadata PDU.
    metadata_params: MetadataGenericParams,
    /// Reception progress in bytes, updated for every received File Data PDU.
    progress: u64,
    /// Set if the Metadata PDU contained neither a source nor a destination file name.
    metadata_only: bool,
    /// Condition code which is reported when the transaction finishes.
    condition_code: ConditionCode,
    /// Delivery code which is reported when the transaction finishes.
    delivery_code: DeliveryCode,
    /// File status which is reported when the transaction finishes.
    file_status: FileStatus,
    /// Denotes whether the transaction completed regularly or was cancelled.
    completion_disposition: CompletionDisposition,
    /// File checksum taken from the EOF PDU.
    checksum: u32,
    /// Number of check timer expirations which were counted during check limit handling.
    current_check_count: u32,
    /// Timer used for the check limit handling. It is created when that handling starts.
    current_check_timer: Option<Box<dyn CheckTimer>>,
}
59
60impl Default for TransferState {
61    fn default() -> Self {
62        Self {
63            transaction_id: None,
64            metadata_params: Default::default(),
65            progress: Default::default(),
66            metadata_only: false,
67            condition_code: ConditionCode::NoError,
68            delivery_code: DeliveryCode::Incomplete,
69            file_status: FileStatus::Unreported,
70            completion_disposition: CompletionDisposition::Completed,
71            checksum: 0,
72            current_check_count: 0,
73            current_check_timer: None,
74        }
75    }
76}
77
/// All parameters of the transaction which is currently being processed.
#[derive(Debug)]
struct TransactionParams {
    /// Mutable state of the ongoing transfer.
    tstate: TransferState,
    /// Common PDU configuration copied from the header of the received Metadata PDU.
    pdu_conf: CommonPduConfig,
    /// Source and destination file names of the transfer.
    file_properties: FileProperties,
    /// Scratch buffer which is handed to the filestore for the checksum verification.
    cksum_buf: [u8; 1024],
    /// Number of valid bytes inside `msgs_to_user_buf`.
    msgs_to_user_size: usize,
    /// Raw Message to User TLVs which were extracted from the Metadata PDU options.
    msgs_to_user_buf: [u8; 1024],
    /// Configuration of the remote entity which sent the Metadata PDU.
    remote_cfg: Option<RemoteEntityConfig>,
}
88
impl TransactionParams {
    /// Returns the transmission mode stored in the common PDU configuration.
    fn transmission_mode(&self) -> TransmissionMode {
        self.pdu_conf.trans_mode
    }
}
94
95impl Default for FileProperties {
96    fn default() -> Self {
97        Self {
98            src_file_name: [0; u8::MAX as usize],
99            src_file_name_len: Default::default(),
100            dest_file_name: [0; u8::MAX as usize],
101            dest_file_name_len: Default::default(),
102            dest_path_buf: Default::default(),
103        }
104    }
105}
106
impl TransactionParams {
    /// Returns the file size stored in the cached metadata parameters.
    fn file_size(&self) -> u64 {
        self.tstate.metadata_params.file_size
    }

    /// Returns the cached generic metadata parameters.
    fn metadata_params(&self) -> &MetadataGenericParams {
        &self.tstate.metadata_params
    }
}
116
117impl Default for TransactionParams {
118    fn default() -> Self {
119        Self {
120            pdu_conf: Default::default(),
121            cksum_buf: [0; 1024],
122            msgs_to_user_size: 0,
123            msgs_to_user_buf: [0; 1024],
124            tstate: Default::default(),
125            file_properties: Default::default(),
126            remote_cfg: None,
127        }
128    }
129}
130
131impl TransactionParams {
132    fn reset(&mut self) {
133        self.tstate.condition_code = ConditionCode::NoError;
134        self.tstate.delivery_code = DeliveryCode::Incomplete;
135        self.tstate.file_status = FileStatus::Unreported;
136    }
137}
138
/// Errors which can occur while inserting packets into the destination handler or while driving
/// its state machine.
#[derive(Debug, Error)]
pub enum DestError {
    /// File directive expected, but none specified
    #[error("expected file directive")]
    DirectiveExpected,
    /// The packet is not targeted at the destination entity, or its directive type can not be
    /// processed by the destination handler.
    #[error("can not process packet type {0:?}")]
    CantProcessPacketType(FileDirectiveType),
    /// A File Data or EOF PDU was received while the handler was not receiving file data.
    #[error("can not process file data PDUs in current state")]
    WrongStateForFileDataAndEof,
    /// Received a new metadata PDU while already being busy with a file transfer.
    #[error("busy with transfer")]
    RecvdMetadataButIsBusy,
    /// The Metadata PDU has a destination file name, but no source file name.
    #[error("empty source file field")]
    EmptySrcFileField,
    /// The Metadata PDU has a source file name, but no destination file name.
    #[error("empty dest file field")]
    EmptyDestFileField,
    /// Packets to be sent are still left.
    #[error("packets to be sent are still left")]
    PacketToSendLeft,
    /// A PDU could not be read, written or sent.
    #[error("pdu error {0}")]
    Pdu(#[from] PduError),
    /// Generic I/O error.
    #[error("io error {0}")]
    Io(#[from] std::io::Error),
    /// An operation of the virtual filestore failed.
    #[error("file store error {0}")]
    Filestore(#[from] FilestoreError),
    /// A file name received in a Metadata PDU is not valid UTF-8.
    #[error("path conversion error {0}")]
    PathConversion(#[from] Utf8Error),
    /// The destination is a directory, but no file name could be extracted from the source path
    /// to build the full destination path.
    #[error("error building dest path from source file name and dest folder")]
    PathConcat,
    /// The remote entity configuration table has no entry for the given entity ID.
    #[error("no remote entity configuration found for {0:?}")]
    NoRemoteCfgFound(UnsignedByteField),
}
170
/// Abstraction which is used by the [DestinationHandler] to send all generated PDUs. It is
/// implemented by the user.
pub trait CfdpPacketSender: Send {
    /// Sends one PDU.
    ///
    /// # Arguments
    ///
    /// * `pdu_type` - Type of the PDU contained in `raw_pdu`.
    /// * `file_directive_type` - Directive type of the PDU, if it has one.
    /// * `raw_pdu` - The serialized PDU.
    fn send_pdu(
        &mut self,
        pdu_type: PduType,
        file_directive_type: Option<FileDirectiveType>,
        raw_pdu: &[u8],
    ) -> Result<(), PduError>;
}
179
/// This is the primary CFDP destination handler. It models the CFDP destination entity, which is
/// primarily responsible for receiving files sent from another CFDP entity. It performs the
/// reception side of File Copy Operations.
///
/// The [DestinationHandler::state_machine] function is the primary function to drive the
/// destination handler. It can be used to insert packets into the destination handler and to
/// drive the state machine, which might generate new packets to be sent to the remote entity.
/// Please note that, apart from File Data PDUs, the destination handler only accepts Metadata,
/// EOF and Prompt PDUs, as well as ACK PDUs where the acknowledged PDU is the Finished PDU.
/// The handling of Prompt and ACK PDUs is not implemented yet.
///
/// All generated packets are sent via the [CfdpPacketSender] trait, which is implemented by the
/// user and passed as a constructor parameter. The number of generated packets is returned
/// by the state machine call.
pub struct DestinationHandler {
    /// Configuration of the local entity.
    local_cfg: LocalEntityConfig,
    /// Exact step of the pending transaction.
    step: TransactionStep,
    /// Denotes whether the handler is idle or busy with a transaction.
    state: State,
    /// Parameters of the transaction which is currently being processed.
    tparams: TransactionParams,
    /// Buffer into which generated packets are serialized before they are sent.
    packet_buf: alloc::vec::Vec<u8>,
    /// All generated packets are sent via this abstraction.
    packet_sender: Box<dyn CfdpPacketSender>,
    /// Virtual filestore used for all file operations.
    vfs: Box<dyn VirtualFilestore>,
    /// Provides the configuration of remote entities.
    remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
    /// Creates the timers required by the handler.
    check_timer_creator: Box<dyn CheckTimerCreator>,
}
205
206impl DestinationHandler {
207    /// Constructs a new destination handler.
208    ///
209    /// # Arguments
210    ///
211    /// * `local_cfg` - The local CFDP entity configuration, consisting of the local entity ID,
212    ///    the indication configuration, and the fault handlers.
213    /// * `max_packet_len` - The maximum expected generated packet size in bytes. Each time a
214    ///    packet is sent, it will be buffered inside an internal buffer. The length of this buffer
215    ///    will be determined by this parameter. This parameter can either be a known upper bound,
216    ///    or it can specifically be determined by the largest packet size parameter of all remote
217    ///    entity configurations in the passed `remote_cfg_table`.
218    /// * `packet_sender` - All generated packets are sent via this abstraction.
219    /// * `vfs` - Virtual filestore implementation to decouple the CFDP implementation from the
220    ///    underlying filestore/filesystem. This allows to use this handler for embedded systems
221    ///    where a standard runtime might not be available.
222    /// * `remote_cfg_table` - A table of all expected remote entities this entity will communicate
223    ///    with. It contains various configuration parameters required for file transfers.
224    /// * `check_timer_creator` - This is used by the CFDP handler to generate timers required
225    ///    by various tasks.
226    pub fn new(
227        local_cfg: LocalEntityConfig,
228        max_packet_len: usize,
229        packet_sender: Box<dyn CfdpPacketSender>,
230        vfs: Box<dyn VirtualFilestore>,
231        remote_cfg_table: Box<dyn RemoteEntityConfigProvider>,
232        check_timer_creator: Box<dyn CheckTimerCreator>,
233    ) -> Self {
234        Self {
235            local_cfg,
236            step: TransactionStep::Idle,
237            state: State::Idle,
238            tparams: Default::default(),
239            packet_buf: alloc::vec![0; max_packet_len],
240            packet_sender,
241            vfs,
242            remote_cfg_table,
243            check_timer_creator,
244        }
245    }
246
247    /// This is the core function to drive the destination handler. It is also used to insert
248    /// packets into the destination handler.
249    ///
250    /// The state machine should either be called if a packet with the appropriate destination ID
251    /// is received, or periodically in IDLE periods to perform all CFDP related tasks, for example
252    /// checking for timeouts or missed file segments.
253    pub fn state_machine(
254        &mut self,
255        cfdp_user: &mut impl CfdpUser,
256        packet_to_insert: Option<&PacketInfo>,
257    ) -> Result<u32, DestError> {
258        if let Some(packet) = packet_to_insert {
259            self.insert_packet(cfdp_user, packet)?;
260        }
261        match self.state {
262            State::Idle => todo!(),
263            State::Busy => self.fsm_busy(cfdp_user),
264            State::Suspended => todo!(),
265        }
266    }
267
268    /// Returns [None] if the state machine is IDLE, and the transmission mode of the current
269    /// request otherwise.
270    pub fn transmission_mode(&self) -> Option<TransmissionMode> {
271        if self.state == State::Idle {
272            return None;
273        }
274        Some(self.tparams.transmission_mode())
275    }
276
    /// Returns the transaction ID stored in the transfer state. This is [None] as long as no
    /// transaction was started.
    pub fn transaction_id(&self) -> Option<TransactionId> {
        self.tstate().transaction_id
    }
280
281    fn insert_packet(
282        &mut self,
283        cfdp_user: &mut impl CfdpUser,
284        packet_info: &PacketInfo,
285    ) -> Result<(), DestError> {
286        if packet_info.target() != PacketTarget::DestEntity {
287            // Unwrap is okay here, a PacketInfo for a file data PDU should always have the
288            // destination as the target.
289            return Err(DestError::CantProcessPacketType(
290                packet_info.pdu_directive().unwrap(),
291            ));
292        }
293        match packet_info.pdu_type {
294            PduType::FileDirective => {
295                if packet_info.pdu_directive.is_none() {
296                    return Err(DestError::DirectiveExpected);
297                }
298                self.handle_file_directive(
299                    cfdp_user,
300                    packet_info.pdu_directive.unwrap(),
301                    packet_info.raw_packet,
302                )
303            }
304            PduType::FileData => self.handle_file_data(cfdp_user, packet_info.raw_packet),
305        }
306    }
307
308    fn handle_file_directive(
309        &mut self,
310        cfdp_user: &mut impl CfdpUser,
311        pdu_directive: FileDirectiveType,
312        raw_packet: &[u8],
313    ) -> Result<(), DestError> {
314        match pdu_directive {
315            FileDirectiveType::EofPdu => self.handle_eof_pdu(cfdp_user, raw_packet)?,
316            FileDirectiveType::FinishedPdu
317            | FileDirectiveType::NakPdu
318            | FileDirectiveType::KeepAlivePdu => {
319                return Err(DestError::CantProcessPacketType(pdu_directive));
320            }
321            FileDirectiveType::AckPdu => {
322                todo!(
323                "check whether ACK pdu handling is applicable by checking the acked directive field"
324                )
325            }
326            FileDirectiveType::MetadataPdu => self.handle_metadata_pdu(raw_packet)?,
327            FileDirectiveType::PromptPdu => self.handle_prompt_pdu(raw_packet)?,
328        };
329        Ok(())
330    }
331
332    fn handle_metadata_pdu(&mut self, raw_packet: &[u8]) -> Result<(), DestError> {
333        if self.state != State::Idle {
334            return Err(DestError::RecvdMetadataButIsBusy);
335        }
336        let metadata_pdu = MetadataPduReader::from_bytes(raw_packet)?;
337        self.tparams.reset();
338        self.tparams.tstate.metadata_params = *metadata_pdu.metadata_params();
339        let remote_cfg = self
340            .remote_cfg_table
341            .get_remote_config(metadata_pdu.source_id().value());
342        if remote_cfg.is_none() {
343            return Err(DestError::NoRemoteCfgFound(metadata_pdu.dest_id()));
344        }
345        self.tparams.remote_cfg = Some(*remote_cfg.unwrap());
346
347        // TODO: Support for metadata only PDUs.
348        let src_name = metadata_pdu.src_file_name();
349        let dest_name = metadata_pdu.dest_file_name();
350        if src_name.is_empty() && dest_name.is_empty() {
351            self.tparams.tstate.metadata_only = true;
352        }
353        if !self.tparams.tstate.metadata_only && src_name.is_empty() {
354            return Err(DestError::EmptySrcFileField);
355        }
356        if !self.tparams.tstate.metadata_only && dest_name.is_empty() {
357            return Err(DestError::EmptyDestFileField);
358        }
359        if !self.tparams.tstate.metadata_only {
360            self.tparams.file_properties.src_file_name[..src_name.len_value()]
361                .copy_from_slice(src_name.value());
362            self.tparams.file_properties.src_file_name_len = src_name.len_value();
363            if dest_name.is_empty() {
364                return Err(DestError::EmptyDestFileField);
365            }
366            self.tparams.file_properties.dest_file_name[..dest_name.len_value()]
367                .copy_from_slice(dest_name.value());
368            self.tparams.file_properties.dest_file_name_len = dest_name.len_value();
369            self.tparams.pdu_conf = *metadata_pdu.pdu_header().common_pdu_conf();
370            self.tparams.msgs_to_user_size = 0;
371        }
372        if !metadata_pdu.options().is_empty() {
373            for option_tlv in metadata_pdu.options_iter().unwrap() {
374                if option_tlv.is_standard_tlv()
375                    && option_tlv.tlv_type().unwrap() == TlvType::MsgToUser
376                {
377                    self.tparams
378                        .msgs_to_user_buf
379                        .copy_from_slice(option_tlv.raw_data().unwrap());
380                    self.tparams.msgs_to_user_size += option_tlv.len_full();
381                }
382            }
383        }
384        self.state = State::Busy;
385        self.step = TransactionStep::TransactionStart;
386        Ok(())
387    }
388
389    fn handle_file_data(
390        &mut self,
391        user: &mut impl CfdpUser,
392        raw_packet: &[u8],
393    ) -> Result<(), DestError> {
394        if self.state == State::Idle
395            || (self.step != TransactionStep::ReceivingFileDataPdus
396                && self.step != TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling)
397        {
398            return Err(DestError::WrongStateForFileDataAndEof);
399        }
400        let fd_pdu = FileDataPdu::from_bytes(raw_packet)?;
401        if self.local_cfg.indication_cfg.file_segment_recv {
402            user.file_segment_recvd_indication(&FileSegmentRecvdParams {
403                id: self.tstate().transaction_id.unwrap(),
404                offset: fd_pdu.offset(),
405                length: fd_pdu.file_data().len(),
406                segment_metadata: fd_pdu.segment_metadata(),
407            });
408        }
409        if let Err(e) = self.vfs.write_data(
410            self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
411            fd_pdu.offset(),
412            fd_pdu.file_data(),
413        ) {
414            self.declare_fault(ConditionCode::FilestoreRejection);
415            return Err(e.into());
416        }
417        self.tstate_mut().progress += fd_pdu.file_data().len() as u64;
418        Ok(())
419    }
420
421    fn handle_eof_pdu(
422        &mut self,
423        cfdp_user: &mut impl CfdpUser,
424        raw_packet: &[u8],
425    ) -> Result<(), DestError> {
426        if self.state == State::Idle || self.step != TransactionStep::ReceivingFileDataPdus {
427            return Err(DestError::WrongStateForFileDataAndEof);
428        }
429        let eof_pdu = EofPdu::from_bytes(raw_packet)?;
430        if self.local_cfg.indication_cfg.eof_recv {
431            // Unwrap is okay here, application logic ensures that transaction ID is valid here.
432            cfdp_user.eof_recvd_indication(self.tparams.tstate.transaction_id.as_ref().unwrap());
433        }
434        let regular_transfer_finish = if eof_pdu.condition_code() == ConditionCode::NoError {
435            self.handle_no_error_eof_pdu(&eof_pdu)?
436        } else {
437            todo!("implement cancel request handling");
438        };
439        if regular_transfer_finish {
440            self.file_transfer_complete_transition();
441        }
442        Ok(())
443    }
444
445    /// Returns whether the transfer can be completed regularly.
446    fn handle_no_error_eof_pdu(&mut self, eof_pdu: &EofPdu) -> Result<bool, DestError> {
447        // CFDP 4.6.1.2.9: Declare file size error if progress exceeds file size
448        if self.tparams.tstate.progress > eof_pdu.file_size()
449            && self.declare_fault(ConditionCode::FileSizeError) != FaultHandlerCode::IgnoreError
450        {
451            return Ok(false);
452        } else if (self.tparams.tstate.progress < eof_pdu.file_size())
453            && self.tparams.transmission_mode() == TransmissionMode::Acknowledged
454        {
455            // CFDP 4.6.4.3.1: The end offset of the last received file segment and the file
456            // size as stated in the EOF PDU is not the same, so we need to add that segment to
457            // the lost segments for the deferred lost segment detection procedure.
458            // TODO: Proper lost segment handling.
459            // self._params.acked_params.lost_seg_tracker.add_lost_segment(
460            //  (self._params.fp.progress, self._params.fp.file_size_eof)
461            // )
462        }
463
464        self.tparams.tstate.checksum = eof_pdu.file_checksum();
465        if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged
466            && !self.checksum_verify(self.tparams.tstate.checksum)
467        {
468            if self.declare_fault(ConditionCode::FileChecksumFailure)
469                != FaultHandlerCode::IgnoreError
470            {
471                return Ok(false);
472            }
473            self.start_check_limit_handling();
474            return Ok(false);
475        }
476        Ok(true)
477    }
478
479    fn file_transfer_complete_transition(&mut self) {
480        if self.tparams.transmission_mode() == TransmissionMode::Unacknowledged {
481            self.step = TransactionStep::TransferCompletion;
482        } else {
483            // TODO: Prepare ACK PDU somehow.
484            self.step = TransactionStep::SendingAckPdu;
485        }
486    }
487
488    fn checksum_verify(&mut self, checksum: u32) -> bool {
489        let mut file_delivery_complete = false;
490        if self.tparams.metadata_params().checksum_type == ChecksumType::NullChecksum
491            || self.tparams.tstate.metadata_only
492        {
493            file_delivery_complete = true;
494            self.tparams.tstate.delivery_code = DeliveryCode::Complete;
495            self.tparams.tstate.condition_code = ConditionCode::NoError;
496        } else {
497            match self.vfs.checksum_verify(
498                self.tparams.file_properties.dest_path_buf.to_str().unwrap(),
499                self.tparams.metadata_params().checksum_type,
500                checksum,
501                &mut self.tparams.cksum_buf,
502            ) {
503                Ok(checksum_success) => {
504                    file_delivery_complete = checksum_success;
505                }
506                Err(e) => match e {
507                    FilestoreError::ChecksumTypeNotImplemented(_) => {
508                        self.declare_fault(ConditionCode::UnsupportedChecksumType);
509                        // For this case, the applicable algorithm shall be the the null checksum,
510                        // which is always succesful.
511                        file_delivery_complete = true;
512                    }
513                    _ => {
514                        self.declare_fault(ConditionCode::FilestoreRejection);
515                        // Treat this equivalent to a failed checksum procedure.
516                    }
517                },
518            };
519        }
520        file_delivery_complete
521    }
522
523    fn start_check_limit_handling(&mut self) {
524        self.step = TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling;
525        self.tparams.tstate.current_check_timer = Some(
526            self.check_timer_creator
527                .get_check_timer_provider(TimerContext::CheckLimit {
528                    local_id: self.local_cfg.id,
529                    remote_id: self.tparams.remote_cfg.unwrap().entity_id,
530                    entity_type: EntityType::Receiving,
531                }),
532        );
533        self.tparams.tstate.current_check_count = 0;
534    }
535
536    fn check_limit_handling(&mut self) {
537        if self.tparams.tstate.current_check_timer.is_none() {
538            return;
539        }
540        let check_timer = self.tparams.tstate.current_check_timer.as_ref().unwrap();
541        if check_timer.has_expired() {
542            if self.checksum_verify(self.tparams.tstate.checksum) {
543                self.file_transfer_complete_transition();
544                return;
545            }
546            if self.tparams.tstate.current_check_count + 1
547                >= self.tparams.remote_cfg.unwrap().check_limit
548            {
549                self.declare_fault(ConditionCode::CheckLimitReached);
550            } else {
551                self.tparams.tstate.current_check_count += 1;
552                self.tparams
553                    .tstate
554                    .current_check_timer
555                    .as_mut()
556                    .unwrap()
557                    .reset();
558            }
559        }
560    }
561
    /// Handles a Prompt PDU.
    ///
    /// # Panics
    ///
    /// Prompt PDU handling is not implemented yet, so this function always panics.
    pub fn handle_prompt_pdu(&mut self, _raw_packet: &[u8]) -> Result<(), DestError> {
        todo!();
    }
565
566    fn fsm_busy(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
567        let mut sent_packets = 0;
568        if self.step == TransactionStep::TransactionStart {
569            self.transaction_start(cfdp_user)?;
570        }
571        if self.step == TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling {
572            self.check_limit_handling();
573        }
574        if self.step == TransactionStep::TransferCompletion {
575            sent_packets += self.transfer_completion(cfdp_user)?;
576        }
577        if self.step == TransactionStep::SendingAckPdu {
578            todo!("no support for acknowledged mode yet");
579        }
580        if self.step == TransactionStep::SendingFinishedPdu {
581            self.reset();
582        }
583        Ok(sent_packets)
584    }
585
    /// Get the step, which denotes the exact step of a pending CFDP transaction when applicable.
    /// The step is [TransactionStep::Idle] while no transaction is active.
    pub fn step(&self) -> TransactionStep {
        self.step
    }
590
    /// Get the state, which denotes whether the CFDP handler is active, and which CFDP class
    /// is used if it is active.
    pub fn state(&self) -> State {
        self.state
    }
596
597    fn transaction_start(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
598        let dest_name = from_utf8(
599            &self.tparams.file_properties.dest_file_name
600                [..self.tparams.file_properties.dest_file_name_len],
601        )?;
602        let dest_path = Path::new(dest_name);
603        self.tparams.file_properties.dest_path_buf = dest_path.to_path_buf();
604        let source_id = self.tparams.pdu_conf.source_id();
605        let id = TransactionId::new(source_id, self.tparams.pdu_conf.transaction_seq_num);
606        let src_name = from_utf8(
607            &self.tparams.file_properties.src_file_name
608                [0..self.tparams.file_properties.src_file_name_len],
609        )?;
610        let mut msgs_to_user = SmallVec::<[MsgToUserTlv<'_>; 16]>::new();
611        let mut num_msgs_to_user = 0;
612        if self.tparams.msgs_to_user_size > 0 {
613            let mut index = 0;
614            while index < self.tparams.msgs_to_user_size {
615                // This should never panic as the validity of the options was checked beforehand.
616                let msgs_to_user_tlv =
617                    MsgToUserTlv::from_bytes(&self.tparams.msgs_to_user_buf[index..])
618                        .expect("message to user creation failed unexpectedly");
619                msgs_to_user.push(msgs_to_user_tlv);
620                index += msgs_to_user_tlv.len_full();
621                num_msgs_to_user += 1;
622            }
623        }
624        let metadata_recvd_params = MetadataReceivedParams {
625            id,
626            source_id,
627            file_size: self.tparams.file_size(),
628            src_file_name: src_name,
629            dest_file_name: dest_name,
630            msgs_to_user: &msgs_to_user[..num_msgs_to_user],
631        };
632        self.tparams.tstate.transaction_id = Some(id);
633        cfdp_user.metadata_recvd_indication(&metadata_recvd_params);
634
635        // TODO: This is the only remaining function which uses std.. the easiest way would
636        // probably be to use a static pre-allocated dest path buffer to store any concatenated
637        // paths.
638        if dest_path.exists() && self.vfs.is_dir(dest_path.to_str().unwrap()) {
639            // Create new destination path by concatenating the last part of the source source
640            // name and the destination folder. For example, for a source file of /tmp/hello.txt
641            // and a destination name of /home/test, the resulting file name should be
642            // /home/test/hello.txt
643            let source_path = Path::new(from_utf8(
644                &self.tparams.file_properties.src_file_name
645                    [..self.tparams.file_properties.src_file_name_len],
646            )?);
647            let source_name = source_path.file_name();
648            if source_name.is_none() {
649                return Err(DestError::PathConcat);
650            }
651            let source_name = source_name.unwrap();
652            self.tparams.file_properties.dest_path_buf.push(source_name);
653        }
654        let dest_path_str = self.tparams.file_properties.dest_path_buf.to_str().unwrap();
655        if self.vfs.exists(dest_path_str) {
656            self.vfs.truncate_file(dest_path_str)?;
657        } else {
658            self.vfs.create_file(dest_path_str)?;
659        }
660        self.tparams.tstate.file_status = FileStatus::Retained;
661        self.step = TransactionStep::ReceivingFileDataPdus;
662        Ok(())
663    }
664
665    fn transfer_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<u32, DestError> {
666        let mut sent_packets = 0;
667        self.notice_of_completion(cfdp_user)?;
668        if self.tparams.transmission_mode() == TransmissionMode::Acknowledged
669            || self.tparams.metadata_params().closure_requested
670        {
671            sent_packets += self.send_finished_pdu()?;
672            self.step = TransactionStep::SendingFinishedPdu;
673        } else {
674            self.reset();
675        }
676        Ok(sent_packets)
677    }
678
679    fn notice_of_completion(&mut self, cfdp_user: &mut impl CfdpUser) -> Result<(), DestError> {
680        if self.tstate().completion_disposition == CompletionDisposition::Completed {
681            // TODO: Execute any filestore requests
682        } else if self
683            .tparams
684            .remote_cfg
685            .as_ref()
686            .unwrap()
687            .disposition_on_cancellation
688            && self.tstate().delivery_code == DeliveryCode::Incomplete
689        {
690            self.vfs
691                .remove_file(self.tparams.file_properties.dest_path_buf.to_str().unwrap())?;
692            self.tstate_mut().file_status = FileStatus::DiscardDeliberately;
693        }
694        let tstate = self.tstate();
695        let transaction_finished_params = TransactionFinishedParams {
696            id: tstate.transaction_id.unwrap(),
697            condition_code: tstate.condition_code,
698            delivery_code: tstate.delivery_code,
699            file_status: tstate.file_status,
700        };
701        cfdp_user.transaction_finished_indication(&transaction_finished_params);
702        Ok(())
703    }
704
705    fn declare_fault(&mut self, condition_code: ConditionCode) -> FaultHandlerCode {
706        // Cache those, because they might be reset when abandoning the transaction.
707        let transaction_id = self.tstate().transaction_id.unwrap();
708        let progress = self.tstate().progress;
709        let fh_code = self
710            .local_cfg
711            .default_fault_handler
712            .get_fault_handler(condition_code);
713        match fh_code {
714            FaultHandlerCode::NoticeOfCancellation => {
715                self.notice_of_cancellation(condition_code);
716            }
717            FaultHandlerCode::NoticeOfSuspension => self.notice_of_suspension(),
718            FaultHandlerCode::IgnoreError => (),
719            FaultHandlerCode::AbandonTransaction => self.abandon_transaction(),
720        }
721        self.local_cfg
722            .default_fault_handler
723            .report_fault(transaction_id, condition_code, progress)
724    }
725
726    fn notice_of_cancellation(&mut self, condition_code: ConditionCode) {
727        self.step = TransactionStep::TransferCompletion;
728        self.tstate_mut().condition_code = condition_code;
729        self.tstate_mut().completion_disposition = CompletionDisposition::Cancelled;
730    }
731
    /// Handler for the notice of suspension. This is currently a no-op.
    fn notice_of_suspension(&mut self) {
        // TODO: Implement suspension handling.
    }
    /// Abandons the current transaction by resetting the handler.
    fn abandon_transaction(&mut self) {
        self.reset();
    }
738
    /// Returns the handler to the idle state and resets the transaction parameters.
    fn reset(&mut self) {
        self.step = TransactionStep::Idle;
        self.state = State::Idle;
        self.tparams.reset();
    }
745
746    fn send_finished_pdu(&mut self) -> Result<u32, DestError> {
747        let tstate = self.tstate();
748
749        let pdu_header = PduHeader::new_no_file_data(self.tparams.pdu_conf, 0);
750        let finished_pdu = if tstate.condition_code == ConditionCode::NoError
751            || tstate.condition_code == ConditionCode::UnsupportedChecksumType
752        {
753            FinishedPduCreator::new_default(pdu_header, tstate.delivery_code, tstate.file_status)
754        } else {
755            // TODO: Are there cases where this ID is actually the source entity ID?
756            let entity_id = EntityIdTlv::new(self.local_cfg.id);
757            FinishedPduCreator::new_with_error(
758                pdu_header,
759                tstate.condition_code,
760                tstate.delivery_code,
761                tstate.file_status,
762                entity_id,
763            )
764        };
765        finished_pdu.write_to_bytes(&mut self.packet_buf)?;
766        self.packet_sender.send_pdu(
767            finished_pdu.pdu_type(),
768            finished_pdu.file_directive_type(),
769            &self.packet_buf[0..finished_pdu.len_written()],
770        )?;
771        Ok(1)
772    }
773
774    fn tstate(&self) -> &TransferState {
775        &self.tparams.tstate
776    }
777
778    fn tstate_mut(&mut self) -> &mut TransferState {
779        &mut self.tparams.tstate
780    }
781}
782
783#[cfg(test)]
784mod tests {
785    use core::{cell::Cell, sync::atomic::AtomicBool};
786    #[allow(unused_imports)]
787    use std::println;
788    use std::{fs, sync::Mutex};
789
790    use alloc::{collections::VecDeque, string::String, sync::Arc, vec::Vec};
791    use rand::Rng;
792    use spacepackets::{
793        cfdp::{
794            lv::Lv,
795            pdu::{finished::FinishedPduReader, metadata::MetadataPduCreator, WritablePduPacket},
796            ChecksumType, TransmissionMode,
797        },
798        util::{UbfU16, UnsignedByteFieldU16},
799    };
800
801    use crate::cfdp::{
802        filestore::NativeFilestore, user::OwnedMetadataRecvdParams, CheckTimer, CheckTimerCreator,
803        DefaultFaultHandler, IndicationConfig, RemoteEntityConfig, StdRemoteEntityConfigProvider,
804        UserFaultHandler, CRC_32,
805    };
806
807    use super::*;
808
    /// Entity ID the test user expects as the source entity ID of every transaction.
    const LOCAL_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(1);
    /// Entity ID configured as the local ID of the destination handler under test.
    const REMOTE_ID: UnsignedByteFieldU16 = UnsignedByteFieldU16::new(2);
811
    /// Copy of the ID, offset and length of a file segment received indication. The test user
    /// stores one entry per indication; no other fields of the indication are kept.
    pub struct FileSegmentRecvdParamsNoSegMetadata {
        /// Transaction ID reported with the indication.
        pub id: TransactionId,
        /// Offset reported with the indication.
        pub offset: u64,
        /// Length reported with the indication.
        pub length: usize,
    }
817
    /// PDU captured by [`TestCfdpSender`] when the handler under test sends a packet.
    struct SentPdu {
        /// PDU type passed to the sender.
        pdu_type: PduType,
        /// File directive type passed to the sender, if any.
        file_directive_type: Option<FileDirectiveType>,
        /// Owned copy of the raw PDU bytes passed to the sender.
        raw_pdu: Vec<u8>,
    }
    /// Queue of captured PDUs which can be shared between clones of the test sender.
    type SharedPduPacketQueue = Arc<Mutex<VecDeque<SentPdu>>>;
    /// Packet sender for tests which stores every sent PDU in a shared queue instead of
    /// transmitting it. Clones share the same queue.
    #[derive(Default, Clone)]
    struct TestCfdpSender {
        /// Captured PDUs in the order they were sent.
        packet_queue: SharedPduPacketQueue,
    }
828
829    impl CfdpPacketSender for TestCfdpSender {
830        fn send_pdu(
831            &mut self,
832            pdu_type: PduType,
833            file_directive_type: Option<FileDirectiveType>,
834            raw_pdu: &[u8],
835        ) -> Result<(), PduError> {
836            self.packet_queue.lock().unwrap().push_back(SentPdu {
837                pdu_type,
838                file_directive_type,
839                raw_pdu: raw_pdu.to_vec(),
840            });
841            Ok(())
842        }
843    }
844
845    impl TestCfdpSender {
846        pub fn retrieve_next_pdu(&self) -> Option<SentPdu> {
847            self.packet_queue.lock().unwrap().pop_front()
848        }
849        pub fn queue_empty(&self) -> bool {
850            self.packet_queue.lock().unwrap().is_empty()
851        }
852    }
853
    /// CFDP user for tests which checks the indications it receives against expected values
    /// and records them for later inspection.
    #[derive(Default)]
    struct TestCfdpUser {
        /// Sequence number every checked transaction ID must carry.
        next_expected_seq_num: u64,
        /// Source file name expected in the metadata received indication.
        expected_full_src_name: String,
        /// Destination file name expected in the metadata received indication.
        expected_full_dest_name: String,
        /// File size expected in the metadata received indication.
        expected_file_size: u64,
        /// Number of transaction indications received so far.
        transaction_indication_call_count: u32,
        /// Number of EOF received indications received so far.
        eof_recvd_call_count: u32,
        /// Parameters of all transaction finished indications, oldest first.
        finished_indic_queue: VecDeque<TransactionFinishedParams>,
        /// Owned copies of all metadata received indications, oldest first.
        metadata_recv_queue: VecDeque<OwnedMetadataRecvdParams>,
        /// ID, offset and length of all file segment received indications, oldest first.
        file_seg_recvd_queue: VecDeque<FileSegmentRecvdParamsNoSegMetadata>,
    }
866
867    impl TestCfdpUser {
868        fn new(
869            next_expected_seq_num: u64,
870            expected_full_src_name: String,
871            expected_full_dest_name: String,
872            expected_file_size: u64,
873        ) -> Self {
874            Self {
875                next_expected_seq_num,
876                expected_full_src_name,
877                expected_full_dest_name,
878                expected_file_size,
879                transaction_indication_call_count: 0,
880                eof_recvd_call_count: 0,
881                finished_indic_queue: VecDeque::new(),
882                metadata_recv_queue: VecDeque::new(),
883                file_seg_recvd_queue: VecDeque::new(),
884            }
885        }
886
887        fn generic_id_check(&self, id: &crate::cfdp::TransactionId) {
888            assert_eq!(id.source_id, LOCAL_ID.into());
889            assert_eq!(id.seq_num().value(), self.next_expected_seq_num);
890        }
891    }
892
893    impl CfdpUser for TestCfdpUser {
894        fn transaction_indication(&mut self, id: &crate::cfdp::TransactionId) {
895            self.generic_id_check(id);
896            self.transaction_indication_call_count += 1;
897        }
898
899        fn eof_sent_indication(&mut self, id: &crate::cfdp::TransactionId) {
900            self.generic_id_check(id);
901        }
902
903        fn transaction_finished_indication(
904            &mut self,
905            finished_params: &crate::cfdp::user::TransactionFinishedParams,
906        ) {
907            self.generic_id_check(&finished_params.id);
908            self.finished_indic_queue.push_back(*finished_params);
909        }
910
911        fn metadata_recvd_indication(
912            &mut self,
913            md_recvd_params: &crate::cfdp::user::MetadataReceivedParams,
914        ) {
915            self.generic_id_check(&md_recvd_params.id);
916            assert_eq!(
917                String::from(md_recvd_params.src_file_name),
918                self.expected_full_src_name
919            );
920            assert_eq!(
921                String::from(md_recvd_params.dest_file_name),
922                self.expected_full_dest_name
923            );
924            assert_eq!(md_recvd_params.msgs_to_user.len(), 0);
925            assert_eq!(md_recvd_params.source_id, LOCAL_ID.into());
926            assert_eq!(md_recvd_params.file_size, self.expected_file_size);
927            self.metadata_recv_queue.push_back(md_recvd_params.into());
928        }
929
930        fn file_segment_recvd_indication(
931            &mut self,
932            segment_recvd_params: &crate::cfdp::user::FileSegmentRecvdParams,
933        ) {
934            self.generic_id_check(&segment_recvd_params.id);
935            self.file_seg_recvd_queue
936                .push_back(FileSegmentRecvdParamsNoSegMetadata {
937                    id: segment_recvd_params.id,
938                    offset: segment_recvd_params.offset,
939                    length: segment_recvd_params.length,
940                })
941        }
942
943        fn report_indication(&mut self, _id: &crate::cfdp::TransactionId) {}
944
945        fn suspended_indication(
946            &mut self,
947            _id: &crate::cfdp::TransactionId,
948            _condition_code: ConditionCode,
949        ) {
950            panic!("unexpected suspended indication");
951        }
952
953        fn resumed_indication(&mut self, _id: &crate::cfdp::TransactionId, _progresss: u64) {}
954
955        fn fault_indication(
956            &mut self,
957            _id: &crate::cfdp::TransactionId,
958            _condition_code: ConditionCode,
959            _progress: u64,
960        ) {
961            panic!("unexpected fault indication");
962        }
963
964        fn abandoned_indication(
965            &mut self,
966            _id: &crate::cfdp::TransactionId,
967            _condition_code: ConditionCode,
968            _progress: u64,
969        ) {
970            panic!("unexpected abandoned indication");
971        }
972
973        fn eof_recvd_indication(&mut self, id: &crate::cfdp::TransactionId) {
974            self.generic_id_check(id);
975            self.eof_recvd_call_count += 1;
976        }
977    }
978
    /// Fault handler for tests which records every callback invocation in a queue. The queues
    /// are reference counted, so clones of the handler observe the same entries.
    #[derive(Default, Clone)]
    struct TestFaultHandler {
        /// Arguments of all notice of suspension callbacks, oldest first.
        notice_of_suspension_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
        /// Arguments of all notice of cancellation callbacks, oldest first.
        notice_of_cancellation_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
        /// Arguments of all abandoned callbacks, oldest first.
        abandoned_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
        /// Arguments of all ignore callbacks, oldest first.
        ignored_queue: Arc<Mutex<VecDeque<(TransactionId, ConditionCode, u64)>>>,
    }
986
987    impl UserFaultHandler for TestFaultHandler {
988        fn notice_of_suspension_cb(
989            &mut self,
990            transaction_id: TransactionId,
991            cond: ConditionCode,
992            progress: u64,
993        ) {
994            self.notice_of_suspension_queue.lock().unwrap().push_back((
995                transaction_id,
996                cond,
997                progress,
998            ))
999        }
1000
1001        fn notice_of_cancellation_cb(
1002            &mut self,
1003            transaction_id: TransactionId,
1004            cond: ConditionCode,
1005            progress: u64,
1006        ) {
1007            self.notice_of_cancellation_queue
1008                .lock()
1009                .unwrap()
1010                .push_back((transaction_id, cond, progress))
1011        }
1012
1013        fn abandoned_cb(
1014            &mut self,
1015            transaction_id: TransactionId,
1016            cond: ConditionCode,
1017            progress: u64,
1018        ) {
1019            self.abandoned_queue
1020                .lock()
1021                .unwrap()
1022                .push_back((transaction_id, cond, progress))
1023        }
1024
1025        fn ignore_cb(&mut self, transaction_id: TransactionId, cond: ConditionCode, progress: u64) {
1026            self.ignored_queue
1027                .lock()
1028                .unwrap()
1029                .push_back((transaction_id, cond, progress))
1030        }
1031    }
1032
1033    impl TestFaultHandler {
1034        fn suspension_queue_empty(&self) -> bool {
1035            self.notice_of_suspension_queue.lock().unwrap().is_empty()
1036        }
1037        fn cancellation_queue_empty(&self) -> bool {
1038            self.notice_of_cancellation_queue.lock().unwrap().is_empty()
1039        }
1040        fn ignored_queue_empty(&self) -> bool {
1041            self.ignored_queue.lock().unwrap().is_empty()
1042        }
1043        fn abandoned_queue_empty(&self) -> bool {
1044            self.abandoned_queue.lock().unwrap().is_empty()
1045        }
1046        fn all_queues_empty(&self) -> bool {
1047            self.suspension_queue_empty()
1048                && self.cancellation_queue_empty()
1049                && self.ignored_queue_empty()
1050                && self.abandoned_queue_empty()
1051        }
1052    }
1053
    /// Check timer for tests whose expiry is controlled through a shared atomic flag instead
    /// of real time.
    #[derive(Debug)]
    struct TestCheckTimer {
        /// Counter which starts at zero and is set back to zero by `reset`.
        counter: Cell<u32>,
        /// Shared flag which is reported as the expiry state of the timer.
        expired: Arc<AtomicBool>,
    }
1059
1060    impl CheckTimer for TestCheckTimer {
1061        fn has_expired(&self) -> bool {
1062            self.expired.load(core::sync::atomic::Ordering::Relaxed)
1063        }
1064        fn reset(&mut self) {
1065            self.counter.set(0);
1066        }
1067    }
1068
1069    impl TestCheckTimer {
1070        pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
1071            Self {
1072                counter: Cell::new(0),
1073                expired: expired_flag,
1074            }
1075        }
1076    }
1077
    /// Check timer creator for tests which hands out [`TestCheckTimer`]s for check limit
    /// handling.
    struct TestCheckTimerCreator {
        /// Expiry flag shared with every check limit timer created by this object.
        check_limit_expired_flag: Arc<AtomicBool>,
    }
1081
1082    impl TestCheckTimerCreator {
1083        pub fn new(expired_flag: Arc<AtomicBool>) -> Self {
1084            Self {
1085                check_limit_expired_flag: expired_flag,
1086            }
1087        }
1088    }
1089
1090    impl CheckTimerCreator for TestCheckTimerCreator {
1091        fn get_check_timer_provider(&self, timer_context: TimerContext) -> Box<dyn CheckTimer> {
1092            match timer_context {
1093                TimerContext::CheckLimit { .. } => {
1094                    Box::new(TestCheckTimer::new(self.check_limit_expired_flag.clone()))
1095                }
1096                _ => {
1097                    panic!("invalid check timer creator, can only be used for check limit handling")
1098                }
1099            }
1100        }
1101    }
1102
    /// Test harness which bundles a destination handler with the paths, PDU header, scratch
    /// buffer and expectations used to drive a transfer. Optional checks run when the tester
    /// is dropped.
    struct DestHandlerTester {
        /// Expiry flag shared with the check timers created for the handler.
        check_timer_expired: Arc<AtomicBool>,
        /// Clone of the sender passed to the handler, used to inspect sent PDUs.
        pdu_sender: TestCfdpSender,
        /// Destination handler under test.
        handler: DestinationHandler,
        /// Source file path written into the metadata PDU.
        src_path: PathBuf,
        /// Destination file path written into the metadata PDU.
        dest_path: PathBuf,
        /// If set, dropping the tester verifies the destination file content and removes it.
        check_dest_file: bool,
        /// If set, dropping the tester asserts that the handler is idle.
        check_handler_idle_at_drop: bool,
        /// File size announced in the metadata PDU and expected for the destination file.
        expected_file_size: u64,
        /// Closure requested flag written into the metadata PDU.
        closure_requested: bool,
        /// PDU header used for all PDUs created by the tester.
        pdu_header: PduHeader,
        /// File content expected in the destination file when it is checked at drop.
        expected_full_data: Vec<u8>,
        /// Scratch buffer the PDUs are serialized into.
        buf: [u8; 512],
    }
1117
1118    impl DestHandlerTester {
1119        fn new(fault_handler: TestFaultHandler, closure_requested: bool) -> Self {
1120            let check_timer_expired = Arc::new(AtomicBool::new(false));
1121            let test_sender = TestCfdpSender::default();
1122            let dest_handler = default_dest_handler(
1123                fault_handler,
1124                test_sender.clone(),
1125                check_timer_expired.clone(),
1126            );
1127            let (src_path, dest_path) = init_full_filenames();
1128            assert!(!Path::exists(&dest_path));
1129            let handler = Self {
1130                check_timer_expired,
1131                pdu_sender: test_sender,
1132                handler: dest_handler,
1133                src_path,
1134                closure_requested,
1135                dest_path,
1136                check_dest_file: false,
1137                check_handler_idle_at_drop: false,
1138                expected_file_size: 0,
1139                pdu_header: create_pdu_header(UbfU16::new(0)),
1140                expected_full_data: Vec::new(),
1141                buf: [0; 512],
1142            };
1143            handler.state_check(State::Idle, TransactionStep::Idle);
1144            handler
1145        }
1146
1147        fn dest_path(&self) -> &PathBuf {
1148            &self.dest_path
1149        }
1150
1151        #[allow(dead_code)]
1152        fn indication_cfg_mut(&mut self) -> &mut IndicationConfig {
1153            &mut self.handler.local_cfg.indication_cfg
1154        }
1155
1156        fn indication_cfg(&mut self) -> &IndicationConfig {
1157            &self.handler.local_cfg.indication_cfg
1158        }
1159
1160        fn set_check_timer_expired(&mut self) {
1161            self.check_timer_expired
1162                .store(true, core::sync::atomic::Ordering::Relaxed);
1163        }
1164
1165        fn test_user_from_cached_paths(&self, expected_file_size: u64) -> TestCfdpUser {
1166            TestCfdpUser::new(
1167                0,
1168                self.src_path.to_string_lossy().into(),
1169                self.dest_path.to_string_lossy().into(),
1170                expected_file_size,
1171            )
1172        }
1173
1174        fn generic_transfer_init(
1175            &mut self,
1176            user: &mut TestCfdpUser,
1177            file_size: u64,
1178        ) -> Result<TransactionId, DestError> {
1179            self.expected_file_size = file_size;
1180            let metadata_pdu = create_metadata_pdu(
1181                &self.pdu_header,
1182                self.src_path.as_path(),
1183                self.dest_path.as_path(),
1184                file_size,
1185                self.closure_requested,
1186            );
1187            let packet_info = create_packet_info(&metadata_pdu, &mut self.buf);
1188            self.handler.state_machine(user, Some(&packet_info))?;
1189            assert_eq!(user.metadata_recv_queue.len(), 1);
1190            assert_eq!(
1191                self.handler.transmission_mode().unwrap(),
1192                TransmissionMode::Unacknowledged
1193            );
1194            Ok(self.handler.transaction_id().unwrap())
1195        }
1196
1197        fn generic_file_data_insert(
1198            &mut self,
1199            user: &mut TestCfdpUser,
1200            offset: u64,
1201            file_data_chunk: &[u8],
1202        ) -> Result<u32, DestError> {
1203            let filedata_pdu =
1204                FileDataPdu::new_no_seg_metadata(self.pdu_header, offset, file_data_chunk);
1205            filedata_pdu
1206                .write_to_bytes(&mut self.buf)
1207                .expect("writing file data PDU failed");
1208            let packet_info = PacketInfo::new(&self.buf).expect("creating packet info failed");
1209            let result = self.handler.state_machine(user, Some(&packet_info));
1210            if self.indication_cfg().file_segment_recv {
1211                assert!(!user.file_seg_recvd_queue.is_empty());
1212                assert_eq!(user.file_seg_recvd_queue.back().unwrap().offset, offset);
1213                assert_eq!(
1214                    user.file_seg_recvd_queue.back().unwrap().length,
1215                    file_data_chunk.len()
1216                );
1217            }
1218            result
1219        }
1220
1221        fn generic_eof_no_error(
1222            &mut self,
1223            user: &mut TestCfdpUser,
1224            expected_full_data: Vec<u8>,
1225        ) -> Result<u32, DestError> {
1226            self.expected_full_data = expected_full_data;
1227            let eof_pdu = create_no_error_eof(&self.expected_full_data, &self.pdu_header);
1228            let packet_info = create_packet_info(&eof_pdu, &mut self.buf);
1229            self.check_handler_idle_at_drop = true;
1230            self.check_dest_file = true;
1231            let result = self.handler.state_machine(user, Some(&packet_info));
1232            if self.indication_cfg().eof_recv {
1233                assert_eq!(user.eof_recvd_call_count, 1);
1234            }
1235            result
1236        }
1237
1238        fn state_check(&self, state: State, step: TransactionStep) {
1239            assert_eq!(self.handler.state(), state);
1240            assert_eq!(self.handler.step(), step);
1241        }
1242    }
1243
1244    impl Drop for DestHandlerTester {
1245        fn drop(&mut self) {
1246            if self.check_handler_idle_at_drop {
1247                self.state_check(State::Idle, TransactionStep::Idle);
1248            }
1249            if self.check_dest_file {
1250                assert!(Path::exists(&self.dest_path));
1251                let read_content = fs::read(&self.dest_path).expect("reading back string failed");
1252                assert_eq!(read_content.len() as u64, self.expected_file_size);
1253                assert_eq!(read_content, self.expected_full_data);
1254                assert!(fs::remove_file(self.dest_path.as_path()).is_ok());
1255            }
1256        }
1257    }
1258
1259    fn init_full_filenames() -> (PathBuf, PathBuf) {
1260        (
1261            tempfile::TempPath::from_path("/tmp/test.txt").to_path_buf(),
1262            tempfile::NamedTempFile::new()
1263                .unwrap()
1264                .into_temp_path()
1265                .to_path_buf(),
1266        )
1267    }
1268
1269    fn basic_remote_cfg_table() -> StdRemoteEntityConfigProvider {
1270        let mut table = StdRemoteEntityConfigProvider::default();
1271        let remote_entity_cfg = RemoteEntityConfig::new_with_default_values(
1272            UnsignedByteFieldU16::new(1).into(),
1273            1024,
1274            1024,
1275            true,
1276            true,
1277            TransmissionMode::Unacknowledged,
1278            ChecksumType::Crc32,
1279        );
1280        table.add_config(&remote_entity_cfg);
1281        table
1282    }
1283
1284    fn default_dest_handler(
1285        test_fault_handler: TestFaultHandler,
1286        test_packet_sender: TestCfdpSender,
1287        check_timer_expired: Arc<AtomicBool>,
1288    ) -> DestinationHandler {
1289        let local_entity_cfg = LocalEntityConfig {
1290            id: REMOTE_ID.into(),
1291            indication_cfg: IndicationConfig::default(),
1292            default_fault_handler: DefaultFaultHandler::new(Box::new(test_fault_handler)),
1293        };
1294        DestinationHandler::new(
1295            local_entity_cfg,
1296            2048,
1297            Box::new(test_packet_sender),
1298            Box::<NativeFilestore>::default(),
1299            Box::new(basic_remote_cfg_table()),
1300            Box::new(TestCheckTimerCreator::new(check_timer_expired)),
1301        )
1302    }
1303
1304    fn create_pdu_header(seq_num: impl Into<UnsignedByteField>) -> PduHeader {
1305        let mut pdu_conf =
1306            CommonPduConfig::new_with_byte_fields(LOCAL_ID, REMOTE_ID, seq_num).unwrap();
1307        pdu_conf.trans_mode = TransmissionMode::Unacknowledged;
1308        PduHeader::new_no_file_data(pdu_conf, 0)
1309    }
1310
1311    fn create_metadata_pdu<'filename>(
1312        pdu_header: &PduHeader,
1313        src_name: &'filename Path,
1314        dest_name: &'filename Path,
1315        file_size: u64,
1316        closure_requested: bool,
1317    ) -> MetadataPduCreator<'filename, 'filename, 'static> {
1318        let checksum_type = if file_size == 0 {
1319            ChecksumType::NullChecksum
1320        } else {
1321            ChecksumType::Crc32
1322        };
1323        let metadata_params =
1324            MetadataGenericParams::new(closure_requested, checksum_type, file_size);
1325        MetadataPduCreator::new_no_opts(
1326            *pdu_header,
1327            metadata_params,
1328            Lv::new_from_str(src_name.as_os_str().to_str().unwrap()).unwrap(),
1329            Lv::new_from_str(dest_name.as_os_str().to_str().unwrap()).unwrap(),
1330        )
1331    }
1332
1333    fn create_packet_info<'a>(
1334        pdu: &'a impl WritablePduPacket,
1335        buf: &'a mut [u8],
1336    ) -> PacketInfo<'a> {
1337        let written_len = pdu
1338            .write_to_bytes(buf)
1339            .expect("writing metadata PDU failed");
1340        PacketInfo::new(&buf[..written_len]).expect("generating packet info failed")
1341    }
1342
1343    fn create_no_error_eof(file_data: &[u8], pdu_header: &PduHeader) -> EofPdu {
1344        let crc32 = if !file_data.is_empty() {
1345            let mut digest = CRC_32.digest();
1346            digest.update(file_data);
1347            digest.finalize()
1348        } else {
1349            0
1350        };
1351        EofPdu::new_no_error(*pdu_header, crc32, file_data.len() as u64)
1352    }
1353
1354    #[test]
1355    fn test_basic() {
1356        let fault_handler = TestFaultHandler::default();
1357        let test_sender = TestCfdpSender::default();
1358        let dest_handler = default_dest_handler(fault_handler.clone(), test_sender, Arc::default());
1359        assert!(dest_handler.transmission_mode().is_none());
1360        assert!(fault_handler.all_queues_empty());
1361    }
1362
1363    #[test]
1364    fn test_empty_file_transfer_not_acked_no_closure() {
1365        let fault_handler = TestFaultHandler::default();
1366        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1367        let mut test_user = test_obj.test_user_from_cached_paths(0);
1368        test_obj
1369            .generic_transfer_init(&mut test_user, 0)
1370            .expect("transfer init failed");
1371        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1372        test_obj
1373            .generic_eof_no_error(&mut test_user, Vec::new())
1374            .expect("EOF no error insertion failed");
1375        assert!(fault_handler.all_queues_empty());
1376        assert!(test_obj.pdu_sender.queue_empty());
1377        test_obj.state_check(State::Idle, TransactionStep::Idle);
1378    }
1379
1380    #[test]
1381    fn test_small_file_transfer_not_acked() {
1382        let file_data_str = "Hello World!";
1383        let file_data = file_data_str.as_bytes();
1384        let file_size = file_data.len() as u64;
1385        let fault_handler = TestFaultHandler::default();
1386
1387        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1388        let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1389        test_obj
1390            .generic_transfer_init(&mut test_user, file_size)
1391            .expect("transfer init failed");
1392        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1393        test_obj
1394            .generic_file_data_insert(&mut test_user, 0, file_data)
1395            .expect("file data insertion failed");
1396        test_obj
1397            .generic_eof_no_error(&mut test_user, file_data.to_vec())
1398            .expect("EOF no error insertion failed");
1399        assert!(fault_handler.all_queues_empty());
1400        assert!(test_obj.pdu_sender.queue_empty());
1401        test_obj.state_check(State::Idle, TransactionStep::Idle);
1402    }
1403
1404    #[test]
1405    fn test_segmented_file_transfer_not_acked() {
1406        let mut rng = rand::thread_rng();
1407        let mut random_data = [0u8; 512];
1408        rng.fill(&mut random_data);
1409        let file_size = random_data.len() as u64;
1410        let segment_len = 256;
1411        let fault_handler = TestFaultHandler::default();
1412
1413        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1414        let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1415        test_obj
1416            .generic_transfer_init(&mut test_user, file_size)
1417            .expect("transfer init failed");
1418        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1419        test_obj
1420            .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1421            .expect("file data insertion failed");
1422        test_obj
1423            .generic_file_data_insert(
1424                &mut test_user,
1425                segment_len as u64,
1426                &random_data[segment_len..],
1427            )
1428            .expect("file data insertion failed");
1429        test_obj
1430            .generic_eof_no_error(&mut test_user, random_data.to_vec())
1431            .expect("EOF no error insertion failed");
1432        assert!(fault_handler.all_queues_empty());
1433        assert!(test_obj.pdu_sender.queue_empty());
1434        test_obj.state_check(State::Idle, TransactionStep::Idle);
1435    }
1436
1437    #[test]
1438    fn test_check_limit_handling_transfer_success() {
1439        let mut rng = rand::thread_rng();
1440        let mut random_data = [0u8; 512];
1441        rng.fill(&mut random_data);
1442        let file_size = random_data.len() as u64;
1443        let segment_len = 256;
1444        let fault_handler = TestFaultHandler::default();
1445
1446        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1447        let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1448        let transaction_id = test_obj
1449            .generic_transfer_init(&mut test_user, file_size)
1450            .expect("transfer init failed");
1451
1452        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1453        test_obj
1454            .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1455            .expect("file data insertion 0 failed");
1456        test_obj
1457            .generic_eof_no_error(&mut test_user, random_data.to_vec())
1458            .expect("EOF no error insertion failed");
1459        test_obj.state_check(
1460            State::Busy,
1461            TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1462        );
1463        test_obj
1464            .generic_file_data_insert(
1465                &mut test_user,
1466                segment_len as u64,
1467                &random_data[segment_len..],
1468            )
1469            .expect("file data insertion 1 failed");
1470        test_obj.set_check_timer_expired();
1471        test_obj
1472            .handler
1473            .state_machine(&mut test_user, None)
1474            .expect("fsm failure");
1475
1476        let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
1477        assert_eq!(ignored_queue.len(), 1);
1478        let cancelled = *ignored_queue.front().unwrap();
1479        assert_eq!(cancelled.0, transaction_id);
1480        assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
1481        assert_eq!(cancelled.2, segment_len as u64);
1482        assert!(test_obj.pdu_sender.queue_empty());
1483        test_obj.state_check(State::Idle, TransactionStep::Idle);
1484    }
1485
1486    #[test]
1487    fn test_check_limit_handling_limit_reached() {
1488        let mut rng = rand::thread_rng();
1489        let mut random_data = [0u8; 512];
1490        rng.fill(&mut random_data);
1491        let file_size = random_data.len() as u64;
1492        let segment_len = 256;
1493
1494        let fault_handler = TestFaultHandler::default();
1495        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), false);
1496        let mut test_user = test_obj.test_user_from_cached_paths(file_size);
1497        let transaction_id = test_obj
1498            .generic_transfer_init(&mut test_user, file_size)
1499            .expect("transfer init failed");
1500
1501        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1502        test_obj
1503            .generic_file_data_insert(&mut test_user, 0, &random_data[0..segment_len])
1504            .expect("file data insertion 0 failed");
1505        test_obj
1506            .generic_eof_no_error(&mut test_user, random_data.to_vec())
1507            .expect("EOF no error insertion failed");
1508        test_obj.state_check(
1509            State::Busy,
1510            TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1511        );
1512        test_obj.set_check_timer_expired();
1513        test_obj
1514            .handler
1515            .state_machine(&mut test_user, None)
1516            .expect("fsm error");
1517        test_obj.state_check(
1518            State::Busy,
1519            TransactionStep::ReceivingFileDataPdusWithCheckLimitHandling,
1520        );
1521        test_obj.set_check_timer_expired();
1522        test_obj
1523            .handler
1524            .state_machine(&mut test_user, None)
1525            .expect("fsm error");
1526        test_obj.state_check(State::Idle, TransactionStep::Idle);
1527
1528        assert!(fault_handler
1529            .notice_of_suspension_queue
1530            .lock()
1531            .unwrap()
1532            .is_empty());
1533
1534        let ignored_queue = fault_handler.ignored_queue.lock().unwrap();
1535        assert_eq!(ignored_queue.len(), 1);
1536        let cancelled = *ignored_queue.front().unwrap();
1537        assert_eq!(cancelled.0, transaction_id);
1538        assert_eq!(cancelled.1, ConditionCode::FileChecksumFailure);
1539        assert_eq!(cancelled.2, segment_len as u64);
1540
1541        let cancelled_queue = fault_handler.notice_of_cancellation_queue.lock().unwrap();
1542        assert_eq!(cancelled_queue.len(), 1);
1543        let cancelled = *cancelled_queue.front().unwrap();
1544        assert_eq!(cancelled.0, transaction_id);
1545        assert_eq!(cancelled.1, ConditionCode::CheckLimitReached);
1546        assert_eq!(cancelled.2, segment_len as u64);
1547
1548        drop(cancelled_queue);
1549
1550        assert!(test_obj.pdu_sender.queue_empty());
1551
1552        // Check that the broken file exists.
1553        test_obj.check_dest_file = false;
1554        assert!(Path::exists(test_obj.dest_path()));
1555        let read_content = fs::read(test_obj.dest_path()).expect("reading back string failed");
1556        assert_eq!(read_content.len(), segment_len);
1557        assert_eq!(read_content, &random_data[0..segment_len]);
1558        assert!(fs::remove_file(test_obj.dest_path().as_path()).is_ok());
1559    }
1560
1561    fn check_finished_pdu_success(sent_pdu: &SentPdu) {
1562        assert_eq!(sent_pdu.pdu_type, PduType::FileDirective);
1563        assert_eq!(
1564            sent_pdu.file_directive_type,
1565            Some(FileDirectiveType::FinishedPdu)
1566        );
1567        let finished_pdu = FinishedPduReader::from_bytes(&sent_pdu.raw_pdu).unwrap();
1568        assert_eq!(finished_pdu.file_status(), FileStatus::Retained);
1569        assert_eq!(finished_pdu.condition_code(), ConditionCode::NoError);
1570        assert_eq!(finished_pdu.delivery_code(), DeliveryCode::Complete);
1571        assert!(finished_pdu.fault_location().is_none());
1572        assert_eq!(finished_pdu.fs_responses_raw(), &[]);
1573    }
1574
1575    #[test]
1576    fn test_file_transfer_with_closure() {
1577        let fault_handler = TestFaultHandler::default();
1578        let mut test_obj = DestHandlerTester::new(fault_handler.clone(), true);
1579        let mut test_user = test_obj.test_user_from_cached_paths(0);
1580        test_obj
1581            .generic_transfer_init(&mut test_user, 0)
1582            .expect("transfer init failed");
1583        test_obj.state_check(State::Busy, TransactionStep::ReceivingFileDataPdus);
1584        let sent_packets = test_obj
1585            .generic_eof_no_error(&mut test_user, Vec::new())
1586            .expect("EOF no error insertion failed");
1587        assert_eq!(sent_packets, 1);
1588        assert!(fault_handler.all_queues_empty());
1589        // The Finished PDU was sent, so the state machine is done.
1590        test_obj.state_check(State::Idle, TransactionStep::Idle);
1591        assert!(!test_obj.pdu_sender.queue_empty());
1592        let sent_pdu = test_obj.pdu_sender.retrieve_next_pdu().unwrap();
1593        check_finished_pdu_success(&sent_pdu);
1594    }
1595
1596    #[test]
1597    fn test_file_transfer_with_closure_check_limit_reached() {
1598        // TODO: Implement test.
1599    }
1600}