use serde_json::{Value, json};
use crate::state::{IndexMeta, OpenSearchState};
/// Builds the cluster health response for the simulated cluster.
///
/// Always answers HTTP 200 with a `green`, single-node cluster named `awsim`.
/// Both shard counters are set to the number of entries in `state.indices`;
/// every other metric is a fixed constant.
pub fn cluster_health(state: &OpenSearchState) -> (u16, Value) {
    // Each known index is reported as exactly one active (primary) shard.
    let shard_total = state.indices.len();

    let payload = json!({
        "cluster_name": "awsim",
        "status": "green",
        "timed_out": false,
        "number_of_nodes": 1,
        "number_of_data_nodes": 1,
        "active_primary_shards": shard_total,
        "active_shards": shard_total,
        "relocating_shards": 0,
        "initializing_shards": 0,
        "unassigned_shards": 0,
        "delayed_unassigned_shards": 0,
        "number_of_pending_tasks": 0,
        "number_of_in_flight_fetch": 0,
        "task_max_waiting_in_queue_millis": 0,
        "active_shards_percent_as_number": 100.0,
    });

    (200, payload)
}
/// Returns a canned description of an already-completed reindex task.
///
/// No task lookup is performed: `task_id` is only echoed back in
/// `task.description`, and all counters are fixed placeholder values.
/// The status code is always 200.
pub fn get_task(task_id: &str) -> (u16, Value) {
    // Static description of the (pretend) task itself.
    let task = json!({
        "node": "awsim-node-1",
        "id": 1,
        "type": "transport",
        "action": "indices:data/write/reindex",
        "description": task_id,
        "start_time_in_millis": 0,
        "running_time_in_nanos": 0,
        "status": {
            "total": 0,
            "updated": 0,
            "created": 0,
            "deleted": 0,
            "batches": 1,
            "version_conflicts": 0,
            "noops": 0,
            "throttled_millis": 0,
            "requests_per_second": -1.0,
            "throttled_until_millis": 0,
        }
    });

    // Final result of the task, shaped like a synchronous reindex response.
    let response = json!({
        "took": 1,
        "timed_out": false,
        "total": 0,
        "updated": 0,
        "created": 0,
        "deleted": 0,
        "batches": 1,
        "version_conflicts": 0,
        "noops": 0,
        "throttled_millis": 0,
        "requests_per_second": -1.0,
        "throttled_until_millis": 0,
        "failures": [],
    });

    (
        200,
        json!({
            "completed": true,
            "task": task,
            "response": response
        }),
    )
}
pub fn reindex(state: &OpenSearchState, body: &Value, wait_for_completion: bool) -> (u16, Value) {
let source_index = body
.pointer("/source/index")
.and_then(|v| v.as_str())
.unwrap_or("");
let dest_index = body
.pointer("/dest/index")
.and_then(|v| v.as_str())
.unwrap_or("");
if source_index.is_empty() || dest_index.is_empty() {
return (
400,
json!({
"error": "source.index and dest.index are required",
"status": 400,
}),
);
}
let mut docs: Vec<(String, Value)> = Vec::new();
if state.index_exists(source_index) {
let _ = state.for_each_doc(source_index, |id, doc| {
docs.push((id.to_string(), doc.clone()));
true
});
}
let count = docs.len();
if !state.index_exists(dest_index) {
let (mappings, settings) = state
.get_index_meta(source_index)
.map(|m| (m.mappings, m.settings))
.unwrap_or((json!({}), json!({})));
let _ = state.create_index_meta(
dest_index,
IndexMeta {
mappings,
settings,
created_at: crate::util::now_iso8601(),
uuid: uuid::Uuid::new_v4().to_string(),
},
);
}
for (id, doc) in &docs {
let _ = state.put_doc(dest_index, id, doc);
}
let task_id = format!("awsim-task-{}", uuid::Uuid::new_v4());
if wait_for_completion {
(
200,
json!({
"took": 1,
"timed_out": false,
"total": count,
"updated": 0,
"created": count,
"deleted": 0,
"batches": 1,
"version_conflicts": 0,
"noops": 0,
"throttled_millis": 0,
"requests_per_second": -1.0,
"throttled_until_millis": 0,
"failures": [],
}),
)
} else {
(200, json!({ "task": task_id }))
}
}
pub fn update_aliases(state: &OpenSearchState, body: &Value) -> (u16, Value) {
let actions = match body.get("actions").and_then(|a| a.as_array()) {
Some(a) => a.clone(),
None => {
return (
400,
json!({
"error": "actions array is required",
"status": 400,
}),
);
}
};
for action in &actions {
if let Some(add) = action.get("add") {
let index = add["index"].as_str().unwrap_or("").to_string();
let alias = add["alias"].as_str().unwrap_or("").to_string();
if !index.is_empty() && !alias.is_empty() {
let mut members = state
.aliases
.get(&alias)
.map(|v| v.clone())
.unwrap_or_default();
if !members.contains(&index) {
members.push(index);
}
let _ = state.put_alias(&alias, members);
}
}
if let Some(remove) = action.get("remove") {
let index = remove["index"].as_str().unwrap_or("");
let alias = remove["alias"].as_str().unwrap_or("").to_string();
if !alias.is_empty()
&& let Some(mut members) = state.aliases.get(&alias).map(|v| v.clone())
{
members.retain(|i| i != index);
if members.is_empty() {
let _ = state.delete_alias(&alias);
} else {
let _ = state.put_alias(&alias, members);
}
}
}
}
(200, json!({ "acknowledged": true }))
}
/// Runs a multi-search request given as newline-delimited JSON.
///
/// Blank lines are dropped, then the remaining lines are consumed in
/// `(header, search body)` pairs; a trailing unpaired line is ignored.
/// A pair whose header or body is not valid JSON is skipped and contributes
/// no entry to the output.
///
/// The target index of each search is the header's string `index` field,
/// falling back to `default_index`, then to `"_all"`. The per-search status
/// codes are discarded; the overall status is always 200 and the body is
/// `{ "responses": [...] }`.
pub fn msearch(state: &OpenSearchState, default_index: Option<&str>, body: &str) -> (u16, Value) {
    let lines: Vec<&str> = body
        .lines()
        .filter(|line| !line.trim().is_empty())
        .collect();

    let responses: Vec<Value> = lines
        .chunks_exact(2)
        .filter_map(|pair| {
            // Header is parsed first; the body is only parsed if the header is valid.
            let header: Value = serde_json::from_str(pair[0]).ok()?;
            let query: Value = serde_json::from_str(pair[1]).ok()?;

            let target = header["index"]
                .as_str()
                .or(default_index)
                .unwrap_or("_all");
            let (_, hit) = super::search::search(state, target, &query);
            Some(hit)
        })
        .collect();

    (200, json!({ "responses": responses }))
}