flute/receiver/
receiver.rs

1use super::fdtreceiver;
2use super::fdtreceiver::FdtReceiver;
3use super::objectreceiver;
4use super::objectreceiver::ObjectReceiver;
5use super::writer::{ObjectMetadata, ObjectWriterBuilder};
6use crate::common::udpendpoint::UDPEndpoint;
7use crate::common::{alc, lct};
8use crate::receiver::writer::ObjectCacheControl;
9use crate::tools::error::FluteError;
10use crate::tools::error::Result;
11use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
12use std::rc::Rc;
13use std::time::Duration;
14use std::time::Instant;
15use std::time::SystemTime;
16
/// Settings of a FLUTE [`Receiver`].
///
/// A `Config` groups the limits and timeouts applied to one FLUTE session.
/// Use [`Config::default`] to start from the default values.
///
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Config {
    /// Maximum number of objects in error state that the receiver keeps track of.
    /// Packets received for an object that is tracked as being in error are discarded.
    pub max_objects_error: usize,
    /// The receiver expires when no data has been received for this duration.
    /// `None`: the receiver never expires, except if a close session packet is received.
    pub session_timeout: Option<Duration>,
    /// An object expires when no data has been received for it during this duration.
    /// `None`: objects never expire. This is not recommended, as objects that are never
    /// fully reconstructed might continue to consume memory for an indefinite amount of time.
    pub object_timeout: Option<Duration>,
    /// Maximum cache size that can be allocated to receive an object.
    /// `None` selects the default of 10MB.
    pub object_max_cache_size: Option<usize>,
    /// When `true`, each object is reconstructed only once:
    /// if the same object is transferred again, it is automatically discarded.
    pub object_receive_once: bool,
    /// When `true`, the receiver checks the expiration date of the FDT.
    pub enable_fdt_expiration_check: bool,
}
40
41impl Default for Config {
42    fn default() -> Self {
43        Self {
44            max_objects_error: 0,
45            session_timeout: None,
46            object_timeout: Some(Duration::from_secs(10)),
47            object_max_cache_size: None,
48            object_receive_once: true,
49            enable_fdt_expiration_check: true,
50        }
51    }
52}
53
54#[derive(Debug, Clone)]
55pub struct ObjectCompletedMeta {
56    metadata: ObjectMetadata,
57}
58
59///
60/// FLUTE `Receiver` able to re-construct objects from ALC/LCT packets
61///
62#[derive(Debug)]
63pub struct Receiver {
64    tsi: u64,
65    objects: HashMap<u128, Box<ObjectReceiver>>,
66    objects_completed: BTreeMap<u128, ObjectCompletedMeta>,
67    objects_error: BTreeSet<u128>,
68    fdt_receivers: BTreeMap<u32, Box<FdtReceiver>>,
69    fdt_current: VecDeque<Box<FdtReceiver>>,
70    writer: Rc<dyn ObjectWriterBuilder>,
71    config: Config,
72    last_activity: Instant,
73    closed_is_imminent: bool,
74    endpoint: UDPEndpoint,
75    last_timestamp: Option<SystemTime>,
76}
77
78impl Receiver {
79    ///
80    /// Create a new FLUTE Receiver
81    /// # Arguments
82    ///
83    /// * `endpoint` - The `UDPEndpoint` from where the data are received.
84    /// * `tsi` - The Transport Session Identifier of this FLUTE Session.
85    /// * `writer` - An `ObjectWriterBuilder` used for writing received objects.
86    /// * `config` - Configuration for the `Receiver`.
87    ///
88    /// # Returns
89    ///
90    /// A new `Receiver` instance.
91    ///
92    pub fn new(
93        endpoint: &UDPEndpoint,
94        tsi: u64,
95        writer: Rc<dyn ObjectWriterBuilder>,
96        config: Option<Config>,
97    ) -> Self {
98        Self {
99            tsi,
100            objects: HashMap::new(),
101            fdt_receivers: BTreeMap::new(),
102            fdt_current: VecDeque::new(),
103            writer,
104            objects_completed: BTreeMap::new(),
105            objects_error: BTreeSet::new(),
106            config: config.unwrap_or_default(),
107            last_activity: Instant::now(),
108            closed_is_imminent: false,
109            endpoint: endpoint.clone(),
110            last_timestamp: None,
111        }
112    }
113
114    /// Check if the receiver is expired.
115    ///
116    /// This method checks whether the receiver is expired and returns `true` if it is.
117    /// This indicates that the receiver should be destroyed.
118    ///
119    /// # Returns
120    ///
121    /// `true` if the receiver is expired, otherwise `false`.
122    ///
123    pub fn is_expired(&self) -> bool {
124        if self.config.session_timeout.is_none() {
125            return false;
126        }
127
128        log::debug!("Check elapsed {:?}", self.last_activity.elapsed());
129        self.last_activity
130            .elapsed()
131            .gt(self.config.session_timeout.as_ref().unwrap())
132    }
133
134    /// Get the number of objects being received.
135    ///
136    /// This method returns the number of objects that are currently being received by the `Receiver`.
137    ///
138    /// # Returns
139    ///
140    /// The number of objects being received.
141    ///
142    pub fn nb_objects(&self) -> usize {
143        self.objects.len()
144    }
145
146    /// Get the number of objects in error state.
147    ///
148    /// This method returns the number of objects that are currently in an error state
149    /// in the `Receiver`.
150    ///
151    /// # Returns
152    ///
153    /// The number of objects in error state.
154    ///
155    pub fn nb_objects_error(&self) -> usize {
156        self.objects_error.len()
157    }
158
159    /// Free objects that timed out.
160    ///
161    /// This method performs cleanup operations on the `Receiver`, freeing objects that
162    /// have timed out.
163    ///
164    /// # Arguments
165    ///
166    /// * `now` - The current `SystemTime` to use for time-related operations.
167    ///
168    pub fn cleanup(&mut self, now: std::time::SystemTime) {
169        self.last_timestamp = Some(now);
170        self.cleanup_objects();
171        self.cleanup_fdt(now);
172    }
173
174    fn cleanup_fdt(&mut self, now: std::time::SystemTime) {
175        self.fdt_receivers.iter_mut().for_each(|fdt| {
176            fdt.1.update_expired_state(now);
177        });
178
179        self.fdt_receivers.retain(|_, fdt| {
180            let state = fdt.state();
181            state == fdtreceiver::FDTState::Complete || state == fdtreceiver::FDTState::Receiving
182        });
183    }
184
185    fn cleanup_objects(&mut self) {
186        if self.config.object_timeout.is_none() {
187            return;
188        }
189        let object_timeout = self.config.object_timeout.as_ref().unwrap();
190        let now = Instant::now();
191
192        let expired_objects_toi: std::collections::HashSet<u128> = self
193            .objects
194            .iter()
195            .filter_map(|(key, object)| {
196                let duration = object.last_activity_duration_since(now);
197                if duration.gt(object_timeout) {
198                    log::warn!(
199                        "Object Expired ! tsi={} toi={} state : {:?} 
200                        location: {:?} attached={:?} blocks completed={}/{} last activity={:?} max={:?} 
201                        transfer_length={:?} byte_left={:?}",
202                        object.tsi,
203                        object.toi,
204                        object.state,
205                        object.content_location.as_ref().map(|u| u.to_string()),
206                        object.fdt_instance_id,
207                        object.nb_block_completed(),
208                        object.nb_block(),
209                        duration,
210                        object_timeout,
211                        object.transfer_length,
212                        object.byte_left()
213                    );
214                    Some(*key)
215                } else {
216                    None
217                }
218            })
219            .collect();
220
221        for toi in expired_objects_toi {
222            self.objects_error.remove(&toi);
223            self.objects.remove(&toi);
224        }
225    }
226
227    /// Push an ALC/LCT packet to the `Receiver`.
228    ///
229    /// This method is used to push data (the payload of a UDP/IP packet) to the `Receiver`.
230    ///
231    /// # Arguments
232    ///
233    /// * `data` - The payload of the UDP/IP packet.
234    /// * `now` - The current `SystemTime` to use for time-related operations.
235    ///
236    /// # Returns
237    ///
238    /// A `Result` indicating success (`Ok`) or an error (`Err`).
239    ///
240    /// # Errors
241    ///
242    /// Returns as error if the packet is not a valid
243    ///
244    pub fn push_data(&mut self, data: &[u8], now: std::time::SystemTime) -> Result<()> {
245        self.last_timestamp = Some(now);
246        let alc = alc::parse_alc_pkt(data)?;
247        if alc.lct.tsi != self.tsi {
248            return Ok(());
249        }
250
251        self.push(&alc, now)
252    }
253
254    /// Push ALC/LCT packets to the `Receiver`.
255    ///
256    /// This method is used to push ALC/LCT packets to the `Receiver`.
257    ///
258    /// # Arguments
259    ///
260    /// * `alc_pkt` - The ALC/LCT packet to push.
261    /// * `now` - The current `SystemTime` to use for time-related operations.
262    ///
263    /// # Returns
264    ///
265    /// A `Result` indicating success (`Ok`) or an error (`Err`).
266    ///
267    pub fn push(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
268        debug_assert!(self.tsi == alc_pkt.lct.tsi);
269        self.last_activity = Instant::now();
270        self.last_timestamp = Some(now);
271
272        if alc_pkt.lct.close_session {
273            log::info!("Close session");
274            self.closed_is_imminent = true;
275        }
276
277        match alc_pkt.lct.toi {
278            toi if toi == lct::TOI_FDT => self.push_fdt_obj(alc_pkt, now),
279            _ => self.push_obj(alc_pkt, now),
280        }
281    }
282
283    fn is_fdt_received(&self, fdt_instance_id: u32) -> bool {
284        self.fdt_current
285            .iter()
286            .any(|fdt| fdt.fdt_id == fdt_instance_id)
287    }
288
289    fn push_fdt_obj(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
290        if alc_pkt.fdt_info.is_none() {
291            if alc_pkt.lct.close_object {
292                return Ok(());
293            }
294
295            if alc_pkt.lct.close_session {
296                return Ok(());
297            }
298
299            return Err(FluteError::new("FDT pkt received without FDT Extension"));
300        }
301        let fdt_instance_id = alc_pkt
302            .fdt_info
303            .as_ref()
304            .map(|f| f.fdt_instance_id)
305            .unwrap();
306
307        if self.config.object_receive_once && self.is_fdt_received(fdt_instance_id) {
308            return Ok(());
309        }
310
311        {
312            let fdt_receiver = self
313                .fdt_receivers
314                .entry(fdt_instance_id)
315                .or_insert(Box::new(FdtReceiver::new(
316                    &self.endpoint,
317                    self.tsi,
318                    fdt_instance_id,
319                    self.config.enable_fdt_expiration_check,
320                    now,
321                )));
322
323            if fdt_receiver.state() != fdtreceiver::FDTState::Receiving {
324                log::warn!(
325                    "TSI={} FDT state is {:?}, bug ?",
326                    self.tsi,
327                    fdt_receiver.state()
328                );
329                return Ok(());
330            }
331
332            fdt_receiver.push(alc_pkt, now);
333
334            if fdt_receiver.state() == fdtreceiver::FDTState::Complete {
335                fdt_receiver.update_expired_state(now);
336            }
337
338            match fdt_receiver.state() {
339                fdtreceiver::FDTState::Receiving => return Ok(()),
340                fdtreceiver::FDTState::Complete => {}
341                fdtreceiver::FDTState::Error => return Err(FluteError::new("Fail to decode FDT")),
342                fdtreceiver::FDTState::Expired => {
343                    let expiration = fdt_receiver.get_expiration_time().unwrap_or(now);
344                    let server_time = fdt_receiver.get_server_time(now);
345
346                    let expiration: chrono::DateTime<chrono::Utc> = expiration.into();
347                    let server_time: chrono::DateTime<chrono::Utc> = server_time.into();
348
349                    log::warn!(
350                        "TSI={} FDT has been received but is already expired expiration time={} server time={}",
351                        self.tsi,
352                        expiration.to_rfc3339(),
353                        server_time.to_rfc3339()
354                    );
355                    return Ok(());
356                }
357            };
358        }
359
360        if let Some(previous_fdt) = self.fdt_current.front() {
361            if previous_fdt.fdt_id + 1 != fdt_instance_id && previous_fdt.fdt_id != fdt_instance_id
362            {
363                log::warn!(
364                    "TSI={} Previous FDT ID {} was current is {} is there an FDT missing ?",
365                    self.tsi,
366                    previous_fdt.fdt_id,
367                    fdt_instance_id
368                );
369            }
370        }
371
372        let fdt_current = self.fdt_receivers.remove(&fdt_instance_id);
373        if let Some(mut fdt_current) = fdt_current {
374            if let Some(xml) = fdt_current.fdt_xml_str() {
375                let expiration_date = fdt_current
376                    .fdt_instance()
377                    .map(|inst| inst.get_expiration_date().unwrap_or(now))
378                    .unwrap_or(now);
379
380                let meta = fdt_current.fdt_meta().unwrap();
381                let transfer_duration = now
382                    .duration_since(fdt_current.reception_start_time)
383                    .unwrap_or(std::time::Duration::new(0, 0));
384
385                self.writer.fdt_received(
386                    &self.endpoint,
387                    &self.tsi,
388                    &xml,
389                    expiration_date,
390                    meta,
391                    transfer_duration,
392                    now,
393                    fdt_current.ext_time,
394                );
395            }
396            self.fdt_current.push_front(fdt_current);
397            self.attach_latest_fdt_to_objects(now);
398            self.gc_object_completed();
399            self.update_expiration_date_of_completed_objects_using_latest_fdt(now);
400
401            if self.fdt_current.len() > 10 {
402                self.fdt_current.pop_back();
403            }
404        }
405
406        Ok(())
407    }
408
409    fn attach_latest_fdt_to_objects(&mut self, now: std::time::SystemTime) -> Option<()> {
410        let fdt = self.fdt_current.front_mut()?;
411        let fdt_id = fdt.fdt_id;
412        let fdt_instance = fdt.fdt_instance()?;
413        log::debug!("TSI={} Attach FDT id {}", self.tsi, fdt_id);
414        let mut check_state = Vec::new();
415        for obj in &mut self.objects {
416            let success = obj.1.attach_fdt(fdt_id, fdt_instance, now);
417            if success {
418                check_state.push(*obj.0);
419            }
420        }
421
422        for toi in check_state {
423            self.check_object_state(toi);
424        }
425
426        Some(())
427    }
428
429    fn update_expiration_date_of_completed_objects_using_latest_fdt(
430        &mut self,
431        now: std::time::SystemTime,
432    ) -> Option<()> {
433        let fdt = self.fdt_current.front_mut()?;
434        let fdt_instance = fdt.fdt_instance()?;
435        let files = fdt_instance.file.as_ref()?;
436        let fdt_expiration_date = fdt_instance.get_expiration_date();
437
438        for file in files {
439            let toi: u128 = file.toi.parse().unwrap_or_default();
440            let cache_control = file.get_object_cache_control(fdt_expiration_date);
441            if let Some(obj) = self.objects_completed.get_mut(&toi) {
442                if obj.metadata.cache_control.should_update(cache_control) {
443                    obj.metadata.cache_control = cache_control;
444                    self.writer.update_cache_control(
445                        &self.endpoint,
446                        &self.tsi,
447                        &toi,
448                        &obj.metadata,
449                        now,
450                    );
451                }
452            }
453        }
454
455        Some(())
456    }
457
458    fn push_obj(&mut self, pkt: &alc::AlcPkt, now: SystemTime) -> Result<()> {
459        if self.objects_completed.contains_key(&pkt.lct.toi) {
460            if self.config.object_receive_once {
461                return Ok(());
462            }
463
464            let payload_id = alc::get_fec_inline_payload_id(pkt)?;
465            if payload_id.sbn == 0 && payload_id.esi == 0 {
466                self.objects_completed.remove(&pkt.lct.toi);
467            } else {
468                return Ok(());
469            }
470        }
471        if self.objects_error.contains(&pkt.lct.toi) {
472            let payload_id = alc::get_fec_inline_payload_id(pkt)?;
473            if payload_id.sbn == 0 && payload_id.esi == 0 {
474                log::warn!("Re-download object after errors");
475                self.objects_error.remove(&pkt.lct.toi);
476            } else {
477                return Ok(());
478            }
479        }
480
481        let mut obj = self.objects.get_mut(&pkt.lct.toi);
482        if obj.is_none() {
483            self.create_obj(&pkt.lct.toi, now);
484            obj = self.objects.get_mut(&pkt.lct.toi);
485        }
486
487        let obj = match obj {
488            Some(obj) => obj.as_mut(),
489            None => return Err(FluteError::new("Bug ? Object not found")),
490        };
491
492        obj.push(pkt, now);
493        self.check_object_state(pkt.lct.toi);
494
495        Ok(())
496    }
497
498    fn check_object_state(&mut self, toi: u128) {
499        let obj = self.objects.get_mut(&toi);
500        if obj.is_none() {
501            return;
502        }
503        let mut remove_object = false;
504
505        {
506            let obj = obj.unwrap();
507
508            match obj.state {
509                objectreceiver::State::Receiving => {}
510                objectreceiver::State::Completed => {
511                    remove_object = true;
512                    log::debug!(
513                        "Object state is completed {:?} tsi={} toi={}",
514                        self.endpoint,
515                        self.tsi,
516                        obj.toi
517                    );
518
519                    if obj.cache_control != Some(ObjectCacheControl::NoCache) {
520                        self.objects_completed.insert(
521                            obj.toi,
522                            ObjectCompletedMeta {
523                                metadata: obj.create_meta(),
524                            },
525                        );
526                    } else {
527                        if obj.cache_control.is_none() {
528                            log::error!("No cache expiration date for {:?}", obj.content_location);
529                        }
530                    }
531                }
532                objectreceiver::State::Interrupted => {
533                    log::debug!(
534                        "Object transmission interrupted tsi={} toi={}",
535                        self.tsi,
536                        obj.toi
537                    );
538                    remove_object = true;
539                    self.objects_error.insert(toi);
540                    self.gc_object_error();
541                }
542                objectreceiver::State::Error => {
543                    log::error!("Object in error state tsi={} toi={}", self.tsi, obj.toi);
544                    remove_object = true;
545                    self.objects_error.insert(toi);
546                    self.gc_object_error();
547                }
548            }
549        }
550
551        if remove_object {
552            log::debug!(
553                "Remove object {:?} tsi={} toi={}",
554                self.endpoint,
555                self.tsi,
556                toi
557            );
558            self.objects.remove(&toi);
559        }
560    }
561
562    fn gc_object_completed(&mut self) {
563        let current_fdt = match self.fdt_current.front_mut() {
564            Some(fdt) => fdt,
565            None => return,
566        };
567
568        let instance = match current_fdt.fdt_instance() {
569            Some(instance) => instance,
570            None => return,
571        };
572
573        let before = self.objects_completed.len();
574        if let Some(files) = instance.file.as_ref() {
575            let current_tois: std::collections::HashSet<u128> = files
576                .iter()
577                .map(|file| file.toi.parse().unwrap_or(0))
578                .collect();
579            self.objects_completed
580                .retain(|toi, _meta| current_tois.contains(toi));
581        }
582        let after = self.objects_completed.len();
583        if before != after {
584            log::debug!("GC remove {} / {} objects", before - after, before);
585        }
586    }
587
588    fn gc_object_error(&mut self) {
589        while self.objects_error.len() > self.config.max_objects_error {
590            let toi = self.objects_error.pop_first().unwrap();
591            self.objects.remove(&toi);
592        }
593    }
594
595    fn create_obj(&mut self, toi: &u128, now: SystemTime) {
596        let mut obj = Box::new(ObjectReceiver::new(
597            &self.endpoint,
598            self.tsi,
599            toi,
600            None,
601            self.writer.clone(),
602            self.config
603                .object_max_cache_size
604                .unwrap_or(10 * 1024 * 1024),
605            now,
606        ));
607
608        let mut is_attached = false;
609        for (fdt_index, fdt) in (&mut self.fdt_current.iter_mut()).enumerate() {
610            let fdt_id = fdt.fdt_id;
611            fdt.update_expired_state(now);
612            if fdt.state() == fdtreceiver::FDTState::Complete {
613                if let Some(fdt_instance) = fdt.fdt_instance() {
614                    let success = obj.attach_fdt(fdt_id, fdt_instance, now);
615                    if success {
616                        is_attached = true;
617                        if fdt_index != 0 {
618                            log::warn!(
619                                "TSI={} TOI={} CL={:?} Attaching an object to an FDT that is not the latest (index={}) ",
620                                self.tsi,
621                                obj.toi,
622                                obj.content_location,
623                                fdt_index
624                            );
625                        }
626
627                        break;
628                    }
629                }
630            }
631        }
632
633        if !is_attached {
634            log::warn!(
635                "Object received before the FDT TSI={} TOI={}",
636                self.tsi,
637                toi
638            );
639        }
640
641        self.objects.insert(*toi, obj);
642    }
643}