1use std::collections::HashMap;
2use std::ops::AddAssign;
3use std::time::Duration;
4
5use serde::Serialize;
6use uuid::Uuid;
7
8use super::histogram::CostHistogram;
9use super::histogram::DurationHistogram;
10use super::histogram::ListLengthHistogram;
11use crate::apollo_studio_interop::AggregatedExtendedReferenceStats;
12use crate::apollo_studio_interop::ExtendedReferenceStats;
13use crate::apollo_studio_interop::ReferencedEnums;
14use crate::plugins::telemetry::apollo::LicensedOperationCountByType;
15use crate::plugins::telemetry::apollo_exporter::proto::reports::EnumStats;
16use crate::plugins::telemetry::apollo_exporter::proto::reports::InputFieldStats;
17use crate::plugins::telemetry::apollo_exporter::proto::reports::InputTypeStats;
18use crate::plugins::telemetry::apollo_exporter::proto::reports::QueryMetadata;
19use crate::plugins::telemetry::apollo_exporter::proto::reports::ReferencedFieldsForType;
20use crate::plugins::telemetry::apollo_exporter::proto::reports::StatsContext;
21
22#[derive(Default, Debug, Serialize)]
23pub(crate) struct SingleStatsReport {
24 pub(crate) request_id: Uuid,
25 pub(crate) stats: HashMap<String, SingleStats>,
26 pub(crate) licensed_operation_count_by_type: Option<LicensedOperationCountByType>,
27 pub(crate) router_features_enabled: Vec<String>,
28}
29
30#[derive(Default, Debug, Serialize)]
31pub(crate) struct SingleStats {
32 pub(crate) stats_with_context: SingleContextualizedStats,
33 pub(crate) referenced_fields_by_type: HashMap<String, ReferencedFieldsForType>,
34 pub(crate) query_metadata: Option<QueryMetadata>,
35}
36
37#[derive(Default, Debug, Serialize)]
38pub(crate) struct SingleContextualizedStats {
39 pub(crate) context: StatsContext,
40 pub(crate) query_latency_stats: SingleQueryLatencyStats,
41 pub(crate) limits_stats: SingleLimitsStats,
42 pub(crate) per_type_stat: HashMap<String, SingleTypeStat>,
43 pub(crate) extended_references: ExtendedReferenceStats,
44 pub(crate) enum_response_references: ReferencedEnums,
45 pub(crate) local_per_type_stat: HashMap<String, LocalTypeStat>,
46}
47#[derive(Default, Debug, Serialize)]
49pub(crate) struct SingleQueryLatencyStats {
50 pub(crate) latency: Duration,
51 pub(crate) cache_hit: bool,
52 pub(crate) persisted_query_hit: Option<bool>,
53 pub(crate) cache_latency: Option<Duration>,
54 pub(crate) root_error_stats: SinglePathErrorStats,
55 pub(crate) has_errors: bool,
56 pub(crate) public_cache_ttl_latency: Option<Duration>,
57 pub(crate) private_cache_ttl_latency: Option<Duration>,
58 pub(crate) registered_operation: bool,
59 pub(crate) forbidden_operation: bool,
60 pub(crate) without_field_instrumentation: bool,
61}
62
63#[derive(Default, Debug, Serialize)]
64pub(crate) struct SinglePathErrorStats {
65 pub(crate) children: HashMap<String, SinglePathErrorStats>,
66 pub(crate) errors_count: u64,
67 pub(crate) requests_with_errors_count: u64,
68}
69
70#[derive(Default, Debug, Serialize)]
71pub(crate) struct SingleTypeStat {
72 pub(crate) per_field_stat: HashMap<String, SingleFieldStat>,
73}
74
75#[derive(Default, Debug, Serialize)]
76pub(crate) struct SingleFieldStat {
77 pub(crate) return_type: String,
78 pub(crate) errors_count: u64,
79 pub(crate) requests_with_errors_count: u64,
80
81 pub(crate) observed_execution_count: u64,
85 pub(crate) latency: DurationHistogram<f64>,
86 pub(crate) length: ListLengthHistogram,
87}
88
89#[derive(Clone, Default, Debug, Serialize)]
90pub(crate) struct ContextualizedStats {
91 context: StatsContext,
92 query_latency_stats: QueryLatencyStats,
93 per_type_stat: HashMap<String, TypeStat>,
94 extended_references: AggregatedExtendedReferenceStats,
95 limits_stats: Option<LimitsStats>,
96 local_per_type_stat: HashMap<String, LocalTypeStat>,
97}
98
99impl AddAssign<SingleContextualizedStats> for ContextualizedStats {
100 fn add_assign(&mut self, stats: SingleContextualizedStats) {
101 self.context = stats.context;
102 self.query_latency_stats += stats.query_latency_stats;
103 if let Some(limits_stats) = &mut self.limits_stats {
104 *limits_stats += stats.limits_stats;
105 } else {
106 self.limits_stats = Some(stats.limits_stats.into());
107 }
108 for (k, v) in stats.per_type_stat {
109 *self.per_type_stat.entry(k).or_default() += v;
110 }
111 self.extended_references += stats.extended_references;
112 self.extended_references += stats.enum_response_references;
113 for (k, v) in stats.local_per_type_stat {
114 *self.local_per_type_stat.entry(k).or_default() += v;
115 }
116 }
117}
118
119#[derive(Clone, Default, Debug, Serialize)]
120pub(crate) struct QueryLatencyStats {
121 request_latencies: DurationHistogram,
122 persisted_query_hits: u64,
123 persisted_query_misses: u64,
124 cache_hits: DurationHistogram,
125 root_error_stats: PathErrorStats,
126 requests_with_errors_count: u64,
127 public_cache_ttl_count: DurationHistogram,
128 private_cache_ttl_count: DurationHistogram,
129 registered_operation_count: u64,
130 forbidden_operation_count: u64,
131 requests_without_field_instrumentation: u64,
132}
133
134impl AddAssign<SingleQueryLatencyStats> for QueryLatencyStats {
135 fn add_assign(&mut self, stats: SingleQueryLatencyStats) {
136 self.request_latencies.record(Some(stats.latency), 1);
137 match stats.persisted_query_hit {
138 Some(true) => self.persisted_query_hits += 1,
139 Some(false) => self.persisted_query_misses += 1,
140 None => {}
141 }
142 self.cache_hits.record(stats.cache_latency, 1);
143 self.root_error_stats += stats.root_error_stats;
144 self.requests_with_errors_count += stats.has_errors as u64;
145 self.public_cache_ttl_count
146 .record(stats.public_cache_ttl_latency, 1);
147 self.private_cache_ttl_count
148 .record(stats.private_cache_ttl_latency, 1);
149 self.registered_operation_count += stats.registered_operation as u64;
150 self.forbidden_operation_count += stats.forbidden_operation as u64;
151 self.requests_without_field_instrumentation += stats.without_field_instrumentation as u64;
152 }
153}
154
155#[derive(Clone, Default, Debug, Serialize)]
156pub(crate) struct PathErrorStats {
157 children: HashMap<String, PathErrorStats>,
158 errors_count: u64,
159 requests_with_errors_count: u64,
160}
161
162impl AddAssign<SinglePathErrorStats> for PathErrorStats {
163 fn add_assign(&mut self, stats: SinglePathErrorStats) {
164 for (k, v) in stats.children.into_iter() {
165 *self.children.entry(k).or_default() += v;
166 }
167 self.errors_count += stats.errors_count;
168 self.requests_with_errors_count += stats.requests_with_errors_count;
169 }
170}
171
172#[derive(Clone, Default, Debug, Serialize)]
173pub(crate) struct TypeStat {
174 per_field_stat: HashMap<String, FieldStat>,
175}
176
177impl AddAssign<SingleTypeStat> for TypeStat {
178 fn add_assign(&mut self, stat: SingleTypeStat) {
179 for (k, v) in stat.per_field_stat.into_iter() {
180 *self.per_field_stat.entry(k).or_default() += v;
181 }
182 }
183}
184
185#[derive(Clone, Default, Debug, Serialize)]
186pub(crate) struct FieldStat {
187 return_type: String,
188 errors_count: u64,
189 requests_with_errors_count: u64,
190 observed_execution_count: u64,
191 latency: DurationHistogram<f64>,
195}
196
197impl AddAssign<SingleFieldStat> for FieldStat {
198 fn add_assign(&mut self, stat: SingleFieldStat) {
199 self.latency += stat.latency;
200 self.requests_with_errors_count += stat.requests_with_errors_count;
201 self.observed_execution_count += stat.observed_execution_count;
202 self.errors_count += stat.errors_count;
203 self.return_type = stat.return_type;
204 }
205}
206
207impl From<ContextualizedStats>
208 for crate::plugins::telemetry::apollo_exporter::proto::reports::ContextualizedStats
209{
210 fn from(stats: ContextualizedStats) -> Self {
211 Self {
212 per_type_stat: stats
213 .per_type_stat
214 .into_iter()
215 .map(|(k, v)| (k, v.into()))
216 .collect(),
217 query_latency_stats: Some(stats.query_latency_stats.into()),
218 context: Some(stats.context),
219 limits_stats: stats.limits_stats.map(|ls| ls.into()),
220 local_per_type_stat: stats
221 .local_per_type_stat
222 .into_iter()
223 .map(|(k, v)| (k, v.into()))
224 .collect(),
225 extended_references: if stats.extended_references.is_empty() {
226 None
227 } else {
228 Some(stats.extended_references.into())
229 },
230 operation_count: 0,
231 }
232 }
233}
234
235impl From<QueryLatencyStats>
236 for crate::plugins::telemetry::apollo_exporter::proto::reports::QueryLatencyStats
237{
238 fn from(stats: QueryLatencyStats) -> Self {
239 Self {
240 request_count: stats.request_latencies.total_u64(),
241 latency_count: stats.request_latencies.buckets_to_i64(),
242 cache_hits: stats.cache_hits.total_u64(),
243 cache_latency_count: stats.cache_hits.buckets_to_i64(),
244 persisted_query_hits: stats.persisted_query_hits,
245 persisted_query_misses: stats.persisted_query_misses,
246 root_error_stats: Some(stats.root_error_stats.into()),
247 requests_with_errors_count: stats.requests_with_errors_count,
248 public_cache_ttl_count: stats.public_cache_ttl_count.buckets_to_i64(),
249 private_cache_ttl_count: stats.private_cache_ttl_count.buckets_to_i64(),
250 registered_operation_count: stats.registered_operation_count,
251 forbidden_operation_count: stats.forbidden_operation_count,
252 requests_without_field_instrumentation: stats.requests_without_field_instrumentation,
253 }
254 }
255}
256
257impl From<PathErrorStats>
258 for crate::plugins::telemetry::apollo_exporter::proto::reports::PathErrorStats
259{
260 fn from(stats: PathErrorStats) -> Self {
261 Self {
262 children: stats
263 .children
264 .into_iter()
265 .map(|(k, v)| (k, v.into()))
266 .collect(),
267 errors_count: stats.errors_count,
268 requests_with_errors_count: stats.requests_with_errors_count,
269 }
270 }
271}
272
273impl From<TypeStat> for crate::plugins::telemetry::apollo_exporter::proto::reports::TypeStat {
274 fn from(stat: TypeStat) -> Self {
275 Self {
276 per_field_stat: stat
277 .per_field_stat
278 .into_iter()
279 .map(|(k, v)| (k, v.into()))
280 .collect(),
281 }
282 }
283}
284
285impl From<FieldStat> for crate::plugins::telemetry::apollo_exporter::proto::reports::FieldStat {
286 fn from(stat: FieldStat) -> Self {
287 Self {
288 return_type: stat.return_type,
289 errors_count: stat.errors_count,
290 requests_with_errors_count: stat.requests_with_errors_count,
291
292 observed_execution_count: stat.observed_execution_count,
293 estimated_execution_count: stat.latency.total_u64(),
295 latency_count: stat.latency.buckets_to_i64(),
296 }
297 }
298}
299
300#[derive(Clone, Debug, Serialize)]
301
302pub(crate) struct LimitsStats {
303 strategy: String,
304 cost_estimated: CostHistogram,
305 cost_actual: CostHistogram,
306 depth: u64,
307 height: u64,
308 alias_count: u64,
309 root_field_count: u64,
310}
311
312impl From<LimitsStats> for crate::plugins::telemetry::apollo_exporter::proto::reports::LimitsStats {
313 fn from(value: LimitsStats) -> Self {
314 Self {
315 strategy: value.strategy,
316 max_cost_estimated: value.cost_estimated.max_u64(),
317 cost_estimated: value.cost_estimated.buckets_to_i64(),
318 max_cost_actual: value.cost_actual.max_u64(),
319 cost_actual: value.cost_actual.buckets_to_i64(),
320 depth: value.depth,
321 height: value.height,
322 alias_count: value.alias_count,
323 root_field_count: value.root_field_count,
324 }
325 }
326}
327
328impl AddAssign<SingleLimitsStats> for LimitsStats {
329 fn add_assign(&mut self, rhs: SingleLimitsStats) {
330 if let Some(cost) = rhs.cost_estimated {
331 self.cost_estimated.record(Some(cost), 1.0)
332 }
333
334 if let Some(cost) = rhs.cost_actual {
335 self.cost_actual.record(Some(cost), 1.0)
336 }
337
338 self.height = rhs.height;
342 self.depth = rhs.depth;
343 self.alias_count = rhs.alias_count;
344 self.root_field_count = rhs.root_field_count;
345 }
346}
347
348impl From<SingleLimitsStats> for LimitsStats {
349 fn from(value: SingleLimitsStats) -> Self {
350 let mut cost_estimated = CostHistogram::default();
351 if let Some(cost) = value.cost_estimated {
352 cost_estimated.record(Some(cost), 1.0)
353 }
354
355 let mut cost_actual = CostHistogram::default();
356 if let Some(cost) = value.cost_actual {
357 cost_actual.record(Some(cost), 1.0)
358 }
359
360 Self {
361 strategy: value.strategy.unwrap_or_default(),
362 cost_estimated,
363 cost_actual,
364 depth: value.depth,
365 height: value.height,
366 alias_count: value.alias_count,
367 root_field_count: value.root_field_count,
368 }
369 }
370}
371
372#[derive(Clone, Default, Debug, Serialize)]
373pub(crate) struct SingleLimitsStats {
374 pub(crate) strategy: Option<String>,
375 pub(crate) cost_estimated: Option<f64>,
376 pub(crate) cost_actual: Option<f64>,
377 pub(crate) depth: u64,
378 pub(crate) height: u64,
379 pub(crate) alias_count: u64,
380 pub(crate) root_field_count: u64,
381}
382
383#[derive(Clone, Debug, Default, Serialize)]
384pub(crate) struct LocalTypeStat {
385 pub(crate) local_per_field_stat: HashMap<String, LocalFieldStat>,
386}
387
388impl From<LocalTypeStat>
389 for crate::plugins::telemetry::apollo_exporter::proto::reports::LocalTypeStat
390{
391 fn from(value: LocalTypeStat) -> Self {
392 Self {
393 local_per_field_stat: value
394 .local_per_field_stat
395 .into_iter()
396 .map(|(k, v)| (k, v.into()))
397 .collect(),
398 }
399 }
400}
401
402impl AddAssign<LocalTypeStat> for LocalTypeStat {
403 fn add_assign(&mut self, rhs: LocalTypeStat) {
404 for (k, v) in rhs.local_per_field_stat {
405 *self.local_per_field_stat.entry(k).or_default() += v;
406 }
407 }
408}
409
410#[derive(Clone, Debug, Default, Serialize)]
411pub(crate) struct LocalFieldStat {
412 pub(crate) return_type: String,
413 pub(crate) list_lengths: ListLengthHistogram,
414}
415
416impl From<LocalFieldStat>
417 for crate::plugins::telemetry::apollo_exporter::proto::reports::LocalFieldStat
418{
419 fn from(value: LocalFieldStat) -> Self {
420 Self {
421 return_type: value.return_type,
422 array_size: value.list_lengths.buckets_to_i64(),
423 }
424 }
425}
426
427impl AddAssign<LocalFieldStat> for LocalFieldStat {
428 fn add_assign(&mut self, rhs: LocalFieldStat) {
429 self.return_type = rhs.return_type;
430 self.list_lengths += rhs.list_lengths;
431 }
432}
433
434impl From<AggregatedExtendedReferenceStats>
435 for crate::plugins::telemetry::apollo_exporter::proto::reports::ExtendedReferences
436{
437 fn from(references: AggregatedExtendedReferenceStats) -> Self {
438 Self {
439 input_types: references
440 .referenced_input_fields
441 .into_iter()
442 .map(|(type_name, type_stats)| {
443 (
444 type_name,
445 InputTypeStats {
446 field_names: type_stats
447 .into_iter()
448 .map(|(field_name, field_stats)| {
449 (
450 field_name,
451 InputFieldStats {
452 refs: field_stats.referenced,
453 null_refs: field_stats.null_reference,
454 missing: field_stats.undefined_reference,
455 },
456 )
457 })
458 .collect(),
459 },
460 )
461 })
462 .collect(),
463 enum_values: references
464 .referenced_enums
465 .into_iter()
466 .map(|(k, v)| (k, EnumStats { enum_values: v }))
467 .collect(),
468 }
469 }
470}
471
#[cfg(test)]
mod test {
    use std::collections::HashMap;
    use std::time::Duration;

