devguard-genesis 0.1.4

devguard generic IoT configuration service
#![feature(generators, generator_trait)]

extern crate serde;
extern crate toml;

use std::fs::File;
use std::io::{Read};
use carrier::osaka::{self, osaka};

pub mod genesis;
pub mod openwrt;

/// Converts any error into a `std::io::Error` of kind `Other`.
///
/// The message of the resulting error is the `Debug` rendering of `e`.
fn map_err<E>(e: E) -> std::io::Error
where
    E: std::error::Error,
{
    let rendered = format!("{:?}", e);
    std::io::Error::new(std::io::ErrorKind::Other, rendered)
}

/// Handles an incoming request on the genesis stream.
///
/// The configuration served is `current.toml` from the `genesis` directory
/// under the persistence dir when that file exists, otherwise `stable.toml`.
///
/// Dispatch on the `:method` header:
/// * absent or `GET`  — replies OK followed by a `GenesisCurrent` message
///   carrying the file's sha256 and its full contents.
/// * `HEAD`           — same, but with empty `data`.
/// * `POST`           — replies 100 "go ahead" and returns the task produced
///   by `genesis_post_handler`, which processes the uploaded update.
/// * anything else    — replies 404 "no such method".
///
/// If the chosen file cannot be opened, hashed or read, the reply is
/// 503 "misconfigured" and `None` is returned.
///
/// Returns `Some(task)` only for `POST`; every other path returns `None`.
pub fn genesis_stream(
    poll: osaka::Poll,
    headers: carrier::headers::Headers,
    _: &carrier::identity::Identity,
    mut stream: carrier::endpoint::Stream,
) -> Option<osaka::Task<()>> {
    let gdir = carrier::config::persistence_dir().join("genesis");
    // Best effort: a failure here surfaces as an open error just below.
    if let Err(e) = std::fs::create_dir_all(&gdir) {
        log::warn!("cannot create {:?}: {}", gdir, e);
    }

    // Decide exactly once which file is served, so that the `stable` flag in
    // the reply always describes the same file the hash and data come from.
    let current = gdir.join("current.toml");
    let stable = !current.exists();
    let p = if stable { gdir.join("stable.toml") } else { current };

    let mut f = match File::open(&p) {
        Ok(v) => v,
        Err(e) => {
            log::warn!("cannot open {:?}: {}", p, e);
            stream.send(carrier::headers::Headers::with_error(503, "misconfigured").encode());
            return None;
        }
    };

    let sha256 = match carrier::util::sha256file(&p) {
        Ok(v) => v,
        Err(e) => {
            log::warn!("sha err {}", e);
            stream.send(carrier::headers::Headers::with_error(503, "misconfigured").encode());
            return None;
        }
    };

    match headers.get(b":method") {
        None | Some(b"GET") => {
            // Read before committing to an OK header: a read failure must
            // become an error reply instead of a panic after "OK" was sent.
            let mut data = Vec::new();
            if let Err(e) = f.read_to_end(&mut data) {
                log::warn!("cannot read {:?}: {}", p, e);
                stream.send(carrier::headers::Headers::with_error(503, "misconfigured").encode());
                return None;
            }
            stream.send(carrier::headers::Headers::ok().encode());
            stream.message(carrier::proto::GenesisCurrent {
                sha256,
                commit: String::new(),
                data,
                stable,
            });
        }
        Some(b"HEAD") => {
            stream.send(carrier::headers::Headers::ok().encode());
            stream.message(carrier::proto::GenesisCurrent {
                sha256,
                commit: String::new(),
                data: Vec::new(),
                stable,
            });
        }
        Some(b"POST") => {
            stream.send(carrier::headers::Headers::with_error(100, "go ahead").encode());
            return Some(genesis_post_handler(poll, stream, gdir));
        }
        _ => {
            stream.send(carrier::headers::Headers::with_error(404, "no such method").encode());
            return None;
        }
    }

    None
}


