Skip to main content

camel_component_grpc/
component.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use camel_component_api::{
5    BoxProcessor, CamelError, Component, ComponentContext, Consumer, Endpoint, ProducerContext,
6    RuntimeObservability,
7};
8
9use crate::config::{GrpcConfig, parse_grpc_uri};
10use crate::consumer::{GrpcConsumer, resolve_grpc_mode};
11use crate::health::GrpcHealthCheck;
12use crate::producer::GrpcProducer;
13
/// Camel component for the `grpc` URI scheme.
///
/// The component is a stateless unit struct; everything an endpoint needs is
/// parsed from the endpoint URI when the endpoint is created.
#[derive(Debug, Clone, Copy)]
pub struct GrpcComponent;
15
16impl GrpcComponent {
17    pub fn new() -> Self {
18        Self
19    }
20}
21
22impl Default for GrpcComponent {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl Component for GrpcComponent {
29    fn scheme(&self) -> &str {
30        "grpc"
31    }
32
33    fn create_endpoint(
34        &self,
35        uri: &str,
36        ctx: &dyn ComponentContext,
37    ) -> Result<Box<dyn Endpoint>, CamelError> {
38        let (host, port, service_name, method_name, cfg) = parse_grpc_uri(uri)?;
39        let proto_file = cfg.proto_file.clone().ok_or_else(|| {
40            CamelError::EndpointCreationFailed(
41                "missing required query parameter: protoFile".to_string(),
42            )
43        })?;
44        let addr = format!("http://{host}:{port}");
45
46        let health_check = GrpcHealthCheck::new(host.clone(), port);
47        ctx.register_current_route_health_check(Arc::new(health_check));
48
49        Ok(Box::new(GrpcEndpoint {
50            uri: uri.to_string(),
51            addr,
52            host,
53            port,
54            proto_path: PathBuf::from(proto_file),
55            service_name,
56            method_name,
57            deadline_ms: cfg.deadline_ms,
58            config: cfg,
59        }))
60    }
61}
62
63struct GrpcEndpoint {
64    uri: String,
65    addr: String,
66    host: String,
67    port: u16,
68    proto_path: PathBuf,
69    service_name: String,
70    method_name: String,
71    deadline_ms: Option<u64>,
72    config: GrpcConfig,
73}
74
75impl Endpoint for GrpcEndpoint {
76    fn uri(&self) -> &str {
77        &self.uri
78    }
79
80    fn create_consumer(
81        &self,
82        rt: Arc<dyn RuntimeObservability>,
83    ) -> Result<Box<dyn Consumer>, CamelError> {
84        let path = format!("/{}/{}", self.service_name, self.method_name);
85        let mode = resolve_grpc_mode(&self.proto_path, &self.service_name, &self.method_name)?;
86        Ok(Box::new(GrpcConsumer::new(
87            self.host.clone(),
88            self.port,
89            path,
90            self.proto_path.clone(),
91            self.service_name.clone(),
92            self.method_name.clone(),
93            mode,
94            rt,
95        )))
96    }
97
98    fn create_producer(
99        &self,
100        rt: Arc<dyn RuntimeObservability>,
101        ctx: &ProducerContext,
102    ) -> Result<BoxProcessor, CamelError> {
103        let mode = resolve_grpc_mode(&self.proto_path, &self.service_name, &self.method_name)?;
104        // route_id may not be set in test scenarios (e.g., standalone endpoint tests)
105        let route_id = ctx.route_id().unwrap_or("unknown");
106        let producer = GrpcProducer::new(
107            self.addr.clone(),
108            self.proto_path.clone(),
109            self.service_name.clone(),
110            self.method_name.clone(),
111            mode,
112            self.deadline_ms,
113            &self.config,
114            rt,
115            route_id,
116        )?;
117        Ok(BoxProcessor::new(producer))
118    }
119}