Skip to main content

oximedia_graph/filters/video/
merge.rs

1//! Video merge (picture-in-picture / multi-input compositor) filter.
2//!
3//! The [`MergeFilter`] combines multiple input video streams into a single
4//! output frame.  Each input stream is placed at a configurable position and
5//! optionally scaled to a given size, creating picture-in-picture, split-screen
6//! or tile-grid compositions.
7
8#![forbid(unsafe_code)]
9#![allow(dead_code)]
10#![allow(clippy::cast_possible_truncation)]
11#![allow(clippy::cast_sign_loss)]
12#![allow(clippy::cast_precision_loss)]
13
14use crate::error::{GraphError, GraphResult};
15use crate::frame::FilterFrame;
16use crate::node::{Node, NodeId, NodeState, NodeType};
17use crate::port::{InputPort, OutputPort, PortFormat, PortId, PortType, VideoPortFormat};
18use oximedia_codec::VideoFrame;
19use oximedia_core::PixelFormat;
20
21// ─────────────────────────────────────────────────────────────────────────────
22// Placement
23// ─────────────────────────────────────────────────────────────────────────────
24
/// Placement and optional resize specification for one input slot.
///
/// Build one with [`InputPlacement::at`] plus the `with_*` builders, or start
/// from [`Default`] (origin, source size, fully opaque).
#[derive(Debug, Clone, PartialEq)]
pub struct InputPlacement {
    /// X offset in the output frame (pixels, top-left origin).
    pub x: u32,
    /// Y offset in the output frame (pixels, top-left origin).
    pub y: u32,
    /// Optional width override.  `None` keeps the source width.
    pub width: Option<u32>,
    /// Optional height override.  `None` keeps the source height.
    pub height: Option<u32>,
    /// Blend alpha: 0.0 (transparent) … 1.0 (opaque).
    ///
    /// [`InputPlacement::with_alpha`] keeps this value inside `[0.0, 1.0]`;
    /// assigning the field directly bypasses that validation.
    pub alpha: f32,
}

impl Default for InputPlacement {
    /// Placement at the origin, keeping the source size, fully opaque.
    fn default() -> Self {
        Self {
            x: 0,
            y: 0,
            width: None,
            height: None,
            alpha: 1.0,
        }
    }
}

impl InputPlacement {
    /// Create a placement at the given position with no resize and full opacity.
    #[must_use]
    pub fn at(x: u32, y: u32) -> Self {
        Self {
            x,
            y,
            ..Default::default()
        }
    }

    /// Set width/height override.
    #[must_use]
    pub fn with_size(mut self, width: u32, height: u32) -> Self {
        self.width = Some(width);
        self.height = Some(height);
        self
    }

