orphanage 0.5.6

Random collection of stuff that is still searching for a home.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
use std::{
  io::Read,
  iter::FusedIterator,
  ops::ControlFlow,
  panic,
  path::{Path, PathBuf},
  thread
};

use tokio::sync::mpsc::{self, channel};

use rand::RngExt;

/// An object that implements [`std::io::Read`] which returns random data.
///
/// The wrapped value is the number of bytes the reader is still allowed to
/// produce: `None` means there is no limit, while `Some(n)` means `n` more
/// bytes may be returned before the reader reports end-of-file.
#[derive(Debug, Default)]
pub struct RngReader(Option<u64>);

impl RngReader {
  /// Construct a reader without a size limit; it never reports end-of-file.
  #[must_use]
  pub fn new() -> Self {
    Self(None)
  }

  /// Construct a reader that yields `size` bytes of random data in total and
  /// reports end-of-file once that amount has been handed out.
  #[must_use]
  pub const fn with_lim(size: u64) -> Self {
    let limit = Some(size);
    Self(limit)
  }
}

impl std::io::Read for RngReader {
  /// Fill `buf` with random bytes, honoring the optional byte limit.
  ///
  /// Returns the number of bytes written to `buf`.  For a size-limited reader
  /// `Ok(0)` is returned once the limit has been used up.
  fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
    match self.0.as_mut() {
      // No limit configured: always fill the whole buffer.
      None => {
        rand::rng().fill(&mut buf[..]);
        Ok(buf.len())
      }

      // Limit used up: signal eof
      Some(left) if *left == 0 => Ok(0),

      // Hand out as much as both the limit and the buffer allow.
      Some(left) => {
        let count = (*left).min(buf.len() as u64);
        // `count` never exceeds `buf.len()`, so it always fits in a `usize`.
        let len = usize::try_from(count).unwrap();
        rand::rng().fill(&mut buf[..len]);
        *left -= count;
        Ok(len)
      }
    }
  }
}


/// Works much like `read_exact()`, but handles premature end-of-file.
///
/// Keeps calling `f.read()` until `buf` is full or the reader reports
/// end-of-file (a read of zero bytes).  Just like `read_exact()`, reads that
/// fail with [`std::io::ErrorKind::Interrupted`] are retried rather than
/// reported to the caller.
///
/// Returns the number of bytes stored at the start of `buf`.  The returned
/// value is smaller than `buf.len()` only if end-of-file was reached first.
/// A zero-length `buf` yields `Ok(0)` without touching the reader.
///
/// # Errors
/// Any [`std::io::Error`] returned by the reader, other than `Interrupted`,
/// is passed on to the caller.  Data read before the error remains in `buf`,
/// but its length is not reported.
pub fn readbuf<R>(f: &mut R, buf: &mut [u8]) -> Result<usize, std::io::Error>
where
  R: std::io::Read
{
  let mut filled = 0;
  while filled < buf.len() {
    match f.read(&mut buf[filled..]) {
      // End of file was reached before the buffer could be filled.
      Ok(0) => break,
      Ok(n) => filled += n,
      // An interrupted read transferred no data; simply try again.
      Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
      Err(e) => return Err(e)
    }
  }
  Ok(filled)
}


/*
pub fn readbuf_uninit<R>(
  f: R,
  buf: &mut [MaybeUninit<u8>]
) -> Result<usize, std::io::Error>
where
  R: std::io::Read
{
}
*/


/// A builder for a fixed block size file reader.
#[derive(Debug, Clone)]
pub struct BlockReader {
  /// Path of the file to read.
  fname: PathBuf,

  /// Size, in bytes, of each block.  Only the last block of a file may be
  /// shorter.
  block_size: usize,

  /// Capacity, in number of blocks, of the channel used by the threaded
  /// readers.
  queue_size: usize
}

impl BlockReader {
  /// Construct a new `BlockReader` builder.
  ///
  /// Defaults to a queue limit of 2 blocks.
  ///
  /// # Panics
  /// The `block_size` must be greater than zero.
  pub fn new<P>(fname: P, block_size: usize) -> Self
  where
    P: AsRef<Path>
  {
    // A zero length block size makes no sense.
    assert!(block_size > 0, "block_size must be greater than zero");

    Self {
      fname: fname.as_ref().to_path_buf(),
      block_size,
      queue_size: 2
    }
  }

  /// Set the queue size, in number of blocks.
  ///
  /// This configures how many blocks can be "in-flight".
  ///
  /// # Panics
  /// The `len` parameter mus be greater than zero.
  #[must_use]
  pub fn queue_size(mut self, len: usize) -> Self {
    assert!(len > 0, "Queue size must be greater than zero");
    self.queue_size = len;
    self
  }
}

