blob_stream/
in_logic_front.rs

/*
 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
 * Licensed under the MIT License. See LICENSE in the project root for license information.
 */
5use crate::in_logic::Logic;
6use crate::prelude::BlobError;
7use crate::protocol::TransferId;
8use crate::protocol_front::{
9    AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
10};
11use crate::ChunkIndex;
12use err_rs::{ErrorLevel, ErrorLevelProvider};
13use log::{debug, trace};
14use std::io;
15
16#[derive(Debug)]
17pub enum FrontLogicError {
18    IoError(io::Error),
19    BlobError(BlobError),
20    UnknownTransferId(TransferId),
21    ChunkSizeCanNotBeZero,
22}
23
24impl ErrorLevelProvider for FrontLogicError {
25    fn error_level(&self) -> ErrorLevel {
26        match self {
27            FrontLogicError::IoError(_) => ErrorLevel::Info,
28            FrontLogicError::BlobError(_) => ErrorLevel::Info,
29            FrontLogicError::UnknownTransferId(_) => ErrorLevel::Info,
30            FrontLogicError::ChunkSizeCanNotBeZero => ErrorLevel::Info,
31        }
32    }
33}
34
35pub struct Info {
36    pub transfer_id: TransferId,
37    pub fixed_chunk_size: usize,
38    pub octet_count: usize,
39    pub chunk_count_received: usize,
40    pub waiting_for_chunk_index: ChunkIndex,
41}
42
43#[derive(Debug)]
44pub struct State {
45    transfer_id: TransferId,
46    logic: Logic,
47}
48
49/// `Logic` handles the logic for receiving and processing chunks of data
50/// in a streaming context. It manages the internal state and interactions
51/// between the sender and receiver commands.
52#[derive(Debug, Default)]
53pub struct FrontLogic {
54    state: Option<State>,
55    should_reply_ack: bool,
56}
57
58impl FrontLogic {
59    /// Creates a new `InLogicFront` instance with the specified `octet_count` and `chunk_size`.
60    ///
61    /// # Arguments
62    ///
63    /// * `octet_count` - The total number of octets (bytes) expected in the stream.
64    /// * `chunk_size` - The size of each chunk in the stream.
65    ///
66    /// # Returns
67    ///
68    /// A new `InLogicFront` instance.
69    ///
70    #[must_use]
71    pub const fn new() -> Self {
72        Self {
73            state: None,
74            should_reply_ack: false,
75        }
76    }
77
78    /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
79    ///
80    /// This method processes either a `StartTransfer` or `SetChunk` command sent by the sender.
81    /// If a `StartTransfer` command is received, the current state (including `transfer_id` and
82    /// `logic`) is reinitialized if necessary. If a `SetChunk` command is received, it applies
83    /// the chunk of data to the current logic.
84    ///
85    /// # Arguments
86    ///
87    /// * `command` - A command sent by the sender to either start a new transfer or update
88    ///               an existing one with a chunk of data.
89    ///
90    /// # Returns
91    ///
92    /// On success, this method returns a corresponding response:
93    /// * If a `StartTransfer` command is processed, it returns `AckStart` with the `transfer_id`.
94    /// * If a `SetChunk` command is processed successfully, it returns `AckChunk` with information
95    ///   on the last chunk received in order as well as a receive-mask for up to 64 chunks
96    ///   after that.
97    ///
98    /// # Errors
99    ///
100    /// This function returns an `io::Error` in the following cases:
101    /// * If a `SetChunk` command is received and the transfer state has not been initialized
102    ///   (i.e., no `StartTransfer` has been processed), it returns an `io::Error` with
103    ///   `ErrorKind::InvalidData` and a message indicating that the `transfer_id` is unknown.
104    ///
105    /// * Any I/O error encountered during the update of the logic will be propagated.
106    ///
107    /// # Example
108    ///
109    /// ```
110    /// use blob_stream::in_logic_front::FrontLogic;
111    /// use blob_stream::protocol::StartTransferData;
112    /// use blob_stream::protocol_front::SenderToReceiverFrontCommands;
113    ///
114    /// let mut logic_front = FrontLogic::new();
115    ///
116    /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
117    ///     transfer_id: 1234,
118    ///     total_octet_size: 1024,
119    ///     chunk_size: 256,
120    /// });
121    ///
122    /// let response = logic_front.receive(&start_command);
123    /// assert!(response.is_ok());
124    /// ```
125    pub fn receive(
126        &mut self,
127        command: &SenderToReceiverFrontCommands,
128    ) -> Result<(), FrontLogicError> {
129        match command {
130            SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
131                if self
132                    .state
133                    .as_ref()
134                    .map_or(true, |s| s.transfer_id.0 != start_transfer_data.transfer_id)
135                {
136                    debug!(
137                        "received a start transfer for {}. sending ack.",
138                        start_transfer_data.transfer_id
139                    );
140
141                    if start_transfer_data.chunk_size == 0 {
142                        Err(FrontLogicError::ChunkSizeCanNotBeZero)?;
143                    }
144                    // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
145                    self.state = Some(State {
146                        transfer_id: TransferId(start_transfer_data.transfer_id),
147                        logic: Logic::new(
148                            start_transfer_data.total_octet_size as usize,
149                            start_transfer_data.chunk_size as usize,
150                        ),
151                    });
152                    self.should_reply_ack = true;
153                }
154                Ok(())
155            }
156            SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
157                if let Some(ref mut state) = self.state {
158                    trace!(
159                        "received chunk {}  (transfer:{})",
160                        chunk_data.data.chunk_index,
161                        chunk_data.transfer_id.0
162                    );
163                    state
164                        .logic
165                        .receive(&chunk_data.data)
166                        .map_err(FrontLogicError::BlobError)?;
167                    if state.logic.is_complete() {
168                        trace!("received all chunks!")
169                    }
170                    Ok(())
171                } else {
172                    Err(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))
173                }
174            }
175        }
176    }
177
178    pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
179        if self.should_reply_ack {
180            self.should_reply_ack = false;
181            let transfer_id = self.state.as_ref().unwrap().transfer_id.0;
182            Some(ReceiverToSenderFrontCommands::AckStart(transfer_id))
183        } else if let Some(state) = self.state.as_mut() {
184            let ack = &state.logic.send();
185            Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
186                transfer_id: state.transfer_id,
187                data: *ack,
188            }))
189        } else {
190            None
191        }
192    }
193
194    /// Retrieves the full blob data if all chunks have been received.
195    ///
196    /// # Returns
197    ///
198    /// An `Some(&[u8])` containing the full blob data if all chunks have been received,
199    /// or `None` if the blob is incomplete.
200    #[must_use]
201    pub fn blob(&self) -> Option<&[u8]> {
202        self.state.as_ref().and_then(|state| state.logic.blob())
203    }
204
205    #[must_use]
206    pub fn info(&self) -> Option<Info> {
207        self.state.as_ref().map(|s| {
208            let info = s.logic.info();
209            Info {
210                transfer_id: s.transfer_id,
211                fixed_chunk_size: info.chunk_octet_size,
212                octet_count: info.total_octet_size,
213                chunk_count_received: info.chunk_count_received,
214                waiting_for_chunk_index: info.waiting_for_chunk_index,
215            }
216        })
217    }
218}