openraft/replication/
request.rs

1use std::fmt;
2
3use crate::type_config::alias::LogIdOf;
4
5/// A replication request sent by RaftCore leader state to replication stream.
6#[derive(Debug)]
7pub(crate) enum Replicate<C>
8where C: RaftTypeConfig
9{
10    /// Inform replication stream to forward the committed log id to followers/learners.
11    Committed(Option<LogId<C::NodeId>>),
12
13    /// Send an empty AppendEntries RPC as heartbeat.
14    Heartbeat,
15
16    /// Send a chunk of data, e.g., logs or snapshot.
17    Data(Data<C>),
18}
19
20impl<C> Replicate<C>
21where C: RaftTypeConfig
22{
23    pub(crate) fn logs(id: RequestId, log_id_range: LogIdRange<C::NodeId>) -> Self {
24        Self::Data(Data::new_logs(id, log_id_range))
25    }
26
27    pub(crate) fn snapshot(id: RequestId, last_log_id: Option<LogIdOf<C>>) -> Self {
28        Self::Data(Data::new_snapshot(id, last_log_id))
29    }
30
31    pub(crate) fn new_data(data: Data<C>) -> Self {
32        Self::Data(data)
33    }
34}
35
36impl<C: RaftTypeConfig> fmt::Display for Replicate<C> {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        match self {
39            Self::Committed(c) => write!(f, "Committed({})", c.display()),
40            Self::Heartbeat => write!(f, "Heartbeat"),
41            Self::Data(d) => write!(f, "Data({})", d),
42        }
43    }
44}
45
46impl<C> MessageSummary<Replicate<C>> for Replicate<C>
47where C: RaftTypeConfig
48{
49    fn summary(&self) -> String {
50        self.to_string()
51    }
52}
53
54use crate::display_ext::DisplayOptionExt;
55use crate::error::StreamingError;
56use crate::log_id_range::LogIdRange;
57use crate::raft::SnapshotResponse;
58use crate::replication::callbacks::SnapshotCallback;
59use crate::replication::request_id::RequestId;
60use crate::type_config::alias::InstantOf;
61use crate::LogId;
62use crate::MessageSummary;
63use crate::RaftTypeConfig;
64use crate::SnapshotMeta;
65
66/// Request to replicate a chunk of data, logs or snapshot.
67///
68/// It defines what data to send to a follower/learner and an id to identify who is sending this
69/// data.
70/// Thd data is either a series of logs or a snapshot.
71pub(crate) enum Data<C>
72where C: RaftTypeConfig
73{
74    Heartbeat,
75    Logs(DataWithId<LogIdRange<C::NodeId>>),
76    Snapshot(DataWithId<Option<LogIdOf<C>>>),
77    SnapshotCallback(DataWithId<SnapshotCallback<C>>),
78}
79
80impl<C> fmt::Debug for Data<C>
81where C: RaftTypeConfig
82{
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        match self {
85            Data::Heartbeat => {
86                write!(f, "Data::Heartbeat")
87            }
88            Self::Logs(l) => f
89                .debug_struct("Data::Logs")
90                .field("request_id", &l.request_id())
91                .field("log_id_range", &l.data)
92                .finish(),
93            Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
94            Self::SnapshotCallback(resp) => f
95                .debug_struct("Data::SnapshotCallback")
96                .field("request_id", &resp.request_id())
97                .field("callback", &resp.data)
98                .finish(),
99        }
100    }
101}
102
103impl<C: RaftTypeConfig> fmt::Display for Data<C> {
104    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105        match self {
106            Data::Heartbeat => {
107                write!(f, "Heartbeat")
108            }
109            Self::Logs(l) => {
110                write!(f, "Logs{{request_id: {}, log_id_range: {}}}", l.request_id, l.data)
111            }
112            Self::Snapshot(s) => {
113                write!(f, "Snapshot{{request_id: {}}}", s.request_id)
114            }
115            Self::SnapshotCallback(l) => {
116                write!(
117                    f,
118                    "SnapshotCallback{{request_id: {}, callback: {}}}",
119                    l.request_id, l.data
120                )
121            }
122        }
123    }
124}
125
126impl<C> MessageSummary<Data<C>> for Data<C>
127where C: RaftTypeConfig
128{
129    fn summary(&self) -> String {
130        self.to_string()
131    }
132}
133
134impl<C> Data<C>
135where C: RaftTypeConfig
136{
137    pub(crate) fn new_heartbeat() -> Self {
138        Self::Heartbeat
139    }
140
141    pub(crate) fn new_logs(request_id: RequestId, log_id_range: LogIdRange<C::NodeId>) -> Self {
142        Self::Logs(DataWithId::new(request_id, log_id_range))
143    }
144
145    pub(crate) fn new_snapshot(request_id: RequestId, last_log_id: Option<LogIdOf<C>>) -> Self {
146        Self::Snapshot(DataWithId::new(request_id, last_log_id))
147    }
148
149    pub(crate) fn new_snapshot_callback(
150        request_id: RequestId,
151        start_time: InstantOf<C>,
152        snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
153        result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
154    ) -> Self {
155        Self::SnapshotCallback(DataWithId::new(
156            request_id,
157            SnapshotCallback::new(start_time, snapshot_meta, result),
158        ))
159    }
160
161    pub(crate) fn request_id(&self) -> RequestId {
162        match self {
163            Self::Heartbeat => RequestId::new_heartbeat(),
164            Self::Logs(l) => l.request_id(),
165            Self::Snapshot(s) => s.request_id(),
166            Self::SnapshotCallback(r) => r.request_id(),
167        }
168    }
169
170    /// Return true if the data includes any payload, i.e., not a heartbeat.
171    pub(crate) fn has_payload(&self) -> bool {
172        match self {
173            Self::Heartbeat => false,
174            Self::Logs(_) => true,
175            Self::Snapshot(_) => true,
176            Self::SnapshotCallback(_) => true,
177        }
178    }
179}
180
181#[derive(Clone)]
182pub(crate) struct DataWithId<T> {
183    /// The id of this replication request.
184    request_id: RequestId,
185    data: T,
186}
187
188impl<T> DataWithId<T> {
189    pub(crate) fn new(request_id: RequestId, data: T) -> Self {
190        Self { request_id, data }
191    }
192
193    pub(crate) fn request_id(&self) -> RequestId {
194        self.request_id
195    }
196
197    pub(crate) fn data(&self) -> &T {
198        &self.data
199    }
200
201    pub(crate) fn into_data(self) -> T {
202        self.data
203    }
204}