glean_core/ping/
mod.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
5//! Ping collection, assembly & submission.
6
7use std::fs::{self, create_dir_all, File};
8use std::io::{BufRead, BufReader, Write};
9use std::path::{Path, PathBuf};
10
11use log::info;
12use serde_json::{json, Value as JsonValue};
13
14use crate::common_metric_data::{CommonMetricData, Lifetime};
15use crate::metrics::{CounterMetric, DatetimeMetric, Metric, MetricType, PingType, TimeUnit};
16use crate::storage::{StorageManager, INTERNAL_STORAGE};
17use crate::upload::{HeaderMap, PingMetadata};
18use crate::util::{get_iso_time_string, local_now_with_offset};
19use crate::{Glean, Result, DELETION_REQUEST_PINGS_DIRECTORY, PENDING_PINGS_DIRECTORY};
20
21/// Holds everything you need to store or send a ping.
22pub struct Ping<'a> {
23    /// The unique document id.
24    pub doc_id: &'a str,
25    /// The ping's name.
26    pub name: &'a str,
27    /// The path on the server to use when uplaoding this ping.
28    pub url_path: &'a str,
29    /// The payload, including `*_info` fields.
30    pub content: JsonValue,
31    /// The headers to upload with the payload.
32    pub headers: HeaderMap,
33    /// Whether the content contains {client|ping}_info sections.
34    pub includes_info_sections: bool,
35    /// Other pings that should be scheduled when this ping is sent.
36    pub schedules_pings: Vec<String>,
37    /// Capabilities the uploader must have in order to uplaoad this ping.
38    pub uploader_capabilities: Vec<String>,
39}
40
/// Collects a ping's data, assembles it into its full payload and stores it on disk.
pub struct PingMaker;
43
44fn merge(a: &mut JsonValue, b: &JsonValue) {
45    match (a, b) {
46        (&mut JsonValue::Object(ref mut a), JsonValue::Object(b)) => {
47            for (k, v) in b {
48                merge(a.entry(k.clone()).or_insert(JsonValue::Null), v);
49            }
50        }
51        (a, b) => {
52            *a = b.clone();
53        }
54    }
55}
56
57impl Default for PingMaker {
58    fn default() -> Self {
59        Self::new()
60    }
61}
62
63impl PingMaker {
64    /// Creates a new [`PingMaker`].
65    pub fn new() -> Self {
66        Self
67    }
68
69    /// Gets, and then increments, the sequence number for a given ping.
70    fn get_ping_seq(&self, glean: &Glean, storage_name: &str) -> usize {
71        // Don't attempt to increase sequence number for disabled ping
72        if !glean.is_ping_enabled(storage_name) {
73            return 0;
74        }
75
76        // Sequence numbers are stored as a counter under a name that includes the storage name
77        let seq = CounterMetric::new(CommonMetricData {
78            name: format!("{}#sequence", storage_name),
79            // We don't need a category, the name is already unique
80            category: "".into(),
81            send_in_pings: vec![INTERNAL_STORAGE.into()],
82            lifetime: Lifetime::User,
83            ..Default::default()
84        });
85
86        let current_seq = match StorageManager.snapshot_metric(
87            glean.storage(),
88            INTERNAL_STORAGE,
89            &seq.meta().identifier(glean),
90            seq.meta().inner.lifetime,
91        ) {
92            Some(Metric::Counter(i)) => i,
93            _ => 0,
94        };
95
96        // Increase to next sequence id
97        seq.add_sync(glean, 1);
98
99        current_seq as usize
100    }
101
102    /// Gets the formatted start and end times for this ping and update for the next ping.
103    fn get_start_end_times(
104        &self,
105        glean: &Glean,
106        storage_name: &str,
107        time_unit: TimeUnit,
108    ) -> (String, String) {
109        let start_time = DatetimeMetric::new(
110            CommonMetricData {
111                name: format!("{}#start", storage_name),
112                category: "".into(),
113                send_in_pings: vec![INTERNAL_STORAGE.into()],
114                lifetime: Lifetime::User,
115                ..Default::default()
116            },
117            time_unit,
118        );
119
120        // "start_time" is the time the ping was generated the last time.
121        // If not available, we use the date the Glean object was initialized.
122        let start_time_data = start_time
123            .get_value(glean, INTERNAL_STORAGE)
124            .unwrap_or_else(|| glean.start_time());
125        let end_time_data = local_now_with_offset();
126
127        // Update the start time with the current time.
128        start_time.set_sync_chrono(glean, end_time_data);
129
130        // Format the times.
131        let start_time_data = get_iso_time_string(start_time_data, time_unit);
132        let end_time_data = get_iso_time_string(end_time_data, time_unit);
133        (start_time_data, end_time_data)
134    }
135
136    fn get_ping_info(
137        &self,
138        glean: &Glean,
139        storage_name: &str,
140        reason: Option<&str>,
141        precision: TimeUnit,
142    ) -> JsonValue {
143        let (start_time, end_time) = self.get_start_end_times(glean, storage_name, precision);
144        let mut map = json!({
145            "seq": self.get_ping_seq(glean, storage_name),
146            "start_time": start_time,
147            "end_time": end_time,
148        });
149
150        if let Some(reason) = reason {
151            map.as_object_mut()
152                .unwrap() // safe unwrap, we created the object above
153                .insert("reason".to_string(), JsonValue::String(reason.to_string()));
154        };
155
156        // Get the experiment data, if available.
157        if let Some(experiment_data) =
158            StorageManager.snapshot_experiments_as_json(glean.storage(), INTERNAL_STORAGE)
159        {
160            map.as_object_mut()
161                .unwrap() // safe unwrap, we created the object above
162                .insert("experiments".to_string(), experiment_data);
163        };
164
165        map
166    }
167
168    fn get_client_info(&self, glean: &Glean, include_client_id: bool) -> JsonValue {
169        // Add the "telemetry_sdk_build", which is the glean-core version.
170        let mut map = json!({
171            "telemetry_sdk_build": crate::GLEAN_VERSION,
172        });
173
174        // Flatten the whole thing.
175        if let Some(client_info) =
176            StorageManager.snapshot_as_json(glean.storage(), "glean_client_info", true)
177        {
178            let client_info_obj = client_info.as_object().unwrap(); // safe unwrap, snapshot always returns an object.
179            for (_metric_type, metrics) in client_info_obj {
180                merge(&mut map, metrics);
181            }
182            let map = map.as_object_mut().unwrap(); // safe unwrap, we created the object above.
183            let mut attribution = serde_json::Map::new();
184            let mut distribution = serde_json::Map::new();
185            map.retain(|name, value| {
186                // Only works because we ensure no client_info metric categories contain '.'.
187                let mut split = name.split('.');
188                let category = split.next();
189                let name = split.next();
190                if let (Some(category), Some(name)) = (category, name) {
191                    if category == "attribution" {
192                        attribution.insert(name.into(), value.take());
193                        false
194                    } else if category == "distribution" {
195                        distribution.insert(name.into(), value.take());
196                        false
197                    } else {
198                        true
199                    }
200                } else {
201                    true
202                }
203            });
204            if !attribution.is_empty() {
205                map.insert("attribution".into(), serde_json::Value::from(attribution));
206            }
207            if !distribution.is_empty() {
208                map.insert("distribution".into(), serde_json::Value::from(distribution));
209            }
210        } else {
211            log::warn!("Empty client info data.");
212        }
213
214        if !include_client_id {
215            // safe unwrap, we created the object above
216            map.as_object_mut().unwrap().remove("client_id");
217        }
218
219        json!(map)
220    }
221
222    /// Build the headers to be persisted and sent with a ping.
223    ///
224    /// Currently the only headers we persist are `X-Debug-ID` and `X-Source-Tags`.
225    ///
226    /// # Arguments
227    ///
228    /// * `glean` - the [`Glean`] instance to collect headers from.
229    ///
230    /// # Returns
231    ///
232    /// A map of header names to header values.
233    /// Might be empty if there are no extra headers to send.
234    /// ```
235    fn get_headers(&self, glean: &Glean) -> HeaderMap {
236        let mut headers_map = HeaderMap::new();
237
238        if let Some(debug_view_tag) = glean.debug_view_tag() {
239            headers_map.insert("X-Debug-ID".to_string(), debug_view_tag.to_string());
240        }
241
242        if let Some(source_tags) = glean.source_tags() {
243            headers_map.insert("X-Source-Tags".to_string(), source_tags.join(","));
244        }
245
246        headers_map
247    }
248
249    /// Collects a snapshot for the given ping from storage and attach required meta information.
250    ///
251    /// # Arguments
252    ///
253    /// * `glean` - the [`Glean`] instance to collect data from.
254    /// * `ping` - the ping to collect for.
255    /// * `reason` - an optional reason code to include in the ping.
256    /// * `doc_id` - the ping's unique document identifier.
257    /// * `url_path` - the path on the server to upload this ping to.
258    ///
259    /// # Returns
260    ///
261    /// A fully assembled representation of the ping payload and associated metadata.
262    /// If there is no data stored for the ping, `None` is returned.
263    pub fn collect<'a>(
264        &self,
265        glean: &Glean,
266        ping: &'a PingType,
267        reason: Option<&str>,
268        doc_id: &'a str,
269        url_path: &'a str,
270    ) -> Option<Ping<'a>> {
271        info!("Collecting {}", ping.name());
272        let database = glean.storage();
273
274        // HACK: Only for metrics pings we add the ping timings.
275        // But we want that to persist until the next metrics ping is actually sent.
276        let write_samples = database.write_timings.replace(Vec::with_capacity(64));
277        if !write_samples.is_empty() {
278            glean
279                .database_metrics
280                .write_time
281                .accumulate_samples_sync(glean, &write_samples);
282        }
283
284        let mut metrics_data = StorageManager.snapshot_as_json(database, ping.name(), true);
285
286        let events_data = glean
287            .event_storage()
288            .snapshot_as_json(glean, ping.name(), true);
289
290        // We're adding the metric `glean.ping.uploader_capabilities` the most manual way here.
291        // This avoids creating a `StringListMetric` and further indirection.
292        // It also avoids yet another database write.
293        // It's only added if
294        // (1) There's already data in `metrics` or `events`
295        // (2) or the ping should be sent empty (`send_if_empty=true`)
296        let uploader_capabilities = ping.uploader_capabilities();
297        if !uploader_capabilities.is_empty() {
298            if metrics_data.is_none() && (ping.send_if_empty() || events_data.is_some()) {
299                metrics_data = Some(json!({}))
300            }
301
302            if let Some(map) = metrics_data.as_mut().and_then(|o| o.as_object_mut()) {
303                let lists = map
304                    .entry("string_list")
305                    .or_insert_with(|| json!({}))
306                    .as_object_mut()
307                    .unwrap();
308
309                lists.insert(
310                    "glean.ping.uploader_capabilities".to_string(),
311                    json!(uploader_capabilities),
312                );
313            }
314        }
315
316        // Due to the way the experimentation identifier could link datasets that are intentionally unlinked,
317        // it will not be included in pings that specifically exclude the Glean client-id, those pings that
318        // should not be sent if empty, or pings that exclude the {client|ping}_info sections wholesale.
319        if (!ping.include_client_id() || !ping.send_if_empty() || !ping.include_info_sections())
320            && glean.test_get_experimentation_id().is_some()
321            && metrics_data.is_some()
322        {
323            // There is a lot of unwrapping here, but that's fine because the `if` conditions above mean that the
324            // experimentation id is present in the metrics.
325            let metrics = metrics_data.as_mut().unwrap().as_object_mut().unwrap();
326            let metrics_count = metrics.len();
327            let strings = metrics.get_mut("string").unwrap().as_object_mut().unwrap();
328            let string_count = strings.len();
329
330            // Handle the send_if_empty case by checking if the experimentation id is the only metric in the data.
331            let empty_payload = events_data.is_none() && metrics_count == 1 && string_count == 1;
332            if !ping.include_client_id() || (!ping.send_if_empty() && empty_payload) {
333                strings.remove("glean.client.annotation.experimentation_id");
334            }
335
336            if strings.is_empty() {
337                metrics.remove("string");
338            }
339
340            if metrics.is_empty() {
341                metrics_data = None;
342            }
343        }
344
345        let is_empty = metrics_data.is_none() && events_data.is_none();
346        if !ping.send_if_empty() && is_empty {
347            info!("Storage for {} empty. Bailing out.", ping.name());
348            return None;
349        } else if ping.name() == "events" && events_data.is_none() {
350            info!("No events for 'events' ping. Bailing out.");
351            return None;
352        } else if is_empty {
353            info!(
354                "Storage for {} empty. Ping will still be sent.",
355                ping.name()
356            );
357        }
358
359        let precision = if ping.precise_timestamps() {
360            TimeUnit::Millisecond
361        } else {
362            TimeUnit::Minute
363        };
364
365        let mut json = if ping.include_info_sections() {
366            let ping_info = self.get_ping_info(glean, ping.name(), reason, precision);
367            let client_info = self.get_client_info(glean, ping.include_client_id());
368
369            json!({
370                "ping_info": ping_info,
371                "client_info": client_info
372            })
373        } else {
374            json!({})
375        };
376
377        let json_obj = json.as_object_mut()?;
378        if let Some(metrics_data) = metrics_data {
379            json_obj.insert("metrics".to_string(), metrics_data);
380        }
381        if let Some(events_data) = events_data {
382            json_obj.insert("events".to_string(), events_data);
383        }
384
385        Some(Ping {
386            content: json,
387            name: ping.name(),
388            doc_id,
389            url_path,
390            headers: self.get_headers(glean),
391            includes_info_sections: ping.include_info_sections(),
392            schedules_pings: ping.schedules_pings().to_vec(),
393            uploader_capabilities: ping.uploader_capabilities().to_vec(),
394        })
395    }
396
397    /// Gets the path to a directory for ping storage.
398    ///
399    /// The directory will be created inside the `data_path`.
400    /// The `pings` directory (and its parents) is created if it does not exist.
401    fn get_pings_dir(&self, data_path: &Path, ping_type: Option<&str>) -> std::io::Result<PathBuf> {
402        // Use a special directory for deletion-request pings
403        let pings_dir = match ping_type {
404            Some("deletion-request") => data_path.join(DELETION_REQUEST_PINGS_DIRECTORY),
405            _ => data_path.join(PENDING_PINGS_DIRECTORY),
406        };
407
408        create_dir_all(&pings_dir)?;
409        Ok(pings_dir)
410    }
411
412    /// Gets path to a directory for temporary storage.
413    ///
414    /// The directory will be created inside the `data_path`.
415    /// The `tmp` directory (and its parents) is created if it does not exist.
416    fn get_tmp_dir(&self, data_path: &Path) -> std::io::Result<PathBuf> {
417        let pings_dir = data_path.join("tmp");
418        create_dir_all(&pings_dir)?;
419        Ok(pings_dir)
420    }
421
422    /// Stores a ping to disk in the pings directory.
423    pub fn store_ping(&self, data_path: &Path, ping: &Ping) -> std::io::Result<()> {
424        let pings_dir = self.get_pings_dir(data_path, Some(ping.name))?;
425        let temp_dir = self.get_tmp_dir(data_path)?;
426
427        // Write to a temporary location and then move when done,
428        // for transactional writes.
429        let temp_ping_path = temp_dir.join(ping.doc_id);
430        let ping_path = pings_dir.join(ping.doc_id);
431
432        log::debug!(
433            "Storing ping '{}' at '{}'",
434            ping.doc_id,
435            ping_path.display()
436        );
437
438        {
439            let mut file = File::create(&temp_ping_path)?;
440            file.write_all(ping.url_path.as_bytes())?;
441            file.write_all(b"\n")?;
442            file.write_all(::serde_json::to_string(&ping.content)?.as_bytes())?;
443            file.write_all(b"\n")?;
444            let metadata = PingMetadata {
445                // We don't actually need to clone the headers except to match PingMetadata's ownership.
446                // But since we're going to write a file to disk in a sec,
447                // and HeaderMaps tend to have only like two things in them, tops,
448                // the cost is bearable.
449                headers: Some(ping.headers.clone()),
450                body_has_info_sections: Some(ping.includes_info_sections),
451                ping_name: Some(ping.name.to_string()),
452                uploader_capabilities: Some(ping.uploader_capabilities.clone()),
453            };
454            file.write_all(::serde_json::to_string(&metadata)?.as_bytes())?;
455        }
456
457        if let Err(e) = std::fs::rename(&temp_ping_path, &ping_path) {
458            log::warn!(
459                "Unable to move '{}' to '{}",
460                temp_ping_path.display(),
461                ping_path.display()
462            );
463            return Err(e);
464        }
465
466        Ok(())
467    }
468
469    /// Clears any pending pings in the queue.
470    pub fn clear_pending_pings(&self, data_path: &Path, ping_names: &[&str]) -> Result<()> {
471        let pings_dir = self.get_pings_dir(data_path, None)?;
472
473        // TODO(bug 1932909): Refactor this into its own function
474        // and share it with `upload::directory`.
475        let entries = pings_dir.read_dir()?;
476        for entry in entries.filter_map(|entry| entry.ok()) {
477            if let Ok(file_type) = entry.file_type() {
478                if !file_type.is_file() {
479                    continue;
480                }
481            } else {
482                continue;
483            }
484
485            let file = match File::open(entry.path()) {
486                Ok(file) => file,
487                Err(_) => {
488                    continue;
489                }
490            };
491
492            let mut lines = BufReader::new(file).lines();
493            if let (Some(Ok(path)), Some(Ok(_body)), Ok(metadata)) =
494                (lines.next(), lines.next(), lines.next().transpose())
495            {
496                let PingMetadata { ping_name, .. } = metadata
497                    .and_then(|m| crate::upload::process_metadata(&path, &m))
498                    .unwrap_or_default();
499                let ping_name =
500                    ping_name.unwrap_or_else(|| path.split('/').nth(3).unwrap_or("").into());
501
502                if ping_names.contains(&&ping_name[..]) {
503                    _ = fs::remove_file(entry.path());
504                }
505            } else {
506                continue;
507            }
508        }
509
510        log::debug!("All pending pings deleted");
511
512        Ok(())
513    }
514}
515
#[cfg(test)]
mod test {
    use super::*;
    use crate::tests::new_glean;

    /// Sequence numbers count up while upload is enabled, stay at zero while
    /// it is disabled and start over from zero once it is enabled again.
    #[test]
    fn sequence_numbers_should_be_reset_when_toggling_uploading() {
        let (mut glean, _t) = new_glean(None);
        let maker = PingMaker::new();

        // Fresh instance: the sequence starts at 0 and counts up.
        for expected in 0..2 {
            assert_eq!(expected, maker.get_ping_seq(&glean, "store1"));
        }

        // Upload disabled: the sequence number is pinned to 0.
        glean.set_upload_enabled(false);
        for _ in 0..2 {
            assert_eq!(0, maker.get_ping_seq(&glean, "store1"));
        }

        // Upload re-enabled: counting starts over from 0.
        glean.set_upload_enabled(true);
        for expected in 0..2 {
            assert_eq!(expected, maker.get_ping_seq(&glean, "store1"));
        }
    }
}