1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
use std::path::{Path, PathBuf};
use std::sync::Arc;

pub mod bucket_writer;
pub mod concurrent;
pub mod readers;
pub mod single;
pub mod writers;

/// Interface of a single bucket managed by [`MultiThreadBuckets`].
///
/// Data is written through a shared reference (`&self`), while finalization
/// consumes the bucket.
pub trait LockFreeBucket {
    /// Extra data handed by reference to [`LockFreeBucket::new`] for every bucket.
    type InitData;

    /// Builds the bucket with position `index`, given a `path` and the shared
    /// initialization `data`.
    // NOTE(review): whether `path` is a directory, a file prefix or something
    // else is decided by each implementor — confirm against the implementations.
    fn new(path: &Path, data: &Self::InitData, index: usize) -> Self;
    /// Writes `bytes` into the bucket through a shared reference.
    fn write_data(&self, bytes: &[u8]);
    /// Returns the path associated with this bucket as an owned `PathBuf`.
    fn get_path(&self) -> PathBuf;
    /// Consumes the bucket, giving the implementor the chance to complete any
    /// pending work.
    fn finalize(self);
}

/// An indexed collection of buckets of type `B`.
///
/// Buckets are addressed by their position in the internal vector.
pub struct MultiThreadBuckets<B: LockFreeBucket> {
    // Bucket `i` lives at position `i`; the vector is emptied whenever the
    // buckets are taken out or finalized.
    buckets: Vec<B>,
}

impl<B: LockFreeBucket> MultiThreadBuckets<B> {
    /// A collection that holds no buckets.
    pub const EMPTY: Self = Self {
        buckets: Vec::new(),
    };

    /// Creates `size` buckets by calling `B::new` once per index in `0..size`,
    /// passing the same `path` and `init_data` to each of them.
    pub fn new(size: usize, path: PathBuf, init_data: &B::InitData) -> Self {
        let buckets: Vec<B> = (0..size)
            .map(|index| B::new(&path, init_data, index))
            .collect();

        Self { buckets }
    }

    /// Consumes the collection and yields the buckets in index order,
    /// without finalizing them.
    pub fn into_buckets(mut self) -> impl Iterator<Item = B> {
        // The vector is swapped out rather than moved, leaving an empty one
        // behind in `self`.
        std::mem::take(&mut self.buckets).into_iter()
    }

    /// Returns the path reported by the bucket at position `bucket`.
    ///
    /// # Panics
    ///
    /// Panics if `bucket` is not a valid bucket index.
    pub fn get_path(&self, bucket: u16) -> PathBuf {
        let position = usize::from(bucket);
        self.buckets[position].get_path()
    }

    /// Forwards `data` to `write_data` on the bucket at position `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not a valid bucket index.
    pub fn add_data(&self, index: u16, data: &[u8]) {
        let position = usize::from(index);
        self.buckets[position].write_data(data);
    }

    /// Number of buckets currently held.
    pub fn count(&self) -> usize {
        self.buckets.len()
    }

    /// Finalizes every bucket in index order and returns their paths.
    ///
    /// Each path is read from the bucket *before* that bucket is finalized.
    ///
    /// # Panics
    ///
    /// Panics if `self` is not the only strong reference to the collection.
    pub fn finalize(self: Arc<Self>) -> Vec<PathBuf> {
        let mut owned = match Arc::try_unwrap(self) {
            Ok(inner) => inner,
            Err(_) => panic!("Cannot take full ownership of multi thread buckets!"),
        };

        let mut paths = Vec::with_capacity(owned.buckets.len());
        for bucket in owned.buckets.drain(..) {
            paths.push(bucket.get_path());
            bucket.finalize();
        }
        paths
    }
}

/// Finalizes whatever buckets are still held when the collection goes out of
/// scope; buckets already taken out or finalized are no longer in the vector.
impl<B: LockFreeBucket> Drop for MultiThreadBuckets<B> {
    fn drop(&mut self) {
        // `finalize` consumes the bucket, so each one is moved out of the vector.
        for remaining in self.buckets.drain(..) {
            remaining.finalize();
        }
    }
}

// SAFETY: NOTE(review) — this impl places no `Send` requirement on `B`, so it
// asserts that *every* bucket type may be moved across threads. That is only
// sound if each `LockFreeBucket` implementor upholds it — confirm.
unsafe impl<B> Send for MultiThreadBuckets<B> where B: LockFreeBucket {}

// SAFETY: NOTE(review) — this impl places no `Sync` requirement on `B`, so it
// asserts that *every* bucket type tolerates concurrent `&self` access (e.g.
// `write_data`). That is only sound if each implementor upholds it — confirm.
unsafe impl<B> Sync for MultiThreadBuckets<B> where B: LockFreeBucket {}