1#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
4pub struct RegisterSchemaRequest {
5 #[prost(string, tag = "1")]
7 pub subject: ::prost::alloc::string::String,
8 #[prost(string, tag = "2")]
10 pub schema_type: ::prost::alloc::string::String,
11 #[prost(bytes = "vec", tag = "3")]
13 pub schema_definition: ::prost::alloc::vec::Vec<u8>,
14 #[prost(string, tag = "4")]
16 pub description: ::prost::alloc::string::String,
17 #[prost(string, tag = "5")]
19 pub created_by: ::prost::alloc::string::String,
20 #[prost(string, repeated, tag = "6")]
22 pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
23}
24#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
26pub struct RegisterSchemaResponse {
27 #[prost(uint64, tag = "1")]
29 pub schema_id: u64,
30 #[prost(uint32, tag = "2")]
32 pub version: u32,
33 #[prost(bool, tag = "3")]
35 pub is_new_version: bool,
36 #[prost(string, tag = "4")]
38 pub fingerprint: ::prost::alloc::string::String,
39}
40#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
42pub struct GetSchemaRequest {
43 #[prost(uint64, tag = "1")]
45 pub schema_id: u64,
46 #[prost(uint32, optional, tag = "2")]
48 pub version: ::core::option::Option<u32>,
49}
50#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
52pub struct GetSchemaResponse {
53 #[prost(uint64, tag = "1")]
54 pub schema_id: u64,
55 #[prost(uint32, tag = "2")]
56 pub version: u32,
57 #[prost(string, tag = "3")]
58 pub subject: ::prost::alloc::string::String,
59 #[prost(string, tag = "4")]
60 pub schema_type: ::prost::alloc::string::String,
61 #[prost(bytes = "vec", tag = "5")]
62 pub schema_definition: ::prost::alloc::vec::Vec<u8>,
63 #[prost(string, tag = "6")]
64 pub description: ::prost::alloc::string::String,
65 #[prost(uint64, tag = "7")]
66 pub created_at: u64,
67 #[prost(string, tag = "8")]
68 pub created_by: ::prost::alloc::string::String,
69 #[prost(string, repeated, tag = "9")]
70 pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
71 #[prost(string, tag = "10")]
72 pub fingerprint: ::prost::alloc::string::String,
73 #[prost(string, tag = "11")]
74 pub compatibility_mode: ::prost::alloc::string::String,
75}
76#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
78pub struct GetLatestSchemaRequest {
79 #[prost(string, tag = "1")]
81 pub subject: ::prost::alloc::string::String,
82}
83#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
85pub struct ListVersionsRequest {
86 #[prost(string, tag = "1")]
87 pub subject: ::prost::alloc::string::String,
88}
89#[derive(Clone, PartialEq, ::prost::Message)]
91pub struct ListVersionsResponse {
92 #[prost(message, repeated, tag = "1")]
93 pub versions: ::prost::alloc::vec::Vec<SchemaVersionInfo>,
94}
95#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
97pub struct SchemaVersionInfo {
98 #[prost(uint32, tag = "1")]
99 pub version: u32,
100 #[prost(uint64, tag = "2")]
101 pub created_at: u64,
102 #[prost(string, tag = "3")]
103 pub created_by: ::prost::alloc::string::String,
104 #[prost(string, tag = "4")]
105 pub description: ::prost::alloc::string::String,
106 #[prost(string, tag = "5")]
107 pub fingerprint: ::prost::alloc::string::String,
108 #[prost(uint64, tag = "6")]
109 pub schema_id: u64,
110}
111#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
113pub struct CheckCompatibilityRequest {
114 #[prost(string, tag = "1")]
115 pub subject: ::prost::alloc::string::String,
116 #[prost(bytes = "vec", tag = "2")]
117 pub new_schema_definition: ::prost::alloc::vec::Vec<u8>,
118 #[prost(string, tag = "3")]
119 pub schema_type: ::prost::alloc::string::String,
120 #[prost(string, optional, tag = "4")]
122 pub compatibility_mode: ::core::option::Option<::prost::alloc::string::String>,
123}
124#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
126pub struct CheckCompatibilityResponse {
127 #[prost(bool, tag = "1")]
128 pub is_compatible: bool,
129 #[prost(string, repeated, tag = "2")]
131 pub errors: ::prost::alloc::vec::Vec<::prost::alloc::string::String>,
132}
133#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
135pub struct DeleteSchemaVersionRequest {
136 #[prost(string, tag = "1")]
137 pub subject: ::prost::alloc::string::String,
138 #[prost(uint32, tag = "2")]
139 pub version: u32,
140}
141#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
143pub struct DeleteSchemaVersionResponse {
144 #[prost(bool, tag = "1")]
145 pub success: bool,
146 #[prost(string, tag = "2")]
147 pub message: ::prost::alloc::string::String,
148}
149#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
151pub struct SetCompatibilityModeRequest {
152 #[prost(string, tag = "1")]
153 pub subject: ::prost::alloc::string::String,
154 #[prost(string, tag = "2")]
156 pub compatibility_mode: ::prost::alloc::string::String,
157}
158#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
160pub struct SetCompatibilityModeResponse {
161 #[prost(bool, tag = "1")]
162 pub success: bool,
163 #[prost(string, tag = "2")]
164 pub message: ::prost::alloc::string::String,
165}
166#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
168pub struct ConfigureTopicSchemaRequest {
169 #[prost(string, tag = "1")]
171 pub topic_name: ::prost::alloc::string::String,
172 #[prost(string, tag = "2")]
174 pub schema_subject: ::prost::alloc::string::String,
175 #[prost(string, tag = "3")]
177 pub validation_policy: ::prost::alloc::string::String,
178 #[prost(bool, tag = "4")]
180 pub enable_payload_validation: bool,
181}
182#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
183pub struct ConfigureTopicSchemaResponse {
184 #[prost(bool, tag = "1")]
185 pub success: bool,
186 #[prost(string, tag = "2")]
187 pub message: ::prost::alloc::string::String,
188}
189#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
191pub struct UpdateTopicValidationPolicyRequest {
192 #[prost(string, tag = "1")]
193 pub topic_name: ::prost::alloc::string::String,
194 #[prost(string, tag = "2")]
196 pub validation_policy: ::prost::alloc::string::String,
197 #[prost(bool, tag = "3")]
198 pub enable_payload_validation: bool,
199}
200#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
201pub struct UpdateTopicValidationPolicyResponse {
202 #[prost(bool, tag = "1")]
203 pub success: bool,
204 #[prost(string, tag = "2")]
205 pub message: ::prost::alloc::string::String,
206}
207#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
209pub struct GetTopicSchemaConfigRequest {
210 #[prost(string, tag = "1")]
211 pub topic_name: ::prost::alloc::string::String,
212}
213#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
214pub struct GetTopicSchemaConfigResponse {
215 #[prost(string, tag = "1")]
217 pub schema_subject: ::prost::alloc::string::String,
218 #[prost(string, tag = "2")]
220 pub validation_policy: ::prost::alloc::string::String,
221 #[prost(bool, tag = "3")]
223 pub enable_payload_validation: bool,
224 #[prost(uint64, tag = "4")]
226 pub schema_id: u64,
227}
228pub mod schema_registry_client {
230 #![allow(
231 unused_variables,
232 dead_code,
233 missing_docs,
234 clippy::wildcard_imports,
235 clippy::let_unit_value,
236 )]
237 use tonic::codegen::*;
238 use tonic::codegen::http::Uri;
239 #[derive(Debug, Clone)]
241 pub struct SchemaRegistryClient<T> {
242 inner: tonic::client::Grpc<T>,
243 }
244 impl SchemaRegistryClient<tonic::transport::Channel> {
245 pub async fn connect<D>(dst: D) -> Result<Self, tonic::transport::Error>
247 where
248 D: TryInto<tonic::transport::Endpoint>,
249 D::Error: Into<StdError>,
250 {
251 let conn = tonic::transport::Endpoint::new(dst)?.connect().await?;
252 Ok(Self::new(conn))
253 }
254 }
255 impl<T> SchemaRegistryClient<T>
256 where
257 T: tonic::client::GrpcService<tonic::body::Body>,
258 T::Error: Into<StdError>,
259 T::ResponseBody: Body<Data = Bytes> + std::marker::Send + 'static,
260 <T::ResponseBody as Body>::Error: Into<StdError> + std::marker::Send,
261 {
262 pub fn new(inner: T) -> Self {
263 let inner = tonic::client::Grpc::new(inner);
264 Self { inner }
265 }
266 pub fn with_origin(inner: T, origin: Uri) -> Self {
267 let inner = tonic::client::Grpc::with_origin(inner, origin);
268 Self { inner }
269 }
270 pub fn with_interceptor<F>(
271 inner: T,
272 interceptor: F,
273 ) -> SchemaRegistryClient<InterceptedService<T, F>>
274 where
275 F: tonic::service::Interceptor,
276 T::ResponseBody: Default,
277 T: tonic::codegen::Service<
278 http::Request<tonic::body::Body>,
279 Response = http::Response<
280 <T as tonic::client::GrpcService<tonic::body::Body>>::ResponseBody,
281 >,
282 >,
283 <T as tonic::codegen::Service<
284 http::Request<tonic::body::Body>,
285 >>::Error: Into<StdError> + std::marker::Send + std::marker::Sync,
286 {
287 SchemaRegistryClient::new(InterceptedService::new(inner, interceptor))
288 }
289 #[must_use]
294 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
295 self.inner = self.inner.send_compressed(encoding);
296 self
297 }
298 #[must_use]
300 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
301 self.inner = self.inner.accept_compressed(encoding);
302 self
303 }
304 #[must_use]
308 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
309 self.inner = self.inner.max_decoding_message_size(limit);
310 self
311 }
312 #[must_use]
316 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
317 self.inner = self.inner.max_encoding_message_size(limit);
318 self
319 }
320 pub async fn register_schema(
322 &mut self,
323 request: impl tonic::IntoRequest<super::RegisterSchemaRequest>,
324 ) -> std::result::Result<
325 tonic::Response<super::RegisterSchemaResponse>,
326 tonic::Status,
327 > {
328 self.inner
329 .ready()
330 .await
331 .map_err(|e| {
332 tonic::Status::unknown(
333 format!("Service was not ready: {}", e.into()),
334 )
335 })?;
336 let codec = tonic_prost::ProstCodec::default();
337 let path = http::uri::PathAndQuery::from_static(
338 "/danube_schema.SchemaRegistry/RegisterSchema",
339 );
340 let mut req = request.into_request();
341 req.extensions_mut()
342 .insert(
343 GrpcMethod::new("danube_schema.SchemaRegistry", "RegisterSchema"),
344 );
345 self.inner.unary(req, path, codec).await
346 }
347 pub async fn get_schema(
349 &mut self,
350 request: impl tonic::IntoRequest<super::GetSchemaRequest>,
351 ) -> std::result::Result<
352 tonic::Response<super::GetSchemaResponse>,
353 tonic::Status,
354 > {
355 self.inner
356 .ready()
357 .await
358 .map_err(|e| {
359 tonic::Status::unknown(
360 format!("Service was not ready: {}", e.into()),
361 )
362 })?;
363 let codec = tonic_prost::ProstCodec::default();
364 let path = http::uri::PathAndQuery::from_static(
365 "/danube_schema.SchemaRegistry/GetSchema",
366 );
367 let mut req = request.into_request();
368 req.extensions_mut()
369 .insert(GrpcMethod::new("danube_schema.SchemaRegistry", "GetSchema"));
370 self.inner.unary(req, path, codec).await
371 }
372 pub async fn get_latest_schema(
374 &mut self,
375 request: impl tonic::IntoRequest<super::GetLatestSchemaRequest>,
376 ) -> std::result::Result<
377 tonic::Response<super::GetSchemaResponse>,
378 tonic::Status,
379 > {
380 self.inner
381 .ready()
382 .await
383 .map_err(|e| {
384 tonic::Status::unknown(
385 format!("Service was not ready: {}", e.into()),
386 )
387 })?;
388 let codec = tonic_prost::ProstCodec::default();
389 let path = http::uri::PathAndQuery::from_static(
390 "/danube_schema.SchemaRegistry/GetLatestSchema",
391 );
392 let mut req = request.into_request();
393 req.extensions_mut()
394 .insert(
395 GrpcMethod::new("danube_schema.SchemaRegistry", "GetLatestSchema"),
396 );
397 self.inner.unary(req, path, codec).await
398 }
399 pub async fn list_versions(
401 &mut self,
402 request: impl tonic::IntoRequest<super::ListVersionsRequest>,
403 ) -> std::result::Result<
404 tonic::Response<super::ListVersionsResponse>,
405 tonic::Status,
406 > {
407 self.inner
408 .ready()
409 .await
410 .map_err(|e| {
411 tonic::Status::unknown(
412 format!("Service was not ready: {}", e.into()),
413 )
414 })?;
415 let codec = tonic_prost::ProstCodec::default();
416 let path = http::uri::PathAndQuery::from_static(
417 "/danube_schema.SchemaRegistry/ListVersions",
418 );
419 let mut req = request.into_request();
420 req.extensions_mut()
421 .insert(GrpcMethod::new("danube_schema.SchemaRegistry", "ListVersions"));
422 self.inner.unary(req, path, codec).await
423 }
424 pub async fn check_compatibility(
426 &mut self,
427 request: impl tonic::IntoRequest<super::CheckCompatibilityRequest>,
428 ) -> std::result::Result<
429 tonic::Response<super::CheckCompatibilityResponse>,
430 tonic::Status,
431 > {
432 self.inner
433 .ready()
434 .await
435 .map_err(|e| {
436 tonic::Status::unknown(
437 format!("Service was not ready: {}", e.into()),
438 )
439 })?;
440 let codec = tonic_prost::ProstCodec::default();
441 let path = http::uri::PathAndQuery::from_static(
442 "/danube_schema.SchemaRegistry/CheckCompatibility",
443 );
444 let mut req = request.into_request();
445 req.extensions_mut()
446 .insert(
447 GrpcMethod::new("danube_schema.SchemaRegistry", "CheckCompatibility"),
448 );
449 self.inner.unary(req, path, codec).await
450 }
451 pub async fn delete_schema_version(
453 &mut self,
454 request: impl tonic::IntoRequest<super::DeleteSchemaVersionRequest>,
455 ) -> std::result::Result<
456 tonic::Response<super::DeleteSchemaVersionResponse>,
457 tonic::Status,
458 > {
459 self.inner
460 .ready()
461 .await
462 .map_err(|e| {
463 tonic::Status::unknown(
464 format!("Service was not ready: {}", e.into()),
465 )
466 })?;
467 let codec = tonic_prost::ProstCodec::default();
468 let path = http::uri::PathAndQuery::from_static(
469 "/danube_schema.SchemaRegistry/DeleteSchemaVersion",
470 );
471 let mut req = request.into_request();
472 req.extensions_mut()
473 .insert(
474 GrpcMethod::new(
475 "danube_schema.SchemaRegistry",
476 "DeleteSchemaVersion",
477 ),
478 );
479 self.inner.unary(req, path, codec).await
480 }
481 pub async fn set_compatibility_mode(
483 &mut self,
484 request: impl tonic::IntoRequest<super::SetCompatibilityModeRequest>,
485 ) -> std::result::Result<
486 tonic::Response<super::SetCompatibilityModeResponse>,
487 tonic::Status,
488 > {
489 self.inner
490 .ready()
491 .await
492 .map_err(|e| {
493 tonic::Status::unknown(
494 format!("Service was not ready: {}", e.into()),
495 )
496 })?;
497 let codec = tonic_prost::ProstCodec::default();
498 let path = http::uri::PathAndQuery::from_static(
499 "/danube_schema.SchemaRegistry/SetCompatibilityMode",
500 );
501 let mut req = request.into_request();
502 req.extensions_mut()
503 .insert(
504 GrpcMethod::new(
505 "danube_schema.SchemaRegistry",
506 "SetCompatibilityMode",
507 ),
508 );
509 self.inner.unary(req, path, codec).await
510 }
511 pub async fn configure_topic_schema(
514 &mut self,
515 request: impl tonic::IntoRequest<super::ConfigureTopicSchemaRequest>,
516 ) -> std::result::Result<
517 tonic::Response<super::ConfigureTopicSchemaResponse>,
518 tonic::Status,
519 > {
520 self.inner
521 .ready()
522 .await
523 .map_err(|e| {
524 tonic::Status::unknown(
525 format!("Service was not ready: {}", e.into()),
526 )
527 })?;
528 let codec = tonic_prost::ProstCodec::default();
529 let path = http::uri::PathAndQuery::from_static(
530 "/danube_schema.SchemaRegistry/ConfigureTopicSchema",
531 );
532 let mut req = request.into_request();
533 req.extensions_mut()
534 .insert(
535 GrpcMethod::new(
536 "danube_schema.SchemaRegistry",
537 "ConfigureTopicSchema",
538 ),
539 );
540 self.inner.unary(req, path, codec).await
541 }
542 pub async fn update_topic_validation_policy(
544 &mut self,
545 request: impl tonic::IntoRequest<super::UpdateTopicValidationPolicyRequest>,
546 ) -> std::result::Result<
547 tonic::Response<super::UpdateTopicValidationPolicyResponse>,
548 tonic::Status,
549 > {
550 self.inner
551 .ready()
552 .await
553 .map_err(|e| {
554 tonic::Status::unknown(
555 format!("Service was not ready: {}", e.into()),
556 )
557 })?;
558 let codec = tonic_prost::ProstCodec::default();
559 let path = http::uri::PathAndQuery::from_static(
560 "/danube_schema.SchemaRegistry/UpdateTopicValidationPolicy",
561 );
562 let mut req = request.into_request();
563 req.extensions_mut()
564 .insert(
565 GrpcMethod::new(
566 "danube_schema.SchemaRegistry",
567 "UpdateTopicValidationPolicy",
568 ),
569 );
570 self.inner.unary(req, path, codec).await
571 }
572 pub async fn get_topic_schema_config(
574 &mut self,
575 request: impl tonic::IntoRequest<super::GetTopicSchemaConfigRequest>,
576 ) -> std::result::Result<
577 tonic::Response<super::GetTopicSchemaConfigResponse>,
578 tonic::Status,
579 > {
580 self.inner
581 .ready()
582 .await
583 .map_err(|e| {
584 tonic::Status::unknown(
585 format!("Service was not ready: {}", e.into()),
586 )
587 })?;
588 let codec = tonic_prost::ProstCodec::default();
589 let path = http::uri::PathAndQuery::from_static(
590 "/danube_schema.SchemaRegistry/GetTopicSchemaConfig",
591 );
592 let mut req = request.into_request();
593 req.extensions_mut()
594 .insert(
595 GrpcMethod::new(
596 "danube_schema.SchemaRegistry",
597 "GetTopicSchemaConfig",
598 ),
599 );
600 self.inner.unary(req, path, codec).await
601 }
602 }
603}
604pub mod schema_registry_server {
606 #![allow(
607 unused_variables,
608 dead_code,
609 missing_docs,
610 clippy::wildcard_imports,
611 clippy::let_unit_value,
612 )]
613 use tonic::codegen::*;
614 #[async_trait]
616 pub trait SchemaRegistry: std::marker::Send + std::marker::Sync + 'static {
617 async fn register_schema(
619 &self,
620 request: tonic::Request<super::RegisterSchemaRequest>,
621 ) -> std::result::Result<
622 tonic::Response<super::RegisterSchemaResponse>,
623 tonic::Status,
624 >;
625 async fn get_schema(
627 &self,
628 request: tonic::Request<super::GetSchemaRequest>,
629 ) -> std::result::Result<
630 tonic::Response<super::GetSchemaResponse>,
631 tonic::Status,
632 >;
633 async fn get_latest_schema(
635 &self,
636 request: tonic::Request<super::GetLatestSchemaRequest>,
637 ) -> std::result::Result<
638 tonic::Response<super::GetSchemaResponse>,
639 tonic::Status,
640 >;
641 async fn list_versions(
643 &self,
644 request: tonic::Request<super::ListVersionsRequest>,
645 ) -> std::result::Result<
646 tonic::Response<super::ListVersionsResponse>,
647 tonic::Status,
648 >;
649 async fn check_compatibility(
651 &self,
652 request: tonic::Request<super::CheckCompatibilityRequest>,
653 ) -> std::result::Result<
654 tonic::Response<super::CheckCompatibilityResponse>,
655 tonic::Status,
656 >;
657 async fn delete_schema_version(
659 &self,
660 request: tonic::Request<super::DeleteSchemaVersionRequest>,
661 ) -> std::result::Result<
662 tonic::Response<super::DeleteSchemaVersionResponse>,
663 tonic::Status,
664 >;
665 async fn set_compatibility_mode(
667 &self,
668 request: tonic::Request<super::SetCompatibilityModeRequest>,
669 ) -> std::result::Result<
670 tonic::Response<super::SetCompatibilityModeResponse>,
671 tonic::Status,
672 >;
673 async fn configure_topic_schema(
676 &self,
677 request: tonic::Request<super::ConfigureTopicSchemaRequest>,
678 ) -> std::result::Result<
679 tonic::Response<super::ConfigureTopicSchemaResponse>,
680 tonic::Status,
681 >;
682 async fn update_topic_validation_policy(
684 &self,
685 request: tonic::Request<super::UpdateTopicValidationPolicyRequest>,
686 ) -> std::result::Result<
687 tonic::Response<super::UpdateTopicValidationPolicyResponse>,
688 tonic::Status,
689 >;
690 async fn get_topic_schema_config(
692 &self,
693 request: tonic::Request<super::GetTopicSchemaConfigRequest>,
694 ) -> std::result::Result<
695 tonic::Response<super::GetTopicSchemaConfigResponse>,
696 tonic::Status,
697 >;
698 }
699 #[derive(Debug)]
701 pub struct SchemaRegistryServer<T> {
702 inner: Arc<T>,
703 accept_compression_encodings: EnabledCompressionEncodings,
704 send_compression_encodings: EnabledCompressionEncodings,
705 max_decoding_message_size: Option<usize>,
706 max_encoding_message_size: Option<usize>,
707 }
708 impl<T> SchemaRegistryServer<T> {
709 pub fn new(inner: T) -> Self {
710 Self::from_arc(Arc::new(inner))
711 }
712 pub fn from_arc(inner: Arc<T>) -> Self {
713 Self {
714 inner,
715 accept_compression_encodings: Default::default(),
716 send_compression_encodings: Default::default(),
717 max_decoding_message_size: None,
718 max_encoding_message_size: None,
719 }
720 }
721 pub fn with_interceptor<F>(
722 inner: T,
723 interceptor: F,
724 ) -> InterceptedService<Self, F>
725 where
726 F: tonic::service::Interceptor,
727 {
728 InterceptedService::new(Self::new(inner), interceptor)
729 }
730 #[must_use]
732 pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self {
733 self.accept_compression_encodings.enable(encoding);
734 self
735 }
736 #[must_use]
738 pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self {
739 self.send_compression_encodings.enable(encoding);
740 self
741 }
742 #[must_use]
746 pub fn max_decoding_message_size(mut self, limit: usize) -> Self {
747 self.max_decoding_message_size = Some(limit);
748 self
749 }
750 #[must_use]
754 pub fn max_encoding_message_size(mut self, limit: usize) -> Self {
755 self.max_encoding_message_size = Some(limit);
756 self
757 }
758 }
759 impl<T, B> tonic::codegen::Service<http::Request<B>> for SchemaRegistryServer<T>
760 where
761 T: SchemaRegistry,
762 B: Body + std::marker::Send + 'static,
763 B::Error: Into<StdError> + std::marker::Send + 'static,
764 {
765 type Response = http::Response<tonic::body::Body>;
766 type Error = std::convert::Infallible;
767 type Future = BoxFuture<Self::Response, Self::Error>;
768 fn poll_ready(
769 &mut self,
770 _cx: &mut Context<'_>,
771 ) -> Poll<std::result::Result<(), Self::Error>> {
772 Poll::Ready(Ok(()))
773 }
774 fn call(&mut self, req: http::Request<B>) -> Self::Future {
775 match req.uri().path() {
776 "/danube_schema.SchemaRegistry/RegisterSchema" => {
777 #[allow(non_camel_case_types)]
778 struct RegisterSchemaSvc<T: SchemaRegistry>(pub Arc<T>);
779 impl<
780 T: SchemaRegistry,
781 > tonic::server::UnaryService<super::RegisterSchemaRequest>
782 for RegisterSchemaSvc<T> {
783 type Response = super::RegisterSchemaResponse;
784 type Future = BoxFuture<
785 tonic::Response<Self::Response>,
786 tonic::Status,
787 >;
788 fn call(
789 &mut self,
790 request: tonic::Request<super::RegisterSchemaRequest>,
791 ) -> Self::Future {
792 let inner = Arc::clone(&self.0);
793 let fut = async move {
794 <T as SchemaRegistry>::register_schema(&inner, request)
795 .await
796 };
797 Box::pin(fut)
798 }
799 }
800 let accept_compression_encodings = self.accept_compression_encodings;
801 let send_compression_encodings = self.send_compression_encodings;
802 let max_decoding_message_size = self.max_decoding_message_size;
803 let max_encoding_message_size = self.max_encoding_message_size;
804 let inner = self.inner.clone();
805 let fut = async move {
806 let method = RegisterSchemaSvc(inner);
807 let codec = tonic_prost::ProstCodec::default();
808 let mut grpc = tonic::server::Grpc::new(codec)
809 .apply_compression_config(
810 accept_compression_encodings,
811 send_compression_encodings,
812 )
813 .apply_max_message_size_config(
814 max_decoding_message_size,
815 max_encoding_message_size,
816 );
817 let res = grpc.unary(method, req).await;
818 Ok(res)
819 };
820 Box::pin(fut)
821 }
822 "/danube_schema.SchemaRegistry/GetSchema" => {
823 #[allow(non_camel_case_types)]
824 struct GetSchemaSvc<T: SchemaRegistry>(pub Arc<T>);
825 impl<
826 T: SchemaRegistry,
827 > tonic::server::UnaryService<super::GetSchemaRequest>
828 for GetSchemaSvc<T> {
829 type Response = super::GetSchemaResponse;
830 type Future = BoxFuture<
831 tonic::Response<Self::Response>,
832 tonic::Status,
833 >;
834 fn call(
835 &mut self,
836 request: tonic::Request<super::GetSchemaRequest>,
837 ) -> Self::Future {
838 let inner = Arc::clone(&self.0);
839 let fut = async move {
840 <T as SchemaRegistry>::get_schema(&inner, request).await
841 };
842 Box::pin(fut)
843 }
844 }
845 let accept_compression_encodings = self.accept_compression_encodings;
846 let send_compression_encodings = self.send_compression_encodings;
847 let max_decoding_message_size = self.max_decoding_message_size;
848 let max_encoding_message_size = self.max_encoding_message_size;
849 let inner = self.inner.clone();
850 let fut = async move {
851 let method = GetSchemaSvc(inner);
852 let codec = tonic_prost::ProstCodec::default();
853 let mut grpc = tonic::server::Grpc::new(codec)
854 .apply_compression_config(
855 accept_compression_encodings,
856 send_compression_encodings,
857 )
858 .apply_max_message_size_config(
859 max_decoding_message_size,
860 max_encoding_message_size,
861 );
862 let res = grpc.unary(method, req).await;
863 Ok(res)
864 };
865 Box::pin(fut)
866 }
867 "/danube_schema.SchemaRegistry/GetLatestSchema" => {
868 #[allow(non_camel_case_types)]
869 struct GetLatestSchemaSvc<T: SchemaRegistry>(pub Arc<T>);
870 impl<
871 T: SchemaRegistry,
872 > tonic::server::UnaryService<super::GetLatestSchemaRequest>
873 for GetLatestSchemaSvc<T> {
874 type Response = super::GetSchemaResponse;
875 type Future = BoxFuture<
876 tonic::Response<Self::Response>,
877 tonic::Status,
878 >;
879 fn call(
880 &mut self,
881 request: tonic::Request<super::GetLatestSchemaRequest>,
882 ) -> Self::Future {
883 let inner = Arc::clone(&self.0);
884 let fut = async move {
885 <T as SchemaRegistry>::get_latest_schema(&inner, request)
886 .await
887 };
888 Box::pin(fut)
889 }
890 }
891 let accept_compression_encodings = self.accept_compression_encodings;
892 let send_compression_encodings = self.send_compression_encodings;
893 let max_decoding_message_size = self.max_decoding_message_size;
894 let max_encoding_message_size = self.max_encoding_message_size;
895 let inner = self.inner.clone();
896 let fut = async move {
897 let method = GetLatestSchemaSvc(inner);
898 let codec = tonic_prost::ProstCodec::default();
899 let mut grpc = tonic::server::Grpc::new(codec)
900 .apply_compression_config(
901 accept_compression_encodings,
902 send_compression_encodings,
903 )
904 .apply_max_message_size_config(
905 max_decoding_message_size,
906 max_encoding_message_size,
907 );
908 let res = grpc.unary(method, req).await;
909 Ok(res)
910 };
911 Box::pin(fut)
912 }
913 "/danube_schema.SchemaRegistry/ListVersions" => {
914 #[allow(non_camel_case_types)]
915 struct ListVersionsSvc<T: SchemaRegistry>(pub Arc<T>);
916 impl<
917 T: SchemaRegistry,
918 > tonic::server::UnaryService<super::ListVersionsRequest>
919 for ListVersionsSvc<T> {
920 type Response = super::ListVersionsResponse;
921 type Future = BoxFuture<
922 tonic::Response<Self::Response>,
923 tonic::Status,
924 >;
925 fn call(
926 &mut self,
927 request: tonic::Request<super::ListVersionsRequest>,
928 ) -> Self::Future {
929 let inner = Arc::clone(&self.0);
930 let fut = async move {
931 <T as SchemaRegistry>::list_versions(&inner, request).await
932 };
933 Box::pin(fut)
934 }
935 }
936 let accept_compression_encodings = self.accept_compression_encodings;
937 let send_compression_encodings = self.send_compression_encodings;
938 let max_decoding_message_size = self.max_decoding_message_size;
939 let max_encoding_message_size = self.max_encoding_message_size;
940 let inner = self.inner.clone();
941 let fut = async move {
942 let method = ListVersionsSvc(inner);
943 let codec = tonic_prost::ProstCodec::default();
944 let mut grpc = tonic::server::Grpc::new(codec)
945 .apply_compression_config(
946 accept_compression_encodings,
947 send_compression_encodings,
948 )
949 .apply_max_message_size_config(
950 max_decoding_message_size,
951 max_encoding_message_size,
952 );
953 let res = grpc.unary(method, req).await;
954 Ok(res)
955 };
956 Box::pin(fut)
957 }
958 "/danube_schema.SchemaRegistry/CheckCompatibility" => {
959 #[allow(non_camel_case_types)]
960 struct CheckCompatibilitySvc<T: SchemaRegistry>(pub Arc<T>);
961 impl<
962 T: SchemaRegistry,
963 > tonic::server::UnaryService<super::CheckCompatibilityRequest>
964 for CheckCompatibilitySvc<T> {
965 type Response = super::CheckCompatibilityResponse;
966 type Future = BoxFuture<
967 tonic::Response<Self::Response>,
968 tonic::Status,
969 >;
970 fn call(
971 &mut self,
972 request: tonic::Request<super::CheckCompatibilityRequest>,
973 ) -> Self::Future {
974 let inner = Arc::clone(&self.0);
975 let fut = async move {
976 <T as SchemaRegistry>::check_compatibility(&inner, request)
977 .await
978 };
979 Box::pin(fut)
980 }
981 }
982 let accept_compression_encodings = self.accept_compression_encodings;
983 let send_compression_encodings = self.send_compression_encodings;
984 let max_decoding_message_size = self.max_decoding_message_size;
985 let max_encoding_message_size = self.max_encoding_message_size;
986 let inner = self.inner.clone();
987 let fut = async move {
988 let method = CheckCompatibilitySvc(inner);
989 let codec = tonic_prost::ProstCodec::default();
990 let mut grpc = tonic::server::Grpc::new(codec)
991 .apply_compression_config(
992 accept_compression_encodings,
993 send_compression_encodings,
994 )
995 .apply_max_message_size_config(
996 max_decoding_message_size,
997 max_encoding_message_size,
998 );
999 let res = grpc.unary(method, req).await;
1000 Ok(res)
1001 };
1002 Box::pin(fut)
1003 }
1004 "/danube_schema.SchemaRegistry/DeleteSchemaVersion" => {
1005 #[allow(non_camel_case_types)]
1006 struct DeleteSchemaVersionSvc<T: SchemaRegistry>(pub Arc<T>);
1007 impl<
1008 T: SchemaRegistry,
1009 > tonic::server::UnaryService<super::DeleteSchemaVersionRequest>
1010 for DeleteSchemaVersionSvc<T> {
1011 type Response = super::DeleteSchemaVersionResponse;
1012 type Future = BoxFuture<
1013 tonic::Response<Self::Response>,
1014 tonic::Status,
1015 >;
1016 fn call(
1017 &mut self,
1018 request: tonic::Request<super::DeleteSchemaVersionRequest>,
1019 ) -> Self::Future {
1020 let inner = Arc::clone(&self.0);
1021 let fut = async move {
1022 <T as SchemaRegistry>::delete_schema_version(
1023 &inner,
1024 request,
1025 )
1026 .await
1027 };
1028 Box::pin(fut)
1029 }
1030 }
1031 let accept_compression_encodings = self.accept_compression_encodings;
1032 let send_compression_encodings = self.send_compression_encodings;
1033 let max_decoding_message_size = self.max_decoding_message_size;
1034 let max_encoding_message_size = self.max_encoding_message_size;
1035 let inner = self.inner.clone();
1036 let fut = async move {
1037 let method = DeleteSchemaVersionSvc(inner);
1038 let codec = tonic_prost::ProstCodec::default();
1039 let mut grpc = tonic::server::Grpc::new(codec)
1040 .apply_compression_config(
1041 accept_compression_encodings,
1042 send_compression_encodings,
1043 )
1044 .apply_max_message_size_config(
1045 max_decoding_message_size,
1046 max_encoding_message_size,
1047 );
1048 let res = grpc.unary(method, req).await;
1049 Ok(res)
1050 };
1051 Box::pin(fut)
1052 }
1053 "/danube_schema.SchemaRegistry/SetCompatibilityMode" => {
1054 #[allow(non_camel_case_types)]
1055 struct SetCompatibilityModeSvc<T: SchemaRegistry>(pub Arc<T>);
1056 impl<
1057 T: SchemaRegistry,
1058 > tonic::server::UnaryService<super::SetCompatibilityModeRequest>
1059 for SetCompatibilityModeSvc<T> {
1060 type Response = super::SetCompatibilityModeResponse;
1061 type Future = BoxFuture<
1062 tonic::Response<Self::Response>,
1063 tonic::Status,
1064 >;
1065 fn call(
1066 &mut self,
1067 request: tonic::Request<super::SetCompatibilityModeRequest>,
1068 ) -> Self::Future {
1069 let inner = Arc::clone(&self.0);
1070 let fut = async move {
1071 <T as SchemaRegistry>::set_compatibility_mode(
1072 &inner,
1073 request,
1074 )
1075 .await
1076 };
1077 Box::pin(fut)
1078 }
1079 }
1080 let accept_compression_encodings = self.accept_compression_encodings;
1081 let send_compression_encodings = self.send_compression_encodings;
1082 let max_decoding_message_size = self.max_decoding_message_size;
1083 let max_encoding_message_size = self.max_encoding_message_size;
1084 let inner = self.inner.clone();
1085 let fut = async move {
1086 let method = SetCompatibilityModeSvc(inner);
1087 let codec = tonic_prost::ProstCodec::default();
1088 let mut grpc = tonic::server::Grpc::new(codec)
1089 .apply_compression_config(
1090 accept_compression_encodings,
1091 send_compression_encodings,
1092 )
1093 .apply_max_message_size_config(
1094 max_decoding_message_size,
1095 max_encoding_message_size,
1096 );
1097 let res = grpc.unary(method, req).await;
1098 Ok(res)
1099 };
1100 Box::pin(fut)
1101 }
1102 "/danube_schema.SchemaRegistry/ConfigureTopicSchema" => {
1103 #[allow(non_camel_case_types)]
1104 struct ConfigureTopicSchemaSvc<T: SchemaRegistry>(pub Arc<T>);
1105 impl<
1106 T: SchemaRegistry,
1107 > tonic::server::UnaryService<super::ConfigureTopicSchemaRequest>
1108 for ConfigureTopicSchemaSvc<T> {
1109 type Response = super::ConfigureTopicSchemaResponse;
1110 type Future = BoxFuture<
1111 tonic::Response<Self::Response>,
1112 tonic::Status,
1113 >;
1114 fn call(
1115 &mut self,
1116 request: tonic::Request<super::ConfigureTopicSchemaRequest>,
1117 ) -> Self::Future {
1118 let inner = Arc::clone(&self.0);
1119 let fut = async move {
1120 <T as SchemaRegistry>::configure_topic_schema(
1121 &inner,
1122 request,
1123 )
1124 .await
1125 };
1126 Box::pin(fut)
1127 }
1128 }
1129 let accept_compression_encodings = self.accept_compression_encodings;
1130 let send_compression_encodings = self.send_compression_encodings;
1131 let max_decoding_message_size = self.max_decoding_message_size;
1132 let max_encoding_message_size = self.max_encoding_message_size;
1133 let inner = self.inner.clone();
1134 let fut = async move {
1135 let method = ConfigureTopicSchemaSvc(inner);
1136 let codec = tonic_prost::ProstCodec::default();
1137 let mut grpc = tonic::server::Grpc::new(codec)
1138 .apply_compression_config(
1139 accept_compression_encodings,
1140 send_compression_encodings,
1141 )
1142 .apply_max_message_size_config(
1143 max_decoding_message_size,
1144 max_encoding_message_size,
1145 );
1146 let res = grpc.unary(method, req).await;
1147 Ok(res)
1148 };
1149 Box::pin(fut)
1150 }
1151 "/danube_schema.SchemaRegistry/UpdateTopicValidationPolicy" => {
1152 #[allow(non_camel_case_types)]
1153 struct UpdateTopicValidationPolicySvc<T: SchemaRegistry>(pub Arc<T>);
1154 impl<
1155 T: SchemaRegistry,
1156 > tonic::server::UnaryService<
1157 super::UpdateTopicValidationPolicyRequest,
1158 > for UpdateTopicValidationPolicySvc<T> {
1159 type Response = super::UpdateTopicValidationPolicyResponse;
1160 type Future = BoxFuture<
1161 tonic::Response<Self::Response>,
1162 tonic::Status,
1163 >;
1164 fn call(
1165 &mut self,
1166 request: tonic::Request<
1167 super::UpdateTopicValidationPolicyRequest,
1168 >,
1169 ) -> Self::Future {
1170 let inner = Arc::clone(&self.0);
1171 let fut = async move {
1172 <T as SchemaRegistry>::update_topic_validation_policy(
1173 &inner,
1174 request,
1175 )
1176 .await
1177 };
1178 Box::pin(fut)
1179 }
1180 }
1181 let accept_compression_encodings = self.accept_compression_encodings;
1182 let send_compression_encodings = self.send_compression_encodings;
1183 let max_decoding_message_size = self.max_decoding_message_size;
1184 let max_encoding_message_size = self.max_encoding_message_size;
1185 let inner = self.inner.clone();
1186 let fut = async move {
1187 let method = UpdateTopicValidationPolicySvc(inner);
1188 let codec = tonic_prost::ProstCodec::default();
1189 let mut grpc = tonic::server::Grpc::new(codec)
1190 .apply_compression_config(
1191 accept_compression_encodings,
1192 send_compression_encodings,
1193 )
1194 .apply_max_message_size_config(
1195 max_decoding_message_size,
1196 max_encoding_message_size,
1197 );
1198 let res = grpc.unary(method, req).await;
1199 Ok(res)
1200 };
1201 Box::pin(fut)
1202 }
1203 "/danube_schema.SchemaRegistry/GetTopicSchemaConfig" => {
1204 #[allow(non_camel_case_types)]
1205 struct GetTopicSchemaConfigSvc<T: SchemaRegistry>(pub Arc<T>);
1206 impl<
1207 T: SchemaRegistry,
1208 > tonic::server::UnaryService<super::GetTopicSchemaConfigRequest>
1209 for GetTopicSchemaConfigSvc<T> {
1210 type Response = super::GetTopicSchemaConfigResponse;
1211 type Future = BoxFuture<
1212 tonic::Response<Self::Response>,
1213 tonic::Status,
1214 >;
1215 fn call(
1216 &mut self,
1217 request: tonic::Request<super::GetTopicSchemaConfigRequest>,
1218 ) -> Self::Future {
1219 let inner = Arc::clone(&self.0);
1220 let fut = async move {
1221 <T as SchemaRegistry>::get_topic_schema_config(
1222 &inner,
1223 request,
1224 )
1225 .await
1226 };
1227 Box::pin(fut)
1228 }
1229 }
1230 let accept_compression_encodings = self.accept_compression_encodings;
1231 let send_compression_encodings = self.send_compression_encodings;
1232 let max_decoding_message_size = self.max_decoding_message_size;
1233 let max_encoding_message_size = self.max_encoding_message_size;
1234 let inner = self.inner.clone();
1235 let fut = async move {
1236 let method = GetTopicSchemaConfigSvc(inner);
1237 let codec = tonic_prost::ProstCodec::default();
1238 let mut grpc = tonic::server::Grpc::new(codec)
1239 .apply_compression_config(
1240 accept_compression_encodings,
1241 send_compression_encodings,
1242 )
1243 .apply_max_message_size_config(
1244 max_decoding_message_size,
1245 max_encoding_message_size,
1246 );
1247 let res = grpc.unary(method, req).await;
1248 Ok(res)
1249 };
1250 Box::pin(fut)
1251 }
1252 _ => {
1253 Box::pin(async move {
1254 let mut response = http::Response::new(
1255 tonic::body::Body::default(),
1256 );
1257 let headers = response.headers_mut();
1258 headers
1259 .insert(
1260 tonic::Status::GRPC_STATUS,
1261 (tonic::Code::Unimplemented as i32).into(),
1262 );
1263 headers
1264 .insert(
1265 http::header::CONTENT_TYPE,
1266 tonic::metadata::GRPC_CONTENT_TYPE,
1267 );
1268 Ok(response)
1269 })
1270 }
1271 }
1272 }
1273 }
1274 impl<T> Clone for SchemaRegistryServer<T> {
1275 fn clone(&self) -> Self {
1276 let inner = self.inner.clone();
1277 Self {
1278 inner,
1279 accept_compression_encodings: self.accept_compression_encodings,
1280 send_compression_encodings: self.send_compression_encodings,
1281 max_decoding_message_size: self.max_decoding_message_size,
1282 max_encoding_message_size: self.max_encoding_message_size,
1283 }
1284 }
1285 }
1286 pub const SERVICE_NAME: &str = "danube_schema.SchemaRegistry";
1288 impl<T> tonic::server::NamedService for SchemaRegistryServer<T> {
1289 const NAME: &'static str = SERVICE_NAME;
1290 }
1291}