1use super::fdtreceiver;
2use super::fdtreceiver::FdtReceiver;
3use super::objectreceiver;
4use super::objectreceiver::ObjectReceiver;
5use super::writer::{ObjectMetadata, ObjectWriterBuilder};
6use crate::common::udpendpoint::UDPEndpoint;
7use crate::common::{alc, lct};
8use crate::receiver::writer::ObjectCacheControl;
9use crate::tools::error::FluteError;
10use crate::tools::error::Result;
11use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
12use std::rc::Rc;
13use std::time::Duration;
14use std::time::Instant;
15use std::time::SystemTime;
16
/// Tunable settings of a [`Receiver`].
#[derive(Clone, Copy, PartialEq, Debug)]
pub struct Config {
    /// Upper bound on the number of failed TOIs remembered by the receiver.
    /// Once the error set grows past this size its entries are evicted.
    pub max_objects_error: usize,
    /// Idle time after which the session is reported as expired.
    /// `None` disables session expiration.
    pub session_timeout: Option<Duration>,
    /// Idle time after which an object still being received is discarded
    /// during cleanup. `None` disables object expiration.
    pub object_timeout: Option<Duration>,
    /// Cache size handed to every new object receiver.
    /// When `None`, the receiver falls back to 10 MiB.
    pub object_max_cache_size: Option<usize>,
    /// When `true`, packets for an object (or FDT instance) that has already
    /// been received are ignored instead of triggering a new reception.
    pub object_receive_once: bool,
    /// Flag forwarded to every FDT receiver created by the session.
    pub enable_fdt_expiration_check: bool,
}

