#![cfg_attr(
    not(test),
    expect(
        clippy::useless_conversion,
        reason = "cfg(test) and cfg(not(test)) have a different definition \
                  of `SystemTime`, so conversions below are needed in \
                  one mode but not the other, just ignore the lint in this \
                  module in not(test) mode where the conversion isn't required",
    )
)]

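//! Background worker for the module cache: it receives cache events over a
//! bounded channel and, off the critical path, maintains per-file `.stats`
//! usage counters, recompresses frequently used cache files at an optimized
//! compression level, and periodically cleans up the cache directory.
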
use super::{CacheConfig, fs_write_atomic};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

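/// Cloneable handle to the cache worker; it only holds the sending side of the
/// event channel (plus shared counters when compiled for tests).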
#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::max_value() as u64 => num as usize,
            _ => usize::max_value(),
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        thread::spawn(move || worker_thread.run());

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

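    /// Forwards the event to the worker thread without blocking; if the
    /// queue is full, the event is dropped and the failure is only logged.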
    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send a message to the worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

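/// Per-file usage statistics stored in TOML next to each cache file
/// (as `<cache file name>.stats`).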
#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

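/// A single item found while scanning the cache directory: either a recognized
/// module cache file (with the mtime and size used by the cleanup pass) or an
/// unrecognized file/directory that the cleanup pass is allowed to delete.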
enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

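// Unwraps a `Result`; on error it logs a warning that includes `$err_msg` and
// `$path`, then runs `$cont` (typically `return` or `continue`).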
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self) {
        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

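    // Lower the worker thread's priority so cache maintenance doesn't compete
    // with the application; each platform needs its own mechanism.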
    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.",
                    err
                );
            }
        };
    }

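    /// Bumps the usage counter in the cache file's `.stats` file and, once the
    /// counter reaches the configured threshold, recompresses the cache file
    /// at the optimized compression level (guarded by a filesystem lock so
    /// only one process recompresses a given file at a time).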
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        // construct the path of the sibling `.stats` file
        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // load the stats file, falling back to defaults if it's missing or unreadable
        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        // update the usage counter and write it back to disk
        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        // recompress only if the file hasn't reached the optimized level yet
        // and it has been used often enough
        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        // recompress into the lock file, then atomically rename it over the
        // original cache file
        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        // re-read the stats file (recompression can take a while) and record
        // the new compression level
        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                debug!(
                    "DETECTED that the task was done more than once (or race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or race with counter \
                     increasing): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read the stats file again to update the compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

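    /// Writes a fresh `.stats` file for a newly inserted cache file and, at
    /// most once per cleanup interval (enforced by a `.cleanup` lock file),
    /// cleans up the cache directory: the oldest recognized files and all
    /// unrecognized entries are removed until the cache fits within its
    /// configured size and file-count limits.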
    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // construct the path of the sibling `.stats` file
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // a new cache file starts with fresh stats and a single usage
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // run the cleanup task at most once per cleanup interval; the
        // `.cleanup` lock file acts as a marker of the most recent cleanup
        let cleanup_file = self.cache_config.directory().join(".cleanup");
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        // sort cache contents: newest recognized files first, files with an
        // mtime too far in the future and unrecognized entries last
        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        // find the index from which entries should be deleted; when the soft
        // limits are exceeded, delete down to the stricter "if deleting" limits
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                // unrecognized entries (sorted to the end) are always deleted
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        // delete everything past the cut-off point
        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

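    /// Scans the cache directory (up to two levels of subdirectories deep),
    /// pairing each module cache file with its `.stats` file. Unexpired lock
    /// files are skipped; everything else that isn't recognized is returned
    /// as `Unrecognized` so the cleanup pass may delete it. Errors are logged
    /// and the scan continues (it is deliberately fault tolerant).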
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                    $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            // unexpired cleanup lock, leave it alone
                            continue;
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // a module cache file or its stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            // a lock file counts as recognized only while it hasn't expired
                            Some(ext) => {
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // pair up module cache files with their stats files
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    // this pair is handled when the module file itself is visited
                    (Some(_), Some(_), false) => (),
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

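/// Reads and parses a TOML `.stats` file; any error is logged at trace level
/// and `None` is returned.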
fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read_to_string(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|contents| {
            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

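/// Serializes the stats to TOML and writes them atomically; returns whether
/// the write succeeded.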
fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
        })
        .is_ok()
}

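/// Tries to acquire a filesystem lock for the task identified by `task_path`
/// by creating a `<task>.wip-<pid>` file. Returns `None` if another unexpired
/// lock for the same task already exists (or on any I/O error), otherwise the
/// path of the newly created lock file.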
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    // make sure no other unexpired lock exists for this task
    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        match path.extension() {
            None => continue,
            Some(ext) => {
                // a non-UTF-8 extension can't be one of our lock files
                if let Some(ext_str) = ext.to_str() {
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    // create our lock file
    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

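/// Returns whether a lock file should be treated as expired: its mtime is at
/// least `threshold` old, it lies further in the future than
/// `allowed_future_drift`, or its mtime can't be read at all.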
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true;
        }
    };

    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as an unexpired lock, path: {}, err: {}",
                path.display(),
                err
            );
            // a lock from the future is expired only if it is further ahead
            // than the allowed clock drift
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;