// switchboard_container_utils/manager/backoff.rs

1use crate::manager::*;
2use async_trait::async_trait;
3use dashmap::{DashMap, DashSet};
4use futures_util::future::join_all;
5use std::{
6    ops::Add,
7    sync::Arc,
8    time::{Duration, SystemTime},
9};
10
/// A boxed, pinned, `Send` future resolving to `Result<T, SbError>` that may borrow data for
/// the lifetime `'a`. Used in this file as the element type when collecting image-inspection
/// futures.
type FetchContainerResult<'a, T> =
    std::pin::Pin<Box<dyn futures_util::Future<Output = Result<T, SbError>> + Send + 'a>>;
13
/// Container manager that keeps per-function bookkeeping (active functions, backoff deadlines,
/// failure counters) and an image blacklist next to a shared Docker client.
///
/// Every tracking collection is wrapped in an `Arc`, so clones of the manager share the same
/// underlying state.
#[derive(Clone, Debug)]
pub struct ContainerManagerWithBackoff {
    /// Shared handle to the Docker client.
    pub docker: Arc<Docker>,
    /// Default Docker container configuration. `new` falls back to
    /// `get_default_docker_config()` when the caller does not supply one.
    pub docker_default_config: Config<String>,
    /// Registry credentials handed to Docker when pulling images.
    pub docker_credentials: DockerCredentials,

    /// The keys of the functions currently marked as active.
    pub active_functions: Arc<DashSet<String>>,
    /// Upper bound on the number of entries allowed in `active_functions`.
    pub max_active_functions: usize,

    /// Per-function backoff state: the current backoff value (used as a number of seconds) and
    /// the earliest time at which the function may run again.
    pub function_backoff: Arc<DashMap<String, (i64, SystemTime)>>,
    /// Cap applied to the per-function backoff value.
    pub max_function_backoff: i64,

    /// An incrementer for successive container failures
    // TODO: should error counter be based on params passed or overall?
    pub function_error_counter: Arc<DashMap<String, u32>>,
    /// The maximum number of failures before disabling
    // TODO: we need a way to classify failures into one of the following
    // * container logic / emitted error codes
    // * escrow balance issues (temp)
    // * callback issues (view ixn simulation)
    pub max_function_failures: u32,

