glean_core/upload/
directory.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Pings directory processing utilities.
6
7use std::cmp::Ordering;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::path::{Path, PathBuf};
11
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use super::request::HeaderMap;
16use crate::{DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
17
/// A representation of the data extracted from a ping file.
///
/// Instances are built by `PingDirectoryManager::process_file` from the
/// lines of a persisted ping file.
#[derive(Clone, Debug, Default)]
pub struct PingPayload {
    /// The ping's doc_id.
    pub document_id: String,
    /// The path to upload the ping to.
    pub upload_path: String,
    /// The ping body as JSON-encoded string.
    pub json_body: String,
    /// HTTP headers to include in the upload request.
    pub headers: Option<HeaderMap>,
    /// Whether the ping body contains {client|ping}_info sections.
    pub body_has_info_sections: bool,
    /// The ping's name. (Also likely in the upload_path.)
    pub ping_name: String,
    /// The capabilities this ping must be uploaded under.
    pub uploader_capabilities: Vec<String>,
}
36
/// A struct to hold the result of scanning all pings directories.
///
/// Each entry pairs the size of a ping file (as reported by the file's
/// metadata) with the payload that was read from that file.
#[derive(Clone, Debug, Default)]
pub struct PingPayloadsByDirectory {
    /// Pings found in the pending pings directory, as `(file size, payload)` tuples.
    pub pending_pings: Vec<(u64, PingPayload)>,
    /// Pings found in the deletion-request pings directory, as `(file size, payload)` tuples.
    pub deletion_request_pings: Vec<(u64, PingPayload)>,
}
43
44impl PingPayloadsByDirectory {
45    /// Extends the data of this instance of PingPayloadsByDirectory
46    /// with the data from another instance of PingPayloadsByDirectory.
47    pub fn extend(&mut self, other: PingPayloadsByDirectory) {
48        self.pending_pings.extend(other.pending_pings);
49        self.deletion_request_pings
50            .extend(other.deletion_request_pings);
51    }
52
53    // Get the sum of the number of deletion request and regular pending pings.
54    pub fn len(&self) -> usize {
55        self.pending_pings.len() + self.deletion_request_pings.len()
56    }
57}
58
59/// Gets the file name from a path as a &str.
60///
61/// # Panics
62///
63/// Won't panic if not able to get file name.
64fn get_file_name_as_str(path: &Path) -> Option<&str> {
65    match path.file_name() {
66        None => {
67            log::warn!("Error getting file name from path: {}", path.display());
68            None
69        }
70        Some(file_name) => {
71            let file_name = file_name.to_str();
72            if file_name.is_none() {
73                log::warn!("File name is not valid unicode: {}", path.display());
74            }
75            file_name
76        }
77    }
78}
79
/// A ping's metadata, as (optionally) represented on disk.
///
/// Anything that isn't the upload path or the ping body.
///
/// Every field is optional; `PingDirectoryManager::process_file` substitutes
/// a default for whatever is absent.
#[derive(Default, Deserialize, Serialize)]
pub struct PingMetadata {
    /// HTTP headers to include when uploading the ping.
    pub headers: Option<HeaderMap>,
    /// Whether the body has {client|ping}_info sections.
    /// `process_file` treats `None` as `true`.
    pub body_has_info_sections: Option<bool>,
    /// The name of the ping.
    /// When `None`, `process_file` derives the name from the upload path.
    pub ping_name: Option<String>,
    /// The capabilities this ping must be uploaded under.
    /// `process_file` treats `None` as an empty list.
    pub uploader_capabilities: Option<Vec<String>>,
}
94
95/// Processes a ping's metadata.
96///
97/// The metadata is an optional third line in the ping file,
98/// currently it contains only additonal headers to be added to each ping request.
99/// Therefore, we will process the contents of this line
100/// and return a HeaderMap of the persisted headers.
101pub fn process_metadata(path: &str, metadata: &str) -> Option<PingMetadata> {
102    if let Ok(metadata) = serde_json::from_str::<PingMetadata>(metadata) {
103        return Some(metadata);
104    } else {
105        log::warn!("Error while parsing ping metadata: {}", path);
106    }
107    None
108}
109
/// Manages the pings directories.
///
/// Holds the locations of the pending and deletion-request pings
/// directories and offers helpers to read, validate and delete the ping
/// files stored in them.
#[derive(Debug, Clone)]
pub struct PingDirectoryManager {
    /// Path to the pending pings directory.
    pending_pings_dir: PathBuf,
    /// Path to the deletion-request pings directory.
    deletion_request_pings_dir: PathBuf,
}
118
119impl PingDirectoryManager {
120    /// Creates a new directory manager.
121    ///
122    /// # Arguments
123    ///
124    /// * `data_path` - Path to the pending pings directory.
125    pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
126        let data_path = data_path.into();
127        Self {
128            pending_pings_dir: data_path.join(PENDING_PINGS_DIRECTORY),
129            deletion_request_pings_dir: data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
130        }
131    }
132
133    /// Attempts to delete a ping file.
134    ///
135    /// # Arguments
136    ///
137    /// * `uuid` - The UUID of the ping file to be deleted
138    ///
139    /// # Returns
140    ///
141    /// Whether the file was successfully deleted.
142    ///
143    /// # Panics
144    ///
145    /// Won't panic if unable to delete the file.
146    pub fn delete_file(&self, uuid: &str) -> bool {
147        let path = match self.get_file_path(uuid) {
148            Some(path) => path,
149            None => {
150                log::warn!("Cannot find ping file to delete {}", uuid);
151                return false;
152            }
153        };
154
155        match fs::remove_file(&path) {
156            Err(e) => {
157                log::warn!("Error deleting file {}. {}", path.display(), e);
158                return false;
159            }
160            _ => log::info!("File was deleted {}", path.display()),
161        };
162
163        true
164    }
165
166    /// Reads a ping file and returns the data from it.
167    ///
168    /// If the file is not properly formatted, it will be deleted and `None` will be returned.
169    ///
170    /// # Arguments
171    ///
172    /// * `document_id` - The UUID of the ping file to be processed
173    pub fn process_file(&self, document_id: &str) -> Option<PingPayload> {
174        let path = match self.get_file_path(document_id) {
175            Some(path) => path,
176            None => {
177                log::warn!("Cannot find ping file to process {}", document_id);
178                return None;
179            }
180        };
181        let file = match File::open(&path) {
182            Ok(file) => file,
183            Err(e) => {
184                log::warn!("Error reading ping file {}. {}", path.display(), e);
185                return None;
186            }
187        };
188
189        log::info!("Processing ping at: {}", path.display());
190
191        // The way the ping file is structured:
192        // first line should always have the path,
193        // second line should have the body with the ping contents in JSON format
194        // and third line might contain ping metadata e.g. additional headers.
195        let mut lines = BufReader::new(file).lines();
196        if let (Some(Ok(path)), Some(Ok(body)), Ok(metadata)) =
197            (lines.next(), lines.next(), lines.next().transpose())
198        {
199            let PingMetadata {
200                headers,
201                body_has_info_sections,
202                ping_name,
203                uploader_capabilities,
204            } = metadata
205                .and_then(|m| process_metadata(&path, &m))
206                .unwrap_or_default();
207            let ping_name =
208                ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
209            return Some(PingPayload {
210                document_id: document_id.into(),
211                upload_path: path,
212                json_body: body,
213                headers,
214                body_has_info_sections: body_has_info_sections.unwrap_or(true),
215                ping_name,
216                uploader_capabilities: uploader_capabilities.unwrap_or_default(),
217            });
218        } else {
219            log::warn!(
220                "Error processing ping file: {}. Ping file is not formatted as expected.",
221                document_id
222            );
223        }
224        self.delete_file(document_id);
225        None
226    }
227
228    /// Processes both ping directories.
229    pub fn process_dirs(&self) -> PingPayloadsByDirectory {
230        PingPayloadsByDirectory {
231            pending_pings: self.process_dir(&self.pending_pings_dir),
232            deletion_request_pings: self.process_dir(&self.deletion_request_pings_dir),
233        }
234    }
235
236    /// Processes one of the pings directory and return a vector with the ping data
237    /// corresponding to each valid ping file in the directory.
238    /// This vector will be ordered by file `modified_date`.
239    ///
240    /// Any files that don't match the UUID regex will be deleted
241    /// to prevent files from polluting the pings directory.
242    ///
243    /// # Returns
244    ///
245    /// A vector of tuples with the file size and payload of each ping file in the directory.
246    fn process_dir(&self, dir: &Path) -> Vec<(u64, PingPayload)> {
247        log::trace!("Processing persisted pings.");
248
249        let entries = match dir.read_dir() {
250            Ok(entries) => entries,
251            Err(_) => {
252                // This may error simply because the directory doesn't exist,
253                // which is expected if no pings were stored yet.
254                return Vec::new();
255            }
256        };
257
258        let mut pending_pings: Vec<_> = entries
259            .filter_map(|entry| entry.ok())
260            .filter_map(|entry| {
261                let path = entry.path();
262                if let Some(file_name) = get_file_name_as_str(&path) {
263                    // Delete file if it doesn't match the pattern.
264                    if Uuid::parse_str(file_name).is_err() {
265                        log::warn!("Pattern mismatch. Deleting {}", path.display());
266                        self.delete_file(file_name);
267                        return None;
268                    }
269                    if let Some(data) = self.process_file(file_name) {
270                        let metadata = match fs::metadata(&path) {
271                            Ok(metadata) => metadata,
272                            Err(e) => {
273                                // There's a rare case where this races against a parallel deletion
274                                // of all pending ping files.
275                                // This could therefore fail, in which case we don't care about the
276                                // result and can ignore the ping, it's already been deleted.
277                                log::warn!(
278                                    "Unable to read metadata for file: {}, error: {:?}",
279                                    path.display(),
280                                    e
281                                );
282                                return None;
283                            }
284                        };
285                        return Some((metadata, data));
286                    }
287                };
288                None
289            })
290            .collect();
291
292        // This will sort the pings by date in ascending order (oldest -> newest).
293        pending_pings.sort_by(|(a, _), (b, _)| {
294            // We might not be able to get the modified date for a given file,
295            // in which case we just put it at the end.
296            if let (Ok(a), Ok(b)) = (a.modified(), b.modified()) {
297                a.cmp(&b)
298            } else {
299                Ordering::Less
300            }
301        });
302
303        pending_pings
304            .into_iter()
305            .map(|(metadata, data)| (metadata.len(), data))
306            .collect()
307    }
308
309    /// Gets the path for a ping file based on its document_id.
310    ///
311    /// Will look for files in each ping directory until something is found.
312    /// If nothing is found, returns `None`.
313    fn get_file_path(&self, document_id: &str) -> Option<PathBuf> {
314        for dir in [&self.pending_pings_dir, &self.deletion_request_pings_dir].iter() {
315            let path = dir.join(document_id);
316            if path.exists() {
317                return Some(path);
318            }
319        }
320        None
321    }
322}
323
324#[cfg(test)]
325mod test {
326    use super::*;
327    use crate::metrics::PingType;
328    use crate::tests::new_glean;
329
    /// Processing must not panic, and must report no pings, when the pings
    /// directories have never been created under the data path.
    #[test]
    fn doesnt_panic_if_no_pending_pings_directory() {
        // A fresh temporary directory: neither pings sub-directory exists in it.
        let dir = tempfile::tempdir().unwrap();
        let directory_manager = PingDirectoryManager::new(dir.path());

        // Verify that processing the directory didn't panic
        let data = directory_manager.process_dirs();
        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 0);
    }
