1use std::{collections::HashMap, option::Option, vec::Vec};
2
3use serde::{Deserialize, Serialize};
4use serde_json::Map;
5
6#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
7pub struct Pipeline {
8 #[serde(default, skip_serializing_if = "Option::is_none")]
9 description: Option<String>,
10 #[serde(default, rename = "on_failure", skip_serializing_if = "Vec::is_empty")]
11 on_failure: Vec<Processor>,
12 #[serde(default, rename = "processors", skip_serializing_if = "Vec::is_empty")]
13 processors: Vec<Processor>,
14 #[serde(default, skip_serializing_if = "Option::is_none")]
15 version: Option<u32>,
16}
17
18impl Pipeline {
19 pub fn new() -> Pipeline {
20 Pipeline::default()
21 }
22}
23
24#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
25pub enum Processor {
26 #[serde(rename = "key_value")]
27 KeyValueProcessor(KeyValueProcessor),
28 #[serde(rename = "set_security_user")]
29 SetSecurityUserProcessor(SetSecurityUserProcessor),
30 #[serde(rename = "join")]
31 JoinProcessor(JoinProcessor),
32 #[serde(rename = "attachment")]
33 AttachmentProcessor(AttachmentProcessor),
34 #[serde(rename = "foreach")]
35 ForeachProcessor(ForeachProcessor),
36 #[serde(rename = "csv")]
37 CsvProcessor(CsvProcessor),
38 #[serde(rename = "pipeline")]
39 PipelineProcessor(PipelineProcessor),
40 #[serde(rename = "dissect")]
41 DissectProcessor(DissectProcessor),
42 #[serde(rename = "user_agent")]
43 UserAgentProcessor(UserAgentProcessor),
44 #[serde(rename = "remove")]
45 RemoveProcessor(RemoveProcessor),
46 #[serde(rename = "url_decode")]
47 UrlDecodeProcessor(UrlDecodeProcessor),
48 #[serde(rename = "split")]
49 SplitProcessor(SplitProcessor),
50 #[serde(rename = "fail")]
51 FailProcessor(FailProcessor),
52 #[serde(rename = "sort")]
53 SortProcessor(SortProcessor),
54 #[serde(rename = "trim")]
56 TrimProcessor(TrimProcessor),
57 #[serde(rename = "script")]
58 ScriptProcessor(ScriptProcessor),
59 #[serde(rename = "json")]
60 JsonProcessor(JsonProcessor),
61 #[serde(rename = "uppercase")]
62 UppercaseProcessor(UppercaseProcessor),
63 #[serde(rename = "date")]
64 DateProcessor(DateProcessor),
65 #[serde(rename = "dot_expander")]
66 DotExpanderProcessor(DotExpanderProcessor),
67 #[serde(rename = "lowercase")]
68 LowercaseProcessor(LowercaseProcessor),
69 #[serde(rename = "set")]
70 SetProcessor(SetProcessor),
71 #[serde(rename = "grok")]
72 GrokProcessor(GrokProcessor),
73 #[serde(rename = "gsub")]
74 GsubProcessor(GsubProcessor),
75 #[serde(rename = "convert")]
76 ConvertProcessor(ConvertProcessor),
77 #[serde(rename = "geo_ip")]
78 GeoIpProcessor(GeoIpProcessor),
79 #[serde(rename = "bytes")]
80 BytesProcessor(BytesProcessor),
81 #[serde(rename = "inference")]
82 InferenceProcessor(InferenceProcessor),
83 #[serde(rename = "rename")]
84 RenameProcessor(RenameProcessor),
85 #[serde(rename = "append")]
86 AppendProcessor(AppendProcessor),
87 #[serde(rename = "date_index_name")]
88 DateIndexNameProcessor(DateIndexNameProcessor),
89 #[serde(rename = "drop")]
90 DropProcessor(DropProcessor),
91 #[serde(rename = "sparse_encoding")]
92 SparseEncodingProcessor(SparseEncodingProcessor),
93 #[serde(rename = "text_embedding")]
94 TextEmbeddingProcessor(TextEmbeddingProcessor),
95 #[serde(rename = "text_image_embedding")]
96 TextImageEmbeddingProcessor(TextImageEmbeddingProcessor),
97 #[serde(untagged)]
98 CustomProcessor(CustomProcessor),
99}
100
101impl Default for Processor {
102 fn default() -> Self {
103 Processor::CustomProcessor(CustomProcessor::default())
104 }
105}
106
107#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
108pub struct CustomProcessor(Map<String, serde_json::Value>);
109
110#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
111pub struct KeyValueProcessor {
112 field: String,
113 field_split: String,
114 #[serde(rename = "value_split")]
115 value_split: String,
116 #[serde(default, skip_serializing_if = "Vec::is_empty")]
117 exclude_keys: Vec<String>,
118 #[serde(default, skip_serializing_if = "Option::is_none")]
119 ignore_missing: Option<bool>,
120 #[serde(default, skip_serializing_if = "Vec::is_empty")]
121 include_keys: Vec<String>,
122 #[serde(default, skip_serializing_if = "Option::is_none")]
123 prefix: Option<String>,
124 #[serde(default, skip_serializing_if = "Option::is_none")]
125 strip_brackets: Option<bool>,
126 #[serde(default, skip_serializing_if = "Option::is_none")]
127 target_field: Option<String>,
128 #[serde(default, skip_serializing_if = "Option::is_none")]
129 trim_key: Option<String>,
130 #[serde(default, skip_serializing_if = "Option::is_none")]
131 trim_value: Option<String>,
132 #[serde(default, skip_serializing_if = "Option::is_none")]
133 description: Option<String>,
134 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
135 if_field: Option<String>,
136 #[serde(default, skip_serializing_if = "Option::is_none")]
137 ignore_failure: Option<bool>,
138 #[serde(default, skip_serializing_if = "Vec::is_empty")]
139 on_failure: Vec<Processor>,
140 #[serde(default, skip_serializing_if = "Option::is_none")]
141 tag: Option<String>,
142}
143
144#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
145#[serde(rename_all = "snake_case")]
146pub struct SetSecurityUserProcessor {
147 field: String,
148 #[serde(default, skip_serializing_if = "Vec::is_empty")]
149 properties: Vec<String>,
150 #[serde(default, skip_serializing_if = "Option::is_none")]
151 description: Option<String>,
152 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
153 if_field: Option<String>,
154 #[serde(default, skip_serializing_if = "Option::is_none")]
155 ignore_failure: Option<bool>,
156 #[serde(default, skip_serializing_if = "Vec::is_empty")]
157 on_failure: Vec<Processor>,
158 #[serde(default, skip_serializing_if = "Option::is_none")]
159 tag: Option<String>,
160}
161
162#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
163pub struct JoinProcessor {
164 field: String,
165 separator: String,
166 #[serde(default, skip_serializing_if = "Option::is_none")]
167 target_field: Option<String>,
168 #[serde(default, skip_serializing_if = "Option::is_none")]
169 description: Option<String>,
170 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
171 if_field: Option<String>,
172 #[serde(default, skip_serializing_if = "Option::is_none")]
173 ignore_failure: Option<bool>,
174 #[serde(default, skip_serializing_if = "Vec::is_empty")]
175 on_failure: Vec<Processor>,
176 #[serde(default, skip_serializing_if = "Option::is_none")]
177 tag: Option<String>,
178}
179#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
180#[serde(rename_all = "snake_case")]
181pub struct AttachmentProcessor {
182 field: String,
183 #[serde(default, skip_serializing_if = "Option::is_none")]
184 remove_binary: Option<bool>,
185 #[serde(default, skip_serializing_if = "Option::is_none")]
186 ignore_missing: Option<bool>,
187 #[serde(default, skip_serializing_if = "Option::is_none")]
188 indexed_chars: Option<u64>,
189 #[serde(default, skip_serializing_if = "Option::is_none")]
190 indexed_chars_field: Option<String>,
191 #[serde(default, skip_serializing_if = "Vec::is_empty")]
192 properties: Vec<String>,
193 #[serde(default, skip_serializing_if = "Option::is_none")]
194 target_field: Option<String>,
195 #[serde(default, skip_serializing_if = "Option::is_none")]
196 resource_name: Option<String>,
197 #[serde(default, skip_serializing_if = "Option::is_none")]
198 description: Option<String>,
199 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
200 if_field: Option<String>,
201 #[serde(default, skip_serializing_if = "Option::is_none")]
202 ignore_failure: Option<bool>,
203 #[serde(default, skip_serializing_if = "Vec::is_empty")]
204 on_failure: Vec<Processor>,
205 #[serde(default, skip_serializing_if = "Option::is_none")]
206 tag: Option<String>,
207}
208
209#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
210#[serde(rename_all = "snake_case")]
211pub struct ForeachProcessor {
212 field: String,
213 processor: Box<Processor>,
214 #[serde(default, skip_serializing_if = "Option::is_none")]
215 ignore_missing: Option<bool>,
216 #[serde(default, skip_serializing_if = "Option::is_none")]
217 description: Option<String>,
218 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
219 if_field: Option<String>,
220 #[serde(default, skip_serializing_if = "Option::is_none")]
221 ignore_failure: Option<bool>,
222 #[serde(default, skip_serializing_if = "Vec::is_empty")]
223 on_failure: Vec<Processor>,
224 #[serde(default, skip_serializing_if = "Option::is_none")]
225 tag: Option<String>,
226}
227
228#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
229#[serde(rename_all = "snake_case")]
230pub struct CsvProcessor {
231 field: String,
232 target_fields: Vec<String>,
233 #[serde(default, skip_serializing_if = "Option::is_none")]
234 empty_value: Option<serde_json::Value>,
235 #[serde(default, skip_serializing_if = "Option::is_none")]
236 quote: Option<String>,
237 #[serde(default, skip_serializing_if = "Option::is_none")]
238 separator: Option<String>,
239 #[serde(default, skip_serializing_if = "Option::is_none")]
240 trim: Option<bool>,
241 #[serde(default, skip_serializing_if = "Option::is_none")]
242 ignore_missing: Option<bool>,
243 #[serde(default, skip_serializing_if = "Option::is_none")]
244 description: Option<String>,
245 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
246 if_field: Option<String>,
247 #[serde(default, skip_serializing_if = "Option::is_none")]
248 ignore_failure: Option<bool>,
249 #[serde(default, skip_serializing_if = "Vec::is_empty")]
250 on_failure: Vec<Processor>,
251 #[serde(default, skip_serializing_if = "Option::is_none")]
252 tag: Option<String>,
253}
254
255#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
256#[serde(rename_all = "snake_case")]
257pub struct PipelineProcessor {
258 name: String,
259 #[serde(default, skip_serializing_if = "Option::is_none")]
260 ignore_missing_pipeline: Option<bool>,
261 #[serde(default, skip_serializing_if = "Option::is_none")]
262 description: Option<String>,
263 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
264 if_field: Option<String>,
265 #[serde(default, skip_serializing_if = "Option::is_none")]
266 ignore_failure: Option<bool>,
267 #[serde(default, skip_serializing_if = "Vec::is_empty")]
268 on_failure: Vec<Processor>,
269 #[serde(default, skip_serializing_if = "Option::is_none")]
270 tag: Option<String>,
271}
272
273#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
274#[serde(rename_all = "snake_case")]
275pub struct DissectProcessor {
276 field: String,
277 pattern: String,
278 #[serde(default, skip_serializing_if = "Option::is_none")]
279 append_separator: Option<String>,
280 #[serde(default, skip_serializing_if = "Option::is_none")]
281 ignore_missing: Option<bool>,
282 #[serde(default, skip_serializing_if = "Option::is_none")]
283 description: Option<String>,
284 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
285 if_field: Option<String>,
286 #[serde(default, skip_serializing_if = "Option::is_none")]
287 ignore_failure: Option<bool>,
288 #[serde(default, skip_serializing_if = "Vec::is_empty")]
289 on_failure: Vec<Processor>,
290 #[serde(default, skip_serializing_if = "Option::is_none")]
291 tag: Option<String>,
292}
293
294#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
295#[serde(rename_all = "snake_case")]
296pub struct UserAgentProcessor {
297 field: String,
298 #[serde(default, skip_serializing_if = "Option::is_none")]
301 regex_file: Option<String>,
302 #[serde(default, skip_serializing_if = "Option::is_none")]
303 target_field: Option<String>,
304 #[serde(default, skip_serializing_if = "Option::is_none")]
305 ignore_missing: Option<bool>,
306 #[serde(default, skip_serializing_if = "Option::is_none")]
307 description: Option<String>,
308 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
309 if_field: Option<String>,
310 #[serde(default, skip_serializing_if = "Option::is_none")]
311 ignore_failure: Option<bool>,
312 #[serde(default, skip_serializing_if = "Vec::is_empty")]
313 on_failure: Vec<Processor>,
314 #[serde(default, skip_serializing_if = "Option::is_none")]
315 tag: Option<String>,
316}
317
318#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
319#[serde(rename_all = "snake_case")]
320pub struct RemoveProcessor {
321 field: Vec<String>,
322 ignore_missing: Option<bool>,
323 #[serde(default, skip_serializing_if = "Option::is_none")]
324 description: Option<String>,
325 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
326 if_field: Option<String>,
327 #[serde(default, skip_serializing_if = "Option::is_none")]
328 ignore_failure: Option<bool>,
329 #[serde(default, skip_serializing_if = "Vec::is_empty")]
330 on_failure: Vec<Processor>,
331 #[serde(default, skip_serializing_if = "Option::is_none")]
332 tag: Option<String>,
333}
334
335#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
336#[serde(rename_all = "snake_case")]
337pub struct UrlDecodeProcessor {
338 field: String,
339 #[serde(default, skip_serializing_if = "Option::is_none")]
340 target_field: Option<String>,
341 #[serde(default, skip_serializing_if = "Option::is_none")]
342 ignore_missing: Option<bool>,
343 #[serde(default, skip_serializing_if = "Option::is_none")]
344 description: Option<String>,
345 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
346 if_field: Option<String>,
347 #[serde(default, skip_serializing_if = "Option::is_none")]
348 ignore_failure: Option<bool>,
349 #[serde(default, skip_serializing_if = "Vec::is_empty")]
350 on_failure: Vec<Processor>,
351 #[serde(default, skip_serializing_if = "Option::is_none")]
352 tag: Option<String>,
353}
354
355#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
356#[serde(rename_all = "snake_case")]
357pub struct SplitProcessor {
358 field: String,
359 separator: String,
360 #[serde(default, skip_serializing_if = "Option::is_none")]
361 preserve_trailing: Option<bool>,
362 #[serde(default, skip_serializing_if = "Option::is_none")]
363 target_field: Option<String>,
364 #[serde(default, skip_serializing_if = "Option::is_none")]
365 ignore_missing: Option<bool>,
366 #[serde(default, skip_serializing_if = "Option::is_none")]
367 description: Option<String>,
368 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
369 if_field: Option<String>,
370 #[serde(default, skip_serializing_if = "Option::is_none")]
371 ignore_failure: Option<bool>,
372 #[serde(default, skip_serializing_if = "Vec::is_empty")]
373 on_failure: Vec<Processor>,
374 #[serde(default, skip_serializing_if = "Option::is_none")]
375 tag: Option<String>,
376}
377
378#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
379#[serde(rename_all = "snake_case")]
380pub struct FailProcessor {
381 message: String,
382 #[serde(default, skip_serializing_if = "Option::is_none")]
383 description: Option<String>,
384 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
385 if_field: Option<String>,
386 #[serde(default, skip_serializing_if = "Option::is_none")]
387 ignore_failure: Option<bool>,
388 #[serde(default, skip_serializing_if = "Vec::is_empty")]
389 on_failure: Vec<Processor>,
390 #[serde(default, skip_serializing_if = "Option::is_none")]
391 tag: Option<String>,
392}
393
394#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
395#[serde(rename_all = "snake_case")]
396pub struct SortProcessor {
397 field: String,
398 #[serde(default, skip_serializing_if = "Option::is_none")]
401 target_field: Option<String>,
402 #[serde(default, skip_serializing_if = "Option::is_none")]
403 description: Option<String>,
404 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
405 if_field: Option<String>,
406 #[serde(default, skip_serializing_if = "Option::is_none")]
407 ignore_failure: Option<bool>,
408 #[serde(default, skip_serializing_if = "Vec::is_empty")]
409 on_failure: Vec<Processor>,
410 #[serde(default, skip_serializing_if = "Option::is_none")]
411 tag: Option<String>,
412}
413
414#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
437#[serde(rename_all = "snake_case")]
438pub struct TrimProcessor {
439 field: String,
440 #[serde(default, skip_serializing_if = "Option::is_none")]
441 target_field: Option<String>,
442 #[serde(default, skip_serializing_if = "Option::is_none")]
443 ignore_missing: Option<bool>,
444 #[serde(default, skip_serializing_if = "Option::is_none")]
445 description: Option<String>,
446 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
447 if_field: Option<String>,
448 #[serde(default, skip_serializing_if = "Option::is_none")]
449 ignore_failure: Option<bool>,
450 #[serde(default, skip_serializing_if = "Vec::is_empty")]
451 on_failure: Vec<Processor>,
452 #[serde(default, skip_serializing_if = "Option::is_none")]
453 tag: Option<String>,
454}
455
456#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
457#[serde(rename_all = "snake_case")]
458pub struct ScriptProcessor {
459 #[serde(default, skip_serializing_if = "Option::is_none")]
460 id: Option<String>,
461 #[serde(default, skip_serializing_if = "Option::is_none")]
462 lang: Option<String>,
463 #[serde(default, skip_serializing_if = "Option::is_none")]
464 params: Option<std::collections::HashMap<String, serde_json::Value>>,
465 #[serde(default, skip_serializing_if = "Option::is_none")]
466 source: Option<String>,
467 #[serde(default, skip_serializing_if = "Option::is_none")]
468 description: Option<String>,
469 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
470 if_field: Option<String>,
471 #[serde(default, skip_serializing_if = "Option::is_none")]
472 ignore_failure: Option<bool>,
473 #[serde(default, skip_serializing_if = "Vec::is_empty")]
474 on_failure: Vec<Processor>,
475 #[serde(default, skip_serializing_if = "Option::is_none")]
476 tag: Option<String>,
477}
478
479#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
480#[serde(rename_all = "snake_case")]
481pub struct JsonProcessor {
482 field: String,
483 #[serde(default, skip_serializing_if = "Option::is_none")]
484 target_field: Option<String>,
485 #[serde(default, skip_serializing_if = "Option::is_none")]
486 add_to_root: Option<bool>,
487 #[serde(default, skip_serializing_if = "Option::is_none")]
490 allow_duplicate_keys: Option<bool>,
491 #[serde(default, skip_serializing_if = "Option::is_none")]
492 description: Option<String>,
493 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
494 if_field: Option<String>,
495 #[serde(default, skip_serializing_if = "Option::is_none")]
496 ignore_failure: Option<bool>,
497 #[serde(default, skip_serializing_if = "Vec::is_empty")]
498 on_failure: Vec<Processor>,
499 #[serde(default, skip_serializing_if = "Option::is_none")]
500 tag: Option<String>,
501}
502
503#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
504pub struct UppercaseProcessor {
505 field: String,
506 #[serde(default, skip_serializing_if = "Option::is_none")]
507 target_field: Option<String>,
508 #[serde(default, skip_serializing_if = "Option::is_none")]
509 ignore_missing: Option<bool>,
510 #[serde(default, skip_serializing_if = "Option::is_none")]
511 description: Option<String>,
512 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
513 if_field: Option<String>,
514 #[serde(default, skip_serializing_if = "Option::is_none")]
515 ignore_failure: Option<bool>,
516 #[serde(default, skip_serializing_if = "Vec::is_empty")]
517 on_failure: Vec<Processor>,
518 #[serde(default, skip_serializing_if = "Option::is_none")]
519 tag: Option<String>,
520}
521
522#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
523pub struct DateProcessor {
524 field: String,
525 formats: Vec<String>,
526 #[serde(default, skip_serializing_if = "Option::is_none")]
527 locale: Option<String>,
528 #[serde(default, skip_serializing_if = "Option::is_none")]
529 target_field: Option<String>,
530 #[serde(default, skip_serializing_if = "Option::is_none")]
531 timezone: Option<String>,
532 #[serde(default, skip_serializing_if = "Option::is_none")]
533 description: Option<String>,
534 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
535 if_field: Option<String>,
536 #[serde(default, skip_serializing_if = "Option::is_none")]
537 ignore_failure: Option<bool>,
538 #[serde(default, skip_serializing_if = "Vec::is_empty")]
539 on_failure: Vec<Processor>,
540 #[serde(default, skip_serializing_if = "Option::is_none")]
541 tag: Option<String>,
542}
543
544#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
545pub struct DotExpanderProcessor {
546 field: String,
547 #[serde(default, skip_serializing_if = "Option::is_none")]
548 path: Option<String>,
549 #[serde(default, skip_serializing_if = "Option::is_none")]
550 description: Option<String>,
551 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
552 if_field: Option<String>,
553 #[serde(default, skip_serializing_if = "Option::is_none")]
554 ignore_failure: Option<bool>,
555 #[serde(default, skip_serializing_if = "Vec::is_empty")]
556 on_failure: Vec<Processor>,
557 #[serde(default, skip_serializing_if = "Option::is_none")]
558 tag: Option<String>,
559}
560
561#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
562pub struct LowercaseProcessor {
563 field: String,
564 #[serde(default, skip_serializing_if = "Option::is_none")]
565 ignore_missing: Option<bool>,
566 #[serde(default, skip_serializing_if = "Option::is_none")]
567 target_field: Option<String>,
568 #[serde(default, skip_serializing_if = "Option::is_none")]
569 description: Option<String>,
570 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
571 if_field: Option<String>,
572 #[serde(default, skip_serializing_if = "Option::is_none")]
573 ignore_failure: Option<bool>,
574 #[serde(default, skip_serializing_if = "Vec::is_empty")]
575 on_failure: Vec<Processor>,
576 #[serde(default, skip_serializing_if = "Option::is_none")]
577 tag: Option<String>,
578}
579
580#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
581pub struct SetProcessor {
582 field: String,
583 #[serde(default, skip_serializing_if = "Option::is_none")]
584 copy_from: Option<String>,
585 #[serde(default, skip_serializing_if = "Option::is_none")]
586 ignore_empty_value: Option<bool>,
587 #[serde(default, skip_serializing_if = "Option::is_none")]
588 media_type: Option<String>,
589 #[serde(default, skip_serializing_if = "Option::is_none")]
590 override_field: Option<bool>,
591 #[serde(default, skip_serializing_if = "Option::is_none")]
592 value: Option<serde_json::Value>,
593 #[serde(default, skip_serializing_if = "Option::is_none")]
594 description: Option<String>,
595 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
596 if_field: Option<String>,
597 #[serde(default, skip_serializing_if = "Option::is_none")]
598 ignore_failure: Option<bool>,
599 #[serde(default, skip_serializing_if = "Vec::is_empty")]
600 on_failure: Vec<Processor>,
601 #[serde(default, skip_serializing_if = "Option::is_none")]
602 tag: Option<String>,
603}
604
605#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
606pub struct GrokProcessor {
607 field: String,
608 patterns: Vec<String>,
609 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
610 pattern_definitions: HashMap<String, String>,
611 #[serde(default, skip_serializing_if = "Option::is_none")]
612 ignore_missing: Option<bool>,
613 #[serde(default, skip_serializing_if = "Option::is_none")]
614 trace_match: Option<bool>,
615 #[serde(default, skip_serializing_if = "Option::is_none")]
616 description: Option<String>,
617 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
618 if_field: Option<String>,
619 #[serde(default, skip_serializing_if = "Option::is_none")]
620 ignore_failure: Option<bool>,
621 #[serde(default, skip_serializing_if = "Vec::is_empty")]
622 on_failure: Vec<Processor>,
623 #[serde(default, skip_serializing_if = "Option::is_none")]
624 tag: Option<String>,
625}
626
627#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
628pub struct GsubProcessor {
629 field: String,
630 pattern: String,
631 replacement: String,
632 #[serde(default, skip_serializing_if = "Option::is_none")]
633 ignore_missing: Option<bool>,
634 #[serde(default, skip_serializing_if = "Option::is_none")]
635 target_field: Option<String>,
636 #[serde(default, skip_serializing_if = "Option::is_none")]
637 description: Option<String>,
638 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
639 if_field: Option<String>,
640 #[serde(default, skip_serializing_if = "Option::is_none")]
641 ignore_failure: Option<bool>,
642 #[serde(default, skip_serializing_if = "Vec::is_empty")]
643 on_failure: Vec<Processor>,
644 #[serde(default, skip_serializing_if = "Option::is_none")]
645 tag: Option<String>,
646}
647
648#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
649pub struct ConvertProcessor {
650 field: String,
651 type_field: String,
652 #[serde(default, skip_serializing_if = "Option::is_none")]
653 ignore_missing: Option<bool>,
654 #[serde(default, skip_serializing_if = "Option::is_none")]
655 target_field: Option<String>,
656 #[serde(default, skip_serializing_if = "Option::is_none")]
657 description: Option<String>,
658 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
659 if_field: Option<String>,
660 #[serde(default, skip_serializing_if = "Option::is_none")]
661 ignore_failure: Option<bool>,
662 #[serde(default, skip_serializing_if = "Vec::is_empty")]
663 on_failure: Vec<Processor>,
664 #[serde(default, skip_serializing_if = "Option::is_none")]
665 tag: Option<String>,
666}
667
668#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
669pub struct GeoIpProcessor {
670 field: String,
671 #[serde(default, skip_serializing_if = "Vec::is_empty")]
672 properties: Vec<String>,
673 #[serde(default, skip_serializing_if = "Option::is_none")]
674 database_file: Option<String>,
675 #[serde(default, skip_serializing_if = "Option::is_none")]
676 first_only: Option<bool>,
677 #[serde(default, skip_serializing_if = "Option::is_none")]
678 ignore_missing: Option<bool>,
679 #[serde(default, skip_serializing_if = "Option::is_none")]
680 target_field: Option<String>,
681 #[serde(default, skip_serializing_if = "Option::is_none")]
682 description: Option<String>,
683 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
684 if_field: Option<String>,
685 #[serde(default, skip_serializing_if = "Option::is_none")]
686 ignore_failure: Option<bool>,
687 #[serde(default, skip_serializing_if = "Vec::is_empty")]
688 on_failure: Vec<Processor>,
689 #[serde(default, skip_serializing_if = "Option::is_none")]
690 tag: Option<String>,
691}
692
693#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
694pub struct BytesProcessor {
695 field: String,
696 #[serde(default, skip_serializing_if = "Option::is_none")]
697 target_field: Option<String>,
698 #[serde(default, skip_serializing_if = "Option::is_none")]
699 ignore_missing: Option<bool>,
700 #[serde(default, skip_serializing_if = "Option::is_none")]
701 description: Option<String>,
702 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
703 if_field: Option<String>,
704 #[serde(default, skip_serializing_if = "Option::is_none")]
705 ignore_failure: Option<bool>,
706 #[serde(default, skip_serializing_if = "Vec::is_empty")]
707 on_failure: Vec<Processor>,
708 #[serde(default, skip_serializing_if = "Option::is_none")]
709 tag: Option<String>,
710}
711
712#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
713pub struct InferenceProcessor {
714 model_id: String,
715 #[serde(default, skip_serializing_if = "Option::is_none")]
716 target_field: Option<String>,
717 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
718 field_map: HashMap<String, serde_json::Value>,
719 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
720 inference_config: HashMap<String, serde_json::Value>,
721 #[serde(default, skip_serializing_if = "Option::is_none")]
722 description: Option<String>,
723 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
724 if_field: Option<String>,
725 #[serde(default, skip_serializing_if = "Option::is_none")]
726 ignore_failure: Option<bool>,
727 #[serde(default, skip_serializing_if = "Vec::is_empty")]
728 on_failure: Vec<Processor>,
729 #[serde(default, skip_serializing_if = "Option::is_none")]
730 tag: Option<String>,
731}
732
733#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
734pub struct RenameProcessor {
735 field: String,
736 target_field: String,
737 #[serde(default, skip_serializing_if = "Option::is_none")]
738 ignore_missing: Option<bool>,
739 #[serde(default, skip_serializing_if = "Option::is_none")]
740 description: Option<String>,
741 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
742 if_field: Option<String>,
743 #[serde(default, skip_serializing_if = "Option::is_none")]
744 ignore_failure: Option<bool>,
745 #[serde(default, skip_serializing_if = "Vec::is_empty")]
746 on_failure: Vec<Processor>,
747 #[serde(default, skip_serializing_if = "Option::is_none")]
748 tag: Option<String>,
749}
750
751#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
752pub struct AppendProcessor {
753 field: String,
754 value: Vec<serde_json::Value>,
755 #[serde(default, skip_serializing_if = "Option::is_none")]
756 allow_duplicates: Option<bool>,
757 #[serde(default, skip_serializing_if = "Option::is_none")]
758 description: Option<String>,
759 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
760 if_field: Option<String>,
761 #[serde(default, skip_serializing_if = "Option::is_none")]
762 ignore_failure: Option<bool>,
763 #[serde(default, skip_serializing_if = "Vec::is_empty")]
764 on_failure: Vec<Processor>,
765 #[serde(default, skip_serializing_if = "Option::is_none")]
766 tag: Option<String>,
767}
768
769#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
770pub struct DateIndexNameProcessor {
771 field: String,
772 date_rounding: String,
773 #[serde(default, skip_serializing_if = "Vec::is_empty")]
774 date_formats: Vec<String>,
775 #[serde(default, skip_serializing_if = "Option::is_none")]
776 index_name_format: Option<String>,
777 #[serde(default, skip_serializing_if = "Option::is_none")]
778 index_name_prefix: Option<String>,
779 #[serde(default, skip_serializing_if = "Option::is_none")]
780 locale: Option<String>,
781 #[serde(default, skip_serializing_if = "Option::is_none")]
782 timezone: Option<String>,
783 #[serde(default, skip_serializing_if = "Option::is_none")]
784 description: Option<String>,
785 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
786 if_field: Option<String>,
787 #[serde(default, skip_serializing_if = "Option::is_none")]
788 ignore_failure: Option<bool>,
789 #[serde(default, skip_serializing_if = "Vec::is_empty")]
790 on_failure: Vec<Processor>,
791 #[serde(default, skip_serializing_if = "Option::is_none")]
792 tag: Option<String>,
793}
794
795#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
796pub struct DropProcessor {
797 #[serde(default, skip_serializing_if = "Option::is_none")]
798 description: Option<String>,
799 #[serde(rename = "if", default, skip_serializing_if = "Option::is_none")]
800 if_field: Option<String>,
801 #[serde(default, skip_serializing_if = "Option::is_none")]
802 ignore_failure: Option<bool>,
803 #[serde(default, skip_serializing_if = "Vec::is_empty")]
804 on_failure: Vec<Processor>,
805 #[serde(default, skip_serializing_if = "Option::is_none")]
806 tag: Option<String>,
807}
808
809#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
810pub struct SparseEncodingProcessor {
811 model_id: String,
812 field_map: HashMap<String, String>,
813 #[serde(default, skip_serializing_if = "Option::is_none")]
814 description: Option<String>,
815 #[serde(default, skip_serializing_if = "Option::is_none")]
816 tag: Option<String>,
817}
818
819#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
820pub struct TextEmbeddingProcessor {
821 model_id: String,
822 field_map: HashMap<String, String>,
823 #[serde(default, skip_serializing_if = "Option::is_none")]
824 description: Option<String>,
825 #[serde(default, skip_serializing_if = "Option::is_none")]
826 tag: Option<String>,
827}
828
829#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
830pub struct TextImageEmbedding {
831 #[serde(default, skip_serializing_if = "Option::is_none")]
832 text: Option<String>,
833 #[serde(default, skip_serializing_if = "Option::is_none")]
834 image: Option<String>,
835}
836
837#[derive(Clone, Debug, Default, Deserialize, Serialize, PartialEq, Eq)]
838pub struct TextImageEmbeddingProcessor {
839 model_id: String,
840 embedding: String,
841 field_map: TextImageEmbedding,
842 #[serde(default, skip_serializing_if = "Option::is_none")]
843 description: Option<String>,
844 #[serde(default, skip_serializing_if = "Option::is_none")]
845 tag: Option<String>,
846}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use serde::de::DeserializeOwned;

    use super::*;

    /// Reads and deserializes the fixture `tests/ingest/pipeline.<name>.json` under the
    /// crate root.
    ///
    /// # Panics
    /// Panics, naming the fixture path, if the file cannot be read or parsed.
    fn load_entity<T: DeserializeOwned>(name: &str) -> T {
        let filename = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join(format!("tests/ingest/pipeline.{name}.json"));
        let text = std::fs::read_to_string(&filename)
            .unwrap_or_else(|err| panic!("failed to read {}: {err}", filename.display()));
        serde_json::from_str(&text)
            .unwrap_or_else(|err| panic!("failed to parse {}: {err}", filename.display()))
    }

    #[test]
    fn test_ml_processors() {
        let decoded: Pipeline = load_entity("ml");
        let passage_map = HashMap::from([(
            "passage_text".to_owned(),
            "passage_embedding".to_owned(),
        )]);
        let expected = Pipeline {
            description: Some("A ml pipeline".to_owned()),
            processors: vec![
                Processor::SparseEncodingProcessor(SparseEncodingProcessor {
                    model_id: "aP2Q8ooBpBj3wT4HVS8a".to_owned(),
                    field_map: passage_map.clone(),
                    ..Default::default()
                }),
                Processor::TextEmbeddingProcessor(TextEmbeddingProcessor {
                    model_id: "bQ1J8ooBpBj3wT4HVUsb".to_owned(),
                    field_map: passage_map,
                    ..Default::default()
                }),
                Processor::TextImageEmbeddingProcessor(TextImageEmbeddingProcessor {
                    model_id: "bQ1J8ooBpBj3wT4HVUsb".to_owned(),
                    embedding: "vector_embedding".to_owned(),
                    field_map: TextImageEmbedding {
                        text: Some("image_description".to_owned()),
                        image: Some("image_binary".to_owned()),
                    },
                    ..Default::default()
                }),
            ],
            ..Default::default()
        };
        // Checked first so a mismatch in the common field gives a focused failure message.
        assert_eq!(decoded.description, Some("A ml pipeline".to_owned()));
        assert_eq!(decoded, expected);
    }
}