use crate::{
driver::routing::CosmosEndpoint,
models::{ActivityId, CosmosStatus, RequestCharge, SubStatusCode},
options::{DiagnosticsOptions, DiagnosticsVerbosity, Region},
system::CpuMemoryMonitor,
};
use azure_core::http::StatusCode;
use serde::Serialize;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
time::{Duration, Instant},
};
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum ExecutionContext {
Initial,
Retry,
TransportRetry,
Hedging,
RegionFailover,
CircuitBreakerProbe,
}
impl ExecutionContext {
pub fn as_str(&self) -> &'static str {
match self {
ExecutionContext::Initial => "initial",
ExecutionContext::Retry => "retry",
ExecutionContext::TransportRetry => "transport_retry",
ExecutionContext::Hedging => "hedging",
ExecutionContext::RegionFailover => "region_failover",
ExecutionContext::CircuitBreakerProbe => "circuit_breaker_probe",
}
}
}
impl AsRef<str> for ExecutionContext {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl std::fmt::Display for ExecutionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum PipelineType {
Metadata,
DataPlane,
}
impl PipelineType {
pub fn as_str(self) -> &'static str {
match self {
PipelineType::Metadata => "metadata",
PipelineType::DataPlane => "data_plane",
}
}
pub fn is_metadata(self) -> bool {
matches!(self, PipelineType::Metadata)
}
pub fn is_data_plane(self) -> bool {
matches!(self, PipelineType::DataPlane)
}
}
impl std::fmt::Display for PipelineType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for PipelineType {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum TransportSecurity {
#[default]
Secure,
EmulatorWithInsecureCertificates,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum TransportKind {
#[default]
Gateway,
Gateway20,
}
impl TransportKind {
pub fn as_str(self) -> &'static str {
match self {
TransportKind::Gateway => "gateway",
TransportKind::Gateway20 => "gateway20",
}
}
pub fn is_gateway(self) -> bool {
matches!(self, TransportKind::Gateway)
}
pub fn is_gateway20(self) -> bool {
matches!(self, TransportKind::Gateway20)
}
}
impl std::fmt::Display for TransportKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for TransportKind {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum TransportHttpVersion {
Http11,
Http2,
}
impl TransportHttpVersion {
pub fn as_str(self) -> &'static str {
match self {
TransportHttpVersion::Http11 => "http11",
TransportHttpVersion::Http2 => "http2",
}
}
pub fn is_http11(self) -> bool {
matches!(self, TransportHttpVersion::Http11)
}
pub fn is_http2(self) -> bool {
matches!(self, TransportHttpVersion::Http2)
}
}
impl std::fmt::Display for TransportHttpVersion {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for TransportHttpVersion {
fn as_ref(&self) -> &str {
self.as_str()
}
}
impl TransportSecurity {
pub fn as_str(self) -> &'static str {
match self {
TransportSecurity::Secure => "secure",
TransportSecurity::EmulatorWithInsecureCertificates => "emulator_insecure",
}
}
pub fn is_secure(self) -> bool {
matches!(self, TransportSecurity::Secure)
}
pub fn is_emulator(self) -> bool {
matches!(self, TransportSecurity::EmulatorWithInsecureCertificates)
}
}
impl std::fmt::Display for TransportSecurity {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for TransportSecurity {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum RequestSentStatus {
Sent,
NotSent,
#[default]
Unknown,
}
impl RequestSentStatus {
pub fn may_have_been_sent(&self) -> bool {
!matches!(self, RequestSentStatus::NotSent)
}
pub fn definitely_sent(&self) -> bool {
matches!(self, RequestSentStatus::Sent)
}
pub fn definitely_not_sent(&self) -> bool {
matches!(self, RequestSentStatus::NotSent)
}
pub fn as_str(&self) -> &'static str {
match self {
RequestSentStatus::Sent => "sent",
RequestSentStatus::NotSent => "not_sent",
RequestSentStatus::Unknown => "unknown",
}
}
}
impl std::fmt::Display for RequestSentStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for RequestSentStatus {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[non_exhaustive]
pub struct RequestDiagnostics {
execution_context: ExecutionContext,
pipeline_type: PipelineType,
transport_security: TransportSecurity,
transport_kind: TransportKind,
transport_http_version: TransportHttpVersion,
region: Option<Region>,
endpoint: String,
#[serde(flatten)]
status: CosmosStatus,
pub(crate) request_charge: RequestCharge,
activity_id: Option<ActivityId>,
session_token: Option<String>,
server_duration_ms: Option<crate::models::FiniteF64>,
#[serde(skip)]
started_at: Instant,
#[serde(skip)]
pub(crate) completed_at: Option<Instant>,
duration_ms: u64,
events: Vec<RequestEvent>,
#[serde(skip_serializing_if = "Option::is_none")]
transport_shard: Option<TransportShardDiagnostics>,
#[serde(skip_serializing_if = "Vec::is_empty")]
failed_transport_shards: Vec<FailedTransportShardDiagnostics>,
#[serde(skip_serializing_if = "is_zero_u32")]
local_shard_retry_count: u32,
pub(crate) timed_out: bool,
request_sent: RequestSentStatus,
error: Option<String>,
#[cfg(feature = "fault_injection")]
fault_injection_evaluations: Vec<crate::fault_injection::FaultInjectionEvaluation>,
}
impl RequestDiagnostics {
pub(crate) fn new(
execution_context: ExecutionContext,
pipeline_type: PipelineType,
transport_security: TransportSecurity,
transport_kind: TransportKind,
transport_http_version: TransportHttpVersion,
endpoint: &CosmosEndpoint,
) -> Self {
Self {
execution_context,
pipeline_type,
transport_security,
transport_kind,
transport_http_version,
region: endpoint.region().cloned(),
endpoint: endpoint.url().as_str().to_owned(),
status: CosmosStatus::new(StatusCode::from(0)),
request_charge: RequestCharge::default(),
activity_id: None,
session_token: None,
server_duration_ms: None,
started_at: Instant::now(),
completed_at: None,
duration_ms: 0,
events: Vec::new(),
transport_shard: None,
failed_transport_shards: Vec::new(),
local_shard_retry_count: 0,
timed_out: false,
request_sent: RequestSentStatus::Unknown,
error: None,
#[cfg(feature = "fault_injection")]
fault_injection_evaluations: Vec::new(),
}
}
pub(crate) fn complete(&mut self, status_code: StatusCode, sub_status: Option<SubStatusCode>) {
self.completed_at = Some(Instant::now());
self.status = CosmosStatus::new(status_code);
if let Some(sub_status) = sub_status {
self.with_sub_status(sub_status);
}
self.error = None;
self.timed_out = false;
self.request_sent = RequestSentStatus::Sent;
self.duration_ms = self
.completed_at
.unwrap()
.duration_since(self.started_at)
.as_millis() as u64;
}
pub(crate) fn timeout(&mut self) {
self.completed_at = Some(Instant::now());
self.timed_out = true;
self.status = CosmosStatus::from_parts(
StatusCode::RequestTimeout,
Some(SubStatusCode::CLIENT_OPERATION_TIMEOUT),
);
self.duration_ms = self
.completed_at
.unwrap()
.duration_since(self.started_at)
.as_millis() as u64;
}
pub(crate) fn fail_transport(
&mut self,
error: impl Into<String>,
request_sent: RequestSentStatus,
status: CosmosStatus,
) {
self.completed_at = Some(Instant::now());
self.status = status;
self.with_error(error);
self.request_sent = request_sent;
self.timed_out = false;
self.duration_ms = self
.completed_at
.unwrap()
.duration_since(self.started_at)
.as_millis() as u64;
}
pub(crate) fn with_error(&mut self, error: impl Into<String>) {
self.error = Some(error.into());
}
pub(crate) fn with_sub_status(&mut self, sub_status: SubStatusCode) {
self.status = CosmosStatus::from_parts(self.status.status_code(), Some(sub_status));
}
pub(crate) fn with_charge(&mut self, charge: RequestCharge) {
self.request_charge = charge;
}
pub(crate) fn with_activity_id(&mut self, activity_id: ActivityId) {
self.activity_id = Some(activity_id);
}
pub(crate) fn with_session_token(&mut self, token: String) {
self.session_token = Some(token);
}
pub(crate) fn with_server_duration_ms(&mut self, duration: f64) {
self.server_duration_ms = Some(crate::models::FiniteF64::new_lossy(duration));
}
pub(crate) fn add_event(&mut self, event: RequestEvent) {
self.events.push(event);
}
pub(crate) fn set_transport_shard(&mut self, transport_shard: TransportShardDiagnostics) {
self.transport_shard = Some(transport_shard);
}
pub(crate) fn add_failed_transport_shard(
&mut self,
failed_transport_shard: FailedTransportShardDiagnostics,
) {
self.failed_transport_shards.push(failed_transport_shard);
}
pub(crate) fn increment_local_shard_retry_count(&mut self) {
self.local_shard_retry_count = self.local_shard_retry_count.saturating_add(1);
}
pub(crate) fn is_completed(&self) -> bool {
self.completed_at.is_some()
}
pub fn execution_context(&self) -> ExecutionContext {
self.execution_context
}
pub fn pipeline_type(&self) -> PipelineType {
self.pipeline_type
}
pub fn transport_security(&self) -> TransportSecurity {
self.transport_security
}
pub fn transport_kind(&self) -> TransportKind {
self.transport_kind
}
pub fn transport_http_version(&self) -> TransportHttpVersion {
self.transport_http_version
}
pub fn region(&self) -> Option<&Region> {
self.region.as_ref()
}
pub fn endpoint(&self) -> &str {
&self.endpoint
}
pub fn status(&self) -> &CosmosStatus {
&self.status
}
pub fn request_charge(&self) -> RequestCharge {
self.request_charge
}
pub fn activity_id(&self) -> Option<&ActivityId> {
self.activity_id.as_ref()
}
pub fn session_token(&self) -> Option<&str> {
self.session_token.as_deref()
}
pub fn server_duration_ms(&self) -> Option<f64> {
self.server_duration_ms.map(|f| f.value())
}
pub fn started_at(&self) -> Instant {
self.started_at
}
pub fn completed_at(&self) -> Option<Instant> {
self.completed_at
}
pub fn duration_ms(&self) -> u64 {
self.duration_ms
}
pub fn events(&self) -> &[RequestEvent] {
&self.events
}
pub fn transport_shard(&self) -> Option<&TransportShardDiagnostics> {
self.transport_shard.as_ref()
}
pub fn failed_transport_shards(&self) -> &[FailedTransportShardDiagnostics] {
&self.failed_transport_shards
}
pub fn local_shard_retry_count(&self) -> u32 {
self.local_shard_retry_count
}
pub fn timed_out(&self) -> bool {
self.timed_out
}
pub fn request_sent(&self) -> RequestSentStatus {
self.request_sent
}
pub fn error(&self) -> Option<&str> {
self.error.as_deref()
}
#[cfg(feature = "fault_injection")]
pub fn fault_injection_evaluations(
&self,
) -> &[crate::fault_injection::FaultInjectionEvaluation] {
&self.fault_injection_evaluations
}
#[cfg(feature = "fault_injection")]
pub(crate) fn set_fault_injection_evaluations(
&mut self,
evaluations: Vec<crate::fault_injection::FaultInjectionEvaluation>,
) {
self.fault_injection_evaluations = evaluations;
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct RequestHandle(usize);
#[derive(Clone, Debug, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[non_exhaustive]
pub enum RequestEventType {
TransportStart,
ResponseHeadersReceived,
TransportComplete,
TransportFailed,
}
impl RequestEventType {
pub fn as_str(&self) -> &str {
match self {
Self::TransportStart => "transport_start",
Self::ResponseHeadersReceived => "response_headers_received",
Self::TransportComplete => "transport_complete",
Self::TransportFailed => "transport_failed",
}
}
pub fn indicates_request_sent(&self) -> bool {
matches!(
self,
Self::ResponseHeadersReceived | Self::TransportComplete
)
}
}
impl std::fmt::Display for RequestEventType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(self.as_str())
}
}
impl AsRef<str> for RequestEventType {
fn as_ref(&self) -> &str {
self.as_str()
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[non_exhaustive]
pub struct RequestEvent {
event_type: RequestEventType,
#[serde(skip)]
timestamp: Instant,
duration_ms: Option<u64>,
details: Option<String>,
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[non_exhaustive]
pub struct TransportShardDiagnostics {
shard_id: u64,
estimated_inflight: u32,
consecutive_failures: u32,
total_requests: u64,
total_failures: u64,
total_cancellations: u64,
marked_for_eviction: bool,
}
impl TransportShardDiagnostics {
pub(crate) fn new(
shard_id: u64,
estimated_inflight: u32,
consecutive_failures: u32,
total_requests: u64,
total_failures: u64,
total_cancellations: u64,
marked_for_eviction: bool,
) -> Self {
Self {
shard_id,
estimated_inflight,
consecutive_failures,
total_requests,
total_failures,
total_cancellations,
marked_for_eviction,
}
}
pub fn shard_id(&self) -> u64 {
self.shard_id
}
pub fn estimated_inflight(&self) -> u32 {
self.estimated_inflight
}
pub fn consecutive_failures(&self) -> u32 {
self.consecutive_failures
}
pub fn total_requests(&self) -> u64 {
self.total_requests
}
pub fn total_failures(&self) -> u64 {
self.total_failures
}
pub fn total_cancellations(&self) -> u64 {
self.total_cancellations
}
pub fn marked_for_eviction(&self) -> bool {
self.marked_for_eviction
}
}
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[non_exhaustive]
pub struct FailedTransportShardDiagnostics {
#[serde(flatten)]
transport_shard: TransportShardDiagnostics,
request_sent: RequestSentStatus,
error: String,
}
impl FailedTransportShardDiagnostics {
pub(crate) fn new(
transport_shard: TransportShardDiagnostics,
request_sent: RequestSentStatus,
error: impl Into<String>,
) -> Self {
Self {
transport_shard,
request_sent,
error: error.into(),
}
}
pub fn transport_shard(&self) -> &TransportShardDiagnostics {
&self.transport_shard
}
pub fn request_sent(&self) -> RequestSentStatus {
self.request_sent
}
pub fn error(&self) -> &str {
&self.error
}
}
fn is_zero_u32(value: &u32) -> bool {
*value == 0
}
impl RequestEvent {
pub fn new(event_type: RequestEventType) -> Self {
Self {
event_type,
timestamp: Instant::now(),
duration_ms: None,
details: None,
}
}
pub fn with_duration(event_type: RequestEventType, duration: Duration) -> Self {
Self {
event_type,
timestamp: Instant::now(),
duration_ms: Some(duration.as_millis() as u64),
details: None,
}
}
pub fn with_details(mut self, details: impl Into<String>) -> Self {
self.details = Some(details.into());
self
}
pub fn event_type(&self) -> &RequestEventType {
&self.event_type
}
pub fn timestamp(&self) -> Instant {
self.timestamp
}
pub fn duration_ms(&self) -> Option<u64> {
self.duration_ms
}
pub fn details(&self) -> Option<&str> {
self.details.as_deref()
}
}
#[derive(Serialize)]
#[serde(untagged)]
enum DiagnosticsPayload<'a> {
Requests { requests: &'a [RequestDiagnostics] },
Summary { regions: Vec<RegionSummary> },
}
#[derive(Serialize)]
struct DiagnosticsOutput<'a> {
activity_id: &'a ActivityId,
total_duration_ms: u64,
total_request_charge: RequestCharge,
request_count: usize,
#[serde(skip_serializing_if = "Option::is_none")]
system_usage: Option<SystemUsageSnapshot>,
#[serde(skip_serializing_if = "Option::is_none")]
machine_id: Option<&'a str>,
#[serde(flatten)]
payload: DiagnosticsPayload<'a>,
}
#[derive(Serialize)]
struct RegionSummary {
region: String,
request_count: usize,
total_request_charge: RequestCharge,
first: Option<RequestSummary>,
last: Option<RequestSummary>,
deduplicated_groups: Vec<DeduplicatedGroup>,
}
#[derive(Serialize)]
struct RequestSummary {
execution_context: ExecutionContext,
endpoint: String,
#[serde(flatten)]
status: CosmosStatus,
request_charge: RequestCharge,
duration_ms: u64,
timed_out: bool,
}
impl From<&RequestDiagnostics> for RequestSummary {
fn from(req: &RequestDiagnostics) -> Self {
Self {
execution_context: req.execution_context,
endpoint: req.endpoint.clone(),
status: req.status,
request_charge: req.request_charge,
duration_ms: req.duration_ms,
timed_out: req.timed_out,
}
}
}
#[derive(Serialize)]
struct DeduplicatedGroup {
endpoint: String,
#[serde(flatten)]
status: CosmosStatus,
execution_context: ExecutionContext,
count: usize,
total_request_charge: RequestCharge,
min_duration_ms: u64,
max_duration_ms: u64,
p50_duration_ms: u64,
}
#[derive(Serialize)]
struct TruncatedOutput<'a> {
activity_id: &'a ActivityId,
total_duration_ms: u64,
request_count: usize,
truncated: bool,
message: &'static str,
}
#[derive(Clone, Debug, Serialize)]
struct SystemUsageSnapshot {
cpu: String,
#[serde(skip_serializing_if = "Option::is_none")]
memory_available_mb: Option<u64>,
processor_count: usize,
cpu_overloaded: bool,
}
impl SystemUsageSnapshot {
fn capture(monitor: &CpuMemoryMonitor) -> Self {
let history = monitor.snapshot();
Self {
cpu: history.to_string(),
memory_available_mb: history.latest_memory_mb(),
processor_count: std::thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(1),
cpu_overloaded: history.is_cpu_overloaded(),
}
}
#[cfg(test)]
fn new_for_test(
cpu: String,
memory_available_mb: Option<u64>,
processor_count: usize,
cpu_overloaded: bool,
) -> Self {
Self {
cpu,
memory_available_mb,
processor_count,
cpu_overloaded,
}
}
}
#[derive(Debug)]
pub(crate) struct DiagnosticsContextBuilder {
activity_id: ActivityId,
started_at: Instant,
requests: Vec<RequestDiagnostics>,
status: Option<CosmosStatus>,
options: Arc<DiagnosticsOptions>,
cpu_monitor: Option<CpuMemoryMonitor>,
machine_id: Option<Arc<String>>,
#[cfg(feature = "fault_injection")]
fault_injection_enabled: bool,
#[cfg(test)]
test_system_usage: Option<SystemUsageSnapshot>,
}
impl DiagnosticsContextBuilder {
pub(crate) fn new(activity_id: ActivityId, options: Arc<DiagnosticsOptions>) -> Self {
Self {
activity_id,
started_at: Instant::now(),
requests: Vec::with_capacity(4), status: None,
options,
cpu_monitor: None,
machine_id: None,
#[cfg(feature = "fault_injection")]
fault_injection_enabled: false,
#[cfg(test)]
test_system_usage: None,
}
}
pub(crate) fn set_cpu_monitor(&mut self, monitor: CpuMemoryMonitor) {
self.cpu_monitor = Some(monitor);
}
pub(crate) fn set_machine_id(&mut self, machine_id: Arc<String>) {
self.machine_id = Some(machine_id);
}
#[cfg(feature = "fault_injection")]
pub(crate) fn set_fault_injection_enabled(&mut self, enabled: bool) {
self.fault_injection_enabled = enabled;
}
#[cfg(feature = "fault_injection")]
pub(crate) fn fault_injection_enabled(&self) -> bool {
self.fault_injection_enabled
}
#[allow(dead_code)]
pub(crate) fn activity_id(&self) -> &ActivityId {
&self.activity_id
}
#[allow(dead_code)]
pub(crate) fn request_count(&self) -> usize {
self.requests.len()
}
pub(crate) fn set_operation_status(
&mut self,
status_code: StatusCode,
sub_status_code: Option<SubStatusCode>,
) {
self.status = Some(CosmosStatus::from_parts(status_code, sub_status_code));
}
pub(crate) fn start_request(
&mut self,
execution_context: ExecutionContext,
pipeline_type: PipelineType,
transport_security: TransportSecurity,
transport_kind: TransportKind,
transport_http_version: TransportHttpVersion,
endpoint: &CosmosEndpoint,
) -> RequestHandle {
let request = RequestDiagnostics::new(
execution_context,
pipeline_type,
transport_security,
transport_kind,
transport_http_version,
endpoint,
);
let handle = RequestHandle(self.requests.len());
self.requests.push(request);
handle
}
pub(crate) fn complete_request(
&mut self,
handle: RequestHandle,
status_code: StatusCode,
sub_status: Option<SubStatusCode>,
) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.complete(status_code, sub_status);
}
}
pub(crate) fn timeout_request(&mut self, handle: RequestHandle) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.timeout();
}
}
pub(crate) fn fail_transport_request(
&mut self,
handle: RequestHandle,
error: impl Into<String>,
request_sent: RequestSentStatus,
status: CosmosStatus,
) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.fail_transport(error, request_sent, status);
}
}
pub(crate) fn update_request(
&mut self,
handle: RequestHandle,
f: impl FnOnce(&mut RequestDiagnostics),
) {
if let Some(request) = self.requests.get_mut(handle.0) {
debug_assert!(
!request.is_completed(),
"update_request called after complete_request - updates should occur before completion"
);
if !request.is_completed() {
f(request);
}
}
}
pub(crate) fn add_event(&mut self, handle: RequestHandle, event: RequestEvent) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.add_event(event);
}
}
pub(crate) fn set_transport_shard(
&mut self,
handle: RequestHandle,
transport_shard: TransportShardDiagnostics,
) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.set_transport_shard(transport_shard);
}
}
pub(crate) fn add_failed_transport_shard(
&mut self,
handle: RequestHandle,
failed_transport_shard: FailedTransportShardDiagnostics,
) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.add_failed_transport_shard(failed_transport_shard);
}
}
pub(crate) fn increment_local_shard_retry_count(&mut self, handle: RequestHandle) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.increment_local_shard_retry_count();
}
}
#[cfg(feature = "fault_injection")]
pub(crate) fn set_fault_injection_evaluations(
&mut self,
handle: RequestHandle,
evaluations: Vec<crate::fault_injection::FaultInjectionEvaluation>,
) {
if let Some(request) = self.requests.get_mut(handle.0) {
request.set_fault_injection_evaluations(evaluations);
}
}
pub(crate) fn complete(self) -> DiagnosticsContext {
let duration = self.started_at.elapsed();
DiagnosticsContext {
activity_id: self.activity_id,
duration,
requests: Arc::new(self.requests),
status: self.status,
options: self.options,
cpu_monitor: self.cpu_monitor,
machine_id: self.machine_id,
#[cfg(feature = "fault_injection")]
fault_injection_enabled: self.fault_injection_enabled,
#[cfg(not(feature = "fault_injection"))]
fault_injection_enabled: false,
#[cfg(test)]
test_system_usage: self.test_system_usage,
cached_json_detailed: OnceLock::new(),
cached_json_summary: OnceLock::new(),
}
}
#[cfg(test)]
fn set_test_system_usage(&mut self, snapshot: SystemUsageSnapshot) {
self.test_system_usage = Some(snapshot);
}
}
#[non_exhaustive]
#[derive(Debug)]
pub struct DiagnosticsContext {
activity_id: ActivityId,
duration: Duration,
requests: Arc<Vec<RequestDiagnostics>>,
status: Option<CosmosStatus>,
options: Arc<DiagnosticsOptions>,
cpu_monitor: Option<CpuMemoryMonitor>,
machine_id: Option<Arc<String>>,
fault_injection_enabled: bool,
#[cfg(test)]
test_system_usage: Option<SystemUsageSnapshot>,
cached_json_detailed: OnceLock<String>,
cached_json_summary: OnceLock<String>,
}
impl DiagnosticsContext {
pub fn activity_id(&self) -> &ActivityId {
&self.activity_id
}
pub fn duration(&self) -> Duration {
self.duration
}
pub fn status(&self) -> Option<&CosmosStatus> {
self.status.as_ref()
}
pub fn total_request_charge(&self) -> RequestCharge {
self.requests.iter().map(|r| r.request_charge).sum()
}
pub fn request_count(&self) -> usize {
self.requests.len()
}
pub fn regions_contacted(&self) -> Vec<Region> {
let mut regions: Vec<Region> = self
.requests
.iter()
.filter_map(|r| r.region.clone())
.collect();
regions.sort();
regions.dedup();
regions
}
pub fn requests(&self) -> Arc<Vec<RequestDiagnostics>> {
Arc::clone(&self.requests)
}
pub fn machine_id(&self) -> Option<&str> {
self.machine_id.as_ref().map(|s| s.as_str())
}
pub fn fault_injection_enabled(&self) -> bool {
self.fault_injection_enabled
}
pub fn to_json_string(&self, verbosity: Option<DiagnosticsVerbosity>) -> &str {
let effective_verbosity = match verbosity.unwrap_or(self.options.default_verbosity()) {
DiagnosticsVerbosity::Default => self.options.default_verbosity(),
v => v,
};
match effective_verbosity {
DiagnosticsVerbosity::Default | DiagnosticsVerbosity::Detailed => self
.cached_json_detailed
.get_or_init(|| self.compute_json_detailed()),
DiagnosticsVerbosity::Summary => self
.cached_json_summary
.get_or_init(|| self.compute_json_summary(self.options.max_summary_size_bytes())),
}
}
fn resolve_system_usage(&self) -> Option<SystemUsageSnapshot> {
#[cfg(test)]
if let Some(snapshot) = &self.test_system_usage {
return Some(snapshot.clone());
}
self.cpu_monitor.as_ref().map(SystemUsageSnapshot::capture)
}
fn compute_json_detailed(&self) -> String {
let total_duration_ms = self.duration.as_millis() as u64;
let system_usage = self.resolve_system_usage();
let output = DiagnosticsOutput {
activity_id: &self.activity_id,
total_duration_ms,
total_request_charge: self.requests.iter().map(|r| r.request_charge).sum(),
request_count: self.requests.len(),
system_usage,
machine_id: self.machine_id.as_ref().map(|s| s.as_str()),
payload: DiagnosticsPayload::Requests {
requests: &self.requests,
},
};
serde_json::to_string(&output)
.unwrap_or_else(|e| serde_json::json!({"error": e.to_string()}).to_string())
}
fn compute_json_summary(&self, max_size: usize) -> String {
let total_duration_ms = self.duration.as_millis() as u64;
let mut region_groups = HashMap::<Option<Region>, Vec<&RequestDiagnostics>>::new();
for req in self.requests.iter() {
region_groups
.entry(req.region.clone())
.or_default()
.push(req);
}
let mut region_summaries = Vec::new();
for (region, requests) in region_groups {
region_summaries.push(build_region_summary(region, requests));
}
region_summaries.sort_by(|a, b| a.region.cmp(&b.region));
let output = DiagnosticsOutput {
activity_id: &self.activity_id,
total_duration_ms,
total_request_charge: self.requests.iter().map(|r| r.request_charge).sum(),
request_count: self.requests.len(),
system_usage: self.resolve_system_usage(),
machine_id: self.machine_id.as_ref().map(|s| s.as_str()),
payload: DiagnosticsPayload::Summary {
regions: region_summaries,
},
};
let json = serde_json::to_string(&output)
.unwrap_or_else(|e| serde_json::json!({"error": e.to_string()}).to_string());
if json.len() <= max_size {
json
} else {
let truncated = TruncatedOutput {
activity_id: &self.activity_id,
total_duration_ms,
request_count: self.requests.len(),
truncated: true,
message:
"Output truncated to fit size limit. Use Detailed verbosity for full diagnostics.",
};
serde_json::to_string(&truncated)
.unwrap_or_else(|e| serde_json::json!({"error": e.to_string()}).to_string())
}
}
}
impl Clone for DiagnosticsContext {
fn clone(&self) -> Self {
Self {
activity_id: self.activity_id.clone(),
duration: self.duration,
requests: Arc::clone(&self.requests),
status: self.status,
options: Arc::clone(&self.options),
cpu_monitor: self.cpu_monitor.clone(),
machine_id: self.machine_id.clone(),
fault_injection_enabled: self.fault_injection_enabled,
#[cfg(test)]
test_system_usage: self.test_system_usage.clone(),
cached_json_detailed: self
.cached_json_detailed
.get()
.cloned()
.map(OnceLock::from)
.unwrap_or_default(),
cached_json_summary: self
.cached_json_summary
.get()
.cloned()
.map(OnceLock::from)
.unwrap_or_default(),
}
}
}
impl PartialEq for DiagnosticsContext {
fn eq(&self, other: &Self) -> bool {
self.activity_id == other.activity_id
&& self.duration == other.duration
&& self.requests == other.requests
&& self.status == other.status
&& self.options == other.options
}
}
impl Eq for DiagnosticsContext {}
fn build_region_summary(
region: Option<Region>,
requests: Vec<&RequestDiagnostics>,
) -> RegionSummary {
let count = requests.len();
let total_charge: RequestCharge = requests.iter().map(|r| r.request_charge).sum();
let first = requests.first().map(|r| RequestSummary::from(*r));
let last = if count > 1 {
requests.last().map(|r| RequestSummary::from(*r))
} else {
None
};
let middle_requests: Vec<_> = if count > 2 {
requests[1..count - 1].to_vec()
} else {
Vec::new()
};
let deduped_groups = deduplicate_requests(middle_requests);
RegionSummary {
region: region.as_ref().map(|r| r.to_string()).unwrap_or_default(),
request_count: count,
total_request_charge: total_charge,
first,
last,
deduplicated_groups: deduped_groups,
}
}
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
struct DeduplicationKey {
endpoint: String,
status: CosmosStatus,
execution_context: ExecutionContext,
}
fn deduplicate_requests(requests: Vec<&RequestDiagnostics>) -> Vec<DeduplicatedGroup> {
let mut groups = HashMap::<DeduplicationKey, Vec<&RequestDiagnostics>>::new();
for req in requests {
let key = DeduplicationKey {
endpoint: req.endpoint.clone(),
status: req.status,
execution_context: req.execution_context,
};
groups.entry(key).or_default().push(req);
}
groups
.into_iter()
.map(|(key, reqs)| {
let mut durations: Vec<u64> = reqs.iter().map(|r| r.duration_ms).collect();
durations.sort_unstable();
let total_charge: RequestCharge = reqs.iter().map(|r| r.request_charge).sum();
DeduplicatedGroup {
endpoint: key.endpoint,
status: key.status,
execution_context: key.execution_context,
count: reqs.len(),
total_request_charge: total_charge,
min_duration_ms: durations.first().copied().unwrap_or(0),
max_duration_ms: durations.last().copied().unwrap_or(0),
p50_duration_ms: percentile_sorted(&durations, 50),
}
})
.collect()
}
fn percentile_sorted(values: &[u64], p: u8) -> u64 {
if values.is_empty() {
return 0;
}
let index = ((p as f64 / 100.0) * (values.len() - 1) as f64).round() as usize;
values[index.min(values.len() - 1)]
}
#[cfg(test)]
mod tests {
use super::*;
fn make_options() -> Arc<DiagnosticsOptions> {
Arc::new(DiagnosticsOptions::default())
}
fn make_context_with<F>(activity_id: ActivityId, f: F) -> DiagnosticsContext
where
F: FnOnce(&mut DiagnosticsContextBuilder),
{
let mut builder = DiagnosticsContextBuilder::new(activity_id, make_options());
f(&mut builder);
builder.complete()
}
trait TestBuilderExt {
fn start_test_request(
&mut self,
execution_context: ExecutionContext,
region: Option<Region>,
endpoint: &str,
) -> RequestHandle;
}
impl TestBuilderExt for DiagnosticsContextBuilder {
fn start_test_request(
&mut self,
execution_context: ExecutionContext,
region: Option<Region>,
endpoint: &str,
) -> RequestHandle {
let cosmos_endpoint = match region {
Some(r) => CosmosEndpoint::regional(r, url::Url::parse(endpoint).unwrap()),
None => CosmosEndpoint::global(url::Url::parse(endpoint).unwrap()),
};
self.start_request(
execution_context,
PipelineType::DataPlane,
TransportSecurity::Secure,
TransportKind::Gateway,
TransportHttpVersion::Http11,
&cosmos_endpoint,
)
}
}
fn normalize_diagnostics_json(json: &str) -> serde_json::Value {
let mut value: serde_json::Value = serde_json::from_str(json)
.unwrap_or_else(|e| panic!("Failed to parse diagnostics JSON: {e}\nJSON: {json}"));
if let Some(obj) = value.as_object_mut() {
if obj.contains_key("total_duration_ms") {
obj.insert(
"total_duration_ms".to_string(),
serde_json::Value::Number(0.into()),
);
}
}
if let Some(requests) = value.get_mut("requests").and_then(|v| v.as_array_mut()) {
for req in requests {
if let Some(obj) = req.as_object_mut() {
if obj.contains_key("duration_ms") {
obj.insert(
"duration_ms".to_string(),
serde_json::Value::Number(0.into()),
);
}
}
}
}
if let Some(regions) = value.get_mut("regions").and_then(|v| v.as_array_mut()) {
for region in regions {
for key in &["first", "last"] {
if let Some(summary) = region.get_mut(*key).and_then(|v| v.as_object_mut()) {
if summary.contains_key("duration_ms") {
summary.insert(
"duration_ms".to_string(),
serde_json::Value::Number(0.into()),
);
}
}
}
if let Some(groups) = region
.get_mut("deduplicated_groups")
.and_then(|v| v.as_array_mut())
{
for group in groups {
if let Some(obj) = group.as_object_mut() {
for key in &["min_duration_ms", "max_duration_ms", "p50_duration_ms"] {
if obj.contains_key(*key) {
obj.insert(
key.to_string(),
serde_json::Value::Number(0.into()),
);
}
}
}
}
}
}
}
value
}
#[test]
fn builder_new_context_has_activity_id() {
let activity_id = ActivityId::new_uuid();
let ctx = make_context_with(activity_id.clone(), |_| {});
assert_eq!(ctx.activity_id(), &activity_id);
}
#[test]
fn builder_start_and_complete_request() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
std::thread::sleep(std::time::Duration::from_millis(10));
builder.complete_request(handle, StatusCode::Ok, None);
});
let requests = ctx.requests();
assert_eq!(requests.len(), 1);
assert_eq!(requests[0].status().status_code(), StatusCode::Ok);
assert!(requests[0].duration_ms >= 10);
assert!(requests[0].completed_at.is_some());
}
#[test]
fn builder_timeout_request() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.timeout_request(handle);
});
let requests = ctx.requests();
assert!(requests[0].timed_out);
}
#[test]
fn builder_update_request_with_charge() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(handle, |req| {
req.request_charge = RequestCharge::new(5.5);
});
});
assert_eq!(ctx.total_request_charge(), RequestCharge::new(5.5));
}
#[test]
fn total_charge_sums_all_requests() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
let h1 = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(h1, |req| req.request_charge = RequestCharge::new(3.0));
let h2 = builder.start_test_request(
ExecutionContext::Retry,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(h2, |req| req.request_charge = RequestCharge::new(2.5));
});
assert!((ctx.total_request_charge().value() - 5.5).abs() < f64::EPSILON);
}
#[test]
fn regions_contacted_deduplicates() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.westus2.documents.azure.com",
);
builder.start_test_request(
ExecutionContext::Retry,
Some(Region::WEST_US_2),
"https://test.westus2.documents.azure.com",
);
builder.start_test_request(
ExecutionContext::RegionFailover,
Some(Region::EAST_US_2),
"https://test.eastus2.documents.azure.com",
);
});
let regions = ctx.regions_contacted();
assert_eq!(regions.len(), 2);
}
#[test]
fn to_json_detailed() {
let ctx = make_context_with(ActivityId::from_string("test-id".to_string()), |builder| {
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(handle, |req| req.request_charge = RequestCharge::new(1.0));
builder.complete_request(handle, StatusCode::Ok, None);
});
let json = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
let actual = normalize_diagnostics_json(json);
let expected: serde_json::Value = {
#[cfg(feature = "fault_injection")]
{
serde_json::json!({
"activity_id": "test-id",
"total_duration_ms": 0,
"total_request_charge": 1.0,
"request_count": 1,
"requests": [{
"execution_context": "initial",
"pipeline_type": "data_plane",
"transport_security": "secure",
"transport_kind": "gateway",
"transport_http_version": "http11",
"region": "westus2",
"endpoint": "https://test.documents.azure.com/",
"status": "200",
"request_charge": 1.0,
"activity_id": null,
"session_token": null,
"server_duration_ms": null,
"duration_ms": 0,
"events": [],
"timed_out": false,
"request_sent": "sent",
"error": null,
"fault_injection_evaluations": []
}]
})
}
#[cfg(not(feature = "fault_injection"))]
{
serde_json::json!({
"activity_id": "test-id",
"total_duration_ms": 0,
"total_request_charge": 1.0,
"request_count": 1,
"requests": [{
"execution_context": "initial",
"pipeline_type": "data_plane",
"transport_security": "secure",
"transport_kind": "gateway",
"transport_http_version": "http11",
"region": "westus2",
"endpoint": "https://test.documents.azure.com/",
"status": "200",
"request_charge": 1.0,
"activity_id": null,
"session_token": null,
"server_duration_ms": null,
"duration_ms": 0,
"events": [],
"timed_out": false,
"request_sent": "sent",
"error": null
}]
})
}
};
assert_eq!(actual, expected, "Detailed JSON mismatch.\nActual:\n{json}");
}
#[test]
fn to_json_summary() {
let ctx = make_context_with(ActivityId::from_string("test-id".to_string()), |builder| {
for i in 0..5 {
let handle = builder.start_test_request(
ExecutionContext::Retry,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(handle, |req| {
req.request_charge = RequestCharge::new(i as f64)
});
builder.complete_request(handle, StatusCode::TooManyRequests, None);
}
});
let json = ctx.to_json_string(Some(DiagnosticsVerbosity::Summary));
let actual = normalize_diagnostics_json(json);
let expected: serde_json::Value = serde_json::json!({
"activity_id": "test-id",
"total_duration_ms": 0,
"total_request_charge": 10.0,
"request_count": 5,
"regions": [{
"region": "westus2",
"request_count": 5,
"total_request_charge": 10.0,
"first": {
"execution_context": "retry",
"endpoint": "https://test.documents.azure.com/",
"status": "429",
"request_charge": 0.0,
"duration_ms": 0,
"timed_out": false
},
"last": {
"execution_context": "retry",
"endpoint": "https://test.documents.azure.com/",
"status": "429",
"request_charge": 4.0,
"duration_ms": 0,
"timed_out": false
},
"deduplicated_groups": [{
"endpoint": "https://test.documents.azure.com/",
"status": "429",
"execution_context": "retry",
"count": 3,
"total_request_charge": 6.0,
"min_duration_ms": 0,
"max_duration_ms": 0,
"p50_duration_ms": 0
}]
}]
});
assert_eq!(actual, expected, "Summary JSON mismatch.\nActual:\n{json}");
}
#[test]
fn json_caching_detailed() {
let ctx = make_context_with(
ActivityId::from_string("cache-test".to_string()),
|builder| {
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.complete_request(handle, StatusCode::Ok, None);
},
);
let json1 = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
let json2 = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
assert_eq!(json1, json2);
assert!(std::ptr::eq(json1, json2)); }
#[test]
fn requests_returns_arc() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
});
let requests1 = ctx.requests();
let requests2 = ctx.requests();
assert!(Arc::ptr_eq(&requests1, &requests2));
}
#[test]
fn duration_is_captured() {
let ctx = make_context_with(ActivityId::new_uuid(), |builder| {
std::thread::sleep(std::time::Duration::from_millis(10));
builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
});
assert!(ctx.duration().as_millis() >= 10);
}
#[test]
fn status_codes_stored() {
let mut builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
builder.set_operation_status(
StatusCode::NotFound,
Some(SubStatusCode::READ_SESSION_NOT_AVAILABLE),
);
let ctx = builder.complete();
let status = ctx.status().unwrap();
assert_eq!(status.status_code(), StatusCode::NotFound);
assert!(status.is_read_session_not_available());
}
#[test]
fn transport_failure_request_uses_transport_generated_503() {
let mut builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.fail_transport_request(
handle,
"connection refused",
RequestSentStatus::Unknown,
CosmosStatus::TRANSPORT_GENERATED_503,
);
let ctx = builder.complete();
let requests = ctx.requests();
let status = requests[0].status();
assert_eq!(status, &CosmosStatus::TRANSPORT_GENERATED_503);
assert_eq!(requests[0].error(), Some("connection refused"));
}
#[test]
fn percentile_calculation() {
assert_eq!(percentile_sorted(&[], 50), 0);
assert_eq!(percentile_sorted(&[100], 50), 100);
assert_eq!(percentile_sorted(&[10, 20, 30, 40, 50], 50), 30);
assert_eq!(percentile_sorted(&[10, 20, 30, 40, 50], 0), 10);
assert_eq!(percentile_sorted(&[10, 20, 30, 40, 50], 100), 50);
}
#[test]
fn update_before_complete_succeeds() {
let mut builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(handle, |req| {
req.request_charge = RequestCharge::new(5.5);
});
builder.complete_request(handle, StatusCode::Ok, None);
let ctx = builder.complete();
let requests = ctx.requests();
assert_eq!(requests[0].request_charge, RequestCharge::new(5.5));
}
#[test]
fn update_after_complete_is_ignored_in_release() {
let mut builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
let handle = builder.start_test_request(
ExecutionContext::Initial,
Some(Region::WEST_US_2),
"https://test.documents.azure.com",
);
builder.update_request(handle, |req| {
req.request_charge = RequestCharge::new(5.5);
});
builder.complete_request(handle, StatusCode::Ok, None);
#[cfg(not(debug_assertions))]
{
builder.update_request(handle, |req| {
req.request_charge = RequestCharge::new(10.0); });
let ctx = builder.complete();
let requests = ctx.requests();
assert_eq!(requests[0].request_charge, RequestCharge::new(5.5));
}
}
#[test]
fn execution_context_display() {
assert_eq!(ExecutionContext::Initial.to_string(), "initial");
assert_eq!(ExecutionContext::Retry.to_string(), "retry");
assert_eq!(
ExecutionContext::TransportRetry.to_string(),
"transport_retry"
);
assert_eq!(ExecutionContext::Hedging.to_string(), "hedging");
assert_eq!(
ExecutionContext::RegionFailover.to_string(),
"region_failover"
);
assert_eq!(
ExecutionContext::CircuitBreakerProbe.to_string(),
"circuit_breaker_probe"
);
}
#[test]
fn pipeline_type_classification() {
assert!(PipelineType::Metadata.is_metadata());
assert!(!PipelineType::Metadata.is_data_plane());
assert!(PipelineType::DataPlane.is_data_plane());
assert!(!PipelineType::DataPlane.is_metadata());
}
#[test]
fn transport_security_classification() {
assert!(TransportSecurity::Secure.is_secure());
assert!(!TransportSecurity::Secure.is_emulator());
assert!(TransportSecurity::EmulatorWithInsecureCertificates.is_emulator());
assert!(!TransportSecurity::EmulatorWithInsecureCertificates.is_secure());
}
#[test]
fn transport_kind_classification() {
assert!(TransportKind::Gateway.is_gateway());
assert!(!TransportKind::Gateway.is_gateway20());
assert!(TransportKind::Gateway20.is_gateway20());
assert!(!TransportKind::Gateway20.is_gateway());
}
#[test]
fn transport_http_version_classification() {
assert!(TransportHttpVersion::Http11.is_http11());
assert!(!TransportHttpVersion::Http11.is_http2());
assert!(TransportHttpVersion::Http2.is_http2());
assert!(!TransportHttpVersion::Http2.is_http11());
}
#[test]
fn transport_security_default() {
assert_eq!(TransportSecurity::default(), TransportSecurity::Secure);
}
#[test]
fn transport_kind_default() {
assert_eq!(TransportKind::default(), TransportKind::Gateway);
}
#[test]
fn pipeline_type_serialization() {
assert_eq!(
serde_json::to_string(&PipelineType::Metadata).unwrap(),
"\"metadata\""
);
assert_eq!(
serde_json::to_string(&PipelineType::DataPlane).unwrap(),
"\"data_plane\""
);
}
#[test]
fn transport_security_serialization() {
assert_eq!(
serde_json::to_string(&TransportSecurity::Secure).unwrap(),
"\"secure\""
);
assert_eq!(
serde_json::to_string(&TransportSecurity::EmulatorWithInsecureCertificates).unwrap(),
"\"emulator_with_insecure_certificates\""
);
}
#[test]
fn transport_kind_serialization() {
assert_eq!(
serde_json::to_string(&TransportKind::Gateway).unwrap(),
"\"gateway\""
);
assert_eq!(
serde_json::to_string(&TransportKind::Gateway20).unwrap(),
"\"gateway20\""
);
}
#[test]
fn transport_http_version_serialization() {
assert_eq!(
serde_json::to_string(&TransportHttpVersion::Http11).unwrap(),
"\"http11\""
);
assert_eq!(
serde_json::to_string(&TransportHttpVersion::Http2).unwrap(),
"\"http2\""
);
}
#[test]
fn event_type_indicates_sent() {
assert!(!RequestEventType::TransportStart.indicates_request_sent());
assert!(!RequestEventType::TransportFailed.indicates_request_sent());
assert!(RequestEventType::ResponseHeadersReceived.indicates_request_sent());
assert!(RequestEventType::TransportComplete.indicates_request_sent());
}
#[test]
fn event_creation() {
let event = RequestEvent::new(RequestEventType::TransportStart);
assert_eq!(event.event_type, RequestEventType::TransportStart);
assert!(event.duration_ms.is_none());
assert!(event.details.is_none());
}
#[test]
fn event_with_details() {
let event = RequestEvent::new(RequestEventType::TransportFailed)
.with_details("connection reset by peer");
assert_eq!(event.details, Some("connection reset by peer".to_string()));
}
#[test]
fn event_with_duration() {
let event = RequestEvent::with_duration(
RequestEventType::TransportComplete,
Duration::from_millis(50),
);
assert_eq!(event.duration_ms, Some(50));
}
#[test]
fn json_without_system_info_omits_fields() {
let ctx = make_context_with(
ActivityId::from_string("test-no-system-info".to_string()),
|builder| {
builder.set_operation_status(StatusCode::Ok, None);
},
);
let json = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
let actual = normalize_diagnostics_json(json);
let expected: serde_json::Value = serde_json::json!({
"activity_id": "test-no-system-info",
"total_duration_ms": 0,
"total_request_charge": 0.0,
"request_count": 0,
"requests": []
});
assert_eq!(
actual, expected,
"JSON without system info mismatch.\nActual:\n{json}"
);
}
#[test]
fn json_with_machine_id() {
let mut builder = DiagnosticsContextBuilder::new(
ActivityId::from_string("test-machine-id".to_string()),
make_options(),
);
builder.set_operation_status(StatusCode::Ok, None);
builder.set_machine_id(Arc::new("vmId_test-vm-123".to_string()));
let ctx = builder.complete();
let json = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
let actual = normalize_diagnostics_json(json);
let expected: serde_json::Value = serde_json::json!({
"activity_id": "test-machine-id",
"total_duration_ms": 0,
"total_request_charge": 0.0,
"request_count": 0,
"machine_id": "vmId_test-vm-123",
"requests": []
});
assert_eq!(
actual, expected,
"Detailed JSON with machine_id mismatch.\nActual:\n{json}"
);
let json_summary = ctx.to_json_string(Some(DiagnosticsVerbosity::Summary));
let actual_summary = normalize_diagnostics_json(json_summary);
let expected_summary: serde_json::Value = serde_json::json!({
"activity_id": "test-machine-id",
"total_duration_ms": 0,
"total_request_charge": 0.0,
"request_count": 0,
"machine_id": "vmId_test-vm-123",
"regions": []
});
assert_eq!(
actual_summary, expected_summary,
"Summary JSON with machine_id mismatch.\nActual:\n{json_summary}"
);
}
#[test]
fn json_with_system_usage() {
let mut builder = DiagnosticsContextBuilder::new(
ActivityId::from_string("test-system-usage".to_string()),
make_options(),
);
builder.set_operation_status(StatusCode::Ok, None);
builder.set_test_system_usage(SystemUsageSnapshot::new_for_test(
"(50.0%), (60.0%)".to_string(),
Some(4096),
4,
false,
));
let ctx = builder.complete();
let json = ctx.to_json_string(Some(DiagnosticsVerbosity::Detailed));
let actual = normalize_diagnostics_json(json);
let expected: serde_json::Value = serde_json::json!({
"activity_id": "test-system-usage",
"total_duration_ms": 0,
"total_request_charge": 0.0,
"request_count": 0,
"system_usage": {
"cpu": "(50.0%), (60.0%)",
"memory_available_mb": 4096,
"processor_count": 4,
"cpu_overloaded": false
},
"requests": []
});
assert_eq!(
actual, expected,
"JSON with system_usage mismatch.\nActual:\n{json}"
);
}
#[test]
fn machine_id_getter() {
let mut builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
builder.set_machine_id(Arc::new("uuid_abc-123".to_string()));
let ctx = builder.complete();
assert_eq!(ctx.machine_id(), Some("uuid_abc-123"));
}
#[test]
fn machine_id_none_when_not_set() {
let builder = DiagnosticsContextBuilder::new(ActivityId::new_uuid(), make_options());
let ctx = builder.complete();
assert_eq!(ctx.machine_id(), None);
}
}