1use std::{
2 collections::{hash_map::Entry, HashMap},
3 fmt::Display,
4 io::Read,
5 sync::{
6 atomic::{AtomicBool, Ordering},
7 Arc,
8 },
9 time::Duration,
10};
11
12use camino::Utf8PathBuf;
13use chrono::{DateTime, Utc};
14use error::DaemonResult;
15use log::{error, info, warn};
16use num_traits::FromPrimitive;
17use tokio::{
18 select,
19 sync::{
20 mpsc::{channel, Receiver, Sender},
21 oneshot,
22 },
23 task::JoinHandle,
24 time::MissedTickBehavior,
25};
26
27use crate::{
28 filestore::{ChecksumType, FileStore},
29 pdu::{
30 header::{
31 CRCFlag, Condition, DeliveryCode, Direction, FileSizeFlag, FileStatusCode, PDUHeader,
32 SegmentedData, TransactionStatus, TransmissionMode,
33 },
34 ops::{EntityID, TransactionSeqNum},
35 PDUEncode, PDUError, PDUResult, PDU,
36 },
37 transaction::FaultHandlerAction,
38 transaction::{Metadata, TransactionConfig, TransactionID, TransactionState},
39};
40
41pub mod error;
42pub mod segments;
43pub mod timer;
44pub mod transport;
45pub mod user;
46
47pub(crate) use timer::*;
48pub use user::*;
49
50use self::error::DaemonError;
51use self::transport::PDUTransport;
52use crate::transaction::{error::TransactionError, recv::RecvTransaction, send::SendTransaction};
53
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct PutRequest {
57 pub source_filename: Utf8PathBuf,
59 pub destination_filename: Utf8PathBuf,
61 pub destination_entity_id: EntityID,
63 pub transmission_mode: TransmissionMode,
65}
66
67#[derive(Debug)]
68pub enum UserPrimitive {
70 Put(PutRequest, oneshot::Sender<TransactionID>),
74 Cancel(TransactionID),
76 Report(TransactionID, oneshot::Sender<Report>),
78}
79
80#[derive(Debug, Clone)]
82#[cfg_attr(test, derive(PartialEq))]
83pub struct Report {
85 pub id: TransactionID,
87 pub state: TransactionState,
89 pub status: TransactionStatus,
91 pub condition: Condition,
93 pub file_size: u64,
95 pub file_bytes_received: Option<u64>,
97 pub file_bytes_sent: Option<u64>,
99 pub empty_nak_received: bool,
101 pub direction: Option<Direction>,
103 pub file_name: String,
105 pub submit_date: DateTime<Utc>,
107}
108impl Report {
109 pub fn encode(self) -> Vec<u8> {
110 let mut buff = self.id.0.encode();
111 buff.extend(self.id.1.encode());
112 buff.push(self.state as u8);
113 buff.push(self.status as u8);
114 buff.push(self.condition as u8);
115 buff
116 }
117
118 pub fn decode<T: Read>(buffer: &mut T) -> PDUResult<Self> {
119 let id = {
120 let entity_id = EntityID::decode(buffer)?;
121 let sequence_num = TransactionSeqNum::decode(buffer)?;
122
123 TransactionID(entity_id, sequence_num)
124 };
125
126 let mut u8_buff = [0_u8; 1];
127
128 let state = {
129 buffer.read_exact(&mut u8_buff)?;
130 let possible = u8_buff[0];
131 TransactionState::from_u8(possible).ok_or(PDUError::InvalidState(possible))?
132 };
133
134 let status = {
135 buffer.read_exact(&mut u8_buff)?;
136 let possible = u8_buff[0];
137 TransactionStatus::from_u8(possible)
138 .ok_or(PDUError::InvalidTransactionStatus(possible))?
139 };
140
141 let condition = {
142 buffer.read_exact(&mut u8_buff)?;
143 let possible = u8_buff[0];
144 Condition::from_u8(possible).ok_or(PDUError::InvalidCondition(possible))?
145 };
146
147 Ok(Self {
148 id,
149 state,
150 status,
151 condition,
152 empty_nak_received: false,
153 file_size: 0,
154 file_bytes_received: None,
155 file_bytes_sent: None,
156 direction: None,
157 file_name: String::new(),
158 submit_date: Utc::now(),
159 })
160 }
161}
162
163#[derive(Debug, Clone)]
164pub struct MetadataRecvIndication {
166 pub id: TransactionID,
167 pub source_filename: Utf8PathBuf,
169 pub destination_filename: Utf8PathBuf,
171 pub file_size: u64,
173 pub transmission_mode: TransmissionMode,
175}
176
177#[derive(Debug, Clone)]
178pub struct FileSegmentIndication {
180 pub id: TransactionID,
182 pub offset: u64,
184 pub length: u64,
186}
187
188#[derive(Debug, Clone)]
189pub struct FinishedIndication {
191 pub id: TransactionID,
193 pub report: Report,
195 pub file_status: FileStatusCode,
197 pub delivery_code: DeliveryCode,
199}
200
201#[derive(Debug, Clone)]
202pub enum Indication {
205 Transaction(TransactionID),
207 EoFSent(TransactionID),
209 EoFRecv(TransactionID),
211 Finished(FinishedIndication),
214 MetadataRecv(MetadataRecvIndication),
216 FileSegmentRecv(FileSegmentIndication),
218 Report(Report),
220}
221
222#[derive(Debug, Copy, Clone, PartialEq, Eq)]
245pub enum NakProcedure {
246 Immediate(Duration ),
247 Deferred(Duration ),
248}
249
250#[derive(Clone)]
251pub struct EntityConfig {
253 pub fault_handler_override: HashMap<Condition, FaultHandlerAction>,
255 pub file_size_segment: u16,
257 pub default_transaction_max_count: u32,
259 pub inactivity_timeout: i64,
261 pub eof_timeout: i64,
263 pub nak_timeout: i64,
265 pub crc_flag: CRCFlag,
267 pub checksum_type: ChecksumType,
269 pub nak_procedure: NakProcedure,
271 pub local_entity_id: u16,
273 pub remote_entity_id: u16,
275 pub local_server_addr: &'static str,
277 pub remote_server_addr: &'static str,
279 pub progress_report_interval_secs: i64,
281}
282
283#[derive(Debug)]
285pub enum Command {
286 Pdu(PDU),
287 Abandon,
288 Report(oneshot::Sender<Report>),
289}
290impl Display for Command {
291 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
292 write!(f, "{:?}", &self)
293 }
294}
295
296fn construct_metadata<T: FileStore + Send + 'static>(
297 filestore: &Arc<T>,
298 req: PutRequest,
299 config: EntityConfig,
300) -> DaemonResult<Metadata> {
301 let file_size = match req.source_filename.file_name().is_none() {
302 true => 0_u64,
303 false => filestore
304 .get_size(&req.source_filename)
305 .map_err(DaemonError::SpawnSend)?,
306 };
307 Ok(Metadata {
308 source_filename: req.source_filename,
309 destination_filename: req.destination_filename,
310 file_size,
311 checksum_type: config.checksum_type,
312 })
313}
314
315type RecvSpawnerTuple = (
316 TransactionID,
317 Sender<Command>,
318 JoinHandle<Result<TransactionID, TransactionError>>,
319);
320
321type SendSpawnerTuple = (
322 Sender<Command>,
323 JoinHandle<Result<TransactionID, TransactionError>>,
324);
325
326pub struct Daemon<T: FileStore + Send + 'static> {
332 transaction_handles: Vec<JoinHandle<Result<TransactionID, TransactionError>>>,
334 transaction_channels: HashMap<TransactionID, Sender<Command>>,
336 transport_tx_map: HashMap<EntityID, Sender<(EntityID, PDU)>>,
338 transport_rx: Receiver<PDU>,
340 filestore: Arc<T>,
342 indication_tx: Sender<Indication>,
344 entity_configs: HashMap<EntityID, EntityConfig>,
346 default_config: EntityConfig,
348 entity_id: EntityID,
350 sequence_num: TransactionSeqNum,
352 terminate: Arc<AtomicBool>,
354 primitive_rx: Receiver<UserPrimitive>,
356}
357impl<T: FileStore + Send + Sync + 'static> Daemon<T> {
358 #[allow(clippy::too_many_arguments)]
359 pub fn new(
360 entity_id: EntityID,
361 sequence_num: TransactionSeqNum,
362 transport_map: HashMap<Vec<EntityID>, Box<dyn PDUTransport + Send>>,
363 filestore: Arc<T>,
364 entity_configs: HashMap<EntityID, EntityConfig>,
365 default_config: EntityConfig,
366 primitive_rx: Receiver<UserPrimitive>,
367 indication_tx: Sender<Indication>,
368 ) -> Self {
369 let mut transport_tx_map: HashMap<EntityID, Sender<(EntityID, PDU)>> = HashMap::new();
370 let (pdu_send, pdu_receive) = channel(100);
371 let terminate = Arc::new(AtomicBool::new(false));
372 for (vec, mut transport) in transport_map.into_iter() {
373 let (remote_send, remote_receive) = channel(1);
374
375 vec.iter().for_each(|id| {
376 transport_tx_map.insert(*id, remote_send.clone());
377 });
378
379 let signal = terminate.clone();
380 let sender = pdu_send.clone();
381 tokio::task::spawn(async move {
382 transport.pdu_handler(signal, sender, remote_receive).await
383 });
384 }
385 Self {
386 transaction_handles: vec![],
387 transaction_channels: HashMap::new(),
388 transport_tx_map,
389 transport_rx: pdu_receive,
390 filestore,
391 indication_tx,
392 entity_configs,
393 default_config,
394 entity_id,
395 sequence_num,
396 terminate,
397 primitive_rx,
398 }
399 }
400
401 fn spawn_receive_transaction(
402 header: &PDUHeader,
403 transport_tx: Sender<(EntityID, PDU)>,
404 entity_config: EntityConfig,
405 filestore: Arc<T>,
406 indication_tx: Sender<Indication>,
407 ) -> RecvSpawnerTuple {
408 let (transaction_tx, mut transaction_rx) = channel(100);
409
410 let config = TransactionConfig {
411 source_entity_id: header.source_entity_id,
412 destination_entity_id: header.destination_entity_id,
413 transmission_mode: header.transmission_mode,
414 sequence_number: header.transaction_sequence_number,
415 file_size_flag: header.large_file_flag,
416 fault_handler_override: entity_config.fault_handler_override.clone(),
417 file_size_segment: entity_config.file_size_segment,
418 crc_flag: header.crc_flag,
419 segment_metadata_flag: header.segment_metadata_flag,
420 max_count: entity_config.default_transaction_max_count,
421 inactivity_timeout: entity_config.inactivity_timeout,
422 eof_timeout: entity_config.eof_timeout,
423 nak_timeout: entity_config.nak_timeout,
424 progress_report_interval_secs: entity_config.progress_report_interval_secs,
425 };
426 let mut transaction = RecvTransaction::new(
427 config,
428 entity_config.nak_procedure,
429 filestore,
430 indication_tx,
431 );
432 let id = transaction.id();
433
434 let handle = tokio::task::spawn(async move {
436 transaction.send_report(None)?;
437
438 while transaction.get_state() != TransactionState::Terminated {
439 let timeout = transaction.until_timeout();
440 select! {
441 Ok(permit) = transport_tx.reserve(), if transaction.has_pdu_to_send() => {
442 transaction.send_pdu(permit)?
443 },
444 Some(command) = transaction_rx.recv() => {
445 match command {
446 Command::Pdu(pdu) => {
447 match transaction.process_pdu(pdu) {
448 Ok(()) => {}
449 Err(err @ TransactionError::UnexpectedPDU(..)) => {
450 info!("Transaction {} Received Unexpected PDU: {err}", transaction.id());
451 }
453 Err(err) => return Err(err)
454 }
455 }
456 Command::Abandon => transaction.shutdown(),
457 Command::Report(sender) => {
458 transaction.send_report(Some(sender))?
459 }
460 }
461 }
462 _ = tokio::time::sleep(timeout) => {
463 transaction.handle_timeout()?;
464 }
465 else => {
466 if transport_tx.is_closed(){
467 log::error!("Channel to transport unexpectedly severed for transaction {}.", transaction.id());
468 }
469
470 break;
471 }
472 };
473 }
474
475 transaction.send_report(None)?;
476 Ok(transaction.id())
477 });
478
479 (id, transaction_tx, handle)
480 }
481
482 fn spawn_send_transaction(
483 request: PutRequest,
484 transaction_id: TransactionID,
485 transport_tx: Sender<(EntityID, PDU)>,
486 entity_config: EntityConfig,
487 filestore: Arc<T>,
488 indication_tx: Sender<Indication>,
489 ) -> DaemonResult<SendSpawnerTuple> {
490 let (transaction_tx, mut transaction_rx) = channel(10);
491
492 let destination_entity_id = request.destination_entity_id;
493 let transmission_mode = request.transmission_mode;
494 let mut config = TransactionConfig {
495 source_entity_id: transaction_id.0,
496 destination_entity_id,
497 transmission_mode,
498 sequence_number: transaction_id.1,
499 file_size_flag: FileSizeFlag::Small,
500 fault_handler_override: entity_config.fault_handler_override.clone(),
501 file_size_segment: entity_config.file_size_segment,
502 crc_flag: entity_config.crc_flag,
503 segment_metadata_flag: SegmentedData::NotPresent,
504 max_count: entity_config.default_transaction_max_count,
505 inactivity_timeout: entity_config.inactivity_timeout,
506 eof_timeout: entity_config.eof_timeout,
507 nak_timeout: entity_config.nak_timeout,
508 progress_report_interval_secs: entity_config.progress_report_interval_secs,
509 };
510 let metadata = construct_metadata(&filestore, request, entity_config)?;
511
512 let handle = tokio::task::spawn(async move {
513 config.file_size_flag = match metadata.file_size <= u32::MAX.into() {
514 true => FileSizeFlag::Small,
515 false => FileSizeFlag::Large,
516 };
517
518 let mut transaction = SendTransaction::new(config, metadata, filestore, indication_tx)?;
519 transaction.send_report(None)?;
520
521 while transaction.get_state() != TransactionState::Terminated {
522 let timeout = transaction.until_timeout();
523 select! {
524 Ok(permit) = transport_tx.reserve(), if transaction.has_pdu_to_send() => {
525 transaction.send_pdu(permit)?;
526 },
527
528 Some(command) = transaction_rx.recv() => {
529 match command {
530 Command::Pdu(pdu) => {
531 match transaction.process_pdu(pdu) {
532 Ok(()) => {}
533 Err(
534 err @ TransactionError::UnexpectedPDU(..),
535 ) => {
536 info!("Received Unexpected PDU: {err}");
537 }
539 Err(err) => {
540 return Err(err);
541 }
542 }
543 }
544 Command::Abandon => transaction.shutdown(),
545 Command::Report(sender) => {
546 transaction.send_report(Some(sender))?
547 },
548 }
549 },
550 _ = tokio::time::sleep(timeout) => {
551 transaction.handle_timeout()?;
552 },
553 else => {
554 if transport_tx.is_closed(){
555 log::error!("Connection to transport unexpectedly severed for transaction {}.", transaction.id());
556 }
557 break;
558 }
559 };
560 }
561 transaction.send_report(None)?;
562 Ok(transaction_id)
563 });
564 Ok((transaction_tx, handle))
565 }
566
567 async fn process_primitive(&mut self, primitive: UserPrimitive) -> DaemonResult<()> {
568 match primitive {
569 UserPrimitive::Put(request, put_sender) => {
570 let sequence_number = self.sequence_num.get_and_increment();
571
572 let entity_config = self
573 .entity_configs
574 .get(&request.destination_entity_id)
575 .unwrap_or(&self.default_config)
576 .clone();
577
578 if let Some(transport_tx) = self
579 .transport_tx_map
580 .get(&request.destination_entity_id)
581 .cloned()
582 {
583 let id = TransactionID(self.entity_id, sequence_number);
584 let (sender, handle) = Self::spawn_send_transaction(
585 request,
586 id,
587 transport_tx,
588 entity_config,
589 self.filestore.clone(),
590 self.indication_tx.clone(),
591 )?;
592 self.transaction_handles.push(handle);
593 self.transaction_channels.insert(id, sender);
594
595 let _ = put_sender.send(id);
597 } else {
598 warn!(
599 "No Transport available for EntityID: {}. Skipping transaction creation.",
600 request.destination_entity_id
601 )
602 }
603 }
604 UserPrimitive::Cancel(id) => {
605 if let Some(channel) = self.transaction_channels.get(&id) {
606 channel
607 .send(Command::Abandon)
608 .await
609 .map_err(|err| DaemonError::from((id, err)))?;
610 }
611 }
612 UserPrimitive::Report(id, report_sender) => {
613 if let Some(channel) = self.transaction_channels.get(&id) {
614 channel
615 .send(Command::Report(report_sender))
616 .await
617 .map_err(|err| DaemonError::from((id, err)))?;
618 }
619 }
620 };
621 Ok(())
622 }
623
624 async fn forward_pdu(&mut self, pdu: PDU) -> DaemonResult<()> {
625 let transport_entity = match &pdu.header.direction {
629 Direction::ToSender => pdu.header.destination_entity_id,
630 Direction::ToReceiver => pdu.header.source_entity_id,
631 };
632
633 let key = TransactionID(
634 pdu.header.source_entity_id,
635 pdu.header.transaction_sequence_number,
636 );
637 let channel = match self.transaction_channels.entry(key) {
639 Entry::Occupied(entry) => entry.into_mut(),
640 Entry::Vacant(entry) => {
641 if let Some(transport) = self.transport_tx_map.get(&transport_entity).cloned() {
642 let entity_config = self
645 .entity_configs
646 .get(&key.0)
647 .unwrap_or(&self.default_config)
648 .clone();
649 match &pdu.header.direction {
650 Direction::ToReceiver => {
651 let (_id, channel, handle) = Self::spawn_receive_transaction(
652 &pdu.header,
653 transport,
654 entity_config,
655 self.filestore.clone(),
656 self.indication_tx.clone(),
657 );
658
659 self.transaction_handles.push(handle);
660 entry.insert(channel)
661 }
662 Direction::ToSender => {
668 error!("Received PDU sent back to sender but no transaction running. Unable to resume transaction.");
669 return Err(DaemonError::UnableToResume(TransactionID(
670 pdu.header.source_entity_id,
671 pdu.header.transaction_sequence_number,
672 )));
673 }
674 }
675 } else {
676 warn!(
677 "No Transport available for EntityID: {}. Skipping Transaction creation.",
678 transport_entity
679 );
680 return Ok(());
682 }
683 }
684 };
685
686 if channel.send(Command::Pdu(pdu.clone())).await.is_err() {
687 match pdu.header.direction {
692 Direction::ToReceiver => {
693 let entity_config = self
694 .entity_configs
695 .get(&key.0)
696 .unwrap_or(&self.default_config)
697 .clone();
698 if let Some(transport) = self.transport_tx_map.get(&transport_entity).cloned() {
699 let (id, new_channel, handle) = Self::spawn_receive_transaction(
700 &pdu.header,
701 transport,
702 entity_config,
703 self.filestore.clone(),
704 self.indication_tx.clone(),
705 );
706 self.transaction_handles.push(handle);
707 new_channel
708 .send(Command::Pdu(pdu.clone()))
709 .await
710 .map_err(|err| DaemonError::from((id, err)))?;
711 self.transaction_channels.insert(key, new_channel);
713 }
714 }
715 Direction::ToSender => {
716 error!("Received PDU sent back to sender but no transaction running. Unable to resume transaction.");
723 return Err(DaemonError::UnableToResume(TransactionID(
724 pdu.header.source_entity_id,
725 pdu.header.transaction_sequence_number,
726 )));
727 }
728 };
729 }
730
731 Ok(())
732 }
733
734 async fn cleanup_transactions(&mut self) {
735 let mut ind = 0;
737 while ind < self.transaction_handles.len() {
738 if self.transaction_handles[ind].is_finished() {
739 let handle = self.transaction_handles.remove(ind);
740 match handle.await {
741 Ok(Ok(id)) => {
742 let _ = self.transaction_channels.remove(&id);
744 }
745 Ok(Err(err)) => {
746 info!("Error occurred during transaction: {}", err)
747 }
748 Err(_) => error!("Unable to join handle!"),
749 };
750 } else {
751 ind += 1;
752 }
753 }
754 }
755
756 pub async fn manage_transactions(&mut self) -> DaemonResult<()> {
758 let cleanup = {
759 let mut interval = tokio::time::interval(Duration::from_secs(1));
760 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
762 interval
763 };
764 tokio::pin!(cleanup);
765
766 loop {
767 select! {
768 pdu = self.transport_rx.recv() => match pdu {
769 Some(pdu) => match self.forward_pdu(pdu).await{
770 Ok(_) => {},
771 Err(error @ DaemonError::TransactionCommunication(_, _)) => {
772 warn!("{error}");
775 },
776 Err(err) => {
777 if !self.terminate.load(Ordering::Relaxed) {
778 self.terminate.store(true, Ordering::Relaxed);
779 }
780 return Err(err);
781 }
782 },
783 None => {
784 if !self.terminate.load(Ordering::Relaxed) {
785 error!("Transport unexpectedly disconnected from daemon.");
786 self.terminate.store(true, Ordering::Relaxed);
787 }
788 break;
789 }
790 },
791 primitive = self.primitive_rx.recv() => match primitive {
792 Some(primitive) => match self.process_primitive(primitive).await{
793 Ok(_) => {},
794 Err(error @ DaemonError::SpawnSend(_)) => {
795 warn!("{error}");
799 },
800 Err(error @ DaemonError::TransactionCommunication(_, _)) => {
801 warn!("{error}");
804 }
805 Err(err) => {
806 if !self.terminate.load(Ordering::Relaxed) {
807 self.terminate.store(true, Ordering::Relaxed);
808 }
809 return Err(err);
810 }
811 },
812 None => {
813 info!("User triggered daemon shutdown.");
814 if !self.terminate.load(Ordering::Relaxed) {
815 self.terminate.store(true, Ordering::Relaxed);
816 }
817 break;
818 }
819 },
820 _ = cleanup.tick() => self.cleanup_transactions().await,
821 };
822 }
823
824 while let Some(handle) = self.transaction_handles.pop() {
826 match handle.await {
827 Ok(Ok(id)) => {
828 let _ = self.transaction_channels.remove(&id);
830 }
831 Ok(Err(err)) => {
832 info!("Error occurred during transaction: {}", err)
833 }
834 Err(_) => error!("Unable to join handle!"),
835 };
836 }
837 Ok(())
838 }
839}
840
841#[cfg(test)]
842mod test {
843 use crate::{
844 daemon::NakProcedure,
845 filestore::{ChecksumType, NativeFileStore},
846 pdu::{self, CRCFlag, Condition, NegativeAcknowledgmentPDU, PDUPayload, U3},
847 transaction::FaultHandlerAction,
848 };
849
850 use super::*;
851
852 #[macro_export]
853 macro_rules! assert_err{
854 ($expression:expr, $($pattern:tt)+) => {
855 match $expression {
856 $($pattern)+ => {},
857 ref e => panic!("expected {} but got {:?}", stringify!($($pattern)+), e)
858 }
859 }
860 }
861
862 #[tokio::test]
863 async fn pdu_to_sender_no_transaction() {
864 let (_send, recv) = channel(1);
865 let (indication_tx, _indication_rx) = channel(1);
866 let (_primitive_tx, primitive_rx) = channel(1);
867 let filestore = Arc::new(NativeFileStore::new("."));
868 let mut transport_tx_map = HashMap::<_, _>::new();
869
870 let (transport_tx, _) = channel(10);
871
872 transport_tx_map.insert(EntityID::from(1_u16), transport_tx);
873
874 let mut daemon = Daemon {
875 transaction_handles: vec![],
876 transaction_channels: HashMap::<_, _>::new(),
877 transport_tx_map,
878 transport_rx: recv,
879 filestore,
880 indication_tx,
881 entity_configs: HashMap::new(),
882 default_config: EntityConfig {
883 fault_handler_override: HashMap::from([(
884 Condition::PositiveLimitReached,
885 FaultHandlerAction::Abandon,
886 )]),
887 file_size_segment: 1024,
888 default_transaction_max_count: 2,
889 inactivity_timeout: 0,
890 eof_timeout: 1,
891 nak_timeout: 2,
892 crc_flag: CRCFlag::NotPresent,
893 checksum_type: ChecksumType::Modular,
894 nak_procedure: NakProcedure::Deferred(Duration::from_secs(0)),
895 local_entity_id: 0_u16,
896 remote_entity_id: 1_u16,
897 local_server_addr: "127.0.0.1:0",
898 remote_server_addr: "127.0.0.1:0",
899 progress_report_interval_secs: 1,
900 },
901 entity_id: EntityID::from(0_u16),
902 sequence_num: TransactionSeqNum::from(0_u32),
903 terminate: Arc::new(AtomicBool::new(false)),
904 primitive_rx,
905 };
906 let payload = PDUPayload::Directive(pdu::Operations::Nak(NegativeAcknowledgmentPDU {
907 start_of_scope: 0,
908 end_of_scope: 1_000_000,
909 segment_requests: vec![],
910 }));
911 let pdu = PDU {
912 header: PDUHeader {
913 version: U3::Zero,
914 pdu_type: pdu::PDUType::FileDirective,
915 direction: Direction::ToSender,
916 transmission_mode: pdu::TransmissionMode::Acknowledged,
917 crc_flag: CRCFlag::NotPresent,
918 large_file_flag: FileSizeFlag::Small,
919 pdu_data_field_length: payload.encoded_len(FileSizeFlag::Small),
920 segmentation_control: pdu::SegmentationControl::NotPreserved,
921 segment_metadata_flag: SegmentedData::NotPresent,
922 source_entity_id: EntityID::from(0_u16),
923 transaction_sequence_number: TransactionSeqNum::from(3_u32),
924 destination_entity_id: EntityID::from(1_u16),
925 },
926 payload,
927 };
928
929 let res = daemon.forward_pdu(pdu).await;
930 assert_err!(res, Err(DaemonError::UnableToResume(_)))
931 }
932}