auxon_sdk/mutator_server/
server.rs

1use crate::mutator_protocol::mutator::ActuatorDescriptor;
2use crate::mutator_server::MUTATOR_API_KEY_HEADER;
3use async_trait::async_trait;
4use axum::{extract::FromRequestParts, http::StatusCode, Router};
5use std::collections::BTreeMap;
6use std::future::Future;
7use std::net::{SocketAddr, TcpListener};
8use std::sync::Arc;
9use utoipa::{
10    openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
11    Modify, OpenApi,
12};
13use utoipa_swagger_ui::SwaggerUi;
14
15pub async fn serve_mutators(
16    mutators: BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>,
17    required_api_key_value: Option<String>,
18    addr: impl Into<SocketAddr>,
19    shutdown_signal: impl Future<Output = ()> + Send + 'static,
20) {
21    let listener = TcpListener::bind(addr.into()).unwrap();
22    serve_mutators_on_listener(mutators, required_api_key_value, listener, shutdown_signal).await
23}
24
25pub async fn serve_mutators_on_listener(
26    mutators: BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>,
27    required_api_key_value: Option<String>,
28    listener: TcpListener,
29    shutdown_signal: impl Future<Output = ()> + Send + 'static,
30) {
31    let store = mutator::Store {
32        required_api_key_value,
33        mutators: Arc::new(tokio::sync::Mutex::new(mutators)),
34    };
35
36    let routes = Router::new()
37        .merge(mutator::routes().with_state(store))
38        .merge(swagger_routes());
39
40    let server = axum::Server::from_tcp(listener)
41        .expect("useable socket")
42        .serve(routes.into_make_service());
43
44    let addr = server.local_addr();
45    let server = server.with_graceful_shutdown(shutdown_signal);
46
47    tracing::debug!(%addr, "Serving mutator HTTP API");
48
49    if let Err(e) = server.await {
50        tracing::error!(
51            err = &e as &dyn std::error::Error,
52            "Error running mutator http server"
53        );
54    }
55}
56
57fn swagger_routes() -> Router {
58    use crate::mutator_server::{Mutation, Mutator};
59    #[derive(OpenApi)]
60    #[openapi(
61        paths(mutator::list_mutators, mutator::create_mutation, mutator::delete_mutations),
62        components(schemas(Mutator, Mutation)),
63        modifiers(&SecurityAddon),
64        tags(
65            (name = "mutator", description = "Mutator API")
66        )
67    )]
68    struct ApiDoc;
69
70    struct SecurityAddon;
71    impl Modify for SecurityAddon {
72        fn modify(&self, openapi: &mut utoipa::openapi::OpenApi) {
73            if let Some(components) = openapi.components.as_mut() {
74                components.add_security_scheme(
75                    "api_key",
76                    SecurityScheme::ApiKey(ApiKey::Header(ApiKeyValue::new(
77                        MUTATOR_API_KEY_HEADER,
78                    ))),
79                )
80            }
81        }
82    }
83
84    Router::new().merge(SwaggerUi::new("/swagger-ui").url("/api-doc.json", ApiDoc::openapi()))
85}
86
87pub(crate) mod mutator {
88    use crate::mutator_server::{GetAllMutatorsResponse, Mutation, Mutator};
89    use std::collections::BTreeMap;
90    use std::{convert::Infallible, sync::Arc};
91
92    use crate::mutator_protocol::mutator::ActuatorDescriptor;
93    use axum::extract::{Path, State};
94    use axum::http::StatusCode;
95    use axum::routing::{get, post};
96    use axum::{Json, Router};
97
98    use super::ValidApiKeyHeader;
99
100    /// See: <https://github.com/seanmonstar/warp/issues/242>.
101    mod url_path_part {
102        use percent_encoding::percent_decode_str;
103        use std::str::FromStr;
104        use std::string::ToString;
105
106        /// A type intended exclusively for use in warp path parameter parsing.
107        /// Do *not* use the `FromStr::from_str` implementation in any other context,
108        /// because it assumes that the input is already percent-encoded.
109        #[derive(Clone, Debug)]
110        #[repr(transparent)]
111        pub struct UrlPathPart(String);
112
113        /// Warning! This FromStr implementation is not for general purpose use.
114        /// It assumes the input `str` contains percent-encoded content.
115        impl FromStr for UrlPathPart {
116            type Err = std::str::Utf8Error;
117            #[inline]
118            fn from_str(s: &str) -> Result<Self, Self::Err> {
119                Ok(UrlPathPart(
120                    percent_decode_str(s).decode_utf8()?.to_string(),
121                ))
122            }
123        }
124        impl From<UrlPathPart> for String {
125            fn from(v: UrlPathPart) -> Self {
126                v.0
127            }
128        }
129        impl AsRef<str> for UrlPathPart {
130            fn as_ref(&self) -> &str {
131                self.0.as_str()
132            }
133        }
134    }
135
    /// Shared state handed to every mutator route handler and to the API-key extractor.
    ///
    /// Cloning is cheap with respect to the mutators: clones share the same map through
    /// the `Arc`.
    #[derive(Clone)]
    pub struct Store {
        /// When `Some`, the API-key extractor rejects requests whose key differs from this
        /// value; when `None`, the value of the presented key is not checked.
        pub required_api_key_value: Option<String>,
        /// The served mutators, keyed by the correlation id used in request paths.
        pub mutators: Arc<tokio::sync::Mutex<BTreeMap<String, Box<dyn ActuatorDescriptor + Send>>>>,
    }
141
142    pub fn routes() -> Router<Store> {
143        Router::new().route("/mutator", get(list_mutators)).route(
144            "/mutator/:mutator_correlation_id/mutation",
145            post(create_mutation).delete(delete_mutations),
146        )
147    }
148
149    /// List all mutators.
150    #[utoipa::path(
151        get,
152        path = "/mutator",
153        responses(
154            (status = 200, description = "List mutators successfully", body = [Mutator]),
155            (status = 400, description = "Missing mutator_apikey request header"),
156            (status = 401, description = "Unauthorized to view mutators")
157        ),
158        security(
159            ("api_key" = [])
160        )
161    )]
162    pub async fn list_mutators(
163        State(store): State<Store>,
164        _h: ValidApiKeyHeader,
165    ) -> Result<Json<GetAllMutatorsResponse>, Infallible> {
166        let mutators_map = store.mutators.lock().await;
167
168        let mut mutator_components: Vec<Mutator> = vec![];
169        for (corr_id, actuator_descriptor) in mutators_map.iter() {
170            let attr_iter = actuator_descriptor.get_description_attributes();
171            mutator_components.push(Mutator {
172                mutator_correlation_id: corr_id.clone(),
173                attributes: attr_iter.collect(),
174            })
175        }
176
177        Ok(Json(mutator_components))
178    }
179
180    /// Create new mutation for a mutator.
181    #[utoipa::path(
182        post,
183        path = "/mutator/{mutator_correlation_id}/mutation",
184        request_body = Mutation,
185        responses(
186            (status = 201, description = "Mutation created successfully"),
187            (status = 400, description = "Missing mutator_apikey request header"),
188            (status = 401, description = "Unauthorized to create mutations"),
189            (status = 404, description = "Mutator not found"),
190            (status = 500, description = "Internal mutator error")
191        ),
192        params(
193            ("mutator_correlation_id" = String, Path, description = "Mutator's server-local correlation id")
194        ),
195        security(
196            ("api_key" = [])
197        )
198    )]
199    pub async fn create_mutation(
200        State(store): State<Store>,
201        _h: ValidApiKeyHeader,
202        Path(mutator_correlation_id): Path<String>,
203        Json(mutation): Json<Mutation>,
204    ) -> Result<StatusCode, StatusCode> {
205        tracing::debug!(%mutator_correlation_id);
206        let mut mutators = store.mutators.lock().await;
207
208        let actuator_descriptor = mutators
209            .get_mut(&mutator_correlation_id)
210            .ok_or(StatusCode::NOT_FOUND)?;
211
212        match actuator_descriptor
213            .inject(mutation.mutation, mutation.params)
214            .await
215        {
216            Ok(()) => Ok(StatusCode::CREATED),
217            Err(err) => {
218                tracing::error!(
219                    err = err.as_ref() as &dyn std::error::Error,
220                    "Failed to inject mutation"
221                );
222                Err(StatusCode::INTERNAL_SERVER_ERROR)
223            }
224        }
225    }
226
227    /// Delete / reset all mutations for a mutator
228    #[utoipa::path(
229        delete,
230        path = "/mutator/{mutator_correlation_id}/mutation",
231        responses(
232            (status = 200, description = "Mutations reset / deleted successful"),
233            (status = 400, description = "Missing mutator_apikey request header"),
234            (status = 401, description = "Unauthorized to delete mutations"),
235            (status = 404, description = "Mutator not found to delete mutations"),
236        ),
237        params(
238            ("mutator_correlation_id" = String, Path, description = "Mutator's server-local correlation id")
239        ),
240        security(
241            ("api_key" = [])
242        )
243    )]
244    pub async fn delete_mutations(
245        State(store): State<Store>,
246        Path(mutator_correlation_id): Path<String>,
247        _h: ValidApiKeyHeader,
248    ) -> Result<StatusCode, StatusCode> {
249        tracing::debug!(%mutator_correlation_id);
250        let mut mutators = store.mutators.lock().await;
251
252        let actuator_descriptor = mutators
253            .get_mut(&mutator_correlation_id)
254            .ok_or(StatusCode::NOT_FOUND)?;
255        match actuator_descriptor.reset().await {
256            Ok(()) => Ok(StatusCode::OK),
257            Err(err) => {
258                tracing::error!(
259                    err = err.as_ref() as &dyn std::error::Error,
260                    "Failed to delete mutation"
261                );
262                Err(StatusCode::INTERNAL_SERVER_ERROR)
263            }
264        }
265    }
266}
267
268/// An extractor to read the 'mutator_apikey' header and check that it's the right one.
269pub struct ValidApiKeyHeader(());
270
271#[async_trait]
272impl FromRequestParts<mutator::Store> for ValidApiKeyHeader {
273    type Rejection = (StatusCode, &'static str);
274
275    async fn from_request_parts(
276        parts: &mut axum::http::request::Parts,
277        store: &mutator::Store,
278    ) -> Result<Self, Self::Rejection> {
279        let api_key_bytes = parts.headers.get(MUTATOR_API_KEY_HEADER).ok_or((
280            StatusCode::BAD_REQUEST,
281            "Missing required header 'mutator_apikey'",
282        ))?;
283
284        let api_key = std::str::from_utf8(api_key_bytes.as_bytes())
285            .map_err(|_| (StatusCode::BAD_REQUEST, "Malformed 'mutator_apikey' header"))?;
286
287        if let Some(required_api_key_value) = store.required_api_key_value.as_ref() {
288            if api_key != required_api_key_value {
289                return Err((StatusCode::UNAUTHORIZED, "Invalid 'mutator_apikey'"));
290            }
291        }
292
293        Ok(ValidApiKeyHeader(()))
294    }
295}
296
297#[cfg(test)]
298mod tests {
299    use super::*;
300    use crate::api::{AttrKey, AttrType, AttrVal};
301    use crate::mutator_protocol::actuator::MutatorActuator;
302    use crate::mutator_protocol::descriptor::owned::{
303        MutatorOperation, OrganizationCustomMetadata, OwnedMutatorDescriptor,
304        OwnedMutatorParamDescriptor,
305    };
306    use crate::mutator_protocol::descriptor::MutatorDescriptor;
307    use crate::mutator_server::server::serve_mutators_on_listener;
308    use crate::mutator_server::{Mutation, Mutator};
309    use async_trait::async_trait;
310    use std::net::TcpListener;
311    use std::str::FromStr;
312    use std::sync::atomic::{AtomicI64, Ordering};
313    use std::time::Duration;
314    use tokio::sync::oneshot::Sender;
315    use uuid::Uuid;
316
317    #[tokio::test]
318    async fn it_works() {
319        let listener = TcpListener::bind("localhost:0").unwrap();
320        let addr = listener.local_addr().unwrap();
321        let (shutdown_tx, shutdown_rx): (Sender<()>, _) = tokio::sync::oneshot::channel();
322        let mut mutators_map: BTreeMap<_, Box<(dyn ActuatorDescriptor + Send + 'static)>> =
323            BTreeMap::new();
324        let original_value = 31;
325        let atomic_inner_state = Arc::new(AtomicI64::new(original_value));
326        mutators_map.insert(
327            "abc".to_string(),
328            Box::new(AtomicMutator::new(atomic_inner_state.clone())),
329        );
330        let mutators = mutators_map;
331        let server_fut = serve_mutators_on_listener(mutators, None, listener, async {
332            shutdown_rx.await.ok();
333        });
334        let join_handle = tokio::spawn(server_fut);
335        tokio::time::sleep(Duration::from_secs(1)).await;
336        // N.B. consider looping until we get a response
337        let mutator_url = reqwest::Url::from_str(&format!("http://{}/mutator", addr)).unwrap();
338        let client = reqwest::Client::builder().build().unwrap();
339        let mutators_resp = client
340            .get(mutator_url)
341            .header(MUTATOR_API_KEY_HEADER, "whatever")
342            .send()
343            .await
344            .unwrap();
345        assert_eq!(reqwest::StatusCode::OK, mutators_resp.status());
346        let mutators: Vec<Mutator> = mutators_resp.json().await.unwrap();
347        assert_eq!(1, mutators.len());
348        assert_eq!(
349            &AttrVal::String("foo".into()),
350            mutators[0].attributes.get(&"mutator.name".into()).unwrap()
351        );
352
353        // Do a mutation
354        let mutation_url =
355            reqwest::Url::from_str(&format!("http://{}/mutator/abc/mutation", addr)).unwrap();
356        let mut mutation_params = BTreeMap::new();
357        let set_to_value = 42;
358        mutation_params.insert(
359            AttrKey::from(MutatorOperation::SetToValue.name()),
360            AttrVal::Integer(set_to_value),
361        );
362        let mutation = Mutation {
363            mutation: Default::default(),
364            params: mutation_params,
365        };
366        let mutation_resp = client
367            .post(mutation_url.clone())
368            .json(&mutation)
369            .header(MUTATOR_API_KEY_HEADER, "whatever")
370            .send()
371            .await
372            .unwrap();
373        assert_eq!(reqwest::StatusCode::CREATED, mutation_resp.status());
374        assert_eq!(set_to_value, atomic_inner_state.load(Ordering::SeqCst));
375
376        let reset_resp = client
377            .delete(mutation_url)
378            .header(MUTATOR_API_KEY_HEADER, "whatever")
379            .send()
380            .await
381            .unwrap();
382        assert_eq!(reqwest::StatusCode::OK, reset_resp.status());
383        assert_eq!(original_value, atomic_inner_state.load(Ordering::SeqCst));
384
385        let _ = shutdown_tx.send(());
386        let join_res = join_handle.await;
387        assert!(join_res.is_ok());
388    }
389
390    pub struct AtomicMutator {
391        initial: i64,
392        inner: Arc<AtomicI64>,
393    }
394    impl AtomicMutator {
395        pub fn new(inner: Arc<AtomicI64>) -> Self {
396            Self {
397                initial: inner.load(Ordering::SeqCst),
398                inner,
399            }
400        }
401
402        fn description() -> OwnedMutatorDescriptor {
403            OwnedMutatorDescriptor {
404                name: Some("foo".into()),
405                description: None,
406                layer: None,
407                group: None,
408                operation: Some(MutatorOperation::SetToValue),
409                statefulness: None,
410                organization_custom_metadata: Some(
411                    OrganizationCustomMetadata::new(
412                        "some_jerks".to_owned(),
413                        std::iter::once(("fleet".to_owned(), AttrVal::Integer(99))).collect(),
414                    )
415                    .unwrap(),
416                ),
417                params: vec![OwnedMutatorParamDescriptor::new(
418                    AttrType::Integer,
419                    MutatorOperation::SetToValue.name().to_owned(),
420                )
421                .unwrap()],
422            }
423        }
424    }
425
426    impl ActuatorDescriptor for AtomicMutator {}
427
428    impl MutatorDescriptor for AtomicMutator {
429        fn get_description_attributes(&self) -> Box<dyn Iterator<Item = (AttrKey, AttrVal)> + '_> {
430            // Wasteful, but hey, it's a test util
431            let desc = AtomicMutator::description();
432            let attrs: Vec<_> = desc.get_description_attributes().collect();
433            Box::new(attrs.into_iter())
434        }
435    }
436
437    #[async_trait]
438    impl MutatorActuator for AtomicMutator {
439        async fn inject(
440            &mut self,
441            _mutation_id: Uuid,
442            mut params: BTreeMap<AttrKey, AttrVal>,
443        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
444            let v = params
445                .remove(&AttrKey::from(MutatorOperation::SetToValue.name()))
446                .expect("Expected the set_to_value parameter");
447            if let AttrVal::Integer(i) = v {
448                self.inner.store(i, Ordering::SeqCst);
449            } else {
450                panic!("Unexpected param of value {:?} for set_to_value", v);
451            }
452            Ok(())
453        }
454
455        async fn reset(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
456            self.inner.store(self.initial, Ordering::SeqCst);
457            Ok(())
458        }
459    }
460}