// parallel_processor/memory_fs/file/flush.rs
use crate::memory_fs::allocator::AllocatedChunk;
2use crate::memory_fs::file::internal::FileChunk;
3use crate::memory_fs::flushable_buffer::{FileFlushMode, FlushableItem};
4use crate::memory_fs::stats;
5use crossbeam::channel::*;
6use mt_debug_counters::counter::{AtomicCounter, AtomicCounterGuardSum, MaxMode, SumMode};
7use parking_lot::lock_api::{RawMutex, RawRwLock};
8use parking_lot::{Mutex, RwLock};
9use std::cmp::max;
10use std::io::{Seek, SeekFrom, Write};
11use std::ops::DerefMut;
12use std::sync::Arc;
13use std::thread::JoinHandle;
14use std::time::Duration;
15
16use super::handle::FileHandle;
17
// Sender side of the global flush channel. `None` until `GlobalFlush::init`
// installs it; `terminate()` takes it back out to close the channel.
// Accessed via `unsafe` because it is a `static mut`.
// NOTE(review): a `OnceLock`/`RwLock<Option<Sender<_>>>` would avoid the
// `static mut` — TODO confirm re-initialization after `terminate()` is not
// required before migrating.
static mut GLOBAL_FLUSH_QUEUE: Option<Sender<FlushableItem>> = None;
// Join handles of the spawned flushing threads; drained and joined in
// `GlobalFlush::terminate()`.
static FLUSH_THREADS: Mutex<Vec<JoinHandle<()>>> =
    Mutex::const_new(parking_lot::RawMutex::INIT, vec![]);

// Serializes the dequeue step in `flush_thread`: held across `recv()` and
// only released once the dequeued item's file lock and WRITING_CHECK read
// guard have been acquired.
static TAKE_FROM_QUEUE_MUTEX: Mutex<()> = Mutex::const_new(parking_lot::RawMutex::INIT, ());
// Write barrier: every in-flight disk write holds a read guard on this lock;
// `flush_to_disk()` acquires (and drops) the write side to wait until all
// pending writes have completed.
static WRITING_CHECK: RwLock<()> = RwLock::const_new(parking_lot::RawRwLock::INIT, ());

// Debug/metrics counters (mt_debug_counters). Sum counters accumulate event
// counts; the Max counter tracks the high-water mark of the flush queue.
static COUNTER_WRITING_APPEND: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_file_append_count", SumMode, false);

static COUNTER_WRITE_AT: AtomicCounter<SumMode> =
    declare_counter_i64!("threads_write_at_count", SumMode, false);

static COUNTER_DISK_FLUSHES: AtomicCounter<SumMode> =
    declare_counter_i64!("disk_flushes", SumMode, false);

static COUNTER_BYTES_WRITTEN: AtomicCounter<SumMode> =
    declare_counter_i64!("bytes_written_count", SumMode, false);

static GLOBAL_QUEUE_MAX_SIZE_NOW: AtomicCounter<MaxMode> =
    declare_counter_i64!("global_queue_max_size_now", MaxMode, true);
39
/// Zero-sized namespace type grouping the global flush-queue operations
/// (init, enqueue, flush, terminate) as associated functions.
pub struct GlobalFlush;
41
42#[allow(static_mut_refs)]
43impl GlobalFlush {
44 pub fn global_queue_occupation() -> (usize, usize) {
45 unsafe {
46 (
47 GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len(),
48 GLOBAL_FLUSH_QUEUE
49 .as_ref()
50 .unwrap()
51 .capacity()
52 .unwrap_or(usize::MAX),
53 )
54 }
55 }
56
57 pub fn is_queue_empty() -> bool {
58 unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().len() == 0 }
59 }
60
61 pub fn add_item_to_flush_queue(item: FlushableItem) {
62 unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().send(item).unwrap() }
63 }
64
65 fn flush_thread(flush_channel_receiver: Receiver<FlushableItem>) {
66 let mut queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
67
68 while let Ok(file_to_flush) = flush_channel_receiver.recv() {
69 let mut file_lock = file_to_flush.underlying_file.lock();
71 let mut file = file_lock.get_file();
72 let _writing_check = WRITING_CHECK.read();
73 drop(queue_take_lock);
74
75 match file_to_flush.mode {
76 FileFlushMode::Append {
77 chunk: mut file_chunk,
78 } => {
79 if let FileChunk::OnMemory { chunk } = file_chunk.deref_mut() {
80 let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITING_APPEND, 1);
81 GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
82 COUNTER_DISK_FLUSHES.inc();
83
84 let offset = file.stream_position().unwrap();
85
86 file.write_all(chunk.get())
87 .map_err(|e| {
88 format!(
89 "Error while writing file: '{}' => {}",
90 file_lock.get_path().display(),
91 e
92 )
93 })
94 .unwrap();
95
96 stats::add_disk_usage(chunk.len() as u64);
97
98 let len = chunk.len();
99 *file_chunk = FileChunk::OnDisk { offset, len };
100 COUNTER_BYTES_WRITTEN.inc_by(len as i64);
101 }
102 }
103 FileFlushMode::WriteAt { buffer, offset } => {
104 let _writing_check = WRITING_CHECK.read();
105 GLOBAL_QUEUE_MAX_SIZE_NOW.max(flush_channel_receiver.len() as i64);
106
107 COUNTER_DISK_FLUSHES.inc();
108 let _stat = AtomicCounterGuardSum::new(&COUNTER_WRITE_AT, 1);
109 file.seek(SeekFrom::Start(offset)).unwrap();
110 file.write_all(buffer.get()).unwrap();
111 COUNTER_BYTES_WRITTEN.inc_by(buffer.get().len() as i64);
112 }
113 }
114
115 drop(_writing_check);
116 drop(file);
117 drop(file_lock);
118 queue_take_lock = TAKE_FROM_QUEUE_MUTEX.lock();
120 }
121 }
122
123 pub fn is_initialized() -> bool {
124 return unsafe { GLOBAL_FLUSH_QUEUE.is_some() };
125 }
126
127 pub fn init(flush_queue_size: usize, threads_count: usize) {
128 let (flush_channel_sender, flush_channel_receiver) = bounded(flush_queue_size);
129
130 unsafe {
131 GLOBAL_FLUSH_QUEUE = Some(flush_channel_sender);
132 }
133
134 for _ in 0..max(1, threads_count) {
135 let flush_channel_receiver = flush_channel_receiver.clone();
136 FLUSH_THREADS.lock().push(
137 std::thread::Builder::new()
138 .name(String::from("flushing-thread"))
139 .spawn(move || Self::flush_thread(flush_channel_receiver))
140 .unwrap(),
141 );
142 }
143 }
144
145 pub fn schedule_disk_write(file: Arc<Mutex<FileHandle>>, buffer: AllocatedChunk, offset: u64) {
146 Self::add_item_to_flush_queue(FlushableItem {
147 underlying_file: file,
148 mode: FileFlushMode::WriteAt { buffer, offset },
149 })
150 }
151
152 pub fn flush_to_disk() {
153 while !unsafe { GLOBAL_FLUSH_QUEUE.as_ref().unwrap().is_empty() } {
154 std::thread::sleep(Duration::from_millis(50));
155 }
156 drop(WRITING_CHECK.write());
158 }
159
160 pub fn terminate() {
161 Self::flush_to_disk();
162 unsafe {
163 GLOBAL_FLUSH_QUEUE.take();
164 }
165 let mut threads = FLUSH_THREADS.lock();
166 for thread in threads.drain(..) {
167 thread.join().unwrap();
168 }
169 }
170}