use std::{
    collections::HashMap,
    sync::{Arc, LazyLock},
};

use datafusion::{
    arrow::{
        array::{
            Array, ArrayRef, Int64Array, MapArray, RecordBatch, StringArray, StructArray,
            TimestampNanosecondArray,
        },
        datatypes::{DataType, Field, Schema, SchemaRef},
    },
    common::{plan_err, stats::Precision},
    error::DataFusionError,
    execution::{SendableRecordBatchStream, TaskContext},
    physical_expr::EquivalenceProperties,
    physical_plan::{
        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
        PlanProperties, stream::RecordBatchStreamAdapter,
    },
};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::{DFResult, LOG_TABLE_SCHEMA};

pub static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int64,
        false,
    )]))
});

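/// Physical plan node that sinks its input log batches into Loki over the
/// HTTP push API and emits a single `count` row with the number of rows sent.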
#[derive(Debug)]
pub struct LokiLogInsertExec {
    pub input: Arc<dyn ExecutionPlan>,
    pub endpoint: String,
    client: Client,
    plan_properties: PlanProperties,
}

impl LokiLogInsertExec {
    pub fn try_new(input: Arc<dyn ExecutionPlan>, endpoint: String) -> DFResult<Self> {
        if input.schema() != *LOG_TABLE_SCHEMA {
            return plan_err!(
                "input schema does not match the log table schema: {:?}",
                input.schema()
            );
        }

        // The sink reports a single output partition carrying the count row;
        // emission behavior and boundedness follow the input plan.
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(COUNT_SCHEMA.clone()),
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        );

        let client = Client::builder()
            .build()
            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;

        Ok(Self {
            input,
            endpoint,
            client,
            plan_properties,
        })
    }
}

impl ExecutionPlan for LokiLogInsertExec {
    fn name(&self) -> &str {
        "LokiLogInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::try_new(input, self.endpoint.clone())?;
        Ok(Arc::new(exec))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let mut input_stream = self.input.execute(partition, context)?;

        let endpoint = self.endpoint.clone();
        let client = self.client.clone();

        // Drain the input, pushing each batch to Loki, then emit a single
        // one-row batch containing the total number of inserted rows.
        let stream = futures::stream::once(async move {
            let mut count = 0;
            while let Some(batch) = input_stream.next().await {
                let batch = batch?;
                push_logs(&endpoint, &client, &batch).await?;
                count += batch.num_rows();
            }
            make_result_batch(count as i64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            COUNT_SCHEMA.clone(),
            stream,
        )))
    }
}

impl DisplayAs for LokiLogInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LokiLogInsertExec: endpoint={}", self.endpoint)?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows≈{rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

/// Builds the single-row result batch that reports how many rows were pushed.
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(COUNT_SCHEMA.clone(), vec![array])?;
    Ok(batch)
}

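/// Sends one record batch to Loki's HTTP push endpoint
/// (`POST {endpoint}/loki/api/v1/push`), failing on any non-success status.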
async fn push_logs(endpoint: &str, client: &Client, batch: &RecordBatch) -> DFResult<()> {
    let log_streams = build_log_streams(batch)?;
    let resp = client
        .post(format!("{endpoint}/loki/api/v1/push"))
        .json(&log_streams)
        .send()
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!("Failed to send push request to loki: {e:?}"))
        })?;
    let status = resp.status();
    if !status.is_success() {
        // Include the response body in the error when it is readable.
        let text = resp
            .text()
            .await
            .map(|t| format!(", text: {t}"))
            .unwrap_or_default();
        return Err(DataFusionError::Execution(format!(
            "Failed to send push request to loki with status {status}{text}",
        )));
    }
    Ok(())
}

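/// Converts a record batch with columns `(timestamp, labels, line)` into the
/// Loki push payload, emitting one stream per row. A sketch of the JSON this
/// produces (the label set and values here are illustrative):
///
/// ```json
/// {"streams": [{"stream": {"app": "demo"}, "values": [["1700000000000000000", "hello"]]}]}
/// ```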
fn build_log_streams(batch: &RecordBatch) -> DFResult<LogStreams> {
    let timestamp_arr = batch.column(0);
    let timestamp_arr = timestamp_arr
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution("Failed to downcast timestamp array".to_string())
        })?;
    let labels_arr = batch.column(1);
    let labels_arr = labels_arr
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast labels array".to_string()))?;
    let line_arr = batch.column(2);
    let line_arr = line_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast line array".to_string()))?;

    // Each row becomes its own stream entry; rows sharing a label set could be
    // grouped into one stream to shrink the payload.
    let streams = timestamp_arr
        .iter()
        .zip(labels_arr.iter())
        .zip(line_arr.iter())
        .map(|((timestamp, labels), line)| {
            let timestamp = timestamp.ok_or_else(|| {
                DataFusionError::Execution("timestamp should not be null".to_string())
            })?;
            let label_map = if let Some(labels) = labels {
                struct_arr_to_map(&labels)?
            } else {
                HashMap::new()
            };
            let line = line.map(|s| s.to_string()).unwrap_or_default();
            Ok::<_, DataFusionError>(LogStream {
                stream: label_map,
                values: vec![[timestamp.to_string(), line]],
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(LogStreams { streams })
}

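/// Flattens one map entry (a struct with string `key` and `value` children)
/// into a `HashMap`, rejecting null keys and mapping null values to `""`.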
fn struct_arr_to_map(arr: &StructArray) -> DFResult<HashMap<String, String>> {
    let keys_arr = arr.column(0);
    let keys_arr = keys_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast keys array".to_string()))?;
    let values_arr = arr.column(1);
    let values_arr = values_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast values array".to_string()))?;

    let mut map = HashMap::new();
    keys_arr
        .iter()
        .zip(values_arr.iter())
        .try_for_each(|(key, value)| {
            let key = key.ok_or_else(|| {
                DataFusionError::Execution("label key should not be null".to_string())
            })?;
            let value = value.map(|v| v.to_string()).unwrap_or_default();
            map.insert(key.to_string(), value);
            Ok::<_, DataFusionError>(())
        })?;

    Ok(map)
}

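/// JSON body of a Loki push request: each stream pairs a label set with a list
/// of `[timestamp, line]` tuples, where the timestamp is Unix epoch
/// nanoseconds rendered as a string.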
#[derive(Debug, Serialize, Deserialize)]
struct LogStreams {
    streams: Vec<LogStream>,
}

#[derive(Debug, Serialize, Deserialize)]
struct LogStream {
    stream: HashMap<String, String>,
    values: Vec<[String; 2]>,
}
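
// A minimal serialization check, assuming `serde_json` is available as a
// dev-dependency: it asserts that `LogStream` serializes into the JSON shape
// expected by Loki's push API.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn log_stream_serializes_to_loki_push_shape() {
        let stream = LogStream {
            stream: HashMap::from([("app".to_string(), "demo".to_string())]),
            values: vec![["1700000000000000000".to_string(), "hello".to_string()]],
        };
        let json = serde_json::to_value(&stream).unwrap();
        assert_eq!(
            json,
            serde_json::json!({
                "stream": {"app": "demo"},
                "values": [["1700000000000000000", "hello"]],
            })
        );
    }
}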