1use std::io::{Seek, SeekFrom};
2use std::path::{Path, PathBuf};
3use std::sync::atomic::AtomicU64;
4use std::sync::{Arc, Mutex};
5
6use fs4::fs_std::FileExt;
7use once_cell::sync::Lazy;
8use polars_core::config;
9use polars_error::{polars_bail, to_compute_err, PolarsError, PolarsResult};
10
11use super::cache_lock::{self, GLOBAL_FILE_CACHE_LOCK};
12use super::file_fetcher::{FileFetcher, RemoteMetadata};
13use super::file_lock::{FileLock, FileLockAnyGuard};
14use super::metadata::{EntryMetadata, FileVersion};
15use super::utils::update_last_accessed;
16
/// Single-byte directory name, placed under the cache path prefix, that holds
/// the downloaded data files (see `get_data_file_path`).
pub(super) const DATA_PREFIX: u8 = b'd';
/// Single-byte directory name, placed under the cache path prefix, that holds
/// the per-entry metadata files (see `get_metadata_file_path`).
pub(super) const METADATA_PREFIX: u8 = b'm';
19
/// In-memory snapshot of an entry's metadata file, kept so that the file does
/// not have to be re-parsed while its modification time is unchanged.
struct CachedData {
    /// Modification time of the metadata file observed when this snapshot was taken.
    last_modified: u64,
    /// The parsed metadata.
    metadata: Arc<EntryMetadata>,
    /// Data file path derived from `metadata.remote_version`.
    data_file_path: PathBuf,
}
25
/// Mutable state of a cache entry; accessed through the `Mutex` in `EntryData`.
struct Inner {
    /// URI of the remote file this entry caches.
    uri: Arc<str>,
    /// Hash of `uri`, used as the file name component of the cache paths.
    uri_hash: String,
    /// Directory under which the data and metadata sub-directories live.
    path_prefix: Arc<Path>,
    /// Lock handle for this entry's metadata file.
    metadata: FileLock<PathBuf>,
    /// Last successfully read metadata, if any (see `try_get_metadata`).
    cached_data: Option<CachedData>,
    /// Configured TTL; the same atomic is held by `EntryData::ttl`.
    ttl: Arc<AtomicU64>,
    /// Source used to query remote metadata and to download the file.
    file_fetcher: Arc<dyn FileFetcher>,
}
35
/// Backing data of a `FileCacheEntry`.
struct EntryData {
    /// Copy of the entry URI kept outside the mutex, so it can be returned by
    /// reference without locking `inner`.
    uri: Arc<str>,
    /// Lock-protected state used while opening / fetching the file.
    inner: Mutex<Inner>,
    /// Configured TTL; the same atomic is held by `Inner::ttl`, so it can be
    /// updated without locking `inner`.
    ttl: Arc<AtomicU64>,
}
41
/// Handle to the on-disk cache entry for a single remote URI.
pub struct FileCacheEntry(EntryData);
43
44impl EntryMetadata {
45 fn matches_remote_metadata(&self, remote_metadata: &RemoteMetadata) -> bool {
46 self.remote_version == remote_metadata.version && self.local_size == remote_metadata.size
47 }
48}
49
50impl Inner {
51 fn try_open_assume_latest(&mut self) -> PolarsResult<std::fs::File> {
52 let verbose = config::verbose();
53
54 {
55 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();
56 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
59 update_last_accessed(metadata_file);
60
61 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
62 let data_file_path = self.get_cached_data_file_path();
63
64 if metadata.compare_local_state(data_file_path).is_ok() {
65 if verbose {
66 eprintln!("[file_cache::entry] try_open_assume_latest: opening already fetched file for uri = {}", self.uri.clone());
67 }
68 return Ok(finish_open(data_file_path, metadata_file));
69 }
70 }
71 }
72
73 if verbose {
74 eprintln!(
75 "[file_cache::entry] try_open_assume_latest: did not find cached file for uri = {}",
76 self.uri.clone()
77 );
78 }
79
80 self.try_open_check_latest()
81 }
82
83 fn try_open_check_latest(&mut self) -> PolarsResult<std::fs::File> {
84 let verbose = config::verbose();
85 let remote_metadata = &self.file_fetcher.fetch_metadata()?;
86 let cache_guard = GLOBAL_FILE_CACHE_LOCK.lock_any();
87
88 {
89 let metadata_file = &mut self.metadata.acquire_shared().unwrap();
90 update_last_accessed(metadata_file);
91
92 if let Ok(metadata) = self.try_get_metadata(metadata_file, &cache_guard) {
93 if metadata.matches_remote_metadata(remote_metadata) {
94 let data_file_path = self.get_cached_data_file_path();
95
96 if metadata.compare_local_state(data_file_path).is_ok() {
97 if verbose {
98 eprintln!("[file_cache::entry] try_open_check_latest: opening already fetched file for uri = {}", self.uri.clone());
99 }
100 return Ok(finish_open(data_file_path, metadata_file));
101 }
102 }
103 }
104 }
105
106 let metadata_file = &mut self.metadata.acquire_exclusive().unwrap();
107 let metadata = self
108 .try_get_metadata(metadata_file, &cache_guard)
109 .unwrap_or_else(|_| {
111 Arc::new(EntryMetadata::new(
112 self.uri.clone(),
113 self.ttl.load(std::sync::atomic::Ordering::Relaxed),
114 ))
115 });
116
117 if metadata.matches_remote_metadata(remote_metadata) {
118 let data_file_path = self.get_cached_data_file_path();
119
120 if metadata.compare_local_state(data_file_path).is_ok() {
121 if verbose {
122 eprintln!(
123 "[file_cache::entry] try_open_check_latest: opening already fetched file (lost race) for uri = {}",
124 self.uri.clone()
125 );
126 }
127 return Ok(finish_open(data_file_path, metadata_file));
128 }
129 }
130
131 if verbose {
132 eprintln!(
133 "[file_cache::entry] try_open_check_latest: fetching new data file for uri = {}, remote_version = {:?}, remote_size = {}",
134 self.uri.clone(),
135 remote_metadata.version,
136 remote_metadata.size
137 );
138 }
139
140 let data_file_path = &get_data_file_path(
141 self.path_prefix.to_str().unwrap().as_bytes(),
142 self.uri_hash.as_bytes(),
143 &remote_metadata.version,
144 );
145 let _ = std::fs::remove_file(data_file_path);
148 if !self.file_fetcher.fetches_as_symlink() {
149 let file = std::fs::OpenOptions::new()
150 .write(true)
151 .create(true)
152 .truncate(true)
153 .open(data_file_path)
154 .map_err(PolarsError::from)?;
155
156 static RAISE_ALLOC_ERROR: Lazy<Option<bool>> = Lazy::new(|| {
160 let v = match std::env::var("POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR").as_deref() {
161 Ok("1") => Some(false),
162 Ok("0") => Some(true),
163 Err(_) => None,
164 Ok(v) => panic!(
165 "invalid value {} for POLARS_IGNORE_FILE_CACHE_ALLOCATE_ERROR",
166 v
167 ),
168 };
169 if config::verbose() {
170 eprintln!("[file_cache]: RAISE_ALLOC_ERROR: {:?}", v);
171 }
172 v
173 });
174
175 let raise_alloc_err = *RAISE_ALLOC_ERROR;
177
178 file.lock_exclusive().unwrap();
179 if let Err(e) = file.allocate(remote_metadata.size) {
180 let msg = format!(
181 "failed to reserve {} bytes on disk to download uri = {}: {:?}",
182 remote_metadata.size,
183 self.uri.as_ref(),
184 e
185 );
186
187 if raise_alloc_err == Some(true)
188 || (raise_alloc_err.is_none() && file.allocate(1).is_ok())
189 {
190 polars_bail!(ComputeError: msg)
191 } else if config::verbose() {
192 eprintln!("[file_cache]: warning: {}", msg)
193 }
194 }
195 }
196 self.file_fetcher.fetch(data_file_path)?;
197
198 #[cfg(target_family = "unix")]
200 if !self.file_fetcher.fetches_as_symlink() {
201 let mut perms = std::fs::metadata(data_file_path.clone())
202 .unwrap()
203 .permissions();
204 perms.set_readonly(true);
205 std::fs::set_permissions(data_file_path, perms).unwrap();
206 }
207
208 let data_file_metadata = std::fs::metadata(data_file_path).unwrap();
209 let local_last_modified = super::utils::last_modified_u64(&data_file_metadata);
210 let local_size = data_file_metadata.len();
211
212 if local_size != remote_metadata.size {
213 polars_bail!(ComputeError: "downloaded file size ({}) does not match expected size ({})", local_size, remote_metadata.size);
214 }
215
216 let mut metadata = metadata;
217 let metadata = Arc::make_mut(&mut metadata);
218 metadata.local_last_modified = local_last_modified;
219 metadata.local_size = local_size;
220 metadata.remote_version = remote_metadata.version.clone();
221
222 if let Err(e) = metadata.compare_local_state(data_file_path) {
223 panic!("metadata mismatch after file fetch: {}", e);
224 }
225
226 let data_file = finish_open(data_file_path, metadata_file);
227
228 metadata_file.set_len(0).unwrap();
229 metadata_file.seek(SeekFrom::Start(0)).unwrap();
230 metadata
231 .try_write(&mut **metadata_file)
232 .map_err(to_compute_err)?;
233
234 Ok(data_file)
235 }
236
237 fn try_get_metadata<F: FileLockAnyGuard>(
240 &mut self,
241 metadata_file: &mut F,
242 _cache_guard: &cache_lock::GlobalFileCacheGuardAny,
243 ) -> PolarsResult<Arc<EntryMetadata>> {
244 let last_modified = super::utils::last_modified_u64(&metadata_file.metadata().unwrap());
245 let ttl = self.ttl.load(std::sync::atomic::Ordering::Relaxed);
246
247 for _ in 0..2 {
248 if let Some(ref cached) = self.cached_data {
249 if cached.last_modified == last_modified {
250 if cached.metadata.ttl != ttl {
251 polars_bail!(ComputeError: "TTL mismatch");
252 }
253
254 if cached.metadata.uri != self.uri {
255 unimplemented!(
256 "hash collision: uri1 = {}, uri2 = {}, hash = {}",
257 cached.metadata.uri,
258 self.uri,
259 self.uri_hash,
260 );
261 }
262
263 return Ok(cached.metadata.clone());
264 }
265 }
266
267 self.cached_data = None;
269
270 let mut metadata =
271 EntryMetadata::try_from_reader(&mut **metadata_file).map_err(to_compute_err)?;
272
273 if metadata.ttl != ttl {
277 if F::IS_EXCLUSIVE {
278 metadata.ttl = ttl;
279 metadata_file.set_len(0).unwrap();
280 metadata_file.seek(SeekFrom::Start(0)).unwrap();
281 metadata
282 .try_write(&mut **metadata_file)
283 .map_err(to_compute_err)?;
284 } else {
285 polars_bail!(ComputeError: "TTL mismatch");
286 }
287 }
288
289 let metadata = Arc::new(metadata);
290 let data_file_path = get_data_file_path(
291 self.path_prefix.to_str().unwrap().as_bytes(),
292 self.uri_hash.as_bytes(),
293 &metadata.remote_version,
294 );
295 self.cached_data = Some(CachedData {
296 last_modified,
297 metadata,
298 data_file_path,
299 });
300 }
301
302 unreachable!();
303 }
304
305 fn get_cached_data_file_path(&self) -> &Path {
308 &self.cached_data.as_ref().unwrap().data_file_path
309 }
310}
311
312impl FileCacheEntry {
313 pub(crate) fn new(
314 uri: Arc<str>,
315 uri_hash: String,
316 path_prefix: Arc<Path>,
317 file_fetcher: Arc<dyn FileFetcher>,
318 file_cache_ttl: u64,
319 ) -> Self {
320 let metadata = FileLock::from(get_metadata_file_path(
321 path_prefix.to_str().unwrap().as_bytes(),
322 uri_hash.as_bytes(),
323 ));
324
325 debug_assert!(
326 Arc::ptr_eq(&uri, file_fetcher.get_uri()),
327 "impl error: entry uri != file_fetcher uri"
328 );
329
330 let ttl = Arc::new(AtomicU64::from(file_cache_ttl));
331
332 Self(EntryData {
333 uri: uri.clone(),
334 inner: Mutex::new(Inner {
335 uri,
336 uri_hash,
337 path_prefix,
338 metadata,
339 cached_data: None,
340 ttl: ttl.clone(),
341 file_fetcher,
342 }),
343 ttl,
344 })
345 }
346
347 pub fn uri(&self) -> &Arc<str> {
348 &self.0.uri
349 }
350
351 pub fn try_open_assume_latest(&self) -> PolarsResult<std::fs::File> {
355 self.0.inner.lock().unwrap().try_open_assume_latest()
356 }
357
358 pub fn try_open_check_latest(&self) -> PolarsResult<std::fs::File> {
361 self.0.inner.lock().unwrap().try_open_check_latest()
362 }
363
364 pub fn update_ttl(&self, ttl: u64) {
365 self.0.ttl.store(ttl, std::sync::atomic::Ordering::Relaxed);
366 }
367}
368
369fn finish_open<F: FileLockAnyGuard>(data_file_path: &Path, _metadata_guard: &F) -> std::fs::File {
370 let file = {
371 #[cfg(not(target_family = "windows"))]
372 {
373 std::fs::OpenOptions::new()
374 .read(true)
375 .open(data_file_path)
376 .unwrap()
377 }
378 #[cfg(target_family = "windows")]
380 {
381 std::fs::OpenOptions::new()
382 .read(true)
383 .write(true)
384 .open(data_file_path)
385 .unwrap()
386 }
387 };
388 update_last_accessed(&file);
389 if FileExt::try_lock_shared(&file).is_err() {
390 panic!(
391 "finish_open: could not acquire shared lock on data file at {}",
392 data_file_path.to_str().unwrap()
393 );
394 }
395 file
396}
397
398fn get_data_file_path(
400 path_prefix: &[u8],
401 uri_hash: &[u8],
402 remote_version: &FileVersion,
403) -> PathBuf {
404 let owned;
405 let path = [
406 path_prefix,
407 &[b'/', DATA_PREFIX, b'/'],
408 uri_hash,
409 match remote_version {
410 FileVersion::Timestamp(v) => {
411 owned = Some(format!("{:013x}", v));
412 owned.as_deref().unwrap()
413 },
414 FileVersion::ETag(v) => v.as_str(),
415 FileVersion::Uninitialized => panic!("impl error: version not initialized"),
416 }
417 .as_bytes(),
418 ]
419 .concat();
420 PathBuf::from(String::from_utf8(path).unwrap())
421}
422
423fn get_metadata_file_path(path_prefix: &[u8], uri_hash: &[u8]) -> PathBuf {
425 let bytes = [path_prefix, &[b'/', METADATA_PREFIX, b'/'], uri_hash].concat();
426 PathBuf::from(String::from_utf8(bytes).unwrap())
427}