talos_certifier/services/
decision_outbox_service.rs1use std::sync::Arc;
2
3use ahash::HashMap;
4use async_trait::async_trait;
5use log::{debug, error};
6
7use time::OffsetDateTime;
8use tokio::sync::mpsc;
9
10use crate::core::ServiceResult;
11use crate::model::decision_headers::{DecisionCertHeaders, DecisionHeaderBuilder, DecisionMetaHeaders};
12use crate::model::DEFAULT_DECISION_MESSAGE_VERSION;
13use crate::{
14 core::{DecisionOutboxChannelMessage, System, SystemService},
15 errors::{SystemServiceError, SystemServiceErrorKind},
16 model::DecisionMessage,
17 ports::{DecisionStore, MessagePublisher},
18 SystemMessage,
19};
20
21pub struct DecisionOutboxService {
22 pub system: System,
23 pub decision_outbox_channel_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
24 pub decision_outbox_channel_rx: mpsc::Receiver<DecisionOutboxChannelMessage>,
25 pub decision_store: Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Sync + Send>>,
26 pub decision_publisher: Arc<Box<dyn MessagePublisher + Sync + Send>>,
27}
28
29impl DecisionOutboxService {
30 pub fn new(
31 decision_outbox_channel_tx: mpsc::Sender<DecisionOutboxChannelMessage>,
32 decision_outbox_channel_rx: mpsc::Receiver<DecisionOutboxChannelMessage>,
33 decision_store: Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Sync + Send>>,
34 decision_publisher: Arc<Box<dyn MessagePublisher + Sync + Send>>,
35 system: System,
36 ) -> Self {
37 Self {
38 system,
39 decision_store,
40 decision_publisher,
41 decision_outbox_channel_tx,
42 decision_outbox_channel_rx,
43 }
44 }
45
46 pub async fn save_decision_to_xdb(
47 datastore: &Arc<Box<dyn DecisionStore<Decision = DecisionMessage> + Send + Sync>>,
48 decision_message: &DecisionMessage,
49 ) -> ServiceResult<DecisionMessage> {
50 let xid = decision_message.xid.clone();
51
52 let started_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
53 let mut decision = datastore
54 .insert_decision(xid, decision_message.clone())
55 .await
56 .map_err(|insert_error| SystemServiceError {
57 kind: SystemServiceErrorKind::DBError,
58 reason: format!("Datastore error kind = {:?} and reason = {}", insert_error.kind, insert_error.reason),
59 data: insert_error.data,
60 service: "Decision Outbox Service".to_string(),
61 })?;
62 let finished_at = OffsetDateTime::now_utc().unix_timestamp_nanos();
63
64 if decision.version.ne(&decision_message.version) {
65 decision = DecisionMessage {
66 duplicate_version: Some(decision_message.version),
67 agent: decision_message.agent.clone(),
68 ..decision
69 }
70 }
71
72 decision.metrics.db_save_started = started_at;
73 decision.metrics.db_save_ended = finished_at;
74 Ok(decision)
75 }
76
77 pub async fn publish_decision(
78 publisher: &Arc<Box<dyn MessagePublisher + Send + Sync>>,
79 decision_message: &DecisionMessage,
80 headers: HashMap<String, String>,
81 ) -> ServiceResult {
82 let xid = decision_message.xid.clone();
83 let decision_str = serde_json::to_string(&decision_message).map_err(|e| {
84 Box::new(SystemServiceError {
85 kind: SystemServiceErrorKind::ParseError,
86 reason: format!("Error serializing decision message to string - {}", e),
87 data: Some(format!("{:?}", decision_message)),
88 service: "Decision Outbox Service".to_string(),
89 })
90 })?;
91
92 debug!("Publishing message {}", decision_message.version);
93 publisher.publish_message(xid.as_str(), &decision_str, headers).await.map_err(|publish_error| {
94 Box::new(SystemServiceError {
95 kind: SystemServiceErrorKind::MessagePublishError,
96 reason: publish_error.reason,
97 data: publish_error.data, service: "Decision Outbox Service".to_string(),
99 })
100 })
101 }
102}
103
104#[async_trait]
105impl SystemService for DecisionOutboxService {
106 async fn run(&mut self) -> ServiceResult {
107 let datastore = Arc::clone(&self.decision_store);
108 let publisher = Arc::clone(&self.decision_publisher);
109 let system = self.system.clone();
110
111 if let Some(decision_channel_message) = self.decision_outbox_channel_rx.recv().await {
112 let DecisionOutboxChannelMessage {
113 headers,
114 message: decision_message,
115 } = decision_channel_message;
116 tokio::spawn({
117 let decision_headers: DecisionHeaderBuilder<crate::model::decision_headers::MetaHeaders, crate::model::decision_headers::NoCertHeaders> =
118 DecisionHeaderBuilder::with_additional_headers(headers.into()).add_meta_headers(DecisionMetaHeaders::new(
119 DEFAULT_DECISION_MESSAGE_VERSION, self.system.name.clone(),
121 None,
122 ));
123
124 async move {
125 match DecisionOutboxService::save_decision_to_xdb(&datastore, &decision_message).await {
126 Ok(decision) => {
127 if let Err(publish_error) = DecisionOutboxService::publish_decision(
128 &publisher,
129 &decision,
130 decision_headers.add_cert_headers(DecisionCertHeaders::new(&decision)).build().into(),
131 )
132 .await
133 {
134 error!(
135 "Error publishing message for version={} with reason={:?}",
136 decision.version,
137 publish_error.to_string()
138 );
139 }
140 }
141 Err(db_error) => {
142 error!("Error saving decision to XDB with reason={:?}", db_error.to_string());
143 system.system_notifier.send(SystemMessage::ShutdownWithError(db_error)).unwrap();
144 }
145 };
146 }
147 });
148 };
149
150 Ok(())
151 }
152}