#[cfg(feature = "plugin")]
use arrow::{
compute,
pyarrow::{FromPyArrow, ToPyArrow},
record_batch::RecordBatch,
};
use once_cell::sync::OnceCell;
#[cfg(feature = "plugin")]
use pyo3::{
exceptions::PyRuntimeError,
types::{IntoPyDict, PyAnyMethods, PyDict, PyModule},
Bound, {Py, PyAny, PyResult, Python},
};
#[cfg(feature = "plugin")]
use std::collections::HashMap;
#[cfg(feature = "plugin")]
use std::ffi::CString;
#[cfg(feature = "plugin")]
use crate::data_source::schema::DataSourceSchema;
#[cfg(feature = "plugin")]
use crate::plugin::convert_py_data::append_to_py_dict;
use crate::plugin::{init_python, plugin_map::PluginMap};
#[cfg(feature = "plugin")]
use crate::response::http_error::ResponseError;
#[cfg(feature = "plugin")]
use crate::settings::Settings;
/// Process-wide plugin manager instance.
///
/// Read through `PluginManager::global()` when the `plugin` feature is enabled. The code that
/// sets it is not in this file (presumably done once at startup — TODO confirm).
pub static PLUGIN_MANAGER: OnceCell<PluginManager> = OnceCell::new();

/// Holds the registry of Python plugins and dispatches calls into them.
#[derive(Debug)]
pub struct PluginManager {
    /// Lookup tables mapping a scheme / processor name to a `(py_file, entry, version)` tuple.
    // The field is only read by `plugin`-feature methods, so it is dead code only when that
    // feature is off; keep the lint active for `plugin` builds.
    #[cfg_attr(not(feature = "plugin"), allow(dead_code))]
    pub plugin_map: PluginMap,
}
impl PluginManager {
    /// Runs `init_python::py_init()` and then builds the plugin map.
    ///
    /// # Errors
    ///
    /// Returns an error if either `init_python::py_init()` or `PluginMap::new()` fails.
    pub fn new() -> anyhow::Result<Self> {
        init_python::py_init()?;
        let plugin_map = PluginMap::new()?;
        Ok(Self { plugin_map })
    }

