1use std::{any::Any, io::Cursor, pin::Pin, sync::Arc};
2
3use datafusion::{
4 arrow::array::RecordBatch,
5 common::{exec_err, project_schema},
6 error::DataFusionError,
7 execution::{SendableRecordBatchStream, TaskContext},
8 physical_expr::EquivalenceProperties,
9 physical_plan::{
10 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
11 display::ProjectSchemaDisplay,
12 execution_plan::{Boundedness, EmissionType},
13 stream::RecordBatchStreamAdapter,
14 },
15};
16use futures::{Stream, StreamExt, TryStreamExt};
17use log::debug;
18use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
19use reqwest::{Client, RequestBuilder};
20
21use crate::{
22 DFResult, LOG_TABLE_SCHEMA, current_timestamp_ns, thirty_days_before_now_timestamp_ns,
23};
24
/// Physical plan node that scans log lines from a Loki instance over HTTP.
///
/// `execute` issues a single `query_range` request asking Loki to encode the
/// result as Parquet, which is then decoded into Arrow `RecordBatch`es.
/// The plan always declares exactly one (unknown) partition.
#[derive(Debug)]
pub struct LokiLogScanExec {
    /// Base URL of the Loki server; `/loki/api/v1/query_range` is appended.
    pub endpoint: String,
    /// LogQL query string, sent as the `query` request parameter.
    pub log_query: String,
    /// Start of the time range in nanoseconds since the Unix epoch;
    /// defaults to thirty days before now when `None` (see `execute`).
    pub start: Option<i64>,
    /// End of the time range in nanoseconds since the Unix epoch;
    /// defaults to the current time when `None` (see `execute`).
    pub end: Option<i64>,
    /// Optional column projection (indices into `LOG_TABLE_SCHEMA`).
    pub projection: Option<Vec<usize>>,
    /// Optional maximum number of log lines, forwarded to Loki as `limit`.
    pub limit: Option<usize>,
    // HTTP client built once in `try_new` and reused for every `execute`.
    client: Client,
    // Cached plan properties: projected schema, single unknown partition,
    // incremental emission, bounded input.
    plan_properties: PlanProperties,
}
36
37impl LokiLogScanExec {
38 pub fn try_new(
39 endpoint: String,
40 log_query: String,
41 start: Option<i64>,
42 end: Option<i64>,
43 projection: Option<Vec<usize>>,
44 limit: Option<usize>,
45 ) -> DFResult<Self> {
46 let projected_schema = project_schema(&LOG_TABLE_SCHEMA, projection.as_ref())?;
47 let plan_properties = PlanProperties::new(
48 EquivalenceProperties::new(projected_schema),
49 Partitioning::UnknownPartitioning(1),
50 EmissionType::Incremental,
51 Boundedness::Bounded,
52 );
53 let client = Client::builder()
54 .build()
55 .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;
56 Ok(LokiLogScanExec {
57 endpoint,
58 log_query,
59 start,
60 end,
61 projection,
62 limit,
63 client,
64 plan_properties,
65 })
66 }
67}
68
69impl ExecutionPlan for LokiLogScanExec {
70 fn name(&self) -> &str {
71 "LokiLogScanExec"
72 }
73
74 fn as_any(&self) -> &dyn Any {
75 self
76 }
77
78 fn properties(&self) -> &PlanProperties {
79 &self.plan_properties
80 }
81
82 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
83 vec![]
84 }
85
86 fn with_new_children(
87 self: Arc<Self>,
88 _children: Vec<Arc<dyn ExecutionPlan>>,
89 ) -> DFResult<Arc<dyn ExecutionPlan>> {
90 Ok(self)
91 }
92
93 fn execute(
94 &self,
95 partition: usize,
96 _context: Arc<TaskContext>,
97 ) -> DFResult<SendableRecordBatchStream> {
98 if partition != 0 {
99 return exec_err!("LokiLogScanExec does not support multiple partitions");
100 }
101
102 debug!(
103 "[datafusion-loki] starting to scan logs: query: {}, start: {:?}, end: {:?}, limit: {:?}",
104 self.log_query, self.start, self.end, self.limit
105 );
106
107 let mut query = Vec::new();
108 query.push(("query", self.log_query.clone()));
109
110 let start = self.start.unwrap_or(thirty_days_before_now_timestamp_ns());
111 query.push(("start", start.to_string()));
112
113 let end = self.end.unwrap_or(current_timestamp_ns());
114 query.push(("end", end.to_string()));
115
116 if let Some(limit) = self.limit {
117 query.push(("limit", limit.to_string()));
118 }
119
120 let req_builder = self
121 .client
122 .get(format!("{}/loki/api/v1/query_range", self.endpoint))
123 .header("Accept", "application/vnd.apache.parquet")
124 .query(&query);
125
126 let fut = fetch_log_stream(req_builder, self.projection.clone());
127 let stream = futures::stream::once(fut).try_flatten();
128 Ok(Box::pin(RecordBatchStreamAdapter::new(
129 self.schema(),
130 stream,
131 )))
132 }
133
134 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
135 Self::try_new(
136 self.endpoint.clone(),
137 self.log_query.clone(),
138 self.start,
139 self.end,
140 self.projection.clone(),
141 limit,
142 )
143 .ok()
144 .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
145 }
146
147 fn fetch(&self) -> Option<usize> {
148 self.limit
149 }
150}
151
152impl DisplayAs for LokiLogScanExec {
153 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
154 write!(
155 f,
156 "LokiLogScanExec: endpoint={}, query={}",
157 self.endpoint, self.log_query
158 )?;
159 if let Some(start) = self.start {
160 write!(f, ", start={}", start)?;
161 }
162 if let Some(end) = self.end {
163 write!(f, ", end={}", end)?;
164 }
165 if self.projection.is_some() {
166 let projected_schema = self.schema();
167 write!(
168 f,
169 ", projection={}",
170 ProjectSchemaDisplay(&projected_schema)
171 )?;
172 }
173 if let Some(limit) = self.limit {
174 write!(f, ", limit={limit}")?;
175 }
176 Ok(())
177 }
178}
179
180async fn fetch_log_stream(
181 req_builder: RequestBuilder,
182 projection: Option<Vec<usize>>,
183) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
184 let resp = req_builder
185 .send()
186 .await
187 .map_err(|e| DataFusionError::Execution(format!("Failed to send request to loki: {e}")))?;
188 let status = resp.status();
189 if !status.is_success() {
190 let url = resp.url().clone();
191 let with_text = if let Ok(text) = resp.text().await {
192 format!(", text: {text}")
193 } else {
194 String::new()
195 };
196 return exec_err!("Request to logi failed with status {status}, url: {url}{with_text}");
197 }
198 let bytes = resp.bytes().await.map_err(|e| {
199 DataFusionError::Execution(format!("Failed to get response body as bytes: {e}"))
200 })?;
201 let cursor = Cursor::new(bytes);
202
203 let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
204 let parquet_schema = builder.parquet_schema();
205
206 let projection_mask = match projection {
207 Some(proj) => ProjectionMask::roots(parquet_schema, proj),
208 None => ProjectionMask::all(),
209 };
210
211 let stream = builder
212 .with_batch_size(4096)
213 .with_projection(projection_mask)
214 .build()?
215 .map_err(|e| DataFusionError::ParquetError(Box::new(e)))
216 .boxed();
217
218 Ok(stream)
219}