1use std::sync::Arc;
2
3use camel_component_api::{
4 CamelError, Component, Consumer, Endpoint, ProducerContext, RuntimeObservability,
5};
6use tower_http::services::ServeDir;
7
8use crate::registry::{MountMode, StaticMount};
9use crate::{HttpStaticConfig, ServerRegistry};
10
11pub struct HttpStaticComponent {
20 config: HttpStaticConfig,
21}
22
23impl HttpStaticComponent {
24 pub fn new() -> Self {
25 Self {
26 config: HttpStaticConfig::default(),
27 }
28 }
29
30 pub fn with_config(config: HttpStaticConfig) -> Self {
31 Self { config }
32 }
33}
34
35impl Default for HttpStaticComponent {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl Component for HttpStaticComponent {
42 fn scheme(&self) -> &str {
43 "http-static"
44 }
45
46 fn create_endpoint(
47 &self,
48 uri: &str,
49 _ctx: &dyn camel_component_api::ComponentContext,
50 ) -> Result<Box<dyn Endpoint>, CamelError> {
51 let config = HttpStaticConfig::from_uri_with_defaults(uri, &self.config)?;
52 Ok(Box::new(HttpStaticEndpoint {
53 uri: uri.to_string(),
54 config,
55 }))
56 }
57}
58
59pub struct HttpStaticEndpoint {
68 uri: String,
69 config: HttpStaticConfig,
70}
71
72impl Endpoint for HttpStaticEndpoint {
73 fn uri(&self) -> &str {
74 &self.uri
75 }
76
77 fn create_consumer(
78 &self,
79 rt: Arc<dyn RuntimeObservability>,
80 ) -> Result<Box<dyn Consumer>, CamelError> {
81 Ok(Box::new(HttpStaticConsumer::new(self.config.clone(), rt)))
82 }
83
84 fn create_producer(
85 &self,
86 _rt: Arc<dyn RuntimeObservability>,
87 _ctx: &ProducerContext,
88 ) -> Result<camel_component_api::BoxProcessor, CamelError> {
89 Err(CamelError::Config(
90 "http-static endpoint does not support producers".to_string(),
91 ))
92 }
93}
94
95pub struct HttpStaticConsumer {
111 config: HttpStaticConfig,
112 #[allow(dead_code)]
115 runtime: Arc<dyn RuntimeObservability>,
116}
117
118impl HttpStaticConsumer {
119 pub fn new(config: HttpStaticConfig, runtime: Arc<dyn RuntimeObservability>) -> Self {
121 Self { config, runtime }
122 }
123}
124
125#[async_trait::async_trait]
126impl Consumer for HttpStaticConsumer {
127 async fn start(&mut self, ctx: camel_component_api::ConsumerContext) -> Result<(), CamelError> {
128 let dir = std::fs::canonicalize(&self.config.dir).map_err(|e| {
130 CamelError::Config(format!(
131 "http-static directory not found: {}: {}",
132 self.config.dir.display(),
133 e
134 ))
135 })?;
136
137 let mut error_pages = std::collections::HashMap::new();
139 for (code, path) in &self.config.error_pages {
140 let resolved = if path.is_absolute() {
141 path.clone()
142 } else {
143 self.config.dir.join(path)
144 };
145 let canonical = std::fs::canonicalize(&resolved).map_err(|e| {
146 CamelError::Config(format!(
147 "http-static error page not found for status {}: {}: {}",
148 code,
149 resolved.display(),
150 e
151 ))
152 })?;
153 error_pages.insert(*code, canonical);
154 }
155
156 let serve_dir = ServeDir::new(&dir)
158 .precompressed_gzip()
159 .precompressed_br()
160 .append_index_html_on_directories(true);
161
162 let registry = ServerRegistry::global()
164 .get_or_spawn(
165 &self.config.host,
166 self.config.port,
167 2 * 1024 * 1024, 10 * 1024 * 1024, 1024, )
171 .await?;
172
173 let mode = if self.config.spa_fallback {
175 MountMode::Spa
176 } else {
177 MountMode::Static
178 };
179 let mount = StaticMount {
180 mount_path: self.config.mount_path.clone(),
181 mode,
182 dir: dir.clone(),
183 cache_control: self.config.cache_control.clone(),
184 error_pages,
185 serve_dir,
186 };
187
188 registry.register_static_mount(mount).await?;
189
190 let mount_path_for_cleanup = self.config.mount_path.clone();
191 let registry_for_cleanup = registry.clone();
192
193 ctx.cancelled().await;
195
196 registry_for_cleanup
198 .unregister_static_mount(&mount_path_for_cleanup)
199 .await;
200
201 Ok(())
202 }
203
204 async fn stop(&mut self) -> Result<(), CamelError> {
205 Ok(())
206 }
207
208 fn concurrency_model(&self) -> camel_component_api::ConcurrencyModel {
209 camel_component_api::ConcurrencyModel::Sequential
210 }
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use crate::REGISTRY_TEST_MUTEX;
    use camel_component_api::test_support::PanicRuntimeObservability;
    use camel_component_api::{ConsumerContext, ExchangeEnvelope};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tokio::sync::{Notify, mpsc};
    use tokio_util::sync::CancellationToken;

    /// Wraps the `PanicRuntimeObservability` test double as a runtime handle.
    fn rt() -> Arc<dyn RuntimeObservability> {
        Arc::new(PanicRuntimeObservability)
    }

    /// Builds a `ConsumerContext` whose cancellation token is cancelled once
    /// `notify` is signalled.
    fn test_consumer_ctx(notify: Arc<Notify>) -> ConsumerContext {
        // The receiving half is dropped: these tests never read exchanges.
        let (tx, _rx) = mpsc::channel::<ExchangeEnvelope>(16);
        let token = CancellationToken::new();
        let token_clone = token.clone();
        tokio::spawn(async move {
            notify.notified().await;
            token_clone.cancel();
        });
        ConsumerContext::new(tx, token)
    }

    #[test]
    fn test_component_scheme() {
        let component = HttpStaticComponent::new();
        assert_eq!(component.scheme(), "http-static");
    }

    #[test]
    fn test_component_with_config() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            port: 9999,
            ..HttpStaticConfig::default()
        };
        let component = HttpStaticComponent::with_config(config);
        assert_eq!(component.scheme(), "http-static");
        // Check that the supplied configuration is what the component stores.
        assert_eq!(component.config.port, 9999);
        assert_eq!(component.config.dir, PathBuf::from("/tmp"));
    }

    #[test]
    fn test_endpoint_creates_consumer() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let consumer = endpoint.create_consumer(rt());
        assert!(consumer.is_ok());
    }

    #[test]
    fn test_endpoint_producer_not_supported() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/tmp"),
            ..HttpStaticConfig::default()
        };
        let endpoint = HttpStaticEndpoint {
            uri: "http-static:/tmp".to_string(),
            config,
        };
        let ctx = camel_component_api::ProducerContext::new();
        match endpoint.create_producer(rt(), &ctx) {
            Err(CamelError::Config(msg)) => {
                assert!(msg.contains("does not support producers"));
            }
            _ => panic!("Expected Config error"),
        }
    }

    #[tokio::test]
    async fn test_consumer_start_nonexistent_dir_returns_error() {
        let config = HttpStaticConfig {
            dir: PathBuf::from("/nonexistent/path/that/does/not/exist"),
            port: 19900,
            ..HttpStaticConfig::default()
        };
        let mut consumer = HttpStaticConsumer::new(config, rt());
        let notify = Arc::new(Notify::new());
        let ctx = test_consumer_ctx(notify);

        // `start` must fail while canonicalizing the directory.
        match consumer.start(ctx).await {
            Err(CamelError::Config(msg)) => assert!(msg.contains("directory not found")),
            _ => panic!("Expected Config error for nonexistent dir"),
        }
    }

    // The guard is deliberately held across `.await` points so that tests
    // touching the global registry run one at a time.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_start_registers_mount_in_registry() {
        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
        ServerRegistry::reset();

        let dir = std::env::temp_dir();
        let canonical_dir = std::fs::canonicalize(&dir).unwrap();

        // Bind to port 0 to get a free port, then release it for the registry.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let config = HttpStaticConfig {
            dir,
            port,
            host: "127.0.0.1".to_string(),
            ..HttpStaticConfig::default()
        };

        let serve_dir = ServeDir::new(&canonical_dir)
            .precompressed_gzip()
            .precompressed_br()
            .append_index_html_on_directories(true);

        let registry = ServerRegistry::global()
            .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await
            .unwrap();

        let mount = StaticMount {
            mount_path: "/".to_string(),
            mode: MountMode::Static,
            dir: canonical_dir.clone(),
            cache_control: config.cache_control.clone(),
            error_pages: std::collections::HashMap::new(),
            serve_dir,
        };
        registry.register_static_mount(mount).await.unwrap();

        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            1,
            "Expected one static mount registered"
        );
        assert_eq!(inner.mounts[0].dir, canonical_dir);
        assert_eq!(inner.mounts[0].mount_path, "/");
    }

    // The guard is deliberately held across `.await` points so that tests
    // touching the global registry run one at a time.
    #[allow(clippy::await_holding_lock)]
    #[tokio::test]
    async fn test_consumer_stop_unregisters_mount() {
        let _guard = REGISTRY_TEST_MUTEX.lock().unwrap();
        ServerRegistry::reset();

        let dir = std::env::temp_dir();
        let canonical_dir = std::fs::canonicalize(&dir).unwrap();

        // Bind to port 0 to get a free port, then release it for the registry.
        let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
        let port = listener.local_addr().unwrap().port();
        drop(listener);

        let registry = ServerRegistry::global()
            .get_or_spawn("127.0.0.1", port, 2 * 1024 * 1024, 10 * 1024 * 1024, 1024)
            .await
            .unwrap();

        let serve_dir = ServeDir::new(&canonical_dir)
            .precompressed_gzip()
            .precompressed_br()
            .append_index_html_on_directories(true);
        let mount = StaticMount {
            mount_path: "/".to_string(),
            mode: MountMode::Static,
            dir: canonical_dir.clone(),
            cache_control: "public, max-age=0".to_string(),
            error_pages: std::collections::HashMap::new(),
            serve_dir,
        };
        registry.register_static_mount(mount).await.unwrap();

        {
            // Scoped so the read guard is released before unregistering.
            let inner = registry.inner.read().await;
            assert_eq!(inner.mounts.len(), 1);
        }

        registry.unregister_static_mount("/").await;

        let inner = registry.inner.read().await;
        assert_eq!(
            inner.mounts.len(),
            0,
            "Expected static mount to be unregistered"
        );
    }
}