    use super::*;
    use crate::plugins::telemetry::apollo::Report;
    use crate::query_planner::OperationKind;

    /// Aggregating two identical reports must match the stored JSON snapshot.
    #[test]
    fn test_aggregation() {
        let reports = reports_from(&[
            ("client_1", "version_1", "library_name_1", "library_version_1", "report_key_1"),
            ("client_1", "version_1", "library_name_1", "library_version_1", "report_key_1"),
        ]);
        let aggregated_metrics = Report::new(reports);
        insta::with_settings!({sort_maps => true}, {
            insta::assert_json_snapshot!(aggregated_metrics);
        });
    }

    /// Reports are grouped by stats report key first and by client context
    /// second: three distinct contexts under `report_key_1`, one under
    /// `report_key_2`.
    #[test]
    fn test_aggregation_grouping() {
        let reports = reports_from(&[
            ("client_1", "version_1", "library_name_1", "library_version_1", "report_key_1"),
            ("client_1", "version_1", "library_name_1", "library_version_1", "report_key_1"),
            ("client_2", "version_1", "library_name_2", "library_version_1", "report_key_1"),
            ("client_1", "version_2", "library_name_1", "library_version_2", "report_key_1"),
            ("client_1", "version_1", "library_name_1", "library_version_1", "report_key_2"),
        ]);
        let aggregated_metrics = Report::new(reports);

        let traces = &aggregated_metrics.traces_per_query;
        assert_eq!(traces.len(), 2);
        assert_eq!(traces["report_key_1"].stats_with_context.len(), 3);
        assert_eq!(traces["report_key_2"].stats_with_context.len(), 1);
    }

