flute/receiver/
fdtreceiver.rs

1use super::objectreceiver;
2use super::writer::{ObjectWriterBuilder, ObjectWriterBuilderResult};
3use crate::common::udpendpoint::UDPEndpoint;
4use crate::common::{alc, fdtinstance::FdtInstance, lct};
5use crate::{receiver::writer::ObjectMetadata, tools};
6use crate::{receiver::writer::ObjectWriter, tools::error::Result};
7use std::{cell::RefCell, rc::Rc, time::SystemTime};
8
/// Reception state of an FDT instance.
///
/// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality
/// is total and the type can be used wherever an `Eq` bound is required.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum FDTState {
    /// The FDT is still being received; initial state of a new receiver.
    Receiving,
    /// The FDT has been fully received and successfully parsed.
    Complete,
    /// Reception failed or was interrupted, or the received data could not be parsed.
    Error,
    /// The FDT was complete but has been found expired by the expiration check.
    Expired,
}
16
17pub struct FdtReceiver {
18    pub fdt_id: u32,
19    obj: Option<Box<objectreceiver::ObjectReceiver>>,
20    inner: Rc<RefCell<FdtWriterInner>>,
21    fdt_instance: Option<FdtInstance>,
22    sender_current_time_offset: Option<std::time::Duration>,
23    sender_current_time_late: bool,
24    pub ext_time: Option<std::time::SystemTime>,
25    pub reception_start_time: SystemTime,
26    enable_expired_check: bool,
27    meta: Option<ObjectMetadata>,
28}
29
30impl std::fmt::Debug for FdtReceiver {
31    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32        f.debug_struct("FdtReceiver")
33            .field("fdt_id", &self.fdt_id)
34            .field("obj", &self.obj)
35            .field("inner", &self.inner)
36            .field("fdt_instance", &self.fdt_instance)
37            .field(
38                "sender_current_time_offset",
39                &self.sender_current_time_offset,
40            )
41            .field("sender_current_time_late", &self.sender_current_time_late)
42            .field("receiver_start_time", &self.reception_start_time)
43            .finish()
44    }
45}
46
47#[derive(Debug)]
48struct FdtWriter {
49    inner: Rc<RefCell<FdtWriterInner>>,
50}
51
52struct FdtWriterBuilder {
53    inner: Rc<RefCell<FdtWriterInner>>,
54}
55
56#[derive(Debug)]
57struct FdtWriterInner {
58    data: Vec<u8>,
59    fdt: Option<FdtInstance>,
60    expires: Option<SystemTime>,
61    state: FDTState,
62}
63
64impl FdtReceiver {
65    pub fn new(
66        endpoint: &UDPEndpoint,
67        tsi: u64,
68        fdt_id: u32,
69        enable_expired_check: bool,
70        now: SystemTime,
71    ) -> FdtReceiver {
72        let inner = Rc::new(RefCell::new(FdtWriterInner {
73            data: Vec::new(),
74            fdt: None,
75            state: FDTState::Receiving,
76            expires: None,
77        }));
78
79        let fdt_builder = Rc::new(FdtWriterBuilder::new(inner.clone()));
80
81        FdtReceiver {
82            fdt_id,
83            obj: Some(Box::new(objectreceiver::ObjectReceiver::new(
84                endpoint,
85                tsi,
86                &lct::TOI_FDT,
87                Some(fdt_id),
88                fdt_builder,
89                1024 * 1024,
90                now,
91            ))),
92            inner: inner.clone(),
93            fdt_instance: None,
94            sender_current_time_offset: None,
95            sender_current_time_late: true,
96            reception_start_time: now,
97            enable_expired_check,
98            meta: None,
99            ext_time: None,
100        }
101    }
102
103    pub fn push(&mut self, pkt: &alc::AlcPkt, now: std::time::SystemTime) {
104        if let Ok(Some(res)) = alc::get_sender_current_time(pkt) {
105            self.ext_time = Some(res);
106            if res < now {
107                self.sender_current_time_late = true;
108                self.sender_current_time_offset = Some(now.duration_since(res).unwrap())
109            } else {
110                self.sender_current_time_late = false;
111                self.sender_current_time_offset = Some(res.duration_since(now).unwrap())
112            }
113        }
114
115        if let Some(obj) = self.obj.as_mut() {
116            obj.push(pkt, now);
117            match obj.state {
118                objectreceiver::State::Receiving => {}
119                objectreceiver::State::Completed => {
120                    self.meta = Some(obj.create_meta());
121                    self.obj = None
122                }
123                objectreceiver::State::Interrupted => {
124                    self.inner.borrow_mut().state = FDTState::Error
125                }
126                objectreceiver::State::Error => self.inner.borrow_mut().state = FDTState::Error,
127            }
128        }
129    }
130
131    pub fn get_server_time(&self, now: std::time::SystemTime) -> std::time::SystemTime {
132        if let Some(offset) = self.sender_current_time_offset {
133            if self.sender_current_time_late {
134                return now - offset;
135            } else {
136                return now + offset;
137            }
138        }
139
140        now
141    }
142
143    pub fn state(&self) -> FDTState {
144        self.inner.borrow().state
145    }
146
147    pub fn fdt_instance(&mut self) -> Option<&FdtInstance> {
148        if self.fdt_instance.is_none() {
149            let inner = self.inner.borrow();
150            let instance = inner.fdt.as_ref();
151            self.fdt_instance = instance.cloned();
152        }
153        self.fdt_instance.as_ref()
154    }
155
156    pub fn fdt_xml_str(&self) -> Option<String> {
157        let inner = self.inner.borrow();
158        String::from_utf8(inner.data.clone()).ok()
159    }
160
161    pub fn fdt_meta(&self) -> Option<&ObjectMetadata> {
162        self.meta.as_ref()
163    }
164
165    pub fn update_expired_state(&self, now: SystemTime) {
166        if self.state() != FDTState::Complete {
167            return;
168        }
169
170        if self.enable_expired_check && self.is_expired(now) {
171            let mut inner = self.inner.borrow_mut();
172            inner.state = FDTState::Expired;
173        }
174    }
175
176    fn is_expired(&self, now: SystemTime) -> bool {
177        let inner = self.inner.borrow();
178        let expires = match inner.expires {
179            Some(expires) => expires,
180            None => return true,
181        };
182
183        self.get_server_time(now) > expires
184    }
185
186    pub fn get_expiration_time(&self) -> Option<SystemTime> {
187        let inner = self.inner.borrow();
188        inner.expires
189    }
190}
191
192impl FdtWriterBuilder {
193    fn new(inner: Rc<RefCell<FdtWriterInner>>) -> Self {
194        FdtWriterBuilder { inner }
195    }
196}
197
198impl ObjectWriterBuilder for FdtWriterBuilder {
199    fn new_object_writer(
200        &self,
201        _endpoint: &UDPEndpoint,
202        _tsi: &u64,
203        _toi: &u128,
204        _meta: &ObjectMetadata,
205        _now: std::time::SystemTime,
206    ) -> ObjectWriterBuilderResult {
207        ObjectWriterBuilderResult::StoreObject(Box::new(FdtWriter {
208            inner: self.inner.clone(),
209        }))
210    }
211
212    fn update_cache_control(
213        &self,
214        _endpoint: &UDPEndpoint,
215        _tsi: &u64,
216        _toi: &u128,
217        _meta: &ObjectMetadata,
218        _now: std::time::SystemTime,
219    ) {
220    }
221
222    fn fdt_received(
223        &self,
224        _endpoint: &UDPEndpoint,
225        _tsi: &u64,
226        _fdt_xml: &str,
227        _expires: std::time::SystemTime,
228        __meta: &ObjectMetadata,
229        _transfer_duration: std::time::Duration,
230        _now: std::time::SystemTime,
231        _ext_time: Option<std::time::SystemTime>,
232    ) {
233    }
234}
235
236impl ObjectWriter for FdtWriter {
237    fn open(&self, _now: SystemTime) -> Result<()> {
238        Ok(())
239    }
240
241    fn write(&self, _sbn: u32, data: &[u8], _now: SystemTime) -> Result<()> {
242        let mut inner = self.inner.borrow_mut();
243        inner.data.extend(data);
244        Ok(())
245    }
246
247    fn complete(&self, _now: SystemTime) {
248        let mut inner = self.inner.borrow_mut();
249        match FdtInstance::parse(&inner.data) {
250            Ok(inst) => {
251                inner.expires = match inst.expires.parse::<u32>() {
252                    Ok(seconds_ntp) => tools::ntp_to_system_time((seconds_ntp as u64) << 32).ok(),
253                    _ => None,
254                };
255                inner.fdt = Some(inst);
256                inner.state = FDTState::Complete
257            }
258            Err(_) => inner.state = FDTState::Error,
259        };
260    }
261
262    fn error(&self, _now: SystemTime) {
263        let mut inner = self.inner.borrow_mut();
264        inner.state = FDTState::Error;
265    }
266
267    fn interrupted(&self, _now: SystemTime) {
268        let mut inner = self.inner.borrow_mut();
269        inner.state = FDTState::Error;
270    }
271
272    fn enable_md5_check(&self) -> bool {
273        false
274    }
275}