    /// Set the blend alpha, clamped to `[0.0, 1.0]`.
    ///
    /// `f32::clamp` returns NaN unchanged, which would store an out-of-range
    /// alpha; NaN is therefore mapped to `0.0` (fully transparent) so the
    /// stored value always lies in `[0.0, 1.0]`.
    #[must_use]
    pub fn with_alpha(mut self, alpha: f32) -> Self {
        self.alpha = if alpha.is_nan() {
            0.0
        } else {
            alpha.clamp(0.0, 1.0)
        };
        self
    }
}
78
79// ─────────────────────────────────────────────────────────────────────────────
80// MergeConfig
81// ─────────────────────────────────────────────────────────────────────────────
82
83/// Configuration for the [`MergeFilter`].
84#[derive(Debug, Clone)]
85pub struct MergeConfig {
86    /// Width of the output frame.
87    pub output_width: u32,
88    /// Height of the output frame.
89    pub output_height: u32,
90    /// Pixel format for the output frame.
91    pub output_format: PixelFormat,
92    /// Placement spec for each input (index corresponds to input port).
93    pub placements: Vec<InputPlacement>,
94}
95
96impl MergeConfig {
97    /// Create a merge config for `n` inputs into a canvas of the given size.
98    ///
99    /// Inputs are tiled horizontally by default with equal widths.
100    #[must_use]
101    pub fn tiled(n: usize, output_width: u32, output_height: u32) -> Self {
102        let n = n.max(1);
103        let tile_w = output_width / n as u32;
104        let placements = (0..n)
105            .map(|i| InputPlacement::at(i as u32 * tile_w, 0).with_size(tile_w, output_height))
106            .collect();
107        Self {
108            output_width,
109            output_height,
110            output_format: PixelFormat::Yuv420p,
111            placements,
112        }
113    }
114
115    /// Create a picture-in-picture config.
116    ///
117    /// The background is the first input; the overlay is the second input,
118    /// placed at `pip_x, pip_y` with a size of `pip_w × pip_h`.
119    #[must_use]
120    pub fn picture_in_picture(
121        bg_width: u32,
122        bg_height: u32,
123        pip_x: u32,
124        pip_y: u32,
125        pip_w: u32,
126        pip_h: u32,
127    ) -> Self {
128        Self {
129            output_width: bg_width,
130            output_height: bg_height,
131            output_format: PixelFormat::Yuv420p,
132            placements: vec![
133                InputPlacement::at(0, 0).with_size(bg_width, bg_height),
134                InputPlacement::at(pip_x, pip_y).with_size(pip_w, pip_h),
135            ],
136        }
137    }
138
139    /// Number of configured input slots.
140    #[must_use]
141    pub fn input_count(&self) -> usize {
142        self.placements.len()
143    }
144}
145
146// ─────────────────────────────────────────────────────────────────────────────
147// MergeFilter
148// ─────────────────────────────────────────────────────────────────────────────
149
150/// A video filter that composites multiple input streams into a single output
151/// frame.
152///
153/// # Ports
154///
155/// - **Input 0 … N-1** (`"input_0"` … `"input_N-1"`) – one port per input
156///   stream.
157/// - **Output 0** (`"output"`) – the composited output frame.
158///
159/// # Buffering model
160///
161/// Each input slot has an internal queue.  Frames are pushed with
162/// [`MergeFilter::push_input`] and a composite output is generated via
163/// [`Node::process`] once all slots have at least one frame buffered.
164pub struct MergeFilter {
165    id: NodeId,
166    name: String,
167    state: NodeState,
168    config: MergeConfig,
169    inputs: Vec<InputPort>,
170    outputs: Vec<OutputPort>,
171    /// Per-slot frame queues.  Index == input port index.
172    input_queues: Vec<Vec<FilterFrame>>,
173}
174
175impl MergeFilter {
176    /// Create a new [`MergeFilter`] from a [`MergeConfig`].
177    #[must_use]
178    pub fn new(id: NodeId, name: impl Into<String>, config: MergeConfig) -> Self {
179        let n = config.input_count().max(1);
180        let video_format = PortFormat::Video(VideoPortFormat::any());
181
182        let inputs: Vec<InputPort> = (0..n)
183            .map(|i| {
184                InputPort::new(PortId(i as u32), format!("input_{i}"), PortType::Video)
185                    .with_format(video_format.clone())
186            })
187            .collect();
188
189        let outputs =
190            vec![OutputPort::new(PortId(0), "output", PortType::Video).with_format(video_format)];
191
192        let input_queues = vec![Vec::new(); n];
193
194        Self {
195            id,
196            name: name.into(),
197            state: NodeState::Idle,
198            config,
199            inputs,
200            outputs,
201            input_queues,
202        }
203    }
204
205    /// Push a frame into the queue for input slot `port_index`.
206    ///
207    /// # Errors
208    ///
209    /// Returns `Err` when `port_index` is out of range or the frame is not a
210    /// video frame.
211    pub fn push_input(&mut self, port_index: usize, frame: FilterFrame) -> GraphResult<()> {
212        if port_index >= self.input_queues.len() {
213            return Err(GraphError::PortNotFound {
214                node: self.id,
215                port: PortId(port_index as u32),
216            });
217        }
218        if !frame.is_video() {
219            return Err(GraphError::PortTypeMismatch {
220                expected: "Video".to_string(),
221                actual: "Audio".to_string(),
222            });
223        }
224        self.input_queues[port_index].push(frame);
225        Ok(())
226    }
227
228    /// Return `true` when every input slot has at least one buffered frame and
229    /// a composite output is ready to be produced.
230    #[must_use]
231    pub fn is_ready(&self) -> bool {
232        self.input_queues.iter().all(|q| !q.is_empty())
233    }
234
235    /// Number of configured input ports.
236    #[must_use]
237    pub fn input_count(&self) -> usize {
238        self.config.input_count()
239    }
240
241    /// Composite all buffered inputs into a single output frame using a simple
242    /// nearest-neighbour blit.
243    ///
244    /// Each input frame is blitted into the output canvas at the position
245    /// specified by [`InputPlacement`].  Y-plane blending uses integer alpha
246    /// mixing; UV planes are blitted without alpha for simplicity.
247    fn composite(&mut self) -> GraphResult<FilterFrame> {
248        let out_w = self.config.output_width as usize;
249        let out_h = self.config.output_height as usize;
250
251        // Create output frame (Yuv420p: Y = W*H, U = W/2*H/2, V = W/2*H/2).
252        let y_size = out_w * out_h;
253        let uv_size = (out_w / 2) * (out_h / 2);
254        let mut y_plane = vec![0u8; y_size];
255        let mut u_plane = vec![128u8; uv_size]; // neutral chroma
256        let mut v_plane = vec![128u8; uv_size];
257
258        let n = self.input_queues.len();
259        for slot in 0..n {
260            let frame = match self.input_queues[slot].first() {
261                Some(f) => f,
262                None => continue,
263            };
264
265            let placement = match self.config.placements.get(slot) {
266                Some(p) => p.clone(),
267                None => continue,
268            };
269
270            let src_frame: &VideoFrame = match frame {
271                FilterFrame::Video(v) => v,
272                _ => continue,
273            };
274
275            let src_w = src_frame.width as usize;
276            let src_h = src_frame.height as usize;
277            let dst_w = placement.width.unwrap_or(src_frame.width) as usize;
278            let dst_h = placement.height.unwrap_or(src_frame.height) as usize;
279            let dst_x = placement.x as usize;
280            let dst_y = placement.y as usize;
281            let alpha = placement.alpha;
282
283            // Nearest-neighbour blit of Y plane.
284            for dy in 0..dst_h {
285                let oy = dst_y + dy;
286                if oy >= out_h {
287                    break;
288                }
289                let sy = (dy * src_h) / dst_h.max(1);
290                for dx in 0..dst_w {
291                    let ox = dst_x + dx;
292                    if ox >= out_w {
293                        break;
294                    }
295                    let sx = (dx * src_w) / dst_w.max(1);
296
297                    // Try to read from Y plane of the source frame.
298                    let src_val = src_frame
299                        .planes
300                        .first()
301                        .and_then(|p| p.data.get(sy * src_w + sx))
302                        .copied()
303                        .unwrap_or(16);
304
305                    let dst_idx = oy * out_w + ox;
306                    if (alpha - 1.0_f32).abs() < f32::EPSILON {
307                        y_plane[dst_idx] = src_val;
308                    } else {
309                        let bg = y_plane[dst_idx] as f32;
310                        let blended = bg + alpha * (src_val as f32 - bg);
311                        y_plane[dst_idx] = blended.clamp(0.0, 255.0) as u8;
312                    }
313                }
314            }
315
316            // Blit UV planes (half resolution, no alpha blending).
317            let uv_dst_x = dst_x / 2;
318            let uv_dst_y = dst_y / 2;
319            let uv_dst_w = dst_w / 2;
320            let uv_dst_h = dst_h / 2;
321            let uv_src_w = src_w / 2;
322            let uv_src_h = src_h / 2;
323            let uv_out_w = out_w / 2;
324            let uv_out_h = out_h / 2;
325
326            for dy in 0..uv_dst_h {
327                let oy = uv_dst_y + dy;
328                if oy >= uv_out_h {
329                    break;
330                }
331                let sy = (dy * uv_src_h) / uv_dst_h.max(1);
332                for dx in 0..uv_dst_w {
333                    let ox = uv_dst_x + dx;
334                    if ox >= uv_out_w {
335                        break;
336                    }
337                    let sx = (dx * uv_src_w) / uv_dst_w.max(1);
338
339                    let u_val = src_frame
340                        .planes
341                        .get(1)
342                        .and_then(|p| p.data.get(sy * uv_src_w + sx))
343                        .copied()
344                        .unwrap_or(128);
345                    let v_val = src_frame
346                        .planes
347                        .get(2)
348                        .and_then(|p| p.data.get(sy * uv_src_w + sx))
349                        .copied()
350                        .unwrap_or(128);
351
352                    u_plane[oy * uv_out_w + ox] = u_val;
353                    v_plane[oy * uv_out_w + ox] = v_val;
354                }
355            }
356        }
357
358        // Consume one frame from each queue.
359        for queue in &mut self.input_queues {
360            if !queue.is_empty() {
361                queue.remove(0);
362            }
363        }
364
365        // Build output VideoFrame.
366        use oximedia_codec::{FrameType, Plane};
367        use oximedia_core::Rational;
368        let out_frame = VideoFrame {
369            format: PixelFormat::Yuv420p,
370            width: self.config.output_width,
371            height: self.config.output_height,
372            planes: vec![
373                Plane::with_dimensions(
374                    y_plane,
375                    out_w,
376                    self.config.output_width,
377                    self.config.output_height,
378                ),
379                Plane::with_dimensions(
380                    u_plane,
381                    out_w / 2,
382                    self.config.output_width / 2,
383                    self.config.output_height / 2,
384                ),
385                Plane::with_dimensions(
386                    v_plane,
387                    out_w / 2,
388                    self.config.output_width / 2,
389                    self.config.output_height / 2,
390                ),
391            ],
392            timestamp: oximedia_core::Timestamp::new(0, Rational::new(1, 1000)),
393            frame_type: FrameType::Key,
394            color_info: oximedia_codec::ColorInfo::default(),
395            corrupt: false,
396        };
397
398        Ok(FilterFrame::Video(out_frame))
399    }
400}
401
402impl Node for MergeFilter {
403    fn id(&self) -> NodeId {
404        self.id
405    }
406
407    fn name(&self) -> &str {
408        &self.name
409    }
410
411    fn node_type(&self) -> NodeType {
412        NodeType::Filter
413    }
414
415    fn state(&self) -> NodeState {
416        self.state
417    }
418
419    fn set_state(&mut self, state: NodeState) -> GraphResult<()> {
420        if !self.state.can_transition_to(state) {
421            return Err(GraphError::InvalidStateTransition {
422                node: self.id,
423                from: self.state.to_string(),
424                to: state.to_string(),
425            });
426        }
427        self.state = state;
428        Ok(())
429    }
430
431    fn inputs(&self) -> &[InputPort] {
432        &self.inputs
433    }
434
435    fn outputs(&self) -> &[OutputPort] {
436        &self.outputs
437    }
438
439    /// Process by pushing `input` into slot 0 and compositing if ready.
440    ///
441    /// For multi-input composition, push frames to slots 1..N via
442    /// [`MergeFilter::push_input`] before calling `process`.
443    fn process(&mut self, input: Option<FilterFrame>) -> GraphResult<Option<FilterFrame>> {
444        if let Some(frame) = input {
445            self.push_input(0, frame)?;
446        }
447        if self.is_ready() {
448            Ok(Some(self.composite()?))
449        } else {
450            Ok(None)
451        }
452    }
453}
454
#[cfg(test)]
mod tests {
    use super::*;

