//! A distributed video encoder that splits files into chunks to encode them on
//! multiple machines in parallel.
//!
//! ## Installation
//!
//! Using Cargo, you can do
//! ```console
//! $ cargo install shepherd
//! ```
//! or just clone the repository and compile the binary with
//! ```console
//! $ git clone https://github.com/martindisch/shepherd
//! $ cd shepherd
//! $ cargo build --release
//! ```
//! There's also a
//! [direct download](https://github.com/martindisch/shepherd/releases/latest/download/shepherd)
//! for the latest x86-64 ELF binary.
//!
//! ## Usage
//!
//! The prerequisites are one or more (you'll want more) computers—which we'll
//! refer to as hosts—with `ffmpeg` installed and configured such that you can
//! SSH into them directly. This means you'll have to `ssh-copy-id` your public
//! key to them. I only tested it on Linux, but if you manage to set up
//! `ffmpeg` and SSH, it might work on macOS or Windows directly or with little
//! modification.
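//!
//! For instance, distributing your public key to a host `c1` could look like
//! this (the user name is a placeholder):
//! ```console
//! $ ssh-copy-id user@c1
//! ```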
//!
//! The usage is pretty straightforward:
//! ```text
//! USAGE:
//!     shepherd [FLAGS] [OPTIONS] <IN> <OUT> --clients <hostnames> [FFMPEG OPTIONS]...
//!
//! FLAGS:
//!     -h, --help       Prints help information
//!     -k, --keep       Don't clean up temporary files
//!     -V, --version    Prints version information
//!
//! OPTIONS:
//!     -c, --clients <hostnames>    Comma-separated list of encoding hosts
//!     -l, --length <seconds>       The length of video chunks in seconds
//!     -t, --tmp <path>             The path to the local temporary directory
//!
//! ARGS:
//!     <IN>                   The original video file
//!     <OUT>                  The output video file
//!     <FFMPEG OPTIONS>...    Options/flags for ffmpeg encoding of chunks. The
//!                            chunks are video only, so don't pass in anything
//!                            concerning audio. Input/output file names are
//!                            added by the application, so there is no need
//!                            for that either. This is the last positional
//!                            argument and needs to be preceded by double
//!                            hyphens (--) as in:
//!                            shepherd -c c1,c2 in.mp4 out.mp4 -- -c:v libx264
//!                            -crf 26 -preset veryslow -profile:v high
//!                            -level 4.2 -pix_fmt yuv420p
//!                            This is also the default that is used if no
//!                            options are provided.
//! ```
//!
//! So if we have three machines c1, c2 and c3, we could do
//! ```console
//! $ shepherd -c c1,c2,c3 -l 30 source_file.mp4 output_file.mp4
//! ```
//! to have it split the video in roughly 30 second chunks and encode them in
//! parallel. By default it encodes in H.264 with a CRF value of 26 and the
//! `veryslow` preset. If you want to supply your own `ffmpeg` options for more
//! control over the codec, you can do so by adding them to the end of the
//! invocation:
//! ```console
//! $ shepherd -c c1,c2 input.mkv output.mp4 -- -c:v libx264 -crf 40
//! ```
//!
//! ## How it works
//!
//! 1. Creates a temporary directory in your home directory.
//! 2. Extracts the audio and encodes it. This is not parallelized, but the
//! time this takes is negligible compared to the video anyway.
//! 3. Splits the video into chunks. This can take relatively long, since
//! you're basically writing the full file to disk again. It would be nice
//! if we could read chunks of the file and directly transfer them to the
//! hosts, but that might be tricky with `ffmpeg`.
//! 4. Spawns a manager and an encoder thread for every host. The manager
//! creates a temporary directory in the home directory of the remote and
//! makes sure that the encoder always has something to encode. It will
//! transfer a chunk, give it to the encoder to work on and meanwhile
//! transfer another chunk, so the encoder can start directly with that once
//! it's done, without wasting any time. But it will keep at most one chunk
//! in reserve, to prevent the case where a slow machine takes too many
//! chunks and is the only one still encoding while the faster ones are
//! already done.
//! 5. When an encoder is done and there are no more chunks to work on, it will
//! quit and the manager transfers the encoded chunks back before
//! terminating itself.
//! 6. Once all encoded chunks have arrived, they're concatenated and the audio
//! stream added.
//! 7. All remote temporary directories, as well as the local one, are removed.
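//!
//! The split in step 3 is conceptually an `ffmpeg` segmenting invocation along
//! these lines (an illustration of the idea, not the exact command the tool
//! runs):
//! ```console
//! $ ffmpeg -i input.mp4 -an -c copy -f segment -segment_time 60 chunk_%03d.mp4
//! ```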
//!
//! Thanks to the work stealing method of distribution, having some hosts that
//! are significantly slower than others does not delay the overall operation.
//! In the worst case, the slowest machine is the last to start encoding a
//! chunk and remains the only working encoder for the duration it takes to
//! encode this one chunk. This window can easily be reduced by using smaller
//! chunks.
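//!
//! The queue-based distribution of steps 4 and 5 can be sketched with standard
//! library primitives alone (the real implementation uses a `crossbeam`
//! channel and ships chunks to remote hosts; `distribute` and the worker setup
//! here are made up for illustration):
//! ```
//! use std::sync::{mpsc, Arc, Mutex};
//! use std::thread;
//!
//! /// Drains `chunks` through a shared queue with `n_workers` threads and
//! /// returns how many chunks were processed in total.
//! fn distribute(chunks: Vec<String>, n_workers: usize) -> usize {
//!     let (sender, receiver) = mpsc::channel();
//!     for chunk in chunks {
//!         sender.send(chunk).unwrap();
//!     }
//!     // Disconnect the channel, so workers stop once the queue is empty.
//!     drop(sender);
//!     let receiver = Arc::new(Mutex::new(receiver));
//!     let workers: Vec<_> = (0..n_workers)
//!         .map(|_| {
//!             let receiver = Arc::clone(&receiver);
//!             thread::spawn(move || {
//!                 let mut processed = 0;
//!                 // Pull the next chunk as soon as this worker is free, so
//!                 // faster workers naturally end up taking more chunks.
//!                 while let Ok(_chunk) = { receiver.lock().unwrap().recv() } {
//!                     processed += 1;
//!                 }
//!                 processed
//!             })
//!         })
//!         .collect();
//!     workers.into_iter().map(|w| w.join().unwrap()).sum()
//! }
//!
//! let chunks = vec!["a".to_string(), "b".to_string(), "c".to_string()];
//! assert_eq!(distribute(chunks, 2), 3);
//! ```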
//!
//! ## Performance
//!
//! As with all things parallel, Amdahl's law rears its ugly head and you don't
//! just get twice the speed with twice the processing power. With this
//! approach, you pay for having to split the video into chunks before you
//! begin, transferring them to the encoders and the results back, and
//! reassembling them. To be fair, transferring the chunks to the encoders only
//! causes a noticeable delay until every encoder has its first chunk; the
//! subsequent ones can be sent while the encoders are working, so no time is
//! wasted waiting for them. And returning and assembling the encoded chunks
//! doesn't carry too big a penalty, since by then we're dealing with much more
//! compressed data.
//!
//! To get a better understanding of the tradeoffs, I did some testing with a
//! couple of computers I had access to. They were my main, pretty capable
//! desktop, two older ones and a laptop. To figure out how capable each of
//! them is so we can compare the actual to the expected speedup, I let each of
//! them encode a relatively short clip of slightly less than 4 minutes taken
//! from the real video I want to encode, using the same settings I'd use for
//! the real job. And if you're wondering why encoding takes so long, it's
//! because I'm using the `veryslow` preset for maximum efficiency, even though
//! it's definitely not worth the huge increase in encoding time. But it's a
//! nice simulation for how it would look if we were using an even more
//! demanding codec like AV1.
//!
//! | machine | duration (s) | power |
//! | --------- | ------------ | -------- |
//! | desktop | 1373 | 1.00 |
//! | old1 | 2571 | 0.53 |
//! | old2 | 3292 | 0.42 |
//! | laptop | 5572 | 0.25 |
//! | **total** | - | **2.20** |
//!
//! By giving my desktop the "power" level 1, we can determine how powerful the
//! others are at this encoding task, based on how long it takes them in
//! comparison. By adding the three other, less capable machines to the mix, we
//! slightly more than double the theoretical encoding capability of our
//! system.
//!
//! I determined these power levels on a short clip, because encoding the full
//! video would have taken very long on the less capable ones, especially the
//! laptop. But I still needed to encode the full thing on at least one of them
//! to make the comparison to the distributed encoding. I did that on my
//! desktop since it's the fastest one, and to additionally verify that the
//! power levels hold up for the full video, I bit the bullet and did the same
//! on the second most powerful machine.
//!
//! | machine | duration (s) | power |
//! | ------- | ------------- | ----- |
//! | desktop | 9356 | 1.00 |
//! | old1 | 17690 | 0.53 |
//!
//! Now we have the baseline we want to beat with parallel encoding, as well as
//! confirmation that the power levels are valid for the full video. Let's see
//! how much of the theoretical, but unreachable 2.2x speedup we can get.
//!
//! Encoding the video in parallel took 5283 seconds, so 56.5% of the time
//! using my fastest computer, or a 1.77x speedup. We committed about twice the
//! computing power and we're not too far off that two times speedup. It's
//! making use of the additionally available resources with an 80% efficiency
//! in this case. I also tried to encode the short clip in parallel, which was
//! very fast, but had a somewhat disappointing speedup of only 1.32x. I
//! suspect that we get better results with longer videos, since encoding a
//! chunk always takes longer than creating and transferring it (otherwise
//! distributing wouldn't make sense at all). The longer the video then, the
//! larger the ratio of encoding (which we can parallelize) in the total amount
//! of time the process takes, and the more effective doing so becomes.
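//!
//! As a sanity check, the speedup and efficiency figures follow directly from
//! the measured durations and the power total in the first table:
//! ```
//! let baseline = 9356.0_f64; // full video on the fastest machine, seconds
//! let parallel = 5283.0_f64; // full video distributed over 4 machines
//! let total_power = 2.20;
//!
//! let speedup = baseline / parallel;
//! let efficiency = speedup / total_power;
//! assert!((speedup - 1.77).abs() < 0.01);
//! assert!((efficiency - 0.80).abs() < 0.01);
//! ```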
//!
//! I've also looked at how the work is distributed over the nodes, depending
//! on their processing power. At the end of a parallel encode, it's possible
//! to determine how many chunks have been encoded by any given host.
//!
//! | host | chunks | power |
//! | ------------- | ------ | ----- |
//! | desktop | 73 | 1.00 |
//! | old1 | 39 | 0.53 |
//! | old2 | 31 | 0.42 |
//! | laptop | 19 | 0.26 |
//!
//! Inferring the processing power from the number of chunks leads to almost
//! exactly the same results as my initial determination, confirming it and
//! proving that work is distributed efficiently.
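//!
//! The inferred power is simply each host's chunk count relative to the
//! fastest host's:
//! ```
//! let counts = [("desktop", 73.0_f64), ("old1", 39.0), ("old2", 31.0), ("laptop", 19.0)];
//! let fastest = counts[0].1;
//! for (host, count) in &counts {
//!     println!("{}: {:.2}", host, count / fastest);
//! }
//! assert!((39.0_f64 / 73.0 - 0.53).abs() < 0.01);
//! assert!((19.0_f64 / 73.0 - 0.26).abs() < 0.01);
//! ```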
//!
//! To further see how the system scales, I've added two more machines,
//! bringing the total processing power up to 3.29.
//!
//! | machine | duration (s) | power |
//! | --------- | ------------ | -------- |
//! | desktop | 1373 | 1.00 |
//! | c1 | 2129 | 0.64 |
//! | old1 | 2571 | 0.53 |
//! | c2 | 3022 | 0.45 |
//! | old2 | 3292 | 0.42 |
//! | laptop | 5572 | 0.25 |
//! | **total** | - | **3.29** |
//!
//! Encoding the video on these 6 machines in parallel took 3865 seconds, so
//! 41.3% of the time using my fastest computer, or a 2.42x speedup. It's
//! making use of the additionally available resources with a 74% efficiency
//! here. As expected, while we can accelerate by adding more resources, we're
//! looking at diminishing returns. Although the factor by which the efficiency
//! decreases is not as bad as it could be.
//!
//! ## Limitations
//!
//! While you can use your own `ffmpeg` options to control how the video is
//! encoded, there is currently no such option for the audio, which is 192 kb/s
//! AAC by default.
use crossbeam::channel;
use dirs;
use log::{error, info};
use std::{
    error::Error,
    fs,
    path::{Path, PathBuf},
    process::Command,
    string::ToString,
    sync::atomic::{AtomicBool, Ordering},
    sync::Arc,
    thread,
    time::Duration,
};

mod local;
mod remote;

/// The name of the temporary directory in the home directory to collect
/// intermediate files.
const TMP_DIR: &str = "shepherd_tmp";
/// The name of the encoded audio track.
const AUDIO: &str = "audio.aac";
/// The default length of video chunks in seconds.
const DEFAULT_LENGTH: &str = "60";

/// The generic result type for this crate.
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Starts the whole operation and cleans up afterwards.
///
/// # Arguments
/// * `input` - The path to the input file.
/// * `output` - The path to the output file.
/// * `args` - Arguments to `ffmpeg` for chunk encoding.
/// * `hosts` - Comma-separated list of hosts.
/// * `seconds` - The video chunk length.
/// * `tmp_dir` - The path to the local temporary directory.
/// * `keep` - Whether to keep temporary files on hosts (no cleanup).
pub fn run(
    input: impl AsRef<Path>,
    output: impl AsRef<Path>,
    args: &[&str],
    hosts: Vec<&str>,
    seconds: Option<&str>,
    tmp_dir: Option<&str>,
    keep: bool,
) -> Result<()> {
    // Convert the length
    let seconds = seconds.unwrap_or(DEFAULT_LENGTH).parse::<u64>()?;
    // Convert the tmp_dir
    let mut tmp_dir = tmp_dir
        .map(PathBuf::from)
        .or_else(dirs::home_dir)
        .ok_or("Home directory not found")?;
    // Set up a shared boolean to check whether the user has aborted
    let running = Arc::new(AtomicBool::new(true));
    let r = Arc::clone(&running);
    ctrlc::set_handler(move || {
        r.store(false, Ordering::SeqCst);
        info!(
            "Abort signal received. Waiting for remote encoders to finish the \
             current chunk and quit gracefully."
        );
    })
    .expect("Error setting Ctrl-C handler");

    tmp_dir.push(TMP_DIR);
    // Remove the local temporary directory in case it's still around
    fs::remove_dir_all(&tmp_dir).ok();
    // Create our local temporary directory
    fs::create_dir(&tmp_dir)?;

    // Start the operation
    let result = run_local(
        input.as_ref(),
        output.as_ref(),
        args,
        &tmp_dir,
        &hosts,
        seconds,
        Arc::clone(&running),
    );

    if !keep {
        info!("Cleaning up");
        // Remove remote temporary directories
        for &host in &hosts {
            // Clean up the temporary directory on the host
            let output = Command::new("ssh")
                .args(&[&host, "rm", "-r", remote::TMP_DIR])
                .output()
                .expect("Failed executing ssh command");
            // These checks for `running` are necessary, because Ctrl + C also
            // seems to terminate the commands we launch, which means they'll
            // return unsuccessfully. With this check we prevent an error
            // message in this case, because that's what the user wants.
            // Unfortunately this also means we have to litter the `running`
            // variable almost everywhere.
            if !output.status.success() && running.load(Ordering::SeqCst) {
                error!(
                    "Failed removing remote temporary directory on {}",
                    host
                );
            }
        }
        // Remove the local temporary directory
        fs::remove_dir_all(&tmp_dir).ok();
    }

    result
}

/// Does the actual work.
///
/// This is separate so it can fail and return early, since cleanup is then
/// handled in its caller function.
fn run_local(
    input: &Path,
    output: &Path,
    args: &[&str],
    tmp_dir: &Path,
    hosts: &[&str],
    seconds: u64,
    running: Arc<AtomicBool>,
) -> Result<()> {
    // Build the path to the audio file
    let mut audio = tmp_dir.to_path_buf();
    audio.push(AUDIO);
    // Start the extraction
    info!("Extracting audio");
    local::extract_audio(input, &audio, &running)?;

    // We check whether the user has aborted before every time-intensive task.
    // It's a better experience, but a bit ugly code-wise.
    if !running.load(Ordering::SeqCst) {
        // Abort early
        return Ok(());
    }

    // Create the directory for video chunks
    let mut chunk_dir = tmp_dir.to_path_buf();
    chunk_dir.push("chunks");
    fs::create_dir(&chunk_dir)?;
    // Split the video
    info!("Splitting video into chunks");
    local::split_video(
        input,
        &chunk_dir,
        Duration::from_secs(seconds),
        &running,
    )?;
    // Get the list of created chunks
    let mut chunks = fs::read_dir(&chunk_dir)?
        .map(|res| res.map(|entry| entry.path()))
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    // Sort them so they're in order. That's not strictly necessary, but nicer
    // for the user to watch since it allows seeing the progress at a glance.
    chunks.sort();

    if !running.load(Ordering::SeqCst) {
        // Abort early
        return Ok(());
    }

    // Initialize the global channel for chunks
    let (sender, receiver) = channel::unbounded();
    // Send all chunks into it
    for chunk in chunks {
        sender.send(chunk)?;
    }
    // Drop the sender so the channel gets disconnected
    drop(sender);

    // Since we want to share the ffmpeg arguments between the threads, we
    // first need an owned version of them
    let args: Vec<String> = args.iter().map(ToString::to_string).collect();
    // and then create our Arc
    let args = Arc::new(args);
    // Create the directory for encoded chunks
    let mut encoded_dir = tmp_dir.to_path_buf();
    encoded_dir.push("encoded");
    fs::create_dir(&encoded_dir)?;
    // Isolate the output extension, since the encoded chunks should share it
    let out_ext = output
        .extension()
        .ok_or("Unable to find extension")?
        .to_str()
        .ok_or("Unable to convert OsString extension")?
        .to_string();

    // Spawn threads for the hosts
    info!("Starting remote encoding");
    let mut host_threads = Vec::with_capacity(hosts.len());
    for &host in hosts {
        // Clone the queue receiver for the thread
        let thread_receiver = receiver.clone();
        // Create a copy of the running indicator for the thread
        let r = Arc::clone(&running);
        // And lots of other copies, because it's easy and the extra
        // allocations are not a problem for this kind of application
        let host = host.to_string();
        let enc = encoded_dir.clone();
        let ext = out_ext.clone();
        let a = Arc::clone(&args);
        // Start it
        let handle =
            thread::Builder::new().name(host.clone()).spawn(move || {
                remote::host_thread(host, thread_receiver, enc, ext, a, r);
            })?;
        host_threads.push(handle);
    }
    // Wait for all hosts to finish
    for handle in host_threads {
        if handle.join().is_err() {
            return Err("A host thread panicked".into());
        }
    }

    if !running.load(Ordering::SeqCst) {
        // We aborted early
        return Ok(());
    }

    // Combine the encoded chunks and audio
    info!("Combining encoded chunks into final video");
    local::combine(&encoded_dir, &audio, output, &running)?;

    Ok(())
}