Skip to main content

camel_function/provider/
container.rs

1use crate::pool::{RunnerHandle, RunnerPoolKey};
2use crate::protocol::ProtocolClient;
3use camel_api::Exchange;
4use camel_api::function::*;
5use dashmap::DashMap;
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8
9use super::{FunctionProvider, HealthReport, ProviderError};
10
/// Bookkeeping for one runner container started by `ContainerProvider`.
struct ContainerEntry {
    /// Docker-assigned id of the container (the id returned by `create_container`).
    container_id: String,
    /// Base URL of the runner's HTTP endpoint, formatted as `http://127.0.0.1:<host_port>`.
    endpoint: String,
    /// Loopback host port published for the container's `8080/tcp`.
    // Recorded for diagnostics; nothing in this file reads it back, hence the allow.
    #[allow(dead_code)]
    host_port: u16,
}
17
/// When the provider should pull its runner image from a registry.
///
/// The default is [`PullPolicy::IfMissing`], matching `ContainerProviderBuilder::default`.
///
/// NOTE(review): `ContainerProvider` stores this value, but no code in its module
/// consults it before creating containers. Confirm before relying on pull behaviour.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub enum PullPolicy {
    /// Intended meaning (by name): pull the image every time.
    Always,
    /// Intended meaning (by name): never pull; the image must already exist locally.
    Never,
    /// Intended meaning (by name): pull only when the image is absent locally.
    #[default]
    IfMissing,
}
24
25impl std::fmt::Debug for ContainerProvider {
26    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27        f.debug_struct("ContainerProvider")
28            .field("image", &self.image)
29            .field("active_containers", &self.containers_by_handle.len())
30            .finish()
31    }
32}
33
34#[derive(Debug, Clone)]
35pub struct ContainerProviderBuilder {
36    image: String,
37    boot_timeout: std::time::Duration,
38    pull_policy: PullPolicy,
39    instance_id: Option<String>,
40}
41
42impl Default for ContainerProviderBuilder {
43    fn default() -> Self {
44        Self {
45            image: "kennycallado/deno-runner:latest".to_string(),
46            boot_timeout: std::time::Duration::from_secs(10),
47            pull_policy: PullPolicy::IfMissing,
48            instance_id: None,
49        }
50    }
51}
52
53impl ContainerProviderBuilder {
54    pub fn new() -> Self {
55        Self::default()
56    }
57
58    pub fn image(mut self, image: impl Into<String>) -> Self {
59        self.image = image.into();
60        self
61    }
62
63    pub fn boot_timeout(mut self, timeout: std::time::Duration) -> Self {
64        self.boot_timeout = timeout;
65        self
66    }
67
68    pub fn pull_policy(mut self, policy: PullPolicy) -> Self {
69        self.pull_policy = policy;
70        self
71    }
72
73    pub fn instance_id(mut self, id: impl Into<String>) -> Self {
74        self.instance_id = Some(id.into());
75        self
76    }
77
78    pub fn build(self) -> Result<ContainerProvider, ProviderError> {
79        let docker = bollard::Docker::connect_with_local_defaults()
80            .map_err(|e| ProviderError::SpawnFailed(format!("docker connect: {e}")))?;
81        let instance_id = self.instance_id.unwrap_or_else(|| {
82            let hash = blake3::hash(
83                format!(
84                    "{}-{}",
85                    self.image,
86                    std::time::SystemTime::now()
87                        .duration_since(std::time::UNIX_EPOCH)
88                        .unwrap_or_default()
89                        .as_nanos()
90                )
91                .as_bytes(),
92            );
93            format!("inst-{}", &hash.to_hex()[..12])
94        });
95        Ok(ContainerProvider {
96            docker,
97            image: self.image,
98            boot_timeout: self.boot_timeout,
99            pull_policy: self.pull_policy,
100            client: ProtocolClient::new(),
101            containers_by_handle: DashMap::new(),
102            instance_id,
103        })
104    }
105}
106
107pub struct ContainerProvider {
108    docker: bollard::Docker,
109    image: String,
110    instance_id: String,
111    #[allow(dead_code)]
112    boot_timeout: std::time::Duration,
113    #[allow(dead_code)]
114    pull_policy: PullPolicy,
115    client: ProtocolClient,
116    containers_by_handle: DashMap<String, ContainerEntry>,
117}
118
119impl ContainerProvider {
120    pub fn builder() -> ContainerProviderBuilder {
121        ContainerProviderBuilder::new()
122    }
123
124    pub async fn cleanup_all(&self) {
125        let entries: Vec<(String, String)> = self
126            .containers_by_handle
127            .iter()
128            .map(|e| (e.key().clone(), e.container_id.clone()))
129            .collect();
130        for (handle_id, container_id) in entries {
131            self.stop_and_remove_container(&container_id).await;
132            self.containers_by_handle.remove(&handle_id);
133        }
134    }
135
136    pub fn instance_id(&self) -> &str {
137        &self.instance_id
138    }
139
140    pub async fn is_clean(&self) -> bool {
141        self.list_instance_containers().await.is_empty()
142    }
143
144    pub async fn list_instance_containers(&self) -> Vec<String> {
145        let options = bollard::query_parameters::ListContainersOptions {
146            filters: Some(std::collections::HashMap::from([(
147                "label".to_string(),
148                vec![format!("camel.function.instance={}", self.instance_id)],
149            )])),
150            ..Default::default()
151        };
152        match self.docker.list_containers(Some(options)).await {
153            Ok(containers) => containers.into_iter().filter_map(|c| c.id).collect(),
154            Err(_) => vec![],
155        }
156    }
157
158    async fn stop_and_remove_container(&self, container_id: &str) {
159        let _ = self.docker.stop_container(container_id, None).await;
160        match self
161            .docker
162            .remove_container(
163                container_id,
164                Some(bollard::query_parameters::RemoveContainerOptions {
165                    force: true,
166                    ..Default::default()
167                }),
168            )
169            .await
170        {
171            Ok(()) => {}
172            Err(bollard::errors::Error::DockerResponseServerError {
173                status_code: 404, ..
174            }) => {}
175            Err(e) => {
176                tracing::warn!(target: "camel_function::container", %container_id, "remove error: {e}");
177            }
178        }
179    }
180
181    async fn allocate_host_port(&self) -> Result<u16, ProviderError> {
182        let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
183            .await
184            .map_err(|e| ProviderError::SpawnFailed(format!("allocate port: {e}")))?;
185        let port = listener
186            .local_addr()
187            .map_err(|e| ProviderError::SpawnFailed(format!("get port: {e}")))?
188            .port();
189        drop(listener);
190        Ok(port)
191    }
192
193    pub async fn spawn_runner(&self, runtime: &str) -> Result<RunnerHandle, ProviderError> {
194        let key = RunnerPoolKey {
195            runtime: runtime.to_string(),
196        };
197        FunctionProvider::spawn(self, &key).await
198    }
199
200    pub async fn shutdown_runner(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
201        FunctionProvider::shutdown(self, handle).await
202    }
203
204    pub async fn health_runner(
205        &self,
206        handle: &RunnerHandle,
207    ) -> Result<HealthReport, ProviderError> {
208        FunctionProvider::health(self, handle).await
209    }
210
211    pub async fn register_function(
212        &self,
213        handle: &RunnerHandle,
214        def: &FunctionDefinition,
215    ) -> Result<(), ProviderError> {
216        FunctionProvider::register(self, handle, def).await
217    }
218
219    pub async fn unregister_function(
220        &self,
221        handle: &RunnerHandle,
222        id: &FunctionId,
223    ) -> Result<(), ProviderError> {
224        FunctionProvider::unregister(self, handle, id).await
225    }
226
227    pub async fn invoke_function(
228        &self,
229        handle: &RunnerHandle,
230        function_id: &FunctionId,
231        exchange: &Exchange,
232    ) -> Result<ExchangePatch, ProviderError> {
233        FunctionProvider::invoke(
234            self,
235            handle,
236            function_id,
237            exchange,
238            std::time::Duration::from_millis(5000),
239        )
240        .await
241    }
242
243    fn spawn_log_forwarder(&self, container_id: String) {
244        use futures::StreamExt;
245
246        let docker = self.docker.clone();
247        tokio::spawn(async move {
248            let options = bollard::query_parameters::LogsOptions {
249                follow: true,
250                stdout: true,
251                stderr: true,
252                ..Default::default()
253            };
254            let mut stream = docker.logs(&container_id, Some(options));
255
256            while let Some(msg) = stream.next().await {
257                match msg {
258                    Ok(log_output) => {
259                        let text = match &log_output {
260                            bollard::container::LogOutput::StdOut { message } => {
261                                String::from_utf8_lossy(message).into_owned()
262                            }
263                            bollard::container::LogOutput::StdErr { message } => {
264                                String::from_utf8_lossy(message).into_owned()
265                            }
266                            _ => continue,
267                        };
268                        let trimmed = text.trim_end();
269                        if trimmed.is_empty() {
270                            continue;
271                        }
272                        match &log_output {
273                            bollard::container::LogOutput::StdOut { .. } => {
274                                tracing::info!(target: "camel_function::runner", "{trimmed}");
275                            }
276                            bollard::container::LogOutput::StdErr { .. } => {
277                                tracing::warn!(target: "camel_function::runner", "{trimmed}");
278                            }
279                            _ => {}
280                        }
281                    }
282                    Err(e) => {
283                        tracing::debug!(target: "camel_function::container", "log stream error: {e}");
284                        break;
285                    }
286                }
287            }
288        });
289    }
290}
291
292impl super::sealed::Sealed for ContainerProvider {}
293
294#[async_trait::async_trait]
295impl FunctionProvider for ContainerProvider {
296    async fn spawn(&self, _key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError> {
297        let hash = blake3::hash(
298            format!(
299                "{}",
300                std::time::SystemTime::now()
301                    .duration_since(std::time::UNIX_EPOCH)
302                    .unwrap_or_default()
303                    .as_nanos()
304            )
305            .as_bytes(),
306        )
307        .to_hex();
308        let handle_id = format!("deno-{}", &hash[..16]);
309        let host_port = self.allocate_host_port().await?;
310
311        tracing::debug!(
312            target: "camel_function::container",
313            %handle_id,
314            image = %self.image,
315            "spawning container"
316        );
317
318        let labels = std::collections::HashMap::from([
319            ("camel.function.runner".to_string(), "true".to_string()),
320            ("camel.function.context".to_string(), handle_id.clone()),
321            (
322                "camel.function.instance".to_string(),
323                self.instance_id.clone(),
324            ),
325        ]);
326
327        let config = bollard::models::ContainerCreateBody {
328            image: Some(self.image.clone()),
329            env: Some(vec![
330                "PORT=8080".to_string(),
331                "DENO_NO_PROMPT=1".to_string(),
332            ]),
333            labels: Some(labels),
334            exposed_ports: Some(vec!["8080/tcp".to_string()]),
335            host_config: Some(bollard::models::HostConfig {
336                port_bindings: Some(std::collections::HashMap::from([(
337                    "8080/tcp".to_string(),
338                    Some(vec![bollard::models::PortBinding {
339                        host_ip: Some("127.0.0.1".to_string()),
340                        host_port: Some(host_port.to_string()),
341                    }]),
342                )])),
343                init: Some(true),
344                auto_remove: Some(false),
345                ..Default::default()
346            }),
347            ..Default::default()
348        };
349
350        let create_opts = bollard::query_parameters::CreateContainerOptions {
351            name: Some(handle_id.clone()),
352            ..Default::default()
353        };
354
355        let create_result = self
356            .docker
357            .create_container(Some(create_opts), config)
358            .await
359            .map_err(|e| ProviderError::SpawnFailed(format!("create container: {e}")))?;
360
361        let container_id = create_result.id;
362
363        if let Err(e) = self.docker.start_container(&container_id, None).await {
364            let _ = self.stop_and_remove_container(&container_id).await;
365            return Err(ProviderError::SpawnFailed(format!("start container: {e}")));
366        }
367
368        let endpoint = format!("http://127.0.0.1:{host_port}");
369
370        self.spawn_log_forwarder(container_id.clone());
371
372        self.containers_by_handle.insert(
373            handle_id.clone(),
374            ContainerEntry {
375                container_id,
376                endpoint,
377                host_port,
378            },
379        );
380
381        Ok(RunnerHandle {
382            id: handle_id,
383            state: Arc::new(std::sync::Mutex::new(crate::pool::RunnerState::Booting)),
384            cancel: CancellationToken::new(),
385        })
386    }
387
388    async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
389        handle.cancel.cancel();
390        let entry = match self.containers_by_handle.remove(&handle.id) {
391            Some((_, entry)) => entry,
392            None => return Ok(()),
393        };
394        let _ = self.client.shutdown(&entry.endpoint).await;
395        self.stop_and_remove_container(&entry.container_id).await;
396        Ok(())
397    }
398
399    async fn health(&self, handle: &RunnerHandle) -> Result<HealthReport, ProviderError> {
400        let endpoint = self
401            .containers_by_handle
402            .get(&handle.id)
403            .ok_or_else(|| ProviderError::HealthFailed(format!("unknown handle {}", handle.id)))?
404            .endpoint
405            .clone();
406        self.client.health(&endpoint).await
407    }
408
409    async fn register(
410        &self,
411        handle: &RunnerHandle,
412        def: &FunctionDefinition,
413    ) -> Result<(), ProviderError> {
414        let endpoint = self
415            .containers_by_handle
416            .get(&handle.id)
417            .ok_or_else(|| ProviderError::RegisterFailed(format!("unknown handle {}", handle.id)))?
418            .endpoint
419            .clone();
420        self.client.register(&endpoint, def).await
421    }
422
423    async fn unregister(
424        &self,
425        handle: &RunnerHandle,
426        id: &FunctionId,
427    ) -> Result<(), ProviderError> {
428        let endpoint = self
429            .containers_by_handle
430            .get(&handle.id)
431            .ok_or_else(|| {
432                ProviderError::UnregisterFailed(format!("unknown handle {}", handle.id))
433            })?
434            .endpoint
435            .clone();
436        self.client.unregister(&endpoint, id).await
437    }
438
439    async fn invoke(
440        &self,
441        handle: &RunnerHandle,
442        id: &FunctionId,
443        ex: &Exchange,
444        timeout: std::time::Duration,
445    ) -> Result<ExchangePatch, ProviderError> {
446        let endpoint = self
447            .containers_by_handle
448            .get(&handle.id)
449            .ok_or_else(|| ProviderError::InvokeFailed(format!("unknown handle {}", handle.id)))?
450            .endpoint
451            .clone();
452        let resp = self.client.invoke(&endpoint, id, ex, timeout).await?;
453        if resp.ok {
454            let patch = resp.patch.unwrap_or_default();
455            Ok(patch.to_exchange_patch())
456        } else {
457            let err = resp.error.unwrap_or_else(|| crate::protocol::ErrorWire {
458                kind: "unknown".into(),
459                message: "no error body".into(),
460                stack: None,
461            });
462            Err(ProviderError::InvokeFailed(format!(
463                "{}: {}",
464                err.kind, err.message
465            )))
466        }
467    }
468}
469
470impl Drop for ContainerProvider {
471    fn drop(&mut self) {
472        if self.containers_by_handle.is_empty() {
473            return;
474        }
475        let docker = self.docker.clone();
476        let container_ids: Vec<String> = self
477            .containers_by_handle
478            .iter()
479            .map(|e| e.container_id.clone())
480            .collect();
481        match tokio::runtime::Handle::try_current() {
482            Ok(handle) => {
483                drop(handle.spawn(async move {
484                    for id in container_ids {
485                        let _ = docker.stop_container(&id, None).await;
486                        let _ = docker
487                            .remove_container(
488                                &id,
489                                Some(bollard::query_parameters::RemoveContainerOptions {
490                                    force: true,
491                                    ..Default::default()
492                                }),
493                            )
494                            .await;
495                    }
496                }));
497            }
498            Err(_) => {
499                tracing::warn!(target: "camel_function::container", "container cleanup skipped: no tokio runtime");
500            }
501        }
502    }
503}