datafusion_loki/insert.rs
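//! `LokiLogInsertExec`: a DataFusion physical plan node that sinks its
//! input into Grafana Loki. Each input batch is converted to the Loki
//! push-API JSON payload and POSTed to `{endpoint}/loki/api/v1/push`;
//! the node then emits a single-row batch with the total insert count.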

use std::{
    collections::HashMap,
    sync::{Arc, LazyLock},
};

use datafusion::{
    arrow::{
        array::{
            Array, ArrayRef, Int64Array, MapArray, RecordBatch, StringArray, StructArray,
            TimestampNanosecondArray,
        },
        datatypes::{DataType, Field, Schema, SchemaRef},
    },
    common::{plan_err, stats::Precision},
    error::DataFusionError,
    execution::{SendableRecordBatchStream, TaskContext},
    physical_expr::EquivalenceProperties,
    physical_plan::{
        DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
        PlanProperties, stream::RecordBatchStreamAdapter,
    },
};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};

use crate::{DFResult, LOG_TABLE_SCHEMA};

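/// Output schema of the insert node: a single non-nullable `Int64`
/// column named `count`, the number of rows pushed to Loki.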
pub static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int64,
        false,
    )]))
});

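/// Physical operator that drains its input and writes every row to a
/// Loki instance over HTTP, reporting how many rows were inserted.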
#[derive(Debug)]
pub struct LokiLogInsertExec {
    pub input: Arc<dyn ExecutionPlan>,
    pub endpoint: String,
    client: Client,
    plan_properties: PlanProperties,
}

impl LokiLogInsertExec {
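    /// Builds the node, validating at plan time that `input` produces
    /// `LOG_TABLE_SCHEMA` rows and that the HTTP client can be constructed.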
    pub fn try_new(input: Arc<dyn ExecutionPlan>, endpoint: String) -> DFResult<Self> {
        if input.schema() != LOG_TABLE_SCHEMA.clone() {
            return plan_err!(
                "input plan schema does not match LOG_TABLE_SCHEMA: {:?}",
                input.schema()
            );
        }

        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(COUNT_SCHEMA.clone()),
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        );

        let client = Client::builder()
            .build()
            .map_err(|e| DataFusionError::Plan(format!("Failed to build http client: {e}")))?;

        Ok(Self {
            input,
            endpoint,
            client,
            plan_properties,
        })
    }
}

impl ExecutionPlan for LokiLogInsertExec {
    fn name(&self) -> &str {
        "LokiLogInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::try_new(input, self.endpoint.clone())?;
        Ok(Arc::new(exec))
    }

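    /// Drains the input partition, pushing each batch to Loki as it
    /// arrives, then yields one batch containing the total row count.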
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let mut input_stream = self.input.execute(partition, context)?;

        let endpoint = self.endpoint.clone();
        let client = self.client.clone();

        let stream = futures::stream::once(async move {
            let mut count = 0;
            while let Some(batch) = input_stream.next().await {
                let batch = batch?;
                push_logs(&endpoint, &client, &batch).await?;
                count += batch.num_rows();
            }
            make_result_batch(count as i64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            COUNT_SCHEMA.clone(),
            stream,
        )))
    }
}

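// EXPLAIN output: the target endpoint, plus the input's row-count
// statistic when one is available.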
impl DisplayAs for LokiLogInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LokiLogInsertExec: endpoint={}", self.endpoint)?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows≈{rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

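/// Wraps the number of inserted rows in a single-row `count` batch.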
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(COUNT_SCHEMA.clone(), vec![array])?;
    Ok(batch)
}

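/// Serializes `batch` into Loki's push payload and POSTs it to
/// `{endpoint}/loki/api/v1/push`; non-2xx responses are surfaced as
/// execution errors, including the response body when it is readable.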
async fn push_logs(endpoint: &str, client: &Client, batch: &RecordBatch) -> DFResult<()> {
    let log_streams = build_log_streams(batch)?;
    let resp = client
        .post(format!("{endpoint}/loki/api/v1/push"))
        .json(&log_streams)
        .send()
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!("Failed to send push request to loki: {e:?}"))
        })?;
    let status = resp.status();
    if !status.is_success() {
        let text = resp
            .text()
            .await
            .map(|t| format!(", text: {t}"))
            .unwrap_or_default();
        return Err(DataFusionError::Execution(format!(
            "Failed to send push request to loki with status {status}{text}",
        )));
    }
    Ok(())
}

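/// Converts a `(timestamp, labels, line)` batch into the push payload.
/// Note that every row becomes its own stream; rows that share a label
/// set are not coalesced into a single `values` list.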
fn build_log_streams(batch: &RecordBatch) -> DFResult<LogStreams> {
    let timestamp_arr = batch.column(0);
    let timestamp_arr = timestamp_arr
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
        .ok_or_else(|| {
            DataFusionError::Execution("Failed to downcast timestamp array".to_string())
        })?;
    let labels_arr = batch.column(1);
    let labels_arr = labels_arr
        .as_any()
        .downcast_ref::<MapArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast labels array".to_string()))?;
    let line_arr = batch.column(2);
    let line_arr = line_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast line array".to_string()))?;

    let streams = timestamp_arr
        .iter()
        .zip(labels_arr.iter())
        .zip(line_arr.iter())
        .map(|((timestamp, labels), line)| {
            let timestamp = timestamp.ok_or_else(|| {
                DataFusionError::Execution("timestamp should not be null".to_string())
            })?;
            let label_map = if let Some(labels) = labels {
                struct_arr_to_map(&labels)?
            } else {
                HashMap::new()
            };
            let line = line.map(|s| s.to_string()).unwrap_or_default();
            Ok::<_, DataFusionError>(LogStream {
                stream: label_map,
                values: vec![[timestamp.to_string(), line]],
            })
        })
        .collect::<Result<Vec<_>, _>>()?;
    Ok(LogStreams { streams })
}

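/// Flattens the key/value entries of one row's label map into a
/// `HashMap`. Null keys are rejected; null values become empty strings.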
fn struct_arr_to_map(arr: &StructArray) -> DFResult<HashMap<String, String>> {
    let keys_arr = arr.column(0);
    let keys_arr = keys_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast keys array".to_string()))?;
    let values_arr = arr.column(1);
    let values_arr = values_arr
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Execution("Failed to downcast values array".to_string()))?;

    let mut map = HashMap::new();
    for (key, value) in keys_arr.iter().zip(values_arr.iter()) {
        let key = key.ok_or_else(|| {
            DataFusionError::Execution("label key should not be null".to_string())
        })?;
        let value = value.map(|v| v.to_string()).unwrap_or_default();
        map.insert(key.to_string(), value);
    }

    Ok(map)
}

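/// JSON body of a Loki push request:
/// `{"streams": [{"stream": {<labels>}, "values": [["<ts ns>", "<line>"], ...]}]}`.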
#[derive(Debug, Serialize, Deserialize)]
struct LogStreams {
    streams: Vec<LogStream>,
}

#[derive(Debug, Serialize, Deserialize)]
struct LogStream {
    stream: HashMap<String, String>,
    values: Vec<[String; 2]>,
}