datafusion-python 53.0.0

Apache DataFusion DataFrame and SQL Query Engine
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_python_util::wait_for_future;
use datafusion_substrait::logical_plan::{consumer, producer};
use datafusion_substrait::serializer;
use datafusion_substrait::substrait::proto::Plan;
use prost::Message;
use pyo3::prelude::*;
use pyo3::types::PyBytes;

use crate::context::PySessionContext;
use crate::errors::{PyDataFusionError, PyDataFusionResult, py_datafusion_err, to_datafusion_err};
use crate::sql::logical::PyLogicalPlan;

#[pyclass(
    from_py_object,
    frozen,
    name = "Plan",
    module = "datafusion.substrait",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PyPlan {
    pub plan: Plan,
}

#[pymethods]
impl PyPlan {
    fn encode(&self, py: Python) -> PyResult<Py<PyAny>> {
        let mut proto_bytes = Vec::<u8>::new();
        self.plan
            .encode(&mut proto_bytes)
            .map_err(PyDataFusionError::EncodeError)?;
        Ok(PyBytes::new(py, &proto_bytes).into())
    }

    /// Get the JSON representation of the substrait plan
    fn to_json(&self) -> PyDataFusionResult<String> {
        let json = serde_json::to_string_pretty(&self.plan).map_err(to_datafusion_err)?;
        Ok(json)
    }

    /// Parse a Substrait Plan from its JSON representation
    #[staticmethod]
    fn from_json(json: &str) -> PyDataFusionResult<PyPlan> {
        let plan: Plan = serde_json::from_str(json).map_err(to_datafusion_err)?;
        Ok(PyPlan { plan })
    }
}

impl From<PyPlan> for Plan {
    fn from(plan: PyPlan) -> Plan {
        plan.plan
    }
}

impl From<Plan> for PyPlan {
    fn from(plan: Plan) -> PyPlan {
        PyPlan { plan }
    }
}

/// A PySubstraitSerializer is a representation of a Serializer that is capable of both serializing
/// a `LogicalPlan` instance to Substrait Protobuf bytes and also deserialize Substrait Protobuf bytes
/// to a valid `LogicalPlan` instance.
#[pyclass(
    from_py_object,
    frozen,
    name = "Serde",
    module = "datafusion.substrait",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PySubstraitSerializer;

#[pymethods]
impl PySubstraitSerializer {
    #[staticmethod]
    pub fn serialize(
        sql: &str,
        ctx: PySessionContext,
        path: &str,
        py: Python,
    ) -> PyDataFusionResult<()> {
        wait_for_future(py, serializer::serialize(sql, &ctx.ctx, path))??;
        Ok(())
    }

    #[staticmethod]
    pub fn serialize_to_plan(
        sql: &str,
        ctx: PySessionContext,
        py: Python,
    ) -> PyDataFusionResult<PyPlan> {
        PySubstraitSerializer::serialize_bytes(sql, ctx, py).and_then(|proto_bytes| {
            let proto_bytes = proto_bytes.bind(py).cast::<PyBytes>().unwrap();
            PySubstraitSerializer::deserialize_bytes(proto_bytes.as_bytes().to_vec(), py)
        })
    }

    #[staticmethod]
    pub fn serialize_bytes(
        sql: &str,
        ctx: PySessionContext,
        py: Python,
    ) -> PyDataFusionResult<Py<PyAny>> {
        let proto_bytes: Vec<u8> =
            wait_for_future(py, serializer::serialize_bytes(sql, &ctx.ctx))??;
        Ok(PyBytes::new(py, &proto_bytes).into())
    }

    #[staticmethod]
    pub fn deserialize(path: &str, py: Python) -> PyDataFusionResult<PyPlan> {
        let plan = wait_for_future(py, serializer::deserialize(path))??;
        Ok(PyPlan { plan: *plan })
    }

    #[staticmethod]
    pub fn deserialize_bytes(proto_bytes: Vec<u8>, py: Python) -> PyDataFusionResult<PyPlan> {
        let plan = wait_for_future(py, serializer::deserialize_bytes(proto_bytes))??;
        Ok(PyPlan { plan: *plan })
    }
}

#[pyclass(
    from_py_object,
    frozen,
    name = "Producer",
    module = "datafusion.substrait",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PySubstraitProducer;

#[pymethods]
impl PySubstraitProducer {
    /// Convert DataFusion LogicalPlan to Substrait Plan
    #[staticmethod]
    pub fn to_substrait_plan(plan: PyLogicalPlan, ctx: &PySessionContext) -> PyResult<PyPlan> {
        let session_state = ctx.ctx.state();
        match producer::to_substrait_plan(&plan.plan, &session_state) {
            Ok(plan) => Ok(PyPlan { plan: *plan }),
            Err(e) => Err(py_datafusion_err(e)),
        }
    }
}

#[pyclass(
    from_py_object,
    frozen,
    name = "Consumer",
    module = "datafusion.substrait",
    subclass
)]
#[derive(Debug, Clone)]
pub struct PySubstraitConsumer;

#[pymethods]
impl PySubstraitConsumer {
    /// Convert Substrait Plan to DataFusion DataFrame
    #[staticmethod]
    pub fn from_substrait_plan(
        ctx: &PySessionContext,
        plan: PyPlan,
        py: Python,
    ) -> PyDataFusionResult<PyLogicalPlan> {
        let session_state = ctx.ctx.state();
        let result = consumer::from_substrait_plan(&session_state, &plan.plan);
        let logical_plan = wait_for_future(py, result)??;
        Ok(PyLogicalPlan::new(logical_plan))
    }
}

pub fn init_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PyPlan>()?;
    m.add_class::<PySubstraitConsumer>()?;
    m.add_class::<PySubstraitProducer>()?;
    m.add_class::<PySubstraitSerializer>()?;
    Ok(())
}