#![allow(non_local_definitions, unsafe_op_in_unsafe_fn)]
pub mod conversion;
use crate::python::bindings::PyOtlpLibrary;
use pyo3::prelude::*;
pub mod gc {
use super::*;
pub type LibraryRef = Py<PyOtlpLibrary>;
pub fn is_library_valid(library_ref: &LibraryRef, py: Python<'_>) -> bool {
library_ref.try_borrow(py).is_ok()
}
}
pub use gc::{LibraryRef, is_library_valid};
use crate::python::adapters::conversion::{
convert_metric_export_result_to_dict, convert_span_sequence_to_dict_list, error_message_to_py,
};
use pyo3::types::PyString;
#[pyclass]
pub struct PyOtlpMetricExporterAdapter {
pub(crate) library: LibraryRef,
pub(crate) temporality: std::sync::Mutex<Option<String>>, }
#[pymethods]
impl PyOtlpMetricExporterAdapter {
#[pyo3(signature = (metrics_data, *, timeout_millis=None))]
#[allow(unused_variables, unsafe_op_in_unsafe_fn)] pub fn export(
&self,
metrics_data: &PyAny, timeout_millis: Option<f64>, py: Python<'_>,
) -> PyResult<PyObject> {
if !is_library_valid(&self.library, py) {
return Err(error_message_to_py(
"Library instance is no longer valid".to_string(),
));
}
let metrics_dict = match convert_metric_export_result_to_dict(metrics_data, py) {
Ok(dict) => dict,
Err(e) => {
return Err(e);
}
};
let scope_metrics = metrics_dict.get_item("scope_metrics").ok().flatten();
let is_empty = if let Some(scope_metrics_list) = scope_metrics {
if let Ok(list) = scope_metrics_list.downcast::<pyo3::types::PyList>() {
list.is_empty()
} else {
false
}
} else {
true
};
if is_empty {
let export_result = py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("ExportResult"))
.and_then(|export_result| export_result.getattr("SUCCESS"));
match export_result {
Ok(success) => return Ok(success.into()),
Err(_) => {
use pyo3::types::PyDict;
let result_dict = PyDict::new(py);
result_dict.set_item("success", true)?;
return Ok(result_dict.into());
}
}
}
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
let protobuf_request = ExportMetricsServiceRequest::default();
let library_ref = self.library.borrow(py);
let library = library_ref.library.clone();
let runtime = library_ref.runtime.clone();
drop(library_ref);
py.allow_threads(|| {
runtime
.block_on(async move { library.export_metrics(protobuf_request).await })
.map_err(|e| error_message_to_py(format!("Failed to export metrics: {}", e)))
})?;
let export_result = py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("ExportResult"))
.and_then(|export_result| export_result.getattr("SUCCESS"));
match export_result {
Ok(success) => Ok(success.into()),
Err(_) => {
Ok(py.None())
}
}
}
#[pyo3(signature = (*, timeout=None, timeout_millis=None))]
#[allow(unused_variables)] pub fn shutdown(
&self,
timeout: Option<f64>,
timeout_millis: Option<f64>,
_py: Python<'_>,
) -> PyResult<()> {
Ok(())
}
#[pyo3(signature = (*, timeout_millis=None))]
#[allow(unused_variables)] pub fn force_flush(&self, timeout_millis: Option<f64>, py: Python<'_>) -> PyResult<PyObject> {
if !is_library_valid(&self.library, py) {
return Err(error_message_to_py(
"Library instance is no longer valid".to_string(),
));
}
let library_ref = self.library.borrow(py);
let library = library_ref.library.clone();
let runtime = library_ref.runtime.clone();
drop(library_ref);
py.allow_threads(|| {
runtime
.block_on(async move { library.flush().await })
.map_err(|e| error_message_to_py(format!("Failed to flush metrics: {}", e)))
})?;
let export_result = py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("ExportResult"))
.and_then(|export_result| export_result.getattr("SUCCESS"));
match export_result {
Ok(success) => Ok(success.into()),
Err(_) => {
use pyo3::types::PyDict;
let result_dict = PyDict::new(py);
result_dict.set_item("success", true)?;
Ok(result_dict.into())
}
}
}
pub fn set_temporality(&self, temporality: &PyAny, _py: Python<'_>) -> PyResult<()> {
let temp_str = if let Ok(attr) = temporality.getattr("name") {
attr.to_string()
} else if let Ok(s) = temporality.extract::<String>() {
s
} else {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Invalid temporality value. Expected Temporality.CUMULATIVE or Temporality.DELTA",
));
};
let temp_upper = temp_str.to_uppercase();
if temp_upper != "CUMULATIVE" && temp_upper != "DELTA" {
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(format!(
"Invalid temporality: {}. Expected CUMULATIVE or DELTA",
temp_str
)));
}
*self.temporality.lock().unwrap() = Some(temp_upper);
Ok(())
}
pub fn temporality(&self, py: Python<'_>) -> PyResult<PyObject> {
let temp_str = self
.temporality
.lock()
.unwrap()
.as_ref()
.map(|s| s.clone())
.unwrap_or_else(|| "CUMULATIVE".to_string());
let temporality_result = py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("Temporality"))
.and_then(|temporality| temporality.getattr(temp_str.as_str()));
match temporality_result {
Ok(temp) => Ok(temp.into()),
Err(_) => {
Ok(PyString::new(py, temp_str.as_str()).into_py(py))
}
}
}
fn __repr__(&self) -> String {
"PyOtlpMetricExporterAdapter".to_string()
}
fn __getattr__(&self, name: &str, py: Python<'_>) -> PyResult<PyObject> {
match name {
"_preferred_temporality" => {
let temp_str = self
.temporality
.lock()
.unwrap()
.as_ref()
.map(|s| s.clone())
.unwrap_or_else(|| "CUMULATIVE".to_string());
let temporality_dict = pyo3::types::PyDict::new(py);
let agg_temporality = match py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("AggregationTemporality"))
.and_then(|agg_temp| agg_temp.getattr(temp_str.as_str()))
{
Ok(temp) => temp,
Err(_) => {
match py
.import("opentelemetry.sdk.metrics.export")
.and_then(|module| module.getattr("AggregationTemporality"))
.and_then(|agg_temp| agg_temp.getattr("CUMULATIVE"))
{
Ok(cum) => cum,
Err(_) => {
return Ok(temporality_dict.into());
}
}
}
};
if let Ok(metrics_module) = py.import("opentelemetry.sdk.metrics") {
let metric_types = [
"Counter",
"Histogram",
"UpDownCounter",
"ObservableCounter",
"ObservableGauge",
"ObservableUpDownCounter",
];
for metric_type_name in metric_types {
if let Ok(metric_type) = metrics_module.getattr(metric_type_name) {
let _ = temporality_dict.set_item(metric_type, agg_temporality);
}
}
}
Ok(temporality_dict.into())
}
"_preferred_aggregation" => {
let empty_dict = pyo3::types::PyDict::new(py);
Ok(empty_dict.into())
}
_ => {
Err(pyo3::exceptions::PyAttributeError::new_err(format!(
"'PyOtlpMetricExporterAdapter' object has no attribute '{}'",
name
)))
}
}
}
}
#[pyclass]
pub struct PyOtlpSpanExporterAdapter {
pub(crate) library: LibraryRef,
}
#[pymethods]
impl PyOtlpSpanExporterAdapter {
#[allow(unsafe_op_in_unsafe_fn)] pub fn export(&self, spans: &PyAny, py: Python<'_>) -> PyResult<PyObject> {
if !is_library_valid(&self.library, py) {
return Err(error_message_to_py(
"Library instance is no longer valid".to_string(),
));
}
let spans_list = convert_span_sequence_to_dict_list(spans, py)?;
let library_ref = self.library.borrow(py);
let result = library_ref
.export_traces(spans_list)
.map_err(|e| error_message_to_py(format!("Failed to export spans: {}", e)));
drop(library_ref); result?;
let span_export_result = py
.import("opentelemetry.sdk.trace.export")
.and_then(|module| module.getattr("SpanExportResult"))
.and_then(|span_export_result| span_export_result.getattr("SUCCESS"));
match span_export_result {
Ok(success) => Ok(success.into()),
Err(_) => Ok(py.None()),
}
}
#[pyo3(signature = (*, timeout=None, timeout_millis=None))]
#[allow(unused_variables)] pub fn shutdown(
&self,
timeout: Option<f64>,
timeout_millis: Option<f64>,
_py: Python<'_>,
) -> PyResult<()> {
Ok(())
}
#[pyo3(signature = (*, timeout_millis=None))]
#[allow(unused_variables)] pub fn force_flush(&self, timeout_millis: Option<f64>, py: Python<'_>) -> PyResult<PyObject> {
if !is_library_valid(&self.library, py) {
return Err(error_message_to_py(
"Library instance is no longer valid".to_string(),
));
}
let library_ref = self.library.borrow(py);
let library = library_ref.library.clone();
let runtime = library_ref.runtime.clone();
drop(library_ref);
py.allow_threads(|| {
runtime
.block_on(async move { library.flush().await })
.map_err(|e| error_message_to_py(format!("Failed to flush spans: {}", e)))
})?;
let span_export_result = py
.import("opentelemetry.sdk.trace.export")
.and_then(|module| module.getattr("SpanExportResult"))
.and_then(|span_export_result| span_export_result.getattr("SUCCESS"));
match span_export_result {
Ok(success) => Ok(success.into()),
Err(_) => Ok(py.None()),
}
}
fn __repr__(&self) -> String {
"PyOtlpSpanExporterAdapter".to_string()
}
}