use libpijul::fs_representation::{
branch_changes_base_path, patch_file_name, RepoRoot, PIJUL_DIR_NAME,
};
use libpijul::patch::read_changes;
use libpijul::{
apply_resize, apply_resize_no_output, apply_resize_patches, apply_resize_patches_no_output,
ApplyTimestamp, ConflictingFile, Hash, Patch, PatchId, RepoPath, Repository,
};
use regex::Regex;
use reqwest;
use reqwest::async as reqwest_async;
use error::Error;
use std;
use std::collections::hash_set::HashSet;
use std::collections::HashMap;
use std::fs::{copy, hard_link, metadata, rename, File};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use commands::{ask, assert_no_containing_repo, create_repo};
use cryptovec;
use dirs;
use futures;
use futures::{Async, Future, Poll, Stream};
use meta;
use progrs;
use sequoia_openpgp::serialize::Serialize;
use shell_escape::unix::escape;
use std::borrow::Cow;
use std::io::prelude::*;
use std::io::BufReader;
use std::net::ToSocketAddrs;
use tempfile::tempdir_in;
use thrussh;
use thrussh_config;
use thrussh_keys;
use tokio;
use username;
#[derive(Debug)]
pub struct SshRemote<'a> {
user: Option<&'a str>,
host: &'a str,
port: Option<u16>,
path: &'a str,
id: &'a str,
local_repo_root: Option<&'a Path>,
pijul_cmd: Cow<'static, str>,
}
#[derive(Debug)]
pub enum Remote<'a> {
Ssh(SshRemote<'a>),
Uri { uri: &'a str },
Local { path: RepoRoot<PathBuf> },
}
pub enum Session<'a> {
Ssh(SshSession<'a>),
Uri(UriSession<'a>),
Local(LocalSession<'a>),
}
pub struct SshSession<'a> {
pub l: tokio::runtime::Runtime,
path: &'a str,
pijul_cmd: &'a str,
pub session: Option<thrussh::client::Connection<thrussh_config::Stream, Client>>,
}
pub struct UriSession<'a> {
l: tokio::runtime::Runtime,
uri: &'a str,
client: reqwest_async::Client,
}
pub struct LocalSession<'a> {
root: RepoRoot<&'a Path>,
}
impl<'a> Drop for SshSession<'a> {
fn drop(&mut self) {
if let Some(mut session) = self.session.take() {
debug!("disconnecting");
session.disconnect(thrussh::Disconnect::ByApplication, "finished", "EN");
if let Err(e) = self.l.block_on(session) {
error!("While dropping SSH Session: {:?}", e);
}
}
}
}
#[cfg(unix)]
use thrussh_keys::agent::client::AgentClient;
#[cfg(unix)]
use tokio_uds::UnixStream;
pub struct Client {
pub exit_status: HashMap<thrussh::ChannelId, u32>,
state: State,
host: String,
port: u16,
channel: Option<thrussh::ChannelId>,
#[cfg(unix)]
pub agent: Option<AgentClient<UnixStream>>,
#[cfg(windows)]
pub agent: Option<()>,
}
impl Client {
#[cfg(unix)]
fn new(port: Option<u16>, host: &str, l: &mut tokio::runtime::Runtime) -> Self {
let agent = if let Ok(path) = std::env::var("SSH_AUTH_SOCK") {
l.block_on(
UnixStream::connect(path).map(thrussh_keys::agent::client::AgentClient::connect),
)
.ok()
} else {
None
};
debug!("Client::new(), agent: {:?}", agent.is_some());
Client {
exit_status: HashMap::new(),
state: State::None,
port: port.unwrap_or(22),
host: host.to_string(),
channel: None,
agent,
}
}
#[cfg(windows)]
fn new(port: Option<u16>, host: &str, _: &mut tokio::runtime::Runtime) -> Self {
Client {
exit_status: HashMap::new(),
state: State::None,
port: port.unwrap_or(22),
host: host.to_string(),
channel: None,
agent: None,
}
}
}
enum State {
None,
Changes {
changes: Vec<(Hash, ApplyTimestamp)>,
},
DownloadPatch {
file: File,
},
}
enum SendFileState {
Read(thrussh::client::Connection<thrussh_config::Stream, Client>),
Wait(thrussh::client::Data<thrussh_config::Stream, Client, Vec<u8>>),
}
struct SendFile {
f: File,
buf: Option<Vec<u8>>,
chan: thrussh::ChannelId,
state: Option<SendFileState>,
}
impl Future for SendFile {
type Item = (
thrussh::client::Connection<thrussh_config::Stream, Client>,
Vec<u8>,
);
type Error = Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
debug!("SendFile loop starting");
loop {
debug!("sendfile loop");
match self.state.take() {
Some(SendFileState::Read(c)) => {
debug!("read");
let mut buf = self.buf.take().unwrap();
buf.resize(BUFFER_SIZE, 0);
let len = self.f.read(&mut buf)?;
if len == 0 {
return Ok(Async::Ready((c, buf)));
}
buf.truncate(len);
debug!("sending {:?} bytes, {:?}", len, buf.len());
self.state = Some(SendFileState::Wait(c.data(self.chan, None, buf)));
}
Some(SendFileState::Wait(mut c)) => {
debug!("wait");
match c.poll()? {
Async::Ready((c, buf)) => {
self.buf = Some(buf);
self.state = Some(SendFileState::Read(c))
}
Async::NotReady => {
self.state = Some(SendFileState::Wait(c));
return Ok(Async::NotReady);
}
}
}
None => unreachable!(),
}
}
}
}
impl thrussh::client::Handler for Client {
type Error = Error;
type FutureUnit = futures::Finished<Client, Error>;
type SessionUnit = futures::Finished<(Client, thrussh::client::Session), Error>;
type FutureBool = futures::future::FutureResult<(Client, bool), Error>;
type FutureSign =
Box<futures::Future<Item = (Self, cryptovec::CryptoVec), Error = Self::Error>>;
#[cfg(unix)]
fn auth_publickey_sign(
mut self,
key: &thrussh_keys::key::PublicKey,
mut to_sign: cryptovec::CryptoVec,
) -> Self::FutureSign {
debug!("auth_publickey_sign");
if let Some(agent) = self.agent.take() {
use thrussh_keys::encoding::Encoding;
debug!("using agent");
Box::new(
agent
.sign_request(key, &to_sign)
.then(move |result| match result {
Ok((client, sig)) => {
debug!("sig = {:?}", sig);
if let Some(sig) = sig {
to_sign.extend_ssh_string(&sig[..]);
}
self.agent = Some(client);
futures::finished::<_, Error>((self, to_sign))
}
Err(e) => {
error!("SSH agent error: {:?}", e);
futures::finished((self, to_sign))
}
})
.from_err(),
)
} else {
debug!("no agent");
Box::new(futures::finished((self, to_sign)))
}
}
fn data(
mut self,
channel: thrussh::ChannelId,
stream: Option<u32>,
data: &[u8],
session: thrussh::client::Session,
) -> Self::SessionUnit {
debug!(
"data ({:?}): {:?}",
channel,
&data[..std::cmp::min(data.len(), 100)]
);
if stream == Some(1) {
std::io::stderr().write(data).unwrap();
} else if stream == None {
match self.state {
State::None => {
std::io::stdout().write(data).unwrap();
}
State::Changes { ref mut changes } => {
let data = std::str::from_utf8(data).unwrap();
for l in data.lines() {
let mut spl = l.split(':');
if let (Some(h), Some(s)) = (spl.next(), spl.next()) {
if let (Some(h), Ok(s)) = (Hash::from_base58(h), s.parse()) {
changes.push((h, s));
}
}
}
}
State::DownloadPatch { ref mut file, .. } => {
file.write_all(data).unwrap();
}
}
} else {
debug!(
"SSH data received on channel {:?}: {:?} {:?}",
channel, stream, data
);
}
futures::finished((self, session))
}
fn exit_status(
mut self,
channel: thrussh::ChannelId,
exit_status: u32,
session: thrussh::client::Session,
) -> Self::SessionUnit {
debug!(
"exit_status received on channel {:?}: {:?}:",
channel, exit_status
);
debug!("self.channel = {:?}", self.channel);
if let Some(c) = self.channel {
if channel == c {
self.exit_status.insert(channel, exit_status);
}
}
debug!("self.exit_status = {:?}", self.exit_status);
futures::finished((self, session))
}
fn check_server_key(
self,
server_public_key: &thrussh_keys::key::PublicKey,
) -> Self::FutureBool {
let path = dirs::home_dir().unwrap().join(".ssh").join("known_hosts");
match thrussh_keys::check_known_hosts_path(&self.host, self.port, server_public_key, &path)
{
Ok(true) => futures::done(Ok((self, true))),
Ok(false) => {
if let Ok(false) = ask::ask_learn_ssh(&self.host, self.port, "") {
futures::done(Ok((self, false)))
} else {
thrussh_keys::learn_known_hosts_path(
&self.host,
self.port,
server_public_key,
&path,
)
.unwrap();
futures::done(Ok((self, true)))
}
}
Err(e) => {
if let thrussh_keys::Error::KeyChanged(line) = e {
println!(
"Host key changed! Someone might be eavesdropping this communication, \
refusing to continue. Previous key found line {}",
line
);
futures::done(Ok((self, false)))
} else {
futures::done(Err(From::from(e)))
}
}
}
}
}
const BUFFER_SIZE: usize = 1 << 14;
impl<'a> SshSession<'a> {
pub fn changes(
&mut self,
branch: &str,
path: &[RepoPath<impl AsRef<Path>>],
) -> Result<Vec<(Hash, ApplyTimestamp)>, Error> {
let esc_path = escape(Cow::Borrowed(self.path));
let mut cmd = format!(
"{} log --repository {} --branch {:?} --hash-only",
self.pijul_cmd, esc_path, branch
);
for p in path {
cmd.push_str(&format!(" --path {}", p.display()))
}
if let Some(ref mut session) = self.session {
session.handler_mut().state = State::Changes {
changes: Vec::new(),
}
}
let mut channel = None;
self.session = Some(
self.l
.block_on(
self.session
.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut connection, chan)| {
debug!("exec: {:?}", cmd);
channel = Some(chan);
connection.handler_mut().exit_status.remove(&chan);
connection.handler_mut().channel = Some(chan);
connection.exec(chan, false, &cmd);
connection.channel_eof(chan);
debug!("waiting channel close");
connection
.wait(move |session| {
session.handler().exit_status.get(&chan).is_some()
})
.and_then(move |mut session| {
if session.is_channel_open(chan) {
session.channel_close(chan);
}
session.wait(move |session| !session.is_channel_open(chan))
})
}),
)
.unwrap(),
);
if let Some(ref session) = self.session {
if let Some(channel) = channel {
if let Some(&exit_code) = session.handler().exit_status.get(&channel) {
debug!("exit_code = {:?}", exit_code);
if exit_code != 0 {
return Ok(Vec::new());
}
}
}
}
if let Some(ref mut session) = self.session {
match std::mem::replace(&mut session.handler_mut().state, State::None) {
State::Changes { changes } => {
debug!("changes: {:?}", changes);
Ok(changes)
}
_ => unreachable!(),
}
} else {
unreachable!()
}
}
pub fn send_key(&mut self, key_pair: meta::SigningKeys) -> Result<(), Error> {
if let Some(ref mut session) = self.session {
session.handler_mut().channel = None;
}
let challenge_cmd = format!("{} key register", self.pijul_cmd);
let mut data = Vec::new();
key_pair.tpk.serialize(&mut data)?;
self.session = Some(
self.l.block_on(
self.session
.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut session, channelid)| {
session.exec(channelid, false, &challenge_cmd);
session
.data(channelid, None, data)
.and_then(move |(mut session, _)| {
session.channel_eof(channelid);
session.handler_mut().channel = Some(channelid);
session.wait(move |session| {
session.handler().exit_status.get(&channelid).is_some()
})
})
}),
)?,
);
Ok(())
}
pub fn fetch_patch(
&mut self,
patch_hash: &Hash,
local_file: PathBuf,
local_tmp_file: PathBuf,
) -> Result<PathBuf, Error> {
let esc_path = escape(Cow::Borrowed(self.path));
let cmd = format!(
"{} patch --repository {} --bin {}",
self.pijul_cmd,
esc_path,
patch_hash.to_base58()
);
debug!("cmd {:?} {:?}", cmd, local_file);
if let Some(ref mut session) = self.session {
session.handler_mut().state = State::DownloadPatch {
file: File::create(&local_tmp_file)?,
};
session.handler_mut().channel = None;
}
self.session = Some(
self.l
.block_on(
self.session
.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut connection, chan)| {
connection.handler_mut().exit_status.remove(&chan);
connection.handler_mut().channel = Some(chan);
connection.exec(chan, false, &cmd);
connection.channel_eof(chan);
connection
.wait(move |session| {
session.handler().exit_status.get(&chan).is_some()
})
.and_then(move |mut session| {
if session.is_channel_open(chan) {
session.channel_close(chan);
}
session.wait(move |session| !session.is_channel_open(chan))
})
}),
)
.unwrap(),
);
if let Some(ref mut session) = self.session {
if let State::DownloadPatch { mut file, .. } =
std::mem::replace(&mut session.handler_mut().state, State::None)
{
file.flush()?;
rename(&local_tmp_file, &local_file)?;
}
}
Ok(local_file)
}
pub fn remote_apply(
&mut self,
repo_root: &RepoRoot<impl AsRef<Path>>,
remote_branch: &str,
patch_hashes: Vec<Hash>,
) -> Result<(), Error> {
let pdir = repo_root.patches_dir();
let mut exit_status = None;
let esc_path = escape(Cow::Borrowed(&self.path));
let apply_cmd = format!(
"{} apply --repository {} --branch {:?}",
self.pijul_cmd, esc_path, remote_branch
);
let sign_cmd = format!("{} sign --repository {}", self.pijul_cmd, esc_path);
let session = self.session.take().unwrap();
self.session = Some(
self.l
.block_on(
session
.channel_open_session()
.and_then(move |(session, chan0)| {
session
.channel_open_session()
.and_then(move |(mut session, chan1)| {
session.handler_mut().exit_status.remove(&chan0);
session.handler_mut().channel = Some(chan0);
debug!("exec {:?}", apply_cmd);
session.exec(chan0, false, &apply_cmd);
debug!("exec {:?}", sign_cmd);
session.exec(chan1, false, &sign_cmd);
futures::stream::iter_ok(patch_hashes.into_iter())
.fold((session, Vec::new()), move |(session, buf), hash| {
let mut pdir = pdir.clone();
pdir.push(hash.to_base58());
pdir.set_extension("gz");
let f = std::fs::File::open(&pdir).unwrap();
pdir.set_extension("sig");
if let Ok(sig) = std::fs::File::open(&pdir) {
futures::future::Either::A(
(SendFile {
f: f,
buf: Some(buf),
chan: chan0,
state: Some(SendFileState::Read(session)),
})
.and_then(move |(session, mut buf)| {
buf.clear();
SendFile {
f: sig,
buf: Some(buf),
chan: chan1,
state: Some(SendFileState::Read(
session,
)),
}
}),
)
} else {
futures::future::Either::B(SendFile {
f: f,
buf: Some(buf),
chan: chan0,
state: Some(SendFileState::Read(session)),
})
}
})
.and_then(move |(mut session, _)| {
session.channel_eof(chan0);
session
.wait(move |session| {
session
.handler()
.exit_status
.get(&chan0)
.is_some()
})
.map(move |mut session| {
exit_status = session
.handler()
.exit_status
.get(&chan0)
.map(|x| *x);
session.channel_close(chan0);
session
})
})
.map_err(From::from)
})
}),
)
.unwrap(),
);
if let Some(ref session) = self.session {
debug!("exit status = {:?}", session.handler().exit_status);
}
Ok(())
}
pub fn remote_init(&mut self) -> Result<(), Error> {
let esc_path = escape(Cow::Borrowed(self.path));
let cmd = format!("{} init {}", self.pijul_cmd, esc_path);
debug!("command line:{:?}", cmd);
self.session = Some(
self.l
.block_on(
self.session
.take()
.unwrap()
.channel_open_session()
.and_then(move |(mut session, chan)| {
debug!("chan = {:?}", chan);
session.handler_mut().exit_status.remove(&chan);
session.handler_mut().channel = Some(chan);
session.exec(chan, false, &cmd);
session.channel_eof(chan);
session
.wait(move |session| {
session.handler().exit_status.get(&chan).is_some()
})
.and_then(move |mut session| {
if session.is_channel_open(chan) {
session.channel_close(chan);
}
session.wait(move |session| !session.is_channel_open(chan))
})
}),
)
.unwrap(),
);
Ok(())
}
}
impl<'a> UriSession<'a> {
pub fn changes(
&mut self,
branch: &str,
path: &[RepoPath<impl AsRef<Path>>],
) -> Result<Vec<(Hash, ApplyTimestamp)>, Error> {
if !path.is_empty() {
return Err(Error::PartialPullOverHttp);
}
let mut uri = self.uri.to_string();
uri = uri + "/" + PIJUL_DIR_NAME + "/" + &branch_changes_base_path(branch);
let mut req = reqwest_async::Request::new(reqwest::Method::GET, uri.parse().unwrap());
req.headers_mut().insert(
reqwest::header::CONNECTION,
reqwest::header::HeaderValue::from_static("close"),
);
let res: Vec<u8> = self.l.block_on(self.client.execute(req).and_then(
|resp: reqwest_async::Response| {
let res = Vec::new();
let body = resp.into_body();
body.fold(res, |mut res, x| {
res.extend(x.iter());
futures::finished::<_, reqwest::Error>(res)
})
},
))?;
let changes = read_changes(&mut &res[..]).unwrap_or(Vec::new());
debug!("http: {:?}", changes);
Ok(changes)
}
pub fn fetch_patch(
&mut self,
patch_hash: &Hash,
local_file: PathBuf,
local_tmp_file: PathBuf,
) -> Result<PathBuf, Error> {
let ref mut l = self.l;
let ref mut client = self.client;
let uri = self.uri.to_string()
+ "/"
+ PIJUL_DIR_NAME
+ "/patches/"
+ &patch_hash.to_base58()
+ ".gz";
debug!("downloading uri {:?}", uri);
let req = reqwest_async::Request::new(reqwest::Method::GET, uri.parse().unwrap());
let uri_sig = self.uri.to_string()
+ "/"
+ PIJUL_DIR_NAME
+ "/patches/"
+ &patch_hash.to_base58()
+ ".sig";
debug!("{:?}", uri_sig);
let req_sig = reqwest_async::Request::new(reqwest::Method::GET, uri_sig.parse().unwrap());
let mut local_sig_file = local_file.clone();
let mut local_tmp_sig_file = local_tmp_file.clone();
local_sig_file.set_extension("sig");
local_tmp_sig_file.set_extension("sig");
let res = l
.block_on(
client
.execute(req)
.and_then(move |resp| {
if resp.status() == reqwest::StatusCode::OK {
let res = Vec::new();
futures::future::Either::A(
resp.into_body()
.fold(res, |mut res, x| {
res.extend(x.iter());
futures::finished::<_, reqwest::Error>(res)
})
.map(|body| {
let mut f = File::create(&local_tmp_file).unwrap();
f.write_all(&body).unwrap();
Some((local_tmp_file, local_file))
}),
)
} else {
futures::future::Either::B(futures::finished(None))
}
})
.join(client.execute(req_sig).then(move |resp| {
let resp = if let Ok(resp) = resp {
resp
} else {
return futures::future::Either::B(futures::finished(None));
};
debug!("sig status {:?}", resp.status());
if resp.status() == reqwest::StatusCode::OK {
let res = Vec::new();
futures::future::Either::A(
resp.into_body()
.fold(res, |mut res, x| {
res.extend(x.iter());
futures::finished::<_, reqwest::Error>(res)
})
.map(|body| {
let mut f = File::create(&local_tmp_sig_file).unwrap();
f.write_all(&body).unwrap();
Some((local_tmp_sig_file, local_sig_file))
}),
)
} else {
futures::future::Either::B(futures::finished(None))
}
})),
)
.unwrap();
if let Some((local_tmp_file, local_file)) = res.0 {
debug!("renaming {:?} to {:?}", local_tmp_file, local_file);
rename(&local_tmp_file, &local_file)?;
if let Some((local_tmp_sig_file, local_sig_file)) = res.1 {
debug!("renaming {:?} to {:?}", local_tmp_sig_file, local_sig_file);
rename(&local_tmp_sig_file, &local_sig_file).unwrap_or(());
}
Ok(local_file)
} else {
Err(Error::PatchNotFound {
repo_root: self.uri.into(),
patch_hash: patch_hash.to_owned(),
})
}
}
}
impl<'a> LocalSession<'a> {
pub fn changes(
&mut self,
branch: &str,
path: &[RepoPath<impl AsRef<Path>>],
) -> Result<Vec<(Hash, ApplyTimestamp)>, Error> {
let repo = self.root.open_repo(None)?;
let txn = repo.txn_begin()?;
if let Some(branch) = txn.get_branch(&branch) {
if !path.is_empty() {
let mut patches = Vec::new();
for (hash, s) in txn.iter_patches(&branch, None) {
for path in path {
let inode = txn.find_inode(path).unwrap();
let key = txn.get_inodes(inode).unwrap().key;
if txn.get_touched(key, hash) {
patches.push((txn.get_external(hash).unwrap().to_owned(), s));
break;
}
}
}
Ok(patches)
} else {
Ok(txn
.iter_patches(&branch, None)
.map(|(hash, s)| (txn.get_external(hash).unwrap().to_owned(), s))
.collect())
}
} else {
Ok(Vec::new())
}
}
pub fn fetch_patch(
&mut self,
patch_hash: &Hash,
local_file: PathBuf,
) -> Result<PathBuf, Error> {
debug!("local downloading {:?}", patch_hash);
let remote_file = self
.root
.patches_dir()
.join(&patch_file_name(patch_hash.as_ref()));
debug!("hard linking {:?} to {:?}", remote_file, local_file);
if hard_link(&remote_file, &local_file).is_err() {
copy(&remote_file, &local_file)?;
}
Ok(local_file)
}
pub fn remote_apply(
&mut self,
repo_root: &RepoRoot<impl AsRef<Path>>,
remote_branch: &str,
patch_hashes: &Vec<Hash>,
) -> Result<Vec<ConflictingFile>, Error> {
let mut remote_path = self.root.patches_dir();
let mut local_path = repo_root.patches_dir();
let remote_current_branch = self.root.get_current_branch()?;
for hash in patch_hashes {
remote_path.push(&hash.to_base58());
remote_path.set_extension("gz");
local_path.push(&hash.to_base58());
local_path.set_extension("gz");
debug!("hard linking {:?} to {:?}", local_path, remote_path);
if metadata(&remote_path).is_err() {
if hard_link(&local_path, &remote_path).is_err() {
copy(&local_path, &remote_path)?;
}
}
remote_path.set_extension("sig");
local_path.set_extension("sig");
if metadata(&remote_path).is_err() && metadata(&local_path).is_ok() {
if hard_link(&local_path, &remote_path).is_err() {
copy(&local_path, &remote_path)?;
}
}
local_path.pop();
remote_path.pop();
}
loop {
let app = if remote_current_branch != remote_branch {
apply_resize_no_output(&self.root, &remote_branch, patch_hashes.iter(), |_, _| {})
.map(|_| Vec::new())
} else {
apply_resize(
libpijul::DiffAlgorithm::default(),
&self.root,
&remote_branch,
patch_hashes.iter(),
&[] as &[RepoPath<&Path>],
|_, _| {},
)
};
match app {
Err(ref e) if e.lacks_space() => debug!("lacks space"),
Ok(v) => return Ok(v),
Err(e) => return Err(From::from(e)),
}
}
}
}
#[derive(Debug, Clone)]
pub struct PushablePatches {
pub pushable: Vec<(Hash, Option<PatchId>, ApplyTimestamp)>,
pub non_fast_forward: Vec<Hash>,
}
impl<'a> Session<'a> {
pub fn changes(
&mut self,
branch: &str,
remote_path: &[RepoPath<impl AsRef<Path>>],
) -> Result<Vec<(Hash, ApplyTimestamp)>, Error> {
match *self {
Session::Ssh(ref mut ssh_session) => ssh_session.changes(branch, remote_path),
Session::Local(ref mut local_session) => local_session.changes(branch, remote_path),
Session::Uri(ref mut uri_session) => uri_session.changes(branch, remote_path),
}
}
pub fn download_patch(
&mut self,
repo_root: &RepoRoot<impl AsRef<Path>>,
patch_hash: &Hash,
) -> Result<PathBuf, Error> {
let patches_dir_ = repo_root.patches_dir();
let local_file = patches_dir_.join(&patch_file_name(patch_hash.as_ref()));
if !metadata(&local_file).is_ok() {
match *self {
Session::Local(ref mut local_session) => {
local_session.fetch_patch(patch_hash, local_file)
}
Session::Ssh(ref mut ssh_session) => {
let tmp_dir = tempdir_in(&patches_dir_)?;
let local_tmp_file = tmp_dir.path().join("patch");
ssh_session.fetch_patch(patch_hash, local_file, local_tmp_file)
}
Session::Uri(ref mut uri_session) => {
let tmp_dir = tempdir_in(&patches_dir_)?;
let local_tmp_file = tmp_dir.path().join("patch");
uri_session.fetch_patch(patch_hash, local_file, local_tmp_file)
}
}
} else {
Ok(local_file)
}
}
fn remote_apply(
&mut self,
repo_root: &RepoRoot<impl AsRef<Path>>,
remote_branch: &str,
patch_hashes: Vec<Hash>,
) -> Result<(), Error> {
match *self {
Session::Ssh(ref mut ssh_session) => {
ssh_session.remote_apply(repo_root, remote_branch, patch_hashes)
}
Session::Local(ref mut local_session) => local_session
.remote_apply(repo_root, remote_branch, &patch_hashes)
.map(|_| ()),
_ => panic!("upload to URI impossible"),
}
}
pub fn remote_init(&mut self) -> Result<(), Error> {
match *self {
Session::Ssh(ref mut ssh_session) => ssh_session.remote_init(),
Session::Local(ref mut local_session) => {
assert_no_containing_repo(local_session.root.repo_root)?;
create_repo(local_session.root.repo_root)
}
_ => panic!("remote init not possible"),
}
}
pub fn pullable_patches(
&mut self,
remote_branch: &str,
local_branch: &str,
target: &RepoRoot<impl AsRef<Path>>,
remote_path: &[RepoPath<impl AsRef<Path>>],
) -> Result<Pullable, Error> {
let mut remote_patches: Vec<(Hash, ApplyTimestamp)> = self
.changes(remote_branch, remote_path)?
.into_iter()
.map(|(h, s)| (h.to_owned(), s))
.collect();
remote_patches.sort_by(|&(_, ref a), &(_, ref b)| a.cmp(&b));
let local_patches: HashMap<Hash, ApplyTimestamp> = {
let repo_dir = target.pristine_dir();
let repo = Repository::open(&repo_dir, None)?;
let txn = repo.txn_begin()?;
if let Some(branch) = txn.get_branch(&local_branch) {
txn.iter_patches(&branch, None)
.map(|(hash, s)| (txn.get_external(hash).unwrap().to_owned(), s))
.collect()
} else {
HashMap::new()
}
};
debug!("pullable done: {:?}", remote_patches);
Ok(Pullable {
local: local_patches.iter().map(|(h, _)| h.to_owned()).collect(),
remote: remote_patches.into_iter().collect(),
})
}
pub fn pull(
&mut self,
target: &RepoRoot<impl AsRef<Path>>,
to_branch: &str,
pullable: &mut Vec<(Hash, ApplyTimestamp)>,
partial_paths: &[RepoPath<impl AsRef<Path>>],
display_progress: bool,
) -> Result<Vec<ConflictingFile>, Error> {
let mut p = if display_progress && !pullable.is_empty() {
Some((progrs::start("Pulling patches", pullable.len() as u64), 0))
} else {
None
};
let mut pullable_plus_deps = Vec::new();
let mut pulled = HashSet::new();
while let Some((hash, _)) = pullable.pop() {
if pulled.contains(&hash) {
continue;
}
debug!("hash = {:?}", hash);
let path = self.download_patch(target, &hash)?;
let patch = {
let file = File::open(&path)?;
let mut file = BufReader::new(file);
Patch::from_reader_compressed(&mut file)?.2
};
pulled.insert(hash.clone());
if !partial_paths.is_empty() {
for dep in patch.dependencies() {
if !pulled.contains(dep) {
pullable.push((dep.to_owned(), 0));
}
}
}
pullable_plus_deps.push((hash.to_owned(), patch));
p.as_mut().map(|&mut (ref mut p, ref mut n)| {
p.display({
*n = *n + 1;
*n
})
});
}
pullable_plus_deps.reverse();
p.map(|(p, _)| p.stop("done"));
debug!("patches downloaded");
let p = std::cell::RefCell::new(progrs::start(
"Applying patches",
pullable_plus_deps.len() as u64,
));
let mut size_increase = 4096;
let current_branch = target.get_current_branch()?;
let conflicts = loop {
let app = if current_branch != to_branch {
apply_resize_patches_no_output(
&target,
&to_branch,
&pullable_plus_deps,
size_increase,
|c, _| p.borrow_mut().display(c as u64),
)
.map(|_| Vec::new())
} else {
apply_resize_patches(
libpijul::DiffAlgorithm::default(),
&target,
&to_branch,
&pullable_plus_deps,
size_increase,
partial_paths,
|c, _| p.borrow_mut().display(c as u64),
)
};
match app {
Ok(conflicts) => break conflicts,
Err(ref e) if e.lacks_space() => size_increase *= 2,
Err(e) => return Err(e.into()),
}
};
p.into_inner().stop("done");
Ok(conflicts)
}
pub fn pushable_patches(
&mut self,
from_branch: &str,
to_branch: &str,
source: &RepoRoot<impl AsRef<Path> + std::fmt::Debug>,
remote_paths: &[RepoPath<impl AsRef<Path>>],
) -> Result<PushablePatches, Error> {
debug!("source: {:?}", source);
let mut non_fast_forward = Vec::new();
let to_changes_ = self.changes(to_branch, remote_paths)?;
let repo = source.open_repo(None)?;
let txn = repo.txn_begin()?;
let mut to_changes = HashSet::new();
let branch = txn.get_branch(&from_branch);
for (h, _) in to_changes_.iter() {
if let Some(ref branch) = branch {
if let Some(patchid) = txn.get_internal(h.as_ref()) {
if txn.get_patch(&branch.patches, patchid).is_none() {
non_fast_forward.push(h.clone())
}
} else {
non_fast_forward.push(h.clone())
}
}
to_changes.insert(h.as_ref());
}
debug!("to_changes: {:?}", to_changes);
let from_changes: Vec<_> = {
if let Some(branch) = txn.get_branch(&from_branch) {
txn.iter_applied(&branch, None)
.filter_map(|(s, patchid)| {
if let Some(hash) = txn.get_external(patchid) {
if to_changes.contains(&hash) {
None
} else {
Some((hash.to_owned(), Some(patchid), s))
}
} else {
None
}
})
.collect()
} else {
Vec::new()
}
};
debug!("pushing: {:?}", from_changes);
Ok(PushablePatches {
pushable: from_changes,
non_fast_forward,
})
}
pub fn push(
&mut self,
source: &RepoRoot<impl AsRef<Path>>,
remote_branch: &str,
pushable: Vec<Hash>,
) -> Result<(), Error> {
debug!("push, remote_applying");
debug!("pushable: {:?}", pushable);
if pushable.len() > 0 {
self.remote_apply(source, remote_branch, pushable)?;
}
Ok(())
}
}
pub fn ssh_connect(
user: &Option<&str>,
host: &str,
port: Option<u16>,
) -> Result<(thrussh_config::Config, thrussh_config::ConnectFuture), Error> {
let mut ssh_config =
thrussh_config::parse_home(host).unwrap_or(thrussh_config::Config::default());
debug!("ssh_config = {:?}", ssh_config);
if ssh_config.host_name.is_none() {
ssh_config.host_name = Some(host.to_string())
}
if let Some(port) = port {
ssh_config.port = Some(port)
} else if ssh_config.port.is_none() {
ssh_config.port = Some(22)
}
if let Some(ref user) = *user {
ssh_config.user = Some(user.to_string())
} else if ssh_config.user.is_none() {
ssh_config.user = Some(username::get_user_name().unwrap())
}
ssh_config.update_proxy_command();
let stream = if let Some(ref proxycmd) = ssh_config.proxy_command {
debug!("{:?}", proxycmd);
thrussh_config::Stream::proxy_command("sh", &["-c", proxycmd.as_str()])
} else {
let addr = if let Some(addrs) = (
ssh_config.host_name.as_ref().unwrap().as_str(),
ssh_config.port.unwrap(),
)
.to_socket_addrs()?
.next()
{
addrs
} else {
return Err(Error::UnknownHost {
host: host.to_string(),
});
};
debug!("addr = {:?}", addr);
thrussh_config::Stream::tcp_connect(&addr)
};
Ok((ssh_config, stream))
}
impl<'a> Remote<'a> {
pub fn session(&'a self) -> Result<Session<'a>, Error> {
match *self {
Remote::Local {
path: RepoRoot {
repo_root: ref path,
},
} => Ok(Session::Local(LocalSession {
root: RepoRoot { repo_root: path },
})),
Remote::Uri { uri } => {
let l = tokio::runtime::Runtime::new().unwrap();
let proxy_url = std::env::var("http_proxy");
let c = match proxy_url {
Err(std::env::VarError::NotPresent) => reqwest_async::Client::new(),
Ok(p_url) => reqwest_async::Client::builder()
.proxy(reqwest::Proxy::all(reqwest::Url::parse(&p_url).unwrap())?)
.build()?,
Err(std::env::VarError::NotUnicode(s)) => {
panic!("invalid http_proxy value: {:?}", s)
}
};
Ok(Session::Uri(UriSession {
l,
uri: uri,
client: c,
}))
}
Remote::Ssh(ref remote) => Ok(Session::Ssh(remote.session()?)),
}
}
}
impl<'a> SshRemote<'a> {
pub fn session(&'a self) -> Result<SshSession<'a>, Error> {
let mut l = tokio::runtime::Runtime::new().unwrap();
let (ssh_config, stream) = ssh_connect(&self.user, self.host, self.port)?;
let config = Arc::new(thrussh::client::Config::default());
let local_repo_root = self.local_repo_root.map(|x| x.to_path_buf());
let host = self.host.to_string();
let handler = Client::new(
ssh_config.port,
ssh_config.host_name.as_ref().unwrap().as_str(),
&mut l,
);
let session: thrussh::client::Connection<_, _> =
l.block_on(stream.map_err(Error::from).and_then(move |socket| {
let use_agent = handler.agent.is_some();
futures::future::result(thrussh::client::Connection::new(
config.clone(),
socket,
handler,
None,
))
.from_err()
.and_then(move |connection| {
debug!("connection done");
use super::ssh_auth_attempts::{auth_attempt_future, AuthAttempts};
let user = ssh_config.user.unwrap();
auth_attempt_future(
connection,
AuthAttempts::new(host, local_repo_root, use_agent),
user,
ssh_config.add_keys_to_agent,
)
})
}))?;
debug!("session ready");
Ok(SshSession {
l,
session: Some(session),
path: self.path,
pijul_cmd: &self.pijul_cmd,
})
}
}
pub fn parse_ssh_remote<'a>(
remote_id: &'a str,
port: Option<u16>,
local_repo_root: Option<&'a Path>,
) -> Option<SshRemote<'a>> {
let ssh = Regex::new(r"^([^:]*):(.+)$").unwrap();
if ssh.is_match(remote_id) {
let cap = ssh.captures(remote_id).unwrap();
let user_host = cap.get(1).unwrap().as_str();
let (user, host) = {
let ssh_user_host = Regex::new(r"^([^@]*)@(.*)$").unwrap();
if ssh_user_host.is_match(user_host) {
let cap = ssh_user_host.captures(user_host).unwrap();
(
Some(cap.get(1).unwrap().as_str()),
cap.get(2).unwrap().as_str(),
)
} else {
(None, user_host)
}
};
let pijul_cmd = super::remote_pijul_cmd();
Some(SshRemote {
user: user,
host: host,
port: port,
id: remote_id,
path: cap.get(2).unwrap().as_str(),
local_repo_root,
pijul_cmd,
})
} else {
None
}
}
pub fn parse_ssh_remote_nopath<'a>(remote_id: &'a str, port: Option<u16>) -> Option<SshRemote<'a>> {
let (user, host) = {
let ssh_user_host = Regex::new(r"^([^@]*)@(.*)$").unwrap();
if ssh_user_host.is_match(remote_id) {
let cap = ssh_user_host.captures(remote_id).unwrap();
(
Some(cap.get(1).unwrap().as_str()),
cap.get(2).unwrap().as_str(),
)
} else {
(None, remote_id)
}
};
let pijul_cmd = super::remote_pijul_cmd();
Some(SshRemote {
user: user,
host: host,
port: port,
id: remote_id,
path: "",
local_repo_root: None,
pijul_cmd,
})
}
pub fn parse_remote<'a>(
remote_id: &'a str,
port: Option<u16>,
base_path: Option<&'a Path>,
local_repo_root: Option<&'a Path>,
) -> Remote<'a> {
let uri = Regex::new(r"^([a-zA-Z]*)://(.*)$").unwrap();
if uri.is_match(remote_id) {
let cap = uri.captures(remote_id).unwrap();
if &cap[1] == "file" {
if let Some(a) = base_path {
let path = a.join(&cap[2]);
Remote::Local {
path: { RepoRoot { repo_root: path } },
}
} else {
let path = Path::new(&cap[2]).to_path_buf();
Remote::Local {
path: { RepoRoot { repo_root: path } },
}
}
} else {
Remote::Uri { uri: remote_id }
}
} else if let Some(rem) = parse_ssh_remote(remote_id, port, local_repo_root) {
Remote::Ssh(rem)
} else {
if let Some(a) = base_path {
let path = a.join(remote_id);
Remote::Local {
path: RepoRoot { repo_root: path },
}
} else {
let path = Path::new(remote_id).to_path_buf();
Remote::Local {
path: RepoRoot { repo_root: path },
}
}
}
}
#[derive(Debug)]
pub struct Pullable {
pub local: HashSet<Hash>,
pub remote: Vec<(Hash, ApplyTimestamp)>,
}
pub struct PullableIterator<'a> {
remote: std::slice::Iter<'a, (Hash, ApplyTimestamp)>,
local: &'a HashSet<Hash>,
}
impl Pullable {
pub fn iter(&self) -> PullableIterator {
PullableIterator {
local: &self.local,
remote: self.remote.iter(),
}
}
}
impl<'a> Iterator for PullableIterator<'a> {
type Item = (Hash, ApplyTimestamp);
fn next(&mut self) -> Option<Self::Item> {
while let Some(&(ref h, t)) = self.remote.next() {
if !self.local.contains(h) {
return Some((h.to_owned(), t));
}
}
None
}
}