use libpijul::{Hash, ApplyTimestamp, apply_resize, Repository, PatchId};
use libpijul::patch::read_changes;
use libpijul::fs_representation::{branch_changes_base_path, pristine_dir,
patches_dir, PIJUL_DIR_NAME, PATCHES_DIR_NAME, patch_file_name};
use hyper;
use hyper_rustls;
use regex::Regex;
use base64::URL_SAFE_NO_PAD;
use std::path::{Path, PathBuf};
use std::collections::hash_set::HashSet;
use std::collections::HashMap;
use std::fs::{File, hard_link, copy, metadata};
use std;
use error::{ErrorKind, Result, Error};
use std::io::prelude::*;
use std::net::ToSocketAddrs;
use shell_escape::unix::escape;
use std::borrow::Cow;
use commands::{ask, assert_no_containing_repo, create_repo};
use futures;
use user;
use thrussh;
use thrussh_keys;
use futures::{Future, Stream, Async, Poll};
use tokio_core;
use tokio_core::net::TcpStream;
/// Description of a remote repository, as produced by `parse_remote`.
///
/// String and path data is borrowed for `'a` from the arguments the remote
/// was parsed from; only the `Local` variant owns its path.
#[derive(Debug)]
pub enum Remote<'a> {
/// A repository reached by running `pijul` commands over SSH.
Ssh {
/// Login name; when `None`, `session` uses `user::get_user_name()`.
user: Option<&'a str>,
/// Host to connect to.
host: &'a str,
/// TCP port; `session` falls back to 22 when this is `None`.
port: Option<u16>,
/// Repository path on the remote host, passed to `--repository`.
path: &'a str,
/// The complete remote string this variant was parsed from.
id: &'a str,
/// Explicit secret key file, forwarded to `guess_secret_key_path`.
secret_key: Option<&'a Path>
},
/// A repository read over HTTP(S); `uri` is the base URI.
Uri { uri: &'a str },
/// A repository on the local file system.
Local { path: PathBuf },
}
/// An open connection to a remote repository, created by `Remote::session`.
pub enum Session<'a> {
/// An authenticated SSH connection.
Ssh {
/// Reactor used to drive the SSH futures to completion (`l.run(..)`).
l: tokio_core::reactor::Core,
/// The remote identifier string, copied from `Remote::Ssh::id`.
id: &'a str,
/// Repository path on the remote host.
path: &'a str,
/// The live connection. It is wrapped in an `Option` so each command
/// can `take()` it, move it into a future, and store the result back.
session: Option<thrussh::client::Connection<TcpStream, Client>>,
},
/// An HTTP(S) client bound to its own reactor.
Uri {
/// Reactor used to drive HTTP requests to completion.
l: tokio_core::reactor::Core,
/// Base URI of the remote repository.
uri: &'a str,
/// The hyper client used for all requests of this session.
client: hyper::Client<hyper_rustls::HttpsConnector>
},
/// A repository on the local file system, rooted at `path`.
Local { path: &'a Path },
}
/// SSH client handler: holds the per-connection state that the
/// `thrussh::client::Handler` callbacks read and update.
pub struct Client {
/// Exit status reported by the remote command on `channel`, once received.
exit_status: Option<u32>,
/// Decides what the `data` callback does with incoming channel data.
state: State,
/// Host name, used when checking the server key against `known_hosts`.
host: String,
/// Port, used together with `host` for the `known_hosts` lookup.
port: u16,
/// Channel whose exit status is recorded; statuses for other channels
/// are ignored by the `exit_status` callback.
channel: Option<thrussh::ChannelId>,
}
/// What the SSH handler does with data received on the default stream.
enum State {
/// Forward the data to this process's standard output.
None,
/// Parse the data as `hash:timestamp` lines and collect them in `changes`.
Changes { changes: HashMap<Hash, ApplyTimestamp>, },
/// Append the data to `file` (a patch being downloaded).
DownloadPatch { file: File },
}
/// The two phases `SendFile` alternates between while streaming a file.
enum SendFileState {
/// The connection is idle; the next chunk can be read from the file.
Read(thrussh::client::Connection<TcpStream, Client>),
/// A chunk is being sent; holds the pending send future.
Wait(thrussh::client::Data<TcpStream, Client, Vec<u8>>),
}
/// Future that streams the contents of a file over an SSH channel and
/// resolves to the connection plus the reusable buffer.
struct SendFile {
/// File being read.
f: File,
/// Scratch buffer; `None` while it is owned by an in-flight send.
buf: Option<Vec<u8>>,
/// Channel the data is written to.
chan: thrussh::ChannelId,
/// Current phase; taken out on every poll and put back before returning
/// `NotReady` or looping.
state: Option<SendFileState>,
}
impl Future for SendFile {
    type Item = (thrussh::client::Connection<TcpStream, Client>, Vec<u8>);
    type Error = Error;

