Documentation
/*
==--==--==--==--==--==--==--==--==--==--==--==--==--==--==--==--

Bi

Copyright (C) 2019, 2021-2022, 2024  Anonymous

There are several releases over multiple years,
they are listed as ranges, such as: "2021-2022".

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <https://www.gnu.org/licenses/>.

::--::--::--::--::--::--::--::--::--::--::--::--::--::--::--::--
*/

//! # Bi

use {
    core::{
        sync::atomic::{self, AtomicUsize},
        time::Duration,
    },
    std::{
        io::{Error, ErrorKind, Read},
        sync::Arc,
        thread,
        time::SystemTime,
    },
    crate::{Bytes, IoResult, Options},
};

/// # An [`Arc`][struct:std/sync/Arc] of [`Bi`][struct:Bi]
///
/// Convenience alias for sharing one [`Bi`][struct:Bi] between several owners, for example between threads. This alias also implements
/// `From<Options>`, so it can be built directly from options.
///
/// [struct:std/sync/Arc]: https://doc.rust-lang.org/std/sync/struct.Arc.html
/// [struct:Bi]: struct.Bi.html
pub type ArcBi = Arc<Bi>;

impl From<Options> for ArcBi {

    fn from(options: Options) -> Self {
        Self::new(Bi::from(options))
    }

}

/// # Atomic Ordering
///
/// Memory ordering that `Bi` passes to every atomic operation on its byte counter.
pub (crate) const ATOMIC_ORDERING: atomic::Ordering = atomic::Ordering::Relaxed;

/// # Delay
///
/// How long a waiting thread sleeps between two attempts to reserve space.
const DELAY: Duration = Duration::from_millis(10);

/// # Bi
///
/// This struct helps with controlling multiple tasks reading data into memory.
///
/// For example, your program parses files. Naturally it scans for files on the system, then reads them into memory, and does the job. Now, if
/// you want to limit the total memory it uses at a time to 256 MiB, you can do this:
///
/// ```
/// use {
///     core::time::Duration,
///     std::{
///         io::{self, Read},
///         sync::Arc,
///         thread,
///     },
///
///     bi::{Bi, Options},
/// };
///
/// let bi = Arc::new(Bi::from(Options {
///     limit: 1024 * 1024 * 256,
///     buf_size: 1024 * 64,
///     wait_timeout: Duration::from_secs(10),
/// }));
///
/// let threads = (0..100).map(|_| {
///     let bi = bi.clone();
///     thread::spawn(move || {
///         // For demonstration, we use io::repeat()
///         let bytes = bi.read(&mut io::repeat(0).take(100), 100)?;
///
///         // Do something with bytes...
///
///         io::Result::Ok(())
///     })
/// }).collect::<Vec<_>>();
///
/// for t in threads {
///     if let Err(err) = t.join().unwrap() {
///         eprintln!("{}", err);
///     }
/// }
/// ```
#[derive(Debug)]
pub struct Bi {
    /// Settings supplied at construction; `limit`, `buf_size` and `wait_timeout` are read from here.
    options: Options,
    /// Number of bytes currently reserved against `options.limit`. A clone of this handle is passed to every `Bytes` this struct creates.
    counter: Arc<AtomicUsize>,
}

impl Bi {

    /// # Reads into new [`Bytes`][struct:Bytes]
    ///
    /// Capacity is used to request memory upfront. It would help if you provide a good value. If capacity is lower than real data, the function
    /// will request more memory.
    ///
    /// [struct:Bytes]: struct.Bytes.html
    pub fn read<R>(&self, src: &mut R, capacity: usize) -> IoResult<Bytes> where R: Read {
        let mut buf = self.make_buf()?;

        let mut bytes = Vec::with_capacity(match self.reserve(capacity) {
            Ok(capacity) => capacity,
            Err(err) => {
                if self.counter.fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |c| Some(c.saturating_sub(self.options.buf_size))).is_err() {
                    // Ignore it
                }
                return Err(err);
            },
        });

        let remove_size_read_from_counter = |bytes: &Vec<_>| self.counter.fetch_update(
            ATOMIC_ORDERING, ATOMIC_ORDERING, |c| Some(c.saturating_sub(bytes.len().max(capacity)).saturating_sub(self.options.buf_size)),
        ).map_err(|_| Error::new(ErrorKind::Other, __!()));

        loop {
            match src.read(&mut buf) {
                Ok(0) => break,
                Ok(size) => {
                    let required = match bytes.len().checked_add(size) {
                        Some(new) => new.saturating_sub(bytes.capacity()),
                        None => {
                            remove_size_read_from_counter(&bytes)?;
                            return Err(Error::new(ErrorKind::Other, __!("Out of usize")));
                        },
                    };
                    let start_time = SystemTime::now();
                    loop {
                        match match required {
                            0 => Ok(0),
                            required => self.counter.fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |c| match c.checked_add(required) {
                                Some(new) if new <= self.options.limit => Some(new),
                                _ => None,
                            }),
                        } {
                            Ok(_) => break bytes.extend(&buf[..size]),
                            Err(_) => if let Err(err) = self.sleep(start_time) {
                                remove_size_read_from_counter(&bytes)?;
                                return Err(err);
                            },
                        };
                    }
                },
                Err(err) => {
                    remove_size_read_from_counter(&bytes)?;
                    return Err(err);
                },
            };
        }

        Ok(Bytes::new(bytes, self.counter.clone()))
    }

    /// # Moves `src` into new [`Bytes`][struct:Bytes]
    ///
    /// On failure, source will be returned.
    ///
    /// [struct:Bytes]: struct.Bytes.html
    pub fn r#move<B>(&self, src: B) -> Result<Bytes, Vec<u8>> where B: Into<Vec<u8>> {
        let src = src.into();
        match self.reserve(src.len()) {
            Ok(_) => Ok(Bytes::new(src, self.counter.clone())),
            Err(_) => Err(src),
        }
    }

    /// # Reserves some space, waits for it if necessary
    fn reserve(&self, capacity: usize) -> IoResult<usize> {
        if capacity == 0 {
            return Ok(capacity);
        }

        let start_time = SystemTime::now();
        loop {
            match self.counter.fetch_update(ATOMIC_ORDERING, ATOMIC_ORDERING, |current| match current.checked_add(capacity) {
                Some(new) if new <= self.options.limit => Some(new),
                _ => None,
            }) {
                Ok(_) => return Ok(capacity),
                Err(_) => self.sleep(start_time)?,
            };
        }
    }

    /// # Makes new buffer
    fn make_buf(&self) -> IoResult<Vec<u8>> {
        const ZEROS: &[u8] = &[0; 1024];

        self.reserve(self.options.buf_size)?;

        let mut result = Vec::with_capacity(self.options.buf_size);

        while result.len() < self.options.buf_size {
            result.extend(&ZEROS[..ZEROS.len().min(self.options.buf_size.saturating_sub(result.len()))]);
        }

        Ok(result)
    }

    /// # Sleeps
    fn sleep(&self, start_time: SystemTime) -> IoResult<()> {
        let duration = SystemTime::now().duration_since(start_time).map_err(|e| Error::new(ErrorKind::Other, e))?;
        if duration < self.options.wait_timeout {
            thread::sleep(DELAY);
            Ok(())
        } else {
            Err(Error::new(ErrorKind::TimedOut, "Timed out"))
        }
    }

}

impl From<Options> for Bi {

    fn from(options: Options) -> Self {
        Self {
            options,
            counter: Arc::new(AtomicUsize::new(0)),
        }
    }

}