Skip to main content

camel_function/provider/
container.rs

1use crate::pool::{RunnerHandle, RunnerPoolKey};
2use crate::protocol::ProtocolClient;
3use camel_api::Exchange;
4use camel_api::function::*;
5use dashmap::DashMap;
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8
9use super::{FunctionHealthStatus, FunctionProvider, ProviderError};
10
/// Bookkeeping record for one container tracked in
/// `ContainerProvider::containers_by_handle`.
struct ContainerEntry {
    /// Docker container id as returned by `create_container`.
    container_id: String,
    /// Base URL used to reach the runner, built as `http://127.0.0.1:<host_port>`.
    endpoint: String,
}
15
/// Controls whether the runner image is pulled before a container is created.
///
/// The enum is a plain tag without payload, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullPolicy {
    /// Pull the image on every spawn.
    Always,
    /// Never pull; the image is used as-is.
    Never,
    /// Inspect the image first and pull only when Docker answers 404.
    IfMissing,
}
22
23impl std::fmt::Debug for ContainerProvider {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("ContainerProvider")
26            .field("image", &self.image)
27            .field("active_containers", &self.containers_by_handle.len())
28            .finish()
29    }
30}
31
/// Builder for [`ContainerProvider`].
///
/// Start from [`ContainerProviderBuilder::new`] (or `Default`), override the
/// settings you need, then call `build`.
#[derive(Debug, Clone)]
pub struct ContainerProviderBuilder {
    /// Docker image reference used for runner containers.
    image: String,
    /// Maximum time `spawn` waits for a started container to pass its health check.
    boot_timeout: std::time::Duration,
    /// When (if ever) the image is pulled before creating a container.
    pull_policy: PullPolicy,
    /// Explicit instance id; when `None`, `build` derives one from a hash of
    /// the image name and the current time.
    instance_id: Option<String>,
}
39
40impl Default for ContainerProviderBuilder {
41    fn default() -> Self {
42        Self {
43            image: "kennycallado/deno-runner:latest".to_string(),
44            boot_timeout: std::time::Duration::from_secs(10),
45            pull_policy: PullPolicy::IfMissing,
46            instance_id: None,
47        }
48    }
49}
50
51impl ContainerProviderBuilder {
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    pub fn image(mut self, image: impl Into<String>) -> Self {
57        self.image = image.into();
58        self
59    }
60
61    pub fn boot_timeout(mut self, timeout: std::time::Duration) -> Self {
62        self.boot_timeout = timeout;
63        self
64    }
65
66    pub fn pull_policy(mut self, policy: PullPolicy) -> Self {
67        self.pull_policy = policy;
68        self
69    }
70
71    pub fn instance_id(mut self, id: impl Into<String>) -> Self {
72        self.instance_id = Some(id.into());
73        self
74    }
75
76    pub fn build(self) -> Result<ContainerProvider, ProviderError> {
77        let docker = bollard::Docker::connect_with_local_defaults()
78            .map_err(|e| ProviderError::SpawnFailed(format!("docker connect: {e}")))?;
79        let instance_id = self.instance_id.unwrap_or_else(|| {
80            let hash = blake3::hash(
81                format!(
82                    "{}-{}",
83                    self.image,
84                    std::time::SystemTime::now()
85                        .duration_since(std::time::UNIX_EPOCH)
86                        .unwrap_or_default()
87                        .as_nanos()
88                )
89                .as_bytes(),
90            );
91            format!("inst-{}", &hash.to_hex()[..12])
92        });
93        Ok(ContainerProvider {
94            docker,
95            image: self.image,
96            boot_timeout: self.boot_timeout,
97            pull_policy: self.pull_policy,
98            client: ProtocolClient::new(),
99            containers_by_handle: DashMap::new(),
100            instance_id,
101        })
102    }
103}
104
/// Function provider that runs each runner inside its own Docker container.
///
/// Construct it through [`ContainerProvider::builder`].
pub struct ContainerProvider {
    /// Docker client, obtained via `connect_with_local_defaults`.
    docker: bollard::Docker,
    /// Image reference used for every spawned container.
    image: String,
    /// Value of the `camel.function.instance` label put on every container
    /// spawned by this provider.
    instance_id: String,
    /// Upper bound for the post-start health-check wait in `spawn`.
    boot_timeout: std::time::Duration,
    /// Image pull behaviour applied before each container creation.
    pull_policy: PullPolicy,
    /// Client used to talk to a runner at its endpoint (health, register,
    /// unregister, invoke, shutdown).
    client: ProtocolClient,
    /// Live containers keyed by runner handle id.
    containers_by_handle: DashMap<String, ContainerEntry>,
}
114
115impl ContainerProvider {
116    pub fn builder() -> ContainerProviderBuilder {
117        ContainerProviderBuilder::new()
118    }
119
120    pub async fn cleanup_all(&self) {
121        let entries: Vec<(String, String)> = self
122            .containers_by_handle
123            .iter()
124            .map(|e| (e.key().clone(), e.container_id.clone()))
125            .collect();
126        for (handle_id, container_id) in entries {
127            self.stop_and_remove_container(&container_id).await;
128            self.containers_by_handle.remove(&handle_id);
129        }
130    }
131
132    pub fn instance_id(&self) -> &str {
133        &self.instance_id
134    }
135
136    pub async fn is_clean(&self) -> bool {
137        self.list_instance_containers().await.is_empty()
138    }
139
140    pub async fn list_instance_containers(&self) -> Vec<String> {
141        let options = bollard::query_parameters::ListContainersOptions {
142            filters: Some(std::collections::HashMap::from([(
143                "label".to_string(),
144                vec![format!("camel.function.instance={}", self.instance_id)],
145            )])),
146            ..Default::default()
147        };
148        match self.docker.list_containers(Some(options)).await {
149            Ok(containers) => containers.into_iter().filter_map(|c| c.id).collect(),
150            Err(_) => vec![],
151        }
152    }
153
154    async fn stop_and_remove_container(&self, container_id: &str) {
155        let _ = self.docker.stop_container(container_id, None).await;
156        match self
157            .docker
158            .remove_container(
159                container_id,
160                Some(bollard::query_parameters::RemoveContainerOptions {
161                    force: true,
162                    ..Default::default()
163                }),
164            )
165            .await
166        {
167            Ok(()) => {}
168            Err(bollard::errors::Error::DockerResponseServerError {
169                status_code: 404, ..
170            }) => {}
171            Err(e) => {
172                tracing::warn!(target: "camel_function::container", %container_id, "remove error: {e}");
173            }
174        }
175    }
176
177    async fn allocate_host_port(&self) -> Result<u16, ProviderError> {
178        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
179            .await
180            .map_err(|e| ProviderError::SpawnFailed(format!("allocate port: {e}")))?;
181        let port = listener
182            .local_addr()
183            .map_err(|e| ProviderError::SpawnFailed(format!("get port: {e}")))?
184            .port();
185        drop(listener);
186        Ok(port)
187    }
188
189    pub async fn spawn_runner(&self, runtime: &str) -> Result<RunnerHandle, ProviderError> {
190        let key = RunnerPoolKey {
191            runtime: runtime.to_string(),
192        };
193        FunctionProvider::spawn(self, &key).await
194    }
195
196    pub async fn shutdown_runner(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
197        FunctionProvider::shutdown(self, handle).await
198    }
199
200    pub async fn health_runner(
201        &self,
202        handle: &RunnerHandle,
203    ) -> Result<FunctionHealthStatus, ProviderError> {
204        FunctionProvider::health(self, handle).await
205    }
206
207    pub async fn register_function(
208        &self,
209        handle: &RunnerHandle,
210        def: &FunctionDefinition,
211    ) -> Result<(), ProviderError> {
212        FunctionProvider::register(self, handle, def).await
213    }
214
215    pub async fn unregister_function(
216        &self,
217        handle: &RunnerHandle,
218        id: &FunctionId,
219    ) -> Result<(), ProviderError> {
220        FunctionProvider::unregister(self, handle, id).await
221    }
222
223    pub async fn invoke_function(
224        &self,
225        handle: &RunnerHandle,
226        function_id: &FunctionId,
227        exchange: &Exchange,
228    ) -> Result<ExchangePatch, ProviderError> {
229        FunctionProvider::invoke(
230            self,
231            handle,
232            function_id,
233            exchange,
234            std::time::Duration::from_millis(5000),
235        )
236        .await
237    }
238
239    /// Pull the configured image according to the current [PullPolicy].
240    async fn pull_image_if_needed(&self) -> Result<(), ProviderError> {
241        match self.pull_policy {
242            PullPolicy::Never => {}
243            PullPolicy::Always => {
244                self.pull_image().await?;
245            }
246            PullPolicy::IfMissing => match self.docker.inspect_image(&self.image).await {
247                Ok(_) => {}
248                Err(bollard::errors::Error::DockerResponseServerError {
249                    status_code: 404, ..
250                }) => {
251                    self.pull_image().await?;
252                }
253                Err(e) => {
254                    return Err(inspect_error_to_spawn_failed(&self.image, e));
255                }
256            },
257        }
258        Ok(())
259    }
260}
261
262/// Converts a non-404 Docker inspect error into a [`ProviderError::SpawnFailed`].
263///
264/// Extracted as a free function so the classification logic can be unit-tested
265/// without a live Docker daemon.
266fn inspect_error_to_spawn_failed(image: &str, e: bollard::errors::Error) -> ProviderError {
267    ProviderError::SpawnFailed(format!("failed to inspect image '{image}': {e}"))
268}
269
#[cfg(test)]
mod tests {
    use super::*;

