blob_stream/in_logic_front.rs
1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use crate::in_logic::Logic;
6use crate::prelude::BlobError;
7use crate::protocol::TransferId;
8use crate::protocol_front::{
9 AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
10};
11use crate::ChunkIndex;
12use err_rs::{ErrorLevel, ErrorLevelProvider};
13use log::{debug, trace};
14use std::io;
15
16#[derive(Debug)]
17pub enum FrontLogicError {
18 IoError(io::Error),
19 BlobError(BlobError),
20 UnknownTransferId(TransferId),
21 ChunkSizeCanNotBeZero,
22}
23
24impl ErrorLevelProvider for FrontLogicError {
25 fn error_level(&self) -> ErrorLevel {
26 match self {
27 FrontLogicError::IoError(_) => ErrorLevel::Info,
28 FrontLogicError::BlobError(_) => ErrorLevel::Info,
29 FrontLogicError::UnknownTransferId(_) => ErrorLevel::Info,
30 FrontLogicError::ChunkSizeCanNotBeZero => ErrorLevel::Info,
31 }
32 }
33}
34
35pub struct Info {
36 pub transfer_id: TransferId,
37 pub fixed_chunk_size: usize,
38 pub octet_count: usize,
39 pub chunk_count_received: usize,
40 pub waiting_for_chunk_index: ChunkIndex,
41}
42
43#[derive(Debug)]
44pub struct State {
45 transfer_id: TransferId,
46 logic: Logic,
47}
48
49/// `Logic` handles the logic for receiving and processing chunks of data
50/// in a streaming context. It manages the internal state and interactions
51/// between the sender and receiver commands.
52#[derive(Debug, Default)]
53pub struct FrontLogic {
54 state: Option<State>,
55 should_reply_ack: bool,
56}
57
58impl FrontLogic {
59 /// Creates a new `InLogicFront` instance with the specified `octet_count` and `chunk_size`.
60 ///
61 /// # Arguments
62 ///
63 /// * `octet_count` - The total number of octets (bytes) expected in the stream.
64 /// * `chunk_size` - The size of each chunk in the stream.
65 ///
66 /// # Returns
67 ///
68 /// A new `InLogicFront` instance.
69 ///
70 #[must_use]
71 pub const fn new() -> Self {
72 Self {
73 state: None,
74 should_reply_ack: false,
75 }
76 }
77
78 /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
79 ///
80 /// This method processes either a `StartTransfer` or `SetChunk` command sent by the sender.
81 /// If a `StartTransfer` command is received, the current state (including `transfer_id` and
82 /// `logic`) is reinitialized if necessary. If a `SetChunk` command is received, it applies
83 /// the chunk of data to the current logic.
84 ///
85 /// # Arguments
86 ///
87 /// * `command` - A command sent by the sender to either start a new transfer or update
88 /// an existing one with a chunk of data.
89 ///
90 /// # Returns
91 ///
92 /// On success, this method returns a corresponding response:
93 /// * If a `StartTransfer` command is processed, it returns `AckStart` with the `transfer_id`.
94 /// * If a `SetChunk` command is processed successfully, it returns `AckChunk` with information
95 /// on the last chunk received in order as well as a receive-mask for up to 64 chunks
96 /// after that.
97 ///
98 /// # Errors
99 ///
100 /// This function returns an `io::Error` in the following cases:
101 /// * If a `SetChunk` command is received and the transfer state has not been initialized
102 /// (i.e., no `StartTransfer` has been processed), it returns an `io::Error` with
103 /// `ErrorKind::InvalidData` and a message indicating that the `transfer_id` is unknown.
104 ///
105 /// * Any I/O error encountered during the update of the logic will be propagated.
106 ///
107 /// # Example
108 ///
109 /// ```
110 /// use blob_stream::in_logic_front::FrontLogic;
111 /// use blob_stream::protocol::StartTransferData;
112 /// use blob_stream::protocol_front::SenderToReceiverFrontCommands;
113 ///
114 /// let mut logic_front = FrontLogic::new();
115 ///
116 /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
117 /// transfer_id: 1234,
118 /// total_octet_size: 1024,
119 /// chunk_size: 256,
120 /// });
121 ///
122 /// let response = logic_front.receive(&start_command);
123 /// assert!(response.is_ok());
124 /// ```
125 pub fn receive(
126 &mut self,
127 command: &SenderToReceiverFrontCommands,
128 ) -> Result<(), FrontLogicError> {
129 match command {
130 SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
131 if self
132 .state
133 .as_ref()
134 .map_or(true, |s| s.transfer_id.0 != start_transfer_data.transfer_id)
135 {
136 debug!(
137 "received a start transfer for {}. sending ack.",
138 start_transfer_data.transfer_id
139 );
140
141 if start_transfer_data.chunk_size == 0 {
142 Err(FrontLogicError::ChunkSizeCanNotBeZero)?;
143 }
144 // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
145 self.state = Some(State {
146 transfer_id: TransferId(start_transfer_data.transfer_id),
147 logic: Logic::new(
148 start_transfer_data.total_octet_size as usize,
149 start_transfer_data.chunk_size as usize,
150 ),
151 });
152 self.should_reply_ack = true;
153 }
154 Ok(())
155 }
156 SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
157 if let Some(ref mut state) = self.state {
158 trace!(
159 "received chunk {} (transfer:{})",
160 chunk_data.data.chunk_index,
161 chunk_data.transfer_id.0
162 );
163 state
164 .logic
165 .receive(&chunk_data.data)
166 .map_err(FrontLogicError::BlobError)?;
167 if state.logic.is_complete() {
168 trace!("received all chunks!")
169 }
170 Ok(())
171 } else {
172 Err(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))
173 }
174 }
175 }
176 }
177
178 pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
179 if self.should_reply_ack {
180 self.should_reply_ack = false;
181 let transfer_id = self.state.as_ref().unwrap().transfer_id.0;
182 Some(ReceiverToSenderFrontCommands::AckStart(transfer_id))
183 } else if let Some(state) = self.state.as_mut() {
184 let ack = &state.logic.send();
185 Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
186 transfer_id: state.transfer_id,
187 data: *ack,
188 }))
189 } else {
190 None
191 }
192 }
193
194 /// Retrieves the full blob data if all chunks have been received.
195 ///
196 /// # Returns
197 ///
198 /// An `Some(&[u8])` containing the full blob data if all chunks have been received,
199 /// or `None` if the blob is incomplete.
200 #[must_use]
201 pub fn blob(&self) -> Option<&[u8]> {
202 self.state.as_ref().and_then(|state| state.logic.blob())
203 }
204
205 #[must_use]
206 pub fn info(&self) -> Option<Info> {
207 self.state.as_ref().map(|s| {
208 let info = s.logic.info();
209 Info {
210 transfer_id: s.transfer_id,
211 fixed_chunk_size: info.chunk_octet_size,
212 octet_count: info.total_octet_size,
213 chunk_count_received: info.chunk_count_received,
214 waiting_for_chunk_index: info.waiting_for_chunk_index,
215 }
216 })
217 }
218}