Skip to main content

camel_component_http/
static_endpoint.rs

1use std::sync::Arc;
2
3use camel_component_api::{
4    CamelError, Component, Consumer, Endpoint, ProducerContext, RuntimeObservability,
5};
6use tower_http::services::ServeDir;
7
8use crate::registry::{MountMode, StaticMount};
9use crate::{HttpStaticConfig, ServerRegistry};
10
11// ---------------------------------------------------------------------------
12// HttpStaticComponent
13// ---------------------------------------------------------------------------
14
15/// Component factory for the `http-static:` scheme.
16///
17/// Creates [`HttpStaticEndpoint`] instances from URIs like
18/// `http-static:/path/to/dir?port=8080&spaFallback=true`.
19pub struct HttpStaticComponent {
20    config: HttpStaticConfig,
21}
22
23impl HttpStaticComponent {
24    pub fn new() -> Self {
25        Self {
26            config: HttpStaticConfig::default(),
27        }
28    }
29
30    pub fn with_config(config: HttpStaticConfig) -> Self {
31        Self { config }
32    }
33}
34
35impl Default for HttpStaticComponent {
36    fn default() -> Self {
37        Self::new()
38    }
39}
40
41impl Component for HttpStaticComponent {
42    fn scheme(&self) -> &str {
43        "http-static"
44    }
45
46    fn create_endpoint(
47        &self,
48        uri: &str,
49        _ctx: &dyn camel_component_api::ComponentContext,
50    ) -> Result<Box<dyn Endpoint>, CamelError> {
51        let config = HttpStaticConfig::from_uri_with_defaults(uri, &self.config)?;
52        Ok(Box::new(HttpStaticEndpoint {
53            uri: uri.to_string(),
54            config,
55        }))
56    }
57}
58
59// ---------------------------------------------------------------------------
60// HttpStaticEndpoint
61// ---------------------------------------------------------------------------
62
63/// Endpoint for a static file serving route.
64///
65/// Holds the resolved [`HttpStaticConfig`] and creates [`HttpStaticConsumer`]
66/// instances when the route starts.
67pub struct HttpStaticEndpoint {
68    uri: String,
69    config: HttpStaticConfig,
70}
71
72impl Endpoint for HttpStaticEndpoint {
73    fn uri(&self) -> &str {
74        &self.uri
75    }
76
77    fn create_consumer(
78        &self,
79        rt: Arc<dyn RuntimeObservability>,
80    ) -> Result<Box<dyn Consumer>, CamelError> {
81        Ok(Box::new(HttpStaticConsumer::new(self.config.clone(), rt)))
82    }
83
84    fn create_producer(
85        &self,
86        _rt: Arc<dyn RuntimeObservability>,
87        _ctx: &ProducerContext,
88    ) -> Result<camel_component_api::BoxProcessor, CamelError> {
89        Err(CamelError::Config(
90            "http-static endpoint does not support producers".to_string(),
91        ))
92    }
93}
94
95// ---------------------------------------------------------------------------
96// HttpStaticConsumer
97// ---------------------------------------------------------------------------
98
99/// Consumer that registers a static file mount into the shared
100/// [`HttpRouteRegistry`] and stays idle until cancelled.
101///
102/// On start:
103/// 1. Canonicalizes the configured `dir` (fails if not found).
104/// 2. Canonicalizes each `error_pages` path (fails if any don't exist).
105/// 3. Builds a `ServeDir` for the directory.
106/// 4. Registers a `StaticMount` into the registry.
107///
108/// On stop (cancellation):
109/// - Unregisters the mount from the registry.
110pub struct HttpStaticConsumer {
111    config: HttpStaticConfig,
112    /// Phase B will use this for `rt.metrics().increment_errors(...)` and
113    /// `rt.health().force_unhealthy_for_route(...)` calls per ADR-0012.
114    #[allow(dead_code)]
115    runtime: Arc<dyn RuntimeObservability>,
116}
117
118impl HttpStaticConsumer {
119    /// Create a new `HttpStaticConsumer` from the given config.
120    pub fn new(config: HttpStaticConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
121        Self { config, runtime }
122    }
123}
124
125#[async_trait::async_trait]
126impl Consumer for HttpStaticConsumer {
127    async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
128        // 1. Canonicalize dir
129        let dir = std::fs::canonicalize(&self.config.dir).map_err(|e| {
130            CamelError::Config(format!(
131                "http-static directory not found: {}: {}",
132                self.config.dir.display(),
133                e
134            ))
135        })?;
136
137        // 2. Canonicalize error_pages paths (resolved relative to dir)
138        let mut error_pages = std::collections::HashMap::new();
139        for (code, path) in &self.config.error_pages {
140            let resolved = if path.is_absolute() {
141                path.clone()
142            } else {
143                self.config.dir.join(path)
144            };
145            let canonical = std::fs::canonicalize(&resolved).map_err(|e| {
146                CamelError::Config(format!(
147                    "http-static error page not found for status {}: {}: {}",
148                    code,
149                    resolved.display(),
150                    e
151                ))
152            })?;
153            error_pages.insert(*code, canonical);
154        }
155
156        // 3. Build ServeDir
157        let serve_dir = ServeDir::new(&dir)
158            .precompressed_gzip()
159            .precompressed_br()
160            .append_index_html_on_directories(true);
161
162        // 4. Get registry
163        let registry = ServerRegistry::global()
164            .get_or_spawn(
165                &self.config.host,
166                self.config.port,
167                2 * 1024 * 1024,  // max_request_body (not used for static)
168                10 * 1024 * 1024, // max_response_body (not used for static)
169                1024,             // max_inflight_requests
170                self.runtime.clone(),
171                ctx.route_id().to_string(),
172            )
173            .await?;
174
175        // 5. Register mount
176        let mode = if self.config.spa_fallback {
177            MountMode::Spa
178        } else {
179            MountMode::Static
180        };
181        let mount = StaticMount {
182            mount_path: self.config.mount_path.clone(),
183            mode,
184            dir: dir.clone(),
185            cache_control: self.config.cache_control.clone(),
186            error_pages,
187            serve_dir,
188        };
189
190        registry.register_static_mount(mount).await?;
191
192        let mount_path_for_cleanup = self.config.mount_path.clone();
193        let registry_for_cleanup = registry.clone();
194
195        // 6. Wait on cancellation token
196        ctx.cancelled().await;
197
198        // 7. Unregister on stop (by mount_path identity)
199        registry_for_cleanup
200            .unregister_static_mount(&mount_path_for_cleanup)
201            .await;
202
203        Ok(())
204    }
205
206    async fn stop(&mut self) -> Result<(), CamelError> {
207        Ok(())
208    }
209
210    fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
211        camel_component_api::ConcurrencyModel::Sequential
212    }
213}
214
215// ---------------------------------------------------------------------------
216// Tests
217// ---------------------------------------------------------------------------
218
#[cfg(test)]
mod tests {
    use super::*;
    use crate::REGISTRY_TEST_MUTEX;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tokio::sync::{Notify, mpsc};
    use tokio_util::sync::CancellationToken;

