Skip to main content

flute/
lib.rs

1//! [![Rust](https://github.com/ypo/flute/actions/workflows/rust.yml/badge.svg)](https://github.com/ypo/flute/actions/workflows/rust.yml)
2//! [![Python](https://github.com/ypo/flute/actions/workflows/python.yml/badge.svg)](https://github.com/ypo/flute/actions/workflows/python.yml)
3//! [![Docs.rs](https://docs.rs/flute/badge.svg)](https://docs.rs/crate/flute/)
4//! [![Crates.io](https://img.shields.io/crates/v/flute)](https://crates.io/crates/flute/)
5//! [![Rust Dependency](https://deps.rs/repo/github/ypo/flute/status.svg)](https://deps.rs/repo/github/ypo/flute)
6//! [![codecov](https://codecov.io/gh/ypo/flute/branch/main/graph/badge.svg?token=P4KE639YU8)](https://codecov.io/gh/ypo/flute)
7//!
8//! # FLUTE - File Delivery over Unidirectional Transport
9//!
10//!
11//! Massively scalable multicast distribution solution
12//!
13//! The library implements a unidirectional file delivery, without the need of a return channel.
14//!
15//!
16//! # RFC
17//!
18//! This library implements the following RFCs
19//!
20//!| RFC      | Title      | Link       |
21//!| -------- | ---------- | -----------|
22//!| RFC 6726 | FLUTE - File Delivery over Unidirectional Transport      | <https://www.rfc-editor.org/rfc/rfc6726.html> |
23//!| RFC 5775 | Asynchronous Layered Coding (ALC) Protocol Instantiation | <https://www.rfc-editor.org/rfc/rfc5775.html> |
24//!| RFC 5661 | Layered Coding Transport (LCT) Building Block            | <https://www.rfc-editor.org/rfc/rfc5651>      |
25//!| RFC 5052 | Forward Error Correction (FEC) Building Block            | <https://www.rfc-editor.org/rfc/rfc5052>      |
26//!| RFC 5510 | Reed-Solomon Forward Error Correction (FEC) Schemes      | <https://www.rfc-editor.org/rfc/rfc5510.html> |
27//!| 3GPP TS 26.346 | Extended FLUTE FDT Schema (7.2.10)      | <https://www.etsi.org/deliver/etsi_ts/126300_126399/126346/17.03.00_60/ts_126346v170300p.pdf> |
28//!
29//! # Thread Safety
30//! 
31//! ## FLUTE Sender
32//! 
33//! The FLUTE Sender is designed to be safely shared between multiple threads.
34//! 
35//! ## FLUTE Receiver and Tokio Integration
36//! 
37//! Unlike the sender, the FLUTE Receiver **is not thread-safe** and cannot be shared between multiple threads. 
38//! To integrate it with Tokio, you must use `tokio::task::LocalSet`, which allows spawning tasks that require a single-threaded runtime. 
39//! 
40//! The following example demonstrates how to use the FLUTE Receiver with Tokio:
41//!
42//!```ignore
43//! use flute::receiver::{writer, MultiReceiver};
44//! use std::rc::Rc;
45//! 
46//!#[tokio::main]
47//!async fn main() {
48//!    let local = task::LocalSet::new();
49//!    // Run the local task set.
50//!    local.run_until(async move {
51//!        let nonsend_data = nonsend_data.clone();
52//!        task::spawn_local(async move {
53//!            let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), true).unwrap_or_else(|_| std::process::exit(0)));
54//!            let mut receiver = MultiReceiver::new(writer, None, false);
55//!            // ... run the receiver
56//!        }).await.unwrap();
57//!    }).await;
58//!}
59//!```
60//! # Command-Line Tools
61//!
62//! Install the sender and receiver binaries:
63//!
64//! ```bash
65//! cargo install flute --features cli
66//! ```
67//!
68//! ## Send files
69//!
70//! ```bash
71//! flute-sender --destination 224.0.0.1 --port 3400 file1.txt file2.txt
72//! ```
73//!
74//! ## Receive files
75//!
76//! ```bash
77//! flute-receiver --group 224.0.0.1 --port 3400 --interface 127.0.0.1 /path/to/dest
78//! ```
79//!
80//! # UDP/IP Multicast files sender
81//!
82//! Transfer files over a UDP/IP network
83//!
84//!```rust
85//! use flute::sender::Sender;
86//! use flute::sender::ObjectDesc;
87//! use flute::core::lct::Cenc;
88//! use flute::core::UDPEndpoint;
89//! use std::net::UdpSocket;
90//! use std::time::SystemTime;
91//!
92//! // Create UDP Socket
93//! let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
94//! udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
95//!
96//! // Create FLUTE Sender
97//! let tsi = 1;
98//! let oti = Default::default();
99//! let config = Default::default();
100//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
101//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
102//!
103//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
104//! let obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
105//! &url::Url::parse("file:///hello.txt").unwrap(), true, Default::default()).unwrap();
106//! sender.add_object(0, obj);
107//!
108//! // Always call publish after adding objects when FDT publish mode is FullFDT
109//! sender.publish(SystemTime::now());
110//!
111//! // Send FLUTE packets over UDP/IP
112//! while let Some(pkt) = sender.read(SystemTime::now()) {
113//!     udp_socket.send(&pkt).unwrap();
114//!     std::thread::sleep(std::time::Duration::from_millis(1));
115//! }
116//!
117//!```
118//! # UDP/IP Multicast files receiver
119//!
120//! Receive files from a UDP/IP network
121//!
122//!```
123//! use flute::receiver::{writer, MultiReceiver};
124//! use flute::core::UDPEndpoint;
125//! use std::net::UdpSocket;
126//! use std::time::SystemTime;
127//! use std::rc::Rc;
128//!
129//! // Create UDP/IP socket to receive FLUTE pkt
130//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
131//! let udp_socket = UdpSocket::bind(format!("{}:{}", endpoint.destination_group_address, endpoint.port)).expect("Fail to bind");
132//!
133//! // Create a writer able to write received files to the filesystem
134//! let enable_md5_check = true;
135//! let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), enable_md5_check)
136//!     .unwrap_or_else(|_| std::process::exit(0)));
137//!
138//! // Create a multi-receiver capable of de-multiplexing several FLUTE sessions
139//! let mut receiver = MultiReceiver::new(writer, None, false);
140//!
141//! // Receive pkt from UDP/IP socket and push it to the FLUTE receiver
142//! let mut buf = [0; 2048];
143//! loop {
144//!     let (n, _src) = udp_socket.recv_from(&mut buf).expect("Failed to receive data");
145//!     let now = SystemTime::now();
146//!     receiver.push(&endpoint, &buf[..n], now).unwrap();
147//!     receiver.cleanup(now);
148//! }
149//!```
150//! # Application-Level Forward Erasure Correction (AL-FEC)
151//!
152//! The following error recovery algorithms are supported
153//!
154//! - [X] No-code
155//! - [X] Reed-Solomon GF 2^8  
156//! - [X] Reed-Solomon GF 2^8 Under Specified
157//! - [ ] Reed-Solomon GF 2^16  
158//! - [ ] Reed-Solomon GF 2^m  
159//! - [X] RaptorQ  
160//! - [X] Raptor
161//!
162//! The `Oti` module provides an implementation of the Object Transmission Information (OTI)
163//! used to configure Forward Error Correction (FEC) encoding in the FLUTE protocol.
164//!
165//!```rust
166//! use flute::sender::Sender;
167//! use flute::core::Oti;
168//! use flute::core::UDPEndpoint;
169//!
170//! // Reed Solomon 2^8 with encoding blocks composed of  
171//! // 60 source symbols and 4 repair symbols of 1424 bytes per symbol
172//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
173//! let oti = Oti::new_reed_solomon_rs28(1424, 60, 4).unwrap();
174//! let mut sender = Sender::new(endpoint, 1, &oti, &Default::default());
175//!```
176//!
177//! # Content Encoding (CENC)
178//!
179//! The following schemes are supported during the transmission/reception
180//!
181//! - [x] Null (no compression)
182//! - [x] Deflate
183//! - [x] Zlib
184//! - [x] Gzip
185//!
186//! # Files multiplex / Blocks interleave
187//!
188//! The FLUTE Sender is able to transfer multiple files in parallel by interleaving packets from each file. For example:
189//!
190//! **Pkt file1** -> Pkt file2 -> Pkt file3 -> **Pkt file1** -> Pkt file2 -> Pkt file3 ...
191//!
192//! The Sender can interleave blocks within a single file.  
193//! The following example shows Encoding Symbols (ES) from different blocks (B) are interleaved. For example:  
194//!
195//! **(B 1,ES 1)**->(B 2,ES 1)->(B 3,ES 1)->**(B 1,ES 2)**->(B 2,ES 2)...
196//!
197//! To configure the multiplexing, use the `Config` struct as follows:
198//!
199//!```rust
200//! use flute::sender::Sender;
201//! use flute::sender::Config;
202//! use flute::sender::PriorityQueue;
203//! use flute::core::UDPEndpoint;
204//!
205//! let mut config = Config {
206//!     // Interleave a maximum of 3 blocks within each file
207//!     interleave_blocks: 3,
208//!     ..Default::default()
209//! };
210//!
211//! // Interleave a maximum of 3 files in priority queue '0'
212//! config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));
213//!
214//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
215//! let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);
216//!```
217//!
218//! # Priority Queues
219//!
220//! FLUTE sender can be configured with multiple queues, each having a different priority level.  
221//! Files in higher priority queues are always transferred before files in lower priority queues.  
222//! Transfer of files in lower priority queues is paused while there are files to be transferred in higher priority queues.  
223//!
224//!```rust
225//! 
226//! use flute::sender::Sender;
227//! use flute::sender::Config;
228//! use flute::sender::PriorityQueue;
229//! use flute::core::UDPEndpoint;
230//! use flute::sender::ObjectDesc;
231//! use flute::core::lct::Cenc;
232//!
233//! // Create a default configuration
234//! let mut config: flute::sender::Config = Default::default();
235//!
236//! // Configure the HIGHEST priority queue with a capacity of 3 simultaneous file transfer
237//! config.set_priority_queue(PriorityQueue::HIGHEST, PriorityQueue::new(3));
238//!
239//! // Configure the LOW priority queue with a capacity of 1 file transfer at a time
240//! config.set_priority_queue(PriorityQueue::LOW, PriorityQueue::new(1));
241//!
242//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
243//! let mut sender = Sender::new(endpoint, 1, &Default::default(), &config);
244//!
245//! // Create an ObjectDesc for a low priority file
246//! let low_priority_obj = ObjectDesc::create_from_buffer(b"low priority".to_vec(), "text/plain",
247//! &url::Url::parse("file:///low_priority.txt").unwrap(), true, Default::default()).unwrap();
248//!
249//! // Create an ObjectDesc for a high priority file
250//! let high_priority_obj = ObjectDesc::create_from_buffer(b"high priority".to_vec(), "text/plain",
251//! &url::Url::parse("file:///high_priority.txt").unwrap(), true, Default::default()).unwrap();
252//!
253//! // Put Object to the low priority queue
254//! sender.add_object(PriorityQueue::LOW, low_priority_obj);
255//!
256//! // Put Object to the high priority queue
257//! sender.add_object(PriorityQueue::HIGHEST, high_priority_obj);
258//!```
259//! 
260//! # Bitrate Control
261//!The FLUTE library does not provide a built-in bitrate control mechanism. 
262//! Users are responsible for controlling the bitrate by sending packets at a specific rate. 
263//! However, the library offers a way to control the target transfer duration or the target transfer end time for each file individually.
264//!
265//!To ensure proper functionality, the target transfer mechanism requires that the overall bitrate is sufficiently high.
266//! 
267//! ## Target Transfer Duration
268//! 
269//! The sender will attempt to transfer the file within the specified duration.
270//! 
271//!```rust
272//! 
273//! use flute::sender::Sender;
274//! use flute::sender::ObjectDesc;
275//! use flute::sender::TargetAcquisition;
276//! use flute::sender::TransferConfig;
277//! use flute::core::lct::Cenc;
278//! use flute::core::UDPEndpoint;
279//! use std::net::UdpSocket;
280//! use std::time::SystemTime;
281//!
282//! // Create UDP Socket
283//! let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
284//! udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
285//!
286//! // Create FLUTE Sender
287//! let tsi = 1;
288//! let oti = Default::default();
289//! let config = Default::default();
290//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
291//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
292//!
293//! // Create an Object
294//! 
295//! // Set the Target Transfer Duration of this object to 2 seconds
296//! let target_acquisition = TargetAcquisition::WithinDuration(std::time::Duration::from_secs(2));
297//! let transfer_config = TransferConfig::builder().target_acquisition(target_acquisition).build();
298//! 
299//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
300//! &url::Url::parse("file:///hello.txt").unwrap(), true, transfer_config).unwrap();
301//! 
302//! 
303//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
304//! sender.add_object(0, obj);
305//!
306//! // Always call publish after adding objects when FDT publish mode is FullFDT when FDT publish mode is FullFDT
307//! sender.publish(SystemTime::now());
308//!
309//! // Send FLUTE packets over UDP/IP
310//! while sender.nb_objects() > 0  {
311//!     if let Some(pkt) = sender.read(SystemTime::now()) {
312//!         udp_socket.send(&pkt).unwrap();
313//!     } else {
314//!        std::thread::sleep(std::time::Duration::from_millis(1));
315//!     }
316//! }
317//!```
318//! 
319//! ## Target Time to End Transfer
320//! 
321//! The sender will try to finish the file at the specified time.
322//! 
323//!```rust
324//! 
325//! use flute::sender::Sender;
326//! use flute::sender::ObjectDesc;
327//! use flute::sender::TargetAcquisition;
328//! use flute::sender::TransferConfig;
329//! use flute::core::lct::Cenc;
330//! use flute::core::UDPEndpoint;
331//! use std::net::UdpSocket;
332//! use std::time::SystemTime;
333//!
334//! // Create UDP Socket
335//! let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
336//! udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
337//!
338//! // Create FLUTE Sender
339//! let tsi = 1;
340//! let oti = Default::default();
341//! let config = Default::default();
342//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
343//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
344//!
345//! // Create an Object
346//! 
347//! // Set the Target Transfer End Time of this object to 10 seconds from now
348//! let target_end_time = TargetAcquisition::WithinTime(SystemTime::now() + std::time::Duration::from_secs(10));
349//! let transfer_config = TransferConfig::builder().target_acquisition(target_end_time).build();
350//! 
351//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
352//! &url::Url::parse("file:///hello.txt").unwrap(), true, transfer_config).unwrap();
353//!
354//! 
355//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
356//! sender.add_object(0, obj);
357//!
358//! // Always call publish after adding objects when FDT publish mode is FullFDT
359//! sender.publish(SystemTime::now());
360//! 
361//! // Send FLUTE packets over UDP/IP
362//! while sender.nb_objects() > 0  {
363//!     if let Some(pkt) = sender.read(SystemTime::now()) {
364//!         udp_socket.send(&pkt).unwrap();
365//!     } else {
366//!        std::thread::sleep(std::time::Duration::from_millis(1));
367//!     }
368//! }
369//!```
370//! 
371//! ## Carouseling
372//! 
373//! The FLUTE library supports carouseling, a mechanism that continuously re-transmits files in a loop. This is useful for scenarios such as broadcasting where a receiver may join at any time and still receive the full file.
374//! 
375//! A file remains in the carousel and is re-transferred repeatedly until explicitly removed. The repetition behavior is controlled by the `CarouselRepeatMode` when creating an object,  which offers two modes:
376//! 
377//! 
378//! ### Fixed Delay After Each Transfer
379//! 
380//! This mode waits for a fixed delay after the end of each transfer before starting the next one using `CarouselRepeatMode::DelayBetweenTransfers`.
381//! 
382//!```ignore
383//! | Transfer Object | Fixed Delay | Transfer Object | Fixed Delay | ...
384//!```
385//! 
386//!```no_run
387//! use flute::sender::Sender;
388//! use flute::sender::ObjectDesc;
389//! use flute::sender::CarouselRepeatMode;
390//! use flute::sender::TransferConfig;
391//! use flute::core::UDPEndpoint;
392//! use std::net::UdpSocket;
393//! use std::time::SystemTime;
394//! 
395//! // Create UDP Socket
396//! let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
397//! udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
398//! 
399//! // Create FLUTE Sender
400//! let tsi = 1;
401//! let oti = Default::default();
402//! let config = Default::default();
403//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
404//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
405//! 
406//! // 10s delay after each transfer
407//! let carousel_mode = CarouselRepeatMode::DelayBetweenTransfers(std::time::Duration::from_secs(10));
408//! let transfer_config = TransferConfig::builder().carousel_mode(carousel_mode).build();
409//! 
410//! // Create an Object
411//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
412//! &url::Url::parse("file:///hello.txt").unwrap(), true, transfer_config).unwrap();
413//! 
414//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
415//! sender.add_object(0, obj);
416//! 
417//! // Always call publish after adding objects when FDT publish mode is FullFDT
418//! sender.publish(SystemTime::now());
419//! 
420//! // Send FLUTE packets over UDP/IP
421//! while sender.nb_objects() > 0  {
422//!     if let Some(pkt) = sender.read(SystemTime::now()) {
423//!         udp_socket.send(&pkt).unwrap();
424//!     } else {
425//!        std::thread::sleep(std::time::Duration::from_millis(1));
426//!     }
427//! }
428//!```
429//! 
430//! ### Fix interval between 2 transfer start
431//! 
432//! `CarouselRepeatMode::IntervalBetweenStartTimes` : This mode ensures each new transfer starts at a fixed interval, regardless of the duration of the previous one.
433//! 
434//! > ⚠️ **Note**: If the transfer of an object takes longer than the specified interval, the actual interval will be longer.  
435//! > It is the application's responsibility to ensure that the FLUTE channel bitrate is high enough to meet the interval timing.
436//! 
437//!```ignore
438//! | Transfer Object 1 | Adaptative Delay | Transfer Object 1 | Adaptative Delay |
439//! | ------------Fixed Interval-----------| ----------- Fixed Interval-----------|
440//!```
441//! 
442//!```no_run
443//! use flute::sender::Sender;
444//! use flute::sender::ObjectDesc;
445//! use flute::sender::CarouselRepeatMode;
446//! use flute::sender::TransferConfig;
447//! use flute::core::UDPEndpoint;
448//! use std::net::UdpSocket;
449//! use std::time::SystemTime;
450//! 
451//! // Create UDP Socket
452//! let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
453//! udp_socket.connect("224.0.0.1:3400").expect("Connection failed");
454//! 
455//! // Create FLUTE Sender
456//! let tsi = 1;
457//! let oti = Default::default();
458//! let config = Default::default();
459//! let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
460//! let mut sender = Sender::new(endpoint, tsi, &oti, &config);
461//! 
462//! // Configure a fixed interval between 2 transfer start
463//! let carousel_mode = CarouselRepeatMode::IntervalBetweenStartTimes(std::time::Duration::from_secs(10));
464//! let transfer_config = TransferConfig::builder().carousel_mode(carousel_mode).build();
465//! 
466//! // Create an Object
467//! let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
468//! &url::Url::parse("file:///hello.txt").unwrap(), true, transfer_config).unwrap();
469//! 
470//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
471//! sender.add_object(0, obj);
472//! 
473//! // Always call publish after adding objects when FDT publish mode is FullFDT
474//! sender.publish(SystemTime::now());
475//! 
476//! // Send FLUTE packets over UDP/IP
477//! while sender.nb_objects() > 0  {
478//!     if let Some(pkt) = sender.read(SystemTime::now()) {
479//!         udp_socket.send(&pkt).unwrap();
480//!     } else {
481//!        std::thread::sleep(std::time::Duration::from_millis(1));
482//!     }
483//! }
484//!```
485//! 
486
487#![deny(missing_docs)]
488#![deny(missing_debug_implementations)]
489#![cfg_attr(test, deny(warnings))]
490
491mod common;
492mod fec;
493mod tools;
494
495pub mod receiver;
496pub mod sender;
497pub use crate::tools::error;
498
499/// Core module with low-level function
500pub mod core {
501
502    /// ALC packets
503    pub mod alc {
504        pub use crate::common::alc::get_sender_current_time;
505        pub use crate::common::alc::parse_alc_pkt;
506        pub use crate::common::alc::parse_payload_id;
507        pub use crate::common::alc::AlcPkt;
508        pub use crate::common::alc::PayloadID;
509    }
510
511    /// LCT packets
512    pub mod lct {
513        pub use crate::common::lct::get_ext;
514        pub use crate::common::lct::inc_hdr_len;
515        pub use crate::common::lct::push_lct_header;
516        pub use crate::common::lct::Cenc;
517        pub use crate::common::lct::LCTHeader;
518    }
519
520    pub use crate::common::oti::FECEncodingID;
521    pub use crate::common::oti::Oti;
522    pub use crate::common::udpendpoint::UDPEndpoint;
523}
524
525#[cfg(feature = "python")]
526mod py;
527
528#[cfg(test)]
529mod tests {
530    pub fn init() {
531        std::env::set_var("RUST_LOG", "debug");
532        env_logger::builder().is_test(true).try_init().ok();
533    }
534}