use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::Arc;
use anyhow::{Context, Result, bail};
use clap::{Parser, Subcommand, ValueEnum};
use tracing_subscriber::EnvFilter;
use rivet::progress::{RungProgress, RungStatus};
use rivet::spec::{AudioPolicy, BitDepth, ChunkSeamMode, ColorPolicy, GpuFamily};
use rivet::{JobOutput, RungArtifact, TranscodeSettings};
// Top-level command-line interface, parsed with clap's derive API.
// It carries nothing but the selected subcommand; see `Command`.
// NOTE: plain `//` comments are used on purpose here — clap's derive turns
// `///` doc comments into help text, which would alter `--help` output.
#[derive(Parser)]
#[command(
name = "rivet",
version,
about = "Modular GPU-accelerated video transcoder (AV1 + Opus).",
long_about = None
)]
struct Cli {
// The subcommand to execute; dispatched in `run`.
#[command(subcommand)]
command: Command,
}
// Value of `transcode --mode`; translated to `rivet::Mode` in `transcode_cmd`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum ModeArg {
// Maps to `rivet::Mode::Single`.
Single,
// Maps to `rivet::Mode::Hls`.
Hls,
}
// Value of `--audio`; converted to the library's `AudioPolicy` via `From`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum AudioArg {
// Maps to `AudioPolicy::Auto`.
Auto,
// Maps to `AudioPolicy::ForceOpus`.
Opus,
// Maps to `AudioPolicy::Drop`.
Drop,
}
/// Translates the CLI `--audio` choice into the library's `AudioPolicy`.
impl From<AudioArg> for AudioPolicy {
    /// One-to-one mapping; note that `opus` on the command line means
    /// `ForceOpus` on the library side.
    fn from(a: AudioArg) -> Self {
        match a {
            AudioArg::Auto => Self::Auto,
            AudioArg::Opus => Self::ForceOpus,
            AudioArg::Drop => Self::Drop,
        }
    }
}
// Value of `transcode --gpu-family`; converted to `GpuFamily` via `From`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum GpuFamilyArg {
Nvidia,
Amd,
Intel,
}
/// Translates the CLI `--gpu-family` choice into the library's `GpuFamily`.
impl From<GpuFamilyArg> for GpuFamily {
    /// Variant names match one-to-one on both sides.
    fn from(a: GpuFamilyArg) -> Self {
        match a {
            GpuFamilyArg::Nvidia => Self::Nvidia,
            GpuFamilyArg::Amd => Self::Amd,
            GpuFamilyArg::Intel => Self::Intel,
        }
    }
}
// Value of `--color`; converted to the library's `ColorPolicy` via `From`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum ColorArg {
// Maps to `ColorPolicy::TonemapToSdr`.
Sdr,
// Maps to `ColorPolicy::Hdr10`.
Hdr10,
// Maps to `ColorPolicy::Hlg`.
Hlg,
// Maps to `ColorPolicy::Passthrough`.
Passthrough,
}
/// Translates the CLI `--color` choice into the library's `ColorPolicy`.
impl From<ColorArg> for ColorPolicy {
    /// `sdr` on the command line selects `TonemapToSdr`; the other variants
    /// map to the identically named policy.
    fn from(a: ColorArg) -> Self {
        match a {
            ColorArg::Sdr => Self::TonemapToSdr,
            ColorArg::Hdr10 => Self::Hdr10,
            ColorArg::Hlg => Self::Hlg,
            ColorArg::Passthrough => Self::Passthrough,
        }
    }
}
// Value of `--pixel-format` / `--bit-depth`; converted to `BitDepth` via `From`.
// The `#[value(name = ...)]` attributes give the variants the CLI spellings
// `8bit` and `10bit`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum PixelArg {
Auto,
#[value(name = "8bit")]
Eight,
#[value(name = "10bit")]
Ten,
}
/// Translates the CLI pixel-format choice into the library's `BitDepth`.
impl From<PixelArg> for BitDepth {
    /// `8bit` → `EightBit`, `10bit` → `TenBit`, `auto` → `Auto`.
    fn from(a: PixelArg) -> Self {
        match a {
            PixelArg::Auto => Self::Auto,
            PixelArg::Eight => Self::EightBit,
            PixelArg::Ten => Self::TenBit,
        }
    }
}
// Value of `transcode --seam-mode`; converted to `ChunkSeamMode` via `From`.
// (`//` rather than `///`: clap would surface doc comments in the help text.)
#[derive(Clone, Copy, ValueEnum)]
enum SeamArg {
// Maps to `ChunkSeamMode::Parallel`.
Parallel,
// Maps to `ChunkSeamMode::ParallelConstQp`.
Constqp,
// Maps to `ChunkSeamMode::Serial`.
Serial,
}
/// Translates the CLI `--seam-mode` choice into the library's `ChunkSeamMode`.
impl From<SeamArg> for ChunkSeamMode {
    /// `constqp` on the command line selects `ParallelConstQp`.
    fn from(a: SeamArg) -> Self {
        match a {
            SeamArg::Parallel => Self::Parallel,
            SeamArg::Constqp => Self::ParallelConstQp,
            SeamArg::Serial => Self::Serial,
        }
    }
}
// All subcommands of the `rivet` binary. Each variant is destructured and
// dispatched in `run`.
// NOTE: plain `//` comments are used on purpose — clap's derive turns `///`
// doc comments into help text, which would alter `--help` output.
#[derive(Subcommand)]
enum Command {
// Transcode one input file; handled by `transcode_cmd`.
Transcode {
// Positional: path of the media file to read.
input: PathBuf,
// Output file or directory; when omitted, `plan_output` derives a
// default next to the input.
#[arg(short, long)]
output: Option<PathBuf>,
#[arg(long, value_enum, default_value = "single")]
mode: ModeArg,
// Repeatable `--rung WxH`; each value is parsed by `parse_wxh`.
#[arg(long = "rung", value_name = "WxH")]
rungs: Vec<String>,
#[arg(long)]
ladder: bool,
#[arg(long)]
max_short_side: Option<u32>,
#[arg(long, default_value_t = 4.0)]
segment_seconds: f32,
#[arg(long)]
crf: Option<u8>,
#[arg(long)]
speed: Option<u8>,
#[arg(long, value_enum, default_value = "auto")]
audio: AudioArg,
#[arg(long)]
max_fps: Option<f64>,
#[arg(long)]
gpu: Option<u32>,
#[arg(long)]
single_gpu: bool,
#[arg(long, value_enum)]
gpu_family: Option<GpuFamilyArg>,
#[arg(long)]
decode_gpu: Option<u32>,
#[arg(long, value_enum, default_value = "sdr")]
color: ColorArg,
#[arg(long, value_enum, default_value = "auto")]
pixel_format: PixelArg,
#[arg(long = "seam-mode", value_enum, default_value = "parallel")]
seam_mode: SeamArg,
// Filter chain text; parsed with `codec::filter::parse_chain`.
#[arg(long)]
filter: Option<String>,
},
// Inspect a media file and print what `rivet::probe_file` reports.
Probe {
input: PathBuf,
// Emit JSON (see `probe_json`) instead of the human-readable listing.
#[arg(long)]
json: bool,
},
// List detected GPUs; handled by `devices_cmd`.
Devices {
#[arg(long)]
json: bool,
},
// Report encode/decode backends of this build; handled by
// `capabilities_cmd`. Also reachable as `caps`.
#[command(visible_alias = "caps")]
Capabilities {
#[arg(long)]
json: bool,
},
// Read media from stdin and write the result to stdout; handled by
// `pipe_cmd`. Every option is optional here, unlike `Transcode`, so that
// an option-less invocation leaves the settings at their defaults.
Pipe {
#[arg(long)]
crf: Option<u8>,
#[arg(long)]
speed: Option<u8>,
#[arg(long, value_enum)]
audio: Option<AudioArg>,
#[arg(long, value_enum)]
color: Option<ColorArg>,
// `--bit-depth`, also accepted as `--pixel-format`.
#[arg(long = "bit-depth", visible_alias = "pixel-format", value_enum)]
bit_depth: Option<PixelArg>,
#[arg(long = "max-fps")]
max_fps: Option<f64>,
#[arg(long)]
width: Option<u32>,
#[arg(long)]
height: Option<u32>,
#[arg(long)]
gpu: Option<u32>,
#[arg(long)]
filter: Option<String>,
},
// Serve transcodes over a Unix-domain socket; handled by `ipc_cmd`.
// Only compiled with the `ipc` feature.
#[cfg(feature = "ipc")]
Ipc {
// Filesystem path of the socket to bind.
#[arg(long)]
socket: PathBuf,
},
// Run the jobs described in a manifest file; handled by `batch_cmd`.
// Only compiled with the `batch` feature.
#[cfg(feature = "batch")]
Batch {
manifest: PathBuf,
// Print the planned jobs without converting anything.
#[arg(long)]
dry_run: bool,
// Overrides the manifest's `on_error` with "stop".
#[arg(long)]
stop_on_error: bool,
},
// Start the HTTP transcode API; only compiled with the `server` feature.
#[cfg(feature = "server")]
Serve {
// Socket address to listen on; parsed as `std::net::SocketAddr` in `run`.
#[arg(long, default_value = "127.0.0.1:8080")]
addr: String,
},
}
/// Process entry point.
///
/// Installs a `tracing` subscriber that writes to stderr (filter taken from
/// the environment, falling back to `info`), runs the CLI, and turns any
/// error into a one-line message on stderr plus a failing exit code.
fn main() -> ExitCode {
    let env_filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();

    if let Err(err) = run() {
        // `{:#}` prints the whole anyhow context chain on a single line.
        eprintln!("error: {err:#}");
        return ExitCode::FAILURE;
    }
    ExitCode::SUCCESS
}
/// Parses the command line and dispatches to the matching subcommand handler.
///
/// # Errors
/// Propagates whatever the selected handler returns, plus parse failures for
/// `--filter` (pipe) and `--addr` (serve) that are resolved here.
fn run() -> Result<()> {
let cli = Cli::parse();
match cli.command {
// Bundle the many `transcode` options into `TranscodeArgs` so the handler
// takes a single argument.
Command::Transcode {
input,
output,
mode,
rungs,
ladder,
max_short_side,
segment_seconds,
crf,
speed,
audio,
max_fps,
gpu,
single_gpu,
gpu_family,
decode_gpu,
color,
pixel_format,
seam_mode,
filter,
} => transcode_cmd(TranscodeArgs {
input,
output,
mode,
rungs,
ladder,
max_short_side,
segment_seconds,
crf,
speed,
audio,
max_fps,
gpu,
single_gpu,
gpu_family,
decode_gpu,
color,
pixel_format,
seam_mode,
filter,
}),
Command::Probe { input, json } => {
let info = rivet::probe_file(&input)
.with_context(|| format!("probing {}", input.display()))?;
if json {
println!("{}", probe_json(&info));
} else {
print_probe(&input, &info);
}
Ok(())
}
Command::Devices { json } => {
devices_cmd(json);
Ok(())
}
Command::Capabilities { json } => {
capabilities_cmd(json);
Ok(())
}
// For `pipe`, only options the user actually passed are set; everything
// else stays at `TranscodeSettings::default()`.
Command::Pipe {
crf,
speed,
audio,
color,
bit_depth,
max_fps,
width,
height,
gpu,
filter,
} => pipe_cmd(TranscodeSettings {
crf,
speed,
audio: audio.map(Into::into),
color: color.map(Into::into),
bit_depth: bit_depth.map(Into::into),
max_fps,
width,
height,
gpu,
filters: match filter {
Some(s) => codec::filter::parse_chain(&s).context("parsing --filter")?,
None => Vec::new(),
},
..Default::default()
}),
#[cfg(feature = "ipc")]
Command::Ipc { socket } => ipc_cmd(&socket),
#[cfg(feature = "batch")]
Command::Batch {
manifest,
dry_run,
stop_on_error,
} => batch_cmd(&manifest, dry_run, stop_on_error),
#[cfg(feature = "server")]
Command::Serve { addr } => {
let addr: std::net::SocketAddr = addr.parse().context("parsing --addr")?;
// The rest of the CLI is synchronous, so a Tokio runtime is built only
// for this subcommand and the server future is driven with `block_on`.
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("building tokio runtime")?;
eprintln!("rivet transcode API on http://{addr} (POST media to /v1/transcode)");
rt.block_on(rivet::server::serve(addr))
}
}
}
/// Owned copy of every option of the `transcode` subcommand.
///
/// `run` fills this from `Command::Transcode` field by field; it is then
/// consumed by `transcode_cmd` and borrowed by `plan_output` / `write_outputs`.
struct TranscodeArgs {
/// Path of the media file to read.
input: PathBuf,
/// Explicit output file or directory; `None` means "derive from `input`".
output: Option<PathBuf>,
mode: ModeArg,
/// Raw `WxH` strings, not yet parsed (see `parse_wxh`).
rungs: Vec<String>,
ladder: bool,
max_short_side: Option<u32>,
segment_seconds: f32,
crf: Option<u8>,
speed: Option<u8>,
audio: AudioArg,
max_fps: Option<f64>,
gpu: Option<u32>,
single_gpu: bool,
gpu_family: Option<GpuFamilyArg>,
decode_gpu: Option<u32>,
color: ColorArg,
pixel_format: PixelArg,
seam_mode: SeamArg,
/// Unparsed filter chain text.
filter: Option<String>,
}
/// Implements `rivet transcode`: read the input, build the output spec from
/// the CLI options, run the job with progress reported on stderr, write the
/// results and print a summary.
///
/// # Errors
/// Fails if a `--rung` or `--filter` value is malformed, the input cannot be
/// read or probed, the settings do not yield a valid spec, an output
/// location cannot be prepared, or the transcode / output write fails.
fn transcode_cmd(args: TranscodeArgs) -> Result<()> {
    // Validate the cheap, purely textual options first so that a typo in
    // `--rung` / `--filter` is reported immediately instead of after the
    // whole (potentially very large) input has been read and probed.
    let rungs = args
        .rungs
        .iter()
        .map(|s| parse_wxh(s))
        .collect::<Result<Vec<_>>>()?;
    let filters = match args.filter.as_deref() {
        Some(s) => codec::filter::parse_chain(s).context("parsing --filter")?,
        None => Vec::new(),
    };

    // The whole input is held in memory for the duration of the job.
    let bytes = std::fs::read(&args.input)
        .with_context(|| format!("reading input {}", args.input.display()))?;
    let probed = rivet::probe_bytes(&bytes).context("probing input")?;

    let settings = TranscodeSettings {
        mode: Some(match args.mode {
            ModeArg::Single => rivet::Mode::Single,
            ModeArg::Hls => rivet::Mode::Hls,
        }),
        rungs,
        ladder: args.ladder,
        max_short_side: args.max_short_side,
        segment_seconds: Some(args.segment_seconds),
        crf: args.crf,
        speed: args.speed,
        audio: Some(args.audio.into()),
        color: Some(args.color.into()),
        bit_depth: Some(args.pixel_format.into()),
        seam: Some(args.seam_mode.into()),
        max_fps: args.max_fps,
        gpu: args.gpu,
        gpu_family: args.gpu_family.map(Into::into),
        single_gpu: args.single_gpu,
        decode_gpu: args.decode_gpu,
        width: None,
        height: None,
        filters,
    };
    let spec = settings
        .into_spec(probed.width, probed.height)
        .context("building output spec")?;

    // Progress goes to stderr so stdout carries only the final summary.
    let sink = Arc::new(rivet::fn_sink(|p: RungProgress| {
        eprintln!(
            " [{:>6}] {:<6} {:>5.1}% {} frames{}",
            p.label,
            status_str(p.status),
            p.percent,
            p.frames_done,
            p.message.as_deref().map(|m| format!(" ({m})")).unwrap_or_default(),
        );
    }));

    let (output_dir, single_file_target) = plan_output(&args)?;
    let out = rivet::run_job_blocking(&bytes, &spec, output_dir.as_deref(), sink)
        .with_context(|| format!("transcoding {}", args.input.display()))?;
    write_outputs(&args, &out, output_dir.as_deref(), single_file_target.as_deref())?;
    print_summary(&args.input, &out);
    Ok(())
}
/// Decides where results go, returning `(output_dir, single_file_target)`.
/// Exactly one of the two is `Some`.
///
/// * HLS mode: a directory (default `<stem>.hls` next to the input).
/// * Single mode with several rungs or `--ladder`: a directory (default
///   `<stem>.av1`).
/// * Single mode otherwise: one file (default `<stem>.av1.mp4`).
///
/// A directory target is created here (including parents) before returning.
///
/// # Errors
/// Fails if the output directory cannot be created.
fn plan_output(args: &TranscodeArgs) -> Result<(Option<PathBuf>, Option<PathBuf>)> {
    // `Some(suffix)` selects a directory layout; `None` selects a single file.
    let dir_suffix = match args.mode {
        ModeArg::Hls => Some("hls"),
        ModeArg::Single if args.rungs.len() > 1 || args.ladder => Some("av1"),
        ModeArg::Single => None,
    };

    match dir_suffix {
        Some(suffix) => {
            let target_dir = match &args.output {
                Some(explicit) => explicit.clone(),
                None => default_dir(&args.input, suffix),
            };
            std::fs::create_dir_all(&target_dir)
                .with_context(|| format!("creating output dir {}", target_dir.display()))?;
            Ok((Some(target_dir), None))
        }
        None => {
            let target_file = match &args.output {
                Some(explicit) => explicit.clone(),
                None => default_file(&args.input),
            };
            Ok((None, Some(target_file)))
        }
    }
}
fn write_outputs(
args: &TranscodeArgs,
out: &JobOutput,
output_dir: Option<&Path>,
single_file_target: Option<&Path>,
) -> Result<()> {
match args.mode {
ModeArg::Hls => {
}
ModeArg::Single => {
if let Some(file) = single_file_target {
if let Some(r) = out.rungs.first() {
if let RungArtifact::File(bytes) = &r.artifact {
std::fs::write(file, bytes)
.with_context(|| format!("writing {}", file.display()))?;
}
}
} else if let Some(dir) = output_dir {
for r in &out.rungs {
if let RungArtifact::File(bytes) = &r.artifact {
let path = dir.join(format!("{}.mp4", r.label));
std::fs::write(&path, bytes)
.with_context(|| format!("writing {}", path.display()))?;
}
}
}
}
}
Ok(())
}
/// Prints the end-of-job report to stdout: a source line, the audio
/// handling, one line per rung, the master playlist path when there is one,
/// and the elapsed time.
fn print_summary(input: &Path, out: &JobOutput) {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    println!(
        "{} ({}x{} @ {:.3} fps {})",
        input.display(),
        out.source_dims.0,
        out.source_dims.1,
        out.source_frame_rate,
        out.source_codec,
    );
    println!(" audio: {}", out.audio_handling);

    for rung in &out.rungs {
        // In-memory files are shown as "mp4"; HLS renditions show their
        // directory relative to the output root.
        let location: &str = match &rung.artifact {
            RungArtifact::File(_) => "mp4",
            RungArtifact::HlsRendition { relative_dir, .. } => relative_dir.as_str(),
        };
        let size_mib = rung.bytes as f64 / BYTES_PER_MIB;
        println!(
            " {:<6} {}x{} {} frames {:.2} MiB [{}]",
            rung.label, rung.width, rung.height, rung.frames, size_mib, location,
        );
    }

    if let Some(playlist) = &out.master_playlist {
        println!(" master playlist: {}", playlist.display());
    }
    println!(" done in {:.2}s", out.elapsed.as_secs_f64());
}
/// Parses a `--rung` value of the form `WxH` (separator `x` or `X`,
/// surrounding whitespace tolerated) into `(width, height)`.
///
/// Both dimensions are rounded **down** to the nearest even number.
///
/// # Errors
/// Fails when the separator is missing, either side is not a valid `u32`,
/// either dimension is zero, or either dimension is `1`. Rounding `1` down
/// to even gives `0`, which the original zero check missed because it ran
/// before the rounding.
fn parse_wxh(s: &str) -> Result<(u32, u32)> {
    let (w, h) = s
        .split_once(['x', 'X'])
        .ok_or_else(|| anyhow::anyhow!("rung '{s}' is not WxH (e.g. 1280x720)"))?;
    let w: u32 = w.trim().parse().with_context(|| format!("bad width in '{s}'"))?;
    let h: u32 = h.trim().parse().with_context(|| format!("bad height in '{s}'"))?;
    if w == 0 || h == 0 {
        bail!("rung '{s}' has a zero dimension");
    }

    // Clear the lowest bit to force even dimensions, then re-check: an input
    // of 1 collapses to 0 here.
    let (w, h) = (w & !1, h & !1);
    if w == 0 || h == 0 {
        bail!("rung '{s}' is too small: dimensions are rounded down to even, so each must be at least 2");
    }
    Ok((w, h))
}
/// Default single-file output path: the input's sibling `<stem>.av1.mp4`.
///
/// When the input has no file stem (for example `..`), the stem falls back
/// to `output`. Non-UTF-8 stems are converted lossily.
fn default_file(input: &Path) -> PathBuf {
    let stem = match input.file_stem() {
        Some(os) => os.to_string_lossy().into_owned(),
        None => String::from("output"),
    };
    input.with_file_name(format!("{stem}.av1.mp4"))
}
/// Default output directory: the input's sibling `<stem>.<suffix>`.
///
/// When the input has no file stem (for example `..`), the stem falls back
/// to `output`. Non-UTF-8 stems are converted lossily.
fn default_dir(input: &Path, suffix: &str) -> PathBuf {
    let stem = match input.file_stem() {
        Some(os) => os.to_string_lossy().into_owned(),
        None => String::from("output"),
    };
    input.with_file_name(format!("{stem}.{suffix}"))
}
fn status_str(s: RungStatus) -> &'static str {
match s {
RungStatus::Pending => "pend",
RungStatus::Running => "run",
RungStatus::Finalizing => "final",
RungStatus::Completed => "done",
RungStatus::Failed => "FAIL",
}
}
/// Prints the human-readable `probe` report to stdout.
///
/// The duration line is omitted when the reported duration is not positive.
/// The audio line shows `(none)` when the file has no audio stream.
fn print_probe(input: &Path, info: &rivet::MediaInfo) {
    println!("{}", input.display());
    println!(" container : {}", info.container);
    println!(" video : {}", info.video_codec);
    println!(" dimensions: {}x{}", info.width, info.height);
    println!(" frame rate: {:.3} fps", info.frame_rate);
    if info.duration > 0.0 {
        println!(" duration : {:.3} s", info.duration);
    }
    println!(" pixel fmt : {}", info.pixel_format);

    if let Some(track) = &info.audio {
        println!(
            " audio : {} {} Hz {} ch",
            track.codec, track.sample_rate, track.channels
        );
    } else {
        println!(" audio : (none)");
    }
}
/// Renders a probe result as a single-line JSON object.
///
/// String fields pass through `esc`; numeric fields are written with their
/// `Display` form. `audio` is a nested object, or `null` when the file has
/// no audio stream.
fn probe_json(info: &rivet::MediaInfo) -> String {
    let audio_json = info.audio.as_ref().map_or_else(
        || "null".to_string(),
        |track| {
            format!(
                "{{\"codec\":\"{}\",\"sample_rate\":{},\"channels\":{}}}",
                esc(&track.codec),
                track.sample_rate,
                track.channels
            )
        },
    );

    format!(
        "{{\"container\":\"{}\",\"video_codec\":\"{}\",\"width\":{},\"height\":{},\"frame_rate\":{},\"duration\":{},\"pixel_format\":\"{}\",\"audio\":{}}}",
        esc(&info.container),
        esc(&info.video_codec),
        info.width,
        info.height,
        info.frame_rate,
        info.duration,
        esc(&info.pixel_format),
        audio_json,
    )
}
/// Escapes `s` for embedding between double quotes in a JSON document.
///
/// Besides `"` and `\`, JSON forbids raw control characters (U+0000 to
/// U+001F) inside strings. The original escaped only the first two, so a
/// device or codec name containing a newline or tab produced invalid JSON.
/// Newline, carriage return and tab use their short escapes; every other
/// control character is written as `\u00XX`. All other characters,
/// including non-ASCII ones, are copied through unchanged.
fn esc(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if c < '\u{20}' => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}
/// Implements `rivet devices`: list the GPUs found by
/// `codec::gpu::detect_gpus`, as JSON or as a human-readable listing.
///
/// Live load figures are read and shown only for NVIDIA devices.
fn devices_cmd(json: bool) {
    let gpus = codec::gpu::detect_gpus();

    if json {
        println!("{}", devices_json(&gpus));
        return;
    }
    if gpus.is_empty() {
        println!(
            "No GPUs detected (CPU-only host). GPU transcode needs a `nvidia` / `amd` / `qsv` \
             feature build with the matching hardware; the `ffmpeg` feature provides software."
        );
        return;
    }

    let reader = codec::gpu::GpuUtilizationReader::new();
    println!("{} GPU(s) detected:\n", gpus.len());

    for gpu in &gpus {
        println!(
            " [{}] {} {}",
            gpu.index,
            codec::gpu::manufacturer_label(gpu.vendor),
            gpu.name
        );
        println!(" generation : {}", gpu.generation);
        if gpu.vram_mib > 0 {
            println!(" VRAM : {} MiB", gpu.vram_mib);
        }
        println!(" PCI : {}", gpu.host_pci_address);

        let av1 = if codec::encode::av1_encode_capable(gpu) {
            "yes"
        } else {
            "no"
        };
        println!(" AV1 encode : {}", av1);

        if let codec::gpu::GpuVendor::Nvidia = gpu.vendor {
            let load = reader.read(gpu);
            print!(
                " load : gpu {}% · enc {}% · dec {}% · mem {}/{} MiB",
                load.util_percent,
                load.encoder_percent,
                load.decoder_percent,
                load.mem_used_mib,
                load.mem_total_mib
            );
            if let Some(t) = load.temperature_c {
                print!(" · {t}°C");
            }
            println!();
        }
        // Blank line between devices.
        println!();
    }

    println!("Run `rivet capabilities` for what this build can encode/decode.");
}
/// Renders the device list as `{"gpus":[...]}` on a single line.
///
/// Each entry carries index, vendor label, name, generation, VRAM, PCI
/// address and AV1-encode capability. NVIDIA entries also get a `load`
/// object with live utilisation; its `temperature_c` is `null` when the
/// reader reports no temperature.
fn devices_json(devices: &[codec::gpu::GpuDevice]) -> String {
    let reader = codec::gpu::GpuUtilizationReader::new();
    let mut entries: Vec<String> = Vec::with_capacity(devices.len());

    for dev in devices {
        // Optional trailing `,"load":{...}` fragment; empty for other vendors.
        let load_fragment = if let codec::gpu::GpuVendor::Nvidia = dev.vendor {
            let stats = reader.read(dev);
            let temperature = stats
                .temperature_c
                .map_or_else(|| "null".into(), |t| t.to_string());
            format!(
                ",\"load\":{{\"gpu_percent\":{},\"encoder_percent\":{},\"decoder_percent\":{},\"mem_used_mib\":{},\"mem_total_mib\":{},\"temperature_c\":{}}}",
                stats.util_percent,
                stats.encoder_percent,
                stats.decoder_percent,
                stats.mem_used_mib,
                stats.mem_total_mib,
                temperature
            )
        } else {
            String::new()
        };

        entries.push(format!(
            "{{\"index\":{},\"vendor\":\"{}\",\"name\":\"{}\",\"generation\":\"{}\",\"vram_mib\":{},\"pci\":\"{}\",\"av1_encode\":{}{}}}",
            dev.index,
            codec::gpu::manufacturer_label(dev.vendor),
            esc(&dev.name),
            esc(&dev.generation),
            dev.vram_mib,
            esc(&dev.host_pci_address),
            codec::encode::av1_encode_capable(dev),
            load_fragment
        ));
    }

    format!("{{\"gpus\":[{}]}}", entries.join(","))
}
/// Implements `rivet capabilities`: report what this build can encode and
/// decode and which devices are present, as JSON or as a text listing.
fn capabilities_cmd(json: bool) {
    /// Wraps each item in double quotes and joins the results with commas.
    fn quoted_csv<I>(items: I) -> String
    where
        I: IntoIterator,
        I::Item: std::fmt::Display,
    {
        items
            .into_iter()
            .map(|item| format!("\"{item}\""))
            .collect::<Vec<_>>()
            .join(",")
    }

    let enc = codec::encode::encode_backends();
    let dec_backends = codec::decode::decode_backends();
    let caps = codec::encode::build_output_caps();
    let dec = codec::decode::decode_capabilities();
    let devices = codec::gpu::detect_gpus();

    if json {
        let encode_list = quoted_csv(enc.iter());
        let decode_list = quoted_csv(dec_backends.iter());
        let codec_list = dec
            .iter()
            .map(|entry| {
                format!(
                    "{{\"codec\":\"{}\",\"backends\":[{}]}}",
                    entry.codec,
                    quoted_csv(entry.backends.iter())
                )
            })
            .collect::<Vec<_>>()
            .join(",");
        println!(
            "{{\"encode\":{{\"codec\":\"av1\",\"backends\":[{}],\"max_bit_depth\":{},\"hdr\":{}}},\
             \"decode\":{{\"backends\":[{}],\"codecs\":[{}]}},\"devices\":{}}}",
            encode_list,
            caps.max_bit_depth,
            caps.hdr,
            decode_list,
            codec_list,
            devices_json(&devices)
        );
        return;
    }

    println!("rivet capabilities\n");

    println!("Encode — AV1 (4:2:0):");
    if enc.is_empty() {
        println!(" (none) build with a `nvidia` / `amd` / `qsv` / `ffmpeg` feature");
    } else {
        let hdr_text = if caps.hdr {
            "yes (PQ / HLG, BT.2020, 10-bit)"
        } else {
            "no"
        };
        println!(" backends : {}", enc.join(", "));
        println!(" max depth : {}-bit", caps.max_bit_depth);
        println!(" HDR : {}", hdr_text);
    }

    println!("\nDecode — codec → backends:");
    if dec_backends.is_empty() {
        println!(" (none) build with a `nvidia` / `amd` / `qsv` / `ffmpeg` feature");
    } else {
        for entry in &dec {
            // A codec with no backend in this build is shown with a dash.
            let backends = if entry.backends.is_empty() {
                "—".to_string()
            } else {
                entry.backends.join(", ")
            };
            println!(" {:<8} {}", entry.codec, backends);
        }
    }

    println!("\nDevices — {} detected:", devices.len());
    if devices.is_empty() {
        println!(" (none) CPU-only host — only the `ffmpeg` software path can run here");
        return;
    }
    for gpu in &devices {
        print!(
            " [{}] {} {}",
            gpu.index,
            codec::gpu::manufacturer_label(gpu.vendor),
            gpu.name
        );
        if gpu.vram_mib > 0 {
            print!(" ({} MiB)", gpu.vram_mib);
        }
        let av1 = if codec::encode::av1_encode_capable(gpu) {
            "yes"
        } else {
            "no"
        };
        println!(" · AV1 encode: {av1}");
    }
}
/// Transcodes an in-memory media buffer to a single in-memory output,
/// returning `(output bytes, frame count, audio-handling label)`.
///
/// Used by the stream-style front ends (`pipe`, `ipc`). With empty settings
/// it goes straight through `rivet::transcode_bytes`; otherwise it builds a
/// spec from the probed dimensions and runs a full job with a no-op
/// progress sink.
///
/// # Errors
/// Fails on probe / spec / transcode errors, when the settings resolve to
/// HLS output (not representable as one stream), or when the job yields no
/// file artifact.
fn stream_transcode(input: &[u8], settings: &TranscodeSettings) -> Result<(Vec<u8>, u64, String)> {
    if settings.is_empty() {
        let direct = rivet::transcode_bytes(input).context("transcoding")?;
        return Ok((
            direct.output_bytes,
            direct.frames_processed,
            direct.audio_handling.label(),
        ));
    }

    let probed = rivet::probe_bytes(input).context("probing input")?;
    let spec = settings
        .clone()
        .into_spec(probed.width, probed.height)
        .context("invalid settings")?;
    if matches!(spec.mode, rivet::OutputMode::Hls { .. }) {
        bail!(
            "HLS/segmented output isn't supported over pipe/ipc (a single stream) — \
             use `rivet transcode -o <dir>` or the HTTP API"
        );
    }

    let silent_sink = Arc::new(rivet::fn_sink(|_p: RungProgress| {}));
    let job = rivet::run_job_blocking(input, &spec, None, silent_sink).context("transcoding")?;
    let audio = job.audio_handling.clone();

    // The first rung that carries file bytes is the result.
    let first_file = job.rungs.into_iter().find_map(|rung| {
        let frames = rung.frames;
        match rung.artifact {
            rivet::RungArtifact::File(bytes) => Some((bytes, frames)),
            _ => None,
        }
    });
    match first_file {
        Some((bytes, frames)) => Ok((bytes, frames, audio)),
        None => bail!("no single-file output produced"),
    }
}
/// Implements `rivet pipe`: read all media from stdin, transcode it, and
/// write the resulting bytes to stdout. Status messages go to stderr so they
/// never mix with the binary output.
///
/// # Errors
/// Fails if stdin cannot be read or is empty, the transcode fails, or the
/// output cannot be fully written **and flushed** to stdout. The flush
/// result was previously discarded, so output still sitting in stdout's
/// buffer could be lost (closed pipe, full disk) while the command reported
/// success.
fn pipe_cmd(settings: TranscodeSettings) -> Result<()> {
    use std::io::{Read, Write};

    let mut input = Vec::new();
    std::io::stdin()
        .lock()
        .read_to_end(&mut input)
        .context("reading media from stdin")?;
    if input.is_empty() {
        bail!("empty stdin — pipe media in, e.g. `cat in.mkv | rivet pipe > out.mp4`");
    }
    eprintln!("rivet pipe: {} bytes in, transcoding…", input.len());

    let (bytes, frames, audio) = stream_transcode(&input, &settings)?;

    let mut stdout = std::io::stdout().lock();
    stdout.write_all(&bytes).context("writing AV1/MP4 to stdout")?;
    // `write_all` may leave a tail in stdout's buffer; a failed flush means
    // the consumer did not receive the full file.
    stdout.flush().context("flushing AV1/MP4 to stdout")?;
    eprintln!("rivet pipe: {frames} frames → {} bytes out ({audio})", bytes.len());
    Ok(())
}
/// Splits an IPC request into its optional settings header and the media
/// payload.
///
/// A request starting with `#rivet` carries a header that runs up to the
/// first newline; the text after the magic is trimmed and handed to
/// `TranscodeSettings::parse_kv_line`, and the media is everything after
/// that newline. With no newline, the rest of the input is the header and
/// the media is empty. A header that is not valid UTF-8 is treated as an
/// empty line. A request without the magic is all media, with default
/// settings.
#[cfg(all(feature = "ipc", unix))]
fn split_ipc_settings(input: &[u8]) -> (Result<TranscodeSettings>, &[u8]) {
    const MAGIC: &[u8] = b"#rivet";

    match input.strip_prefix(MAGIC) {
        None => (Ok(TranscodeSettings::default()), input),
        Some(after_magic) => {
            let (header, media) = match after_magic.iter().position(|&b| b == b'\n') {
                Some(nl) => (&after_magic[..nl], &after_magic[nl + 1..]),
                None => (after_magic, &after_magic[after_magic.len()..]),
            };
            let line = std::str::from_utf8(header).map(str::trim).unwrap_or("");
            (TranscodeSettings::parse_kv_line(line), media)
        }
    }
}
/// Implements `rivet ipc` on Unix: serve transcodes over a Unix-domain socket.
///
/// Per connection: the client sends an optional `#rivet k=v …` header line
/// followed by the media, half-closes its write side, then reads the
/// transcoded bytes back. Each connection is handled on its own thread.
///
/// # Errors
/// Fails only if the socket cannot be bound. Per-connection failures are
/// logged to stderr and do not stop the listener.
#[cfg(all(feature = "ipc", unix))]
fn ipc_cmd(socket: &Path) -> Result<()> {
use std::io::{Read, Write};
use std::os::unix::net::{UnixListener, UnixStream};
// Best-effort removal of whatever already exists at the socket path (e.g. a
// socket file left by an earlier run); the result is deliberately ignored.
let _ = std::fs::remove_file(socket);
let listener = UnixListener::bind(socket)
.with_context(|| format!("binding Unix socket {}", socket.display()))?;
eprintln!(
"rivet ipc: listening on {}\n per connection: [optional `#rivet k=v …\\n` header] media → half-close → read AV1/MP4 back\n e.g. socat - UNIX-CONNECT:{} < in.mkv > out.mp4",
socket.display(),
socket.display(),
);
// Serves one connection start to finish. On any error the connection is
// dropped without a response body; the error is only logged to stderr.
fn handle(mut stream: UnixStream) {
let mut input = Vec::new();
// `read_to_end` returns only at EOF, which is why the client must
// half-close its write side after sending the media.
if let Err(e) = stream.read_to_end(&mut input) {
eprintln!("rivet ipc: read error: {e}");
return;
}
// A connection that sent nothing is ignored silently.
if input.is_empty() {
return; }
let (settings, media) = split_ipc_settings(&input);
let settings = match settings {
Ok(s) => s,
Err(e) => {
eprintln!("rivet ipc: bad settings header: {e:#}");
return;
}
};
eprintln!("rivet ipc: {} media bytes in", media.len());
match stream_transcode(media, &settings) {
Ok((bytes, frames, audio)) => {
if let Err(e) = stream.write_all(&bytes) {
eprintln!("rivet ipc: write error: {e}");
return;
}
stream.flush().ok();
// Shut down our write side so the client sees EOF after the output.
let _ = stream.shutdown(std::net::Shutdown::Write);
eprintln!("rivet ipc: {frames} frames → {} bytes out ({audio})", bytes.len());
}
Err(e) => eprintln!("rivet ipc: transcode error: {e:#}"),
}
}
// Accept loop: one detached thread per connection; accept errors are logged
// and the loop keeps going.
for stream in listener.incoming() {
match stream {
Ok(s) => {
std::thread::spawn(move || handle(s));
}
Err(e) => eprintln!("rivet ipc: accept error: {e}"),
}
}
Ok(())
}
/// `rivet ipc` on non-Unix targets: always fails, pointing the user at the
/// transports that do work there.
#[cfg(all(feature = "ipc", not(unix)))]
fn ipc_cmd(_socket: &Path) -> Result<()> {
    Err(anyhow::anyhow!(
        "`rivet ipc` (Unix-domain socket) is Unix-only. On Windows, use \
         `rivet pipe` (stdin/stdout) or `rivet serve` (HTTP)."
    ))
}
/// Implements `rivet batch`: run (or, with `dry_run`, only plan) every job
/// described in a manifest file.
///
/// Relative paths are resolved against the manifest's own directory, or `.`
/// when the manifest path has no parent component. `stop_on_error`
/// overrides the manifest's `on_error` with `"stop"`.
///
/// # Errors
/// Fails if the manifest cannot be read, parsed, planned or run, or if at
/// least one job failed.
#[cfg(feature = "batch")]
fn batch_cmd(manifest_path: &Path, dry_run: bool, stop_on_error: bool) -> Result<()> {
    use rivet::manifest;

    let text = std::fs::read_to_string(manifest_path)
        .with_context(|| format!("reading manifest {}", manifest_path.display()))?;
    let mut m = manifest::parse_manifest(&text, manifest::Format::from_path(manifest_path))?;
    if stop_on_error {
        m.on_error = Some("stop".into());
    }

    let base = manifest_path
        .parent()
        .filter(|dir| !dir.as_os_str().is_empty())
        .unwrap_or(Path::new("."))
        .to_path_buf();

    if dry_run {
        let planned = manifest::plan_manifest(&m, &base)?;
        eprintln!("batch dry-run: {} job(s) planned\n", planned.len());
        for (idx, job) in planned.iter().enumerate() {
            let spec = &job.spec;
            let mode = spec.mode.as_deref().unwrap_or("single");

            // Only options that are actually set show up in the description.
            let mut parts = vec![format!("mode={mode}")];
            if matches!(spec.ladder, Some(true)) {
                parts.push("ladder".into());
            }
            parts.extend(
                spec.rungs
                    .as_ref()
                    .map(|rungs| format!("rungs={}", rungs.join(","))),
            );
            parts.extend(spec.crf.map(|c| format!("crf={c}")));
            parts.extend(spec.color.as_ref().map(|c| format!("color={c}")));
            parts.extend(spec.output.as_ref().map(|o| format!("output={o}")));

            eprintln!(" [{}] {} ({})", idx + 1, job.input.display(), parts.join(" "));
        }
        eprintln!("\n(dry run — nothing converted)");
        return Ok(());
    }

    let report = manifest::run_manifest(&m, &base)?;
    println!(
        "\nbatch: {} ok, {} failed (of {})",
        report.ok_count(),
        report.failed_count(),
        report.outcomes.len()
    );
    for outcome in &report.outcomes {
        match &outcome.status {
            manifest::JobStatus::Ok => {
                let written = outcome
                    .output
                    .as_ref()
                    .map(|p| p.display().to_string())
                    .unwrap_or_default();
                println!(" ok {} -> {}", outcome.input.display(), written);
            }
            manifest::JobStatus::Failed(e) => {
                println!(" FAIL {}: {}", outcome.input.display(), e);
            }
        }
    }

    if report.all_ok() {
        Ok(())
    } else {
        bail!("{} job(s) failed", report.failed_count())
    }
}