    /// Returns the process-wide manager stored in `PLUGIN_MANAGER`.
    ///
    /// # Panics
    ///
    /// Panics if `PLUGIN_MANAGER` has not been set yet.
    #[cfg(feature = "plugin")]
    pub fn global() -> &'static PluginManager {
        PLUGIN_MANAGER
            .get()
            .expect("plugin manager accessed before PLUGIN_MANAGER was initialized")
    }

    /// Returns every scheme that has a connector plugin, in `scheme_py_map` iteration order.
    #[cfg(feature = "plugin")]
    pub fn registered_schemes(&self) -> Vec<String> {
        self.plugin_map.scheme_py_map.keys().cloned().collect()
    }

    /// Returns the interpreter's `sys.version` string, or `"Unknown"` if it cannot be read.
    #[cfg(feature = "plugin")]
    #[allow(clippy::unused_self)] // exposed as a method on the manager; needs no instance state
    pub fn py_interpreter_info(&self) -> String {
        Python::attach(|py| -> PyResult<String> {
            let sys = PyModule::import(py, "sys")?;
            sys.getattr("version")?.extract()
        })
        .unwrap_or_else(|_| "Unknown".to_string())
    }

    /// Runs the connector plugin registered for `scheme` and returns whatever the Python entry
    /// function returns.
    ///
    /// The entry function is called with these positional arguments:
    /// `(format, authority, path, arrow_schema)`. Here `arrow_schema` is `datasource_schema`
    /// converted to a pyarrow schema, or `None`.
    ///
    /// The keyword arguments start from the `query` entries. `append_to_py_dict` then adds
    /// data from `plugin_options` and `system_info()` (exact merge rules live in that helper).
    ///
    /// The plugin source is re-read and re-executed on every call.
    ///
    /// # Errors
    ///
    /// * `ResponseError::unsupported_type` if no plugin is registered for `scheme`.
    /// * `ResponseError::internal_server_error` if the plugin file cannot be read.
    /// * `ResponseError::python_interpreter_error` for any failure inside Python: loading the
    ///   module, converting the schema, building kwargs, or the call itself. The Python
    ///   traceback is printed once before returning.
    #[cfg(feature = "plugin")]
    #[allow(clippy::too_many_arguments)]
    pub fn py_connector_exec(
        &self,
        format: &str,
        scheme: &str,
        authority: &str,
        path: Option<&str>,
        plugin_options: &serde_json::Value,
        datasource_schema: Option<&DataSourceSchema>,
        query: Option<HashMap<String, String>>,
    ) -> Result<Py<PyAny>, ResponseError> {
        let (py_file, entry, _version) =
            self.plugin_map.scheme_py_map.get(scheme).ok_or_else(|| {
                ResponseError::unsupported_type(format!(
                    "Unsupported scheme '{scheme}', plugin not implemented."
                ))
            })?;
        // Read the file outside `Python::attach` so the I/O happens before attaching.
        let py_code = std::fs::read_to_string(py_file)
            .map_err(|e| ResponseError::internal_server_error(e.to_string()))?;
        let result = Python::attach(|py| -> PyResult<Py<PyAny>> {
            let py_func = Self::load_py_entry(py, py_file, &py_code, entry)?;
            let arrow_schema = datasource_schema
                .map(|schema| -> PyResult<_> {
                    schema
                        .to_arrow_schema()
                        .map_err(|e| {
                            PyRuntimeError::new_err(format!("failed to build arrow schema: {e}"))
                        })?
                        .to_pyarrow(py)
                })
                .transpose()?;
            let kwargs = match query {
                Some(query) => query.into_py_dict(py)?,
                None => PyDict::new(py),
            };
            append_to_py_dict(py, &[plugin_options, &self.system_info()], &kwargs)?;
            // NOTE(review): kwargs include plugin_options; confirm they never carry secrets
            // (credentials, tokens) before relying on debug logging in production.
            log::debug!(
                "Call py func {} with args {:?}, {:?}, {:?}, {:?}, {:?}",
                entry.as_str(),
                format,
                authority,
                path,
                arrow_schema,
                kwargs
            );
            py_func.call(py, (format, authority, path, arrow_schema), Some(&kwargs))
        })
        // Single place that prints the traceback, so load errors are not reported twice.
        .inspect_err(|e| Python::attach(|py| e.print(py)))
        .map_err(|e| ResponseError::python_interpreter_error(e.to_string()))?;
        Ok(result)
    }

    /// Runs the processor plugin registered as `module` over `record_batches`.
    ///
    /// All batches are concatenated under the first batch's schema. The result is passed
    /// to the Python entry function as its single positional argument, and the keyword
    /// arguments are built from `plugin_options` and `system_info()`. The returned Python
    /// object is converted back with [`Self::to_record_batches`].
    ///
    /// # Errors
    ///
    /// * `ResponseError::request_validation` if `record_batches` is empty.
    /// * `ResponseError::unsupported_type` if no processor is registered as `module`.
    /// * `ResponseError::internal_server_error` if the plugin file cannot be read.
    /// * A concatenation failure, converted into `ResponseError` via `?`.
    /// * `ResponseError::python_interpreter_error` for any failure inside Python. The
    ///   traceback is printed before returning.
    #[cfg(feature = "plugin")]
    pub fn py_processor_exec(
        &self,
        record_batches: &[RecordBatch],
        module: &str,
        plugin_options: &serde_json::Value,
    ) -> Result<Vec<RecordBatch>, ResponseError> {
        let Some(first_batch) = record_batches.first() else {
            return Err(ResponseError::request_validation(
                "Empty record batches array",
            ));
        };
        let (py_file, entry, _version) =
            self.plugin_map
                .processor_py_map
                .get(module)
                .ok_or_else(|| {
                    ResponseError::unsupported_type(format!(
                        "Not exists '{module}', plugin not implemented."
                    ))
                })?;
        let py_code = std::fs::read_to_string(py_file)
            .map_err(|e| ResponseError::internal_server_error(e.to_string()))?;
        let record_batch = compute::concat_batches(&first_batch.schema(), record_batches)?;
        let result = Python::attach(|py| -> PyResult<Vec<RecordBatch>> {
            let py_func = Self::load_py_entry(py, py_file, &py_code, entry)?;
            let pyarrow_obj = record_batch.to_pyarrow(py)?;
            let kwargs = PyDict::new(py);
            append_to_py_dict(py, &[plugin_options, &self.system_info()], &kwargs)?;
            log::debug!("Call py func {} with args {:?}", entry.as_str(), kwargs);
            let returned = py_func.call(py, (pyarrow_obj,), Some(&kwargs))?;
            self.to_record_batches(returned.bind(py))
        })
        // Report Python failures the same way `py_connector_exec` does.
        .inspect_err(|e| Python::attach(|py| e.print(py)))
        .map_err(|e| ResponseError::python_interpreter_error(e.to_string()))?;
        Ok(result)
    }

    /// Converts a pyarrow record batch into a single-element `Vec<RecordBatch>`.
    ///
    /// # Errors
    ///
    /// Fails if `value` cannot be converted by `RecordBatch::from_pyarrow_bound`.
    #[cfg(feature = "plugin")]
    #[allow(clippy::unused_self)] // exposed as a method on the manager; needs no instance state
    pub fn to_record_batches(&self, value: &Bound<'_, PyAny>) -> PyResult<Vec<RecordBatch>> {
        Ok(vec![RecordBatch::from_pyarrow_bound(value)?])
    }

    /// Builds the `system_config` JSON object handed to every plugin call.
    ///
    /// It contains the crate version plus the log level, data directory and plugin directory
    /// from the global settings.
    #[cfg(feature = "plugin")]
    #[allow(clippy::unused_self)] // exposed as a method on the manager; needs no instance state
    pub fn system_info(&self) -> serde_json::Value {
        let settings = Settings::global();
        serde_json::json!({
            "system_config": {
                "version": env!("CARGO_PKG_VERSION"),
                "log_level": &settings.log.level,
                "data_dir": &settings.server.data_dir,
                "plugin_dir": &settings.server.plugin_dir,
            }
        })
    }

    /// Executes `py_code` as a Python module and returns its `entry` attribute.
    ///
    /// The module name comes from `py_file`'s stem and the file name from `py_file`. The file
    /// itself is not read here.
    ///
    /// # Errors
    ///
    /// Fails if any of the C strings would contain an interior NUL byte, if the code fails to
    /// compile or raises while executing, or if the module has no attribute named `entry`.
    #[cfg(feature = "plugin")]
    fn load_py_entry(
        py: Python<'_>,
        py_file: &std::path::Path,
        py_code: &str,
        entry: &str,
    ) -> PyResult<Py<PyAny>> {
        let code = CString::new(py_code)?;
        let file_name = CString::new(py_file.to_string_lossy().as_ref())?;
        // A missing or non-UTF-8 stem falls back to an empty module name instead of failing.
        let module_name = CString::new(
            py_file
                .file_stem()
                .and_then(|stem| stem.to_str())
                .unwrap_or_default(),
        )?;
        let loaded = PyModule::from_code(
            py,
            code.as_c_str(),
            file_name.as_c_str(),
            module_name.as_c_str(),
        )?;
        Ok(loaded.getattr(entry)?.unbind())
    }
}