datafusion_loki/
scan.rs

1use std::{any::Any, io::Cursor, pin::Pin, sync::Arc};
2
3use datafusion::{
4    arrow::array::RecordBatch,
5    common::{exec_err, project_schema},
6    error::DataFusionError,
7    execution::{SendableRecordBatchStream, TaskContext},
8    physical_expr::EquivalenceProperties,
9    physical_plan::{
10        DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
11        display::ProjectSchemaDisplay,
12        execution_plan::{Boundedness, EmissionType},
13        stream::RecordBatchStreamAdapter,
14    },
15};
16use futures::{Stream, StreamExt, TryStreamExt};
17use log::debug;
18use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
19use reqwest::{Client, RequestBuilder};
20
21use crate::{
22    DFResult, LOG_TABLE_SCHEMA, current_timestamp_ns, thirty_days_before_now_timestamp_ns,
23};
24
25#[derive(Debug)]
26pub struct LokiLogScanExec {
27    pub endpoint: String,
28    pub log_query: String,
29    pub start: Option<i64>,
30    pub end: Option<i64>,
31    pub projection: Option<Vec<usize>>,
32    pub limit: Option<usize>,
33    client: Client,
34    plan_properties: PlanProperties,
35}
36
37impl LokiLogScanExec {
38    pub fn try_new(
39        endpoint: String,
40        log_query: String,
41        start: Option<i64>,
42        end: Option<i64>,
43        projection: Option<Vec<usize>>,
44        limit: Option<usize>,
45    ) -> DFResult<Self> {
46        let projected_schema = project_schema(&LOG_TABLE_SCHEMA, projection.as_ref())?;
47        let plan_properties = PlanProperties::new(
48            EquivalenceProperties::new(projected_schema),
49            Partitioning::UnknownPartitioning(1),
50            EmissionType::Incremental,
51            Boundedness::Bounded,
52        );
53        let client = Client::builder()
54            .build()
55            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;
56        Ok(LokiLogScanExec {
57            endpoint,
58            log_query,
59            start,
60            end,
61            projection,
62            limit,
63            client,
64            plan_properties,
65        })
66    }
67}
68
69impl ExecutionPlan for LokiLogScanExec {
70    fn name(&self) -> &str {
71        "LokiLogScanExec"
72    }
73
74    fn as_any(&self) -> &dyn Any {
75        self
76    }
77
78    fn properties(&self) -> &PlanProperties {
79        &self.plan_properties
80    }
81
82    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
83        vec![]
84    }
85
86    fn with_new_children(
87        self: Arc<Self>,
88        _children: Vec<Arc<dyn ExecutionPlan>>,
89    ) -> DFResult<Arc<dyn ExecutionPlan>> {
90        Ok(self)
91    }
92
93    fn execute(
94        &self,
95        partition: usize,
96        _context: Arc<TaskContext>,
97    ) -> DFResult<SendableRecordBatchStream> {
98        if partition != 0 {
99            return exec_err!("LokiLogScanExec does not support multiple partitions");
100        }
101
102        debug!(
103            "[datafusion-loki] starting to scan logs: query: {}, start: {:?}, end: {:?}, limit: {:?}",
104            self.log_query, self.start, self.end, self.limit
105        );
106
107        let mut query = Vec::new();
108        query.push(("query", self.log_query.clone()));
109
110        let start = self.start.unwrap_or(thirty_days_before_now_timestamp_ns());
111        query.push(("start", start.to_string()));
112
113        let end = self.end.unwrap_or(current_timestamp_ns());
114        query.push(("end", end.to_string()));
115
116        if let Some(limit) = self.limit {
117            query.push(("limit", limit.to_string()));
118        }
119
120        let req_builder = self
121            .client
122            .get(format!("{}/loki/api/v1/query_range", self.endpoint))
123            .header("Accept", "application/vnd.apache.parquet")
124            .query(&query);
125
126        let fut = fetch_log_stream(req_builder, self.projection.clone());
127        let stream = futures::stream::once(fut).try_flatten();
128        Ok(Box::pin(RecordBatchStreamAdapter::new(
129            self.schema(),
130            stream,
131        )))
132    }
133
134    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
135        Self::try_new(
136            self.endpoint.clone(),
137            self.log_query.clone(),
138            self.start,
139            self.end,
140            self.projection.clone(),
141            limit,
142        )
143        .ok()
144        .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
145    }
146
147    fn fetch(&self) -> Option<usize> {
148        self.limit
149    }
150}
151
152impl DisplayAs for LokiLogScanExec {
153    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
154        write!(
155            f,
156            "LokiLogScanExec: endpoint={}, query={}",
157            self.endpoint, self.log_query
158        )?;
159        if let Some(start) = self.start {
160            write!(f, ", start={}", start)?;
161        }
162        if let Some(end) = self.end {
163            write!(f, ", end={}", end)?;
164        }
165        if self.projection.is_some() {
166            let projected_schema = self.schema();
167            write!(
168                f,
169                ", projection={}",
170                ProjectSchemaDisplay(&projected_schema)
171            )?;
172        }
173        if let Some(limit) = self.limit {
174            write!(f, ", limit={limit}")?;
175        }
176        Ok(())
177    }
178}
179
180async fn fetch_log_stream(
181    req_builder: RequestBuilder,
182    projection: Option<Vec<usize>>,
183) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
184    let resp = req_builder
185        .send()
186        .await
187        .map_err(|e| DataFusionError::Execution(format!("Failed to send request to loki: {e}")))?;
188    let status = resp.status();
189    if !status.is_success() {
190        let url = resp.url().clone();
191        let text = resp.text().await;
192        println!("LWZTEST resp text: {text:?}");
193        return Err(DataFusionError::Execution(format!(
194            "Request to logi failed with status {}, url: {}",
195            status, url
196        )));
197    }
198    let bytes = resp.bytes().await.map_err(|e| {
199        DataFusionError::Execution(format!("Failed to get response body as bytes: {e}"))
200    })?;
201    let cursor = Cursor::new(bytes);
202
203    let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
204    let parquet_schema = builder.parquet_schema();
205
206    let projection_mask = match projection {
207        Some(proj) => ProjectionMask::roots(parquet_schema, proj),
208        None => ProjectionMask::all(),
209    };
210
211    let stream = builder
212        .with_batch_size(4096)
213        .with_projection(projection_mask)
214        .build()?
215        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))
216        .boxed();
217
218    Ok(stream)
219}