1use crate::config::FunctionConfig;
2use crate::invoker::DefaultFunctionInvoker;
3use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
4use crate::provider::{FunctionProvider, HealthReport, ProviderError};
5use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
6use camel_api::{CamelError, Lifecycle, ServiceStatus};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9
10const STATUS_STOPPED: u8 = 0;
11const STATUS_STARTED: u8 = 1;
12const STATUS_FAILED: u8 = 2;
13
14pub struct FunctionRuntimeService {
15 config: FunctionConfig,
16 provider: Arc<dyn FunctionProvider>,
17 container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
18 pub(crate) invoker: Arc<DefaultFunctionInvoker>,
19 status: Arc<AtomicU8>,
20}
21
22impl FunctionRuntimeService {
23 pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24 let pool = Arc::new(RunnerPool::new());
25 let invoker = Arc::new(DefaultFunctionInvoker::new(
26 Arc::clone(&pool),
27 Arc::clone(&provider),
28 config.clone(),
29 ));
30 Self {
31 config,
32 provider,
33 container_provider: None,
34 invoker,
35 status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36 }
37 }
38
39 pub fn with_fake_provider(
40 config: FunctionConfig,
41 provider: Arc<crate::provider::fake::FakeProvider>,
42 ) -> Self {
43 Self::new(config, provider as Arc<dyn FunctionProvider>)
44 }
45
46 pub fn with_container_provider(
47 config: FunctionConfig,
48 provider: crate::provider::container::ContainerProvider,
49 ) -> Self {
50 let arc = Arc::new(provider);
51 let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52 svc.container_provider = Some(arc);
53 svc
54 }
55
56 pub fn with_default_container_provider(
57 config: FunctionConfig,
58 ) -> Result<Self, crate::provider::ProviderError> {
59 let provider = crate::provider::container::ContainerProvider::builder().build()?;
60 Ok(Self::with_container_provider(config, provider))
61 }
62
63 pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64 self.invoker.clone() as Arc<dyn FunctionInvoker>
65 }
66
67 pub fn provider(&self) -> &crate::provider::container::ContainerProvider {
68 self.container_provider
69 .as_ref()
70 .expect("not a container provider") }
72
73 pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
74 let key = RunnerPoolKey {
75 runtime: runtime.to_string(),
76 };
77 self.invoker
78 .pool
79 .handles
80 .get(&key)
81 .map(|h| h.state.lock().expect("state").clone()) }
83
84 pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
85 let key = RunnerPoolKey {
86 runtime: runtime.to_string(),
87 };
88 if let Some(handle) = self.invoker.pool.handles.get(&key) {
89 *handle.state.lock().expect("state") = RunnerState::Failed {
90 reason: reason.to_string(),
92 };
93 }
94 }
95
96 pub(crate) async fn wait_until_healthy(
97 &self,
98 handle: &crate::pool::RunnerHandle,
99 ) -> Result<(), ProviderError> {
100 let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
101 loop {
102 if tokio::time::Instant::now() > deadline {
103 return Err(ProviderError::BootTimeout);
104 }
105 match self.provider.health(handle).await {
106 Ok(HealthReport::Healthy) => {
107 *handle.state.lock().expect("state") = RunnerState::Healthy; return Ok(());
109 }
110 Ok(HealthReport::Unhealthy(reason)) => {
111 *handle.state.lock().expect("state") = RunnerState::Unhealthy {
112 since: std::time::Instant::now(),
114 reason,
115 };
116 tokio::time::sleep(self.config.health_interval).await;
117 }
118 Err(_) => {
119 tokio::time::sleep(self.config.health_interval).await;
120 }
121 }
122 }
123 }
124
125 pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
126 let provider = Arc::clone(&self.provider);
127 let interval = self.config.health_interval;
128 tokio::spawn(async move {
129 let mut ticks = tokio::time::interval(interval);
130 let mut unhealthy_count = 0u8;
131 loop {
132 tokio::select! {
133 _ = handle.cancel.cancelled() => break,
134 _ = ticks.tick() => {
135 match provider.health(&handle).await {
136 Ok(HealthReport::Healthy) => {
137 unhealthy_count = 0;
138 *handle.state.lock().expect("state") = RunnerState::Healthy; }
140 Ok(HealthReport::Unhealthy(reason)) => {
141 unhealthy_count = unhealthy_count.saturating_add(1);
142 if unhealthy_count >= 2 {
143 *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason }; }
145 }
146 Err(err) => {
147 *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() }; }
149 }
150 }
151 }
152 }
153 });
154 }
155
156 pub(crate) async fn rollback_start(
157 &self,
158 spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
159 registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
160 pending: &[(FunctionDefinition, Option<String>)],
161 ) {
162 for (ref_key, _pool_key) in registered_refs {
163 self.invoker.pool.ref_counts.remove(ref_key);
164 self.invoker.pool.function_to_key.remove(ref_key);
165 let still_used = self
166 .invoker
167 .pool
168 .function_to_key
169 .iter()
170 .any(|kv| kv.key().0 == ref_key.0);
171 if !still_used {
172 self.invoker
173 .function_timeouts
174 .lock()
175 .expect("function_timeouts") .remove(&ref_key.0);
177 }
178 }
179 for (key, handle) in spawned {
180 self.invoker.pool.handles.remove(key);
181 handle.cancel.cancel();
182 let _ = self.provider.shutdown(handle.clone()).await;
183 }
184 self.invoker
185 .pending
186 .lock()
187 .expect("pending") .extend(pending.iter().cloned());
189 }
190}
191
192#[async_trait::async_trait]
193impl Lifecycle for FunctionRuntimeService {
194 fn name(&self) -> &str {
195 "function-runtime"
196 }
197
198 fn status(&self) -> ServiceStatus {
199 match self.status.load(Ordering::SeqCst) {
200 STATUS_STOPPED => ServiceStatus::Stopped,
201 STATUS_STARTED => ServiceStatus::Started,
202 _ => ServiceStatus::Failed,
203 }
204 }
205
206 async fn start(&mut self) -> Result<(), CamelError> {
207 if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
208 return Ok(());
209 }
210 let pending = {
211 let mut lock = self.invoker.pending.lock().expect("pending"); std::mem::take(&mut *lock)
213 };
214 let mut grouped: std::collections::HashMap<
215 RunnerPoolKey,
216 Vec<(FunctionDefinition, Option<String>)>,
217 > = std::collections::HashMap::new();
218 for (def, route_id) in pending.iter().cloned() {
219 grouped
220 .entry(RunnerPoolKey {
221 runtime: def.runtime.clone(),
222 })
223 .or_default()
224 .push((def, route_id));
225 }
226 let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
227 let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
228 for (key, defs) in grouped {
229 let handle = match self.provider.spawn(&key).await {
230 Ok(h) => h,
231 Err(e) => {
232 self.rollback_start(&spawned, ®istered_refs, &pending)
233 .await;
234 self.status.store(STATUS_FAILED, Ordering::SeqCst);
235 return Err(CamelError::Config(format!("function: spawn failed: {e}")));
236 }
237 };
238 match self.wait_until_healthy(&handle).await {
239 Ok(()) => {}
240 Err(e) => {
241 handle.cancel.cancel();
242 let _ = self.provider.shutdown(handle).await;
243 self.rollback_start(&spawned, ®istered_refs, &pending)
244 .await;
245 self.status.store(STATUS_FAILED, Ordering::SeqCst);
246 return Err(CamelError::Config(format!("function: boot timeout: {e}")));
247 }
248 }
249 self.invoker
250 .pool
251 .handles
252 .insert(key.clone(), handle.clone());
253 self.spawn_health_task(handle.clone());
254 spawned.push((key.clone(), handle.clone()));
255 for (def, route_id) in defs {
256 if let Err(err) = self.provider.register(&handle, &def).await {
257 self.rollback_start(&spawned, ®istered_refs, &pending)
258 .await;
259 self.status.store(STATUS_FAILED, Ordering::SeqCst);
260 return Err(CamelError::Config(format!(
261 "function: register failed: {err}"
262 )));
263 }
264 let ref_key = (def.id.clone(), route_id.clone());
265 self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
266 self.invoker
267 .pool
268 .function_to_key
269 .insert(ref_key.clone(), key.clone());
270 registered_refs.push((ref_key, key.clone()));
271 }
272 }
273 self.invoker.started.store(true, Ordering::SeqCst);
274 self.status.store(STATUS_STARTED, Ordering::SeqCst);
275 Ok(())
276 }
277
278 async fn stop(&mut self) -> Result<(), CamelError> {
279 let handles: Vec<_> = self
280 .invoker
281 .pool
282 .handles
283 .iter()
284 .map(|h| h.clone())
285 .collect();
286 self.invoker.pool.handles.clear();
287 self.invoker.pool.ref_counts.clear();
288 self.invoker.pool.function_to_key.clear();
289 self.invoker
290 .function_timeouts
291 .lock()
292 .expect("function_timeouts") .clear();
294 for handle in handles {
295 handle.cancel.cancel();
296 self.provider
297 .shutdown(handle)
298 .await
299 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
300 }
301 self.invoker.started.store(false, Ordering::SeqCst);
302 self.status.store(STATUS_STOPPED, Ordering::SeqCst);
303 Ok(())
304 }
305
306 fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
307 Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
308 }
309}