    /// Image names that `is_function_ready` refuses to run.
    // < container name (TODO: maybe do by mrenclave and have routine populate a mapping) >
    pub container_blacklist: Arc<DashSet<String>>,
}
40
41impl ContainerManagerWithBackoff {
42    pub fn new(docker: Arc<Docker>, default_docker_config: Option<Config<String>>) -> Self {
43        Self {
44            docker,
45            docker_credentials: DockerCredentials {
46                username: Some(std::env::var("DOCKER_USER").unwrap_or_default()),
47                password: Some(std::env::var("DOCKER_KEY").unwrap_or_default()),
48                ..Default::default()
49            },
50            docker_default_config: default_docker_config.unwrap_or(get_default_docker_config()),
51
52            active_functions: Arc::new(DashSet::new()),
53            max_active_functions: 1000,
54
55            function_backoff: Arc::new(DashMap::new()),
56            max_function_backoff: 300,
57
58            function_error_counter: Arc::new(DashMap::new()),
59            max_function_failures: 10,
60
61            container_blacklist: Arc::new(DashSet::new()),
62        }
63    }
64
65    /// Checks whether the given function_key (request/routine) and image_name is ready to be
66    /// executed based on the cache.
67    pub fn is_function_ready(&self, function_key: &str, image_name: &str) -> ContainerResult<()> {
68        if self.container_blacklist.contains(image_name) {
69            debug!("Container is blacklisted: {:?}", image_name);
70            return Err(SbError::DockerFetchError);
71        }
72
73        if self.active_functions.contains(function_key) {
74            debug!("Function is active: {:?}", function_key);
75            return Err(SbError::ContainerActive);
76        }
77
78        if let Some(backoff) = self.is_function_backoff(function_key) {
79            debug!("Function backoff {} seconds: {:?}", backoff, function_key);
80            return Err(SbError::ContainerBackoff(backoff));
81        }
82
83        let function_error_count = self.get_function_error_count(function_key);
84        if function_error_count >= self.max_function_failures {
85            debug!(
86                "Function error count ({}) exceeds threshold ({}): {:?}",
87                function_error_count, self.max_function_failures, function_key
88            );
89            return Err(SbError::FunctionErrorCountExceeded(function_error_count));
90        }
91
92        Ok(())
93    }
94
95    pub fn is_ready_for_new_function(&self) -> bool {
96        self.max_active_functions > self.active_functions.len()
97    }
98
99    pub fn add_active_function(&self, function_key: &str) -> ContainerResult<bool> {
100        if !self.is_ready_for_new_function() {
101            return Err(SbError::ContainerCreateError(Arc::new(SbError::Message(
102                "DockerNotReady",
103            ))));
104        }
105
106        let was_added = self.active_functions.insert(function_key.to_string());
107        Ok(was_added)
108    }
109
110    pub fn remove_active_function(&self, function_key: &str) -> bool {
111        self.active_functions.remove(function_key).is_some()
112    }
113
114    pub fn get_function_backoff(&self, function_key: &str) -> Option<(i64, SystemTime)> {
115        if let Some(entry) = self.function_backoff.get(function_key) {
116            let (backoff, next_timestamp) = *entry;
117            Some((backoff, next_timestamp))
118        } else {
119            None
120        }
121    }
122
123    pub fn add_function_backoff(&self, function_key: &str) -> (i64, SystemTime) {
124        let (new_backoff, new_next_timestamp) =
125            if let Some(entry) = self.function_backoff.get_mut(function_key) {
126                let (backoff, next_timestamp) = *entry;
127                let new_backoff = std::cmp::min(self.max_function_backoff, backoff.to_owned() + 5);
128                let next_timestamp =
129                    next_timestamp.add(Duration::from_secs(new_backoff.try_into().unwrap_or(5)));
130
131                (new_backoff, next_timestamp)
132            } else {
133                (5, SystemTime::now().add(Duration::from_secs(5)))
134            };
135
136        self.function_backoff
137            .insert(function_key.to_string(), (new_backoff, new_next_timestamp));
138
139        (new_backoff, new_next_timestamp)
140    }
141
142    pub fn remove_function_backoff(&self, function_key: &str) -> bool {
143        self.function_backoff.remove(function_key).is_some()
144    }
145
146    pub fn get_function_error_count(&self, function_key: &str) -> u32 {
147        if let Some(error_count) = self.function_error_counter.get(function_key) {
148            *error_count
149        } else {
150            0
151        }
152    }
153
154    pub fn is_function_backoff(&self, function_key: &str) -> Option<u64> {
155        if let Some(entry) = self.function_backoff.get(function_key) {
156            let (_backoff, next_allowed_run) = *entry;
157            if let Ok(duration) = next_allowed_run.duration_since(SystemTime::now()) {
158                let seconds_until_future_time = duration.as_secs();
159                return Some(seconds_until_future_time);
160            }
161        }
162
163        None
164    }
165
166    pub fn add_function_error(&self, function_key: &str) -> u32 {
167        let current_count = self.get_function_error_count(function_key);
168        let new_count = current_count + 1;
169
170        if let Some(mut entry) = self.function_error_counter.get_mut(function_key) {
171            *entry = new_count;
172        } else {
173            self.function_error_counter
174                .insert(function_key.to_string(), new_count);
175        }
176
177        new_count
178    }
179
180    pub fn reset_function_error(&self, function_key: &str) -> bool {
181        self.function_error_counter.remove(function_key).is_some()
182    }
183
184    pub fn blacklist_container(&self, image_name: &str) -> bool {
185        self.container_blacklist.insert(image_name.to_string())
186    }
187
188    pub fn whitelist_container(&self, image_name: &str) -> bool {
189        self.container_blacklist.remove(image_name).is_some()
190    }
191
192    /// Fetch all layers and tags for the docker image switchboardlabs/sgx-function
193    pub async fn fetch_switchboard_docker_layers(&self) -> ContainerResult<()> {
194        let switchboard_image_name = "switchboardlabs/sgx-function".to_string();
195
196        let mut create_img_stream = self.docker.create_image(
197            Some(bollard::image::CreateImageOptions {
198                from_image: switchboard_image_name.clone(),
199                platform: "linux/amd64".to_string(),
200                ..Default::default()
201            }),
202            None,
203            Some(self.docker_credentials.clone()),
204        );
205
206        while let Some(Ok(progress)) = create_img_stream.next().await {
207            trace!(
208                "{:?} {:?} {:?} {:?}",
209                switchboard_image_name.clone(),
210                progress.id,
211                progress.status,
212                progress.progress,
213                { id: switchboard_image_name.clone() }
214            );
215        }
216
217        Ok(())
218    }
219
220    /// Fetch a list of docker images from the registry and blacklist images that cannot be pulled
221    pub async fn fetch_images(&self, images: Vec<String>) -> ContainerResult<()> {
222        let futures_vec: Vec<FetchContainerResult<bollard::models::ImageInspect>> = images
223            .iter()
224            .map(|image_name| self.inspect_image(image_name.as_str()))
225            .collect();
226
227        let results = join_all(futures_vec).await;
228
229        for (i, result) in results.iter().enumerate() {
230            let image_name: &String = images.get(i).unwrap();
231            match result {
232                Ok(image) => {
233                    if let Some(size) = image.size {
234                        let size_in_mb = size / 1024 / 1024;
235                        info!("{}: Size = {} MB", image_name.clone(), size_in_mb);
236
237                        if size_in_mb > 750 && self.container_blacklist.insert(image_name.clone()) {
238                            info!("Docker image blacklisted {}", image_name);
239                        }
240                    }
241
242                    if self.container_blacklist.remove(image_name).is_some() {
243                        info!("Docker image removed from blacklist: {}", image_name);
244                    }
245                }
246                Err(e) => {
247                    error!("Failed to inspect docker image {}: {:#?}", image_name, e);
248                    if self.container_blacklist.insert(image_name.clone()) {
249                        info!("Docker image blacklisted {}", image_name);
250                    }
251                }
252            }
253        }
254
255        Ok(())
256    }
257}
258
259#[async_trait]
260impl ContainerManager for ContainerManagerWithBackoff {
261    fn docker(&self) -> &Arc<Docker> {
262        &self.docker
263    }
264
265    fn docker_credentials(&self) -> &DockerCredentials {
266        &self.docker_credentials
267    }
268
269    fn docker_default_config(&self) -> &Config<String> {
270        &self.docker_default_config
271    }
272
273    async fn create_docker_container(
274        &self,
275        function_key: &str,
276        image_name: &str,
277        env: Option<Vec<String>>,
278        overrides: Option<DockerContainerOverrides>,
279    ) -> ContainerResult<DockerContainer> {
280        // Verify there is capacity and the function is not already being executed
281        self.is_function_ready(function_key, image_name)?;
282
283        let config =
284            self.get_container_config(image_name, env.clone(), overrides.unwrap_or_default());
285
286        // Pull the image
287        // Can we cache this?
288        let was_downloaded = self.fetch_image(image_name).await?;
289        if was_downloaded {
290            debug!("Downloaded image {}", image_name, { id: function_key });
291        }
292
293        match self
294            .docker()
295            .create_container::<String, _>(
296                Some(CreateContainerOptions {
297                    ..Default::default()
298                }),
299                config.clone(),
300            )
301            .await
302        {
303            Ok(result) => {
304                info!("Created container for image {}", image_name, { id: function_key });
305
306                Ok(DockerContainer {
307                    id: result.id,
308                    image_name: image_name.to_string(),
309                    env: env.unwrap_or_default(),
310                    docker: self.docker().clone(),
311                    config: config.clone(),
312                })
313            }
314            Err(error) => {
315                let error_message = format!(
316                    "Failed to create container for image {}, {}",
317                    image_name, error
318                );
319                info!("{}", error_message, { id: function_key });
320
321                Err(SbError::ContainerError(std::sync::Arc::new(error)))
322            }
323        }
324    }
325}