1use std::sync::Arc;
2
3use datafusion::{
4 arrow::{
5 array::RecordBatch,
6 datatypes::Schema,
7 ipc::{reader::StreamReader, writer::StreamWriter},
8 },
9 catalog::memory::{DataSourceExec, MemorySourceConfig},
10 common::{internal_datafusion_err, internal_err, not_impl_err},
11 datasource::source::DataSource,
12 error::DataFusionError,
13 execution::FunctionRegistry,
14 physical_expr::LexOrdering,
15 physical_plan::ExecutionPlan,
16};
17use datafusion_proto::physical_plan::{
18 PhysicalExtensionCodec, from_proto::parse_physical_sort_exprs,
19 to_proto::serialize_physical_sort_exprs,
20};
21use prost::Message;
22
23use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
24
/// Physical extension codec that (de)serializes Loki-specific execution plan
/// nodes (`LokiLogScanExec`, `LokiLogInsertExec`) and `DataSourceExec` nodes
/// backed by `MemorySourceConfig` to/from the crate's protobuf messages.
#[derive(Debug, Clone)]
pub struct LokiPhysicalCodec;
27
28impl PhysicalExtensionCodec for LokiPhysicalCodec {
29 fn try_decode(
30 &self,
31 buf: &[u8],
32 inputs: &[Arc<dyn ExecutionPlan>],
33 registry: &dyn FunctionRegistry,
34 ) -> DFResult<Arc<dyn ExecutionPlan>> {
35 let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
36 internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
37 })?;
38 let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
39 internal_datafusion_err!(
40 "Failed to decode loki physical plan node due to physical plan type is none"
41 )
42 })?;
43
44 match loki_plan {
45 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
46 let projection = parse_projection(proto.projection.as_ref());
47 let exec = LokiLogScanExec::try_new(
48 proto.endpoint,
49 proto.log_query,
50 proto.start,
51 proto.end,
52 projection,
53 proto.limit.map(|l| l as usize),
54 )?;
55 Ok(Arc::new(exec))
56 }
57 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
58 if inputs.len() != 1 {
59 return internal_err!("LokiLogInsertExec only support one input");
60 }
61
62 let input = inputs[0].clone();
63 let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
64 Ok(Arc::new(exec))
65 }
66 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::MemoryDatasource(proto) => {
67 let partitions = parse_partitions(&proto.partitions)?;
68 let schema = Schema::try_from(&proto.schema.unwrap())?;
69 let projection = parse_projection(proto.projection.as_ref());
70
71 let sort_information = proto
72 .sort_information
73 .iter()
74 .map(|sort_exprs| {
75 let sort_exprs = parse_physical_sort_exprs(
76 sort_exprs.physical_sort_expr_nodes.as_slice(),
77 registry,
78 &schema,
79 self,
80 )?;
81 let lex_ordering =
82 LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
83 Ok::<_, DataFusionError>(lex_ordering)
84 })
85 .collect::<Result<Vec<_>, _>>()?;
86
87 let show_sizes = proto.show_sizes;
88 let fetch = proto.fetch.map(|f| f as usize);
89 let memory_source =
90 MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
91 .with_show_sizes(show_sizes)
92 .with_limit(fetch);
93
94 let memory_source =
95 MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
96 Ok(DataSourceExec::from_data_source(memory_source))
97 }
98 }
99 }
100
101 fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
102 if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
103 let projection = serialize_projection(exec.projection.as_ref());
104
105 let proto = protobuf::LokiPhysicalPlanNode {
106 loki_physical_plan_type: Some(
107 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
108 protobuf::LokiLogScanExec {
109 endpoint: exec.endpoint.clone(),
110 log_query: exec.log_query.clone(),
111 start: exec.start,
112 end: exec.end,
113 projection,
114 limit: exec.limit.map(|l| l as i32),
115 },
116 ),
117 ),
118 };
119
120 proto.encode(buf).map_err(|e| {
121 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
122 })?;
123 Ok(())
124 } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
125 let proto = protobuf::LokiPhysicalPlanNode {
126 loki_physical_plan_type: Some(
127 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
128 protobuf::LokiLogInsertExec {
129 endpoint: exec.endpoint.clone(),
130 },
131 ),
132 ),
133 };
134
135 proto.encode(buf).map_err(|e| {
136 internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
137 })?;
138 Ok(())
139 } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
140 let source = exec.data_source();
141 if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
142 let proto_partitions = serialize_partitions(memory_source.partitions())?;
143 let projection = serialize_projection(memory_source.projection().as_ref());
144 let sort_information = memory_source
145 .sort_information()
146 .iter()
147 .map(|ordering| {
148 let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
149 Ok::<_, DataFusionError>(
150 datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
151 physical_sort_expr_nodes: sort_exprs,
152 },
153 )
154 })
155 .collect::<Result<Vec<_>, _>>()?;
156
157 let proto = protobuf::LokiPhysicalPlanNode {
158 loki_physical_plan_type: Some(
159 protobuf::loki_physical_plan_node::LokiPhysicalPlanType::MemoryDatasource(
160 protobuf::MemoryDatasourceNode {
161 partitions: proto_partitions,
162 schema: Some(memory_source.original_schema().try_into()?),
163 projection,
164 sort_information,
165 show_sizes: memory_source.show_sizes(),
166 fetch: memory_source.fetch().map(|f| f as u32),
167 },
168 ),
169 ),
170 };
171
172 proto.encode(buf).map_err(|e| {
173 internal_datafusion_err!("Failed to encode memory datasource node: {e:?}")
174 })?;
175
176 Ok(())
177 } else {
178 not_impl_err!(
179 "LokiPhysicalCodec only support encoding MemorySourceConfig, got {source:?}"
180 )
181 }
182 } else {
183 not_impl_err!(
184 "LokiPhysicalCodec does not support encoding {}",
185 node.name()
186 )
187 }
188 }
189}
190
191fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
192 projection.map(|p| protobuf::Projection {
193 projection: p.iter().map(|n| *n as u32).collect(),
194 })
195}
196
197fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
198 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
199}
200
201fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> DFResult<Vec<Vec<u8>>> {
202 let mut proto_partitions = vec![];
203 for partition in partitions {
204 if partition.is_empty() {
205 proto_partitions.push(vec![]);
206 continue;
207 }
208 let mut proto_partition = vec![];
209 let mut stream_writer =
210 StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
211 for batch in partition {
212 stream_writer.write(batch)?;
213 }
214 stream_writer.finish()?;
215 proto_partitions.push(proto_partition);
216 }
217 Ok(proto_partitions)
218}
219
220fn parse_partitions(proto_partitions: &[Vec<u8>]) -> DFResult<Vec<Vec<RecordBatch>>> {
221 let mut partitions = vec![];
222 for proto_partition in proto_partitions {
223 if proto_partition.is_empty() {
224 partitions.push(vec![]);
225 continue;
226 }
227 let mut partition = vec![];
228 let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
229 for batch in stream_reader {
230 partition.push(batch?);
231 }
232 partitions.push(partition);
233 }
234 Ok(partitions)
235}