Skip to main content

opensearch_client/
bulk.rs

1use std::collections::HashMap;
2
3use opensearch_dsl::ShardStatistics;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6
7#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
8pub struct UpdateActionBody {
9    #[serde(default, skip_serializing_if = "Option::is_none")]
10    pub doc: Option<Value>,
11    #[serde(default, skip_serializing_if = "Option::is_none")]
12    pub upsert: Option<Value>,
13    #[serde(default, skip_serializing_if = "Option::is_none")]
14    pub doc_as_upsert: Option<bool>,
15    #[serde(default, skip_serializing_if = "Option::is_none")]
16    pub script: Option<Script>,
17}
18
19/// Represents the body of an update action.
20///
21/// This struct provides methods for creating different types of update actions.
22/// An update action can be created with a document, a script, or a script with
23/// parameters.
24///
25/// # Examples
26///
27/// Creating an update action with a document:
28///
29/// ```
30/// use serde_json::json;
31/// use opensearch_client::types::bulk::UpdateActionBody;
32///
33/// let doc = json!({
34///     "name": "John Doe",
35///     "age": 30,
36/// });
37///
38/// let update_action = UpdateActionBody::new(doc);
39/// ```
40///
41/// Creating an update action with a script:
42///
43/// ```
44/// use opensearch_client::types::bulk::UpdateActionBody;
45///
46/// let script = r#"ctx._source.age += params.age"#;
47///
48/// let update_action = UpdateActionBody::from_script(script);
49/// ```
50///
51/// Creating an update action with a script and parameters:
52///
53/// ```
54/// use serde_json::json;
55/// use opensearch_client::types::bulk::UpdateActionBody;
56///
57/// let script = r#"ctx._source.age += params.age"#;
58/// let params = json!({
59///     "age": 5,
60/// });
61///
62/// let update_action = UpdateActionBody::from_script_parameters(script, params);
63/// ```
64///
65/// Creating an update action with a pre-defined script:
66///
67/// ```
68/// use opensearch_client::types::bulk::{UpdateActionBody, Script};
69///
70/// let script = Script {
71///     source: r#"ctx._source.age += params.age"#.to_string(),
72///     params: Some(json!({
73///         "age": 5,
74///     })),
75/// };
76///
77/// let update_action = UpdateActionBody::with_script(script);
78/// ```
79impl UpdateActionBody {
80    /// Creates a new update action with a document.
81    ///
82    /// # Arguments
83    ///
84    /// * `doc` - The document to be updated.
85    ///
86    /// # Returns
87    ///
88    /// The update action body.
89    pub fn new(doc: Value) -> Self {
90        Self {
91            doc: Some(doc),
92            upsert: None,
93            doc_as_upsert: None,
94            script: None,
95        }
96    }
97
98    /// Creates a new update action with a script.
99    ///
100    /// # Arguments
101    ///
102    /// * `script` - The script to be executed for the update action.
103    ///
104    /// # Returns
105    ///
106    /// The update action body.
107    pub fn from_script(script: &str) -> Self {
108        Self {
109            doc: None,
110            upsert: None,
111            doc_as_upsert: None,
112            script: Some(Script {
113                source: script.to_string(),
114                params: None,
115            }),
116        }
117    }
118
119    /// Creates a new update action with a script and parameters.
120    ///
121    /// # Arguments
122    ///
123    /// * `script` - The script to be executed for the update action.
124    /// * `params` - The parameters to be passed to the script.
125    ///
126    /// # Returns
127    ///
128    /// The update action body.
129    pub fn from_script_parameters(script: &str, params: serde_json::Value) -> Self {
130        Self {
131            doc: None,
132            upsert: None,
133            doc_as_upsert: None,
134            script: Some(Script {
135                source: script.to_string(),
136                params: Some(params),
137            }),
138        }
139    }
140
141    /// Creates a new update action with a pre-defined script.
142    ///
143    /// # Arguments
144    ///
145    /// * `script` - The pre-defined script to be executed for the update action.
146    ///
147    /// # Returns
148    ///
149    /// The update action body.
150    pub fn with_script(script: Script) -> Self {
151        Self {
152            doc: None,
153            upsert: None,
154            doc_as_upsert: None,
155            script: Some(script),
156        }
157    }
158
159    /// Creates a new update action with a document and upsert option.
160    ///
161    /// # Arguments
162    ///
163    /// * `doc` - The document to be updated.
164    /// * `upsert` - The document to be used for upsert if the document does not exist.
165    /// # Returns
166    ///
167    /// The update action body.
168    pub fn with_doc_upsert(doc: &Value, upsert: &Value) -> Self {
169        Self {
170            doc: Some(doc.clone()),
171            upsert: Some(upsert.clone()),
172            doc_as_upsert: Some(true),
173            script: None,
174        }
175    }
176}
177
178#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
179pub struct IndexResponse {
180    #[serde(rename = "_index")]
181    pub index: String,
182    #[serde(rename = "_id")]
183    pub id: String,
184    #[serde(rename = "_version")]
185    pub version: i64,
186    #[serde(rename = "result")]
187    pub result: String,
188    #[serde(rename = "_shards", default, skip_serializing_if = "Option::is_none")]
189    pub shards: Option<ShardStatistics>,
190    #[serde(rename = "_seq_no", default)]
191    pub seq_no: i64,
192    #[serde(rename = "_primary_term", default)]
193    pub primary_term: i64,
194}
195
196#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
197pub enum BulkAction {
198    #[serde(rename = "index")]
199    Index(IndexAction),
200    #[serde(rename = "create")]
201    Create(CreateAction),
202    #[serde(rename = "update")]
203    Update(UpdateAction),
204    #[serde(rename = "delete")]
205    Delete(DeleteAction),
206}
207
208#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
209pub struct IndexAction {
210    #[serde(rename = "_index")]
211    pub index: String,
212    #[serde(rename = "_id")]
213    pub id: Option<String>,
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    pub pipeline: Option<String>,
216}
217
218#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
219pub struct CreateAction {
220    #[serde(rename = "_index")]
221    pub index: String,
222    #[serde(rename = "_id")]
223    pub id: String,
224    #[serde(default, skip_serializing_if = "Option::is_none")]
225    pub pipeline: Option<String>,
226}
227#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
228pub struct UpdateAction {
229    #[serde(rename = "_index")]
230    pub index: String,
231    #[serde(rename = "_id")]
232    pub id: String,
233    #[serde(default, skip_serializing_if = "Option::is_none")]
234    pub pipeline: Option<String>,
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    pub script: Option<String>,
237}
238
239#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
240pub struct DeleteAction {
241    #[serde(rename = "_index")]
242    pub index: String,
243    #[serde(rename = "_id")]
244    pub id: String,
245}
246
247#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
248pub struct BulkResponse {
249    pub items: Vec<HashMap<String, BulkItemResponse>>,
250    pub took: u64,
251    pub errors: bool,
252    #[serde(
253        rename = "ingest_took",
254        default,
255        skip_serializing_if = "Option::is_none"
256    )]
257    pub ingest_took: Option<u32>,
258}
259
260impl BulkResponse {
261    pub fn is_ok(&self) -> bool {
262        !self.errors
263    }
264
265    pub fn count_errors(&self) -> usize {
266        self.items
267            .iter()
268            .filter(|i| i.values().any(|i| i.status >= 400))
269            .count()
270    }
271
272    pub fn count_ok(&self) -> usize {
273        self.items
274            .iter()
275            .filter(|i| i.values().all(|i| i.status < 400))
276            .count()
277    }
278
279    pub fn count_create_errors(&self) -> usize {
280        self.items
281            .iter()
282            .filter(|i| i.values().any(|i| i.status == 409))
283            .count()
284    }
285}
286
287#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
288pub struct BulkError {
289    #[serde(rename = "_index")]
290    pub index: Option<String>,
291    #[serde(default)]
292    pub index_uuid: Option<String>,
293    #[serde(default)]
294    pub reason: String,
295    #[serde(default)]
296    pub shard: Option<String>,
297    #[serde(rename = "type")]
298    pub kind: String,
299}
300
301#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
302pub struct BulkItemResponse {
303    #[serde(rename = "_id")]
304    pub id: String,
305    #[serde(rename = "_index")]
306    pub index: String,
307    #[serde(rename = "_version", default, skip_serializing_if = "Option::is_none")]
308    pub version: Option<i64>,
309    #[serde(default)]
310    pub status: i16,
311    #[serde(default)]
312    pub found: Option<bool>,
313    #[serde(rename = "_shards", default, skip_serializing_if = "Option::is_none")]
314    pub shards: Option<ShardStatistics>,
315    #[serde(default, skip_serializing_if = "Option::is_none")]
316    pub error: Option<BulkError>,
317    #[serde(default, skip_serializing_if = "Option::is_none")]
318    pub cause: Option<crate::common::ErrorCause>,
319    #[serde(
320        default,
321        rename = "_primary_term",
322        skip_serializing_if = "Option::is_none"
323    )]
324    pub primary_term: Option<i32>,
325    #[serde(default, rename = "_seq_no", skip_serializing_if = "Option::is_none")]
326    pub seq_no: Option<i32>,
327}
328
329#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
330pub struct Script {
331    #[serde(default)]
332    pub source: String,
333    #[serde(default, skip_serializing_if = "Option::is_none")]
334    pub params: Option<serde_json::Value>,
335}