    /// Builds one test report per row of `(client_name, client_version,
    /// library_name, library_version, stats_report_key)`, in row order.
    fn reports_from(rows: &[(&str, &str, &str, &str, &str)]) -> Vec<SingleStatsReport> {
        rows.iter()
            .map(
                |&(client_name, client_version, library_name, library_version, key)| {
                    create_test_metric(
                        client_name,
                        client_version,
                        library_name,
                        library_version,
                        key,
                    )
                },
            )
            .collect()
    }

    /// Builds a fully populated report whose numeric members come from a
    /// fresh running counter.
    fn create_test_metric(
        client_name: &str,
        client_version: &str,
        library_name: &str,
        library_version: &str,
        stats_report_key: &str,
    ) -> SingleStatsReport {
        // The pieces below are built in the order their counter values must be
        // drawn: licensed count, error tree (innermost node first), field
        // stats, then local field stats.
        let mut count = Count::default();

        let licensed_operation_count_by_type = Some(LicensedOperationCountByType {
            r#type: OperationKind::Query,
            subtype: None,
            licensed_operation_count: count.inc_u64(),
        });

        let path2 = SinglePathErrorStats {
            children: HashMap::new(),
            errors_count: count.inc_u64(),
            requests_with_errors_count: count.inc_u64(),
        };
        let path1 = SinglePathErrorStats {
            children: HashMap::from([("path2".to_string(), path2)]),
            errors_count: count.inc_u64(),
            requests_with_errors_count: count.inc_u64(),
        };
        let root_error_stats = SinglePathErrorStats {
            children: HashMap::from([("path1".to_string(), path1)]),
            errors_count: count.inc_u64(),
            requests_with_errors_count: count.inc_u64(),
        };

        let mut per_type_stat = HashMap::new();
        for type_name in ["type1", "type2"] {
            let mut per_field_stat = HashMap::new();
            for field_name in ["field1", "field2"] {
                per_field_stat.insert(field_name.to_string(), field_stat(&mut count));
            }
            per_type_stat.insert(type_name.to_string(), SingleTypeStat { per_field_stat });
        }

        let mut local_per_type_stat = HashMap::new();
        for type_name in ["type1", "type2"] {
            local_per_type_stat.insert(type_name.to_string(), local_type_stat(&mut count));
        }

        let context = StatsContext {
            result: "".to_string(),
            client_name: client_name.to_string(),
            client_version: client_version.to_string(),
            client_library_name: library_name.to_string(),
            client_library_version: library_version.to_string(),
            operation_type: String::new(),
            operation_subtype: String::new(),
        };

        let one_second = Duration::from_secs(1);
        let query_latency_stats = SingleQueryLatencyStats {
            latency: one_second,
            cache_hit: true,
            persisted_query_hit: Some(true),
            cache_latency: Some(one_second),
            root_error_stats,
            has_errors: true,
            public_cache_ttl_latency: Some(one_second),
            private_cache_ttl_latency: Some(one_second),
            registered_operation: true,
            forbidden_operation: true,
            without_field_instrumentation: true,
        };

        let limits_stats = SingleLimitsStats {
            strategy: Some("test".to_string()),
            cost_estimated: Some(10.0),
            cost_actual: Some(7.0),
            depth: 2,
            height: 4,
            alias_count: 0,
            root_field_count: 1,
        };

        let single_stats = SingleStats {
            stats_with_context: SingleContextualizedStats {
                context,
                query_latency_stats,
                limits_stats,
                per_type_stat,
                extended_references: ExtendedReferenceStats {
                    referenced_input_fields: HashMap::new(),
                    referenced_enums: HashMap::new(),
                },
                enum_response_references: HashMap::new(),
                local_per_type_stat,
            },
            referenced_fields_by_type: HashMap::from([(
                "type1".to_string(),
                ReferencedFieldsForType {
                    field_names: vec!["field1".to_string(), "field2".to_string()],
                    is_interface: false,
                },
            )]),
            query_metadata: None,
        };

        SingleStatsReport {
            request_id: Uuid::default(),
            licensed_operation_count_by_type,
            stats: HashMap::from([(stats_report_key.to_string(), single_stats)]),
            router_features_enabled: vec![
                "distributed_apq_cache".to_string(),
                "entity_cache".to_string(),
            ],
        }
    }

