polars_io/file_cache/
entry.rs

1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, Mutex};
5
6use fs4::fs_std::FileExt;
7use once_cell::sync::Lazy;
8use polars_core::config;
9use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
10
11use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
12use super::file_fetcher::{FileFetcher, RemoteMetadata};
13use super::file_lock::{FileLock, FileLockAnyGuard};
14use super::metadata::{EntryMetadata, FileVersion};
15use super::utils::update_last_accessed;
16
/// Single-byte directory name placed between the cache prefix and the file name of a
/// data file, i.e. the `d` in `[prefix]/d/...` (see `get_data_file_path`).
pub(super) const DATA_PREFIX: u8 = b'd';
/// Single-byte directory name placed between the cache prefix and the file name of a
/// metadata file, i.e. the `m` in `[prefix]/m/...` (see `get_metadata_file_path`).
pub(super) const METADATA_PREFIX: u8 = b'm';
19
/// In-memory copy of the entry metadata most recently read from the metadata file, kept so
/// that `Inner::try_get_metadata` can skip re-parsing while the file is unchanged.
struct CachedData {
    /// Last-modified value of the metadata file observed when this copy was read; a
    /// different value on a later call invalidates the copy.
    last_modified: u64,
    /// The parsed metadata.
    metadata: Arc<EntryMetadata>,
    /// Data file location derived from `metadata.remote_version` via `get_data_file_path`.
    data_file_path: PathBuf,
}
25
/// Mutable state of a cache entry; always accessed through the `Mutex` in `EntryData`.
struct Inner {
    /// Remote URI of the file; used in log output and compared against the URI stored in
    /// the metadata to detect hash collisions.
    uri: Arc<str>,
    /// Hash of `uri`; forms the file name of the metadata file and the start of the data
    /// file name.
    uri_hash: String,
    /// Directory under which the `d/` and `m/` sub-directories live. Must be valid UTF-8
    /// because it is converted with `to_str().unwrap()`.
    path_prefix: Arc<Path>,
    /// Lock over the path of this entry's metadata file.
    metadata: FileLock<PathBuf>,
    /// Metadata last read from disk; `None` until `try_get_metadata` succeeds, and reset to
    /// `None` before every re-read.
    cached_data: Option<CachedData>,
    /// Desired TTL; the same `Arc` as `EntryData::ttl`, so `FileCacheEntry::update_ttl` is
    /// visible here.
    ttl: Arc<AtomicU64>,
    /// Fetches remote metadata and downloads the data file.
    file_fetcher: Arc<dyn FileFetcher>,
}
35
/// Payload of a `FileCacheEntry`: the lock-protected state plus the fields that can be
/// reached without taking the lock.
struct EntryData {
    /// Remote URI; returned by `FileCacheEntry::uri` without locking `inner`.
    uri: Arc<str>,
    /// State that is read and modified while opening the cached file.
    inner: Mutex<Inner>,
    /// Desired TTL; shares its `Arc` with `Inner::ttl` so it can be updated without locking
    /// `inner`.
    ttl: Arc<AtomicU64>,
}
41
/// Handle to one remote file in the on-disk file cache. Opening it yields a local
/// `std::fs::File`, downloading the remote file first when no usable local copy exists.
pub struct FileCacheEntry(EntryData);
43
44impl EntryMetadata {
45    fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
46        self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
47    }
48}
49
50impl Inner {
51    fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
52        let verbose = config::verbose();
53
54        {
55            let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();
56            // We want to use an exclusive lock here to avoid an API call in the case where only the
57            // local TTL was updated.
58            let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
59            update_last_accessed(metadata_file);
60
61            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
62                let data_file_path = self.get_cached_data_file_path();
63
64                if metadata.compare_local_state(data_file_path).is_ok() {
65                    if verbose {
66                        eprintln!("[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}", self.uri.clone());
67                    }
68                    return Ok(finish_open(data_file_path, metadata_file));
69                }
70            }
71        }
72
73        if verbose {
74            eprintln!(
75                "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
76                self.uri.clone()
77            );
78        }
79
80        self.try_open_check_latest()
81    }
82
    /// Opens the cached data file after checking it against the remote, downloading a new
    /// copy when the local one is missing or out of date.
    ///
    /// Steps:
    /// 1. Fetch the remote metadata.
    /// 2. Under a shared metadata lock, return the local file if the stored metadata matches
    ///    the remote and the data file matches the stored metadata.
    /// 3. Otherwise take the exclusive metadata lock and repeat the check, since the file may
    ///    have been fetched in the meantime.
    /// 4. Otherwise download the file, verify its size, and rewrite the metadata file.
    ///
    /// # Errors
    /// Returns an error if the remote metadata cannot be fetched, the data file cannot be
    /// created, disk space cannot be reserved (subject to
    /// `POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR`), the download fails, the downloaded size
    /// differs from the remote size, or the metadata cannot be written.
    ///
    /// # Panics
    /// Panics if a metadata-file or data-file lock cannot be acquired, if file system metadata
    /// or permissions of the downloaded file cannot be read or set, if truncating or seeking
    /// the metadata file fails, or if the freshly written metadata does not match the
    /// downloaded file.
    fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
        let verbose = config::verbose();
        // The remote is queried before any cache lock is taken.
        let remote_metadata = &self.file_fetcher.fetch_metadata()?;
        let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();

        // First attempt: a shared lock is enough to serve an up-to-date local copy.
        {
            let metadata_file = &mut self.metadata.acquire_shared().unwrap();
            update_last_accessed(metadata_file);

            if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
                if metadata.matches_remote_metadata(remote_metadata) {
                    let data_file_path = self.get_cached_data_file_path();

                    if metadata.compare_local_state(data_file_path).is_ok() {
                        if verbose {
                            eprintln!("[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}", self.uri.clone());
                        }
                        return Ok(finish_open(data_file_path, metadata_file));
                    }
                }
            }
        }

        // The shared guard was dropped above; from here on the metadata file is held
        // exclusively until this function returns.
        let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
        let metadata = self
            .try_get_metadata(metadata_file, &cache_guard)
            // `metadata_file` is an exclusive guard, so a TTL difference is rewritten rather
            // than reported; any remaining error falls back to fresh metadata.
            .unwrap_or_else(|_| {
                Arc::new(EntryMetadata::new(
                    self.uri.clone(),
                    self.ttl.load(std::sync::atomic::Ordering::Relaxed),
                ))
            });

        // Second attempt: the file may have been fetched between releasing the shared lock
        // and acquiring the exclusive one.
        if metadata.matches_remote_metadata(remote_metadata) {
            let data_file_path = self.get_cached_data_file_path();

            if metadata.compare_local_state(data_file_path).is_ok() {
                if verbose {
                    eprintln!(
                        "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
                        self.uri.clone()
                    );
                }
                return Ok(finish_open(data_file_path, metadata_file));
            }
        }

        if verbose {
            eprintln!(
                "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
                self.uri.clone(),
                remote_metadata.version,
                remote_metadata.size
            );
        }

        // The target path is derived from the remote version, not from the stored metadata.
        let data_file_path = &get_data_file_path(
            self.path_prefix.to_str().unwrap().as_bytes(),
            self.uri_hash.as_bytes(),
            &remote_metadata.version,
        );
        // Remove the file if it exists, since it doesn't match the metadata.
        // This could be left from an aborted process.
        let _ = std::fs::remove_file(data_file_path);
        // Pre-create and pre-size the file only when the fetcher writes real file contents.
        if !self.file_fetcher.fetches_as_symlink() {
            let file = std::fs::OpenOptions::new()
                .write(true)
                .create(true)
                .truncate(true)
                .open(data_file_path)
                .map_err(PolarsError::from)?;

            // * Some(true)   => always raise
            // * Some(false)  => never raise
            // * None         => do not raise if fallocate() is not permitted, otherwise raise.
            static RAISE_ALLOC_ERROR: Lazy<Option<bool>> = Lazy::new(|| {
                let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
                    Ok("1") => Some(false),
                    Ok("0") => Some(true),
                    Err(_) => None,
                    Ok(v) => panic!(
                        "invalid value {} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR",
                        v
                    ),
                };
                if config::verbose() {
                    eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {:?}", v);
                }
                v
            });

            // Initialize it to get the verbose print
            let raise_alloc_err = *RAISE_ALLOC_ERROR;

            file.lock_exclusive().unwrap();
            if let Err(e) = file.allocate(remote_metadata.size) {
                let msg = format!(
                    "failed to reserve {} bytes on disk to download uri = {}: {:?}",
                    remote_metadata.size,
                    self.uri.as_ref(),
                    e
                );

                // With no explicit setting, a successful 1-byte allocation is taken to mean
                // that allocation is permitted here, so the original failure is raised.
                if raise_alloc_err == Some(true)
                    || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
                {
                    polars_bail!(ComputeError: msg)
                } else if config::verbose() {
                    eprintln!("[file_cache]: warning: {}", msg)
                }
            }
        }
        self.file_fetcher.fetch(data_file_path)?;

        // Don't do this on windows as it will break setting last accessed times.
        #[cfg(target_family = "unix")]
        if !self.file_fetcher.fetches_as_symlink() {
            let mut perms = std::fs::metadata(data_file_path.clone())
                .unwrap()
                .permissions();
            perms.set_readonly(true);
            std::fs::set_permissions(data_file_path, perms).unwrap();
        }

        let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
        let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
        let local_size = data_file_metadata.len();

        // Reject downloads whose size differs from what the remote reported.
        if local_size != remote_metadata.size {
            polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
        }

        // Record the state of the new data file in the entry metadata.
        let mut metadata = metadata;
        let metadata = Arc::make_mut(&mut metadata);
        metadata.local_last_modified = local_last_modified;
        metadata.local_size = local_size;
        metadata.remote_version = remote_metadata.version.clone();

        if let Err(e) = metadata.compare_local_state(data_file_path) {
            panic!("metadata mismatch after file fetch: {}", e);
        }

        // The data file is opened before the metadata file is rewritten.
        let data_file = finish_open(data_file_path, metadata_file);

        // Replace the metadata file contents: truncate, rewind, then write.
        metadata_file.set_len(0).unwrap();
        metadata_file.seek(SeekFrom::Start(0)).unwrap();
        metadata
            .try_write(&mut **metadata_file)
            .map_err(to_compute_err)?;

        Ok(data_file)
    }