    /// A blank `Yuv420p` video frame of `w × h` pixels.
    fn make_video_frame(w: u32, h: u32) -> FilterFrame {
        let frame = VideoFrame::new(PixelFormat::Yuv420p, w, h);
        FilterFrame::Video(frame)
    }

    /// A filter named `"m"` over a horizontally tiled canvas.
    fn tiled_merge(inputs: usize, width: u32, height: u32) -> MergeFilter {
        MergeFilter::new(NodeId(0), "m", MergeConfig::tiled(inputs, width, height))
    }

    /// Queue a blank `w × h` frame on `slot`, failing the test on rejection.
    fn push_blank(merge: &mut MergeFilter, slot: usize, w: u32, h: u32) {
        merge
            .push_input(slot, make_video_frame(w, h))
            .expect("push should succeed");
    }

    #[test]
    fn test_merge_creation_two_inputs() {
        let merge = MergeFilter::new(NodeId(0), "pip", MergeConfig::tiled(2, 1920, 1080));
        assert_eq!(merge.input_count(), 2);
        assert_eq!(merge.inputs().len(), 2);
        assert_eq!(merge.outputs().len(), 1);
    }

    #[test]
    fn test_merge_pip_config() {
        let config = MergeConfig::picture_in_picture(1920, 1080, 100, 100, 480, 270);
        assert_eq!(config.input_count(), 2);
        assert_eq!((config.output_width, config.output_height), (1920, 1080));
    }

