talos_certifier/services/
decision_outbox_service.rs

1use std::sync::Arc;
2
3use ahash::HashMap;
4use async_trait::async_trait;
5use log::{debug, error};
6
7use time::OffsetDateTime;
8use tokio::sync::mpsc;
9
10use crate::core::ServiceResult;
11use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders};
12use crate::model::DEFAULT_DECISION_MESSAGE_VERSION;
13use crate::{
14    core::{DecisionOutboxChannelMessage, System, SystemService},
15    errors::{SystemServiceError, SystemServiceErrorKind},
16    model::DecisionMessage,
17    ports::{DecisionStore, MessagePublisher},
18    SystemMessage,
19};
20
21pub struct DecisionOutboxService {
22    pub system: System,
23    pub decision_outbox_channel_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
24    pub decision_outbox_channel_rx: mpsc::Receiver<DecisionOutboxChannelMessage>,
25    pub decision_store: Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Sync + Send>>,
26    pub decision_publisher: Arc<Box<dyn MessagePublisher + Sync + Send>>,
27}
28
29impl DecisionOutboxService {
30    pub fn new(
31        decision_outbox_channel_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
32        decision_outbox_channel_rx: mpsc::Receiver<DecisionOutboxChannelMessage>,
33        decision_store: Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Sync + Send>>,
34        decision_publisher: Arc<Box<dyn MessagePublisher + Sync + Send>>,
35        system: System,
36    ) -> Self {
37        Self {
38            system,
39            decision_store,
40            decision_publisher,
41            decision_outbox_channel_tx,
42            decision_outbox_channel_rx,
43        }
44    }
45
46    pub async fn save_decision_to_xdb(
47        datastore: &Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Send + Sync>>,
48        decision_message: &DecisionMessage,
49    ) -> ServiceResult<DecisionMessage> {
50        let xid = decision_message.xid.clone();
51
52        let started_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
53        let mut decision = datastore
54            .insert_decision(xid, decision_message.clone())
55            .await
56            .map_err(|insert_error| SystemServiceError {
57                kind: SystemServiceErrorKind::DBError,
58                reason: format!("Datastore error kind = {:?} and reason = {}", insert_error.kind, insert_error.reason),
59                data: insert_error.data,
60                service: "Decision Outbox Service".to_string(),
61            })?;
62        let finished_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
63
64        if decision.version.ne(&decision_message.version) {
65            decision = DecisionMessage {
66                duplicate_version: Some(decision_message.version),
67                agent: decision_message.agent.clone(),
68                ..decision
69            }
70        }
71
72        decision.metrics.db_save_started = started_at;
73        decision.metrics.db_save_ended = finished_at;
74        Ok(decision)
75    }
76
77    pub async fn publish_decision(
78        publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
79        decision_message: &DecisionMessage,
80        headers: HashMap<String, String>,
81    ) -> ServiceResult {
82        let xid = decision_message.xid.clone();
83        let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
84            Box::new(SystemServiceError {
85                kind: SystemServiceErrorKind::ParseError,
86                reason: format!("Error serializing decision message to string - {}", e),
87                data: Some(format!("{:?}", decision_message)),
88                service: "Decision Outbox Service".to_string(),
89            })
90        })?;
91
92        debug!("Publishing message {}", decision_message.version);
93        publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| {
94            Box::new(SystemServiceError {
95                kind: SystemServiceErrorKind::MessagePublishError,
96                reason: publish_error.reason,
97                data: publish_error.data, //Some(format!("{:?}", decision_message)),
98                service: "Decision Outbox Service".to_string(),
99            })
100        })
101    }
102}
103
104#[async_trait]
105impl SystemService for DecisionOutboxService {
106    async fn run(&mut self) -> ServiceResult {
107        let datastore = Arc::clone(&self.decision_store);
108        let publisher = Arc::clone(&self.decision_publisher);
109        let system = self.system.clone();
110
111        if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await {
112            let DecisionOutboxChannelMessage {
113                headers,
114                message: decision_message,
115            } = decision_channel_message;
116            tokio::spawn({
117                let decision_headers: DecisionHeaderBuilder<crate::model::decision_headers::MetaHeaders, crate::model::decision_headers::NoCertHeaders> =
118                    DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new(
119                        DEFAULT_DECISION_MESSAGE_VERSION, // major version of decision message
120                        self.system.name.clone(),
121                        None,
122                    ));
123
124                async move {
125                    match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
126                        Ok(decision) => {
127                            if let Err(publish_error) = DecisionOutboxService::publish_decision(
128                                &publisher,
129                                &decision,
130                                decision_headers.add_cert_headers(DecisionCertHeaders::new(&decision)).build().into(),
131                            )
132                            .await
133                            {
134                                error!(
135                                    "Error publishing message for version={} with reason={:?}",
136                                    decision.version,
137                                    publish_error.to_string()
138                                );
139                            }
140                        }
141                        Err(db_error) => {
142                            error!("Error saving decision to XDB with reason={:?}", db_error.to_string());
143                            system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap();
144                        }
145                    };
146                }
147            });
148        };
149
150        Ok(())
151    }
152}