use std::collections::BTreeMap;
use polars_utils::pl_str::PlSmallStr;
#[cfg(feature = "python")]
use polars_utils::python_function::PythonObject;
// State describing an Iceberg sink: which Python catalog class to use, the
// catalog and table names, the commit mode, and string key/value property maps.
//
// Serialization, JSON-schema and Python conversions are derived only when the
// `serde`, `dsl-schema` and `python` features are enabled, respectively.
//
// NOTE(review): these are plain `//` comments on purpose. schemars copies `///`
// doc comments into the generated JSON schema as descriptions, which would
// change the `dsl-schema` output — confirm before converting them to `///`.
#[derive(Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
#[cfg_attr(feature = "python", derive(pyo3::IntoPyObject, pyo3::FromPyObject))]
pub struct IcebergSinkState {
// Presumably the `__module__` of the Python catalog class — TODO confirm
// against the Python side.
pub py_catalog_class_module: PlSmallStr,
// Presumably the `__qualname__` of the Python catalog class — TODO confirm.
pub py_catalog_class_qualname: PlSmallStr,
// Name of the catalog.
pub catalog_name: PlSmallStr,
// String key/value properties for the catalog, kept in sorted key order by
// the `BTreeMap`.
pub catalog_properties: BTreeMap<PlSmallStr, PlSmallStr>,
// Name of the target table (exact identifier format is not visible here).
pub table_name: PlSmallStr,
// How the data is committed: append or overwrite.
pub mode: IcebergCommitMode,
// String key/value storage properties, kept in sorted key order by the
// `BTreeMap`.
pub iceberg_storage_properties: BTreeMap<PlSmallStr, PlSmallStr>,
// Judging by the name, a UUID identifying this sink in string form; this
// type does not validate the format.
pub sink_uuid_str: String,
// Only present with the `python` feature. Optional Python object;
// presumably the table handle — TODO confirm.
#[cfg(feature = "python")]
pub table_: Option<PythonObject>,
// Only present with the `python` feature. Optional Python object;
// presumably a DataFrame holding the commit result — TODO confirm.
#[cfg(feature = "python")]
pub commit_result_df: Option<PythonObject>, }
// Commit mode of an Iceberg sink.
//
// Unlike `IcebergSinkState`, the Python conversions are not derived: the
// hand-written impls in `_python_impl` map each variant to/from a lowercase
// Python string.
//
// NOTE(review): plain `//` comments are used on purpose, since schemars copies
// `///` doc comments into the generated JSON schema — confirm before changing.
#[derive(Copy, Clone, PartialEq, Debug, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub enum IcebergCommitMode {
// Exchanged with Python as the string "append".
Append,
// Exchanged with Python as the string "overwrite".
Overwrite,
}
#[cfg(feature = "python")]
mod _python_impl {
use std::convert::Infallible;
use pyo3::exceptions::PyValueError;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::PyString;
use pyo3::{Borrowed, Bound, FromPyObject, IntoPyObject, Py, PyAny, PyErr, PyResult, Python};
use super::{IcebergCommitMode, IcebergSinkState};
impl IcebergSinkState {
    /// Consumes this state and builds the corresponding Python object.
    ///
    /// With the Python interpreter attached, `self` is converted through its
    /// `IntoPyObject` impl and the result is passed as the keyword arguments
    /// (no positional arguments) to the class returned by the convert
    /// registry's `py_iceberg_sink_state_class()`.
    ///
    /// # Errors
    ///
    /// Returns the `PyErr` produced by the conversion of `self` or by the
    /// call into Python.
    pub(crate) fn into_sink_state_obj(self) -> PyResult<Py<PyAny>> {
        Python::attach(|py| {
            // Same evaluation order as a single call chain: the class is looked
            // up first, then the keyword arguments are built.
            let registry = polars_utils::python_convert_registry::get_python_convert_registry();
            let sink_state_class = registry.py_iceberg_sink_state_class();
            let kwargs = self.into_pyobject(py)?;

            sink_state_class.call(py, (), Some(&kwargs))
        })
    }
}
impl<'py> IntoPyObject<'py> for IcebergCommitMode {
    type Target = PyString;
    type Output = Bound<'py, Self::Target>;
    type Error = Infallible;

    /// Converts the commit mode into a Python string: `"append"` for
    /// [`IcebergCommitMode::Append`] and `"overwrite"` for
    /// [`IcebergCommitMode::Overwrite`].
    fn into_pyobject(self, py: pyo3::Python<'py>) -> Result<Self::Output, Self::Error> {
        let mode_name = match self {
            Self::Append => "append",
            Self::Overwrite => "overwrite",
        };

        mode_name.into_pyobject(py)
    }
}
impl<'a, 'py> FromPyObject<'a, 'py> for IcebergCommitMode {
    type Error = PyErr;

    /// Parses a commit mode from a Python string.
    ///
    /// Accepts exactly `"append"` and `"overwrite"`.
    ///
    /// # Errors
    ///
    /// Propagates the error from extracting the object as a string, and
    /// returns a `PyValueError` for any other string value.
    fn extract(ob: Borrowed<'a, 'py, PyAny>) -> PyResult<Self> {
        let mode_str = ob.extract::<PyBackedStr>()?;

        match &*mode_str {
            "append" => Ok(Self::Append),
            "overwrite" => Ok(Self::Overwrite),
            v => Err(PyValueError::new_err(format!(
                "invalid iceberg commit mode: '{v}'"
            ))),
        }
    }
}
}