use std::ffi::OsStr;
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
use opensearch::cert::CertificateValidation;
use opensearch::{
auth::Credentials,
http::{
response::Response,
transport::{SingleNodeConnectionPool, TransportBuilder},
StatusCode,
},
indices::IndicesExistsParts,
params::Refresh,
BulkOperation, BulkParts, Error, OpenSearch, DEFAULT_ADDRESS,
};
use serde_json::json;
use sysinfo::{ProcessRefreshKind, RefreshKind, System};
use url::Url;
/// Returns the address of the OpenSearch server to connect to.
///
/// The value of the `OPENSEARCH_URL` environment variable is used when it can
/// be read; otherwise the crate's `DEFAULT_ADDRESS` is returned.
pub fn cluster_addr() -> String {
    std::env::var("OPENSEARCH_URL").unwrap_or_else(|_| DEFAULT_ADDRESS.into())
}
/// Reports whether at least one process matching the name `Fiddler` shows up
/// in a fresh process listing.
fn running_proxy() -> bool {
    // Only process information is needed, so nothing else is refreshed.
    let refresh = RefreshKind::nothing().with_processes(ProcessRefreshKind::default());
    System::new_with_specifics(refresh)
        .processes_by_name(OsStr::new("Fiddler"))
        .next()
        .is_some()
}
/// Builder for an [`OpenSearch`] client; a thin wrapper around a
/// [`TransportBuilder`] that is finalized by `build`.
pub struct TestClientBuilder(TransportBuilder);
impl TestClientBuilder {
    /// Creates a builder that targets the address returned by [`cluster_addr`].
    ///
    /// # Panics
    ///
    /// Panics if that address is not a valid URL.
    pub fn new() -> Self {
        Self::with_url(&cluster_addr())
    }

    /// Creates a builder that targets `url` through a single-node connection pool.
    ///
    /// When the URL scheme is `https`, basic authentication is configured with
    /// the user `admin` and the password taken from the `OPENSEARCH_PASSWORD`
    /// environment variable (falling back to `admin`). If one of the TLS
    /// features is enabled, certificate validation is additionally turned off.
    ///
    /// # Panics
    ///
    /// Panics if `url` cannot be parsed.
    pub fn with_url(url: &str) -> Self {
        // The closure still sees the `&str` parameter, so the panic message can
        // name the offending input (typically the `OPENSEARCH_URL` value).
        let url = Url::parse(url)
            .unwrap_or_else(|err| panic!("invalid OpenSearch URL {url:?}: {err}"));
        let secure = url.scheme() == "https";

        let conn_pool = SingleNodeConnectionPool::new(url);
        let mut builder = TransportBuilder::new(conn_pool);

        if secure {
            // Build the fallback lazily so nothing is allocated when the
            // variable is set.
            let password =
                std::env::var("OPENSEARCH_PASSWORD").unwrap_or_else(|_| "admin".into());
            builder = builder.auth(Credentials::Basic("admin".into(), password));

            #[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
            {
                builder = builder.cert_validation(CertificateValidation::None);
            }
        }

        Self(builder)
    }

    /// Applies `configurator` to the wrapped [`TransportBuilder`] and returns
    /// the builder holding the result.
    pub fn with(self, configurator: impl FnOnce(TransportBuilder) -> TransportBuilder) -> Self {
        Self(configurator(self.0))
    }

    /// Builds the transport and wraps it in an [`OpenSearch`] client.
    ///
    /// When [`running_proxy`] reports a match, requests are routed through a
    /// proxy at `http://localhost:8888` without proxy credentials.
    ///
    /// # Panics
    ///
    /// Panics if the transport cannot be built.
    pub fn build(self) -> OpenSearch {
        let builder = if running_proxy() {
            let proxy_url =
                Url::parse("http://localhost:8888").expect("hard-coded proxy URL must be valid");
            self.0.proxy(proxy_url, None, None)
        } else {
            self.0
        };

        let transport = builder
            .build()
            .expect("failed to build the OpenSearch transport");
        OpenSearch::new(transport)
    }
}
impl Default for TestClientBuilder {
fn default() -> Self {
Self::new()
}
}
/// Returns a [`TestClientBuilder`] in its default configuration, i.e. the one
/// produced by [`TestClientBuilder::new`].
pub fn builder() -> TestClientBuilder {
    TestClientBuilder::default()
}
/// Returns a [`TestClientBuilder`] targeting `url`; delegates to
/// [`TestClientBuilder::with_url`].
pub fn builder_with_url(url: &str) -> TestClientBuilder {
TestClientBuilder::with_url(url)
}
pub fn create() -> OpenSearch {
builder().build()
}
pub fn create_with(configurator: impl FnOnce(TransportBuilder) -> TransportBuilder) -> OpenSearch {
builder().with(configurator).build()
}
pub fn create_with_url(url: &str) -> OpenSearch {
builder_with_url(url).build()
}
/// Ensures the `posts` index is populated with sample documents.
///
/// An index-exists request is sent first. Unless it answers `404 Not Found`,
/// that response is returned unchanged. Otherwise ten documents with ids `1`
/// through `10` are sent in one bulk request against `posts`, using
/// `Refresh::WaitFor`, and the bulk response is returned.
///
/// # Errors
///
/// Returns any error produced while sending either request.
pub async fn index_documents(client: &OpenSearch) -> Result<Response, Error> {
    let index_name = "posts";
    let existing = client
        .indices()
        .exists(IndicesExistsParts::Index(&[index_name]))
        .send()
        .await?;

    // Anything other than "not found" is handed straight back to the caller.
    if existing.status_code() != StatusCode::NOT_FOUND {
        return Ok(existing);
    }

    let operations: Vec<BulkOperation<_>> = (1..=10)
        .map(|doc_id| {
            BulkOperation::index(json!({"title":"OpenSearch"}))
                .id(doc_id.to_string())
                .into()
        })
        .collect();

    client
        .bulk(BulkParts::Index(index_name))
        .body(operations)
        .refresh(Refresh::WaitFor)
        .send()
        .await
}