from typing import Dict, Optional
import logging
try:
from opentelemetry import trace, context
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
HAS_OTEL = True
except ImportError:
HAS_OTEL = False
from briefcase.correlation.workflow import get_current_workflow
logger = logging.getLogger(__name__)
class TraceContextCarrier:
_propagator = None
@classmethod
def _get_propagator(cls):
if cls._propagator is None and HAS_OTEL:
cls._propagator = TraceContextTextMapPropagator()
return cls._propagator
@classmethod
def inject(cls, carrier: Optional[Dict] = None) -> Dict:
if carrier is None:
carrier = {}
if not HAS_OTEL:
logger.warning("OpenTelemetry not available, cannot inject trace context")
return carrier
propagator = cls._get_propagator()
if propagator:
try:
propagator.inject(carrier)
workflow = get_current_workflow()
if workflow:
existing_state = carrier.get("tracestate", "")
workflow_state = f"briefcase=workflow_id:{workflow.workflow_id}"
carrier["tracestate"] = f"{workflow_state},{existing_state}" if existing_state else workflow_state
except Exception as e:
logger.error(f"Failed to inject trace context: {e}")
return carrier
@classmethod
def extract(cls, carrier: Dict):
if not HAS_OTEL:
logger.warning("OpenTelemetry not available, cannot extract trace context")
return None
propagator = cls._get_propagator()
if propagator:
try:
ctx = propagator.extract(carrier)
return context.attach(ctx)
except Exception as e:
logger.error(f"Failed to extract trace context: {e}")
return None
return None
def inject_trace_context(headers: Optional[Dict] = None) -> Dict:
return TraceContextCarrier.inject(headers)
def extract_trace_context(headers: Dict):
return TraceContextCarrier.extract(headers)