nimble_blob_stream/
in_logic_front.rs

1/*
2 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
3 * Licensed under the MIT License. See LICENSE in the project root for license information.
4 */
5use crate::in_logic::Logic;
6use crate::prelude::BlobError;
7use crate::protocol::TransferId;
8use crate::protocol_front::{
9    AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
10};
11use crate::ChunkIndex;
12use err_rs::{ErrorLevel, ErrorLevelProvider};
13use log::{debug, trace};
14use std::io;
15
/// Errors reported by [`FrontLogic`] while processing commands from the sender.
#[derive(Debug)]
pub enum FrontLogicError {
    /// Wraps an [`io::Error`].
    IoError(io::Error),
    /// Wraps a [`BlobError`]; created through the `From<BlobError>` conversion,
    /// e.g. when the `?` operator propagates a failure from the chunk logic.
    BlobError(BlobError),
    /// A `SetChunk` command referred to a transfer this receiver is not tracking.
    /// Carries the transfer id taken from the offending command.
    UnknownTransferId(TransferId),
    /// A `StartTransfer` command requested a chunk size of zero.
    ChunkSizeCanNotBeZero,
}
23
24impl ErrorLevelProvider for FrontLogicError {
25    fn error_level(&self) -> ErrorLevel {
26        match self {
27            Self::IoError(_)
28            | Self::ChunkSizeCanNotBeZero
29            | Self::BlobError(_)
30            | Self::UnknownTransferId(_) => ErrorLevel::Info,
31        }
32    }
33}
34
35impl From<BlobError> for FrontLogicError {
36    fn from(err: BlobError) -> Self {
37        Self::BlobError(err)
38    }
39}
40
/// Snapshot of the active transfer, as produced by [`FrontLogic::info`].
pub struct Info {
    /// Id of the transfer that is currently tracked.
    pub transfer_id: TransferId,
    /// Chunk size of the transfer (copied from the logic's `chunk_octet_size`).
    pub fixed_chunk_size: u16,
    /// Total size of the blob in octets (copied from the logic's `total_octet_size`).
    pub octet_count: usize,
    /// Number of chunks received, as reported by the underlying logic.
    pub chunk_count_received: u32,
    /// Chunk index the underlying logic reports it is waiting for.
    pub waiting_for_chunk_index: ChunkIndex,
}
48
/// State kept for the transfer that is currently being received.
#[derive(Debug)]
pub struct State {
    /// Id announced by the `StartTransfer` command that created this state.
    transfer_id: TransferId,
    /// Chunk-level receive logic for this transfer.
    logic: Logic,
}
54
/// `FrontLogic` is the receiver-side entry point for a blob transfer.
///
/// It consumes `SenderToReceiverFrontCommands` in `receive` and produces the
/// `ReceiverToSenderFrontCommands` replies in `send`, delegating the
/// chunk bookkeeping of the active transfer to an inner `Logic`.
#[derive(Debug, Default)]
pub struct FrontLogic {
    /// The transfer being received; `None` until a `StartTransfer` has been accepted.
    state: Option<State>,
    /// Set by `receive` when a `StartTransfer` must be acknowledged; cleared by
    /// `send` when it emits the `AckStart` reply.
    should_reply_ack: bool,
}
63
64impl FrontLogic {
65    /// Creates a new `InLogicFront` instance with the specified `octet_count` and `chunk_size`.
66    ///
67    /// # Arguments
68    ///
69    /// * `octet_count` - The total number of octets (bytes) expected in the stream.
70    /// * `chunk_size` - The size of each chunk in the stream.
71    ///
72    /// # Returns
73    ///
74    /// A new `InLogicFront` instance.
75    ///
76    #[must_use]
77    pub const fn new() -> Self {
78        Self {
79            state: None,
80            should_reply_ack: false,
81        }
82    }
83
84    /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
85    ///
86    /// This method processes either a `StartTransfer` or `SetChunk` command sent by the sender.
87    /// If a `StartTransfer` command is received, the current state (including `transfer_id` and
88    /// `logic`) is reinitialized if necessary. If a `SetChunk` command is received, it applies
89    /// the chunk of data to the current logic.
90    ///
91    /// # Arguments
92    ///
93    /// * `command` - A command sent by the sender to either start a new transfer or update
94    ///               an existing one with a chunk of data.
95    ///
96    /// # Returns
97    ///
98    /// On success, this method returns a corresponding response:
99    /// * If a `StartTransfer` command is processed, it returns `AckStart` with the `transfer_id`.
100    /// * If a `SetChunk` command is processed successfully, it returns `AckChunk` with information
101    ///   on the last chunk received in order as well as a receive-mask for up to 64 chunks
102    ///   after that.
103    ///
104    /// # Errors
105    ///
106    /// This function returns an `io::Error` in the following cases:
107    /// * If a `SetChunk` command is received and the transfer state has not been initialized
108    ///   (i.e., no `StartTransfer` has been processed), it returns an `io::Error` with
109    ///   `ErrorKind::InvalidData` and a message indicating that the `transfer_id` is unknown.
110    ///
111    /// * Any I/O error encountered during the update of the logic will be propagated.
112    ///
113    /// # Example
114    ///
115    /// ```
116    /// use nimble_blob_stream::in_logic_front::FrontLogic;
117    /// use nimble_blob_stream::protocol::StartTransferData;
118    /// use nimble_blob_stream::protocol_front::SenderToReceiverFrontCommands;
119    ///
120    /// let mut logic_front = FrontLogic::new();
121    ///
122    /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
123    ///     transfer_id: 1234,
124    ///     total_octet_size: 1024,
125    ///     chunk_size: 256,
126    /// });
127    ///
128    /// let response = logic_front.receive(&start_command);
129    /// assert!(response.is_ok());
130    /// ```
131    pub fn receive(
132        &mut self,
133        command: &SenderToReceiverFrontCommands,
134    ) -> Result<(), FrontLogicError> {
135        match command {
136            SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
137                if self
138                    .state
139                    .as_ref()
140                    .map_or(true, |s| s.transfer_id.0 != start_transfer_data.transfer_id)
141                {
142                    debug!(
143                        "received a start transfer for {}. sending ack.",
144                        start_transfer_data.transfer_id
145                    );
146
147                    if start_transfer_data.chunk_size == 0 {
148                        Err(FrontLogicError::ChunkSizeCanNotBeZero)?;
149                    }
150                    // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
151                    self.state = Some(State {
152                        transfer_id: TransferId(start_transfer_data.transfer_id),
153                        logic: Logic::new(
154                            start_transfer_data.total_octet_size as usize,
155                            start_transfer_data.chunk_size,
156                        ),
157                    });
158                    self.should_reply_ack = true;
159                }
160                Ok(())
161            }
162            SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
163                if let Some(ref mut state) = self.state {
164                    trace!(
165                        "received chunk {}  (transfer:{})",
166                        chunk_data.data.chunk_index,
167                        chunk_data.transfer_id.0
168                    );
169                    state.logic.receive(&chunk_data.data)?;
170                    if state.logic.is_complete() {
171                        trace!("received all chunks!");
172                    }
173                    Ok(())
174                } else {
175                    Err(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))
176                }
177            }
178        }
179    }
180
181    pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
182        if self.should_reply_ack {
183            self.should_reply_ack = false;
184            let transfer_id = self.state.as_ref()?.transfer_id.0;
185            Some(ReceiverToSenderFrontCommands::AckStart(transfer_id))
186        } else if let Some(state) = self.state.as_mut() {
187            let ack = &state.logic.send();
188            Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
189                transfer_id: state.transfer_id,
190                data: *ack,
191            }))
192        } else {
193            None
194        }
195    }
196
197    /// Retrieves the full blob data if all chunks have been received.
198    ///
199    /// # Returns
200    ///
201    /// An `Some(&[u8])` containing the full blob data if all chunks have been received,
202    /// or `None` if the blob is incomplete.
203    #[must_use]
204    pub fn blob(&self) -> Option<&[u8]> {
205        self.state.as_ref().and_then(|state| state.logic.blob())
206    }
207
208    #[must_use]
209    pub fn info(&self) -> Option<Info> {
210        self.state.as_ref().map(|s| {
211            let info = s.logic.info();
212            Info {
213                transfer_id: s.transfer_id,
214                fixed_chunk_size: info.chunk_octet_size,
215                octet_count: info.total_octet_size,
216                chunk_count_received: info.chunk_count_received,
217                waiting_for_chunk_index: info.waiting_for_chunk_index,
218            }
219        })
220    }
221}