camel_component_grpc/
component.rs1use std::path::PathBuf;
2use std::sync::Arc;
3
4use camel_component_api::{
5 BoxProcessor, CamelError, Component, ComponentContext, Consumer, Endpoint, ProducerContext,
6 RuntimeObservability,
7};
8
9use crate::config::{GrpcConfig, parse_grpc_uri};
10use crate::consumer::{GrpcConsumer, resolve_grpc_mode};
11use crate::health::GrpcHealthCheck;
12use crate::producer::GrpcProducer;
13
/// Component that handles endpoints using the `grpc` URI scheme.
///
/// The type carries no state of its own; everything an endpoint needs is
/// derived from the URI handed to `create_endpoint`.
#[derive(Debug, Clone)]
pub struct GrpcComponent;

impl GrpcComponent {
    /// Creates a new gRPC component.
    pub fn new() -> Self {
        Self
    }
}

impl Default for GrpcComponent {
    /// Equivalent to [`GrpcComponent::new`].
    fn default() -> Self {
        Self::new()
    }
}
27
28impl Component for GrpcComponent {
29 fn scheme(&self) -> &str {
30 "grpc"
31 }
32
33 fn create_endpoint(
34 &self,
35 uri: &str,
36 ctx: &dyn ComponentContext,
37 ) -> Result<Box<dyn Endpoint>, CamelError> {
38 let (host, port, service_name, method_name, cfg) = parse_grpc_uri(uri)?;
39 let proto_file = cfg.proto_file.clone().ok_or_else(|| {
40 CamelError::EndpointCreationFailed(
41 "missing required query parameter: protoFile".to_string(),
42 )
43 })?;
44 let addr = format!("http://{host}:{port}");
45
46 let health_check = GrpcHealthCheck::new(host.clone(), port);
47 ctx.register_current_route_health_check(Arc::new(health_check));
48
49 Ok(Box::new(GrpcEndpoint {
50 uri: uri.to_string(),
51 addr,
52 host,
53 port,
54 proto_path: PathBuf::from(proto_file),
55 service_name,
56 method_name,
57 deadline_ms: cfg.deadline_ms,
58 config: cfg,
59 }))
60 }
61}
62
63struct GrpcEndpoint {
64 uri: String,
65 addr: String,
66 host: String,
67 port: u16,
68 proto_path: PathBuf,
69 service_name: String,
70 method_name: String,
71 deadline_ms: Option<u64>,
72 config: GrpcConfig,
73}
74
75impl Endpoint for GrpcEndpoint {
76 fn uri(&self) -> &str {
77 &self.uri
78 }
79
80 fn create_consumer(
81 &self,
82 rt: Arc<dyn RuntimeObservability>,
83 ) -> Result<Box<dyn Consumer>, CamelError> {
84 let path = format!("/{}/{}", self.service_name, self.method_name);
85 let mode = resolve_grpc_mode(&self.proto_path, &self.service_name, &self.method_name)?;
86 Ok(Box::new(GrpcConsumer::new(
87 self.host.clone(),
88 self.port,
89 path,
90 self.proto_path.clone(),
91 self.service_name.clone(),
92 self.method_name.clone(),
93 mode,
94 rt,
95 )))
96 }
97
98 fn create_producer(
99 &self,
100 rt: Arc<dyn RuntimeObservability>,
101 ctx: &ProducerContext,
102 ) -> Result<BoxProcessor, CamelError> {
103 let mode = resolve_grpc_mode(&self.proto_path, &self.service_name, &self.method_name)?;
104 let route_id = ctx.route_id().unwrap_or("unknown");
106 let producer = GrpcProducer::new(
107 self.addr.clone(),
108 self.proto_path.clone(),
109 self.service_name.clone(),
110 self.method_name.clone(),
111 mode,
112 self.deadline_ms,
113 &self.config,
114 rt,
115 route_id,
116 )?;
117 Ok(BoxProcessor::new(producer))
118 }
119}