236
237    /// Try to read the metadata from disk. If `F` is an exclusive guard, this
238    /// will update the TTL stored in the metadata file if it does not match.
239    fn try_get_metadata<F: FileLockAnyGuard>(
240        &mut self,
241        metadata_file: &mut F,
242        _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
243    ) -> PolarsResult<Arc<EntryMetadata>> {
244        let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
245        let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
246
247        for _ in 0..2 {
248            if let Some(ref cached) = self.cached_data {
249                if cached.last_modified == last_modified {
250                    if cached.metadata.ttl != ttl {
251                        polars_bail!(ComputeError: "TTL mismatch");
252                    }
253
254                    if cached.metadata.uri != self.uri {
255                        unimplemented!(
256                            "hash collision: uri1 = {}, uri2 = {}, hash = {}",
257                            cached.metadata.uri,
258                            self.uri,
259                            self.uri_hash,
260                        );
261                    }
262
263                    return Ok(cached.metadata.clone());
264                }
265            }
266
267            // Ensure cache is unset if read fails
268            self.cached_data = None;
269
270            let mut metadata =
271                EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
272
273            // Note this means if multiple processes on the same system set a
274            // different TTL for the same path, the metadata file will constantly
275            // get overwritten.
276            if metadata.ttl != ttl {
277                if F::IS_EXCLUSIVE {
278                    metadata.ttl = ttl;
279                    metadata_file.set_len(0).unwrap();
280                    metadata_file.seek(SeekFrom::Start(0)).unwrap();
281                    metadata
282                        .try_write(&mut **metadata_file)
283                        .map_err(to_compute_err)?;
284                } else {
285                    polars_bail!(ComputeError: "TTL mismatch");
286                }
287            }
288
289            let metadata = Arc::new(metadata);
290            let data_file_path = get_data_file_path(
291                self.path_prefix.to_str().unwrap().as_bytes(),
292                self.uri_hash.as_bytes(),
293                &metadata.remote_version,
294            );
295            self.cached_data = Some(CachedData {
296                last_modified,
297                metadata,
298                data_file_path,
299            });
300        }
301
302        unreachable!();
303    }
304
305    /// # Panics
306    /// Panics if `self.cached_data` is `None`.
307    fn get_cached_data_file_path(&self) -> &Path {
308        &self.cached_data.as_ref().unwrap().data_file_path
309    }
310}
311
312impl FileCacheEntry {
313    pub(crate) fn new(
314        uri: Arc<str>,
315        uri_hash: String,
316        path_prefix: Arc<Path>,
317        file_fetcher: Arc<dyn FileFetcher>,
318        file_cache_ttl: u64,
319    ) -> Self {
320        let metadata = FileLock::from(get_metadata_file_path(
321            path_prefix.to_str().unwrap().as_bytes(),
322            uri_hash.as_bytes(),
323        ));
324
325        debug_assert!(
326            Arc::ptr_eq(&uri, file_fetcher.get_uri()),
327            "impl error: entry uri != file_fetcher uri"
328        );
329
330        let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
331
332        Self(EntryData {
333            uri: uri.clone(),
334            inner: Mutex::new(Inner {
335                uri,
336                uri_hash,
337                path_prefix,
338                metadata,
339                cached_data: None,
340                ttl: ttl.clone(),
341                file_fetcher,
342            }),
343            ttl,
344        })
345    }
346
347    pub fn uri(&self) -> &Arc<str> {
348        &self.0.uri
349    }
350
351    /// Directly returns the cached file if it finds one without checking if
352    /// there is a newer version on the remote. This does not make any API calls
353    /// if it finds a cached file, otherwise it simply downloads the file.
354    pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
355        self.0.inner.lock().unwrap().try_open_assume_latest()
356    }
357
358    /// Returns the cached file after ensuring it is up to date against the remote
359    /// This will always perform at least 1 API call for fetching metadata.
360    pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
361        self.0.inner.lock().unwrap().try_open_check_latest()
362    }
363
364    pub fn update_ttl(&self, ttl: u64) {
365        self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
366    }
367}
368
369fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
370    let file = {
371        #[cfg(not(target_family = "windows"))]
372        {
373            std::fs::OpenOptions::new()
374                .read(true)
375                .open(data_file_path)
376                .unwrap()
377        }
378        // windows requires write access to update the last accessed time
379        #[cfg(target_family = "windows")]
380        {
381            std::fs::OpenOptions::new()
382                .read(true)
383                .write(true)
384                .open(data_file_path)
385                .unwrap()
386        }
387    };
388    update_last_accessed(&file);
389    if FileExt::try_lock_shared(&file).is_err() {
390        panic!(
391            "finish_open: could not acquire shared lock on data file at {}",
392            data_file_path.to_str().unwrap()
393        );
394    }
395    file
396}
397
398/// `[prefix]/d/[uri hash][last modified]`
399fn get_data_file_path(
400    path_prefix: &[u8],
401    uri_hash: &[u8],
402    remote_version: &FileVersion,
403) -> PathBuf {
404    let owned;
405    let path = [
406        path_prefix,
407        &[b'/', DATA_PREFIX, b'/'],
408        uri_hash,
409        match remote_version {
410            FileVersion::Timestamp(v) => {
411                owned = Some(format!("{:013x}", v));
412                owned.as_deref().unwrap()
413            },
414            FileVersion::ETag(v) => v.as_str(),
415            FileVersion::Uninitialized => panic!("impl error: version not initialized"),
416        }
417        .as_bytes(),
418    ]
419    .concat();
420    PathBuf::from(String::from_utf8(path).unwrap())
421}
422
423/// `[prefix]/m/[uri hash]`
424fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
425    let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
426    PathBuf::from(String::from_utf8(bytes).unwrap())
427}