340
    /// A submitted ping must be found in the pending pings directory and
    /// parsed into a payload whose name matches the upload path.
    #[test]
    fn gets_correct_data_from_valid_ping_file() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing
        // NOTE(review): the meaning of the positional flags and lists is defined
        // by `PingType::new`, which is not visible in this file.
        let ping_type = PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        );
        glean.register_ping_type(&ping_type);

        // Submit the ping to populate the pending_pings directory
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Try and process the pings directories
        let data = directory_manager.process_dirs();

        // Verify there is just the one request
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify request was returned for the "test" ping
        let ping = &data.pending_pings[0].1;
        // The ping name is expected to be the fourth `/`-separated segment of the upload path.
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");
    }
378
    /// A file whose name is not a UUID must be removed from the pending pings
    /// directory and must not show up in the processed results.
    #[test]
    fn non_uuid_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing
        // NOTE(review): the meaning of the positional flags and lists is defined
        // by `PingType::new`, which is not visible in this file.
        let ping_type = PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        );
        glean.register_ping_type(&ping_type);

        // Submit the ping to populate the pending_pings directory
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Drop a stray file with a non-UUID name next to the valid ping file.
        let not_uuid_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join("not-uuid-file-name.txt");
        File::create(&not_uuid_path).unwrap();

        // Try and process the pings directories
        let data = directory_manager.process_dirs();

        // Verify there is just the one request
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify request was returned for the "test" ping
        let ping = &data.pending_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");

        // Verify that file was indeed deleted
        assert!(!not_uuid_path.exists());
    }
