1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
//! Contains the [ValidatorDispatcher], that manages [LinkValidator]s and iterates over and comnsumes a [`data_wrapper::CdpChunk<T>`], dispatching the data to the correct thread based on the Link ID running an instance of [LinkValidator].
use super::link_validator::LinkValidator;
use crate::{
    input::data_wrapper,
    stats::lib::StatType,
    util::{
        self,
        config::check::{CheckCommands, System},
    },
    words::lib::RDH,
};
type CdpTuple<T> = (T, Vec<u8>, u64);

/// The [ValidatorDispatcher] is responsible for creating and managing the [LinkValidator] threads.
///
/// It receives a [`data_wrapper::CdpChunk<T>`] and dispatches the data to the correct thread running an instance of [LinkValidator].
pub struct ValidatorDispatcher<T: RDH, C: util::lib::Config + 'static> {
    processors: Vec<DispatchId>,
    process_channels: Vec<crossbeam_channel::Sender<CdpTuple<T>>>,
    validator_thread_handles: Vec<std::thread::JoinHandle<()>>,
    stats_sender: flume::Sender<StatType>,
    global_config: &'static C,
}

#[derive(PartialEq, Clone, Copy)]
struct DispatchId(u16);

impl<T: RDH + 'static, C: util::lib::Config + 'static> ValidatorDispatcher<T, C> {
    /// Create a new ValidatorDispatcher from a Config and a stats sender channel
    pub fn new(global_config: &'static C, stats_sender: flume::Sender<StatType>) -> Self {
        Self {
            processors: Vec::new(),
            process_channels: Vec::new(),
            validator_thread_handles: Vec::new(),
            stats_sender,
            global_config,
        }
    }

    /// Iterates over and consumes a [`data_wrapper::CdpChunk<T>`], dispatching the data to the correct thread running an instance of [LinkValidator].
    ///
    /// If a link validator thread does not exist for the link id of the current rdh, a new one is spawned
    pub fn dispatch_cdp_chunk(&mut self, cdp_chunk: data_wrapper::CdpChunk<T>) {
        // Iterate over the CDP chunk
        cdp_chunk.into_iter().for_each(|(rdh, data, mem_pos)| {
            // Dispatch by FEE ID if system targeted for checks is ITS Stave (gonna be a lot of data to parse for each stave!)
            let id = if self.global_config.check().is_some_and(|c| {
                if let CheckCommands::All { system } = c {
                    system.is_some_and(|s| s == System::ITS_Stave)
                } else {
                    false
                }
            }) {
                // Dispatch by FEE ID which will effectively dispatch by link AND stave
                DispatchId(rdh.fee_id())
            } else {
                // Dispatch by link ID
                DispatchId(rdh.link_id() as u16)
            };

            self.dispatch_by_id(rdh, data, mem_pos, id);
        });
    }

    fn init_validator(&mut self, id: DispatchId) -> LinkValidator<T, C> {
        // Add a new ID to the list of processors
        self.processors.push(id);

        // Create a new link validator thread to handle a new ID that should be processed
        let (link_validator, send_channel) =
            LinkValidator::<T, C>::new(self.global_config, self.stats_sender.clone());

        // Add the send channel to the new link validator
        self.process_channels.push(send_channel);

        link_validator
    }

    fn dispatch_by_id(&mut self, rdh: T, data: Vec<u8>, mem_pos: u64, id: DispatchId) {
        // Check if the ID to dispatch by is already in the list of processors
        if let Some(index) = self.processors.iter().position(|&proc_id| proc_id == id) {
            // If the ID was found, use its index to send the data through the correct link validator's channel
            self.process_channels
                .get(index)
                .unwrap()
                .send((rdh, data, mem_pos))
                .unwrap();
        } else {
            // If the ID wasn't found, make a new validator to handle that ID
            let mut validator = self.init_validator(id);

            // Spawn a thread where the newly created link validator will run
            self.validator_thread_handles.push(
                std::thread::Builder::new()
                    .name(format!("Validator #{}", id.0))
                    .spawn({
                        move || {
                            validator.run();
                        }
                    })
                    .expect("Failed to spawn link validator thread"),
            );
            // Send the data through the newly created link validator's channel, by taking the last element of the vector
            self.process_channels
                .last()
                .unwrap()
                .send((rdh, data, mem_pos))
                .unwrap();
        }
    }

    /// Disconnects all the link validator's receiver channels and joins all link validator threads
    pub fn join(&mut self) {
        self.process_channels.clear();
        self.validator_thread_handles.drain(..).for_each(|handle| {
            handle.join().expect("Failed to join a validator thread");
        });
    }
}

/// Utility function to preprocess the payload and return an iterator over the GBT words
///
/// Consists of the following steps:
/// 1. Extract the end of payload 0xFF padding
/// 2. Determine if padding is flavor 0 (6 bytes of 0x00 padding following GBT words) or flavor 1 (no padding)
/// 3. Split the payload into GBT words sized slices, using chunks_exact to allow more compiler optimizations
///
/// Arguments:
/// * `payload` - The payload to be processed
/// Returns:
/// * An iterator over the GBT words
pub fn preprocess_payload(payload: &[u8]) -> Result<impl Iterator<Item = &[u8]>, String> {
    let ff_padding = extract_payload_ff_padding(payload)?;

    // Determine if padding is flavor 0 (6 bytes of 0x00 padding following GBT words) or flavor 1 (no padding)
    let detected_data_format = detect_payload_data_format(payload);

    let gbt_word_chunks = chunkify_payload(payload, detected_data_format, ff_padding);
    Ok(gbt_word_chunks)
}