    #[test]
    fn test_merge_not_ready_with_empty_queues() {
        assert!(!tiled_merge(2, 640, 480).is_ready());
    }

    #[test]
    fn test_merge_ready_after_all_inputs_pushed() {
        let mut merge = tiled_merge(2, 640, 480);
        push_blank(&mut merge, 0, 320, 480);
        assert!(!merge.is_ready());
        push_blank(&mut merge, 1, 320, 480);
        assert!(merge.is_ready());
    }

    #[test]
    fn test_merge_process_produces_output() {
        let mut merge = tiled_merge(2, 640, 480);
        for slot in 0..2 {
            push_blank(&mut merge, slot, 320, 480);
        }
        match merge.process(None).expect("process should succeed") {
            Some(FilterFrame::Video(v)) => {
                assert_eq!(v.width, 640);
                assert_eq!(v.height, 480);
            }
            _ => panic!("expected video frame"),
        }
    }

    #[test]
    fn test_merge_process_without_all_inputs_returns_none() {
        let mut merge = tiled_merge(2, 640, 480);
        push_blank(&mut merge, 0, 320, 480);
        let result = merge.process(None).expect("process should succeed");
        assert!(result.is_none());
    }

    #[test]
    fn test_merge_push_invalid_port_returns_error() {
        let mut merge = tiled_merge(2, 640, 480);
        assert!(merge.push_input(99, make_video_frame(320, 480)).is_err());
    }

