use std::{
collections::HashMap,
sync::{Arc, LazyLock},
};
use arrow::{
array::{
Array, ArrayRef, Int64Array, MapArray, RecordBatch, StringArray, StructArray,
TimestampNanosecondArray,
},
datatypes::{DataType, Field, Schema, SchemaRef},
};
use datafusion_common::{DataFusionError, plan_err, stats::Precision};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
PlanProperties, stream::RecordBatchStreamAdapter,
};
use futures::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use crate::{DFResult, LOG_TABLE_SCHEMA};
/// Schema of the single batch returned by [`LokiLogInsertExec`]: one
/// non-nullable `Int64` column named `count`.
pub static COUNT_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
    let count_field = Field::new("count", DataType::Int64, false);
    let schema = Schema::new(vec![count_field]);
    Arc::new(schema)
});
/// Physical plan node that sends the rows produced by `input` to a Loki server
/// and reports how many rows were sent.
///
/// Use `try_new` to construct it; that constructor validates the input schema.
#[derive(Debug)]
pub struct LokiLogInsertExec {
/// Child plan whose record batches are pushed to Loki.
pub input: Arc<dyn ExecutionPlan>,
/// Base URL of the Loki server; the push path `/loki/api/v1/push` is appended to it.
pub endpoint: String,
/// HTTP client used to issue the push requests.
client: Client,
/// Plan properties returned from `ExecutionPlan::properties`.
plan_properties: PlanProperties,
}
impl LokiLogInsertExec {
    /// Creates an insert node that pushes the output of `input` to the Loki
    /// server at `endpoint`.
    ///
    /// # Errors
    ///
    /// Returns a planning error when the schema of `input` differs from
    /// `LOG_TABLE_SCHEMA`, or when the HTTP client cannot be built.
    pub fn try_new(input: Arc<dyn ExecutionPlan>, endpoint: String) -> DFResult<Self> {
        // Reject mismatching inputs up front, before building anything else.
        let schema = input.schema();
        if schema != LOG_TABLE_SCHEMA.clone() {
            return plan_err!("input exec schema not matched: {:?}", schema);
        }

        let client = match Client::builder().build() {
            Ok(client) => client,
            Err(e) => {
                return Err(DataFusionError::Plan(format!(
                    "Failed to build http client: {e}"
                )));
            }
        };

        // The node always emits the single-column count schema on one
        // partition; pipeline behavior and boundedness follow the input.
        let eq_properties = EquivalenceProperties::new(COUNT_SCHEMA.clone());
        let plan_properties = PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        );

        Ok(Self {
            input,
            endpoint,
            client,
            plan_properties,
        })
    }
}
impl ExecutionPlan for LokiLogInsertExec {
fn name(&self) -> &str {
"LokiLogInsertExec"
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![&self.input]
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DFResult<Arc<dyn ExecutionPlan>> {
let input = children[0].clone();
let exec = Self::try_new(input, self.endpoint.clone())?;
Ok(Arc::new(exec))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let mut input_stream = self.input.execute(partition, context)?;
let endpoint = self.endpoint.clone();
let client = self.client.clone();
let stream = futures::stream::once(async move {
let mut count = 0;
while let Some(batch) = input_stream.next().await {
let batch = batch?;
push_logs(&endpoint, &client, &batch).await?;
count += batch.num_rows();
}
make_result_batch(count as i64)
})
.boxed();
Ok(Box::pin(RecordBatchStreamAdapter::new(
COUNT_SCHEMA.clone(),
stream,
)))
}
}
impl DisplayAs for LokiLogInsertExec {
    /// Writes the node name and endpoint, followed by the input's row-count
    /// statistic when one is available. The same text is produced for every
    /// display format.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "LokiLogInsertExec: endpoint={}", self.endpoint)?;

        // Statistics are optional decoration; a failure to obtain them is ignored.
        let Ok(input_stats) = self.input.partition_statistics(None) else {
            return Ok(());
        };

