Skip to main content

camel_function/
service.rs

1use crate::config::FunctionConfig;
2use crate::invoker::DefaultFunctionInvoker;
3use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
4use crate::provider::{FunctionHealthStatus, FunctionProvider, ProviderError};
5use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
6use camel_api::{CamelError, Lifecycle, ServiceStatus};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9
// Encoding of the lifecycle status held in `FunctionRuntimeService::status`
// (an `AtomicU8`); `Lifecycle::status` maps any unknown value to `Failed`.

/// No runners are active (initial state, and the state after a clean `stop`).
const STATUS_STOPPED: u8 = 0_u8;
/// `start` completed: every pending function has a healthy runner.
const STATUS_STARTED: u8 = 1_u8;
/// The last `start` attempt failed and was rolled back.
const STATUS_FAILED: u8 = 2_u8;
13
14pub struct FunctionRuntimeService {
15    config: FunctionConfig,
16    provider: Arc<dyn FunctionProvider>,
17    container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
18    pub(crate) invoker: Arc<DefaultFunctionInvoker>,
19    status: Arc<AtomicU8>,
20}
21
22impl FunctionRuntimeService {
23    pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24        let pool = Arc::new(RunnerPool::new());
25        let invoker = Arc::new(DefaultFunctionInvoker::new(
26            Arc::clone(&pool),
27            Arc::clone(&provider),
28            config.clone(),
29        ));
30        Self {
31            config,
32            provider,
33            container_provider: None,
34            invoker,
35            status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36        }
37    }
38
39    pub fn with_fake_provider(
40        config: FunctionConfig,
41        provider: Arc<crate::provider::fake::FakeProvider>,
42    ) -> Self {
43        Self::new(config, provider as Arc<dyn FunctionProvider>)
44    }
45
46    pub fn with_container_provider(
47        config: FunctionConfig,
48        provider: crate::provider::container::ContainerProvider,
49    ) -> Self {
50        let arc = Arc::new(provider);
51        let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52        svc.container_provider = Some(arc);
53        svc
54    }
55
56    pub fn with_default_container_provider(
57        config: FunctionConfig,
58    ) -> Result<Self, crate::provider::ProviderError> {
59        let provider = crate::provider::container::ContainerProvider::builder().build()?;
60        Ok(Self::with_container_provider(config, provider))
61    }
62
63    pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64        self.invoker.clone() as Arc<dyn FunctionInvoker>
65    }
66
67    pub fn provider(&self) -> Result<&crate::provider::container::ContainerProvider, CamelError> {
68        self.container_provider
69            .as_ref()
70            .map(|arc| arc.as_ref())
71            .ok_or_else(|| {
72                CamelError::Config("unsupported provider type: not a container provider".into())
73            })
74    }
75
76    pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
77        let key = RunnerPoolKey {
78            runtime: runtime.to_string(),
79        };
80        self.invoker
81            .pool
82            .handles
83            .get(&key)
84            .map(|h| h.state.lock().expect("state").clone()) // allow-unwrap
85    }
86
87    pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
88        let key = RunnerPoolKey {
89            runtime: runtime.to_string(),
90        };
91        if let Some(handle) = self.invoker.pool.handles.get(&key) {
92            *handle.state.lock().expect("state") = RunnerState::Failed {
93                // allow-unwrap
94                reason: reason.to_string(),
95            };
96        }
97    }
98
99    pub(crate) async fn wait_until_healthy(
100        &self,
101        handle: &crate::pool::RunnerHandle,
102    ) -> Result<(), ProviderError> {
103        let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
104        loop {
105            if tokio::time::Instant::now() > deadline {
106                return Err(ProviderError::BootTimeout);
107            }
108            match self.provider.health(handle).await {
109                Ok(FunctionHealthStatus::Healthy) => {
110                    *handle.state.lock().expect("state") = RunnerState::Healthy; // allow-unwrap
111                    return Ok(());
112                }
113                Ok(FunctionHealthStatus::Unhealthy(reason)) => {
114                    *handle.state.lock().expect("state") = RunnerState::Unhealthy {
115                        // allow-unwrap
116                        since: std::time::Instant::now(),
117                        reason,
118                    };
119                    tokio::time::sleep(self.config.health_interval).await;
120                }
121                Err(_) => {
122                    tokio::time::sleep(self.config.health_interval).await;
123                }
124            }
125        }
126    }
127
128    pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
129        let provider = Arc::clone(&self.provider);
130        let interval = self.config.health_interval;
131        tokio::spawn(async move {
132            let mut ticks = tokio::time::interval(interval);
133            let mut unhealthy_count = 0u8;
134            loop {
135                tokio::select! {
136                    _ = handle.cancel.cancelled() => break,
137                    _ = ticks.tick() => {
138                        match provider.health(&handle).await {
139                            Ok(FunctionHealthStatus::Healthy) => {
140                                unhealthy_count = 0;
141                                *handle.state.lock().expect("state") = RunnerState::Healthy; // allow-unwrap
142                            }
143                            Ok(FunctionHealthStatus::Unhealthy(reason)) => {
144                                unhealthy_count = unhealthy_count.saturating_add(1);
145                                if unhealthy_count >= 2 {
146                                    *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason }; // allow-unwrap
147                                }
148                            }
149                            Err(err) => {
150                                *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() }; // allow-unwrap
151                            }
152                        }
153                    }
154                }
155            }
156        });
157    }
158
159    pub(crate) async fn rollback_start(
160        &self,
161        spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
162        registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
163        pending: &[(FunctionDefinition, Option<String>)],
164    ) {
165        for (ref_key, _pool_key) in registered_refs {
166            self.invoker.pool.ref_counts.remove(ref_key);
167            self.invoker.pool.function_to_key.remove(ref_key);
168            let still_used = self
169                .invoker
170                .pool
171                .function_to_key
172                .iter()
173                .any(|kv| kv.key().0 == ref_key.0);
174            if !still_used {
175                self.invoker
176                    .function_timeouts
177                    .lock()
178                    .expect("function_timeouts") // allow-unwrap
179                    .remove(&ref_key.0);
180            }
181        }
182        for (key, handle) in spawned {
183            self.invoker.pool.handles.remove(key);
184            handle.cancel.cancel();
185            let _ = self.provider.shutdown(handle.clone()).await;
186        }
187        self.invoker
188            .pending
189            .lock()
190            .expect("pending") // allow-unwrap
191            .extend(pending.iter().cloned());
192    }
193}
194
195#[async_trait::async_trait]
196impl Lifecycle for FunctionRuntimeService {
197    fn name(&self) -> &str {
198        "function-runtime"
199    }
200
201    fn status(&self) -> ServiceStatus {
202        match self.status.load(Ordering::SeqCst) {
203            STATUS_STOPPED => ServiceStatus::Stopped,
204            STATUS_STARTED => ServiceStatus::Started,
205            _ => ServiceStatus::Failed,
206        }
207    }
208
209    async fn start(&mut self) -> Result<(), CamelError> {
210        self.config.validate()?;
211        if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
212            return Ok(());
213        }
214        let pending = {
215            let mut lock = self.invoker.pending.lock().expect("pending"); // allow-unwrap
216            std::mem::take(&mut *lock)
217        };
218        let mut grouped: std::collections::HashMap<
219            RunnerPoolKey,
220            Vec<(FunctionDefinition, Option<String>)>,
221        > = std::collections::HashMap::new();
222        for (def, route_id) in pending.iter().cloned() {
223            grouped
224                .entry(RunnerPoolKey {
225                    runtime: def.runtime.clone(),
226                })
227                .or_default()
228                .push((def, route_id));
229        }
230        let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
231        let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
232        for (key, defs) in grouped {
233            let handle = match self.provider.spawn(&key).await {
234                Ok(h) => h,
235                Err(e) => {
236                    self.rollback_start(&spawned, &registered_refs, &pending)
237                        .await;
238                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
239                    return Err(CamelError::Config(format!("function: spawn failed: {e}")));
240                }
241            };
242            match self.wait_until_healthy(&handle).await {
243                Ok(()) => {}
244                Err(e) => {
245                    handle.cancel.cancel();
246                    let _ = self.provider.shutdown(handle).await;
247                    self.rollback_start(&spawned, &registered_refs, &pending)
248                        .await;
249                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
250                    return Err(CamelError::Config(format!("function: boot timeout: {e}")));
251                }
252            }
253            self.invoker
254                .pool
255                .handles
256                .insert(key.clone(), handle.clone());
257            self.spawn_health_task(handle.clone());
258            spawned.push((key.clone(), handle.clone()));
259            for (def, route_id) in defs {
260                if let Err(err) = self.provider.register(&handle, &def).await {
261                    self.rollback_start(&spawned, &registered_refs, &pending)
262                        .await;
263                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
264                    return Err(CamelError::Config(format!(
265                        "function: register failed: {err}"
266                    )));
267                }
268                let ref_key = (def.id.clone(), route_id.clone());
269                self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
270                self.invoker
271                    .pool
272                    .function_to_key
273                    .insert(ref_key.clone(), key.clone());
274                registered_refs.push((ref_key, key.clone()));
275            }
276        }
277        self.invoker.started.store(true, Ordering::SeqCst);
278        self.status.store(STATUS_STARTED, Ordering::SeqCst);
279        Ok(())
280    }
281
282    async fn stop(&mut self) -> Result<(), CamelError> {
283        let handles: Vec<_> = self
284            .invoker
285            .pool
286            .handles
287            .iter()
288            .map(|h| h.clone())
289            .collect();
290        self.invoker.pool.handles.clear();
291        self.invoker.pool.ref_counts.clear();
292        self.invoker.pool.function_to_key.clear();
293        self.invoker
294            .function_timeouts
295            .lock()
296            .expect("function_timeouts") // allow-unwrap
297            .clear();
298        for handle in handles {
299            handle.cancel.cancel();
300            self.provider
301                .shutdown(handle)
302                .await
303                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
304        }
305        self.invoker.started.store(false, Ordering::SeqCst);
306        self.status.store(STATUS_STOPPED, Ordering::SeqCst);
307        Ok(())
308    }
309
310    fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
311        Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
312    }
313}