nimble_blob_stream/
out_logic_front.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
/*
 * Copyright (c) Peter Bjorklund. All rights reserved. https://github.com/nimble-rust/nimble
 * Licensed under the MIT License. See LICENSE in the project root for license information.
 */
use crate::out_logic::Logic;
use crate::out_stream::OutStreamError;
use crate::prelude::{
    ReceiverToSenderFrontCommands, SenderToReceiverFrontCommands, StartTransferData, TransferId,
};
use log::{debug, trace};
use monotonic_time_rs::Millis;
use std::time::Duration;

#[derive(Debug)]
pub enum Phase {
    StartTransfer,
    Transfer,
}

#[allow(unused)]
#[derive(Debug)]
pub struct OutLogicFront {
    out_stream: Logic,
    phase: Phase,
    transfer_id: TransferId,
}

impl OutLogicFront {
    #[allow(unused)]
    pub fn new(
        transfer_id: TransferId,
        fixed_chunk_size: usize,
        resend_duration: Duration,
        blob: &[u8],
    ) -> Self {
        Self {
            out_stream: Logic::new(transfer_id, fixed_chunk_size, resend_duration, blob),
            phase: Phase::StartTransfer,
            transfer_id,
        }
    }

    pub fn receive(
        &mut self,
        command: &ReceiverToSenderFrontCommands,
    ) -> Result<(), OutStreamError> {
        match self.phase {
            Phase::StartTransfer => {
                if let ReceiverToSenderFrontCommands::AckStart(ack_transfer_id) = command {
                    if self.transfer_id.0 == *ack_transfer_id {
                        debug!("received ack for correct transfer id {ack_transfer_id}, start transfer");
                        self.phase = Phase::Transfer;
                    } else {
                        debug!(
                            "received ack for wrong transfer id {ack_transfer_id}, start transfer"
                        );
                    }
                }
            }
            Phase::Transfer => match command {
                ReceiverToSenderFrontCommands::AckChunk(ack_chunk_front) => {
                    self.out_stream.set_waiting_for_chunk_index(
                        ack_chunk_front.data.waiting_for_chunk_index as usize,
                        ack_chunk_front.data.receive_mask_after_last,
                    )?;
                    if self.out_stream.is_received_by_remote() {
                        trace!("blob stream is received by remote! {}", self.transfer_id.0);
                    }
                }
                ReceiverToSenderFrontCommands::AckStart(_) => {}
            },
        }
        Ok(())
    }

    #[allow(unused)]
    pub fn send(
        &mut self,
        now: Millis,
    ) -> Result<Vec<SenderToReceiverFrontCommands>, OutStreamError> {
        match self.phase {
            Phase::StartTransfer => {
                debug!("send start transfer {}", self.transfer_id.0);
                Ok(vec![SenderToReceiverFrontCommands::StartTransfer(
                    StartTransferData {
                        transfer_id: self.transfer_id.0,
                        total_octet_size: self.out_stream.octet_size() as u32,
                        chunk_size: self.out_stream.chunk_size() as u16,
                    },
                )])
            }

            Phase::Transfer => {
                const MAX_CHUNK_COUNT_EACH_SEND: usize = 10;
                let set_chunks: Vec<_> = self
                    .out_stream
                    .send(now, MAX_CHUNK_COUNT_EACH_SEND)
                    .iter()
                    .map(|front_data| SenderToReceiverFrontCommands::SetChunk(front_data.clone()))
                    .collect();
                for set_chunk in &set_chunks {
                    match set_chunk {
                        SenderToReceiverFrontCommands::SetChunk(front_data) => {
                            trace!(
                                "sending chunk {}  (transfer:{})",
                                front_data.data.chunk_index,
                                front_data.transfer_id.0
                            );
                        }
                        _ => panic!("Unexpected enum variant: {:?}", set_chunk),
                    }
                }
                Ok(set_chunks)
            }
        }
    }

    #[must_use]
    pub fn is_received_by_remote(&self) -> bool {
        self.out_stream.is_received_by_remote()
    }

    #[must_use]
    pub fn transfer_id(&self) -> TransferId {
        self.out_stream.transfer_id()
    }
}