1use crate::mutator_protocol::mutator::ActuatorDescriptor;
2use crate::mutator_server::MUTATOR_API_KEY_HEADER;
3use async_trait::async_trait;
4use axum::{extract::FromRequestParts, http::StatusCode, Router};
5use std::collections::BTreeMap;
6use std::future::Future;
7use std::net::{SocketAddr, TcpListener};
8use std::sync::Arc;
9use utoipa::{
10 openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
11 Modify, OpenApi,
12};
13use utoipa_swagger_ui::SwaggerUi;
14
15pub async fn serve_mutators(
16 mutators: BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>,
17 required_api_key_value: Option<String>,
18 addr: impl Into<SocketAddr>,
19 shutdown_signal: impl Future<Output = ()> + Send + 'static,
20) {
21 let listener = TcpListener::bind(addr.into()).unwrap();
22 serve_mutators_on_listener(mutators, required_api_key_value, listener, shutdown_signal).await
23}
24
25pub async fn serve_mutators_on_listener(
26 mutators: BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>,
27 required_api_key_value: Option<String>,
28 listener: TcpListener,
29 shutdown_signal: impl Future<Output = ()> + Send + 'static,
30) {
31 let store = mutator::Store {
32 required_api_key_value,
33 mutators: Arc::new(tokio::sync::Mutex::new(mutators)),
34 };
35
36 let routes = Router::new()
37 .merge(mutator::routes().with_state(store))
38 .merge(swagger_routes());
39
40 let server = axum::Server::from_tcp(listener)
41 .expect("useable socket")
42 .serve(routes.into_make_service());
43
44 let addr = server.local_addr();
45 let server = server.with_graceful_shutdown(shutdown_signal);
46
47 tracing::debug!(%addr, "Serving mutator HTTP API");
48
49 if let Err(e) = server.await {
50 tracing::error!(
51 err = &e as &dyn std::error::Error,
52 "Error running mutator http server"
53 );
54 }
55}
56
57fn swagger_routes() -> Router {
58 use crate::mutator_server::{Mutation, Mutator};
59 #[derive(OpenApi)]
60 #[openapi(
61 paths(mutator::list_mutators, mutator::create_mutation, mutator::delete_mutations),
62 components(schemas(Mutator, Mutation)),
63 modifiers(&SecurityAddon),
64 tags(
65 (name = "mutator", description = "Mutator API")
66 )
67 )]
68 struct ApiDoc;
69
70 struct SecurityAddon;
71 impl Modify for SecurityAddon {
72 fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
73 if let Some(components) = openapi.components.as_mut() {
74 components.add_security_scheme(
75 "api_key",
76 SecurityScheme::ApiKey(ApiKey::Header(ApiKeyValue::new(
77 MUTATOR_API_KEY_HEADER,
78 ))),
79 )
80 }
81 }
82 }
83
84 Router::new().merge(SwaggerUi::new("/swagger-ui").url("/api-doc.json", ApiDoc::openapi()))
85}
86
87pub(crate) mod mutator {
88 use crate::mutator_server::{GetAllMutatorsResponse, Mutation, Mutator};
89 use std::collections::BTreeMap;
90 use std::{convert::Infallible, sync::Arc};
91
92 use crate::mutator_protocol::mutator::ActuatorDescriptor;
93 use axum::extract::{Path, State};
94 use axum::http::StatusCode;
95 use axum::routing::{get, post};
96 use axum::{Json, Router};
97
98 use super::ValidApiKeyHeader;
99
100 mod url_path_part {
102 use percent_encoding::percent_decode_str;
103 use std::str::FromStr;
104 use std::string::ToString;
105
106 #[derive(Clone, Debug)]
110 #[repr(transparent)]
111 pub struct UrlPathPart(String);
112
113 impl FromStr for UrlPathPart {
116 type Err = std::str::Utf8Error;
117 #[inline]
118 fn from_str(s: &str) -> Result<Self, Self::Err> {
119 Ok(UrlPathPart(
120 percent_decode_str(s).decode_utf8()?.to_string(),
121 ))
122 }
123 }
124 impl From<UrlPathPart> for String {
125 fn from(v: UrlPathPart) -> Self {
126 v.0
127 }
128 }
129 impl AsRef<str> for UrlPathPart {
130 fn as_ref(&self) -> &str {
131 self.0.as_str()
132 }
133 }
134 }
135
    /// State shared by the route handlers and the API-key extractor.
    ///
    /// Cloning is cheap for `mutators`: only the `Arc` handle is duplicated.
    #[derive(Clone)]
    pub struct Store {
        /// API key a request must present. When `None`, the extractor accepts
        /// any header value (the header itself must still be present).
        pub required_api_key_value: Option<String>,
        /// Registered mutators keyed by correlation id, guarded by an async
        /// mutex that handlers hold while calling into a mutator.
        pub mutators: Arc<tokio::sync::Mutex<BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>>>,
    }
141
142 pub fn routes() -> Router<Store> {
143 Router::new().route("/mutator", get(list_mutators)).route(
144 "/mutator/:mutator_correlation_id/mutation",
145 post(create_mutation).delete(delete_mutations),
146 )
147 }
148
149 #[utoipa::path(
151 get,
152 path = "/mutator",
153 responses(
154 (status = 200, description = "List mutators successfully", body = [Mutator]),
155 (status = 400, description = "Missing mutator_apikey request header"),
156 (status = 401, description = "Unauthorized to view mutators")
157 ),
158 security(
159 ("api_key" = [])
160 )
161 )]
162 pub async fn list_mutators(
163 State(store): State<Store>,
164 _h: ValidApiKeyHeader,
165 ) -> Result<Json<GetAllMutatorsResponse>, Infallible> {
166 let mutators_map = store.mutators.lock().await;
167
168 let mut mutator_components: Vec<Mutator> = vec![];
169 for (corr_id, actuator_descriptor) in mutators_map.iter() {
170 let attr_iter = actuator_descriptor.get_description_attributes();
171 mutator_components.push(Mutator {
172 mutator_correlation_id: corr_id.clone(),
173 attributes: attr_iter.collect(),
174 })
175 }
176
177 Ok(Json(mutator_components))
178 }
179
180 #[utoipa::path(
182 post,
183 path = "/mutator/{mutator_correlation_id}/mutation",
184 request_body = Mutation,
185 responses(
186 (status = 201, description = "Mutation created successfully"),
187 (status = 400, description = "Missing mutator_apikey request header"),
188 (status = 401, description = "Unauthorized to create mutations"),
189 (status = 404, description = "Mutator not found"),
190 (status = 500, description = "Internal mutator error")
191 ),
192 params(
193 ("mutator_correlation_id" = String, Path, description = "Mutator's server-local correlation id")
194 ),
195 security(
196 ("api_key" = [])
197 )
198 )]
199 pub async fn create_mutation(
200 State(store): State<Store>,
201 _h: ValidApiKeyHeader,
202 Path(mutator_correlation_id): Path<String>,
203 Json(mutation): Json<Mutation>,
204 ) -> Result<StatusCode, StatusCode> {
205 tracing::debug!(%mutator_correlation_id);
206 let mut mutators = store.mutators.lock().await;
207
208 let actuator_descriptor = mutators
209 .get_mut(&mutator_correlation_id)
210 .ok_or(StatusCode::NOT_FOUND)?;
211
212 match actuator_descriptor
213 .inject(mutation.mutation, mutation.params)
214 .await
215 {
216 Ok(()) => Ok(StatusCode::CREATED),
217 Err(err) => {
218 tracing::error!(
219 err = err.as_ref() as &dyn std::error::Error,
220 "Failed to inject mutation"
221 );
222 Err(StatusCode::INTERNAL_SERVER_ERROR)
223 }
224 }
225 }
226
227 #[utoipa::path(
229 delete,
230 path = "/mutator/{mutator_correlation_id}/mutation",
231 responses(
232 (status = 200, description = "Mutations reset / deleted successful"),
233 (status = 400, description = "Missing mutator_apikey request header"),
234 (status = 401, description = "Unauthorized to delete mutations"),
235 (status = 404, description = "Mutator not found to delete mutations"),
236 ),
237 params(
238 ("mutator_correlation_id" = String, Path, description = "Mutator's server-local correlation id")
239 ),
240 security(
241 ("api_key" = [])
242 )
243 )]
244 pub async fn delete_mutations(
245 State(store): State<Store>,
246 Path(mutator_correlation_id): Path<String>,
247 _h: ValidApiKeyHeader,
248 ) -> Result<StatusCode, StatusCode> {
249 tracing::debug!(%mutator_correlation_id);
250 let mut mutators = store.mutators.lock().await;
251
252 let actuator_descriptor = mutators
253 .get_mut(&mutator_correlation_id)
254 .ok_or(StatusCode::NOT_FOUND)?;
255 match actuator_descriptor.reset().await {
256 Ok(()) => Ok(StatusCode::OK),
257 Err(err) => {
258 tracing::error!(
259 err = err.as_ref() as &dyn std::error::Error,
260 "Failed to delete mutation"
261 );
262 Err(StatusCode::INTERNAL_SERVER_ERROR)
263 }
264 }
265 }
266}
267
/// Extractor whose presence in a handler signature means the request's API-key
/// header passed the check in its `FromRequestParts` implementation.
///
/// The private unit field keeps code outside this module from constructing the
/// type directly.
pub struct ValidApiKeyHeader(());
270
271#[async_trait]
272impl FromRequestParts<mutator::Store> for ValidApiKeyHeader {
273 type Rejection = (StatusCode, &'static str);
274
275 async fn from_request_parts(
276 parts: &mut axum::http::request::Parts,
277 store: &mutator::Store,
278 ) -> Result<Self, Self::Rejection> {
279 let api_key_bytes = parts.headers.get(MUTATOR_API_KEY_HEADER).ok_or((
280 StatusCode::BAD_REQUEST,
281 "Missing required header 'mutator_apikey'",
282 ))?;
283
284 let api_key = std::str::from_utf8(api_key_bytes.as_bytes())
285 .map_err(|_| (StatusCode::BAD_REQUEST, "Malformed 'mutator_apikey' header"))?;
286
287 if let Some(required_api_key_value) = store.required_api_key_value.as_ref() {
288 if api_key != required_api_key_value {
289 return Err((StatusCode::UNAUTHORIZED, "Invalid 'mutator_apikey'"));
290 }
291 }
292
293 Ok(ValidApiKeyHeader(()))
294 }
295}
296
297#[cfg(test)]
298mod tests {
299 use super::*;
300 use crate::api::{AttrKey, AttrType, AttrVal};
301 use crate::mutator_protocol::actuator::MutatorActuator;
302 use crate::mutator_protocol::descriptor::owned::{
303 MutatorOperation, OrganizationCustomMetadata, OwnedMutatorDescriptor,
304 OwnedMutatorParamDescriptor,
305 };
306 use crate::mutator_protocol::descriptor::MutatorDescriptor;
307 use crate::mutator_server::server::serve_mutators_on_listener;
308 use crate::mutator_server::{Mutation, Mutator};
309 use async_trait::async_trait;
310 use std::net::TcpListener;
311 use std::str::FromStr;
312 use std::sync::atomic::{AtomicI64, Ordering};
313 use std::time::Duration;
314 use tokio::sync::oneshot::Sender;
315 use uuid::Uuid;
316
317 #[tokio::test]
318 async fn it_works() {
319 let listener = TcpListener::bind("localhost:0").unwrap();
320 let addr = listener.local_addr().unwrap();
321 let (shutdown_tx, shutdown_rx): (Sender<()>, _) = tokio::sync::oneshot::channel();
322 let mut mutators_map: BTreeMap<_, Box<(dyn ActuatorDescriptor + Send + 'static)>> =
323 BTreeMap::new();
324 let original_value = 31;
325 let atomic_inner_state = Arc::new(AtomicI64::new(original_value));
326 mutators_map.insert(
327 "abc".to_string(),
328 Box::new(AtomicMutator::new(atomic_inner_state.clone())),
329 );
330 let mutators = mutators_map;
331 let server_fut = serve_mutators_on_listener(mutators, None, listener, async {
332 shutdown_rx.await.ok();
333 });
334 let join_handle = tokio::spawn(server_fut);
335 tokio::time::sleep(Duration::from_secs(1)).await;
336 let mutator_url = reqwest::Url::from_str(&format!("http://{}/mutator", addr)).unwrap();
338 let client = reqwest::Client::builder().build().unwrap();
339 let mutators_resp = client
340 .get(mutator_url)
341 .header(MUTATOR_API_KEY_HEADER, "whatever")
342 .send()
343 .await
344 .unwrap();
345 assert_eq!(reqwest::StatusCode::OK, mutators_resp.status());
346 let mutators: Vec<Mutator> = mutators_resp.json().await.unwrap();
347 assert_eq!(1, mutators.len());
348 assert_eq!(
349 &AttrVal::String("foo".into()),
350 mutators[0].attributes.get(&"mutator.name".into()).unwrap()
351 );
352
353 let mutation_url =
355 reqwest::Url::from_str(&format!("http://{}/mutator/abc/mutation", addr)).unwrap();
356 let mut mutation_params = BTreeMap::new();
357 let set_to_value = 42;
358 mutation_params.insert(
359 AttrKey::from(MutatorOperation::SetToValue.name()),
360 AttrVal::Integer(set_to_value),
361 );
362 let mutation = Mutation {
363 mutation: Default::default(),
364 params: mutation_params,
365 };
366 let mutation_resp = client
367 .post(mutation_url.clone())
368 .json(&mutation)
369 .header(MUTATOR_API_KEY_HEADER, "whatever")
370 .send()
371 .await
372 .unwrap();
373 assert_eq!(reqwest::StatusCode::CREATED, mutation_resp.status());
374 assert_eq!(set_to_value, atomic_inner_state.load(Ordering::SeqCst));
375
376 let reset_resp = client
377 .delete(mutation_url)
378 .header(MUTATOR_API_KEY_HEADER, "whatever")
379 .send()
380 .await
381 .unwrap();
382 assert_eq!(reqwest::StatusCode::OK, reset_resp.status());
383 assert_eq!(original_value, atomic_inner_state.load(Ordering::SeqCst));
384
385 let _ = shutdown_tx.send(());
386 let join_res = join_handle.await;
387 assert!(join_res.is_ok());
388 }
389
    /// Test mutator that writes injected integers into a shared atomic.
    pub struct AtomicMutator {
        /// Value the atomic held when the mutator was created; written back on reset.
        initial: i64,
        /// Shared cell the test also holds, so it can observe injected values.
        inner: Arc<AtomicI64>,
    }
394 impl AtomicMutator {
395 pub fn new(inner: Arc<AtomicI64>) -> Self {
396 Self {
397 initial: inner.load(Ordering::SeqCst),
398 inner,
399 }
400 }
401
402 fn description() -> OwnedMutatorDescriptor {
403 OwnedMutatorDescriptor {
404 name: Some("foo".into()),
405 description: None,
406 layer: None,
407 group: None,
408 operation: Some(MutatorOperation::SetToValue),
409 statefulness: None,
410 organization_custom_metadata: Some(
411 OrganizationCustomMetadata::new(
412 "some_jerks".to_owned(),
413 std::iter::once(("fleet".to_owned(), AttrVal::Integer(99))).collect(),
414 )
415 .unwrap(),
416 ),
417 params: vec![OwnedMutatorParamDescriptor::new(
418 AttrType::Integer,
419 MutatorOperation::SetToValue.name().to_owned(),
420 )
421 .unwrap()],
422 }
423 }
424 }
425
    // Empty impl: nothing is defined or overridden here.
    // NOTE(review): presumably `ActuatorDescriptor` just combines
    // `MutatorDescriptor` and `MutatorActuator` - confirm against the trait definition.
    impl ActuatorDescriptor for AtomicMutator {}
427
428 impl MutatorDescriptor for AtomicMutator {
429 fn get_description_attributes(&self) -> Box<dyn Iterator<Item = (AttrKey, AttrVal)> + '_> {
430 let desc = AtomicMutator::description();
432 let attrs: Vec<_> = desc.get_description_attributes().collect();
433 Box::new(attrs.into_iter())
434 }
435 }
436
437 #[async_trait]
438 impl MutatorActuator for AtomicMutator {
439 async fn inject(
440 &mut self,
441 _mutation_id: Uuid,
442 mut params: BTreeMap<AttrKey, AttrVal>,
443 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
444 let v = params
445 .remove(&AttrKey::from(MutatorOperation::SetToValue.name()))
446 .expect("Expected the set_to_value parameter");
447 if let AttrVal::Integer(i) = v {
448 self.inner.store(i, Ordering::SeqCst);
449 } else {
450 panic!("Unexpected param of value {:?} for set_to_value", v);
451 }
452 Ok(())
453 }
454
455 async fn reset(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
456 self.inner.store(self.initial, Ordering::SeqCst);
457 Ok(())
458 }
459 }
460}