    /// Builds a field stat with one latency sample, one list length sample and
    /// three counter-derived values.
    fn field_stat(count: &mut Count) -> SingleFieldStat {
        let mut latency = DurationHistogram::default();
        latency.record(Some(Duration::from_secs(1)), 1.0);

        let mut length = ListLengthHistogram::default();
        length.record(Some(1), 1);

        // Drawn in this exact order so the values match the snapshot.
        let errors_count = count.inc_u64();
        let observed_execution_count = count.inc_u64();
        let requests_with_errors_count = count.inc_u64();

        SingleFieldStat {
            return_type: "String".to_string(),
            errors_count,
            observed_execution_count,
            requests_with_errors_count,
            latency,
            length,
        }
    }

    /// Builds a local type stat with a single `field1` entry.
    fn local_type_stat(count: &mut Count) -> LocalTypeStat {
        let field1 = local_field_stat(count);
        LocalTypeStat {
            local_per_field_stat: HashMap::from([("field1".to_string(), field1)]),
        }
    }

    /// Builds a local field stat whose only list length sample is the next
    /// counter value.
    fn local_field_stat(count: &mut Count) -> LocalFieldStat {
        let mut list_lengths = ListLengthHistogram::default();
        list_lengths.record(Some(count.inc_u64()), 1);

        LocalFieldStat {
            return_type: "String".to_string(),
            list_lengths,
        }
    }

    /// Running counter handing out 1, 2, 3, … so every numeric member of a
    /// test report gets a distinct value.
    #[derive(Default)]
    struct Count {
        count: u64,
    }

    impl Count {
        /// Increments the counter and returns the new value.
        fn inc_u64(&mut self) -> u64 {
            self.count += 1;
            self.count
        }
    }
}