// v1_logs_pipelines_CreateLogsPipeline_501419705/
// v1_logs-pipelines_CreateLogsPipeline_501419705.rs

1// Create a pipeline with schema processor
2use datadog_api_client::datadog;
3use datadog_api_client::datadogV1::api_logs_pipelines::LogsPipelinesAPI;
4use datadog_api_client::datadogV1::model::LogsFilter;
5use datadog_api_client::datadogV1::model::LogsPipeline;
6use datadog_api_client::datadogV1::model::LogsProcessor;
7use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapper;
8use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperCategory;
9use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperFallback;
10use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperTargets;
11use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperType;
12use datadog_api_client::datadogV1::model::LogsSchemaData;
13use datadog_api_client::datadogV1::model::LogsSchemaMapper;
14use datadog_api_client::datadogV1::model::LogsSchemaProcessor;
15use datadog_api_client::datadogV1::model::LogsSchemaProcessorType;
16use datadog_api_client::datadogV1::model::LogsSchemaRemapper;
17use datadog_api_client::datadogV1::model::LogsSchemaRemapperType;
18use std::collections::BTreeMap;
19
20#[tokio::main]
21async fn main() {
22    let body = LogsPipeline::new("testSchemaProcessor".to_string())
23        .filter(LogsFilter::new().query("source:python".to_string()))
24        .processors(vec![LogsProcessor::LogsSchemaProcessor(Box::new(
25            LogsSchemaProcessor::new(
26                vec![
27                    LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
28                        LogsSchemaCategoryMapper::new(
29                            vec![
30                                LogsSchemaCategoryMapperCategory::new(
31                                    LogsFilter::new().query("@eventName:(*Create*)".to_string()),
32                                    1,
33                                    "Create".to_string(),
34                                ),
35                                LogsSchemaCategoryMapperCategory::new(
36                                    LogsFilter::new().query(
37                                        "@eventName:(ChangePassword OR PasswordUpdated)"
38                                            .to_string(),
39                                    ),
40                                    3,
41                                    "Password Change".to_string(),
42                                ),
43                                LogsSchemaCategoryMapperCategory::new(
44                                    LogsFilter::new().query("@eventName:(*Attach*)".to_string()),
45                                    7,
46                                    "Attach Policy".to_string(),
47                                ),
48                                LogsSchemaCategoryMapperCategory::new(
49                                    LogsFilter::new()
50                                        .query("@eventName:(*Detach* OR *Remove*)".to_string()),
51                                    8,
52                                    "Detach Policy".to_string(),
53                                ),
54                                LogsSchemaCategoryMapperCategory::new(
55                                    LogsFilter::new().query("@eventName:(*Delete*)".to_string()),
56                                    6,
57                                    "Delete".to_string(),
58                                ),
59                                LogsSchemaCategoryMapperCategory::new(
60                                    LogsFilter::new().query("@eventName:*".to_string()),
61                                    99,
62                                    "Other".to_string(),
63                                ),
64                            ],
65                            "activity_id and activity_name".to_string(),
66                            LogsSchemaCategoryMapperTargets::new()
67                                .id("ocsf.activity_id".to_string())
68                                .name("ocsf.activity_name".to_string()),
69                            LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
70                        )
71                        .fallback(
72                            LogsSchemaCategoryMapperFallback::new()
73                                .sources(BTreeMap::from([(
74                                    "ocsf.activity_name".to_string(),
75                                    vec!["eventName".to_string()],
76                                )]))
77                                .values(BTreeMap::from([
78                                    ("ocsf.activity_id".to_string(), "99".to_string()),
79                                    ("ocsf.activity_name".to_string(), "Other".to_string()),
80                                ])),
81                        ),
82                    )),
83                    LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
84                        LogsSchemaCategoryMapper::new(
85                            vec![
86                                LogsSchemaCategoryMapperCategory::new(
87                                    LogsFilter::new().query("-@errorCode:*".to_string()),
88                                    1,
89                                    "Success".to_string(),
90                                ),
91                                LogsSchemaCategoryMapperCategory::new(
92                                    LogsFilter::new().query("@errorCode:*".to_string()),
93                                    2,
94                                    "Failure".to_string(),
95                                ),
96                            ],
97                            "status".to_string(),
98                            LogsSchemaCategoryMapperTargets::new()
99                                .id("ocsf.status_id".to_string())
100                                .name("ocsf.status".to_string()),
101                            LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
102                        ),
103                    )),
104                    LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
105                        LogsSchemaCategoryMapper::new(
106                            vec![LogsSchemaCategoryMapperCategory::new(
107                                LogsFilter::new().query("@eventName:*".to_string()),
108                                1,
109                                "Informational".to_string(),
110                            )],
111                            "Set default severity".to_string(),
112                            LogsSchemaCategoryMapperTargets::new()
113                                .id("ocsf.severity_id".to_string())
114                                .name("ocsf.severity".to_string()),
115                            LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
116                        ),
117                    )),
118                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
119                        "Map userIdentity to ocsf.user.uid".to_string(),
120                        vec![
121                            "userIdentity.principalId".to_string(),
122                            "responseElements.role.roleId".to_string(),
123                            "responseElements.user.userId".to_string(),
124                        ],
125                        "ocsf.user.uid".to_string(),
126                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
127                    ))),
128                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
129                        "Map userName to ocsf.user.name".to_string(),
130                        vec![
131                            "requestParameters.userName".to_string(),
132                            "responseElements.role.roleName".to_string(),
133                            "requestParameters.roleName".to_string(),
134                            "responseElements.user.userName".to_string(),
135                        ],
136                        "ocsf.user.name".to_string(),
137                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
138                    ))),
139                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
140                        "Map api to ocsf.api".to_string(),
141                        vec!["api".to_string()],
142                        "ocsf.api".to_string(),
143                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
144                    ))),
145                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
146                        "Map user to ocsf.user".to_string(),
147                        vec!["user".to_string()],
148                        "ocsf.user".to_string(),
149                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
150                    ))),
151                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
152                        "Map actor to ocsf.actor".to_string(),
153                        vec!["actor".to_string()],
154                        "ocsf.actor".to_string(),
155                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
156                    ))),
157                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
158                        "Map cloud to ocsf.cloud".to_string(),
159                        vec!["cloud".to_string()],
160                        "ocsf.cloud".to_string(),
161                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
162                    ))),
163                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
164                        "Map http_request to ocsf.http_request".to_string(),
165                        vec!["http_request".to_string()],
166                        "ocsf.http_request".to_string(),
167                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
168                    ))),
169                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
170                        "Map metadata to ocsf.metadata".to_string(),
171                        vec!["metadata".to_string()],
172                        "ocsf.metadata".to_string(),
173                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
174                    ))),
175                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
176                        "Map time to ocsf.time".to_string(),
177                        vec!["time".to_string()],
178                        "ocsf.time".to_string(),
179                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
180                    ))),
181                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
182                        "Map src_endpoint to ocsf.src_endpoint".to_string(),
183                        vec!["src_endpoint".to_string()],
184                        "ocsf.src_endpoint".to_string(),
185                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
186                    ))),
187                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
188                        "Map severity to ocsf.severity".to_string(),
189                        vec!["severity".to_string()],
190                        "ocsf.severity".to_string(),
191                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
192                    ))),
193                    LogsSchemaMapper::LogsSchemaRemapper(Box::new(LogsSchemaRemapper::new(
194                        "Map severity_id to ocsf.severity_id".to_string(),
195                        vec!["severity_id".to_string()],
196                        "ocsf.severity_id".to_string(),
197                        LogsSchemaRemapperType::SCHEMA_REMAPPER,
198                    ))),
199                ],
200                "Apply OCSF schema for 3001".to_string(),
201                LogsSchemaData::new(
202                    "Account Change".to_string(),
203                    3001,
204                    "ocsf".to_string(),
205                    "1.5.0".to_string(),
206                )
207                .profiles(vec!["cloud".to_string(), "datetime".to_string()]),
208                LogsSchemaProcessorType::SCHEMA_PROCESSOR,
209            )
210            .is_enabled(true),
211        ))])
212        .tags(vec![]);
213    let configuration = datadog::Configuration::new();
214    let api = LogsPipelinesAPI::with_config(configuration);
215    let resp = api.create_logs_pipeline(body).await;
216    if let Ok(value) = resp {
217        println!("{:#?}", value);
218    } else {
219        println!("{:#?}", resp.unwrap_err());
220    }
221}