    /// Alternates between reading a chunk of at most `BUFFER_SIZE` bytes
    /// from the file and sending it on the channel, until a read returns
    /// zero bytes. At that point the connection and the buffer are handed
    /// back to the caller.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        debug!("SendFile loop starting");
        loop {
            debug!("sendfile loop");
            // The state is always put back before looping or returning
            // `NotReady`, so it can only be missing after completion/error.
            let phase = match self.state.take() {
                Some(phase) => phase,
                None => unreachable!(),
            };
            match phase {
                SendFileState::Read(connection) => {
                    debug!("read");
                    let mut chunk = self.buf.take().unwrap();
                    chunk.resize(BUFFER_SIZE, 0);
                    let read = self.f.read(&mut chunk)?;
                    if read == 0 {
                        // End of file: nothing left to send.
                        return Ok(Async::Ready((connection, chunk)));
                    }
                    chunk.truncate(read);
                    debug!("sending {:?} bytes, {:?}", read, chunk.len());
                    let sending = connection.data(self.chan, None, chunk);
                    self.state = Some(SendFileState::Wait(sending));
                }
                SendFileState::Wait(mut sending) => {
                    debug!("wait");
                    let progress = sending.poll()?;
                    match progress {
                        Async::NotReady => {
                            self.state = Some(SendFileState::Wait(sending));
                            return Ok(Async::NotReady);
                        }
                        Async::Ready((connection, chunk)) => {
                            // Recycle the buffer for the next read.
                            self.buf = Some(chunk);
                            self.state = Some(SendFileState::Read(connection))
                        }
                    }
                }
            }
        }
    }
}
impl thrussh::client::Handler for Client {
    type Error = Error;
    type FutureUnit = futures::Finished<Client, Error>;
    type SessionUnit = futures::Finished<(Client, thrussh::client::Session), Error>;
    type FutureBool = futures::future::FutureResult<(Client, bool), Error>;

    /// Handles data received on `channel`.
    ///
    /// Extended data on stream 1 is forwarded to stderr. Data on the default
    /// stream is handled according to `self.state`: echoed to stdout, parsed
    /// as `hash:timestamp` lines, or appended to the patch file being
    /// downloaded. Data on any other stream is only logged.
    ///
    /// # Panics
    ///
    /// Panics if writing to stdout, stderr or the patch file fails, or if
    /// change-list data is not valid UTF-8.
    fn data(mut self,
            channel: thrussh::ChannelId,
            stream: Option<u32>,
            data: &[u8],
            session: thrussh::client::Session)
            -> Self::SessionUnit {
        debug!("data ({:?}): {:?}", channel, &data[..std::cmp::min(data.len(), 100)]);
        if stream == Some(1) {
            // `write_all` rather than `write`: a short write would silently
            // drop the tail of the remote's output.
            std::io::stderr().write_all(data).unwrap();
        } else if stream == None {
            match self.state {
                State::None => {
                    std::io::stdout().write_all(data).unwrap();
                }
                State::Changes { ref mut changes } => {
                    // NOTE(review): each packet is parsed on its own, so a
                    // line split across two packets would be dropped or
                    // mis-parsed; confirm the remote's output framing.
                    let data = std::str::from_utf8(data).unwrap();
                    for l in data.lines() {
                        let mut spl = l.split(':');
                        if let (Some(h), Some(s)) = (spl.next(), spl.next()) {
                            // Lines that do not parse are skipped.
                            if let (Some(h), Ok(s)) =
                                (Hash::from_base64(h), s.parse()) {
                                changes.insert(h, s);
                            }
                        }
                    }
                }
                State::DownloadPatch { ref mut file } => {
                    file.write_all(data).unwrap();
                }
            }
        } else {
            debug!("SSH data received on channel {:?}: {:?} {:?}",
                   channel,
                   stream,
                   data);
        }
        futures::finished((self, session))
    }

