nimble_blob_stream/
in_logic_front.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
/*
 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
 * Licensed under the MIT License. See LICENSE in the project root for license information.
 */
use crate::in_logic::Logic;
use crate::prelude::BlobError;
use crate::protocol::TransferId;
use crate::protocol_front::{
    AckChunkFrontData, ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands,
};
use crate::ChunkIndex;
use err_rs::{ErrorLevel, ErrorLevelProvider};
use log::{debug, trace};
use std::io;

#[derive(Debug)]
pub enum FrontLogicError {
    IoError(io::Error),
    BlobError(BlobError),
    UnknownTransferId(TransferId),
    ChunkSizeCanNotBeZero,
}

impl ErrorLevelProvider for FrontLogicError {
    fn error_level(&self) -> ErrorLevel {
        match self {
            Self::IoError(_)
            | Self::ChunkSizeCanNotBeZero
            | Self::BlobError(_)
            | Self::UnknownTransferId(_) => ErrorLevel::Info,
        }
    }
}

impl From<BlobError> for FrontLogicError {
    fn from(err: BlobError) -> Self {
        Self::BlobError(err)
    }
}

pub struct Info {
    pub transfer_id: TransferId,
    pub fixed_chunk_size: u16,
    pub octet_count: usize,
    pub chunk_count_received: u32,
    pub waiting_for_chunk_index: ChunkIndex,
}

#[derive(Debug)]
pub struct State {
    transfer_id: TransferId,
    logic: Logic,
}

/// `Logic` handles the logic for receiving and processing chunks of data
/// in a streaming context. It manages the internal state and interactions
/// between the sender and receiver commands.
#[derive(Debug, Default)]
pub struct FrontLogic {
    state: Option<State>,
    should_reply_ack: bool,
}

impl FrontLogic {
    /// Creates a new `InLogicFront` instance with the specified `octet_count` and `chunk_size`.
    ///
    /// # Arguments
    ///
    /// * `octet_count` - The total number of octets (bytes) expected in the stream.
    /// * `chunk_size` - The size of each chunk in the stream.
    ///
    /// # Returns
    ///
    /// A new `InLogicFront` instance.
    ///
    #[must_use]
    pub const fn new() -> Self {
        Self {
            state: None,
            should_reply_ack: false,
        }
    }

    /// Updates the internal state based on a `SenderToReceiverFrontCommands` command.
    ///
    /// This method processes either a `StartTransfer` or `SetChunk` command sent by the sender.
    /// If a `StartTransfer` command is received, the current state (including `transfer_id` and
    /// `logic`) is reinitialized if necessary. If a `SetChunk` command is received, it applies
    /// the chunk of data to the current logic.
    ///
    /// # Arguments
    ///
    /// * `command` - A command sent by the sender to either start a new transfer or update
    ///               an existing one with a chunk of data.
    ///
    /// # Returns
    ///
    /// On success, this method returns a corresponding response:
    /// * If a `StartTransfer` command is processed, it returns `AckStart` with the `transfer_id`.
    /// * If a `SetChunk` command is processed successfully, it returns `AckChunk` with information
    ///   on the last chunk received in order as well as a receive-mask for up to 64 chunks
    ///   after that.
    ///
    /// # Errors
    ///
    /// This function returns an `io::Error` in the following cases:
    /// * If a `SetChunk` command is received and the transfer state has not been initialized
    ///   (i.e., no `StartTransfer` has been processed), it returns an `io::Error` with
    ///   `ErrorKind::InvalidData` and a message indicating that the `transfer_id` is unknown.
    ///
    /// * Any I/O error encountered during the update of the logic will be propagated.
    ///
    /// # Example
    ///
    /// ```
    /// use nimble_blob_stream::in_logic_front::FrontLogic;
    /// use nimble_blob_stream::protocol::StartTransferData;
    /// use nimble_blob_stream::protocol_front::SenderToReceiverFrontCommands;
    ///
    /// let mut logic_front = FrontLogic::new();
    ///
    /// let start_command = SenderToReceiverFrontCommands::StartTransfer(StartTransferData {
    ///     transfer_id: 1234,
    ///     total_octet_size: 1024,
    ///     chunk_size: 256,
    /// });
    ///
    /// let response = logic_front.receive(&start_command);
    /// assert!(response.is_ok());
    /// ```
    pub fn receive(
        &mut self,
        command: &SenderToReceiverFrontCommands,
    ) -> Result<(), FrontLogicError> {
        match command {
            SenderToReceiverFrontCommands::StartTransfer(start_transfer_data) => {
                if self
                    .state
                    .as_ref()
                    .map_or(true, |s| s.transfer_id.0 != start_transfer_data.transfer_id)
                {
                    debug!(
                        "received a start transfer for {}. sending ack.",
                        start_transfer_data.transfer_id
                    );

                    if start_transfer_data.chunk_size == 0 {
                        Err(FrontLogicError::ChunkSizeCanNotBeZero)?;
                    }
                    // Either logic is not set or the transfer_id is different, so we start with a fresh InLogic.
                    self.state = Some(State {
                        transfer_id: TransferId(start_transfer_data.transfer_id),
                        logic: Logic::new(
                            start_transfer_data.total_octet_size as usize,
                            start_transfer_data.chunk_size,
                        ),
                    });
                    self.should_reply_ack = true;
                }
                Ok(())
            }
            SenderToReceiverFrontCommands::SetChunk(chunk_data) => {
                if let Some(ref mut state) = self.state {
                    trace!(
                        "received chunk {}  (transfer:{})",
                        chunk_data.data.chunk_index,
                        chunk_data.transfer_id.0
                    );
                    state.logic.receive(&chunk_data.data)?;
                    if state.logic.is_complete() {
                        trace!("received all chunks!");
                    }
                    Ok(())
                } else {
                    Err(FrontLogicError::UnknownTransferId(chunk_data.transfer_id))
                }
            }
        }
    }

    pub fn send(&mut self) -> Option<ReceiverToSenderFrontCommands> {
        if self.should_reply_ack {
            self.should_reply_ack = false;
            let transfer_id = self.state.as_ref()?.transfer_id.0;
            Some(ReceiverToSenderFrontCommands::AckStart(transfer_id))
        } else if let Some(state) = self.state.as_mut() {
            let ack = &state.logic.send();
            Some(ReceiverToSenderFrontCommands::AckChunk(AckChunkFrontData {
                transfer_id: state.transfer_id,
                data: *ack,
            }))
        } else {
            None
        }
    }

    /// Retrieves the full blob data if all chunks have been received.
    ///
    /// # Returns
    ///
    /// An `Some(&[u8])` containing the full blob data if all chunks have been received,
    /// or `None` if the blob is incomplete.
    #[must_use]
    pub fn blob(&self) -> Option<&[u8]> {
        self.state.as_ref().and_then(|state| state.logic.blob())
    }

    #[must_use]
    pub fn info(&self) -> Option<Info> {
        self.state.as_ref().map(|s| {
            let info = s.logic.info();
            Info {
                transfer_id: s.transfer_id,
                fixed_chunk_size: info.chunk_octet_size,
                octet_count: info.total_octet_size,
                chunk_count_received: info.chunk_count_received,
                waiting_for_chunk_index: info.waiting_for_chunk_index,
            }
        })
    }
}