parallel_processor/memory_fs/file/
flush.rs

1use crate::memory_fs::allocator::AllocatedChunk;
2use crate::memory_fs::file::internal::FileChunk;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use crossbeam::channel::*;
6use mt_debug_counters::counter::{AtomicCounter, AtomicCounterGuardSum, MaxMode, SumMode};
7use parking_lot::lock_api::{RawMutex, RawRwLock};
8use parking_lot::{Mutex, RwLock};
9use std::cmp::max;
10use std::io::{Seek, SeekFrom, Write};
11use std::ops::DerefMut;
12use std::sync::Arc;
13use std::thread::JoinHandle;
14use std::time::Duration;
15
16use super::handle::FileHandle;
17
18static mut GLOBAL_FLUSH_QUEUE: Option<Sender<FlushableItem>> = None;
19static FLUSH_THREADS: Mutex<Vec<JoinHandle<()>>> =
20    Mutex::const_new(parking_lot::RawMutex::INIT, vec![]);
21
22static TAKE_FROM_QUEUE_MUTEX: Mutex<()> = Mutex::const_new(parking_lot::RawMutex::INIT, ());
23static WRITING_CHECK: RwLock<()> = RwLock::const_new(parking_lot::RawRwLock::INIT, ());
24
25static COUNTER_WRITING_APPEND: AtomicCounter<SumMode> =
26    declare_counter_i64!("threads_file_append_count", SumMode, false);
27
28static COUNTER_WRITE_AT: AtomicCounter<SumMode> =
29    declare_counter_i64!("threads_write_at_count", SumMode, false);
30
31static COUNTER_DISK_FLUSHES: AtomicCounter<SumMode> =
32    declare_counter_i64!("disk_flushes", SumMode, false);
33
34static COUNTER_BYTES_WRITTEN: AtomicCounter<SumMode> =
35    declare_counter_i64!("bytes_written_count", SumMode, false);
36
37static GLOBAL_QUEUE_MAX_SIZE_NOW: AtomicCounter<MaxMode> =
38    declare_counter_i64!("global_queue_max_size_now", MaxMode, true);
39
40pub struct GlobalFlush;
41
42impl GlobalFlush {
43    pub fn global_queue_occupation() -> (usize, usize) {
44        unsafe {
45            (
46                GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len(),
47                GLOBAL_FLUSH_QUEUE
48                    .as_ref()
49                    .unwrap()
50                    .capacity()
51                    .unwrap_or(usize::MAX),
52            )
53        }
54    }
55
56    pub fn is_queue_empty() -> bool {
57        unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len() == 0 }
58    }
59
60    pub fn add_item_to_flush_queue(item: FlushableItem) {
61        unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().send(item).unwrap() }
62    }
63
64    fn flush_thread(flush_channel_receiver: Receiver<FlushableItem>) {
65        let mut queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
66
67        while let Ok(file_to_flush) = flush_channel_receiver.recv() {
68            // Here it's held an exclusive lock to ensure that sequential writes to a file are not processed concurrently
69            let mut file_lock = file_to_flush.underlying_file.lock();
70            let mut file = file_lock.get_file();
71            let _writing_check = WRITING_CHECK.read();
72            drop(queue_take_lock);
73
74            match file_to_flush.mode {
75                FileFlushMode::Append {
76                    chunk: mut file_chunk,
77                } => {
78                    if let FileChunk::OnMemory { chunk } = file_chunk.deref_mut() {
79                        let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITING_APPEND, 1);
80                        GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
81                        COUNTER_DISK_FLUSHES.inc();
82
83                        let offset = file.stream_position().unwrap();
84
85                        file.write_all(chunk.get())
86                            .map_err(|e| {
87                                format!(
88                                    "Error while writing file: '{}' => {}",
89                                    file_lock.get_path().display(),
90                                    e
91                                )
92                            })
93                            .unwrap();
94
95                        stats::add_disk_usage(chunk.len() as u64);
96
97                        let len = chunk.len();
98                        *file_chunk = FileChunk::OnDisk { offset, len };
99                        COUNTER_BYTES_WRITTEN.inc_by(len as i64);
100                    }
101                }
102                FileFlushMode::WriteAt { buffer, offset } => {
103                    let _writing_check = WRITING_CHECK.read();
104                    GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
105
106                    COUNTER_DISK_FLUSHES.inc();
107                    let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITE_AT, 1);
108                    file.seek(SeekFrom::Start(offset)).unwrap();
109                    file.write_all(buffer.get()).unwrap();
110                    COUNTER_BYTES_WRITTEN.inc_by(buffer.get().len() as i64);
111                }
112            }
113
114            drop(_writing_check);
115            drop(file);
116            drop(file_lock);
117            // Try lock the queue again
118            queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
119        }
120    }
121
122    pub fn is_initialized() -> bool {
123        return unsafe { GLOBAL_FLUSH_QUEUE.is_some() };
124    }
125
126    pub fn init(flush_queue_size: usize, threads_count: usize) {
127        let (flush_channel_sender, flush_channel_receiver) = bounded(flush_queue_size);
128
129        unsafe {
130            GLOBAL_FLUSH_QUEUE = Some(flush_channel_sender);
131        }
132
133        for _ in 0..max(1, threads_count) {
134            let flush_channel_receiver = flush_channel_receiver.clone();
135            FLUSH_THREADS.lock().push(
136                std::thread::Builder::new()
137                    .name(String::from("flushing-thread"))
138                    .spawn(move || Self::flush_thread(flush_channel_receiver))
139                    .unwrap(),
140            );
141        }
142    }
143
144    pub fn schedule_disk_write(file: Arc<Mutex<FileHandle>>, buffer: AllocatedChunk, offset: u64) {
145        Self::add_item_to_flush_queue(FlushableItem {
146            underlying_file: file,
147            mode: FileFlushMode::WriteAt { buffer, offset },
148        })
149    }
150
151    pub fn flush_to_disk() {
152        while !unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().is_empty() } {
153            std::thread::sleep(Duration::from_millis(50));
154        }
155        // Ensure that no writers are still writing!
156        drop(WRITING_CHECK.write());
157    }
158
159    pub fn terminate() {
160        Self::flush_to_disk();
161        unsafe {
162            GLOBAL_FLUSH_QUEUE.take();
163        }
164        let mut threads = FLUSH_THREADS.lock();
165        for thread in threads.drain(..) {
166            thread.join().unwrap();
167        }
168    }
169}