1use crate::pool::{RunnerHandle, RunnerPoolKey};
2use crate::protocol::ProtocolClient;
3use camel_api::Exchange;
4use camel_api::function::*;
5use dashmap::DashMap;
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8
9use super::{FunctionHealthStatus, FunctionProvider, ProviderError};
10
/// Bookkeeping record for one container started by this provider.
struct ContainerEntry {
    /// Docker container id, used when stopping and removing the container.
    container_id: String,
    /// Base URL handed to the protocol client for every call to this runner.
    endpoint: String,
}
15
/// Decides whether the runner image is pulled before a container is created.
///
/// The enum carries no data, so it is `Copy` and comparable; the default is
/// [`PullPolicy::IfMissing`], the same value the provider builder starts with.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PullPolicy {
    /// Pull the image every time a container is spawned.
    Always,
    /// Never pull; use whatever image the Docker daemon already has.
    Never,
    /// Pull only when inspecting the image reports that it does not exist.
    #[default]
    IfMissing,
}
22
23impl std::fmt::Debug for ContainerProvider {
24 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25 f.debug_struct("ContainerProvider")
26 .field("image", &self.image)
27 .field("active_containers", &self.containers_by_handle.len())
28 .finish()
29 }
30}
31
32#[derive(Debug, Clone)]
33pub struct ContainerProviderBuilder {
34 image: String,
35 boot_timeout: std::time::Duration,
36 pull_policy: PullPolicy,
37 instance_id: Option<String>,
38}
39
40impl Default for ContainerProviderBuilder {
41 fn default() -> Self {
42 Self {
43 image: "kennycallado/deno-runner:latest".to_string(),
44 boot_timeout: std::time::Duration::from_secs(10),
45 pull_policy: PullPolicy::IfMissing,
46 instance_id: None,
47 }
48 }
49}
50
51impl ContainerProviderBuilder {
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 pub fn image(mut self, image: impl Into<String>) -> Self {
57 self.image = image.into();
58 self
59 }
60
61 pub fn boot_timeout(mut self, timeout: std::time::Duration) -> Self {
62 self.boot_timeout = timeout;
63 self
64 }
65
66 pub fn pull_policy(mut self, policy: PullPolicy) -> Self {
67 self.pull_policy = policy;
68 self
69 }
70
71 pub fn instance_id(mut self, id: impl Into<String>) -> Self {
72 self.instance_id = Some(id.into());
73 self
74 }
75
76 pub fn build(self) -> Result<ContainerProvider, ProviderError> {
77 let docker = bollard::Docker::connect_with_local_defaults()
78 .map_err(|e| ProviderError::SpawnFailed(format!("docker connect: {e}")))?;
79 let instance_id = self.instance_id.unwrap_or_else(|| {
80 let hash = blake3::hash(
81 format!(
82 "{}-{}",
83 self.image,
84 std::time::SystemTime::now()
85 .duration_since(std::time::UNIX_EPOCH)
86 .unwrap_or_default()
87 .as_nanos()
88 )
89 .as_bytes(),
90 );
91 format!("inst-{}", &hash.to_hex()[..12])
92 });
93 Ok(ContainerProvider {
94 docker,
95 image: self.image,
96 boot_timeout: self.boot_timeout,
97 pull_policy: self.pull_policy,
98 client: ProtocolClient::new(),
99 containers_by_handle: DashMap::new(),
100 instance_id,
101 })
102 }
103}
104
105pub struct ContainerProvider {
106 docker: bollard::Docker,
107 image: String,
108 instance_id: String,
109 boot_timeout: std::time::Duration,
110 pull_policy: PullPolicy,
111 client: ProtocolClient,
112 containers_by_handle: DashMap<String, ContainerEntry>,
113}
114
115impl ContainerProvider {
116 pub fn builder() -> ContainerProviderBuilder {
117 ContainerProviderBuilder::new()
118 }
119
120 pub async fn cleanup_all(&self) {
121 let entries: Vec<(String, String)> = self
122 .containers_by_handle
123 .iter()
124 .map(|e| (e.key().clone(), e.container_id.clone()))
125 .collect();
126 for (handle_id, container_id) in entries {
127 self.stop_and_remove_container(&container_id).await;
128 self.containers_by_handle.remove(&handle_id);
129 }
130 }
131
132 pub fn instance_id(&self) -> &str {
133 &self.instance_id
134 }
135
136 pub async fn is_clean(&self) -> bool {
137 self.list_instance_containers().await.is_empty()
138 }
139
140 pub async fn list_instance_containers(&self) -> Vec<String> {
141 let options = bollard::query_parameters::ListContainersOptions {
142 filters: Some(std::collections::HashMap::from([(
143 "label".to_string(),
144 vec![format!("camel.function.instance={}", self.instance_id)],
145 )])),
146 ..Default::default()
147 };
148 match self.docker.list_containers(Some(options)).await {
149 Ok(containers) => containers.into_iter().filter_map(|c| c.id).collect(),
150 Err(_) => vec![],
151 }
152 }
153
154 async fn stop_and_remove_container(&self, container_id: &str) {
155 let _ = self.docker.stop_container(container_id, None).await;
156 match self
157 .docker
158 .remove_container(
159 container_id,
160 Some(bollard::query_parameters::RemoveContainerOptions {
161 force: true,
162 ..Default::default()
163 }),
164 )
165 .await
166 {
167 Ok(()) => {}
168 Err(bollard::errors::Error::DockerResponseServerError {
169 status_code: 404, ..
170 }) => {}
171 Err(e) => {
172 tracing::warn!(target: "camel_function::container", %container_id, "remove error: {e}");
173 }
174 }
175 }
176
177 async fn allocate_host_port(&self) -> Result<u16, ProviderError> {
178 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
179 .await
180 .map_err(|e| ProviderError::SpawnFailed(format!("allocate port: {e}")))?;
181 let port = listener
182 .local_addr()
183 .map_err(|e| ProviderError::SpawnFailed(format!("get port: {e}")))?
184 .port();
185 drop(listener);
186 Ok(port)
187 }
188
189 pub async fn spawn_runner(&self, runtime: &str) -> Result<RunnerHandle, ProviderError> {
190 let key = RunnerPoolKey {
191 runtime: runtime.to_string(),
192 };
193 FunctionProvider::spawn(self, &key).await
194 }
195
196 pub async fn shutdown_runner(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
197 FunctionProvider::shutdown(self, handle).await
198 }
199
200 pub async fn health_runner(
201 &self,
202 handle: &RunnerHandle,
203 ) -> Result<FunctionHealthStatus, ProviderError> {
204 FunctionProvider::health(self, handle).await
205 }
206
207 pub async fn register_function(
208 &self,
209 handle: &RunnerHandle,
210 def: &FunctionDefinition,
211 ) -> Result<(), ProviderError> {
212 FunctionProvider::register(self, handle, def).await
213 }
214
215 pub async fn unregister_function(
216 &self,
217 handle: &RunnerHandle,
218 id: &FunctionId,
219 ) -> Result<(), ProviderError> {
220 FunctionProvider::unregister(self, handle, id).await
221 }
222
223 pub async fn invoke_function(
224 &self,
225 handle: &RunnerHandle,
226 function_id: &FunctionId,
227 exchange: &Exchange,
228 ) -> Result<ExchangePatch, ProviderError> {
229 FunctionProvider::invoke(
230 self,
231 handle,
232 function_id,
233 exchange,
234 std::time::Duration::from_millis(5000),
235 )
236 .await
237 }
238
239 async fn pull_image_if_needed(&self) -> Result<(), ProviderError> {
241 match self.pull_policy {
242 PullPolicy::Never => {}
243 PullPolicy::Always => {
244 self.pull_image().await?;
245 }
246 PullPolicy::IfMissing => match self.docker.inspect_image(&self.image).await {
247 Ok(_) => {}
248 Err(bollard::errors::Error::DockerResponseServerError {
249 status_code: 404, ..
250 }) => {
251 self.pull_image().await?;
252 }
253 Err(e) => {
254 return Err(inspect_error_to_spawn_failed(&self.image, e));
255 }
256 },
257 }
258 Ok(())
259 }
260}
261
262fn inspect_error_to_spawn_failed(image: &str, e: bollard::errors::Error) -> ProviderError {
267 ProviderError::SpawnFailed(format!("failed to inspect image '{image}': {e}"))
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// A 500 from image inspection maps to `SpawnFailed` and names the image.
    #[test]
    fn inspect_non_404_becomes_spawn_failed() {
        let image = "my-image:latest";
        let result = inspect_error_to_spawn_failed(
            image,
            bollard::errors::Error::DockerResponseServerError {
                status_code: 500,
                message: "internal server error".into(),
            },
        );
        let names_image = match &result {
            ProviderError::SpawnFailed(msg) => msg.contains(image),
            _ => false,
        };
        assert!(
            names_image,
            "expected SpawnFailed with image name, got: {result:?}"
        );
    }

    /// A 403 from image inspection maps to `SpawnFailed` and names the image.
    #[test]
    fn inspect_permission_denied_becomes_spawn_failed() {
        let image = "private/image:1.0";
        let result = inspect_error_to_spawn_failed(
            image,
            bollard::errors::Error::DockerResponseServerError {
                status_code: 403,
                message: "permission denied".into(),
            },
        );
        let names_image = match &result {
            ProviderError::SpawnFailed(msg) => msg.contains(image),
            _ => false,
        };
        assert!(names_image);
    }
}
300
301impl ContainerProvider {
302 async fn pull_image(&self) -> Result<(), ProviderError> {
304 use futures::StreamExt;
305
306 let options = bollard::query_parameters::CreateImageOptionsBuilder::default()
307 .from_image(&self.image)
308 .build();
309
310 let mut stream = self.docker.create_image(Some(options), None, None);
311 while let Some(item) = stream.next().await {
312 match item {
313 Ok(_) => {}
314 Err(e) => {
315 return Err(ProviderError::SpawnFailed(format!(
316 "image pull failed for '{}': {e}",
317 self.image
318 )));
319 }
320 }
321 }
322 Ok(())
323 }
324
325 fn spawn_log_forwarder(&self, container_id: String) {
326 use futures::StreamExt;
327
328 let docker = self.docker.clone();
329 tokio::spawn(async move {
330 let options = bollard::query_parameters::LogsOptions {
331 follow: true,
332 stdout: true,
333 stderr: true,
334 ..Default::default()
335 };
336 let mut stream = docker.logs(&container_id, Some(options));
337
338 while let Some(msg) = stream.next().await {
339 match msg {
340 Ok(log_output) => {
341 let text = match &log_output {
342 bollard::container::LogOutput::StdOut { message } => {
343 String::from_utf8_lossy(message).into_owned()
344 }
345 bollard::container::LogOutput::StdErr { message } => {
346 String::from_utf8_lossy(message).into_owned()
347 }
348 _ => continue,
349 };
350 let trimmed = text.trim_end();
351 if trimmed.is_empty() {
352 continue;
353 }
354 match &log_output {
355 bollard::container::LogOutput::StdOut { .. } => {
356 tracing::info!(target: "camel_function::runner", "{trimmed}");
357 }
358 bollard::container::LogOutput::StdErr { .. } => {
359 tracing::warn!(target: "camel_function::runner", "{trimmed}");
360 }
361 _ => {}
362 }
363 }
364 Err(e) => {
365 tracing::debug!(target: "camel_function::container", "log stream error: {e}");
366 break;
367 }
368 }
369 }
370 });
371 }
372}
373
374impl super::sealed::Sealed for ContainerProvider {}
375
376#[async_trait::async_trait]
377impl FunctionProvider for ContainerProvider {
378 async fn spawn(&self, _key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError> {
379 let hash = blake3::hash(
380 format!(
381 "{}",
382 std::time::SystemTime::now()
383 .duration_since(std::time::UNIX_EPOCH)
384 .unwrap_or_default()
385 .as_nanos()
386 )
387 .as_bytes(),
388 )
389 .to_hex();
390 let handle_id = format!("deno-{}", &hash[..16]);
391 let host_port = self.allocate_host_port().await?;
392
393 tracing::debug!(
394 target: "camel_function::container",
395 %handle_id,
396 image = %self.image,
397 "spawning container"
398 );
399
400 self.pull_image_if_needed().await?;
401
402 let labels = std::collections::HashMap::from([
403 ("camel.function.runner".to_string(), "true".to_string()),
404 ("camel.function.context".to_string(), handle_id.clone()),
405 (
406 "camel.function.instance".to_string(),
407 self.instance_id.clone(),
408 ),
409 ]);
410
411 let config = bollard::models::ContainerCreateBody {
412 image: Some(self.image.clone()),
413 env: Some(vec![
414 "PORT=8080".to_string(),
415 "DENO_NO_PROMPT=1".to_string(),
416 ]),
417 labels: Some(labels),
418 exposed_ports: Some(vec!["8080/tcp".to_string()]),
419 host_config: Some(bollard::models::HostConfig {
420 port_bindings: Some(std::collections::HashMap::from([(
421 "8080/tcp".to_string(),
422 Some(vec![bollard::models::PortBinding {
423 host_ip: Some("127.0.0.1".to_string()),
424 host_port: Some(host_port.to_string()),
425 }]),
426 )])),
427 init: Some(true),
428 auto_remove: Some(false),
429 ..Default::default()
430 }),
431 ..Default::default()
432 };
433
434 let create_opts = bollard::query_parameters::CreateContainerOptions {
435 name: Some(handle_id.clone()),
436 ..Default::default()
437 };
438
439 let create_result = self
440 .docker
441 .create_container(Some(create_opts), config)
442 .await
443 .map_err(|e| ProviderError::SpawnFailed(format!("create container: {e}")))?;
444
445 let container_id = create_result.id;
446
447 if let Err(e) = self.docker.start_container(&container_id, None).await {
448 let _ = self.stop_and_remove_container(&container_id).await;
449 return Err(ProviderError::SpawnFailed(format!("start container: {e}")));
450 }
451
452 let endpoint = format!("http://127.0.0.1:{host_port}");
453
454 let boot_timeout = self.boot_timeout;
456 let client = &self.client;
457 let endpoint_clone = endpoint.clone();
458 let ready_result = tokio::time::timeout(boot_timeout, async {
459 loop {
460 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
461 if client.health(&endpoint_clone).await.is_ok() {
462 return;
463 }
464 }
465 })
466 .await;
467
468 if ready_result.is_err() {
469 let _ = self.stop_and_remove_container(&container_id).await;
470 return Err(ProviderError::SpawnFailed("container boot timeout".into()));
471 }
472
473 self.spawn_log_forwarder(container_id.clone());
474
475 self.containers_by_handle.insert(
476 handle_id.clone(),
477 ContainerEntry {
478 container_id,
479 endpoint,
480 },
481 );
482
483 Ok(RunnerHandle {
484 id: handle_id,
485 state: Arc::new(std::sync::Mutex::new(crate::pool::RunnerState::Booting)),
486 cancel: CancellationToken::new(),
487 })
488 }
489
490 async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
491 handle.cancel.cancel();
492 let entry = match self.containers_by_handle.remove(&handle.id) {
493 Some((_, entry)) => entry,
494 None => return Ok(()),
495 };
496 let _ = self.client.shutdown(&entry.endpoint).await;
497 self.stop_and_remove_container(&entry.container_id).await;
498 Ok(())
499 }
500
501 async fn health(&self, handle: &RunnerHandle) -> Result<FunctionHealthStatus, ProviderError> {
502 let endpoint = self
503 .containers_by_handle
504 .get(&handle.id)
505 .ok_or_else(|| ProviderError::HealthFailed(format!("unknown handle {}", handle.id)))?
506 .endpoint
507 .clone();
508 self.client.health(&endpoint).await
509 }
510
511 async fn register(
512 &self,
513 handle: &RunnerHandle,
514 def: &FunctionDefinition,
515 ) -> Result<(), ProviderError> {
516 let endpoint = self
517 .containers_by_handle
518 .get(&handle.id)
519 .ok_or_else(|| ProviderError::RegisterFailed(format!("unknown handle {}", handle.id)))?
520 .endpoint
521 .clone();
522 self.client.register(&endpoint, def).await
523 }
524
525 async fn unregister(
526 &self,
527 handle: &RunnerHandle,
528 id: &FunctionId,
529 ) -> Result<(), ProviderError> {
530 let endpoint = self
531 .containers_by_handle
532 .get(&handle.id)
533 .ok_or_else(|| {
534 ProviderError::UnregisterFailed(format!("unknown handle {}", handle.id))
535 })?
536 .endpoint
537 .clone();
538 self.client.unregister(&endpoint, id).await
539 }
540
541 async fn invoke(
542 &self,
543 handle: &RunnerHandle,
544 id: &FunctionId,
545 ex: &Exchange,
546 timeout: std::time::Duration,
547 ) -> Result<ExchangePatch, ProviderError> {
548 let endpoint = self
549 .containers_by_handle
550 .get(&handle.id)
551 .ok_or_else(|| ProviderError::InvokeFailed(format!("unknown handle {}", handle.id)))?
552 .endpoint
553 .clone();
554 let resp = self.client.invoke(&endpoint, id, ex, timeout).await?;
555 if resp.ok {
556 let patch = resp.patch.unwrap_or_default();
557 Ok(patch
558 .to_exchange_patch()
559 .map_err(|e| ProviderError::InvokeFailed(e.to_string()))?)
560 } else {
561 let err = resp.error.unwrap_or_else(|| crate::protocol::ErrorWire {
562 kind: "unknown".into(),
563 message: "no error body".into(),
564 stack: None,
565 });
566 Err(ProviderError::InvokeFailed(format!(
567 "{}: {}",
568 err.kind, err.message
569 )))
570 }
571 }
572}
573
574impl Drop for ContainerProvider {
575 fn drop(&mut self) {
576 if self.containers_by_handle.is_empty() {
577 return;
578 }
579 let docker = self.docker.clone();
580 let container_ids: Vec<String> = self
581 .containers_by_handle
582 .iter()
583 .map(|e| e.container_id.clone())
584 .collect();
585 match tokio::runtime::Handle::try_current() {
586 Ok(handle) => {
587 drop(handle.spawn(async move {
588 for id in container_ids {
589 let _ = docker.stop_container(&id, None).await;
590 let _ = docker
591 .remove_container(
592 &id,
593 Some(bollard::query_parameters::RemoveContainerOptions {
594 force: true,
595 ..Default::default()
596 }),
597 )
598 .await;
599 }
600 }));
601 }
602 Err(_) => {
603 tracing::warn!(target: "camel_function::container", "container cleanup skipped: no tokio runtime");
604 }
605 }
606 }
607}