    /// Records the remote command's exit status, but only when it is
    /// reported for the channel currently tracked in `self.channel`.
    fn exit_status(mut self,
                   channel: thrussh::ChannelId,
                   exit_status: u32,
                   session: thrussh::client::Session)
                   -> Self::SessionUnit {
        debug!("exit_status received on channel {:?}: {:?}:", channel, exit_status);
        debug!("self.channel = {:?}", self.channel);
        if let Some(c) = self.channel {
            if channel == c {
                self.exit_status = Some(exit_status);
            }
        }
        debug!("self.exit_status = {:?}", self.exit_status);
        futures::finished((self, session))
    }

    /// Checks the server's public key against `~/.ssh/known_hosts`.
    ///
    /// * A known, matching key is accepted.
    /// * An unknown key is recorded and accepted only if the user explicitly
    ///   agrees. A refusal, or a failure to ask, rejects the key.
    /// * A changed key is rejected with a warning.
    /// * Any other lookup error is returned as an error.
    ///
    /// # Panics
    ///
    /// Panics if the home directory cannot be determined or if writing the
    /// new entry to `known_hosts` fails.
    fn check_server_key(self, server_public_key: &thrussh::key::PublicKey) -> Self::FutureBool {
        let path = std::env::home_dir().unwrap().join(".ssh").join("known_hosts");
        match thrussh_keys::check_known_hosts_path(&self.host, self.port, &server_public_key, &path) {
            Ok(true) => futures::done(Ok((self, true))),
            Ok(false) => {
                // Trust the key only on an explicit "yes". Previously an
                // error while prompting (e.g. no usable terminal) fell
                // through to the "learn" branch and silently trusted an
                // unknown host.
                if let Ok(true) = ask::ask_learn_ssh(&self.host,
                                                     self.port, "") {
                    thrussh_keys::learn_known_hosts_path(&self.host,
                                                         self.port,
                                                         &server_public_key,
                                                         &path)
                        .unwrap();
                    futures::done(Ok((self, true)))
                } else {
                    futures::done(Ok((self, false)))
                }
            }
            Err(e) => {
                if let &thrussh_keys::ErrorKind::KeyChanged(line) = e.kind() {
                    println!("Host key changed! Someone might be eavesdropping this communication, \
                              refusing to continue. Previous key found line {}",
                             line);
                    futures::done(Ok((self, false)))
                } else {
                    futures::done(Err(From::from(e)))
                }
            }
        }
    }
}
/// Size in bytes (16 KiB) that `SendFile` resizes its buffer to before each
/// read, i.e. the maximum amount of file data sent per chunk.
const BUFFER_SIZE: usize = 1 << 14;
impl<'a> Session<'a> {
pub fn changes(&mut self, branch: &str) -> Result<HashMap<Hash, ApplyTimestamp>> {
match *self {
Session::Ssh { ref mut l, ref path, ref mut session, .. } => {
let esc_path = escape(Cow::Borrowed(path));
let cmd = format!("pijul changes --repository {} --branch {:?} --hash-only",
esc_path,
branch);
if let Some(ref mut session) = *session {
session.handler_mut().state = State::Changes { changes: HashMap::new() }
}
*session = Some(l.run(session.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut connection, chan)| {
debug!("exec: {:?}", cmd);
connection.handler_mut().channel = Some(chan);
connection.exec(chan, false, &cmd);
debug!("waiting channel close");
connection.wait(move |session| {
session.handler().exit_status.is_some()
})
}))
.unwrap());
let exit_code: Option<u32> = if let Some(ref mut session) = *session {
session.handler().exit_status
} else {
None
};
debug!("exit_code = {:?}", exit_code);
if let Some(ref mut session) = *session {
match std::mem::replace(&mut session.handler_mut().state, State::None) {
State::Changes { changes } => {
debug!("changes: {:?}", changes);
Ok(changes)
}
_ => unreachable!(),
}
} else {
unreachable!()
}
}
Session::Local { path } => {
let repo_dir = pristine_dir(&path);
let repo = Repository::open(&repo_dir, None)?;
let txn = repo.txn_begin()?;
Ok(if let Some(branch) = txn.get_branch(&branch) {
txn.iter_patches(&branch, None)
.map(|(hash, s)| (txn.get_external(hash).unwrap().to_owned(), s))
.collect()
} else {
HashMap::new()
})
}
Session::Uri { ref mut l, uri, ref mut client } => {
let mut uri = uri.to_string();
uri = uri + "/" + PIJUL_DIR_NAME + "/" + &branch_changes_base_path(branch);
let mut req = hyper::Request::new(
hyper::Method::Get,
uri.parse().unwrap()
);
req.headers_mut().set_raw("connection", "close");
let res:Vec<u8> = l.run(
client.request(req)
.and_then(|resp: hyper::Response| {
let res = Vec::new();
let body: hyper::Body = resp.body();
body.fold(res, |mut res, x| {
res.extend(x.iter());
futures::finished::<_, hyper::Error>(res)
})
})
).unwrap();
let changes = read_changes(&mut &res[..]).unwrap_or(HashMap::new());
debug!("http: {:?}", changes);
Ok(changes)
}
}
}
pub fn download_patch(&mut self,
repo_root: &Path,
patch_hash: &Hash)
-> Result<PathBuf> {
let local_file = patches_dir(repo_root).join(&patch_file_name(patch_hash.as_ref()));
if metadata(&local_file).is_ok() {
Ok(local_file)
} else {
match *self {
Session::Local { path } => {
debug!("local downloading {:?}", patch_hash);
let remote_file = patches_dir(path).join(&patch_file_name(patch_hash.as_ref()));
debug!("hard linking {:?} to {:?}", remote_file, local_file);
try!(hard_link(&remote_file, &local_file)
.or_else(|_| copy(&remote_file, &local_file).and_then(|_| Ok(()))));
Ok(local_file)
}
Session::Ssh { ref mut l, ref path, ref mut session, .. } => {
let esc_path = escape(Cow::Borrowed(path));
let cmd = format!("pijul patch --repository {} {}",
esc_path,
patch_hash.to_base64(URL_SAFE_NO_PAD));
debug!("cmd {:?} {:?}", cmd, local_file);
if let Some(ref mut session) = *session {
session.handler_mut().state =
State::DownloadPatch { file: try!(File::create(&local_file)) };
session.handler_mut().exit_status = None;
session.handler_mut().channel = None;
}
*session = Some(l.run(session.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut connection, chan)| {
connection.handler_mut().channel = Some(chan);
connection.exec(chan, false, &cmd);
connection.wait(move |session| {
session.handler().exit_status.is_some()
})
}))
.unwrap());
if let Some(ref mut session) = *session {
if let State::DownloadPatch { mut file } = std::mem::replace(&mut session.handler_mut().state, State::None) {
file.flush()?;
}
}
Ok(local_file)
}
Session::Uri { ref mut l, ref mut client, uri } => {
let uri =
uri.to_string() + "/" + PIJUL_DIR_NAME + "/" + PATCHES_DIR_NAME + "/" +
&patch_hash.to_base64(URL_SAFE_NO_PAD) + ".gz";
debug!("downloading uri {:?}", uri);
let mut req = hyper::Request::new(
hyper::Method::Get,
uri.parse().unwrap()
);
req.headers_mut().set_raw("connection", "close");
let res = l.run(
client.request(req)
.and_then(|resp| {
if resp.status() == hyper::StatusCode::Ok {
let res = Vec::new();
futures::future::Either::A(resp.body().fold(res, |mut res, x| {
res.extend(x.iter());
futures::finished::<_, hyper::Error>(res)
}).map(|body| {
debug!("response={:?}", body);
let mut f = File::create(&local_file).unwrap();
f.write_all(&body).unwrap();
debug!("patch downloaded through http: {:?}", body);
Some(local_file)
}))
} else {
futures::future::Either::B(futures::finished(None))
}
})
).unwrap();
if let Some(local_file) = res {
Ok(local_file)
} else {
Err(ErrorKind::PatchNotFound(
repo_root.to_str().unwrap().to_string(),
patch_hash.to_owned()
).into())
}
}
}
}
}
/// Sends the patches in `patch_hashes` from the repository at `repo_root`
/// to the remote and applies them to `remote_branch` there.
///
/// * SSH: runs `pijul apply` remotely and streams every patch file to it
///   over the channel. The remote exit status is only logged; `Ok(())` is
///   returned regardless of its value.
/// * Local: links (or copies) missing patch files into the remote's patches
///   directory and applies them with `apply_resize`.
///
/// # Panics
///
/// Panics for `Session::Uri`, if the SSH exchange fails, or if a patch file
/// cannot be opened locally.
fn remote_apply(&mut self,
repo_root: &Path,
remote_branch: &str,
patch_hashes: &HashSet<Hash>)
-> Result<()> {
match *self {
Session::Ssh { ref mut l, ref mut session, ref path, .. } => {
let pdir = patches_dir(repo_root);
// Clear the previous command's status so the wait below is meaningful.
if let Some(ref mut session) = *session {
session.handler_mut().exit_status = None;
}
*session = Some(l.run(session.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut session, chan)| {
session.handler_mut().channel = Some(chan);
let esc_path = escape(Cow::Borrowed(path));
session.exec(chan,
false,
&format!("pijul apply --repository {} --branch {:?}",
esc_path,
remote_branch));
// Wrap each hash in `Ok` so the iterator can be turned into a stream.
let it = patch_hashes.iter().map(|x| {
let y: Result<_> = Ok(x);
y
});
// Send the patch files one after another, threading the connection and
// a reusable buffer through the fold.
futures::stream::iter(it)
.fold((session, Vec::new()), move |(session, buf), hash| {
// Build `<patches dir>/<base64 hash>` with a `gz` extension.
let mut pdir = pdir.clone();
pdir.push(hash.to_base64(URL_SAFE_NO_PAD));
pdir.set_extension("gz");
let f = std::fs::File::open(&pdir).unwrap();
// `pdir` is a fresh clone on every iteration, so this pop does not
// affect later iterations.
pdir.pop();
SendFile {
f: f,
buf: Some(buf),
chan: chan,
state: Some(SendFileState::Read(session)),
}
})
.and_then(move |(mut session, _)| {
// All files sent: signal end of input, then wait for the remote
// command to report its exit status.
session.channel_eof(chan);
session.channel_close(chan);
session.wait_flush().map_err(Error::from)
.and_then(move |session| {
session.wait(move |session| session.handler().exit_status.is_some())
.map_err(Error::from)
})
})
.map_err(From::from)
}))
.unwrap());
if let Some(ref session) = *session {
debug!("exit status = {:?}", session.handler().exit_status);
}
Ok(())
}
Session::Local { path } => {
let mut remote_path = patches_dir(path);
let mut local_path = patches_dir(repo_root);
for hash in patch_hashes {
remote_path.push(&hash.to_base64(URL_SAFE_NO_PAD));
remote_path.set_extension("gz");
local_path.push(&hash.to_base64(URL_SAFE_NO_PAD));
local_path.set_extension("gz");
debug!("hard linking {:?} to {:?}", local_path, remote_path);
// Only transfer patches the remote does not already have; prefer a
// hard link and fall back to copying.
if metadata(&remote_path).is_err() {
try!(hard_link(&local_path, &remote_path)
.or_else(|_| copy(&local_path, &remote_path).and_then(|_| Ok(()))))
}
// Back to the bare directories for the next hash.
local_path.pop();
remote_path.pop();
}
// Retry for as long as the failure is reported as a lack of space.
loop {
match apply_resize(&path, &remote_branch, patch_hashes.iter()) {
Err(ref e) if e.lacks_space() => {},
Ok(()) => return Ok(()),
Err(e) => return Err(From::from(e))
}
}
}
_ => panic!("upload to URI impossible"),
}
}
pub fn remote_init(&mut self) -> Result<()> {
match *self {
Session::Ssh { ref mut l, ref mut session, ref path, .. } => {
let esc_path = escape(Cow::Borrowed(path));
let cmd = format!("pijul init {}", esc_path);
debug!("command line:{:?}", cmd);
if let Some(ref mut session) = *session {
session.handler_mut().exit_status = None
}
*session = Some(l.run(session.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut session, chan)| {
debug!("chan = {:?}", chan);
session.handler_mut().channel = Some(chan);
session.exec(chan, false, &cmd);
session.wait(move |session| session.handler().exit_status.is_some())
}))
.unwrap());
Ok(())
}
Session::Local { path } => {
try!(assert_no_containing_repo(path));
create_repo(path)
},
_ => panic!("remote init not possible"),
}
}
pub fn pullable_patches(&mut self,
remote_branch: &str,
local_branch: &str,
target: &Path)
-> Result<Pullable> {
let mut remote_patches: Vec<(Hash, ApplyTimestamp)> =
try!(self.changes(remote_branch))
.into_iter()
.map(|(h, s)| (h.to_owned(), s))
.collect();
remote_patches.sort_by(|&(_, ref a), &(_, ref b)| a.cmp(&b));
let local_patches: HashMap<Hash, ApplyTimestamp> = {
let repo_dir = pristine_dir(&target);
let repo = Repository::open(&repo_dir, None)?;
let txn = repo.txn_begin()?;
if let Some(branch) = txn.get_branch(&local_branch) {
txn.iter_patches(&branch, None)
.map(|(hash, s)| (txn.get_external(hash).unwrap().to_owned(), s))
.collect()
} else {
HashMap::new()
}
};
debug!("pullable done: {:?}", remote_patches);
Ok(Pullable {
local: local_patches.iter().map(|(h, _)| h.to_owned()).collect(),
remote: remote_patches
.into_iter()
.collect()
})
}
/// Downloads every patch listed in `pullable` into the repository at
/// `target`, then applies them all to `to_branch`.
///
/// The application is retried for as long as it fails with an error that
/// reports a lack of space; any other error is returned.
pub fn pull(&mut self,
            target: &Path,
            to_branch: &str,
            pullable: &mut [(Hash, ApplyTimestamp)])
            -> Result<()> {
    for entry in pullable.iter() {
        self.download_patch(&target, &entry.0)?;
    }
    debug!("patches downloaded");
    loop {
        debug!("apply_resize");
        let hashes = pullable.iter().map(|&(ref hash, _)| hash);
        let outcome = apply_resize(&target, &to_branch, hashes);
        match outcome {
            Ok(()) => return Ok(()),
            Err(ref e) if e.lacks_space() => continue,
            Err(e) => return Err(From::from(e)),
        }
    }
}
/// Lists the patches of `from_branch` (in the repository at `source`) that
/// are not yet applied to `to_branch` on the remote.
///
/// Each entry carries the patch hash, its internal identifier and its local
/// application timestamp. A missing local branch yields an empty list.
pub fn pushable_patches(&mut self,
                        from_branch: &str,
                        to_branch: &str,
                        source: &Path)
                        -> Result<Vec<(Hash, Option<PatchId>, ApplyTimestamp)>> {
    debug!("source: {:?}", source);
    let remote_changes = self.changes(to_branch)?;
    let candidates: Vec<_> = {
        let pristine = pristine_dir(&source);
        let repo = Repository::open(&pristine, None)?;
        let txn = repo.txn_begin()?;
        match txn.get_branch(&from_branch) {
            Some(branch) => {
                txn.iter_patches(&branch, None)
                    .map(|(id, stamp)| {
                        (txn.get_external(id).unwrap().to_owned(), Some(id.to_owned()), stamp)
                    })
                    .filter(|&(ref hash, _, _)| !remote_changes.contains_key(hash))
                    .collect()
            }
            None => Vec::new(),
        }
    };
    debug!("pushing: {:?}", candidates);
    let remote_hashes: HashSet<Hash> = remote_changes.into_iter().map(|(hash, _)| hash).collect();
    debug!("to_changes: {:?}", remote_hashes);
    // Second pass over the same criterion as the filter above.
    let pushable = candidates
        .into_iter()
        .filter(|&(ref hash, _, _)| !remote_hashes.contains(hash))
        .collect();
    Ok(pushable)
}
/// Pushes the patches in `pushable` from the repository at `source` to
/// `remote_branch`. Does nothing when the set is empty.
pub fn push(&mut self,
            source: &Path,
            remote_branch: &str,
            pushable: &HashSet<Hash>)
            -> Result<()> {
    debug!("push, remote_applying");
    debug!("pushable: {:?}", pushable);
    if pushable.is_empty() {
        return Ok(());
    }
    self.remote_apply(source, remote_branch, pushable)
}
}
impl<'a> Remote<'a> {
pub fn session(&'a self) -> Result<Session<'a>> {
match *self {
Remote::Local { ref path } => Ok(Session::Local { path: path.as_path() }),
Remote::Uri { uri } => {
let l = tokio_core::reactor::Core::new().unwrap();
let h = l.handle();
Ok(Session::Uri {
l, uri: uri,
client: hyper::Client::configure()
.connector(hyper_rustls::HttpsConnector::new(3, &h))
.build(&h)
})
}
Remote::Ssh { ref user, ref host, ref port, ref path, ref id, ref secret_key } => {
let addr = (*host, port.unwrap_or(22)).to_socket_addrs().unwrap().next().unwrap();
debug!("addr = {:?}", addr);
let mut l = tokio_core::reactor::Core::new().unwrap();
let handle = l.handle();
let config = std::rc::Rc::new(thrussh::client::Config::default());
let handler = Client {
exit_status: None,
state: State::None,
port: port.unwrap_or(22),
host: host.to_string(),
channel: None,
};
let session = l.run(
tokio_core::net::TcpStream::connect(&addr, &handle)
.map_err(Error::from)
.and_then(|socket| {
let connection =
thrussh::client::Connection::new(
config.clone(),
socket, handler, None
)?;
let key = {
let (path_sec, path_pub):(_, Option<PathBuf>) = guess_secret_key_path(*secret_key);
debug!("key path: {:?}, {:?}", path_sec, path_pub);
load_key_or_ask(path_sec, path_pub)?
};
if let &Some(user) = user {
debug!("user = {:?}", user);
Ok(connection.authenticate_key(user, key))
} else {
let user = user::get_user_name().unwrap();
debug!("user = {:?}", user);
Ok(connection.authenticate_key(&user, key))
}
})
.flatten()
)?;
Ok(Session::Ssh {
l: l,
session: Some(session),
path: path,
id: id,
})
}
}
}
}
/// Loads the secret key stored at `path_sec` (with optional public key file
/// `path_pub`).
///
/// The key is first loaded without a password. If that fails because the
/// key is encrypted, the user is asked for a password and loading is tried
/// once more. A missing key file is reported as
/// `ErrorKind::SshKeyNotFound`; any other failure is converted and returned.
///
/// # Panics
///
/// Panics if `path_sec` is not valid UTF-8.
fn load_key_or_ask(path_sec: PathBuf, path_pub: Option<PathBuf>) -> Result<thrussh::key::Algorithm> {
    let first_failure = match thrussh_keys::load_secret_key(
        path_sec.to_str().unwrap(),
        None,
        path_pub.as_ref().map(|p| p.as_path())
    ) {
        Ok(key) => return Ok(key),
        Err(e) => e,
    };
    match *first_failure.kind() {
        thrussh_keys::ErrorKind::KeyIsEncrypted => {
            let password = ask::password()?;
            let key = thrussh_keys::load_secret_key(
                path_sec.to_str().unwrap(),
                Some(password.as_bytes()),
                path_pub.as_ref().map(|p| p.as_path())
            )?;
            return Ok(key);
        }
        thrussh_keys::ErrorKind::IO(ref io_error)
            if io_error.kind() == std::io::ErrorKind::NotFound => {
            return Err(ErrorKind::SshKeyNotFound(path_sec).into());
        }
        _ => {}
    }
    Err(From::from(first_failure))
}
/// Picks the secret key file to authenticate with, and the matching public
/// key file if one is assumed.
///
/// * With an explicit `secret_key`, that path is used, and the public key
///   is the same path with its extension set to `pub`.
/// * Otherwise `~/.ssh/id_ed25519` is used (without a public key path) if it
///   exists, else `~/.ssh/id_rsa` together with `~/.ssh/id_rsa.pub`.
///
/// # Panics
///
/// Panics if no explicit key is given and the home directory cannot be
/// determined.
pub fn guess_secret_key_path(secret_key: Option<&std::path::Path>) -> (PathBuf, Option<PathBuf>) {
    match secret_key {
        Some(key) => (key.to_path_buf(), Some(key.with_extension("pub"))),
        None => {
            let ed25519 = std::env::home_dir()
                .unwrap()
                .join(".ssh")
                .join("id_ed25519");
            if std::fs::metadata(&ed25519).is_ok() {
                return (ed25519, None);
            }
            let rsa = std::env::home_dir()
                .unwrap()
                .join(".ssh")
                .join("id_rsa");
            let rsa_pub = rsa.with_extension("pub");
            (rsa, Some(rsa_pub))
        }
    }
}
/// Interprets `remote_id` as a remote repository location.
///
/// * `file://<path>` is a local repository at `<path>`.
/// * Any other `<scheme>://...` is a URI remote, kept verbatim.
/// * `[user@]host:path` is an SSH remote using `port` and `secret_key`.
/// * Anything else is treated as a local path.
///
/// Local paths are joined onto `base_path` when one is given.
pub fn parse_remote<'a>(remote_id: &'a str,
                        port: Option<u16>,
                        secret_key: Option<&'a Path>,
                        base_path: Option<&'a Path>)
                        -> Remote<'a> {
    /// Builds a local remote for `relative`, resolved against `base` if any.
    fn local_remote<'r>(base: Option<&Path>, relative: &str) -> Remote<'r> {
        let path = match base {
            Some(dir) => dir.join(relative),
            None => Path::new(relative).to_path_buf(),
        };
        Remote::Local { path: path }
    }

