named_pipes/
/// Runs a single FFmpeg command with three outputs (raw video, raw audio and
/// SRT subtitles), each written to its own named pipe, and reads all three
/// pipes concurrently on dedicated threads.
///
/// Each reader thread waits for a "ready" signal before touching its pipe; the
/// signal is sent when FFmpeg emits its first progress event. Video and audio
/// bytes are only counted, while the subtitle stream is collected and printed.
///
/// # Errors
///
/// Returns an error if a pipe cannot be created, FFmpeg cannot be spawned or
/// its events cannot be iterated, a pipe read fails with anything other than
/// `BrokenPipe`, the subtitle stream is not valid UTF-8, FFmpeg finishes
/// without ever reporting progress, or a reader thread panics.
#[cfg(feature = "named_pipes")]
fn main() -> anyhow::Result<()> {
    use anyhow::Result;
    use ffmpeg_sidecar::command::FfmpegCommand;
    use ffmpeg_sidecar::event::{FfmpegEvent, LogLevel};
    use ffmpeg_sidecar::named_pipes::NamedPipe;
    use ffmpeg_sidecar::pipe_name;
    use std::io::Read;
    use std::sync::mpsc;
    use std::thread;

    const VIDEO_PIPE_NAME: &str = pipe_name!("ffmpeg_video");
    const AUDIO_PIPE_NAME: &str = pipe_name!("ffmpeg_audio");
    const SUBTITLES_PIPE_NAME: &str = pipe_name!("ffmpeg_subtitles");

    // Build the command: three inputs, each mapped to its own pipe output.
    let mut command = FfmpegCommand::new();
    command
        .hide_banner()
        .overwrite()
        // Input 0: generated test video.
        .format("lavfi")
        .input("testsrc=size=1920x1080:rate=60:duration=10")
        // Input 1: generated sine tone.
        .format("lavfi")
        .input("sine=frequency=1000:duration=10")
        // Input 2: inline base64 SRT holding a single "Hello World!" cue.
        .format("srt")
        .input(
            "data:text/plain;base64,MQ0KMDA6MDA6MDAsMDAwIC0tPiAwMDowMDoxMCw1MDANCkhlbGxvIFdvcmxkIQ==",
        )
        // Output 0: raw RGB video frames.
        .map("0:v")
        .format("rawvideo")
        .pix_fmt("rgb24")
        .output(VIDEO_PIPE_NAME)
        // Output 1: raw signed 16-bit little-endian audio.
        .map("1:a")
        .format("s16le")
        .output(AUDIO_PIPE_NAME)
        // Output 2: subtitles as SRT text.
        .map("2:s")
        .format("srt")
        .output(SUBTITLES_PIPE_NAME);

    // Create every pipe up front and spawn one reader thread per pipe. Each
    // thread is paired with the sender used to release it.
    let threads = [VIDEO_PIPE_NAME, AUDIO_PIPE_NAME, SUBTITLES_PIPE_NAME]
        .iter()
        .copied()
        .map(|pipe_name| {
            let mut pipe = NamedPipe::new(pipe_name)?;
            println!("[{pipe_name}] pipe created");
            let (ready_sender, ready_receiver) = mpsc::channel::<()>();
            let thread = thread::spawn(move || -> Result<()> {
                println!("[{pipe_name}] waiting for ready signal");
                // NOTE(review): presumably reading must not start before FFmpeg
                // has begun writing -- confirm against the NamedPipe docs.
                // A closed channel means FFmpeg ended without ever signalling.
                ready_receiver.recv().map_err(|_| {
                    anyhow::anyhow!(
                        "[{}] FFmpeg finished without reporting progress",
                        pipe_name
                    )
                })?;

                println!("[{pipe_name}] reading from pipe");
                // Sized to one 1920x1080 frame at 3 bytes per pixel (rgb24).
                let mut buf = vec![0u8; 1920 * 1080 * 3];
                let mut total_bytes_read = 0;

                // Only the subtitle pipe carries text. Raw bytes are collected
                // and decoded once at the end, so a multi-byte character split
                // across two reads cannot break decoding.
                let mut subtitle_bytes: Option<Vec<u8>> =
                    (pipe_name == SUBTITLES_PIPE_NAME).then(Vec::new);

                loop {
                    match pipe.read(&mut buf) {
                        // A zero-length read marks end of stream.
                        Ok(0) => break,
                        Ok(bytes_read) => {
                            total_bytes_read += bytes_read;
                            if let Some(bytes) = &mut subtitle_bytes {
                                bytes.extend_from_slice(&buf[..bytes_read]);
                            }
                        }
                        // A broken pipe is treated as a normal end of stream.
                        Err(err) if err.kind() == std::io::ErrorKind::BrokenPipe => break,
                        Err(err) => return Err(err.into()),
                    }
                }

                let size_str = if total_bytes_read < 1024 {
                    format!("{}B", total_bytes_read)
                } else {
                    format!("{}KiB", total_bytes_read / 1024)
                };

                if let Some(bytes) = subtitle_bytes {
                    let text_content = String::from_utf8(bytes)?;
                    println!("[{pipe_name}] subtitle text content: ");
                    println!("{}", text_content.trim());
                }

                println!("[{pipe_name}] done reading ({size_str} total)");
                Ok(())
            });

            Ok((thread, ready_sender))
        })
        .collect::<Result<Vec<_>>>()?;

    // Run FFmpeg and react to its events until the process ends.
    let mut ready_signal_sent = false;
    command
        .print_command()
        .spawn()?
        .iter()?
        .for_each(|event| match event {
            // Release the reader threads on the first progress event only.
            FfmpegEvent::Progress(_) if !ready_signal_sent => {
                threads.iter().for_each(|(_, sender)| {
                    sender.send(()).ok();
                });
                ready_signal_sent = true;
            }

            // Echo the per-output summary lines.
            FfmpegEvent::Log(LogLevel::Info, msg) if msg.starts_with("[out#") => {
                println!("{msg}");
            }

            FfmpegEvent::Log(LogLevel::Warning | LogLevel::Error | LogLevel::Fatal, msg) => {
                eprintln!("{msg}");
            }

            _ => {}
        });

    // Drop every sender before joining. A signal already sent stays queued and
    // is still received; a reader that was never signalled now gets a
    // disconnect error instead of blocking forever and hanging the join.
    let (handles, ready_senders): (Vec<_>, Vec<_>) = threads.into_iter().unzip();
    drop(ready_senders);

    for handle in handles {
        handle
            .join()
            .map_err(|_| anyhow::anyhow!("pipe reader thread panicked"))??;
    }

    Ok(())
}
168
/// Fallback entry point used when the `named_pipes` feature is disabled.
///
/// Prints a hint to stderr and the command that enables the feature to stdout.
#[cfg(not(feature = "named_pipes"))]
fn main() {
    let hint = "Enable the \"named_pipes\" feature to run this example.";
    let invocation = "cargo run --features named_pipes --example named_pipes";

    eprintln!("{}", hint);
    println!("{}", invocation);
}