1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
use ::std;
use std::cell::Cell;
use std::fmt;
use std::io;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::atomic::Ordering::{SeqCst, Relaxed};

#[cfg(unix)] use libc;

/// Possible error with `libc::(posix_)madvise()`, or other platform
/// equivalent.
///
/// Implements `std::error::Error` and may be converted to an
/// `io::Error` of kind `Other`.
#[derive(Debug)]
pub struct MemAdviseError {
    // Raw non-zero return code of the failed advise call, as reported in
    // the `Display` message.
    ecode: i32,
}

impl From<MemAdviseError> for io::Error {
    /// Wrap the advise failure as the payload of an `io::Error` of kind
    /// `Other`.
    fn from(me: MemAdviseError) -> io::Error {
        let kind = io::ErrorKind::Other;
        io::Error::new(kind, me)
    }
}

impl fmt::Display for MemAdviseError {
    /// Write a one-line message that includes the raw return code.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_fmt(format_args!(
            "libc::posix_madvise error return code {}",
            self.ecode
        ))
    }
}

impl std::error::Error for MemAdviseError {
    fn description(&self) -> &str { "MemAdviseError" }
    fn cause(&self) -> Option<&std::error::Error> { None }
}

/// Memory access pattern advice.
///
/// This encodes a subset of POSIX.1-2001 `madvise` flags, and is intended to
/// be a workable cross platform abstraction. In particular, the values do not
/// correspond to any libc or other lib constants, and are arranged in
/// ascending order from minimum to maximum *priority* in the presence of
/// concurrent interest in the same region.
///
/// Each non-zero value doubles as the bit mask selecting that level's 10-bit
/// interest count within the packed advisors state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
#[repr(usize)]
pub enum MemAdvice {
    /// Lowest priority, implicit default. Not counted, so it has no mask.
    Normal     = 0,
    /// Middle priority. Mask over bits 1-10 (the low ten bits).
    Random     = 0x3FF,
    /// Highest priority. Mask over bits 11-20 (the next ten bits).
    Sequential = 0x3FF << 10,
}

/// Wrapper over a byte buffer, supporting concurrent memory access advice,
/// where the highest priority advice wins.
///
/// This is likely to only be useful for memory mapped regions and memory
/// mapped files in particular.
///
/// Uses an internal `Arc` over shared state, so cloning the handle is
/// inexpensive. Each new and cloned handle starts with the implicit
/// `MemAdvice::Normal`. The shared state, a set of advisor interest counts,
/// is maintained as a single atomic integer, for minimal overhead. Each
/// counted `MemAdvice` level is allocated 10 bits, or up to 1,023
/// `MemHandle` advisors. Any advice beyond this capacity, or given after an
/// error is returned from `advise`, may be ignored, favoring the prior
/// highest priority advice.
#[derive(Debug)]
pub struct MemHandle<T>
where T: Deref<Target=[u8]>
{
    // Buffer and advisor counts, shared by every clone of this handle.
    mem: Arc<Mem<T>>,
    // The advice most recently given through this particular handle.
    advice: Cell<MemAdvice>,
}

impl<T> MemHandle<T>
where T: Deref<Target=[u8]>
{
    /// Wrap `mem` in a new handle. The shared advisor counts start at zero
    /// and the handle itself starts with the implicit `MemAdvice::Normal`.
    pub fn new(mem: T) -> MemHandle<T> {
        let shared = Mem { mem, advisors: ATOMIC_USIZE_INIT };
        MemHandle {
            mem: Arc::new(shared),
            advice: Cell::new(MemAdvice::Normal),
        }
    }

    /// Advise on access plans for the underlying memory.
    ///
    /// There may be multiple cloned handles to the same memory region, so
    /// the advice is only relayed to the operating system when it changes
    /// the highest priority advice held across all surviving handles.
    ///
    /// On success, returns the `MemAdvice` as relayed, or a snapshot of the
    /// current, highest priority advice. When `advice` repeats what this
    /// handle last gave, nothing is adjusted and that same advice is
    /// returned. Returns an error if the underlying system call fails.
    pub fn advise(&self, advice: MemAdvice)
        -> Result<MemAdvice, MemAdviseError>
    {
        // Record the new advice on this handle first; the shared counts are
        // then moved from the previous level to the new one.
        let previous = self.advice.replace(advice);
        if previous != advice {
            return self.mem.adjust_advice(previous, advice);
        }
        Ok(previous)
    }
}

impl<T> Clone for MemHandle<T>
where T: Deref<Target=[u8]>
{
    fn clone(&self) -> MemHandle<T> {
        MemHandle { mem: self.mem.clone(), advice: Cell::new(MemAdvice::Normal) }
    }
}

