1use std::sync::Arc;
2
3use datafusion_common::{internal_datafusion_err, internal_err, not_impl_err};
4use datafusion_execution::TaskContext;
5use datafusion_physical_plan::ExecutionPlan;
6use datafusion_proto::physical_plan::PhysicalExtensionCodec;
7use prost::Message;
8
9use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
10
/// Stateless codec that serializes and deserializes the Loki physical plan
/// nodes ([`LokiLogScanExec`] and [`LokiLogInsertExec`]) to and from protobuf.
#[derive(Clone, Debug)]
pub struct LokiPhysicalCodec;
13
14impl PhysicalExtensionCodec for LokiPhysicalCodec {
15 fn try_decode(
16 &self,
17 buf: &[u8],
18 inputs: &[Arc<dyn ExecutionPlan>],
19 _context: &TaskContext,
20 ) -> DFResult<Arc<dyn ExecutionPlan>> {
21 let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
22 internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
23 })?;
24 let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
25 internal_datafusion_err!(
26 "Failed to decode loki physical plan node due to physical plan type is none"
27 )
28 })?;
29
30 match loki_plan {
31 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
32 let projection = parse_projection(proto.projection.as_ref());
33 let exec = LokiLogScanExec::try_new(
34 proto.endpoint,
35 proto.log_query,
36 proto.start,
37 proto.end,
38 projection,
39 proto.limit.map(|l| l as usize),
40 )?;
41 Ok(Arc::new(exec))
42 }
43 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
44 if inputs.len() != 1 {
45 return internal_err!("LokiLogInsertExec only support one input");
46 }
47
48 let input = inputs[0].clone();
49 let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
50 Ok(Arc::new(exec))
51 }
52 }
53 }
54
55 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
56 if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
57 let projection = serialize_projection(exec.projection.as_ref());
58
59 let proto = protobuf::LokiPhysicalPlanNode {
60 loki_physical_plan_type: Some(
61 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
62 protobuf::LokiLogScanExec {
63 endpoint: exec.endpoint.clone(),
64 log_query: exec.log_query.clone(),
65 start: exec.start,
66 end: exec.end,
67 projection,
68 limit: exec.limit.map(|l| l as i32),
69 },
70 ),
71 ),
72 };
73
74 proto.encode(buf).map_err(|e| {
75 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
76 })?;
77 Ok(())
78 } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
79 let proto = protobuf::LokiPhysicalPlanNode {
80 loki_physical_plan_type: Some(
81 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
82 protobuf::LokiLogInsertExec {
83 endpoint: exec.endpoint.clone(),
84 },
85 ),
86 ),
87 };
88
89 proto.encode(buf).map_err(|e| {
90 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
91 })?;
92 Ok(())
93 } else {
94 not_impl_err!(
95 "LokiPhysicalCodec does not support encoding {}",
96 node.name()
97 )
98 }
99 }
100}
101
102fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
103 projection.map(|p| protobuf::Projection {
104 projection: p.iter().map(|n| *n as u32).collect(),
105 })
106}
107
108fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
109 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
110}