// camel_function/service.rs

use crate::config::FunctionConfig;
use crate::invoker::DefaultFunctionInvoker;
use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
use crate::provider::{FunctionProvider, HealthReport, ProviderError};
use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
use camel_api::{CamelError, Lifecycle, ServiceStatus};
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};

/// Encoding of "stopped" in the service's atomic status byte; also the initial value.
const STATUS_STOPPED: u8 = 0;
/// Encoding of "started" in the service's atomic status byte.
const STATUS_STARTED: u8 = 1;
/// Encoding of "failed" in the service's atomic status byte.
///
/// `status()` reports any value other than the two above as failed.
const STATUS_FAILED: u8 = 2;

14pub struct FunctionRuntimeService {
15    config: FunctionConfig,
16    provider: Arc<dyn FunctionProvider>,
17    container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
18    pub(crate) invoker: Arc<DefaultFunctionInvoker>,
19    status: Arc<AtomicU8>,
20}
21
22impl FunctionRuntimeService {
23    pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24        let pool = Arc::new(RunnerPool::new());
25        let invoker = Arc::new(DefaultFunctionInvoker::new(
26            Arc::clone(&pool),
27            Arc::clone(&provider),
28            config.clone(),
29        ));
30        Self {
31            config,
32            provider,
33            container_provider: None,
34            invoker,
35            status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36        }
37    }
38
39    pub fn with_fake_provider(
40        config: FunctionConfig,
41        provider: Arc<crate::provider::fake::FakeProvider>,
42    ) -> Self {
43        Self::new(config, provider as Arc<dyn FunctionProvider>)
44    }
45
46    pub fn with_container_provider(
47        config: FunctionConfig,
48        provider: crate::provider::container::ContainerProvider,
49    ) -> Self {
50        let arc = Arc::new(provider);
51        let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52        svc.container_provider = Some(arc);
53        svc
54    }
55
56    pub fn with_default_container_provider(
57        config: FunctionConfig,
58    ) -> Result<Self, crate::provider::ProviderError> {
59        let provider = crate::provider::container::ContainerProvider::builder().build()?;
60        Ok(Self::with_container_provider(config, provider))
61    }
62
63    pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64        self.invoker.clone() as Arc<dyn FunctionInvoker>
65    }
66
67    pub fn provider(&self) -> &crate::provider::container::ContainerProvider {
68        self.container_provider
69            .as_ref()
70            .expect("not a container provider")
71    }
72
73    pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
74        let key = RunnerPoolKey {
75            runtime: runtime.to_string(),
76        };
77        self.invoker
78            .pool
79            .handles
80            .get(&key)
81            .map(|h| h.state.lock().expect("state").clone())
82    }
83
84    pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
85        let key = RunnerPoolKey {
86            runtime: runtime.to_string(),
87        };
88        if let Some(handle) = self.invoker.pool.handles.get(&key) {
89            *handle.state.lock().expect("state") = RunnerState::Failed {
90                reason: reason.to_string(),
91            };
92        }
93    }
94
95    pub(crate) async fn wait_until_healthy(
96        &self,
97        handle: &crate::pool::RunnerHandle,
98    ) -> Result<(), ProviderError> {
99        let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
100        loop {
101            if tokio::time::Instant::now() > deadline {
102                return Err(ProviderError::BootTimeout);
103            }
104            match self.provider.health(handle).await {
105                Ok(HealthReport::Healthy) => {
106                    *handle.state.lock().expect("state") = RunnerState::Healthy;
107                    return Ok(());
108                }
109                Ok(HealthReport::Unhealthy(reason)) => {
110                    *handle.state.lock().expect("state") = RunnerState::Unhealthy {
111                        since: std::time::Instant::now(),
112                        reason,
113                    };
114                    tokio::time::sleep(self.config.health_interval).await;
115                }
116                Err(_) => {
117                    tokio::time::sleep(self.config.health_interval).await;
118                }
119            }
120        }
121    }
122
123    pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
124        let provider = Arc::clone(&self.provider);
125        let interval = self.config.health_interval;
126        tokio::spawn(async move {
127            let mut ticks = tokio::time::interval(interval);
128            let mut unhealthy_count = 0u8;
129            loop {
130                tokio::select! {
131                    _ = handle.cancel.cancelled() => break,
132                    _ = ticks.tick() => {
133                        match provider.health(&handle).await {
134                            Ok(HealthReport::Healthy) => {
135                                unhealthy_count = 0;
136                                *handle.state.lock().expect("state") = RunnerState::Healthy;
137                            }
138                            Ok(HealthReport::Unhealthy(reason)) => {
139                                unhealthy_count = unhealthy_count.saturating_add(1);
140                                if unhealthy_count >= 2 {
141                                    *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason };
142                                }
143                            }
144                            Err(err) => {
145                                *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() };
146                            }
147                        }
148                    }
149                }
150            }
151        });
152    }
153
154    pub(crate) async fn rollback_start(
155        &self,
156        spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
157        registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
158        pending: &[(FunctionDefinition, Option<String>)],
159    ) {
160        for (ref_key, _pool_key) in registered_refs {
161            self.invoker.pool.ref_counts.remove(ref_key);
162            self.invoker.pool.function_to_key.remove(ref_key);
163            let still_used = self
164                .invoker
165                .pool
166                .function_to_key
167                .iter()
168                .any(|kv| kv.key().0 == ref_key.0);
169            if !still_used {
170                self.invoker
171                    .function_timeouts
172                    .lock()
173                    .expect("function_timeouts")
174                    .remove(&ref_key.0);
175            }
176        }
177        for (key, handle) in spawned {
178            self.invoker.pool.handles.remove(key);
179            handle.cancel.cancel();
180            let _ = self.provider.shutdown(handle.clone()).await;
181        }
182        self.invoker
183            .pending
184            .lock()
185            .expect("pending")
186            .extend(pending.iter().cloned());
187    }
188}
189
190#[async_trait::async_trait]
191impl Lifecycle for FunctionRuntimeService {
192    fn name(&self) -> &str {
193        "function-runtime"
194    }
195
196    fn status(&self) -> ServiceStatus {
197        match self.status.load(Ordering::SeqCst) {
198            STATUS_STOPPED => ServiceStatus::Stopped,
199            STATUS_STARTED => ServiceStatus::Started,
200            _ => ServiceStatus::Failed,
201        }
202    }
203
204    async fn start(&mut self) -> Result<(), CamelError> {
205        if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
206            return Ok(());
207        }
208        let pending = {
209            let mut lock = self.invoker.pending.lock().expect("pending");
210            std::mem::take(&mut *lock)
211        };
212        let mut grouped: std::collections::HashMap<
213            RunnerPoolKey,
214            Vec<(FunctionDefinition, Option<String>)>,
215        > = std::collections::HashMap::new();
216        for (def, route_id) in pending.iter().cloned() {
217            grouped
218                .entry(RunnerPoolKey {
219                    runtime: def.runtime.clone(),
220                })
221                .or_default()
222                .push((def, route_id));
223        }
224        let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
225        let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
226        for (key, defs) in grouped {
227            let handle = match self.provider.spawn(&key).await {
228                Ok(h) => h,
229                Err(e) => {
230                    self.rollback_start(&spawned, &registered_refs, &pending)
231                        .await;
232                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
233                    return Err(CamelError::Config(format!("function: spawn failed: {e}")));
234                }
235            };
236            match self.wait_until_healthy(&handle).await {
237                Ok(()) => {}
238                Err(e) => {
239                    handle.cancel.cancel();
240                    let _ = self.provider.shutdown(handle).await;
241                    self.rollback_start(&spawned, &registered_refs, &pending)
242                        .await;
243                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
244                    return Err(CamelError::Config(format!("function: boot timeout: {e}")));
245                }
246            }
247            self.invoker
248                .pool
249                .handles
250                .insert(key.clone(), handle.clone());
251            self.spawn_health_task(handle.clone());
252            spawned.push((key.clone(), handle.clone()));
253            for (def, route_id) in defs {
254                if let Err(err) = self.provider.register(&handle, &def).await {
255                    self.rollback_start(&spawned, &registered_refs, &pending)
256                        .await;
257                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
258                    return Err(CamelError::Config(format!(
259                        "function: register failed: {err}"
260                    )));
261                }
262                let ref_key = (def.id.clone(), route_id.clone());
263                self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
264                self.invoker
265                    .pool
266                    .function_to_key
267                    .insert(ref_key.clone(), key.clone());
268                registered_refs.push((ref_key, key.clone()));
269            }
270        }
271        self.invoker.started.store(true, Ordering::SeqCst);
272        self.status.store(STATUS_STARTED, Ordering::SeqCst);
273        Ok(())
274    }
275
276    async fn stop(&mut self) -> Result<(), CamelError> {
277        let handles: Vec<_> = self
278            .invoker
279            .pool
280            .handles
281            .iter()
282            .map(|h| h.clone())
283            .collect();
284        self.invoker.pool.handles.clear();
285        self.invoker.pool.ref_counts.clear();
286        self.invoker.pool.function_to_key.clear();
287        self.invoker
288            .function_timeouts
289            .lock()
290            .expect("function_timeouts")
291            .clear();
292        for handle in handles {
293            handle.cancel.cancel();
294            self.provider
295                .shutdown(handle)
296                .await
297                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
298        }
299        self.invoker.started.store(false, Ordering::SeqCst);
300        self.status.store(STATUS_STOPPED, Ordering::SeqCst);
301        Ok(())
302    }
303
304    fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
305        Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
306    }
307}