// datafusion-loki 0.4.0
//
// A DataFusion table provider for querying Loki data.
// Documentation
use std::sync::Arc;

use datafusion_common::{internal_datafusion_err, internal_err, not_impl_err};
use datafusion_execution::TaskContext;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use prost::Message;

use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};

/// Stateless codec that serializes and deserializes the Loki physical plan
/// nodes ([`LokiLogScanExec`] and [`LokiLogInsertExec`]) through the
/// `protobuf::LokiPhysicalPlanNode` message.
#[derive(Debug, Clone)]
pub struct LokiPhysicalCodec;

impl PhysicalExtensionCodec for LokiPhysicalCodec {
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        _context: &TaskContext,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
            internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
        })?;
        let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
            internal_datafusion_err!(
                "Failed to decode loki physical plan node due to physical plan type is none"
            )
        })?;

        match loki_plan {
            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
                let projection = parse_projection(proto.projection.as_ref());
                let exec = LokiLogScanExec::try_new(
                    proto.endpoint,
                    proto.log_query,
                    proto.start,
                    proto.end,
                    projection,
                    proto.limit.map(|l| l as usize),
                )?;
                Ok(Arc::new(exec))
            }
            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
                if inputs.len() != 1 {
                    return internal_err!("LokiLogInsertExec only support one input");
                }

                let input = inputs[0].clone();
                let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
                Ok(Arc::new(exec))
            }
        }
    }

    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
        if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
            let projection = serialize_projection(exec.projection.as_ref());

            let proto = protobuf::LokiPhysicalPlanNode {
                loki_physical_plan_type: Some(
                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
                        protobuf::LokiLogScanExec {
                            endpoint: exec.endpoint.clone(),
                            log_query: exec.log_query.clone(),
                            start: exec.start,
                            end: exec.end,
                            projection,
                            limit: exec.limit.map(|l| l as i32),
                        },
                    ),
                ),
            };

            proto.encode(buf).map_err(|e| {
                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
            })?;
            Ok(())
        } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
            let proto = protobuf::LokiPhysicalPlanNode {
                loki_physical_plan_type: Some(
                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
                        protobuf::LokiLogInsertExec {
                            endpoint: exec.endpoint.clone(),
                        },
                    ),
                ),
            };

            proto.encode(buf).map_err(|e| {
                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
            })?;
            Ok(())
        } else {
            not_impl_err!(
                "LokiPhysicalCodec does not support encoding {}",
                node.name()
            )
        }
    }
}

/// Converts an optional list of projected column indices into its protobuf
/// representation; `None` stays `None`.
///
/// NOTE: each index is narrowed with `as u32`, so an index above `u32::MAX`
/// would be truncated rather than rejected.
fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
    let columns = projection?;
    let indices = columns.iter().map(|&column| column as u32).collect();
    Some(protobuf::Projection {
        projection: indices,
    })
}

/// Converts an optional protobuf projection back into a list of column
/// indices; `None` stays `None`.
fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
    let proto = projection?;
    let columns = proto
        .projection
        .iter()
        .map(|&index| index as usize)
        .collect();
    Some(columns)
}