1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
use std::env;
use std::fs::{OpenOptions, remove_file, rename};
use std::io::{self, Write};
use std::path::{Path, PathBuf};
use std::ptr;

use libc;
use serde_json::{to_string, to_value};

use error::{Error, ErrorEnum};
use name::Name;
use value::{Value, RawType};
use json::JsonName;
use {ActiveCollection};


/// A trait used to enumerate a collection
pub trait Visitor<'a> {
    /// Report a metric that belongs to a collection
    fn metric(&mut self, name: &Name, value: &'a Value);
}

/// A collection of metrics
///
/// A single collection is usually exported by a process but a vector or
/// slice of collections is also a collections, so they can be combined easily.
pub trait Collection {
    /// Visit the whole collection, use visitor to report a metric
    fn visit<'x>(&'x self, visitor: &mut Visitor<'x>);
}

#[cfg(unix)]
fn configured_path(warn: bool) -> Option<(PathBuf, String)> {
    env::var_os("CANTAL_PATH").and_then(|path| {
        let path = Path::new(&path);
        path.parent()
        .and_then(|p| path.file_name()
            .and_then(|x| x.to_str())
            .map(|n| (p.to_path_buf(), n.to_string())))
        .or_else(|| {
            if warn {
                error!("CANTAL_PATH is present, but can't be split into \
                    a directory and a filename. \
                    Probably contains som non-utf-8 characters.");
            }
            None
        })
    })
}

#[cfg(unix)]
pub fn path_from_env(warn: bool) -> (PathBuf, String) {
    use libc::{getpid, getuid};

    if let Some((dir, name)) = configured_path(warn) {
        (dir, name)
    } else {
        let (dir, name) = if let Some(dir) = env::var_os("XDG_RUNTIME_DIR") {
            (PathBuf::from(&dir), format!("cantal.{}", unsafe { getpid() }))
        } else {
            (PathBuf::from("/tmp"),
             format!("cantal.{}.{}", unsafe { getuid() }, unsafe { getpid() }))
        };
        if warn {
            warn!(
                "No CANTAL_PATH is set in the environment, using {:?}. \
                 The cantal-agent will be unable to discover it.",
                dir.join(&name));
        }
        (dir, name)
    }
}

fn remove_if_exists(path: &Path) -> Result<(), Error> {
    use self::ErrorEnum::*;

    match remove_file(path) {
        Ok(()) => Ok(()),
        Err(ref e) if e.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(e) => Err(Delete(path.to_path_buf(), e).into()),
    }
}

/// Start publishing metrics
#[cfg(unix)]
pub fn start<'x, T: Collection + ?Sized>(coll: &'x T)
    -> Result<ActiveCollection<'x>, Error>
{
    use std::os::unix::io::AsRawFd;
    use self::ErrorEnum::*;

    struct Metric<'a> {
        name: String,
        raw_type: RawType,
        size: usize,
        pointer: &'a Value,
    }
    struct ListVisitor<'a, 'b: 'a, 'c: 'a>(&'a mut Vec<Metric<'b>>,
        &'a mut Vec<&'c Value>, &'a mut usize);
    impl<'a, 'b: 'a, 'c: 'a> Visitor<'b> for ListVisitor<'a, 'b, 'c> {
        fn metric(&mut self, name: &Name, value: &'b Value)
        {
            self.0.push(Metric {
                // must have all keys sorted
                name: to_string(&to_value(JsonName(name))
                    .expect("can always serialize"))
                    .expect("can always serialize"),
                raw_type: value.raw_type(),
                size: value.raw_size(),
                pointer: value,
            });
            *self.2 += value.raw_size();
        }
    }

    let mut values_size = 0;
    let mut pointers = Vec::new();
    let mut all_metrics = Vec::with_capacity(100);
    coll.visit(&mut ListVisitor(&mut all_metrics, &mut pointers,
                                &mut values_size));

    // TODO(tailhook) sort metrics by size class
    // TODO(tailhook) find out real page size
    values_size = (values_size + 4095) & !4095;

    let (dir, name) = path_from_env(true);
    let tmp_path = dir.join(format!("{}.tmp", name));
    let values_path = dir.join(format!("{}.values", name));
    let meta_path = dir.join(format!("{}.meta", name));
    remove_if_exists(&tmp_path)?;
    remove_if_exists(&values_path)?;
    remove_if_exists(&meta_path)?;

    let values_file = OpenOptions::new()
        .read(true).write(true).create_new(true)
        .open(&tmp_path)
        .map_err(|e| Create(tmp_path.clone(), e))?;
    values_file.set_len(values_size as u64)
        .map_err(|e| Create(tmp_path.clone(), e))?;
    let ptr = unsafe {
        libc::mmap(ptr::null_mut(), values_size,
            libc::PROT_READ | libc::PROT_WRITE,
            libc::MAP_SHARED,
            values_file.as_raw_fd(),
            0)
    };
    if ptr == libc::MAP_FAILED {
        let err = io::Error::last_os_error();
        remove_file(&tmp_path).map_err(|e| {
            error!("Can't unlink path {:?}: {}", tmp_path, e);
        }).ok();
        return Err(Mmap(tmp_path, err, values_size).into());
    }

    // Create our unmap/reset guard before setting actual pointers
    let result = ActiveCollection {
        meta_path: meta_path,
        values_path: values_path,
        metrics: pointers,
        mmap: ptr,
        mmap_size: values_size,
    };

    let mut offset = 0;
    let mut metadata_buf = String::with_capacity(4096);
    let mut pointers = Vec::new();
    for metric in all_metrics {
        use std::fmt::Write;
        write!(metadata_buf, "{main_type} {size}{space}{type_suffix}: {key}\n",
            main_type=metric.raw_type.main_type(),
            size=metric.size,
            space=if metric.raw_type.type_suffix().is_some() { " " } else {""},
            type_suffix=metric.raw_type.type_suffix().unwrap_or(""),
            key=metric.name)
            .expect("Can always write into buffer");
        metric.pointer.copy_assign(unsafe { ptr.offset(offset as isize) });
        offset += metric.size;
        pointers.push(metric.pointer);
    }

    rename(&tmp_path, &result.values_path)
        .map_err(|e| Rename(result.values_path.clone(), e))?;
    OpenOptions::new().write(true).create_new(true)
        .open(&tmp_path)
        .and_then(|mut f| f.write_all(metadata_buf.as_bytes()))
        .map_err(|e| WriteMetadata(tmp_path.clone(), e))?;
    rename(&tmp_path, &result.meta_path)
        .map_err(|e| Rename(result.meta_path.clone(), e))?;

    Ok(result)
}

/// Start publishing metrics
///
/// Currently it's noop on windows
#[cfg(windows)]
pub fn start<'x, T: Collection + ?Sized>(coll: &'x T)
    -> Result<ActiveCollection<'x>, Error>
{
    Ok(ActiveCollection {
        phantom: ::std::marker::PhantomData,
    })
}

#[cfg(unix)]
impl<'a> Drop for ActiveCollection<'a> {
    fn drop(&mut self) {
        for m in &self.metrics {
            m.reset();
        }
        let rc = unsafe { libc::munmap(self.mmap, self.mmap_size) };
        if rc != 0 {
            let err = io::Error::last_os_error();
            error!("Can't unmap file {:?}: {}", self.values_path, err);
        }
        remove_file(&self.values_path).map_err(|e| {
            error!("Can't unlink path {:?}: {}", self.values_path, e);
        }).ok();
        remove_file(&self.meta_path).map_err(|e| {
            error!("Can't unlink path {:?}: {}", self.meta_path, e);
        }).ok();
    }
}