noria 0.6.1

Client bindings for Noria
Documentation
use std::collections::BTreeMap;
use std::sync::{Condvar, Mutex};

use failure::Error;
use serde::de::DeserializeOwned;
use serde::Serialize;

use super::Authority;
use super::Epoch;
use super::CONTROLLER_KEY;

struct LocalAuthorityInner {
    keys: BTreeMap<String, Vec<u8>>,
    epoch: Epoch,
}

pub struct LocalAuthority {
    inner: Mutex<LocalAuthorityInner>,
    cv: Condvar,
}

impl Default for LocalAuthority {
    fn default() -> Self {
        Self::new()
    }
}

impl LocalAuthority {
    pub fn new() -> Self {
        Self {
            inner: Mutex::new(LocalAuthorityInner {
                keys: BTreeMap::default(),
                epoch: Epoch(0),
            }),
            cv: Condvar::new(),
        }
    }
}
impl Authority for LocalAuthority {
    fn become_leader(&self, payload_data: Vec<u8>) -> Result<Option<Epoch>, Error> {
        let mut inner = self.inner.lock().unwrap();
        if !inner.keys.contains_key(CONTROLLER_KEY) {
            inner.keys.insert(CONTROLLER_KEY.to_owned(), payload_data);
            self.cv.notify_all();
            Ok(Some(inner.epoch))
        } else {
            Ok(None)
        }
    }

    fn surrender_leadership(&self) -> Result<(), Error> {
        let mut inner = self.inner.lock().unwrap();
        assert!(inner.keys.remove(CONTROLLER_KEY).is_some());
        inner.epoch = Epoch(inner.epoch.0 + 1);
        self.cv.notify_all();
        Ok(())
    }

    fn get_leader(&self) -> Result<(Epoch, Vec<u8>), Error> {
        let mut inner = self.inner.lock().unwrap();
        while !inner.keys.contains_key(CONTROLLER_KEY) {
            inner = self.cv.wait(inner).unwrap();
        }
        Ok((
            inner.epoch,
            inner.keys.get(CONTROLLER_KEY).cloned().unwrap(),
        ))
    }

    fn try_get_leader(&self) -> Result<Option<(Epoch, Vec<u8>)>, Error> {
        let inner = self.inner.lock().unwrap();

        Ok(inner
            .keys
            .get(CONTROLLER_KEY)
            .cloned()
            .map(|payload| (inner.epoch, payload)))
    }

    fn await_new_epoch(&self, epoch: Epoch) -> Result<Option<(Epoch, Vec<u8>)>, Error> {
        let mut inner = self.inner.lock().unwrap();
        while inner.epoch == epoch && inner.keys.contains_key(CONTROLLER_KEY) {
            inner = self.cv.wait(inner).unwrap();
        }

        Ok(inner
            .keys
            .get(CONTROLLER_KEY)
            .cloned()
            .map(|k| (inner.epoch, k)))
    }

    fn try_read(&self, path: &str) -> Result<Option<Vec<u8>>, Error> {
        let inner = self.inner.lock().unwrap();
        Ok(inner.keys.get(path).cloned())
    }

    fn read_modify_write<F, P, E>(&self, path: &str, mut f: F) -> Result<Result<P, E>, Error>
    where
        F: FnMut(Option<P>) -> Result<P, E>,
        P: Serialize + DeserializeOwned,
    {
        let mut inner = self.inner.lock().unwrap();
        let r = f(inner
            .keys
            .get(path)
            .map(|data| serde_json::from_slice(data).unwrap()));
        if let Ok(ref p) = r {
            inner
                .keys
                .insert(path.to_owned(), serde_json::to_vec(&p).unwrap());
        }
        Ok(r)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;
    use std::time::Duration;

    #[test]
    fn it_works() {
        let authority = Arc::new(LocalAuthority::new());
        assert!(authority.try_read(CONTROLLER_KEY).unwrap().is_none());
        assert!(authority.try_read("/a").unwrap().is_none());
        assert_eq!(
            authority
                .read_modify_write("/a", |arg: Option<u32>| -> Result<u32, u32> {
                    assert!(arg.is_none());
                    Ok(12)
                })
                .unwrap(),
            Ok(12)
        );
        assert_eq!(
            authority.try_read("/a").unwrap(),
            Some("12".bytes().collect())
        );
        assert_eq!(authority.become_leader(vec![15]).unwrap(), Some(Epoch(0)));
        assert_eq!(authority.get_leader().unwrap(), (Epoch(0), vec![15]));
        {
            let authority = authority.clone();
            thread::spawn(move || authority.become_leader(vec![20]).unwrap());
        }
        thread::sleep(Duration::from_millis(100));
        assert_eq!(authority.get_leader().unwrap(), (Epoch(0), vec![15]));
    }
}