Skip to main content

asteroid_mq/protocol/node/
durable_commit_service.rs

1use std::{collections::VecDeque, sync::Arc};
2
3use asteroid_mq_model::TopicCode;
4use chrono::Utc;
5use tokio_util::sync::CancellationToken;
6
7use crate::prelude::{DurableMessage, DurableService};
8
9use super::{durable_message::DurableCommand, raft::MaybeLoadingRaft};
10#[derive(Debug, Clone)]
11pub struct DurableCommitService {
12    ct: CancellationToken,
13    pub(crate) durable_commands_queue:
14        Arc<std::sync::RwLock<VecDeque<(TopicCode, DurableCommand)>>>,
15    raft: MaybeLoadingRaft,
16    service: DurableService,
17}
18
19impl DurableCommitService {
20    pub fn new(
21        raft: MaybeLoadingRaft,
22        durable_service: DurableService,
23        ct: CancellationToken,
24    ) -> Self {
25        Self {
26            ct,
27            durable_commands_queue: Arc::new(std::sync::RwLock::new(VecDeque::new())),
28            raft,
29            service: durable_service,
30        }
31    }
32    pub fn spawn(self) {
33        tokio::spawn(self.run());
34    }
35    pub async fn run(self) -> crate::Result<()> {
36        let mut commands = VecDeque::new();
37        loop {
38            if self.ct.is_cancelled() {
39                break;
40            }
41            let is_empty = { self.durable_commands_queue.read().unwrap().is_empty() };
42            if is_empty {
43                tokio::task::yield_now().await;
44            }
45
46            // only one execute task at a time for each topic
47            {
48                let mut outer_commands = self.durable_commands_queue.write().unwrap();
49                if outer_commands.is_empty() {
50                    continue;
51                }
52                std::mem::swap(&mut commands, &mut outer_commands);
53            }
54            if self.raft.get().await.ensure_linearizable().await.is_err() {
55                tracing::trace!("raft not leader, skip durable commands");
56                continue;
57            } else {
58                tracing::trace!(?commands, "raft is leader, commit durable commands");
59            };
60            for (topic_code, command) in commands.drain(..) {
61                let topic = topic_code.clone();
62                let result = match command {
63                    DurableCommand::Create(command) => {
64                        self.service
65                            .save(
66                                topic,
67                                DurableMessage {
68                                    message: command,
69                                    status: Default::default(),
70                                    time: Utc::now(),
71                                },
72                            )
73                            .await
74                    }
75                    DurableCommand::UpdateStatus(command) => {
76                        if !command.status.is_empty() {
77                            self.service.update_status(topic, command).await
78                        } else {
79                            Ok(())
80                        }
81                    }
82                    DurableCommand::Archive(command) => self.service.archive(topic, command).await,
83                };
84                if let Err(err) = result {
85                    tracing::error!(?err, "durable command failed");
86                }
87            }
88        }
89        Ok(())
90    }
91}