impl BlockReader {
  /// Read the file one block at a time, handing each block to the closure `f`.
  ///
  /// The file's size is assumed not to change while it is being read:  The
  /// function returns as soon as the end of the file has been reached.
  ///
  /// # Internals
  /// Only the block currently being processed is kept in memory; nothing is
  /// read ahead.  This makes the function suited for closures that return
  /// quickly.
  ///
  /// # Errors
  /// Failure to open or read the file is returned as an `E`, converted from
  /// the underlying `std::io::Error`.  If the closure returns
  /// `ControlFlow::Break(e)`, processing stops and `e` is returned as the
  /// error.
  pub fn proc<F, E>(self, mut f: F) -> Result<(), E>
  where
    F: FnMut(&[u8]) -> ControlFlow<E>,
    E: From<std::io::Error>
  {
    let block_size = self.block_size;
    let mut file = std::fs::File::open(self.fname)?;

    // ToDo: redundant-init
    let mut block = vec![0u8; block_size];

    loop {
      let len = match readbuf(&mut file, &mut block)? {
        // Nothing more to read; assume end-of-file.
        0 => return Ok(()),
        len => len
      };

      match f(&block[..len]) {
        // The application asked to stop; hand back its break value as the
        // error.
        ControlFlow::Break(err) => return Err(err),
        ControlFlow::Continue(()) => {}
      }

      if len < block_size {
        // A short block is assumed to be the final one.
        return Ok(());
      }
    }
  }

  /// Read file block-by-block on a background thread and pass each block via
  /// a channel to a closure.
  ///
  /// This method should be preferred over [`BlockReader::proc()`] if the
  /// closure needs ownership of the blocks and its processing is non-trivial
  /// (i.e. the background thread can pre-load the next block).
  ///
  /// # Errors
  /// Returns a caller-defined error `E` which must be convertible from a
  /// `std::io::Error`.  A failure to open the file, or a read error on the
  /// background thread, is returned converted to `E`.  If the closure returns
  /// `ControlFlow::Break(e)`, processing stops and `e` is returned; in that
  /// case the background thread's own result is discarded.
  ///
  /// # Panics
  /// If the background thread panics, the panic is resumed on the calling
  /// thread, unless the closure has already requested a break.
  ///
  /// NOTE(review): tokio documents that `blocking_recv()` panics when called
  /// from within an asynchronous execution context -- confirm that callers
  /// only use this method from synchronous code.
  pub fn proc_thrd<F, E>(self, mut f: F) -> Result<(), E>
  where
    F: FnMut(Vec<u8>) -> ControlFlow<E>,
    E: From<std::io::Error> + Send + 'static
  {
    // Construct (bounded) channel for passing blocks from the reader thread to
    // the processing loop.
    let (tx, mut rx) = channel(self.queue_size);

    let mut file = std::fs::File::open(self.fname)?;

    let block_size = self.block_size;

    // `Read::take()` expresses its limit as a `u64`.
    let bs = block_size as u64;

    // Kick off thread that's responsible for reading input file
    // block-by-block.
    //
    // If the read fails with an unexpected error, then return the error from
    // the thread so the caller can pick it up when joining the thread.
    let jh: thread::JoinHandle<Result<(), E>> = thread::spawn(move || {
      loop {
        let mut buf = Vec::with_capacity(block_size);

        // Read next block.  `read_to_end()` appends to `buf` and updates its
        // length, so the buffer holds exactly the bytes that were read.
        match std::io::Read::by_ref(&mut file)
          .take(bs)
          .read_to_end(&mut buf)
        {
          // Nothing left to read -- end of file.
          Ok(0) => break Ok(()),
          Ok(_) => {
            // Transmit block to handler
            if tx.blocking_send(buf).is_err() {
              // The remote end-point was closed prematurely.
              //
              // Return an "Interrupted" error.  This will currently not
              // actually be seen by the caller, because the only way to drop
              // the channel end-point prematurely is if the closure returns
              // Break, and at that point the return value is ignored.
              // However, we'll return this in case the semantics change in the
              // future and we want to see if this error leaks.
              let e = std::io::Error::new(
                std::io::ErrorKind::Interrupted,
                "Application aborted block processing"
              );
              break Err(e.into());
            }
          }
          Err(e) => {
            // I/O error -> E
            break Err(e.into());
          }
        }
      }
    });

    // Keep reading blocks from the channel and pass them to the closure.
    // Keep going until channel transmitter end-point is closed.
    while let Some(buf) = rx.blocking_recv() {
      if let ControlFlow::Break(e) = f(buf) {
        // The closure has requested to terminate the processing
        // prematurely.

        // Drop the receiver end-point to tell the thread to self-terminate.
        drop(rx);

        // Wait for the thread to terminate, but ignore its result (we're
        // returning the error from the closure).
        let _ = jh.join();

        return Err(e);
      }
    }

    // Explicitly close the channel's receiving end-point to instruct the
    // thread to terminate
    drop(rx);

    // At this point the receiver loop was terminated because the channel's
    // transmitting end-point was closed.  Return the thread's return value.
    match jh.join() {
      Ok(res) => res,
      Err(e) => {
        // propagate panic
        panic::resume_unwind(e)
      }
    }
  }