// Processes one uploaded genesis update on `stream`.
//
// Steps, in order:
//   1. hash the live config (`current.toml` if present, else `stable.toml`);
//   2. wait for one message on the stream and decode it as `GenesisUpdate`;
//   3. reject with 408 unless the sender's `previous_sha256` matches the hash;
//   4. parse the payload as TOML and load it into an OpenWrt emitter (400 on failure);
//   5. write `current.toml` (prefixed with a `#commit` line) and reset the
//      `changing` marker to "0" (500 on failure);
//   6. commit the emitter (500 on failure), reply OK, and spawn a thread that
//      exits the process with status 1 after 10 seconds.
#[osaka]
fn genesis_post_handler(_poll: osaka::Poll, mut stream: carrier::endpoint::Stream, gdir: std::path::PathBuf) {
    use carrier::prost::Message;
    use std::io::Write;

    let pending = gdir.join("current.toml");
    let live = if pending.exists() {
        pending
    } else {
        gdir.join("stable.toml")
    };
    let sha256 = match carrier::util::sha256file(&live) {
        Ok(digest) => digest,
        Err(e) => {
            log::warn!("sha err {}", e);
            stream.send(carrier::headers::Headers::with_error(503, format!("{:?}", e)).encode());
            return;
        }
    };

    let raw = osaka::sync!(stream);
    let update = match carrier::proto::GenesisUpdate::decode(&raw) {
        Ok(decoded) => decoded,
        Err(e) => {
            stream.send(carrier::headers::Headers::with_error(400, format!("{:?}", e)).encode());
            log::warn!("{:?}", e);
            return;
        }
    };

    // The sender must have based its update on the config that is live now.
    if update.previous_sha256 != sha256 {
        stream.send(carrier::headers::Headers::with_error(408, "outdated handle").encode());
        return;
    }

    let parsed: genesis::Genesis = match toml::de::from_slice(&update.data) {
        Ok(cfg) => cfg,
        Err(e) => {
            stream.send(carrier::headers::Headers::with_error(400, format!("{:?}", e)).encode());
            log::warn!("{:?}", e);
            return;
        }
    };

    let mut emitter = openwrt::Emitter::new();
    if let Err(e) = emitter.load(&parsed) {
        stream.send(carrier::headers::Headers::with_error(400, format!("{:?}", e)).encode());
        log::warn!("{:?}", e);
        return;
    }

    // Seconds since the Unix epoch; 0 if the clock reads earlier than the epoch.
    let unixtime = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0);

    let target = gdir.join("current.toml");
    let written = std::fs::File::create(&target).and_then(|mut out| {
        // Newlines in the commit text are flattened so the header stays one line.
        write!(out, "#commit {} {}\n", unixtime, update.commit.replace("\n", " \\ "))?;
        out.write_all(&update.data)
    });
    if let Err(e) = written {
        stream.send(carrier::headers::Headers::with_error(500, format!("{:?}", e)).encode());
        log::warn!("{:?}", e);
        return;
    }

    let marker = gdir.join("changing");
    let marked = std::fs::File::create(&marker).and_then(|mut out| out.write_all(b"0"));
    if let Err(e) = marked {
        stream.send(carrier::headers::Headers::with_error(500, format!("{:?}", e)).encode());
        log::warn!("{:?}", e);
        return;
    }

    if let Err(e) = emitter.commit() {
        stream.send(carrier::headers::Headers::with_error(500, format!("{:?}", e)).encode());
        log::warn!("{:?}", e);
        return;
    }

    stream.send(carrier::headers::Headers::ok().encode());

    // Delayed process exit, leaving time for the OK reply to go out first.
    std::thread::spawn(|| {
        let delay = std::time::Duration::from_secs(10);
        std::thread::sleep(delay);
        std::process::exit(1);
    });
}


pub fn genesis() -> Result<(), std::io::Error> {
    let gdir = carrier::config::persistence_dir().join("genesis");

    let mut f = File::open(gdir.join("current.toml")).or_else(|_|File::open(gdir.join("stable.toml")))?;
    let mut s = Vec::new();
    f.read_to_end(&mut s)?;

    let g : genesis::Genesis = toml::de::from_slice(&s).map_err(map_err)?;
    let mut em = openwrt::Emitter::new();
    em.load(&g)?;
    em.commit()?;

    Ok(())
}

/// Records the outcome of a stability check for the pending genesis config.
///
/// * `stable == true`: removes the `changing` marker and, if a pending
///   `current.toml` exists, renames it to `stable.toml`.
/// * `stable == false`: increments the attempt counter stored in `changing`.
///   Once the counter exceeds 30 and a `current.toml` exists, the pending
///   config and the marker are deleted and `genesis()` is run again, which
///   then applies `stable.toml`.
///
/// All filesystem failures are logged or ignored; this function never panics
/// on I/O errors and returns nothing.
pub fn stabilize(stable: bool) {
    let gdir = carrier::config::persistence_dir().join("genesis");
    if let Err(e) = std::fs::create_dir_all(&gdir) {
        log::warn!("cannot create {:?}: {}", gdir, e);
    }

    let marker = gdir.join("changing");
    let current = gdir.join("current.toml");

    if stable {
        log::info!("genesis stabilized");
        std::fs::remove_file(&marker).ok();

        match std::fs::rename(&current, gdir.join("stable.toml")) {
            Ok(()) => (),
            // No pending config means there is nothing to promote; that is the
            // normal steady state and not worth a warning on every call.
            Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => (),
            Err(e) => log::warn!("{:?}", e),
        }
        return;
    }

    // A missing or unparsable marker counts as zero previous attempts.
    // Trimming keeps a stray trailing newline from silently resetting the
    // counter; saturating_add keeps a long-lived counter from overflowing.
    let ci = std::fs::read_to_string(&marker)
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .unwrap_or(0)
        .saturating_add(1);
    std::fs::write(&marker, ci.to_string()).ok();

    log::info!("genesis stabililization attempt {}", ci);

    if ci > 30 && current.exists() {
        log::info!("genesis reverting");
        std::fs::remove_file(&current).ok();
        std::fs::remove_file(&marker).ok();

        // With current.toml gone, genesis() falls back to stable.toml.
        if let Err(e) = genesis() {
            log::error!("genesis: {:?}", e);
        }
    }
}