impl Default for Config {
    /// Default settings: no error history, no session timeout, a 10 second
    /// object timeout, objects received once, FDT expiration check enabled.
    fn default() -> Self {
        let object_timeout = Some(Duration::from_secs(10));
        Config {
            max_objects_error: 0,
            session_timeout: None,
            object_timeout,
            object_max_cache_size: None,
            object_receive_once: true,
            enable_fdt_expiration_check: true,
        }
    }
}
53
54#[derive(Debug, Clone)]
55pub struct ObjectCompletedMeta {
56 metadata: ObjectMetadata,
57}
58
59#[derive(Debug)]
63pub struct Receiver {
64 tsi: u64,
65 objects: HashMap<u128, Box<ObjectReceiver>>,
66 objects_completed: BTreeMap<u128, ObjectCompletedMeta>,
67 objects_error: BTreeSet<u128>,
68 fdt_receivers: BTreeMap<u32, Box<FdtReceiver>>,
69 fdt_current: VecDeque<Box<FdtReceiver>>,
70 writer: Rc<dyn ObjectWriterBuilder>,
71 config: Config,
72 last_activity: Instant,
73 closed_is_imminent: bool,
74 endpoint: UDPEndpoint,
75 last_timestamp: Option<SystemTime>,
76}
77
78impl Receiver {
79 pub fn new(
93 endpoint: &UDPEndpoint,
94 tsi: u64,
95 writer: Rc<dyn ObjectWriterBuilder>,
96 config: Option<Config>,
97 ) -> Self {
98 Self {
99 tsi,
100 objects: HashMap::new(),
101 fdt_receivers: BTreeMap::new(),
102 fdt_current: VecDeque::new(),
103 writer,
104 objects_completed: BTreeMap::new(),
105 objects_error: BTreeSet::new(),
106 config: config.unwrap_or_default(),
107 last_activity: Instant::now(),
108 closed_is_imminent: false,
109 endpoint: endpoint.clone(),
110 last_timestamp: None,
111 }
112 }
113
114 pub fn is_expired(&self) -> bool {
124 if self.config.session_timeout.is_none() {
125 return false;
126 }
127
128 log::debug!("Check elapsed {:?}", self.last_activity.elapsed());
129 self.last_activity
130 .elapsed()
131 .gt(self.config.session_timeout.as_ref().unwrap())
132 }
133
134 pub fn nb_objects(&self) -> usize {
143 self.objects.len()
144 }
145
146 pub fn nb_objects_error(&self) -> usize {
156 self.objects_error.len()
157 }
158
159 pub fn cleanup(&mut self, now: std::time::SystemTime) {
169 self.last_timestamp = Some(now);
170 self.cleanup_objects();
171 self.cleanup_fdt(now);
172 }
173
174 fn cleanup_fdt(&mut self, now: std::time::SystemTime) {
175 self.fdt_receivers.iter_mut().for_each(|fdt| {
176 fdt.1.update_expired_state(now);
177 });
178
179 self.fdt_receivers.retain(|_, fdt| {
180 let state = fdt.state();
181 state == fdtreceiver::FDTState::Complete || state == fdtreceiver::FDTState::Receiving
182 });
183 }
184
185 fn cleanup_objects(&mut self) {
186 if self.config.object_timeout.is_none() {
187 return;
188 }
189 let object_timeout = self.config.object_timeout.as_ref().unwrap();
190 let now = Instant::now();
191
192 let expired_objects_toi: std::collections::HashSet<u128> = self
193 .objects
194 .iter()
195 .filter_map(|(key, object)| {
196 let duration = object.last_activity_duration_since(now);
197 if duration.gt(object_timeout) {
198 log::warn!(
199 "Object Expired ! tsi={} toi={} state : {:?}
200 location: {:?} attached={:?} blocks completed={}/{} last activity={:?} max={:?}
201 transfer_length={:?} byte_left={:?}",
202 object.tsi,
203 object.toi,
204 object.state,
205 object.content_location.as_ref().map(|u| u.to_string()),
206 object.fdt_instance_id,
207 object.nb_block_completed(),
208 object.nb_block(),
209 duration,
210 object_timeout,
211 object.transfer_length,
212 object.byte_left()
213 );
214 Some(*key)
215 } else {
216 None
217 }
218 })
219 .collect();
220
221 for toi in expired_objects_toi {
222 self.objects_error.remove(&toi);
223 self.objects.remove(&toi);
224 }
225 }
226
227 pub fn push_data(&mut self, data: &[u8], now: std::time::SystemTime) -> Result<()> {
245 self.last_timestamp = Some(now);
246 let alc = alc::parse_alc_pkt(data)?;
247 if alc.lct.tsi != self.tsi {
248 return Ok(());
249 }
250
251 self.push(&alc, now)
252 }
253
254 pub fn push(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
268 debug_assert!(self.tsi == alc_pkt.lct.tsi);
269 self.last_activity = Instant::now();
270 self.last_timestamp = Some(now);
271
272 if alc_pkt.lct.close_session {
273 log::info!("Close session");
274 self.closed_is_imminent = true;
275 }
276
277 match alc_pkt.lct.toi {
278 toi if toi == lct::TOI_FDT => self.push_fdt_obj(alc_pkt, now),
279 _ => self.push_obj(alc_pkt, now),
280 }
281 }
282
283 fn is_fdt_received(&self, fdt_instance_id: u32) -> bool {
284 self.fdt_current
285 .iter()
286 .any(|fdt| fdt.fdt_id == fdt_instance_id)
287 }
288
289 fn push_fdt_obj(&mut self, alc_pkt: &alc::AlcPkt, now: std::time::SystemTime) -> Result<()> {
290 if alc_pkt.fdt_info.is_none() {
291 if alc_pkt.lct.close_object {
292 return Ok(());
293 }
294
295 if alc_pkt.lct.close_session {
296 return Ok(());
297 }
298
299 return Err(FluteError::new("FDT pkt received without FDT Extension"));
300 }
301 let fdt_instance_id = alc_pkt
302 .fdt_info
303 .as_ref()
304 .map(|f| f.fdt_instance_id)
305 .unwrap();
306
307 if self.config.object_receive_once && self.is_fdt_received(fdt_instance_id) {
308 return Ok(());
309 }
310
311 {
312 let fdt_receiver = self
313 .fdt_receivers
314 .entry(fdt_instance_id)
315 .or_insert(Box::new(FdtReceiver::new(
316 &self.endpoint,
317 self.tsi,
318 fdt_instance_id,
319 self.config.enable_fdt_expiration_check,
320 now,
321 )));
322
323 if fdt_receiver.state() != fdtreceiver::FDTState::Receiving {
324 log::warn!(
325 "TSI={} FDT state is {:?}, bug ?",
326 self.tsi,
327 fdt_receiver.state()
328 );
329 return Ok(());
330 }
331
332 fdt_receiver.push(alc_pkt, now);
333
334 if fdt_receiver.state() == fdtreceiver::FDTState::Complete {
335 fdt_receiver.update_expired_state(now);
336 }
337
338 match fdt_receiver.state() {
339 fdtreceiver::FDTState::Receiving => return Ok(()),
340 fdtreceiver::FDTState::Complete => {}
341 fdtreceiver::FDTState::Error => return Err(FluteError::new("Fail to decode FDT")),
342 fdtreceiver::FDTState::Expired => {
343 let expiration = fdt_receiver.get_expiration_time().unwrap_or(now);
344 let server_time = fdt_receiver.get_server_time(now);
345
346 let expiration: chrono::DateTime<chrono::Utc> = expiration.into();
347 let server_time: chrono::DateTime<chrono::Utc> = server_time.into();
348
349 log::warn!(
350 "TSI={} FDT has been received but is already expired expiration time={} server time={}",
351 self.tsi,
352 expiration.to_rfc3339(),
353 server_time.to_rfc3339()
354 );
355 return Ok(());
356 }
357 };
358 }
359
360 if let Some(previous_fdt) = self.fdt_current.front() {
361 if previous_fdt.fdt_id + 1 != fdt_instance_id && previous_fdt.fdt_id != fdt_instance_id
362 {
363 log::warn!(
364 "TSI={} Previous FDT ID {} was current is {} is there an FDT missing ?",
365 self.tsi,
366 previous_fdt.fdt_id,
367 fdt_instance_id
368 );
369 }
370 }
371
372 let fdt_current = self.fdt_receivers.remove(&fdt_instance_id);
373 if let Some(mut fdt_current) = fdt_current {
374 if let Some(xml) = fdt_current.fdt_xml_str() {
375 let expiration_date = fdt_current
376 .fdt_instance()
377 .map(|inst| inst.get_expiration_date().unwrap_or(now))
378 .unwrap_or(now);
379
380 let meta = fdt_current.fdt_meta().unwrap();
381 let transfer_duration = now
382 .duration_since(fdt_current.reception_start_time)
383 .unwrap_or(std::time::Duration::new(0, 0));
384
385 self.writer.fdt_received(
386 &self.endpoint,
387 &self.tsi,
388 &xml,
389 expiration_date,
390 meta,
391 transfer_duration,
392 now,
393 fdt_current.ext_time,
394 );
395 }
396 self.fdt_current.push_front(fdt_current);
397 self.attach_latest_fdt_to_objects(now);
398 self.gc_object_completed();
399 self.update_expiration_date_of_completed_objects_using_latest_fdt(now);
400
401 if self.fdt_current.len() > 10 {
402 self.fdt_current.pop_back();
403 }
404 }
405
406 Ok(())
407 }
408
409 fn attach_latest_fdt_to_objects(&mut self, now: std::time::SystemTime) -> Option<()> {
410 let fdt = self.fdt_current.front_mut()?;
411 let fdt_id = fdt.fdt_id;
412 let fdt_instance = fdt.fdt_instance()?;
413 log::debug!("TSI={} Attach FDT id {}", self.tsi, fdt_id);
414 let mut check_state = Vec::new();
415 for obj in &mut self.objects {
416 let success = obj.1.attach_fdt(fdt_id, fdt_instance, now);
417 if success {
418 check_state.push(*obj.0);
419 }
420 }
421
422 for toi in check_state {
423 self.check_object_state(toi);
424 }
425
426 Some(())
427 }
428
429 fn update_expiration_date_of_completed_objects_using_latest_fdt(
430 &mut self,
431 now: std::time::SystemTime,
432 ) -> Option<()> {
433 let fdt = self.fdt_current.front_mut()?;
434 let fdt_instance = fdt.fdt_instance()?;
435 let files = fdt_instance.file.as_ref()?;
436 let fdt_expiration_date = fdt_instance.get_expiration_date();
437
438 for file in files {
439 let toi: u128 = file.toi.parse().unwrap_or_default();
440 let cache_control = file.get_object_cache_control(fdt_expiration_date);
441 if let Some(obj) = self.objects_completed.get_mut(&toi) {
442 if obj.metadata.cache_control.should_update(cache_control) {
443 obj.metadata.cache_control = cache_control;
444 self.writer.update_cache_control(
445 &self.endpoint,
446 &self.tsi,
447 &toi,
448 &obj.metadata,
449 now,
450 );
451 }
452 }
453 }
454
455 Some(())
456 }
457
458 fn push_obj(&mut self, pkt: &alc::AlcPkt, now: SystemTime) -> Result<()> {
459 if self.objects_completed.contains_key(&pkt.lct.toi) {
460 if self.config.object_receive_once {
461 return Ok(());
462 }
463
464 let payload_id = alc::get_fec_inline_payload_id(pkt)?;
465 if payload_id.sbn == 0 && payload_id.esi == 0 {
466 self.objects_completed.remove(&pkt.lct.toi);
467 } else {
468 return Ok(());
469 }
470 }
471 if self.objects_error.contains(&pkt.lct.toi) {
472 let payload_id = alc::get_fec_inline_payload_id(pkt)?;
473 if payload_id.sbn == 0 && payload_id.esi == 0 {
474 log::warn!("Re-download object after errors");
475 self.objects_error.remove(&pkt.lct.toi);
476 } else {
477 return Ok(());
478 }
479 }
480
481 let mut obj = self.objects.get_mut(&pkt.lct.toi);
482 if obj.is_none() {
483 self.create_obj(&pkt.lct.toi, now);
484 obj = self.objects.get_mut(&pkt.lct.toi);
485 }
486
487 let obj = match obj {
488 Some(obj) => obj.as_mut(),
489 None => return Err(FluteError::new("Bug ? Object not found")),
490 };
491
492 obj.push(pkt, now);
493 self.check_object_state(pkt.lct.toi);
494
495 Ok(())
496 }
497
498 fn check_object_state(&mut self, toi: u128) {
499 let obj = self.objects.get_mut(&toi);
500 if obj.is_none() {
501 return;
502 }
503 let mut remove_object = false;
504
505 {
506 let obj = obj.unwrap();
507
508 match obj.state {
509 objectreceiver::State::Receiving => {}
510 objectreceiver::State::Completed => {
511 remove_object = true;
512 log::debug!(
513 "Object state is completed {:?} tsi={} toi={}",
514 self.endpoint,
515 self.tsi,
516 obj.toi
517 );
518
519 if obj.cache_control != Some(ObjectCacheControl::NoCache) {
520 self.objects_completed.insert(
521 obj.toi,
522 ObjectCompletedMeta {
523 metadata: obj.create_meta(),
524 },
525 );
526 } else {
527 if obj.cache_control.is_none() {
528 log::error!("No cache expiration date for {:?}", obj.content_location);
529 }
530 }
531 }
532 objectreceiver::State::Interrupted => {
533 log::debug!(
534 "Object transmission interrupted tsi={} toi={}",
535 self.tsi,
536 obj.toi
537 );
538 remove_object = true;
539 self.objects_error.insert(toi);
540 self.gc_object_error();
541 }
542 objectreceiver::State::Error => {
543 log::error!("Object in error state tsi={} toi={}", self.tsi, obj.toi);
544 remove_object = true;
545 self.objects_error.insert(toi);
546 self.gc_object_error();
547 }
548 }
549 }
550
551 if remove_object {
552 log::debug!(
553 "Remove object {:?} tsi={} toi={}",
554 self.endpoint,
555 self.tsi,
556 toi
557 );
558 self.objects.remove(&toi);
559 }
560 }
561
562 fn gc_object_completed(&mut self) {
563 let current_fdt = match self.fdt_current.front_mut() {
564 Some(fdt) => fdt,
565 None => return,
566 };
567
568 let instance = match current_fdt.fdt_instance() {
569 Some(instance) => instance,
570 None => return,
571 };
572
573 let before = self.objects_completed.len();
574 if let Some(files) = instance.file.as_ref() {
575 let current_tois: std::collections::HashSet<u128> = files
576 .iter()
577 .map(|file| file.toi.parse().unwrap_or(0))
578 .collect();
579 self.objects_completed
580 .retain(|toi, _meta| current_tois.contains(toi));
581 }
582 let after = self.objects_completed.len();
583 if before != after {
584 log::debug!("GC remove {} / {} objects", before - after, before);
585 }
586 }
587
588 fn gc_object_error(&mut self) {
589 while self.objects_error.len() > self.config.max_objects_error {
590 let toi = self.objects_error.pop_first().unwrap();
591 self.objects.remove(&toi);
592 }
593 }
594
595 fn create_obj(&mut self, toi: &u128, now: SystemTime) {
596 let mut obj = Box::new(ObjectReceiver::new(
597 &self.endpoint,
598 self.tsi,
599 toi,
600 None,
601 self.writer.clone(),
602 self.config
603 .object_max_cache_size
604 .unwrap_or(10 * 1024 * 1024),
605 now,
606 ));
607
608 let mut is_attached = false;
609 for (fdt_index, fdt) in (&mut self.fdt_current.iter_mut()).enumerate() {
610 let fdt_id = fdt.fdt_id;
611 fdt.update_expired_state(now);
612 if fdt.state() == fdtreceiver::FDTState::Complete {
613 if let Some(fdt_instance) = fdt.fdt_instance() {
614 let success = obj.attach_fdt(fdt_id, fdt_instance, now);
615 if success {
616 is_attached = true;
617 if fdt_index != 0 {
618 log::warn!(
619 "TSI={} TOI={} CL={:?} Attaching an object to an FDT that is not the latest (index={}) ",
620 self.tsi,
621 obj.toi,
622 obj.content_location,
623 fdt_index
624 );
625 }
626
627 break;
628 }
629 }
630 }
631 }
632
633 if !is_attached {
634 log::warn!(
635 "Object received before the FDT TSI={} TOI={}",
636 self.tsi,
637 toi
638 );
639 }
640
641 self.objects.insert(*toi, obj);
642 }
643}