evolution-mocker 1.3.0

Mocking components of evolution.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
//
// MIT License
//
// Copyright (c) 2024 Firelink Data
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
//
// File created: 2024-02-05
// Last updated: 2024-10-13
//

use crossbeam::channel;

use evolution_common::error::{Result, SetupError};
use evolution_common::{newline, NUM_BYTES_FOR_NEWLINE};
use evolution_schema::schema::FixedSchema;
use evolution_writer::writer::{FixedLengthFileWriter, FixedLengthFileWriterProperties, Writer};
use log::{info, warn};
use padder::pad_and_push_to_buffer;
use rand::rngs::ThreadRng;

use std::path::PathBuf;
use std::sync::Arc;
use std::thread::{spawn, JoinHandle};

use crate::mock_column;

/// If the user only wants to generate a small number of mocked .flf rows then multithreading
/// is not a suitable choice and probably only introduces extra overhead. This variable
/// specifies the minimum number of rows to be mocked to allow enabling multithreading.
///
/// # Note
/// This value takes priority over any CLI options regarding number of threads to use for mocking.
pub static MIN_NUM_ROWS_FOR_MULTITHREADING: usize = 100_000;

/// Common trait shared by every kind of file mocker.
///
/// It currently declares no methods of its own.
pub trait Mocker {}

/// An owned, heap-allocated, dynamically dispatched [`Mocker`].
pub type MockerRef = Box<dyn Mocker + 'static>;

/// The mocker struct for fixed-length files (.flf).
///
/// Construct instances with [`FixedLengthFileMocker::builder`].
pub struct FixedLengthFileMocker {
    /// The schema to mock data based on.
    schema: FixedSchema,
    /// The writer to use when writing the mocked data to file.
    writer: FixedLengthFileWriter,
    /// The number of mocked rows to generate.
    n_rows: usize,
    /// The number of threads (logical cores) to use. In multithreaded mode this includes the
    /// master thread that writes to file, so `n_threads - 1` worker threads are spawned.
    n_threads: usize,
    /// The size of the writer buffer (in number of rows).
    write_buffer_size: usize,
    /// The maximum number of active messages allowed in the thread channels.
    thread_channel_capacity: usize,
}

impl FixedLengthFileMocker {
    /// Create a new instance of a [`FixedLengthFileMockerBuilder`] with default values.
    pub fn builder() -> FixedLengthFileMockerBuilder {
        FixedLengthFileMockerBuilder {
            ..Default::default()
        }
    }

    /// Try and generate mocked data based on the provided [`FixedSchema`].
    ///
    /// This function either runs in single-threaded mode or in multithreaded mode depending on:
    /// * the number of requested mocked rows to generate,
    /// * and the number of avilable threads on the host system.
    ///
    /// # Errors
    /// This function will propagate any errors created in any of the mocking modes, see any
    /// of the functions [`try_mock_multithreaded`] or [`try_mock_single_threaded`] for specifics.
    ///
    /// [`try_mock_multithreaded`]: FixedLengthFileMocker::try_mock_multithreaded
    /// [`try_mock_single_threaded`]: FixedLengthFileMocker::try_mock_single_threaded
    pub fn try_mock(&mut self) -> Result<()> {
        if self.n_threads > 1 {
            self.try_mock_multithreaded()?;
        } else {
            self.try_mock_single_threaded()?;
        }
        Ok(())
    }

