parallel_processor/memory_fs/file/
flush.rs

1use crate::memory_fs::allocator::AllocatedChunk;
2use crate::memory_fs::file::internal::FileChunk;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use crossbeam::channel::*;
6use mt_debug_counters::counter::{AtomicCounter, AtomicCounterGuardSum, MaxMode, SumMode};
7use parking_lot::lock_api::{RawMutex, RawRwLock};
8use parking_lot::{Mutex, RwLock};
9use std::cmp::max;
10use std::io::{Seek, SeekFrom, Write};
11use std::ops::DerefMut;
12use std::sync::Arc;
13use std::thread::JoinHandle;
14use std::time::Duration;
15
16use super::handle::FileHandle;
17
18static mut GLOBAL_FLUSH_QUEUE: Option<Sender<FlushableItem>> = None;
19static FLUSH_THREADS: Mutex<Vec<JoinHandle<()>>> =
20    Mutex::const_new(parking_lot::RawMutex::INIT, vec![]);
21
22static TAKE_FROM_QUEUE_MUTEX: Mutex<()> = Mutex::const_new(parking_lot::RawMutex::INIT, ());
23static WRITING_CHECK: RwLock<()> = RwLock::const_new(parking_lot::RawRwLock::INIT, ());
24
25static COUNTER_WRITING_APPEND: AtomicCounter<SumMode> =
26    declare_counter_i64!("threads_file_append_count", SumMode, false);
27
28static COUNTER_WRITE_AT: AtomicCounter<SumMode> =
29    declare_counter_i64!("threads_write_at_count", SumMode, false);
30
31static COUNTER_DISK_FLUSHES: AtomicCounter<SumMode> =
32    declare_counter_i64!("disk_flushes", SumMode, false);
33
34static COUNTER_BYTES_WRITTEN: AtomicCounter<SumMode> =
35    declare_counter_i64!("bytes_written_count", SumMode, false);
36
37static GLOBAL_QUEUE_MAX_SIZE_NOW: AtomicCounter<MaxMode> =
38    declare_counter_i64!("global_queue_max_size_now", MaxMode, true);
39
40pub struct GlobalFlush;
41
42#[allow(static_mut_refs)]
43impl GlobalFlush {
44    pub fn global_queue_occupation() -> (usize, usize) {
45        unsafe {
46            (
47                GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len(),
48                GLOBAL_FLUSH_QUEUE
49                    .as_ref()
50                    .unwrap()
51                    .capacity()
52                    .unwrap_or(usize::MAX),
53            )
54        }
55    }
56
57    pub fn is_queue_empty() -> bool {
58        unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len() == 0 }
59    }
60
61    pub fn add_item_to_flush_queue(item: FlushableItem) {
62        unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().send(item).unwrap() }
63    }
64
65    fn flush_thread(flush_channel_receiver: Receiver<FlushableItem>) {
66        let mut queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
67
68        while let Ok(file_to_flush) = flush_channel_receiver.recv() {
69            // Here it's held an exclusive lock to ensure that sequential writes to a file are not processed concurrently
70            let mut file_lock = file_to_flush.underlying_file.lock();
71            let mut file = file_lock.get_file();
72            let _writing_check = WRITING_CHECK.read();
73            drop(queue_take_lock);
74
75            match file_to_flush.mode {
76                FileFlushMode::Append {
77                    chunk: mut file_chunk,
78                } => {
79                    if let FileChunk::OnMemory { chunk } = file_chunk.deref_mut() {
80                        let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITING_APPEND, 1);
81                        GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
82                        COUNTER_DISK_FLUSHES.inc();
83
84                        let offset = file.stream_position().unwrap();
85
86                        file.write_all(chunk.get())
87                            .map_err(|e| {
88                                format!(
89                                    "Error while writing file: '{}' => {}",
90                                    file_lock.get_path().display(),
91                                    e
92                                )
93                            })
94                            .unwrap();
95
96                        stats::add_disk_usage(chunk.len() as u64);
97
98                        let len = chunk.len();
99                        *file_chunk = FileChunk::OnDisk { offset, len };
100                        COUNTER_BYTES_WRITTEN.inc_by(len as i64);
101                    }
102                }
103                FileFlushMode::WriteAt { buffer, offset } => {
104                    let _writing_check = WRITING_CHECK.read();
105                    GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
106
107                    COUNTER_DISK_FLUSHES.inc();
108                    let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITE_AT, 1);
109                    file.seek(SeekFrom::Start(offset)).unwrap();
110                    file.write_all(buffer.get()).unwrap();
111                    COUNTER_BYTES_WRITTEN.inc_by(buffer.get().len() as i64);
112                }
113            }
114
115            drop(_writing_check);
116            drop(file);
117            drop(file_lock);
118            // Try lock the queue again
119            queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
120        }
121    }
122
123    pub fn is_initialized() -> bool {
124        return unsafe { GLOBAL_FLUSH_QUEUE.is_some() };
125    }
126
127    pub fn init(flush_queue_size: usize, threads_count: usize) {
128        let (flush_channel_sender, flush_channel_receiver) = bounded(flush_queue_size);
129
130        unsafe {
131            GLOBAL_FLUSH_QUEUE = Some(flush_channel_sender);
132        }
133
134        for _ in 0..max(1, threads_count) {
135            let flush_channel_receiver = flush_channel_receiver.clone();
136            FLUSH_THREADS.lock().push(
137                std::thread::Builder::new()
138                    .name(String::from("flushing-thread"))
139                    .spawn(move || Self::flush_thread(flush_channel_receiver))
140                    .unwrap(),
141            );
142        }
143    }
144
145    pub fn schedule_disk_write(file: Arc<Mutex<FileHandle>>, buffer: AllocatedChunk, offset: u64) {
146        Self::add_item_to_flush_queue(FlushableItem {
147            underlying_file: file,
148            mode: FileFlushMode::WriteAt { buffer, offset },
149        })
150    }
151
152    pub fn flush_to_disk() {
153        while !unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().is_empty() } {
154            std::thread::sleep(Duration::from_millis(50));
155        }
156        // Ensure that no writers are still writing!
157        drop(WRITING_CHECK.write());
158    }
159
160    pub fn terminate() {
161        Self::flush_to_disk();
162        unsafe {
163            GLOBAL_FLUSH_QUEUE.take();
164        }
165        let mut threads = FLUSH_THREADS.lock();
166        for thread in threads.drain(..) {
167            thread.join().unwrap();
168        }
169    }
170}