    /// Helper: runtime handle for tests, backed by `PanicRuntimeObservability`.
    fn test_rt() -> Arc<dyn camel_component_api::RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Helper: create a test consumer context with a controllable cancellation.
    fn test_consumer_ctx(notify: Arc<Notify>) -> ConsumerContext {
        let (tx, _rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let token = CancellationToken::new();
        // Spawn a task that cancels when notified
        let token_clone = token.clone();
        tokio::spawn(async move {
            notify.notified().await;
            token_clone.cancel();
        });
        ConsumerContext::new(tx, token, "http-static-test-route".to_string())
    }

    /// Helper: ask the OS for an ephemeral port on 127.0.0.1 and release it.
    ///
    /// The listener is dropped on return, so another process could in
    /// principle grab the port before the test binds it again.
    async fn free_local_port() -> u16 {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        listener.local_addr().unwrap().port()
    }

    /// Helper: build a `MountMode::Static` mount at `/` for `dir`, using the
    /// same `ServeDir` options the consumer applies in `start`.
    fn root_static_mount(dir: PathBuf, cache_control: String) -> StaticMount {
        let serve_dir = ServeDir::new(&dir)
            .precompressed_gzip()
            .precompressed_br()
            .append_index_html_on_directories(true);
        StaticMount {
            mount_path: "/".to_string(),
            mode: MountMode::Static,
            dir,
            cache_control,
            error_pages: std::collections::HashMap::new(),
            serve_dir,
        }
    }

    #[test]
    fn test_component_scheme() {
        let component = HttpStaticComponent::new();
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_component_with_config() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            port: 9999,
            ..HttpStaticConfig::default()
        };
        let component = HttpStaticComponent::with_config(config);
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_endpoint_creates_consumer() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let consumer = endpoint.create_consumer(test_rt());
        assert!(consumer.is_ok());
    }

