1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//! A sink which pipes the canvas into ffmpeg for video encoding or streaming

use crate::pixmap::SharedPixmap;
use crate::DaemonResult;
use anyhow::anyhow;
use std::process::Stdio;
use std::time::Duration;
use tokio::io::AsyncWriteExt;
use tokio::process::{Child, Command};
use tokio::task::{AbortHandle, JoinSet};

/// Configuration options of the ffmpeg sink
///
/// Some ffmpeg options are included in this struct as direct parameters but since output selection and encoding
/// configuration is very complex, it is done via the `output_spec` field which holds arguments that are passed directly
/// to ffmpeg.
/// To simplify the construction of common output specs, helper functions are available.
///
/// ## Examples
///
/// To stream to an rtsp server with 10fps:
///
/// ```rust
/// # use pixeldike::sinks::ffmpeg::FfmpegOptions;
///
/// const FPS: usize = 10;
/// let options = FfmpegOptions {
///     framerate: FPS,
///     synthesize_audio: true,
///     log_level: "warning".to_string(),
///     output_spec: FfmpegOptions::make_rtsp_out_spec("rtsp://localhost:8554/pixelflut", FPS)
/// };
/// ```
///
/// Stream to an RTSP and RTMP server simultaneously:
///
/// ```rust
/// # use pixeldike::sinks::ffmpeg::FfmpegOptions;
///
/// const FPS: usize = 10;
/// let options = FfmpegOptions {
///     framerate: FPS,
///     synthesize_audio: true,
///     log_level: "warning".to_string(),
///     output_spec: [
///         FfmpegOptions::make_rtsp_out_spec("rtsp://localhost:8554/pixelflut", FPS),
///         FfmpegOptions::make_rtmp_out_spec("rtmp://localhost:1935/pixelflut2", FPS),
///     ]
///     .into_iter()
///     .flatten()
///     .collect()
/// };
/// ```
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FfmpegOptions {
    /// The level on which ffmpeg should emit logs.
    ///
    /// Valid values are 'quiet', 'panic', 'fatal', 'error', 'warning', 'info', 'verbose', 'debug', 'trace'
    pub log_level: String,

    /// How many frames per second should be emitted.
    pub framerate: usize,

    /// Whether an empty audio track should be synthesized.
    ///
    /// **Note:** While strictly speaking an audio track is not required since pixelflut only consists of image data,
    /// some viewers won't display the video data if there is no audio component present.
    pub synthesize_audio: bool,

    /// Additional ffmpeg arguments that should be placed in the output part of the generated command.
    pub output_spec: Vec<String>,
}

impl FfmpegOptions {
    /// Create a vector of ffmpeg arguments that are suitable for streaming to an [RTSP](https://en.wikipedia.org/wiki/Real-Time_Streaming_Protocol) server.
    ///
    /// The `server_addr` is required to be in `rtsp://hostname[:port]/path` format while `framerate` sets the targeted
    /// framerate of the stream.
    pub fn make_rtsp_out_spec(server_addr: &str, framerate: usize) -> Vec<String> {
        // force output format to be rtsp
        Self::make_h264_out_spec("rtsp", server_addr, framerate)
    }

    /// Create a vector of ffmpeg arguments that are suitable for streaming to an [RTMP](https://en.wikipedia.org/wiki/Real-Time_Messaging_Protocol) server.
    ///
    /// The `server_addr` is passed to ffmpeg unchanged as the output url while `framerate` sets the targeted framerate
    /// of the stream.
    pub fn make_rtmp_out_spec(server_addr: &str, framerate: usize) -> Vec<String> {
        // force output format to be flv which is commonly used over rtmp
        Self::make_h264_out_spec("flv", server_addr, framerate)
    }

    /// Build the output arguments shared by all streaming helpers.
    ///
    /// The encoder settings are identical for every target; only the forced output `format` and the output url
    /// (`server_addr`) differ. Keeping them in one place prevents the individual helpers from drifting apart.
    fn make_h264_out_spec(format: &str, server_addr: &str, framerate: usize) -> Vec<String> {
        let framerate = framerate.to_string();
        [
            // set encoding to commonly supported variant
            "-vcodec",
            "libx264",
            "-acodec",
            "aac",
            // encode as quickly as possible
            "-preset",
            "veryfast",
            // disable b-frames since some players don't support them
            "-bf",
            "0",
            // set pixel format to a commonly supported one
            "-pix_fmt",
            "yuv420p",
            // set output frame rate
            "-framerate",
            framerate.as_str(),
            // force the output format
            "-f",
            format,
            // set output url to the given server
            server_addr,
        ]
        .iter()
        .copied()
        .map(String::from)
        .collect()
    }
}

/// A sink that puts pixmap data into an ffmpeg subprocess
///
/// Constructing the sink does not spawn anything; the child process handle stays empty until the sink is started.
#[derive(Debug)]
pub struct FfmpegSink {
    /// Options from which the ffmpeg command line is assembled and which define the frame interval
    options: FfmpegOptions,
    /// Pixmap whose color data is periodically written to ffmpeg
    pixmap: SharedPixmap,
    /// Handle of the spawned ffmpeg child process, `None` as long as it has not been spawned
    ffmpeg_proc: Option<Child>,
}

