1use crate::config::FunctionConfig;
2use crate::invoker::DefaultFunctionInvoker;
3use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
4use crate::provider::{FunctionHealthStatus, FunctionProvider, ProviderError};
5use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
6use camel_api::{CamelError, Lifecycle, ServiceStatus};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9
10const STATUS_STOPPED: u8 = 0;
11const STATUS_STARTED: u8 = 1;
12const STATUS_FAILED: u8 = 2;
13
14pub struct FunctionRuntimeService {
15 config: FunctionConfig,
16 provider: Arc<dyn FunctionProvider>,
17 container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
18 pub(crate) invoker: Arc<DefaultFunctionInvoker>,
19 status: Arc<AtomicU8>,
20}
21
22impl FunctionRuntimeService {
23 pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24 let pool = Arc::new(RunnerPool::new());
25 let invoker = Arc::new(DefaultFunctionInvoker::new(
26 Arc::clone(&pool),
27 Arc::clone(&provider),
28 config.clone(),
29 ));
30 Self {
31 config,
32 provider,
33 container_provider: None,
34 invoker,
35 status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36 }
37 }
38
39 pub fn with_fake_provider(
40 config: FunctionConfig,
41 provider: Arc<crate::provider::fake::FakeProvider>,
42 ) -> Self {
43 Self::new(config, provider as Arc<dyn FunctionProvider>)
44 }
45
46 pub fn with_container_provider(
47 config: FunctionConfig,
48 provider: crate::provider::container::ContainerProvider,
49 ) -> Self {
50 let arc = Arc::new(provider);
51 let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52 svc.container_provider = Some(arc);
53 svc
54 }
55
56 pub fn with_default_container_provider(
57 config: FunctionConfig,
58 ) -> Result<Self, crate::provider::ProviderError> {
59 let provider = crate::provider::container::ContainerProvider::builder().build()?;
60 Ok(Self::with_container_provider(config, provider))
61 }
62
63 pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64 self.invoker.clone() as Arc<dyn FunctionInvoker>
65 }
66
67 pub fn provider(&self) -> Result<&crate::provider::container::ContainerProvider, CamelError> {
68 self.container_provider
69 .as_ref()
70 .map(|arc| arc.as_ref())
71 .ok_or_else(|| {
72 CamelError::Config("unsupported provider type: not a container provider".into())
73 })
74 }
75
76 pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
77 let key = RunnerPoolKey {
78 runtime: runtime.to_string(),
79 };
80 self.invoker
81 .pool
82 .handles
83 .get(&key)
84 .map(|h| h.state.lock().expect("state").clone()) }
86
87 pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
88 let key = RunnerPoolKey {
89 runtime: runtime.to_string(),
90 };
91 if let Some(handle) = self.invoker.pool.handles.get(&key) {
92 *handle.state.lock().expect("state") = RunnerState::Failed {
93 reason: reason.to_string(),
95 };
96 }
97 }
98
99 pub(crate) async fn wait_until_healthy(
100 &self,
101 handle: &crate::pool::RunnerHandle,
102 ) -> Result<(), ProviderError> {
103 let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
104 loop {
105 if tokio::time::Instant::now() > deadline {
106 return Err(ProviderError::BootTimeout);
107 }
108 match self.provider.health(handle).await {
109 Ok(FunctionHealthStatus::Healthy) => {
110 *handle.state.lock().expect("state") = RunnerState::Healthy; return Ok(());
112 }
113 Ok(FunctionHealthStatus::Unhealthy(reason)) => {
114 *handle.state.lock().expect("state") = RunnerState::Unhealthy {
115 since: std::time::Instant::now(),
117 reason,
118 };
119 tokio::time::sleep(self.config.health_interval).await;
120 }
121 Err(_) => {
122 tokio::time::sleep(self.config.health_interval).await;
123 }
124 }
125 }
126 }
127
128 pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
129 let provider = Arc::clone(&self.provider);
130 let interval = self.config.health_interval;
131 tokio::spawn(async move {
132 let mut ticks = tokio::time::interval(interval);
133 let mut unhealthy_count = 0u8;
134 loop {
135 tokio::select! {
136 _ = handle.cancel.cancelled() => break,
137 _ = ticks.tick() => {
138 match provider.health(&handle).await {
139 Ok(FunctionHealthStatus::Healthy) => {
140 unhealthy_count = 0;
141 *handle.state.lock().expect("state") = RunnerState::Healthy; }
143 Ok(FunctionHealthStatus::Unhealthy(reason)) => {
144 unhealthy_count = unhealthy_count.saturating_add(1);
145 if unhealthy_count >= 2 {
146 *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason }; }
148 }
149 Err(err) => {
150 *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() }; }
152 }
153 }
154 }
155 }
156 });
157 }
158
159 pub(crate) async fn rollback_start(
160 &self,
161 spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
162 registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
163 pending: &[(FunctionDefinition, Option<String>)],
164 ) {
165 for (ref_key, _pool_key) in registered_refs {
166 self.invoker.pool.ref_counts.remove(ref_key);
167 self.invoker.pool.function_to_key.remove(ref_key);
168 let still_used = self
169 .invoker
170 .pool
171 .function_to_key
172 .iter()
173 .any(|kv| kv.key().0 == ref_key.0);
174 if !still_used {
175 self.invoker
176 .function_timeouts
177 .lock()
178 .expect("function_timeouts") .remove(&ref_key.0);
180 }
181 }
182 for (key, handle) in spawned {
183 self.invoker.pool.handles.remove(key);
184 handle.cancel.cancel();
185 let _ = self.provider.shutdown(handle.clone()).await;
186 }
187 self.invoker
188 .pending
189 .lock()
190 .expect("pending") .extend(pending.iter().cloned());
192 }
193}
194
195#[async_trait::async_trait]
196impl Lifecycle for FunctionRuntimeService {
197 fn name(&self) -> &str {
198 "function-runtime"
199 }
200
201 fn status(&self) -> ServiceStatus {
202 match self.status.load(Ordering::SeqCst) {
203 STATUS_STOPPED => ServiceStatus::Stopped,
204 STATUS_STARTED => ServiceStatus::Started,
205 _ => ServiceStatus::Failed,
206 }
207 }
208
209 async fn start(&mut self) -> Result<(), CamelError> {
210 self.config.validate()?;
211 if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
212 return Ok(());
213 }
214 let pending = {
215 let mut lock = self.invoker.pending.lock().expect("pending"); std::mem::take(&mut *lock)
217 };
218 let mut grouped: std::collections::HashMap<
219 RunnerPoolKey,
220 Vec<(FunctionDefinition, Option<String>)>,
221 > = std::collections::HashMap::new();
222 for (def, route_id) in pending.iter().cloned() {
223 grouped
224 .entry(RunnerPoolKey {
225 runtime: def.runtime.clone(),
226 })
227 .or_default()
228 .push((def, route_id));
229 }
230 let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
231 let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
232 for (key, defs) in grouped {
233 let handle = match self.provider.spawn(&key).await {
234 Ok(h) => h,
235 Err(e) => {
236 self.rollback_start(&spawned, ®istered_refs, &pending)
237 .await;
238 self.status.store(STATUS_FAILED, Ordering::SeqCst);
239 return Err(CamelError::Config(format!("function: spawn failed: {e}")));
240 }
241 };
242 match self.wait_until_healthy(&handle).await {
243 Ok(()) => {}
244 Err(e) => {
245 handle.cancel.cancel();
246 let _ = self.provider.shutdown(handle).await;
247 self.rollback_start(&spawned, ®istered_refs, &pending)
248 .await;
249 self.status.store(STATUS_FAILED, Ordering::SeqCst);
250 return Err(CamelError::Config(format!("function: boot timeout: {e}")));
251 }
252 }
253 self.invoker
254 .pool
255 .handles
256 .insert(key.clone(), handle.clone());
257 self.spawn_health_task(handle.clone());
258 spawned.push((key.clone(), handle.clone()));
259 for (def, route_id) in defs {
260 if let Err(err) = self.provider.register(&handle, &def).await {
261 self.rollback_start(&spawned, ®istered_refs, &pending)
262 .await;
263 self.status.store(STATUS_FAILED, Ordering::SeqCst);
264 return Err(CamelError::Config(format!(
265 "function: register failed: {err}"
266 )));
267 }
268 let ref_key = (def.id.clone(), route_id.clone());
269 self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
270 self.invoker
271 .pool
272 .function_to_key
273 .insert(ref_key.clone(), key.clone());
274 registered_refs.push((ref_key, key.clone()));
275 }
276 }
277 self.invoker.started.store(true, Ordering::SeqCst);
278 self.status.store(STATUS_STARTED, Ordering::SeqCst);
279 Ok(())
280 }
281
282 async fn stop(&mut self) -> Result<(), CamelError> {
283 let handles: Vec<_> = self
284 .invoker
285 .pool
286 .handles
287 .iter()
288 .map(|h| h.clone())
289 .collect();
290 self.invoker.pool.handles.clear();
291 self.invoker.pool.ref_counts.clear();
292 self.invoker.pool.function_to_key.clear();
293 self.invoker
294 .function_timeouts
295 .lock()
296 .expect("function_timeouts") .clear();
298 for handle in handles {
299 handle.cancel.cancel();
300 self.provider
301 .shutdown(handle)
302 .await
303 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
304 }
305 self.invoker.started.store(false, Ordering::SeqCst);
306 self.status.store(STATUS_STOPPED, Ordering::SeqCst);
307 Ok(())
308 }
309
310 fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
311 Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
312 }
313}