nimble_blob_stream/
out_logic_front.rs1use crate::out_logic::Logic;
6use crate::out_stream::OutStreamError;
7use crate::prelude::{
8 ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands, StartTransferData, TransferId,
9};
10use log::{debug, trace};
11use monotonic_time_rs::Millis;
12use std::time::Duration;
13
14#[derive(Debug)]
15pub enum Phase {
16 StartTransfer,
17 Transfer,
18}
19
20#[allow(unused)]
21#[derive(Debug)]
22pub struct OutLogicFront {
23 out_stream: Logic,
24 phase: Phase,
25 transfer_id: TransferId,
26}
27
28impl OutLogicFront {
29 #[allow(unused)]
32 pub fn new(
33 transfer_id: TransferId,
34 fixed_chunk_size: u16,
35 resend_duration: Duration,
36 blob: &[u8],
37 ) -> Result<Self, OutStreamError> {
38 Ok(Self {
39 out_stream: Logic::new(transfer_id, fixed_chunk_size, resend_duration, blob)?,
40 phase: Phase::StartTransfer,
41 transfer_id,
42 })
43 }
44
45 pub fn receive(
48 &mut self,
49 command: &ReceiverToSenderFrontCommands,
50 ) -> Result<(), OutStreamError> {
51 match self.phase {
52 Phase::StartTransfer => {
53 if let ReceiverToSenderFrontCommands::AckStart(ack_transfer_id) = command {
54 if self.transfer_id.0 == *ack_transfer_id {
55 debug!("received ack for correct transfer id {ack_transfer_id}, start transfer");
56 self.phase = Phase::Transfer;
57 } else {
58 debug!(
59 "received ack for wrong transfer id {ack_transfer_id}, start transfer"
60 );
61 }
62 }
63 }
64 Phase::Transfer => match command {
65 ReceiverToSenderFrontCommands::AckChunk(ack_chunk_front) => {
66 self.out_stream.set_waiting_for_chunk_index(
67 ack_chunk_front.data.waiting_for_chunk_index as usize,
68 ack_chunk_front.data.receive_mask_after_last,
69 )?;
70 if self.out_stream.is_received_by_remote() {
71 trace!("blob stream is received by remote! {}", self.transfer_id.0);
72 }
73 }
74 ReceiverToSenderFrontCommands::AckStart(_) => {}
75 },
76 }
77 Ok(())
78 }
79
80 #[allow(unused)]
83 pub fn send(
84 &mut self,
85 now: Millis,
86 ) -> Result<Vec<SenderToReceiverFrontCommands>, OutStreamError> {
87 match self.phase {
88 Phase::StartTransfer => {
89 debug!("send start transfer {}", self.transfer_id.0);
90 Ok(vec![SenderToReceiverFrontCommands::StartTransfer(
91 StartTransferData {
92 transfer_id: self.transfer_id.0,
93 total_octet_size: self.out_stream.octet_size(),
94 chunk_size: self.out_stream.chunk_size(),
95 },
96 )])
97 }
98
99 Phase::Transfer => {
100 const MAX_CHUNK_COUNT_EACH_SEND: usize = 10;
101 let set_chunks: Vec<_> = self
102 .out_stream
103 .send(now, MAX_CHUNK_COUNT_EACH_SEND)
104 .iter()
105 .map(|front_data| SenderToReceiverFrontCommands::SetChunk(front_data.clone()))
106 .collect();
107 for set_chunk in &set_chunks {
108 match set_chunk {
109 SenderToReceiverFrontCommands::SetChunk(front_data) => {
110 trace!(
111 "sending chunk {} (transfer:{})",
112 front_data.data.chunk_index,
113 front_data.transfer_id.0
114 );
115 }
116 SenderToReceiverFrontCommands::StartTransfer(_) => {
117 Err(OutStreamError::UnexpectedStartTransfer)?
118 }
119 }
120 }
121 Ok(set_chunks)
122 }
123 }
124 }
125
126 #[must_use]
127 pub fn is_received_by_remote(&self) -> bool {
128 self.out_stream.is_received_by_remote()
129 }
130
131 #[must_use]
132 pub const fn transfer_id(&self) -> TransferId {
133 self.out_stream.transfer_id()
134 }
135}