datafusion_loki/
insert.rs

use std::{
    collections::HashMap,
    sync::{Arc, LazyLock},
};

use arrow::{
    array::{
        Array, ArrayRef, Int64Array, MapArray, RecordBatch, StringArray, StructArray,
        TimestampNanosecondArray,
    },
    datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{DataFusionError, plan_err, stats::Precision};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties, stream::RecordBatchStreamAdapter,
};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::{DFResult, LOG_TABLE_SCHEMA};

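/// Single-column output schema of the insert: the number of rows written.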
pub static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int64,
        false,
    )]))
});

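/// Physical plan node that drains its input and pushes every record batch to
/// a Loki instance over the HTTP push API, then reports the total row count.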
#[derive(Debug)]
pub struct LokiLogInsertExec {
    pub input: Arc<dyn ExecutionPlan>,
    pub endpoint: String,
    client: Client,
    plan_properties: PlanProperties,
}

impl LokiLogInsertExec {
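    /// Builds the node, validating that the input produces the expected log
    /// table schema and constructing the shared HTTP client up front.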
    pub fn try_new(input: Arc<dyn ExecutionPlan>, endpoint: String) -> DFResult<Self> {
        if input.schema() != *LOG_TABLE_SCHEMA {
            return plan_err!("input exec schema not matched: {:?}", input.schema());
        }

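        // The node emits exactly one count row, so it advertises COUNT_SCHEMA
        // with a single output partition regardless of the input's layout.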
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(COUNT_SCHEMA.clone()),
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        );

        let client = Client::builder()
            .build()
            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;

        Ok(Self {
            input,
            endpoint,
            client,
            plan_properties,
        })
    }
}

impl ExecutionPlan for LokiLogInsertExec {
    fn name(&self) -> &str {
        "LokiLogInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::try_new(input, self.endpoint.clone())?;
        Ok(Arc::new(exec))
    }

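    /// Drains the input partition, pushing each batch to Loki in turn, and
    /// yields a single record batch containing the number of rows written.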
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let mut input_stream = self.input.execute(partition, context)?;

        let endpoint = self.endpoint.clone();
        let client = self.client.clone();

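        // A one-shot stream defers the whole insert until it is polled:
        // batches are pushed sequentially, then the count batch is emitted.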
        let stream = futures::stream::once(async move {
            let mut count = 0;
            while let Some(batch) = input_stream.next().await {
                let batch = batch?;
                push_logs(&endpoint, &client, &batch).await?;
                count += batch.num_rows();
            }
            make_result_batch(count as i64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            COUNT_SCHEMA.clone(),
            stream,
        )))
    }
}

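// EXPLAIN rendering: the endpoint, plus the input's row-count estimate when
// the input can provide statistics.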
impl DisplayAs for LokiLogInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LokiLogInsertExec: endpoint={}", self.endpoint)?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows≈{rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

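/// Wraps the written-row count in a single-row batch matching COUNT_SCHEMA.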
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(COUNT_SCHEMA.clone(), vec![array])?;
    Ok(batch)
}

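/// Serializes one record batch into Loki's JSON push format and POSTs it to
/// `/loki/api/v1/push`, surfacing any non-2xx response as an execution error.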
async fn push_logs(endpoint: &str, client: &Client, batch: &RecordBatch) -> DFResult<()> {
    let log_streams = build_log_streams(batch)?;
    let resp = client
        .post(format!("{endpoint}/loki/api/v1/push"))
        .json(&log_streams)
        .send()
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!("Failed to send push request to loki: {e:?}"))
        })?;
    let status = resp.status();
    if !status.is_success() {
        let with_text = resp
            .text()
            .await
            .map(|t| format!(", text: {t}"))
            .unwrap_or_default();
        return Err(DataFusionError::Execution(format!(
            "Failed to send push request to loki with status {status}{with_text}",
        )));
    }
    Ok(())
}

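/// Converts a batch of (timestamp, labels, line) columns into the push
/// payload. Null label maps become empty maps; null lines become "".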
fn build_log_streams(batch: &RecordBatch) -> DFResult<LogStreams> {
    let timestamp_arr = batch.column(0);
    let timestamp_arr = timestamp_arr
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution("Failed to downcast timestamp array".to_string())
        })?;
    let labels_arr = batch.column(1);
    let labels_arr = labels_arr
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast labels array".to_string()))?;
    let line_arr = batch.column(2);
    let line_arr = line_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast line array".to_string()))?;

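    // Each row becomes its own stream entry with a single (timestamp, line)
    // value; entries sharing a label set land in the same Loki stream.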
    let streams = timestamp_arr
        .iter()
        .zip(labels_arr.iter())
        .zip(line_arr.iter())
        .map(|((timestamp, labels), line)| {
            let timestamp = timestamp.ok_or_else(|| {
                DataFusionError::Execution("timestamp should not be null".to_string())
            })?;
            let label_map = if let Some(labels) = labels {
                struct_arr_to_map(&labels)?
            } else {
                HashMap::new()
            };
            let line = line.map(|s| s.to_string()).unwrap_or_default();
            Ok::<_, DataFusionError>(LogStream {
                stream: label_map,
                values: vec![[timestamp.to_string(), line]],
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(LogStreams { streams })
}

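/// Flattens one map entry (a struct of string key and value columns) into a
/// HashMap, rejecting null keys and defaulting null values to "".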
fn struct_arr_to_map(arr: &StructArray) -> DFResult<HashMap<String, String>> {
    let keys_arr = arr.column(0);
    let keys_arr = keys_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast keys array".to_string()))?;
    let values_arr = arr.column(1);
    let values_arr = values_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast values array".to_string()))?;

    let mut map = HashMap::new();
    for (key, value) in keys_arr.iter().zip(values_arr.iter()) {
        let key = key.ok_or_else(|| {
            DataFusionError::Execution("label key should not be null".to_string())
        })?;
        let value = value.map(|v| v.to_string()).unwrap_or_default();
        map.insert(key.to_string(), value);
    }

    Ok(map)
}

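/// Request body for Loki's push API: each stream pairs a label map with
/// `[timestamp, line]` value tuples, both encoded as strings.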
#[derive(Debug, Serialize, Deserialize)]
struct LogStreams {
    streams: Vec<LogStream>,
}

#[derive(Debug, Serialize, Deserialize)]
struct LogStream {
    stream: HashMap<String, String>,
    values: Vec<[String; 2]>,
}