Skip to main content

awsim_opensearch/
lib.rs

1//! Amazon OpenSearch (Elasticsearch-compatible) emulator for AWSim.
2//!
3//! Unlike other AWS services, OpenSearch exposes an Elasticsearch-compatible
4//! REST API rather than using AWS API protocols. This crate provides an Axum
5//! router that handles index management, document CRUD, and search queries.
6
7#![deny(warnings)]
8
9mod operations;
10pub mod state;
11mod util;
12
13use std::sync::Arc;
14
15use axum::Router;
16use axum::extract::{Path, Query, State};
17use axum::http::StatusCode;
18use axum::response::{IntoResponse, Json};
19use axum::routing::{delete, get, head, post, put};
20use serde::Deserialize;
21use serde_json::{Value, json};
22
23use state::OpenSearchState;
24
25/// Build an Axum router for the OpenSearch Elasticsearch-compatible API.
26///
27/// Mount this at `/opensearch` in the main server.
28pub fn router(state: Arc<OpenSearchState>) -> Router {
29    Router::new()
30        // Cluster info
31        .route("/", get(cluster_info))
32        // Cluster health
33        .route("/_cluster/health", get(cluster_health_handler))
34        // Tasks API
35        .route("/_tasks/{task_id}", get(task_handler))
36        // Aliases
37        .route("/_aliases", post(aliases_handler))
38        // Reindex
39        .route("/_reindex", post(reindex_handler))
40        // Multi-search (global)
41        .route("/_msearch", post(msearch_global_handler))
42        // Multi-get (global)
43        .route("/_mget", post(mget_global_handler))
44        // Cat APIs
45        .route("/_cat/indices", get(cat_indices))
46        // Bulk API
47        .route("/_bulk", post(bulk_handler))
48        // Index operations
49        .route("/{index}", put(create_index))
50        .route("/{index}", get(get_index))
51        .route("/{index}", head(head_index))
52        .route("/{index}", delete(delete_index))
53        .route("/{index}/_mapping", get(get_mapping))
54        .route("/{index}/_mapping", put(put_mapping))
55        .route("/{index}/_refresh", post(refresh_handler))
56        .route("/{index}/_count", post(count))
57        .route("/{index}/_count", get(count))
58        // Search
59        .route("/{index}/_search", post(search))
60        .route("/{index}/_search", get(search))
61        // Multi-search (per-index)
62        .route("/{index}/_msearch", post(msearch_index_handler))
63        // Multi-get (per-index)
64        .route("/{index}/_mget", post(mget_index_handler))
65        // Document operations
66        .route("/{index}/_doc/{id}", put(put_doc))
67        .route("/{index}/_doc/{id}", post(put_doc))
68        .route("/{index}/_doc/{id}", get(get_doc))
69        .route("/{index}/_doc/{id}", delete(delete_doc))
70        .route("/{index}/_doc", post(post_doc_auto_id))
71        // Update document
72        .route("/{index}/_update/{id}", post(update_doc_handler))
73        // Update by query
74        .route("/{index}/_update_by_query", post(update_by_query_handler))
75        // Delete by query
76        .route("/{index}/_delete_by_query", post(delete_by_query_handler))
77        // Source-only get
78        .route("/{index}/_source/{id}", get(get_source_handler))
79        // Bulk per index
80        .route("/{index}/_bulk", post(bulk_index_handler))
81        .with_state(state)
82}
83
84// --- Handlers ---
85
86async fn cluster_info() -> Json<Value> {
87    Json(json!({
88        "name": "awsim-opensearch",
89        "cluster_name": "awsim",
90        "cluster_uuid": "awsim-local",
91        "version": {
92            "distribution": "opensearch",
93            "number": "3.6.0",
94            "build_type": "tar",
95            "build_hash": "awsim",
96            "build_date": "2025-01-01T00:00:00Z",
97            "build_snapshot": false,
98            "lucene_version": "10.2.0",
99            "minimum_wire_compatibility_version": "3.0.0",
100            "minimum_index_compatibility_version": "3.0.0",
101        },
102        "tagline": "The OpenSearch Project: https://opensearch.org/",
103    }))
104}
105
106async fn cat_indices(State(state): State<Arc<OpenSearchState>>) -> Json<Value> {
107    let (_, body) = operations::index::cat_indices(&state);
108    Json(body)
109}
110
111async fn create_index(
112    State(state): State<Arc<OpenSearchState>>,
113    Path(index): Path<String>,
114    body: Option<Json<Value>>,
115) -> impl IntoResponse {
116    let body = body.map(|b| b.0).unwrap_or(json!({}));
117    let (status, result) = operations::index::create_index(&state, &index, &body);
118    (
119        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
120        Json(result),
121    )
122}
123
124async fn get_index(
125    State(state): State<Arc<OpenSearchState>>,
126    Path(index): Path<String>,
127) -> impl IntoResponse {
128    let (status, result) = operations::index::get_index(&state, &index);
129    (
130        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
131        Json(result),
132    )
133}
134
135async fn head_index(
136    State(state): State<Arc<OpenSearchState>>,
137    Path(index): Path<String>,
138) -> impl IntoResponse {
139    let status = operations::index::index_exists(&state, &index);
140    StatusCode::from_u16(status).unwrap_or(StatusCode::NOT_FOUND)
141}
142
143async fn delete_index(
144    State(state): State<Arc<OpenSearchState>>,
145    Path(index): Path<String>,
146) -> impl IntoResponse {
147    let (status, result) = operations::index::delete_index(&state, &index);
148    (
149        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
150        Json(result),
151    )
152}
153
154async fn get_mapping(
155    State(state): State<Arc<OpenSearchState>>,
156    Path(index): Path<String>,
157) -> impl IntoResponse {
158    let (status, result) = operations::index::get_mapping(&state, &index);
159    (
160        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
161        Json(result),
162    )
163}
164
165async fn put_mapping(
166    State(state): State<Arc<OpenSearchState>>,
167    Path(index): Path<String>,
168    body: Option<Json<Value>>,
169) -> impl IntoResponse {
170    let body = body.map(|b| b.0).unwrap_or(json!({}));
171    let (status, result) = operations::index::put_mapping(&state, &index, &body);
172    (
173        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
174        Json(result),
175    )
176}
177
178async fn refresh_handler(
179    State(state): State<Arc<OpenSearchState>>,
180    Path(index): Path<String>,
181) -> impl IntoResponse {
182    let (status, result) = operations::index::refresh(&state, &index);
183    (
184        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
185        Json(result),
186    )
187}
188
189async fn search(
190    State(state): State<Arc<OpenSearchState>>,
191    Path(index): Path<String>,
192    body: Option<Json<Value>>,
193) -> impl IntoResponse {
194    let body = body
195        .map(|b| b.0)
196        .unwrap_or(json!({"query": {"match_all": {}}}));
197    let (status, result) = operations::search::search(&state, &index, &body);
198    (
199        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
200        Json(result),
201    )
202}
203
204async fn count(
205    State(state): State<Arc<OpenSearchState>>,
206    Path(index): Path<String>,
207    body: Option<Json<Value>>,
208) -> impl IntoResponse {
209    let body = body
210        .map(|b| b.0)
211        .unwrap_or(json!({"query": {"match_all": {}}}));
212    let (status, result) = operations::search::count(&state, &index, &body);
213    (
214        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
215        Json(result),
216    )
217}
218
219async fn put_doc(
220    State(state): State<Arc<OpenSearchState>>,
221    Path((index, id)): Path<(String, String)>,
222    Json(body): Json<Value>,
223) -> impl IntoResponse {
224    let (status, result) = operations::document::index_document(&state, &index, Some(&id), &body);
225    (
226        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
227        Json(result),
228    )
229}
230
231async fn post_doc_auto_id(
232    State(state): State<Arc<OpenSearchState>>,
233    Path(index): Path<String>,
234    Json(body): Json<Value>,
235) -> impl IntoResponse {
236    let (status, result) = operations::document::index_document(&state, &index, None, &body);
237    (
238        StatusCode::from_u16(status).unwrap_or(StatusCode::CREATED),
239        Json(result),
240    )
241}
242
243async fn get_doc(
244    State(state): State<Arc<OpenSearchState>>,
245    Path((index, id)): Path<(String, String)>,
246) -> impl IntoResponse {
247    let (status, result) = operations::document::get_document(&state, &index, &id);
248    (
249        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
250        Json(result),
251    )
252}
253
254async fn delete_doc(
255    State(state): State<Arc<OpenSearchState>>,
256    Path((index, id)): Path<(String, String)>,
257) -> impl IntoResponse {
258    let (status, result) = operations::document::delete_document(&state, &index, &id);
259    (
260        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
261        Json(result),
262    )
263}
264
265async fn bulk_handler(
266    State(state): State<Arc<OpenSearchState>>,
267    body: String,
268) -> impl IntoResponse {
269    let (status, result) = operations::bulk::bulk(&state, None, &body);
270    (
271        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
272        Json(result),
273    )
274}
275
276async fn bulk_index_handler(
277    State(state): State<Arc<OpenSearchState>>,
278    Path(index): Path<String>,
279    body: String,
280) -> impl IntoResponse {
281    let (status, result) = operations::bulk::bulk(&state, Some(&index), &body);
282    (
283        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
284        Json(result),
285    )
286}
287
288// --- Cluster-level handlers ---
289
290async fn cluster_health_handler(State(state): State<Arc<OpenSearchState>>) -> impl IntoResponse {
291    let (status, result) = operations::cluster::cluster_health(&state);
292    (
293        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
294        Json(result),
295    )
296}
297
298async fn task_handler(Path(task_id): Path<String>) -> impl IntoResponse {
299    let (status, result) = operations::cluster::get_task(&task_id);
300    (
301        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
302        Json(result),
303    )
304}
305
306#[derive(Deserialize)]
307struct ReindexParams {
308    wait_for_completion: Option<bool>,
309}
310
311async fn reindex_handler(
312    State(state): State<Arc<OpenSearchState>>,
313    Query(params): Query<ReindexParams>,
314    Json(body): Json<Value>,
315) -> impl IntoResponse {
316    let wait = params.wait_for_completion.unwrap_or(true);
317    let (status, result) = operations::cluster::reindex(&state, &body, wait);
318    (
319        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
320        Json(result),
321    )
322}
323
324async fn aliases_handler(
325    State(state): State<Arc<OpenSearchState>>,
326    Json(body): Json<Value>,
327) -> impl IntoResponse {
328    let (status, result) = operations::cluster::update_aliases(&state, &body);
329    (
330        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
331        Json(result),
332    )
333}
334
335async fn msearch_global_handler(
336    State(state): State<Arc<OpenSearchState>>,
337    body: String,
338) -> impl IntoResponse {
339    let (status, result) = operations::cluster::msearch(&state, None, &body);
340    (
341        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
342        Json(result),
343    )
344}
345
346async fn msearch_index_handler(
347    State(state): State<Arc<OpenSearchState>>,
348    Path(index): Path<String>,
349    body: String,
350) -> impl IntoResponse {
351    let (status, result) = operations::cluster::msearch(&state, Some(&index), &body);
352    (
353        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
354        Json(result),
355    )
356}
357
358// --- Document-level handlers ---
359
360async fn update_doc_handler(
361    State(state): State<Arc<OpenSearchState>>,
362    Path((index, id)): Path<(String, String)>,
363    Json(body): Json<Value>,
364) -> impl IntoResponse {
365    let (status, result) = operations::document::update_document(&state, &index, &id, &body);
366    (
367        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
368        Json(result),
369    )
370}
371
372async fn update_by_query_handler(
373    State(state): State<Arc<OpenSearchState>>,
374    Path(index): Path<String>,
375    body: Option<Json<Value>>,
376) -> impl IntoResponse {
377    let body = body
378        .map(|b| b.0)
379        .unwrap_or(json!({"query": {"match_all": {}}}));
380    let (status, result) = operations::document::update_by_query(&state, &index, &body);
381    (
382        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
383        Json(result),
384    )
385}
386
387async fn delete_by_query_handler(
388    State(state): State<Arc<OpenSearchState>>,
389    Path(index): Path<String>,
390    body: Option<Json<Value>>,
391) -> impl IntoResponse {
392    let body = body
393        .map(|b| b.0)
394        .unwrap_or(json!({"query": {"match_all": {}}}));
395    let (status, result) = operations::document::delete_by_query(&state, &index, &body);
396    (
397        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
398        Json(result),
399    )
400}
401
402async fn mget_global_handler(
403    State(state): State<Arc<OpenSearchState>>,
404    Json(body): Json<Value>,
405) -> impl IntoResponse {
406    let index = body.get("index").and_then(|v| v.as_str()).unwrap_or("_all");
407    let (status, result) = operations::document::mget(&state, index, &body);
408    (
409        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
410        Json(result),
411    )
412}
413
414async fn mget_index_handler(
415    State(state): State<Arc<OpenSearchState>>,
416    Path(index): Path<String>,
417    Json(body): Json<Value>,
418) -> impl IntoResponse {
419    let (status, result) = operations::document::mget(&state, &index, &body);
420    (
421        StatusCode::from_u16(status).unwrap_or(StatusCode::OK),
422        Json(result),
423    )
424}
425
426async fn get_source_handler(
427    State(state): State<Arc<OpenSearchState>>,
428    Path((index, id)): Path<(String, String)>,
429) -> impl IntoResponse {
430    let (status, result) = operations::document::get_document(&state, &index, &id);
431    if status == 200 {
432        // Return just the _source without the metadata wrapper
433        let source = result["_source"].clone();
434        (StatusCode::OK, Json(source))
435    } else {
436        (
437            StatusCode::from_u16(status).unwrap_or(StatusCode::NOT_FOUND),
438            Json(result),
439        )
440    }
441}