Skip to main content

camel_component_api/
endpoint.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_trait::async_trait;
5use camel_api::{BodyType, BoxProcessor, CamelError, Exchange};
6
7use crate::ProducerContext;
8use crate::consumer::Consumer;
9use crate::runtime_observability::RuntimeObservability;
10
11/// A polling consumer receives messages on demand (pull model) rather than
12/// being event-driven (push model).
13///
14/// Implement this trait on endpoints that support synchronous pull-based
15/// consumption (e.g., file, FTP, JMS). Components that are purely
16/// event-driven (e.g., HTTP server, Kafka) can leave the default
17/// [`Endpoint::polling_consumer`] returning `None`.
18#[async_trait]
19pub trait PollingConsumer: Send + Sync {
20    /// Receive the next available exchange within `timeout`, or `None` if none
21    /// arrives. `timeout = Duration::ZERO` means non-blocking (return
22    /// immediately if nothing pending).
23    async fn receive(&mut self, timeout: Duration) -> Result<Option<Exchange>, CamelError>;
24}
25
26/// An Endpoint represents a source or destination in a route URI.
27pub trait Endpoint: Send + Sync {
28    /// The URI that identifies this endpoint.
29    fn uri(&self) -> &str;
30
31    /// Create a consumer that reads from this endpoint.
32    ///
33    /// `rt` provides narrow observability access (`metrics()` + `health()`)
34    /// per ADR-0012 Phase A. Consumers store the Arc for later
35    /// `rt.metrics().increment_errors(...)` / `rt.health().force_unhealthy_for_route(...)`
36    /// calls (Phase B).
37    fn create_consumer(
38        &self,
39        rt: Arc<dyn RuntimeObservability>,
40    ) -> Result<Box<dyn Consumer>, CamelError>;
41
42    /// Create a producer that writes to this endpoint.
43    ///
44    /// `rt` provides narrow observability access (`metrics()` + `health()`)
45    /// per ADR-0012 Phase A. Producers store the Arc for later
46    /// `rt.health().force_unhealthy_for_route(...)` calls on creation failure
47    /// (Phase B category (g) sites).
48    fn create_producer(
49        &self,
50        rt: Arc<dyn RuntimeObservability>,
51        ctx: &ProducerContext,
52    ) -> Result<BoxProcessor, CamelError>;
53
54    /// Optional body type contract for the producer.
55    ///
56    /// When `Some(t)`, the pipeline will coerce the body to `t` before calling
57    /// the producer. Default: `None` (accept any body variant, zero overhead).
58    fn body_contract(&self) -> Option<BodyType> {
59        None
60    }
61
62    /// Return a polling consumer for this endpoint, if supported.
63    ///
64    /// Polling consumers use a pull model — callers invoke
65    /// [`PollingConsumer::receive`] to retrieve the next message.
66    /// Endpoints that only support push-based consumption should leave
67    /// this default (returns `None`).
68    fn polling_consumer(&self) -> Option<Box<dyn PollingConsumer>> {
69        None
70    }
71}
72
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ComponentContext;
    use crate::test_support::PanicRuntimeObservability;

    /// URI shared by every mock endpoint built in this module.
    const MOCK_URI: &str = "mock://test";

    /// Bare-bones endpoint used to exercise the trait's default methods.
    ///
    /// Only `uri` does real work; both factory methods always fail.
    struct MockEndpoint {
        uri: String,
    }

    impl MockEndpoint {
        fn new(uri: &str) -> Self {
            let uri = String::from(uri);
            Self { uri }
        }
    }

    impl Endpoint for MockEndpoint {
        fn uri(&self) -> &str {
            self.uri.as_str()
        }

        // The observability handle is ignored: the mock never builds a consumer.
        fn create_consumer(
            &self,
            _rt: Arc<dyn crate::RuntimeObservability>,
        ) -> Result<Box<dyn Consumer>, CamelError> {
            Err(CamelError::EndpointCreationFailed("mock".into()))
        }

        // Likewise, producer creation fails unconditionally.
        fn create_producer(
            &self,
            _rt: Arc<dyn crate::RuntimeObservability>,
            _ctx: &ProducerContext,
        ) -> Result<BoxProcessor, CamelError> {
            Err(CamelError::ProcessorError("mock".into()))
        }
    }

    /// Fresh mock endpoint bound to [`MOCK_URI`].
    fn endpoint() -> MockEndpoint {
        MockEndpoint::new(MOCK_URI)
    }

    /// Observability handle passed to the factory methods; the mock never
    /// touches it.
    fn observability() -> Arc<dyn crate::RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    #[test]
    fn mock_endpoint_polling_consumer_returns_none() {
        assert!(endpoint().polling_consumer().is_none());
    }

    #[test]
    fn mock_endpoint_body_contract_default_is_none() {
        assert!(endpoint().body_contract().is_none());
    }

    #[test]
    fn mock_endpoint_uri() {
        assert_eq!(endpoint().uri(), "mock://test");
    }

    #[test]
    fn mock_endpoint_create_consumer_errors() {
        let outcome = endpoint().create_consumer(observability());
        assert!(outcome.is_err());
    }

    #[test]
    fn mock_endpoint_create_producer_errors() {
        let producer_ctx = ProducerContext::new();
        let outcome = endpoint().create_producer(observability(), &producer_ctx);
        assert!(outcome.is_err());
    }

    /// `NoOpComponentContext` can be instantiated as a plain value.
    #[test]
    fn component_context_noop_can_be_constructed() {
        let _ctx = crate::NoOpComponentContext;
    }

    /// Both lookups on the no-op context come back empty.
    #[test]
    fn component_context_noop_resolve_returns_none() {
        let noop = crate::NoOpComponentContext;
        assert!(noop.resolve_component("anything").is_none());
        assert!(noop.resolve_language("anything").is_none());
    }
}