1#![deny(warnings)]
8
9mod operations;
10pub mod state;
11mod util;
12
13use std::sync::Arc;
14
15use axum::Router;
16use axum::extract::{Path, Query, State};
17use axum::http::StatusCode;
18use axum::response::{IntoResponse, Json};
19use axum::routing::{delete, get, head, post, put};
20use serde::Deserialize;
21use serde_json::{Value, json};
22
23use state::OpenSearchState;
24
25pub fn router(state: Arc<OpenSearchState>) -> Router {
29 Router::new()
30 .route("/", get(cluster_info))
32 .route("/_cluster/health", get(cluster_health_handler))
34 .route("/_tasks/{task_id}", get(task_handler))
36 .route("/_aliases", post(aliases_handler))
38 .route("/_reindex", post(reindex_handler))
40 .route("/_msearch", post(msearch_global_handler))
42 .route("/_mget", post(mget_global_handler))
44 .route("/_cat/indices", get(cat_indices))
46 .route("/_bulk", post(bulk_handler))
48 .route("/{index}", put(create_index))
50 .route("/{index}", get(get_index))
51 .route("/{index}", head(head_index))
52 .route("/{index}", delete(delete_index))
53 .route("/{index}/_mapping", get(get_mapping))
54 .route("/{index}/_mapping", put(put_mapping))
55 .route("/{index}/_refresh", post(refresh_handler))
56 .route("/{index}/_count", post(count))
57 .route("/{index}/_count", get(count))
58 .route("/{index}/_search", post(search))
60 .route("/{index}/_search", get(search))
61 .route("/{index}/_msearch", post(msearch_index_handler))
63 .route("/{index}/_mget", post(mget_index_handler))
65 .route("/{index}/_doc/{id}", put(put_doc))
67 .route("/{index}/_doc/{id}", post(put_doc))
68 .route("/{index}/_doc/{id}", get(get_doc))
69 .route("/{index}/_doc/{id}", delete(delete_doc))
70 .route("/{index}/_doc", post(post_doc_auto_id))
71 .route("/{index}/_update/{id}", post(update_doc_handler))
73 .route("/{index}/_update_by_query", post(update_by_query_handler))
75 .route("/{index}/_delete_by_query", post(delete_by_query_handler))
77 .route("/{index}/_source/{id}", get(get_source_handler))
79 .route("/{index}/_bulk", post(bulk_index_handler))
81 .with_state(state)
82}
83
84async fn cluster_info() -> Json<Value> {
87 Json(json!({
88 "name": "awsim-opensearch",
89 "cluster_name": "awsim",
90 "cluster_uuid": "awsim-local",
91 "version": {
92 "distribution": "opensearch",
93 "number": "3.6.0",
94 "build_type": "tar",
95 "build_hash": "awsim",
96 "build_date": "2025-01-01T00:00:00Z",
97 "build_snapshot": false,
98 "lucene_version": "10.2.0",
99 "minimum_wire_compatibility_version": "3.0.0",
100 "minimum_index_compatibility_version": "3.0.0",
101 },
102 "tagline": "The OpenSearch Project: https://opensearch.org/",
103 }))
104}
105
106async fn cat_indices(State(state): State<Arc<OpenSearchState>>) -> Json<Value> {
107 let (_, body) = operations::index::cat_indices(&state);
108 Json(body)
109}
110
111async fn create_index(
112 State(state): State<Arc<OpenSearchState>>,
113 Path(index): Path<String>,
114 body: Option<Json<Value>>,
115) -> impl IntoResponse {
116 let body = body.map(|b| b.0).unwrap_or(json!({}));
117 let (status, result) = operations::index::create_index(&state, &index, &body);
118 (
119 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
120 Json(result),
121 )
122}
123
124async fn get_index(
125 State(state): State<Arc<OpenSearchState>>,
126 Path(index): Path<String>,
127) -> impl IntoResponse {
128 let (status, result) = operations::index::get_index(&state, &index);
129 (
130 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
131 Json(result),
132 )
133}
134
135async fn head_index(
136 State(state): State<Arc<OpenSearchState>>,
137 Path(index): Path<String>,
138) -> impl IntoResponse {
139 let status = operations::index::index_exists(&state, &index);
140 StatusCode::from_u16(status).unwrap_or(StatusCode::NOT_FOUND)
141}
142
143async fn delete_index(
144 State(state): State<Arc<OpenSearchState>>,
145 Path(index): Path<String>,
146) -> impl IntoResponse {
147 let (status, result) = operations::index::delete_index(&state, &index);
148 (
149 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
150 Json(result),
151 )
152}
153
154async fn get_mapping(
155 State(state): State<Arc<OpenSearchState>>,
156 Path(index): Path<String>,
157) -> impl IntoResponse {
158 let (status, result) = operations::index::get_mapping(&state, &index);
159 (
160 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
161 Json(result),
162 )
163}
164
165async fn put_mapping(
166 State(state): State<Arc<OpenSearchState>>,
167 Path(index): Path<String>,
168 body: Option<Json<Value>>,
169) -> impl IntoResponse {
170 let body = body.map(|b| b.0).unwrap_or(json!({}));
171 let (status, result) = operations::index::put_mapping(&state, &index, &body);
172 (
173 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
174 Json(result),
175 )
176}
177
178async fn refresh_handler(
179 State(state): State<Arc<OpenSearchState>>,
180 Path(index): Path<String>,
181) -> impl IntoResponse {
182 let (status, result) = operations::index::refresh(&state, &index);
183 (
184 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
185 Json(result),
186 )
187}
188
189async fn search(
190 State(state): State<Arc<OpenSearchState>>,
191 Path(index): Path<String>,
192 body: Option<Json<Value>>,
193) -> impl IntoResponse {
194 let body = body
195 .map(|b| b.0)
196 .unwrap_or(json!({"query": {"match_all": {}}}));
197 let (status, result) = operations::search::search(&state, &index, &body);
198 (
199 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
200 Json(result),
201 )
202}
203
204async fn count(
205 State(state): State<Arc<OpenSearchState>>,
206 Path(index): Path<String>,
207 body: Option<Json<Value>>,
208) -> impl IntoResponse {
209 let body = body
210 .map(|b| b.0)
211 .unwrap_or(json!({"query": {"match_all": {}}}));
212 let (status, result) = operations::search::count(&state, &index, &body);
213 (
214 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
215 Json(result),
216 )
217}
218
219async fn put_doc(
220 State(state): State<Arc<OpenSearchState>>,
221 Path((index, id)): Path<(String, String)>,
222 Json(body): Json<Value>,
223) -> impl IntoResponse {
224 let (status, result) = operations::document::index_document(&state, &index, Some(&id), &body);
225 (
226 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
227 Json(result),
228 )
229}
230
231async fn post_doc_auto_id(
232 State(state): State<Arc<OpenSearchState>>,
233 Path(index): Path<String>,
234 Json(body): Json<Value>,
235) -> impl IntoResponse {
236 let (status, result) = operations::document::index_document(&state, &index, None, &body);
237 (
238 StatusCode::from_u16(status).unwrap_or(StatusCode::CREATED),
239 Json(result),
240 )
241}
242
243async fn get_doc(
244 State(state): State<Arc<OpenSearchState>>,
245 Path((index, id)): Path<(String, String)>,
246) -> impl IntoResponse {
247 let (status, result) = operations::document::get_document(&state, &index, &id);
248 (
249 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
250 Json(result),
251 )
252}
253
254async fn delete_doc(
255 State(state): State<Arc<OpenSearchState>>,
256 Path((index, id)): Path<(String, String)>,
257) -> impl IntoResponse {
258 let (status, result) = operations::document::delete_document(&state, &index, &id);
259 (
260 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
261 Json(result),
262 )
263}
264
265async fn bulk_handler(
266 State(state): State<Arc<OpenSearchState>>,
267 body: String,
268) -> impl IntoResponse {
269 let (status, result) = operations::bulk::bulk(&state, None, &body);
270 (
271 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
272 Json(result),
273 )
274}
275
276async fn bulk_index_handler(
277 State(state): State<Arc<OpenSearchState>>,
278 Path(index): Path<String>,
279 body: String,
280) -> impl IntoResponse {
281 let (status, result) = operations::bulk::bulk(&state, Some(&index), &body);
282 (
283 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
284 Json(result),
285 )
286}
287
288async fn cluster_health_handler(State(state): State<Arc<OpenSearchState>>) -> impl IntoResponse {
291 let (status, result) = operations::cluster::cluster_health(&state);
292 (
293 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
294 Json(result),
295 )
296}
297
298async fn task_handler(Path(task_id): Path<String>) -> impl IntoResponse {
299 let (status, result) = operations::cluster::get_task(&task_id);
300 (
301 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
302 Json(result),
303 )
304}
305
306#[derive(Deserialize)]
307struct ReindexParams {
308 wait_for_completion: Option<bool>,
309}
310
311async fn reindex_handler(
312 State(state): State<Arc<OpenSearchState>>,
313 Query(params): Query<ReindexParams>,
314 Json(body): Json<Value>,
315) -> impl IntoResponse {
316 let wait = params.wait_for_completion.unwrap_or(true);
317 let (status, result) = operations::cluster::reindex(&state, &body, wait);
318 (
319 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
320 Json(result),
321 )
322}
323
324async fn aliases_handler(
325 State(state): State<Arc<OpenSearchState>>,
326 Json(body): Json<Value>,
327) -> impl IntoResponse {
328 let (status, result) = operations::cluster::update_aliases(&state, &body);
329 (
330 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
331 Json(result),
332 )
333}
334
335async fn msearch_global_handler(
336 State(state): State<Arc<OpenSearchState>>,
337 body: String,
338) -> impl IntoResponse {
339 let (status, result) = operations::cluster::msearch(&state, None, &body);
340 (
341 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
342 Json(result),
343 )
344}
345
346async fn msearch_index_handler(
347 State(state): State<Arc<OpenSearchState>>,
348 Path(index): Path<String>,
349 body: String,
350) -> impl IntoResponse {
351 let (status, result) = operations::cluster::msearch(&state, Some(&index), &body);
352 (
353 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
354 Json(result),
355 )
356}
357
358async fn update_doc_handler(
361 State(state): State<Arc<OpenSearchState>>,
362 Path((index, id)): Path<(String, String)>,
363 Json(body): Json<Value>,
364) -> impl IntoResponse {
365 let (status, result) = operations::document::update_document(&state, &index, &id, &body);
366 (
367 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
368 Json(result),
369 )
370}
371
372async fn update_by_query_handler(
373 State(state): State<Arc<OpenSearchState>>,
374 Path(index): Path<String>,
375 body: Option<Json<Value>>,
376) -> impl IntoResponse {
377 let body = body
378 .map(|b| b.0)
379 .unwrap_or(json!({"query": {"match_all": {}}}));
380 let (status, result) = operations::document::update_by_query(&state, &index, &body);
381 (
382 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
383 Json(result),
384 )
385}
386
387async fn delete_by_query_handler(
388 State(state): State<Arc<OpenSearchState>>,
389 Path(index): Path<String>,
390 body: Option<Json<Value>>,
391) -> impl IntoResponse {
392 let body = body
393 .map(|b| b.0)
394 .unwrap_or(json!({"query": {"match_all": {}}}));
395 let (status, result) = operations::document::delete_by_query(&state, &index, &body);
396 (
397 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
398 Json(result),
399 )
400}
401
402async fn mget_global_handler(
403 State(state): State<Arc<OpenSearchState>>,
404 Json(body): Json<Value>,
405) -> impl IntoResponse {
406 let index = body.get("index").and_then(|v| v.as_str()).unwrap_or("_all");
407 let (status, result) = operations::document::mget(&state, index, &body);
408 (
409 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
410 Json(result),
411 )
412}
413
414async fn mget_index_handler(
415 State(state): State<Arc<OpenSearchState>>,
416 Path(index): Path<String>,
417 Json(body): Json<Value>,
418) -> impl IntoResponse {
419 let (status, result) = operations::document::mget(&state, &index, &body);
420 (
421 StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
422 Json(result),
423 )
424}
425
426async fn get_source_handler(
427 State(state): State<Arc<OpenSearchState>>,
428 Path((index, id)): Path<(String, String)>,
429) -> impl IntoResponse {
430 let (status, result) = operations::document::get_document(&state, &index, &id);
431 if status == 200 {
432 let source = result["_source"].clone();
434 (StatusCode::OK, Json(source))
435 } else {
436 (
437 StatusCode::from_u16(status).unwrap_or(StatusCode::NOT_FOUND),
438 Json(result),
439 )
440 }
441}