    /// A 500 from image inspect must surface as `SpawnFailed` naming the image.
    #[test]
    fn inspect_non_404_becomes_spawn_failed() {
        let image = "my-image:latest";
        let docker_err = bollard::errors::Error::DockerResponseServerError {
            status_code: 500,
            message: "internal server error".into(),
        };

        let mapped = inspect_error_to_spawn_failed(image, docker_err);

        let names_image = match &mapped {
            ProviderError::SpawnFailed(msg) => msg.contains(image),
            _ => false,
        };
        assert!(
            names_image,
            "expected SpawnFailed with image name, got: {mapped:?}"
        );
    }

    /// A 403 from image inspect must surface as `SpawnFailed` naming the image.
    #[test]
    fn inspect_permission_denied_becomes_spawn_failed() {
        let image = "private/image:1.0";
        let docker_err = bollard::errors::Error::DockerResponseServerError {
            status_code: 403,
            message: "permission denied".into(),
        };

        let mapped = inspect_error_to_spawn_failed(image, docker_err);

        let names_image = match &mapped {
            ProviderError::SpawnFailed(msg) => msg.contains(image),
            _ => false,
        };
        assert!(names_image);
    }
}
300
301impl ContainerProvider {
302    /// Execute the image pull via bollard's `create_image` API.
303    async fn pull_image(&self) -> Result<(), ProviderError> {
304        use futures::StreamExt;
305
306        let options = bollard::query_parameters::CreateImageOptionsBuilder::default()
307            .from_image(&self.image)
308            .build();
309
310        let mut stream = self.docker.create_image(Some(options), None, None);
311        while let Some(item) = stream.next().await {
312            match item {
313                Ok(_) => {}
314                Err(e) => {
315                    return Err(ProviderError::SpawnFailed(format!(
316                        "image pull failed for '{}': {e}",
317                        self.image
318                    )));
319                }
320            }
321        }
322        Ok(())
323    }
324
325    fn spawn_log_forwarder(&self, container_id: String) {
326        use futures::StreamExt;
327
328        let docker = self.docker.clone();
329        tokio::spawn(async move {
330            let options = bollard::query_parameters::LogsOptions {
331                follow: true,
332                stdout: true,
333                stderr: true,
334                ..Default::default()
335            };
336            let mut stream = docker.logs(&container_id, Some(options));
337
338            while let Some(msg) = stream.next().await {
339                match msg {
340                    Ok(log_output) => {
341                        let text = match &log_output {
342                            bollard::container::LogOutput::StdOut { message } => {
343                                String::from_utf8_lossy(message).into_owned()
344                            }
345                            bollard::container::LogOutput::StdErr { message } => {
346                                String::from_utf8_lossy(message).into_owned()
347                            }
348                            _ => continue,
349                        };
350                        let trimmed = text.trim_end();
351                        if trimmed.is_empty() {
352                            continue;
353                        }
354                        match &log_output {
355                            bollard::container::LogOutput::StdOut { .. } => {
356                                tracing::info!(target: "camel_function::runner", "{trimmed}");
357                            }
358                            bollard::container::LogOutput::StdErr { .. } => {
359                                tracing::warn!(target: "camel_function::runner", "{trimmed}");
360                            }
361                            _ => {}
362                        }
363                    }
364                    Err(e) => {
365                        tracing::debug!(target: "camel_function::container", "log stream error: {e}");
366                        break;
367                    }
368                }
369            }
370        });
371    }
372}
373
// Marker impl of the parent module's `sealed::Sealed` trait.
// NOTE(review): presumably required as a bound of `FunctionProvider` so that
// only in-crate types can implement it — confirm against `super::sealed`.
impl super::sealed::Sealed for ContainerProvider {}
375
376#[async_trait::async_trait]
377impl FunctionProvider for ContainerProvider {
378    async fn spawn(&self, _key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError> {
379        let hash = blake3::hash(
380            format!(
381                "{}",
382                std::time::SystemTime::now()
383                    .duration_since(std::time::UNIX_EPOCH)
384                    .unwrap_or_default()
385                    .as_nanos()
386            )
387            .as_bytes(),
388        )
389        .to_hex();
390        let handle_id = format!("deno-{}", &hash[..16]);
391        let host_port = self.allocate_host_port().await?;
392
393        tracing::debug!(
394            target: "camel_function::container",
395            %handle_id,
396            image = %self.image,
397            "spawning container"
398        );
399
400        self.pull_image_if_needed().await?;
401
402        let labels = std::collections::HashMap::from([
403            ("camel.function.runner".to_string(), "true".to_string()),
404            ("camel.function.context".to_string(), handle_id.clone()),
405            (
406                "camel.function.instance".to_string(),
407                self.instance_id.clone(),
408            ),
409        ]);
410
411        let config = bollard::models::ContainerCreateBody {
412            image: Some(self.image.clone()),
413            env: Some(vec![
414                "PORT=8080".to_string(),
415                "DENO_NO_PROMPT=1".to_string(),
416            ]),
417            labels: Some(labels),
418            exposed_ports: Some(vec!["8080/tcp".to_string()]),
419            host_config: Some(bollard::models::HostConfig {
420                port_bindings: Some(std::collections::HashMap::from([(
421                    "8080/tcp".to_string(),
422                    Some(vec![bollard::models::PortBinding {
423                        host_ip: Some("127.0.0.1".to_string()),
424                        host_port: Some(host_port.to_string()),
425                    }]),
426                )])),
427                init: Some(true),
428                auto_remove: Some(false),
429                ..Default::default()
430            }),
431            ..Default::default()
432        };
433
434        let create_opts = bollard::query_parameters::CreateContainerOptions {
435            name: Some(handle_id.clone()),
436            ..Default::default()
437        };
438
439        let create_result = self
440            .docker
441            .create_container(Some(create_opts), config)
442            .await
443            .map_err(|e| ProviderError::SpawnFailed(format!("create container: {e}")))?;
444
445        let container_id = create_result.id;
446
447        if let Err(e) = self.docker.start_container(&container_id, None).await {
448            let _ = self.stop_and_remove_container(&container_id).await;
449            return Err(ProviderError::SpawnFailed(format!("start container: {e}")));
450        }
451
452        let endpoint = format!("http://127.0.0.1:{host_port}");
453
454        // Wait for the container to become healthy within boot_timeout.
455        let boot_timeout = self.boot_timeout;
456        let client = &self.client;
457        let endpoint_clone = endpoint.clone();
458        let ready_result = tokio::time::timeout(boot_timeout, async {
459            loop {
460                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
461                if client.health(&endpoint_clone).await.is_ok() {
462                    return;
463                }
464            }
465        })
466        .await;
467
468        if ready_result.is_err() {
469            let _ = self.stop_and_remove_container(&container_id).await;
470            return Err(ProviderError::SpawnFailed("container boot timeout".into()));
471        }
472
473        self.spawn_log_forwarder(container_id.clone());
474
475        self.containers_by_handle.insert(
476            handle_id.clone(),
477            ContainerEntry {
478                container_id,
479                endpoint,
480            },
481        );
482
483        Ok(RunnerHandle {
484            id: handle_id,
485            state: Arc::new(std::sync::Mutex::new(crate::pool::RunnerState::Booting)),
486            cancel: CancellationToken::new(),
487        })
488    }
489
490    async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
491        handle.cancel.cancel();
492        let entry = match self.containers_by_handle.remove(&handle.id) {
493            Some((_, entry)) => entry,
494            None => return Ok(()),
495        };
496        let _ = self.client.shutdown(&entry.endpoint).await;
497        self.stop_and_remove_container(&entry.container_id).await;
498        Ok(())
499    }
500
501    async fn health(&self, handle: &RunnerHandle) -> Result<FunctionHealthStatus, ProviderError> {
502        let endpoint = self
503            .containers_by_handle
504            .get(&handle.id)
505            .ok_or_else(|| ProviderError::HealthFailed(format!("unknown handle {}", handle.id)))?
506            .endpoint
507            .clone();
508        self.client.health(&endpoint).await
509    }
510
511    async fn register(
512        &self,
513        handle: &RunnerHandle,
514        def: &FunctionDefinition,
515    ) -> Result<(), ProviderError> {
516        let endpoint = self
517            .containers_by_handle
518            .get(&handle.id)
519            .ok_or_else(|| ProviderError::RegisterFailed(format!("unknown handle {}", handle.id)))?
520            .endpoint
521            .clone();
522        self.client.register(&endpoint, def).await
523    }
524
525    async fn unregister(
526        &self,
527        handle: &RunnerHandle,
528        id: &FunctionId,
529    ) -> Result<(), ProviderError> {
530        let endpoint = self
531            .containers_by_handle
532            .get(&handle.id)
533            .ok_or_else(|| {
534                ProviderError::UnregisterFailed(format!("unknown handle {}", handle.id))
535            })?
536            .endpoint
537            .clone();
538        self.client.unregister(&endpoint, id).await
539    }
540
541    async fn invoke(
542        &self,
543        handle: &RunnerHandle,
544        id: &FunctionId,
545        ex: &Exchange,
546        timeout: std::time::Duration,
547    ) -> Result<ExchangePatch, ProviderError> {
548        let endpoint = self
549            .containers_by_handle
550            .get(&handle.id)
551            .ok_or_else(|| ProviderError::InvokeFailed(format!("unknown handle {}", handle.id)))?
552            .endpoint
553            .clone();
554        let resp = self.client.invoke(&endpoint, id, ex, timeout).await?;
555        if resp.ok {
556            let patch = resp.patch.unwrap_or_default();
557            Ok(patch
558                .to_exchange_patch()
559                .map_err(|e| ProviderError::InvokeFailed(e.to_string()))?)
560        } else {
561            let err = resp.error.unwrap_or_else(|| crate::protocol::ErrorWire {
562                kind: "unknown".into(),
563                message: "no error body".into(),
564                stack: None,
565            });
566            Err(ProviderError::InvokeFailed(format!(
567                "{}: {}",
568                err.kind, err.message
569            )))
570        }
571    }
572}
573
574impl Drop for ContainerProvider {
575    fn drop(&mut self) {
576        if self.containers_by_handle.is_empty() {
577            return;
578        }
579        let docker = self.docker.clone();
580        let container_ids: Vec<String> = self
581            .containers_by_handle
582            .iter()
583            .map(|e| e.container_id.clone())
584            .collect();
585        match tokio::runtime::Handle::try_current() {
586            Ok(handle) => {
587                drop(handle.spawn(async move {
588                    for id in container_ids {
589                        let _ = docker.stop_container(&id, None).await;
590                        let _ = docker
591                            .remove_container(
592                                &id,
593                                Some(bollard::query_parameters::RemoveContainerOptions {
594                                    force: true,
595                                    ..Default::default()
596                                }),
597                            )
598                            .await;
599                    }
600                }));
601            }
602            Err(_) => {
603                tracing::warn!(target: "camel_function::container", "container cleanup skipped: no tokio runtime");
604            }
605        }
606    }
607}