import tempfile
import os
import pytest
def test_metric_reader_integration():
try:
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry import metrics
except ImportError:
pytest.skip("OpenTelemetry SDK not installed")
import otlp_arrow_library
with tempfile.TemporaryDirectory() as tmpdir:
library = otlp_arrow_library.PyOtlpLibrary(
output_dir=tmpdir,
write_interval_secs=1
)
metric_exporter = library.metric_exporter_adapter()
try:
metric_reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=1000
)
meter_provider = MeterProvider(metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
meter = metrics.get_meter(__name__)
counter = meter.create_counter(
name="test_counter",
description="Test counter",
unit="1"
)
counter.add(1, {"test": "value"})
metric_reader.force_flush()
metrics_dir = os.path.join(tmpdir, "otlp", "metrics")
assert os.path.exists(metrics_dir) or os.path.exists(tmpdir), "Metrics should be exported"
metrics.set_meter_provider(None)
library.shutdown()
except Exception as e:
pytest.skip(f"OpenTelemetry SDK integration test skipped: {e}")
def test_span_processor_integration():
try:
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry import trace
except ImportError:
pytest.skip("OpenTelemetry SDK not installed")
import otlp_arrow_library
with tempfile.TemporaryDirectory() as tmpdir:
library = otlp_arrow_library.PyOtlpLibrary(
output_dir=tmpdir,
write_interval_secs=1
)
span_exporter = library.span_exporter_adapter()
try:
span_processor = BatchSpanProcessor(span_exporter)
tracer_provider = TracerProvider()
tracer_provider.add_span_processor(span_processor)
trace.set_tracer_provider(tracer_provider)
tracer = trace.get_tracer(__name__)
with tracer.start_as_current_span("test-span") as span:
span.set_attribute("test.attribute", "test-value")
span.set_status(trace.Status(trace.StatusCode.OK))
span_processor.force_flush()
traces_dir = os.path.join(tmpdir, "otlp", "traces")
assert os.path.exists(traces_dir) or os.path.exists(tmpdir), "Traces should be exported"
trace.set_tracer_provider(None)
library.shutdown()
except Exception as e:
pytest.skip(f"OpenTelemetry SDK integration test skipped: {e}")
if __name__ == "__main__":
pytest.main([__file__])