Crate flute


§FLUTE - File Delivery over Unidirectional Transport

Massively scalable multicast distribution solution

The library implements unidirectional file delivery, without the need for a return channel.

§RFC

This library implements the following RFCs

| RFC | Title | Link |
| --- | --- | --- |
| RFC 6726 | FLUTE - File Delivery over Unidirectional Transport | https://www.rfc-editor.org/rfc/rfc6726.html |
| RFC 5775 | Asynchronous Layered Coding (ALC) Protocol Instantiation | https://www.rfc-editor.org/rfc/rfc5775.html |
| RFC 5651 | Layered Coding Transport (LCT) Building Block | https://www.rfc-editor.org/rfc/rfc5651 |
| RFC 5052 | Forward Error Correction (FEC) Building Block | https://www.rfc-editor.org/rfc/rfc5052 |
| RFC 5510 | Reed-Solomon Forward Error Correction (FEC) Schemes | https://www.rfc-editor.org/rfc/rfc5510.html |
| 3GPP TS 26.346 | Extended FLUTE FDT Schema (7.2.10) | https://www.etsi.org/deliver/etsi_ts/126300_126399/126346/17.03.00_60/ts_126346v170300p.pdf |

§Thread Safety

§FLUTE Sender

The FLUTE Sender is designed to be safely shared between multiple threads.

§FLUTE Receiver and Tokio Integration

Unlike the sender, the FLUTE Receiver is not thread-safe and cannot be shared between multiple threads. To integrate it with Tokio, you must use tokio::task::LocalSet, which allows spawning tasks that require a single-threaded runtime.

The following example demonstrates how to use the FLUTE Receiver with Tokio:

 use flute::receiver::{writer, MultiReceiver};
 use std::rc::Rc;
 use tokio::task;

 #[tokio::main]
 async fn main() {
     let local = task::LocalSet::new();
     // Run the local task set: the receiver is not thread-safe, so it stays on this thread.
     local.run_until(async move {
         task::spawn_local(async move {
             // Writer that stores received objects under ./flute_dir, with MD5 check enabled
             let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), true).unwrap_or_else(|_| std::process::exit(0)));
             let mut receiver = MultiReceiver::new(writer, None, false);
             // ... run the receiver
         }).await.unwrap();
     }).await;
 }

§UDP/IP Multicast files sender

Transfer files over a UDP/IP network

 use flute::sender::Sender;
 use flute::sender::ObjectDesc;
 use flute::core::lct::Cenc;
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;

 // Create UDP Socket
 let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

 // Create FLUTE Sender
 let tsi = 1;
 let oti = Default::default();
 let config = Default::default();
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, tsi, &oti, &config);

 // Add object(s) (files) to the FLUTE sender (priority queue 0)
 let obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
 &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
 sender.add_object(0, obj);

 // Always call publish after adding objects when FDT publish mode is FullFDT
 sender.publish(SystemTime::now());

 // Send FLUTE packets over UDP/IP
 while let Some(pkt) = sender.read(SystemTime::now()) {
     udp_socket.send(&pkt).unwrap();
     std::thread::sleep(std::time::Duration::from_millis(1));
 }

§UDP/IP Multicast files receiver

Receive files from a UDP/IP network

 use flute::receiver::{writer, MultiReceiver};
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;
 use std::rc::Rc;

 // Create UDP/IP socket to receive FLUTE pkt
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let udp_socket = UdpSocket::bind(format!("{}:{}", endpoint.destination_group_address, endpoint.port)).expect("Fail to bind");

 // Create a writer able to write received files to the filesystem
 let enable_md5_check = true;
 let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), enable_md5_check)
     .unwrap_or_else(|_| std::process::exit(0)));

 // Create a multi-receiver capable of de-multiplexing several FLUTE sessions
 let mut receiver = MultiReceiver::new(writer, None, false);

 // Receive pkt from UDP/IP socket and push it to the FLUTE receiver
 let mut buf = [0; 2048];
 loop {
     let (n, _src) = udp_socket.recv_from(&mut buf).expect("Failed to receive data");
     let now = SystemTime::now();
     receiver.push(&endpoint, &buf[..n], now).unwrap();
     receiver.cleanup(now);
 }

§Application-Level Forward Erasure Correction (AL-FEC)

The following error recovery algorithms are supported

  • No-code
  • Reed-Solomon GF 2^8
  • Reed-Solomon GF 2^8 Under Specified
  • Reed-Solomon GF 2^16
  • Reed-Solomon GF 2^m
  • RaptorQ
  • Raptor

The Oti module provides an implementation of the Object Transmission Information (OTI) used to configure Forward Error Correction (FEC) encoding in the FLUTE protocol.

 use flute::sender::Sender;
 use flute::core::Oti;
 use flute::core::UDPEndpoint;

 // Reed Solomon 2^8 with encoding blocks composed of  
 // 60 source symbols and 4 repair symbols of 1424 bytes per symbol
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let oti = Oti::new_reed_solomon_rs28(1424, 60, 4).unwrap();
 let mut sender = Sender::new(endpoint, 1, &oti, &Default::default());
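
To get a feel for how an object is split into blocks of at most 60 source symbols, the block partitioning algorithm of RFC 5052 (section 9.1) can be sketched with the standard library only. This is an illustrative sketch, not the crate's code: the `Partition` struct and `partition` function are hypothetical names, and how the crate applies the algorithm internally is not described here.

```rust
/// Result of partitioning an object into source blocks (hypothetical type).
#[derive(Debug, PartialEq)]
pub struct Partition {
    /// Number of source symbols in each "large" block.
    pub a_large: u64,
    /// Number of source symbols in each "small" block.
    pub a_small: u64,
    /// Number of large blocks (they come first).
    pub nb_a_large: u64,
    /// Total number of source blocks.
    pub nb_blocks: u64,
}

/// Integer division rounded up.
fn ceil_div(a: u64, b: u64) -> u64 {
    (a + b - 1) / b
}

/// Block partitioning following RFC 5052, section 9.1.
/// `transfer_length` and `symbol_size` are in bytes,
/// `max_block_len` is the maximum number of source symbols per block.
/// Returns None when any input is zero.
pub fn partition(transfer_length: u64, symbol_size: u64, max_block_len: u64) -> Option<Partition> {
    if transfer_length == 0 || symbol_size == 0 || max_block_len == 0 {
        return None;
    }
    // Total number of source symbols in the object
    let t = ceil_div(transfer_length, symbol_size);
    // Number of source blocks
    let n = ceil_div(t, max_block_len);
    Some(Partition {
        a_large: ceil_div(t, n),
        a_small: t / n,
        nb_a_large: t % n,
        nb_blocks: n,
    })
}
```

With the parameters used above (1424-byte symbols, 60 source symbols per block), a 1 000 000-byte object yields 703 source symbols spread over 12 blocks: 7 blocks of 59 symbols followed by 5 blocks of 58 symbols. Each block then carries its own repair symbols on top of that.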

§Content Encoding (CENC)

The following schemes are supported during the transmission/reception

  • Null (no compression)
  • Deflate
  • Zlib
  • Gzip

§Files multiplex / Blocks interleave

The FLUTE Sender is able to transfer multiple files in parallel by interleaving packets from each file. For example:

Pkt file1 -> Pkt file2 -> Pkt file3 -> Pkt file1 -> Pkt file2 -> Pkt file3 …

The Sender can also interleave blocks within a single file.
In the following example, Encoding Symbols (ES) from different blocks (B) are interleaved:

(B 1,ES 1)->(B 2,ES 1)->(B 3,ES 1)->(B 1,ES 2)->(B 2,ES 2)…
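
The ordering shown above can be reproduced with a small standalone function. This is only a sketch of the idea under simplifying assumptions (every block has the same number of symbols, and blocks are processed in fixed groups); `interleave` and its parameters are hypothetical and do not reflect the sender's actual scheduling code.

```rust
/// Returns 1-based (block, encoding symbol) pairs in interleaved order.
/// At most `window` blocks are interleaved at a time; the next group of
/// blocks starts once the current group is fully emitted.
pub fn interleave(nb_blocks: usize, symbols_per_block: usize, window: usize) -> Vec<(usize, usize)> {
    let mut order = Vec::with_capacity(nb_blocks * symbols_per_block);
    if window == 0 {
        // Nothing can be scheduled with an empty window
        return order;
    }
    let blocks: Vec<usize> = (1..=nb_blocks).collect();
    for group in blocks.chunks(window) {
        // Emit symbol 1 of every block in the group, then symbol 2, and so on
        for es in 1..=symbols_per_block {
            for &b in group {
                order.push((b, es));
            }
        }
    }
    order
}
```

Calling `interleave(3, 2, 3)` gives `(1,1), (2,1), (3,1), (1,2), (2,2), (3,2)`, which matches the sequence above. Spreading consecutive symbols of one block over time in this way makes a burst of lost packets hit several blocks lightly rather than one block heavily.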

To configure the multiplexing, use the Config struct as follows:

 use flute::sender::Sender;
 use flute::sender::Config;
 use flute::sender::PriorityQueue;
 use flute::core::UDPEndpoint;

 let mut config = Config {
     // Interleave a maximum of 3 blocks within each file
     interleave_blocks: 3,
     ..Default::default()
 };

 // Interleave a maximum of 3 files in priority queue '0'
 config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));

 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);

§Priority Queues

The FLUTE sender can be configured with multiple queues, each having a different priority level.
Files in higher priority queues are always transferred before files in lower priority queues.
Transfer of files in lower priority queues is paused while there are files to be transferred in higher priority queues.

 
 use flute::sender::Sender;
 use flute::sender::Config;
 use flute::sender::PriorityQueue;
 use flute::core::UDPEndpoint;
 use flute::sender::ObjectDesc;
 use flute::core::lct::Cenc;

 // Create a default configuration
 let mut config: flute::sender::Config = Default::default();

 // Configure the HIGHEST priority queue with a capacity of 3 simultaneous file transfers
 config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));

 // Configure the LOW priority queue with a capacity of 1 file transfer at a time
 config.set_priority_queue(PriorityQueue::LOW, PriorityQueue::new(1));

 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);

 // Create an ObjectDesc for a low priority file
 let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority".to_vec(), "text/plain",
 &url::Url::parse("file:///low_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();

 // Create an ObjectDesc for a high priority file
 let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority".to_vec(), "text/plain",
 &url::Url::parse("file:///high_priority.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();

 // Put Object to the low priority queue
 sender.add_object(PriorityQueue::LOW, low_priority_obj);

 // Put Object to the high priority queue
 sender.add_object(PriorityQueue::HIGHEST, high_priority_obj);
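
The strict-priority rule described in this section (lower-priority transfers pause while a higher-priority queue has work) can be sketched independently of the crate. The `Scheduler` type below is hypothetical; it treats each file as a single item and assumes that a lower number means a higher priority, mirroring the use of queue 0 as the highest-priority queue in the examples.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Hypothetical strict-priority scheduler: lower key = higher priority.
pub struct Scheduler {
    queues: BTreeMap<u32, VecDeque<String>>,
}

impl Scheduler {
    pub fn new() -> Self {
        Self { queues: BTreeMap::new() }
    }

    /// Appends a file name to the queue with the given priority.
    pub fn add(&mut self, priority: u32, name: &str) {
        self.queues.entry(priority).or_default().push_back(name.to_string());
    }

    /// Pops the next file from the highest-priority non-empty queue.
    /// BTreeMap iterates keys in ascending order, so queue 0 is visited first.
    pub fn pop_next(&mut self) -> Option<String> {
        self.queues.values_mut().find_map(|q| q.pop_front())
    }
}
```

Even if a low-priority file is added first, `pop_next` keeps returning items from the highest-priority queue until it is empty, and only then moves on to the lower ones.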

§Bitrate Control

The FLUTE library does not provide a built-in bitrate control mechanism. Users are responsible for controlling the bitrate by sending packets at a specific rate. However, the library offers a way to control the target transfer duration or the target transfer end time for each file individually.

To ensure proper functionality, the target transfer mechanism requires that the overall bitrate is sufficiently high.
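
Since pacing is left to the application, the two quantities involved can be worked out with simple arithmetic. The helpers below are a minimal sketch using only the standard library; `packet_interval` and `min_bitrate_bps` are hypothetical names, and the figures ignore IP/UDP/ALC header overhead and FEC repair symbols, so a real deployment needs some margin on top.

```rust
use std::time::Duration;

/// Delay to wait between two packets of `packet_len` bytes so that the
/// average rate matches `bitrate_bps` (bits per second).
/// Returns None for a zero bitrate.
pub fn packet_interval(packet_len: usize, bitrate_bps: u64) -> Option<Duration> {
    if bitrate_bps == 0 {
        return None;
    }
    let bits = (packet_len as u64).saturating_mul(8);
    let nanos = bits.saturating_mul(1_000_000_000) / bitrate_bps;
    Some(Duration::from_nanos(nanos))
}

/// Minimum payload bitrate (bits per second) needed to send `object_len`
/// bytes within `target`. Returns None for a zero duration.
pub fn min_bitrate_bps(object_len: u64, target: Duration) -> Option<u64> {
    let secs = target.as_secs_f64();
    if secs <= 0.0 {
        return None;
    }
    Some((object_len as f64 * 8.0 / secs).ceil() as u64)
}
```

For example, 1500-byte packets at 12 Mbit/s call for one packet every millisecond, and a 1 000 000-byte object with a 2-second target needs at least 4 Mbit/s of payload bitrate. If the channel is paced below that figure, the target cannot be met.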

§Target Transfer Duration

The sender will attempt to transfer the file within the specified duration.

 
 use flute::sender::Sender;
 use flute::sender::ObjectDesc;
 use flute::sender::TargetAcquisition;
 use flute::core::lct::Cenc;
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;

 // Create UDP Socket
 let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

 // Create FLUTE Sender
 let tsi = 1;
 let oti = Default::default();
 let config = Default::default();
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, tsi, &oti, &config);

 // Create an Object
 let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
 &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
 
 // Set the Target Transfer Duration of this object to 2 seconds
 obj.target_acquisition = Some(TargetAcquisition::WithinDuration(std::time::Duration::from_secs(2)));
 
 // Add object(s) (files) to the FLUTE sender (priority queue 0)
 sender.add_object(0, obj);

 // Always call publish after adding objects when FDT publish mode is FullFDT
 sender.publish(SystemTime::now());

 // Send FLUTE packets over UDP/IP
 while sender.nb_objects() > 0  {
     if let Some(pkt) = sender.read(SystemTime::now()) {
         udp_socket.send(&pkt).unwrap();
     } else {
        std::thread::sleep(std::time::Duration::from_millis(1));
     }
 }

§Target Time to End Transfer

The sender will try to finish the file at the specified time.

 
 use flute::sender::Sender;
 use flute::sender::ObjectDesc;
 use flute::sender::TargetAcquisition;
 use flute::core::lct::Cenc;
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;

 // Create UDP Socket
 let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

 // Create FLUTE Sender
 let tsi = 1;
 let oti = Default::default();
 let config = Default::default();
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, tsi, &oti, &config);

 // Create an Object
 let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
 &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
 
 // Set the Target Transfer End Time of this object to 10 seconds from now
 let target_end_time = SystemTime::now() + std::time::Duration::from_secs(10);
 obj.target_acquisition = Some(TargetAcquisition::WithinTime(target_end_time));
 
 // Add object(s) (files) to the FLUTE sender (priority queue 0)
 sender.add_object(0, obj);

 // Always call publish after adding objects when FDT publish mode is FullFDT
 sender.publish(SystemTime::now());
 
 // Send FLUTE packets over UDP/IP
 while sender.nb_objects() > 0  {
     if let Some(pkt) = sender.read(SystemTime::now()) {
         udp_socket.send(&pkt).unwrap();
     } else {
        std::thread::sleep(std::time::Duration::from_millis(1));
     }
 }

§Carouseling

The FLUTE library supports carouseling, a mechanism that continuously re-transmits files in a loop. This is useful for scenarios such as broadcasting where a receiver may join at any time and still receive the full file.

A file remains in the carousel and is re-transferred repeatedly until explicitly removed. The repetition behavior is controlled by the CarouselRepeatMode when creating an object, which offers two modes:

§Fixed Delay After Each Transfer

This mode waits for a fixed delay after the end of each transfer before starting the next one using CarouselRepeatMode::DelayBetweenTransfers.

 | Transfer Object | Fixed Delay | Transfer Object | Fixed Delay | ...
 use flute::sender::Sender;
 use flute::sender::ObjectDesc;
 use flute::sender::CarouselRepeatMode;
 use flute::core::lct::Cenc;
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;
 
 // Create UDP Socket
 let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
 
 // Create FLUTE Sender
 let tsi = 1;
 let oti = Default::default();
 let config = Default::default();
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, tsi, &oti, &config);
 
 // 10s delay after each transfer
 let carousel_mode = CarouselRepeatMode::DelayBetweenTransfers(std::time::Duration::from_secs(10));
 
 // Create an Object
 let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
 &url::Url::parse("file:///hello.txt").unwrap(), 1, 
 Some(carousel_mode), None, None, None, Cenc::Null, true, None, true).unwrap();
 
 // Add object(s) (files) to the FLUTE sender (priority queue 0)
 sender.add_object(0, obj);
 
 // Always call publish after adding objects when FDT publish mode is FullFDT
 sender.publish(SystemTime::now());
 
 // Send FLUTE packets over UDP/IP
 while sender.nb_objects() > 0  {
     if let Some(pkt) = sender.read(SystemTime::now()) {
         udp_socket.send(&pkt).unwrap();
     } else {
        std::thread::sleep(std::time::Duration::from_millis(1));
     }
 }

§Fixed Interval Between Transfer Start Times

This mode, selected with CarouselRepeatMode::IntervalBetweenStartTimes, ensures each new transfer starts at a fixed interval, regardless of the duration of the previous one.

⚠️ Note: If the transfer of an object takes longer than the specified interval, the actual interval will be longer.
It is the application’s responsibility to ensure that the FLUTE channel bitrate is high enough to meet the interval timing.

 | Transfer Object 1 | Adaptive Delay | Transfer Object 1 | Adaptive Delay |
 | --------- Fixed Interval --------- | --------- Fixed Interval --------- |
 use flute::sender::Sender;
 use flute::sender::ObjectDesc;
 use flute::sender::CarouselRepeatMode;
 use flute::core::lct::Cenc;
 use flute::core::UDPEndpoint;
 use std::net::UdpSocket;
 use std::time::SystemTime;
 
 // Create UDP Socket
 let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
 udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
 
 // Create FLUTE Sender
 let tsi = 1;
 let oti = Default::default();
 let config = Default::default();
 let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
 let mut sender = Sender::new(endpoint, tsi, &oti, &config);
 
 // Configure a fixed interval between the start times of two consecutive transfers
 let carousel_mode = CarouselRepeatMode::IntervalBetweenStartTimes(std::time::Duration::from_secs(10));
 
 // Create an Object
 let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
 &url::Url::parse("file:///hello.txt").unwrap(), 1, 
 Some(carousel_mode), None, None, None, Cenc::Null, true, None, true).unwrap();
 
 // Add object(s) (files) to the FLUTE sender (priority queue 0)
 sender.add_object(0, obj);
 
 // Always call publish after adding objects when FDT publish mode is FullFDT
 sender.publish(SystemTime::now());
 
 // Send FLUTE packets over UDP/IP
 while sender.nb_objects() > 0  {
     if let Some(pkt) = sender.read(SystemTime::now()) {
         udp_socket.send(&pkt).unwrap();
     } else {
        std::thread::sleep(std::time::Duration::from_millis(1));
     }
 }
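
The difference between the two carousel repeat modes comes down to how the next start time is derived. The sketch below models that timing with a local, hypothetical `RepeatMode` enum that only mirrors the two variants described in this section; it is not the crate's `CarouselRepeatMode`, and the sender's internal logic may differ.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for the two repeat modes described in this section.
pub enum RepeatMode {
    /// Wait a fixed delay after the end of each transfer.
    DelayBetweenTransfers(Duration),
    /// Start transfers at a fixed interval, measured from start to start.
    IntervalBetweenStartTimes(Duration),
}

/// Computes when the next transfer may start, given the start and end
/// times of the previous one.
pub fn next_start(mode: &RepeatMode, start: SystemTime, end: SystemTime) -> SystemTime {
    match mode {
        RepeatMode::DelayBetweenTransfers(delay) => end + *delay,
        // If the transfer overran the interval, the next one can only start
        // once the previous one has ended, so the real interval is longer.
        RepeatMode::IntervalBetweenStartTimes(interval) => std::cmp::max(start + *interval, end),
    }
}
```

With a 10-second setting and a transfer that lasts 3 seconds, the first mode restarts 13 seconds after the previous start, while the second restarts after exactly 10 seconds (an adaptive delay of 7 seconds). If the transfer lasts 12 seconds, the second mode restarts as soon as the transfer ends.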

Modules§

core
Core module with low-level functions
error
Handle errors
receiver
FLUTE Receivers to reconstruct Objects (files) from ALC/LCT packets
sender
FLUTE Sender to convert Objects (files) to ALC/LCT packets