wasmtime_cache/worker.rs

//! Background worker that watches over the cache.
//!
//! It cleans up old cache entries, updates statistics, and optimizes the cache.
//! We allow losing some messages (it doesn't hurt) and some races,
//! but we guarantee eventual consistency and fault tolerance.
//! Background tasks can be CPU intensive, but the worker thread has low priority.
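//!
//! The on-disk layout this worker assumes (inferred from `list_cache_contents`
//! below, not an authoritative specification):
//!
//! ```text
//! <cache-dir>/                  -- level 0; `.cleanup.wip-<pid>` lock files live here
//!     <subdir>/<subdir>/        -- two levels of nested subdirectories
//!         <module>              -- compressed module cache file (no extension)
//!         <module>.stats        -- usage statistics (TOML)
//!         <module>.wip-<pid>    -- recompression task lock
//! ```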

#![cfg_attr(
    not(test),
    expect(
        clippy::useless_conversion,
        reason = "cfg(test) and cfg(not(test)) have a different definition \
                  of `SystemTime`, so conversions below are needed in \
                  one mode but not the other, just ignore the lint in this \
                  module in not(test) mode where the conversion isn't required",
    )
)]

use super::{CacheConfig, fs_write_atomic};
use log::{debug, info, trace, warn};
use serde_derive::{Deserialize, Serialize};
use std::cmp;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::mpsc::{Receiver, SyncSender, sync_channel};
#[cfg(test)]
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::system_time_stub::SystemTimeStub as SystemTime;

#[derive(Clone)]
pub(super) struct Worker {
    sender: SyncSender<CacheEvent>,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

struct WorkerThread {
    receiver: Receiver<CacheEvent>,
    cache_config: CacheConfig,
    #[cfg(test)]
    stats: Arc<(Mutex<WorkerStats>, Condvar)>,
}

#[cfg(test)]
#[derive(Default)]
struct WorkerStats {
    dropped: u32,
    sent: u32,
    handled: u32,
}

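/// Fire-and-forget events sent to the worker thread; each carries the path of the
/// cache file that was just read (`OnCacheGet`) or just written (`OnCacheUpdate`).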
#[derive(Debug, Clone)]
enum CacheEvent {
    OnCacheGet(PathBuf),
    OnCacheUpdate(PathBuf),
}

impl Worker {
    pub(super) fn start_new(cache_config: &CacheConfig) -> Self {
        let queue_size = match cache_config.worker_event_queue_size() {
            num if num <= usize::MAX as u64 => num as usize,
            _ => usize::MAX,
        };
        let (tx, rx) = sync_channel(queue_size);

        #[cfg(test)]
        let stats = Arc::new((Mutex::new(WorkerStats::default()), Condvar::new()));

        let worker_thread = WorkerThread {
            receiver: rx,
            cache_config: cache_config.clone(),
            #[cfg(test)]
            stats: stats.clone(),
        };

        // When `self` is dropped, the sender is dropped as well, which closes the
        // channel and makes the worker thread exit -- this happens in the tests.
        // A non-test binary has only a static worker, so Rust never drops it.
        thread::spawn(move || worker_thread.run());

        Self {
            sender: tx,
            #[cfg(test)]
            stats,
        }
    }

    pub(super) fn on_cache_get_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheGet(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    pub(super) fn on_cache_update_async(&self, path: impl AsRef<Path>) {
        let event = CacheEvent::OnCacheUpdate(path.as_ref().to_path_buf());
        self.send_cache_event(event);
    }

    #[inline]
    fn send_cache_event(&self, event: CacheEvent) {
        let sent_event = self.sender.try_send(event.clone());

        if let Err(ref err) = sent_event {
            info!(
                "Failed to asynchronously send message to worker thread, \
                 event: {:?}, error: {}",
                event, err
            );
        }

        #[cfg(test)]
        {
            let mut stats = self
                .stats
                .0
                .lock()
                .expect("Failed to acquire worker stats lock");

            if sent_event.is_ok() {
                stats.sent += 1;
            } else {
                stats.dropped += 1;
            }
        }
    }

    #[cfg(test)]
    pub(super) fn events_dropped(&self) -> u32 {
        let stats = self
            .stats
            .0
            .lock()
            .expect("Failed to acquire worker stats lock");
        stats.dropped
    }

    #[cfg(test)]
    pub(super) fn wait_for_all_events_handled(&self) {
        let (stats, condvar) = &*self.stats;
        let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
        while stats.handled != stats.sent {
            stats = condvar
                .wait(stats)
                .expect("Failed to reacquire worker stats lock");
        }
    }
}
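
// A minimal usage sketch (call sites are hypothetical; the real callers live in the
// parent cache module):
//
//     let worker = Worker::start_new(&cache_config);
//     worker.on_cache_update_async(&cache_file); // fire-and-forget after writing an entry
//     worker.on_cache_get_async(&cache_file);    // fire-and-forget after a cache hit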

impl fmt::Debug for Worker {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Worker").finish()
    }
}

#[derive(Serialize, Deserialize)]
struct ModuleCacheStatistics {
    pub usages: u64,
    #[serde(rename = "optimized-compression")]
    pub compression_level: i32,
}
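
// The `.stats` file is written with `toml::to_string_pretty`, so on disk an entry
// looks roughly like this (field values are illustrative):
//
// ```toml
// usages = 42
// optimized-compression = 3
// ```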

impl ModuleCacheStatistics {
    fn default(cache_config: &CacheConfig) -> Self {
        Self {
            usages: 0,
            compression_level: cache_config.baseline_compression_level(),
        }
    }
}

enum CacheEntry {
    Recognized {
        path: PathBuf,
        mtime: SystemTime,
        size: u64,
    },
    Unrecognized {
        path: PathBuf,
        is_dir: bool,
    },
}

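// Evaluates `$result`; on `Err` it logs a warning built from `$err_msg` and `$path`
// and then runs `$cont` (typically `return` or `continue`) to bail out of the caller.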
macro_rules! unwrap_or_warn {
    ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
        match $result {
            Ok(val) => val,
            Err(err) => {
                warn!("{}, path: {}, msg: {}", $err_msg, $path.display(), err);
                $cont
            }
        }
    };
}

impl WorkerThread {
    fn run(self) {
        debug!("Cache worker thread started.");

        Self::lower_thread_priority();

        #[cfg(test)]
        let (stats, condvar) = &*self.stats;

        for event in self.receiver.iter() {
            match event {
                CacheEvent::OnCacheGet(path) => self.handle_on_cache_get(path),
                CacheEvent::OnCacheUpdate(path) => self.handle_on_cache_update(path),
            }

            #[cfg(test)]
            {
                let mut stats = stats.lock().expect("Failed to acquire worker stats lock");
                stats.handled += 1;
                condvar.notify_all();
            }
        }
    }

    #[cfg(target_os = "fuchsia")]
    fn lower_thread_priority() {
        // TODO This needs to use Fuchsia thread profiles
        // https://fuchsia.dev/fuchsia-src/reference/kernel_objects/profile
        warn!(
            "Lowering thread priority on Fuchsia is currently a noop. It might affect application performance."
        );
    }

    #[cfg(target_os = "windows")]
    fn lower_thread_priority() {
        use windows_sys::Win32::System::Threading::*;

        // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-setthreadpriority
        // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities

        if unsafe {
            SetThreadPriority(
                GetCurrentThread(),
                THREAD_MODE_BACKGROUND_BEGIN.try_into().unwrap(),
            )
        } == 0
        {
            warn!(
                "Failed to lower worker thread priority. It might affect application performance."
            );
        }
    }

    #[cfg(not(any(target_os = "windows", target_os = "fuchsia")))]
    fn lower_thread_priority() {
        // http://man7.org/linux/man-pages/man7/sched.7.html

        const NICE_DELTA_FOR_BACKGROUND_TASKS: i32 = 3;

        match rustix::process::nice(NICE_DELTA_FOR_BACKGROUND_TASKS) {
            Ok(current_nice) => {
                debug!("New nice value of worker thread: {}", current_nice);
            }
            Err(err) => {
                warn!(
                    "Failed to lower worker thread priority ({:?}). It might affect application performance.",
                    err
                );
            }
        };
    }

    /// Increases the usage counter and recompresses the file
    /// if the usage counter has reached the configurable threshold.
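    ///
    /// For example (values illustrative, not defaults): with an optimized compression
    /// level of 20 and a usage threshold of 256, the 256th recorded usage recompresses
    /// the entry at level 20; the stats file then records that level, so later usages
    /// skip recompression.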
    fn handle_on_cache_get(&self, path: PathBuf) {
        trace!("handle_on_cache_get() for path: {}", path.display());

        // construct .stats file path
        let filename = path.file_name().unwrap().to_str().unwrap();
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // load .stats file (default if none or error)
        let mut stats = read_stats_file(stats_path.as_ref())
            .unwrap_or_else(|| ModuleCacheStatistics::default(&self.cache_config));

        // step 1: update the usage counter & write it to disk
        //         (racy, but that's fine: the counter may just end up smaller and
        //         occasionally retrigger recompression)
        stats.usages += 1;
        if !write_stats_file(stats_path.as_ref(), &stats) {
            return;
        }

        // step 2: recompress if there's a need
        let opt_compr_lvl = self.cache_config.optimized_compression_level();
        if stats.compression_level >= opt_compr_lvl
            || stats.usages
                < self
                    .cache_config
                    .optimized_compression_usage_counter_threshold()
        {
            return;
        }

        let lock_path = if let Some(p) = acquire_task_fs_lock(
            path.as_ref(),
            self.cache_config.optimizing_compression_task_timeout(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        ) {
            p
        } else {
            return;
        };

        trace!("Trying to recompress file: {}", path.display());

        // recompress, write to another file, rename (an atomic exchange of the file
        // contents) and update the stats file
        let compressed_cache_bytes = unwrap_or_warn!(
            fs::read(&path),
            return,
            "Failed to read old cache file",
            path
        );

        let cache_bytes = unwrap_or_warn!(
            zstd::decode_all(&compressed_cache_bytes[..]),
            return,
            "Failed to decompress cached code",
            path
        );

        let recompressed_cache_bytes = unwrap_or_warn!(
            zstd::encode_all(&cache_bytes[..], opt_compr_lvl),
            return,
            "Failed to compress cached code",
            path
        );

        unwrap_or_warn!(
            fs::write(&lock_path, &recompressed_cache_bytes),
            return,
            "Failed to write recompressed cache",
            lock_path
        );

        unwrap_or_warn!(
            fs::rename(&lock_path, &path),
            {
                if let Err(error) = fs::remove_file(&lock_path) {
                    warn!(
                        "Failed to clean up (remove) recompressed cache, path {}, err: {}",
                        lock_path.display(),
                        error
                    );
                }

                return;
            },
            "Failed to rename recompressed cache",
            lock_path
        );

        // update stats file (reload it! recompression can take some time)
        if let Some(mut new_stats) = read_stats_file(stats_path.as_ref()) {
            if new_stats.compression_level >= opt_compr_lvl {
                // Rare race:
                //    two instances with different opt_compr_lvl; we don't know in which order they updated
                //    the cache file and the stats file (they are not updated together atomically)
                // A possible solution is to use a directory per cache entry, but that complicates
                // the system and is not worth it.
                debug!(
                    "DETECTED task done more than once (or a race with a new file): \
                     recompression of {}. Note: if the optimized compression level setting \
                     has changed in the meantime, the stats file might contain an \
                     inconsistent compression level due to the race.",
                    path.display()
                );
            } else {
                new_stats.compression_level = opt_compr_lvl;
                let _ = write_stats_file(stats_path.as_ref(), &new_stats);
            }

            if new_stats.usages < stats.usages {
                debug!(
                    "DETECTED lower usage count (new file or a race with the counter \
                     increment): file {}",
                    path.display()
                );
            }
        } else {
            debug!(
                "Can't read stats file again to update compression level (it might have \
                 been cleaned up): file {}",
                stats_path.display()
            );
        }

        trace!("Task finished: recompress file: {}", path.display());
    }

    fn handle_on_cache_update(&self, path: PathBuf) {
        trace!("handle_on_cache_update() for path: {}", path.display());

        // ---------------------- step 1: create .stats file

        // construct .stats file path
        let filename = path
            .file_name()
            .expect("Expected valid cache file name")
            .to_str()
            .expect("Expected valid cache file name");
        let stats_path = path.with_file_name(format!("{filename}.stats"));

        // create and write stats file
        let mut stats = ModuleCacheStatistics::default(&self.cache_config);
        stats.usages += 1;
        write_stats_file(&stats_path, &stats);

        // ---------------------- step 2: perform cleanup task if needed

        // acquire a lock for the cleanup task
        // An active lock is proof of a recent cleanup task, so we don't want to delete it.
        // Expired locks will be deleted by the cleanup task.
        let cleanup_file = self.cache_config.directory().join(".cleanup"); // a marker file that doesn't otherwise exist
        if acquire_task_fs_lock(
            &cleanup_file,
            self.cache_config.cleanup_interval(),
            self.cache_config
                .allowed_clock_drift_for_files_from_future(),
        )
        .is_none()
        {
            return;
        }

        trace!("Trying to clean up cache");

        let mut cache_index = self.list_cache_contents();
        let future_tolerance = SystemTime::now()
            .checked_add(
                self.cache_config
                    .allowed_clock_drift_for_files_from_future(),
            )
            .expect("Brace your cache, the next Big Bang is coming (time overflow)");
        cache_index.sort_unstable_by(|lhs, rhs| {
            // sort by age (newest first)
            use CacheEntry::*;
            match (lhs, rhs) {
                (Recognized { mtime: lhs_mt, .. }, Recognized { mtime: rhs_mt, .. }) => {
                    match (*lhs_mt > future_tolerance, *rhs_mt > future_tolerance) {
                        // later == younger
                        (false, false) => rhs_mt.cmp(lhs_mt),
                        // files from the far future are treated as the oldest recognized files;
                        // we want to delete them so that the cache keeps track of recent files.
                        // However, we don't delete them unconditionally,
                        // because the .stats file can be overwritten with a meaningful mtime
                        (true, false) => cmp::Ordering::Greater,
                        (false, true) => cmp::Ordering::Less,
                        (true, true) => cmp::Ordering::Equal,
                    }
                }
                // unrecognized entries are treated as infinitely old
                (Recognized { .. }, Unrecognized { .. }) => cmp::Ordering::Less,
                (Unrecognized { .. }, Recognized { .. }) => cmp::Ordering::Greater,
                (Unrecognized { .. }, Unrecognized { .. }) => cmp::Ordering::Equal,
            }
        });

        // find the "cut" boundary:
        // - unrecognized files are always removed,
        // - some cache files are removed if a quota has been exceeded
        let mut total_size = 0u64;
        let mut start_delete_idx = None;
        let mut start_delete_idx_if_deleting_recognized_items: Option<usize> = None;

        let total_size_limit = self.cache_config.files_total_size_soft_limit();
        let file_count_limit = self.cache_config.file_count_soft_limit();
        let tsl_if_deleting = total_size_limit
            .checked_mul(
                self.cache_config
                    .files_total_size_limit_percent_if_deleting() as u64,
            )
            .unwrap()
            / 100;
        let fcl_if_deleting = file_count_limit
            .checked_mul(self.cache_config.file_count_limit_percent_if_deleting() as u64)
            .unwrap()
            / 100;
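
        // Example (numbers illustrative, not defaults): with a 1 GiB soft size limit and
        // files_total_size_limit_percent_if_deleting = 70, exceeding 1 GiB triggers a
        // deletion pass that keeps only the newest entries fitting in ~700 MiB;
        // the file count limits work the same way.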

        for (idx, item) in cache_index.iter().enumerate() {
            let size = if let CacheEntry::Recognized { size, .. } = item {
                size
            } else {
                start_delete_idx = Some(idx);
                break;
            };

            total_size += size;
            if start_delete_idx_if_deleting_recognized_items.is_none()
                && (total_size > tsl_if_deleting || (idx + 1) as u64 > fcl_if_deleting)
            {
                start_delete_idx_if_deleting_recognized_items = Some(idx);
            }

            if total_size > total_size_limit || (idx + 1) as u64 > file_count_limit {
                start_delete_idx = start_delete_idx_if_deleting_recognized_items;
                break;
            }
        }

        if let Some(idx) = start_delete_idx {
            for item in &cache_index[idx..] {
                let (result, path, entity) = match item {
                    CacheEntry::Recognized { path, .. }
                    | CacheEntry::Unrecognized {
                        path,
                        is_dir: false,
                    } => (fs::remove_file(path), path, "file"),
                    CacheEntry::Unrecognized { path, is_dir: true } => {
                        (fs::remove_dir_all(path), path, "directory")
                    }
                };
                if let Err(err) = result {
                    warn!(
                        "Failed to remove {} during cleanup, path: {}, err: {}",
                        entity,
                        path.display(),
                        err
                    );
                }
            }
        }

        trace!("Task finished: clean up cache");
    }

    // Be fault tolerant: list as much as possible and ignore the rest.
    fn list_cache_contents(&self) -> Vec<CacheEntry> {
        fn enter_dir(
            vec: &mut Vec<CacheEntry>,
            dir_path: &Path,
            level: u8,
            cache_config: &CacheConfig,
        ) {
            macro_rules! add_unrecognized {
                (file: $path:expr) => {
                    add_unrecognized!(false, $path)
                };
                (dir: $path:expr) => {
                    add_unrecognized!(true, $path)
                };
                ($is_dir:expr, $path:expr) => {
                    vec.push(CacheEntry::Unrecognized {
                        path: $path.to_path_buf(),
                        is_dir: $is_dir,
                    })
                };
            }
            macro_rules! add_unrecognized_and {
                ([ $( $ty:ident: $path:expr ),* ], $cont:stmt) => {{
                    $( add_unrecognized!($ty: $path); )*
                        $cont
                }};
            }

            macro_rules! unwrap_or {
                ($result:expr, $cont:stmt, $err_msg:expr) => {
                    unwrap_or!($result, $cont, $err_msg, dir_path)
                };
                ($result:expr, $cont:stmt, $err_msg:expr, $path:expr) => {
                    unwrap_or_warn!(
                        $result,
                        $cont,
                        format!("{}, level: {}", $err_msg, level),
                        $path
                    )
                };
            }

            // If we fail to list a directory, something bad is happening anyway
            // (something is touching our cache or we have a disk failure).
            // Try to delete it so we can stay within the soft limits of the cache size.
            // This comment applies later in this function, too.
            let it = unwrap_or!(
                fs::read_dir(dir_path),
                add_unrecognized_and!([dir: dir_path], return),
                "Failed to list cache directory, deleting it"
            );

            let mut cache_files = HashMap::new();
            for entry in it {
                // read_dir() returns an iterator over results; if some of them are errors,
                // we don't know their names, so we can't delete them. We don't want to delete
                // the whole directory with the good entries either, so we just ignore the erroneous entries.
                let entry = unwrap_or!(
                    entry,
                    continue,
                    "Failed to read a cache dir entry (NOT deleting it, it still occupies space)"
                );
                let path = entry.path();
                match (level, path.is_dir()) {
                    (0..=1, true) => enter_dir(vec, &path, level + 1, cache_config),
                    (0..=1, false) => {
                        if level == 0
                            && path.file_stem() == Some(OsStr::new(".cleanup"))
                            && path.extension().is_some()
                            // assume it's a cleanup lock
                            && !is_fs_lock_expired(
                                Some(&entry),
                                &path,
                                cache_config.cleanup_interval(),
                                cache_config.allowed_clock_drift_for_files_from_future(),
                            )
                        {
                            continue; // skip active lock
                        }
                        add_unrecognized!(file: path);
                    }
                    (2, false) => {
                        match path.extension().and_then(OsStr::to_str) {
                            // mod or stats file
                            None | Some("stats") => {
                                cache_files.insert(path, entry);
                            }

                            Some(ext) => {
                                // check if valid lock
                                let recognized = ext.starts_with("wip-")
                                    && !is_fs_lock_expired(
                                        Some(&entry),
                                        &path,
                                        cache_config.optimizing_compression_task_timeout(),
                                        cache_config.allowed_clock_drift_for_files_from_future(),
                                    );

                                if !recognized {
                                    add_unrecognized!(file: path);
                                }
                            }
                        }
                    }
                    (_, is_dir) => add_unrecognized!(is_dir, path),
                }
            }

            // associate each module with its stats file & handle them
            // assumption: only module and stats files are present here
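            // For each file we look up its sibling: a module file (no extension) pairs
            // with `<module>.stats`, and a stats file pairs with its extension-less module.
            // A stats file whose module exists is skipped here (the pair is handled from the
            // module's side), a module without stats falls back to its own mtime, and an
            // orphaned stats file is scheduled for deletion.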
            for (path, entry) in cache_files.iter() {
                let path_buf: PathBuf;
                let (mod_, stats_, is_mod) = match path.extension() {
                    Some(_) => {
                        path_buf = path.with_extension("");
                        (
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            Some((path, entry)),
                            false,
                        )
                    }
                    None => {
                        path_buf = path.with_extension("stats");
                        (
                            Some((path, entry)),
                            cache_files.get(&path_buf).map(|v| (&path_buf, v)),
                            true,
                        )
                    }
                };

                // construct a cache entry
                match (mod_, stats_, is_mod) {
                    (Some((mod_path, mod_entry)), Some((stats_path, stats_entry)), true) => {
                        let mod_metadata = unwrap_or!(
                            mod_entry.metadata(),
                            add_unrecognized_and!([file: stats_path, file: mod_path], continue),
                            "Failed to get metadata, deleting BOTH module cache and stats files",
                            mod_path
                        );
                        let stats_mtime = unwrap_or!(
                            stats_entry.metadata().and_then(|m| m.modified()),
                            add_unrecognized_and!(
                                [file: stats_path],
                                unwrap_or!(
                                    mod_metadata.modified(),
                                    add_unrecognized_and!(
                                        [file: stats_path, file: mod_path],
                                        continue
                                    ),
                                    "Failed to get mtime, deleting BOTH module cache and stats \
                                     files",
                                    mod_path
                                )
                            ),
                            "Failed to get metadata/mtime, deleting the file",
                            stats_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: stats_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (Some(_), Some(_), false) => (), // was or will be handled by the previous branch
                    (Some((mod_path, mod_entry)), None, _) => {
                        let (mod_metadata, mod_mtime) = unwrap_or!(
                            mod_entry
                                .metadata()
                                .and_then(|md| md.modified().map(|mt| (md, mt))),
                            add_unrecognized_and!([file: mod_path], continue),
                            "Failed to get metadata/mtime, deleting the file",
                            mod_path
                        );
                        // .into() called for the SystemTimeStub if cfg(test)
                        vec.push(CacheEntry::Recognized {
                            path: mod_path.to_path_buf(),
                            mtime: mod_mtime.into(),
                            size: mod_metadata.len(),
                        })
                    }
                    (None, Some((stats_path, _stats_entry)), _) => {
                        debug!("Found orphaned stats file: {}", stats_path.display());
                        add_unrecognized!(file: stats_path);
                    }
                    _ => unreachable!(),
                }
            }
        }

        let mut vec = Vec::new();
        enter_dir(
            &mut vec,
            self.cache_config.directory(),
            0,
            &self.cache_config,
        );
        vec
    }
}

fn read_stats_file(path: &Path) -> Option<ModuleCacheStatistics> {
    fs::read_to_string(path)
        .map_err(|err| {
            trace!(
                "Failed to read stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|contents| {
            toml::from_str::<ModuleCacheStatistics>(&contents).map_err(|err| {
                trace!(
                    "Failed to parse stats file, path: {}, err: {}",
                    path.display(),
                    err,
                )
            })
        })
        .ok()
}

fn write_stats_file(path: &Path, stats: &ModuleCacheStatistics) -> bool {
    toml::to_string_pretty(&stats)
        .map_err(|err| {
            warn!(
                "Failed to serialize stats file, path: {}, err: {}",
                path.display(),
                err
            )
        })
        .and_then(|serialized| {
            fs_write_atomic(path, "stats", serialized.as_bytes()).map_err(|_| ())
        })
        .is_ok()
}

/// Tries to acquire a lock for a specific task.
///
/// Returns `Some(path)` to the lock on success. The task path must not
/// have an extension and must have a file stem.
///
/// To release a lock, either rename or remove it manually,
/// or wait until it expires and the cleanup task removes it.
///
/// Note: this function is racy. The main idea is to be fault tolerant and
///       never block a task. The price is that, rarely, a task may be done
///       more than once.
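///
/// The lock is the task path with a `wip-<pid>` extension appended; e.g. for a task
/// path `<dir>/foo` the lock file is created as `<dir>/foo.wip-1234` (pid illustrative).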
fn acquire_task_fs_lock(
    task_path: &Path,
    timeout: Duration,
    allowed_future_drift: Duration,
) -> Option<PathBuf> {
    assert!(task_path.extension().is_none());
    assert!(task_path.file_stem().is_some());

    // list directory
    let dir_path = task_path.parent()?;
    let it = fs::read_dir(dir_path)
        .map_err(|err| {
            warn!(
                "Failed to list cache directory, path: {}, err: {}",
                dir_path.display(),
                err
            )
        })
        .ok()?;

    // look for existing locks
    for entry in it {
        let entry = entry
            .map_err(|err| {
                warn!(
                    "Failed to list cache directory, path: {}, err: {}",
                    dir_path.display(),
                    err
                )
            })
            .ok()?;

        let path = entry.path();
        if path.is_dir() || path.file_stem() != task_path.file_stem() {
            continue;
        }

        // check extension and mtime
        match path.extension() {
            None => continue,
            Some(ext) => {
                if let Some(ext_str) = ext.to_str() {
                    // if it's None, i.e. not a valid UTF-8 string, then it's definitely not our lock
                    if ext_str.starts_with("wip-")
                        && !is_fs_lock_expired(Some(&entry), &path, timeout, allowed_future_drift)
                    {
                        return None;
                    }
                }
            }
        }
    }

    // create the lock
    let lock_path = task_path.with_extension(format!("wip-{}", std::process::id()));
    let _file = fs::OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(&lock_path)
        .map_err(|err| {
            warn!(
                "Failed to create lock file (note: it shouldn't exist): path: {}, err: {}",
                lock_path.display(),
                err
            )
        })
        .ok()?;

    Some(lock_path)
}

// We have either both the dir entry and the path, or just the path; the dir entry is
// preferable since on some platforms we can get metadata without extra syscalls.
// Furthermore, it's better to reuse a path we already have than to allocate a new one
// from the dir entry.
fn is_fs_lock_expired(
    entry: Option<&fs::DirEntry>,
    path: &PathBuf,
    threshold: Duration,
    allowed_future_drift: Duration,
) -> bool {
    let mtime = match entry
        .map_or_else(|| path.metadata(), |e| e.metadata())
        .and_then(|metadata| metadata.modified())
    {
        Ok(mt) => mt,
        Err(err) => {
            warn!(
                "Failed to get metadata/mtime, treating as an expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            return true; // can't read mtime, treat as expired, so this task will not be starved
        }
    };

    // DON'T use: mtime.elapsed() -- we must call SystemTime directly for the tests to be deterministic
    match SystemTime::now().duration_since(mtime) {
        Ok(elapsed) => elapsed >= threshold,
        Err(err) => {
            trace!(
                "Found mtime in the future, treating as a non-expired lock, path: {}, err: {}",
                path.display(),
                err
            );
            // the lock is considered expired if the mtime is too far in the future:
            // it's fine to have a network share and unsynchronized clocks,
            // but not when the user changes their system clock
            err.duration() > allowed_future_drift
        }
    }
}

#[cfg(test)]
mod tests;