Skip to main content

opensearch_client/ingest/
pipeline.rs

1use std::{collections::HashMap, option::Option, vec::Vec};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Map;
5
6#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
7pub struct Pipeline {
8    #[serde(default, skip_serializing_if = "Option::is_none")]
9    description: Option<String>,
10    #[serde(default, rename = "on_failure", skip_serializing_if = "Vec::is_empty")]
11    on_failure: Vec<Processor>,
12    #[serde(default, rename = "processors", skip_serializing_if = "Vec::is_empty")]
13    processors: Vec<Processor>,
14    #[serde(default, skip_serializing_if = "Option::is_none")]
15    version: Option<u32>,
16}
17
18impl Pipeline {
19    pub fn new() -> Pipeline {
20        Pipeline::default()
21    }
22}
23
24#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
25pub enum Processor {
26    #[serde(rename = "key_value")]
27    KeyValueProcessor(KeyValueProcessor),
28    #[serde(rename = "set_security_user")]
29    SetSecurityUserProcessor(SetSecurityUserProcessor),
30    #[serde(rename = "join")]
31    JoinProcessor(JoinProcessor),
32    #[serde(rename = "attachment")]
33    AttachmentProcessor(AttachmentProcessor),
34    #[serde(rename = "foreach")]
35    ForeachProcessor(ForeachProcessor),
36    #[serde(rename = "csv")]
37    CsvProcessor(CsvProcessor),
38    #[serde(rename = "pipeline")]
39    PipelineProcessor(PipelineProcessor),
40    #[serde(rename = "dissect")]
41    DissectProcessor(DissectProcessor),
42    #[serde(rename = "user_agent")]
43    UserAgentProcessor(UserAgentProcessor),
44    #[serde(rename = "remove")]
45    RemoveProcessor(RemoveProcessor),
46    #[serde(rename = "url_decode")]
47    UrlDecodeProcessor(UrlDecodeProcessor),
48    #[serde(rename = "split")]
49    SplitProcessor(SplitProcessor),
50    #[serde(rename = "fail")]
51    FailProcessor(FailProcessor),
52    #[serde(rename = "sort")]
53    SortProcessor(SortProcessor),
54    // CircleProcessor(CircleProcessor),
55    #[serde(rename = "trim")]
56    TrimProcessor(TrimProcessor),
57    #[serde(rename = "script")]
58    ScriptProcessor(ScriptProcessor),
59    #[serde(rename = "json")]
60    JsonProcessor(JsonProcessor),
61    #[serde(rename = "uppercase")]
62    UppercaseProcessor(UppercaseProcessor),
63    #[serde(rename = "date")]
64    DateProcessor(DateProcessor),
65    #[serde(rename = "dot_expander")]
66    DotExpanderProcessor(DotExpanderProcessor),
67    #[serde(rename = "lowercase")]
68    LowercaseProcessor(LowercaseProcessor),
69    #[serde(rename = "set")]
70    SetProcessor(SetProcessor),
71    #[serde(rename = "grok")]
72    GrokProcessor(GrokProcessor),
73    #[serde(rename = "gsub")]
74    GsubProcessor(GsubProcessor),
75    #[serde(rename = "convert")]
76    ConvertProcessor(ConvertProcessor),
77    #[serde(rename = "geo_ip")]
78    GeoIpProcessor(GeoIpProcessor),
79    #[serde(rename = "bytes")]
80    BytesProcessor(BytesProcessor),
81    #[serde(rename = "inference")]
82    InferenceProcessor(InferenceProcessor),
83    #[serde(rename = "rename")]
84    RenameProcessor(RenameProcessor),
85    #[serde(rename = "append")]
86    AppendProcessor(AppendProcessor),
87    #[serde(rename = "date_index_name")]
88    DateIndexNameProcessor(DateIndexNameProcessor),
89    #[serde(rename = "drop")]
90    DropProcessor(DropProcessor),
91    #[serde(rename = "sparse_encoding")]
92    SparseEncodingProcessor(SparseEncodingProcessor),
93    #[serde(rename = "text_embedding")]
94    TextEmbeddingProcessor(TextEmbeddingProcessor),
95    #[serde(rename = "text_image_embedding")]
96    TextImageEmbeddingProcessor(TextImageEmbeddingProcessor),
97    #[serde(untagged)]
98    CustomProcessor(CustomProcessor),
99}
100
101impl Default for Processor {
102    fn default() -> Self {
103        Processor::CustomProcessor(CustomProcessor::default())
104    }
105}
106
107#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
108pub struct CustomProcessor(Map<String, serde_json::Value>);
109
110#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
111pub struct KeyValueProcessor {
112    field: String,
113    field_split: String,
114    #[serde(rename = "value_split")]
115    value_split: String,
116    #[serde(default, skip_serializing_if = "Vec::is_empty")]
117    exclude_keys: Vec<String>,
118    #[serde(default, skip_serializing_if = "Option::is_none")]
119    ignore_missing: Option<bool>,
120    #[serde(default, skip_serializing_if = "Vec::is_empty")]
121    include_keys: Vec<String>,
122    #[serde(default, skip_serializing_if = "Option::is_none")]
123    prefix: Option<String>,
124    #[serde(default, skip_serializing_if = "Option::is_none")]
125    strip_brackets: Option<bool>,
126    #[serde(default, skip_serializing_if = "Option::is_none")]
127    target_field: Option<String>,
128    #[serde(default, skip_serializing_if = "Option::is_none")]
129    trim_key: Option<String>,
130    #[serde(default, skip_serializing_if = "Option::is_none")]
131    trim_value: Option<String>,
132    #[serde(default, skip_serializing_if = "Option::is_none")]
133    description: Option<String>,
134    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
135    if_field: Option<String>,
136    #[serde(default, skip_serializing_if = "Option::is_none")]
137    ignore_failure: Option<bool>,
138    #[serde(default, skip_serializing_if = "Vec::is_empty")]
139    on_failure: Vec<Processor>,
140    #[serde(default, skip_serializing_if = "Option::is_none")]
141    tag: Option<String>,
142}
143
144#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
145#[serde(rename_all = "snake_case")]
146pub struct SetSecurityUserProcessor {
147    field: String,
148    #[serde(default, skip_serializing_if = "Vec::is_empty")]
149    properties: Vec<String>,
150    #[serde(default, skip_serializing_if = "Option::is_none")]
151    description: Option<String>,
152    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
153    if_field: Option<String>,
154    #[serde(default, skip_serializing_if = "Option::is_none")]
155    ignore_failure: Option<bool>,
156    #[serde(default, skip_serializing_if = "Vec::is_empty")]
157    on_failure: Vec<Processor>,
158    #[serde(default, skip_serializing_if = "Option::is_none")]
159    tag: Option<String>,
160}
161
162#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
163pub struct JoinProcessor {
164    field: String,
165    separator: String,
166    #[serde(default, skip_serializing_if = "Option::is_none")]
167    target_field: Option<String>,
168    #[serde(default, skip_serializing_if = "Option::is_none")]
169    description: Option<String>,
170    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
171    if_field: Option<String>,
172    #[serde(default, skip_serializing_if = "Option::is_none")]
173    ignore_failure: Option<bool>,
174    #[serde(default, skip_serializing_if = "Vec::is_empty")]
175    on_failure: Vec<Processor>,
176    #[serde(default, skip_serializing_if = "Option::is_none")]
177    tag: Option<String>,
178}
179#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
180#[serde(rename_all = "snake_case")]
181pub struct AttachmentProcessor {
182    field: String,
183    #[serde(default, skip_serializing_if = "Option::is_none")]
184    remove_binary: Option<bool>,
185    #[serde(default, skip_serializing_if = "Option::is_none")]
186    ignore_missing: Option<bool>,
187    #[serde(default, skip_serializing_if = "Option::is_none")]
188    indexed_chars: Option<u64>,
189    #[serde(default, skip_serializing_if = "Option::is_none")]
190    indexed_chars_field: Option<String>,
191    #[serde(default, skip_serializing_if = "Vec::is_empty")]
192    properties: Vec<String>,
193    #[serde(default, skip_serializing_if = "Option::is_none")]
194    target_field: Option<String>,
195    #[serde(default, skip_serializing_if = "Option::is_none")]
196    resource_name: Option<String>,
197    #[serde(default, skip_serializing_if = "Option::is_none")]
198    description: Option<String>,
199    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
200    if_field: Option<String>,
201    #[serde(default, skip_serializing_if = "Option::is_none")]
202    ignore_failure: Option<bool>,
203    #[serde(default, skip_serializing_if = "Vec::is_empty")]
204    on_failure: Vec<Processor>,
205    #[serde(default, skip_serializing_if = "Option::is_none")]
206    tag: Option<String>,
207}
208
209#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
210#[serde(rename_all = "snake_case")]
211pub struct ForeachProcessor {
212    field: String,
213    processor: Box<Processor>,
214    #[serde(default, skip_serializing_if = "Option::is_none")]
215    ignore_missing: Option<bool>,
216    #[serde(default, skip_serializing_if = "Option::is_none")]
217    description: Option<String>,
218    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
219    if_field: Option<String>,
220    #[serde(default, skip_serializing_if = "Option::is_none")]
221    ignore_failure: Option<bool>,
222    #[serde(default, skip_serializing_if = "Vec::is_empty")]
223    on_failure: Vec<Processor>,
224    #[serde(default, skip_serializing_if = "Option::is_none")]
225    tag: Option<String>,
226}
227
228#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
229#[serde(rename_all = "snake_case")]
230pub struct CsvProcessor {
231    field: String,
232    target_fields: Vec<String>,
233    #[serde(default, skip_serializing_if = "Option::is_none")]
234    empty_value: Option<serde_json::Value>,
235    #[serde(default, skip_serializing_if = "Option::is_none")]
236    quote: Option<String>,
237    #[serde(default, skip_serializing_if = "Option::is_none")]
238    separator: Option<String>,
239    #[serde(default, skip_serializing_if = "Option::is_none")]
240    trim: Option<bool>,
241    #[serde(default, skip_serializing_if = "Option::is_none")]
242    ignore_missing: Option<bool>,
243    #[serde(default, skip_serializing_if = "Option::is_none")]
244    description: Option<String>,
245    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
246    if_field: Option<String>,
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    ignore_failure: Option<bool>,
249    #[serde(default, skip_serializing_if = "Vec::is_empty")]
250    on_failure: Vec<Processor>,
251    #[serde(default, skip_serializing_if = "Option::is_none")]
252    tag: Option<String>,
253}
254
255#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
256#[serde(rename_all = "snake_case")]
257pub struct PipelineProcessor {
258    name: String,
259    #[serde(default, skip_serializing_if = "Option::is_none")]
260    ignore_missing_pipeline: Option<bool>,
261    #[serde(default, skip_serializing_if = "Option::is_none")]
262    description: Option<String>,
263    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
264    if_field: Option<String>,
265    #[serde(default, skip_serializing_if = "Option::is_none")]
266    ignore_failure: Option<bool>,
267    #[serde(default, skip_serializing_if = "Vec::is_empty")]
268    on_failure: Vec<Processor>,
269    #[serde(default, skip_serializing_if = "Option::is_none")]
270    tag: Option<String>,
271}
272
273#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
274#[serde(rename_all = "snake_case")]
275pub struct DissectProcessor {
276    field: String,
277    pattern: String,
278    #[serde(default, skip_serializing_if = "Option::is_none")]
279    append_separator: Option<String>,
280    #[serde(default, skip_serializing_if = "Option::is_none")]
281    ignore_missing: Option<bool>,
282    #[serde(default, skip_serializing_if = "Option::is_none")]
283    description: Option<String>,
284    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
285    if_field: Option<String>,
286    #[serde(default, skip_serializing_if = "Option::is_none")]
287    ignore_failure: Option<bool>,
288    #[serde(default, skip_serializing_if = "Vec::is_empty")]
289    on_failure: Vec<Processor>,
290    #[serde(default, skip_serializing_if = "Option::is_none")]
291    tag: Option<String>,
292}
293
294#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
295#[serde(rename_all = "snake_case")]
296pub struct UserAgentProcessor {
297    field: String,
298    // #[serde(default, skip_serializing_if = "Vec::is_empty")]
299    // options: Vec<UserAgentProperty>,
300    #[serde(default, skip_serializing_if = "Option::is_none")]
301    regex_file: Option<String>,
302    #[serde(default, skip_serializing_if = "Option::is_none")]
303    target_field: Option<String>,
304    #[serde(default, skip_serializing_if = "Option::is_none")]
305    ignore_missing: Option<bool>,
306    #[serde(default, skip_serializing_if = "Option::is_none")]
307    description: Option<String>,
308    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
309    if_field: Option<String>,
310    #[serde(default, skip_serializing_if = "Option::is_none")]
311    ignore_failure: Option<bool>,
312    #[serde(default, skip_serializing_if = "Vec::is_empty")]
313    on_failure: Vec<Processor>,
314    #[serde(default, skip_serializing_if = "Option::is_none")]
315    tag: Option<String>,
316}
317
318#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
319#[serde(rename_all = "snake_case")]
320pub struct RemoveProcessor {
321    field: Vec<String>,
322    ignore_missing: Option<bool>,
323    #[serde(default, skip_serializing_if = "Option::is_none")]
324    description: Option<String>,
325    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
326    if_field: Option<String>,
327    #[serde(default, skip_serializing_if = "Option::is_none")]
328    ignore_failure: Option<bool>,
329    #[serde(default, skip_serializing_if = "Vec::is_empty")]
330    on_failure: Vec<Processor>,
331    #[serde(default, skip_serializing_if = "Option::is_none")]
332    tag: Option<String>,
333}
334
335#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
336#[serde(rename_all = "snake_case")]
337pub struct UrlDecodeProcessor {
338    field: String,
339    #[serde(default, skip_serializing_if = "Option::is_none")]
340    target_field: Option<String>,
341    #[serde(default, skip_serializing_if = "Option::is_none")]
342    ignore_missing: Option<bool>,
343    #[serde(default, skip_serializing_if = "Option::is_none")]
344    description: Option<String>,
345    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
346    if_field: Option<String>,
347    #[serde(default, skip_serializing_if = "Option::is_none")]
348    ignore_failure: Option<bool>,
349    #[serde(default, skip_serializing_if = "Vec::is_empty")]
350    on_failure: Vec<Processor>,
351    #[serde(default, skip_serializing_if = "Option::is_none")]
352    tag: Option<String>,
353}
354
355#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
356#[serde(rename_all = "snake_case")]
357pub struct SplitProcessor {
358    field: String,
359    separator: String,
360    #[serde(default, skip_serializing_if = "Option::is_none")]
361    preserve_trailing: Option<bool>,
362    #[serde(default, skip_serializing_if = "Option::is_none")]
363    target_field: Option<String>,
364    #[serde(default, skip_serializing_if = "Option::is_none")]
365    ignore_missing: Option<bool>,
366    #[serde(default, skip_serializing_if = "Option::is_none")]
367    description: Option<String>,
368    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
369    if_field: Option<String>,
370    #[serde(default, skip_serializing_if = "Option::is_none")]
371    ignore_failure: Option<bool>,
372    #[serde(default, skip_serializing_if = "Vec::is_empty")]
373    on_failure: Vec<Processor>,
374    #[serde(default, skip_serializing_if = "Option::is_none")]
375    tag: Option<String>,
376}
377
378#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
379#[serde(rename_all = "snake_case")]
380pub struct FailProcessor {
381    message: String,
382    #[serde(default, skip_serializing_if = "Option::is_none")]
383    description: Option<String>,
384    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
385    if_field: Option<String>,
386    #[serde(default, skip_serializing_if = "Option::is_none")]
387    ignore_failure: Option<bool>,
388    #[serde(default, skip_serializing_if = "Vec::is_empty")]
389    on_failure: Vec<Processor>,
390    #[serde(default, skip_serializing_if = "Option::is_none")]
391    tag: Option<String>,
392}
393
394#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
395#[serde(rename_all = "snake_case")]
396pub struct SortProcessor {
397    field: String,
398    // #[serde(default, skip_serializing_if = "Option::is_none")]
399    // order: Option<SortOrder>,
400    #[serde(default, skip_serializing_if = "Option::is_none")]
401    target_field: Option<String>,
402    #[serde(default, skip_serializing_if = "Option::is_none")]
403    description: Option<String>,
404    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
405    if_field: Option<String>,
406    #[serde(default, skip_serializing_if = "Option::is_none")]
407    ignore_failure: Option<bool>,
408    #[serde(default, skip_serializing_if = "Vec::is_empty")]
409    on_failure: Vec<Processor>,
410    #[serde(default, skip_serializing_if = "Option::is_none")]
411    tag: Option<String>,
412}
413
414// #[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
415// #[serde(rename_all = "snake_case")]
416// pub struct CircleProcessor {
417//   field: String,
418//   error_distance: f64,
419//   shape_type: ShapeType,
420//   #[serde(default, skip_serializing_if = "Option::is_none")]
421//   target_field: Option<String>,
422//   #[serde(default, skip_serializing_if = "Option::is_none")]
423//   ignore_missing: Option<bool>,
424//   #[serde(default, skip_serializing_if = "Option::is_none")]
425//   description: Option<String>,
426//   #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
427//   if_field: Option<String>,
428//   #[serde(default, skip_serializing_if = "Option::is_none")]
429//   ignore_failure: Option<bool>,
430//   #[serde(default, skip_serializing_if = "Vec::is_empty")]
431//   on_failure: Vec<Processor>,
432//   #[serde(default, skip_serializing_if = "Option::is_none")]
433//   tag: Option<String>,
434// }
435
436#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
437#[serde(rename_all = "snake_case")]
438pub struct TrimProcessor {
439    field: String,
440    #[serde(default, skip_serializing_if = "Option::is_none")]
441    target_field: Option<String>,
442    #[serde(default, skip_serializing_if = "Option::is_none")]
443    ignore_missing: Option<bool>,
444    #[serde(default, skip_serializing_if = "Option::is_none")]
445    description: Option<String>,
446    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
447    if_field: Option<String>,
448    #[serde(default, skip_serializing_if = "Option::is_none")]
449    ignore_failure: Option<bool>,
450    #[serde(default, skip_serializing_if = "Vec::is_empty")]
451    on_failure: Vec<Processor>,
452    #[serde(default, skip_serializing_if = "Option::is_none")]
453    tag: Option<String>,
454}
455
456#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
457#[serde(rename_all = "snake_case")]
458pub struct ScriptProcessor {
459    #[serde(default, skip_serializing_if = "Option::is_none")]
460    id: Option<String>,
461    #[serde(default, skip_serializing_if = "Option::is_none")]
462    lang: Option<String>,
463    #[serde(default, skip_serializing_if = "Option::is_none")]
464    params: Option<std::collections::HashMap<String, serde_json::Value>>,
465    #[serde(default, skip_serializing_if = "Option::is_none")]
466    source: Option<String>,
467    #[serde(default, skip_serializing_if = "Option::is_none")]
468    description: Option<String>,
469    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
470    if_field: Option<String>,
471    #[serde(default, skip_serializing_if = "Option::is_none")]
472    ignore_failure: Option<bool>,
473    #[serde(default, skip_serializing_if = "Vec::is_empty")]
474    on_failure: Vec<Processor>,
475    #[serde(default, skip_serializing_if = "Option::is_none")]
476    tag: Option<String>,
477}
478
479#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
480#[serde(rename_all = "snake_case")]
481pub struct JsonProcessor {
482    field: String,
483    #[serde(default, skip_serializing_if = "Option::is_none")]
484    target_field: Option<String>,
485    #[serde(default, skip_serializing_if = "Option::is_none")]
486    add_to_root: Option<bool>,
487    // #[serde(default, skip_serializing_if = "Option::is_none")]
488    // add_to_root_conflict_strategy: Option<JsonProcessorConflictStrategy>,
489    #[serde(default, skip_serializing_if = "Option::is_none")]
490    allow_duplicate_keys: Option<bool>,
491    #[serde(default, skip_serializing_if = "Option::is_none")]
492    description: Option<String>,
493    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
494    if_field: Option<String>,
495    #[serde(default, skip_serializing_if = "Option::is_none")]
496    ignore_failure: Option<bool>,
497    #[serde(default, skip_serializing_if = "Vec::is_empty")]
498    on_failure: Vec<Processor>,
499    #[serde(default, skip_serializing_if = "Option::is_none")]
500    tag: Option<String>,
501}
502
503#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
504pub struct UppercaseProcessor {
505    field: String,
506    #[serde(default, skip_serializing_if = "Option::is_none")]
507    target_field: Option<String>,
508    #[serde(default, skip_serializing_if = "Option::is_none")]
509    ignore_missing: Option<bool>,
510    #[serde(default, skip_serializing_if = "Option::is_none")]
511    description: Option<String>,
512    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
513    if_field: Option<String>,
514    #[serde(default, skip_serializing_if = "Option::is_none")]
515    ignore_failure: Option<bool>,
516    #[serde(default, skip_serializing_if = "Vec::is_empty")]
517    on_failure: Vec<Processor>,
518    #[serde(default, skip_serializing_if = "Option::is_none")]
519    tag: Option<String>,
520}
521
522#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
523pub struct DateProcessor {
524    field: String,
525    formats: Vec<String>,
526    #[serde(default, skip_serializing_if = "Option::is_none")]
527    locale: Option<String>,
528    #[serde(default, skip_serializing_if = "Option::is_none")]
529    target_field: Option<String>,
530    #[serde(default, skip_serializing_if = "Option::is_none")]
531    timezone: Option<String>,
532    #[serde(default, skip_serializing_if = "Option::is_none")]
533    description: Option<String>,
534    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
535    if_field: Option<String>,
536    #[serde(default, skip_serializing_if = "Option::is_none")]
537    ignore_failure: Option<bool>,
538    #[serde(default, skip_serializing_if = "Vec::is_empty")]
539    on_failure: Vec<Processor>,
540    #[serde(default, skip_serializing_if = "Option::is_none")]
541    tag: Option<String>,
542}
543
544#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
545pub struct DotExpanderProcessor {
546    field: String,
547    #[serde(default, skip_serializing_if = "Option::is_none")]
548    path: Option<String>,
549    #[serde(default, skip_serializing_if = "Option::is_none")]
550    description: Option<String>,
551    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
552    if_field: Option<String>,
553    #[serde(default, skip_serializing_if = "Option::is_none")]
554    ignore_failure: Option<bool>,
555    #[serde(default, skip_serializing_if = "Vec::is_empty")]
556    on_failure: Vec<Processor>,
557    #[serde(default, skip_serializing_if = "Option::is_none")]
558    tag: Option<String>,
559}
560
561#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
562pub struct LowercaseProcessor {
563    field: String,
564    #[serde(default, skip_serializing_if = "Option::is_none")]
565    ignore_missing: Option<bool>,
566    #[serde(default, skip_serializing_if = "Option::is_none")]
567    target_field: Option<String>,
568    #[serde(default, skip_serializing_if = "Option::is_none")]
569    description: Option<String>,
570    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
571    if_field: Option<String>,
572    #[serde(default, skip_serializing_if = "Option::is_none")]
573    ignore_failure: Option<bool>,
574    #[serde(default, skip_serializing_if = "Vec::is_empty")]
575    on_failure: Vec<Processor>,
576    #[serde(default, skip_serializing_if = "Option::is_none")]
577    tag: Option<String>,
578}
579
580#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
581pub struct SetProcessor {
582    field: String,
583    #[serde(default, skip_serializing_if = "Option::is_none")]
584    copy_from: Option<String>,
585    #[serde(default, skip_serializing_if = "Option::is_none")]
586    ignore_empty_value: Option<bool>,
587    #[serde(default, skip_serializing_if = "Option::is_none")]
588    media_type: Option<String>,
589    #[serde(default, skip_serializing_if = "Option::is_none")]
590    override_field: Option<bool>,
591    #[serde(default, skip_serializing_if = "Option::is_none")]
592    value: Option<serde_json::Value>,
593    #[serde(default, skip_serializing_if = "Option::is_none")]
594    description: Option<String>,
595    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
596    if_field: Option<String>,
597    #[serde(default, skip_serializing_if = "Option::is_none")]
598    ignore_failure: Option<bool>,
599    #[serde(default, skip_serializing_if = "Vec::is_empty")]
600    on_failure: Vec<Processor>,
601    #[serde(default, skip_serializing_if = "Option::is_none")]
602    tag: Option<String>,
603}
604
605#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
606pub struct GrokProcessor {
607    field: String,
608    patterns: Vec<String>,
609    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
610    pattern_definitions: HashMap<String, String>,
611    #[serde(default, skip_serializing_if = "Option::is_none")]
612    ignore_missing: Option<bool>,
613    #[serde(default, skip_serializing_if = "Option::is_none")]
614    trace_match: Option<bool>,
615    #[serde(default, skip_serializing_if = "Option::is_none")]
616    description: Option<String>,
617    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
618    if_field: Option<String>,
619    #[serde(default, skip_serializing_if = "Option::is_none")]
620    ignore_failure: Option<bool>,
621    #[serde(default, skip_serializing_if = "Vec::is_empty")]
622    on_failure: Vec<Processor>,
623    #[serde(default, skip_serializing_if = "Option::is_none")]
624    tag: Option<String>,
625}
626
627#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
628pub struct GsubProcessor {
629    field: String,
630    pattern: String,
631    replacement: String,
632    #[serde(default, skip_serializing_if = "Option::is_none")]
633    ignore_missing: Option<bool>,
634    #[serde(default, skip_serializing_if = "Option::is_none")]
635    target_field: Option<String>,
636    #[serde(default, skip_serializing_if = "Option::is_none")]
637    description: Option<String>,
638    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
639    if_field: Option<String>,
640    #[serde(default, skip_serializing_if = "Option::is_none")]
641    ignore_failure: Option<bool>,
642    #[serde(default, skip_serializing_if = "Vec::is_empty")]
643    on_failure: Vec<Processor>,
644    #[serde(default, skip_serializing_if = "Option::is_none")]
645    tag: Option<String>,
646}
647
648#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
649pub struct ConvertProcessor {
650    field: String,
651    type_field: String,
652    #[serde(default, skip_serializing_if = "Option::is_none")]
653    ignore_missing: Option<bool>,
654    #[serde(default, skip_serializing_if = "Option::is_none")]
655    target_field: Option<String>,
656    #[serde(default, skip_serializing_if = "Option::is_none")]
657    description: Option<String>,
658    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
659    if_field: Option<String>,
660    #[serde(default, skip_serializing_if = "Option::is_none")]
661    ignore_failure: Option<bool>,
662    #[serde(default, skip_serializing_if = "Vec::is_empty")]
663    on_failure: Vec<Processor>,
664    #[serde(default, skip_serializing_if = "Option::is_none")]
665    tag: Option<String>,
666}
667
668#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
669pub struct GeoIpProcessor {
670    field: String,
671    #[serde(default, skip_serializing_if = "Vec::is_empty")]
672    properties: Vec<String>,
673    #[serde(default, skip_serializing_if = "Option::is_none")]
674    database_file: Option<String>,
675    #[serde(default, skip_serializing_if = "Option::is_none")]
676    first_only: Option<bool>,
677    #[serde(default, skip_serializing_if = "Option::is_none")]
678    ignore_missing: Option<bool>,
679    #[serde(default, skip_serializing_if = "Option::is_none")]
680    target_field: Option<String>,
681    #[serde(default, skip_serializing_if = "Option::is_none")]
682    description: Option<String>,
683    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
684    if_field: Option<String>,
685    #[serde(default, skip_serializing_if = "Option::is_none")]
686    ignore_failure: Option<bool>,
687    #[serde(default, skip_serializing_if = "Vec::is_empty")]
688    on_failure: Vec<Processor>,
689    #[serde(default, skip_serializing_if = "Option::is_none")]
690    tag: Option<String>,
691}
692
693#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
694pub struct BytesProcessor {
695    field: String,
696    #[serde(default, skip_serializing_if = "Option::is_none")]
697    target_field: Option<String>,
698    #[serde(default, skip_serializing_if = "Option::is_none")]
699    ignore_missing: Option<bool>,
700    #[serde(default, skip_serializing_if = "Option::is_none")]
701    description: Option<String>,
702    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
703    if_field: Option<String>,
704    #[serde(default, skip_serializing_if = "Option::is_none")]
705    ignore_failure: Option<bool>,
706    #[serde(default, skip_serializing_if = "Vec::is_empty")]
707    on_failure: Vec<Processor>,
708    #[serde(default, skip_serializing_if = "Option::is_none")]
709    tag: Option<String>,
710}
711
712#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
713pub struct InferenceProcessor {
714    model_id: String,
715    #[serde(default, skip_serializing_if = "Option::is_none")]
716    target_field: Option<String>,
717    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
718    field_map: HashMap<String, serde_json::Value>,
719    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
720    inference_config: HashMap<String, serde_json::Value>,
721    #[serde(default, skip_serializing_if = "Option::is_none")]
722    description: Option<String>,
723    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
724    if_field: Option<String>,
725    #[serde(default, skip_serializing_if = "Option::is_none")]
726    ignore_failure: Option<bool>,
727    #[serde(default, skip_serializing_if = "Vec::is_empty")]
728    on_failure: Vec<Processor>,
729    #[serde(default, skip_serializing_if = "Option::is_none")]
730    tag: Option<String>,
731}
732
733#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
734pub struct RenameProcessor {
735    field: String,
736    target_field: String,
737    #[serde(default, skip_serializing_if = "Option::is_none")]
738    ignore_missing: Option<bool>,
739    #[serde(default, skip_serializing_if = "Option::is_none")]
740    description: Option<String>,
741    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
742    if_field: Option<String>,
743    #[serde(default, skip_serializing_if = "Option::is_none")]
744    ignore_failure: Option<bool>,
745    #[serde(default, skip_serializing_if = "Vec::is_empty")]
746    on_failure: Vec<Processor>,
747    #[serde(default, skip_serializing_if = "Option::is_none")]
748    tag: Option<String>,
749}
750
751#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
752pub struct AppendProcessor {
753    field: String,
754    value: Vec<serde_json::Value>,
755    #[serde(default, skip_serializing_if = "Option::is_none")]
756    allow_duplicates: Option<bool>,
757    #[serde(default, skip_serializing_if = "Option::is_none")]
758    description: Option<String>,
759    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
760    if_field: Option<String>,
761    #[serde(default, skip_serializing_if = "Option::is_none")]
762    ignore_failure: Option<bool>,
763    #[serde(default, skip_serializing_if = "Vec::is_empty")]
764    on_failure: Vec<Processor>,
765    #[serde(default, skip_serializing_if = "Option::is_none")]
766    tag: Option<String>,
767}
768
769#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
770pub struct DateIndexNameProcessor {
771    field: String,
772    date_rounding: String,
773    #[serde(default, skip_serializing_if = "Vec::is_empty")]
774    date_formats: Vec<String>,
775    #[serde(default, skip_serializing_if = "Option::is_none")]
776    index_name_format: Option<String>,
777    #[serde(default, skip_serializing_if = "Option::is_none")]
778    index_name_prefix: Option<String>,
779    #[serde(default, skip_serializing_if = "Option::is_none")]
780    locale: Option<String>,
781    #[serde(default, skip_serializing_if = "Option::is_none")]
782    timezone: Option<String>,
783    #[serde(default, skip_serializing_if = "Option::is_none")]
784    description: Option<String>,
785    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
786    if_field: Option<String>,
787    #[serde(default, skip_serializing_if = "Option::is_none")]
788    ignore_failure: Option<bool>,
789    #[serde(default, skip_serializing_if = "Vec::is_empty")]
790    on_failure: Vec<Processor>,
791    #[serde(default, skip_serializing_if = "Option::is_none")]
792    tag: Option<String>,
793}
794
795#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
796pub struct DropProcessor {
797    #[serde(default, skip_serializing_if = "Option::is_none")]
798    description: Option<String>,
799    #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
800    if_field: Option<String>,
801    #[serde(default, skip_serializing_if = "Option::is_none")]
802    ignore_failure: Option<bool>,
803    #[serde(default, skip_serializing_if = "Vec::is_empty")]
804    on_failure: Vec<Processor>,
805    #[serde(default, skip_serializing_if = "Option::is_none")]
806    tag: Option<String>,
807}
808
809#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
810pub struct SparseEncodingProcessor {
811    model_id: String,
812    field_map: HashMap<String, String>,
813    #[serde(default, skip_serializing_if = "Option::is_none")]
814    description: Option<String>,
815    #[serde(default, skip_serializing_if = "Option::is_none")]
816    tag: Option<String>,
817}
818
819#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
820pub struct TextEmbeddingProcessor {
821    model_id: String,
822    field_map: HashMap<String, String>,
823    #[serde(default, skip_serializing_if = "Option::is_none")]
824    description: Option<String>,
825    #[serde(default, skip_serializing_if = "Option::is_none")]
826    tag: Option<String>,
827}
828
829#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
830pub struct TextImageEmbedding {
831    #[serde(default, skip_serializing_if = "Option::is_none")]
832    text: Option<String>,
833    #[serde(default, skip_serializing_if = "Option::is_none")]
834    image: Option<String>,
835}
836
837#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
838pub struct TextImageEmbeddingProcessor {
839    model_id: String,
840    embedding: String,
841    field_map: TextImageEmbedding,
842    #[serde(default, skip_serializing_if = "Option::is_none")]
843    description: Option<String>,
844    #[serde(default, skip_serializing_if = "Option::is_none")]
845    tag: Option<String>,
846}
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, path::PathBuf};

    use serde::de::DeserializeOwned;

    use super::*;

    /// Reads `tests/ingest/pipeline.{name}.json` (relative to the crate root)
    /// and deserializes it into `T`.
    ///
    /// # Panics
    ///
    /// Panics, naming the fixture path, if the file cannot be read or its
    /// contents do not deserialize into `T`.
    fn load_entity<T: DeserializeOwned>(name: &str) -> T {
        let filename = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join(format!("tests/ingest/pipeline.{name}.json"));
        let text = std::fs::read_to_string(&filename)
            .unwrap_or_else(|err| panic!("cannot read fixture {}: {err}", filename.display()));
        serde_json::from_str(&text)
            .unwrap_or_else(|err| panic!("cannot parse fixture {}: {err}", filename.display()))
    }

    /// The `passage_text -> passage_embedding` field map shared by the ML
    /// processors in the `ml` fixture.
    fn passage_field_map() -> HashMap<String, String> {
        HashMap::from([("passage_text".to_owned(), "passage_embedding".to_owned())])
    }

    #[test]
    fn test_ml_processors() {
        let decoded: Pipeline = load_entity("ml");
        let expected = Pipeline {
            description: Some("A ml pipeline".to_owned()),
            processors: vec![
                Processor::SparseEncodingProcessor(SparseEncodingProcessor {
                    model_id: "aP2Q8ooBpBj3wT4HVS8a".to_owned(),
                    field_map: passage_field_map(),
                    ..Default::default()
                }),
                Processor::TextEmbeddingProcessor(TextEmbeddingProcessor {
                    model_id: "bQ1J8ooBpBj3wT4HVUsb".to_owned(),
                    field_map: passage_field_map(),
                    ..Default::default()
                }),
                Processor::TextImageEmbeddingProcessor(TextImageEmbeddingProcessor {
                    model_id: "bQ1J8ooBpBj3wT4HVUsb".to_owned(),
                    embedding: "vector_embedding".to_owned(),
                    field_map: TextImageEmbedding {
                        text: Some("image_description".to_owned()),
                        image: Some("image_binary".to_owned()),
                    },
                    ..Default::default()
                }),
            ],
            ..Default::default()
        };
        // Checked separately first so a top-level mismatch gets a focused message.
        assert_eq!(decoded.description, Some("A ml pipeline".to_owned()));
        assert_eq!(decoded, expected);

        // Serializing and re-parsing must not lose or alter any field.
        let reparsed: Pipeline = serde_json::from_value(
            serde_json::to_value(&expected).expect("pipeline serializes to JSON"),
        )
        .expect("serialized pipeline deserializes back");
        assert_eq!(reparsed, expected);
    }
}