    /// Try and generate mocked data in multithreaded mode.
    ///
    /// # Errors
    /// This function might return an error for the following reasons:
    /// * If the [`FixedLengthFileWriter`] failed to write the generated columns to file.
    /// * If the [`FixedLengthFileWriter`] failed to flush any remaining bytes and close the buffer.
    ///
    fn try_mock_multithreaded(&mut self) -> Result<()> {
        let thread_workloads: Vec<usize> = self.distribute_worker_thread_workloads();
        let (sender, receiver) = channel::bounded(self.thread_channel_capacity);

        info!(
            "Starting {} worker threads to generate {} mocked rows.",
            self.n_threads - 1,
            self.n_rows
        );
        let arc_schema = Arc::new(self.schema.clone());
        let t_n_rows_buffer_size: usize = self.write_buffer_size;
        let t_buffer_size: usize = 2 * self.write_buffer_size * arc_schema.row_length()
            + NUM_BYTES_FOR_NEWLINE * self.write_buffer_size;

        let threads = thread_workloads
            .into_iter()
            .enumerate()
            .map(|(t_idx, t_workload)| {
                let t_schema = Arc::clone(&arc_schema);
                let t_sender = sender.clone();
                spawn(move || {
                    let mut rng: ThreadRng = rand::thread_rng();
                    let mut buffer: Vec<u8> = Vec::with_capacity(t_buffer_size);

                    for row_idx in 0..t_workload {
                        if (row_idx % t_n_rows_buffer_size == 0) && (row_idx != 0) {
                            t_sender.send(buffer).unwrap_or_else(|_| {
                                panic!(
                                    "Thread {} could not send buffer to master thread!",
                                    t_idx + 1
                                )
                            });

                            // We need to reallocate the buffer since we sent it to the master thread.
                            buffer = Vec::with_capacity(t_buffer_size);
                        }

                        for column in t_schema.iter() {
                            pad_and_push_to_buffer(
                                mock_column(column, &mut rng).as_bytes(),
                                column.length(),
                                column.alignment(),
                                column.pad_symbol(),
                                &mut buffer,
                            );
                        }

                        buffer.extend_from_slice(newline().as_bytes());
                    }

                    t_sender.send(buffer).unwrap_or_else(|_| {
                        panic!(
                            "Thread {} could not send buffer to master thread!",
                            t_idx + 1
                        )
                    });

                    info!("Thread {} done!", t_idx + 1);
                    drop(t_sender);
                })
            })
            .collect::<Vec<JoinHandle<()>>>();

        drop(sender);
        for buffer in receiver {
            self.writer.try_write(&buffer)?;
            drop(buffer);
        }

        for (t_idx, handle) in threads.into_iter().enumerate() {
            handle
                .join()
                .unwrap_or_else(|_| panic!("Thread {} could not join the master thread!", t_idx));
        }

        Ok(())
    }

    /// Try and generate mocked data in single-threaded mode.
    ///
    /// # Errors
    /// This function might return an error for the following reasons:
    /// * If the [`FixedLengthFileWriter`] failed to write the generated columns to file.
    /// * If the [`FixedLengthFileWriter`] failed to flush any remaining bytes and close the buffer.
    fn try_mock_single_threaded(&mut self) -> Result<()> {
        let n_runes_in_row = self.schema.row_length();
        // Here we multiply by 4 because a valid UTF-8 encoded character can at most be
        // exactly 4 bytes. Thus, we will always allocate enough memory for the writer buffer.
        // https://en.wikipedia.org/wiki/UTF-8
        let writer_buffer_size: usize = 4 * self.write_buffer_size * NUM_BYTES_FOR_NEWLINE
            + self.write_buffer_size * n_runes_in_row;

        let mut buffer: Vec<u8> = Vec::with_capacity(writer_buffer_size);
        let mut rng: ThreadRng = rand::thread_rng();

        info!(
            "Generating {} mocked rows in single-threaded mode.",
            self.n_rows
        );
        for ridx in 0..self.n_rows {
            if (ridx % self.write_buffer_size == 0) && (ridx != 0) {
                self.writer.try_write(&buffer)?;
                buffer.clear();
            }

            for column in self.schema.iter() {
                pad_and_push_to_buffer(
                    mock_column(column, &mut rng).as_bytes(),
                    column.length(),
                    column.alignment(),
                    column.pad_symbol(),
                    &mut buffer,
                );
            }

            buffer.extend_from_slice(newline().as_bytes());
        }

        info!("Done mocking, flushing any remaining buffers.");
        self.writer.try_write(&buffer)?;
        self.writer.try_finish()?;

        Ok(())
    }

