1#[derive(serde::Serialize, serde::Deserialize)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct SnapshotChunk {
5 #[prost(uint64, tag = "1")]
7 pub leader_term: u64,
8 #[prost(uint32, tag = "2")]
10 pub leader_id: u32,
11 #[prost(uint32, tag = "3")]
13 pub seq: u32,
14 #[prost(uint32, tag = "4")]
16 pub total_chunks: u32,
17 #[prost(bytes = "vec", tag = "5")]
19 pub chunk_checksum: ::prost::alloc::vec::Vec<u8>,
20 #[prost(message, optional, tag = "6")]
22 pub metadata: ::core::option::Option<SnapshotMetadata>,
23 #[prost(bytes = "vec", tag = "7")]
25 pub data: ::prost::alloc::vec::Vec<u8>,
26}
27#[derive(serde::Serialize, serde::Deserialize)]
28#[derive(Clone, PartialEq, ::prost::Message)]
29pub struct SnapshotMetadata {
30 #[prost(message, optional, tag = "1")]
32 pub last_included: ::core::option::Option<super::common::LogId>,
33 #[prost(bytes = "vec", tag = "2")]
35 pub checksum: ::prost::alloc::vec::Vec<u8>,
36}
37#[derive(serde::Serialize, serde::Deserialize)]
38#[derive(Clone, Copy, PartialEq, ::prost::Message)]
39pub struct SnapshotResponse {
40 #[prost(uint64, tag = "1")]
42 pub term: u64,
43 #[prost(bool, tag = "2")]
45 pub success: bool,
46 #[prost(uint32, tag = "3")]
48 pub next_chunk: u32,
49}
50#[derive(serde::Serialize, serde::Deserialize)]
51#[derive(Clone, PartialEq, ::prost::Message)]
52pub struct PurgeLogRequest {
53 #[prost(uint64, tag = "1")]
54 pub term: u64,
55 #[prost(uint32, tag = "2")]
56 pub leader_id: u32,
57 #[prost(uint64, tag = "3")]
59 pub leader_commit: u64,
60 #[prost(message, optional, tag = "4")]
62 pub last_included: ::core::option::Option<super::common::LogId>,
63 #[prost(bytes = "vec", tag = "5")]
64 pub snapshot_checksum: ::prost::alloc::vec::Vec<u8>,
65}
66#[derive(serde::Serialize, serde::Deserialize)]
67#[derive(Clone, Copy, PartialEq, ::prost::Message)]
68pub struct PurgeLogResponse {
69 #[prost(uint32, tag = "1")]
70 pub node_id: u32,
71 #[prost(uint64, tag = "2")]
73 pub term: u64,
74 #[prost(bool, tag = "3")]
76 pub success: bool,
77 #[prost(message, optional, tag = "4")]
79 pub last_purged: ::core::option::Option<super::common::LogId>,
80}
81#[derive(serde::Serialize, serde::Deserialize)]
82#[derive(Clone, Copy, PartialEq, ::prost::Message)]
83pub struct SnapshotAck {
84 #[prost(uint32, tag = "1")]
86 pub seq: u32,
87 #[prost(enumeration = "snapshot_ack::ChunkStatus", tag = "2")]
89 pub status: i32,
90 #[prost(uint32, tag = "3")]
92 pub next_requested: u32,
93}
94pub mod snapshot_ack {
96 #[derive(serde::Serialize, serde::Deserialize)]
97 #[derive(
98 Clone,
99 Copy,
100 Debug,
101 PartialEq,
102 Eq,
103 Hash,
104 PartialOrd,
105 Ord,
106 ::prost::Enumeration
107 )]
108 #[repr(i32)]
109 pub enum ChunkStatus {
110 Accepted = 0,
111 ChecksumMismatch = 1,
112 OutOfOrder = 2,
113 Requested = 3,
114 Failed = 4,
115 }
116 impl ChunkStatus {
117 pub fn as_str_name(&self) -> &'static str {
122 match self {
123 Self::Accepted => "ACCEPTED",
124 Self::ChecksumMismatch => "CHECKSUM_MISMATCH",
125 Self::OutOfOrder => "OUT_OF_ORDER",
126 Self::Requested => "REQUESTED",
127 Self::Failed => "FAILED",
128 }
129 }
130 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
132 match value {
133 "ACCEPTED" => Some(Self::Accepted),
134 "CHECKSUM_MISMATCH" => Some(Self::ChecksumMismatch),
135 "OUT_OF_ORDER" => Some(Self::OutOfOrder),
136 "REQUESTED" => Some(Self::Requested),
137 "FAILED" => Some(Self::Failed),
138 _ => None,
139 }
140 }
141 }
142}
143pub mod snapshot_service_client {
145 #![allow(
146 unused_variables,
147 dead_code,
148 missing_docs,
149 clippy::wildcard_imports,
150 clippy::let_unit_value,
151 )]
152 use tonic::codegen::*;
153 use tonic::codegen::http::Uri;
154 #[derive(Debug, Clone)]
155 pub struct SnapshotServiceClient<T> {
156 inner: tonic::client::Grpc<T>,
157 }
158 impl SnapshotServiceClient<tonic::transport::Channel> {
159 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
161 where
162 D: TryInto<tonic::transport::Endpoint>,
163 D::Error: Into<StdError>,
164 {
165 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
166 Ok(Self::new(conn))
167 }
168 }
169 impl<T> SnapshotServiceClient<T>
170 where
171 T: tonic::client::GrpcService<tonic::body::BoxBody>,
172 T::Error: Into<StdError>,
173 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
174 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
175 {
176 pub fn new(inner: T) -> Self {
177 let inner = tonic::client::Grpc::new(inner);
178 Self { inner }
179 }
180 pub fn with_origin(inner: T, origin: Uri) -> Self {
181 let inner = tonic::client::Grpc::with_origin(inner, origin);
182 Self { inner }
183 }
184 pub fn with_interceptor<F>(
185 inner: T,
186 interceptor: F,
187 ) -> SnapshotServiceClient<InterceptedService<T, F>>
188 where
189 F: tonic::service::Interceptor,
190 T::ResponseBody: Default,
191 T: tonic::codegen::Service<
192 http::Request<tonic::body::BoxBody>,
193 Response = http::Response<
194 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
195 >,
196 >,
197 <T as tonic::codegen::Service<
198 http::Request<tonic::body::BoxBody>,
199 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
200 {
201 SnapshotServiceClient::new(InterceptedService::new(inner, interceptor))
202 }
203 #[must_use]
208 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
209 self.inner = self.inner.send_compressed(encoding);
210 self
211 }
212 #[must_use]
214 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
215 self.inner = self.inner.accept_compressed(encoding);
216 self
217 }
218 #[must_use]
222 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
223 self.inner = self.inner.max_decoding_message_size(limit);
224 self
225 }
226 #[must_use]
230 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
231 self.inner = self.inner.max_encoding_message_size(limit);
232 self
233 }
234 pub async fn install_snapshot(
236 &mut self,
237 request: impl tonic::IntoStreamingRequest<Message = super::SnapshotChunk>,
238 ) -> std::result::Result<
239 tonic::Response<super::SnapshotResponse>,
240 tonic::Status,
241 > {
242 self.inner
243 .ready()
244 .await
245 .map_err(|e| {
246 tonic::Status::unknown(
247 format!("Service was not ready: {}", e.into()),
248 )
249 })?;
250 let codec = tonic::codec::ProstCodec::default();
251 let path = http::uri::PathAndQuery::from_static(
252 "/raft.storage.SnapshotService/InstallSnapshot",
253 );
254 let mut req = request.into_streaming_request();
255 req.extensions_mut()
256 .insert(
257 GrpcMethod::new("raft.storage.SnapshotService", "InstallSnapshot"),
258 );
259 self.inner.client_streaming(req, path, codec).await
260 }
261 pub async fn stream_snapshot(
263 &mut self,
264 request: impl tonic::IntoStreamingRequest<Message = super::SnapshotAck>,
265 ) -> std::result::Result<
266 tonic::Response<tonic::codec::Streaming<super::SnapshotChunk>>,
267 tonic::Status,
268 > {
269 self.inner
270 .ready()
271 .await
272 .map_err(|e| {
273 tonic::Status::unknown(
274 format!("Service was not ready: {}", e.into()),
275 )
276 })?;
277 let codec = tonic::codec::ProstCodec::default();
278 let path = http::uri::PathAndQuery::from_static(
279 "/raft.storage.SnapshotService/StreamSnapshot",
280 );
281 let mut req = request.into_streaming_request();
282 req.extensions_mut()
283 .insert(
284 GrpcMethod::new("raft.storage.SnapshotService", "StreamSnapshot"),
285 );
286 self.inner.streaming(req, path, codec).await
287 }
288 pub async fn purge_log(
290 &mut self,
291 request: impl tonic::IntoRequest<super::PurgeLogRequest>,
292 ) -> std::result::Result<
293 tonic::Response<super::PurgeLogResponse>,
294 tonic::Status,
295 > {
296 self.inner
297 .ready()
298 .await
299 .map_err(|e| {
300 tonic::Status::unknown(
301 format!("Service was not ready: {}", e.into()),
302 )
303 })?;
304 let codec = tonic::codec::ProstCodec::default();
305 let path = http::uri::PathAndQuery::from_static(
306 "/raft.storage.SnapshotService/PurgeLog",
307 );
308 let mut req = request.into_request();
309 req.extensions_mut()
310 .insert(GrpcMethod::new("raft.storage.SnapshotService", "PurgeLog"));
311 self.inner.unary(req, path, codec).await
312 }
313 }
314}
315pub mod snapshot_service_server {
317 #![allow(
318 unused_variables,
319 dead_code,
320 missing_docs,
321 clippy::wildcard_imports,
322 clippy::let_unit_value,
323 )]
324 use tonic::codegen::*;
325 #[async_trait]
327 pub trait SnapshotService: std::marker::Send + std::marker::Sync + 'static {
328 async fn install_snapshot(
330 &self,
331 request: tonic::Request<tonic::Streaming<super::SnapshotChunk>>,
332 ) -> std::result::Result<
333 tonic::Response<super::SnapshotResponse>,
334 tonic::Status,
335 >;
336 type StreamSnapshotStream: tonic::codegen::tokio_stream::Stream<
338 Item = std::result::Result<super::SnapshotChunk, tonic::Status>,
339 >
340 + std::marker::Send
341 + 'static;
342 async fn stream_snapshot(
344 &self,
345 request: tonic::Request<tonic::Streaming<super::SnapshotAck>>,
346 ) -> std::result::Result<
347 tonic::Response<Self::StreamSnapshotStream>,
348 tonic::Status,
349 >;
350 async fn purge_log(
352 &self,
353 request: tonic::Request<super::PurgeLogRequest>,
354 ) -> std::result::Result<
355 tonic::Response<super::PurgeLogResponse>,
356 tonic::Status,
357 >;
358 }
359 #[derive(Debug)]
360 pub struct SnapshotServiceServer<T> {
361 inner: Arc<T>,
362 accept_compression_encodings: EnabledCompressionEncodings,
363 send_compression_encodings: EnabledCompressionEncodings,
364 max_decoding_message_size: Option<usize>,
365 max_encoding_message_size: Option<usize>,
366 }
367 impl<T> SnapshotServiceServer<T> {
368 pub fn new(inner: T) -> Self {
369 Self::from_arc(Arc::new(inner))
370 }
371 pub fn from_arc(inner: Arc<T>) -> Self {
372 Self {
373 inner,
374 accept_compression_encodings: Default::default(),
375 send_compression_encodings: Default::default(),
376 max_decoding_message_size: None,
377 max_encoding_message_size: None,
378 }
379 }
380 pub fn with_interceptor<F>(
381 inner: T,
382 interceptor: F,
383 ) -> InterceptedService<Self, F>
384 where
385 F: tonic::service::Interceptor,
386 {
387 InterceptedService::new(Self::new(inner), interceptor)
388 }
389 #[must_use]
391 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
392 self.accept_compression_encodings.enable(encoding);
393 self
394 }
395 #[must_use]
397 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
398 self.send_compression_encodings.enable(encoding);
399 self
400 }
401 #[must_use]
405 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
406 self.max_decoding_message_size = Some(limit);
407 self
408 }
409 #[must_use]
413 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
414 self.max_encoding_message_size = Some(limit);
415 self
416 }
417 }
418 impl<T, B> tonic::codegen::Service<http::Request<B>> for SnapshotServiceServer<T>
419 where
420 T: SnapshotService,
421 B: Body + std::marker::Send + 'static,
422 B::Error: Into<StdError> + std::marker::Send + 'static,
423 {
424 type Response = http::Response<tonic::body::BoxBody>;
425 type Error = std::convert::Infallible;
426 type Future = BoxFuture<Self::Response, Self::Error>;
427 fn poll_ready(
428 &mut self,
429 _cx: &mut Context<'_>,
430 ) -> Poll<std::result::Result<(), Self::Error>> {
431 Poll::Ready(Ok(()))
432 }
433 fn call(&mut self, req: http::Request<B>) -> Self::Future {
434 match req.uri().path() {
435 "/raft.storage.SnapshotService/InstallSnapshot" => {
436 #[allow(non_camel_case_types)]
437 struct InstallSnapshotSvc<T: SnapshotService>(pub Arc<T>);
438 impl<
439 T: SnapshotService,
440 > tonic::server::ClientStreamingService<super::SnapshotChunk>
441 for InstallSnapshotSvc<T> {
442 type Response = super::SnapshotResponse;
443 type Future = BoxFuture<
444 tonic::Response<Self::Response>,
445 tonic::Status,
446 >;
447 fn call(
448 &mut self,
449 request: tonic::Request<
450 tonic::Streaming<super::SnapshotChunk>,
451 >,
452 ) -> Self::Future {
453 let inner = Arc::clone(&self.0);
454 let fut = async move {
455 <T as SnapshotService>::install_snapshot(&inner, request)
456 .await
457 };
458 Box::pin(fut)
459 }
460 }
461 let accept_compression_encodings = self.accept_compression_encodings;
462 let send_compression_encodings = self.send_compression_encodings;
463 let max_decoding_message_size = self.max_decoding_message_size;
464 let max_encoding_message_size = self.max_encoding_message_size;
465 let inner = self.inner.clone();
466 let fut = async move {
467 let method = InstallSnapshotSvc(inner);
468 let codec = tonic::codec::ProstCodec::default();
469 let mut grpc = tonic::server::Grpc::new(codec)
470 .apply_compression_config(
471 accept_compression_encodings,
472 send_compression_encodings,
473 )
474 .apply_max_message_size_config(
475 max_decoding_message_size,
476 max_encoding_message_size,
477 );
478 let res = grpc.client_streaming(method, req).await;
479 Ok(res)
480 };
481 Box::pin(fut)
482 }
483 "/raft.storage.SnapshotService/StreamSnapshot" => {
484 #[allow(non_camel_case_types)]
485 struct StreamSnapshotSvc<T: SnapshotService>(pub Arc<T>);
486 impl<
487 T: SnapshotService,
488 > tonic::server::StreamingService<super::SnapshotAck>
489 for StreamSnapshotSvc<T> {
490 type Response = super::SnapshotChunk;
491 type ResponseStream = T::StreamSnapshotStream;
492 type Future = BoxFuture<
493 tonic::Response<Self::ResponseStream>,
494 tonic::Status,
495 >;
496 fn call(
497 &mut self,
498 request: tonic::Request<tonic::Streaming<super::SnapshotAck>>,
499 ) -> Self::Future {
500 let inner = Arc::clone(&self.0);
501 let fut = async move {
502 <T as SnapshotService>::stream_snapshot(&inner, request)
503 .await
504 };
505 Box::pin(fut)
506 }
507 }
508 let accept_compression_encodings = self.accept_compression_encodings;
509 let send_compression_encodings = self.send_compression_encodings;
510 let max_decoding_message_size = self.max_decoding_message_size;
511 let max_encoding_message_size = self.max_encoding_message_size;
512 let inner = self.inner.clone();
513 let fut = async move {
514 let method = StreamSnapshotSvc(inner);
515 let codec = tonic::codec::ProstCodec::default();
516 let mut grpc = tonic::server::Grpc::new(codec)
517 .apply_compression_config(
518 accept_compression_encodings,
519 send_compression_encodings,
520 )
521 .apply_max_message_size_config(
522 max_decoding_message_size,
523 max_encoding_message_size,
524 );
525 let res = grpc.streaming(method, req).await;
526 Ok(res)
527 };
528 Box::pin(fut)
529 }
530 "/raft.storage.SnapshotService/PurgeLog" => {
531 #[allow(non_camel_case_types)]
532 struct PurgeLogSvc<T: SnapshotService>(pub Arc<T>);
533 impl<
534 T: SnapshotService,
535 > tonic::server::UnaryService<super::PurgeLogRequest>
536 for PurgeLogSvc<T> {
537 type Response = super::PurgeLogResponse;
538 type Future = BoxFuture<
539 tonic::Response<Self::Response>,
540 tonic::Status,
541 >;
542 fn call(
543 &mut self,
544 request: tonic::Request<super::PurgeLogRequest>,
545 ) -> Self::Future {
546 let inner = Arc::clone(&self.0);
547 let fut = async move {
548 <T as SnapshotService>::purge_log(&inner, request).await
549 };
550 Box::pin(fut)
551 }
552 }
553 let accept_compression_encodings = self.accept_compression_encodings;
554 let send_compression_encodings = self.send_compression_encodings;
555 let max_decoding_message_size = self.max_decoding_message_size;
556 let max_encoding_message_size = self.max_encoding_message_size;
557 let inner = self.inner.clone();
558 let fut = async move {
559 let method = PurgeLogSvc(inner);
560 let codec = tonic::codec::ProstCodec::default();
561 let mut grpc = tonic::server::Grpc::new(codec)
562 .apply_compression_config(
563 accept_compression_encodings,
564 send_compression_encodings,
565 )
566 .apply_max_message_size_config(
567 max_decoding_message_size,
568 max_encoding_message_size,
569 );
570 let res = grpc.unary(method, req).await;
571 Ok(res)
572 };
573 Box::pin(fut)
574 }
575 _ => {
576 Box::pin(async move {
577 let mut response = http::Response::new(empty_body());
578 let headers = response.headers_mut();
579 headers
580 .insert(
581 tonic::Status::GRPC_STATUS,
582 (tonic::Code::Unimplemented as i32).into(),
583 );
584 headers
585 .insert(
586 http::header::CONTENT_TYPE,
587 tonic::metadata::GRPC_CONTENT_TYPE,
588 );
589 Ok(response)
590 })
591 }
592 }
593 }
594 }
595 impl<T> Clone for SnapshotServiceServer<T> {
596 fn clone(&self) -> Self {
597 let inner = self.inner.clone();
598 Self {
599 inner,
600 accept_compression_encodings: self.accept_compression_encodings,
601 send_compression_encodings: self.send_compression_encodings,
602 max_decoding_message_size: self.max_decoding_message_size,
603 max_encoding_message_size: self.max_encoding_message_size,
604 }
605 }
606 }
607 pub const SERVICE_NAME: &str = "raft.storage.SnapshotService";
609 impl<T> tonic::server::NamedService for SnapshotServiceServer<T> {
610 const NAME: &'static str = SERVICE_NAME;
611 }
612}