1use std::cmp::Ordering;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::path::{Path, PathBuf};
11
12use malloc_size_of::MallocSizeOf;
13use malloc_size_of_derive::MallocSizeOf;
14use serde::{Deserialize, Serialize};
15use uuid::Uuid;
16
17use super::request::HeaderMap;
18use crate::{DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
19
/// The data extracted from a single ping file.
///
/// Values of this type are built by `PingDirectoryManager::process_file`.
#[derive(Clone, Debug, Default, MallocSizeOf)]
pub struct PingPayload {
    /// The ping's document id; `process_file` uses the same value as the ping's file name.
    pub document_id: String,
    /// The upload path, read from the first line of the ping file.
    pub upload_path: String,
    /// The ping body, read from the second line of the ping file
    /// (presumably a JSON-encoded string, going by the field name).
    pub json_body: String,
    /// Headers taken from the ping file's metadata line, if that line provided any.
    pub headers: Option<HeaderMap>,
    /// Taken from the ping file's metadata line; `process_file` falls back to `true`
    /// when the metadata does not specify it.
    /// NOTE(review): presumably tells whether the body carries the info sections - confirm.
    pub body_has_info_sections: bool,
    /// The ping's name: taken from the metadata line, or otherwise from the fourth
    /// `/`-separated segment of `upload_path`.
    pub ping_name: String,
    /// Taken from the ping file's metadata line; empty when the metadata does not specify it.
    /// NOTE(review): presumably the capabilities an uploader needs for this ping - confirm.
    pub uploader_capabilities: Vec<String>,
}
38
/// Ping payloads grouped by the directory they were read from.
///
/// When produced by `PingDirectoryManager::process_dirs`, each entry pairs the size of
/// the ping file as reported by the file system (`fs::Metadata::len`) with the payload
/// parsed from that file.
#[derive(Clone, Debug, Default)]
pub struct PingPayloadsByDirectory {
    /// Pings read from the pending pings directory, as `(file size, payload)`.
    pub pending_pings: Vec<(u64, PingPayload)>,
    /// Pings read from the deletion-request pings directory, as `(file size, payload)`.
    pub deletion_request_pings: Vec<(u64, PingPayload)>,
}
45
46impl MallocSizeOf for PingPayloadsByDirectory {
47 fn size_of(&self, ops: &mut malloc_size_of::MallocSizeOfOps) -> usize {
48 let shallow_size = unsafe {
51 ops.malloc_size_of(self.pending_pings.as_ptr())
52 + ops.malloc_size_of(self.deletion_request_pings.as_ptr())
53 };
54
55 let mut n = shallow_size;
56 for elem in self.pending_pings.iter() {
57 n += elem.0.size_of(ops);
58 n += elem.1.size_of(ops);
59 }
60 for elem in self.deletion_request_pings.iter() {
61 n += elem.0.size_of(ops);
62 n += elem.1.size_of(ops);
63 }
64 n
65 }
66}
67
68impl PingPayloadsByDirectory {
69 pub fn extend(&mut self, other: PingPayloadsByDirectory) {
72 self.pending_pings.extend(other.pending_pings);
73 self.deletion_request_pings
74 .extend(other.deletion_request_pings);
75 }
76
77 pub fn len(&self) -> usize {
79 self.pending_pings.len() + self.deletion_request_pings.len()
80 }
81}
82
83fn get_file_name_as_str(path: &Path) -> Option<&str> {
89 match path.file_name() {
90 None => {
91 log::warn!("Error getting file name from path: {}", path.display());
92 None
93 }
94 Some(file_name) => {
95 let file_name = file_name.to_str();
96 if file_name.is_none() {
97 log::warn!("File name is not valid unicode: {}", path.display());
98 }
99 file_name
100 }
101 }
102}
103
/// The optional metadata of a ping, (de)serialized with serde.
///
/// `PingDirectoryManager::process_file` parses it from the third line of a ping file
/// when that line is present. Every field is optional; the fallbacks for missing
/// fields are applied in `process_file`.
#[derive(Default, Deserialize, Serialize)]
pub struct PingMetadata {
    /// Headers associated with the ping, if any.
    pub headers: Option<HeaderMap>,
    /// `process_file` treats a missing value as `true`.
    /// NOTE(review): presumably tells whether the body carries the info sections - confirm.
    pub body_has_info_sections: Option<bool>,
    /// The ping's name; `process_file` derives it from the upload path when missing.
    pub ping_name: Option<String>,
    /// `process_file` treats a missing value as an empty list.
    /// NOTE(review): presumably the capabilities an uploader needs for this ping - confirm.
    pub uploader_capabilities: Option<Vec<String>>,
}
118
119pub fn process_metadata(path: &str, metadata: &str) -> Option<PingMetadata> {
126 if let Ok(metadata) = serde_json::from_str::<PingMetadata>(metadata) {
127 return Some(metadata);
128 } else {
129 log::warn!("Error while parsing ping metadata: {}", path);
130 }
131 None
132}
133
/// Reads, parses and deletes the ping files stored in the pending pings directory
/// and in the deletion-request pings directory.
#[derive(Debug, Clone, MallocSizeOf)]
pub struct PingDirectoryManager {
    /// `<data_path>/PENDING_PINGS_DIRECTORY`, as resolved by `new`.
    pending_pings_dir: PathBuf,
    /// `<data_path>/DELETION_REQUEST_PINGS_DIRECTORY`, as resolved by `new`.
    deletion_request_pings_dir: PathBuf,
}
142
143impl PingDirectoryManager {
144 pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
150 let data_path = data_path.into();
151 Self {
152 pending_pings_dir: data_path.join(PENDING_PINGS_DIRECTORY),
153 deletion_request_pings_dir: data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
154 }
155 }
156
157 pub fn delete_file(&self, uuid: &str) -> bool {
171 let path = match self.get_file_path(uuid) {
172 Some(path) => path,
173 None => {
174 log::warn!("Cannot find ping file to delete {}", uuid);
175 return false;
176 }
177 };
178
179 match fs::remove_file(&path) {
180 Err(e) => {
181 log::warn!("Error deleting file {}. {}", path.display(), e);
182 return false;
183 }
184 _ => log::info!("File was deleted {}", path.display()),
185 };
186
187 true
188 }
189
190 pub fn process_file(&self, document_id: &str) -> Option<PingPayload> {
198 let path = match self.get_file_path(document_id) {
199 Some(path) => path,
200 None => {
201 log::warn!("Cannot find ping file to process {}", document_id);
202 return None;
203 }
204 };
205 let file = match File::open(&path) {
206 Ok(file) => file,
207 Err(e) => {
208 log::warn!("Error reading ping file {}. {}", path.display(), e);
209 return None;
210 }
211 };
212
213 log::info!("Processing ping at: {}", path.display());
214
215 let mut lines = BufReader::new(file).lines();
220 if let (Some(Ok(path)), Some(Ok(body)), Ok(metadata)) =
221 (lines.next(), lines.next(), lines.next().transpose())
222 {
223 let PingMetadata {
224 headers,
225 body_has_info_sections,
226 ping_name,
227 uploader_capabilities,
228 } = metadata
229 .and_then(|m| process_metadata(&path, &m))
230 .unwrap_or_default();
231 let ping_name =
232 ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
233 return Some(PingPayload {
234 document_id: document_id.into(),
235 upload_path: path,
236 json_body: body,
237 headers,
238 body_has_info_sections: body_has_info_sections.unwrap_or(true),
239 ping_name,
240 uploader_capabilities: uploader_capabilities.unwrap_or_default(),
241 });
242 } else {
243 log::warn!(
244 "Error processing ping file: {}. Ping file is not formatted as expected.",
245 document_id
246 );
247 }
248 self.delete_file(document_id);
249 None
250 }
251
252 pub fn process_dirs(&self) -> PingPayloadsByDirectory {
254 PingPayloadsByDirectory {
255 pending_pings: self.process_dir(&self.pending_pings_dir),
256 deletion_request_pings: self.process_dir(&self.deletion_request_pings_dir),
257 }
258 }
259
260 fn process_dir(&self, dir: &Path) -> Vec<(u64, PingPayload)> {
271 log::trace!("Processing persisted pings.");
272
273 let entries = match dir.read_dir() {
274 Ok(entries) => entries,
275 Err(_) => {
276 return Vec::new();
279 }
280 };
281
282 let mut pending_pings: Vec<_> = entries
283 .filter_map(|entry| entry.ok())
284 .filter_map(|entry| {
285 let path = entry.path();
286 if let Some(file_name) = get_file_name_as_str(&path) {
287 if Uuid::parse_str(file_name).is_err() {
289 log::warn!("Pattern mismatch. Deleting {}", path.display());
290 self.delete_file(file_name);
291 return None;
292 }
293 if let Some(data) = self.process_file(file_name) {
294 let metadata = match fs::metadata(&path) {
295 Ok(metadata) => metadata,
296 Err(e) => {
297 log::warn!(
302 "Unable to read metadata for file: {}, error: {:?}",
303 path.display(),
304 e
305 );
306 return None;
307 }
308 };
309 return Some((metadata, data));
310 }
311 };
312 None
313 })
314 .collect();
315
316 pending_pings.sort_by(|(a, _), (b, _)| {
318 if let (Ok(a), Ok(b)) = (a.modified(), b.modified()) {
321 a.cmp(&b)
322 } else {
323 Ordering::Less
324 }
325 });
326
327 pending_pings
328 .into_iter()
329 .map(|(metadata, data)| (metadata.len(), data))
330 .collect()
331 }
332
333 fn get_file_path(&self, document_id: &str) -> Option<PathBuf> {
338 for dir in [&self.pending_pings_dir, &self.deletion_request_pings_dir].iter() {
339 let path = dir.join(document_id);
340 if path.exists() {
341 return Some(path);
342 }
343 }
344 None
345 }
346}
347
#[cfg(test)]
mod test {
    use super::*;
    use crate::metrics::PingType;
    use crate::tests::new_glean;

