1#[derive(serde::Serialize, serde::Deserialize)]
3#[derive(Clone, PartialEq, ::prost::Message)]
4pub struct SnapshotChunk {
5 #[prost(uint64, tag = "1")]
7 pub leader_term: u64,
8 #[prost(uint32, tag = "2")]
10 pub leader_id: u32,
11 #[prost(uint32, tag = "3")]
13 pub seq: u32,
14 #[prost(uint32, tag = "4")]
16 pub total_chunks: u32,
17 #[prost(bytes = "bytes", tag = "5")]
19 pub chunk_checksum: ::prost::bytes::Bytes,
20 #[prost(message, optional, tag = "6")]
22 pub metadata: ::core::option::Option<SnapshotMetadata>,
23 #[prost(bytes = "bytes", tag = "7")]
25 pub data: ::prost::bytes::Bytes,
26}
27#[derive(serde::Serialize, serde::Deserialize)]
28#[derive(Clone, PartialEq, ::prost::Message)]
29pub struct SnapshotMetadata {
30 #[prost(message, optional, tag = "1")]
32 pub last_included: ::core::option::Option<super::super::common::LogId>,
33 #[prost(bytes = "bytes", tag = "2")]
35 pub checksum: ::prost::bytes::Bytes,
36}
37#[derive(serde::Serialize, serde::Deserialize)]
38#[derive(Clone, Copy, PartialEq, ::prost::Message)]
39pub struct SnapshotResponse {
40 #[prost(uint64, tag = "1")]
42 pub term: u64,
43 #[prost(bool, tag = "2")]
45 pub success: bool,
46 #[prost(uint32, tag = "3")]
48 pub next_chunk: u32,
49}
50#[derive(serde::Serialize, serde::Deserialize)]
51#[derive(Clone, PartialEq, ::prost::Message)]
52pub struct PurgeLogRequest {
53 #[prost(uint64, tag = "1")]
54 pub term: u64,
55 #[prost(uint32, tag = "2")]
56 pub leader_id: u32,
57 #[prost(uint64, tag = "3")]
59 pub leader_commit: u64,
60 #[prost(message, optional, tag = "4")]
62 pub last_included: ::core::option::Option<super::super::common::LogId>,
63 #[prost(bytes = "bytes", tag = "5")]
64 pub snapshot_checksum: ::prost::bytes::Bytes,
65}
66#[derive(serde::Serialize, serde::Deserialize)]
67#[derive(Clone, Copy, PartialEq, ::prost::Message)]
68pub struct PurgeLogResponse {
69 #[prost(uint32, tag = "1")]
70 pub node_id: u32,
71 #[prost(uint64, tag = "2")]
73 pub term: u64,
74 #[prost(bool, tag = "3")]
76 pub success: bool,
77 #[prost(message, optional, tag = "4")]
79 pub last_purged: ::core::option::Option<super::super::common::LogId>,
80}
81#[derive(serde::Serialize, serde::Deserialize)]
82#[derive(Clone, Copy, PartialEq, ::prost::Message)]
83pub struct SnapshotAck {
84 #[prost(uint32, tag = "1")]
86 pub seq: u32,
87 #[prost(enumeration = "snapshot_ack::ChunkStatus", tag = "2")]
89 pub status: i32,
90 #[prost(uint32, tag = "3")]
92 pub next_requested: u32,
93}
94pub mod snapshot_ack {
96 #[derive(serde::Serialize, serde::Deserialize)]
97 #[derive(
98 Clone,
99 Copy,
100 Debug,
101 PartialEq,
102 Eq,
103 Hash,
104 PartialOrd,
105 Ord,
106 ::prost::Enumeration
107 )]
108 #[repr(i32)]
109 pub enum ChunkStatus {
110 Accepted = 0,
111 ChecksumMismatch = 1,
112 OutOfOrder = 2,
113 Requested = 3,
114 Failed = 4,
115 }
116 impl ChunkStatus {
117 pub fn as_str_name(&self) -> &'static str {
122 match self {
123 Self::Accepted => "ACCEPTED",
124 Self::ChecksumMismatch => "CHECKSUM_MISMATCH",
125 Self::OutOfOrder => "OUT_OF_ORDER",
126 Self::Requested => "REQUESTED",
127 Self::Failed => "FAILED",
128 }
129 }
130 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
132 match value {
133 "ACCEPTED" => Some(Self::Accepted),
134 "CHECKSUM_MISMATCH" => Some(Self::ChecksumMismatch),
135 "OUT_OF_ORDER" => Some(Self::OutOfOrder),
136 "REQUESTED" => Some(Self::Requested),
137 "FAILED" => Some(Self::Failed),
138 _ => None,
139 }
140 }
141 }
142}
143pub mod snapshot_service_client {
145 #![allow(
146 unused_variables,
147 dead_code,
148 missing_docs,
149 clippy::wildcard_imports,
150 clippy::let_unit_value,
151 )]
152 use tonic::codegen::*;
153 use tonic::codegen::http::Uri;
154 #[derive(Debug, Clone)]
155 pub struct SnapshotServiceClient<T> {
156 inner: tonic::client::Grpc<T>,
157 }
158 impl SnapshotServiceClient<tonic::transport::Channel> {
159 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
161 where
162 D: TryInto<tonic::transport::Endpoint>,
163 D::Error: Into<StdError>,
164 {
165 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
166 Ok(Self::new(conn))
167 }
168 }
169 impl<T> SnapshotServiceClient<T>
170 where
171 T: tonic::client::GrpcService<tonic::body::BoxBody>,
172 T::Error: Into<StdError>,
173 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
174 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
175 {
176 pub fn new(inner: T) -> Self {
177 let inner = tonic::client::Grpc::new(inner);
178 Self { inner }
179 }
180 pub fn with_origin(inner: T, origin: Uri) -> Self {
181 let inner = tonic::client::Grpc::with_origin(inner, origin);
182 Self { inner }
183 }
184 pub fn with_interceptor<F>(
185 inner: T,
186 interceptor: F,
187 ) -> SnapshotServiceClient<InterceptedService<T, F>>
188 where
189 F: tonic::service::Interceptor,
190 T::ResponseBody: Default,
191 T: tonic::codegen::Service<
192 http::Request<tonic::body::BoxBody>,
193 Response = http::Response<
194 <T as tonic::client::GrpcService<tonic::body::BoxBody>>::ResponseBody,
195 >,
196 >,
197 <T as tonic::codegen::Service<
198 http::Request<tonic::body::BoxBody>,
199 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
200 {
201 SnapshotServiceClient::new(InterceptedService::new(inner, interceptor))
202 }
203 #[must_use]
208 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
209 self.inner = self.inner.send_compressed(encoding);
210 self
211 }
212 #[must_use]
214 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
215 self.inner = self.inner.accept_compressed(encoding);
216 self
217 }
218 #[must_use]
222 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
223 self.inner = self.inner.max_decoding_message_size(limit);
224 self
225 }
226 #[must_use]
230 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
231 self.inner = self.inner.max_encoding_message_size(limit);
232 self
233 }
234 pub async fn install_snapshot(
236 &mut self,
237 request: impl tonic::IntoStreamingRequest<Message = super::SnapshotChunk>,
238 ) -> std::result::Result<
239 tonic::Response<super::SnapshotResponse>,
240 tonic::Status,
241 > {
242 self.inner
243 .ready()
244 .await
245 .map_err(|e| {
246 tonic::Status::unknown(
247 format!("Service was not ready: {}", e.into()),
248 )
249 })?;
250 let codec = tonic::codec::ProstCodec::default();
251 let path = http::uri::PathAndQuery::from_static(
252 "/d_engine.server.storage.SnapshotService/InstallSnapshot",
253 );
254 let mut req = request.into_streaming_request();
255 req.extensions_mut()
256 .insert(
257 GrpcMethod::new(
258 "d_engine.server.storage.SnapshotService",
259 "InstallSnapshot",
260 ),
261 );
262 self.inner.client_streaming(req, path, codec).await
263 }
264 pub async fn stream_snapshot(
266 &mut self,
267 request: impl tonic::IntoStreamingRequest<Message = super::SnapshotAck>,
268 ) -> std::result::Result<
269 tonic::Response<tonic::codec::Streaming<super::SnapshotChunk>>,
270 tonic::Status,
271 > {
272 self.inner
273 .ready()
274 .await
275 .map_err(|e| {
276 tonic::Status::unknown(
277 format!("Service was not ready: {}", e.into()),
278 )
279 })?;
280 let codec = tonic::codec::ProstCodec::default();
281 let path = http::uri::PathAndQuery::from_static(
282 "/d_engine.server.storage.SnapshotService/StreamSnapshot",
283 );
284 let mut req = request.into_streaming_request();
285 req.extensions_mut()
286 .insert(
287 GrpcMethod::new(
288 "d_engine.server.storage.SnapshotService",
289 "StreamSnapshot",
290 ),
291 );
292 self.inner.streaming(req, path, codec).await
293 }
294 pub async fn purge_log(
296 &mut self,
297 request: impl tonic::IntoRequest<super::PurgeLogRequest>,
298 ) -> std::result::Result<
299 tonic::Response<super::PurgeLogResponse>,
300 tonic::Status,
301 > {
302 self.inner
303 .ready()
304 .await
305 .map_err(|e| {
306 tonic::Status::unknown(
307 format!("Service was not ready: {}", e.into()),
308 )
309 })?;
310 let codec = tonic::codec::ProstCodec::default();
311 let path = http::uri::PathAndQuery::from_static(
312 "/d_engine.server.storage.SnapshotService/PurgeLog",
313 );
314 let mut req = request.into_request();
315 req.extensions_mut()
316 .insert(
317 GrpcMethod::new(
318 "d_engine.server.storage.SnapshotService",
319 "PurgeLog",
320 ),
321 );
322 self.inner.unary(req, path, codec).await
323 }
324 }
325}
326pub mod snapshot_service_server {
328 #![allow(
329 unused_variables,
330 dead_code,
331 missing_docs,
332 clippy::wildcard_imports,
333 clippy::let_unit_value,
334 )]
335 use tonic::codegen::*;
336 #[async_trait]
338 pub trait SnapshotService: std::marker::Send + std::marker::Sync + 'static {
339 async fn install_snapshot(
341 &self,
342 request: tonic::Request<tonic::Streaming<super::SnapshotChunk>>,
343 ) -> std::result::Result<
344 tonic::Response<super::SnapshotResponse>,
345 tonic::Status,
346 >;
347 type StreamSnapshotStream: tonic::codegen::tokio_stream::Stream<
349 Item = std::result::Result<super::SnapshotChunk, tonic::Status>,
350 >
351 + std::marker::Send
352 + 'static;
353 async fn stream_snapshot(
355 &self,
356 request: tonic::Request<tonic::Streaming<super::SnapshotAck>>,
357 ) -> std::result::Result<
358 tonic::Response<Self::StreamSnapshotStream>,
359 tonic::Status,
360 >;
361 async fn purge_log(
363 &self,
364 request: tonic::Request<super::PurgeLogRequest>,
365 ) -> std::result::Result<
366 tonic::Response<super::PurgeLogResponse>,
367 tonic::Status,
368 >;
369 }
370 #[derive(Debug)]
371 pub struct SnapshotServiceServer<T> {
372 inner: Arc<T>,
373 accept_compression_encodings: EnabledCompressionEncodings,
374 send_compression_encodings: EnabledCompressionEncodings,
375 max_decoding_message_size: Option<usize>,
376 max_encoding_message_size: Option<usize>,
377 }
378 impl<T> SnapshotServiceServer<T> {
379 pub fn new(inner: T) -> Self {
380 Self::from_arc(Arc::new(inner))
381 }
382 pub fn from_arc(inner: Arc<T>) -> Self {
383 Self {
384 inner,
385 accept_compression_encodings: Default::default(),
386 send_compression_encodings: Default::default(),
387 max_decoding_message_size: None,
388 max_encoding_message_size: None,
389 }
390 }
391 pub fn with_interceptor<F>(
392 inner: T,
393 interceptor: F,
394 ) -> InterceptedService<Self, F>
395 where
396 F: tonic::service::Interceptor,
397 {
398 InterceptedService::new(Self::new(inner), interceptor)
399 }
400 #[must_use]
402 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
403 self.accept_compression_encodings.enable(encoding);
404 self
405 }
406 #[must_use]
408 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
409 self.send_compression_encodings.enable(encoding);
410 self
411 }
412 #[must_use]
416 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
417 self.max_decoding_message_size = Some(limit);
418 self
419 }
420 #[must_use]
424 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
425 self.max_encoding_message_size = Some(limit);
426 self
427 }
428 }
429 impl<T, B> tonic::codegen::Service<http::Request<B>> for SnapshotServiceServer<T>
430 where
431 T: SnapshotService,
432 B: Body + std::marker::Send + 'static,
433 B::Error: Into<StdError> + std::marker::Send + 'static,
434 {
435 type Response = http::Response<tonic::body::BoxBody>;
436 type Error = std::convert::Infallible;
437 type Future = BoxFuture<Self::Response, Self::Error>;
438 fn poll_ready(
439 &mut self,
440 _cx: &mut Context<'_>,
441 ) -> Poll<std::result::Result<(), Self::Error>> {
442 Poll::Ready(Ok(()))
443 }
444 fn call(&mut self, req: http::Request<B>) -> Self::Future {
445 match req.uri().path() {
446 "/d_engine.server.storage.SnapshotService/InstallSnapshot" => {
447 #[allow(non_camel_case_types)]
448 struct InstallSnapshotSvc<T: SnapshotService>(pub Arc<T>);
449 impl<
450 T: SnapshotService,
451 > tonic::server::ClientStreamingService<super::SnapshotChunk>
452 for InstallSnapshotSvc<T> {
453 type Response = super::SnapshotResponse;
454 type Future = BoxFuture<
455 tonic::Response<Self::Response>,
456 tonic::Status,
457 >;
458 fn call(
459 &mut self,
460 request: tonic::Request<
461 tonic::Streaming<super::SnapshotChunk>,
462 >,
463 ) -> Self::Future {
464 let inner = Arc::clone(&self.0);
465 let fut = async move {
466 <T as SnapshotService>::install_snapshot(&inner, request)
467 .await
468 };
469 Box::pin(fut)
470 }
471 }
472 let accept_compression_encodings = self.accept_compression_encodings;
473 let send_compression_encodings = self.send_compression_encodings;
474 let max_decoding_message_size = self.max_decoding_message_size;
475 let max_encoding_message_size = self.max_encoding_message_size;
476 let inner = self.inner.clone();
477 let fut = async move {
478 let method = InstallSnapshotSvc(inner);
479 let codec = tonic::codec::ProstCodec::default();
480 let mut grpc = tonic::server::Grpc::new(codec)
481 .apply_compression_config(
482 accept_compression_encodings,
483 send_compression_encodings,
484 )
485 .apply_max_message_size_config(
486 max_decoding_message_size,
487 max_encoding_message_size,
488 );
489 let res = grpc.client_streaming(method, req).await;
490 Ok(res)
491 };
492 Box::pin(fut)
493 }
494 "/d_engine.server.storage.SnapshotService/StreamSnapshot" => {
495 #[allow(non_camel_case_types)]
496 struct StreamSnapshotSvc<T: SnapshotService>(pub Arc<T>);
497 impl<
498 T: SnapshotService,
499 > tonic::server::StreamingService<super::SnapshotAck>
500 for StreamSnapshotSvc<T> {
501 type Response = super::SnapshotChunk;
502 type ResponseStream = T::StreamSnapshotStream;
503 type Future = BoxFuture<
504 tonic::Response<Self::ResponseStream>,
505 tonic::Status,
506 >;
507 fn call(
508 &mut self,
509 request: tonic::Request<tonic::Streaming<super::SnapshotAck>>,
510 ) -> Self::Future {
511 let inner = Arc::clone(&self.0);
512 let fut = async move {
513 <T as SnapshotService>::stream_snapshot(&inner, request)
514 .await
515 };
516 Box::pin(fut)
517 }
518 }
519 let accept_compression_encodings = self.accept_compression_encodings;
520 let send_compression_encodings = self.send_compression_encodings;
521 let max_decoding_message_size = self.max_decoding_message_size;
522 let max_encoding_message_size = self.max_encoding_message_size;
523 let inner = self.inner.clone();
524 let fut = async move {
525 let method = StreamSnapshotSvc(inner);
526 let codec = tonic::codec::ProstCodec::default();
527 let mut grpc = tonic::server::Grpc::new(codec)
528 .apply_compression_config(
529 accept_compression_encodings,
530 send_compression_encodings,
531 )
532 .apply_max_message_size_config(
533 max_decoding_message_size,
534 max_encoding_message_size,
535 );
536 let res = grpc.streaming(method, req).await;
537 Ok(res)
538 };
539 Box::pin(fut)
540 }
541 "/d_engine.server.storage.SnapshotService/PurgeLog" => {
542 #[allow(non_camel_case_types)]
543 struct PurgeLogSvc<T: SnapshotService>(pub Arc<T>);
544 impl<
545 T: SnapshotService,
546 > tonic::server::UnaryService<super::PurgeLogRequest>
547 for PurgeLogSvc<T> {
548 type Response = super::PurgeLogResponse;
549 type Future = BoxFuture<
550 tonic::Response<Self::Response>,
551 tonic::Status,
552 >;
553 fn call(
554 &mut self,
555 request: tonic::Request<super::PurgeLogRequest>,
556 ) -> Self::Future {
557 let inner = Arc::clone(&self.0);
558 let fut = async move {
559 <T as SnapshotService>::purge_log(&inner, request).await
560 };
561 Box::pin(fut)
562 }
563 }
564 let accept_compression_encodings = self.accept_compression_encodings;
565 let send_compression_encodings = self.send_compression_encodings;
566 let max_decoding_message_size = self.max_decoding_message_size;
567 let max_encoding_message_size = self.max_encoding_message_size;
568 let inner = self.inner.clone();
569 let fut = async move {
570 let method = PurgeLogSvc(inner);
571 let codec = tonic::codec::ProstCodec::default();
572 let mut grpc = tonic::server::Grpc::new(codec)
573 .apply_compression_config(
574 accept_compression_encodings,
575 send_compression_encodings,
576 )
577 .apply_max_message_size_config(
578 max_decoding_message_size,
579 max_encoding_message_size,
580 );
581 let res = grpc.unary(method, req).await;
582 Ok(res)
583 };
584 Box::pin(fut)
585 }
586 _ => {
587 Box::pin(async move {
588 let mut response = http::Response::new(empty_body());
589 let headers = response.headers_mut();
590 headers
591 .insert(
592 tonic::Status::GRPC_STATUS,
593 (tonic::Code::Unimplemented as i32).into(),
594 );
595 headers
596 .insert(
597 http::header::CONTENT_TYPE,
598 tonic::metadata::GRPC_CONTENT_TYPE,
599 );
600 Ok(response)
601 })
602 }
603 }
604 }
605 }
606 impl<T> Clone for SnapshotServiceServer<T> {
607 fn clone(&self) -> Self {
608 let inner = self.inner.clone();
609 Self {
610 inner,
611 accept_compression_encodings: self.accept_compression_encodings,
612 send_compression_encodings: self.send_compression_encodings,
613 max_decoding_message_size: self.max_decoding_message_size,
614 max_encoding_message_size: self.max_encoding_message_size,
615 }
616 }
617 }
618 pub const SERVICE_NAME: &str = "d_engine.server.storage.SnapshotService";
620 impl<T> tonic::server::NamedService for SnapshotServiceServer<T> {
621 const NAME: &'static str = SERVICE_NAME;
622 }
623}