        match input_stats.num_rows {
            Precision::Exact(rows) => write!(f, ", rows={rows}"),
            Precision::Inexact(rows) => write!(f, ", rows≈{rows}"),
            Precision::Absent => Ok(()),
        }
    }
}
/// Builds the one-row, one-column batch (schema `COUNT_SCHEMA`) that carries
/// `count`.
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let column: ArrayRef = Arc::new(Int64Array::from(vec![count]));
    Ok(RecordBatch::try_new(COUNT_SCHEMA.clone(), vec![column])?)
}
/// Sends the rows of `batch` to the Loki push API at `endpoint`.
///
/// `endpoint` is the server base URL; trailing slashes are ignored so that
/// both `http://host:3100` and `http://host:3100/` resolve to the same push
/// URL. Batches without rows are skipped without contacting the server.
///
/// # Errors
///
/// Returns an execution error when the batch cannot be converted, when the
/// request cannot be sent, or when the server answers with a non-success
/// status (the response body is included in the message when readable).
async fn push_logs(endpoint: &str, client: &Client, batch: &RecordBatch) -> DFResult<()> {
    // Nothing to send for an empty batch.
    if batch.num_rows() == 0 {
        return Ok(());
    }

    let log_streams = build_log_streams(batch)?;
    // Avoid a `//loki/...` path when the configured endpoint ends with `/`.
    let url = format!("{}/loki/api/v1/push", endpoint.trim_end_matches('/'));
    let resp = client
        .post(url)
        .json(&log_streams)
        .send()
        .await
        .map_err(|e| {
            DataFusionError::Execution(format!("Failed to send push request to loki: {e:?}"))
        })?;

    let status = resp.status();
    if status.is_success() {
        return Ok(());
    }

    // The body is best-effort context; an unreadable body is simply omitted.
    let with_text = resp
        .text()
        .await
        .map(|t| format!(", text: {t}"))
        .unwrap_or_default();
    Err(DataFusionError::Execution(format!(
        "Failed to send push request to loki with status {status}{with_text}",
    )))
}
/// Converts a record batch into the push payload, producing one `LogStream`
/// with a single entry per row.
///
/// Column 0 must be a nanosecond timestamp array, column 1 a map array of
/// labels and column 2 a string array of log lines. A null label map becomes
/// an empty label set and a null line becomes an empty string.
///
/// # Errors
///
/// Returns an execution error when a column has an unexpected array type, when
/// a timestamp is null, or when a label map cannot be converted.
fn build_log_streams(batch: &RecordBatch) -> DFResult<LogStreams> {
    let Some(timestamps) = batch
        .column(0)
        .as_any()
        .downcast_ref::<TimestampNanosecondArray>()
    else {
        return Err(DataFusionError::Execution(
            "Failed to downcast timestamp array".to_string(),
        ));
    };
    let Some(label_maps) = batch.column(1).as_any().downcast_ref::<MapArray>() else {
        return Err(DataFusionError::Execution(
            "Failed to downcast labels array".to_string(),
        ));
    };
    let Some(lines) = batch.column(2).as_any().downcast_ref::<StringArray>() else {
        return Err(DataFusionError::Execution(
            "Failed to downcast line array".to_string(),
        ));
    };

    let mut streams = Vec::with_capacity(batch.num_rows());
    let rows = timestamps.iter().zip(label_maps.iter()).zip(lines.iter());
    for ((ts, entries), text) in rows {
        let Some(ts) = ts else {
            return Err(DataFusionError::Execution(
                "timestamp should not be null".to_string(),
            ));
        };
        let stream = match entries {
            Some(entries) => struct_arr_to_map(&entries)?,
            None => HashMap::new(),
        };
        let text = text.map(str::to_owned).unwrap_or_default();
        // Each entry is a `[timestamp, line]` pair with the timestamp rendered
        // as a decimal string.
        streams.push(LogStream {
            stream,
            values: vec![[ts.to_string(), text]],
        });
    }
    Ok(LogStreams { streams })
}
/// Turns a struct array of key/value entries into a `HashMap`.
///
/// Column 0 holds the keys and column 1 the values; both must be string
/// arrays. A null value is stored as an empty string, and when a key occurs
/// more than once the last occurrence wins.
///
/// # Errors
///
/// Returns an execution error when either column is not a string array or
/// when a key is null.
fn struct_arr_to_map(arr: &StructArray) -> DFResult<HashMap<String, String>> {
    let Some(keys) = arr.column(0).as_any().downcast_ref::<StringArray>() else {
        return Err(DataFusionError::Execution(
            "Failed to downcast keys array".to_string(),
        ));
    };
    let Some(values) = arr.column(1).as_any().downcast_ref::<StringArray>() else {
        return Err(DataFusionError::Execution(
            "Failed to downcast values array".to_string(),
        ));
    };

    let mut labels = HashMap::new();
    for (name, content) in keys.iter().zip(values.iter()) {
        let Some(name) = name else {
            return Err(DataFusionError::Execution(
                "label key should not be null".to_string(),
            ));
        };
        let content = content.map(str::to_owned).unwrap_or_default();
        labels.insert(name.to_owned(), content);
    }
    Ok(labels)
}
/// JSON body posted to the Loki push endpoint: `{"streams": [...]}`.
#[derive(Debug, Serialize, Deserialize)]
struct LogStreams {
/// The streams contained in this push request.
streams: Vec<LogStream>,
}
/// One stream of a push request: a label set plus its log entries.
#[derive(Debug, Serialize, Deserialize)]
struct LogStream {
/// Label name to label value for this stream.
stream: HashMap<String, String>,
/// Log entries as `[timestamp, line]` pairs; the timestamp is the
/// nanosecond value rendered as a decimal string.
values: Vec<[String; 2]>,
}