    #[test]
    fn test_merge_input_port_names() {
        let merge = tiled_merge(3, 1920, 1080);
        for (i, &expected) in ["input_0", "input_1", "input_2"].iter().enumerate() {
            assert_eq!(merge.inputs()[i].name, expected);
        }
    }

    #[test]
    fn test_merge_node_type_is_filter() {
        assert_eq!(tiled_merge(2, 640, 480).node_type(), NodeType::Filter);
    }

    #[test]
    fn test_merge_placement_at() {
        let p = InputPlacement::at(10, 20);
        assert_eq!((p.x, p.y), (10, 20));
        assert!((p.alpha - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_merge_placement_with_size() {
        let p = InputPlacement::at(0, 0).with_size(480, 270);
        assert_eq!((p.width, p.height), (Some(480), Some(270)));
    }

    #[test]
    fn test_merge_placement_alpha_clamp() {
        let high = InputPlacement::default().with_alpha(1.5);
        assert!((high.alpha - 1.0).abs() < 1e-6);
        let low = InputPlacement::default().with_alpha(-0.5);
        assert!(low.alpha.abs() < 1e-6);
    }

    #[test]
    fn test_merge_state_transitions() {
        let mut merge = tiled_merge(2, 640, 480);
        merge
            .set_state(NodeState::Processing)
            .expect("state transition should succeed");
        assert_eq!(merge.state(), NodeState::Processing);
    }
}