Skip to main content

dvb_ci_runtime/
stack.rs

1//! The CI protocol stack — composes the transport + session layers (and, as
2//! they land, the resource state machines) into one sans-IO core.
3//!
4//! [`CiStack::handle`] is the pure entry point: feed it an [`Event`], get back
5//! the [`Action`]s the driver must perform. No I/O, threads, or clock here.
6
7use crate::event::{Action, Event, HostRequest, Notification};
8use crate::resource::{
9    ApplicationInformation, ConditionalAccess, DateTime, Mmi, Resource, ResourceManager,
10    ResourceOut,
11};
12use crate::session::{SessionLayer, SessionOut};
13use crate::transport::{Out as TransportOut, Transport};
14
15use dvb_ci::builder::{build_ca_pmt, build_ca_pmt_for_caids};
16use dvb_ci::objects::ca_pmt::{CaPmtCmdId, CaPmtListManagement};
17use dvb_ci::objects::mmi_high::{Answ, AnswId, MenuAnsw};
18use dvb_ci::resource::{ResourceId, CONDITIONAL_ACCESS_SUPPORT, DATE_TIME, MMI, RESOURCE_MANAGER};
19use dvb_common::{Parse, Serialize};
20use dvb_si::tables::pmt::PmtSection;
21
22/// Serialize an APDU object to owned bytes (buffer is sized exactly).
23fn ser_apdu<S: Serialize>(s: &S) -> Vec<u8> {
24    let mut b = vec![0u8; s.serialized_len()];
25    match s.serialize_into(&mut b) {
26        Ok(n) => b.truncate(n),
27        Err(_) => b.clear(),
28    }
29    b
30}
31
/// The composed EN 50221 protocol core.
///
/// Bundles the transport layer, the session layer and the application-layer
/// resource handlers, plus the state the auto-descramble sequence carries
/// between a `ca_pmt` query and its reply.
pub struct CiStack {
    /// Transport layer (created on `t_c_id = 1` by [`CiStack::new`]).
    transport: Transport,
    /// Session layer; maps session numbers to the resources they are bound to.
    session: SessionLayer,
    /// Application-layer resource handlers, dispatched by `ResourceId`.
    resources: Vec<Box<dyn Resource>>,
    /// `CA_system_id`s the CAM advertised in its `ca_info` (the descramble
    /// filter set; empty until `ca_info` arrives).
    cam_caids: Vec<u16>,
    /// PMT section bytes of an in-flight [`HostRequest::Descramble`], awaiting
    /// the `ca_pmt_reply` that triggers the `ok_descrambling` follow-up.
    pending_descramble: Option<Vec<u8>>,
}
45
46impl Default for CiStack {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
52impl CiStack {
53    /// New stack on transport connection `t_c_id = 1`. The host advertises the
54    /// Resource Manager and registers the RM + application_information +
55    /// conditional_access handlers.
56    #[must_use]
57    pub fn new() -> Self {
58        // The host genuinely *provides* Resource Manager + Date-Time; that is the
59        // list the RM advertises in its `profile` reply. The module opens the
60        // sessions; the host accepts any resource it has a handler for (see
61        // `drive_session`).
62        let host_resources = vec![RESOURCE_MANAGER, DATE_TIME];
63        Self {
64            transport: Transport::new(1),
65            session: SessionLayer::new(),
66            resources: vec![
67                Box::new(ResourceManager::new(host_resources)),
68                Box::new(ApplicationInformation),
69                Box::new(ConditionalAccess),
70                Box::new(DateTime::new()),
71                Box::new(Mmi),
72            ],
73            cam_caids: Vec::new(),
74            pending_descramble: None,
75        }
76    }
77
    /// Register an additional resource handler.
    ///
    /// The handler is appended after the ones already registered. Dispatch
    /// (`handler_index`) picks the first handler whose `id()` matches, so a
    /// handler registered here for an id that is already present is never
    /// selected. Returns `self` so calls can be chained.
    pub fn register(&mut self, resource: Box<dyn Resource>) -> &mut Self {
        self.resources.push(resource);
        self
    }
83
84    /// Index of the registered handler for `resource`, if any.
85    fn handler_index(&self, resource: ResourceId) -> Option<usize> {
86        self.resources.iter().position(|r| r.id() == resource)
87    }
88
89    /// The pure sans-IO entry point.
90    pub fn handle(&mut self, event: Event<'_>) -> Vec<Action> {
91        match event {
92            Event::Host(HostRequest::Init) => {
93                let mut actions = vec![Action::Reset, Action::QuerySlot];
94                let out = self.transport.init();
95                actions.extend(self.emit_transport(out));
96                actions
97            }
98            Event::Tick { elapsed } => {
99                let out = self.transport.tick(elapsed);
100                let mut actions = self.emit_transport(out);
101                // Advance each open resource's timers (e.g. date_time resend).
102                for (session_nb, resource) in self.session.sessions() {
103                    if let Some(i) = self.handler_index(resource) {
104                        let out = self.resources[i].tick(elapsed);
105                        actions.extend(self.process_resource_out(session_nb, out));
106                    }
107                }
108                actions
109            }
110            Event::Readable(frame) => {
111                let out = self.transport.on_frame(frame);
112                self.emit_transport(out)
113            }
114            Event::Host(HostRequest::SendCaPmt(apdu)) => {
115                self.send_to_resource(CONDITIONAL_ACCESS_SUPPORT, apdu)
116            }
117            Event::Host(HostRequest::Descramble(pmt)) => self.descramble(pmt),
118            Event::Host(HostRequest::MmiMenuAnswer(choice_ref)) => {
119                let apdu = ser_apdu(&MenuAnsw { choice_ref });
120                self.send_to_resource(MMI, &apdu)
121            }
122            Event::Host(HostRequest::MmiEnquiryAnswer(text)) => {
123                let apdu = ser_apdu(&Answ {
124                    answ_id: AnswId::Answer,
125                    text_chars: text,
126                });
127                self.send_to_resource(MMI, &apdu)
128            }
129            Event::Host(HostRequest::MmiCancel) => {
130                let apdu = ser_apdu(&Answ {
131                    answ_id: AnswId::Cancel,
132                    text_chars: &[],
133                });
134                self.send_to_resource(MMI, &apdu)
135            }
136            Event::Host(HostRequest::Shutdown) => Vec::new(),
137        }
138    }
139
140    /// React to a CA notification as it is surfaced: cache the CAM's CAIDs from
141    /// `ca_info`, and complete a pending [`HostRequest::Descramble`] by sending
142    /// `ok_descrambling` when the `ca_pmt_reply` says descrambling is possible.
143    fn on_ca_notification(&mut self, note: &Notification) -> Vec<Action> {
144        match note {
145            Notification::CaInfo { ca_system_ids } => {
146                self.cam_caids = ca_system_ids.clone();
147                Vec::new()
148            }
149            Notification::CaPmtReply {
150                descrambling_ok, ..
151            } => match self.pending_descramble.take() {
152                Some(pmt) if *descrambling_ok => {
153                    match self.build_ca_pmt_bytes(&pmt, CaPmtCmdId::OkDescrambling) {
154                        Ok(bytes) => self.send_to_resource(CONDITIONAL_ACCESS_SUPPORT, &bytes),
155                        Err(detail) => vec![Action::Notify(Notification::Error { detail })],
156                    }
157                }
158                _ => Vec::new(),
159            },
160            _ => Vec::new(),
161        }
162    }
163
164    /// Begin a [`HostRequest::Descramble`]: build a CAID-filtered `ca_pmt` with
165    /// `cmd_id = query` and send it, recording the PMT so the `ok_descrambling`
166    /// follow-up can be built when the reply arrives.
167    fn descramble(&mut self, pmt: &[u8]) -> Vec<Action> {
168        let bytes = match self.build_ca_pmt_bytes(pmt, CaPmtCmdId::Query) {
169            Ok(b) => b,
170            Err(detail) => return vec![Action::Notify(Notification::Error { detail })],
171        };
172        self.pending_descramble = Some(pmt.to_vec());
173        self.send_to_resource(CONDITIONAL_ACCESS_SUPPORT, &bytes)
174    }
175
176    /// Build a CAID-filtered `ca_pmt` APDU (`list_management = only`) for `pmt`
177    /// with the given command id. Filters to the CAM's advertised CAIDs once
178    /// `ca_info` is known; falls back to all `CA_descriptor`s before then.
179    fn build_ca_pmt_bytes(&self, pmt: &[u8], cmd_id: CaPmtCmdId) -> Result<Vec<u8>, String> {
180        let parsed = PmtSection::parse(pmt).map_err(|e| format!("invalid PMT: {e}"))?;
181        let lm = CaPmtListManagement::Only;
182        let built = if self.cam_caids.is_empty() {
183            build_ca_pmt(&parsed, lm, cmd_id)
184        } else {
185            build_ca_pmt_for_caids(&parsed, &self.cam_caids, lm, cmd_id)
186        };
187        Ok(built.to_bytes())
188    }
189
190    /// Send an APDU to the open session bound to `resource` (if any).
191    fn send_to_resource(&mut self, resource: ResourceId, apdu: &[u8]) -> Vec<Action> {
192        // Find the session_nb for the resource (linear scan over the small set).
193        let nb = (1u16..=u16::MAX).find(|&n| self.session.resource_of(n) == Some(resource));
194        match nb {
195            Some(nb) => {
196                let spdu = self.session.send_apdu(nb, apdu);
197                let out = self.transport.send_spdu(&spdu);
198                self.emit_transport(out)
199            }
200            None => vec![Action::Notify(Notification::Error {
201                detail: format!("no open session for resource {}", resource.name()),
202            })],
203        }
204    }
205
206    /// Convert a transport [`Out`](TransportOut) into actions, driving any
207    /// reassembled SPDUs up through the session layer.
208    fn emit_transport(&mut self, out: TransportOut) -> Vec<Action> {
209        let mut actions = Vec::new();
210        for w in out.writes {
211            actions.push(Action::Write(w));
212        }
213        if let Some(after) = out.timer {
214            actions.push(Action::SetTimer { after });
215        }
216        if let Some(err) = out.error {
217            actions.push(Action::Notify(Notification::Error {
218                detail: err.to_string(),
219            }));
220        }
221        for spdu in out.spdus {
222            actions.extend(self.drive_session(&spdu));
223        }
224        actions
225    }
226
227    /// Feed one SPDU to the session layer and convert its output to actions.
228    fn drive_session(&mut self, spdu: &[u8]) -> Vec<Action> {
229        // The module opens every session (§7.2.3); the host accepts an
230        // `open_session_request` for any resource it has a handler for — both
231        // host-provided (resource_manager, date_time) and the module-provided
232        // ones the host engages (application_information, conditional_access,
233        // mmi). #340: the old code only accepted `provided`, so it rejected the
234        // module's app-info/ca opens with `resource_non_existent`.
235        let handled: Vec<ResourceId> = self.resources.iter().map(|r| r.id()).collect();
236        let SessionOut {
237            spdus,
238            apdus,
239            opened,
240            closed,
241        } = self.session.on_spdu(spdu, |r| handled.contains(&r));
242
243        let mut actions = Vec::new();
244        // Session-layer SPDUs (e.g. open_session_response) go down the transport.
245        for s in spdus {
246            actions.extend(self.send_spdu_actions(&s));
247        }
248        for (session_nb, resource) in opened {
249            actions.push(Action::Notify(Notification::SessionOpened { resource }));
250            // Drive the resource handler's on_open (e.g. RM sends profile_enq).
251            if let Some(i) = self.handler_index(resource) {
252                let out = self.resources[i].on_open();
253                actions.extend(self.process_resource_out(session_nb, out));
254            }
255        }
256        for session_nb in closed {
257            actions.push(Action::Notify(Notification::SessionClosed { session_nb }));
258        }
259        // Route each APDU to the resource handler bound to its session.
260        for (session_nb, apdu) in apdus {
261            if let Some(resource) = self.session.resource_of(session_nb) {
262                if let Some(i) = self.handler_index(resource) {
263                    let out = self.resources[i].on_apdu(&apdu);
264                    actions.extend(self.process_resource_out(session_nb, out));
265                }
266            }
267        }
268        actions
269    }
270
271    /// Wrap an SPDU as a `T_Data_Last` and collect the resulting actions.
272    fn send_spdu_actions(&mut self, spdu: &[u8]) -> Vec<Action> {
273        let t = self.transport.send_spdu(spdu);
274        let mut actions = Vec::new();
275        for w in t.writes {
276            actions.push(Action::Write(w));
277        }
278        if let Some(after) = t.timer {
279            actions.push(Action::SetTimer { after });
280        }
281        actions
282    }
283
284    /// Convert a [`ResourceOut`] into actions: send its APDUs on `session_nb`,
285    /// surface its notifications, and open any module resources it requested.
286    fn process_resource_out(&mut self, session_nb: u16, out: ResourceOut) -> Vec<Action> {
287        let mut actions = Vec::new();
288        for apdu in out.apdus {
289            let spdu = self.session.send_apdu(session_nb, &apdu);
290            actions.extend(self.send_spdu_actions(&spdu));
291        }
292        for note in out.notify {
293            // Drive the auto-descramble sequence off the CA notifications.
294            let follow = self.on_ca_notification(&note);
295            actions.push(Action::Notify(note));
296            actions.extend(follow);
297        }
298        for resource in out.open {
299            let spdu = self.session.create_session(resource);
300            actions.extend(self.send_spdu_actions(&spdu));
301        }
302        actions
303    }
304}
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use crate::transport::DEFAULT_POLL_INTERVAL;
310    use dvb_ci::resource::RESOURCE_MANAGER;
311    use dvb_ci::spdu::{tags as spdu_tags, OpenSessionRequest};
312    use dvb_ci::tpdu::{tags as tpdu_tags, SbValue};
313    use dvb_common::Serialize;
314
315    fn ser<S: Serialize>(s: &S) -> Vec<u8> {
316        let mut b = vec![0u8; s.serialized_len()];
317        match s.serialize_into(&mut b) {
318            Ok(n) => b.truncate(n),
319            Err(_) => b.clear(),
320        }
321        b
322    }
323
324    /// Wrap an SPDU as a module→host `T_Data_Last` R_TPDU (+ T_SB, DA clear).
325    fn r_data(tcid: u8, spdu: &[u8]) -> Vec<u8> {
326        let mut v = vec![tpdu_tags::DATA_LAST, (1 + spdu.len()) as u8, tcid];
327        v.extend_from_slice(spdu);
328        v.extend_from_slice(&[tpdu_tags::SB, 0x02, tcid, SbValue::new(false).0]);
329        v
330    }
331
332    #[test]
333    fn init_resets_and_opens_transport() {
334        let mut s = CiStack::new();
335        let a = s.handle(Event::Host(HostRequest::Init));
336        assert_eq!(a[0], Action::Reset);
337        assert_eq!(a[1], Action::QuerySlot);
338        assert!(matches!(&a[2], Action::Write(w) if w[0] == tpdu_tags::CREATE_T_C));
339    }
340
341    #[test]
342    fn full_pipeline_opens_a_session_for_a_provided_resource() {
343        let mut s = CiStack::new();
344        s.handle(Event::Host(HostRequest::Init));
345        // module accepts the transport connection
346        s.handle(Event::Readable(&[tpdu_tags::C_T_C_REPLY, 0x01, 0x01]));
347        // module opens a session to the host's resource_manager (carried in an
348        // R_TPDU data block)
349        let osr = ser(&OpenSessionRequest {
350            resource: RESOURCE_MANAGER,
351        });
352        let actions = s.handle(Event::Readable(&r_data(1, &osr)));
353
354        // a SessionOpened notification surfaced...
355        assert!(actions.iter().any(|x| matches!(
356            x,
357            Action::Notify(Notification::SessionOpened {
358                resource
359            }) if *resource == RESOURCE_MANAGER
360        )));
361        // ...and an open_session_response was written back down (inside a TPDU).
362        let wrote_osr = actions.iter().any(|x| match x {
363            Action::Write(w) => w
364                .windows(1)
365                .any(|_| w.contains(&spdu_tags::OPEN_SESSION_RESPONSE)),
366            _ => false,
367        });
368        assert!(wrote_osr, "open_session_response must be sent down");
369
370        // and the session is tracked + a valid response decodes
371        let nb = (1u16..16).find(|&n| s.session.resource_of(n).is_some());
372        assert!(nb.is_some());
373    }
374
375    #[test]
376    fn tick_drives_poll_when_active() {
377        let mut s = CiStack::new();
378        s.handle(Event::Host(HostRequest::Init));
379        s.handle(Event::Readable(&[tpdu_tags::C_T_C_REPLY, 0x01, 0x01]));
380        let a = s.handle(Event::Tick {
381            elapsed: DEFAULT_POLL_INTERVAL,
382        });
383        assert!(a
384            .iter()
385            .any(|x| matches!(x, Action::Write(w) if w.first() == Some(&tpdu_tags::DATA_LAST))));
386    }
387
388    // --- #334: the auto-descramble (query -> reply -> ok) sequence ---
389
390    /// Feed standalone `T_SB`s (data_available = 0) — the module acking each host
391    /// block — until the stack stops writing, collecting every action. This
392    /// drains the transport's one-block-per-turn outbound queue (#337).
393    fn pump_sbs(s: &mut CiStack) -> Vec<Action> {
394        let mut all = Vec::new();
395        for _ in 0..16 {
396            let a = s.handle(Event::Readable(&[
397                tpdu_tags::SB,
398                0x02,
399                0x01,
400                SbValue::new(false).0,
401            ]));
402            let wrote = a.iter().any(|x| matches!(x, Action::Write(_)));
403            all.extend(a);
404            if !wrote {
405                break;
406            }
407        }
408        all
409    }
410
411    /// Wrap an APDU for delivery on `session_nb` (session_number prefix), then as
412    /// a module→host R_TPDU.
413    fn r_apdu(session_nb: u16, apdu: &[u8]) -> Vec<u8> {
414        use dvb_ci::spdu::SessionNumber;
415        let mut spdu = ser(&SessionNumber { session_nb });
416        spdu.extend_from_slice(apdu);
417        r_data(1, &spdu)
418    }
419
420    /// Minimal PMT: program_info has one CA_descriptor (CAID 0x0B00) + a non-CA
421    /// descriptor; one clear ES. Mirrors the dvb-ci builder fixture.
422    fn build_pmt() -> Vec<u8> {
423        let prog_ca = [0x09u8, 0x04, 0x0B, 0x00, 0xE1, 0x00];
424        let reg = [0x05u8, 0x04, b'H', b'D', b'M', b'V'];
425        let mut program_info = Vec::new();
426        program_info.extend_from_slice(&prog_ca);
427        program_info.extend_from_slice(&reg);
428        let lang = [0x0Au8, 0x04, b'e', b'n', b'g', 0x00];
429
430        let mut body = Vec::new();
431        body.push(0x02); // table_id
432        body.push(0);
433        body.push(0); // section_length placeholder
434        body.extend_from_slice(&[0x00, 0x01]); // program_number 1
435        body.push(0xC3); // version 1, current_next 1
436        body.push(0x00);
437        body.push(0x00);
438        body.push(0xE0 | 0x02); // PCR_PID 0x0200
439        body.push(0x00);
440        let pil = program_info.len();
441        body.push(0xF0 | ((pil >> 8) as u8 & 0x0F));
442        body.push(pil as u8);
443        body.extend_from_slice(&program_info);
444        // one clear ES
445        body.push(0x03);
446        body.push(0xE0 | 0x02);
447        body.push(0x01);
448        body.push(0xF0 | ((lang.len() >> 8) as u8 & 0x0F));
449        body.push(lang.len() as u8);
450        body.extend_from_slice(&lang);
451
452        let section_length = body.len() - 3 + 4;
453        body[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
454        body[2] = section_length as u8;
455        let crc = dvb_common::crc32_mpeg2::compute(&body);
456        body.extend_from_slice(&crc.to_be_bytes());
457        body
458    }
459
460    /// Drive the full handshake to open conditional-access + mmi sessions with
461    /// the CAM's CAIDs learned, following the real flow: module opens RM → host
462    /// `profile_change` → module opens its resource sessions (#340).
463    fn stack_with_ca_session() -> CiStack {
464        use dvb_ci::objects::ca_info::CaInfo;
465        use dvb_ci::objects::resource_manager::Profile;
466        use dvb_ci::resource::{APPLICATION_INFORMATION, CONDITIONAL_ACCESS_SUPPORT, MMI};
467        use dvb_ci::spdu::OpenSessionRequest;
468
469        let mut s = CiStack::new();
470        s.handle(Event::Host(HostRequest::Init));
471        s.handle(Event::Readable(&[tpdu_tags::C_T_C_REPLY, 0x01, 0x01]));
472        // module opens the host's resource_manager → RM session 1
473        s.handle(Event::Readable(&r_data(
474            1,
475            &ser(&OpenSessionRequest {
476                resource: RESOURCE_MANAGER,
477            }),
478        )));
479        // module sends its profile → host fires CamReady + sends profile_change
480        s.handle(Event::Readable(&r_apdu(
481            1,
482            &ser(&Profile {
483                resources: vec![APPLICATION_INFORMATION, CONDITIONAL_ACCESS_SUPPORT, MMI],
484            }),
485        )));
486        pump_sbs(&mut s); // flush the profile_change
487                          // unblocked, the module now opens its own resource sessions
488        for res in [APPLICATION_INFORMATION, CONDITIONAL_ACCESS_SUPPORT, MMI] {
489            s.handle(Event::Readable(&r_data(
490                1,
491                &ser(&OpenSessionRequest { resource: res }),
492            )));
493            pump_sbs(&mut s); // flush open_session_response + the on_open enq
494        }
495        // module advertises its CAIDs on the CA session
496        let ca_nb = s
497            .session
498            .sessions()
499            .into_iter()
500            .find(|&(_, r)| r == CONDITIONAL_ACCESS_SUPPORT)
501            .map(|(n, _)| n)
502            .expect("CA session open");
503        s.handle(Event::Readable(&r_apdu(
504            ca_nb,
505            &ser(&CaInfo {
506                ca_system_ids: vec![0x0B00, 0x1800],
507            }),
508        )));
509        s
510    }
511
512    #[test]
513    fn descramble_filters_then_queries_then_oks() {
514        use dvb_ci::objects::ca_pmt::CaPmtCmdId;
515        use dvb_ci::objects::ca_pmt_reply::{CaEnable, CaPmtReply};
516        use dvb_ci::resource::CONDITIONAL_ACCESS_SUPPORT;
517
518        let mut s = stack_with_ca_session();
519        let ca_nb = s
520            .session
521            .sessions()
522            .into_iter()
523            .find(|&(_, r)| r == CONDITIONAL_ACCESS_SUPPORT)
524            .map(|(n, _)| n)
525            .unwrap();
526
527        let pmt = build_pmt();
528        let mut query_actions = s.handle(Event::Host(HostRequest::Descramble(&pmt)));
529        // The query is queued behind the in-flight link; the module's SB flushes
530        // it (one block per turn — #337).
531        query_actions.extend(pump_sbs(&mut s));
532        // A ca_pmt with cmd_id = query was sent, filtered to the CAM's CAIDs.
533        let q = first_ca_pmt(&query_actions).expect("ca_pmt query sent");
534        assert_eq!(q.cmd_id, CaPmtCmdId::Query);
535        assert_eq!(
536            q.program_ca_descriptors.as_slice(),
537            &[0x09, 0x04, 0x0B, 0x00, 0xE1, 0x00]
538        );
539
540        // Module replies that descrambling is possible → stack auto-sends OK.
541        let mut ok_actions = s.handle(Event::Readable(&r_apdu(
542            ca_nb,
543            &ser(&CaPmtReply {
544                program_number: 1,
545                version_number: 1,
546                current_next_indicator: true,
547                ca_enable: Some(CaEnable::Possible),
548                streams: vec![],
549            }),
550        )));
551        assert!(ok_actions.iter().any(|a| matches!(
552            a,
553            Action::Notify(Notification::CaPmtReply {
554                descrambling_ok: true,
555                ..
556            })
557        )));
558        ok_actions.extend(pump_sbs(&mut s));
559        assert!(
560            all_ca_pmts(&ok_actions)
561                .iter()
562                .any(|c| c.cmd_id == CaPmtCmdId::OkDescrambling),
563            "ca_pmt ok_descrambling sent after a positive reply"
564        );
565    }
566
567    #[test]
568    fn descramble_reply_not_possible_sends_no_ok() {
569        use dvb_ci::objects::ca_pmt::CaPmtCmdId;
570        use dvb_ci::objects::ca_pmt_reply::CaPmtReply;
571        use dvb_ci::resource::CONDITIONAL_ACCESS_SUPPORT;
572
573        let mut s = stack_with_ca_session();
574        let ca_nb = s
575            .session
576            .sessions()
577            .into_iter()
578            .find(|&(_, r)| r == CONDITIONAL_ACCESS_SUPPORT)
579            .map(|(n, _)| n)
580            .unwrap();
581        let pmt = build_pmt();
582        let mut actions = s.handle(Event::Host(HostRequest::Descramble(&pmt)));
583        // ca_enable = None → descrambling not possible → no OK follow-up.
584        actions.extend(s.handle(Event::Readable(&r_apdu(
585            ca_nb,
586            &ser(&CaPmtReply {
587                program_number: 1,
588                version_number: 1,
589                current_next_indicator: true,
590                ca_enable: None,
591                streams: vec![],
592            }),
593        ))));
594        actions.extend(pump_sbs(&mut s));
595        // The query may be flushed, but no ok_descrambling is ever sent.
596        assert!(
597            all_ca_pmts(&actions)
598                .iter()
599                .all(|c| c.cmd_id != CaPmtCmdId::OkDescrambling),
600            "no ok_descrambling without a positive reply"
601        );
602    }
603
604    /// Whether any written frame carries the 3-byte APDU tag `want`.
605    fn wrote_apdu(actions: &[Action], want: [u8; 3]) -> bool {
606        actions
607            .iter()
608            .any(|a| matches!(a, Action::Write(w) if w.windows(3).any(|x| x == want)))
609    }
610
611    #[test]
612    fn mmi_menu_answer_sends_menu_answ() {
613        let mut s = stack_with_ca_session();
614        let mut acts = s.handle(Event::Host(HostRequest::MmiMenuAnswer(2)));
615        acts.extend(pump_sbs(&mut s));
616        // menu_answ APDU (9F 88 0B) reaches the wire.
617        assert!(wrote_apdu(&acts, [0x9F, 0x88, 0x0B]));
618    }
619
620    #[test]
621    fn mmi_enquiry_answer_sends_answ() {
622        let mut s = stack_with_ca_session();
623        let mut acts = s.handle(Event::Host(HostRequest::MmiEnquiryAnswer(b"1234")));
624        acts.extend(pump_sbs(&mut s));
625        // answ APDU (9F 88 08) reaches the wire.
626        assert!(wrote_apdu(&acts, [0x9F, 0x88, 0x08]));
627    }
628
629    /// Parse every `ca_pmt` (tag `9F 80 32`) found in the written frames,
630    /// returning each one's `cmd_id` + programme CA-descriptor bytes (owned).
631    fn all_ca_pmts(actions: &[Action]) -> Vec<CaPmtSummary> {
632        use dvb_ci::objects::ca_pmt::CaPmt;
633        use dvb_common::Parse;
634        let tag = [0x9F, 0x80, 0x32];
635        let mut out = Vec::new();
636        for a in actions {
637            if let Action::Write(w) = a {
638                if let Some(pos) = w.windows(3).position(|x| x == tag) {
639                    if let Ok(p) = CaPmt::parse(&w[pos..]) {
640                        out.push(CaPmtSummary {
641                            cmd_id: p.cmd_id.expect("programme cmd_id present"),
642                            program_ca_descriptors: p.program_ca_descriptors.to_vec(),
643                        });
644                    }
645                }
646            }
647        }
648        out
649    }
650
651    /// The first `ca_pmt` in the written frames.
652    fn first_ca_pmt(actions: &[Action]) -> Option<CaPmtSummary> {
653        all_ca_pmts(actions).into_iter().next()
654    }
655
    /// Owned summary of one `ca_pmt` found on the wire (see `all_ca_pmts`).
    struct CaPmtSummary {
        /// Programme-level command id of the `ca_pmt`.
        cmd_id: dvb_ci::objects::ca_pmt::CaPmtCmdId,
        /// Raw programme-level CA-descriptor bytes, copied out of the frame.
        program_ca_descriptors: Vec<u8>,
    }
660}