use crate::config::register_plugin;
use crate::config::ItemType;
use crate::config::{ConfigSpec, ExecutionType};
use crate::modules::tls::ClientTlsConfig;
use crate::{BatchingPolicy, MessageBatch};
use crate::{Closer, Error, OutputBatch};
use async_trait::async_trait;
use chrono::Datelike;
use fiddler_macros::fiddler_registration_func;
use flume::{bounded, Receiver, Sender};
use serde::Deserialize;
use serde_yaml::Value;
use std::time::Duration;
use tracing::debug;
use elasticsearch::{
auth::Credentials,
http::{
transport::{SingleNodeConnectionPool, Transport, TransportBuilder},
Url,
},
BulkOperation, BulkParts, Elasticsearch,
};
use chrono::Utc;
use elasticsearch::cert::{Certificate, CertificateValidation};
#[derive(Deserialize, Default)]
struct ElasticConfig {
url: Option<String>,
username: Option<String>,
password: Option<String>,
cloud_id: Option<String>,
index: String,
tls: Option<ClientTlsConfig>,
batch_policy: Option<BatchingPolicy>,
}
pub struct Elastic {
sender: Sender<Request>,
size: usize,
duration: Duration,
max_batch_bytes: usize,
}
struct Request {
message: MessageBatch,
output: Sender<Result<(), Error>>,
}
impl ElasticConfig {
fn get_cert_validation(&self) -> Result<CertificateValidation, Error> {
let Some(ref tls) = self.tls else {
return Ok(CertificateValidation::Default);
};
if tls.skip_verify {
return Ok(CertificateValidation::None);
}
if let Some(ref ca) = tls.ca {
let pem_bytes = crate::modules::tls::read_pem(ca, "tls.ca")?;
let cert = Certificate::from_pem(&pem_bytes).map_err(|e| {
Error::ConfigFailedValidation(format!("failed to parse tls.ca certificate: {}", e))
})?;
return Ok(CertificateValidation::Certificate(cert));
}
Ok(CertificateValidation::Default)
}
fn get_client(&self) -> Result<Elasticsearch, Error> {
let cert_validation = self.get_cert_validation()?;
if let Some(ref cloud_id) = self.cloud_id {
let username = self
.username
.clone()
.ok_or(Error::ConfigFailedValidation("username is required".into()))?;
let password = self
.password
.clone()
.ok_or(Error::ConfigFailedValidation("password is required".into()))?;
let credentials = Credentials::Basic(username, password);
let transport = Transport::cloud(cloud_id, credentials)
.map_err(|e| Error::ConfigFailedValidation(format!("{e}")))?;
Ok(Elasticsearch::new(transport))
} else if self.username.is_some() {
let url = self
.url
.clone()
.ok_or(Error::ConfigFailedValidation("url is required".into()))?;
let es_url =
Url::parse(&url).map_err(|e| Error::ConfigFailedValidation(format!("{}", e)))?;
let connection_pool = SingleNodeConnectionPool::new(es_url);
let username = self
.username
.clone()
.ok_or(Error::ConfigFailedValidation("username is required".into()))?;
let password = self
.password
.clone()
.ok_or(Error::ConfigFailedValidation("password is required".into()))?;
let credentials = Credentials::Basic(username, password);
let transport = TransportBuilder::new(connection_pool)
.auth(credentials)
.cert_validation(cert_validation)
.build()
.map_err(|e| Error::ConfigFailedValidation(format!("{}", e)))?;
Ok(Elasticsearch::new(transport))
} else if self.url.is_some() {
let url = self
.url
.clone()
.ok_or(Error::ConfigFailedValidation("url is required".into()))?;
let es_url =
Url::parse(&url).map_err(|e| Error::ConfigFailedValidation(format!("{}", e)))?;
let connection_pool = SingleNodeConnectionPool::new(es_url);
let transport = TransportBuilder::new(connection_pool)
.cert_validation(cert_validation)
.build()
.map_err(|e| Error::ConfigFailedValidation(format!("{}", e)))?;
Ok(Elasticsearch::new(transport))
} else {
Err(Error::ConfigFailedValidation(
"unable to determine connection type".into(),
))
}
}
}
async fn elasticsearch_handler(
es_client: Elasticsearch,
index: String,
requests: Receiver<Request>,
) -> Result<(), Error> {
while let Ok(req) = requests.recv_async().await {
let mut body: Vec<BulkOperation<_>> = Vec::new();
let now = Utc::now();
let index_date = format!("{}-{}-{}-{}", index, now.year(), now.month(), now.day());
for msg in req.message {
let v: serde_json::Value = match serde_json::from_slice(&msg.bytes) {
Ok(i) => i,
Err(_e) => continue,
};
body.push(BulkOperation::index(v).into());
}
let response = match es_client
.bulk(BulkParts::Index(&index_date))
.body(body)
.send()
.await
{
Ok(i) => i,
Err(e) => {
req.output
.send_async(Err(Error::OutputError(format!("{}", e))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
continue;
}
};
let json: serde_json::Value = match response.json().await {
Ok(i) => i,
Err(e) => {
req.output
.send_async(Err(Error::OutputError(format!("{}", e))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
continue;
}
};
match json["errors"].as_bool() {
Some(_e) => match json["items"].as_array() {
Some(arr) => {
let failed: Vec<String> = arr
.iter()
.filter(|v| !v["error"].is_null())
.map(|v| format!("{}", v["error"]))
.collect();
if !failed.is_empty() {
req.output
.send_async(Err(Error::UnRetryable(format!(
"failed to insert record: {}",
failed.join(",")
))))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
continue;
}
}
None => {
req.output
.send_async(Err(Error::OutputError(
"unable to deteremine result".into(),
)))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
continue;
}
},
None => {
req.output
.send_async(Err(Error::OutputError(
"unable to deteremine result".into(),
)))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
continue;
}
};
req.output
.send_async(Ok(()))
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
}
Ok(())
}
#[async_trait]
impl OutputBatch for Elastic {
async fn write_batch(&mut self, message: MessageBatch) -> Result<(), Error> {
debug!("Received batch, sending");
let (tx, rx) = bounded(0);
self.sender
.send_async(Request {
message,
output: tx,
})
.await
.map_err(|e| Error::UnableToSendToChannel(format!("{}", e)))?;
debug!("Waiting for results");
rx.recv_async().await??;
debug!("Done sending details");
Ok(())
}
async fn batch_size(&self) -> usize {
self.size
}
async fn interval(&self) -> Duration {
self.duration
}
async fn max_batch_bytes(&self) -> usize {
self.max_batch_bytes
}
}
#[async_trait]
impl Closer for Elastic {}
#[fiddler_registration_func]
fn create_elasticsearch(conf: Value) -> Result<ExecutionType, Error> {
let elastic: ElasticConfig = serde_yaml::from_value(conf.clone())?;
if elastic.username.is_none() && elastic.password.is_some() {
return Err(Error::ConfigFailedValidation(
"password is set but username is not".into(),
));
} else if elastic.username.is_some() && elastic.password.is_none() {
return Err(Error::ConfigFailedValidation(
"username is set but password is not".into(),
));
} else if elastic.cloud_id.is_some()
&& (elastic.username.is_none() || elastic.password.is_none())
{
return Err(Error::ConfigFailedValidation(
"cloud_id is set but username and/or password are not".into(),
));
} else if elastic.cloud_id.is_none() && elastic.url.is_none() {
return Err(Error::ConfigFailedValidation(
"cloud_id or url is required".into(),
));
}
let c = elastic.get_client()?;
let (sender, receiver) = bounded(0);
let _ = tokio::spawn(elasticsearch_handler(c, elastic.index.clone(), receiver));
let size = match &elastic.batch_policy {
Some(i) => i.size.unwrap_or(500),
None => 500,
};
let duration = match &elastic.batch_policy {
Some(i) => i.duration.unwrap_or(Duration::from_secs(10)),
None => Duration::from_secs(10),
};
let max_batch_bytes = match &elastic.batch_policy {
Some(i) => i.max_batch_bytes.unwrap_or(10_485_760),
None => 10_485_760,
};
Ok(ExecutionType::OutputBatch(Box::new(Elastic {
sender,
size,
duration,
max_batch_bytes,
})))
}
pub(super) fn register_elasticsearch() -> Result<(), Error> {
let config = "type: object
properties:
url:
type: string
username:
type: string
password:
type: string
cloud_id:
type: string
index:
type: string
tls:
type: object
properties:
ca:
type: string
description: CA certificate — file path or inline PEM
cert:
type: string
description: Client certificate for mTLS — file path or inline PEM
key:
type: string
description: Client private key for mTLS — file path or inline PEM
skip_verify:
type: boolean
default: false
description: Skip server certificate verification
description: TLS configuration for custom certificates
batch_policy:
type: object
properties:
size:
type: integer
duration:
type: string
max_batch_bytes:
type: integer
default: 10485760
required:
- index
- url";
let conf_spec = ConfigSpec::from_schema(config)?;
register_plugin(
"elasticsearch".into(),
ItemType::OutputBatch,
conf_spec,
create_elasticsearch,
)
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn register_plugin() {
register_elasticsearch().unwrap()
}
#[test]
fn test_config_with_tls_skip_verify() {
let yaml = r#"
url: "https://localhost:9200"
index: "logs"
tls:
skip_verify: true
"#;
let config: ElasticConfig = serde_yaml::from_str(yaml).unwrap();
let tls = config.tls.as_ref().unwrap();
assert!(tls.skip_verify);
}
#[test]
fn test_config_with_tls_ca() {
let yaml = r#"
url: "https://localhost:9200"
index: "logs"
tls:
ca: /etc/ssl/ca.crt
"#;
let config: ElasticConfig = serde_yaml::from_str(yaml).unwrap();
let tls = config.tls.as_ref().unwrap();
assert_eq!(tls.ca.as_deref(), Some("/etc/ssl/ca.crt"));
assert!(!tls.skip_verify);
}
#[test]
fn test_config_without_tls() {
let yaml = r#"
url: "https://localhost:9200"
index: "logs"
"#;
let config: ElasticConfig = serde_yaml::from_str(yaml).unwrap();
assert!(config.tls.is_none());
}
}