    /// Builds the custom "test" ping type that the tests below register and submit.
    fn test_ping_type() -> PingType {
        PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        )
    }

    /// Asserts that the fourth `/`-separated segment of the ping's upload path
    /// matches both the payload's `ping_name` and `expected`.
    fn assert_ping_name(ping: &PingPayload, expected: &str) {
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, expected);
    }

    #[test]
    fn doesnt_panic_if_no_pending_pings_directory() {
        let dir = tempfile::tempdir().unwrap();
        let directory_manager = PingDirectoryManager::new(dir.path());

        // Neither ping directory exists below the fresh temporary directory.
        let data = directory_manager.process_dirs();
        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 0);
    }

    #[test]
    fn gets_correct_data_from_valid_ping_file() {
        let (mut glean, dir) = new_glean(None);

        // Register and submit a ping so there is something to process.
        let ping_type = test_ping_type();
        glean.register_ping_type(&ping_type);
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        let data = directory_manager.process_dirs();

        // Exactly the submitted ping is found, and only among the pending pings.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        assert_ping_name(&data.pending_pings[0].1, "test");
    }

    #[test]
    fn non_uuid_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register and submit a ping so there is something to process.
        let ping_type = test_ping_type();
        glean.register_ping_type(&ping_type);
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Drop a file whose name is not a UUID next to the submitted ping.
        let not_uuid_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join("not-uuid-file-name.txt");
        File::create(&not_uuid_path).unwrap();

        let data = directory_manager.process_dirs();

        // Only the valid ping is returned.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        assert_ping_name(&data.pending_pings[0].1, "test");

        // The stray file was removed while processing.
        assert!(!not_uuid_path.exists());
    }

    #[test]
    fn wrongly_formatted_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register and submit a ping so there is something to process.
        let ping_type = test_ping_type();
        glean.register_ping_type(&ping_type);
        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Create an empty file with a valid UUID name: right name, wrong contents.
        let wrong_contents_file_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join(Uuid::new_v4().to_string());
        File::create(&wrong_contents_file_path).unwrap();

        let data = directory_manager.process_dirs();

        // Only the valid ping is returned.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        assert_ping_name(&data.pending_pings[0].1, "test");

        // The wrongly formatted file was removed while processing.
        assert!(!wrong_contents_file_path.exists());
    }

    #[test]
    fn takes_deletion_request_pings_into_account_while_processing() {
        let (glean, dir) = new_glean(None);

        // Submit a deletion-request ping.
        glean
            .internal_pings
            .deletion_request
            .submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        let data = directory_manager.process_dirs();

        // The ping shows up in the deletion-request list only.
        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 1);

        assert_ping_name(&data.deletion_request_pings[0].1, "deletion-request");
    }
}