1#[derive(Clone, PartialEq, ::prost::Message)]
3pub struct RegisterEdgeRequest {
4 #[prost(string, tag = "1")]
6 pub edge_name: ::prost::alloc::string::String,
7 #[prost(message, repeated, tag = "2")]
9 pub topics: ::prost::alloc::vec::Vec<EdgeTopicDeclaration>,
10}
11#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
13pub struct EdgeTopicDeclaration {
14 #[prost(string, tag = "1")]
16 pub topic_name: ::prost::alloc::string::String,
17 #[prost(string, optional, tag = "2")]
19 pub schema_subject: ::core::option::Option<::prost::alloc::string::String>,
20}
21#[derive(Clone, PartialEq, ::prost::Message)]
22pub struct RegisterEdgeResponse {
23 #[prost(bool, tag = "1")]
24 pub success: bool,
25 #[prost(string, tag = "2")]
26 pub message: ::prost::alloc::string::String,
27 #[prost(uint64, tag = "3")]
29 pub config_version: u64,
30 #[prost(message, repeated, tag = "4")]
32 pub topics: ::prost::alloc::vec::Vec<TopicRegistrationResult>,
33}
34#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
36pub struct TopicRegistrationResult {
37 #[prost(string, tag = "1")]
38 pub topic_name: ::prost::alloc::string::String,
39 #[prost(bool, tag = "2")]
41 pub topic_created: bool,
42 #[prost(bool, tag = "3")]
44 pub schema_resolved: bool,
45 #[prost(message, optional, tag = "4")]
47 pub schema: ::core::option::Option<ResolvedSchema>,
48 #[prost(string, tag = "5")]
50 pub error: ::prost::alloc::string::String,
51}
52#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
55pub struct ResolvedSchema {
56 #[prost(string, tag = "1")]
57 pub subject: ::prost::alloc::string::String,
58 #[prost(uint64, tag = "2")]
59 pub schema_id: u64,
60 #[prost(uint32, tag = "3")]
61 pub schema_version: u32,
62 #[prost(string, tag = "4")]
64 pub schema_type: ::prost::alloc::string::String,
65 #[prost(bytes = "vec", tag = "5")]
67 pub schema_definition: ::prost::alloc::vec::Vec<u8>,
68 #[prost(string, tag = "6")]
70 pub fingerprint: ::prost::alloc::string::String,
71 #[prost(string, tag = "7")]
73 pub compatibility_mode: ::prost::alloc::string::String,
74}
75#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
76pub struct EdgeHeartbeatRequest {
77 #[prost(string, tag = "1")]
78 pub edge_name: ::prost::alloc::string::String,
79 #[prost(uint64, tag = "2")]
81 pub config_version: u64,
82}
83#[derive(Clone, PartialEq, ::prost::Message)]
84pub struct EdgeHeartbeatResponse {
85 #[prost(bool, tag = "1")]
87 pub changed: bool,
88 #[prost(uint64, tag = "2")]
90 pub config_version: u64,
91 #[prost(message, repeated, tag = "3")]
93 pub changes: ::prost::alloc::vec::Vec<EdgeChange>,
94}
95#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
97pub struct EdgeChange {
98 #[prost(string, tag = "1")]
99 pub topic_name: ::prost::alloc::string::String,
100 #[prost(enumeration = "EdgeChangeType", tag = "2")]
101 pub change_type: i32,
102 #[prost(message, optional, tag = "3")]
104 pub schema: ::core::option::Option<ResolvedSchema>,
105 #[prost(string, tag = "4")]
107 pub detail: ::prost::alloc::string::String,
108}
109#[derive(Clone, PartialEq, ::prost::Message)]
110pub struct ReplicateBatch {
111 #[prost(string, tag = "1")]
112 pub topic_name: ::prost::alloc::string::String,
113 #[prost(message, repeated, tag = "2")]
115 pub messages: ::prost::alloc::vec::Vec<super::StreamMessage>,
116 #[prost(uint64, tag = "3")]
118 pub batch_last_offset: u64,
119}
120#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
121pub struct ReplicateAck {
122 #[prost(string, tag = "1")]
123 pub topic_name: ::prost::alloc::string::String,
124 #[prost(uint64, tag = "2")]
126 pub acked_offset: u64,
127}
128#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
129#[repr(i32)]
130pub enum EdgeChangeType {
131 SchemaUpdated = 0,
133 SchemaRemoved = 1,
135 TopicRemoved = 2,
137}
138impl EdgeChangeType {
139 pub fn as_str_name(&self) -> &'static str {
144 match self {
145 Self::SchemaUpdated => "SCHEMA_UPDATED",
146 Self::SchemaRemoved => "SCHEMA_REMOVED",
147 Self::TopicRemoved => "TOPIC_REMOVED",
148 }
149 }
150 pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
152 match value {
153 "SCHEMA_UPDATED" => Some(Self::SchemaUpdated),
154 "SCHEMA_REMOVED" => Some(Self::SchemaRemoved),
155 "TOPIC_REMOVED" => Some(Self::TopicRemoved),
156 _ => None,
157 }
158 }
159}
160pub mod edge_replicator_service_client {
162 #![allow(
163 unused_variables,
164 dead_code,
165 missing_docs,
166 clippy::wildcard_imports,
167 clippy::let_unit_value,
168 )]
169 use tonic::codegen::*;
170 use tonic::codegen::http::Uri;
171 #[derive(Debug, Clone)]
178 pub struct EdgeReplicatorServiceClient<T> {
179 inner: tonic::client::Grpc<T>,
180 }
181 impl EdgeReplicatorServiceClient<tonic::transport::Channel> {
182 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
184 where
185 D: TryInto<tonic::transport::Endpoint>,
186 D::Error: Into<StdError>,
187 {
188 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
189 Ok(Self::new(conn))
190 }
191 }
192 impl<T> EdgeReplicatorServiceClient<T>
193 where
194 T: tonic::client::GrpcService<tonic::body::Body>,
195 T::Error: Into<StdError>,
196 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
197 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
198 {
199 pub fn new(inner: T) -> Self {
200 let inner = tonic::client::Grpc::new(inner);
201 Self { inner }
202 }
203 pub fn with_origin(inner: T, origin: Uri) -> Self {
204 let inner = tonic::client::Grpc::with_origin(inner, origin);
205 Self { inner }
206 }
207 pub fn with_interceptor<F>(
208 inner: T,
209 interceptor: F,
210 ) -> EdgeReplicatorServiceClient<InterceptedService<T, F>>
211 where
212 F: tonic::service::Interceptor,
213 T::ResponseBody: Default,
214 T: tonic::codegen::Service<
215 http::Request<tonic::body::Body>,
216 Response = http::Response<
217 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
218 >,
219 >,
220 <T as tonic::codegen::Service<
221 http::Request<tonic::body::Body>,
222 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
223 {
224 EdgeReplicatorServiceClient::new(InterceptedService::new(inner, interceptor))
225 }
226 #[must_use]
231 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
232 self.inner = self.inner.send_compressed(encoding);
233 self
234 }
235 #[must_use]
237 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
238 self.inner = self.inner.accept_compressed(encoding);
239 self
240 }
241 #[must_use]
245 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
246 self.inner = self.inner.max_decoding_message_size(limit);
247 self
248 }
249 #[must_use]
253 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
254 self.inner = self.inner.max_encoding_message_size(limit);
255 self
256 }
257 pub async fn register_edge(
262 &mut self,
263 request: impl tonic::IntoRequest<super::RegisterEdgeRequest>,
264 ) -> std::result::Result<
265 tonic::Response<super::RegisterEdgeResponse>,
266 tonic::Status,
267 > {
268 self.inner
269 .ready()
270 .await
271 .map_err(|e| {
272 tonic::Status::unknown(
273 format!("Service was not ready: {}", e.into()),
274 )
275 })?;
276 let codec = tonic_prost::ProstCodec::default();
277 let path = http::uri::PathAndQuery::from_static(
278 "/danube.edge.EdgeReplicatorService/RegisterEdge",
279 );
280 let mut req = request.into_request();
281 req.extensions_mut()
282 .insert(
283 GrpcMethod::new("danube.edge.EdgeReplicatorService", "RegisterEdge"),
284 );
285 self.inner.unary(req, path, codec).await
286 }
287 pub async fn edge_heartbeat(
291 &mut self,
292 request: impl tonic::IntoRequest<super::EdgeHeartbeatRequest>,
293 ) -> std::result::Result<
294 tonic::Response<super::EdgeHeartbeatResponse>,
295 tonic::Status,
296 > {
297 self.inner
298 .ready()
299 .await
300 .map_err(|e| {
301 tonic::Status::unknown(
302 format!("Service was not ready: {}", e.into()),
303 )
304 })?;
305 let codec = tonic_prost::ProstCodec::default();
306 let path = http::uri::PathAndQuery::from_static(
307 "/danube.edge.EdgeReplicatorService/EdgeHeartbeat",
308 );
309 let mut req = request.into_request();
310 req.extensions_mut()
311 .insert(
312 GrpcMethod::new("danube.edge.EdgeReplicatorService", "EdgeHeartbeat"),
313 );
314 self.inner.unary(req, path, codec).await
315 }
316 pub async fn replicate_data(
319 &mut self,
320 request: impl tonic::IntoStreamingRequest<Message = super::ReplicateBatch>,
321 ) -> std::result::Result<
322 tonic::Response<tonic::codec::Streaming<super::ReplicateAck>>,
323 tonic::Status,
324 > {
325 self.inner
326 .ready()
327 .await
328 .map_err(|e| {
329 tonic::Status::unknown(
330 format!("Service was not ready: {}", e.into()),
331 )
332 })?;
333 let codec = tonic_prost::ProstCodec::default();
334 let path = http::uri::PathAndQuery::from_static(
335 "/danube.edge.EdgeReplicatorService/ReplicateData",
336 );
337 let mut req = request.into_streaming_request();
338 req.extensions_mut()
339 .insert(
340 GrpcMethod::new("danube.edge.EdgeReplicatorService", "ReplicateData"),
341 );
342 self.inner.streaming(req, path, codec).await
343 }
344 }
345}
346pub mod edge_replicator_service_server {
348 #![allow(
349 unused_variables,
350 dead_code,
351 missing_docs,
352 clippy::wildcard_imports,
353 clippy::let_unit_value,
354 )]
355 use tonic::codegen::*;
356 #[async_trait]
358 pub trait EdgeReplicatorService: std::marker::Send + std::marker::Sync + 'static {
359 async fn register_edge(
364 &self,
365 request: tonic::Request<super::RegisterEdgeRequest>,
366 ) -> std::result::Result<
367 tonic::Response<super::RegisterEdgeResponse>,
368 tonic::Status,
369 >;
370 async fn edge_heartbeat(
374 &self,
375 request: tonic::Request<super::EdgeHeartbeatRequest>,
376 ) -> std::result::Result<
377 tonic::Response<super::EdgeHeartbeatResponse>,
378 tonic::Status,
379 >;
380 type ReplicateDataStream: tonic::codegen::tokio_stream::Stream<
382 Item = std::result::Result<super::ReplicateAck, tonic::Status>,
383 >
384 + std::marker::Send
385 + 'static;
386 async fn replicate_data(
389 &self,
390 request: tonic::Request<tonic::Streaming<super::ReplicateBatch>>,
391 ) -> std::result::Result<
392 tonic::Response<Self::ReplicateDataStream>,
393 tonic::Status,
394 >;
395 }
396 #[derive(Debug)]
403 pub struct EdgeReplicatorServiceServer<T> {
404 inner: Arc<T>,
405 accept_compression_encodings: EnabledCompressionEncodings,
406 send_compression_encodings: EnabledCompressionEncodings,
407 max_decoding_message_size: Option<usize>,
408 max_encoding_message_size: Option<usize>,
409 }
410 impl<T> EdgeReplicatorServiceServer<T> {
411 pub fn new(inner: T) -> Self {
412 Self::from_arc(Arc::new(inner))
413 }
414 pub fn from_arc(inner: Arc<T>) -> Self {
415 Self {
416 inner,
417 accept_compression_encodings: Default::default(),
418 send_compression_encodings: Default::default(),
419 max_decoding_message_size: None,
420 max_encoding_message_size: None,
421 }
422 }
423 pub fn with_interceptor<F>(
424 inner: T,
425 interceptor: F,
426 ) -> InterceptedService<Self, F>
427 where
428 F: tonic::service::Interceptor,
429 {
430 InterceptedService::new(Self::new(inner), interceptor)
431 }
432 #[must_use]
434 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
435 self.accept_compression_encodings.enable(encoding);
436 self
437 }
438 #[must_use]
440 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
441 self.send_compression_encodings.enable(encoding);
442 self
443 }
444 #[must_use]
448 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
449 self.max_decoding_message_size = Some(limit);
450 self
451 }
452 #[must_use]
456 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
457 self.max_encoding_message_size = Some(limit);
458 self
459 }
460 }
461 impl<T, B> tonic::codegen::Service<http::Request<B>>
462 for EdgeReplicatorServiceServer<T>
463 where
464 T: EdgeReplicatorService,
465 B: Body + std::marker::Send + 'static,
466 B::Error: Into<StdError> + std::marker::Send + 'static,
467 {
468 type Response = http::Response<tonic::body::Body>;
469 type Error = std::convert::Infallible;
470 type Future = BoxFuture<Self::Response, Self::Error>;
471 fn poll_ready(
472 &mut self,
473 _cx: &mut Context<'_>,
474 ) -> Poll<std::result::Result<(), Self::Error>> {
475 Poll::Ready(Ok(()))
476 }
477 fn call(&mut self, req: http::Request<B>) -> Self::Future {
478 match req.uri().path() {
479 "/danube.edge.EdgeReplicatorService/RegisterEdge" => {
480 #[allow(non_camel_case_types)]
481 struct RegisterEdgeSvc<T: EdgeReplicatorService>(pub Arc<T>);
482 impl<
483 T: EdgeReplicatorService,
484 > tonic::server::UnaryService<super::RegisterEdgeRequest>
485 for RegisterEdgeSvc<T> {
486 type Response = super::RegisterEdgeResponse;
487 type Future = BoxFuture<
488 tonic::Response<Self::Response>,
489 tonic::Status,
490 >;
491 fn call(
492 &mut self,
493 request: tonic::Request<super::RegisterEdgeRequest>,
494 ) -> Self::Future {
495 let inner = Arc::clone(&self.0);
496 let fut = async move {
497 <T as EdgeReplicatorService>::register_edge(&inner, request)
498 .await
499 };
500 Box::pin(fut)
501 }
502 }
503 let accept_compression_encodings = self.accept_compression_encodings;
504 let send_compression_encodings = self.send_compression_encodings;
505 let max_decoding_message_size = self.max_decoding_message_size;
506 let max_encoding_message_size = self.max_encoding_message_size;
507 let inner = self.inner.clone();
508 let fut = async move {
509 let method = RegisterEdgeSvc(inner);
510 let codec = tonic_prost::ProstCodec::default();
511 let mut grpc = tonic::server::Grpc::new(codec)
512 .apply_compression_config(
513 accept_compression_encodings,
514 send_compression_encodings,
515 )
516 .apply_max_message_size_config(
517 max_decoding_message_size,
518 max_encoding_message_size,
519 );
520 let res = grpc.unary(method, req).await;
521 Ok(res)
522 };
523 Box::pin(fut)
524 }
525 "/danube.edge.EdgeReplicatorService/EdgeHeartbeat" => {
526 #[allow(non_camel_case_types)]
527 struct EdgeHeartbeatSvc<T: EdgeReplicatorService>(pub Arc<T>);
528 impl<
529 T: EdgeReplicatorService,
530 > tonic::server::UnaryService<super::EdgeHeartbeatRequest>
531 for EdgeHeartbeatSvc<T> {
532 type Response = super::EdgeHeartbeatResponse;
533 type Future = BoxFuture<
534 tonic::Response<Self::Response>,
535 tonic::Status,
536 >;
537 fn call(
538 &mut self,
539 request: tonic::Request<super::EdgeHeartbeatRequest>,
540 ) -> Self::Future {
541 let inner = Arc::clone(&self.0);
542 let fut = async move {
543 <T as EdgeReplicatorService>::edge_heartbeat(
544 &inner,
545 request,
546 )
547 .await
548 };
549 Box::pin(fut)
550 }
551 }
552 let accept_compression_encodings = self.accept_compression_encodings;
553 let send_compression_encodings = self.send_compression_encodings;
554 let max_decoding_message_size = self.max_decoding_message_size;
555 let max_encoding_message_size = self.max_encoding_message_size;
556 let inner = self.inner.clone();
557 let fut = async move {
558 let method = EdgeHeartbeatSvc(inner);
559 let codec = tonic_prost::ProstCodec::default();
560 let mut grpc = tonic::server::Grpc::new(codec)
561 .apply_compression_config(
562 accept_compression_encodings,
563 send_compression_encodings,
564 )
565 .apply_max_message_size_config(
566 max_decoding_message_size,
567 max_encoding_message_size,
568 );
569 let res = grpc.unary(method, req).await;
570 Ok(res)
571 };
572 Box::pin(fut)
573 }
574 "/danube.edge.EdgeReplicatorService/ReplicateData" => {
575 #[allow(non_camel_case_types)]
576 struct ReplicateDataSvc<T: EdgeReplicatorService>(pub Arc<T>);
577 impl<
578 T: EdgeReplicatorService,
579 > tonic::server::StreamingService<super::ReplicateBatch>
580 for ReplicateDataSvc<T> {
581 type Response = super::ReplicateAck;
582 type ResponseStream = T::ReplicateDataStream;
583 type Future = BoxFuture<
584 tonic::Response<Self::ResponseStream>,
585 tonic::Status,
586 >;
587 fn call(
588 &mut self,
589 request: tonic::Request<
590 tonic::Streaming<super::ReplicateBatch>,
591 >,
592 ) -> Self::Future {
593 let inner = Arc::clone(&self.0);
594 let fut = async move {
595 <T as EdgeReplicatorService>::replicate_data(
596 &inner,
597 request,
598 )
599 .await
600 };
601 Box::pin(fut)
602 }
603 }
604 let accept_compression_encodings = self.accept_compression_encodings;
605 let send_compression_encodings = self.send_compression_encodings;
606 let max_decoding_message_size = self.max_decoding_message_size;
607 let max_encoding_message_size = self.max_encoding_message_size;
608 let inner = self.inner.clone();
609 let fut = async move {
610 let method = ReplicateDataSvc(inner);
611 let codec = tonic_prost::ProstCodec::default();
612 let mut grpc = tonic::server::Grpc::new(codec)
613 .apply_compression_config(
614 accept_compression_encodings,
615 send_compression_encodings,
616 )
617 .apply_max_message_size_config(
618 max_decoding_message_size,
619 max_encoding_message_size,
620 );
621 let res = grpc.streaming(method, req).await;
622 Ok(res)
623 };
624 Box::pin(fut)
625 }
626 _ => {
627 Box::pin(async move {
628 let mut response = http::Response::new(
629 tonic::body::Body::default(),
630 );
631 let headers = response.headers_mut();
632 headers
633 .insert(
634 tonic::Status::GRPC_STATUS,
635 (tonic::Code::Unimplemented as i32).into(),
636 );
637 headers
638 .insert(
639 http::header::CONTENT_TYPE,
640 tonic::metadata::GRPC_CONTENT_TYPE,
641 );
642 Ok(response)
643 })
644 }
645 }
646 }
647 }
648 impl<T> Clone for EdgeReplicatorServiceServer<T> {
649 fn clone(&self) -> Self {
650 let inner = self.inner.clone();
651 Self {
652 inner,
653 accept_compression_encodings: self.accept_compression_encodings,
654 send_compression_encodings: self.send_compression_encodings,
655 max_decoding_message_size: self.max_decoding_message_size,
656 max_encoding_message_size: self.max_encoding_message_size,
657 }
658 }
659 }
660 pub const SERVICE_NAME: &str = "danube.edge.EdgeReplicatorService";
662 impl<T> tonic::server::NamedService for EdgeReplicatorServiceServer<T> {
663 const NAME: &'static str = SERVICE_NAME;
664 }
665}