datafusion_loki/
codec.rs

1use std::sync::Arc;
2
3use datafusion::{
4    common::{internal_datafusion_err, internal_err, not_impl_err},
5    execution::FunctionRegistry,
6    physical_plan::ExecutionPlan,
7};
8use datafusion_proto::physical_plan::PhysicalExtensionCodec;
9use prost::Message;
10
11use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
12
/// Stateless codec that converts the Loki execution plans of this crate
/// ([`LokiLogScanExec`] and [`LokiLogInsertExec`]) to and from their protobuf encoding.
#[derive(Clone, Debug)]
pub struct LokiPhysicalCodec;
15
16impl PhysicalExtensionCodec for LokiPhysicalCodec {
17    fn try_decode(
18        &self,
19        buf: &[u8],
20        inputs: &[Arc<dyn ExecutionPlan>],
21        _registry: &dyn FunctionRegistry,
22    ) -> DFResult<Arc<dyn ExecutionPlan>> {
23        let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
24            internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
25        })?;
26        let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
27            internal_datafusion_err!(
28                "Failed to decode loki physical plan node due to physical plan type is none"
29            )
30        })?;
31
32        match loki_plan {
33            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
34                let projection = parse_projection(proto.projection.as_ref());
35                let exec = LokiLogScanExec::try_new(
36                    proto.endpoint,
37                    proto.log_query,
38                    proto.start,
39                    proto.end,
40                    projection,
41                    proto.limit.map(|l| l as usize),
42                )?;
43                Ok(Arc::new(exec))
44            }
45            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
46                if inputs.len() != 1 {
47                    return internal_err!("LokiLogInsertExec only support one input");
48                }
49
50                let input = inputs[0].clone();
51                let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
52                Ok(Arc::new(exec))
53            }
54        }
55    }
56
57    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
58        if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
59            let projection = serialize_projection(exec.projection.as_ref());
60
61            let proto = protobuf::LokiPhysicalPlanNode {
62                loki_physical_plan_type: Some(
63                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
64                        protobuf::LokiLogScanExec {
65                            endpoint: exec.endpoint.clone(),
66                            log_query: exec.log_query.clone(),
67                            start: exec.start,
68                            end: exec.end,
69                            projection,
70                            limit: exec.limit.map(|l| l as i32),
71                        },
72                    ),
73                ),
74            };
75
76            proto.encode(buf).map_err(|e| {
77                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
78            })?;
79            Ok(())
80        } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
81            let proto = protobuf::LokiPhysicalPlanNode {
82                loki_physical_plan_type: Some(
83                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
84                        protobuf::LokiLogInsertExec {
85                            endpoint: exec.endpoint.clone(),
86                        },
87                    ),
88                ),
89            };
90
91            proto.encode(buf).map_err(|e| {
92                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
93            })?;
94            Ok(())
95        } else {
96            not_impl_err!(
97                "LokiPhysicalCodec does not support encoding {}",
98                node.name()
99            )
100        }
101    }
102}
103
104fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
105    projection.map(|p| protobuf::Projection {
106        projection: p.iter().map(|n| *n as u32).collect(),
107    })
108}
109
110fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
111    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
112}