outbox_pattern_processor/
http_notification_service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
use crate::app_state::AppState;
use crate::environment::Environment;
use crate::error::OutboxPatternProcessorError;
use crate::notification::NotificationResult;
use crate::outbox_destination::OutboxDestination;
use crate::outbox_group::GroupedOutboxed;
use regex::Regex;
use tracing::log::error;

pub struct HttpNotificationService;

impl HttpNotificationService {
    pub async fn send(
        app_state: &AppState,
        outboxes: &GroupedOutboxed,
    ) -> Result<NotificationResult, OutboxPatternProcessorError> {
        let mut notification_result = NotificationResult::default();

        for outbox in outboxes.http.clone() {
            for destination in outbox.destinations.0.clone() {
                if let OutboxDestination::HttpDestination(http) = destination {
                    let method = http.method.unwrap_or("POST".to_string()).to_uppercase();
                    let mut request = match method.as_str() {
                        "PUT" => app_state.http_gateway.client.put(&http.url),
                        "PATCH" => app_state.http_gateway.client.patch(&http.url),
                        _ => app_state.http_gateway.client.post(&http.url),
                    };

                    if let Some(headers) = http.headers {
                        for (key, value) in headers {
                            if let Ok(regex) = Regex::new("^\\{\\{[A-Z_]+}}$") {
                                if regex.is_match(&value) {
                                    let normalized_value_env_name = value.replace(['{', '}'], "");
                                    let env_value = Environment::string(&normalized_value_env_name, &value);
                                    request = request.header(key, env_value);
                                } else {
                                    request = request.header(key, value);
                                }
                            } else {
                                request = request.header(key, value);
                            }
                        }
                    }

                    if let Some(headers) = outbox.headers.clone() {
                        for (key, value) in headers.0 {
                            request = request.header(key, value);
                        }
                    }

                    request = request.header("x-idempotent-key", outbox.idempotent_key.to_string());

                    let result = request.body(outbox.payload.clone()).send().await;

                    if let Ok(response) = result {
                        if response.status().is_success() {
                            notification_result.sent.push(outbox.clone());
                        } else {
                            notification_result.failed.push(outbox.clone());
                            error!(
                                "Failed to send http notification for idempotent_key {} with status {} and body {}",
                                outbox.idempotent_key,
                                response.status(),
                                response.text().await.unwrap_or("unknown".to_string())
                            );
                        }
                    } else {
                        notification_result.failed.push(outbox.clone());
                        error!(
                            "Failed to send http notification cause {}",
                            result.err().map(|error| error.to_string()).unwrap_or("unknown".to_string())
                        );
                    }
                }
            }
        }

        Ok(notification_result)
    }
}