use crossbeam::channel::{self, Receiver};
use log::{debug, info};
use std::{
path::PathBuf,
process::Command,
sync::atomic::{AtomicBool, Ordering},
sync::Arc,
thread,
};
pub static TMP_DIR: &str = "shepherd_tmp_remote";
pub fn host_thread(
host: String,
global_receiver: Receiver<PathBuf>,
encoded_dir: PathBuf,
out_ext: String,
args: Arc<Vec<String>>,
running: Arc<AtomicBool>,
) {
debug!("Spawned host thread {}", host);
Command::new("ssh")
.args(&[&host, "rm", "-r", crate::remote::TMP_DIR])
.output()
.expect("Failed executing ssh command");
let output = Command::new("ssh")
.args(&[&host, "mkdir", TMP_DIR])
.output()
.expect("Failed executing ssh command");
assert!(
output.status.success() || !running.load(Ordering::SeqCst),
"Failed creating remote temporary directory"
);
let (sender, receiver) = channel::bounded(0);
let host_cpy = host.clone();
let r = Arc::clone(&running);
let a = Arc::clone(&args);
let handle = thread::Builder::new()
.name(format!("{}-encoder", host))
.spawn(move || encoder_thread(host_cpy, out_ext, a, receiver, r))
.expect("Failed spawning thread");
while let Ok(chunk) = global_receiver.recv() {
debug!("Host thread {} received chunk {:?}", host, chunk);
let output = Command::new("scp")
.args(&[
chunk.to_str().expect("Invalid Unicode"),
&format!("{}:{}", host, TMP_DIR),
])
.output()
.expect("Failed executing scp command");
assert!(
output.status.success() || !running.load(Ordering::SeqCst),
"Failed transferring chunk"
);
if sender.send(chunk).is_err() {
break;
}
}
drop(sender);
debug!("Host thread {} waiting for encoder to finish", host);
let encoded = handle.join().expect("Encoder thread panicked");
if !running.load(Ordering::SeqCst) {
info!("{} exiting", host);
return;
}
debug!("Host thread {} got encoded chunks {:?}", host, encoded);
let encoded_dir = encoded_dir.to_str().expect("Invalid Unicode");
for chunk in &encoded {
let output = Command::new("scp")
.args(&[&format!("{}:{}", host, chunk), encoded_dir])
.output()
.expect("Failed executing scp command");
assert!(
output.status.success() || !running.load(Ordering::SeqCst),
"Failed transferring encoded chunk"
);
info!("{} returned encoded chunk {}", host, chunk);
}
debug!("Host thread {} exiting", host);
}
fn encoder_thread(
host: String,
out_ext: String,
args: Arc<Vec<String>>,
receiver: Receiver<PathBuf>,
running: Arc<AtomicBool>,
) -> Vec<String> {
let mut encoded = Vec::new();
while let Ok(chunk) = receiver.recv() {
if !running.load(Ordering::SeqCst) {
break;
}
debug!("Encoder thread {} received chunk {:?}", host, chunk);
let chunk_name = format!(
"{}/{}",
TMP_DIR,
chunk
.file_name()
.expect("No normal file")
.to_str()
.expect("Invalid Unicode")
);
let enc_name = format!(
"{}/enc_{}.{}",
TMP_DIR,
chunk
.file_stem()
.expect("No normal file")
.to_str()
.expect("Invalid Unicode"),
out_ext
);
let mut command: Vec<&str> =
vec![&host, "ffmpeg", "-y", "-i", &chunk_name];
command.extend(args.iter().map(|s| s.as_str()));
command.push(&enc_name);
info!("{} starts encoding chunk {:?}", host, chunk);
let output = Command::new("ssh")
.args(&command)
.output()
.expect("Failed executing ssh command");
assert!(
output.status.success() || !running.load(Ordering::SeqCst),
"Failed encoding"
);
encoded.push(enc_name);
}
debug!("Encoder thread {} exiting", host);
encoded
}