#![allow(unsafe_code)]
use std::collections::HashMap;
use std::sync::Arc;
use arrow::array::{Array, BooleanArray, Float64Array, RecordBatch, StringArray, UInt64Array};
use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use arrow::ffi::{FFI_ArrowArray, FFI_ArrowSchema, to_ffi};
use pyo3::exceptions::{PyRuntimeError, PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::PyCapsule;
use crate::core::report_assembler::ReportAssembler;
use crate::engines::columnar::RecordBatchAnalyzer;
use crate::types::{
ColumnProfile, ColumnStats, DataFrameLibrary, DataSource, ExecutionMetadata, MetricPack,
TruncationReason,
};
use super::config::PyProfilerConfig;
// Capsule names mandated by the Arrow PyCapsule interface; consumers look
// capsules up by this exact NUL-terminated name.
const ARROW_SCHEMA_NAME: &[u8] = b"arrow_schema\0";
const ARROW_ARRAY_NAME: &[u8] = b"arrow_array\0";
/// PyCapsule destructor for an exported `FFI_ArrowSchema`.
///
/// Reclaims the `Box<FFI_ArrowSchema>` leaked into the capsule by
/// `__arrow_c_schema__` / `__arrow_c_array__` when Python garbage-collects
/// the capsule.
unsafe extern "C" fn pycapsule_schema_destructor(capsule: *mut pyo3::ffi::PyObject) {
    if capsule.is_null() {
        return;
    }
    // SAFETY: the capsule was created under ARROW_SCHEMA_NAME, so a non-null
    // stored pointer is the Box<FFI_ArrowSchema> leaked at export time.
    let ptr =
        unsafe { pyo3::ffi::PyCapsule_GetPointer(capsule, ARROW_SCHEMA_NAME.as_ptr().cast()) };
    if !ptr.is_null() {
        let schema_ptr = ptr as *mut FFI_ArrowSchema;
        // SAFETY: pointer originated from Box::into_raw and is freed exactly
        // once, here, by the owning capsule.
        unsafe { drop(Box::from_raw(schema_ptr)) };
    }
}
/// PyCapsule destructor for an exported `FFI_ArrowArray`.
///
/// Mirrors `pycapsule_schema_destructor`: frees the `Box<FFI_ArrowArray>`
/// leaked into the capsule by `__arrow_c_array__`.
unsafe extern "C" fn pycapsule_array_destructor(capsule: *mut pyo3::ffi::PyObject) {
    if capsule.is_null() {
        return;
    }
    // SAFETY: capsule was created under ARROW_ARRAY_NAME; a non-null pointer
    // is the Box<FFI_ArrowArray> leaked at export time.
    let ptr = unsafe { pyo3::ffi::PyCapsule_GetPointer(capsule, ARROW_ARRAY_NAME.as_ptr().cast()) };
    if !ptr.is_null() {
        let array_ptr = ptr as *mut FFI_ArrowArray;
        // SAFETY: pointer originated from Box::into_raw; freed exactly once here.
        unsafe { drop(Box::from_raw(array_ptr)) };
    }
}
/// Python-visible wrapper around an Arrow [`RecordBatch`], exported to
/// Python under the name `RecordBatch`.
#[pyclass(name = "RecordBatch")]
pub struct PyRecordBatch {
    // The wrapped Arrow batch; exposed to Python via getters and the
    // Arrow PyCapsule protocol (__arrow_c_schema__ / __arrow_c_array__).
    inner: RecordBatch,
}
impl PyRecordBatch {
    /// Wraps an Arrow `RecordBatch` for export to Python.
    pub fn new(batch: RecordBatch) -> Self {
        Self { inner: batch }
    }
    /// Borrows the underlying Arrow batch.
    pub fn inner(&self) -> &RecordBatch {
        &self.inner
    }
}
#[pymethods]
impl PyRecordBatch {
    /// Number of rows in the batch.
    #[getter]
    fn num_rows(&self) -> usize {
        self.inner.num_rows()
    }
    /// Number of columns in the batch.
    #[getter]
    fn num_columns(&self) -> usize {
        self.inner.num_columns()
    }
    /// Column names, in schema order.
    #[getter]
    fn column_names(&self) -> Vec<String> {
        self.inner
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().clone())
            .collect()
    }
    /// Arrow PyCapsule interface: export the schema as an `arrow_schema` capsule.
    fn __arrow_c_schema__<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyCapsule>> {
        let schema = self.inner.schema();
        let ffi_schema = FFI_ArrowSchema::try_from(schema.as_ref())
            .map_err(|e| PyRuntimeError::new_err(format!("Failed to export schema: {}", e)))?;
        // Leak the FFI struct; ownership transfers to the capsule destructor.
        let schema_ptr = Box::into_raw(Box::new(ffi_schema));
        let capsule = unsafe {
            let cap = pyo3::ffi::PyCapsule_New(
                schema_ptr.cast(),
                ARROW_SCHEMA_NAME.as_ptr().cast(),
                Some(pycapsule_schema_destructor),
            );
            if cap.is_null() {
                // No capsule was created, so the destructor will never run:
                // reclaim the leaked Box here.
                drop(Box::from_raw(schema_ptr));
                return Err(PyRuntimeError::new_err("Failed to create schema PyCapsule"));
            }
            // From here the capsule owns schema_ptr; if the downcast fails,
            // dropping the error (which holds the Bound capsule) runs the
            // capsule destructor and frees it — no manual free allowed.
            Bound::from_owned_ptr(py, cap)
                .cast_into::<PyCapsule>()
                .map_err(|_| PyRuntimeError::new_err("PyCapsule downcast failed"))?
        };
        Ok(capsule)
    }
    /// Arrow PyCapsule interface: export the whole batch as a pair of
    /// (`arrow_schema`, `arrow_array`) capsules.
    ///
    /// `requested_schema` is accepted for protocol compatibility but ignored
    /// (no schema negotiation is performed).
    #[pyo3(signature = (requested_schema=None))]
    fn __arrow_c_array__<'py>(
        &self,
        py: Python<'py>,
        requested_schema: Option<Py<PyAny>>,
    ) -> PyResult<(Bound<'py, PyCapsule>, Bound<'py, PyCapsule>)> {
        let _ = requested_schema;
        // A RecordBatch converts losslessly into a top-level StructArray for
        // FFI export.
        let struct_array: arrow::array::StructArray = self.inner.clone().into();
        let array_data = struct_array.into_data();
        let (ffi_array, ffi_schema) = to_ffi(&array_data)
            .map_err(|e| PyRuntimeError::new_err(format!("FFI export failed: {}", e)))?;
        // Leak both FFI structs; each capsule destructor reclaims its own.
        let schema_ptr = Box::into_raw(Box::new(ffi_schema));
        let array_ptr = Box::into_raw(Box::new(ffi_array));
        let schema_capsule = unsafe {
            let cap = pyo3::ffi::PyCapsule_New(
                schema_ptr.cast(),
                ARROW_SCHEMA_NAME.as_ptr().cast(),
                Some(pycapsule_schema_destructor),
            );
            if cap.is_null() {
                // No capsule exists yet: both leaked Boxes must be freed here.
                drop(Box::from_raw(schema_ptr));
                drop(Box::from_raw(array_ptr));
                return Err(PyRuntimeError::new_err("Failed to create schema PyCapsule"));
            }
            // BUGFIX: the capsule now owns schema_ptr — dropping the downcast
            // error (which holds the Bound capsule) runs the capsule
            // destructor. Only array_ptr, not yet owned by any capsule, may
            // be freed manually here; freeing schema_ptr too was a double free.
            Bound::from_owned_ptr(py, cap)
                .cast_into::<PyCapsule>()
                .map_err(|e| {
                    drop(Box::from_raw(array_ptr));
                    PyRuntimeError::new_err(format!("Schema PyCapsule downcast failed: {}", e))
                })?
        };
        let array_capsule = unsafe {
            let cap = pyo3::ffi::PyCapsule_New(
                array_ptr.cast(),
                ARROW_ARRAY_NAME.as_ptr().cast(),
                Some(pycapsule_array_destructor),
            );
            if cap.is_null() {
                // array_ptr is still unowned, free it; schema_capsule cleans
                // itself up via its destructor when dropped.
                drop(Box::from_raw(array_ptr));
                return Err(PyRuntimeError::new_err("Failed to create array PyCapsule"));
            }
            // BUGFIX: the capsule now owns array_ptr; dropping the downcast
            // error frees it via the destructor, so no manual free here
            // (the previous manual drop was a double free).
            Bound::from_owned_ptr(py, cap)
                .cast_into::<PyCapsule>()
                .map_err(|e| {
                    PyRuntimeError::new_err(format!("Array PyCapsule downcast failed: {}", e))
                })?
        };
        Ok((schema_capsule, array_capsule))
    }
    /// Convert to a pandas DataFrame by round-tripping through pyarrow.
    ///
    /// Raises `RuntimeError` if pyarrow is not installed.
    fn to_pandas<'py>(slf: &Bound<'py, Self>, py: Python<'py>) -> PyResult<Py<PyAny>> {
        let pyarrow = py.import("pyarrow").map_err(|_| {
            PyRuntimeError::new_err(
                "pyarrow required for to_pandas(). Install with: pip install pyarrow",
            )
        })?;
        // pyarrow.record_batch(obj) consumes our PyCapsule interface.
        let pa_record_batch = pyarrow.getattr("record_batch")?;
        let pa_batch = pa_record_batch.call1((slf,))?;
        let df = pa_batch.call_method0("to_pandas")?;
        Ok(df.into())
    }
    /// Convert to a polars DataFrame by round-tripping through pyarrow.
    ///
    /// Raises `RuntimeError` if polars or pyarrow is not installed.
    fn to_polars<'py>(slf: &Bound<'py, Self>, py: Python<'py>) -> PyResult<Py<PyAny>> {
        let polars = py.import("polars").map_err(|_| {
            PyRuntimeError::new_err(
                "polars required for to_polars(). Install with: pip install polars",
            )
        })?;
        let pyarrow = py.import("pyarrow").map_err(|_| {
            PyRuntimeError::new_err(
                "pyarrow required for to_polars(). Install with: pip install pyarrow",
            )
        })?;
        let pa_record_batch = pyarrow.getattr("record_batch")?;
        let pa_batch = pa_record_batch.call1((slf,))?;
        // polars.from_arrow expects a Table, so wrap the single batch.
        let pa_table = pyarrow
            .getattr("Table")?
            .call_method1("from_batches", (vec![&pa_batch],))?;
        let df = polars.call_method1("from_arrow", (pa_table,))?;
        Ok(df.into())
    }
    fn __repr__(&self) -> String {
        // BUGFIX: the second label was a duplicated "columns="; it lists the
        // column names, so label it as such.
        format!(
            "RecordBatch(rows={}, columns={}, names={:?})",
            self.inner.num_rows(),
            self.inner.num_columns(),
            self.column_names()
        )
    }
}
/// Profile a CSV file and return the per-column summary as an Arrow
/// `RecordBatch` wrapper.
///
/// Raises `RuntimeError` if the CSV scan or the batch conversion fails.
#[pyfunction]
pub fn analyze_csv_to_arrow(path: &str) -> PyResult<PyRecordBatch> {
    use crate::engines::columnar::ArrowProfiler;
    use std::path::Path;
    let report = ArrowProfiler::new()
        .analyze_csv_file(Path::new(path))
        .map_err(|e| PyRuntimeError::new_err(format!("Analysis failed: {}", e)))?;
    profiles_to_record_batch(&report.column_profiles)
        .map(PyRecordBatch::new)
        .map_err(|e| PyRuntimeError::new_err(format!("Batch conversion failed: {}", e)))
}
/// Profile a Parquet file (including quality checks) and return the
/// per-column summary as an Arrow `RecordBatch` wrapper.
///
/// Raises `RuntimeError` if the Parquet scan or the batch conversion fails.
#[pyfunction]
pub fn analyze_parquet_to_arrow(path: &str) -> PyResult<PyRecordBatch> {
    use crate::analyze_parquet_with_quality;
    use std::path::Path;
    let report = analyze_parquet_with_quality(Path::new(path))
        .map_err(|e| PyRuntimeError::new_err(format!("Parquet analysis failed: {}", e)))?;
    profiles_to_record_batch(&report.column_profiles)
        .map(PyRecordBatch::new)
        .map_err(|e| PyRuntimeError::new_err(format!("Batch conversion failed: {}", e)))
}
/// Profile any supported DataFrame (pandas, polars, pyarrow, or any object
/// implementing the Arrow PyCapsule protocol) into a `PyProfileReport`.
///
/// The explicit `max_rows` argument takes precedence over `config.max_rows`;
/// when rows are cut, the report records a `MaxRows` truncation reason.
#[pyfunction]
#[pyo3(signature = (df, name = "dataframe".to_string(), max_rows = None, config = None))]
pub fn profile_dataframe(
    py: Python<'_>,
    df: Py<PyAny>,
    name: String,
    max_rows: Option<usize>,
    config: Option<&PyProfilerConfig>,
) -> PyResult<super::types::PyProfileReport> {
    let start = std::time::Instant::now();
    // Metric packs gate which (potentially expensive) analyses run.
    let packs = config.and_then(|c| c.metric_packs.as_deref());
    let skip_statistics = !MetricPack::include_statistics(packs);
    let skip_patterns = !MetricPack::include_patterns(packs);
    let include_quality = MetricPack::include_quality(packs);
    // Explicit argument wins over the config-supplied limit.
    let effective_max_rows =
        max_rows.or_else(|| config.and_then(|c| c.max_rows.map(|v| v as usize)));
    let source_library = detect_dataframe_library(py, &df)?;
    let batch = convert_dataframe_to_batch(py, &df, &source_library)?;
    let (batch, truncated) = limit_batch_rows(batch, effective_max_rows);
    let num_rows = batch.num_rows();
    let num_cols = batch.num_columns();
    let mut analyzer = RecordBatchAnalyzer::new();
    analyzer
        .process_batch(&batch)
        .map_err(|e| PyRuntimeError::new_err(format!("Analysis failed: {}", e)))?;
    let column_profiles = analyzer.to_profiles(skip_statistics, skip_patterns);
    // Sample columns feed only the quality assessment; skip the extra
    // materialization when quality metrics are disabled.
    let sample_columns = if include_quality {
        analyzer.create_sample_columns()
    } else {
        HashMap::new()
    };
    // Measured after analysis so the report reflects the full scan cost.
    let scan_time_ms = start.elapsed().as_millis();
    let memory_bytes = estimate_memory_bytes(py, &df, &source_library);
    let mut exec = ExecutionMetadata::new(num_rows, num_cols, scan_time_ms);
    if truncated {
        exec = exec.with_truncation(TruncationReason::MaxRows(
            effective_max_rows.unwrap_or(0) as u64
        ));
    }
    let mut assembler = ReportAssembler::new(
        DataSource::DataFrame {
            name,
            source_library,
            // Counts are post-truncation, matching what was analyzed.
            row_count: num_rows,
            column_count: num_cols,
            memory_bytes,
        },
        exec,
    )
    .columns(column_profiles);
    if include_quality {
        assembler = assembler.with_quality_data(sample_columns);
        if let Some(dims) = config.and_then(|c| c.quality_dimensions.clone()) {
            assembler = assembler.with_requested_dimensions(dims);
        }
    } else {
        assembler = assembler.skip_quality();
    }
    let report = assembler.build();
    Ok(super::types::PyProfileReport::new(report))
}
#[pyfunction]
#[pyo3(signature = (table, name = "arrow_table".to_string(), max_rows = None, config = None))]
pub fn profile_arrow(
py: Python<'_>,
table: Py<PyAny>,
name: String,
max_rows: Option<usize>,
config: Option<&PyProfilerConfig>,
) -> PyResult<super::types::PyProfileReport> {
let start = std::time::Instant::now();
let packs = config.and_then(|c| c.metric_packs.as_deref());
let skip_statistics = !MetricPack::include_statistics(packs);
let skip_patterns = !MetricPack::include_patterns(packs);
let include_quality = MetricPack::include_quality(packs);
let effective_max_rows =
max_rows.or_else(|| config.and_then(|c| c.max_rows.map(|v| v as usize)));
let bound = table.bind(py);
let batch = import_from_pyarrow(py, bound)?;
let (batch, truncated) = limit_batch_rows(batch, effective_max_rows);
let num_rows = batch.num_rows();
let num_cols = batch.num_columns();
let memory_bytes: Option<u64> = bound
.getattr("nbytes")
.and_then(|v| v.extract::<u64>())
.ok();
let mut analyzer = RecordBatchAnalyzer::new();
analyzer
.process_batch(&batch)
.map_err(|e| PyRuntimeError::new_err(format!("Analysis failed: {}", e)))?;
let column_profiles = analyzer.to_profiles(skip_statistics, skip_patterns);
let sample_columns = analyzer.create_sample_columns();
let scan_time_ms = start.elapsed().as_millis();
let mut exec = ExecutionMetadata::new(num_rows, num_cols, scan_time_ms);
if truncated {
exec = exec.with_truncation(TruncationReason::MaxRows(
effective_max_rows.unwrap_or(0) as u64
));
}
let mut assembler = ReportAssembler::new(
DataSource::DataFrame {
name,
source_library: DataFrameLibrary::PyArrow,
row_count: num_rows,
column_count: num_cols,
memory_bytes,
},
exec,
)
.columns(column_profiles);
if include_quality {
assembler = assembler.with_quality_data(sample_columns);
if let Some(dims) = config.and_then(|c| c.quality_dimensions.clone()) {
assembler = assembler.with_requested_dimensions(dims);
}
} else {
assembler = assembler.skip_quality();
}
let report = assembler.build();
Ok(super::types::PyProfileReport::new(report))
}
/// Flatten per-column profiles into a single Arrow `RecordBatch`: one row
/// per profiled column, one field per summary statistic.
///
/// The array-building order below must stay in sync with this schema's field
/// order, or `RecordBatch::try_new` will fail at runtime.
fn profiles_to_record_batch(profiles: &[ColumnProfile]) -> anyhow::Result<RecordBatch> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("column_name", ArrowDataType::Utf8, false),
        Field::new("data_type", ArrowDataType::Utf8, false),
        Field::new("total_count", ArrowDataType::UInt64, false),
        Field::new("null_count", ArrowDataType::UInt64, false),
        Field::new("null_percentage", ArrowDataType::Float64, false),
        Field::new("unique_count", ArrowDataType::UInt64, true),
        Field::new("uniqueness_ratio", ArrowDataType::Float64, false),
        // Numeric statistics are nullable: non-numeric columns yield null.
        Field::new("min", ArrowDataType::Float64, true),
        Field::new("max", ArrowDataType::Float64, true),
        Field::new("mean", ArrowDataType::Float64, true),
        Field::new("std_dev", ArrowDataType::Float64, true),
        Field::new("variance", ArrowDataType::Float64, true),
        Field::new("median", ArrowDataType::Float64, true),
        Field::new("mode", ArrowDataType::Float64, true),
        Field::new("skewness", ArrowDataType::Float64, true),
        Field::new("kurtosis", ArrowDataType::Float64, true),
        Field::new("coefficient_of_variation", ArrowDataType::Float64, true),
        Field::new("q1", ArrowDataType::Float64, true),
        Field::new("q2", ArrowDataType::Float64, true),
        Field::new("q3", ArrowDataType::Float64, true),
        Field::new("iqr", ArrowDataType::Float64, true),
        Field::new("is_approximate", ArrowDataType::Boolean, true),
    ]));
    // Identity and count columns: always populated (hence Some(...) per row).
    let names: StringArray = profiles.iter().map(|p| Some(p.name.as_str())).collect();
    let types: StringArray = profiles
        .iter()
        .map(|p| Some(format!("{:?}", p.data_type)))
        .collect();
    let totals: UInt64Array = profiles
        .iter()
        .map(|p| Some(p.total_count as u64))
        .collect();
    let nulls: UInt64Array = profiles.iter().map(|p| Some(p.null_count as u64)).collect();
    // Null percentage in [0, 100]; empty columns report 0.0 to avoid
    // dividing by zero.
    let null_pcts: Float64Array = profiles
        .iter()
        .map(|p| {
            let pct = if p.total_count > 0 {
                (p.null_count as f64 / p.total_count as f64) * 100.0
            } else {
                0.0
            };
            Some(pct)
        })
        .collect();
    let uniques: UInt64Array = profiles
        .iter()
        .map(|p| p.unique_count.map(|u| u as u64))
        .collect();
    // NOTE: despite the "ratio" name this is a percentage (0-100), matching
    // null_percentage; an unknown unique count reports 0.0, not null.
    let unique_ratios: Float64Array = profiles
        .iter()
        .map(|p| {
            let ratio = match p.unique_count {
                Some(u) if p.total_count > 0 => (u as f64 / p.total_count as f64) * 100.0,
                _ => 0.0,
            };
            Some(ratio)
        })
        .collect();
    // Always-present numeric stat (plain f64 in NumericStats): non-numeric
    // columns yield null.
    macro_rules! numeric_field {
        ($profiles:expr, $field:ident) => {
            $profiles
                .iter()
                .map(|p| match &p.stats {
                    ColumnStats::Numeric(n) => Some(n.$field),
                    _ => None,
                })
                .collect::<Float64Array>()
        };
    }
    // Optional numeric stat (already Option<f64> in NumericStats): null when
    // absent or when the column is non-numeric.
    macro_rules! numeric_opt_field {
        ($profiles:expr, $field:ident) => {
            $profiles
                .iter()
                .map(|p| match &p.stats {
                    ColumnStats::Numeric(n) => n.$field,
                    _ => None,
                })
                .collect::<Float64Array>()
        };
    }
    let mins = numeric_field!(profiles, min);
    let maxs = numeric_field!(profiles, max);
    let means = numeric_field!(profiles, mean);
    let std_devs = numeric_field!(profiles, std_dev);
    let variances = numeric_field!(profiles, variance);
    let medians = numeric_opt_field!(profiles, median);
    let modes = numeric_opt_field!(profiles, mode);
    let skewnesses = numeric_opt_field!(profiles, skewness);
    let kurtoses = numeric_opt_field!(profiles, kurtosis);
    let cvs = numeric_opt_field!(profiles, coefficient_of_variation);
    // Quartile fields live one level deeper (Option<Quartiles>), so the
    // macros above don't apply.
    let q1s: Float64Array = profiles
        .iter()
        .map(|p| match &p.stats {
            ColumnStats::Numeric(n) => n.quartiles.as_ref().map(|q| q.q1),
            _ => None,
        })
        .collect();
    let q2s: Float64Array = profiles
        .iter()
        .map(|p| match &p.stats {
            ColumnStats::Numeric(n) => n.quartiles.as_ref().map(|q| q.q2),
            _ => None,
        })
        .collect();
    let q3s: Float64Array = profiles
        .iter()
        .map(|p| match &p.stats {
            ColumnStats::Numeric(n) => n.quartiles.as_ref().map(|q| q.q3),
            _ => None,
        })
        .collect();
    let iqrs: Float64Array = profiles
        .iter()
        .map(|p| match &p.stats {
            ColumnStats::Numeric(n) => n.quartiles.as_ref().map(|q| q.iqr),
            _ => None,
        })
        .collect();
    let is_approx: BooleanArray = profiles
        .iter()
        .map(|p| match &p.stats {
            ColumnStats::Numeric(n) => n.is_approximate,
            _ => None,
        })
        .collect();
    // Column order here mirrors the schema declared above, one-to-one.
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(names),
            Arc::new(types),
            Arc::new(totals),
            Arc::new(nulls),
            Arc::new(null_pcts),
            Arc::new(uniques),
            Arc::new(unique_ratios),
            Arc::new(mins),
            Arc::new(maxs),
            Arc::new(means),
            Arc::new(std_devs),
            Arc::new(variances),
            Arc::new(medians),
            Arc::new(modes),
            Arc::new(skewnesses),
            Arc::new(kurtoses),
            Arc::new(cvs),
            Arc::new(q1s),
            Arc::new(q2s),
            Arc::new(q3s),
            Arc::new(iqrs),
            Arc::new(is_approx),
        ],
    )
    .map_err(|e| anyhow::anyhow!("Failed to create RecordBatch: {}", e))
}
/// Identify which DataFrame library `df` comes from by inspecting its
/// type's `__module__`, falling back to `Custom("module:TypeName")` for
/// anything unrecognized.
fn detect_dataframe_library(py: Python<'_>, df: &Py<PyAny>) -> PyResult<DataFrameLibrary> {
    let obj = df.bind(py);
    let ty = obj.get_type();
    let type_name = ty.name()?.to_string();
    // Types without a __module__ attribute are treated as module "".
    let module = match ty.getattr("__module__") {
        Ok(m) => m.to_string(),
        Err(_) => String::new(),
    };
    // The extra DataFrame/contains check catches pandas subclasses exported
    // from other modules.
    let library = if module.starts_with("pandas")
        || (type_name == "DataFrame" && module.contains("pandas"))
    {
        DataFrameLibrary::Pandas
    } else if module.starts_with("polars") {
        DataFrameLibrary::Polars
    } else if module.starts_with("pyarrow") {
        DataFrameLibrary::PyArrow
    } else {
        DataFrameLibrary::Custom(format!("{}:{}", module, type_name))
    };
    Ok(library)
}
/// Best-effort estimate of the DataFrame's in-memory footprint, in bytes.
///
/// Returns `None` when the library offers no usable size hook or when any
/// of the Python-side calls fail.
fn estimate_memory_bytes(
    py: Python<'_>,
    df: &Py<PyAny>,
    library: &DataFrameLibrary,
) -> Option<u64> {
    let obj = df.bind(py);
    let value = match library {
        // pandas: DataFrame.memory_usage().sum()
        DataFrameLibrary::Pandas => obj
            .call_method0("memory_usage")
            .ok()?
            .call_method0("sum")
            .ok()?,
        // polars: DataFrame.estimated_size()
        DataFrameLibrary::Polars => obj.call_method0("estimated_size").ok()?,
        // pyarrow: Table.nbytes / RecordBatch.nbytes
        DataFrameLibrary::PyArrow => obj.getattr("nbytes").ok()?,
        // Unknown library: no generic way to measure.
        DataFrameLibrary::Custom(_) => return None,
    };
    value.extract::<u64>().ok()
}
/// Truncate `batch` to at most `max_rows` rows.
///
/// Returns the (possibly sliced) batch and whether truncation occurred.
fn limit_batch_rows(batch: RecordBatch, max_rows: Option<usize>) -> (RecordBatch, bool) {
    if let Some(limit) = max_rows {
        if limit < batch.num_rows() {
            // `slice` is zero-copy: it shares the underlying buffers.
            return (batch.slice(0, limit), true);
        }
    }
    (batch, false)
}
/// Convert a Python DataFrame-like object into a single Arrow `RecordBatch`,
/// dispatching on the previously detected source library.
///
/// Unknown libraries are accepted only if they implement the Arrow PyCapsule
/// protocol (`__arrow_c_array__`); otherwise a `TypeError` is raised.
fn convert_dataframe_to_batch(
    py: Python<'_>,
    df: &Py<PyAny>,
    source_library: &DataFrameLibrary,
) -> PyResult<RecordBatch> {
    let obj = df.bind(py);
    match source_library {
        DataFrameLibrary::Pandas => convert_pandas_to_batch(py, obj),
        DataFrameLibrary::Polars => convert_polars_to_batch(py, obj),
        DataFrameLibrary::PyArrow => import_from_pyarrow(py, obj),
        DataFrameLibrary::Custom(_) => {
            let speaks_capsule = obj.hasattr("__arrow_c_array__")?;
            if speaks_capsule {
                import_via_pycapsule(py, obj)
            } else {
                Err(PyTypeError::new_err(format!(
                    "Unsupported DataFrame type: {}. Must implement Arrow PyCapsule protocol.",
                    source_library
                )))
            }
        }
    }
}
/// Convert a pandas DataFrame to a single Arrow `RecordBatch` via pyarrow.
///
/// Raises `RuntimeError` if pyarrow is missing and `ValueError` if the
/// resulting table has no batches.
fn convert_pandas_to_batch(py: Python<'_>, df: &Bound<'_, PyAny>) -> PyResult<RecordBatch> {
    let pyarrow = py.import("pyarrow").map_err(|_| {
        PyRuntimeError::new_err(
            "pyarrow required for pandas DataFrames. Install with: pip install pyarrow",
        )
    })?;
    let pa_table = pyarrow
        .getattr("Table")?
        .call_method1("from_pandas", (df,))?;
    // FIX: collapse multi-chunk columns so `to_batches` yields one batch;
    // taking only batch 0 of a chunked table silently dropped rows.
    let pa_table = pa_table.call_method0("combine_chunks")?;
    let batches = pa_table.call_method0("to_batches")?;
    let batch_list: Vec<Py<PyAny>> = batches.extract()?;
    if batch_list.is_empty() {
        return Err(PyValueError::new_err("DataFrame is empty"));
    }
    import_via_pycapsule(py, batch_list[0].bind(py))
}
/// Convert a polars DataFrame to a single Arrow `RecordBatch` through its
/// `to_arrow()` export.
///
/// Raises `RuntimeError` if the DataFrame has no `to_arrow` method and
/// `ValueError` if the export produced no batches.
fn convert_polars_to_batch(py: Python<'_>, df: &Bound<'_, PyAny>) -> PyResult<RecordBatch> {
    if !df.hasattr("to_arrow")? {
        return Err(PyRuntimeError::new_err(
            "polars DataFrame doesn't support Arrow export",
        ));
    }
    let arrow_data = df.call_method0("to_arrow")?;
    // FIX: polars frequently produces multi-chunk tables; combine first so
    // using only batch 0 below cannot silently drop rows.
    let arrow_data = arrow_data.call_method0("combine_chunks")?;
    let batches = arrow_data.call_method0("to_batches")?;
    let batch_list: Vec<Py<PyAny>> = batches.extract()?;
    if batch_list.is_empty() {
        return Err(PyValueError::new_err("DataFrame is empty"));
    }
    import_via_pycapsule(py, batch_list[0].bind(py))
}
/// Import a pyarrow `Table` or `RecordBatch` as a single Arrow `RecordBatch`.
///
/// Raises `TypeError` for any other pyarrow object and `ValueError` for an
/// empty Table.
fn import_from_pyarrow(py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<RecordBatch> {
    let type_name = obj.get_type().name()?.to_string();
    if type_name == "Table" {
        // FIX: a chunked Table yields one batch per chunk; combine chunks
        // first so using only batch 0 below cannot silently drop rows.
        let combined = obj.call_method0("combine_chunks")?;
        let batches = combined.call_method0("to_batches")?;
        let batch_list: Vec<Py<PyAny>> = batches.extract()?;
        if batch_list.is_empty() {
            return Err(PyValueError::new_err("Table is empty"));
        }
        import_via_pycapsule(py, batch_list[0].bind(py))
    } else if type_name == "RecordBatch" {
        import_via_pycapsule(py, obj)
    } else {
        Err(PyTypeError::new_err(format!(
            "Expected pyarrow Table or RecordBatch, got {}",
            type_name
        )))
    }
}
/// Import a `RecordBatch` from any object implementing the Arrow PyCapsule
/// interface (`__arrow_c_array__`).
///
/// Ownership of the FFI structs is moved out of the producer's capsules via
/// `from_raw`; the capsules' own destructors then see released structs.
fn import_via_pycapsule(py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<RecordBatch> {
    if !obj.hasattr("__arrow_c_array__")? {
        return Err(PyTypeError::new_err(
            "Object does not implement Arrow PyCapsule interface (__arrow_c_array__)",
        ));
    }
    // Per the protocol, `None` means "no requested schema" (producer's choice).
    let result = obj.call_method1("__arrow_c_array__", (py.None(),))?;
    // The protocol returns a (schema_capsule, array_capsule) 2-tuple.
    let tuple: (Bound<'_, PyAny>, Bound<'_, PyAny>) = result.extract()?;
    let (schema_capsule, array_capsule) = tuple;
    let schema_cap: &Bound<'_, PyCapsule> = schema_capsule
        .cast()
        .map_err(|_| PyTypeError::new_err("Expected PyCapsule for schema"))?;
    let array_cap: &Bound<'_, PyCapsule> = array_capsule
        .cast()
        .map_err(|_| PyTypeError::new_err("Expected PyCapsule for array"))?;
    let array_data = unsafe {
        // NOTE(review): `PyCapsule::pointer()` is deprecated and does not
        // verify the capsule name, so a mis-named capsule would not be
        // detected here — consider the name-checked accessor.
        #[allow(deprecated)]
        let ffi_schema_ptr = schema_cap.pointer() as *mut FFI_ArrowSchema;
        #[allow(deprecated)]
        let ffi_array_ptr = array_cap.pointer() as *mut FFI_ArrowArray;
        if ffi_schema_ptr.is_null() || ffi_array_ptr.is_null() {
            return Err(PyRuntimeError::new_err("Null pointer in PyCapsule"));
        }
        // SAFETY: the pointers come from the producer's Arrow PyCapsules and
        // point at valid FFI structs; `from_raw` takes ownership by moving
        // the structs out, leaving empty/released structs for the producer's
        // capsule destructors to observe.
        let ffi_array = FFI_ArrowArray::from_raw(ffi_array_ptr);
        let ffi_schema = FFI_ArrowSchema::from_raw(ffi_schema_ptr);
        arrow::ffi::from_ffi(ffi_array, &ffi_schema)
            .map_err(|e| PyRuntimeError::new_err(format!("FFI import failed: {}", e)))?
    };
    // Rebuild a RecordBatch from the imported top-level struct array: its
    // child fields become the batch schema, its children the columns.
    let struct_array = arrow::array::StructArray::from(array_data);
    let schema = Arc::new(Schema::new(
        struct_array
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect::<Vec<_>>(),
    ));
    RecordBatch::try_new(schema, struct_array.columns().to_vec())
        .map_err(|e| PyRuntimeError::new_err(format!("RecordBatch creation failed: {}", e)))
}