1use std::sync::Arc;
2
3use camel_component_api::{
4 CamelError, Component, Consumer, Endpoint, ProducerContext, RuntimeObservability,
5};
6use tower_http::services::ServeDir;
7
8use crate::registry::{MountMode, StaticMount};
9use crate::{HttpStaticConfig, ServerRegistry};
10
/// Component registered under the `http-static` scheme.
///
/// Holds the base [`HttpStaticConfig`] that is passed as the defaults when an
/// endpoint URI is parsed in `create_endpoint`.
pub struct HttpStaticComponent {
    /// Defaults handed to `HttpStaticConfig::from_uri_with_defaults` for every endpoint.
    config: HttpStaticConfig,
}
22
23impl HttpStaticComponent {
24 pub fn new() -> Self {
25 Self {
26 config: HttpStaticConfig::default(),
27 }
28 }
29
30 pub fn with_config(config: HttpStaticConfig) -> Self {
31 Self { config }
32 }
33}
34
35impl Default for HttpStaticComponent {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl Component for HttpStaticComponent {
42 fn scheme(&self) -> &str {
43 "http-static"
44 }
45
46 fn create_endpoint(
47 &self,
48 uri: &str,
49 _ctx: &dyn camel_component_api::ComponentContext,
50 ) -> Result<Box<dyn Endpoint>, CamelError> {
51 let config = HttpStaticConfig::from_uri_with_defaults(uri, &self.config)?;
52 Ok(Box::new(HttpStaticEndpoint {
53 uri: uri.to_string(),
54 config,
55 }))
56 }
57}
58
/// Endpoint for a single `http-static` URI.
///
/// It can create consumers only; `create_producer` always returns a
/// configuration error.
pub struct HttpStaticEndpoint {
    /// The URI this endpoint was created from, returned verbatim by `uri()`.
    uri: String,
    /// Configuration cloned into every consumer created by this endpoint.
    config: HttpStaticConfig,
}
71
72impl Endpoint for HttpStaticEndpoint {
73 fn uri(&self) -> &str {
74 &self.uri
75 }
76
77 fn create_consumer(
78 &self,
79 rt: Arc<dyn RuntimeObservability>,
80 ) -> Result<Box<dyn Consumer>, CamelError> {
81 Ok(Box::new(HttpStaticConsumer::new(self.config.clone(), rt)))
82 }
83
84 fn create_producer(
85 &self,
86 _rt: Arc<dyn RuntimeObservability>,
87 _ctx: &ProducerContext,
88 ) -> Result<camel_component_api::BoxProcessor, CamelError> {
89 Err(CamelError::Config(
90 "http-static endpoint does not support producers".to_string(),
91 ))
92 }
93}
94
95pub struct HttpStaticConsumer {
111 config: HttpStaticConfig,
112 #[allow(dead_code)]
115 runtime: Arc<dyn RuntimeObservability>,
116}
117
118impl HttpStaticConsumer {
119 pub fn new(config: HttpStaticConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
121 Self { config, runtime }
122 }
123}
124
125#[async_trait::async_trait]
126impl Consumer for HttpStaticConsumer {
127 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
128 let dir = std::fs::canonicalize(&self.config.dir).map_err(|e| {
130 CamelError::Config(format!(
131 "http-static directory not found: {}: {}",
132 self.config.dir.display(),
133 e
134 ))
135 })?;
136
137 let mut error_pages = std::collections::HashMap::new();
139 for (code, path) in &self.config.error_pages {
140 let resolved = if path.is_absolute() {
141 path.clone()
142 } else {
143 self.config.dir.join(path)
144 };
145 let canonical = std::fs::canonicalize(&resolved).map_err(|e| {
146 CamelError::Config(format!(
147 "http-static error page not found for status {}: {}: {}",
148 code,
149 resolved.display(),
150 e
151 ))
152 })?;
153 error_pages.insert(*code, canonical);
154 }
155
156 let serve_dir = ServeDir::new(&dir)
158 .precompressed_gzip()
159 .precompressed_br()
160 .append_index_html_on_directories(true);
161
162 let registry = ServerRegistry::global()
164 .get_or_spawn(
165 &self.config.host,
166 self.config.port,
167 2 * 1024 * 1024, 10 * 1024 * 1024, 1024, self.runtime.clone(),
171 ctx.route_id().to_string(),
172 )
173 .await?;
174
175 let mode = if self.config.spa_fallback {
177 MountMode::Spa
178 } else {
179 MountMode::Static
180 };
181 let mount = StaticMount {
182 mount_path: self.config.mount_path.clone(),
183 mode,
184 dir: dir.clone(),
185 cache_control: self.config.cache_control.clone(),
186 error_pages,
187 serve_dir,
188 };
189
190 registry.register_static_mount(mount).await?;
191
192 let mount_path_for_cleanup = self.config.mount_path.clone();
193 let registry_for_cleanup = registry.clone();
194
195 ctx.cancelled().await;
197
198 registry_for_cleanup
200 .unregister_static_mount(&mount_path_for_cleanup)
201 .await;
202
203 Ok(())
204 }
205
206 async fn stop(&mut self) -> Result<(), CamelError> {
207 Ok(())
208 }
209
210 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
211 camel_component_api::ConcurrencyModel::Sequential
212 }
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use crate::REGISTRY_TEST_MUTEX;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};
    use std::path::{Path, PathBuf};
    use std::sync::Arc;
    use tokio::sync::{Notify, mpsc};
    use tokio_util::sync::CancellationToken;

    /// Runtime handle used by every test in this module.
    fn test_rt() -> Arc<dyn camel_component_api::RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Builds a consumer context whose cancellation token is cancelled once
    /// `notify` is signalled.
    fn test_consumer_ctx(notify: Arc<Notify>) -> ConsumerContext {
        let (tx, _rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let token = CancellationToken::new();
        let token_clone = token.clone();
        tokio::spawn(async move {
            notify.notified().await;
            token_clone.cancel();
        });
        ConsumerContext::new(tx, token, "http-static-test-route".to_string())
    }

    /// Asks the OS for a free loopback port and releases it again so the
    /// server under test can bind to it.
    async fn unused_local_port() -> u16 {
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);
        port
    }

    /// Builds a `MountMode::Static` mount at `/` serving `dir`, configured the
    /// same way `HttpStaticConsumer::start` configures its `ServeDir`.
    fn static_mount_at_root(dir: &Path, cache_control: String) -> StaticMount {
        let serve_dir = ServeDir::new(dir)
            .precompressed_gzip()
            .precompressed_br()
            .append_index_html_on_directories(true);
        StaticMount {
            mount_path: "/".to_string(),
            mode: MountMode::Static,
            dir: dir.to_path_buf(),
            cache_control,
            error_pages: std::collections::HashMap::new(),
            serve_dir,
        }
    }

    #[test]
    fn test_component_scheme() {
        let component = HttpStaticComponent::new();
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_component_with_config() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            port: 9999,
            ..HttpStaticConfig::default()
        };
        let component = HttpStaticComponent::with_config(config);
        assert_eq!(component.scheme(), "http-static");
        // The supplied configuration must be the one the component keeps.
        assert_eq!(component.config.port, 9999);
        assert_eq!(component.config.dir, PathBuf::from("/tmp"));
    }

    #[test]
    fn test_endpoint_creates_consumer() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let consumer = endpoint.create_consumer(test_rt());
        assert!(consumer.is_ok());
    }

    #[test]
    fn test_endpoint_producer_not_supported() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let ctx = camel_component_api::ProducerContext::new();
        match endpoint.create_producer(test_rt(), &ctx) {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("does not support producers"));
            }
            _ => panic!("Expected Config error"),
        }
    }

    #[tokio::test]
    async fn test_consumer_start_nonexistent_dir_returns_error() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/nonexistent/path/that/does/not/exist"),
            port: 19900,
            ..HttpStaticConfig::default()
        };
        let mut consumer = HttpStaticConsumer::new(config, test_rt());
        let notify = Arc::new(Notify::new());
        let ctx = test_consumer_ctx(notify);

        match consumer.start(ctx).await {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("directory not found"));
            }
            _ => panic!("Expected Config error for nonexistent dir"),
        }
    }

    // The guard is held across awaits on purpose: it serializes every test
    // that touches the process-wide `ServerRegistry`.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_start_registers_mount_in_registry() {
        // Recover from poisoning so one failing registry test does not make
        // the others fail on `PoisonError`; the registry is reset right below.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        ServerRegistry::reset();

        let dir = std::env::temp_dir();
        let canonical_dir = std::fs::canonicalize(&dir).unwrap();
        let port = unused_local_port().await;

        let config = HttpStaticConfig {
            dir,
            port,
            host: "127.0.0.1".to_string(),
            ..HttpStaticConfig::default()
        };

        // This test drives the registry directly with the same calls that
        // `HttpStaticConsumer::start` makes, rather than running `start`.
        let registry = ServerRegistry::global()
            .get_or_spawn(
                "127.0.0.1",
                port,
                2 * 1024 * 1024,
                10 * 1024 * 1024,
                1024,
                test_rt(),
                "test-static".into(),
            )
            .await
            .unwrap();

        let mount = static_mount_at_root(&canonical_dir, config.cache_control.clone());
        registry.register_static_mount(mount).await.unwrap();

        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            1,
            "Expected one static mount registered"
        );
        assert_eq!(inner.mounts[0].dir, canonical_dir);
        assert_eq!(inner.mounts[0].mount_path, "/");
    }

    // The guard is held across awaits on purpose: it serializes every test
    // that touches the process-wide `ServerRegistry`.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_stop_unregisters_mount() {
        // See `test_consumer_start_registers_mount_in_registry` for why a
        // poisoned lock is recovered instead of unwrapped.
        let _guard = REGISTRY_TEST_MUTEX
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        ServerRegistry::reset();

        let canonical_dir = std::fs::canonicalize(std::env::temp_dir()).unwrap();
        let port = unused_local_port().await;

        let registry = ServerRegistry::global()
            .get_or_spawn(
                "127.0.0.1",
                port,
                2 * 1024 * 1024,
                10 * 1024 * 1024,
                1024,
                test_rt(),
                "test-static".into(),
            )
            .await
            .unwrap();

        let mount = static_mount_at_root(&canonical_dir, "public, max-age=0".to_string());
        registry.register_static_mount(mount).await.unwrap();

        {
            // Scoped so the read guard is released before unregistering.
            let inner = registry.inner.read().await;
            assert_eq!(inner.mounts.len(), 1);
        }

        registry.unregister_static_mount("/").await;

        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            0,
            "Expected static mount to be unregistered"
        );
    }
}