openraft/replication/
request.rs1use std::fmt;
2
3use crate::type_config::alias::LogIdOf;
4
5#[derive(Debug)]
7pub(crate) enum Replicate<C>
8where C: RaftTypeConfig
9{
10 Committed(Option<LogId<C::NodeId>>),
12
13 Heartbeat,
15
16 Data(Data<C>),
18}
19
20impl<C> Replicate<C>
21where C: RaftTypeConfig
22{
23 pub(crate) fn logs(id: RequestId, log_id_range: LogIdRange<C::NodeId>) -> Self {
24 Self::Data(Data::new_logs(id, log_id_range))
25 }
26
27 pub(crate) fn snapshot(id: RequestId, last_log_id: Option<LogIdOf<C>>) -> Self {
28 Self::Data(Data::new_snapshot(id, last_log_id))
29 }
30
31 pub(crate) fn new_data(data: Data<C>) -> Self {
32 Self::Data(data)
33 }
34}
35
36impl<C: RaftTypeConfig> fmt::Display for Replicate<C> {
37 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38 match self {
39 Self::Committed(c) => write!(f, "Committed({})", c.display()),
40 Self::Heartbeat => write!(f, "Heartbeat"),
41 Self::Data(d) => write!(f, "Data({})", d),
42 }
43 }
44}
45
46impl<C> MessageSummary<Replicate<C>> for Replicate<C>
47where C: RaftTypeConfig
48{
49 fn summary(&self) -> String {
50 self.to_string()
51 }
52}
53
54use crate::display_ext::DisplayOptionExt;
55use crate::error::StreamingError;
56use crate::log_id_range::LogIdRange;
57use crate::raft::SnapshotResponse;
58use crate::replication::callbacks::SnapshotCallback;
59use crate::replication::request_id::RequestId;
60use crate::type_config::alias::InstantOf;
61use crate::LogId;
62use crate::MessageSummary;
63use crate::RaftTypeConfig;
64use crate::SnapshotMeta;
65
66pub(crate) enum Data<C>
72where C: RaftTypeConfig
73{
74 Heartbeat,
75 Logs(DataWithId<LogIdRange<C::NodeId>>),
76 Snapshot(DataWithId<Option<LogIdOf<C>>>),
77 SnapshotCallback(DataWithId<SnapshotCallback<C>>),
78}
79
80impl<C> fmt::Debug for Data<C>
81where C: RaftTypeConfig
82{
83 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84 match self {
85 Data::Heartbeat => {
86 write!(f, "Data::Heartbeat")
87 }
88 Self::Logs(l) => f
89 .debug_struct("Data::Logs")
90 .field("request_id", &l.request_id())
91 .field("log_id_range", &l.data)
92 .finish(),
93 Self::Snapshot(s) => f.debug_struct("Data::Snapshot").field("request_id", &s.request_id()).finish(),
94 Self::SnapshotCallback(resp) => f
95 .debug_struct("Data::SnapshotCallback")
96 .field("request_id", &resp.request_id())
97 .field("callback", &resp.data)
98 .finish(),
99 }
100 }
101}
102
103impl<C: RaftTypeConfig> fmt::Display for Data<C> {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 match self {
106 Data::Heartbeat => {
107 write!(f, "Heartbeat")
108 }
109 Self::Logs(l) => {
110 write!(f, "Logs{{request_id: {}, log_id_range: {}}}", l.request_id, l.data)
111 }
112 Self::Snapshot(s) => {
113 write!(f, "Snapshot{{request_id: {}}}", s.request_id)
114 }
115 Self::SnapshotCallback(l) => {
116 write!(
117 f,
118 "SnapshotCallback{{request_id: {}, callback: {}}}",
119 l.request_id, l.data
120 )
121 }
122 }
123 }
124}
125
126impl<C> MessageSummary<Data<C>> for Data<C>
127where C: RaftTypeConfig
128{
129 fn summary(&self) -> String {
130 self.to_string()
131 }
132}
133
134impl<C> Data<C>
135where C: RaftTypeConfig
136{
137 pub(crate) fn new_heartbeat() -> Self {
138 Self::Heartbeat
139 }
140
141 pub(crate) fn new_logs(request_id: RequestId, log_id_range: LogIdRange<C::NodeId>) -> Self {
142 Self::Logs(DataWithId::new(request_id, log_id_range))
143 }
144
145 pub(crate) fn new_snapshot(request_id: RequestId, last_log_id: Option<LogIdOf<C>>) -> Self {
146 Self::Snapshot(DataWithId::new(request_id, last_log_id))
147 }
148
149 pub(crate) fn new_snapshot_callback(
150 request_id: RequestId,
151 start_time: InstantOf<C>,
152 snapshot_meta: SnapshotMeta<C::NodeId, C::Node>,
153 result: Result<SnapshotResponse<C::NodeId>, StreamingError<C>>,
154 ) -> Self {
155 Self::SnapshotCallback(DataWithId::new(
156 request_id,
157 SnapshotCallback::new(start_time, snapshot_meta, result),
158 ))
159 }
160
161 pub(crate) fn request_id(&self) -> RequestId {
162 match self {
163 Self::Heartbeat => RequestId::new_heartbeat(),
164 Self::Logs(l) => l.request_id(),
165 Self::Snapshot(s) => s.request_id(),
166 Self::SnapshotCallback(r) => r.request_id(),
167 }
168 }
169
170 pub(crate) fn has_payload(&self) -> bool {
172 match self {
173 Self::Heartbeat => false,
174 Self::Logs(_) => true,
175 Self::Snapshot(_) => true,
176 Self::SnapshotCallback(_) => true,
177 }
178 }
179}
180
181#[derive(Clone)]
182pub(crate) struct DataWithId<T> {
183 request_id: RequestId,
185 data: T,
186}
187
188impl<T> DataWithId<T> {
189 pub(crate) fn new(request_id: RequestId, data: T) -> Self {
190 Self { request_id, data }
191 }
192
193 pub(crate) fn request_id(&self) -> RequestId {
194 self.request_id
195 }
196
197 pub(crate) fn data(&self) -> &T {
198 &self.data
199 }
200
201 pub(crate) fn into_data(self) -> T {
202 self.data
203 }
204}