1use std::{any::Any, io::Cursor, pin::Pin, sync::Arc};
2
3use arrow::array::RecordBatch;
4use datafusion_common::{DataFusionError, exec_err, project_schema};
5use datafusion_execution::{SendableRecordBatchStream, TaskContext};
6use datafusion_physical_expr::EquivalenceProperties;
7use datafusion_physical_plan::{
8 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
9 display::ProjectSchemaDisplay,
10 execution_plan::{Boundedness, EmissionType},
11 stream::RecordBatchStreamAdapter,
12};
13use futures::{Stream, StreamExt, TryStreamExt};
14use log::debug;
15use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
16use reqwest::{Client, RequestBuilder};
17
18use crate::{
19 DFResult, LOG_TABLE_SCHEMA, current_timestamp_ns, thirty_days_before_now_timestamp_ns,
20};
21
/// Physical plan node that fetches logs from a Loki `query_range` HTTP
/// endpoint (requesting a parquet-encoded response) and streams them out
/// as Arrow record batches. Single-partition, bounded scan.
#[derive(Debug)]
pub struct LokiLogScanExec {
    /// Base URL of the Loki server; `/loki/api/v1/query_range` is appended
    /// at execution time.
    pub endpoint: String,
    /// LogQL query string, sent as the `query` request parameter.
    pub log_query: String,
    /// Range start in nanoseconds; defaults to 30 days before now when `None`.
    pub start: Option<i64>,
    /// Range end in nanoseconds; defaults to the current time when `None`.
    pub end: Option<i64>,
    /// Optional column indices into `LOG_TABLE_SCHEMA` to project.
    pub projection: Option<Vec<usize>>,
    /// Optional row limit, forwarded as the `limit` request parameter.
    pub limit: Option<usize>,
    // HTTP client built once in `try_new` and reused across executions.
    client: Client,
    // Cached properties: projected schema, one unknown partition,
    // incremental emission, bounded.
    plan_properties: PlanProperties,
}
33
34impl LokiLogScanExec {
35 pub fn try_new(
36 endpoint: String,
37 log_query: String,
38 start: Option<i64>,
39 end: Option<i64>,
40 projection: Option<Vec<usize>>,
41 limit: Option<usize>,
42 ) -> DFResult<Self> {
43 let projected_schema = project_schema(&LOG_TABLE_SCHEMA, projection.as_ref())?;
44 let plan_properties = PlanProperties::new(
45 EquivalenceProperties::new(projected_schema),
46 Partitioning::UnknownPartitioning(1),
47 EmissionType::Incremental,
48 Boundedness::Bounded,
49 );
50 let client = Client::builder()
51 .build()
52 .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;
53 Ok(LokiLogScanExec {
54 endpoint,
55 log_query,
56 start,
57 end,
58 projection,
59 limit,
60 client,
61 plan_properties,
62 })
63 }
64}
65
66impl ExecutionPlan for LokiLogScanExec {
67 fn name(&self) -> &str {
68 "LokiLogScanExec"
69 }
70
71 fn as_any(&self) -> &dyn Any {
72 self
73 }
74
75 fn properties(&self) -> &PlanProperties {
76 &self.plan_properties
77 }
78
79 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
80 vec![]
81 }
82
83 fn with_new_children(
84 self: Arc<Self>,
85 _children: Vec<Arc<dyn ExecutionPlan>>,
86 ) -> DFResult<Arc<dyn ExecutionPlan>> {
87 Ok(self)
88 }
89
90 fn execute(
91 &self,
92 partition: usize,
93 _context: Arc<TaskContext>,
94 ) -> DFResult<SendableRecordBatchStream> {
95 if partition != 0 {
96 return exec_err!("LokiLogScanExec does not support multiple partitions");
97 }
98
99 debug!(
100 "[datafusion-loki] starting to scan logs: query: {}, start: {:?}, end: {:?}, limit: {:?}",
101 self.log_query, self.start, self.end, self.limit
102 );
103
104 let mut query = Vec::new();
105 query.push(("query", self.log_query.clone()));
106
107 let start = self.start.unwrap_or(thirty_days_before_now_timestamp_ns());
108 query.push(("start", start.to_string()));
109
110 let end = self.end.unwrap_or(current_timestamp_ns());
111 query.push(("end", end.to_string()));
112
113 if let Some(limit) = self.limit {
114 query.push(("limit", limit.to_string()));
115 }
116
117 let req_builder = self
118 .client
119 .get(format!("{}/loki/api/v1/query_range", self.endpoint))
120 .header("Accept", "application/vnd.apache.parquet")
121 .query(&query);
122
123 let fut = fetch_log_stream(req_builder, self.projection.clone());
124 let stream = futures::stream::once(fut).try_flatten();
125 Ok(Box::pin(RecordBatchStreamAdapter::new(
126 self.schema(),
127 stream,
128 )))
129 }
130
131 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
132 Self::try_new(
133 self.endpoint.clone(),
134 self.log_query.clone(),
135 self.start,
136 self.end,
137 self.projection.clone(),
138 limit,
139 )
140 .ok()
141 .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
142 }
143
144 fn fetch(&self) -> Option<usize> {
145 self.limit
146 }
147}
148
149impl DisplayAs for LokiLogScanExec {
150 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
151 write!(
152 f,
153 "LokiLogScanExec: endpoint={}, query={}",
154 self.endpoint, self.log_query
155 )?;
156 if let Some(start) = self.start {
157 write!(f, ", start={}", start)?;
158 }
159 if let Some(end) = self.end {
160 write!(f, ", end={}", end)?;
161 }
162 if self.projection.is_some() {
163 let projected_schema = self.schema();
164 write!(
165 f,
166 ", projection={}",
167 ProjectSchemaDisplay(&projected_schema)
168 )?;
169 }
170 if let Some(limit) = self.limit {
171 write!(f, ", limit={limit}")?;
172 }
173 Ok(())
174 }
175}
176
177async fn fetch_log_stream(
178 req_builder: RequestBuilder,
179 projection: Option<Vec<usize>>,
180) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
181 let resp = req_builder
182 .send()
183 .await
184 .map_err(|e| DataFusionError::Execution(format!("Failed to send request to loki: {e}")))?;
185 let status = resp.status();
186 if !status.is_success() {
187 let url = resp.url().clone();
188 let with_text = if let Ok(text) = resp.text().await {
189 format!(", text: {text}")
190 } else {
191 String::new()
192 };
193 return exec_err!("Request to logi failed with status {status}, url: {url}{with_text}");
194 }
195 let bytes = resp.bytes().await.map_err(|e| {
196 DataFusionError::Execution(format!("Failed to get response body as bytes: {e}"))
197 })?;
198 let cursor = Cursor::new(bytes);
199
200 let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
201 let parquet_schema = builder.parquet_schema();
202
203 let projection_mask = match projection {
204 Some(proj) => ProjectionMask::roots(parquet_schema, proj),
205 None => ProjectionMask::all(),
206 };
207
208 let stream = builder
209 .with_batch_size(4096)
210 .with_projection(projection_mask)
211 .build()?
212 .map_err(|e| DataFusionError::ParquetError(Box::new(e)))
213 .boxed();
214
215 Ok(stream)
216}