switchboard_container_utils/manager/
capacity.rs

1use crate::manager::*;
2use async_trait::async_trait;
3use std::{
4    collections::HashSet,
5    sync::{Arc, Mutex},
6};
7
8#[derive(Clone, Debug)]
9pub struct ContainerManagerWithCapacity {
10    pub max_active_containers: u32,
11    pub num_active_containers: u32,
12    pub containers: Arc<Mutex<HashSet<String>>>,
13
14    pub docker: Arc<Docker>,
15    pub docker_default_config: Config<String>,
16    pub docker_credentials: DockerCredentials,
17}
18
19impl ContainerManagerWithCapacity {
20    pub fn new(docker: Arc<Docker>, default_docker_config: Option<Config<String>>) -> Self {
21        Self {
22            max_active_containers: 1000,
23            num_active_containers: 0,
24            containers: Arc::new(Mutex::new(HashSet::new())),
25
26            docker,
27            docker_credentials: DockerCredentials {
28                username: Some(std::env::var("DOCKER_USER").unwrap_or(String::new())),
29                password: Some(std::env::var("DOCKER_KEY").unwrap_or(String::new())),
30                ..Default::default()
31            },
32            docker_default_config: default_docker_config.unwrap_or(get_default_docker_config()),
33        }
34    }
35
36    pub fn add_container(&mut self, id: &str) -> ContainerResult<()> {
37        let mut container_set = self.containers.lock().unwrap();
38        if container_set.insert(id.to_string()) {
39            self.num_active_containers += 1;
40            // TODO: should we error _here_ if this spils over max_active_containers? or just block new containers after this?
41        }
42
43        Ok(())
44    }
45
46    pub fn remove_container(&mut self, id: &str) -> ContainerResult<()> {
47        let mut container_set = self.containers.lock().unwrap();
48        if container_set.remove(&id.to_string()) {
49            self.num_active_containers -= 1;
50        }
51
52        Ok(())
53    }
54
55    pub async fn update_active_containers(&mut self) {
56        match self.get_active_containers().await {
57            Ok(containers) => {
58                let num_active_containers = containers.len();
59                self.num_active_containers = num_active_containers as u32;
60
61                let mut container_set = self.containers.lock().unwrap();
62                *container_set = containers
63                    .into_iter()
64                    .map(|c| c.id.unwrap_or_default())
65                    .filter(|c| !c.is_empty())
66                    .collect();
67            }
68            Err(error) => {
69                warn!("Failed to get active containers: {}", error);
70            }
71        }
72    }
73}
74
75#[async_trait]
76impl ContainerManager for ContainerManagerWithCapacity {
77    fn docker(&self) -> &Arc<Docker> {
78        &self.docker
79    }
80
81    fn docker_credentials(&self) -> &DockerCredentials {
82        &self.docker_credentials
83    }
84
85    fn docker_default_config(&self) -> &Config<String> {
86        &self.docker_default_config
87    }
88
89    async fn create_docker_container(
90        &self,
91        key: &str,
92        image_name: &str,
93        env: Option<Vec<String>>,
94        overrides: Option<DockerContainerOverrides>,
95    ) -> ContainerResult<DockerContainer> {
96        // Verify there is capacity
97        if self.containers.lock().unwrap().len() >= (self.max_active_containers as usize) {
98            return Err(SbError::CustomMessage(format!(
99                "Max active containers reached: {}",
100                self.max_active_containers
101            )));
102        }
103        ContainerManager::create_docker_container(self, key, image_name, env, overrides).await
104    }
105}