1use std::sync::{Arc, LazyLock};
2
3use datafusion::{
4 arrow::datatypes::{DataType, Field, FieldRef, Schema, SchemaRef, TimeUnit},
5 catalog::{Session, TableProvider},
6 common::exec_err,
7 datasource::TableType,
8 error::DataFusionError,
9 logical_expr::{TableProviderFilterPushDown, dml::InsertOp},
10 physical_plan::ExecutionPlan,
11 prelude::Expr,
12};
13
14use crate::{
15 DFResult, LokiLogInsertExec, LokiLogScanExec, TimestampBound, expr_to_label_filter,
16 expr_to_line_filter, parse_timestamp_bound,
17};
18
19pub static TIMESTAMP_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
20 Arc::new(Field::new(
21 "timestamp",
22 DataType::Timestamp(TimeUnit::Nanosecond, None),
23 false,
24 ))
25});
26pub static LABELS_FIELD_REF: LazyLock<FieldRef> = LazyLock::new(|| {
27 let key_field = Field::new("keys", DataType::Utf8, false);
28 let value_field = Field::new("values", DataType::Utf8, true); let entry_struct = DataType::Struct(vec![key_field, value_field].into());
30 let map_field = Arc::new(Field::new("entries", entry_struct, false));
31 Arc::new(Field::new("labels", DataType::Map(map_field, false), true))
32});
33pub static LINE_FIELD_REF: LazyLock<FieldRef> =
34 LazyLock::new(|| Arc::new(Field::new("line", DataType::Utf8, true)));
35
36pub static LOG_TABLE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
37 Arc::new(Schema::new(vec![
38 TIMESTAMP_FIELD_REF.clone(),
39 LABELS_FIELD_REF.clone(),
40 LINE_FIELD_REF.clone(),
41 ]))
42});
43
/// A DataFusion table backed by a Loki server.
///
/// Scans are turned into a Loki log query built from the pushed-down filters,
/// and inserts are forwarded to the same endpoint.
#[derive(Debug)]
pub struct LokiLogTable {
    /// Base URL of the Loki server. Request paths such as
    /// `/loki/api/v1/status/buildinfo` are appended to it verbatim.
    pub endpoint: String,
    /// Label name used to build a catch-all `<label>=~".+"` filter when a scan
    /// receives no label filter of its own. `None` makes such scans fail.
    pub default_label: Option<String>,
}
49
50impl LokiLogTable {
51 pub fn try_new(endpoint: impl Into<String>) -> DFResult<Self> {
52 let endpoint = endpoint.into();
53
54 Ok(LokiLogTable {
55 endpoint,
56 default_label: None,
57 })
58 }
59
60 pub fn with_default_label(mut self, default_label: Option<String>) -> Self {
61 self.default_label = default_label;
62 self
63 }
64
65 pub async fn check_connection(&self) -> DFResult<()> {
66 let client = reqwest::Client::new();
67 let resp = client
68 .get(format!("{}/loki/api/v1/status/buildinfo", self.endpoint))
69 .send()
70 .await
71 .map_err(|e| DataFusionError::External(Box::new(e)))?;
72
73 if resp.status().is_success() {
74 Ok(())
75 } else {
76 exec_err!("Failed to connect to loki with status {}", resp.status())
77 }
78 }
79}
80
81#[async_trait::async_trait]
82impl TableProvider for LokiLogTable {
83 fn as_any(&self) -> &dyn std::any::Any {
84 self
85 }
86
87 fn schema(&self) -> SchemaRef {
88 LOG_TABLE_SCHEMA.clone()
89 }
90
91 fn table_type(&self) -> TableType {
92 TableType::Base
93 }
94
95 async fn scan(
96 &self,
97 _state: &dyn Session,
98 projection: Option<&Vec<usize>>,
99 filters: &[Expr],
100 limit: Option<usize>,
101 ) -> DFResult<Arc<dyn ExecutionPlan>> {
102 let mut label_filters = Vec::with_capacity(filters.len());
103 let mut line_filters = Vec::with_capacity(filters.len());
104 let mut start = None;
105 let mut end = None;
106 for filter in filters {
107 if let Some(label_filter) = expr_to_label_filter(filter) {
108 label_filters.push(label_filter);
109 } else if let Some(line_filter) = expr_to_line_filter(filter) {
110 line_filters.push(line_filter);
111 } else if let Some(timestamp_bound) = parse_timestamp_bound(filter) {
112 match timestamp_bound {
113 TimestampBound::Start(v) => start = v,
114 TimestampBound::End(v) => end = v,
115 }
116 } else {
117 return exec_err!("Unsupported filter: {filter}");
118 }
119 }
120
121 if label_filters.is_empty() {
122 if let Some(default_label) = &self.default_label {
123 label_filters.push(format!("{default_label}=~\".+\""));
124 } else {
125 return exec_err!("No label filters or default label provided");
126 }
127 }
128
129 let log_query = format!(
130 "{{{}}} {}",
131 label_filters.join(", "),
132 line_filters.join(" ")
133 );
134 let exec = LokiLogScanExec::try_new(
135 self.endpoint.clone(),
136 log_query,
137 start,
138 end,
139 projection.cloned(),
140 limit,
141 )?;
142 Ok(Arc::new(exec))
143 }
144
145 fn supports_filters_pushdown(
146 &self,
147 filters: &[&Expr],
148 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
149 let mut pushdown = Vec::with_capacity(filters.len());
150 for filter in filters {
151 if expr_to_label_filter(filter).is_some()
152 || expr_to_line_filter(filter).is_some()
153 || parse_timestamp_bound(filter).is_some()
154 {
155 pushdown.push(TableProviderFilterPushDown::Exact);
156 } else {
157 pushdown.push(TableProviderFilterPushDown::Unsupported);
158 }
159 }
160 Ok(pushdown)
161 }
162
163 async fn insert_into(
164 &self,
165 _state: &dyn Session,
166 input: Arc<dyn ExecutionPlan>,
167 insert_op: InsertOp,
168 ) -> DFResult<Arc<dyn ExecutionPlan>> {
169 match insert_op {
170 InsertOp::Append => {}
171 InsertOp::Overwrite | InsertOp::Replace => {
172 return exec_err!("Only support append insert operation");
173 }
174 }
175
176 let exec = LokiLogInsertExec::try_new(input, self.endpoint.clone())?;
177 Ok(Arc::new(exec))
178 }
179}