libstorage/
openstack.rs

1/**
2* Copyright 2019 Comcast Cable Communications Management, LLC
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15*
16* SPDX-License-Identifier: Apache-2.0
17*/
18use std::{collections::HashMap, fmt, fmt::Debug, str::FromStr};
19
20use crate::error::{MetricsResult, StorageError};
21use crate::ir::{TsPoint, TsValue};
22use crate::IntoPoint;
23
24use log::debug;
25use reqwest::{header::HeaderName, header::HeaderValue, StatusCode};
26use serde::de::DeserializeOwned;
27use serde_json::json;
28use serde_repr::{Deserialize_repr, Serialize_repr};
29
30#[derive(Clone, Deserialize, Debug)]
31pub struct OpenstackConfig {
32    /// The openstack endpoint to use
33    pub endpoint: String,
34    pub port: Option<u16>,
35    pub user: String,
36    /// This gets replaced with the token at runtime
37    pub password: String,
38    /// Openstack domain to use
39    pub domain: String,
40    pub project_name: String,
41    /// Optional certificate file to use against the server
42    /// der encoded
43    pub certificate: Option<String>,
44    pub region: String,
45}
46
47pub struct Openstack {
48    client: reqwest::Client,
49    config: OpenstackConfig,
50}
51
52#[derive(Deserialize, Debug)]
53pub struct Domain {
54    pub description: String,
55    pub enabled: bool,
56    pub id: String,
57    pub name: String,
58}
59
60#[derive(Deserialize, Debug)]
61pub struct Domains {
62    pub domains: Vec<Domain>,
63}
64
65#[derive(Serialize_repr, Deserialize_repr, PartialEq, Debug)]
66#[repr(u8)]
67pub enum PowerState {
68    NoState = 0,
69    Running = 1,
70    Paused = 3,
71    Shutdown = 4,
72    Crashed = 6,
73    Suspended = 7,
74}
75
76impl fmt::Display for PowerState {
77    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
78        match self {
79            PowerState::NoState => write!(f, "no_state"),
80            PowerState::Running => write!(f, "running"),
81            PowerState::Paused => write!(f, "paused"),
82            PowerState::Shutdown => write!(f, "shutdown"),
83            PowerState::Crashed => write!(f, "crashed"),
84            PowerState::Suspended => write!(f, "suspended"),
85        }
86    }
87}
88
89#[derive(Deserialize, Debug)]
90pub struct Project {
91    pub is_domain: Option<bool>,
92    pub description: Option<String>,
93    pub domain_id: String,
94    pub enabled: bool,
95    pub id: String,
96    pub name: String,
97    pub parent_id: Option<String>,
98    pub tags: Option<Vec<String>>,
99}
100
101#[derive(Deserialize, Debug)]
102pub struct Projects {
103    pub projects: Vec<Project>,
104}
105
106#[derive(Deserialize, Debug)]
107pub struct Server {
108    #[serde(rename = "OS-EXT-AZ:availability_zone")]
109    az_availability_zone: String,
110    #[serde(rename = "OS-EXT-SRV-ATTR:host")]
111    host: Option<String>,
112    #[serde(rename = "OS-EXT-SRV-ATTR:hostname")]
113    hostname: Option<String>,
114    #[serde(rename = "OS-EXT-SRV-ATTR:hypervisor_hostname")]
115    hypervisor_hostname: Option<String>,
116    #[serde(rename = "OS-EXT-SRV-ATTR:instance_name")]
117    instance_name: String,
118    #[serde(rename = "OS-EXT-STS:power_state")]
119    power_state: PowerState,
120    #[serde(rename = "OS-EXT-STS:task_state")]
121    task_state: Option<String>,
122    #[serde(rename = "OS-EXT-STS:vm_state")]
123    vm_state: String,
124    #[serde(rename = "OS-SRV-USG:launched_at")]
125    launched_at: Option<String>,
126    #[serde(rename = "OS-SRV-USG:terminated_at")]
127    terminated_at: Option<String>,
128    created: String,
129    description: Option<String>,
130    #[serde(rename = "hostId")]
131    host_id: String,
132    host_status: Option<String>,
133    id: String,
134    name: String,
135    #[serde(rename = "os-extended-volumes:volumes_attached")]
136    volumes_attached: Vec<HashMap<String, String>>,
137    #[serde(rename = "os-extended-volumes:volumes_attached.id")]
138    volumes_attached_id: Option<String>,
139    progress: Option<u64>,
140    status: String,
141    tenant_id: String,
142    updated: String,
143    user_id: String,
144}
145
146impl IntoPoint for Server {
147    fn into_point(&self, name: Option<&str>, is_time_series: bool) -> Vec<TsPoint> {
148        let mut p = TsPoint::new(name.unwrap_or("openstack_server"), is_time_series);
149        p.add_tag(
150            "az_availability_zone",
151            TsValue::String(self.az_availability_zone.clone()),
152        );
153        if let Some(host) = &self.host {
154            p.add_tag("host", TsValue::String(host.clone()));
155        }
156        if let Some(hostname) = &self.hostname {
157            p.add_tag("hostname", TsValue::String(hostname.clone()));
158        }
159        if let Some(hypervisor_hostname) = &self.hypervisor_hostname {
160            p.add_tag(
161                "hypervisor_hostname",
162                TsValue::String(hypervisor_hostname.clone()),
163            );
164        }
165        p.add_tag("instance_name", TsValue::String(self.instance_name.clone()));
166        p.add_tag(
167            "power_state",
168            TsValue::String(format!("{}", self.power_state)),
169        );
170        if let Some(task_state) = &self.task_state {
171            p.add_tag("task_state", TsValue::String(task_state.clone()));
172        }
173        p.add_tag("vm_state", TsValue::String(self.vm_state.clone()));
174        if let Some(launched_at) = &self.launched_at {
175            p.add_tag("launched_at", TsValue::String(launched_at.clone()));
176        }
177        if let Some(terminated_at) = &self.terminated_at {
178            p.add_tag("terminated_at", TsValue::String(terminated_at.clone()));
179        }
180        p.add_tag("created", TsValue::String(self.created.clone()));
181        if let Some(description) = &self.description {
182            p.add_tag("description", TsValue::String(description.clone()));
183        }
184        p.add_tag("host_id", TsValue::String(self.host_id.clone()));
185        if let Some(host_status) = &self.host_status {
186            p.add_tag("host_status", TsValue::String(host_status.clone()));
187        }
188        p.add_tag("id", TsValue::String(self.id.clone()));
189        p.add_tag("name", TsValue::String(self.name.clone()));
190        p.add_tag(
191            "volumes_attached",
192            TsValue::StringVec(
193                self.volumes_attached
194                    .iter()
195                    // Only save the volume_id
196                    .flat_map(|hashmap| hashmap.iter().map(|(_k, v)| v.clone()))
197                    .collect(),
198            ),
199        );
200        if let Some(volumes_attached_id) = &self.volumes_attached_id {
201            p.add_tag(
202                "volumes_attached_id",
203                TsValue::String(volumes_attached_id.clone()),
204            );
205        }
206        if let Some(progress) = &self.progress {
207            p.add_field("progress", TsValue::Long(*progress));
208        }
209        p.add_tag("status", TsValue::String(self.status.clone()));
210        p.add_tag("tenant_id", TsValue::String(self.tenant_id.clone()));
211        p.add_tag("updated", TsValue::String(self.updated.clone()));
212        p.add_tag("user_id", TsValue::String(self.user_id.clone()));
213
214        vec![p]
215    }
216}
217
218#[derive(Deserialize, Debug)]
219pub struct Servers {
220    pub servers: Vec<Server>,
221}
222
223impl IntoPoint for Servers {
224    fn into_point(&self, name: Option<&str>, is_time_series: bool) -> Vec<TsPoint> {
225        self.servers
226            .iter()
227            .flat_map(|s| s.into_point(name, is_time_series))
228            .collect()
229    }
230}
231
232#[derive(Deserialize, Debug)]
233pub struct UserRoot {
234    pub user: User,
235}
236
237#[derive(Deserialize, Debug)]
238pub struct User {
239    pub default_project_id: Option<String>,
240    pub domain_id: String,
241    pub enabled: Option<bool>,
242    pub id: String,
243    pub name: String,
244    pub email: Option<String>,
245    pub password_expires_at: Option<String>,
246}
247
248#[derive(Deserialize, Debug)]
249pub struct VolumesAttachment {
250    pub server_id: String,
251    pub attachment_id: String,
252    pub host_name: Option<String>,
253    pub volume_id: String,
254    pub device: String,
255    pub id: String,
256}
257
258#[derive(Deserialize, Debug)]
259pub struct VolumesMetadatum {
260    pub readonly: Option<String>,
261    pub attached_mode: Option<String>,
262}
263
264#[derive(Deserialize, Debug)]
265pub struct VolumeImageMetadatum {
266    pub kernel_id: Option<String>,
267    pub checksum: Option<String>,
268    pub min_ram: Option<String>,
269    pub ramdisk_id: Option<String>,
270    pub disk_format: Option<String>,
271    pub image_name: Option<String>,
272    pub image_id: Option<String>,
273    pub container_format: Option<String>,
274    pub min_disk: Option<String>,
275    pub size: Option<String>,
276}
277
278#[derive(Deserialize, Debug, IntoPoint)]
279pub struct Volume {
280    pub migration_status: Option<String>,
281    pub attachments: Vec<VolumesAttachment>,
282    pub availability_zone: String,
283    pub os_vol_host_attr_host: Option<String>,
284    pub encrypted: bool,
285    pub replication_status: String,
286    pub snapshot_id: Option<String>,
287    pub id: String,
288    pub size: u64, // Size is in GB
289    pub user_id: String,
290    #[serde(rename = "os-vol-tenant-attr:tenant_id")]
291    pub os_vol_tenant_attr_tenant_id: String,
292    pub os_vol_mig_status_attr_migstat: Option<String>,
293    pub metadata: VolumesMetadatum,
294    pub status: String,
295    pub description: Option<String>,
296    pub multiattach: bool,
297    pub source_volid: Option<String>,
298    pub consistencygroup_id: Option<String>,
299    pub os_vol_mig_status_attr_name_id: Option<String>,
300    pub name: Option<String>,
301    pub bootable: String,
302    pub created_at: String,
303    pub volume_type: Option<String>,
304    pub volume_image_metadata: Option<VolumeImageMetadatum>,
305}
306
307#[derive(Deserialize, Debug)]
308pub struct Volumes {
309    pub volumes: Vec<Volume>,
310    pub count: Option<u64>,
311}
312
313impl IntoPoint for Volumes {
314    fn into_point(&self, name: Option<&str>, is_time_series: bool) -> Vec<TsPoint> {
315        let mut points: Vec<TsPoint> = Vec::new();
316
317        for v in &self.volumes {
318            points.extend(v.into_point(name, is_time_series));
319        }
320
321        points
322    }
323}
324
325impl Openstack {
326    pub fn new(client: &reqwest::Client, config: OpenstackConfig) -> Self {
327        Openstack {
328            client: client.clone(),
329            config,
330        }
331    }
332
333    /// get the config pass
334    pub fn get_pass(&self) -> String {
335        self.config.password.clone()
336    }
337
338    fn get<T>(&self, api: &str) -> MetricsResult<T>
339    where
340        T: DeserializeOwned + Debug,
341    {
342        let url = match self.config.port {
343            Some(port) => format!("https://{}:{}/{}", self.config.endpoint, port, api),
344            None => format!("https://{}/{}", self.config.endpoint, api),
345        };
346
347        // This could be more efficient by deserializing immediately but when errors
348        // occur it can be really difficult to debug.
349        let res: Result<String, reqwest::Error> = loop {
350            match self
351                .client
352                .get(&url)
353                .header(
354                    HeaderName::from_str("X-Auth-Token")?,
355                    HeaderValue::from_str(&self.config.password)?,
356                )
357                .send()
358            {
359                Ok(status) => match status.error_for_status() {
360                    Ok(mut s) => break s.text(),
361                    Err(e) => match e.status() {
362                        Some(reqwest::StatusCode::REQUEST_TIMEOUT) => {}
363                        Some(reqwest::StatusCode::GATEWAY_TIMEOUT) => {}
364                        _ => return Err(StorageError::from(e)),
365                    },
366                },
367                Err(e) => {
368                    if !e.is_timeout() {
369                        return Err(StorageError::from(e));
370                    }
371                }
372            }
373        };
374        debug!("raw response: {:?}", res);
375        let res = serde_json::from_str(&res?);
376        Ok(res?)
377    }
378
379    // Connect to the metadata server and request a new api token
380    pub fn get_api_token(&mut self) -> MetricsResult<()> {
381        let auth_json = json!({
382            "auth": {
383                "identity": {
384                    "methods": ["password"],
385                    "password": {
386                        "user": {
387                            "name": self.config.user,
388                            "domain": {
389                                "name": self.config.domain,
390                            },
391                            "password": self.config.password,
392                        }
393                    }
394                },
395               "scope": {
396                   "project": {
397                       "name": self.config.project_name,
398                       "domain": {
399                           "name": "comcast",
400                       }
401                   }
402               }
403            }
404        });
405        let url = match self.config.port {
406            Some(port) => format!("https://{}:{}/v3/auth/tokens", self.config.endpoint, port),
407            None => format!("https://{}/v3/auth/tokens", self.config.endpoint),
408        };
409        let resp: reqwest::Response = loop {
410            match self.client.post(&url).json(&auth_json).send() {
411                Ok(status) => match status.error_for_status() {
412                    Ok(resp) => break resp,
413                    Err(e) => match e.status() {
414                        Some(reqwest::StatusCode::REQUEST_TIMEOUT) => {}
415                        Some(reqwest::StatusCode::GATEWAY_TIMEOUT) => {}
416                        _ => return Err(StorageError::from(e)),
417                    },
418                },
419                Err(e) => {
420                    if !e.is_timeout() {
421                        return Err(StorageError::from(e));
422                    }
423                }
424            }
425        };
426        match resp.status() {
427            StatusCode::OK | StatusCode::CREATED => {
428                // ok we're good
429                let h = resp.headers();
430
431                let token = h.get("X-Subject-Token");
432                if token.is_none() {
433                    return Err(StorageError::new(
434                        "openstack token not found in header".to_string(),
435                    ));
436                }
437                self.config.password = token.unwrap().to_str()?.to_owned();
438                Ok(())
439            }
440            StatusCode::UNAUTHORIZED => Err(StorageError::new(format!(
441                "Invalid credentials for {}",
442                self.config.user
443            ))),
444            _ => Err(StorageError::new(format!(
445                "Unknown error: {}",
446                resp.status()
447            ))),
448        }
449    }
450
451    pub fn list_domains(&self) -> MetricsResult<Vec<Domain>> {
452        let domains: Domains = self.get("v3/domains")?;
453        Ok(domains.domains)
454    }
455
456    pub fn list_projects(&self) -> MetricsResult<Vec<Project>> {
457        let projects: Projects = self.get("v3/projects")?;
458        Ok(projects.projects)
459    }
460
461    pub fn list_servers(&self) -> MetricsResult<Vec<TsPoint>> {
462        let servers: Servers = self.get("v2.1/servers/detail")?;
463        Ok(servers.into_point(Some("openstack_server"), false))
464    }
465
466    pub fn list_volumes(&self, project_id: &str) -> MetricsResult<Vec<TsPoint>> {
467        let volumes: Volumes = self.get(&format!(
468            "v3/{}/volumes/detail?all_tenants=True",
469            project_id
470        ))?;
471
472        Ok(volumes.into_point(Some("openstack_volume"), true))
473    }
474
475    pub fn get_user(&self, user_id: &str) -> MetricsResult<User> {
476        let user: UserRoot = self.get(&format!("/v3/users/{}", user_id))?;
477        Ok(user.user)
478    }
479}
480
481#[test]
482fn test_list_openstack_servers() {
483    use std::fs::File;
484    use std::io::Read;
485
486    let mut f = File::open("tests/openstack/servers.json").unwrap();
487    let mut buff = String::new();
488    f.read_to_string(&mut buff).unwrap();
489
490    let i: Servers = serde_json::from_str(&buff).unwrap();
491    println!("result: {:#?}", i);
492    println!("result points: {:#?}", i.into_point(None, false));
493}
494#[test]
495fn test_list_openstack_volumes() {
496    use std::fs::File;
497    use std::io::Read;
498
499    let mut f = File::open("tests/openstack/volumes.json").unwrap();
500    let mut buff = String::new();
501    f.read_to_string(&mut buff).unwrap();
502
503    let i: Volumes = serde_json::from_str(&buff).unwrap();
504    println!("result: {:#?}", i);
505}
506#[test]
507fn test_get_openstack_user() {
508    use std::fs::File;
509    use std::io::Read;
510
511    let mut f = File::open("tests/openstack/user.json").unwrap();
512    let mut buff = String::new();
513    f.read_to_string(&mut buff).unwrap();
514
515    let i: UserRoot = serde_json::from_str(&buff).unwrap();
516    println!("result: {:#?}", i);
517}