nimble_blob_stream/in_logic_front.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221
/*
* Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
* Licensed under the MIT License. See LICENSE in the project root for license information.
*/
use crate::in_logic::Logic;
use crate::prelude::BlobError;
use crate::protocol::TransferId;
use crate::protocol_front::{
AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
};
use crate::ChunkIndex;
use err_rs::{ErrorLevel, ErrorLevelProvider};
use log::{debug, trace};
use std::io;
/// Errors reported by `FrontLogic`.
#[derive(Debug)]
pub enum FrontLogicError {
/// Wraps a `std::io::Error`.
IoError(io::Error),
/// Wraps a `BlobError`; this is the variant produced by the
/// `From<BlobError>` conversion, e.g. when `?` is applied to a failing
/// call into the inner `Logic`.
BlobError(BlobError),
/// A `SetChunk` command was rejected because of its transfer id (for
/// example, no transfer has been started yet). Carries the transfer id
/// taken from the rejected command.
UnknownTransferId(TransferId),
/// A `StartTransfer` command that would begin a new transfer specified a
/// chunk size of zero.
ChunkSizeCanNotBeZero,
}
impl ErrorLevelProvider for FrontLogicError {
    /// Reports the severity of this error.
    ///
    /// Every variant currently maps to `ErrorLevel::Info`. The match lists
    /// each variant explicitly (no wildcard) so that adding a new variant
    /// forces a conscious decision about its level.
    fn error_level(&self) -> ErrorLevel {
        match self {
            Self::IoError(..) => ErrorLevel::Info,
            Self::BlobError(..) => ErrorLevel::Info,
            Self::UnknownTransferId(..) => ErrorLevel::Info,
            Self::ChunkSizeCanNotBeZero => ErrorLevel::Info,
        }
    }
}
impl From<BlobError> for FrontLogicError {
fn from(err: BlobError) -> Self {
Self::BlobError(err)
}
}
/// Snapshot of the transfer currently tracked by a `FrontLogic`, as returned
/// by `FrontLogic::info`.
pub struct Info {
/// Id of the tracked transfer.
pub transfer_id: TransferId,
/// Copied from the inner logic's `chunk_octet_size`
/// (presumably the size in octets of every chunk except possibly the last).
pub fixed_chunk_size: u16,
/// Copied from the inner logic's `total_octet_size`
/// (presumably the total number of octets expected for the blob).
pub octet_count: usize,
/// Copied from the inner logic's `chunk_count_received`.
pub chunk_count_received: u32,
/// Copied from the inner logic's `waiting_for_chunk_index`.
pub waiting_for_chunk_index: ChunkIndex,
}
/// Receiver-side state for a single transfer; created when a
/// `StartTransfer` command begins a new transfer.
#[derive(Debug)]
pub struct State {
// Id taken from the `StartTransfer` command that created this state.
transfer_id: TransferId,
// Chunk-receiving logic constructed for this transfer.
logic: Logic,
}
/// `FrontLogic` is the receiver-side front end for a blob transfer.
///
/// It consumes `SenderToReceiverFrontCommands` through `receive`, keeps the
/// state of at most one transfer at a time (wrapping an inner `Logic` that
/// handles the individual chunks), and produces the
/// `ReceiverToSenderFrontCommands` replies through `send`.
#[derive(Debug, Default)]
pub struct FrontLogic {
// The tracked transfer, or `None` until a `StartTransfer` has been received.
state: Option<State>,
// Set when a new transfer has been started and the corresponding
// `AckStart` has not yet been handed out by `send`.
should_reply_ack: bool,
}
impl FrontLogic {
    /// Creates a `FrontLogic` with no active transfer.
    ///
    /// No transfer state exists until a `StartTransfer` command is passed to
    /// `receive`; until then `send`, `blob` and `info` all return `None`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            state: None,
            should_reply_ack: false,
        }
    }

    /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
    ///
    /// * `StartTransfer` - if no transfer is tracked yet, or the tracked transfer has a
    ///   different id, a fresh transfer state is created and an `AckStart` is queued for
    ///   the next call to `send`. A `StartTransfer` for the transfer that is already
    ///   tracked is ignored.
    /// * `SetChunk` - the chunk is handed to the logic of the tracked transfer.
    ///
    /// This method does not return a reply; replies to the sender are produced by `send`.
    ///
    /// # Arguments
    ///
    /// * `command` - A command sent by the sender to either start a new transfer or update
    ///   an existing one with a chunk of data.
    ///
    /// # Errors
    ///
    /// * `FrontLogicError::ChunkSizeCanNotBeZero` if a `StartTransfer` that would begin a
    ///   new transfer specifies a chunk size of zero. The previous state is left untouched.
    /// * `FrontLogicError::UnknownTransferId` if a `SetChunk` is received while no transfer
    ///   is tracked, or its transfer id differs from the tracked transfer.
    /// * `FrontLogicError::BlobError` if the inner logic rejects the chunk.
    ///
    /// # Example
    ///
    /// ```
    /// use nimble_blob_stream::in_logic_front::FrontLogic;
    /// use nimble_blob_stream::protocol::StartTransferData;
    /// use nimble_blob_stream::protocol_front::SenderToReceiverFrontCommands;
    ///
    /// let mut logic_front = FrontLogic::new();
    ///
    /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
    ///     transfer_id: 1234,
    ///     total_octet_size: 1024,
    ///     chunk_size: 256,
    /// });
    ///
    /// let response = logic_front.receive(&start_command);
    /// assert!(response.is_ok());
    /// ```
    pub fn receive(
        &mut self,
        command: &SenderToReceiverFrontCommands,
    ) -> Result<(), FrontLogicError> {
        match command {
            SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
                let already_tracked = matches!(
                    &self.state,
                    Some(s) if s.transfer_id.0 == start_transfer_data.transfer_id
                );
                if already_tracked {
                    // Repeated start for the transfer we already track: keep
                    // the chunks received so far.
                    return Ok(());
                }

                // Validate before logging, so the "sending ack" message is
                // only emitted when an ack will actually be queued.
                if start_transfer_data.chunk_size == 0 {
                    return Err(FrontLogicError::ChunkSizeCanNotBeZero);
                }

                debug!(
                    "received a start transfer for {}. sending ack.",
                    start_transfer_data.transfer_id
                );

                // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
                self.state = Some(State {
                    transfer_id: TransferId(start_transfer_data.transfer_id),
                    logic: Logic::new(
                        start_transfer_data.total_octet_size as usize,
                        start_transfer_data.chunk_size,
                    ),
                });
                self.should_reply_ack = true;
                Ok(())
            }
            SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
                // Only accept chunks that belong to the tracked transfer. A
                // chunk from another transfer (e.g. a late packet from a
                // previous one) must not be written into the current blob.
                let state = self
                    .state
                    .as_mut()
                    .filter(|active| active.transfer_id.0 == chunk_data.transfer_id.0)
                    .ok_or(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))?;

                trace!(
                    "received chunk {} (transfer:{})",
                    chunk_data.data.chunk_index,
                    chunk_data.transfer_id.0
                );
                state.logic.receive(&chunk_data.data)?;
                if state.logic.is_complete() {
                    trace!("received all chunks!");
                }
                Ok(())
            }
        }
    }

    /// Produces the next reply to send to the sender, if any.
    ///
    /// # Returns
    ///
    /// * `AckStart` with the transfer id, exactly once after `receive` has started a new
    ///   transfer.
    /// * Otherwise, while a transfer is tracked, an `AckChunk` carrying the tracked
    ///   transfer id and the acknowledgement data produced by the inner logic.
    /// * `None` if no transfer has been started.
    pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
        if self.should_reply_ack {
            self.should_reply_ack = false;
            let transfer_id = self.state.as_ref()?.transfer_id.0;
            return Some(ReceiverToSenderFrontCommands::AckStart(transfer_id));
        }

        let state = self.state.as_mut()?;
        let data = state.logic.send();
        Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
            transfer_id: state.transfer_id,
            data,
        }))
    }

    /// Retrieves the full blob data if all chunks have been received.
    ///
    /// # Returns
    ///
    /// `Some(&[u8])` containing the full blob data if all chunks have been received,
    /// or `None` if the blob is incomplete or no transfer has been started.
    #[must_use]
    pub fn blob(&self) -> Option<&[u8]> {
        self.state.as_ref().and_then(|state| state.logic.blob())
    }

    /// Returns a snapshot of the tracked transfer.
    ///
    /// # Returns
    ///
    /// `Some(Info)` combining the tracked transfer id with the values reported by the
    /// inner logic, or `None` if no transfer has been started.
    #[must_use]
    pub fn info(&self) -> Option<Info> {
        self.state.as_ref().map(|s| {
            let info = s.logic.info();
            Info {
                transfer_id: s.transfer_id,
                fixed_chunk_size: info.chunk_octet_size,
                octet_count: info.total_octet_size,
                chunk_count_received: info.chunk_count_received,
                waiting_for_chunk_index: info.waiting_for_chunk_index,
            }
        })
    }
}