import tempfile
import shutil
import pytest
def test_metric_exporter_interface():
import otlp_arrow_library
tmpdir = tempfile.mkdtemp()
try:
library = otlp_arrow_library.PyOtlpLibrary(
output_dir=tmpdir,
write_interval_secs=1
)
metric_exporter = library.metric_exporter_adapter()
assert metric_exporter is not None, "metric_exporter_adapter() should return an object"
assert hasattr(metric_exporter, "export"), "Adapter should have export() method"
assert hasattr(metric_exporter, "shutdown"), "Adapter should have shutdown() method"
assert hasattr(metric_exporter, "force_flush"), "Adapter should have force_flush() method"
assert hasattr(metric_exporter, "temporality"), "Adapter should have temporality() method"
result = metric_exporter.shutdown()
assert result is None, "shutdown() should return None"
flush_result = metric_exporter.force_flush(timeout_millis=1000)
assert flush_result is not None, "force_flush() should return a result"
temporality = metric_exporter.temporality()
assert temporality is not None, "temporality() should return a value"
library.shutdown()
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def test_metric_exporter_with_real_sdk_data():
import otlp_arrow_library
try:
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry import metrics
except ImportError:
pytest.skip("OpenTelemetry SDK not installed - install with: pip install opentelemetry-api opentelemetry-sdk")
tmpdir = tempfile.mkdtemp()
try:
library = otlp_arrow_library.PyOtlpLibrary(
output_dir=tmpdir,
write_interval_secs=1
)
metric_exporter = library.metric_exporter_adapter()
reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=100 )
meter_provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(meter_provider)
try:
meter = metrics.get_meter(__name__)
counter = meter.create_counter(
"test_counter",
description="A test counter metric",
unit="1"
)
counter.add(5, {"environment": "test", "service": "test-service"})
counter.add(3, {"environment": "test", "service": "test-service"})
meter_provider.force_flush(timeout_millis=1000)
assert True, "Export completed successfully"
finally:
meter_provider.shutdown()
library.shutdown()
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
def test_metric_exporter_direct_export():
import otlp_arrow_library
try:
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry import metrics
except ImportError:
pytest.skip("OpenTelemetry SDK not installed - install with: pip install opentelemetry-api opentelemetry-sdk")
tmpdir = tempfile.mkdtemp()
try:
library = otlp_arrow_library.PyOtlpLibrary(
output_dir=tmpdir,
write_interval_secs=1
)
metric_exporter = library.metric_exporter_adapter()
reader = PeriodicExportingMetricReader(
metric_exporter,
export_interval_millis=100
)
meter_provider = MeterProvider(metric_readers=[reader])
metrics.set_meter_provider(meter_provider)
try:
meter = metrics.get_meter(__name__)
counter = meter.create_counter("test_counter", description="Test counter")
gauge = meter.create_up_down_counter("test_gauge", description="Test gauge")
counter.add(10)
gauge.add(5)
reader.collect()
result = meter_provider.force_flush(timeout_millis=1000)
assert result, "Force flush should succeed"
finally:
meter_provider.shutdown()
library.shutdown()
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
if __name__ == "__main__":
pytest.main([__file__])