    #[test]
    fn test_endpoint_producer_not_supported() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let ctx = camel_component_api::ProducerContext::new();
        match endpoint.create_producer(test_rt(), &ctx) {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("does not support producers"));
            }
            _ => panic!("Expected Config error"),
        }
    }

    #[tokio::test]
    async fn test_consumer_start_nonexistent_dir_returns_error() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/nonexistent/path/that/does/not/exist"),
            port: 19900,
            ..HttpStaticConfig::default()
        };
        let mut consumer = HttpStaticConsumer::new(config, test_rt());
        let notify = Arc::new(Notify::new());
        let ctx = test_consumer_ctx(notify);

        match consumer.start(ctx).await {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("directory not found"));
            }
            _ => panic!("Expected Config error for nonexistent dir"),
        }
    }

    // The guard is deliberately held across awaits: it serializes every test
    // that resets and mutates the process-wide `ServerRegistry`.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_start_registers_mount_in_registry() {
        // Recover from poisoning so one failed registry test does not cascade
        // into the others; the registry is reset right below anyway.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Reset registry for clean test
        ServerRegistry::reset();

        let dir = std::env::temp_dir();
        let canonical_dir = std::fs::canonicalize(&dir).unwrap();
        let port = free_local_port().await;

        let config = HttpStaticConfig {
            dir: dir.clone(),
            port,
            host: "127.0.0.1".to_string(),
            ..HttpStaticConfig::default()
        };

        // Get registry (this spawns the axum server)
        let registry = ServerRegistry::global()
            .get_or_spawn(
                "127.0.0.1",
                port,
                2 * 1024 * 1024,
                10 * 1024 * 1024,
                1024,
                test_rt(),
                "test-static".into(),
            )
            .await
            .unwrap();

        // Register the mount by hand, mirroring the steps in
        // `HttpStaticConsumer::start` (the consumer itself is not driven here).
        let mount = root_static_mount(canonical_dir.clone(), config.cache_control.clone());
        registry.register_static_mount(mount).await.unwrap();

        // Verify registered
        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            1,
            "Expected one static mount registered"
        );
        assert_eq!(inner.mounts[0].dir, canonical_dir);
        assert_eq!(inner.mounts[0].mount_path, "/");
    }

    // The guard is deliberately held across awaits: it serializes every test
    // that resets and mutates the process-wide `ServerRegistry`.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_stop_unregisters_mount() {
        // Recover from poisoning so one failed registry test does not cascade
        // into the others; the registry is reset right below anyway.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        // Reset registry for clean test
        ServerRegistry::reset();

        let dir = std::env::temp_dir();
        let canonical_dir = std::fs::canonicalize(&dir).unwrap();
        let port = free_local_port().await;

        // Get registry
        let registry = ServerRegistry::global()
            .get_or_spawn(
                "127.0.0.1",
                port,
                2 * 1024 * 1024,
                10 * 1024 * 1024,
                1024,
                test_rt(),
                "test-static".into(),
            )
            .await
            .unwrap();

        // Register mount
        let mount = root_static_mount(canonical_dir, "public, max-age=0".to_string());
        registry.register_static_mount(mount).await.unwrap();

        // Verify registered
        {
            let inner = registry.inner.read().await;
            assert_eq!(inner.mounts.len(), 1);
        }

        // Unregister by mount_path
        registry.unregister_static_mount("/").await;

        // Verify unregistered
        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            0,
            "Expected static mount to be unregistered"
        );
    }
}