use std::{
    collections::HashMap,
    sync::{Arc, LazyLock},
};

use arrow::{
    array::{
        Array, ArrayRef, Int64Array, MapArray, RecordBatch, StringArray, StructArray,
        TimestampNanosecondArray,
    },
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{DataFusionError, plan_err, stats::Precision};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties, stream::RecordBatchStreamAdapter,
};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::{DFResult, LOG_TABLE_SCHEMA};

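/// Output schema of [`LokiLogInsertExec`]: a single non-nullable `Int64`
/// column named "count" reporting how many rows were pushed to Loki.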
pub static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int64,
        false,
    )]))
});

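/// Physical plan node that drains its input stream and pushes every record
/// batch to a Loki instance via the HTTP push API, emitting a single batch
/// with the total row count on success.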
#[derive(Debug)]
pub struct LokiLogInsertExec {
    pub input: Arc<dyn ExecutionPlan>,
    pub endpoint: String,
    client: Client,
    plan_properties: PlanProperties,
}

impl LokiLogInsertExec {
    pub fn try_new(input: Arc<dyn ExecutionPlan>, endpoint: String) -> DFResult<Self> {
        if input.schema() != LOG_TABLE_SCHEMA.clone() {
            return plan_err!(
                "input schema does not match LOG_TABLE_SCHEMA: {:?}",
                input.schema()
            );
        }

        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(COUNT_SCHEMA.clone()),
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        );

        let client = Client::builder()
            .build()
            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;

        Ok(Self {
            input,
            endpoint,
            client,
            plan_properties,
        })
    }
}

impl ExecutionPlan for LokiLogInsertExec {
    fn name(&self) -> &str {
        "LokiLogInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::try_new(input, self.endpoint.clone())?;
        Ok(Arc::new(exec))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let mut input_stream = self.input.execute(partition, context)?;

        let endpoint = self.endpoint.clone();
        let client = self.client.clone();

        // Drain the input stream, pushing each batch to Loki as it arrives,
        // then emit a single one-row batch with the total row count.
        let stream = futures::stream::once(async move {
            let mut count = 0;
            while let Some(batch) = input_stream.next().await {
                let batch = batch?;
                push_logs(&endpoint, &client, &batch).await?;
                count += batch.num_rows();
            }
            make_result_batch(count as i64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            COUNT_SCHEMA.clone(),
            stream,
        )))
    }
}

impl DisplayAs for LokiLogInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LokiLogInsertExec: endpoint={}", self.endpoint)?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows≈{rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

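/// Builds the single-row result batch carrying the number of inserted rows.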
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(COUNT_SCHEMA.clone(), vec![array])?;
    Ok(batch)
}

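/// Serializes a record batch into Loki's push payload and POSTs it to
/// `{endpoint}/loki/api/v1/push`, surfacing non-2xx responses as execution
/// errors (including the response body when it can be read).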
async fn push_logs(endpoint: &str, client: &Client, batch: &RecordBatch) -> DFResult<()> {
    let log_streams = build_log_streams(batch)?;
    let resp = client
        .post(format!("{endpoint}/loki/api/v1/push"))
        .json(&log_streams)
        .send()
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!("Failed to send push request to Loki: {e:?}"))
        })?;
    let status = resp.status();
    if !status.is_success() {
        let text = resp
            .text()
            .await
            .map(|t| format!(", text: {t}"))
            .unwrap_or_default();
        return Err(DataFusionError::Execution(format!(
            "Push request to Loki failed with status {status}{text}",
        )));
    }
    Ok(())
}

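/// Converts a record batch with columns `(timestamp, labels, line)` into the
/// Loki push payload. Each row becomes its own stream entry; rows are not
/// grouped by label set.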
fn build_log_streams(batch: &RecordBatch) -> DFResult<LogStreams> {
    let timestamp_arr = batch.column(0);
    let timestamp_arr = timestamp_arr
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution("Failed to downcast timestamp array".to_string())
        })?;
    let labels_arr = batch.column(1);
    let labels_arr = labels_arr
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast labels array".to_string()))?;
    let line_arr = batch.column(2);
    let line_arr = line_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast line array".to_string()))?;

    let streams = timestamp_arr
        .iter()
        .zip(labels_arr.iter())
        .zip(line_arr.iter())
        .map(|((timestamp, labels), line)| {
            let timestamp = timestamp.ok_or_else(|| {
                DataFusionError::Execution("timestamp should not be null".to_string())
            })?;
            // A null labels entry maps to an empty label set.
            let label_map = if let Some(labels) = labels {
                struct_arr_to_map(&labels)?
            } else {
                HashMap::new()
            };
            // A null line is pushed as an empty string.
            let line = line.map(|s| s.to_string()).unwrap_or_default();
            Ok::<_, DataFusionError>(LogStream {
                stream: label_map,
                // Loki expects [<unix nanoseconds as string>, <log line>].
                values: vec![[timestamp.to_string(), line]],
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(LogStreams { streams })
}

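/// Flattens one `MapArray` entry (a struct of key/value string columns) into
/// a `HashMap`. Null keys are rejected; null values become empty strings.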
fn struct_arr_to_map(arr: &StructArray) -> DFResult<HashMap<String, String>> {
    let keys_arr = arr.column(0);
    let keys_arr = keys_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast keys array".to_string()))?;
    let values_arr = arr.column(1);
    let values_arr = values_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast values array".to_string()))?;

    let mut map = HashMap::new();
    for (key, value) in keys_arr.iter().zip(values_arr.iter()) {
        let key = key.ok_or_else(|| {
            DataFusionError::Execution("label key should not be null".to_string())
        })?;
        let value = value.map(|v| v.to_string()).unwrap_or_default();
        map.insert(key.to_string(), value);
    }

    Ok(map)
}

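/// JSON body of Loki's `/loki/api/v1/push` endpoint: a list of streams, each
/// with a label map and `[timestamp, line]` value pairs.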
#[derive(Debug, Serialize, Deserialize)]
struct LogStreams {
    streams: Vec<LogStream>,
}

#[derive(Debug, Serialize, Deserialize)]
struct LogStream {
    stream: HashMap<String, String>,
    values: Vec<[String; 2]>,
}