1use crate::config::{DiskType, CLOUD_CONFIG, SUBREGION, VM_ID};
2use crate::utils::gib_to_bytes;
3use easy_error::format_err;
4use log::{debug, error};
5use outscale_api::apis::tag_api::create_tags;
6use outscale_api::apis::volume_api::{
7 create_volume, delete_volume, link_volume, read_volumes, unlink_volume,
8};
9use outscale_api::models::{
10 CreateTagsRequest, CreateVolumeRequest, DeleteVolumeRequest, FiltersVolume, LinkVolumeRequest,
11 ReadVolumesRequest, ResourceTag, UnlinkVolumeRequest, Volume,
12};
13use std::error::Error;
14use std::path::PathBuf;
15
16use datetime::{Duration, Instant};
17use lazy_static::lazy_static;
18use std::sync::Mutex;
19use std::thread::sleep;
20use std::time;
21
/// Minimum spacing, in seconds, that `api_limiter` enforces between two API calls.
const API_LIMITER_S: u64 = 3;
/// Tag key under which the name of the drive owning a BSU is stored.
const BSU_TAG_KEY: &str = "osc.bsud.drive-name";
/// Per-volume IOPS bound used when sizing `io1` volumes.
const MAX_IOPS_PER_VOLUMES: usize = 13_000;
/// IOPS-per-GiB ratio applied to `io1` volumes when the caller provides none.
const DEFAULT_IO1_IOPS_PER_GB: usize = 100;
26
27lazy_static! {
28 pub static ref API_LIMITER: Mutex<Instant> =
29 Mutex::new(Instant::now() - Duration::of(API_LIMITER_S as i64));
30}
31
/// One BSU volume belonging to a drive, as described by the cloud API.
#[derive(Clone, Debug, Default)]
pub struct Bsu {
    /// Id of the VM the volume is attaching or attached to; `None` otherwise.
    pub vm_id: Option<String>,
    /// Value of the drive-name tag carried by the volume.
    pub drive_name: String,
    /// Volume id reported by the API.
    pub id: String,
    /// `size_gib` converted to bytes.
    pub size_bytes: usize,
    /// Volume size in GiB as reported by the API.
    pub size_gib: usize,
    /// Device name of the volume's first link, if any.
    pub device_path: Option<String>,
}
41
42impl Bsu {
43 pub fn new(volume: &Volume) -> Result<Self, Box<dyn Error>> {
44 let Some(bsu_id) = volume.volume_id.clone() else {
45 return Err(Box::new(format_err!(
46 "BSU {:?} does not have an id",
47 volume
48 )));
49 };
50 let Some(bsu_size_gib) = volume.size else {
51 return Err(Box::new(format_err!(
52 "BSU {:?} does not have a size",
53 volume
54 )));
55 };
56 let vm_id = Bsu::get_drive_linked_vm_id(volume);
57 let Some(drive_name) = Bsu::get_drive_name(volume) else {
58 Err(format_err!(
59 "Cannot extract drive name from BSU id {}",
60 bsu_id
61 ))?
62 };
63 let device_path = Bsu::get_drive_device_path(volume);
64
65 Ok(Bsu {
66 vm_id,
67 drive_name,
68 id: bsu_id,
69 size_bytes: gib_to_bytes(bsu_size_gib as usize),
70 size_gib: bsu_size_gib as usize,
71 device_path,
72 })
73 }
74
75 fn get_drive_linked_vm_id(volume: &Volume) -> Option<String> {
76 let Some(linked_volumes) = &volume.linked_volumes else {
77 return None;
78 };
79 for linked_volume in linked_volumes {
80 let Some(state) = &linked_volume.state else {
81 continue;
82 };
83 let Some(vm_id) = &linked_volume.vm_id else {
84 continue;
85 };
86 match state.as_str() {
87 "attaching" | "attached" => return Some(vm_id.clone()),
88 _ => return None,
89 };
90 }
91 None
92 }
93
94 fn get_drive_name(volume: &Volume) -> Option<String> {
95 let Some(tags) = &volume.tags else {
96 return None;
97 };
98 for tag in tags {
99 if tag.key == *BSU_TAG_KEY.to_string() {
100 return Some(tag.value.clone());
101 }
102 }
103 None
104 }
105
106 fn get_drive_device_path(volume: &Volume) -> Option<String> {
107 let Some(linked_volumes) = &volume.linked_volumes else {
108 return None;
109 };
110 linked_volumes.iter().next()?.device_name.clone()
111 }
112
113 pub fn fetch_drive(drive_name: &String) -> Result<Vec<Bsu>, Box<dyn Error>> {
114 debug!("\"{}\" drive: fetching all bsu", drive_name);
115 api_limiter()?;
116 let mut request = ReadVolumesRequest::new();
117 let mut filter = FiltersVolume::default();
118 let tag = format!("{}={}", BSU_TAG_KEY, drive_name);
119 filter.tags = Some(vec![tag]);
120 filter.volume_states = Some(vec![
121 "creating".to_string(),
122 "available".to_string(),
123 "in-use".to_string(),
124 ]);
125 request.filters = Some(Box::new(filter));
126 let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request));
127 if response.is_err() {
128 error!("read volume response: {:?}", response);
129 }
130 let response = response?;
131 let volumes = response.volumes.unwrap_or_default();
132 let volumes: Vec<Volume> = volumes
134 .into_iter()
135 .filter(|vol| {
136 let Some(state) = &vol.state else {
137 return false;
138 };
139 matches!(state.as_str(), "creating" | "available" | "in-use")
140 })
141 .collect();
142 let bsu_list = volumes.iter().map(Bsu::new).collect();
143 bsu_list
144 }
145
146 pub fn detach(&self) -> Result<(), Box<dyn Error>> {
147 debug!("detaching BSU {} on vm {:?}", self.id, self.vm_id);
148 api_limiter()?;
149 let request = UnlinkVolumeRequest::new(self.id.clone());
150 let response = unlink_volume(&*CLOUD_CONFIG.read()?, Some(request));
151 if response.is_err() {
152 error!("unlink volume response: {:?}", response);
153 response?;
154 }
155 Bsu::wait_state(&self.id, "available")?;
156 Ok(())
157 }
158
159 pub fn multiple_attach(vm_id: &String, bsus: &Vec<Bsu>) -> Result<(), Box<dyn Error>> {
160 for bsu in bsus {
161 debug!("attaching BSU {} on vm {:?}", bsu.id, vm_id);
162 api_limiter()?;
163 let Some(device_name) = Bsu::find_next_available_device() else {
164 return Err(Box::new(format_err!(
165 "cannot find available device to attach {} BSU on {} VM",
166 bsu.id,
167 vm_id
168 )));
169 };
170 let request = LinkVolumeRequest::new(device_name, vm_id.clone(), bsu.id.clone());
171 let response = link_volume(&*CLOUD_CONFIG.read()?, Some(request));
172 if response.is_err() {
173 error!("link volume response: {:?}", response);
174 response?;
175 }
176 }
177 Bsu::wait_states(bsus, "in-use")?;
178 Ok(())
179 }
180
181 pub fn multiple_detach(bsus: &Vec<Bsu>) -> Result<(), Box<dyn Error>> {
182 let vm_id: String = VM_ID.try_read()?.clone();
183 let mut unlinked_volumes = Vec::new();
184 for bsu in bsus {
185 debug!("detaching BSU {} on vm {}", bsu.id, vm_id);
186 let Some(ref bsu_vm_id) = bsu.vm_id else {
187 debug!(
188 "BSU id {} seems not to be attached, ignore detaching",
189 bsu.id
190 );
191 continue;
192 };
193 if vm_id != *bsu_vm_id {
194 debug!(
195 "BSU {} id seems attached to vm {}, not on vm {}, ignore detaching",
196 bsu.id, bsu_vm_id, vm_id
197 );
198 continue;
199 }
200 api_limiter()?;
201 let request = UnlinkVolumeRequest::new(bsu.id.clone());
202 let response = unlink_volume(&*CLOUD_CONFIG.read()?, Some(request));
203 if response.is_err() {
204 error!("unlink volume response: {:?}", response);
205 response?;
206 }
207 unlinked_volumes.push(bsu.clone());
208 }
209 Bsu::wait_states(&unlinked_volumes, "available")?;
210 Ok(())
211 }
212
213 pub fn delete(&self) -> Result<(), Box<dyn Error>> {
214 debug!("deleting BSU {}", self.id);
215 api_limiter()?;
216 let request = DeleteVolumeRequest::new(self.id.clone());
217 let response = delete_volume(&*CLOUD_CONFIG.read()?, Some(request));
218 if response.is_err() {
219 error!("delete volume response: {:?}", response);
220 response?;
221 }
222 Ok(())
223 }
224
225 pub fn wait_state(bsu_id: &String, desired_state: &str) -> Result<(), Box<dyn Error>> {
226 loop {
227 let volume_state = Bsu::get_state(bsu_id)?;
228 debug!(
229 "volume {} state: {}, desired state: {}",
230 bsu_id, volume_state, desired_state
231 );
232 if volume_state == desired_state {
233 return Ok(());
234 }
235 }
236 }
237
238 pub fn wait_states(bsus: &[Bsu], desired_state: &str) -> Result<(), Box<dyn Error>> {
239 let bsu_ids: Vec<String> = bsus.iter().map(|bsu| bsu.id.clone()).collect();
240 debug!("fetching multiple BSU states {:?}", &bsu_ids);
241 let mut request = ReadVolumesRequest::new();
242 let filter = FiltersVolume {
243 volume_ids: Some(bsu_ids),
244 ..Default::default()
245 };
246 request.filters = Some(Box::new(filter));
247 loop {
248 api_limiter()?;
249 let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request.clone()));
250 if response.is_err() {
251 error!("read volume response: {:?}", response);
252 continue;
253 }
254 let volumes = response?.volumes.unwrap_or_default();
255 if !volumes
256 .iter()
257 .filter_map(|volume| volume.state.clone())
258 .any(|state| state != desired_state)
259 {
260 return Ok(());
261 }
262 }
263 }
264
265 pub fn get_state(bsu_id: &String) -> Result<String, Box<dyn Error>> {
266 debug!("fetching BSU {} state", bsu_id);
267 api_limiter()?;
268 let mut request = ReadVolumesRequest::new();
269 let filter = FiltersVolume {
270 volume_ids: Some(vec![bsu_id.clone()]),
271 ..Default::default()
272 };
273 request.filters = Some(Box::new(filter));
274 let response = read_volumes(&*CLOUD_CONFIG.read()?, Some(request));
275 if response.is_err() {
276 error!("read volume response: {:?}", response);
277 }
278 let response = response?;
279 let volumes = response.volumes.unwrap_or_default();
280 let Some(volume) = volumes.into_iter().next() else {
281 return Err(Box::new(format_err!("cannot find BSU {}", bsu_id)));
282 };
283 let Some(state) = volume.state else {
284 return Err(Box::new(format_err!("cannot find state in BSU {}", bsu_id)));
285 };
286 Ok(state)
287 }
288
/// Returns the first `/dev/xvd*` name that does not exist on this machine.
///
/// Candidates are tried in order: `/dev/xvdb` … `/dev/xvdz`, then
/// `/dev/xvdba` … `/dev/xvdzz`. `None` means every candidate already exists.
fn find_next_available_device() -> Option<String> {
    let single_letter = (b'b'..=b'z').map(|c1| format!("/dev/xvd{}", c1 as char));
    let double_letter = (b'b'..=b'z').flat_map(|c1| {
        (b'a'..=b'z').map(move |c2| format!("/dev/xvd{}{}", c1 as char, c2 as char))
    });
    single_letter
        .chain(double_letter)
        .find(|device| !PathBuf::from(device).exists())
}
308
309 pub fn create_gib(
310 drive_name: &String,
311 disk_type: &DiskType,
312 disk_iops_per_gib: Option<usize>,
313 disk_size_gib: usize,
314 ) -> Result<(), Box<dyn Error>> {
315 debug!(
316 "\"{}\" drive: creating BSU of type {}, size {} GiB",
317 drive_name,
318 disk_type.to_string(),
319 disk_size_gib
320 );
321 api_limiter()?;
322 let mut creation_request = CreateVolumeRequest::new(SUBREGION.read()?.clone());
323 creation_request.volume_type = Some(disk_type.to_string());
324 creation_request.iops = match disk_type {
325 DiskType::Io1 => match disk_iops_per_gib {
326 Some(disk_iops_per_gib) => {
327 Some((disk_size_gib * disk_iops_per_gib).max(MAX_IOPS_PER_VOLUMES) as i32)
328 }
329 None => {
330 Some((DEFAULT_IO1_IOPS_PER_GB * disk_size_gib).max(MAX_IOPS_PER_VOLUMES) as i32)
331 }
332 },
333 _ => None,
334 };
335 creation_request.size = Some(disk_size_gib as i32);
336 let create_result = match create_volume(&*CLOUD_CONFIG.read()?, Some(creation_request)) {
337 Ok(create) => create,
338 Err(err) => {
339 debug!("\"{}\" drive: during bsu creation: {:?}", drive_name, err);
340 return Err(Box::new(err));
341 }
342 };
343 let Some(bsu) = create_result.volume else {
344 return Err(Box::new(format_err!(
345 "volume creation did not provide a volume object"
346 )));
347 };
348 let Some(bsu_id) = bsu.volume_id else {
349 return Err(Box::new(format_err!(
350 "volume creation did provide a volume object but not volume id"
351 )));
352 };
353 debug!("\"{}\" drive: created BSU id {}", drive_name, bsu_id);
354 debug!("\"{}\" drive: adding tag to BSU {}", drive_name, bsu_id);
355 api_limiter()?;
356 let tag = ResourceTag::new(BSU_TAG_KEY.to_string(), drive_name.clone());
357 let tag_request = CreateTagsRequest::new(vec![bsu_id.clone()], vec![tag]);
358 if let Err(err) = create_tags(&*CLOUD_CONFIG.read()?, Some(tag_request)) {
359 debug!(
360 "\"{}\" drive: during bsu tag creation: {:?}",
361 drive_name, err
362 );
363 return Err(Box::new(err));
364 }
365 Bsu::wait_state(&bsu_id, "available")?;
366 Ok(())
367 }
368}
369
370pub fn api_limiter() -> Result<(), Box<dyn Error>> {
371 let mut limiter = API_LIMITER.lock()?;
372 let waited_time_s = Instant::now().seconds() - limiter.seconds();
373 let time_left = (API_LIMITER_S as i64 - waited_time_s).max(0) as u64;
374
375 if time_left > 0 {
376 debug!("api limiter sleeps for {} seconds", time_left);
377 sleep(time::Duration::from_secs(time_left));
378 }
379
380 *limiter = Instant::now();
381 Ok(())
382}