rad 0.5.0

A type-safe, high-level interface to librados using the low-level C bindings from ceph-rust.
Documentation
use futures::prelude::*;
use futures::stream;
use rand::{Rng, SeedableRng, XorShiftRng};

use rad::errors::*;

use super::{CLUSTER_HOLD, connect_to_cluster};


const NUM_OBJECTS: usize = 64;


#[test]
fn read_write_remove() {
    let lock = CLUSTER_HOLD.lock().unwrap();

    let mut cluster = connect_to_cluster().unwrap();

    let bytes: Vec<_> = (0..NUM_OBJECTS)
        .map(|i| {
            let mut buf = Vec::new();

            buf.extend(
                XorShiftRng::from_seed([i as u32 + 1, 2, 3, 4])
                    .gen_iter::<u8>()
                    .take(1 << 16),
            );

            buf
        })
        .collect();

    let names: Vec<_> = (0..NUM_OBJECTS).map(|i| format!("obj-{}", i)).collect();

    (0..NUM_OBJECTS)
        .map(|i| -> Result<()> {
            let mut pool = cluster.get_pool_context("rbd").unwrap();
            let name = &names[i];
            let data = &bytes[i];

            println!("Beginning write for {}", name);
            pool.write_full(name, data)?;
            println!("Finished write for {}, asserting existence", name);
            assert!(pool.exists(name)?);
            println!("Existence of {} asserted, beginning data check", name);
            let mut buf = vec![0u8; data.len()];
            let n = pool.read(name, &mut buf, 0)?;
            assert!(n == buf.len());
            assert!(&buf == data);
            println!("Data equality asserted, beginning removal of {}", name);
            pool.remove(name)?;
            println!("Finished removal, asserting inexistence of {}", name);
            assert!(!pool.exists(name)?);
            println!("Asserted inexistence of {}", name);

            Ok(())
        })
        .for_each(|result| result.unwrap());

    let _ = lock;
}


#[test]
fn read_write_remove_async() {
    let lock = CLUSTER_HOLD.lock().unwrap();

    let mut cluster = connect_to_cluster().unwrap();

    let bytes: Vec<_> = (0..NUM_OBJECTS)
        .map(|i| {
            let mut buf = Vec::new();

            buf.extend(
                XorShiftRng::from_seed([i as u32 + 1, 2, 3, 4])
                    .gen_iter::<u8>()
                    .take(1 << 16),
            );

            buf
        })
        .collect();

    let names: Vec<_> = (0..NUM_OBJECTS).map(|i| format!("obj-{}", i)).collect();

    let writes = (0..NUM_OBJECTS).map(|i| {
        let mut pool = cluster.get_pool_context("rbd").unwrap();
        let name = &names[i];
        let data = &bytes[i];

        println!("Beginning write for {}", name);
        pool.write_full_async(name, data)
                .and_then(move |()| {
                    println!("Finished write for {}, beginning existence check", name);
                    pool.exists_async(name)
                        .and_then(move |b| {
                            assert!(b);
                            println!("Existence of {} asserted, beginning data check", name);
                            pool.read_async(name, vec![0u8; data.len()], 0)
                                .and_then(move |buf| {
                                    // TODO: Implement asynchronous full-read operation.
                                    assert!(buf.0 as usize == data.len());
                                    assert!(&buf.1 == data);
                                    println!("Data equality asserted, beginning removal of {}", name);
                                    pool.remove_async(name).and_then(move |()| {
                                        println!("Finished removal, asserting inexistence of {}", name);
                                        pool.exists_async(name).map(move |b| {
                                            assert!(!b);
                                            println!("Asserted inexistence of {}", name);
                                        })
                                    })
                                })
                        })
                })
    });

    stream::iter_ok(writes)
        .buffered(NUM_OBJECTS)
        .collect()
        .wait()
        .unwrap();

    let _ = lock;
}