1use std::sync::{Arc, LazyLock};
2
3use arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimeUnit};
4use datafusion_catalog::{Session, TableProvider};
5use datafusion_common::{DataFusionError, exec_err};
6use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType, dml::InsertOp};
7use datafusion_physical_plan::ExecutionPlan;
8
9use crate::{
10 DFResult, LokiLogInsertExec, LokiLogScanExec, TimestampBound, expr_to_label_filter,
11 expr_to_line_filter, parse_timestamp_bound,
12};
13
14pub static TIMESTAMP_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
15 Arc::new(Field::new(
16 "timestamp",
17 DataType::Timestamp(TimeUnit::Nanosecond, None),
18 false,
19 ))
20});
21pub static LABELS_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
22 let key_field = Field::new("keys", DataType::Utf8, false);
23 let value_field = Field::new("values", DataType::Utf8, true); let entry_struct = DataType::Struct(vec![key_field, value_field].into());
25 let map_field = Arc::new(Field::new("entries", entry_struct, false));
26 Arc::new(Field::new("labels", DataType::Map(map_field, false), true))
27});
28pub static LINE_FIELD_REF: LazyLock<FieldRef> =
29 LazyLock::new(|| Arc::new(Field::new("line", DataType::Utf8, true)));
30
31pub static LOG_TABLE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
32 Arc::new(Schema::new(vec![
33 TIMESTAMP_FIELD_REF.clone(),
34 LABELS_FIELD_REF.clone(),
35 LINE_FIELD_REF.clone(),
36 ]))
37});
38
/// A table provider that reads log entries from, and appends log entries to, a Loki server.
#[derive(Debug)]
pub struct LokiLogTable {
    /// Base URL of the Loki server. API paths such as `/loki/api/v1/status/buildinfo`
    /// are appended to it verbatim.
    pub endpoint: String,
    /// Label name used to build a catch-all selector (`<label>=~".+"`) when a scan
    /// carries no label filter of its own. With `None`, such scans are rejected.
    pub default_label: Option<String>,
}
44
45impl LokiLogTable {
46 pub fn try_new(endpoint: impl Into<String>) -> DFResult<Self> {
47 let endpoint = endpoint.into();
48
49 Ok(LokiLogTable {
50 endpoint,
51 default_label: None,
52 })
53 }
54
55 pub fn with_default_label(mut self, default_label: Option<String>) -> Self {
56 self.default_label = default_label;
57 self
58 }
59
60 pub async fn check_connection(&self) -> DFResult<()> {
61 let client = reqwest::Client::new();
62 let resp = client
63 .get(format!("{}/loki/api/v1/status/buildinfo", self.endpoint))
64 .send()
65 .await
66 .map_err(|e| DataFusionError::External(Box::new(e)))?;
67
68 if resp.status().is_success() {
69 Ok(())
70 } else {
71 exec_err!("Failed to connect to loki with status {}", resp.status())
72 }
73 }
74}
75
76#[async_trait::async_trait]
77impl TableProvider for LokiLogTable {
78 fn as_any(&self) -> &dyn std::any::Any {
79 self
80 }
81
82 fn schema(&self) -> SchemaRef {
83 LOG_TABLE_SCHEMA.clone()
84 }
85
86 fn table_type(&self) -> TableType {
87 TableType::Base
88 }
89
90 async fn scan(
91 &self,
92 _state: &dyn Session,
93 projection: Option<&Vec<usize>>,
94 filters: &[Expr],
95 limit: Option<usize>,
96 ) -> DFResult<Arc<dyn ExecutionPlan>> {
97 let mut label_filters = Vec::with_capacity(filters.len());
98 let mut line_filters = Vec::with_capacity(filters.len());
99 let mut start = None;
100 let mut end = None;
101 for filter in filters {
102 if let Some(label_filter) = expr_to_label_filter(filter) {
103 label_filters.push(label_filter);
104 } else if let Some(line_filter) = expr_to_line_filter(filter) {
105 line_filters.push(line_filter);
106 } else if let Some(timestamp_bound) = parse_timestamp_bound(filter) {
107 match timestamp_bound {
108 TimestampBound::Start(v) => start = v,
109 TimestampBound::End(v) => end = v,
110 }
111 } else {
112 return exec_err!("Unsupported filter: {filter}");
113 }
114 }
115
116 if label_filters.is_empty() {
117 if let Some(default_label) = &self.default_label {
118 label_filters.push(format!("{default_label}=~\".+\""));
119 } else {
120 return exec_err!("No label filters or default label provided");
121 }
122 }
123
124 let log_query = format!(
125 "{{{}}} {}",
126 label_filters.join(", "),
127 line_filters.join(" ")
128 );
129 let exec = LokiLogScanExec::try_new(
130 self.endpoint.clone(),
131 log_query,
132 start,
133 end,
134 projection.cloned(),
135 limit,
136 )?;
137 Ok(Arc::new(exec))
138 }
139
140 fn supports_filters_pushdown(
141 &self,
142 filters: &[&Expr],
143 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
144 let mut pushdown = Vec::with_capacity(filters.len());
145 for filter in filters {
146 if expr_to_label_filter(filter).is_some()
147 || expr_to_line_filter(filter).is_some()
148 || parse_timestamp_bound(filter).is_some()
149 {
150 pushdown.push(TableProviderFilterPushDown::Exact);
151 } else {
152 pushdown.push(TableProviderFilterPushDown::Unsupported);
153 }
154 }
155 Ok(pushdown)
156 }
157
158 async fn insert_into(
159 &self,
160 _state: &dyn Session,
161 input: Arc<dyn ExecutionPlan>,
162 insert_op: InsertOp,
163 ) -> DFResult<Arc<dyn ExecutionPlan>> {
164 match insert_op {
165 InsertOp::Append => {}
166 InsertOp::Overwrite | InsertOp::Replace => {
167 return exec_err!("Only support append insert operation");
168 }
169 }
170
171 let exec = LokiLogInsertExec::try_new(input, self.endpoint.clone())?;
172 Ok(Arc::new(exec))
173 }
174}