asteroid_mq/protocol/node/raft/
proposal.rs1use std::collections::HashMap;
2
3use asteroid_mq_model::EndpointAddr;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 prelude::{MessageId, Node, TopicCode},
8 protocol::{message::*, node::durable_message::DurableCommand},
9};
10
11use super::state_machine::topic::wait_ack::WaitAckResult;
12pub(crate) mod ep_online;
13pub use ep_online::EndpointOnline;
14pub(crate) mod ep_offline;
15pub use ep_offline::EndpointOffline;
16pub(crate) mod ep_interest;
17pub use ep_interest::EndpointInterest;
18pub(crate) mod set_state;
19pub use set_state::*;
20pub(crate) mod load_topic;
21pub use load_topic::LoadTopic;
22pub(crate) mod unload_topic;
23pub use unload_topic::UnloadTopic;
24pub(crate) mod delegate_message;
25pub use delegate_message::DelegateMessage;
26pub(crate) mod ack_finished;
27pub use ack_finished::AckFinished;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[repr(u8)]
31pub enum Proposal {
32 DelegateMessage(DelegateMessage),
34 SetState(SetState),
36 LoadTopic(LoadTopic),
38 UnloadTopic(UnloadTopic),
39 EpOnline(EndpointOnline),
41 EpOffline(EndpointOffline),
43 EpInterest(EndpointInterest),
45 AckFinished(AckFinished),
47}
48#[derive(Debug, Clone)]
49pub(crate) struct ProposalContext {
50 pub node: Node,
51 pub topic_code: Option<TopicCode>,
52 pub debug_ep_online: bool,
53}
54
55impl ProposalContext {
56 pub(crate) fn new(node: Node) -> Self {
57 Self {
58 node,
59 topic_code: None,
60 debug_ep_online: false,
61 }
62 }
63 pub(crate) fn push_durable_command(&mut self, command: DurableCommand) {
64 if let Some(topic_code) = self.topic_code.clone() {
65 self.node.push_durable_commands(Some((topic_code, command)));
66 }
67 }
68 pub(crate) fn set_topic_code(&mut self, code: TopicCode) {
69 self.topic_code = Some(code);
70 }
71 pub(crate) fn resolve_ack(&self, message_id: MessageId, result: WaitAckResult) {
72 let node = self.node.clone();
73 let node_id = self.node.id();
74 let Some(topic) = self.topic_code.clone() else {
75 tracing::warn!(?self.topic_code, "topic not found");
76 return;
77 };
78 tokio::spawn(async move {
79 let mut wg = node.ack_waiting_pool.write().await;
80 if let Some(mut tx) = wg.remove(&message_id) {
81 drop(wg);
82 tx.send(result);
83 let proposal_result = node
84 .propose(Proposal::AckFinished(AckFinished { message_id, topic }))
85 .await;
86 if let Err(e) = proposal_result {
87 tracing::warn!(error=%e, "ack finished proposal failed");
88 }
89 } else {
90 tracing::trace!(%node_id, %message_id, "ack wait handle not found");
91 }
92 });
93 }
94 #[tracing::instrument(skip(self))]
95 pub(crate) fn dispatch_message(&self, message: &Message, endpoint: EndpointAddr) -> bool {
97 let Some(ref code) = self.topic_code else {
98 tracing::warn!("topic code is not set");
100 return false;
101 };
102
103 let message = message.clone();
104 let code = code.clone();
105 let node = self.node.clone();
106 let message_id = message.id();
107 let status: MessageStatusKind;
108 if let Some(node_id) = node.blocking_get_edge_routing(&endpoint) {
109 tracing::debug!(%message_id, %node_id, ?endpoint, "dispatch message to edge");
110 if let Some(edge) = node.get_edge_connection(node_id) {
111 let result = edge.push_message(&endpoint, message);
112 status = if let Err(err) = result {
113 tracing::warn!(?err, "push message failed");
114 MessageStatusKind::Unreachable
115 } else {
116 MessageStatusKind::Sent
117 };
118 } else {
119 tracing::warn!(%message_id, %node_id, "edge connection not found when trying to dispatch message");
120 status = MessageStatusKind::Unreachable;
121 };
122 } else {
123 tracing::warn!(%message_id, "edge routing not found when trying to dispatch message");
124 status = MessageStatusKind::Unreachable;
125 }
126 tokio::spawn(async move {
127 let proposal_result = node
128 .propose(Proposal::SetState(SetState {
129 topic: code,
130 update: MessageStateUpdate::new(
131 message_id,
132 HashMap::from([(endpoint, status)]),
133 ),
134 }))
135 .await;
136 if let Err(err) = proposal_result {
137 tracing::error!(?err, "set state failed");
138 }
139 Some(())
140 });
141 return MessageStatusKind::Sent == status;
142 }
143}
144
145impl Node {
146 pub(self) fn push_durable_commands(
147 &self,
148 commands: impl IntoIterator<Item = (TopicCode, DurableCommand)>,
149 ) {
150 if let Some(service) = &self.durable_commit_service {
151 service
152 .durable_commands_queue
153 .write()
154 .unwrap()
155 .extend(commands);
156 }
157 }
158}