Skip to main content

dvb_ci_runtime/
stack.rs

1//! The CI protocol stack — composes the transport + session layers (and, as
2//! they land, the resource state machines) into one sans-IO core.
3//!
4//! [`CiStack::handle`] is the pure entry point: feed it an [`Event`], get back
5//! the [`Action`]s the driver must perform. No I/O, threads, or clock here.
6
7use crate::event::{Action, Event, HostRequest, Notification};
8use crate::resource::{
9    ApplicationInformation, ConditionalAccess, DateTime, Mmi, Resource, ResourceManager,
10    ResourceOut,
11};
12use crate::session::{SessionLayer, SessionOut};
13use crate::transport::{Out as TransportOut, Transport};
14
15use dvb_ci::resource::{ResourceId, DATE_TIME, RESOURCE_MANAGER};
16
17/// The composed EN 50221 protocol core.
18pub struct CiStack {
19    transport: Transport,
20    session: SessionLayer,
21    /// Resources the host provides (answers incoming `open_session_request`).
22    provided: Vec<ResourceId>,
23    /// Application-layer resource handlers, dispatched by `ResourceId`.
24    resources: Vec<Box<dyn Resource>>,
25}
26
27impl Default for CiStack {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl CiStack {
34    /// New stack on transport connection `t_c_id = 1`. The host advertises the
35    /// Resource Manager and registers the RM + application_information +
36    /// conditional_access handlers.
37    #[must_use]
38    pub fn new() -> Self {
39        // The host provides Resource Manager + Date-Time; it opens the
40        // module-provided application_information + conditional_access.
41        let provided = vec![RESOURCE_MANAGER, DATE_TIME];
42        Self {
43            transport: Transport::new(1),
44            session: SessionLayer::new(),
45            resources: vec![
46                Box::new(ResourceManager::new(provided.clone())),
47                Box::new(ApplicationInformation),
48                Box::new(ConditionalAccess),
49                Box::new(DateTime::new()),
50                Box::new(Mmi),
51            ],
52            provided,
53        }
54    }
55
56    /// Register an additional resource handler.
57    pub fn register(&mut self, resource: Box<dyn Resource>) -> &mut Self {
58        self.resources.push(resource);
59        self
60    }
61
62    /// Index of the registered handler for `resource`, if any.
63    fn handler_index(&self, resource: ResourceId) -> Option<usize> {
64        self.resources.iter().position(|r| r.id() == resource)
65    }
66
67    /// The pure sans-IO entry point.
68    pub fn handle(&mut self, event: Event<'_>) -> Vec<Action> {
69        match event {
70            Event::Host(HostRequest::Init) => {
71                let mut actions = vec![Action::Reset, Action::QuerySlot];
72                let out = self.transport.init();
73                actions.extend(self.emit_transport(out));
74                actions
75            }
76            Event::Tick { elapsed } => {
77                let out = self.transport.tick(elapsed);
78                let mut actions = self.emit_transport(out);
79                // Advance each open resource's timers (e.g. date_time resend).
80                for (session_nb, resource) in self.session.sessions() {
81                    if let Some(i) = self.handler_index(resource) {
82                        let out = self.resources[i].tick(elapsed);
83                        actions.extend(self.process_resource_out(session_nb, out));
84                    }
85                }
86                actions
87            }
88            Event::Readable(frame) => {
89                let out = self.transport.on_frame(frame);
90                self.emit_transport(out)
91            }
92            Event::Host(HostRequest::SendCaPmt(apdu)) => {
93                self.send_to_resource(dvb_ci::resource::CONDITIONAL_ACCESS_SUPPORT, apdu)
94            }
95            Event::Host(HostRequest::Shutdown) => Vec::new(),
96        }
97    }
98
99    /// Send an APDU to the open session bound to `resource` (if any).
100    fn send_to_resource(&mut self, resource: ResourceId, apdu: &[u8]) -> Vec<Action> {
101        // Find the session_nb for the resource (linear scan over the small set).
102        let nb = (1u16..=u16::MAX).find(|&n| self.session.resource_of(n) == Some(resource));
103        match nb {
104            Some(nb) => {
105                let spdu = self.session.send_apdu(nb, apdu);
106                let out = self.transport.send_spdu(&spdu);
107                self.emit_transport(out)
108            }
109            None => vec![Action::Notify(Notification::Error {
110                detail: format!("no open session for resource {}", resource.name()),
111            })],
112        }
113    }
114
115    /// Convert a transport [`Out`](TransportOut) into actions, driving any
116    /// reassembled SPDUs up through the session layer.
117    fn emit_transport(&mut self, out: TransportOut) -> Vec<Action> {
118        let mut actions = Vec::new();
119        for w in out.writes {
120            actions.push(Action::Write(w));
121        }
122        if let Some(after) = out.timer {
123            actions.push(Action::SetTimer { after });
124        }
125        if let Some(err) = out.error {
126            actions.push(Action::Notify(Notification::Error {
127                detail: err.to_string(),
128            }));
129        }
130        for spdu in out.spdus {
131            actions.extend(self.drive_session(&spdu));
132        }
133        actions
134    }
135
136    /// Feed one SPDU to the session layer and convert its output to actions.
137    fn drive_session(&mut self, spdu: &[u8]) -> Vec<Action> {
138        let provided = self.provided.clone();
139        let SessionOut {
140            spdus,
141            apdus,
142            opened,
143            closed,
144        } = self.session.on_spdu(spdu, |r| provided.contains(&r));
145
146        let mut actions = Vec::new();
147        // Session-layer SPDUs (e.g. open_session_response) go down the transport.
148        for s in spdus {
149            actions.extend(self.send_spdu_actions(&s));
150        }
151        for (session_nb, resource) in opened {
152            actions.push(Action::Notify(Notification::SessionOpened { resource }));
153            // Drive the resource handler's on_open (e.g. RM sends profile_enq).
154            if let Some(i) = self.handler_index(resource) {
155                let out = self.resources[i].on_open();
156                actions.extend(self.process_resource_out(session_nb, out));
157            }
158        }
159        for session_nb in closed {
160            actions.push(Action::Notify(Notification::SessionClosed { session_nb }));
161        }
162        // Route each APDU to the resource handler bound to its session.
163        for (session_nb, apdu) in apdus {
164            if let Some(resource) = self.session.resource_of(session_nb) {
165                if let Some(i) = self.handler_index(resource) {
166                    let out = self.resources[i].on_apdu(&apdu);
167                    actions.extend(self.process_resource_out(session_nb, out));
168                }
169            }
170        }
171        actions
172    }
173
174    /// Wrap an SPDU as a `T_Data_Last` and collect the resulting actions.
175    fn send_spdu_actions(&mut self, spdu: &[u8]) -> Vec<Action> {
176        let t = self.transport.send_spdu(spdu);
177        let mut actions = Vec::new();
178        for w in t.writes {
179            actions.push(Action::Write(w));
180        }
181        if let Some(after) = t.timer {
182            actions.push(Action::SetTimer { after });
183        }
184        actions
185    }
186
187    /// Convert a [`ResourceOut`] into actions: send its APDUs on `session_nb`,
188    /// surface its notifications, and open any module resources it requested.
189    fn process_resource_out(&mut self, session_nb: u16, out: ResourceOut) -> Vec<Action> {
190        let mut actions = Vec::new();
191        for apdu in out.apdus {
192            let spdu = self.session.send_apdu(session_nb, &apdu);
193            actions.extend(self.send_spdu_actions(&spdu));
194        }
195        for note in out.notify {
196            actions.push(Action::Notify(note));
197        }
198        for resource in out.open {
199            let spdu = self.session.create_session(resource);
200            actions.extend(self.send_spdu_actions(&spdu));
201        }
202        actions
203    }
204}
205
206#[cfg(test)]
207mod tests {
208    use super::*;
209    use crate::transport::DEFAULT_POLL_INTERVAL;
210    use dvb_ci::resource::RESOURCE_MANAGER;
211    use dvb_ci::spdu::{tags as spdu_tags, OpenSessionRequest};
212    use dvb_ci::tpdu::{tags as tpdu_tags, SbValue};
213    use dvb_common::Serialize;
214
215    fn ser<S: Serialize>(s: &S) -> Vec<u8> {
216        let mut b = vec![0u8; s.serialized_len()];
217        match s.serialize_into(&mut b) {
218            Ok(n) => b.truncate(n),
219            Err(_) => b.clear(),
220        }
221        b
222    }
223
224    /// Wrap an SPDU as a module→host `T_Data_Last` R_TPDU (+ T_SB, DA clear).
225    fn r_data(tcid: u8, spdu: &[u8]) -> Vec<u8> {
226        let mut v = vec![tpdu_tags::DATA_LAST, (1 + spdu.len()) as u8, tcid];
227        v.extend_from_slice(spdu);
228        v.extend_from_slice(&[tpdu_tags::SB, 0x02, tcid, SbValue::new(false).0]);
229        v
230    }
231
232    #[test]
233    fn init_resets_and_opens_transport() {
234        let mut s = CiStack::new();
235        let a = s.handle(Event::Host(HostRequest::Init));
236        assert_eq!(a[0], Action::Reset);
237        assert_eq!(a[1], Action::QuerySlot);
238        assert!(matches!(&a[2], Action::Write(w) if w[0] == tpdu_tags::CREATE_T_C));
239    }
240
241    #[test]
242    fn full_pipeline_opens_a_session_for_a_provided_resource() {
243        let mut s = CiStack::new();
244        s.handle(Event::Host(HostRequest::Init));
245        // module accepts the transport connection
246        s.handle(Event::Readable(&[tpdu_tags::C_T_C_REPLY, 0x01, 0x01]));
247        // module opens a session to the host's resource_manager (carried in an
248        // R_TPDU data block)
249        let osr = ser(&OpenSessionRequest {
250            resource: RESOURCE_MANAGER,
251        });
252        let actions = s.handle(Event::Readable(&r_data(1, &osr)));
253
254        // a SessionOpened notification surfaced...
255        assert!(actions.iter().any(|x| matches!(
256            x,
257            Action::Notify(Notification::SessionOpened {
258                resource
259            }) if *resource == RESOURCE_MANAGER
260        )));
261        // ...and an open_session_response was written back down (inside a TPDU).
262        let wrote_osr = actions.iter().any(|x| match x {
263            Action::Write(w) => w
264                .windows(1)
265                .any(|_| w.contains(&spdu_tags::OPEN_SESSION_RESPONSE)),
266            _ => false,
267        });
268        assert!(wrote_osr, "open_session_response must be sent down");
269
270        // and the session is tracked + a valid response decodes
271        let nb = (1u16..16).find(|&n| s.session.resource_of(n).is_some());
272        assert!(nb.is_some());
273    }
274
275    #[test]
276    fn tick_drives_poll_when_active() {
277        let mut s = CiStack::new();
278        s.handle(Event::Host(HostRequest::Init));
279        s.handle(Event::Readable(&[tpdu_tags::C_T_C_REPLY, 0x01, 0x01]));
280        let a = s.handle(Event::Tick {
281            elapsed: DEFAULT_POLL_INTERVAL,
282        });
283        assert!(a
284            .iter()
285            .any(|x| matches!(x, Action::Write(w) if w.first() == Some(&tpdu_tags::DATA_LAST))));
286    }
287}