gst-plugin-zenoh
A high-performance GStreamer plugin that enables distributed media streaming using Zenoh as the transport layer. Built with zenoh-rs for maximum performance and reliability.
Overview
The plugin provides three GStreamer elements that bridge GStreamer pipelines with Zenoh networks:
zenohsink: Publishes GStreamer buffers to Zenoh networkszenohsrc: Subscribes to Zenoh data and delivers it to GStreamer pipelineszenohdemux: Demultiplexes Zenoh streams by key expression, creating dynamic pads for each unique key
Together, these elements enable distributed media applications, edge computing scenarios, robotics systems, IoT data streaming, and more.
๐ Key Features
Advanced Quality of Service (QoS)
- Reliability Modes: Choose between
best-effort(low latency) andreliable(guaranteed delivery) - Congestion Control: Handle network congestion with
block(ensure delivery) ordrop(maintain real-time performance) - Priority Management: Message prioritization using Zenoh Priority levels (1-7) for intelligent bandwidth allocation
Performance Optimization
- Express Mode: Ultra-low latency mode that bypasses internal queues
- Session Sharing: Efficient resource usage through shared Zenoh sessions
- Batch Rendering: Efficient buffer list processing for high-throughput scenarios
- Responsive State Changes: Sub-second response to pipeline state changes with proper unlock/flush support
- Zero-Copy Data Paths: Minimal overhead with Cow-based buffer handling when compression is disabled
- Optional Compression: Reduce bandwidth usage with Zstandard, LZ4, or Gzip compression (compile-time optional)
- Buffer Metadata Preservation: PTS, DTS, duration, and flags preserved across Zenoh transport for proper A/V sync
Flexible Configuration
- URI Handler Support: Configure elements using standard GStreamer URI syntax (e.g.,
zenoh:demo/video?priority=2&reliability=reliable) - Runtime Properties: Configure QoS parameters dynamically
- Zenoh Config Files: Support for comprehensive Zenoh network configuration
- Key Expression Patterns: Flexible topic naming with wildcard support
Automatic Format Negotiation
- Caps Transmission: GStreamer capabilities automatically transmitted with first buffer
- Metadata Support: Custom key-value metadata can be attached to streams
- Zero Configuration: Receiver automatically configures based on sender's format
- Format Changes: Supports dynamic format changes during streaming
Production Monitoring
- Real-time Statistics: Track bytes sent/received, message counts, errors, and dropped packets
- Read-only Properties: Monitor performance without affecting operation
- Thread-safe Updates: Atomic statistics updates for accurate metrics
Enterprise Ready
- Rich Error Messages: Contextual error messages with troubleshooting guidance
- Comprehensive Error Handling: 10 specific error types with helpful diagnostics
- Thread Safety: Safe concurrent access to all plugin components
- Property Locking: Runtime protection against invalid configuration changes
- Extensive Testing: 101 comprehensive tests ensuring reliability
Quick Start
Installation
-
Install Dependencies (Ubuntu/Debian):
For Fedora/RHEL:
-
Build the Plugin:
# Basic build (no compression) # With all compression algorithms # With specific compression algorithms -
Run Examples:
# Basic video streaming demonstration GST_PLUGIN_PATH=target/debug # Comprehensive QoS configuration showcase GST_PLUGIN_PATH=target/debug
Simple Streaming Example
# Terminal 1: Start video publisher
# Terminal 2: Start video subscriber
๐ Advanced Pipeline Examples
High-Performance Video Streaming
# Ultra-low latency streaming with express mode
# Reliable HD streaming with error recovery
Multi-Stream Applications
# Camera + Audio streaming
# Multi-camera setup with priorities
IoT and Sensor Data
# Sensor data with custom Zenoh configuration
# Wildcard subscription for multiple sensors
Edge Computing Scenarios
# Edge AI processing pipeline
๐๏ธ Compression Support
The plugin supports optional compression to reduce bandwidth usage. Compression is compile-time optional and must be enabled via Cargo features.
Available Compression Algorithms
| Algorithm | Feature Flag | Characteristics | Best For |
|---|---|---|---|
| Zstandard | compression-zstd |
Best compression ratio, good speed | General purpose, bandwidth-limited networks |
| LZ4 | compression-lz4 |
Fastest compression, lower ratio | Low-latency, CPU-constrained systems |
| Gzip | compression-gzip |
Widely compatible, moderate speed | Cross-platform compatibility |
Building with Compression
# Enable all compression algorithms
# Enable specific algorithms
Usage
Compression is configured on the sender side (zenohsink) and automatically detected and decompressed on the receiver side (zenohsrc).
# Sender with Zstandard compression (recommended)
# Receiver (automatically decompresses)
Compression Levels
- 1-3: Fast compression, larger output (low CPU usage)
- 4-6: Balanced (recommended for most use cases)
- 7-9: Maximum compression, slower (high CPU usage)
Compression Statistics
When compression is enabled, zenohsink provides additional statistics:
| Property | Description |
|---|---|
bytes-before-compression |
Total bytes before compression |
bytes-after-compression |
Total bytes after compression (network usage) |
Calculate compression ratio:
# Query compression statistics
|
# Example: 1GB before -> 300MB after = 70% bandwidth savings
Performance Considerations
- Zstandard: Best all-around choice, excellent compression at level 5
- LZ4: Choose when CPU is limited or ultra-low latency is critical
- Gzip: Use for compatibility with non-Rust receivers
๐ Statistics Monitoring
Both zenohsink and zenohsrc provide real-time statistics for monitoring performance and debugging issues. All statistics properties are read-only and thread-safe.
ZenohSink Statistics
| Property | Type | Description |
|---|---|---|
bytes-sent |
UInt64 | Total bytes published to Zenoh network (after compression if enabled) |
messages-sent |
UInt64 | Total number of buffers published |
errors |
UInt64 | Number of publish errors encountered |
dropped |
UInt64 | Number of buffers dropped due to congestion (when congestion-control=drop) |
bytes-before-compression |
UInt64 | Total bytes before compression (compression features only) |
bytes-after-compression |
UInt64 | Total bytes after compression (compression features only) |
ZenohSrc Statistics
| Property | Type | Description |
|---|---|---|
bytes-received |
UInt64 | Total bytes received from Zenoh network |
messages-received |
UInt64 | Total number of buffers received |
errors |
UInt64 | Number of receive errors encountered |
dropped |
UInt64 | Number of samples dropped (reserved for future use) |
Monitoring Examples
# Monitor statistics in real-time using gst-launch watch mode
GST_DEBUG=zenohsink:5
# Query statistics programmatically in a script
& \
PIPELINE_PID=
# Use gst-inspect or property queries to read statistics
Programmatic Statistics Access (Rust)
use *;
// Create pipeline with named sink
let pipeline = parse_launch?;
// Get the zenohsink element
let sink = pipeline
.by_name
.expect;
// Start pipeline
pipeline.set_state?;
// Monitor statistics periodically
loop
pipeline.set_state?;
๐ URI Handler Support
Both elements implement the GStreamer URIHandler interface, allowing configuration via URI syntax. This provides a convenient alternative to setting individual properties.
URI Syntax
zenoh:<key-expression>[?<parameter>=<value>&...]
Supported URI Parameters
| Parameter | Values | Example |
|---|---|---|
priority |
1-7 | priority=2 |
reliability |
best-effort, reliable |
reliability=reliable |
congestion-control |
block, drop |
congestion-control=drop |
express |
true, false |
express=true |
config |
File path | config=/etc/zenoh/config.json5 |
URI Examples
# Simple key expression only
# With QoS parameters
# Full configuration with custom Zenoh config
# Receiving with URI
# Wildcard subscription
URI vs Properties
Both methods are equivalent and can be mixed:
# Using individual properties
# Using URI (equivalent)
# Mixed approach (URI sets base, properties override)
Programmatic URI Usage (Rust)
use *;
// Create element and set URI
let sink = make.build?;
// Set URI using URIHandler interface
if let Some = sink.
// Or use the uri property directly
sink.set_property;
// Read back current URI
if let Some = sink.
โ๏ธ Element Properties
ZenohSink Properties
| Property | Type | Default | Description |
|---|---|---|---|
key-expr |
String | required | Zenoh key expression for publishing (e.g., "demo/video/stream") |
config |
String | null |
Path to Zenoh configuration file for custom network settings |
priority |
Integer | 5 |
Publisher priority (1-7). Lower values = higher priority. 1=RealTime, 2=InteractiveHigh, 3=InteractiveLow, 4=DataHigh, 5=Data, 6=DataLow, 7=Background |
congestion-control |
String | "block" |
Congestion handling: "block" (wait) or "drop" (discard messages) |
reliability |
String | "best-effort" |
Delivery mode: "best-effort" (fast) or "reliable" (guaranteed) |
express |
Boolean | false |
Enable express mode for ultra-low latency (bypasses internal queues) |
send-caps |
Boolean | true |
Enable caps transmission as metadata (automatic format negotiation) |
caps-interval |
Integer | 1 |
Interval in seconds to send caps periodically (0 = only first buffer and format changes) |
compression |
Enum | none |
Compression algorithm: none, zstd, lz4, or gzip (requires compilation with compression features) |
compression-level |
Integer | 5 |
Compression level (1=fastest/largest, 9=slowest/smallest, 5=balanced) |
send-buffer-meta |
Boolean | true |
Send buffer metadata (PTS, DTS, duration, flags) as Zenoh attachments. Enables proper A/V synchronization on receiver. |
Usage Examples:
# High priority reliable streaming (RealTime priority)
# Real-time best-effort streaming (InteractiveHigh priority)
# Minimal bandwidth: send caps only on first buffer and format changes
# Disable caps entirely for absolute minimal overhead
# Compression examples (requires compression features enabled at compile time)
# High compression for bandwidth-limited networks (Zstandard)
# Balanced compression (recommended for most cases)
# Fast compression with minimal CPU overhead (LZ4)
# Compatible compression (Gzip - widely supported)
ZenohSrc Properties
| Property | Type | Default | Description |
|---|---|---|---|
key-expr |
String | required | Zenoh key expression for subscription (supports wildcards: *, **) |
config |
String | null |
Path to Zenoh configuration file |
priority |
Integer | 5 |
|
congestion-control |
String | "block" |
Informational only - actual behavior determined by publisher |
reliability |
String | "best-effort" |
Expected reliability mode - actual mode matches publisher |
receive-timeout-ms |
Integer | 1000 |
Timeout in milliseconds for receiving samples. Controls how long create() waits for data before checking for shutdown signals. |
apply-buffer-meta |
Boolean | true |
Apply buffer metadata (PTS, DTS, duration, flags) from Zenoh attachments. Enables proper A/V synchronization. |
Wildcard Examples:
# Subscribe to all video streams from a device
# Subscribe to all sensor data
# Subscribe to specific data types across all devices
ZenohDemux Properties
| Property | Type | Default | Description |
|---|---|---|---|
key-expr |
String | required | Zenoh key expression for subscription (supports wildcards: *, **) |
config |
String | null |
Path to Zenoh configuration file |
priority |
Integer | 5 |
Subscriber priority (1-7). Lower values = higher priority. |
reliability |
String | "best-effort" |
Expected reliability mode - actual mode matches publisher |
pad-naming |
Enum | full-path |
Pad naming strategy: full-path (use full key expression), last-segment (use last path segment), hash (use hash of key expression) |
apply-buffer-meta |
Boolean | true |
Apply buffer metadata (PTS, DTS, duration, flags) from Zenoh attachments |
Statistics (read-only):
| Property | Type | Description |
|---|---|---|
bytes-received |
UInt64 | Total bytes received from Zenoh network |
messages-received |
UInt64 | Total number of buffers received |
errors |
UInt64 | Number of receive errors encountered |
pads-created |
UInt64 | Number of dynamic source pads created |
ZenohDemux Examples:
# Demultiplex all sensor streams - creates one pad per unique key expression
# Use last segment for pad naming (e.g., "temperature" instead of "sensors/device1/temperature")
# Multi-camera demultiplexing with hash-based pad naming
Quality of Service (QoS) Guidelines
Reliability Modes
best-effort: Minimal latency, no delivery guarantees- Use for: Live video, real-time sensor data, gaming
- Latency: ~1-5ms additional
reliable: Guaranteed delivery with acknowledgments- Use for: Command & control, configuration updates, critical alerts
- Latency: ~10-50ms additional (network dependent)
Congestion Control
block: Pause publishing during network congestion- Use for: Critical data that cannot be lost
- Behavior: May cause frame drops if buffers fill up
drop: Discard messages during congestion- Use for: Real-time streams where recent data is most valuable
- Behavior: Maintains smooth streaming with occasional quality loss
Priority Levels (Zenoh Priority Enum)
The plugin uses Zenoh's built-in Priority enum with 7 levels (lower number = higher priority):
| Value | Zenoh Priority | Use Case | Example Applications |
|---|---|---|---|
| 1 | RealTime |
Critical real-time systems | Safety systems, emergency alerts |
| 2 | InteractiveHigh |
High-priority interactive | Live video calls, remote control |
| 3 | InteractiveLow |
Standard interactive | User interfaces, notifications |
| 4 | DataHigh |
Important data transfer | Configuration updates, commands |
| 5 | Data |
Normal data (default) | Regular media streaming, telemetry |
| 6 | DataLow |
Low-priority data | Logs, historical data |
| 7 | Background |
Background tasks | File transfers, bulk operations |
Note: These priorities only take effect when Zenoh QoS is enabled in the network configuration.
Express Mode
- Enabled: Bypass internal queues for minimum latency
- Use for: Ultra-low latency requirements (<1ms additional)
- Trade-off: Higher CPU usage, potential message reordering
- Disabled: Standard processing path
- Use for: Normal applications where latency is not critical
- Benefit: Lower CPU usage, guaranteed message ordering
๐งช Development & Testing
Running Tests
# Run all tests
# Run specific test suites
# With verbose output
Test Coverage
The comprehensive test suite includes:
- โ Element Creation: Plugin registration and factory tests
- โ Property Validation: QoS parameter validation and boundary testing
- โ Configuration Management: Settings validation and runtime property locking
- โ Error Handling: Network failure recovery and invalid input handling
- โ State Management: Element lifecycle and transition testing
- โ Integration Testing: End-to-end pipeline validation
Code Quality
# Check code formatting
# Run linting
# Run security audit
๐๏ธ Architecture
Session Management
The plugin implements a flexible session architecture supporting both owned and shared Zenoh sessions:
// Architectural overview
This design enables:
- Resource Efficiency: Multiple elements can share a single network connection
- Scalability: Reduced memory and network overhead for multi-element pipelines
- Flexibility: Mix of shared and dedicated sessions based on requirements
Thread Safety
All plugin components are designed for safe concurrent access:
- Mutex-Protected State: Element state and configuration are thread-safe
- Lock-Free Data Paths: Hot paths avoid locking where possible
- Property Locking: Runtime configuration changes are safely managed
Error Handling
Robust error handling throughout the plugin:
- Network Failures: Automatic retry and reconnection logic
- Invalid Configuration: Graceful degradation with warning messages
- Resource Exhaustion: Proper cleanup and resource management
- GStreamer Integration: Native GStreamer error reporting
๐ค Contributing
Development Setup
- Install Rust: https://rustup.rs/
- Install GStreamer development libraries (see Quick Start)
- Clone and build:
Coding Standards
- Follow Rust standard formatting (
cargo fmt) - Address all clippy warnings (
cargo clippy) - Add tests for new functionality
- Update documentation for API changes
- Follow semantic versioning for releases
Pull Request Process
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Update documentation
- Submit pull request with clear description
๐ License
This project is licensed under the Mozilla Public License 2.0 - see the LICENSE file for details.
๐ Acknowledgments
- Eclipse Zenoh team for the excellent protocol and Rust implementation
- GStreamer community for the multimedia framework
- gtk-rs team for GStreamer Rust bindings