raft/read_only.rs
1// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
2
3// Copyright 2016 The etcd Authors
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17use std::collections::VecDeque;
18
19use slog::Logger;
20
21use crate::eraftpb::Message;
22use crate::{HashMap, HashSet};
23
/// Determines the relative safety and consistency of read-only requests.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ReadOnlyOption {
    /// Guarantees the linearizability of a read-only request by communicating
    /// with the quorum. This is the default and the suggested option.
    Safe,
    /// Ensures the linearizability of a read-only request by relying on the
    /// leader lease, which makes it sensitive to clock drift. If the drift is
    /// unbounded (the clock may move backward or pause without any bound), the
    /// leader might keep the lease longer than it should; ReadIndex is not
    /// safe in that case.
    LeaseBased,
}
37
38impl Default for ReadOnlyOption {
39 fn default() -> ReadOnlyOption {
40 ReadOnlyOption::Safe
41 }
42}
43
/// State handed back for a read-only query.
///
/// The caller is responsible for sending `MsgReadIndex` before it can obtain
/// this state from ready. The caller must also tell apart which of its
/// requests a given state answers by inspecting `request_ctx`, e.g. by using a
/// unique id as the request context.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct ReadState {
    /// The index of the read state.
    pub index: u64,
    /// A datagram carrying the context of the request.
    pub request_ctx: Vec<u8>,
}
56
57#[derive(Default, Debug, Clone)]
58pub struct ReadIndexStatus {
59 pub req: Message,
60 pub index: u64,
61 pub acks: HashSet<u64>,
62}
63
64#[derive(Default, Debug, Clone)]
65pub struct ReadOnly {
66 pub option: ReadOnlyOption,
67 pub pending_read_index: HashMap<Vec<u8>, ReadIndexStatus>,
68 pub read_index_queue: VecDeque<Vec<u8>>,
69}
70
71impl ReadOnly {
72 pub fn new(option: ReadOnlyOption) -> ReadOnly {
73 ReadOnly {
74 option,
75 pending_read_index: HashMap::default(),
76 read_index_queue: VecDeque::new(),
77 }
78 }
79
80 /// Adds a read only request into readonly struct.
81 ///
82 /// `index` is the commit index of the raft state machine when it received
83 /// the read only request.
84 ///
85 /// `m` is the original read only request message from the local or remote node.
86 pub fn add_request(&mut self, index: u64, req: Message, self_id: u64) {
87 let ctx = {
88 let key: &[u8] = req.entries[0].data.as_ref();
89 if self.pending_read_index.contains_key(key) {
90 return;
91 }
92 key.to_vec()
93 };
94 let mut acks = HashSet::<u64>::default();
95 acks.insert(self_id);
96 let status = ReadIndexStatus { req, index, acks };
97 self.pending_read_index.insert(ctx.clone(), status);
98 self.read_index_queue.push_back(ctx);
99 }
100
101 /// Notifies the ReadOnly struct that the raft state machine received
102 /// an acknowledgment of the heartbeat that attached with the read only request
103 /// context.
104 pub fn recv_ack(&mut self, id: u64, ctx: &[u8]) -> Option<&HashSet<u64>> {
105 self.pending_read_index.get_mut(ctx).map(|rs| {
106 rs.acks.insert(id);
107 &rs.acks
108 })
109 }
110
111 /// Advances the read only request queue kept by the ReadOnly struct.
112 /// It dequeues the requests until it finds the read only request that has
113 /// the same context as the given `ctx`.
114 pub fn advance(&mut self, ctx: &[u8], logger: &Logger) -> Vec<ReadIndexStatus> {
115 let mut rss = vec![];
116 if let Some(i) = self.read_index_queue.iter().position(|x| {
117 if !self.pending_read_index.contains_key(x) {
118 fatal!(logger, "cannot find correspond read state from pending map");
119 }
120 *x == ctx
121 }) {
122 for _ in 0..=i {
123 let rs = self.read_index_queue.pop_front().unwrap();
124 let status = self.pending_read_index.remove(&rs).unwrap();
125 rss.push(status);
126 }
127 }
128 rss
129 }
130
131 /// Returns the context of the last pending read only request in ReadOnly struct.
132 pub fn last_pending_request_ctx(&self) -> Option<Vec<u8>> {
133 self.read_index_queue.back().cloned()
134 }
135
136 #[inline]
137 pub fn pending_read_count(&self) -> usize {
138 self.read_index_queue.len()
139 }
140}