use super::Archive;
use crate::scheduler::job::JobInfo;
use chrono::{DateTime, Utc};
use clap::{App, Arg, ArgMatches, SubCommand};
use elastic_derive::ElasticType;
use log::{debug, error, info};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::io::Error;
use std::process::exit;
/// Builds the clap subcommand that configures the ElasticSearch archiver.
///
/// Exposes three options: `--host` (default `localhost`), `--port`
/// (default `9200`) and the mandatory `--index` to write documents into.
pub fn clap_subcommand(command: &str) -> App {
    let host = Arg::with_name("host")
        .long("host")
        .takes_value(true)
        .default_value("localhost")
        .help("The hostname of the ElasticSearch server");

    let port = Arg::with_name("port")
        .long("port")
        .takes_value(true)
        .default_value("9200")
        .help("The port of the ElasticSearch service");

    let index = Arg::with_name("index")
        .long("index")
        .takes_value(true)
        .required(true)
        .help("The index where the documents will be put");

    SubCommand::with_name(command)
        .about("Archive to ElasticSearch")
        .arg(host)
        .arg(port)
        .arg(index)
}
use elastic::client::{SyncClient, SyncClientBuilder};
/// Archive backend that ships job records to an ElasticSearch cluster.
pub struct ElasticArchive {
// Synchronous ElasticSearch client used for all index and document requests.
client: SyncClient,
}
/// Creates `index_name` on the cluster with an explicit mapping for the
/// job documents: dynamic mapping at the top level, `date` types for the
/// timestamps, `keyword` sub-fields for the text attributes, and a
/// non-dynamic `object` for the free-form job environment.
///
/// # Errors
///
/// Returns an `std::io::Error` wrapping the client error when the index
/// creation request fails (instead of panicking).
fn create_index(client: &SyncClient, index_name: String) -> Result<(), Error> {
    // NOTE: the previous mapping also had a stray `"dynamic": true` key
    // directly inside "properties"; ES treats every key under "properties"
    // as a field definition (which must be an object), so that made the
    // whole request invalid. "dynamic" belongs at the mapping level only.
    let body = json!({
        "mappings": {
            "dynamic": true,
            "properties": {
                "@timestamp": {
                    "type": "date"
                },
                "jobinfo": {
                    "properties": {
                        "environment": {
                            // Arbitrary key/value map; keep it unindexed to
                            // avoid a mapping explosion from unknown keys.
                            "type": "object",
                            "dynamic": false
                        },
                        "id": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        },
                        "cluster": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        },
                        "timestamp": {
                            "type": "date"
                        },
                        "script": {
                            "type": "text",
                            "fields": {
                                "keyword": {
                                    "type": "keyword",
                                    "ignore_above": 256
                                }
                            }
                        }
                    }
                }
            }
        }
    });
    client
        .index(index_name)
        .create()
        .body(body.to_string())
        .send()
        // Surface the failure to the caller instead of panicking here.
        .map_err(|e| Error::new(std::io::ErrorKind::Other, e.to_string()))?;
    Ok(())
}
impl ElasticArchive {
    /// Connects to the ElasticSearch node at `http://host:port` and ensures
    /// `index` exists, creating it with the expected mapping otherwise.
    ///
    /// # Panics / exits
    ///
    /// Panics if the client cannot be built or if index creation fails, and
    /// exits the process when the index-existence check itself errors.
    pub fn new(host: &str, port: u16, index: String) -> Self {
        let client = SyncClientBuilder::new()
            .sniff_nodes(format!("http://{}:{}", host, port))
            .build()
            .unwrap();
        if let Ok(response) = client.index(index.to_owned()).exists().send() {
            if !response.exists() {
                create_index(&client, index).unwrap();
            }
        } else {
            error!("Cannot check if index exists. Quitting.");
            exit(1);
        }
        ElasticArchive { client }
    }

    /// Builds the archiver from the parsed command-line arguments.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` when the `--port` value is not a valid
    /// TCP port number (previously this panicked on bad input).
    pub fn build(matches: &ArgMatches) -> Result<Self, Error> {
        info!("Using ElasticSearch archival");
        // `host` and `port` carry clap defaults and `index` is required,
        // so `value_of` cannot return None for any of them here.
        let port = matches
            .value_of("port")
            .unwrap()
            .parse::<u16>()
            .map_err(|e| Error::new(std::io::ErrorKind::InvalidInput, e.to_string()))?;
        Ok(ElasticArchive::new(
            matches.value_of("host").unwrap(),
            port,
            matches.value_of("index").unwrap().to_owned(),
        ))
    }
}
impl Archive for ElasticArchive {
    /// Indexes one finished job as a [`JobMessage`] document.
    ///
    /// # Errors
    ///
    /// Returns an `std::io::Error` wrapping the client error when the index
    /// request fails (previously this panicked on any send failure).
    fn archive(&self, job_entry: &Box<dyn JobInfo>) -> Result<(), Error> {
        debug!(
            "ES archiver, received an entry for job ID {}",
            job_entry.jobid()
        );
        let doc = JobMessage {
            id: job_entry.jobid(),
            timestamp: Utc::now(),
            cluster: job_entry.cluster(),
            script: job_entry.script(),
            environment: job_entry.extra_info(),
        };
        self.client
            .document()
            .index(doc)
            .send()
            // Propagate through the declared Result instead of unwrapping.
            .map_err(|e| Error::new(std::io::ErrorKind::Other, e.to_string()))?;
        Ok(())
    }
}
/// Document shape sent to ElasticSearch for each archived job.
// NOTE(review): this struct is gated on the `elasticsearch-7` feature while
// `ElasticArchive::archive` constructs it unconditionally — confirm the whole
// module is behind the same feature, otherwise builds without it break.
// NOTE(review): `create_index` maps these fields nested under a "jobinfo"
// object, but this struct serializes them at the document top level —
// verify that the mapping and the actual document layout agree.
#[cfg(feature = "elasticsearch-7")]
#[derive(Serialize, Deserialize, ElasticType)]
struct JobMessage {
// Job identifier; marked as the ElasticSearch document id.
#[elastic(id)]
pub id: String,
// Time at which the entry was archived (set to `Utc::now()` in `archive`).
pub timestamp: DateTime<Utc>,
// Cluster the job ran on, as reported by the `JobInfo` entry.
pub cluster: String,
// The job's submission script text.
pub script: String,
// Optional free-form key/value extras from `JobInfo::extra_info`.
pub environment: Option<HashMap<String, String>>,
}
#[cfg(test)]
mod tests {
// NOTE(review): this module contains no tests yet; `env` and `PathBuf`
// (and the glob import) are unused — presumably placeholders for planned
// tests. They currently only generate unused-import warnings.
use super::*;
use std::env;
use std::path::PathBuf;
}