datafusion_loki/table.rs

1use std::sync::{Arc, LazyLock};
2
3use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimeUnit};
4use datafusion_catalog::{Session, TableProvider};
5use datafusion_common::{DataFusionError, exec_err};
6use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType, dml::InsertOp};
7use datafusion_physical_plan::ExecutionPlan;
8
9use crate::{
10    DFResult, LokiLogInsertExec, LokiLogScanExec, TimestampBound, expr_to_label_filter,
11    expr_to_line_filter, parse_timestamp_bound,
12};
13
14pub static TIMESTAMP_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
15    Arc::new(Field::new(
16        "timestamp",
17        DataType::Timestamp(TimeUnit::Nanosecond, None),
18        false,
19    ))
20});
21pub static LABELS_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
22    let key_field = Field::new("keys", DataType::Utf8, false);
23    let value_field = Field::new("values", DataType::Utf8, true); // 值允许为空
24    let entry_struct = DataType::Struct(vec![key_field, value_field].into());
25    let map_field = Arc::new(Field::new("entries", entry_struct, false));
26    Arc::new(Field::new("labels", DataType::Map(map_field, false), true))
27});
28pub static LINE_FIELD_REF: LazyLock<FieldRef> =
29    LazyLock::new(|| Arc::new(Field::new("line", DataType::Utf8, true)));
30
31pub static LOG_TABLE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
32    Arc::new(Schema::new(vec![
33        TIMESTAMP_FIELD_REF.clone(),
34        LABELS_FIELD_REF.clone(),
35        LINE_FIELD_REF.clone(),
36    ]))
37});
38
/// A DataFusion table whose rows are log entries stored in a Loki server.
#[derive(Debug)]
pub struct LokiLogTable {
    /// Base URL of the Loki server; API paths such as `/loki/api/v1/...` are appended to it.
    pub endpoint: String,
    /// Label name used to build a catch-all selector (`{label=~".+"}`) when a scan
    /// carries no label filters; `None` makes such scans fail.
    pub default_label: Option<String>,
}
44
45impl LokiLogTable {
46    pub fn try_new(endpoint: impl Into<String>) -> DFResult<Self> {
47        let endpoint = endpoint.into();
48
49        Ok(LokiLogTable {
50            endpoint,
51            default_label: None,
52        })
53    }
54
55    pub fn with_default_label(mut self, default_label: Option<String>) -> Self {
56        self.default_label = default_label;
57        self
58    }
59
60    pub async fn check_connection(&self) -> DFResult<()> {
61        let client = reqwest::Client::new();
62        let resp = client
63            .get(format!("{}/loki/api/v1/status/buildinfo", self.endpoint))
64            .send()
65            .await
66            .map_err(|e| DataFusionError::External(Box::new(e)))?;
67
68        if resp.status().is_success() {
69            Ok(())
70        } else {
71            exec_err!("Failed to connect to loki with status {}", resp.status())
72        }
73    }
74}
75
76#[async_trait::async_trait]
77impl TableProvider for LokiLogTable {
78    fn as_any(&self) -> &dyn std::any::Any {
79        self
80    }
81
82    fn schema(&self) -> SchemaRef {
83        LOG_TABLE_SCHEMA.clone()
84    }
85
86    fn table_type(&self) -> TableType {
87        TableType::Base
88    }
89
90    async fn scan(
91        &self,
92        _state: &dyn Session,
93        projection: Option<&Vec<usize>>,
94        filters: &[Expr],
95        limit: Option<usize>,
96    ) -> DFResult<Arc<dyn ExecutionPlan>> {
97        let mut label_filters = Vec::with_capacity(filters.len());
98        let mut line_filters = Vec::with_capacity(filters.len());
99        let mut start = None;
100        let mut end = None;
101        for filter in filters {
102            if let Some(label_filter) = expr_to_label_filter(filter) {
103                label_filters.push(label_filter);
104            } else if let Some(line_filter) = expr_to_line_filter(filter) {
105                line_filters.push(line_filter);
106            } else if let Some(timestamp_bound) = parse_timestamp_bound(filter) {
107                match timestamp_bound {
108                    TimestampBound::Start(v) => start = v,
109                    TimestampBound::End(v) => end = v,
110                }
111            } else {
112                return exec_err!("Unsupported filter: {filter}");
113            }
114        }
115
116        if label_filters.is_empty() {
117            if let Some(default_label) = &self.default_label {
118                label_filters.push(format!("{default_label}=~\".+\""));
119            } else {
120                return exec_err!("No label filters or default label provided");
121            }
122        }
123
124        let log_query = format!(
125            "{{{}}} {}",
126            label_filters.join(", "),
127            line_filters.join(" ")
128        );
129        let exec = LokiLogScanExec::try_new(
130            self.endpoint.clone(),
131            log_query,
132            start,
133            end,
134            projection.cloned(),
135            limit,
136        )?;
137        Ok(Arc::new(exec))
138    }
139
140    fn supports_filters_pushdown(
141        &self,
142        filters: &[&Expr],
143    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
144        let mut pushdown = Vec::with_capacity(filters.len());
145        for filter in filters {
146            if expr_to_label_filter(filter).is_some()
147                || expr_to_line_filter(filter).is_some()
148                || parse_timestamp_bound(filter).is_some()
149            {
150                pushdown.push(TableProviderFilterPushDown::Exact);
151            } else {
152                pushdown.push(TableProviderFilterPushDown::Unsupported);
153            }
154        }
155        Ok(pushdown)
156    }
157
158    async fn insert_into(
159        &self,
160        _state: &dyn Session,
161        input: Arc<dyn ExecutionPlan>,
162        insert_op: InsertOp,
163    ) -> DFResult<Arc<dyn ExecutionPlan>> {
164        match insert_op {
165            InsertOp::Append => {}
166            InsertOp::Overwrite | InsertOp::Replace => {
167                return exec_err!("Only support append insert operation");
168            }
169        }
170
171        let exec = LokiLogInsertExec::try_new(input, self.endpoint.clone())?;
172        Ok(Arc::new(exec))
173    }
174}