1use crate::common::byte_utils::{bin_to_id_result, id_to_bin};
2use crate::common::constant::SEQUENCE_TREE_NAME;
3use crate::raft::filestore::model::SnapshotRecordDto;
4use crate::raft::filestore::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse};
5use crate::raft::filestore::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest};
6use crate::sequence::model::{SequenceRaftReq, SequenceRaftResult};
7use actix::prelude::*;
8use std::collections::HashMap;
9use std::sync::Arc;
10
/// Actor state that keeps named sequence counters in memory.
#[derive(Debug, Clone)]
pub struct SequenceDbManager {
    /// Maps each sequence key to the value that will be handed out next.
    pub(crate) seq_map: HashMap<Arc<String>, u64>,
    /// Set to `true` by `load_completed`; starts out `false`.
    init: bool,
}
17
18impl SequenceDbManager {
19 pub fn new() -> Self {
20 Self {
21 seq_map: HashMap::new(),
22 init: false,
23 }
24 }
25
26 pub fn next_id(&mut self, key: Arc<String>) -> u64 {
27 if let Some(id) = self.seq_map.get_mut(&key) {
28 let old = *id;
29 *id += 1;
30 old
31 } else {
32 self.seq_map.insert(key.clone(), 1 + 1);
33 1
34 }
35 }
36
37 pub fn next_range(&mut self, key: Arc<String>, step: u64) -> anyhow::Result<u64> {
38 if let Some(id) = self.seq_map.get_mut(&key) {
39 let old = *id;
40 *id += step;
41 Ok(old)
42 } else {
43 self.seq_map.insert(key.clone(), step + 1);
44 Ok(1)
45 }
46 }
47
48 fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> {
49 for (key, value) in &self.seq_map {
50 let record = SnapshotRecordDto {
51 tree: SEQUENCE_TREE_NAME.clone(),
52 key: key.as_bytes().to_vec(),
53 value: id_to_bin(*value),
54 op_type: 0,
55 };
56 writer.do_send(SnapshotWriterRequest::Record(record));
57 }
58 Ok(())
59 }
60
61 fn load_snapshot_record(&mut self, record: SnapshotRecordDto) -> anyhow::Result<()> {
62 let value = bin_to_id_result(&record.value)?;
63 self.seq_map
64 .insert(Arc::new(String::from_utf8(record.key)?), value);
65 Ok(())
66 }
67
68 fn load_completed(&mut self, _ctx: &mut Context<Self>) -> anyhow::Result<()> {
69 self.init = true;
70 Ok(())
71 }
72}
73
74impl Actor for SequenceDbManager {
75 type Context = Context<Self>;
76 fn started(&mut self, _ctx: &mut Self::Context) {
77 log::info!("SequenceDbManager started")
78 }
79}
80
81impl Handler<SequenceRaftReq> for SequenceDbManager {
82 type Result = anyhow::Result<SequenceRaftResult>;
83
84 fn handle(&mut self, msg: SequenceRaftReq, _ctx: &mut Self::Context) -> Self::Result {
85 match msg {
86 SequenceRaftReq::NextId(key) => {
87 let id = self.next_id(key);
88 Ok(SequenceRaftResult::NextId(id))
89 }
90 SequenceRaftReq::NextRange(key, step) => {
91 let start = self.next_range(key, step)?;
92 Ok(SequenceRaftResult::NextRange { start, len: step })
93 }
94 SequenceRaftReq::SetId(key, id) => {
95 self.seq_map.insert(key, id);
96 Ok(SequenceRaftResult::None)
97 }
98 SequenceRaftReq::RemoveId(key) => {
99 self.seq_map.remove(&key);
100 Ok(SequenceRaftResult::None)
101 }
102 }
103 }
104}
105
106impl Handler<RaftApplyDataRequest> for SequenceDbManager {
107 type Result = anyhow::Result<RaftApplyDataResponse>;
108
109 fn handle(&mut self, msg: RaftApplyDataRequest, ctx: &mut Self::Context) -> Self::Result {
110 match msg {
111 RaftApplyDataRequest::BuildSnapshot(writer) => {
112 self.build_snapshot(writer)?;
113 }
114 RaftApplyDataRequest::LoadSnapshotRecord(record) => {
115 self.load_snapshot_record(record)?;
116 }
117 RaftApplyDataRequest::LoadCompleted => {
118 self.load_completed(ctx)?;
119 }
120 }
121 Ok(RaftApplyDataResponse::None)
122 }
123}