datafusion_loki/
codec.rs

1use std::sync::Arc;
2
3use datafusion::{
4    arrow::{
5        array::RecordBatch,
6        datatypes::Schema,
7        ipc::{reader::StreamReader, writer::StreamWriter},
8    },
9    catalog::memory::{DataSourceExec, MemorySourceConfig},
10    common::{internal_datafusion_err, internal_err, not_impl_err},
11    datasource::source::DataSource,
12    error::DataFusionError,
13    execution::FunctionRegistry,
14    physical_expr::LexOrdering,
15    physical_plan::ExecutionPlan,
16};
17use datafusion_proto::physical_plan::{
18    PhysicalExtensionCodec, from_proto::parse_physical_sort_exprs,
19    to_proto::serialize_physical_sort_exprs,
20};
21use prost::Message;
22
23use crate::{DFResult, LokiLogInsertExec, LokiLogScanExec, protobuf};
24
/// DataFusion [`PhysicalExtensionCodec`] for the Loki plan nodes.
///
/// It converts [`LokiLogScanExec`], [`LokiLogInsertExec`] and in-memory [`DataSourceExec`] nodes
/// to and from `protobuf::LokiPhysicalPlanNode` bytes.
#[derive(Clone, Debug)]
pub struct LokiPhysicalCodec;
27
28impl PhysicalExtensionCodec for LokiPhysicalCodec {
29    fn try_decode(
30        &self,
31        buf: &[u8],
32        inputs: &[Arc<dyn ExecutionPlan>],
33        registry: &dyn FunctionRegistry,
34    ) -> DFResult<Arc<dyn ExecutionPlan>> {
35        let loki_node = protobuf::LokiPhysicalPlanNode::decode(buf).map_err(|e| {
36            internal_datafusion_err!("Failed to decode loki physical plan node: {e:?}")
37        })?;
38        let loki_plan = loki_node.loki_physical_plan_type.ok_or_else(|| {
39            internal_datafusion_err!(
40                "Failed to decode loki physical plan node due to physical plan type is none"
41            )
42        })?;
43
44        match loki_plan {
45            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(proto) => {
46                let projection = parse_projection(proto.projection.as_ref());
47                let exec = LokiLogScanExec::try_new(
48                    proto.endpoint,
49                    proto.log_query,
50                    proto.start,
51                    proto.end,
52                    projection,
53                    proto.limit.map(|l| l as usize),
54                )?;
55                Ok(Arc::new(exec))
56            }
57            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(proto) => {
58                if inputs.len() != 1 {
59                    return internal_err!("LokiLogInsertExec only support one input");
60                }
61
62                let input = inputs[0].clone();
63                let exec = LokiLogInsertExec::try_new(input, proto.endpoint)?;
64                Ok(Arc::new(exec))
65            }
66            protobuf::loki_physical_plan_node::LokiPhysicalPlanType::MemoryDatasource(proto) => {
67                let partitions = parse_partitions(&proto.partitions)?;
68                let schema = Schema::try_from(&proto.schema.unwrap())?;
69                let projection = parse_projection(proto.projection.as_ref());
70
71                let sort_information = proto
72                    .sort_information
73                    .iter()
74                    .map(|sort_exprs| {
75                        let sort_exprs = parse_physical_sort_exprs(
76                            sort_exprs.physical_sort_expr_nodes.as_slice(),
77                            registry,
78                            &schema,
79                            self,
80                        )?;
81                        let lex_ordering =
82                            LexOrdering::new(sort_exprs).expect("lex ordering is not empty");
83                        Ok::<_, DataFusionError>(lex_ordering)
84                    })
85                    .collect::<Result<Vec<_>, _>>()?;
86
87                let show_sizes = proto.show_sizes;
88                let fetch = proto.fetch.map(|f| f as usize);
89                let memory_source =
90                    MemorySourceConfig::try_new(&partitions, Arc::new(schema), projection)?
91                        .with_show_sizes(show_sizes)
92                        .with_limit(fetch);
93
94                let memory_source =
95                    MemorySourceConfig::try_with_sort_information(memory_source, sort_information)?;
96                Ok(DataSourceExec::from_data_source(memory_source))
97            }
98        }
99    }
100
101    fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> DFResult<()> {
102        if let Some(exec) = node.as_any().downcast_ref::<LokiLogScanExec>() {
103            let projection = serialize_projection(exec.projection.as_ref());
104
105            let proto = protobuf::LokiPhysicalPlanNode {
106                loki_physical_plan_type: Some(
107                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Scan(
108                        protobuf::LokiLogScanExec {
109                            endpoint: exec.endpoint.clone(),
110                            log_query: exec.log_query.clone(),
111                            start: exec.start,
112                            end: exec.end,
113                            projection,
114                            limit: exec.limit.map(|l| l as i32),
115                        },
116                    ),
117                ),
118            };
119
120            proto.encode(buf).map_err(|e| {
121                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
122            })?;
123            Ok(())
124        } else if let Some(exec) = node.as_any().downcast_ref::<LokiLogInsertExec>() {
125            let proto = protobuf::LokiPhysicalPlanNode {
126                loki_physical_plan_type: Some(
127                    protobuf::loki_physical_plan_node::LokiPhysicalPlanType::Insert(
128                        protobuf::LokiLogInsertExec {
129                            endpoint: exec.endpoint.clone(),
130                        },
131                    ),
132                ),
133            };
134
135            proto.encode(buf).map_err(|e| {
136                internal_datafusion_err!("Failed to encode loki log scan exec plan: {e:?}")
137            })?;
138            Ok(())
139        } else if let Some(exec) = node.as_any().downcast_ref::<DataSourceExec>() {
140            let source = exec.data_source();
141            if let Some(memory_source) = source.as_any().downcast_ref::<MemorySourceConfig>() {
142                let proto_partitions = serialize_partitions(memory_source.partitions())?;
143                let projection = serialize_projection(memory_source.projection().as_ref());
144                let sort_information = memory_source
145                    .sort_information()
146                    .iter()
147                    .map(|ordering| {
148                        let sort_exprs = serialize_physical_sort_exprs(ordering.clone(), self)?;
149                        Ok::<_, DataFusionError>(
150                            datafusion_proto::protobuf::PhysicalSortExprNodeCollection {
151                                physical_sort_expr_nodes: sort_exprs,
152                            },
153                        )
154                    })
155                    .collect::<Result<Vec<_>, _>>()?;
156
157                let proto = protobuf::LokiPhysicalPlanNode {
158                    loki_physical_plan_type: Some(
159                        protobuf::loki_physical_plan_node::LokiPhysicalPlanType::MemoryDatasource(
160                            protobuf::MemoryDatasourceNode {
161                                partitions: proto_partitions,
162                                schema: Some(memory_source.original_schema().try_into()?),
163                                projection,
164                                sort_information,
165                                show_sizes: memory_source.show_sizes(),
166                                fetch: memory_source.fetch().map(|f| f as u32),
167                            },
168                        ),
169                    ),
170                };
171
172                proto.encode(buf).map_err(|e| {
173                    internal_datafusion_err!("Failed to encode memory datasource node: {e:?}")
174                })?;
175
176                Ok(())
177            } else {
178                not_impl_err!(
179                    "LokiPhysicalCodec only support encoding MemorySourceConfig, got {source:?}"
180                )
181            }
182        } else {
183            not_impl_err!(
184                "LokiPhysicalCodec does not support encoding {}",
185                node.name()
186            )
187        }
188    }
189}
190
191fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<protobuf::Projection> {
192    projection.map(|p| protobuf::Projection {
193        projection: p.iter().map(|n| *n as u32).collect(),
194    })
195}
196
197fn parse_projection(projection: Option<&protobuf::Projection>) -> Option<Vec<usize>> {
198    projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
199}
200
201fn serialize_partitions(partitions: &[Vec<RecordBatch>]) -> DFResult<Vec<Vec<u8>>> {
202    let mut proto_partitions = vec![];
203    for partition in partitions {
204        if partition.is_empty() {
205            proto_partitions.push(vec![]);
206            continue;
207        }
208        let mut proto_partition = vec![];
209        let mut stream_writer =
210            StreamWriter::try_new(&mut proto_partition, &partition[0].schema())?;
211        for batch in partition {
212            stream_writer.write(batch)?;
213        }
214        stream_writer.finish()?;
215        proto_partitions.push(proto_partition);
216    }
217    Ok(proto_partitions)
218}
219
220fn parse_partitions(proto_partitions: &[Vec<u8>]) -> DFResult<Vec<Vec<RecordBatch>>> {
221    let mut partitions = vec![];
222    for proto_partition in proto_partitions {
223        if proto_partition.is_empty() {
224            partitions.push(vec![]);
225            continue;
226        }
227        let mut partition = vec![];
228        let stream_reader = StreamReader::try_new(proto_partition.as_slice(), None)?;
229        for batch in stream_reader {
230            partition.push(batch?);
231        }
232        partitions.push(partition);
233    }
234    Ok(partitions)
235}