//! # datafusion-loki 0.4.0
//!
//! A DataFusion table provider for querying Loki data.
//!
//! ## Documentation
use std::{any::Any, io::Cursor, pin::Pin, sync::Arc};

use arrow::array::RecordBatch;
use datafusion_common::{DataFusionError, exec_err, project_schema};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
    display::ProjectSchemaDisplay,
    execution_plan::{Boundedness, EmissionType},
    stream::RecordBatchStreamAdapter,
};
use futures::{Stream, StreamExt, TryStreamExt};
use log::debug;
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use reqwest::{Client, RequestBuilder};

use crate::{
    DFResult, LOG_TABLE_SCHEMA, current_timestamp_ns, thirty_days_before_now_timestamp_ns,
};

/// Physical plan node that scans log lines from a Loki server.
///
/// `execute` issues a single `query_range` HTTP request against
/// `endpoint` and streams the parquet-encoded response back as
/// Arrow record batches. The node is a single-partition, bounded
/// leaf source (see `plan_properties` built in `try_new`).
#[derive(Debug)]
pub struct LokiLogScanExec {
    /// Base URL of the Loki server; `/loki/api/v1/query_range` is
    /// appended to it when the request is built.
    pub endpoint: String,
    /// LogQL query string, sent as the `query` request parameter.
    pub log_query: String,
    /// Inclusive start of the time range, sent as `start`. Defaults to
    /// thirty days before now (nanosecond timestamps, per the
    /// `*_timestamp_ns` helpers) when `None`.
    pub start: Option<i64>,
    /// End of the time range, sent as `end`. Defaults to the current
    /// timestamp when `None`.
    pub end: Option<i64>,
    /// Optional column indices into `LOG_TABLE_SCHEMA`; applied both to
    /// the output schema and to the parquet reader's projection mask.
    pub projection: Option<Vec<usize>>,
    /// Optional maximum number of log lines, forwarded as the `limit`
    /// request parameter.
    pub limit: Option<usize>,
    // Reusable HTTP client, built once in `try_new`.
    client: Client,
    // Cached properties (projected schema, partitioning, boundedness)
    // returned by `ExecutionPlan::properties`.
    plan_properties: PlanProperties,
}

impl LokiLogScanExec {
    /// Builds a new Loki scan node for the given query parameters.
    ///
    /// The projected output schema and the plan properties are computed
    /// up front, and the HTTP client used later by `execute` is
    /// constructed here. Returns a planning error if either the schema
    /// projection or the client construction fails.
    pub fn try_new(
        endpoint: String,
        log_query: String,
        start: Option<i64>,
        end: Option<i64>,
        projection: Option<Vec<usize>>,
        limit: Option<usize>,
    ) -> DFResult<Self> {
        let schema = project_schema(&LOG_TABLE_SCHEMA, projection.as_ref())?;
        // Single unknown partition, bounded input, batches emitted as
        // they are decoded from the response.
        let properties = PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let http_client = Client::builder()
            .build()
            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;
        Ok(Self {
            endpoint,
            log_query,
            start,
            end,
            projection,
            limit,
            client: http_client,
            plan_properties: properties,
        })
    }
}

impl ExecutionPlan for LokiLogScanExec {
    fn name(&self) -> &str {
        "LokiLogScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        if partition != 0 {
            return exec_err!("LokiLogScanExec does not support multiple partitions");
        }

        debug!(
            "[datafusion-loki] starting to scan logs: query: {}, start: {:?}, end: {:?}, limit: {:?}",
            self.log_query, self.start, self.end, self.limit
        );

        let mut query = Vec::new();
        query.push(("query", self.log_query.clone()));

        let start = self.start.unwrap_or(thirty_days_before_now_timestamp_ns());
        query.push(("start", start.to_string()));

        let end = self.end.unwrap_or(current_timestamp_ns());
        query.push(("end", end.to_string()));

        if let Some(limit) = self.limit {
            query.push(("limit", limit.to_string()));
        }

        let req_builder = self
            .client
            .get(format!("{}/loki/api/v1/query_range", self.endpoint))
            .header("Accept", "application/vnd.apache.parquet")
            .query(&query);

        let fut = fetch_log_stream(req_builder, self.projection.clone());
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.schema(),
            stream,
        )))
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        Self::try_new(
            self.endpoint.clone(),
            self.log_query.clone(),
            self.start,
            self.end,
            self.projection.clone(),
            limit,
        )
        .ok()
        .map(|exec| Arc::new(exec) as Arc<dyn ExecutionPlan>)
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}

impl DisplayAs for LokiLogScanExec {
    /// Renders the node for `EXPLAIN` output. The endpoint and query
    /// are always shown; the time bounds, projection and limit only
    /// appear when they are set.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "LokiLogScanExec: endpoint={}, query={}",
            self.endpoint, self.log_query
        )?;
        if let Some(start) = self.start {
            write!(f, ", start={start}")?;
        }
        if let Some(end) = self.end {
            write!(f, ", end={end}")?;
        }
        if self.projection.is_some() {
            // `schema()` already reflects the projection applied in `try_new`.
            write!(f, ", projection={}", ProjectSchemaDisplay(&self.schema()))?;
        }
        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }
        Ok(())
    }
}

/// Sends the prepared request to Loki and decodes the parquet response
/// body into a stream of record batches.
///
/// On a non-success HTTP status this returns an execution error that
/// includes the status, the URL and (when readable) the response text.
/// Note that the full response body is buffered in memory before
/// parquet decoding starts; only the decoding itself is streamed.
///
/// `projection` holds root-column indices to read; when `None` all
/// columns are decoded.
async fn fetch_log_stream(
    req_builder: RequestBuilder,
    projection: Option<Vec<usize>>,
) -> DFResult<Pin<Box<dyn Stream<Item = DFResult<RecordBatch>> + Send>>> {
    let resp = req_builder
        .send()
        .await
        .map_err(|e| DataFusionError::Execution(format!("Failed to send request to loki: {e}")))?;
    let status = resp.status();
    if !status.is_success() {
        // Capture the URL before `text()` consumes the response.
        let url = resp.url().clone();
        let with_text = match resp.text().await {
            Ok(text) => format!(", text: {text}"),
            // Body unreadable — still report status and URL.
            Err(_) => String::new(),
        };
        // Fixed typo: previously said "logi" instead of "loki".
        return exec_err!("Request to loki failed with status {status}, url: {url}{with_text}");
    }
    let bytes = resp.bytes().await.map_err(|e| {
        DataFusionError::Execution(format!("Failed to get response body as bytes: {e}"))
    })?;
    let cursor = Cursor::new(bytes);

    let builder = ParquetRecordBatchStreamBuilder::new(cursor).await?;
    let parquet_schema = builder.parquet_schema();

    // Translate the DataFusion column projection into a parquet
    // root-level projection mask.
    let projection_mask = match projection {
        Some(proj) => ProjectionMask::roots(parquet_schema, proj),
        None => ProjectionMask::all(),
    };

    let stream = builder
        .with_batch_size(4096)
        .with_projection(projection_mask)
        .build()?
        .map_err(|e| DataFusionError::ParquetError(Box::new(e)))
        .boxed();

    Ok(stream)
}