  /// Open a file and return a channel receiver end-point that will return the
  /// file's contents block-by-block.
  ///
  /// A background thread reads the file and sends each block as `Ok(block)`.
  /// The returned [`thread::JoinHandle`] belongs to that thread.
  ///
  /// Once the file's end-of-file has been reached, the transmitting end-point
  /// will be dropped, which will cause the receiver end-point to fail once the
  /// remaining blocks have been read.
  ///
  /// # Errors
  /// Returns a [`std::io::Error`] if the file could not be opened.  Errors
  /// that occur while reading are not returned here; they are delivered
  /// through the channel as an `Err` item, after which the background thread
  /// terminates.
  #[allow(clippy::type_complexity)]
  pub fn channel(
    self
  ) -> Result<
    (
      mpsc::Receiver<Result<Vec<u8>, std::io::Error>>,
      thread::JoinHandle<()>
    ),
    std::io::Error
  > {
    let (tx, rx) = channel(self.queue_size);

    let mut file = std::fs::File::open(self.fname)?;

    let block_size = self.block_size;

    // `Read::take()` expresses its limit as a `u64`.
    let bs = block_size as u64;

    let jh = thread::spawn(move || {
      loop {
        let mut buf = Vec::with_capacity(block_size);

        // Read next block.  `read_to_end()` appends to `buf` and updates its
        // length, so the buffer holds exactly the bytes that were read.
        match std::io::Read::by_ref(&mut file)
          .take(bs)
          .read_to_end(&mut buf)
        {
          Ok(0) => break, //eof
          Ok(_) => {
            // Transmit block to handler
            if tx.blocking_send(Ok(buf)).is_err() {
              // The remote end-point was closed -- just terminate.
              break;
            }
          }
          Err(e) => {
            // Send error to the receiver so it can be returned to the
            // application.  If the receiver is already gone there is nobody
            // left to tell, so a failed send is ignored.
            let _ = tx.blocking_send(Err(e));
            break;
          }
        }
      }
    });

    Ok((rx, jh))
  }

  /// Return an `Iterator` which yields the file's blocks.
  ///
  /// The blocks are read by a background thread, set up in the same way as
  /// for [`BlockReader::channel()`].  The returned [`BlockIter`] owns both
  /// the channel's receiving end-point and the thread's join handle.
  ///
  /// # Errors
  /// Returns a [`std::io::Error`] if the file could not be opened.  Errors
  /// that occur while reading are yielded by the iterator as an `Err` item.
  pub fn try_iter(self) -> Result<BlockIter, std::io::Error> {
    // The reader thread and channel are identical to what `channel()` sets
    // up, so reuse it rather than duplicating the read loop.
    let (rx, jh) = self.channel()?;

    Ok(BlockIter {
      rx: Some(rx),
      jh: Some(jh)
    })
  }
}

/// Iterator over a file's blocks, fed by a background reader thread.
///
/// Constructed by [`BlockReader::try_iter()`].  Each item is either a block
/// of file data or the I/O error reported by the reader thread.
pub struct BlockIter {
  /// Receiving end of the block channel.  Set to `None` once an error has
  /// been yielded, or when the iterator is dropped.
  rx: Option<mpsc::Receiver<Result<Vec<u8>, std::io::Error>>>,

  /// Handle of the reader thread; taken and joined when the iterator is
  /// dropped.
  jh: Option<thread::JoinHandle<()>>
}

impl Iterator for BlockIter {
  type Item = Result<Vec<u8>, std::io::Error>;

  fn next(&mut self) -> Option<Self::Item> {
    let res = self.rx.as_mut().and_then(|rx| rx.blocking_recv());

    // If an error was reported, then permanently drop the receiver end-point
    if let Some(ref res) = res {
      if res.is_err() {
        let _ = self.rx.take();
      }
    }

    res
  }
}

// After an error has been yielded the receiver is dropped, so `next()` keeps
// returning `None` from then on.
// NOTE(review): staying fused after a normal end-of-file relies on tokio's
// `blocking_recv()` continuing to return `None` for a closed, drained channel
// -- confirm.
impl FusedIterator for BlockIter {}

impl Drop for BlockIter {
  /// Shut down the reader thread and wait for it to finish.
  fn drop(&mut self) {
    // Release the receiver end-point first (if it is still around); a closed
    // channel is what makes the reader thread stop sending and terminate.
    self.rx = None;

    // With the receiver gone it is safe to wait for the thread.  Its outcome
    // is of no interest at this point.
    if let Some(reader) = self.jh.take() {
      let _ = reader.join();
    }
  }
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :