glean_core/upload/directory.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Pings directory processing utilities.

7use std::cmp::Ordering;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::path::{Path, PathBuf};
11
12use malloc_size_of::MallocSizeOf;
13use malloc_size_of_derive::MallocSizeOf;
14use serde::{Deserialize, Serialize};
15use uuid::Uuid;
16
17use super::request::HeaderMap;
18use crate::{DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
19
20/// A representation of the data extracted from a ping file,
21#[derive(Clone, Debug, Default, MallocSizeOf)]
22pub struct PingPayload {
23    /// The ping's doc_id.
24    pub document_id: String,
25    /// The path to upload the ping to.
26    pub upload_path: String,
27    /// The ping body as JSON-encoded string.
28    pub json_body: String,
29    /// HTTP headers to include in the upload request.
30    pub headers: Option<HeaderMap>,
31    /// Whether the ping body contains {client|ping}_info
32    pub body_has_info_sections: bool,
33    /// The ping's name. (Also likely in the upload_path.)
34    pub ping_name: String,
35    /// The capabilities this ping must be uploaded under.
36    pub uploader_capabilities: Vec<String>,
37}
38
39/// A struct to hold the result of scanning all pings directories.
40#[derive(Clone, Debug, Default)]
41pub struct PingPayloadsByDirectory {
42    pub pending_pings: Vec<(u64, PingPayload)>,
43    pub deletion_request_pings: Vec<(u64, PingPayload)>,
44}
45
46impl MallocSizeOf for PingPayloadsByDirectory {
47    fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
48        // SAFETY: We own the referenced `Vec`s and can hand out pointers to them
49        // to an external function.
50        let shallow_size = unsafe {
51            ops.malloc_size_of(self.pending_pings.as_ptr())
52                + ops.malloc_size_of(self.deletion_request_pings.as_ptr())
53        };
54
55        let mut n = shallow_size;
56        for elem in self.pending_pings.iter() {
57            n += elem.0.size_of(ops);
58            n += elem.1.size_of(ops);
59        }
60        for elem in self.deletion_request_pings.iter() {
61            n += elem.0.size_of(ops);
62            n += elem.1.size_of(ops);
63        }
64        n
65    }
66}
67
68impl PingPayloadsByDirectory {
69    /// Extends the data of this instance of PingPayloadsByDirectory
70    /// with the data from another instance of PingPayloadsByDirectory.
71    pub fn extend(&mut self, other: PingPayloadsByDirectory) {
72        self.pending_pings.extend(other.pending_pings);
73        self.deletion_request_pings
74            .extend(other.deletion_request_pings);
75    }
76
77    // Get the sum of the number of deletion request and regular pending pings.
78    pub fn len(&self) -> usize {
79        self.pending_pings.len() + self.deletion_request_pings.len()
80    }
81}
82
83/// Gets the file name from a path as a &str.
84///
85/// # Panics
86///
87/// Won't panic if not able to get file name.
88fn get_file_name_as_str(path: &Path) -> Option<&str> {
89    match path.file_name() {
90        None => {
91            log::warn!("Error getting file name from path: {}", path.display());
92            None
93        }
94        Some(file_name) => {
95            let file_name = file_name.to_str();
96            if file_name.is_none() {
97                log::warn!("File name is not valid unicode: {}", path.display());
98            }
99            file_name
100        }
101    }
102}
103
104/// A ping's metadata, as (optionally) represented on disk.
105///
106/// Anything that isn't the upload path or the ping body.
107#[derive(Default, Deserialize, Serialize)]
108pub struct PingMetadata {
109    /// HTTP headers to include when uploading the ping.
110    pub headers: Option<HeaderMap>,
111    /// Whether the body has {client|ping}_info sections.
112    pub body_has_info_sections: Option<bool>,
113    /// The name of the ping.
114    pub ping_name: Option<String>,
115    /// The capabilities this ping must be uploaded under.
116    pub uploader_capabilities: Option<Vec<String>>,
117}
118
119/// Processes a ping's metadata.
120///
121/// The metadata is an optional third line in the ping file,
122/// currently it contains only additonal headers to be added to each ping request.
123/// Therefore, we will process the contents of this line
124/// and return a HeaderMap of the persisted headers.
125pub fn process_metadata(path: &str, metadata: &str) -> Option<PingMetadata> {
126    if let Ok(metadata) = serde_json::from_str::<PingMetadata>(metadata) {
127        return Some(metadata);
128    } else {
129        log::warn!("Error while parsing ping metadata: {}", path);
130    }
131    None
132}
133
134/// Manages the pings directories.
135#[derive(Debug, Clone, MallocSizeOf)]
136pub struct PingDirectoryManager {
137    /// Path to the pending pings directory.
138    pending_pings_dir: PathBuf,
139    /// Path to the deletion-request pings directory.
140    deletion_request_pings_dir: PathBuf,
141}
142
143impl PingDirectoryManager {
144    /// Creates a new directory manager.
145    ///
146    /// # Arguments
147    ///
148    /// * `data_path` - Path to the pending pings directory.
149    pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
150        let data_path = data_path.into();
151        Self {
152            pending_pings_dir: data_path.join(PENDING_PINGS_DIRECTORY),
153            deletion_request_pings_dir: data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
154        }
155    }
156
157    /// Attempts to delete a ping file.
158    ///
159    /// # Arguments
160    ///
161    /// * `uuid` - The UUID of the ping file to be deleted
162    ///
163    /// # Returns
164    ///
165    /// Whether the file was successfully deleted.
166    ///
167    /// # Panics
168    ///
169    /// Won't panic if unable to delete the file.
170    pub fn delete_file(&self, uuid: &str) -> bool {
171        let path = match self.get_file_path(uuid) {
172            Some(path) => path,
173            None => {
174                log::warn!("Cannot find ping file to delete {}", uuid);
175                return false;
176            }
177        };
178
179        match fs::remove_file(&path) {
180            Err(e) => {
181                log::warn!("Error deleting file {}. {}", path.display(), e);
182                return false;
183            }
184            _ => log::info!("File was deleted {}", path.display()),
185        };
186
187        true
188    }
189
190    /// Reads a ping file and returns the data from it.
191    ///
192    /// If the file is not properly formatted, it will be deleted and `None` will be returned.
193    ///
194    /// # Arguments
195    ///
196    /// * `document_id` - The UUID of the ping file to be processed
197    pub fn process_file(&self, document_id: &str) -> Option<PingPayload> {
198        let path = match self.get_file_path(document_id) {
199            Some(path) => path,
200            None => {
201                log::warn!("Cannot find ping file to process {}", document_id);
202                return None;
203            }
204        };
205        let file = match File::open(&path) {
206            Ok(file) => file,
207            Err(e) => {
208                log::warn!("Error reading ping file {}. {}", path.display(), e);
209                return None;
210            }
211        };
212
213        log::info!("Processing ping at: {}", path.display());
214
215        // The way the ping file is structured:
216        // first line should always have the path,
217        // second line should have the body with the ping contents in JSON format
218        // and third line might contain ping metadata e.g. additional headers.
219        let mut lines = BufReader::new(file).lines();
220        if let (Some(Ok(path)), Some(Ok(body)), Ok(metadata)) =
221            (lines.next(), lines.next(), lines.next().transpose())
222        {
223            let PingMetadata {
224                headers,
225                body_has_info_sections,
226                ping_name,
227                uploader_capabilities,
228            } = metadata
229                .and_then(|m| process_metadata(&path, &m))
230                .unwrap_or_default();
231            let ping_name =
232                ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
233            return Some(PingPayload {
234                document_id: document_id.into(),
235                upload_path: path,
236                json_body: body,
237                headers,
238                body_has_info_sections: body_has_info_sections.unwrap_or(true),
239                ping_name,
240                uploader_capabilities: uploader_capabilities.unwrap_or_default(),
241            });
242        } else {
243            log::warn!(
244                "Error processing ping file: {}. Ping file is not formatted as expected.",
245                document_id
246            );
247        }
248        self.delete_file(document_id);
249        None
250    }
251
252    /// Processes both ping directories.
253    pub fn process_dirs(&self) -> PingPayloadsByDirectory {
254        PingPayloadsByDirectory {
255            pending_pings: self.process_dir(&self.pending_pings_dir),
256            deletion_request_pings: self.process_dir(&self.deletion_request_pings_dir),
257        }
258    }
259
260    /// Processes one of the pings directory and return a vector with the ping data
261    /// corresponding to each valid ping file in the directory.
262    /// This vector will be ordered by file `modified_date`.
263    ///
264    /// Any files that don't match the UUID regex will be deleted
265    /// to prevent files from polluting the pings directory.
266    ///
267    /// # Returns
268    ///
269    /// A vector of tuples with the file size and payload of each ping file in the directory.
270    fn process_dir(&self, dir: &Path) -> Vec<(u64, PingPayload)> {
271        log::trace!("Processing persisted pings.");
272
273        let entries = match dir.read_dir() {
274            Ok(entries) => entries,
275            Err(_) => {
276                // This may error simply because the directory doesn't exist,
277                // which is expected if no pings were stored yet.
278                return Vec::new();
279            }
280        };
281
282        let mut pending_pings: Vec<_> = entries
283            .filter_map(|entry| entry.ok())
284            .filter_map(|entry| {
285                let path = entry.path();
286                if let Some(file_name) = get_file_name_as_str(&path) {
287                    // Delete file if it doesn't match the pattern.
288                    if Uuid::parse_str(file_name).is_err() {
289                        log::warn!("Pattern mismatch. Deleting {}", path.display());
290                        self.delete_file(file_name);
291                        return None;
292                    }
293                    if let Some(data) = self.process_file(file_name) {
294                        let metadata = match fs::metadata(&path) {
295                            Ok(metadata) => metadata,
296                            Err(e) => {
297                                // There's a rare case where this races against a parallel deletion
298                                // of all pending ping files.
299                                // This could therefore fail, in which case we don't care about the
300                                // result and can ignore the ping, it's already been deleted.
301                                log::warn!(
302                                    "Unable to read metadata for file: {}, error: {:?}",
303                                    path.display(),
304                                    e
305                                );
306                                return None;
307                            }
308                        };
309                        return Some((metadata, data));
310                    }
311                };
312                None
313            })
314            .collect();
315
316        // This will sort the pings by date in ascending order (oldest -> newest).
317        pending_pings.sort_by(|(a, _), (b, _)| {
318            // We might not be able to get the modified date for a given file,
319            // in which case we just put it at the end.
320            if let (Ok(a), Ok(b)) = (a.modified(), b.modified()) {
321                a.cmp(&b)
322            } else {
323                Ordering::Less
324            }
325        });
326
327        pending_pings
328            .into_iter()
329            .map(|(metadata, data)| (metadata.len(), data))
330            .collect()
331    }
332
333    /// Gets the path for a ping file based on its document_id.
334    ///
335    /// Will look for files in each ping directory until something is found.
336    /// If nothing is found, returns `None`.
337    fn get_file_path(&self, document_id: &str) -> Option<PathBuf> {
338        for dir in [&self.pending_pings_dir, &self.deletion_request_pings_dir].iter() {
339            let path = dir.join(document_id);
340            if path.exists() {
341                return Some(path);
342            }
343        }
344        None
345    }
346}
347
#[cfg(test)]
mod test {
    use super::*;
    use crate::metrics::PingType;
    use crate::tests::new_glean;