impl<T> Drop for MemHandle<T>
where T: Deref<Target=[u8]>
{
    /// Withdraw this handle's advice from the shared counts, if it gave any.
    fn drop(&mut self) {
        let held = self.advice.get();
        if held == MemAdvice::Normal {
            // `Normal` is never counted, so there is nothing to withdraw.
            return;
        }
        // Best effort: a failure to re-advise cannot be reported from drop.
        let _ = self.mem.adjust_advice(held, MemAdvice::Normal);
    }
}

impl<T> Deref for MemHandle<T>
where T: Deref<Target=[u8]>
{
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        &self.mem
    }
}

// State shared between all clones of a `MemHandle`: the wrapped buffer and
// the packed advisor interest counts.
#[derive(Debug)]
struct Mem<T>
where T: Deref<Target=[u8]>
{
    // The wrapped byte buffer.
    mem: T,
    // Packed interest counts, one 10-bit field per counted `MemAdvice`
    // level, selected by that level's mask value.
    advisors: AtomicUsize,
}

impl<T> Mem<T>
where T: Deref<Target=[u8]>
{
    // Move one advisor's interest from `prior` to `advice` in the shared
    // counts, retrying until the atomic update lands. If that changes the
    // top most advice, relay the new top most advice to the OS. Returns the
    // top most advice after the adjustment, or the error from relaying it.
    fn adjust_advice(&self, prior: MemAdvice, advice: MemAdvice)
        -> Result<MemAdvice, MemAdviseError>
    {
        debug_assert!(prior != advice);
        let mut current = self.advisors.load(Relaxed);
        loop {
            let updated = incr_advisors(decr_advisors(current, prior), advice);
            match self.advisors.compare_exchange_weak(
                current, updated, SeqCst, Relaxed
            ) {
                // Lost the race (or failed spuriously): retry from the
                // value actually observed.
                Err(observed) => current = observed,
                Ok(_) => {
                    let winner = top_most(updated);
                    if winner != top_most(current) {
                        // Note, may fail after adjustments
                        advise(&self.mem, winner)?;
                    }
                    return Ok(winner);
                }
            }
        }
    }
}

impl<T> Deref for Mem<T>
where T: Deref<Target=[u8]>
{
    type Target = [u8];

    // Borrow the wrapped buffer's bytes.
    fn deref(&self) -> &[u8] {
        &*self.mem
    }
}

// Given packed advisors state, and prior advice, return the state with that
// advice's interest count lowered by one, stopping at zero. `Normal` is not
// counted, so the state comes back unchanged for it.
fn decr_advisors(advisors: usize, prior: MemAdvice) -> usize {
    // Each counted level owns a 10-bit field, selected by its mask value.
    let (mask, shift) = match prior {
        MemAdvice::Normal     => return advisors,
        MemAdvice::Random     => (MemAdvice::Random as usize, 0),
        MemAdvice::Sequential => (MemAdvice::Sequential as usize, 10),
    };
    let count = (advisors & mask) >> shift;
    let lowered = if count > 0 { count - 1 } else { count };
    (advisors & !mask) | (lowered << shift)
}

// Given packed advisors state, and new advice, return the state with that
// advice's interest count raised by one, saturating at 0x3FF (1,023).
// `Normal` is not counted, so the state comes back unchanged for it.
fn incr_advisors(advisors: usize, advice: MemAdvice) -> usize {
    // Each counted level owns a 10-bit field, selected by its mask value.
    let (mask, shift) = match advice {
        MemAdvice::Normal     => return advisors,
        MemAdvice::Random     => (MemAdvice::Random as usize, 0),
        MemAdvice::Sequential => (MemAdvice::Sequential as usize, 10),
    };
    let count = (advisors & mask) >> shift;
    let raised = if count < 0x3FF { count + 1 } else { count };
    (advisors & !mask) | (raised << shift)
}

// Return top most advice from advisors state: the highest priority level
// with a non-zero interest count, or `Normal` when no level is counted.
fn top_most(advisors: usize) -> MemAdvice {
    // Probe the counted levels from highest to lowest priority.
    [MemAdvice::Sequential, MemAdvice::Random]
        .iter()
        .cloned()
        .find(|&level| advisors & (level as usize) != 0)
        .unwrap_or(MemAdvice::Normal)
}

