cfdp_simplified/daemon/
mod.rs

1use std::{
2    collections::{hash_map::Entry, HashMap},
3    fmt::Display,
4    io::Read,
5    sync::{
6        atomic::{AtomicBool, Ordering},
7        Arc,
8    },
9    time::Duration,
10};
11
12use camino::Utf8PathBuf;
13use chrono::{DateTime, Utc};
14use error::DaemonResult;
15use log::{error, info, warn};
16use num_traits::FromPrimitive;
17use tokio::{
18    select,
19    sync::{
20        mpsc::{channel, Receiver, Sender},
21        oneshot,
22    },
23    task::JoinHandle,
24    time::MissedTickBehavior,
25};
26
27use crate::{
28    filestore::{ChecksumType, FileStore},
29    pdu::{
30        header::{
31            CRCFlag, Condition, DeliveryCode, Direction, FileSizeFlag, FileStatusCode, PDUHeader,
32            SegmentedData, TransactionStatus, TransmissionMode,
33        },
34        ops::{EntityID, TransactionSeqNum},
35        PDUEncode, PDUError, PDUResult, PDU,
36    },
37    transaction::FaultHandlerAction,
38    transaction::{Metadata, TransactionConfig, TransactionID, TransactionState},
39};
40
41pub mod error;
42pub mod segments;
43pub mod timer;
44pub mod transport;
45pub mod user;
46
47pub(crate) use timer::*;
48pub use user::*;
49
50use self::error::DaemonError;
51use self::transport::PDUTransport;
52use crate::transaction::{error::TransactionError, recv::RecvTransaction, send::SendTransaction};
53
54#[derive(Debug, Clone, PartialEq, Eq)]
55/// Necessary Configuration for a Put.Request operation
56pub struct PutRequest {
57    /// Bytes of the source filename, can be null if length is 0.
58    pub source_filename: Utf8PathBuf,
59    /// Bytes of the destination filename, can be null if length is 0.
60    pub destination_filename: Utf8PathBuf,
61    /// Destination ID of the Request
62    pub destination_entity_id: EntityID,
63    /// Whether to send in acknowledged or unacknowledged mode
64    pub transmission_mode: TransmissionMode,
65}
66
67#[derive(Debug)]
68/// Possible User Primitives sent from a end user application via the user primitive channel
69pub enum UserPrimitive {
70    /// Initiate a Put transaction with the specified [PutRequest] configuration.
71    /// The channel is for the requesting entity to receive the unique transaction ID
72    /// from the Daemon.
73    Put(PutRequest, oneshot::Sender<TransactionID>),
74    /// Cancel the give transaction.
75    Cancel(TransactionID),
76    /// Report progress of the given transaction.
77    Report(TransactionID, oneshot::Sender<Report>),
78}
79
80/// Simple Status Report
81#[derive(Debug, Clone)]
82#[cfg_attr(test, derive(PartialEq))]
83/// Transaction status report
84pub struct Report {
85    /// The unique ID of the transaction.
86    pub id: TransactionID,
87    /// Current state of the transaction
88    pub state: TransactionState,
89    /// Current status of the transaction.
90    pub status: TransactionStatus,
91    /// Last known condition of the transaction.
92    pub condition: Condition,
93    /// File size of the transaction
94    pub file_size: u64,
95    /// Bytes received by the transaction
96    pub file_bytes_received: Option<u64>,
97    /// Bytes sent by the transaction
98    pub file_bytes_sent: Option<u64>,
99    /// Whether an empty NAK was received.
100    pub empty_nak_received: bool,
101    /// Transaction Direction
102    pub direction: Option<Direction>,
103    /// Name of file
104    pub file_name: String,
105    /// Date time file was initially submitted/received
106    pub submit_date: DateTime<Utc>,
107}
108impl Report {
109    pub fn encode(self) -> Vec<u8> {
110        let mut buff = self.id.0.encode();
111        buff.extend(self.id.1.encode());
112        buff.push(self.state as u8);
113        buff.push(self.status as u8);
114        buff.push(self.condition as u8);
115        buff
116    }
117
118    pub fn decode<T: Read>(buffer: &mut T) -> PDUResult<Self> {
119        let id = {
120            let entity_id = EntityID::decode(buffer)?;
121            let sequence_num = TransactionSeqNum::decode(buffer)?;
122
123            TransactionID(entity_id, sequence_num)
124        };
125
126        let mut u8_buff = [0_u8; 1];
127
128        let state = {
129            buffer.read_exact(&mut u8_buff)?;
130            let possible = u8_buff[0];
131            TransactionState::from_u8(possible).ok_or(PDUError::InvalidState(possible))?
132        };
133
134        let status = {
135            buffer.read_exact(&mut u8_buff)?;
136            let possible = u8_buff[0];
137            TransactionStatus::from_u8(possible)
138                .ok_or(PDUError::InvalidTransactionStatus(possible))?
139        };
140
141        let condition = {
142            buffer.read_exact(&mut u8_buff)?;
143            let possible = u8_buff[0];
144            Condition::from_u8(possible).ok_or(PDUError::InvalidCondition(possible))?
145        };
146
147        Ok(Self {
148            id,
149            state,
150            status,
151            condition,
152            empty_nak_received: false,
153            file_size: 0,
154            file_bytes_received: None,
155            file_bytes_sent: None,
156            direction: None,
157            file_name: String::new(),
158            submit_date: Utc::now(),
159        })
160    }
161}
162
163#[derive(Debug, Clone)]
164/// Indication sent from a Transaction when [Metadata](crate::transaction::Metadata) has been received
165pub struct MetadataRecvIndication {
166    pub id: TransactionID,
167    /// source file name relative to the filestore root.
168    pub source_filename: Utf8PathBuf,
169    /// destination file name relative to the filestore root
170    pub destination_filename: Utf8PathBuf,
171    /// Size of the file in bytes
172    pub file_size: u64,
173    /// Which transmission mode will used in the transaction.
174    pub transmission_mode: TransmissionMode,
175}
176
177#[derive(Debug, Clone)]
178/// Indication of the amount of data received from a [FileDataPDU](crate::pdu::FileDataPDU)
179pub struct FileSegmentIndication {
180    /// Unique transaction ID for the file data.
181    pub id: TransactionID,
182    /// Byte index offset in the file.
183    pub offset: u64,
184    /// Length of the file data received.
185    pub length: u64,
186}
187
188#[derive(Debug, Clone)]
189/// Indication sent when a transaction has finished.
190pub struct FinishedIndication {
191    /// Unique transaction ID.
192    pub id: TransactionID,
193    /// Final report of the transaction before shutting down.
194    pub report: Report,
195    /// The status of the file delivered if applicable.
196    pub file_status: FileStatusCode,
197    /// The final delivery result.
198    pub delivery_code: DeliveryCode,
199}
200
201#[derive(Debug, Clone)]
202/// Indications how the Daemon and Transactions relay information back to the User application.
203/// Indications are issued at necessary points in each Transaction's lifetime.
204pub enum Indication {
205    /// A new transaction has been initiated as a result of a [PutRequest]
206    Transaction(TransactionID),
207    /// End of File has been Sent
208    EoFSent(TransactionID),
209    /// End of File PDU has been received
210    EoFRecv(TransactionID),
211    /// A running transaction has reached the Finished state.
212    /// Receipt of this indications starts and post transaction actions.
213    Finished(FinishedIndication),
214    /// Metadata has been received for a Receive Transaction
215    MetadataRecv(MetadataRecvIndication),
216    /// A new file segment has been received
217    FileSegmentRecv(FileSegmentIndication),
218    /// Last known status for the given transaction
219    Report(Report),
220}
221
222/// The way the Nak procedure is implemented is the following:
223///  - In Immediate mode, upon reception of each file data PDU, if the received segment is at the end of the file and
224///    there is a gap between the previously received segment and the new segment, a nak is sent with the new gap but
225///    only after delay has elapsed (if any delay was set).
226///    If the NAK timer has timed out, the nak sent covers the gaps from the entire file, not only the last gap.
227///    After the EOF is received, the procedure is the same as in deferred mode.
228///  - In Deferred mode, a nak covering the gaps from the entire file is sent after the EOF has been received
229///    and each time the nak timer times out.
230///
231/// The delay parameter is useful when PDUs come out of order to avoid sending NAKs prematurely. One scenario when this may
232/// happen is when utilizing multiple links of different latencies. The delay should be set to cover the difference in latency
233/// between the slowest link and the fastest link.
234/// If the delay is greater than 0, the NAKs will not be sent immediately but only if the gap persists after the delay
235/// has passed.
236///
237/// NAK timer (note that this is different and probably much larger than the delay parameter mentioned above):
238/// - In Immediate mode the NAK timer is started at the beginning of the transaction.
239/// - In Deferred mode  the NAK timer is started after EOF is received.
240/// - If the NAK timer times out and it is determined that new data has been received since the last nak sending,
241///   the timer counter is reset to 0.
242/// - If the NAK timer expired more than the predefined limit (without any new data being received), the NakLimitReached
243///   fault will be raised.
244#[derive(Debug, Copy, Clone, PartialEq, Eq)]
245pub enum NakProcedure {
246    Immediate(Duration /* delay*/),
247    Deferred(Duration /* delay */),
248}
249
250#[derive(Clone)]
251/// Configuration parameters for transactions which may change based on the receiving entity.
252pub struct EntityConfig {
253    /// Mapping to decide how each fault type should be handled
254    pub fault_handler_override: HashMap<Condition, FaultHandlerAction>,
255    /// Maximum file size fragment this entity can receive
256    pub file_size_segment: u16,
257    // The number of timeouts before a fault is issued on a transaction
258    pub default_transaction_max_count: u32,
259    // number of seconds for inactivity timers to wait
260    pub inactivity_timeout: i64,
261    // number of seconds for EOF timers to wait
262    pub eof_timeout: i64,
263    // number of seconds for NAK timers to wait
264    pub nak_timeout: i64,
265    /// Flag to determine if the CRC protocol should be used
266    pub crc_flag: CRCFlag,
267    /// The default ChecksumType to use for file transfers
268    pub checksum_type: ChecksumType,
269    // for recv transactions - when to send the NAKs (immediately when detected or after EOF)
270    pub nak_procedure: NakProcedure,
271    // Local entity ID of CFDP instance
272    pub local_entity_id: u16,
273    // Remote entity ID of CFDP instance
274    pub remote_entity_id: u16,
275    // Local CFDP server address
276    pub local_server_addr: &'static str,
277    // Remote CFDP server address
278    pub remote_server_addr: &'static str,
279    // Interval in seconds to send progress report
280    pub progress_report_interval_secs: i64,
281}
282
283/// Lightweight commands the Daemon send to each Transaction
284#[derive(Debug)]
285pub enum Command {
286    Pdu(PDU),
287    Abandon,
288    Report(oneshot::Sender<Report>),
289}
290impl Display for Command {
291    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292        write!(f, "{:?}", &self)
293    }
294}
295
296fn construct_metadata<T: FileStore + Send + 'static>(
297    filestore: &Arc<T>,
298    req: PutRequest,
299    config: EntityConfig,
300) -> DaemonResult<Metadata> {
301    let file_size = match req.source_filename.file_name().is_none() {
302        true => 0_u64,
303        false => filestore
304            .get_size(&req.source_filename)
305            .map_err(DaemonError::SpawnSend)?,
306    };
307    Ok(Metadata {
308        source_filename: req.source_filename,
309        destination_filename: req.destination_filename,
310        file_size,
311        checksum_type: config.checksum_type,
312    })
313}
314
315type RecvSpawnerTuple = (
316    TransactionID,
317    Sender<Command>,
318    JoinHandle<Result<TransactionID, TransactionError>>,
319);
320
321type SendSpawnerTuple = (
322    Sender<Command>,
323    JoinHandle<Result<TransactionID, TransactionError>>,
324);
325
326/// The CFDP Daemon is responsible for connecting [PDUTransport](crate::transport::PDUTransport) implementation
327/// with each individual [SendTransaction](crate::transaction::SendTransaction) and [RecvTransaction](crate::transaction::RecvTransaction).
328/// When a PDUTransport implementation
329/// sends a PDU through a channel, the Daemon distributes the PDU to the necessary Transaction.
330/// PDUs are sent from each Transaction directly to their respective PDUTransport implementations.
331pub struct Daemon<T: FileStore + Send + 'static> {
332    // The collection of all current transactions
333    transaction_handles: Vec<JoinHandle<Result<TransactionID, TransactionError>>>,
334    // Mapping of unique transaction ids to channels used to talk to each transaction
335    transaction_channels: HashMap<TransactionID, Sender<Command>>,
336    // the vector of transportation tx channel connections
337    transport_tx_map: HashMap<EntityID, Sender<(EntityID, PDU)>>,
338    // the transport PDU rx channel connection
339    transport_rx: Receiver<PDU>,
340    // the underlying filestore used by this Daemon
341    filestore: Arc<T>,
342    // message sender channel used to send Indications from Transactions to the User
343    indication_tx: Sender<Indication>,
344    // a mapping of individual fault handler actions per remote entity
345    entity_configs: HashMap<EntityID, EntityConfig>,
346    // the default fault handling configuration
347    default_config: EntityConfig,
348    // the entity ID of this daemon
349    entity_id: EntityID,
350    // current running count of the sequence numbers of transaction initiated by this entity
351    sequence_num: TransactionSeqNum,
352    // termination signal sent to children threads
353    terminate: Arc<AtomicBool>,
354    // channel to receive user primitives from the implemented User
355    primitive_rx: Receiver<UserPrimitive>,
356}
357impl<T: FileStore + Send + Sync + 'static> Daemon<T> {
358    #[allow(clippy::too_many_arguments)]
359    pub fn new(
360        entity_id: EntityID,
361        sequence_num: TransactionSeqNum,
362        transport_map: HashMap<Vec<EntityID>, Box<dyn PDUTransport + Send>>,
363        filestore: Arc<T>,
364        entity_configs: HashMap<EntityID, EntityConfig>,
365        default_config: EntityConfig,
366        primitive_rx: Receiver<UserPrimitive>,
367        indication_tx: Sender<Indication>,
368    ) -> Self {
369        let mut transport_tx_map: HashMap<EntityID, Sender<(EntityID, PDU)>> = HashMap::new();
370        let (pdu_send, pdu_receive) = channel(100);
371        let terminate = Arc::new(AtomicBool::new(false));
372        for (vec, mut transport) in transport_map.into_iter() {
373            let (remote_send, remote_receive) = channel(1);
374
375            vec.iter().for_each(|id| {
376                transport_tx_map.insert(*id, remote_send.clone());
377            });
378
379            let signal = terminate.clone();
380            let sender = pdu_send.clone();
381            tokio::task::spawn(async move {
382                transport.pdu_handler(signal, sender, remote_receive).await
383            });
384        }
385        Self {
386            transaction_handles: vec![],
387            transaction_channels: HashMap::new(),
388            transport_tx_map,
389            transport_rx: pdu_receive,
390            filestore,
391            indication_tx,
392            entity_configs,
393            default_config,
394            entity_id,
395            sequence_num,
396            terminate,
397            primitive_rx,
398        }
399    }
400
401    fn spawn_receive_transaction(
402        header: &PDUHeader,
403        transport_tx: Sender<(EntityID, PDU)>,
404        entity_config: EntityConfig,
405        filestore: Arc<T>,
406        indication_tx: Sender<Indication>,
407    ) -> RecvSpawnerTuple {
408        let (transaction_tx, mut transaction_rx) = channel(100);
409
410        let config = TransactionConfig {
411            source_entity_id: header.source_entity_id,
412            destination_entity_id: header.destination_entity_id,
413            transmission_mode: header.transmission_mode,
414            sequence_number: header.transaction_sequence_number,
415            file_size_flag: header.large_file_flag,
416            fault_handler_override: entity_config.fault_handler_override.clone(),
417            file_size_segment: entity_config.file_size_segment,
418            crc_flag: header.crc_flag,
419            segment_metadata_flag: header.segment_metadata_flag,
420            max_count: entity_config.default_transaction_max_count,
421            inactivity_timeout: entity_config.inactivity_timeout,
422            eof_timeout: entity_config.eof_timeout,
423            nak_timeout: entity_config.nak_timeout,
424            progress_report_interval_secs: entity_config.progress_report_interval_secs,
425        };
426        let mut transaction = RecvTransaction::new(
427            config,
428            entity_config.nak_procedure,
429            filestore,
430            indication_tx,
431        );
432        let id = transaction.id();
433
434        // tokio tasks can have names but that seems an unstable feature
435        let handle = tokio::task::spawn(async move {
436            transaction.send_report(None)?;
437
438            while transaction.get_state() != TransactionState::Terminated {
439                let timeout = transaction.until_timeout();
440                select! {
441                    Ok(permit) = transport_tx.reserve(), if transaction.has_pdu_to_send() => {
442                        transaction.send_pdu(permit)?
443                    },
444                    Some(command) = transaction_rx.recv() => {
445                        match command {
446                            Command::Pdu(pdu) => {
447                                match transaction.process_pdu(pdu) {
448                                    Ok(()) => {}
449                                    Err(err @ TransactionError::UnexpectedPDU(..)) => {
450                                        info!("Transaction {} Received Unexpected PDU: {err}", transaction.id());
451                                        // log some info on the unexpected PDU?
452                                    }
453                                    Err(err) => return Err(err)
454                                }
455                            }
456                            Command::Abandon => transaction.shutdown(),
457                            Command::Report(sender) => {
458                                transaction.send_report(Some(sender))?
459                            }
460                        }
461                    }
462                    _ = tokio::time::sleep(timeout) => {
463                        transaction.handle_timeout()?;
464                    }
465                    else => {
466                        if transport_tx.is_closed(){
467                            log::error!("Channel to transport unexpectedly severed for transaction {}.", transaction.id());
468                        }
469
470                        break;
471                    }
472                };
473            }
474
475            transaction.send_report(None)?;
476            Ok(transaction.id())
477        });
478
479        (id, transaction_tx, handle)
480    }
481
482    fn spawn_send_transaction(
483        request: PutRequest,
484        transaction_id: TransactionID,
485        transport_tx: Sender<(EntityID, PDU)>,
486        entity_config: EntityConfig,
487        filestore: Arc<T>,
488        indication_tx: Sender<Indication>,
489    ) -> DaemonResult<SendSpawnerTuple> {
490        let (transaction_tx, mut transaction_rx) = channel(10);
491
492        let destination_entity_id = request.destination_entity_id;
493        let transmission_mode = request.transmission_mode;
494        let mut config = TransactionConfig {
495            source_entity_id: transaction_id.0,
496            destination_entity_id,
497            transmission_mode,
498            sequence_number: transaction_id.1,
499            file_size_flag: FileSizeFlag::Small,
500            fault_handler_override: entity_config.fault_handler_override.clone(),
501            file_size_segment: entity_config.file_size_segment,
502            crc_flag: entity_config.crc_flag,
503            segment_metadata_flag: SegmentedData::NotPresent,
504            max_count: entity_config.default_transaction_max_count,
505            inactivity_timeout: entity_config.inactivity_timeout,
506            eof_timeout: entity_config.eof_timeout,
507            nak_timeout: entity_config.nak_timeout,
508            progress_report_interval_secs: entity_config.progress_report_interval_secs,
509        };
510        let metadata = construct_metadata(&filestore, request, entity_config)?;
511
512        let handle = tokio::task::spawn(async move {
513            config.file_size_flag = match metadata.file_size <= u32::MAX.into() {
514                true => FileSizeFlag::Small,
515                false => FileSizeFlag::Large,
516            };
517
518            let mut transaction = SendTransaction::new(config, metadata, filestore, indication_tx)?;
519            transaction.send_report(None)?;
520
521            while transaction.get_state() != TransactionState::Terminated {
522                let timeout = transaction.until_timeout();
523                select! {
524                    Ok(permit) = transport_tx.reserve(), if transaction.has_pdu_to_send()  => {
525                        transaction.send_pdu(permit)?;
526                    },
527
528                    Some(command) = transaction_rx.recv() => {
529                        match command {
530                            Command::Pdu(pdu) => {
531                                match transaction.process_pdu(pdu) {
532                                    Ok(()) => {}
533                                    Err(
534                                        err @ TransactionError::UnexpectedPDU(..),
535                                    ) => {
536                                        info!("Received Unexpected PDU: {err}");
537                                        // log some info on the unexpected PDU?
538                                    }
539                                    Err(err) => {
540                                        return Err(err);
541                                    }
542                                }
543                            }
544                            Command::Abandon => transaction.shutdown(),
545                            Command::Report(sender) => {
546                                transaction.send_report(Some(sender))?
547                            },
548                        }
549                    },
550                    _ = tokio::time::sleep(timeout) => {
551                        transaction.handle_timeout()?;
552                    },
553                    else => {
554                        if transport_tx.is_closed(){
555                            log::error!("Connection to transport unexpectedly severed for transaction {}.", transaction.id());
556                        }
557                        break;
558                    }
559                };
560            }
561            transaction.send_report(None)?;
562            Ok(transaction_id)
563        });
564        Ok((transaction_tx, handle))
565    }
566
567    async fn process_primitive(&mut self, primitive: UserPrimitive) -> DaemonResult<()> {
568        match primitive {
569            UserPrimitive::Put(request, put_sender) => {
570                let sequence_number = self.sequence_num.get_and_increment();
571
572                let entity_config = self
573                    .entity_configs
574                    .get(&request.destination_entity_id)
575                    .unwrap_or(&self.default_config)
576                    .clone();
577
578                if let Some(transport_tx) = self
579                    .transport_tx_map
580                    .get(&request.destination_entity_id)
581                    .cloned()
582                {
583                    let id = TransactionID(self.entity_id, sequence_number);
584                    let (sender, handle) = Self::spawn_send_transaction(
585                        request,
586                        id,
587                        transport_tx,
588                        entity_config,
589                        self.filestore.clone(),
590                        self.indication_tx.clone(),
591                    )?;
592                    self.transaction_handles.push(handle);
593                    self.transaction_channels.insert(id, sender);
594
595                    // ignore the possible error if the user disconnected;
596                    let _ = put_sender.send(id);
597                } else {
598                    warn!(
599                        "No Transport available for EntityID: {}. Skipping transaction creation.",
600                        request.destination_entity_id
601                    )
602                }
603            }
604            UserPrimitive::Cancel(id) => {
605                if let Some(channel) = self.transaction_channels.get(&id) {
606                    channel
607                        .send(Command::Abandon)
608                        .await
609                        .map_err(|err| DaemonError::from((id, err)))?;
610                }
611            }
612            UserPrimitive::Report(id, report_sender) => {
613                if let Some(channel) = self.transaction_channels.get(&id) {
614                    channel
615                        .send(Command::Report(report_sender))
616                        .await
617                        .map_err(|err| DaemonError::from((id, err)))?;
618                }
619            }
620        };
621        Ok(())
622    }
623
624    async fn forward_pdu(&mut self, pdu: PDU) -> DaemonResult<()> {
625        // find the entity this entity will be sending too.
626        // If this PDU is to the sender, we send to the destination
627        // if this PDU is to the receiver, we send to the source
628        let transport_entity = match &pdu.header.direction {
629            Direction::ToSender => pdu.header.destination_entity_id,
630            Direction::ToReceiver => pdu.header.source_entity_id,
631        };
632
633        let key = TransactionID(
634            pdu.header.source_entity_id,
635            pdu.header.transaction_sequence_number,
636        );
637        // hand pdu off to transaction
638        let channel = match self.transaction_channels.entry(key) {
639            Entry::Occupied(entry) => entry.into_mut(),
640            Entry::Vacant(entry) => {
641                if let Some(transport) = self.transport_tx_map.get(&transport_entity).cloned() {
642                    // if this key is not in the channel list
643                    // create a new transaction
644                    let entity_config = self
645                        .entity_configs
646                        .get(&key.0)
647                        .unwrap_or(&self.default_config)
648                        .clone();
649                    match &pdu.header.direction {
650                        Direction::ToReceiver => {
651                            let (_id, channel, handle) = Self::spawn_receive_transaction(
652                                &pdu.header,
653                                transport,
654                                entity_config,
655                                self.filestore.clone(),
656                                self.indication_tx.clone(),
657                            );
658
659                            self.transaction_handles.push(handle);
660                            entry.insert(channel)
661                        }
662                        // This is a very unlikely scenario.
663                        // We have received a PDU sent back to the Sender but we do not have a transaction running.
664                        // Likely causes are a system reboot in the middle of a transaction.
665                        // Unfortunately there is not enough information in a PDU
666                        // to completely re-create the transaction.
667                        Direction::ToSender => {
668                            error!("Received PDU sent back to sender but no transaction running. Unable to resume transaction.");
669                            return Err(DaemonError::UnableToResume(TransactionID(
670                                pdu.header.source_entity_id,
671                                pdu.header.transaction_sequence_number,
672                            )));
673                        }
674                    }
675                } else {
676                    warn!(
677                        "No Transport available for EntityID: {}. Skipping Transaction creation.",
678                        transport_entity
679                    );
680                    // skip to the next loop iteration
681                    return Ok(());
682                }
683            }
684        };
685
686        if channel.send(Command::Pdu(pdu.clone())).await.is_err() {
687            // the transaction is completed.
688            // spawn a new one
689            // this is very unlikely and only results
690            // if a sender is re-using a transaction id
691            match pdu.header.direction {
692                Direction::ToReceiver => {
693                    let entity_config = self
694                        .entity_configs
695                        .get(&key.0)
696                        .unwrap_or(&self.default_config)
697                        .clone();
698                    if let Some(transport) = self.transport_tx_map.get(&transport_entity).cloned() {
699                        let (id, new_channel, handle) = Self::spawn_receive_transaction(
700                            &pdu.header,
701                            transport,
702                            entity_config,
703                            self.filestore.clone(),
704                            self.indication_tx.clone(),
705                        );
706                        self.transaction_handles.push(handle);
707                        new_channel
708                            .send(Command::Pdu(pdu.clone()))
709                            .await
710                            .map_err(|err| DaemonError::from((id, err)))?;
711                        // update the dict to have the new channel
712                        self.transaction_channels.insert(key, new_channel);
713                    }
714                }
715                Direction::ToSender => {
716                    // This is a very unlikely scenario.
717                    // We have received a PDU sent back to the Sender
718                    // but the transaction transaction stopped running in the background.
719                    // Likely causes are a filestore error inside the transaction.
720                    // Unfortunately there is not enough information in a PDU
721                    // to completely re-create the transaction.
722                    error!("Received PDU sent back to sender but no transaction running. Unable to resume transaction.");
723                    return Err(DaemonError::UnableToResume(TransactionID(
724                        pdu.header.source_entity_id,
725                        pdu.header.transaction_sequence_number,
726                    )));
727                }
728            };
729        }
730
731        Ok(())
732    }
733
734    async fn cleanup_transactions(&mut self) {
735        // join any handles that have completed
736        let mut ind = 0;
737        while ind < self.transaction_handles.len() {
738            if self.transaction_handles[ind].is_finished() {
739                let handle = self.transaction_handles.remove(ind);
740                match handle.await {
741                    Ok(Ok(id)) => {
742                        // remove the channel for this transaction if it is complete
743                        let _ = self.transaction_channels.remove(&id);
744                    }
745                    Ok(Err(err)) => {
746                        info!("Error occurred during transaction: {}", err)
747                    }
748                    Err(_) => error!("Unable to join handle!"),
749                };
750            } else {
751                ind += 1;
752            }
753        }
754    }
755
756    /// This function will consist of the main logic loop in any daemon process.
757    pub async fn manage_transactions(&mut self) -> DaemonResult<()> {
758        let cleanup = {
759            let mut interval = tokio::time::interval(Duration::from_secs(1));
760            // Don't start counting another tick until the currrent one has been processed.
761            interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
762            interval
763        };
764        tokio::pin!(cleanup);
765
766        loop {
767            select! {
768                pdu = self.transport_rx.recv() => match pdu {
769                    Some(pdu) => match self.forward_pdu(pdu).await{
770                        Ok(_) => {},
771                        Err(error @ DaemonError::TransactionCommunication(_, _)) => {
772                            // This occcurs most likely if a user is attempting to
773                            // interact with a transaction that is already finished.
774                            warn!("{error}");
775                        },
776                        Err(err) => {
777                            if !self.terminate.load(Ordering::Relaxed) {
778                                self.terminate.store(true, Ordering::Relaxed);
779                            }
780                            return Err(err);
781                        }
782                    },
783                    None => {
784                        if !self.terminate.load(Ordering::Relaxed) {
785                            error!("Transport unexpectedly disconnected from daemon.");
786                            self.terminate.store(true, Ordering::Relaxed);
787                        }
788                        break;
789                    }
790                },
791                primitive = self.primitive_rx.recv() => match primitive {
792                    Some(primitive) => match self.process_primitive(primitive).await{
793                        Ok(_) => {},
794                        Err(error @ DaemonError::SpawnSend(_)) => {
795                            // Unable to spawn a send transaction.
796                            // There are lots of reasons this cound happen.
797                            // Mostly if a user asked for a file that doesn't exist.
798                            warn!("{error}");
799                        },
800                        Err(error @ DaemonError::TransactionCommunication(_, _)) => {
801                            // This occcurs most likely if a user is attempting to
802                            // interact with a transaction that is already finished.
803                            warn!("{error}");
804                        }
805                        Err(err) => {
806                            if !self.terminate.load(Ordering::Relaxed) {
807                                self.terminate.store(true, Ordering::Relaxed);
808                            }
809                            return Err(err);
810                        }
811                    },
812                    None => {
813                        info!("User triggered daemon shutdown.");
814                        if !self.terminate.load(Ordering::Relaxed) {
815                            self.terminate.store(true, Ordering::Relaxed);
816                        }
817                        break;
818                    }
819                },
820                _ = cleanup.tick() => self.cleanup_transactions().await,
821            };
822        }
823
824        // a final cleanup
825        while let Some(handle) = self.transaction_handles.pop() {
826            match handle.await {
827                Ok(Ok(id)) => {
828                    // remove the channel for this transaction if it is complete
829                    let _ = self.transaction_channels.remove(&id);
830                }
831                Ok(Err(err)) => {
832                    info!("Error occurred during transaction: {}", err)
833                }
834                Err(_) => error!("Unable to join handle!"),
835            };
836        }
837        Ok(())
838    }
839}
840
841#[cfg(test)]
842mod test {
843    use crate::{
844        daemon::NakProcedure,
845        filestore::{ChecksumType, NativeFileStore},
846        pdu::{self, CRCFlag, Condition, NegativeAcknowledgmentPDU, PDUPayload, U3},
847        transaction::FaultHandlerAction,
848    };
849
850    use super::*;
851
852    #[macro_export]
853    macro_rules! assert_err{
854        ($expression:expr, $($pattern:tt)+) => {
855            match $expression {
856                $($pattern)+ => {},
857                ref e => panic!("expected {} but got {:?}", stringify!($($pattern)+), e)
858            }
859        }
860    }
861
862    #[tokio::test]
863    async fn pdu_to_sender_no_transaction() {
864        let (_send, recv) = channel(1);
865        let (indication_tx, _indication_rx) = channel(1);
866        let (_primitive_tx, primitive_rx) = channel(1);
867        let filestore = Arc::new(NativeFileStore::new("."));
868        let mut transport_tx_map = HashMap::<_, _>::new();
869
870        let (transport_tx, _) = channel(10);
871
872        transport_tx_map.insert(EntityID::from(1_u16), transport_tx);
873
874        let mut daemon = Daemon {
875            transaction_handles: vec![],
876            transaction_channels: HashMap::<_, _>::new(),
877            transport_tx_map,
878            transport_rx: recv,
879            filestore,
880            indication_tx,
881            entity_configs: HashMap::new(),
882            default_config: EntityConfig {
883                fault_handler_override: HashMap::from([(
884                    Condition::PositiveLimitReached,
885                    FaultHandlerAction::Abandon,
886                )]),
887                file_size_segment: 1024,
888                default_transaction_max_count: 2,
889                inactivity_timeout: 0,
890                eof_timeout: 1,
891                nak_timeout: 2,
892                crc_flag: CRCFlag::NotPresent,
893                checksum_type: ChecksumType::Modular,
894                nak_procedure: NakProcedure::Deferred(Duration::from_secs(0)),
895                local_entity_id: 0_u16,
896                remote_entity_id: 1_u16,
897                local_server_addr: "127.0.0.1:0",
898                remote_server_addr: "127.0.0.1:0",
899                progress_report_interval_secs: 1,
900            },
901            entity_id: EntityID::from(0_u16),
902            sequence_num: TransactionSeqNum::from(0_u32),
903            terminate: Arc::new(AtomicBool::new(false)),
904            primitive_rx,
905        };
906        let payload = PDUPayload::Directive(pdu::Operations::Nak(NegativeAcknowledgmentPDU {
907            start_of_scope: 0,
908            end_of_scope: 1_000_000,
909            segment_requests: vec![],
910        }));
911        let pdu = PDU {
912            header: PDUHeader {
913                version: U3::Zero,
914                pdu_type: pdu::PDUType::FileDirective,
915                direction: Direction::ToSender,
916                transmission_mode: pdu::TransmissionMode::Acknowledged,
917                crc_flag: CRCFlag::NotPresent,
918                large_file_flag: FileSizeFlag::Small,
919                pdu_data_field_length: payload.encoded_len(FileSizeFlag::Small),
920                segmentation_control: pdu::SegmentationControl::NotPreserved,
921                segment_metadata_flag: SegmentedData::NotPresent,
922                source_entity_id: EntityID::from(0_u16),
923                transaction_sequence_number: TransactionSeqNum::from(3_u32),
924                destination_entity_id: EntityID::from(1_u16),
925            },
926            payload,
927        };
928
929        let res = daemon.forward_pdu(pdu).await;
930        assert_err!(res, Err(DaemonError::UnableToResume(_)))
931    }
932}