nimble_blob_stream/in_logic_front.rs
1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use crate::in_logic::Logic;
6use crate::prelude::BlobError;
7use crate::protocol::TransferId;
8use crate::protocol_front::{
9 AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
10};
11use crate::ChunkIndex;
12use err_rs::{ErrorLevel, ErrorLevelProvider};
13use log::{debug, trace};
14use std::io;
15
/// Errors that the receiving front logic can report.
#[derive(Debug)]
pub enum FrontLogicError {
    /// Wraps an underlying I/O error.
    IoError(io::Error),
    /// Wraps an error reported by the blob layer (see the `From<BlobError>` conversion).
    BlobError(BlobError),
    /// A `SetChunk` command referred to a transfer this receiver does not know about.
    /// Carries the transfer id taken from the offending command.
    UnknownTransferId(TransferId),
    /// A `StartTransfer` command announced a chunk size of zero.
    ChunkSizeCanNotBeZero,
}
23
24impl ErrorLevelProvider for FrontLogicError {
25 fn error_level(&self) -> ErrorLevel {
26 match self {
27 Self::IoError(_)
28 | Self::ChunkSizeCanNotBeZero
29 | Self::BlobError(_)
30 | Self::UnknownTransferId(_) => ErrorLevel::Info,
31 }
32 }
33}
34
35impl From<BlobError> for FrontLogicError {
36 fn from(err: BlobError) -> Self {
37 Self::BlobError(err)
38 }
39}
40
/// Snapshot of the active transfer, as produced by `FrontLogic::info`.
pub struct Info {
    /// Id of the transfer this snapshot describes.
    pub transfer_id: TransferId,
    /// Copied from the inner logic's `chunk_octet_size`.
    pub fixed_chunk_size: u16,
    /// Copied from the inner logic's `total_octet_size`.
    pub octet_count: usize,
    /// Copied from the inner logic's `chunk_count_received`.
    pub chunk_count_received: u32,
    /// Copied from the inner logic's `waiting_for_chunk_index`.
    pub waiting_for_chunk_index: ChunkIndex,
}
48
/// Per-transfer state held by `FrontLogic` while a transfer is active.
#[derive(Debug)]
pub struct State {
    /// Id announced by the `StartTransfer` command that created this state.
    transfer_id: TransferId,
    /// Chunk receiver created for this transfer.
    logic: Logic,
}
54
/// `FrontLogic` handles the receiving side of a chunked transfer in a
/// streaming context. It keeps track of the currently active transfer (if any)
/// and turns incoming sender commands into replies for the sender.
#[derive(Debug, Default)]
pub struct FrontLogic {
    /// `None` until a `StartTransfer` command has been accepted.
    state: Option<State>,
    /// Set when a new transfer has been started; cleared by `send` when it
    /// produces the `AckStart` reply.
    should_reply_ack: bool,
}
63
64impl FrontLogic {
65 /// Creates a new `InLogicFront` instance with the specified `octet_count` and `chunk_size`.
66 ///
67 /// # Arguments
68 ///
69 /// * `octet_count` - The total number of octets (bytes) expected in the stream.
70 /// * `chunk_size` - The size of each chunk in the stream.
71 ///
72 /// # Returns
73 ///
74 /// A new `InLogicFront` instance.
75 ///
76 #[must_use]
77 pub const fn new() -> Self {
78 Self {
79 state: None,
80 should_reply_ack: false,
81 }
82 }
83
84 /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
85 ///
86 /// This method processes either a `StartTransfer` or `SetChunk` command sent by the sender.
87 /// If a `StartTransfer` command is received, the current state (including `transfer_id` and
88 /// `logic`) is reinitialized if necessary. If a `SetChunk` command is received, it applies
89 /// the chunk of data to the current logic.
90 ///
91 /// # Arguments
92 ///
93 /// * `command` - A command sent by the sender to either start a new transfer or update
94 /// an existing one with a chunk of data.
95 ///
96 /// # Returns
97 ///
98 /// On success, this method returns a corresponding response:
99 /// * If a `StartTransfer` command is processed, it returns `AckStart` with the `transfer_id`.
100 /// * If a `SetChunk` command is processed successfully, it returns `AckChunk` with information
101 /// on the last chunk received in order as well as a receive-mask for up to 64 chunks
102 /// after that.
103 ///
104 /// # Errors
105 ///
106 /// This function returns an `io::Error` in the following cases:
107 /// * If a `SetChunk` command is received and the transfer state has not been initialized
108 /// (i.e., no `StartTransfer` has been processed), it returns an `io::Error` with
109 /// `ErrorKind::InvalidData` and a message indicating that the `transfer_id` is unknown.
110 ///
111 /// * Any I/O error encountered during the update of the logic will be propagated.
112 ///
113 /// # Example
114 ///
115 /// ```
116 /// use nimble_blob_stream::in_logic_front::FrontLogic;
117 /// use nimble_blob_stream::protocol::StartTransferData;
118 /// use nimble_blob_stream::protocol_front::SenderToReceiverFrontCommands;
119 ///
120 /// let mut logic_front = FrontLogic::new();
121 ///
122 /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
123 /// transfer_id: 1234,
124 /// total_octet_size: 1024,
125 /// chunk_size: 256,
126 /// });
127 ///
128 /// let response = logic_front.receive(&start_command);
129 /// assert!(response.is_ok());
130 /// ```
131 pub fn receive(
132 &mut self,
133 command: &SenderToReceiverFrontCommands,
134 ) -> Result<(), FrontLogicError> {
135 match command {
136 SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
137 if self
138 .state
139 .as_ref()
140 .map_or(true, |s| s.transfer_id.0 != start_transfer_data.transfer_id)
141 {
142 debug!(
143 "received a start transfer for {}. sending ack.",
144 start_transfer_data.transfer_id
145 );
146
147 if start_transfer_data.chunk_size == 0 {
148 Err(FrontLogicError::ChunkSizeCanNotBeZero)?;
149 }
150 // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
151 self.state = Some(State {
152 transfer_id: TransferId(start_transfer_data.transfer_id),
153 logic: Logic::new(
154 start_transfer_data.total_octet_size as usize,
155 start_transfer_data.chunk_size,
156 ),
157 });
158 self.should_reply_ack = true;
159 }
160 Ok(())
161 }
162 SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
163 if let Some(ref mut state) = self.state {
164 trace!(
165 "received chunk {} (transfer:{})",
166 chunk_data.data.chunk_index,
167 chunk_data.transfer_id.0
168 );
169 state.logic.receive(&chunk_data.data)?;
170 if state.logic.is_complete() {
171 trace!("received all chunks!");
172 }
173 Ok(())
174 } else {
175 Err(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))
176 }
177 }
178 }
179 }
180
181 pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
182 if self.should_reply_ack {
183 self.should_reply_ack = false;
184 let transfer_id = self.state.as_ref()?.transfer_id.0;
185 Some(ReceiverToSenderFrontCommands::AckStart(transfer_id))
186 } else if let Some(state) = self.state.as_mut() {
187 let ack = &state.logic.send();
188 Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
189 transfer_id: state.transfer_id,
190 data: *ack,
191 }))
192 } else {
193 None
194 }
195 }
196
197 /// Retrieves the full blob data if all chunks have been received.
198 ///
199 /// # Returns
200 ///
201 /// An `Some(&[u8])` containing the full blob data if all chunks have been received,
202 /// or `None` if the blob is incomplete.
203 #[must_use]
204 pub fn blob(&self) -> Option<&[u8]> {
205 self.state.as_ref().and_then(|state| state.logic.blob())
206 }
207
208 #[must_use]
209 pub fn info(&self) -> Option<Info> {
210 self.state.as_ref().map(|s| {
211 let info = s.logic.info();
212 Info {
213 transfer_id: s.transfer_id,
214 fixed_chunk_size: info.chunk_octet_size,
215 octet_count: info.total_octet_size,
216 chunk_count_received: info.chunk_count_received,
217 waiting_for_chunk_index: info.waiting_for_chunk_index,
218 }
219 })
220 }
221}