// v1_logs_pipelines_CreateLogsPipeline_2256674867/v1_logs-pipelines_CreateLogsPipeline_2256674867.rs
use datadog_api_client::datadog;
use datadog_api_client::datadogV1::api_logs_pipelines::LogsPipelinesAPI;
use datadog_api_client::datadogV1::model::LogsFilter;
use datadog_api_client::datadogV1::model::LogsPipeline;
use datadog_api_client::datadogV1::model::LogsProcessor;
use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapper;
use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperCategory;
use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperFallback;
use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperTargets;
use datadog_api_client::datadogV1::model::LogsSchemaCategoryMapperType;
use datadog_api_client::datadogV1::model::LogsSchemaData;
use datadog_api_client::datadogV1::model::LogsSchemaMapper;
use datadog_api_client::datadogV1::model::LogsSchemaProcessor;
use datadog_api_client::datadogV1::model::LogsSchemaProcessorType;
use datadog_api_client::datadogV1::model::LogsSchemaRemapper;
use datadog_api_client::datadogV1::model::LogsSchemaRemapperType;
use std::collections::BTreeMap;

21#[tokio::main]
22async fn main() {
23 let body = LogsPipeline::new("testSchemaProcessor".to_string())
24 .filter(LogsFilter::new().query("source:python".to_string()))
25 .processors(vec![LogsProcessor::LogsSchemaProcessor(Box::new(
26 LogsSchemaProcessor::new(
27 vec![
28 LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
29 LogsSchemaCategoryMapper::new(
30 vec![
31 LogsSchemaCategoryMapperCategory::new(
32 LogsFilter::new().query("@eventName:(*Create*)".to_string()),
33 1,
34 "Create".to_string(),
35 ),
36 LogsSchemaCategoryMapperCategory::new(
37 LogsFilter::new().query(
38 "@eventName:(ChangePassword OR PasswordUpdated)"
39 .to_string(),
40 ),
41 3,
42 "Password Change".to_string(),
43 ),
44 LogsSchemaCategoryMapperCategory::new(
45 LogsFilter::new().query("@eventName:(*Attach*)".to_string()),
46 7,
47 "Attach Policy".to_string(),
48 ),
49 LogsSchemaCategoryMapperCategory::new(
50 LogsFilter::new()
51 .query("@eventName:(*Detach* OR *Remove*)".to_string()),
52 8,
53 "Detach Policy".to_string(),
54 ),
55 LogsSchemaCategoryMapperCategory::new(
56 LogsFilter::new().query("@eventName:(*Delete*)".to_string()),
57 6,
58 "Delete".to_string(),
59 ),
60 LogsSchemaCategoryMapperCategory::new(
61 LogsFilter::new().query("@eventName:*".to_string()),
62 99,
63 "Other".to_string(),
64 ),
65 ],
66 "activity_id and activity_name".to_string(),
67 LogsSchemaCategoryMapperTargets::new()
68 .id("ocsf.activity_id".to_string())
69 .name("ocsf.activity_name".to_string()),
70 LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
71 )
72 .fallback(
73 LogsSchemaCategoryMapperFallback::new()
74 .sources(BTreeMap::from([(
75 "ocsf.activity_name".to_string(),
76 vec!["eventName".to_string()],
77 )]))
78 .values(BTreeMap::from([
79 ("ocsf.activity_id".to_string(), "99".to_string()),
80 ("ocsf.activity_name".to_string(), "Other".to_string()),
81 ])),
82 ),
83 )),
84 LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
85 LogsSchemaCategoryMapper::new(
86 vec![
87 LogsSchemaCategoryMapperCategory::new(
88 LogsFilter::new().query("-@errorCode:*".to_string()),
89 1,
90 "Success".to_string(),
91 ),
92 LogsSchemaCategoryMapperCategory::new(
93 LogsFilter::new().query("@errorCode:*".to_string()),
94 2,
95 "Failure".to_string(),
96 ),
97 ],
98 "status".to_string(),
99 LogsSchemaCategoryMapperTargets::new()
100 .id("ocsf.status_id".to_string())
101 .name("ocsf.status".to_string()),
102 LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
103 ),
104 )),
105 LogsSchemaMapper::LogsSchemaCategoryMapper(Box::new(
106 LogsSchemaCategoryMapper::new(
107 vec![LogsSchemaCategoryMapperCategory::new(
108 LogsFilter::new().query("@eventName:*".to_string()),
109 1,
110 "Informational".to_string(),
111 )],
112 "Set default severity".to_string(),
113 LogsSchemaCategoryMapperTargets::new()
114 .id("ocsf.severity_id".to_string())
115 .name("ocsf.severity".to_string()),
116 LogsSchemaCategoryMapperType::SCHEMA_CATEGORY_MAPPER,
117 ),
118 )),
119 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
120 LogsSchemaRemapper::new(
121 "Map userIdentity to ocsf.user.uid".to_string(),
122 vec![
123 "userIdentity.principalId".to_string(),
124 "responseElements.role.roleId".to_string(),
125 "responseElements.user.userId".to_string(),
126 ],
127 "ocsf.user.uid".to_string(),
128 LogsSchemaRemapperType::SCHEMA_REMAPPER,
129 )
130 .preserve_source(false),
131 )),
132 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
133 LogsSchemaRemapper::new(
134 "Map userName to ocsf.user.name".to_string(),
135 vec![
136 "requestParameters.userName".to_string(),
137 "responseElements.role.roleName".to_string(),
138 "requestParameters.roleName".to_string(),
139 "responseElements.user.userName".to_string(),
140 ],
141 "ocsf.user.name".to_string(),
142 LogsSchemaRemapperType::SCHEMA_REMAPPER,
143 )
144 .preserve_source(false),
145 )),
146 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
147 LogsSchemaRemapper::new(
148 "Map api to ocsf.api".to_string(),
149 vec!["api".to_string()],
150 "ocsf.api".to_string(),
151 LogsSchemaRemapperType::SCHEMA_REMAPPER,
152 )
153 .preserve_source(false),
154 )),
155 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
156 LogsSchemaRemapper::new(
157 "Map user to ocsf.user".to_string(),
158 vec!["user".to_string()],
159 "ocsf.user".to_string(),
160 LogsSchemaRemapperType::SCHEMA_REMAPPER,
161 )
162 .preserve_source(false),
163 )),
164 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
165 LogsSchemaRemapper::new(
166 "Map actor to ocsf.actor".to_string(),
167 vec!["actor".to_string()],
168 "ocsf.actor".to_string(),
169 LogsSchemaRemapperType::SCHEMA_REMAPPER,
170 )
171 .preserve_source(false),
172 )),
173 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
174 LogsSchemaRemapper::new(
175 "Map cloud to ocsf.cloud".to_string(),
176 vec!["cloud".to_string()],
177 "ocsf.cloud".to_string(),
178 LogsSchemaRemapperType::SCHEMA_REMAPPER,
179 )
180 .preserve_source(false),
181 )),
182 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
183 LogsSchemaRemapper::new(
184 "Map http_request to ocsf.http_request".to_string(),
185 vec!["http_request".to_string()],
186 "ocsf.http_request".to_string(),
187 LogsSchemaRemapperType::SCHEMA_REMAPPER,
188 )
189 .preserve_source(false),
190 )),
191 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
192 LogsSchemaRemapper::new(
193 "Map metadata to ocsf.metadata".to_string(),
194 vec!["metadata".to_string()],
195 "ocsf.metadata".to_string(),
196 LogsSchemaRemapperType::SCHEMA_REMAPPER,
197 )
198 .preserve_source(false),
199 )),
200 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
201 LogsSchemaRemapper::new(
202 "Map time to ocsf.time".to_string(),
203 vec!["time".to_string()],
204 "ocsf.time".to_string(),
205 LogsSchemaRemapperType::SCHEMA_REMAPPER,
206 )
207 .preserve_source(false),
208 )),
209 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
210 LogsSchemaRemapper::new(
211 "Map src_endpoint to ocsf.src_endpoint".to_string(),
212 vec!["src_endpoint".to_string()],
213 "ocsf.src_endpoint".to_string(),
214 LogsSchemaRemapperType::SCHEMA_REMAPPER,
215 )
216 .preserve_source(false),
217 )),
218 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
219 LogsSchemaRemapper::new(
220 "Map severity to ocsf.severity".to_string(),
221 vec!["severity".to_string()],
222 "ocsf.severity".to_string(),
223 LogsSchemaRemapperType::SCHEMA_REMAPPER,
224 )
225 .preserve_source(false),
226 )),
227 LogsSchemaMapper::LogsSchemaRemapper(Box::new(
228 LogsSchemaRemapper::new(
229 "Map severity_id to ocsf.severity_id".to_string(),
230 vec!["severity_id".to_string()],
231 "ocsf.severity_id".to_string(),
232 LogsSchemaRemapperType::SCHEMA_REMAPPER,
233 )
234 .preserve_source(false),
235 )),
236 ],
237 "Apply OCSF schema for 3001".to_string(),
238 LogsSchemaData::new(
239 "Account Change".to_string(),
240 3001,
241 "ocsf".to_string(),
242 "1.5.0".to_string(),
243 )
244 .profiles(vec!["cloud".to_string(), "datetime".to_string()]),
245 LogsSchemaProcessorType::SCHEMA_PROCESSOR,
246 )
247 .is_enabled(true),
248 ))])
249 .tags(vec![]);
250 let configuration = datadog::Configuration::new();
251 let api = LogsPipelinesAPI::with_config(configuration);
252 let resp = api.create_logs_pipeline(body).await;
253 if let Ok(value) = resp {
254 println!("{:#?}", value);
255 } else {
256 println!("{:#?}", resp.unwrap_err());
257 }
258}