asteroid_mq/protocol/node/
durable_commit_service.rs1use std::{collections::VecDeque, sync::Arc};
2
3use asteroid_mq_model::TopicCode;
4use chrono::Utc;
5use tokio_util::sync::CancellationToken;
6
7use crate::prelude::{DurableMessage, DurableService};
8
9use super::{durable_message::DurableCommand, raft::MaybeLoadingRaft};
10#[derive(Debug, Clone)]
11pub struct DurableCommitService {
12 ct: CancellationToken,
13 pub(crate) durable_commands_queue:
14 Arc<std::sync::RwLock<VecDeque<(TopicCode, DurableCommand)>>>,
15 raft: MaybeLoadingRaft,
16 service: DurableService,
17}
18
19impl DurableCommitService {
20 pub fn new(
21 raft: MaybeLoadingRaft,
22 durable_service: DurableService,
23 ct: CancellationToken,
24 ) -> Self {
25 Self {
26 ct,
27 durable_commands_queue: Arc::new(std::sync::RwLock::new(VecDeque::new())),
28 raft,
29 service: durable_service,
30 }
31 }
32 pub fn spawn(self) {
33 tokio::spawn(self.run());
34 }
35 pub async fn run(self) -> crate::Result<()> {
36 let mut commands = VecDeque::new();
37 loop {
38 if self.ct.is_cancelled() {
39 break;
40 }
41 let is_empty = { self.durable_commands_queue.read().unwrap().is_empty() };
42 if is_empty {
43 tokio::task::yield_now().await;
44 }
45
46 {
48 let mut outer_commands = self.durable_commands_queue.write().unwrap();
49 if outer_commands.is_empty() {
50 continue;
51 }
52 std::mem::swap(&mut commands, &mut outer_commands);
53 }
54 if self.raft.get().await.ensure_linearizable().await.is_err() {
55 tracing::trace!("raft not leader, skip durable commands");
56 continue;
57 } else {
58 tracing::trace!(?commands, "raft is leader, commit durable commands");
59 };
60 for (topic_code, command) in commands.drain(..) {
61 let topic = topic_code.clone();
62 let result = match command {
63 DurableCommand::Create(command) => {
64 self.service
65 .save(
66 topic,
67 DurableMessage {
68 message: command,
69 status: Default::default(),
70 time: Utc::now(),
71 },
72 )
73 .await
74 }
75 DurableCommand::UpdateStatus(command) => {
76 if !command.status.is_empty() {
77 self.service.update_status(topic, command).await
78 } else {
79 Ok(())
80 }
81 }
82 DurableCommand::Archive(command) => self.service.archive(topic, command).await,
83 };
84 if let Err(err) = result {
85 tracing::error!(?err, "durable command failed");
86 }
87 }
88 }
89 Ok(())
90 }
91}