switchboard_container_utils/manager/
backoff.rs1use crate::manager::*;
2use async_trait::async_trait;
3use dashmap::{DashMap, DashSet};
4use futures_util::future::join_all;
5use std::{
6 ops::Add,
7 sync::Arc,
8 time::{Duration, SystemTime},
9};
10
11type FetchContainerResult<'a, T> =
12 std::pin::Pin<Box<dyn futures_util::Future<Output = Result<T, SbError>> + Send + 'a>>;
13
14#[derive(Clone, Debug)]
15pub struct ContainerManagerWithBackoff {
16 pub docker: Arc<Docker>,
17 pub docker_default_config: Config<String>,
18 pub docker_credentials: DockerCredentials,
19
20 pub active_functions: Arc<DashSet<String>>,
22 pub max_active_functions: usize,
23
24 pub function_backoff: Arc<DashMap<String, (i64, SystemTime)>>,
25 pub max_function_backoff: i64,
26
27 pub function_error_counter: Arc<DashMap<String, u32>>,
30 pub max_function_failures: u32,
36
37 pub container_blacklist: Arc<DashSet<String>>,
39}
40
41impl ContainerManagerWithBackoff {
42 pub fn new(docker: Arc<Docker>, default_docker_config: Option<Config<String>>) -> Self {
43 Self {
44 docker,
45 docker_credentials: DockerCredentials {
46 username: Some(std::env::var("DOCKER_USER").unwrap_or_default()),
47 password: Some(std::env::var("DOCKER_KEY").unwrap_or_default()),
48 ..Default::default()
49 },
50 docker_default_config: default_docker_config.unwrap_or(get_default_docker_config()),
51
52 active_functions: Arc::new(DashSet::new()),
53 max_active_functions: 1000,
54
55 function_backoff: Arc::new(DashMap::new()),
56 max_function_backoff: 300,
57
58 function_error_counter: Arc::new(DashMap::new()),
59 max_function_failures: 10,
60
61 container_blacklist: Arc::new(DashSet::new()),
62 }
63 }
64
65 pub fn is_function_ready(&self, function_key: &str, image_name: &str) -> ContainerResult<()> {
68 if self.container_blacklist.contains(image_name) {
69 debug!("Container is blacklisted: {:?}", image_name);
70 return Err(SbError::DockerFetchError);
71 }
72
73 if self.active_functions.contains(function_key) {
74 debug!("Function is active: {:?}", function_key);
75 return Err(SbError::ContainerActive);
76 }
77
78 if let Some(backoff) = self.is_function_backoff(function_key) {
79 debug!("Function backoff {} seconds: {:?}", backoff, function_key);
80 return Err(SbError::ContainerBackoff(backoff));
81 }
82
83 let function_error_count = self.get_function_error_count(function_key);
84 if function_error_count >= self.max_function_failures {
85 debug!(
86 "Function error count ({}) exceeds threshold ({}): {:?}",
87 function_error_count, self.max_function_failures, function_key
88 );
89 return Err(SbError::FunctionErrorCountExceeded(function_error_count));
90 }
91
92 Ok(())
93 }
94
95 pub fn is_ready_for_new_function(&self) -> bool {
96 self.max_active_functions > self.active_functions.len()
97 }
98
99 pub fn add_active_function(&self, function_key: &str) -> ContainerResult<bool> {
100 if !self.is_ready_for_new_function() {
101 return Err(SbError::ContainerCreateError(Arc::new(SbError::Message(
102 "DockerNotReady",
103 ))));
104 }
105
106 let was_added = self.active_functions.insert(function_key.to_string());
107 Ok(was_added)
108 }
109
110 pub fn remove_active_function(&self, function_key: &str) -> bool {
111 self.active_functions.remove(function_key).is_some()
112 }
113
114 pub fn get_function_backoff(&self, function_key: &str) -> Option<(i64, SystemTime)> {
115 if let Some(entry) = self.function_backoff.get(function_key) {
116 let (backoff, next_timestamp) = *entry;
117 Some((backoff, next_timestamp))
118 } else {
119 None
120 }
121 }
122
123 pub fn add_function_backoff(&self, function_key: &str) -> (i64, SystemTime) {
124 let (new_backoff, new_next_timestamp) =
125 if let Some(entry) = self.function_backoff.get_mut(function_key) {
126 let (backoff, next_timestamp) = *entry;
127 let new_backoff = std::cmp::min(self.max_function_backoff, backoff.to_owned() + 5);
128 let next_timestamp =
129 next_timestamp.add(Duration::from_secs(new_backoff.try_into().unwrap_or(5)));
130
131 (new_backoff, next_timestamp)
132 } else {
133 (5, SystemTime::now().add(Duration::from_secs(5)))
134 };
135
136 self.function_backoff
137 .insert(function_key.to_string(), (new_backoff, new_next_timestamp));
138
139 (new_backoff, new_next_timestamp)
140 }
141
142 pub fn remove_function_backoff(&self, function_key: &str) -> bool {
143 self.function_backoff.remove(function_key).is_some()
144 }
145
146 pub fn get_function_error_count(&self, function_key: &str) -> u32 {
147 if let Some(error_count) = self.function_error_counter.get(function_key) {
148 *error_count
149 } else {
150 0
151 }
152 }
153
154 pub fn is_function_backoff(&self, function_key: &str) -> Option<u64> {
155 if let Some(entry) = self.function_backoff.get(function_key) {
156 let (_backoff, next_allowed_run) = *entry;
157 if let Ok(duration) = next_allowed_run.duration_since(SystemTime::now()) {
158 let seconds_until_future_time = duration.as_secs();
159 return Some(seconds_until_future_time);
160 }
161 }
162
163 None
164 }
165
166 pub fn add_function_error(&self, function_key: &str) -> u32 {
167 let current_count = self.get_function_error_count(function_key);
168 let new_count = current_count + 1;
169
170 if let Some(mut entry) = self.function_error_counter.get_mut(function_key) {
171 *entry = new_count;
172 } else {
173 self.function_error_counter
174 .insert(function_key.to_string(), new_count);
175 }
176
177 new_count
178 }
179
180 pub fn reset_function_error(&self, function_key: &str) -> bool {
181 self.function_error_counter.remove(function_key).is_some()
182 }
183
184 pub fn blacklist_container(&self, image_name: &str) -> bool {
185 self.container_blacklist.insert(image_name.to_string())
186 }
187
188 pub fn whitelist_container(&self, image_name: &str) -> bool {
189 self.container_blacklist.remove(image_name).is_some()
190 }
191
192 pub async fn fetch_switchboard_docker_layers(&self) -> ContainerResult<()> {
194 let switchboard_image_name = "switchboardlabs/sgx-function".to_string();
195
196 let mut create_img_stream = self.docker.create_image(
197 Some(bollard::image::CreateImageOptions {
198 from_image: switchboard_image_name.clone(),
199 platform: "linux/amd64".to_string(),
200 ..Default::default()
201 }),
202 None,
203 Some(self.docker_credentials.clone()),
204 );
205
206 while let Some(Ok(progress)) = create_img_stream.next().await {
207 trace!(
208 "{:?} {:?} {:?} {:?}",
209 switchboard_image_name.clone(),
210 progress.id,
211 progress.status,
212 progress.progress,
213 { id: switchboard_image_name.clone() }
214 );
215 }
216
217 Ok(())
218 }
219
220 pub async fn fetch_images(&self, images: Vec<String>) -> ContainerResult<()> {
222 let futures_vec: Vec<FetchContainerResult<bollard::models::ImageInspect>> = images
223 .iter()
224 .map(|image_name| self.inspect_image(image_name.as_str()))
225 .collect();
226
227 let results = join_all(futures_vec).await;
228
229 for (i, result) in results.iter().enumerate() {
230 let image_name: &String = images.get(i).unwrap();
231 match result {
232 Ok(image) => {
233 if let Some(size) = image.size {
234 let size_in_mb = size / 1024 / 1024;
235 info!("{}: Size = {} MB", image_name.clone(), size_in_mb);
236
237 if size_in_mb > 750 && self.container_blacklist.insert(image_name.clone()) {
238 info!("Docker image blacklisted {}", image_name);
239 }
240 }
241
242 if self.container_blacklist.remove(image_name).is_some() {
243 info!("Docker image removed from blacklist: {}", image_name);
244 }
245 }
246 Err(e) => {
247 error!("Failed to inspect docker image {}: {:#?}", image_name, e);
248 if self.container_blacklist.insert(image_name.clone()) {
249 info!("Docker image blacklisted {}", image_name);
250 }
251 }
252 }
253 }
254
255 Ok(())
256 }
257}
258
259#[async_trait]
260impl ContainerManager for ContainerManagerWithBackoff {
261 fn docker(&self) -> &Arc<Docker> {
262 &self.docker
263 }
264
265 fn docker_credentials(&self) -> &DockerCredentials {
266 &self.docker_credentials
267 }
268
269 fn docker_default_config(&self) -> &Config<String> {
270 &self.docker_default_config
271 }
272
273 async fn create_docker_container(
274 &self,
275 function_key: &str,
276 image_name: &str,
277 env: Option<Vec<String>>,
278 overrides: Option<DockerContainerOverrides>,
279 ) -> ContainerResult<DockerContainer> {
280 self.is_function_ready(function_key, image_name)?;
282
283 let config =
284 self.get_container_config(image_name, env.clone(), overrides.unwrap_or_default());
285
286 let was_downloaded = self.fetch_image(image_name).await?;
289 if was_downloaded {
290 debug!("Downloaded image {}", image_name, { id: function_key });
291 }
292
293 match self
294 .docker()
295 .create_container::<String, _>(
296 Some(CreateContainerOptions {
297 ..Default::default()
298 }),
299 config.clone(),
300 )
301 .await
302 {
303 Ok(result) => {
304 info!("Created container for image {}", image_name, { id: function_key });
305
306 Ok(DockerContainer {
307 id: result.id,
308 image_name: image_name.to_string(),
309 env: env.unwrap_or_default(),
310 docker: self.docker().clone(),
311 config: config.clone(),
312 })
313 }
314 Err(error) => {
315 let error_message = format!(
316 "Failed to create container for image {}, {}",
317 image_name, error
318 );
319 info!("{}", error_message, { id: function_key });
320
321 Err(SbError::ContainerError(std::sync::Arc::new(error)))
322 }
323 }
324 }
325}