Skip to main content

camel_function/
service.rs

1use crate::config::FunctionConfig;
2use crate::invoker::DefaultFunctionInvoker;
3use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
4use crate::provider::{FunctionProvider, HealthReport, ProviderError};
5use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
6use camel_api::{CamelError, Lifecycle, ServiceStatus};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9
// Encoding of the service's lifecycle status inside its `AtomicU8` status cell.
// `Lifecycle::status` maps any value other than STOPPED/STARTED to `Failed`.

/// Status-cell value for a service that is not running (also the initial value).
const STATUS_STOPPED: u8 = 0;
/// Status-cell value for a service whose `start` completed successfully.
const STATUS_STARTED: u8 = 1;
/// Status-cell value for a service whose lifecycle operation failed.
const STATUS_FAILED: u8 = 2;
13
14pub struct FunctionRuntimeService {
15    config: FunctionConfig,
16    provider: Arc<dyn FunctionProvider>,
17    container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
18    pub(crate) invoker: Arc<DefaultFunctionInvoker>,
19    status: Arc<AtomicU8>,
20}
21
22impl FunctionRuntimeService {
23    pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24        let pool = Arc::new(RunnerPool::new());
25        let invoker = Arc::new(DefaultFunctionInvoker::new(
26            Arc::clone(&pool),
27            Arc::clone(&provider),
28            config.clone(),
29        ));
30        Self {
31            config,
32            provider,
33            container_provider: None,
34            invoker,
35            status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36        }
37    }
38
39    pub fn with_fake_provider(
40        config: FunctionConfig,
41        provider: Arc<crate::provider::fake::FakeProvider>,
42    ) -> Self {
43        Self::new(config, provider as Arc<dyn FunctionProvider>)
44    }
45
46    pub fn with_container_provider(
47        config: FunctionConfig,
48        provider: crate::provider::container::ContainerProvider,
49    ) -> Self {
50        let arc = Arc::new(provider);
51        let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52        svc.container_provider = Some(arc);
53        svc
54    }
55
56    pub fn with_default_container_provider(
57        config: FunctionConfig,
58    ) -> Result<Self, crate::provider::ProviderError> {
59        let provider = crate::provider::container::ContainerProvider::builder().build()?;
60        Ok(Self::with_container_provider(config, provider))
61    }
62
63    pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64        self.invoker.clone() as Arc<dyn FunctionInvoker>
65    }
66
67    pub fn provider(&self) -> &crate::provider::container::ContainerProvider {
68        self.container_provider
69            .as_ref()
70            .expect("not a container provider") // allow-unwrap
71    }
72
73    pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
74        let key = RunnerPoolKey {
75            runtime: runtime.to_string(),
76        };
77        self.invoker
78            .pool
79            .handles
80            .get(&key)
81            .map(|h| h.state.lock().expect("state").clone()) // allow-unwrap
82    }
83
84    pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
85        let key = RunnerPoolKey {
86            runtime: runtime.to_string(),
87        };
88        if let Some(handle) = self.invoker.pool.handles.get(&key) {
89            *handle.state.lock().expect("state") = RunnerState::Failed {
90                // allow-unwrap
91                reason: reason.to_string(),
92            };
93        }
94    }
95
96    pub(crate) async fn wait_until_healthy(
97        &self,
98        handle: &crate::pool::RunnerHandle,
99    ) -> Result<(), ProviderError> {
100        let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
101        loop {
102            if tokio::time::Instant::now() > deadline {
103                return Err(ProviderError::BootTimeout);
104            }
105            match self.provider.health(handle).await {
106                Ok(HealthReport::Healthy) => {
107                    *handle.state.lock().expect("state") = RunnerState::Healthy; // allow-unwrap
108                    return Ok(());
109                }
110                Ok(HealthReport::Unhealthy(reason)) => {
111                    *handle.state.lock().expect("state") = RunnerState::Unhealthy {
112                        // allow-unwrap
113                        since: std::time::Instant::now(),
114                        reason,
115                    };
116                    tokio::time::sleep(self.config.health_interval).await;
117                }
118                Err(_) => {
119                    tokio::time::sleep(self.config.health_interval).await;
120                }
121            }
122        }
123    }
124
125    pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
126        let provider = Arc::clone(&self.provider);
127        let interval = self.config.health_interval;
128        tokio::spawn(async move {
129            let mut ticks = tokio::time::interval(interval);
130            let mut unhealthy_count = 0u8;
131            loop {
132                tokio::select! {
133                    _ = handle.cancel.cancelled() => break,
134                    _ = ticks.tick() => {
135                        match provider.health(&handle).await {
136                            Ok(HealthReport::Healthy) => {
137                                unhealthy_count = 0;
138                                *handle.state.lock().expect("state") = RunnerState::Healthy; // allow-unwrap
139                            }
140                            Ok(HealthReport::Unhealthy(reason)) => {
141                                unhealthy_count = unhealthy_count.saturating_add(1);
142                                if unhealthy_count >= 2 {
143                                    *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason }; // allow-unwrap
144                                }
145                            }
146                            Err(err) => {
147                                *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() }; // allow-unwrap
148                            }
149                        }
150                    }
151                }
152            }
153        });
154    }
155
156    pub(crate) async fn rollback_start(
157        &self,
158        spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
159        registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
160        pending: &[(FunctionDefinition, Option<String>)],
161    ) {
162        for (ref_key, _pool_key) in registered_refs {
163            self.invoker.pool.ref_counts.remove(ref_key);
164            self.invoker.pool.function_to_key.remove(ref_key);
165            let still_used = self
166                .invoker
167                .pool
168                .function_to_key
169                .iter()
170                .any(|kv| kv.key().0 == ref_key.0);
171            if !still_used {
172                self.invoker
173                    .function_timeouts
174                    .lock()
175                    .expect("function_timeouts") // allow-unwrap
176                    .remove(&ref_key.0);
177            }
178        }
179        for (key, handle) in spawned {
180            self.invoker.pool.handles.remove(key);
181            handle.cancel.cancel();
182            let _ = self.provider.shutdown(handle.clone()).await;
183        }
184        self.invoker
185            .pending
186            .lock()
187            .expect("pending") // allow-unwrap
188            .extend(pending.iter().cloned());
189    }
190}
191
192#[async_trait::async_trait]
193impl Lifecycle for FunctionRuntimeService {
194    fn name(&self) -> &str {
195        "function-runtime"
196    }
197
198    fn status(&self) -> ServiceStatus {
199        match self.status.load(Ordering::SeqCst) {
200            STATUS_STOPPED => ServiceStatus::Stopped,
201            STATUS_STARTED => ServiceStatus::Started,
202            _ => ServiceStatus::Failed,
203        }
204    }
205
206    async fn start(&mut self) -> Result<(), CamelError> {
207        if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
208            return Ok(());
209        }
210        let pending = {
211            let mut lock = self.invoker.pending.lock().expect("pending"); // allow-unwrap
212            std::mem::take(&mut *lock)
213        };
214        let mut grouped: std::collections::HashMap<
215            RunnerPoolKey,
216            Vec<(FunctionDefinition, Option<String>)>,
217        > = std::collections::HashMap::new();
218        for (def, route_id) in pending.iter().cloned() {
219            grouped
220                .entry(RunnerPoolKey {
221                    runtime: def.runtime.clone(),
222                })
223                .or_default()
224                .push((def, route_id));
225        }
226        let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
227        let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
228        for (key, defs) in grouped {
229            let handle = match self.provider.spawn(&key).await {
230                Ok(h) => h,
231                Err(e) => {
232                    self.rollback_start(&spawned, &registered_refs, &pending)
233                        .await;
234                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
235                    return Err(CamelError::Config(format!("function: spawn failed: {e}")));
236                }
237            };
238            match self.wait_until_healthy(&handle).await {
239                Ok(()) => {}
240                Err(e) => {
241                    handle.cancel.cancel();
242                    let _ = self.provider.shutdown(handle).await;
243                    self.rollback_start(&spawned, &registered_refs, &pending)
244                        .await;
245                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
246                    return Err(CamelError::Config(format!("function: boot timeout: {e}")));
247                }
248            }
249            self.invoker
250                .pool
251                .handles
252                .insert(key.clone(), handle.clone());
253            self.spawn_health_task(handle.clone());
254            spawned.push((key.clone(), handle.clone()));
255            for (def, route_id) in defs {
256                if let Err(err) = self.provider.register(&handle, &def).await {
257                    self.rollback_start(&spawned, &registered_refs, &pending)
258                        .await;
259                    self.status.store(STATUS_FAILED, Ordering::SeqCst);
260                    return Err(CamelError::Config(format!(
261                        "function: register failed: {err}"
262                    )));
263                }
264                let ref_key = (def.id.clone(), route_id.clone());
265                self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
266                self.invoker
267                    .pool
268                    .function_to_key
269                    .insert(ref_key.clone(), key.clone());
270                registered_refs.push((ref_key, key.clone()));
271            }
272        }
273        self.invoker.started.store(true, Ordering::SeqCst);
274        self.status.store(STATUS_STARTED, Ordering::SeqCst);
275        Ok(())
276    }
277
278    async fn stop(&mut self) -> Result<(), CamelError> {
279        let handles: Vec<_> = self
280            .invoker
281            .pool
282            .handles
283            .iter()
284            .map(|h| h.clone())
285            .collect();
286        self.invoker.pool.handles.clear();
287        self.invoker.pool.ref_counts.clear();
288        self.invoker.pool.function_to_key.clear();
289        self.invoker
290            .function_timeouts
291            .lock()
292            .expect("function_timeouts") // allow-unwrap
293            .clear();
294        for handle in handles {
295            handle.cancel.cancel();
296            self.provider
297                .shutdown(handle)
298                .await
299                .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
300        }
301        self.invoker.started.store(false, Ordering::SeqCst);
302        self.status.store(STATUS_STOPPED, Ordering::SeqCst);
303        Ok(())
304    }
305
306    fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
307        Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
308    }
309}