1use std::sync::Arc;
2
3use datafusion::{
4 common::{internal_datafusion_err, internal_err, not_impl_err},
5 execution::FunctionRegistry,
6 physical_plan::ExecutionPlan,
7};
8use datafusion_proto::physical_plan::PhysicalExtensionCodec;
9use prost::Message;
10
11use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
12
/// Physical-plan extension codec for the Loki execution plans.
///
/// The type carries no state. Its [`PhysicalExtensionCodec`] implementation
/// converts [`LokiLogScanExec`] and [`LokiLogInsertExec`] nodes to and from
/// the `protobuf::LokiPhysicalPlanNode` message.
#[derive(Debug, Clone)]
pub struct LokiPhysicalCodec;
15
16impl PhysicalExtensionCodec for LokiPhysicalCodec {
17 fn try_decode(
18 &self,
19 buf: &[u8],
20 inputs: &[Arc<dyn ExecutionPlan>],
21 _registry: &dyn FunctionRegistry,
22 ) -> DFResult<Arc<dyn ExecutionPlan>> {
23 let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
24 internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
25 })?;
26 let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
27 internal_datafusion_err!(
28 "Failed to decode loki physical plan node due to physical plan type is none"
29 )
30 })?;
31
32 match loki_plan {
33 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
34 let projection = parse_projection(proto.projection.as_ref());
35 let exec = LokiLogScanExec::try_new(
36 proto.endpoint,
37 proto.log_query,
38 proto.start,
39 proto.end,
40 projection,
41 proto.limit.map(|l| l as usize),
42 )?;
43 Ok(Arc::new(exec))
44 }
45 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
46 if inputs.len() != 1 {
47 return internal_err!("LokiLogInsertExec only support one input");
48 }
49
50 let input = inputs[0].clone();
51 let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
52 Ok(Arc::new(exec))
53 }
54 }
55 }
56
57 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
58 if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
59 let projection = serialize_projection(exec.projection.as_ref());
60
61 let proto = protobuf::LokiPhysicalPlanNode {
62 loki_physical_plan_type: Some(
63 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
64 protobuf::LokiLogScanExec {
65 endpoint: exec.endpoint.clone(),
66 log_query: exec.log_query.clone(),
67 start: exec.start,
68 end: exec.end,
69 projection,
70 limit: exec.limit.map(|l| l as i32),
71 },
72 ),
73 ),
74 };
75
76 proto.encode(buf).map_err(|e| {
77 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
78 })?;
79 Ok(())
80 } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
81 let proto = protobuf::LokiPhysicalPlanNode {
82 loki_physical_plan_type: Some(
83 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
84 protobuf::LokiLogInsertExec {
85 endpoint: exec.endpoint.clone(),
86 },
87 ),
88 ),
89 };
90
91 proto.encode(buf).map_err(|e| {
92 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
93 })?;
94 Ok(())
95 } else {
96 not_impl_err!(
97 "LokiPhysicalCodec does not support encoding {}",
98 node.name()
99 )
100 }
101 }
102}
103
104fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
105 projection.map(|p| protobuf::Projection {
106 projection: p.iter().map(|n| *n as u32).collect(),
107 })
108}
109
110fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
111 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
112}