1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use crate::memory_fs::allocator::AllocatedChunk;
use crate::memory_fs::file::internal::FileChunk;
use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
use crossbeam::channel::*;
use mt_debug_counters::counter::{AtomicCounter, AtomicCounterGuardSum, MaxMode, SumMode};
use parking_lot::lock_api::{RawMutex, RawRwLock};
use parking_lot::{Mutex, RwLock};
use std::cmp::max;
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::ops::DerefMut;
use std::path::PathBuf;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

// Sender side of the global flush queue; `None` until `GlobalFlush::init` is called,
// reset to `None` again by `GlobalFlush::terminate`.
// NOTE(review): this is a `static mut`, so every access relies on manual synchronization —
// presumably init()/terminate() are never called concurrently with queue use; verify callers.
static mut GLOBAL_FLUSH_QUEUE: Option<Sender<FlushableItem>> = None;
// Join handles of the spawned flushing threads, kept so terminate() can join them.
static FLUSH_THREADS: Mutex<Vec<JoinHandle<()>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, vec![]);

// Serializes the "take next item + lock its file" step across flush threads
// (see flush_thread: held around recv() and released only after the file lock is taken).
static TAKE_FROM_QUEUE_MUTEX: Mutex<()> = Mutex::const_new(parking_lot::RawMutex::INIT, ());
// Read side is held by a flush thread while a write is in progress; the write side is
// acquired by flush_to_disk() to wait until all in-flight writes have completed.
static WRITING_CHECK: RwLock<()> = RwLock::const_new(parking_lot::RawRwLock::INIT, ());

// Debug counters (mt_debug_counters crate) tracking flush activity.

// Number of append writes currently in progress.
static COUNTER_WRITING_APPEND: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_file_append_count", SumMode, false);

// Number of positioned (write-at) writes currently in progress.
static COUNTER_WRITE_AT: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_write_at_count", SumMode, false);

// Total count of disk flush operations performed.
static COUNTER_DISK_FLUSHES: AtomicCounter<SumMode> =
    declare_counter_i64!("disk_flushes", SumMode, false);

// Total number of bytes written to disk by the flush threads.
static COUNTER_BYTES_WRITTEN: AtomicCounter<SumMode> =
    declare_counter_i64!("bytes_written_count", SumMode, false);

// High-water mark of the flush queue length, sampled at each write.
static GLOBAL_QUEUE_MAX_SIZE_NOW: AtomicCounter<MaxMode> =
    declare_counter_i64!("global_queue_max_size_now", MaxMode, true);

// Namespace type: all functionality lives in associated functions on the impl below.
pub struct GlobalFlush;

impl GlobalFlush {
    pub fn global_queue_occupation() -> (usize, usize) {
        unsafe {
            (
                GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len(),
                GLOBAL_FLUSH_QUEUE
                    .as_ref()
                    .unwrap()
                    .capacity()
                    .unwrap_or(usize::MAX),
            )
        }
    }

    pub fn is_queue_empty() -> bool {
        unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len() == 0 }
    }

    pub fn add_item_to_flush_queue(item: FlushableItem) {
        unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().send(item).unwrap() }
    }

    fn flush_thread(flush_channel_receiver: Receiver<FlushableItem>) {
        let mut queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();

        while let Ok(file_to_flush) = flush_channel_receiver.recv() {
            // Here it's held an exclusive lock to ensure that sequential writes to a file are not processed concurrently
            let mut file_lock = file_to_flush.underlying_file.1.lock();
            let _writing_check = WRITING_CHECK.read();
            drop(queue_take_lock);

            match file_to_flush.mode {
                FileFlushMode::Append {
                    chunk: mut file_chunk,
                } => {
                    if let FileChunk::OnMemory { chunk } = file_chunk.deref_mut() {
                        let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITING_APPEND, 1);
                        GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
                        COUNTER_DISK_FLUSHES.inc();

                        let offset = file_lock.stream_position().unwrap();

                        file_lock.write_all(chunk.get()).unwrap();
                        let len = chunk.len();
                        *file_chunk = FileChunk::OnDisk { offset, len };
                        COUNTER_BYTES_WRITTEN.inc_by(len as i64);
                    }
                }
                FileFlushMode::WriteAt { buffer, offset } => {
                    let _writing_check = WRITING_CHECK.read();
                    GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);

                    COUNTER_DISK_FLUSHES.inc();
                    let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITE_AT, 1);
                    file_lock.seek(SeekFrom::Start(offset)).unwrap();
                    file_lock.write_all(buffer.get()).unwrap();
                    COUNTER_BYTES_WRITTEN.inc_by(buffer.get().len() as i64);
                }
            }

            drop(_writing_check);
            drop(file_lock);
            // Try lock the queue again
            queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
        }
    }

    pub fn is_initialized() -> bool {
        return unsafe { GLOBAL_FLUSH_QUEUE.is_some() };
    }

    pub fn init(flush_queue_size: usize, threads_count: usize) {
        let (flush_channel_sender, flush_channel_receiver) = bounded(flush_queue_size);

        unsafe {
            GLOBAL_FLUSH_QUEUE = Some(flush_channel_sender);
        }

        for _ in 0..max(1, threads_count) {
            let flush_channel_receiver = flush_channel_receiver.clone();
            FLUSH_THREADS.lock().push(
                std::thread::Builder::new()
                    .name(String::from("flushing-thread"))
                    .spawn(move || Self::flush_thread(flush_channel_receiver))
                    .unwrap(),
            );
        }
    }

    pub fn schedule_disk_write(
        file: Arc<(PathBuf, Mutex<File>)>,
        buffer: AllocatedChunk,
        offset: u64,
    ) {
        Self::add_item_to_flush_queue(FlushableItem {
            underlying_file: file,
            mode: FileFlushMode::WriteAt { buffer, offset },
        })
    }

    pub fn flush_to_disk() {
        while !unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().is_empty() } {
            std::thread::sleep(Duration::from_millis(50));
        }
        // Ensure that no writers are still writing!
        drop(WRITING_CHECK.write());
    }

    pub fn terminate() {
        Self::flush_to_disk();
        unsafe {
            GLOBAL_FLUSH_QUEUE.take();
        }
        let mut threads = FLUSH_THREADS.lock();
        for thread in threads.drain(..) {
            thread.join().unwrap();
        }
    }
}