    /// Distribute the workload of mocking rows to the available threads. Attempts to dsitribute
    /// uniformly, but the last thread will always get any remaining rows which were not split evenly.
    fn distribute_worker_thread_workloads(&self) -> Vec<usize> {
        let n_worker_threads: usize = self.n_threads - 1;
        let n_rows_per_thread: usize = self.n_rows / n_worker_threads;
        let n_rows_remaining: usize = self.n_rows - (n_rows_per_thread * n_worker_threads);
        let mut thread_workloads: Vec<usize> = (0..(n_worker_threads - 1))
            .map(|_| n_rows_per_thread)
            .collect();

        thread_workloads.push(n_rows_per_thread + n_rows_remaining);
        thread_workloads
    }
}

impl Mocker for FixedLengthFileMocker {}

/// A helper struct for building an instance of a [`FixedLengthFileMocker`] struct.
///
/// Every field starts out as `None`. `schema_path`, `out_path` and `n_rows` must always be
/// set explicitly, while [`FixedLengthFileMockerBuilder::with_default_values`] can fill in the
/// remaining configuration.
#[derive(Debug, Clone, Default)]
pub struct FixedLengthFileMockerBuilder {
    /// Path to the json schema file to mock data from.
    schema_path: Option<PathBuf>,
    /// Path to the output file to write the mocked rows to.
    out_path: Option<PathBuf>,
    /// Number of rows to mock.
    n_rows: Option<usize>,
    /// Number of threads (logical cores) to use.
    n_threads: Option<usize>,
    /// Size of the writer buffer, in number of rows.
    write_buffer_size: Option<usize>,
    /// Maximum number of buffers queued between the worker threads and the master thread.
    /// When unset, building falls back to the number of threads.
    thread_channel_capacity: Option<usize>,

    // File descriptor properties.
    /// Return an error if the output file already exists.
    force_create_new: Option<bool>,
    /// Truncate the output file if it already exists.
    truncate_existing: Option<bool>,
}

impl FixedLengthFileMockerBuilder {
    /// Set the relative or absolute path to the json schema file to use.
    pub fn with_schema(mut self, schema_path: PathBuf) -> Self {
        self.schema_path = Some(schema_path);
        self
    }

    /// Set the relative or absolute path to the output file to produce.
    pub fn with_out_file(mut self, out_path: PathBuf) -> Self {
        self.out_path = Some(out_path);
        self
    }

    /// Set the number of rows to generate.
    pub fn with_num_rows(mut self, n_rows: usize) -> Self {
        self.n_rows = Some(n_rows);
        self
    }

    /// Set the number of threads (logical cores) to use.
    pub fn with_num_threads(mut self, n_threads: usize) -> Self {
        self.n_threads = Some(n_threads);
        self
    }

    /// Set the buffer size for writing to file (in number of rows). Must be greater than zero.
    pub fn with_write_buffer_size(mut self, buffer_size: usize) -> Self {
        self.write_buffer_size = Some(buffer_size);
        self
    }

    /// Set the writer option to return an error if the file already exists.
    pub fn with_force_create_new(mut self, force_create_new: bool) -> Self {
        self.force_create_new = Some(force_create_new);
        self
    }

    /// Set the writer option to truncate the output file if it already exists.
    pub fn with_truncate_existing(mut self, truncate_existing: bool) -> Self {
        self.truncate_existing = Some(truncate_existing);
        self
    }

    /// Set the maximum number of buffers that may be queued in the channel between the worker
    /// threads and the master thread at any one time.
    ///
    /// Passing `None` leaves the capacity unset, in which case [`try_build`] falls back to the
    /// configured number of threads.
    ///
    /// [`try_build`]: FixedLengthFileMockerBuilder::try_build
    pub fn with_thread_channel_capacity(mut self, capacity: Option<usize>) -> Self {
        self.thread_channel_capacity = capacity;
        self
    }

    /// Set default values for the optional configuration fields.
    ///
    /// Sets one thread, a write buffer of 1000 rows, `force_create_new` and
    /// `truncate_existing` to `true`, and a thread channel capacity of 1.
    ///
    /// # Note
    /// Default mode is always single-threaded since I/O bottleneck leads to multithreading not being efficient.
    pub fn with_default_values(mut self) -> Self {
        self.n_threads = Some(1);
        self.write_buffer_size = Some(1000);
        self.force_create_new = Some(true);
        self.truncate_existing = Some(true);
        self.thread_channel_capacity = Some(1);
        self
    }

