//! Static file endpoint for the `http-static:` scheme
//! (`camel_component_http/static_endpoint.rs`).

1use std::sync::Arc;
2
3use camel_component_api::{
4    CamelError, Component, Consumer, Endpoint, ProducerContext, RuntimeObservability,
5};
6use tower_http::services::ServeDir;
7
8use crate::registry::{MountMode, StaticMount};
9use crate::{HttpStaticConfig, ServerRegistry};
10
11// ---------------------------------------------------------------------------
12// HttpStaticComponent
13// ---------------------------------------------------------------------------
14
15/// Component factory for the `http-static:` scheme.
16///
17/// Creates [`HttpStaticEndpoint`] instances from URIs like
18/// `http-static:/path/to/dir?port=8080&spaFallback=true`.
19pub struct HttpStaticComponent {
20    config: HttpStaticConfig,
21}
22
23impl HttpStaticComponent {
24    pub fn new() -> Self {
25        Self {
26            config: HttpStaticConfig::default(),
27        }
28    }
29
30    pub fn with_config(config: HttpStaticConfig) -> Self {
31        Self { config }
32    }
33}
34
35impl Default for HttpStaticComponent {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl Component for HttpStaticComponent {
42    fn scheme(&self) -> &str {
43        "http-static"
44    }
45
46    fn create_endpoint(
47        &self,
48        uri: &str,
49        _ctx: &dyn camel_component_api::ComponentContext,
50    ) -> Result<Box<dyn Endpoint>, CamelError> {
51        let config = HttpStaticConfig::from_uri_with_defaults(uri, &self.config)?;
52        Ok(Box::new(HttpStaticEndpoint {
53            uri: uri.to_string(),
54            config,
55        }))
56    }
57}
58
59// ---------------------------------------------------------------------------
60// HttpStaticEndpoint
61// ---------------------------------------------------------------------------
62
63/// Endpoint for a static file serving route.
64///
65/// Holds the resolved [`HttpStaticConfig`] and creates [`HttpStaticConsumer`]
66/// instances when the route starts.
67pub struct HttpStaticEndpoint {
68    uri: String,
69    config: HttpStaticConfig,
70}
71
72impl Endpoint for HttpStaticEndpoint {
73    fn uri(&self) -> &str {
74        &self.uri
75    }
76
77    fn create_consumer(
78        &self,
79        rt: Arc<dyn RuntimeObservability>,
80    ) -> Result<Box<dyn Consumer>, CamelError> {
81        Ok(Box::new(HttpStaticConsumer::new(self.config.clone(), rt)))
82    }
83
84    fn create_producer(
85        &self,
86        _rt: Arc<dyn RuntimeObservability>,
87        _ctx: &ProducerContext,
88    ) -> Result<camel_component_api::BoxProcessor, CamelError> {
89        Err(CamelError::Config(
90            "http-static endpoint does not support producers".to_string(),
91        ))
92    }
93}
94
95// ---------------------------------------------------------------------------
96// HttpStaticConsumer
97// ---------------------------------------------------------------------------
98
99/// Consumer that registers a static file mount into the shared
100/// [`HttpRouteRegistry`] and stays idle until cancelled.
101///
102/// On start:
103/// 1. Canonicalizes the configured `dir` (fails if not found).
104/// 2. Canonicalizes each `error_pages` path (fails if any don't exist).
105/// 3. Builds a `ServeDir` for the directory.
106/// 4. Registers a `StaticMount` into the registry.
107///
108/// On stop (cancellation):
109/// - Unregisters the mount from the registry.
110pub struct HttpStaticConsumer {
111    config: HttpStaticConfig,
112    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
113    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
114    #[allow(dead_code)]
115    runtime: Arc<dyn RuntimeObservability>,
116}
117
118impl HttpStaticConsumer {
119    /// Create a new `HttpStaticConsumer` from the given config.
120    pub fn new(config: HttpStaticConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
121        Self { config, runtime }
122    }
123}
124
125#[async_trait::async_trait]
126impl Consumer for HttpStaticConsumer {
127    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
128        // 1. Canonicalize dir
129        let dir = std::fs::canonicalize(&self.config.dir).map_err(|e| {
130            CamelError::Config(format!(
131                "http-static directory not found: {}: {}",
132                self.config.dir.display(),
133                e
134            ))
135        })?;
136
137        // 2. Canonicalize error_pages paths (resolved relative to dir)
138        let mut error_pages = std::collections::HashMap::new();
139        for (code, path) in &self.config.error_pages {
140            let resolved = if path.is_absolute() {
141                path.clone()
142            } else {
143                self.config.dir.join(path)
144            };
145            let canonical = std::fs::canonicalize(&resolved).map_err(|e| {
146                CamelError::Config(format!(
147                    "http-static error page not found for status {}: {}: {}",
148                    code,
149                    resolved.display(),
150                    e
151                ))
152            })?;
153            error_pages.insert(*code, canonical);
154        }
155
156        // 3. Build ServeDir
157        let serve_dir = ServeDir::new(&dir)
158            .precompressed_gzip()
159            .precompressed_br()
160            .append_index_html_on_directories(true);
161
162        // 4. Get registry
163        let registry = ServerRegistry::global()
164            .get_or_spawn(
165                &self.config.host,
166                self.config.port,
167                2 * 1024 * 1024,  // max_request_body (not used for static)
168                10 * 1024 * 1024, // max_response_body (not used for static)
169                1024,             // max_inflight_requests
170            )
171            .await?;
172
173        // 5. Register mount
174        let mode = if self.config.spa_fallback {
175            MountMode::Spa
176        } else {
177            MountMode::Static
178        };
179        let mount = StaticMount {
180            mount_path: self.config.mount_path.clone(),
181            mode,
182            dir: dir.clone(),
183            cache_control: self.config.cache_control.clone(),
184            error_pages,
185            serve_dir,
186        };
187
188        registry.register_static_mount(mount).await?;
189
190        let mount_path_for_cleanup = self.config.mount_path.clone();
191        let registry_for_cleanup = registry.clone();
192
193        // 6. Wait on cancellation token
194        ctx.cancelled().await;
195
196        // 7. Unregister on stop (by mount_path identity)
197        registry_for_cleanup
198            .unregister_static_mount(&mount_path_for_cleanup)
199            .await;
200
201        Ok(())
202    }
203
204    async fn stop(&mut self) -> Result<(), CamelError> {
205        Ok(())
206    }
207
208    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
209        camel_component_api::ConcurrencyModel::Sequential
210    }
211}
212
213// ---------------------------------------------------------------------------
214// Tests
215// ---------------------------------------------------------------------------
216
#[cfg(test)]
mod tests {
    use super::*;
    use crate::REGISTRY_TEST_MUTEX;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};
    use std::path::{Path, PathBuf};
    use std::sync::Arc;
    use tokio::sync::{Notify, mpsc};
    use tokio_util::sync::CancellationToken;

    /// Runtime handle for tests, backed by the `PanicRuntimeObservability`
    /// test double.
    fn rt() -> Arc<dyn camel_component_api::RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Helper: create a test consumer context with a controllable cancellation.
    ///
    /// The returned context is cancelled once `notify` is notified.
    fn test_consumer_ctx(notify: Arc<Notify>) -> ConsumerContext {
        let (tx, _rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let token = CancellationToken::new();
        // Spawn a task that cancels when notified
        let token_clone = token.clone();
        tokio::spawn(async move {
            notify.notified().await;
            token_clone.cancel();
        });
        ConsumerContext::new(tx, token)
    }

    /// Endpoint rooted at `/tmp` with otherwise default settings.
    fn tmp_endpoint() -> HttpStaticEndpoint {
        HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config: HttpStaticConfig {
                dir: PathBuf::from("/tmp"),
                ..HttpStaticConfig::default()
            },
        }
    }

    /// Asks the OS for a free loopback port by binding to port 0, then
    /// releases the listener so the port can be reused by the test.
    async fn free_port() -> u16 {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        listener.local_addr().unwrap().port()
    }

    /// Builds a `Static`-mode mount at `/` over `dir`, with the same
    /// `ServeDir` options the consumer uses.
    fn root_mount(dir: &Path, cache_control: String) -> StaticMount {
        StaticMount {
            mount_path: "/".to_string(),
            mode: MountMode::Static,
            dir: dir.to_path_buf(),
            cache_control,
            error_pages: std::collections::HashMap::new(),
            serve_dir: ServeDir::new(dir)
                .precompressed_gzip()
                .precompressed_br()
                .append_index_html_on_directories(true),
        }
    }

    #[test]
    fn test_component_scheme() {
        let component = HttpStaticComponent::new();
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_component_with_config() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            port: 9999,
            ..HttpStaticConfig::default()
        };
        let component = HttpStaticComponent::with_config(config);
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_endpoint_creates_consumer() {
        let endpoint = tmp_endpoint();
        let consumer = endpoint.create_consumer(rt());
        assert!(consumer.is_ok());
    }

    #[test]
    fn test_endpoint_producer_not_supported() {
        let endpoint = tmp_endpoint();
        let ctx = camel_component_api::ProducerContext::new();
        match endpoint.create_producer(rt(), &ctx) {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("does not support producers"));
            }
            _ => panic!("Expected Config error"),
        }
    }

    #[tokio::test]
    async fn test_consumer_start_nonexistent_dir_returns_error() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/nonexistent/path/that/does/not/exist"),
            port: 19900,
            ..HttpStaticConfig::default()
        };
        let mut consumer = HttpStaticConsumer::new(config, rt());
        let notify = Arc::new(Notify::new());
        let ctx = test_consumer_ctx(notify);

        match consumer.start(ctx).await {
            Err(CamelError::Config(msg)) => assert!(msg.contains("directory not found")),
            _ => panic!("Expected Config error for nonexistent dir"),
        }
    }

    // The std mutex guard is deliberately held across `.await`: it serialises
    // every test that resets and mutates the process-wide `ServerRegistry`.
    //
    // NOTE: this test drives the same registry calls the consumer makes in
    // `start`; it does not run the consumer itself.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_start_registers_mount_in_registry() {
        // A panic in another registry test poisons the mutex; the registry is
        // reset right below, so recover the guard instead of failing here too.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Reset registry for clean test
        ServerRegistry::reset();

        let canonical_dir = std::fs::canonicalize(std::env::temp_dir()).unwrap();
        let port = free_port().await;

        // Get registry (this spawns the axum server)
        let registry = ServerRegistry::global()
            .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await
            .unwrap();

        // Register mount using the default cache-control value
        let defaults = HttpStaticConfig::default();
        let mount = root_mount(&canonical_dir, defaults.cache_control.clone());
        registry.register_static_mount(mount).await.unwrap();

        // Verify registered
        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            1,
            "Expected one static mount registered"
        );
        assert_eq!(inner.mounts[0].dir, canonical_dir);
        assert_eq!(inner.mounts[0].mount_path, "/");
    }

    // The std mutex guard is deliberately held across `.await`: it serialises
    // every test that resets and mutates the process-wide `ServerRegistry`.
    //
    // NOTE: this test drives the same unregister call the consumer makes
    // after cancellation; it does not run the consumer itself.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_stop_unregisters_mount() {
        // See the registering test: recover from a poisoned lock rather than
        // cascading an unrelated failure.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Reset registry for clean test
        ServerRegistry::reset();

        let canonical_dir = std::fs::canonicalize(std::env::temp_dir()).unwrap();
        let port = free_port().await;

        // Get registry
        let registry = ServerRegistry::global()
            .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await
            .unwrap();

        // Register mount
        let mount = root_mount(&canonical_dir, "public, max-age=0".to_string());
        registry.register_static_mount(mount).await.unwrap();

        // Verify registered (read guard released at the end of the block)
        {
            let inner = registry.inner.read().await;
            assert_eq!(inner.mounts.len(), 1);
        }

        // Unregister by mount_path
        registry.unregister_static_mount("/").await;

        // Verify unregistered
        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            0,
            "Expected static mount to be unregistered"
        );
    }
}