impl FfmpegSink {
    /// Create a new ffmpeg sink which sinks data from the given pixmap into an ffmpeg child process
    ///
    /// No process is spawned by this function; call [`FfmpegSink::start`] to do so.
    pub fn new(options: FfmpegOptions, pixmap: SharedPixmap) -> Self {
        Self {
            options,
            pixmap,
            ffmpeg_proc: None,
        }
    }

    /// Spawn the ffmpeg child process and start sinking data into it
    ///
    /// The sink loop is spawned onto `join_set` as a task named `ffmpeg` and the abort handle of that task is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured framerate is unusable, if ffmpeg could not be spawned or if the task could
    /// not be spawned onto `join_set`.
    pub async fn start(mut self, join_set: &mut JoinSet<DaemonResult>) -> anyhow::Result<AbortHandle> {
        self.start_ffmpeg()?;
        let handle = join_set
            .build_task()
            .name("ffmpeg")
            .spawn(async move { self.run().await })?;
        Ok(handle)
    }

    /// Compute the time between two frames for the given framerate
    ///
    /// # Errors
    ///
    /// Returns an error for a framerate of zero (the division would yield an infinite duration, which
    /// `Duration::from_secs_f64` rejects by panicking) and for framerates so high that the period rounds down to
    /// zero (which `tokio::time::interval` rejects by panicking).
    fn frame_period(framerate: usize) -> anyhow::Result<Duration> {
        if framerate == 0 {
            return Err(anyhow!("ffmpeg framerate must be greater than zero"));
        }

        let period = Duration::from_secs_f64(1.0 / framerate as f64);
        if period.is_zero() {
            return Err(anyhow!("ffmpeg framerate {} is too high", framerate));
        }

        Ok(period)
    }

    /// Start the ffmpeg child process
    ///
    /// The command line consists of global options, the raw video input read from stdin, an optional synthesized
    /// silent audio input and finally the user supplied `output_spec`.
    ///
    /// # Errors
    ///
    /// Returns an error if ffmpeg has already been started by this sink, if the configured framerate is unusable or
    /// if the process could not be spawned.
    fn start_ffmpeg(&mut self) -> anyhow::Result<()> {
        if self.ffmpeg_proc.is_some() {
            return Err(anyhow!("ffmpeg is already running"));
        }

        // reject framerates the sink loop cannot schedule before a process is spawned for nothing
        Self::frame_period(self.options.framerate)?;

        let (width, height) = self.pixmap.get_size();

        let mut cmd = Command::new("ffmpeg");
        cmd.stdin(Stdio::piped()).kill_on_drop(true).env_clear();

        // Global Options
        cmd.arg("-hide_banner")
            .arg("-loglevel")
            .arg(&self.options.log_level);

        // Video Input Options
        cmd
            // treat input framerate as fixed and don't buffer it
            .arg("-re")
            // specify input encoding as raw image data in rgb encoding
            .arg("-f")
            .arg("rawvideo")
            .arg("-pix_fmt")
            .arg("rgb24")
            // provide metadata since it is not included in the rawvideo format
            .arg("-video_size")
            .arg(format!("{}x{}", width, height))
            .arg("-framerate")
            .arg(self.options.framerate.to_string())
            // tell ffmpeg that it should read input from stdin
            .arg("-i")
            .arg("/dev/stdin");

        // Audio Input Options
        if self.options.synthesize_audio {
            cmd.arg("-f")
                .arg("lavfi")
                .arg("-i")
                .arg("anullsrc=channel_layout=stereo:sample_rate=44100");
        }

        // add output args
        cmd.args(&self.options.output_spec);

        tracing::info!("starting ffmpeg sink with args {:?}", cmd.as_std().get_args());

        self.ffmpeg_proc = Some(cmd.spawn()?);
        Ok(())
    }

    /// Execute the main loop which periodically sinks data into ffmpeg
    ///
    /// Once per frame interval the color data of the pixmap is converted to three bytes per pixel and written to
    /// the stdin of the ffmpeg process.
    ///
    /// # Errors
    ///
    /// This function never returns successfully. It returns an error if ffmpeg has not been started, if its stdin is
    /// not attached, if the configured framerate is unusable or if writing a frame to ffmpeg fails.
    async fn run(self) -> anyhow::Result<!> {
        let mut ffmpeg = self
            .ffmpeg_proc
            .ok_or_else(|| anyhow!("ffmpeg is not running"))?;
        let Some(channel) = &mut ffmpeg.stdin else {
            return Err(anyhow!("ffmpegs stdin is not attached"));
        };

        let mut interval = tokio::time::interval(Self::frame_period(self.options.framerate)?);

        // reused for every frame so that the allocation is only paid for once
        let mut frame: Vec<u8> = Vec::new();

        loop {
            frame.clear();
            // SAFETY: NOTE(review): the contract of `get_color_data` is defined outside of this file. The data is
            // only read here and copied into `frame` before the next await point — confirm that this is sufficient.
            unsafe {
                frame.extend(
                    self.pixmap
                        .get_color_data()
                        .iter()
                        .flat_map(|c| Into::<[u8; 3]>::into(*c)),
                );
            }

            // a broken pipe (e.g. ffmpeg exited) is an expected runtime failure, so report it instead of panicking
            channel
                .write_all(&frame)
                .await
                .map_err(|err| anyhow!("Could not write to ffmpeg: {}", err))?;

            interval.tick().await;
        }
    }
}