1use crate::pool::{RunnerHandle, RunnerPoolKey};
2use crate::protocol::ProtocolClient;
3use camel_api::Exchange;
4use camel_api::function::*;
5use dashmap::DashMap;
6use std::sync::Arc;
7use tokio_util::sync::CancellationToken;
8
9use super::{FunctionProvider, HealthReport, ProviderError};
10
/// Bookkeeping record for one runner container tracked by the provider.
struct ContainerEntry {
    /// Id returned by the Docker create call; used later to stop and remove the container.
    container_id: String,
    /// Base URL (`http://127.0.0.1:<host_port>`) that protocol requests are sent to.
    endpoint: String,
    /// Host port bound to the container's `8080/tcp`; stored but not read by the visible code.
    #[allow(dead_code)]
    host_port: u16,
}
17
/// Image pull policy carried by the builder and the provider.
///
/// NOTE(review): the code visible in this file only stores the value; the
/// variant descriptions below are inferred from the names — confirm against
/// whichever code eventually consults the policy.
///
/// The enum has no payload, so it is `Copy` and comparable with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PullPolicy {
    /// Presumably: always pull the image before starting a container.
    Always,
    /// Presumably: never pull; rely on an image that is already present.
    Never,
    /// Presumably: pull only when the image is not available locally.
    IfMissing,
}
24
25impl std::fmt::Debug for ContainerProvider {
26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
27 f.debug_struct("ContainerProvider")
28 .field("image", &self.image)
29 .field("active_containers", &self.containers_by_handle.len())
30 .finish()
31 }
32}
33
34#[derive(Debug, Clone)]
35pub struct ContainerProviderBuilder {
36 image: String,
37 boot_timeout: std::time::Duration,
38 pull_policy: PullPolicy,
39 instance_id: Option<String>,
40}
41
42impl Default for ContainerProviderBuilder {
43 fn default() -> Self {
44 Self {
45 image: "kennycallado/deno-runner:latest".to_string(),
46 boot_timeout: std::time::Duration::from_secs(10),
47 pull_policy: PullPolicy::IfMissing,
48 instance_id: None,
49 }
50 }
51}
52
53impl ContainerProviderBuilder {
54 pub fn new() -> Self {
55 Self::default()
56 }
57
58 pub fn image(mut self, image: impl Into<String>) -> Self {
59 self.image = image.into();
60 self
61 }
62
63 pub fn boot_timeout(mut self, timeout: std::time::Duration) -> Self {
64 self.boot_timeout = timeout;
65 self
66 }
67
68 pub fn pull_policy(mut self, policy: PullPolicy) -> Self {
69 self.pull_policy = policy;
70 self
71 }
72
73 pub fn instance_id(mut self, id: impl Into<String>) -> Self {
74 self.instance_id = Some(id.into());
75 self
76 }
77
78 pub fn build(self) -> Result<ContainerProvider, ProviderError> {
79 let docker = bollard::Docker::connect_with_local_defaults()
80 .map_err(|e| ProviderError::SpawnFailed(format!("docker connect: {e}")))?;
81 let instance_id = self.instance_id.unwrap_or_else(|| {
82 let hash = blake3::hash(
83 format!(
84 "{}-{}",
85 self.image,
86 std::time::SystemTime::now()
87 .duration_since(std::time::UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_nanos()
90 )
91 .as_bytes(),
92 );
93 format!("inst-{}", &hash.to_hex()[..12])
94 });
95 Ok(ContainerProvider {
96 docker,
97 image: self.image,
98 boot_timeout: self.boot_timeout,
99 pull_policy: self.pull_policy,
100 client: ProtocolClient::new(),
101 containers_by_handle: DashMap::new(),
102 instance_id,
103 })
104 }
105}
106
107pub struct ContainerProvider {
108 docker: bollard::Docker,
109 image: String,
110 instance_id: String,
111 #[allow(dead_code)]
112 boot_timeout: std::time::Duration,
113 #[allow(dead_code)]
114 pull_policy: PullPolicy,
115 client: ProtocolClient,
116 containers_by_handle: DashMap<String, ContainerEntry>,
117}
118
119impl ContainerProvider {
120 pub fn builder() -> ContainerProviderBuilder {
121 ContainerProviderBuilder::new()
122 }
123
124 pub async fn cleanup_all(&self) {
125 let entries: Vec<(String, String)> = self
126 .containers_by_handle
127 .iter()
128 .map(|e| (e.key().clone(), e.container_id.clone()))
129 .collect();
130 for (handle_id, container_id) in entries {
131 self.stop_and_remove_container(&container_id).await;
132 self.containers_by_handle.remove(&handle_id);
133 }
134 }
135
136 pub fn instance_id(&self) -> &str {
137 &self.instance_id
138 }
139
140 pub async fn is_clean(&self) -> bool {
141 self.list_instance_containers().await.is_empty()
142 }
143
144 pub async fn list_instance_containers(&self) -> Vec<String> {
145 let options = bollard::query_parameters::ListContainersOptions {
146 filters: Some(std::collections::HashMap::from([(
147 "label".to_string(),
148 vec![format!("camel.function.instance={}", self.instance_id)],
149 )])),
150 ..Default::default()
151 };
152 match self.docker.list_containers(Some(options)).await {
153 Ok(containers) => containers.into_iter().filter_map(|c| c.id).collect(),
154 Err(_) => vec![],
155 }
156 }
157
158 async fn stop_and_remove_container(&self, container_id: &str) {
159 let _ = self.docker.stop_container(container_id, None).await;
160 match self
161 .docker
162 .remove_container(
163 container_id,
164 Some(bollard::query_parameters::RemoveContainerOptions {
165 force: true,
166 ..Default::default()
167 }),
168 )
169 .await
170 {
171 Ok(()) => {}
172 Err(bollard::errors::Error::DockerResponseServerError {
173 status_code: 404, ..
174 }) => {}
175 Err(e) => {
176 tracing::warn!(target: "camel_function::container", %container_id, "remove error: {e}");
177 }
178 }
179 }
180
181 async fn allocate_host_port(&self) -> Result<u16, ProviderError> {
182 let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
183 .await
184 .map_err(|e| ProviderError::SpawnFailed(format!("allocate port: {e}")))?;
185 let port = listener
186 .local_addr()
187 .map_err(|e| ProviderError::SpawnFailed(format!("get port: {e}")))?
188 .port();
189 drop(listener);
190 Ok(port)
191 }
192
193 pub async fn spawn_runner(&self, runtime: &str) -> Result<RunnerHandle, ProviderError> {
194 let key = RunnerPoolKey {
195 runtime: runtime.to_string(),
196 };
197 FunctionProvider::spawn(self, &key).await
198 }
199
200 pub async fn shutdown_runner(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
201 FunctionProvider::shutdown(self, handle).await
202 }
203
204 pub async fn health_runner(
205 &self,
206 handle: &RunnerHandle,
207 ) -> Result<HealthReport, ProviderError> {
208 FunctionProvider::health(self, handle).await
209 }
210
211 pub async fn register_function(
212 &self,
213 handle: &RunnerHandle,
214 def: &FunctionDefinition,
215 ) -> Result<(), ProviderError> {
216 FunctionProvider::register(self, handle, def).await
217 }
218
219 pub async fn unregister_function(
220 &self,
221 handle: &RunnerHandle,
222 id: &FunctionId,
223 ) -> Result<(), ProviderError> {
224 FunctionProvider::unregister(self, handle, id).await
225 }
226
227 pub async fn invoke_function(
228 &self,
229 handle: &RunnerHandle,
230 function_id: &FunctionId,
231 exchange: &Exchange,
232 ) -> Result<ExchangePatch, ProviderError> {
233 FunctionProvider::invoke(
234 self,
235 handle,
236 function_id,
237 exchange,
238 std::time::Duration::from_millis(5000),
239 )
240 .await
241 }
242
243 fn spawn_log_forwarder(&self, container_id: String) {
244 use futures::StreamExt;
245
246 let docker = self.docker.clone();
247 tokio::spawn(async move {
248 let options = bollard::query_parameters::LogsOptions {
249 follow: true,
250 stdout: true,
251 stderr: true,
252 ..Default::default()
253 };
254 let mut stream = docker.logs(&container_id, Some(options));
255
256 while let Some(msg) = stream.next().await {
257 match msg {
258 Ok(log_output) => {
259 let text = match &log_output {
260 bollard::container::LogOutput::StdOut { message } => {
261 String::from_utf8_lossy(message).into_owned()
262 }
263 bollard::container::LogOutput::StdErr { message } => {
264 String::from_utf8_lossy(message).into_owned()
265 }
266 _ => continue,
267 };
268 let trimmed = text.trim_end();
269 if trimmed.is_empty() {
270 continue;
271 }
272 match &log_output {
273 bollard::container::LogOutput::StdOut { .. } => {
274 tracing::info!(target: "camel_function::runner", "{trimmed}");
275 }
276 bollard::container::LogOutput::StdErr { .. } => {
277 tracing::warn!(target: "camel_function::runner", "{trimmed}");
278 }
279 _ => {}
280 }
281 }
282 Err(e) => {
283 tracing::debug!(target: "camel_function::container", "log stream error: {e}");
284 break;
285 }
286 }
287 }
288 });
289 }
290}
291
292impl super::sealed::Sealed for ContainerProvider {}
293
294#[async_trait::async_trait]
295impl FunctionProvider for ContainerProvider {
296 async fn spawn(&self, _key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError> {
297 let hash = blake3::hash(
298 format!(
299 "{}",
300 std::time::SystemTime::now()
301 .duration_since(std::time::UNIX_EPOCH)
302 .unwrap_or_default()
303 .as_nanos()
304 )
305 .as_bytes(),
306 )
307 .to_hex();
308 let handle_id = format!("deno-{}", &hash[..16]);
309 let host_port = self.allocate_host_port().await?;
310
311 tracing::debug!(
312 target: "camel_function::container",
313 %handle_id,
314 image = %self.image,
315 "spawning container"
316 );
317
318 let labels = std::collections::HashMap::from([
319 ("camel.function.runner".to_string(), "true".to_string()),
320 ("camel.function.context".to_string(), handle_id.clone()),
321 (
322 "camel.function.instance".to_string(),
323 self.instance_id.clone(),
324 ),
325 ]);
326
327 let config = bollard::models::ContainerCreateBody {
328 image: Some(self.image.clone()),
329 env: Some(vec![
330 "PORT=8080".to_string(),
331 "DENO_NO_PROMPT=1".to_string(),
332 ]),
333 labels: Some(labels),
334 exposed_ports: Some(vec!["8080/tcp".to_string()]),
335 host_config: Some(bollard::models::HostConfig {
336 port_bindings: Some(std::collections::HashMap::from([(
337 "8080/tcp".to_string(),
338 Some(vec![bollard::models::PortBinding {
339 host_ip: Some("127.0.0.1".to_string()),
340 host_port: Some(host_port.to_string()),
341 }]),
342 )])),
343 init: Some(true),
344 auto_remove: Some(false),
345 ..Default::default()
346 }),
347 ..Default::default()
348 };
349
350 let create_opts = bollard::query_parameters::CreateContainerOptions {
351 name: Some(handle_id.clone()),
352 ..Default::default()
353 };
354
355 let create_result = self
356 .docker
357 .create_container(Some(create_opts), config)
358 .await
359 .map_err(|e| ProviderError::SpawnFailed(format!("create container: {e}")))?;
360
361 let container_id = create_result.id;
362
363 if let Err(e) = self.docker.start_container(&container_id, None).await {
364 let _ = self.stop_and_remove_container(&container_id).await;
365 return Err(ProviderError::SpawnFailed(format!("start container: {e}")));
366 }
367
368 let endpoint = format!("http://127.0.0.1:{host_port}");
369
370 self.spawn_log_forwarder(container_id.clone());
371
372 self.containers_by_handle.insert(
373 handle_id.clone(),
374 ContainerEntry {
375 container_id,
376 endpoint,
377 host_port,
378 },
379 );
380
381 Ok(RunnerHandle {
382 id: handle_id,
383 state: Arc::new(std::sync::Mutex::new(crate::pool::RunnerState::Booting)),
384 cancel: CancellationToken::new(),
385 })
386 }
387
388 async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
389 handle.cancel.cancel();
390 let entry = match self.containers_by_handle.remove(&handle.id) {
391 Some((_, entry)) => entry,
392 None => return Ok(()),
393 };
394 let _ = self.client.shutdown(&entry.endpoint).await;
395 self.stop_and_remove_container(&entry.container_id).await;
396 Ok(())
397 }
398
399 async fn health(&self, handle: &RunnerHandle) -> Result<HealthReport, ProviderError> {
400 let endpoint = self
401 .containers_by_handle
402 .get(&handle.id)
403 .ok_or_else(|| ProviderError::HealthFailed(format!("unknown handle {}", handle.id)))?
404 .endpoint
405 .clone();
406 self.client.health(&endpoint).await
407 }
408
409 async fn register(
410 &self,
411 handle: &RunnerHandle,
412 def: &FunctionDefinition,
413 ) -> Result<(), ProviderError> {
414 let endpoint = self
415 .containers_by_handle
416 .get(&handle.id)
417 .ok_or_else(|| ProviderError::RegisterFailed(format!("unknown handle {}", handle.id)))?
418 .endpoint
419 .clone();
420 self.client.register(&endpoint, def).await
421 }
422
423 async fn unregister(
424 &self,
425 handle: &RunnerHandle,
426 id: &FunctionId,
427 ) -> Result<(), ProviderError> {
428 let endpoint = self
429 .containers_by_handle
430 .get(&handle.id)
431 .ok_or_else(|| {
432 ProviderError::UnregisterFailed(format!("unknown handle {}", handle.id))
433 })?
434 .endpoint
435 .clone();
436 self.client.unregister(&endpoint, id).await
437 }
438
439 async fn invoke(
440 &self,
441 handle: &RunnerHandle,
442 id: &FunctionId,
443 ex: &Exchange,
444 timeout: std::time::Duration,
445 ) -> Result<ExchangePatch, ProviderError> {
446 let endpoint = self
447 .containers_by_handle
448 .get(&handle.id)
449 .ok_or_else(|| ProviderError::InvokeFailed(format!("unknown handle {}", handle.id)))?
450 .endpoint
451 .clone();
452 let resp = self.client.invoke(&endpoint, id, ex, timeout).await?;
453 if resp.ok {
454 let patch = resp.patch.unwrap_or_default();
455 Ok(patch.to_exchange_patch())
456 } else {
457 let err = resp.error.unwrap_or_else(|| crate::protocol::ErrorWire {
458 kind: "unknown".into(),
459 message: "no error body".into(),
460 stack: None,
461 });
462 Err(ProviderError::InvokeFailed(format!(
463 "{}: {}",
464 err.kind, err.message
465 )))
466 }
467 }
468}
469
470impl Drop for ContainerProvider {
471 fn drop(&mut self) {
472 if self.containers_by_handle.is_empty() {
473 return;
474 }
475 let docker = self.docker.clone();
476 let container_ids: Vec<String> = self
477 .containers_by_handle
478 .iter()
479 .map(|e| e.container_id.clone())
480 .collect();
481 match tokio::runtime::Handle::try_current() {
482 Ok(handle) => {
483 drop(handle.spawn(async move {
484 for id in container_ids {
485 let _ = docker.stop_container(&id, None).await;
486 let _ = docker
487 .remove_container(
488 &id,
489 Some(bollard::query_parameters::RemoveContainerOptions {
490 force: true,
491 ..Default::default()
492 }),
493 )
494 .await;
495 }
496 }));
497 }
498 Err(_) => {
499 tracing::warn!(target: "camel_function::container", "container cleanup skipped: no tokio runtime");
500 }
501 }
502 }
503}