1use crate::config::FunctionConfig;
2use crate::invoker::DefaultFunctionInvoker;
3use crate::pool::{RunnerPool, RunnerPoolKey, RunnerState};
4use crate::provider::{FunctionProvider, HealthReport, ProviderError};
5use camel_api::function::{FunctionDefinition, FunctionId, FunctionInvoker};
6use camel_api::{CamelError, Lifecycle, ServiceStatus};
7use std::sync::Arc;
8use std::sync::atomic::{AtomicU8, Ordering};
9
// Encoded values stored in `FunctionRuntimeService::status` (an `AtomicU8`) and
// decoded by `Lifecycle::status`; any value other than these reads as failed.

/// Not running: the initial state, and the state written by `stop`.
const STATUS_STOPPED: u8 = 0;
/// `start` completed successfully.
const STATUS_STARTED: u8 = 1;
/// `start` failed (after rolling back its partial work).
const STATUS_FAILED: u8 = 2;
13
/// Lifecycle-managed service that runs function runners through a
/// [`FunctionProvider`] and exposes them via a [`DefaultFunctionInvoker`].
///
/// On `start` it spawns one runner per runtime for the queued function
/// definitions, waits for each runner to become healthy, and registers the
/// definitions on it. On `stop` it drains the invoker's runner pool and shuts
/// the runners down.
pub struct FunctionRuntimeService {
    /// Runtime configuration (boot timeout, health-check interval, ...).
    config: FunctionConfig,
    /// Type-erased provider used to spawn, health-check, register on and shut
    /// down runners.
    provider: Arc<dyn FunctionProvider>,
    /// Typed handle to the same provider when the service was built with a
    /// container provider; `None` otherwise. Backs `provider()`.
    container_provider: Option<Arc<crate::provider::container::ContainerProvider>>,
    /// Invoker handed out to callers; holds the runner pool, the pending
    /// registrations and per-function timeouts.
    pub(crate) invoker: Arc<DefaultFunctionInvoker>,
    /// Lifecycle state encoded as one of the `STATUS_*` constants.
    status: Arc<AtomicU8>,
}
21
22impl FunctionRuntimeService {
23 pub(crate) fn new(config: FunctionConfig, provider: Arc<dyn FunctionProvider>) -> Self {
24 let pool = Arc::new(RunnerPool::new());
25 let invoker = Arc::new(DefaultFunctionInvoker::new(
26 Arc::clone(&pool),
27 Arc::clone(&provider),
28 config.clone(),
29 ));
30 Self {
31 config,
32 provider,
33 container_provider: None,
34 invoker,
35 status: Arc::new(AtomicU8::new(STATUS_STOPPED)),
36 }
37 }
38
39 pub fn with_fake_provider(
40 config: FunctionConfig,
41 provider: Arc<crate::provider::fake::FakeProvider>,
42 ) -> Self {
43 Self::new(config, provider as Arc<dyn FunctionProvider>)
44 }
45
46 pub fn with_container_provider(
47 config: FunctionConfig,
48 provider: crate::provider::container::ContainerProvider,
49 ) -> Self {
50 let arc = Arc::new(provider);
51 let mut svc = Self::new(config, arc.clone() as Arc<dyn FunctionProvider>);
52 svc.container_provider = Some(arc);
53 svc
54 }
55
56 pub fn with_default_container_provider(
57 config: FunctionConfig,
58 ) -> Result<Self, crate::provider::ProviderError> {
59 let provider = crate::provider::container::ContainerProvider::builder().build()?;
60 Ok(Self::with_container_provider(config, provider))
61 }
62
63 pub fn invoker(&self) -> Arc<dyn FunctionInvoker> {
64 self.invoker.clone() as Arc<dyn FunctionInvoker>
65 }
66
67 pub fn provider(&self) -> &crate::provider::container::ContainerProvider {
68 self.container_provider
69 .as_ref()
70 .expect("not a container provider")
71 }
72
73 pub fn runner_state(&self, runtime: &str) -> Option<RunnerState> {
74 let key = RunnerPoolKey {
75 runtime: runtime.to_string(),
76 };
77 self.invoker
78 .pool
79 .handles
80 .get(&key)
81 .map(|h| h.state.lock().expect("state").clone())
82 }
83
84 pub fn force_runner_failed(&self, runtime: &str, reason: &str) {
85 let key = RunnerPoolKey {
86 runtime: runtime.to_string(),
87 };
88 if let Some(handle) = self.invoker.pool.handles.get(&key) {
89 *handle.state.lock().expect("state") = RunnerState::Failed {
90 reason: reason.to_string(),
91 };
92 }
93 }
94
95 pub(crate) async fn wait_until_healthy(
96 &self,
97 handle: &crate::pool::RunnerHandle,
98 ) -> Result<(), ProviderError> {
99 let deadline = tokio::time::Instant::now() + self.config.boot_timeout;
100 loop {
101 if tokio::time::Instant::now() > deadline {
102 return Err(ProviderError::BootTimeout);
103 }
104 match self.provider.health(handle).await {
105 Ok(HealthReport::Healthy) => {
106 *handle.state.lock().expect("state") = RunnerState::Healthy;
107 return Ok(());
108 }
109 Ok(HealthReport::Unhealthy(reason)) => {
110 *handle.state.lock().expect("state") = RunnerState::Unhealthy {
111 since: std::time::Instant::now(),
112 reason,
113 };
114 tokio::time::sleep(self.config.health_interval).await;
115 }
116 Err(_) => {
117 tokio::time::sleep(self.config.health_interval).await;
118 }
119 }
120 }
121 }
122
123 pub(crate) fn spawn_health_task(&self, handle: crate::pool::RunnerHandle) {
124 let provider = Arc::clone(&self.provider);
125 let interval = self.config.health_interval;
126 tokio::spawn(async move {
127 let mut ticks = tokio::time::interval(interval);
128 let mut unhealthy_count = 0u8;
129 loop {
130 tokio::select! {
131 _ = handle.cancel.cancelled() => break,
132 _ = ticks.tick() => {
133 match provider.health(&handle).await {
134 Ok(HealthReport::Healthy) => {
135 unhealthy_count = 0;
136 *handle.state.lock().expect("state") = RunnerState::Healthy;
137 }
138 Ok(HealthReport::Unhealthy(reason)) => {
139 unhealthy_count = unhealthy_count.saturating_add(1);
140 if unhealthy_count >= 2 {
141 *handle.state.lock().expect("state") = RunnerState::Unhealthy { since: std::time::Instant::now(), reason };
142 }
143 }
144 Err(err) => {
145 *handle.state.lock().expect("state") = RunnerState::Failed { reason: err.to_string() };
146 }
147 }
148 }
149 }
150 }
151 });
152 }
153
154 pub(crate) async fn rollback_start(
155 &self,
156 spawned: &[(RunnerPoolKey, crate::pool::RunnerHandle)],
157 registered_refs: &[((FunctionId, Option<String>), RunnerPoolKey)],
158 pending: &[(FunctionDefinition, Option<String>)],
159 ) {
160 for (ref_key, _pool_key) in registered_refs {
161 self.invoker.pool.ref_counts.remove(ref_key);
162 self.invoker.pool.function_to_key.remove(ref_key);
163 let still_used = self
164 .invoker
165 .pool
166 .function_to_key
167 .iter()
168 .any(|kv| kv.key().0 == ref_key.0);
169 if !still_used {
170 self.invoker
171 .function_timeouts
172 .lock()
173 .expect("function_timeouts")
174 .remove(&ref_key.0);
175 }
176 }
177 for (key, handle) in spawned {
178 self.invoker.pool.handles.remove(key);
179 handle.cancel.cancel();
180 let _ = self.provider.shutdown(handle.clone()).await;
181 }
182 self.invoker
183 .pending
184 .lock()
185 .expect("pending")
186 .extend(pending.iter().cloned());
187 }
188}
189
190#[async_trait::async_trait]
191impl Lifecycle for FunctionRuntimeService {
192 fn name(&self) -> &str {
193 "function-runtime"
194 }
195
196 fn status(&self) -> ServiceStatus {
197 match self.status.load(Ordering::SeqCst) {
198 STATUS_STOPPED => ServiceStatus::Stopped,
199 STATUS_STARTED => ServiceStatus::Started,
200 _ => ServiceStatus::Failed,
201 }
202 }
203
204 async fn start(&mut self) -> Result<(), CamelError> {
205 if self.status.load(Ordering::SeqCst) == STATUS_STARTED {
206 return Ok(());
207 }
208 let pending = {
209 let mut lock = self.invoker.pending.lock().expect("pending");
210 std::mem::take(&mut *lock)
211 };
212 let mut grouped: std::collections::HashMap<
213 RunnerPoolKey,
214 Vec<(FunctionDefinition, Option<String>)>,
215 > = std::collections::HashMap::new();
216 for (def, route_id) in pending.iter().cloned() {
217 grouped
218 .entry(RunnerPoolKey {
219 runtime: def.runtime.clone(),
220 })
221 .or_default()
222 .push((def, route_id));
223 }
224 let mut spawned: Vec<(RunnerPoolKey, crate::pool::RunnerHandle)> = Vec::new();
225 let mut registered_refs: Vec<((FunctionId, Option<String>), RunnerPoolKey)> = Vec::new();
226 for (key, defs) in grouped {
227 let handle = match self.provider.spawn(&key).await {
228 Ok(h) => h,
229 Err(e) => {
230 self.rollback_start(&spawned, ®istered_refs, &pending)
231 .await;
232 self.status.store(STATUS_FAILED, Ordering::SeqCst);
233 return Err(CamelError::Config(format!("function: spawn failed: {e}")));
234 }
235 };
236 match self.wait_until_healthy(&handle).await {
237 Ok(()) => {}
238 Err(e) => {
239 handle.cancel.cancel();
240 let _ = self.provider.shutdown(handle).await;
241 self.rollback_start(&spawned, ®istered_refs, &pending)
242 .await;
243 self.status.store(STATUS_FAILED, Ordering::SeqCst);
244 return Err(CamelError::Config(format!("function: boot timeout: {e}")));
245 }
246 }
247 self.invoker
248 .pool
249 .handles
250 .insert(key.clone(), handle.clone());
251 self.spawn_health_task(handle.clone());
252 spawned.push((key.clone(), handle.clone()));
253 for (def, route_id) in defs {
254 if let Err(err) = self.provider.register(&handle, &def).await {
255 self.rollback_start(&spawned, ®istered_refs, &pending)
256 .await;
257 self.status.store(STATUS_FAILED, Ordering::SeqCst);
258 return Err(CamelError::Config(format!(
259 "function: register failed: {err}"
260 )));
261 }
262 let ref_key = (def.id.clone(), route_id.clone());
263 self.invoker.pool.ref_counts.insert(ref_key.clone(), 1);
264 self.invoker
265 .pool
266 .function_to_key
267 .insert(ref_key.clone(), key.clone());
268 registered_refs.push((ref_key, key.clone()));
269 }
270 }
271 self.invoker.started.store(true, Ordering::SeqCst);
272 self.status.store(STATUS_STARTED, Ordering::SeqCst);
273 Ok(())
274 }
275
276 async fn stop(&mut self) -> Result<(), CamelError> {
277 let handles: Vec<_> = self
278 .invoker
279 .pool
280 .handles
281 .iter()
282 .map(|h| h.clone())
283 .collect();
284 self.invoker.pool.handles.clear();
285 self.invoker.pool.ref_counts.clear();
286 self.invoker.pool.function_to_key.clear();
287 self.invoker
288 .function_timeouts
289 .lock()
290 .expect("function_timeouts")
291 .clear();
292 for handle in handles {
293 handle.cancel.cancel();
294 self.provider
295 .shutdown(handle)
296 .await
297 .map_err(|e| CamelError::ProcessorError(e.to_string()))?;
298 }
299 self.invoker.started.store(false, Ordering::SeqCst);
300 self.status.store(STATUS_STOPPED, Ordering::SeqCst);
301 Ok(())
302 }
303
304 fn as_function_invoker(&self) -> Option<Arc<dyn FunctionInvoker>> {
305 Some(self.invoker.clone() as Arc<dyn FunctionInvoker>)
306 }
307}