/// Retrieve end of payload 0xFF padding, if it is more than 15 bytes, return an error
fn extract_payload_ff_padding(payload: &[u8]) -> Result<Vec<&u8>, String> {
    let ff_padding = payload
        .iter()
        .rev()
        .take_while(|&x| *x == 0xFF)
        .collect::<Vec<_>>();
    // Exceeds the maximum padding of 15 bytes that is required to pad to 16 bytes
    if ff_padding.len() > 15 {
        return Err(format!("End of payload 0xFF padding is {} bytes, exceeding max of 15 bytes: Skipping current payload",
        ff_padding.len()));
    }
    Ok(ff_padding)
}

/// Determine if padding is flavor 0 (6 bytes of 0x00 padding following GBT words) or flavor 1 (no padding)
fn detect_payload_data_format(payload: &[u8]) -> u8 {
    // Using an iterator approach instead of indexing also supports the case where the payload is smaller than 16 bytes or even empty
    if payload
    .iter() // Create an iterator over the payload
    .skip(10) // Skip the first 10 bytes, meaning the first GBT word
    .take(6) // Take the next 6 bytes
    .take_while(|&x| *x == 0x00) // Take bytes while they are equal to 0x00
    .count() // Count them and check if they are equal to 6
    == 6
    {
        log::trace!("Data format 0 detected");
        0
    } else {
        log::trace!("Data format 2 detected");
        2
    }
}

/// Splits a payload into GBT words sized slices, using chunks_exact to allow more compiler optimizations
fn chunkify_payload<'a>(
    payload: &'a [u8],
    data_format: u8,
    ff_padding: Vec<&'a u8>,
) -> std::slice::ChunksExact<'a, u8> {
    match data_format {
        0 => {
            let chunks = payload.chunks_exact(16);
            // If dataformat 0, dividing into 16 byte chunks should cut the payload up with no remainder
            debug_assert!(chunks.remainder().is_empty());
            chunks
        }
        2 => {
            // If dataformat 2, and the padding is more than 9 bytes, padding will be processed as a GBT word, therefor exclude it from the slice
            //    Before calling chunks_exact
            if ff_padding.len() > 9 {
                let last_idx_before_padding = payload.len() - ff_padding.len();
                let chunks = payload[..last_idx_before_padding].chunks_exact(10);
                debug_assert!(chunks.remainder().is_empty());
                chunks
            } else {
                // Simply divide into 10 byte chunks and assert that the remainder is padding bytes
                let chunks = payload.chunks_exact(10);
                debug_assert!(chunks.remainder().iter().all(|&x| x == 0xFF)); // Asserts that the payload padding is 0xFF
                chunks
            }
        }
        _ => unreachable!("Invalid data format"),
    }
}

#[cfg(test)]
mod tests {
    use crate::input::data_wrapper::CdpChunk;
    use crate::util::config::check::CheckCommands;
    use crate::util::lib::test_util::MockConfig;
    use crate::words::its::test_payloads::*;
    use crate::words::rdh_cru::test_data::CORRECT_RDH_CRU_V7;
    use crate::words::rdh_cru::{RdhCRU, V7};
    use std::sync::OnceLock;

    use super::*;

    static CFG_TEST_DISPACTER: OnceLock<MockConfig> = OnceLock::new();

    #[test]
    fn test_dispacter() {
        let mut cfg = MockConfig::new();
        cfg.check = Some(CheckCommands::Sanity { system: None });
        CFG_TEST_DISPACTER.set(cfg).unwrap();

        let mut disp: ValidatorDispatcher<RdhCRU<V7>, MockConfig> =
            ValidatorDispatcher::new(CFG_TEST_DISPACTER.get().unwrap(), flume::unbounded().0);

        let cdp_tuple: CdpTuple<RdhCRU<V7>> = (CORRECT_RDH_CRU_V7, vec![0; 100], 0);

        let mut cdp_chunk = CdpChunk::new();
        cdp_chunk.push_tuple(cdp_tuple);

        disp.dispatch_cdp_chunk(cdp_chunk);

        disp.join();
    }

    #[test]
    fn test_preprocess_payload_flavors() {
        let word_chunk_f0 = preprocess_payload(&START_PAYLOAD_FLAVOR_0).unwrap();
        let word_chunks_f2 = preprocess_payload(&START_PAYLOAD_FLAVOR_2).unwrap();

        let word_count = word_chunk_f0.count();
        let word_count_f2 = word_chunks_f2.count();

        assert_eq!(word_count, 2);
        assert_eq!(word_count_f2, 2);
    }

    #[test]
    fn test_extract_payload_padding() {
        let end_payload_flavor_0_padding =
            extract_payload_ff_padding(&END_PAYLOAD_FLAVOR_0).unwrap();
        let end_payload_flavor_2_padding =
            extract_payload_ff_padding(&END_PAYLOAD_FLAVOR_2).unwrap();

        assert!(end_payload_flavor_0_padding.is_empty());
        assert_eq!(end_payload_flavor_2_padding.len(), 6);
    }

    #[test]
    fn test_detect_payload_data_format() {
        let detected_data_format_f0 = detect_payload_data_format(&START_PAYLOAD_FLAVOR_0);
        let detected_data_format_f2 = detect_payload_data_format(&START_PAYLOAD_FLAVOR_2);

        assert_eq!(detected_data_format_f0, 0);
        assert_eq!(detected_data_format_f2, 2);
    }
}