Skip to main content

asteroid_mq/protocol/node/raft/
proposal.rs

1use std::collections::HashMap;
2
3use asteroid_mq_model::EndpointAddr;
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    prelude::{MessageId, Node, TopicCode},
8    protocol::{message::*, node::durable_message::DurableCommand},
9};
10
11use super::state_machine::topic::wait_ack::WaitAckResult;
12pub(crate) mod ep_online;
13pub use ep_online::EndpointOnline;
14pub(crate) mod ep_offline;
15pub use ep_offline::EndpointOffline;
16pub(crate) mod ep_interest;
17pub use ep_interest::EndpointInterest;
18pub(crate) mod set_state;
19pub use set_state::*;
20pub(crate) mod load_topic;
21pub use load_topic::LoadTopic;
22pub(crate) mod unload_topic;
23pub use unload_topic::UnloadTopic;
24pub(crate) mod delegate_message;
25pub use delegate_message::DelegateMessage;
26pub(crate) mod ack_finished;
27pub use ack_finished::AckFinished;
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30#[repr(u8)]
31pub enum Proposal {
32    /// Hold Message: edge node ask cluster node to hold a message.
33    DelegateMessage(DelegateMessage),
34    /// Set State: set ack state
35    SetState(SetState),
36    /// Load Queue: load messages into the topic queue.
37    LoadTopic(LoadTopic),
38    UnloadTopic(UnloadTopic),
39    /// Ep Online: report endpoint online.
40    EpOnline(EndpointOnline),
41    /// Ep Offline: report endpoint offline.
42    EpOffline(EndpointOffline),
43    /// Ep Interest: set endpoint's interests.
44    EpInterest(EndpointInterest),
45    /// Ack Finished: ack finished.
46    AckFinished(AckFinished),
47}
48#[derive(Debug, Clone)]
49pub(crate) struct ProposalContext {
50    pub node: Node,
51    pub topic_code: Option<TopicCode>,
52    pub debug_ep_online: bool,
53}
54
55impl ProposalContext {
56    pub(crate) fn new(node: Node) -> Self {
57        Self {
58            node,
59            topic_code: None,
60            debug_ep_online: false,
61        }
62    }
63    pub(crate) fn push_durable_command(&mut self, command: DurableCommand) {
64        if let Some(topic_code) = self.topic_code.clone() {
65            self.node.push_durable_commands(Some((topic_code, command)));
66        }
67    }
68    pub(crate) fn set_topic_code(&mut self, code: TopicCode) {
69        self.topic_code = Some(code);
70    }
71    pub(crate) fn resolve_ack(&self, message_id: MessageId, result: WaitAckResult) {
72        let node = self.node.clone();
73        let node_id = self.node.id();
74        let Some(topic) = self.topic_code.clone() else {
75            tracing::warn!(?self.topic_code, "topic not found");
76            return;
77        };
78        tokio::spawn(async move {
79            let mut wg = node.ack_waiting_pool.write().await;
80            if let Some(mut tx) = wg.remove(&message_id) {
81                drop(wg);
82                tx.send(result);
83                let proposal_result = node
84                    .propose(Proposal::AckFinished(AckFinished { message_id, topic }))
85                    .await;
86                if let Err(e) = proposal_result {
87                    tracing::warn!(error=%e, "ack finished proposal failed");
88                }
89            } else {
90                tracing::trace!(%node_id, %message_id, "ack wait handle not found");
91            }
92        });
93    }
94    #[tracing::instrument(skip(self))]
95    // return true if the message is reachable
96    pub(crate) fn dispatch_message(&self, message: &Message, endpoint: EndpointAddr) -> bool {
97        let Some(ref code) = self.topic_code else {
98            // topic code is not set
99            tracing::warn!("topic code is not set");
100            return false;
101        };
102
103        let message = message.clone();
104        let code = code.clone();
105        let node = self.node.clone();
106        let message_id = message.id();
107        let status: MessageStatusKind;
108        if let Some(node_id) = node.blocking_get_edge_routing(&endpoint) {
109            tracing::debug!(%message_id, %node_id, ?endpoint, "dispatch message to edge");
110            if let Some(edge) = node.get_edge_connection(node_id) {
111                let result = edge.push_message(&endpoint, message);
112                status = if let Err(err) = result {
113                    tracing::warn!(?err, "push message failed");
114                    MessageStatusKind::Unreachable
115                } else {
116                    MessageStatusKind::Sent
117                };
118            } else {
119                tracing::warn!(%message_id, %node_id, "edge connection not found when trying to dispatch message");
120                status = MessageStatusKind::Unreachable;
121            };
122        } else {
123            tracing::warn!(%message_id, "edge routing not found when trying to dispatch message");
124            status = MessageStatusKind::Unreachable;
125        }
126        tokio::spawn(async move {
127            let proposal_result = node
128                .propose(Proposal::SetState(SetState {
129                    topic: code,
130                    update: MessageStateUpdate::new(
131                        message_id,
132                        HashMap::from([(endpoint, status)]),
133                    ),
134                }))
135                .await;
136            if let Err(err) = proposal_result {
137                tracing::error!(?err, "set state failed");
138            }
139            Some(())
140        });
141        return MessageStatusKind::Sent == status;
142    }
143}
144
145impl Node {
146    pub(self) fn push_durable_commands(
147        &self,
148        commands: impl IntoIterator<Item = (TopicCode, DurableCommand)>,
149    ) {
150        if let Some(service) = &self.durable_commit_service {
151            service
152                .durable_commands_queue
153                .write()
154                .unwrap()
155                .extend(commands);
156        }
157    }
158}