425
    /// A file with a valid UUID name but without the expected contents must be
    /// removed from the pending pings directory and must not show up in the
    /// processed results.
    #[test]
    fn wrongly_formatted_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing
        // NOTE(review): the meaning of the positional flags and lists is defined
        // by `PingType::new`, which is not visible in this file.
        let ping_type = PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        );
        glean.register_ping_type(&ping_type);

        // Submit the ping to populate the pending_pings directory
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // An empty file with a valid UUID name: it passes the file name check
        // but has neither the upload path line nor the body line.
        let wrong_contents_file_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join(Uuid::new_v4().to_string());
        File::create(&wrong_contents_file_path).unwrap();

        // Try and process the pings directories
        let data = directory_manager.process_dirs();

        // Verify there is just the one request
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify request was returned for the "test" ping
        let ping = &data.pending_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");

        // Verify that file was indeed deleted
        assert!(!wrong_contents_file_path.exists());
    }
472
    /// A submitted deletion-request ping must be reported under
    /// `deletion_request_pings` and not under `pending_pings`.
    #[test]
    fn takes_deletion_request_pings_into_account_while_processing() {
        let (glean, dir) = new_glean(None);

        // Submit a deletion request ping to populate deletion request folder.
        glean
            .internal_pings
            .deletion_request
            .submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Try and process the pings directories
        let data = directory_manager.process_dirs();

        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 1);

        // Verify request was returned for the "deletion-request" ping
        let ping = &data.deletion_request_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "deletion-request");
    }
497}