raft/
read_only.rs

// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

17use std::collections::VecDeque;
18
19use slog::Logger;
20
21use crate::eraftpb::Message;
22use crate::{HashMap, HashSet};
23
/// Selects how read only requests are served, i.e. how much safety and
/// consistency they are guaranteed.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReadOnlyOption {
    /// Confirms the read with a quorum of the cluster before serving it, which
    /// guarantees linearizability. This is the default and the recommended
    /// choice.
    Safe,
    /// Relies on the leader lease to provide linearizability, which makes it
    /// sensitive to clock drift: when drift is unbounded (clocks may pause or
    /// move backward without limit), a leader can hold the lease longer than it
    /// should, and ReadIndex is not safe under those conditions.
    LeaseBased,
}

impl Default for ReadOnlyOption {
    /// The default is [`ReadOnlyOption::Safe`].
    fn default() -> Self {
        Self::Safe
    }
}
43
/// State returned for a read only query.
///
/// The caller is responsible for sending a `MsgReadIndex` before it expects to
/// receive this state from a ready. The caller must also work out whether a
/// given state answers its own request by looking at `request_ctx`, for
/// example by using a unique id as the request context.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ReadState {
    /// Index associated with this read state.
    pub index: u64,
    /// Opaque context bytes identifying the request this state answers.
    pub request_ctx: Vec<u8>,
}
56
57#[derive(Default, Debug, Clone)]
58pub struct ReadIndexStatus {
59    pub req: Message,
60    pub index: u64,
61    pub acks: HashSet<u64>,
62}
63
64#[derive(Default, Debug, Clone)]
65pub struct ReadOnly {
66    pub option: ReadOnlyOption,
67    pub pending_read_index: HashMap<Vec<u8>, ReadIndexStatus>,
68    pub read_index_queue: VecDeque<Vec<u8>>,
69}
70
71impl ReadOnly {
72    pub fn new(option: ReadOnlyOption) -> ReadOnly {
73        ReadOnly {
74            option,
75            pending_read_index: HashMap::default(),
76            read_index_queue: VecDeque::new(),
77        }
78    }
79
80    /// Adds a read only request into readonly struct.
81    ///
82    /// `index` is the commit index of the raft state machine when it received
83    /// the read only request.
84    ///
85    /// `m` is the original read only request message from the local or remote node.
86    pub fn add_request(&mut self, index: u64, req: Message, self_id: u64) {
87        let ctx = {
88            let key: &[u8] = req.entries[0].data.as_ref();
89            if self.pending_read_index.contains_key(key) {
90                return;
91            }
92            key.to_vec()
93        };
94        let mut acks = HashSet::<u64>::default();
95        acks.insert(self_id);
96        let status = ReadIndexStatus { req, index, acks };
97        self.pending_read_index.insert(ctx.clone(), status);
98        self.read_index_queue.push_back(ctx);
99    }
100
101    /// Notifies the ReadOnly struct that the raft state machine received
102    /// an acknowledgment of the heartbeat that attached with the read only request
103    /// context.
104    pub fn recv_ack(&mut self, id: u64, ctx: &[u8]) -> Option<&HashSet<u64>> {
105        self.pending_read_index.get_mut(ctx).map(|rs| {
106            rs.acks.insert(id);
107            &rs.acks
108        })
109    }
110
111    /// Advances the read only request queue kept by the ReadOnly struct.
112    /// It dequeues the requests until it finds the read only request that has
113    /// the same context as the given `ctx`.
114    pub fn advance(&mut self, ctx: &[u8], logger: &Logger) -> Vec<ReadIndexStatus> {
115        let mut rss = vec![];
116        if let Some(i) = self.read_index_queue.iter().position(|x| {
117            if !self.pending_read_index.contains_key(x) {
118                fatal!(logger, "cannot find correspond read state from pending map");
119            }
120            *x == ctx
121        }) {
122            for _ in 0..=i {
123                let rs = self.read_index_queue.pop_front().unwrap();
124                let status = self.pending_read_index.remove(&rs).unwrap();
125                rss.push(status);
126            }
127        }
128        rss
129    }
130
131    /// Returns the context of the last pending read only request in ReadOnly struct.
132    pub fn last_pending_request_ctx(&self) -> Option<Vec<u8>> {
133        self.read_index_queue.back().cloned()
134    }
135
136    #[inline]
137    pub fn pending_read_count(&self) -> usize {
138        self.read_index_queue.len()
139    }
140}