1use std::cmp::Ordering;
8use std::fs::{self, File};
9use std::io::{BufRead, BufReader};
10use std::path::{Path, PathBuf};
11
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use super::request::HeaderMap;
16use crate::{DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
17
18#[derive(Clone, Debug, Default)]
20pub struct PingPayload {
21 pub document_id: String,
23 pub upload_path: String,
25 pub json_body: String,
27 pub headers: Option<HeaderMap>,
29 pub body_has_info_sections: bool,
31 pub ping_name: String,
33 pub uploader_capabilities: Vec<String>,
35}
36
37#[derive(Clone, Debug, Default)]
39pub struct PingPayloadsByDirectory {
40 pub pending_pings: Vec<(u64, PingPayload)>,
41 pub deletion_request_pings: Vec<(u64, PingPayload)>,
42}
43
44impl PingPayloadsByDirectory {
45 pub fn extend(&mut self, other: PingPayloadsByDirectory) {
48 self.pending_pings.extend(other.pending_pings);
49 self.deletion_request_pings
50 .extend(other.deletion_request_pings);
51 }
52
53 pub fn len(&self) -> usize {
55 self.pending_pings.len() + self.deletion_request_pings.len()
56 }
57}
58
59fn get_file_name_as_str(path: &Path) -> Option<&str> {
65 match path.file_name() {
66 None => {
67 log::warn!("Error getting file name from path: {}", path.display());
68 None
69 }
70 Some(file_name) => {
71 let file_name = file_name.to_str();
72 if file_name.is_none() {
73 log::warn!("File name is not valid unicode: {}", path.display());
74 }
75 file_name
76 }
77 }
78}
79
80#[derive(Default, Deserialize, Serialize)]
84pub struct PingMetadata {
85 pub headers: Option<HeaderMap>,
87 pub body_has_info_sections: Option<bool>,
89 pub ping_name: Option<String>,
91 pub uploader_capabilities: Option<Vec<String>>,
93}
94
95pub fn process_metadata(path: &str, metadata: &str) -> Option<PingMetadata> {
102 if let Ok(metadata) = serde_json::from_str::<PingMetadata>(metadata) {
103 return Some(metadata);
104 } else {
105 log::warn!("Error while parsing ping metadata: {}", path);
106 }
107 None
108}
109
/// Locates, reads and deletes ping files stored in the two ping directories.
#[derive(Clone, Debug)]
pub struct PingDirectoryManager {
    /// Directory searched for pending pings.
    pending_pings_dir: PathBuf,
    /// Directory searched for deletion-request pings.
    deletion_request_pings_dir: PathBuf,
}
118
119impl PingDirectoryManager {
120 pub fn new<P: Into<PathBuf>>(data_path: P) -> Self {
126 let data_path = data_path.into();
127 Self {
128 pending_pings_dir: data_path.join(PENDING_PINGS_DIRECTORY),
129 deletion_request_pings_dir: data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
130 }
131 }
132
133 pub fn delete_file(&self, uuid: &str) -> bool {
147 let path = match self.get_file_path(uuid) {
148 Some(path) => path,
149 None => {
150 log::warn!("Cannot find ping file to delete {}", uuid);
151 return false;
152 }
153 };
154
155 match fs::remove_file(&path) {
156 Err(e) => {
157 log::warn!("Error deleting file {}. {}", path.display(), e);
158 return false;
159 }
160 _ => log::info!("File was deleted {}", path.display()),
161 };
162
163 true
164 }
165
166 pub fn process_file(&self, document_id: &str) -> Option<PingPayload> {
174 let path = match self.get_file_path(document_id) {
175 Some(path) => path,
176 None => {
177 log::warn!("Cannot find ping file to process {}", document_id);
178 return None;
179 }
180 };
181 let file = match File::open(&path) {
182 Ok(file) => file,
183 Err(e) => {
184 log::warn!("Error reading ping file {}. {}", path.display(), e);
185 return None;
186 }
187 };
188
189 log::info!("Processing ping at: {}", path.display());
190
191 let mut lines = BufReader::new(file).lines();
196 if let (Some(Ok(path)), Some(Ok(body)), Ok(metadata)) =
197 (lines.next(), lines.next(), lines.next().transpose())
198 {
199 let PingMetadata {
200 headers,
201 body_has_info_sections,
202 ping_name,
203 uploader_capabilities,
204 } = metadata
205 .and_then(|m| process_metadata(&path, &m))
206 .unwrap_or_default();
207 let ping_name =
208 ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
209 return Some(PingPayload {
210 document_id: document_id.into(),
211 upload_path: path,
212 json_body: body,
213 headers,
214 body_has_info_sections: body_has_info_sections.unwrap_or(true),
215 ping_name,
216 uploader_capabilities: uploader_capabilities.unwrap_or_default(),
217 });
218 } else {
219 log::warn!(
220 "Error processing ping file: {}. Ping file is not formatted as expected.",
221 document_id
222 );
223 }
224 self.delete_file(document_id);
225 None
226 }
227
228 pub fn process_dirs(&self) -> PingPayloadsByDirectory {
230 PingPayloadsByDirectory {
231 pending_pings: self.process_dir(&self.pending_pings_dir),
232 deletion_request_pings: self.process_dir(&self.deletion_request_pings_dir),
233 }
234 }
235
236 fn process_dir(&self, dir: &Path) -> Vec<(u64, PingPayload)> {
247 log::trace!("Processing persisted pings.");
248
249 let entries = match dir.read_dir() {
250 Ok(entries) => entries,
251 Err(_) => {
252 return Vec::new();
255 }
256 };
257
258 let mut pending_pings: Vec<_> = entries
259 .filter_map(|entry| entry.ok())
260 .filter_map(|entry| {
261 let path = entry.path();
262 if let Some(file_name) = get_file_name_as_str(&path) {
263 if Uuid::parse_str(file_name).is_err() {
265 log::warn!("Pattern mismatch. Deleting {}", path.display());
266 self.delete_file(file_name);
267 return None;
268 }
269 if let Some(data) = self.process_file(file_name) {
270 let metadata = match fs::metadata(&path) {
271 Ok(metadata) => metadata,
272 Err(e) => {
273 log::warn!(
278 "Unable to read metadata for file: {}, error: {:?}",
279 path.display(),
280 e
281 );
282 return None;
283 }
284 };
285 return Some((metadata, data));
286 }
287 };
288 None
289 })
290 .collect();
291
292 pending_pings.sort_by(|(a, _), (b, _)| {
294 if let (Ok(a), Ok(b)) = (a.modified(), b.modified()) {
297 a.cmp(&b)
298 } else {
299 Ordering::Less
300 }
301 });
302
303 pending_pings
304 .into_iter()
305 .map(|(metadata, data)| (metadata.len(), data))
306 .collect()
307 }
308
309 fn get_file_path(&self, document_id: &str) -> Option<PathBuf> {
314 for dir in [&self.pending_pings_dir, &self.deletion_request_pings_dir].iter() {
315 let path = dir.join(document_id);
316 if path.exists() {
317 return Some(path);
318 }
319 }
320 None
321 }
322}
323
#[cfg(test)]
mod test {
    use super::*;
    use crate::metrics::PingType;
    use crate::tests::new_glean;

    /// Builds the ping type named `"test"` that several tests register and submit.
    fn new_test_ping_type() -> PingType {
        PingType::new(
            "test",
            true,
            true,
            true,
            true,
            true,
            vec![],
            vec![],
            true,
            vec![],
        )
    }

    #[test]
    fn doesnt_panic_if_no_pending_pings_directory() {
        let dir = tempfile::tempdir().unwrap();
        let directory_manager = PingDirectoryManager::new(dir.path());

        // Neither ping directory exists yet; processing must simply find nothing.
        let data = directory_manager.process_dirs();
        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 0);
    }

    #[test]
    fn gets_correct_data_from_valid_ping_file() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing and submit it, so a ping file exists.
        let ping_type = new_test_ping_type();
        glean.register_ping_type(&ping_type);

        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Try and process the ping directories.
        let data = directory_manager.process_dirs();

        // Verify there is just the one pending ping.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify the ping name matches the one encoded in the upload path.
        let ping = &data.pending_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");
    }

    #[test]
    fn non_uuid_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing and submit it, so a ping file exists.
        let ping_type = new_test_ping_type();
        glean.register_ping_type(&ping_type);

        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Add a file whose name is not a UUID to the pending pings directory.
        let not_uuid_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join("not-uuid-file-name.txt");
        File::create(&not_uuid_path).unwrap();

        // Try and process the ping directories.
        let data = directory_manager.process_dirs();

        // Verify there is just the one pending ping.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify the ping name matches the one encoded in the upload path.
        let ping = &data.pending_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");

        // Verify that the non-UUID file was deleted.
        assert!(!not_uuid_path.exists());
    }

    #[test]
    fn wrongly_formatted_files_are_deleted_and_ignored() {
        let (mut glean, dir) = new_glean(None);

        // Register a ping for testing and submit it, so a ping file exists.
        let ping_type = new_test_ping_type();
        glean.register_ping_type(&ping_type);

        ping_type.submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Add an empty file with a valid UUID name to the pending pings directory.
        let wrong_contents_file_path = dir
            .path()
            .join(PENDING_PINGS_DIRECTORY)
            .join(Uuid::new_v4().to_string());
        File::create(&wrong_contents_file_path).unwrap();

        // Try and process the ping directories.
        let data = directory_manager.process_dirs();

        // Verify there is just the one pending ping.
        assert_eq!(data.pending_pings.len(), 1);
        assert_eq!(data.deletion_request_pings.len(), 0);

        // Verify the ping name matches the one encoded in the upload path.
        let ping = &data.pending_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "test");

        // Verify that the wrongly formatted file was deleted.
        assert!(!wrong_contents_file_path.exists());
    }

    #[test]
    fn takes_deletion_request_pings_into_account_while_processing() {
        let (glean, dir) = new_glean(None);

        // Submit a deletion-request ping.
        glean
            .internal_pings
            .deletion_request
            .submit_sync(&glean, None);

        let directory_manager = PingDirectoryManager::new(dir.path());

        // Try and process the ping directories.
        let data = directory_manager.process_dirs();

        // Verify the ping was found in the deletion-request list only.
        assert_eq!(data.pending_pings.len(), 0);
        assert_eq!(data.deletion_request_pings.len(), 1);

        // Verify the ping name matches the one encoded in the upload path.
        let ping = &data.deletion_request_pings[0].1;
        let request_ping_type = ping.upload_path.split('/').nth(3).unwrap();
        assert_eq!(request_ping_type, ping.ping_name);
        assert_eq!(request_ping_type, "deletion-request");
    }
}