datafusion_loki/
table.rs

1use std::sync::{Arc, LazyLock};
2
3use datafusion::{
4    arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimeUnit},
5    catalog::{Session, TableProvider},
6    common::exec_err,
7    datasource::TableType,
8    error::DataFusionError,
9    logical_expr::{TableProviderFilterPushDown, dml::InsertOp},
10    physical_plan::ExecutionPlan,
11    prelude::Expr,
12};
13
14use crate::{
15    DFResult, LokiLogInsertExec, LokiLogScanExec, TimestampBound, expr_to_label_filter,
16    expr_to_line_filter, parse_timestamp_bound,
17};
18
19pub static TIMESTAMP_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
20    Arc::new(Field::new(
21        "timestamp",
22        DataType::Timestamp(TimeUnit::Nanosecond, None),
23        false,
24    ))
25});
26pub static LABELS_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
27    let key_field = Field::new("keys", DataType::Utf8, false);
28    let value_field = Field::new("values", DataType::Utf8, true); // 值允许为空
29    let entry_struct = DataType::Struct(vec![key_field, value_field].into());
30    let map_field = Arc::new(Field::new("entries", entry_struct, false));
31    Arc::new(Field::new("labels", DataType::Map(map_field, false), true))
32});
33pub static LINE_FIELD_REF: LazyLock<FieldRef> =
34    LazyLock::new(|| Arc::new(Field::new("line", DataType::Utf8, true)));
35
36pub static LOG_TABLE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
37    Arc::new(Schema::new(vec![
38        TIMESTAMP_FIELD_REF.clone(),
39        LABELS_FIELD_REF.clone(),
40        LINE_FIELD_REF.clone(),
41    ]))
42});
43
/// Table handle describing how to reach a Loki instance.
///
/// The type is plain data (two owned strings), so it is cheap to clone and can be
/// compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LokiLogTable {
    /// Base URL of the Loki server; API paths such as
    /// `/loki/api/v1/status/buildinfo` are appended to it.
    pub endpoint: String,
    /// Label name used to build a catch-all matcher (`<label>=~".+"`) when a scan
    /// receives no label filters. `None` makes such scans fail instead.
    pub default_label: Option<String>,
}
49
50impl LokiLogTable {
51    pub fn try_new(endpoint: impl Into<String>) -> DFResult<Self> {
52        let endpoint = endpoint.into();
53
54        Ok(LokiLogTable {
55            endpoint,
56            default_label: None,
57        })
58    }
59
60    pub fn with_default_label(mut self, default_label: Option<String>) -> Self {
61        self.default_label = default_label;
62        self
63    }
64
65    pub async fn check_connection(&self) -> DFResult<()> {
66        let client = reqwest::Client::new();
67        let resp = client
68            .get(format!("{}/loki/api/v1/status/buildinfo", self.endpoint))
69            .send()
70            .await
71            .map_err(|e| DataFusionError::External(Box::new(e)))?;
72
73        if resp.status().is_success() {
74            Ok(())
75        } else {
76            exec_err!("Failed to connect to loki with status {}", resp.status())
77        }
78    }
79}
80
81#[async_trait::async_trait]
82impl TableProvider for LokiLogTable {
83    fn as_any(&self) -> &dyn std::any::Any {
84        self
85    }
86
87    fn schema(&self) -> SchemaRef {
88        LOG_TABLE_SCHEMA.clone()
89    }
90
91    fn table_type(&self) -> TableType {
92        TableType::Base
93    }
94
95    async fn scan(
96        &self,
97        _state: &dyn Session,
98        projection: Option<&Vec<usize>>,
99        filters: &[Expr],
100        limit: Option<usize>,
101    ) -> DFResult<Arc<dyn ExecutionPlan>> {
102        let mut label_filters = Vec::with_capacity(filters.len());
103        let mut line_filters = Vec::with_capacity(filters.len());
104        let mut start = None;
105        let mut end = None;
106        for filter in filters {
107            if let Some(label_filter) = expr_to_label_filter(filter) {
108                label_filters.push(label_filter);
109            } else if let Some(line_filter) = expr_to_line_filter(filter) {
110                line_filters.push(line_filter);
111            } else if let Some(timestamp_bound) = parse_timestamp_bound(filter) {
112                match timestamp_bound {
113                    TimestampBound::Start(v) => start = v,
114                    TimestampBound::End(v) => end = v,
115                }
116            } else {
117                return exec_err!("Unsupported filter: {filter}");
118            }
119        }
120
121        if label_filters.is_empty() {
122            if let Some(default_label) = &self.default_label {
123                label_filters.push(format!("{default_label}=~\".+\""));
124            } else {
125                return exec_err!("No label filters or default label provided");
126            }
127        }
128
129        let log_query = format!(
130            "{{{}}} {}",
131            label_filters.join(", "),
132            line_filters.join(" ")
133        );
134        let exec = LokiLogScanExec::try_new(
135            self.endpoint.clone(),
136            log_query,
137            start,
138            end,
139            projection.cloned(),
140            limit,
141        )?;
142        Ok(Arc::new(exec))
143    }
144
145    fn supports_filters_pushdown(
146        &self,
147        filters: &[&Expr],
148    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
149        let mut pushdown = Vec::with_capacity(filters.len());
150        for filter in filters {
151            if expr_to_label_filter(filter).is_some()
152                || expr_to_line_filter(filter).is_some()
153                || parse_timestamp_bound(filter).is_some()
154            {
155                pushdown.push(TableProviderFilterPushDown::Exact);
156            } else {
157                pushdown.push(TableProviderFilterPushDown::Unsupported);
158            }
159        }
160        Ok(pushdown)
161    }
162
163    async fn insert_into(
164        &self,
165        _state: &dyn Session,
166        input: Arc<dyn ExecutionPlan>,
167        insert_op: InsertOp,
168    ) -> DFResult<Arc<dyn ExecutionPlan>> {
169        match insert_op {
170            InsertOp::Append => {}
171            InsertOp::Overwrite | InsertOp::Replace => {
172                return exec_err!("Only support append insert operation");
173            }
174        }
175
176        let exec = LokiLogInsertExec::try_new(input, self.endpoint.clone())?;
177        Ok(Arc::new(exec))
178    }
179}