    /// Builds the "test" ping type shared by the tests that submit a regular ping.
    fn make_test_ping_type() -> PingType {
        PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        )
    }

    /// Checks that the ping type encoded in `ping`'s upload path agrees with
    /// both the payload's `ping_name` and `expected`.
    fn assert_ping_type(ping: &PingPayload, expected: &str) {
        let from_path = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(from_path, ping.ping_name);
        assert_eq!(from_path, expected);
    }

    #[test]
    fn doesnt_panic_if_no_pending_pings_directory() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = PingDirectoryManager::new(tmp.path());

        // Processing directories that were never created must not panic.
        let payloads = manager.process_dirs();
        assert_eq!(payloads.pending_pings.len(), 0);
        assert_eq!(payloads.deletion_request_pings.len(), 0);
    }

    #[test]
    fn gets_correct_data_from_valid_ping_file() {
        let (mut glean, dir) = new_glean(None);
        let ping_type = make_test_ping_type();
        glean.register_ping_type(&ping_type);

        // Submitting writes the ping into the pending pings directory.
        ping_type.submit_sync(&glean, None);

        let manager = PingDirectoryManager::new(dir.path());
        let payloads = manager.process_dirs();

        // Exactly one regular ping and no deletion-request pings.
        assert_eq!(payloads.pending_pings.len(), 1);
        assert_eq!(payloads.deletion_request_pings.len(), 0);
        assert_ping_type(&payloads.pending_pings[0].1, "test");
    }

    #[test]
    fn non_uuid_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);
        let ping_type = make_test_ping_type();
        glean.register_ping_type(&ping_type);
        ping_type.submit_sync(&glean, None);

        let manager = PingDirectoryManager::new(dir.path());

        // Drop a file whose name is not a UUID next to the real ping.
        let stray_file = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join("not-uuid-file-name.txt");
        File::create(&stray_file).unwrap();

        let payloads = manager.process_dirs();

        // Only the real ping survives.
        assert_eq!(payloads.pending_pings.len(), 1);
        assert_eq!(payloads.deletion_request_pings.len(), 0);
        assert_ping_type(&payloads.pending_pings[0].1, "test");

        // The stray file has been removed from disk.
        assert!(!stray_file.exists());
    }

    #[test]
    fn wrongly_formatted_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);
        let ping_type = make_test_ping_type();
        glean.register_ping_type(&ping_type);
        ping_type.submit_sync(&glean, None);

        let manager = PingDirectoryManager::new(dir.path());

        // An empty file with a valid UUID name is not a well-formed ping.
        let malformed_file = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join(Uuid::new_v4().to_string());
        File::create(&malformed_file).unwrap();

        let payloads = manager.process_dirs();

        // Only the real ping survives.
        assert_eq!(payloads.pending_pings.len(), 1);
        assert_eq!(payloads.deletion_request_pings.len(), 0);
        assert_ping_type(&payloads.pending_pings[0].1, "test");

        // The malformed file has been removed from disk.
        assert!(!malformed_file.exists());
    }

    #[test]
    fn takes_deletion_request_pings_into_account_while_processing() {
        let (glean, dir) = new_glean(None);

        // Submitting a deletion-request ping populates its own directory.
        glean
            .internal_pings
            .deletion_request
            .submit_sync(&glean, None);

        let manager = PingDirectoryManager::new(dir.path());
        let payloads = manager.process_dirs();

        assert_eq!(payloads.pending_pings.len(), 0);
        assert_eq!(payloads.deletion_request_pings.len(), 1);
        assert_ping_type(&payloads.deletion_request_pings[0].1, "deletion-request");
    }
}
521}