    let ssh = Regex::new(r"^([^:]*):(.*)$").unwrap();
    let uri = Regex::new(r"^([a-zA-Z]*)://(.*)$").unwrap();

    // URIs are tested first: they contain a colon and would otherwise be
    // matched by the SSH pattern.
    if let Some(cap) = uri.captures(remote_id) {
        return if &cap[1] == "file" {
            local_remote(base_path, &cap[2])
        } else {
            Remote::Uri { uri: remote_id }
        };
    }

    if let Some(cap) = ssh.captures(remote_id) {
        let user_host = cap.get(1).unwrap().as_str();
        let ssh_user_host = Regex::new(r"^([^@]*)@(.*)$").unwrap();
        let split = ssh_user_host.captures(user_host);
        let (user, host) = match split {
            Some(parts) => {
                (Some(parts.get(1).unwrap().as_str()), parts.get(2).unwrap().as_str())
            }
            None => (None, user_host),
        };
        return Remote::Ssh {
            user: user,
            host: host,
            port: port,
            path: cap.get(2).unwrap().as_str(),
            id: remote_id,
            secret_key: secret_key,
        };
    }

    local_remote(base_path, remote_id)
}
/// Patches known on both sides of a pull, as computed by
/// `Session::pullable_patches`.
#[derive(Debug)]
pub struct Pullable {
/// Hashes of the patches already applied to the local branch.
pub local: HashSet<Hash>,
/// Patches of the remote branch with their application timestamps,
/// sorted by timestamp.
pub remote: Vec<(Hash, ApplyTimestamp)>,
}
/// Iterator over the remote patches of a `Pullable` that are not present
/// locally.
pub struct PullableIterator<'a> {
/// Remaining remote patches to examine.
remote: std::slice::Iter<'a, (Hash, ApplyTimestamp)>,
/// Hashes already present locally; matching remote patches are skipped.
local: &'a HashSet<Hash>
}
impl Pullable {
    /// Iterates over the remote patches that are missing locally, in the
    /// order they are stored in `remote`.
    pub fn iter(&self) -> PullableIterator {
        let remote = self.remote.iter();
        PullableIterator {
            remote: remote,
            local: &self.local,
        }
    }
}
impl<'a> Iterator for PullableIterator<'a> {
type Item = (Hash, ApplyTimestamp);
fn next(&mut self) -> Option<Self::Item> {
while let Some(&(ref h, t)) = self.remote.next() {
if !self.local.contains(h) {
return Some((h.to_owned(), t))
}
}
None
}
}