blob_stream/
out_logic_front.rs1use crate::out_logic::Logic;
6use crate::out_stream::OutStreamError;
7use crate::prelude::{
8 ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands, StartTransferData, TransferId,
9};
10use log::{debug, trace};
11use std::time::{Duration, Instant};
12
13#[derive(Debug)]
14pub enum Phase {
15 StartTransfer,
16 Transfer,
17}
18
19#[allow(unused)]
20#[derive(Debug)]
21pub struct OutLogicFront {
22 out_stream: Logic,
23 phase: Phase,
24 transfer_id: TransferId,
25}
26
27impl OutLogicFront {
28 #[allow(unused)]
29 pub fn new(
30 transfer_id: TransferId,
31 fixed_chunk_size: usize,
32 resend_duration: Duration,
33 blob: &[u8],
34 ) -> Self {
35 Self {
36 out_stream: Logic::new(transfer_id, fixed_chunk_size, resend_duration, blob),
37 phase: Phase::StartTransfer,
38 transfer_id,
39 }
40 }
41
42 pub fn receive(
43 &mut self,
44 command: &ReceiverToSenderFrontCommands,
45 ) -> Result<(), OutStreamError> {
46 match self.phase {
47 Phase::StartTransfer => {
48 if let ReceiverToSenderFrontCommands::AckStart(ack_transfer_id) = command {
49 if self.transfer_id.0 == *ack_transfer_id {
50 debug!("received ack for correct transfer id {ack_transfer_id}, start transfer");
51 self.phase = Phase::Transfer;
52 } else {
53 debug!(
54 "received ack for wrong transfer id {ack_transfer_id}, start transfer"
55 );
56 }
57 }
58 }
59 Phase::Transfer => match command {
60 ReceiverToSenderFrontCommands::AckChunk(ack_chunk_front) => {
61 self.out_stream.set_waiting_for_chunk_index(
62 ack_chunk_front.data.waiting_for_chunk_index as usize,
63 ack_chunk_front.data.receive_mask_after_last,
64 )?;
65 if self.out_stream.is_received_by_remote() {
66 trace!("blob stream is received by remote! {}", self.transfer_id.0);
67 }
68 }
69 ReceiverToSenderFrontCommands::AckStart(_) => {}
70 },
71 }
72 Ok(())
73 }
74
75 #[allow(unused)]
76 pub fn send(
77 &mut self,
78 now: Instant,
79 ) -> Result<Vec<SenderToReceiverFrontCommands>, OutStreamError> {
80 match self.phase {
81 Phase::StartTransfer => {
82 debug!("send start transfer {}", self.transfer_id.0);
83 Ok(vec![SenderToReceiverFrontCommands::StartTransfer(
84 StartTransferData {
85 transfer_id: self.transfer_id.0,
86 total_octet_size: self.out_stream.octet_size() as u32,
87 chunk_size: self.out_stream.chunk_size() as u16,
88 },
89 )])
90 }
91
92 Phase::Transfer => {
93 const MAX_CHUNK_COUNT_EACH_SEND: usize = 10;
94 let set_chunks: Vec<_> = self
95 .out_stream
96 .send(now, MAX_CHUNK_COUNT_EACH_SEND)
97 .iter()
98 .map(|front_data| SenderToReceiverFrontCommands::SetChunk(front_data.clone()))
99 .collect();
100 for set_chunk in &set_chunks {
101 match set_chunk {
102 SenderToReceiverFrontCommands::SetChunk(front_data) => {
103 trace!(
104 "sending chunk {} (transfer:{})",
105 front_data.data.chunk_index,
106 front_data.transfer_id.0
107 );
108 }
109 _ => panic!("Unexpected enum variant: {:?}", set_chunk),
110 }
111 }
112 Ok(set_chunks)
113 }
114 }
115 }
116
117 #[must_use]
118 pub fn is_received_by_remote(&self) -> bool {
119 self.out_stream.is_received_by_remote()
120 }
121
122 #[must_use]
123 pub fn transfer_id(&self) -> TransferId {
124 self.out_stream.transfer_id()
125 }
126}