bsudlib/
bsu.rs

1use crate::config::{DiskType, CLOUD_CONFIG, SUBREGION, VM_ID};
2use crate::utils::gib_to_bytes;
3use easy_error::format_err;
4use log::{debug, error};
5use outscale_api::apis::tag_api::create_tags;
6use outscale_api::apis::volume_api::{
7    create_volume, delete_volume, link_volume, read_volumes, unlink_volume,
8};
9use outscale_api::models::{
10    CreateTagsRequest, CreateVolumeRequest, DeleteVolumeRequest, FiltersVolume, LinkVolumeRequest,
11    ReadVolumesRequest, ResourceTag, UnlinkVolumeRequest, Volume,
12};
13use std::error::Error;
14use std::path::PathBuf;
15
16use datetime::{Duration, Instant};
17use lazy_static::lazy_static;
18use std::sync::Mutex;
19use std::thread::sleep;
20use std::time;
21
22const API_LIMITER_S: u64 = 3;
23const BSU_TAG_KEY: &str = "osc.bsud.drive-name";
24const MAX_IOPS_PER_VOLUMES: usize = 13000;
25const DEFAULT_IO1_IOPS_PER_GB: usize = 100;
26
27lazy_static! {
28    pub static ref API_LIMITER: Mutex<Instant> =
29        Mutex::new(Instant::now() - Duration::of(API_LIMITER_S as i64));
30}
31
32#[derive(Debug, Default, Clone)]
33pub struct Bsu {
34    pub vm_id: Option<String>,
35    pub drive_name: String,
36    pub id: String,
37    pub size_bytes: usize,
38    pub size_gib: usize,
39    pub device_path: Option<String>,
40}
41
42impl Bsu {
43    pub fn new(volume: &Volume) -> Result<Self, Box<dyn Error>> {
44        let Some(bsu_id) = volume.volume_id.clone() else {
45            return Err(Box::new(format_err!(
46                "BSU {:?} does not have an id",
47                volume
48            )));
49        };
50        let Some(bsu_size_gib) = volume.size else {
51            return Err(Box::new(format_err!(
52                "BSU {:?} does not have a size",
53                volume
54            )));
55        };
56        let vm_id = Bsu::get_drive_linked_vm_id(volume);
57        let Some(drive_name) = Bsu::get_drive_name(volume) else {
58            Err(format_err!(
59                "Cannot extract drive name from BSU id {}",
60                bsu_id
61            ))?
62        };
63        let device_path = Bsu::get_drive_device_path(volume);
64
65        Ok(Bsu {
66            vm_id,
67            drive_name,
68            id: bsu_id,
69            size_bytes: gib_to_bytes(bsu_size_gib as usize),
70            size_gib: bsu_size_gib as usize,
71            device_path,
72        })
73    }
74
75    fn get_drive_linked_vm_id(volume: &Volume) -> Option<String> {
76        let Some(linked_volumes) = &volume.linked_volumes else {
77            return None;
78        };
79        for linked_volume in linked_volumes {
80            let Some(state) = &linked_volume.state else {
81                continue;
82            };
83            let Some(vm_id) = &linked_volume.vm_id else {
84                continue;
85            };
86            match state.as_str() {
87                "attaching" | "attached" => return Some(vm_id.clone()),
88                _ => return None,
89            };
90        }
91        None
92    }
93
94    fn get_drive_name(volume: &Volume) -> Option<String> {
95        let Some(tags) = &volume.tags else {
96            return None;
97        };
98        for tag in tags {
99            if tag.key == *BSU_TAG_KEY.to_string() {
100                return Some(tag.value.clone());
101            }
102        }
103        None
104    }
105
106    fn get_drive_device_path(volume: &Volume) -> Option<String> {
107        let Some(linked_volumes) = &volume.linked_volumes else {
108            return None;
109        };
110        linked_volumes.iter().next()?.device_name.clone()
111    }
112
113    pub fn fetch_drive(drive_name: &String) -> Result<Vec<Bsu>, Box<dyn Error>> {
114        debug!("\"{}\" drive: fetching all bsu", drive_name);
115        api_limiter()?;
116        let mut request = ReadVolumesRequest::new();
117        let mut filter = FiltersVolume::default();
118        let tag = format!("{}={}", BSU_TAG_KEY, drive_name);
119        filter.tags = Some(vec![tag]);
120        filter.volume_states = Some(vec![
121            "creating".to_string(),
122            "available".to_string(),
123            "in-use".to_string(),
124        ]);
125        request.filters = Some(Box::new(filter));
126        let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request));
127        if response.is_err() {
128            error!("read volume response: {:?}", response);
129        }
130        let response = response?;
131        let volumes = response.volumes.unwrap_or_default();
132        // Check state filtering
133        let volumes: Vec<Volume> = volumes
134            .into_iter()
135            .filter(|vol| {
136                let Some(state) = &vol.state else {
137                    return false;
138                };
139                matches!(state.as_str(), "creating" | "available" | "in-use")
140            })
141            .collect();
142        let bsu_list = volumes.iter().map(Bsu::new).collect();
143        bsu_list
144    }
145
146    pub fn detach(&self) -> Result<(), Box<dyn Error>> {
147        debug!("detaching BSU {} on vm {:?}", self.id, self.vm_id);
148        api_limiter()?;
149        let request = UnlinkVolumeRequest::new(self.id.clone());
150        let response = unlink_volume(&*CLOUD_CONFIG.read()?, Some(request));
151        if response.is_err() {
152            error!("unlink volume response: {:?}", response);
153            response?;
154        }
155        Bsu::wait_state(&self.id, "available")?;
156        Ok(())
157    }
158
159    pub fn multiple_attach(vm_id: &String, bsus: &Vec<Bsu>) -> Result<(), Box<dyn Error>> {
160        for bsu in bsus {
161            debug!("attaching BSU {} on vm {:?}", bsu.id, vm_id);
162            api_limiter()?;
163            let Some(device_name) = Bsu::find_next_available_device() else {
164                return Err(Box::new(format_err!(
165                    "cannot find available device to attach {} BSU on {} VM",
166                    bsu.id,
167                    vm_id
168                )));
169            };
170            let request = LinkVolumeRequest::new(device_name, vm_id.clone(), bsu.id.clone());
171            let response = link_volume(&*CLOUD_CONFIG.read()?, Some(request));
172            if response.is_err() {
173                error!("link volume response: {:?}", response);
174                response?;
175            }
176        }
177        Bsu::wait_states(bsus, "in-use")?;
178        Ok(())
179    }
180
181    pub fn multiple_detach(bsus: &Vec<Bsu>) -> Result<(), Box<dyn Error>> {
182        let vm_id: String = VM_ID.try_read()?.clone();
183        let mut unlinked_volumes = Vec::new();
184        for bsu in bsus {
185            debug!("detaching BSU {} on vm {}", bsu.id, vm_id);
186            let Some(ref bsu_vm_id) = bsu.vm_id else {
187                debug!(
188                    "BSU id {} seems not to be attached, ignore detaching",
189                    bsu.id
190                );
191                continue;
192            };
193            if vm_id != *bsu_vm_id {
194                debug!(
195                    "BSU {} id seems attached to vm {}, not on vm {}, ignore detaching",
196                    bsu.id, bsu_vm_id, vm_id
197                );
198                continue;
199            }
200            api_limiter()?;
201            let request = UnlinkVolumeRequest::new(bsu.id.clone());
202            let response = unlink_volume(&*CLOUD_CONFIG.read()?, Some(request));
203            if response.is_err() {
204                error!("unlink volume response: {:?}", response);
205                response?;
206            }
207            unlinked_volumes.push(bsu.clone());
208        }
209        Bsu::wait_states(&unlinked_volumes, "available")?;
210        Ok(())
211    }
212
213    pub fn delete(&self) -> Result<(), Box<dyn Error>> {
214        debug!("deleting BSU {}", self.id);
215        api_limiter()?;
216        let request = DeleteVolumeRequest::new(self.id.clone());
217        let response = delete_volume(&*CLOUD_CONFIG.read()?, Some(request));
218        if response.is_err() {
219            error!("delete volume response: {:?}", response);
220            response?;
221        }
222        Ok(())
223    }
224
225    pub fn wait_state(bsu_id: &String, desired_state: &str) -> Result<(), Box<dyn Error>> {
226        loop {
227            let volume_state = Bsu::get_state(bsu_id)?;
228            debug!(
229                "volume {} state: {}, desired state: {}",
230                bsu_id, volume_state, desired_state
231            );
232            if volume_state == desired_state {
233                return Ok(());
234            }
235        }
236    }
237
238    pub fn wait_states(bsus: &[Bsu], desired_state: &str) -> Result<(), Box<dyn Error>> {
239        let bsu_ids: Vec<String> = bsus.iter().map(|bsu| bsu.id.clone()).collect();
240        debug!("fetching multiple BSU states {:?}", &bsu_ids);
241        let mut request = ReadVolumesRequest::new();
242        let filter = FiltersVolume {
243            volume_ids: Some(bsu_ids),
244            ..Default::default()
245        };
246        request.filters = Some(Box::new(filter));
247        loop {
248            api_limiter()?;
249            let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request.clone()));
250            if response.is_err() {
251                error!("read volume response: {:?}", response);
252                continue;
253            }
254            let volumes = response?.volumes.unwrap_or_default();
255            if !volumes
256                .iter()
257                .filter_map(|volume| volume.state.clone())
258                .any(|state| state != desired_state)
259            {
260                return Ok(());
261            }
262        }
263    }
264
265    pub fn get_state(bsu_id: &String) -> Result<String, Box<dyn Error>> {
266        debug!("fetching BSU {} state", bsu_id);
267        api_limiter()?;
268        let mut request = ReadVolumesRequest::new();
269        let filter = FiltersVolume {
270            volume_ids: Some(vec![bsu_id.clone()]),
271            ..Default::default()
272        };
273        request.filters = Some(Box::new(filter));
274        let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request));
275        if response.is_err() {
276            error!("read volume response: {:?}", response);
277        }
278        let response = response?;
279        let volumes = response.volumes.unwrap_or_default();
280        let Some(volume) = volumes.into_iter().next() else {
281            return Err(Box::new(format_err!("cannot find BSU {}", bsu_id)));
282        };
283        let Some(state) = volume.state else {
284            return Err(Box::new(format_err!("cannot find state in BSU {}", bsu_id)));
285        };
286        Ok(state)
287    }
288
289    fn find_next_available_device() -> Option<String> {
290        for c1 in b'b'..=b'z' {
291            let device = format!("/dev/xvd{}", c1 as char);
292            let path = PathBuf::from(device.clone());
293            if !path.exists() {
294                return Some(device);
295            }
296        }
297        for c1 in b'b'..=b'z' {
298            for c2 in b'a'..=b'z' {
299                let device = format!("/dev/xvd{}{}", c1 as char, c2 as char);
300                let path = PathBuf::from(device.clone());
301                if !path.exists() {
302                    return Some(device);
303                }
304            }
305        }
306        None
307    }
308
309    pub fn create_gib(
310        drive_name: &String,
311        disk_type: &DiskType,
312        disk_iops_per_gib: Option<usize>,
313        disk_size_gib: usize,
314    ) -> Result<(), Box<dyn Error>> {
315        debug!(
316            "\"{}\" drive: creating BSU of type {}, size {} GiB",
317            drive_name,
318            disk_type.to_string(),
319            disk_size_gib
320        );
321        api_limiter()?;
322        let mut creation_request = CreateVolumeRequest::new(SUBREGION.read()?.clone());
323        creation_request.volume_type = Some(disk_type.to_string());
324        creation_request.iops = match disk_type {
325            DiskType::Io1 => match disk_iops_per_gib {
326                Some(disk_iops_per_gib) => {
327                    Some((disk_size_gib * disk_iops_per_gib).max(MAX_IOPS_PER_VOLUMES) as i32)
328                }
329                None => {
330                    Some((DEFAULT_IO1_IOPS_PER_GB * disk_size_gib).max(MAX_IOPS_PER_VOLUMES) as i32)
331                }
332            },
333            _ => None,
334        };
335        creation_request.size = Some(disk_size_gib as i32);
336        let create_result = match create_volume(&*CLOUD_CONFIG.read()?, Some(creation_request)) {
337            Ok(create) => create,
338            Err(err) => {
339                debug!("\"{}\" drive: during bsu creation: {:?}", drive_name, err);
340                return Err(Box::new(err));
341            }
342        };
343        let Some(bsu) = create_result.volume else {
344            return Err(Box::new(format_err!(
345                "volume creation did not provide a volume object"
346            )));
347        };
348        let Some(bsu_id) = bsu.volume_id else {
349            return Err(Box::new(format_err!(
350                "volume creation did provide a volume object but not volume id"
351            )));
352        };
353        debug!("\"{}\" drive: created BSU id {}", drive_name, bsu_id);
354        debug!("\"{}\" drive: adding tag to BSU {}", drive_name, bsu_id);
355        api_limiter()?;
356        let tag = ResourceTag::new(BSU_TAG_KEY.to_string(), drive_name.clone());
357        let tag_request = CreateTagsRequest::new(vec![bsu_id.clone()], vec![tag]);
358        if let Err(err) = create_tags(&*CLOUD_CONFIG.read()?, Some(tag_request)) {
359            debug!(
360                "\"{}\" drive: during bsu tag creation: {:?}",
361                drive_name, err
362            );
363            return Err(Box::new(err));
364        }
365        Bsu::wait_state(&bsu_id, "available")?;
366        Ok(())
367    }
368}
369
370pub fn api_limiter() -> Result<(), Box<dyn Error>> {
371    let mut limiter = API_LIMITER.lock()?;
372    let waited_time_s = Instant::now().seconds() - limiter.seconds();
373    let time_left = (API_LIMITER_S as i64 - waited_time_s).max(0) as u64;
374
375    if time_left > 0 {
376        debug!("api limiter sleeps for {} seconds", time_left);
377        sleep(time::Duration::from_secs(time_left));
378    }
379
380    *limiter = Instant::now();
381    Ok(())
382}