parallel_processor/memory_fs/file/flush.rs

use crate::memory_fs::allocator::AllocatedChunk;
use crate::memory_fs::file::internal::FileChunk;
use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
use crate::memory_fs::stats;
use crossbeam::channel::*;
use mt_debug_counters::counter::{AtomicCounter, AtomicCounterGuardSum, MaxMode, SumMode};
// Assumed import: the `declare_counter_i64!` macro used below is expected to be
// exported from the `mt_debug_counters` crate root.
use mt_debug_counters::declare_counter_i64;
use parking_lot::lock_api::{RawMutex, RawRwLock};
use parking_lot::{Mutex, RwLock};
use std::cmp::max;
use std::io::{Seek, SeekFrom, Write};
use std::ops::DerefMut;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::time::Duration;

use super::handle::FileHandle;

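// Sender side of the global flush queue, created by `GlobalFlush::init` and dropped by
// `GlobalFlush::terminate`, plus the join handles of the spawned flushing threads.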
static mut GLOBAL_FLUSH_QUEUE: Option<Sender<FlushableItem>> = None;
static FLUSH_THREADS: Mutex<Vec<JoinHandle<()>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, vec![]);

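// TAKE_FROM_QUEUE_MUTEX serializes the dequeue + file-locking step, so only one flushing
// thread at a time pops from the queue. WRITING_CHECK is read-locked for the duration of
// each write, which lets `flush_to_disk` wait for in-flight writes by taking the write lock.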
static TAKE_FROM_QUEUE_MUTEX: Mutex<()> = Mutex::const_new(parking_lot::RawMutex::INIT, ());
static WRITING_CHECK: RwLock<()> = RwLock::const_new(parking_lot::RawRwLock::INIT, ());

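// Debug counters tracking flush activity: appends, positional writes, disk flushes,
// bytes written, and the maximum observed queue occupation.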
static COUNTER_WRITING_APPEND: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_file_append_count", SumMode, false);

static COUNTER_WRITE_AT: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_write_at_count", SumMode, false);

static COUNTER_DISK_FLUSHES: AtomicCounter<SumMode> =
    declare_counter_i64!("disk_flushes", SumMode, false);

static COUNTER_BYTES_WRITTEN: AtomicCounter<SumMode> =
    declare_counter_i64!("bytes_written_count", SumMode, false);

static GLOBAL_QUEUE_MAX_SIZE_NOW: AtomicCounter<MaxMode> =
    declare_counter_i64!("global_queue_max_size_now", MaxMode, true);

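/// Static interface to the global background flushing machinery that writes
/// in-memory file chunks to disk.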
pub struct GlobalFlush;

impl GlobalFlush {
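    /// Returns the current number of queued items and the queue capacity
    /// (`usize::MAX` if the channel reports no capacity).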
    pub fn global_queue_occupation() -> (usize, usize) {
        unsafe {
            (
                GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len(),
                GLOBAL_FLUSH_QUEUE
                    .as_ref()
                    .unwrap()
                    .capacity()
                    .unwrap_or(usize::MAX),
            )
        }
    }

    pub fn is_queue_empty() -> bool {
        unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len() == 0 }
    }

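    /// Enqueues an item on the global flush queue, blocking if the bounded queue is full.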
    pub fn add_item_to_flush_queue(item: FlushableItem) {
        unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().send(item).unwrap() }
    }

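    /// Worker loop run by each flushing thread. An item is dequeued and its file locked
    /// while TAKE_FROM_QUEUE_MUTEX is held, so that step happens one thread at a time;
    /// the mutex is then released and the actual write is performed under a read guard
    /// of WRITING_CHECK. The loop exits when `terminate` drops the sender side of the
    /// channel.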
    fn flush_thread(flush_channel_receiver: Receiver<FlushableItem>) {
        let mut queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();

        while let Ok(file_to_flush) = flush_channel_receiver.recv() {
            let mut file_lock = file_to_flush.underlying_file.lock();
            let mut file = file_lock.get_file();
            let _writing_check = WRITING_CHECK.read();
            drop(queue_take_lock);

            match file_to_flush.mode {
                FileFlushMode::Append {
                    chunk: mut file_chunk,
                } => {
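                    // Append the in-memory chunk at the current end of the file and
                    // replace its descriptor with the on-disk offset and length.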
                    if let FileChunk::OnMemory { chunk } = file_chunk.deref_mut() {
                        let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITING_APPEND, 1);
                        GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
                        COUNTER_DISK_FLUSHES.inc();

                        let offset = file.stream_position().unwrap();

                        file.write_all(chunk.get())
                            .map_err(|e| {
                                format!(
                                    "Error while writing file: '{}' => {}",
                                    file_lock.get_path().display(),
                                    e
                                )
                            })
                            .unwrap();

                        stats::add_disk_usage(chunk.len() as u64);

                        let len = chunk.len();
                        *file_chunk = FileChunk::OnDisk { offset, len };
                        COUNTER_BYTES_WRITTEN.inc_by(len as i64);
                    }
                }
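                // Write the buffer at a fixed offset within the file.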
                FileFlushMode::WriteAt { buffer, offset } => {
                    let _writing_check = WRITING_CHECK.read();
                    GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);

                    COUNTER_DISK_FLUSHES.inc();
                    let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITE_AT, 1);
                    file.seek(SeekFrom::Start(offset)).unwrap();
                    file.write_all(buffer.get()).unwrap();
                    COUNTER_BYTES_WRITTEN.inc_by(buffer.get().len() as i64);
                }
            }

            drop(_writing_check);
            drop(file);
            drop(file_lock);
            queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
        }
    }

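    /// Returns true if `init` has been called and the global flush queue exists.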
    pub fn is_initialized() -> bool {
        unsafe { GLOBAL_FLUSH_QUEUE.is_some() }
    }

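    /// Creates the bounded flush queue and spawns `max(1, threads_count)` flushing threads.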
    pub fn init(flush_queue_size: usize, threads_count: usize) {
        let (flush_channel_sender, flush_channel_receiver) = bounded(flush_queue_size);

        unsafe {
            GLOBAL_FLUSH_QUEUE = Some(flush_channel_sender);
        }

        for _ in 0..max(1, threads_count) {
            let flush_channel_receiver = flush_channel_receiver.clone();
            FLUSH_THREADS.lock().push(
                std::thread::Builder::new()
                    .name(String::from("flushing-thread"))
                    .spawn(move || Self::flush_thread(flush_channel_receiver))
                    .unwrap(),
            );
        }
    }

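    /// Queues a write of `buffer` at the given `offset` of `file`.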
    pub fn schedule_disk_write(file: Arc<Mutex<FileHandle>>, buffer: AllocatedChunk, offset: u64) {
        Self::add_item_to_flush_queue(FlushableItem {
            underlying_file: file,
            mode: FileFlushMode::WriteAt { buffer, offset },
        })
    }

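    /// Blocks until the flush queue is drained, then waits for in-flight writes to finish
    /// by acquiring the WRITING_CHECK write lock.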
    pub fn flush_to_disk() {
        while !unsafe { GLOBAL_FLUSH_QUEUE.as_mut().unwrap().is_empty() } {
            std::thread::sleep(Duration::from_millis(50));
        }
        drop(WRITING_CHECK.write());
    }

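    /// Flushes all pending data, drops the queue sender so the worker loops exit,
    /// and joins the flushing threads.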
    pub fn terminate() {
        Self::flush_to_disk();
        unsafe {
            GLOBAL_FLUSH_QUEUE.take();
        }
        let mut threads = FLUSH_THREADS.lock();
        for thread in threads.drain(..) {
            thread.join().unwrap();
        }
    }
}