    /// Try creating a new [`FixedLengthFileMocker`] from the previously set values.
    ///
    /// If more than one thread is requested but fewer than [`MIN_NUM_ROWS_FOR_MULTITHREADING`]
    /// rows are to be mocked, a warning is logged and a single thread is used instead.
    ///
    /// # Errors
    /// If any of the required fields are `None`, if the write buffer size is zero, if the
    /// schema deserialization failed, or if the output file writer could not be created.
    pub fn try_build(self) -> Result<FixedLengthFileMocker> {
        let schema: FixedSchema = match self.schema_path {
            Some(p) => FixedSchema::from_path(p)?,
            None => {
                return Err(Box::new(SetupError::new(
                    "Required field 'schema_path' was not provided, exiting...",
                )))
            }
        };

        let out_path: PathBuf = match self.out_path {
            Some(p) => p,
            None => {
                return Err(Box::new(SetupError::new(
                    "Required field 'out_path' was not provided, exiting...",
                )))
            }
        };

        let n_rows: usize = self.n_rows.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'n_rows' was not provided, exiting...",
            ))
        })?;

        let n_threads: usize = self.n_threads.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'n_threads' was not provided, exiting...",
            ))
        })?;

        let write_buffer_size: usize = self.write_buffer_size.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'write_buffer_size' was not provided, exiting...",
            ))
        })?;

        // A zero-row buffer would make the mocker's `row % write_buffer_size` batching divide
        // by zero; reject it here, before the output file is opened (and possibly truncated).
        if write_buffer_size == 0 {
            return Err(Box::new(SetupError::new(
                "Field 'write_buffer_size' must be greater than zero, exiting...",
            )));
        }

        let force_create_new: bool = self.force_create_new.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'force_create_new' was not provided, exiting...",
            ))
        })?;

        let truncate_existing: bool = self.truncate_existing.ok_or_else(|| {
            Box::new(SetupError::new(
                "Required field 'truncate_existing' was not provided, exiting...",
            ))
        })?;

        let thread_channel_capacity: usize = self.thread_channel_capacity.unwrap_or(n_threads);

        let writer_properties: FixedLengthFileWriterProperties =
            FixedLengthFileWriterProperties::builder()
                .with_force_create_new(force_create_new)
                .with_create_or_open(true)
                .with_truncate_existing(truncate_existing)
                .try_build()?;

        let writer: FixedLengthFileWriter = FixedLengthFileWriter::builder()
            .with_out_path(out_path)
            .with_properties(writer_properties)
            .try_build()?;

        // Multithreading only pays off for large workloads; fall back to one thread otherwise.
        let n_threads: usize = if n_threads > 1 && n_rows < MIN_NUM_ROWS_FOR_MULTITHREADING {
            warn!(
                "You specified to use {} threads but only want to mock {} rows.",
                n_threads, n_rows
            );
            warn!(
                "This is done much more efficiently single-threaded, ignoring any multithreading!"
            );
            1
        } else {
            n_threads
        };

        Ok(FixedLengthFileMocker {
            schema,
            writer,
            n_rows,
            n_threads,
            write_buffer_size,
            thread_channel_capacity,
        })
    }

    /// Creates a new [`FixedLengthFileMocker`] from the previously set values.
    ///
    /// # Note
    /// This method internally calls the [`try_build`] method and simply unwraps the returned
    /// [`Result`]. If you don't care about error propagation, use this method over [`try_build`].
    ///
    /// # Panics
    /// If any of the required fields are `None`, if the write buffer size is zero, if the
    /// schema deserialization failed, or if the output file writer could not be created.
    ///
    /// [`try_build`]: FixedLengthFileMockerBuilder::try_build
    pub fn build(self) -> FixedLengthFileMocker {
        self.try_build().unwrap()
    }
}