shepherd/
lib.rs

//! A distributed video encoder that splits files into chunks to encode them on
//! multiple machines in parallel.
//!
//! ## Installation
//!
//! Using Cargo, you can do
//! ```console
//! $ cargo install shepherd
//! ```
//! or just clone the repository and compile the binary with
//! ```console
//! $ git clone https://github.com/martindisch/shepherd
//! $ cd shepherd
//! $ cargo build --release
//! ```
//! There's also a
//! [direct download](https://github.com/martindisch/shepherd/releases/latest/download/shepherd)
//! for the latest x86-64 ELF binary.
//!
//! ## Usage
//!
//! The prerequisites are one or more (you'll want more) computers, which we'll
//! refer to as hosts, with `ffmpeg` installed and configured such that you can
//! SSH into them directly. This means you'll have to `ssh-copy-id` your public
//! key to them. I only tested it on Linux, but if you manage to set up
//! `ffmpeg` and SSH, it might work on macOS or Windows directly or with little
//! modification.
//!
//! The usage is pretty straightforward:
//! ```text
//! USAGE:
//!     shepherd [FLAGS] [OPTIONS] <IN> <OUT> --clients <hostnames> [FFMPEG OPTIONS]...
//!
//! FLAGS:
//!     -h, --help       Prints help information
//!     -k, --keep       Don't clean up temporary files
//!     -V, --version    Prints version information
//!
//! OPTIONS:
//!     -c, --clients <hostnames>    Comma-separated list of encoding hosts
//!     -l, --length <seconds>       The length of video chunks in seconds
//!     -t, --tmp <path>             The path to the local temporary directory
//!
//! ARGS:
//!     <IN>                   The original video file
//!     <OUT>                  The output video file
//!     <FFMPEG OPTIONS>...    Options/flags for ffmpeg encoding of chunks. The
//!                            chunks are video only, so don't pass in anything
//!                            concerning audio. Input/output file names are added
//!                            by the application, so there is no need for that
//!                            either. This is the last positional argument and
//!                            needs to be preceded by double hyphens (--) as in:
//!                            shepherd -c c1,c2 in.mp4 out.mp4 -- -c:v libx264
//!                            -crf 26 -preset veryslow -profile:v high -level 4.2
//!                            -pix_fmt yuv420p
//!                            This is also the default that is used if no options
//!                            are provided.
//! ```
//!
//! So if we have three machines c1, c2 and c3, we could do
//! ```console
//! $ shepherd -c c1,c2,c3 -l 30 source_file.mp4 output_file.mp4
//! ```
//! to have it split the video into roughly 30-second chunks and encode them in
//! parallel. By default it encodes in H.264 with a CRF value of 26 and the
//! `veryslow` preset. If you want to supply your own `ffmpeg` options for more
//! control over the codec, you can do so by adding them to the end of the
//! invocation:
//! ```console
//! $ shepherd -c c1,c2 input.mkv output.mp4 -- -c:v libx264 -crf 40
//! ```
//!
//! ## How it works
//!
//! 1. Creates a temporary directory in your home directory.
//! 2. Extracts the audio and encodes it. This is not parallelized, but the
//!    time this takes is negligible compared to the video anyway.
//! 3. Splits the video into chunks. This can take relatively long, since
//!    you're basically writing the full file to disk again. It would be nice
//!    if we could read chunks of the file and directly transfer them to the
//!    hosts, but that might be tricky with `ffmpeg`. A rough sketch of this
//!    step is shown right after this list.
//! 4. Spawns a manager and an encoder thread for every host. The manager
//!    creates a temporary directory in the home directory of the remote and
//!    makes sure that the encoder always has something to encode. It will
//!    transfer a chunk, give it to the encoder to work on and meanwhile
//!    transfer another chunk, so the encoder can start directly with that once
//!    it's done, without wasting any time. But it will keep at most one chunk
//!    in reserve, to prevent the case where a slow machine takes too many
//!    chunks and is the only one still encoding while the faster ones are
//!    already done.
//! 5. When an encoder is done and there are no more chunks to work on, it will
//!    quit and the manager transfers the encoded chunks back before
//!    terminating itself.
//! 6. Once all encoded chunks have arrived, they're concatenated and the audio
//!    stream is added.
//! 7. All remote temporary directories and the local one are removed.
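//!
//! To make step 3 concrete, here is a minimal sketch of how the split could be
//! done by driving `ffmpeg`'s segment muxer through `std::process::Command`.
//! The real logic lives in `local::split_video`, and its exact flags, chunk
//! naming and error handling may differ; treat this as an illustration only.
//!
//! ```no_run
//! use std::{path::Path, process::Command};
//!
//! fn split_video_sketch(input: &Path, chunk_dir: &Path) -> std::io::Result<()> {
//!     // Name the chunks chunk_000.mp4, chunk_001.mp4, ... (naming is assumed).
//!     let pattern = chunk_dir.join("chunk_%03d.mp4");
//!     let status = Command::new("ffmpeg")
//!         .arg("-i")
//!         .arg(input)
//!         // Drop the audio; it is extracted and encoded separately (step 2).
//!         // Stream-copy the video so splitting is I/O bound, not CPU bound.
//!         .args(&["-an", "-c", "copy", "-f", "segment", "-segment_time", "60"])
//!         .arg(pattern)
//!         .status()?;
//!     if !status.success() {
//!         return Err(std::io::Error::new(
//!             std::io::ErrorKind::Other,
//!             "ffmpeg failed to split the video",
//!         ));
//!     }
//!     Ok(())
//! }
//! ```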
//!
//! Thanks to the work-stealing method of distribution, having some hosts that
//! are significantly slower than others does not delay the overall operation.
//! In the worst case, the slowest machine is the last to start encoding a
//! chunk and remains the only working encoder for the duration it takes to
//! encode this one chunk. This window can easily be reduced by using smaller
//! chunks.
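//!
//! In code, the work stealing boils down to a single shared queue of chunk
//! paths: every chunk goes into an unbounded crossbeam channel and each host
//! thread pulls the next one whenever it is ready, so faster machines simply
//! come back for more chunks sooner. The sketch below is a simplified version
//! of what `run_local` does; the real host threads additionally keep at most
//! one chunk in reserve and transfer the results back.
//!
//! ```no_run
//! use crossbeam::channel;
//! use std::{path::PathBuf, thread};
//!
//! fn distribute(chunks: Vec<PathBuf>, hosts: Vec<String>) {
//!     // One shared, unbounded queue of work.
//!     let (sender, receiver) = channel::unbounded();
//!     for chunk in chunks {
//!         sender.send(chunk).unwrap();
//!     }
//!     // Dropping the sender disconnects the channel once it's drained, so the
//!     // receivers know there is no more work coming.
//!     drop(sender);
//!
//!     let handles: Vec<_> = hosts
//!         .into_iter()
//!         .map(|host| {
//!             let receiver = receiver.clone();
//!             thread::spawn(move || {
//!                 // Keep pulling chunks until the queue is empty.
//!                 while let Ok(chunk) = receiver.recv() {
//!                     println!("{} would encode {}", host, chunk.display());
//!                 }
//!             })
//!         })
//!         .collect();
//!
//!     for handle in handles {
//!         handle.join().unwrap();
//!     }
//! }
//! ```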
//!
//! ## Performance
//!
//! As with all things parallel, Amdahl's law rears its ugly head and you don't
//! just get twice the speed with twice the processing power. With this
//! approach, you pay for having to split the video into chunks before you
//! begin, for transferring them to the encoders and the results back, and for
//! reassembling them. To be fair, transferring the chunks to the encoders only
//! causes a noticeable delay until every encoder has its first chunk;
//! subsequent chunks can be sent while the encoders are working, so no time is
//! wasted waiting for them. Returning and assembling the encoded chunks
//! doesn't carry too big of a penalty either, since by then we're dealing with
//! much more compressed data.
//!
//! To get a better understanding of the tradeoffs, I did some testing with a
//! couple of computers I had access to. They were my main, pretty capable
//! desktop, two older ones and a laptop. To figure out how capable each of
//! them is, so we can compare the actual to the expected speedup, I let each
//! of them encode a relatively short clip of slightly less than 4 minutes
//! taken from the real video I want to encode, using the same settings I'd
//! use for the real job. And if you're wondering why encoding takes so long,
//! it's because I'm using the `veryslow` preset for maximum efficiency, even
//! though it's definitely not worth the huge increase in encoding time. But
//! it's a nice simulation of how it would look if we were using an even more
//! demanding codec like AV1.
//!
//! | machine   | duration (s) | power    |
//! | --------- | ------------ | -------- |
//! | desktop   | 1373         | 1.00     |
//! | old1      | 2571         | 0.53     |
//! | old2      | 3292         | 0.42     |
//! | laptop    | 5572         | 0.25     |
//! | **total** | -            | **2.20** |
//!
//! By giving my desktop the "power" level 1, we can determine how powerful the
//! others are at this encoding task, based on how long it takes them in
//! comparison. By adding the three other, less capable machines to the mix, we
//! slightly more than double the theoretical encoding capability of our
//! system.
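//!
//! In other words, each power level is just the desktop's duration divided by
//! that machine's duration:
//!
//! ```text
//! power(old1)   = 1373 / 2571 ≈ 0.53
//! power(old2)   = 1373 / 3292 ≈ 0.42
//! power(laptop) = 1373 / 5572 ≈ 0.25
//! ```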
//!
//! I determined these power levels on a short clip, because encoding the full
//! video would have taken very long on the less capable ones, especially the
//! laptop. But I still needed to encode the full thing on at least one of them
//! to make the comparison to the distributed encoding. I did that on my
//! desktop since it's the fastest one, and to additionally verify that the
//! power levels hold up for the full video, I bit the bullet and did the same
//! on the second most powerful machine.
//!
//! | machine | duration (s) | power |
//! | ------- | ------------ | ----- |
//! | desktop | 9356         | 1.00  |
//! | old1    | 17690        | 0.53  |
//!
//! Now we have the baseline we want to beat with parallel encoding, as well as
//! confirmation that the power levels are valid for the full video. Let's see
//! how much of the theoretical, but unreachable 2.2x speedup we can get.
//!
//! Encoding the video in parallel took 5283 seconds, so 56.5% of the time it
//! took my fastest computer alone, or a 1.77x speedup. We committed about
//! twice the computing power, and we're not too far off a twofold speedup:
//! it's making use of the additionally available resources with an 80%
//! efficiency in this case. I also tried to encode the short clip in parallel,
//! which was very fast, but had a somewhat disappointing speedup of only
//! 1.32x. I suspect we get better results with longer videos, since encoding a
//! chunk always takes longer than creating and transferring it (otherwise
//! distributing wouldn't make sense at all). The longer the video, the larger
//! the share of the total time that is spent on encoding (which we can
//! parallelize), and the more effective the distribution becomes.
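//!
//! Concretely, the speedup and efficiency of this four-machine run work out
//! as:
//!
//! ```text
//! speedup    = 9356 / 5283 ≈ 1.77
//! efficiency = 1.77 / 2.20 ≈ 0.80
//! ```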
//!
//! I've also looked at how the work is distributed over the nodes, depending
//! on their processing power. At the end of a parallel encode, it's possible
//! to determine how many chunks have been encoded by any given host.
//!
//! | host          | chunks | power |
//! | ------------- | ------ | ----- |
//! | desktop       | 73     | 1.00  |
//! | old1          | 39     | 0.53  |
//! | old2          | 31     | 0.42  |
//! | laptop        | 19     | 0.26  |
//!
//! Inferring the processing power from each host's share of the chunks (e.g.
//! old1: 39 / 73 ≈ 0.53) leads to almost exactly the same results as my
//! initial determination, confirming it and showing that the work is
//! distributed efficiently.
//!
//! To further see how the system scales, I've added two more machines,
//! bringing the total processing power up to 3.29.
//!
//! | machine   | duration (s) | power    |
//! | --------- | ------------ | -------- |
//! | desktop   | 1373         | 1.00     |
//! | c1        | 2129         | 0.64     |
//! | old1      | 2571         | 0.53     |
//! | c2        | 3022         | 0.45     |
//! | old2      | 3292         | 0.42     |
//! | laptop    | 5572         | 0.25     |
//! | **total** | -            | **3.29** |
//!
//! Encoding the video on these 6 machines in parallel took 3865 seconds, so
//! 41.3% of the time using my fastest computer, or a 2.42x speedup. It's
//! making use of the additionally available resources with a 74% efficiency
//! here. As expected, while we can accelerate by adding more resources, we're
//! looking at diminishing returns, although the drop in efficiency is not as
//! bad as it could be.
//!
//! ## Limitations
//!
//! While you can use your own `ffmpeg` options to control how the video is
//! encoded, there is currently no such option for the audio, which is always
//! encoded as 192 kb/s AAC.

use crossbeam::channel;
use dirs;
use log::{error, info};
use std::{
    error::Error,
    fs,
    path::{Path, PathBuf},
    process::Command,
    string::ToString,
    sync::atomic::{AtomicBool, Ordering},
    sync::Arc,
    thread,
    time::Duration,
};

mod local;
mod remote;

/// The name of the temporary directory in the home directory to collect
/// intermediate files.
const TMP_DIR: &str = "shepherd_tmp";
/// The name of the encoded audio track.
const AUDIO: &str = "audio.aac";
/// The default length in seconds of the chunks to split the video into.
const DEFAULT_LENGTH: &str = "60";

/// The generic result type for this crate.
pub type Result<T> = std::result::Result<T, Box<dyn Error>>;

/// Starts the whole operation and cleans up afterwards.
///
/// # Arguments
/// * `input` - The path to the input file.
/// * `output` - The path to the output file.
/// * `args` - Arguments to `ffmpeg` for chunk encoding.
/// * `hosts` - The list of encoding hosts.
/// * `seconds` - The video chunk length in seconds.
/// * `tmp_dir` - The path to the local temporary directory.
/// * `keep` - Whether to keep temporary files instead of cleaning them up.
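///
/// # Example
///
/// A minimal sketch of calling the library directly. The file names, hosts
/// and `ffmpeg` arguments here are purely illustrative:
///
/// ```no_run
/// # fn main() -> shepherd::Result<()> {
/// shepherd::run(
///     "input.mp4",
///     "output.mp4",
///     &["-c:v", "libx264", "-crf", "26"],
///     vec!["c1", "c2"],
///     Some("30"),
///     None,
///     false,
/// )?;
/// # Ok(())
/// # }
/// ```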
pub fn run(
    input: impl AsRef<Path>,
    output: impl AsRef<Path>,
    args: &[&str],
    hosts: Vec<&str>,
    seconds: Option<&str>,
    tmp_dir: Option<&str>,
    keep: bool,
) -> Result<()> {
    // Convert the length
    let seconds = seconds.unwrap_or(DEFAULT_LENGTH).parse::<u64>()?;
    // Convert the tmp_dir
    let mut tmp_dir = tmp_dir
        .map(PathBuf::from)
        .or_else(dirs::home_dir)
        .ok_or("Home directory not found")?;

    // Set up a shared boolean to check whether the user has aborted
    let running = Arc::new(AtomicBool::new(true));
    let r = Arc::clone(&running);
    ctrlc::set_handler(move || {
        r.store(false, Ordering::SeqCst);
        info!(
            "Abort signal received. Waiting for remote encoders to finish the \
             current chunk and quit gracefully."
        );
    })
    .expect("Error setting Ctrl-C handler");

    tmp_dir.push(TMP_DIR);
    // Remove local temporary directory in case it's still around
    fs::remove_dir_all(&tmp_dir).ok();
    // Create our local temporary directory
    fs::create_dir(&tmp_dir)?;

    // Start the operation
    let result = run_local(
        input.as_ref(),
        output.as_ref(),
        args,
        &tmp_dir,
        &hosts,
        seconds,
        Arc::clone(&running),
    );

    if !keep {
        info!("Cleaning up");
        // Remove remote temporary directories
        for &host in &hosts {
            // Clean up temporary directory on host
            let output = Command::new("ssh")
                .args(&[host, "rm", "-r", remote::TMP_DIR])
                .output()
                .expect("Failed executing ssh command");
            // These checks for `running` are necessary, because Ctrl + C also
            // seems to terminate the commands we launch, which means they'll
            // return unsuccessfully. With this check we prevent an error
            // message in this case, because that's what the user wants.
            // Unfortunately this also means we have to litter the `running`
            // variable almost everywhere.
            if !output.status.success() && running.load(Ordering::SeqCst) {
                error!(
                    "Failed removing remote temporary directory on {}",
                    host
                );
            }
        }
        // Remove local temporary directory
        fs::remove_dir_all(&tmp_dir).ok();
    }

    result
}

/// Does the actual work.
///
/// This is separate so it can fail and return early, since cleanup is then
/// handled in its caller function.
fn run_local(
    input: &Path,
    output: &Path,
    args: &[&str],
    tmp_dir: &Path,
    hosts: &[&str],
    seconds: u64,
    running: Arc<AtomicBool>,
) -> Result<()> {
    // Build path to audio file
    let mut audio = tmp_dir.to_path_buf();
    audio.push(AUDIO);
    // Start the extraction
    info!("Extracting audio");
    local::extract_audio(input, &audio, &running)?;

    // We check whether the user has aborted before every time-intensive task.
    // It's a better experience, but a bit ugly code-wise.
    if !running.load(Ordering::SeqCst) {
        // Abort early
        return Ok(());
    }

    // Create directory for video chunks
    let mut chunk_dir = tmp_dir.to_path_buf();
    chunk_dir.push("chunks");
    fs::create_dir(&chunk_dir)?;
    // Split the video
    info!("Splitting video into chunks");
    local::split_video(
        input,
        &chunk_dir,
        Duration::from_secs(seconds),
        &running,
    )?;
    // Get the list of created chunks
    let mut chunks = fs::read_dir(&chunk_dir)?
        .map(|res| res.map(|entry| entry.path()))
        .collect::<std::io::Result<Vec<PathBuf>>>()?;
    // Sort them so they're in order. That's not strictly necessary, but nicer
    // for the user to watch since it allows seeing the progress at a glance.
    chunks.sort();

    if !running.load(Ordering::SeqCst) {
        // Abort early
        return Ok(());
    }

    // Initialize the global channel for chunks
    let (sender, receiver) = channel::unbounded();
    // Send all chunks into it
    for chunk in chunks {
        sender.send(chunk)?;
    }
    // Drop the sender so the channel gets disconnected
    drop(sender);

    // Since we want to share the ffmpeg arguments between the threads, we need
    // to first set up an owned version of them
    let args: Vec<String> = args.iter().map(ToString::to_string).collect();
    // and then create our Arc
    let args = Arc::new(args);

    // Create directory for encoded chunks
    let mut encoded_dir = tmp_dir.to_path_buf();
    encoded_dir.push("encoded");
    fs::create_dir(&encoded_dir)?;
    // Isolate output extension, since we want encoded chunks to have the same
    let out_ext = output
        .extension()
        .ok_or("Unable to find extension")?
        .to_str()
        .ok_or("Unable to convert OsString extension")?
        .to_string();
    // Spawn threads for hosts
    info!("Starting remote encoding");
    let mut host_threads = Vec::with_capacity(hosts.len());
    for &host in hosts {
        // Clone the queue receiver for the thread
        let thread_receiver = receiver.clone();
        // Create copy of running indicator for the thread
        let r = Arc::clone(&running);
        // And lots of other copies because it's easy and the extra allocations
        // are not a problem for this kind of application
        let host = host.to_string();
        let enc = encoded_dir.clone();
        let ext = out_ext.clone();
        let a = Arc::clone(&args);
        // Start it
        let handle =
            thread::Builder::new().name(host.clone()).spawn(|| {
                remote::host_thread(host, thread_receiver, enc, ext, a, r);
            })?;
        host_threads.push(handle);
    }

    // Wait for all hosts to finish
    for handle in host_threads {
        if handle.join().is_err() {
            return Err("A host thread panicked".into());
        }
    }

    if !running.load(Ordering::SeqCst) {
        // We aborted early
        return Ok(());
    }

    // Combine encoded chunks and audio
    info!("Combining encoded chunks into final video");
    local::combine(&encoded_dir, &audio, output, &running)?;

    Ok(())
}