datafusion_loki/
scan.rs

1use std::{any::Any, io::Cursor, pin::Pin, sync::Arc};
2
3use arrow::array::RecordBatch;
4use datafusion_common::{DataFusionError, exec_err, project_schema};
5use datafusion_execution::{SendableRecordBatchStream, TaskContext};
6use datafusion_physical_expr::EquivalenceProperties;
7use datafusion_physical_plan::{
8    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
9    display::ProjectSchemaDisplay,
10    execution_plan::{Boundedness, EmissionType},
11    stream::RecordBatchStreamAdapter,
12};
13use futures::{Stream, StreamExt, TryStreamExt};
14use log::debug;
15use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
16use reqwest::{Client, RequestBuilder};
17
18use crate::{
19    DFResult, LOG_TABLE_SCHEMA, current_timestamp_ns, thirty_days_before_now_timestamp_ns,
20};
21
22#[derive(Debug)]
23pub struct LokiLogScanExec {
24    pub endpoint: String,
25    pub log_query: String,
26    pub start: Option<i64>,
27    pub end: Option<i64>,
28    pub projection: Option<Vec<usize>>,
29    pub limit: Option<usize>,
30    client: Client,
31    plan_properties: PlanProperties,
32}
33
34impl LokiLogScanExec {
35    pub fn try_new(
36        endpoint: String,
37        log_query: String,
38        start: Option<i64>,
39        end: Option<i64>,
40        projection: Option<Vec<usize>>,
41        limit: Option<usize>,
42    ) -> DFResult<Self> {
43        let projected_schema = project_schema(&LOG_TABLE_SCHEMA, projection.as_ref())?;
44        let plan_properties = PlanProperties::new(
45            EquivalenceProperties::new(projected_schema),
46            Partitioning::UnknownPartitioning(1),
47            EmissionType::Incremental,
48            Boundedness::Bounded,
49        );
50        let client = Client::builder()
51            .build()
52            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;
53        Ok(LokiLogScanExec {
54            endpoint,
55            log_query,
56            start,
57            end,
58            projection,
59            limit,
60            client,
61            plan_properties,
62        })
63    }
64}
65
66impl ExecutionPlan for LokiLogScanExec {
67    fn name(&self) -> &str {
68        "LokiLogScanExec"
69    }
70
71    fn as_any(&self) -> &dyn Any {
72        self
73    }
74
75    fn properties(&self) -> &PlanProperties {
76        &self.plan_properties
77    }
78
79    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
80        vec![]
81    }
82
83    fn with_new_children(
84        self: Arc<Self>,
85        _children: Vec<Arc<dyn ExecutionPlan>>,
86    ) -> DFResult<Arc<dyn ExecutionPlan>> {
87        Ok(self)
88    }
89
90    fn execute(
91        &self,
92        partition: usize,
93        _context: Arc<TaskContext>,
94    ) -> DFResult<SendableRecordBatchStream> {
95        if partition != 0 {
96            return exec_err!("LokiLogScanExec does not support multiple partitions");
97        }
98
99        debug!(
100            "[datafusion-loki] starting to scan logs: query: {}, start: {:?}, end: {:?}, limit: {:?}",
101            self.log_query, self.start, self.end, self.limit
102        );
103
104        let mut query = Vec::new();
105        query.push(("query", self.log_query.clone()));
106
107        let start = self.start.unwrap_or(thirty_days_before_now_timestamp_ns());
108        query.push(("start", start.to_string()));
109
110        let end = self.end.unwrap_or(current_timestamp_ns());
111        query.push(("end", end.to_string()));
112
113        if let Some(limit) = self.limit {
114            query.push(("limit", limit.to_string()));
115        }
116
117        let req_builder = self
118            .client
119            .get(format!("{}/loki/api/v1/query_range", self.endpoint))
120            .header("Accept", "application/vnd.apache.parquet")
121            .query(&query);
122
123        let fut = fetch_log_stream(req_builder, self.projection.clone());
124        let stream = futures::stream::once(fut).try_flatten();
125        Ok(Box::pin(RecordBatchStreamAdapter::new(
126            self.schema(),
127            stream,
128        )))
129    }
130
131    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
132        Self::try_new(
133            self.endpoint.clone(),
134            self.log_query.clone(),
135            self.start,
136            self.end,
137            self.projection.clone(),
138            limit,
139        )
140        .ok()
141        .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
142    }
143
144    fn fetch(&self) -> Option<usize> {
145        self.limit
146    }
147}
148
149impl DisplayAs for LokiLogScanExec {
150    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
151        write!(
152            f,
153            "LokiLogScanExec: endpoint={}, query={}",
154            self.endpoint, self.log_query
155        )?;
156        if let Some(start) = self.start {
157            write!(f, ", start={}", start)?;
158        }
159        if let Some(end) = self.end {
160            write!(f, ", end={}", end)?;
161        }
162        if self.projection.is_some() {
163            let projected_schema = self.schema();
164            write!(
165                f,
166                ", projection={}",
167                ProjectSchemaDisplay(&projected_schema)
168            )?;
169        }
170        if let Some(limit) = self.limit {
171            write!(f, ", limit={limit}")?;
172        }
173        Ok(())
174    }
175}
176
177async fn fetch_log_stream(
178    req_builder: RequestBuilder,
179    projection: Option<Vec<usize>>,
180) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
181    let resp = req_builder
182        .send()
183        .await
184        .map_err(|e| DataFusionError::Execution(format!("Failed to send request to loki: {e}")))?;
185    let status = resp.status();
186    if !status.is_success() {
187        let url = resp.url().clone();
188        let with_text = if let Ok(text) = resp.text().await {
189            format!(", text: {text}")
190        } else {
191            String::new()
192        };
193        return exec_err!("Request to logi failed with status {status}, url: {url}{with_text}");
194    }
195    let bytes = resp.bytes().await.map_err(|e| {
196        DataFusionError::Execution(format!("Failed to get response body as bytes: {e}"))
197    })?;
198    let cursor = Cursor::new(bytes);
199
200    let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
201    let parquet_schema = builder.parquet_schema();
202
203    let projection_mask = match projection {
204        Some(proj) => ProjectionMask::roots(parquet_schema, proj),
205        None => ProjectionMask::all(),
206    };
207
208    let stream = builder
209        .with_batch_size(4096)
210        .with_projection(projection_mask)
211        .build()?
212        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))
213        .boxed();
214
215    Ok(stream)
216}