Skip to main content

rnacos/sequence/
core.rs

1use crate::common::byte_utils::{bin_to_id_result, id_to_bin};
2use crate::common::constant::SEQUENCE_TREE_NAME;
3use crate::raft::filestore::model::SnapshotRecordDto;
4use crate::raft::filestore::raftapply::{RaftApplyDataRequest, RaftApplyDataResponse};
5use crate::raft::filestore::raftsnapshot::{SnapshotWriterActor, SnapshotWriterRequest};
6use crate::sequence::model::{SequenceRaftReq, SequenceRaftResult};
7use actix::prelude::*;
8use std::collections::HashMap;
9use std::sync::Arc;
10
/// In-memory table of named sequence counters.
///
/// The type is registered as an actix actor in this file and is driven by
/// `SequenceRaftReq` and `RaftApplyDataRequest` messages.
///
/// `Default` yields the same state as [`SequenceDbManager::new`]: an empty
/// counter table with `init` set to `false`.
#[derive(Clone, Debug, Default)]
pub struct SequenceDbManager {
    /// Maps each sequence key to the next id that is available for it.
    pub(crate) seq_map: HashMap<Arc<String>, u64>,
    /// Becomes `true` once `load_completed` has run.
    init: bool,
}
17
18impl SequenceDbManager {
19    pub fn new() -> Self {
20        Self {
21            seq_map: HashMap::new(),
22            init: false,
23        }
24    }
25
26    pub fn next_id(&mut self, key: Arc<String>) -> u64 {
27        if let Some(id) = self.seq_map.get_mut(&key) {
28            let old = *id;
29            *id += 1;
30            old
31        } else {
32            self.seq_map.insert(key.clone(), 1 + 1);
33            1
34        }
35    }
36
37    pub fn next_range(&mut self, key: Arc<String>, step: u64) -> anyhow::Result<u64> {
38        if let Some(id) = self.seq_map.get_mut(&key) {
39            let old = *id;
40            *id += step;
41            Ok(old)
42        } else {
43            self.seq_map.insert(key.clone(), step + 1);
44            Ok(1)
45        }
46    }
47
48    fn build_snapshot(&self, writer: Addr<SnapshotWriterActor>) -> anyhow::Result<()> {
49        for (key, value) in &self.seq_map {
50            let record = SnapshotRecordDto {
51                tree: SEQUENCE_TREE_NAME.clone(),
52                key: key.as_bytes().to_vec(),
53                value: id_to_bin(*value),
54                op_type: 0,
55            };
56            writer.do_send(SnapshotWriterRequest::Record(record));
57        }
58        Ok(())
59    }
60
61    fn load_snapshot_record(&mut self, record: SnapshotRecordDto) -> anyhow::Result<()> {
62        let value = bin_to_id_result(&record.value)?;
63        self.seq_map
64            .insert(Arc::new(String::from_utf8(record.key)?), value);
65        Ok(())
66    }
67
68    fn load_completed(&mut self, _ctx: &mut Context<Self>) -> anyhow::Result<()> {
69        self.init = true;
70        Ok(())
71    }
72}
73
74impl Actor for SequenceDbManager {
75    type Context = Context<Self>;
76    fn started(&mut self, _ctx: &mut Self::Context) {
77        log::info!("SequenceDbManager started")
78    }
79}
80
81impl Handler<SequenceRaftReq> for SequenceDbManager {
82    type Result = anyhow::Result<SequenceRaftResult>;
83
84    fn handle(&mut self, msg: SequenceRaftReq, _ctx: &mut Self::Context) -> Self::Result {
85        match msg {
86            SequenceRaftReq::NextId(key) => {
87                let id = self.next_id(key);
88                Ok(SequenceRaftResult::NextId(id))
89            }
90            SequenceRaftReq::NextRange(key, step) => {
91                let start = self.next_range(key, step)?;
92                Ok(SequenceRaftResult::NextRange { start, len: step })
93            }
94            SequenceRaftReq::SetId(key, id) => {
95                self.seq_map.insert(key, id);
96                Ok(SequenceRaftResult::None)
97            }
98            SequenceRaftReq::RemoveId(key) => {
99                self.seq_map.remove(&key);
100                Ok(SequenceRaftResult::None)
101            }
102        }
103    }
104}
105
106impl Handler<RaftApplyDataRequest> for SequenceDbManager {
107    type Result = anyhow::Result<RaftApplyDataResponse>;
108
109    fn handle(&mut self, msg: RaftApplyDataRequest, ctx: &mut Self::Context) -> Self::Result {
110        match msg {
111            RaftApplyDataRequest::BuildSnapshot(writer) => {
112                self.build_snapshot(writer)?;
113            }
114            RaftApplyDataRequest::LoadSnapshotRecord(record) => {
115                self.load_snapshot_record(record)?;
116            }
117            RaftApplyDataRequest::LoadCompleted => {
118                self.load_completed(ctx)?;
119            }
120        }
121        Ok(RaftApplyDataResponse::None)
122    }
123}