// Advise the Unix-like OS about memory access plans.
//
// Relays `advice` for the whole byte range of `mem` via
// `libc::posix_madvise`. On failure, the call's non-zero return code is
// returned wrapped in a `MemAdviseError`. An empty region has nothing to
// advise on, so it succeeds without making the call.
#[cfg(unix)]
fn advise<T>(mem: &T, advice: MemAdvice) -> Result<(), MemAdviseError>
    where T: Deref<Target=[u8]>
{
    let bytes: &[u8] = mem;

    // Indexing the first byte of an empty buffer would panic, and there is
    // no range to describe to the OS in that case anyway.
    if bytes.is_empty() {
        return Ok(());
    }

    let flags: libc::c_int = match advice {
        MemAdvice::Normal       => libc::POSIX_MADV_NORMAL,
        MemAdvice::Random       => libc::POSIX_MADV_RANDOM,
        MemAdvice::Sequential   => libc::POSIX_MADV_SEQUENTIAL,
    };

    let ptr = bytes.as_ptr() as *mut libc::c_void;
    // SAFETY: `ptr` and `bytes.len()` describe exactly the slice borrowed
    // from `mem`, which stays borrowed for the duration of the call.
    let res = unsafe { libc::posix_madvise(ptr, bytes.len(), flags) };
    if res == 0 {
        Ok(())
    } else {
        Err(MemAdviseError { ecode: res })
    }
}

// Memory access advice is currently a no-op on non-Unix platforms: both
// arguments are ignored and `Ok(())` is always returned.
#[cfg(not(unix))]
fn advise<T>(_mem: &T, _advice: MemAdvice) -> Result<(), MemAdviseError>
    where T: Deref<Target=[u8]>
{
    Ok(())
}

#[cfg(test)]
mod tests {
    use ::mem::MemHandle;

    #[test]
    fn test_with_any_deref() {
        let _m = MemHandle::new(vec![0u8; 1024]);
        // Note this would typically fail for any actual use of advise (not
        // properly aligned, not memory mapped, etc.)
    }

    // Compiles only when `T` is `Send`; the returned value is incidental.
    fn is_send<T: Send>() -> bool { true }

    // Despite the name, only `Send` is asserted here: the handle keeps its
    // own advice in a `Cell`, so it is not expected to be `Sync`.
    #[test]
    fn test_send_sync() {
        assert!(is_send::<MemHandle<Vec<u8>>>());
    }

    #[cfg(feature = "mmap")]
    mod mmap {
        extern crate tempfile;

        use std::io::Write;

        use self::tempfile::tempfile;
        use memmap::Mmap;

        use ::mem::MemHandle;
        use ::mem::MemAdvice::*;

        // Shared fixture: map a fresh temporary file holding 256 KiB of
        // `1u8` bytes. The file handle is dropped once the mapping exists.
        fn mapped_ones() -> Mmap {
            let mut f = tempfile().unwrap();
            f.write_all(&vec![1u8; 256 * 1024]).unwrap();
            unsafe { Mmap::map(&f) }.unwrap()
        }

        // A single handle always sees its own advice relayed.
        #[test]
        fn test_advise_one() {
            let mem = MemHandle::new(mapped_ones());
            assert_eq!(mem.advise(Normal).unwrap(),     Normal);
            assert_eq!(mem.advise(Random).unwrap(),     Random);
            assert_eq!(mem.advise(Random).unwrap(),     Random);
            assert_eq!(1u8, mem[0]);
            assert_eq!(mem.advise(Sequential).unwrap(), Sequential);
            assert_eq!(1u8, mem[128*1024-1]);
            assert_eq!(mem.advise(Random).unwrap(),     Random);
            assert_eq!(1u8, mem[256*1024-1]);
            assert_eq!(mem.advise(Normal).unwrap(),     Normal);
        }

        // Lower priority advice is outvoted until the higher priority
        // handle is dropped.
        #[test]
        fn test_advise_two_random() {
            let h1 = MemHandle::new(mapped_ones());
            let h2 = h1.clone();
            assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
            assert_eq!(1u8, h1[0]);
            assert_eq!(h2.advise(Random).unwrap(), Sequential);
            assert_eq!(1u8, h2[128*1024-1]);
            drop(h1);
            assert_eq!(h2.advise(Random).unwrap(), Random);
        }

        // Dropping the only advising handle returns the region to Normal.
        #[test]
        fn test_advise_two_normal() {
            let h1 = MemHandle::new(mapped_ones());
            let h2 = h1.clone();
            assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
            drop(h1);
            assert_eq!(h2.advise(Normal).unwrap(), Normal);
        }

        #[test]
        fn test_advise_three() {
            let h1 = MemHandle::new(mapped_ones());
            let h2 = h1.clone();
            let h3 = h2.clone();
            assert_eq!(h1.advise(Sequential).unwrap(), Sequential);
            assert_eq!(h2.advise(Random).unwrap(),     Sequential);
            assert_eq!(h3.advise(Random).unwrap(),     Sequential);
            drop(h1); //after which h2 (+h3) wins, now Random
            assert_eq!(h3.advise(Normal).unwrap(),     Random); //h2 remains
        }
    }
}