datafusion_loki/
codec.rs

1use std::sync::Arc;
2
3use datafusion_common::{internal_datafusion_err, internal_err, not_impl_err};
4use datafusion_execution::TaskContext;
5use datafusion_physical_plan::ExecutionPlan;
6use datafusion_proto::physical_plan::PhysicalExtensionCodec;
7use prost::Message;
8
9use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
10
/// Stateless marker type on which the [`PhysicalExtensionCodec`] implementation
/// for the Loki scan and insert execution plans is defined.
#[derive(Clone, Debug)]
pub struct LokiPhysicalCodec;
13
14impl PhysicalExtensionCodec for LokiPhysicalCodec {
15    fn try_decode(
16        &self,
17        buf: &[u8],
18        inputs: &[Arc<dyn ExecutionPlan>],
19        _context: &TaskContext,
20    ) -> DFResult<Arc<dyn ExecutionPlan>> {
21        let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
22            internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
23        })?;
24        let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
25            internal_datafusion_err!(
26                "Failed to decode loki physical plan node due to physical plan type is none"
27            )
28        })?;
29
30        match loki_plan {
31            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
32                let projection = parse_projection(proto.projection.as_ref());
33                let exec = LokiLogScanExec::try_new(
34                    proto.endpoint,
35                    proto.log_query,
36                    proto.start,
37                    proto.end,
38                    projection,
39                    proto.limit.map(|l| l as usize),
40                )?;
41                Ok(Arc::new(exec))
42            }
43            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
44                if inputs.len() != 1 {
45                    return internal_err!("LokiLogInsertExec only support one input");
46                }
47
48                let input = inputs[0].clone();
49                let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
50                Ok(Arc::new(exec))
51            }
52        }
53    }
54
55    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
56        if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
57            let projection = serialize_projection(exec.projection.as_ref());
58
59            let proto = protobuf::LokiPhysicalPlanNode {
60                loki_physical_plan_type: Some(
61                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
62                        protobuf::LokiLogScanExec {
63                            endpoint: exec.endpoint.clone(),
64                            log_query: exec.log_query.clone(),
65                            start: exec.start,
66                            end: exec.end,
67                            projection,
68                            limit: exec.limit.map(|l| l as i32),
69                        },
70                    ),
71                ),
72            };
73
74            proto.encode(buf).map_err(|e| {
75                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
76            })?;
77            Ok(())
78        } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
79            let proto = protobuf::LokiPhysicalPlanNode {
80                loki_physical_plan_type: Some(
81                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
82                        protobuf::LokiLogInsertExec {
83                            endpoint: exec.endpoint.clone(),
84                        },
85                    ),
86                ),
87            };
88
89            proto.encode(buf).map_err(|e| {
90                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
91            })?;
92            Ok(())
93        } else {
94            not_impl_err!(
95                "LokiPhysicalCodec does not support encoding {}",
96                node.name()
97            )
98        }
99    }
100}
101
102fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
103    projection.map(|p| protobuf::Projection {
104        projection: p.iter().map(|n| *n as u32).collect(),
105    })
106}
107
108fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
109    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
110}