1use crate::pool::{RunnerHandle, RunnerPoolKey};
2use camel_api::{Exchange, function::*};
3use std::time::Duration;
4
5mod sealed {
6 pub trait Sealed {}
7}
8
9#[derive(Debug, Clone)]
10pub enum HealthReport {
11 Healthy,
12 Unhealthy(String),
13}
14
15#[derive(Debug, thiserror::Error)]
16pub enum ProviderError {
17 #[error("spawn failed: {0}")]
18 SpawnFailed(String),
19 #[error("health check failed: {0}")]
20 HealthFailed(String),
21 #[error("register failed: {0}")]
22 RegisterFailed(String),
23 #[error("unregister failed: {0}")]
24 UnregisterFailed(String),
25 #[error("invoke failed: {0}")]
26 InvokeFailed(String),
27 #[error("shutdown failed: {0}")]
28 ShutdownFailed(String),
29 #[error("boot timeout")]
30 BootTimeout,
31}
32
33#[async_trait::async_trait]
34pub(crate) trait FunctionProvider: Send + Sync + sealed::Sealed {
35 async fn spawn(&self, key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError>;
36 async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError>;
37 async fn health(&self, handle: &RunnerHandle) -> Result<HealthReport, ProviderError>;
38 async fn register(
39 &self,
40 handle: &RunnerHandle,
41 def: &FunctionDefinition,
42 ) -> Result<(), ProviderError>;
43 async fn unregister(&self, handle: &RunnerHandle, id: &FunctionId)
44 -> Result<(), ProviderError>;
45 async fn invoke(
46 &self,
47 handle: &RunnerHandle,
48 id: &FunctionId,
49 ex: &Exchange,
50 timeout: Duration,
51 ) -> Result<ExchangePatch, ProviderError>;
52}
53
54pub mod container;
55pub mod fake {
56 use super::*;
57 use std::collections::{HashMap, HashSet};
58 use std::sync::atomic::{AtomicUsize, Ordering};
59 use std::sync::{Arc, Mutex};
60 use tokio_util::sync::CancellationToken;
61
62 #[derive(Debug, Clone, Default)]
63 pub struct FakeProviderConfig {
64 pub fail_on_spawn: bool,
65 pub fail_on_register: usize,
66 pub fail_on_health: bool,
67 pub invoke_response: Option<ExchangePatch>,
68 }
69
70 #[derive(Debug, Clone)]
71 pub enum FakeCall {
72 Spawn(RunnerPoolKey),
73 Shutdown(RunnerPoolKey),
74 Health(String),
75 Register(String, FunctionId),
76 Unregister(String, FunctionId),
77 Invoke(String, FunctionId),
78 }
79
80 pub struct FakeProvider {
81 pub config: Arc<Mutex<FakeProviderConfig>>,
82 pub calls: Arc<Mutex<Vec<FakeCall>>>,
83 pub registered: Arc<Mutex<HashMap<String, HashSet<FunctionId>>>>,
84 pub spawned: Arc<Mutex<Vec<RunnerPoolKey>>>,
85 pub shutdowns: Arc<Mutex<Vec<RunnerPoolKey>>>,
86 register_ok_count: Arc<Mutex<usize>>,
87 spawn_count: AtomicUsize,
88 }
89
90 impl FakeProvider {
91 pub fn new(config: FakeProviderConfig) -> Self {
92 Self {
93 config: Arc::new(Mutex::new(config)),
94 calls: Arc::new(Mutex::new(Vec::new())),
95 registered: Arc::new(Mutex::new(HashMap::new())),
96 spawned: Arc::new(Mutex::new(Vec::new())),
97 shutdowns: Arc::new(Mutex::new(Vec::new())),
98 register_ok_count: Arc::new(Mutex::new(0)),
99 spawn_count: AtomicUsize::new(0),
100 }
101 }
102
103 pub fn spawn_count(&self) -> usize {
104 self.spawn_count.load(Ordering::SeqCst)
105 }
106 }
107
108 impl super::sealed::Sealed for FakeProvider {}
109
110 #[async_trait::async_trait]
111 impl FunctionProvider for FakeProvider {
112 async fn spawn(&self, key: &RunnerPoolKey) -> Result<RunnerHandle, ProviderError> {
113 self.spawn_count.fetch_add(1, Ordering::SeqCst);
114 self.calls
115 .lock()
116 .expect("calls")
117 .push(FakeCall::Spawn(key.clone()));
118 self.spawned.lock().expect("spawned").push(key.clone());
119 if self.config.lock().expect("config").fail_on_spawn {
120 return Err(ProviderError::SpawnFailed("configured".into()));
121 }
122 Ok(RunnerHandle {
123 id: format!("fake-{}", key.runtime),
124 state: Arc::new(Mutex::new(crate::pool::RunnerState::Booting)),
125 cancel: CancellationToken::new(),
126 })
127 }
128
129 async fn shutdown(&self, handle: RunnerHandle) -> Result<(), ProviderError> {
130 self.calls
131 .lock()
132 .expect("calls")
133 .push(FakeCall::Shutdown(RunnerPoolKey {
134 runtime: handle.id.replace("fake-", ""),
135 }));
136 self.shutdowns
137 .lock()
138 .expect("shutdowns")
139 .push(RunnerPoolKey {
140 runtime: handle.id.replace("fake-", ""),
141 });
142 Ok(())
143 }
144
145 async fn health(&self, handle: &RunnerHandle) -> Result<HealthReport, ProviderError> {
146 self.calls
147 .lock()
148 .expect("calls")
149 .push(FakeCall::Health(handle.id.clone()));
150 if self.config.lock().expect("config").fail_on_health {
151 return Ok(HealthReport::Unhealthy("configured".into()));
152 }
153 Ok(HealthReport::Healthy)
154 }
155
156 async fn register(
157 &self,
158 handle: &RunnerHandle,
159 def: &FunctionDefinition,
160 ) -> Result<(), ProviderError> {
161 self.calls
162 .lock()
163 .expect("calls")
164 .push(FakeCall::Register(handle.id.clone(), def.id.clone()));
165 let mut count = self.register_ok_count.lock().expect("count");
166 let cfg = self.config.lock().expect("config").clone();
167 if cfg.fail_on_register > 0 && *count >= cfg.fail_on_register {
168 return Err(ProviderError::RegisterFailed("configured".into()));
169 }
170 *count += 1;
171 self.registered
172 .lock()
173 .expect("registered")
174 .entry(handle.id.clone())
175 .or_default()
176 .insert(def.id.clone());
177 Ok(())
178 }
179
180 async fn unregister(
181 &self,
182 handle: &RunnerHandle,
183 id: &FunctionId,
184 ) -> Result<(), ProviderError> {
185 self.calls
186 .lock()
187 .expect("calls")
188 .push(FakeCall::Unregister(handle.id.clone(), id.clone()));
189 if let Some(set) = self
190 .registered
191 .lock()
192 .expect("registered")
193 .get_mut(&handle.id)
194 {
195 set.remove(id);
196 }
197 Ok(())
198 }
199
200 async fn invoke(
201 &self,
202 handle: &RunnerHandle,
203 id: &FunctionId,
204 _ex: &Exchange,
205 _timeout: Duration,
206 ) -> Result<ExchangePatch, ProviderError> {
207 self.calls
208 .lock()
209 .expect("calls")
210 .push(FakeCall::Invoke(handle.id.clone(), id.clone()));
211 let exists = self
212 .registered
213 .lock()
214 .expect("registered")
215 .get(&handle.id)
216 .map(|s| s.contains(id))
217 .unwrap_or(false);
218 if !exists {
219 return Err(ProviderError::InvokeFailed("not registered".into()));
220 }
221 let cfg = self.config.lock().expect("config").clone();
222 Ok(cfg.invoke_response.unwrap_or_default())
223 }
224 }
225}