Skip to main content

dvb_si/
epg.rs

1//! EPG convenience layer.
2//!
3//! This module provides a high-level store for EIT (Event Information Table) data,
4//! making it easy to query the "now and next" events for a service and export
5//! a full EPG schedule.
6//!
7//! It wraps [`crate::collect::EitCollector`] to handle the multi-section
8//! reassembly of EIT tables and maintains a deduplicated, time-ordered event list
9//! for each service keyed by `(original_network_id, transport_stream_id, service_id)`.
10//!
11//! # Memory bounds
12//!
13//! The store is bounded by default to prevent memory-exhaustion from hostile
14//! or pathological EIT input — an attacker who controls
15//! `original_network_id`/`transport_stream_id`/`service_id`/`event_id` could
16//! otherwise grow the cache without bound. Two caps apply:
17//!
18//! * **`max_services`** (default [`DEFAULT_MAX_SERVICES`]) — caps the number of
19//!   distinct [`ServiceKey`] entries. When the cap is reached, incoming events
20//!   for new services are skipped until a service is removed via
21//!   [`EpgStore::retain_services`] or [`EpgStore::clear`].
22//! * **`max_events_per_service`** (default [`DEFAULT_MAX_EVENTS_PER_SERVICE`]) —
23//!   caps the number of events stored per service. When a service's cap is
24//!   reached, new events (by `event_id`) are skipped; existing event_ids are
25//!   still updated on version churn.
26//!
27//! The policy is *skip-until-space* — the same as
28//! [`crate::carousel::ModuleReassembler`] — so long-running consumers should
29//! call `retain_services` or `clear` periodically to free capacity.
30//!
31//! # Quickstart
32//!
33//! ```rust
34//! use dvb_si::epg::{EpgStore, ServiceKey};
35//! use dvb_si::collect::SectionSetCollector;
36//! use chrono::{TimeZone, Utc};
37//!
38//! let mut store = EpgStore::new();
39//!
40//! // Feed EIT sections (from a TS demux, file, etc.)
41//! // store.feed(&eit_section_bytes)?;
42//!
43//! let key = ServiceKey {
44//!     original_network_id: 1,
45//!     transport_stream_id: 1,
46//!     service_id: 100,
47//! };
48//!
49//! // Query now/next (requires EIT present/following data to be fed first)
50//! let now = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
51//! let (now_evt, next_evt) = store.now_and_next(key, now);
52//! if let Some(evt) = now_evt {
53//!     println!("Now:  {} (until {})",
54//!         evt.event_name.as_deref().unwrap_or("?"),
55//!         evt.start_time.map(|t| t + evt.duration.unwrap_or_default())
56//!             .map(|e| e.to_string()).unwrap_or_default());
57//! }
58//! if let Some(evt) = next_evt {
59//!     println!("Next: {} at {}",
60//!         evt.event_name.as_deref().unwrap_or("?"),
61//!         evt.start_time.map(|t| t.to_string()).unwrap_or_default());
62//! }
63//! // Print tonight's schedule (events from 20:00 to midnight)
64//! let tonight = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
65//! let midnight = Utc.with_ymd_and_hms(2026, 6, 11, 0, 0, 0).unwrap();
66//! if let Some(events) = store.schedule(key, tonight, midnight) {
67//!     for evt in &events {
68//!         println!("{:>5}  {}",
69//!             evt.start_time.map(|t| t.format("%H:%M").to_string()).unwrap_or_default(),
70//!             evt.event_name.as_deref().unwrap_or("?"));
71//!     }
72//! }
73//! ```
74//!
75//! # Pruning policy
76//!
77//! The store accumulates events within its configured caps. To bound growth
78//! under schedule churn, use [`EpgStore::retain_services`] to remove services
79//! that are no longer of interest and [`EpgStore::clear`] to reset all state at
80//! a carousel boundary.  The underlying [`crate::collect::EitCollector`] handles
81//! version-driven section-set replacement automatically — when the
82//! `version_number` on a sub-table changes, the old partial set is discarded and
83//! a new one begins.  Callers scanning a full carousel cycle can `clear()` and
84//! start fresh, or `retain_services` to keep only the active service list.
85
86use crate::collect::CollectResult;
87use crate::tables::RunningStatus;
88use alloc::collections::BTreeMap;
89use alloc::format;
90use alloc::string::String;
91use alloc::vec::Vec;
92
93/// Logical key identifying a service across the DVB network.
94#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
95#[cfg_attr(feature = "serde", derive(serde::Serialize))]
96pub struct ServiceKey {
97    /// original_network_id from the EIT/SDT.
98    pub original_network_id: u16,
99    /// transport_stream_id from the EIT/SDT.
100    pub transport_stream_id: u16,
101    /// service_id.
102    pub service_id: u16,
103}
104
105/// A parental rating entry from a
106/// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
107#[non_exhaustive]
108#[derive(Debug, Clone, PartialEq, Eq)]
109#[cfg_attr(feature = "serde", derive(serde::Serialize))]
110pub struct Rating {
111    /// Three-character ISO 3166 country code.
112    pub country: String,
113    /// Rating code per EN 300 468 §6.2.13 (0x01..=0x0F => minimum age = value+3).
114    pub value: u8,
115}
116
117impl Rating {
118    /// Minimum age if `value` falls in the range 0x01..=0x0F, else `None`.
119    #[must_use]
120    pub fn minimum_age(&self) -> Option<u8> {
121        match self.value {
122            0x01..=0x0F => Some(self.value + 3),
123            _ => None,
124        }
125    }
126}
127
128/// CRID type per TS 102 323 Table 117.
129///
130/// Re-exported from [`crate::descriptors::content_identifier::CridType`].
131pub use crate::descriptors::content_identifier::CridType;
132
133/// A content reference identifier entry from a
134/// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
135#[non_exhaustive]
136#[derive(Debug, Clone, PartialEq, Eq)]
137#[cfg_attr(feature = "serde", derive(serde::Serialize))]
138pub struct Crid {
139    /// CRID type (TS 102 323 Table 117: 0x01 = item of content, 0x02 = series, 0x03 = recommendation).
140    pub crid_type: CridType,
141    /// The CRID locator string.
142    pub crid: String,
143}
144
145/// An item (description-value pair) from an extended event descriptor fragment.
146#[non_exhaustive]
147#[derive(Debug, Clone, PartialEq, Eq)]
148#[cfg_attr(feature = "serde", derive(serde::Serialize))]
149pub struct ExtendedItem {
150    /// Item description.
151    pub description: String,
152    /// Item value.
153    pub item: String,
154}
155
156/// A content genre nibble triplet from a
157/// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
158#[non_exhaustive]
159#[derive(Debug, Clone, PartialEq, Eq)]
160#[cfg_attr(feature = "serde", derive(serde::Serialize))]
161pub struct ContentNibble {
162    /// Content nibble level 1 (category).
163    pub level_1: u8,
164    /// Content nibble level 2 (sub-category).
165    pub level_2: u8,
166    /// User-defined byte.
167    pub user: u8,
168}
169
170impl ContentNibble {
171    /// Level-1 broad category per EN 300 468 Table 29.
172    #[must_use]
173    pub fn genre(&self) -> crate::descriptors::content::ContentGenre {
174        crate::descriptors::content::ContentGenre::from_nibble_1(self.level_1)
175    }
176
177    /// Most specific genre name per EN 300 468 Table 29.
178    #[must_use]
179    pub fn genre_name(&self) -> &'static str {
180        crate::descriptors::content::content_genre_name(self.level_1, self.level_2)
181    }
182}
183
184/// A decoded view of an EPG event.
185///
186/// Extracted from [`crate::collect::CompleteEitEvent`] with commonly needed
187/// descriptor fields pre-decoded and extended text concatenated per
188/// EN 300 468 §6.2.15.
189///
190/// # Limitations
191///
192/// Only the first descriptor of each kind (short_event, content,
193/// parental_rating, content_identifier) is decoded per event; EIT events
194/// may carry multiple language variants and only the first is taken.
195#[non_exhaustive]
196#[derive(Debug, Clone, PartialEq, Eq)]
197#[cfg_attr(feature = "serde", derive(serde::Serialize))]
198pub struct EpgEvent {
199    /// 16-bit event_id.
200    pub event_id: u16,
201    /// Decoded start time (MJD + BCD UTC), if valid.
202    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
203    /// Decoded BCD duration, if valid.
204    pub duration: Option<core::time::Duration>,
205    /// 3-bit running status (EN 300 468 Table 6).
206    pub running_status: RunningStatus,
207    /// free_CA_mode.
208    pub free_ca_mode: bool,
209    /// Decoded short event name (from
210    /// [`ShortEventDescriptor`](crate::descriptors::short_event::ShortEventDescriptor)),
211    /// if present and decodeable.
212    pub event_name: Option<String>,
213    /// Decoded short event text, if present and decodeable.
214    pub event_text: Option<String>,
215    /// Concatenated extended event text from all
216    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
217    /// fragments, per EN 300 468 §6.2.15. Fragments are sorted by
218    /// `descriptor_number` and concatenated directly (no separator).
219    pub extended_text: Option<String>,
220    /// Accumulated extended event items (description, value) from all
221    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
222    /// fragments, sorted by `descriptor_number`.
223    #[cfg_attr(feature = "serde", serde(default))]
224    pub extended_items: Vec<ExtendedItem>,
225    /// Content genre entries from
226    /// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
227    #[cfg_attr(feature = "serde", serde(default))]
228    pub content_nibbles: Vec<ContentNibble>,
229    /// Parental rating entries from
230    /// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
231    #[cfg_attr(feature = "serde", serde(default))]
232    pub ratings: Vec<Rating>,
233    /// CRID entries from
234    /// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
235    #[cfg_attr(feature = "serde", serde(default))]
236    pub crids: Vec<Crid>,
237}
238
239/// Serialisable service data exposed by [`EpgStore`] serde export.
240#[derive(Debug, Clone)]
241#[cfg_attr(feature = "serde", derive(serde::Serialize))]
242struct ServiceData {
243    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
244    service_name: Option<String>,
245    events: Vec<EpgEvent>,
246}
247
248/// Default cap on distinct [`ServiceKey`] entries (services with cached EPG data).
249///
250/// 1024 services is generous — a single DVB transponder typically carries 20–50
251/// services — while bounding a hostile stream that rotates `service_id` /
252/// `transport_stream_id` / `original_network_id` to force unbounded map growth.
253pub const DEFAULT_MAX_SERVICES: usize = 1024;
254
255/// Default cap on events stored per service.
256///
257/// 8192 events (~28 days of 5-minute-granularity schedule entries) is far
258/// above real 7-day EPG depth while bounding per-service event accumulation
259/// from a hostile stream that rotates `event_id` without re-versioning.
260pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8192;
261
262/// A store for EIT data, providing high-level access to program events.
263///
264/// Wraps a [`crate::collect::EitCollector`] and maintains a deduplicated,
265/// event list per service. Optionally accepts SDT data to attach service names.
266///
267/// Serde export serializes the cache as a map of `ServiceKey` → service data.
268///
269/// # Memory bounds
270///
271/// The store is bounded by two configurable caps (see [module docs](self) for
272/// rationale and default values). Use [`with_max_services`](Self::with_max_services)
273/// and [`with_max_events_per_service`](Self::with_max_events_per_service) to
274/// tune them.
275///
276/// # Limitations
277///
278/// Events are deduplicated by `event_id` and stored within the configured
279/// per-service cap. Events removed from a re-versioned schedule, or events
280/// already in the past, remain in the store until evicted by
281/// [`retain_services()`](Self::retain_services) or [`clear()`](Self::clear).
282///
283/// # Example
284///
285/// ```no_run
286/// use dvb_si::epg::{EpgStore, ServiceKey};
287/// use chrono::Utc;
288///
289/// let mut store = EpgStore::new();
290/// // store.feed(&eit_section_bytes).unwrap();
291///
292/// let key = ServiceKey {
293///     original_network_id: 1,
294///     transport_stream_id: 1,
295///     service_id: 100,
296/// };
297///
298/// let (now_evt, _next) = store.now_and_next(key, Utc::now());
299/// if let Some(e) = now_evt {
300///     println!("Now playing: {:?}", e.event_name);
301/// }
302/// ```
303#[derive(Debug)]
304pub struct EpgStore {
305    collector: crate::collect::EitCollector,
306    cache: BTreeMap<ServiceKey, ServiceEpg>,
307    max_services: usize,
308    max_events_per_service: usize,
309}
310
311impl Default for EpgStore {
312    fn default() -> Self {
313        Self {
314            collector: crate::collect::EitCollector::default(),
315            cache: BTreeMap::new(),
316            max_services: DEFAULT_MAX_SERVICES,
317            max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
318        }
319    }
320}
321
322#[derive(Debug, Default)]
323struct ServiceEpg {
324    service_name: Option<String>,
325    /// Deduplicated by event_id. Latest version wins (later inserts overwrite).
326    events: BTreeMap<u16, EpgEvent>,
327}
328
329impl EpgStore {
330    /// Create a new, empty EPG store with default caps
331    /// ([`DEFAULT_MAX_SERVICES`], [`DEFAULT_MAX_EVENTS_PER_SERVICE`]).
332    #[must_use]
333    pub fn new() -> Self {
334        Self::default()
335    }
336
337    /// Replace the service-count cap (default [`DEFAULT_MAX_SERVICES`]).
338    /// When the cap is reached, events for new services are skipped until
339    /// [`retain_services`](Self::retain_services) or [`clear`](Self::clear)
340    /// frees capacity.
341    #[must_use]
342    pub fn with_max_services(mut self, max_services: usize) -> Self {
343        self.max_services = max_services;
344        self
345    }
346
347    /// Replace the per-service event cap (default
348    /// [`DEFAULT_MAX_EVENTS_PER_SERVICE`]). When a service reaches its cap, new
349    /// events (by `event_id`) are skipped; existing event_ids are still updated
350    /// on version churn.
351    #[must_use]
352    pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
353        self.max_events_per_service = max_events_per_service;
354        self
355    }
356
357    /// Replace the underlying collector's logical-key cap (default
358    /// [`crate::collect::DEFAULT_MAX_LOGICAL_KEYS`]). See
359    /// [`crate::collect::EitCollector::with_max_logical_keys`].
360    #[must_use]
361    pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
362        self.collector = self.collector.with_max_logical_keys(max_logical_keys);
363        self
364    }
365
366    /// Feed one EIT section into the store.
367    ///
368    /// If a table becomes complete, its events are merged into the cache
369    /// (deduplicated by `event_id`, later insertions overwrite).
370    ///
371    /// # Errors
372    ///
373    /// Returns a [`crate::collect::CollectError`] if the section is malformed.
374    pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
375        self.feed_with_pid(None, bytes)
376    }
377
378    /// Feed one EIT section with PID context into the store.
379    pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
380        if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
381            let tables = completed.tables()?;
382            for table in &tables {
383                let key = ServiceKey {
384                    original_network_id: table.original_network_id,
385                    transport_stream_id: table.transport_stream_id,
386                    service_id: table.service_id,
387                };
388                if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
389                    continue;
390                }
391                let svc = self.cache.entry(key).or_default();
392                for event in &table.events {
393                    if svc.events.len() >= self.max_events_per_service
394                        && !svc.events.contains_key(&event.event_id)
395                    {
396                        continue;
397                    }
398                    svc.events.insert(event.event_id, event_to_epg(event));
399                }
400            }
401        }
402        Ok(())
403    }
404
405    /// Feed completed SDT data to attach service names.
406    ///
407    /// Accepts a parsed [`crate::collect::CompleteSdt`] from a
408    /// [`crate::collect::SectionSetCollector`].
409    pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
410        for svc in &sdt.services {
411            let key = ServiceKey {
412                original_network_id: sdt.original_network_id,
413                transport_stream_id: sdt.transport_stream_id,
414                service_id: svc.service_id,
415            };
416            let entry = self.cache.entry(key).or_default();
417            entry.service_name = extract_service_name(svc.descriptors.descriptors());
418        }
419    }
420
421    /// Get the "now" and "next" events for a service.
422    ///
423    /// Searches the event list for the given service and returns the event
424    /// currently on-air ("now") and the next upcoming event ("next") based
425    /// on reference time `at`.
426    ///
427    /// "now" is the event where `at` falls within `[start, start + duration)`.
428    /// "next" is the event with the earliest `start_time` strictly after `at`
429    /// (not just the first such event in arbitrary iteration order).
430    ///
431    /// An event ending exactly at `at` is NOT considered "now" (exclusive end).
432    ///
433    /// Returns `(None, None)` when the service is unknown or no event matches.
434    pub fn now_and_next(
435        &self,
436        key: ServiceKey,
437        at: chrono::DateTime<chrono::Utc>,
438    ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
439        let Some(svc) = self.cache.get(&key) else {
440            return (None, None);
441        };
442
443        let now = svc.events.values().find(|e| {
444            if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
445                let end = start + dur;
446                return at >= start && at < end;
447            }
448            false
449        });
450
451        let next = svc
452            .events
453            .values()
454            .filter(|e| {
455                if let Some(start) = e.start_time {
456                    start > at
457                } else {
458                    false
459                }
460            })
461            .min_by_key(|e| e.start_time);
462
463        (now, next)
464    }
465
466    /// Query events with start times in the half-open range `[from, to)`.
467    ///
468    /// Returns events sorted by start time (valid times first, then by
469    /// event_id). Events without a decodable start time are excluded.
470    #[must_use]
471    pub fn schedule(
472        &self,
473        key: ServiceKey,
474        from: chrono::DateTime<chrono::Utc>,
475        to: chrono::DateTime<chrono::Utc>,
476    ) -> Option<Vec<&EpgEvent>> {
477        let svc = self.cache.get(&key)?;
478        let mut events: Vec<&EpgEvent> = svc
479            .events
480            .values()
481            .filter(|e| {
482                if let Some(start) = e.start_time {
483                    start >= from && start < to
484                } else {
485                    false
486                }
487            })
488            .collect();
489        events.sort_by(|a, b| cmp_event_by_start(a, b));
490        Some(events)
491    }
492
493    /// Return the service name for a given key, if SDT data was fed.
494    #[must_use]
495    pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
496        self.cache.get(&key).and_then(|s| s.service_name.as_deref())
497    }
498
499    /// Iterate the [`ServiceKey`]s of every service with cached EIT data, so
500    /// callers can walk the whole EPG (e.g. render a grid) without knowing the
501    /// service ids in advance. Order is unspecified.
502    pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
503        self.cache.keys().copied()
504    }
505
506    /// Return all events for a service, sorted by start time
507    /// (events without a valid start time sort last, then by event_id).
508    #[must_use]
509    pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
510        let svc = self.cache.get(&key)?;
511        let mut events: Vec<&EpgEvent> = svc.events.values().collect();
512        events.sort_by(|a, b| cmp_event_by_start(a, b));
513        Some(events)
514    }
515
516    /// Return the number of services with cached EIT data.
517    #[must_use]
518    pub fn service_count(&self) -> usize {
519        self.cache.len()
520    }
521
522    /// Return the total number of events across all services.
523    #[must_use]
524    pub fn event_count(&self) -> usize {
525        self.cache.values().map(|s| s.events.len()).sum()
526    }
527
528    /// Retain only services matching the given predicate.
529    ///
530    /// Both the event cache and the underlying collector partial state
531    /// for rejected keys are removed.
532    pub fn retain_services<F>(&mut self, mut keep: F)
533    where
534        F: FnMut(&ServiceKey) -> bool,
535    {
536        self.cache.retain(|key, _| keep(key));
537        self.collector.retain_logical(|lk| {
538            keep(&ServiceKey {
539                original_network_id: lk.original_network_id,
540                transport_stream_id: lk.transport_stream_id,
541                service_id: lk.service_id,
542            })
543        });
544    }
545
546    /// Clear all cached EIT data and reset the internal collector.
547    pub fn clear(&mut self) {
548        self.collector.clear();
549        self.cache.clear();
550    }
551}
552
553#[cfg(feature = "serde")]
554impl serde::Serialize for EpgStore {
555    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
556        use serde::ser::SerializeMap;
557        let mut m = s.serialize_map(Some(self.cache.len()))?;
558        for (key, svc) in &self.cache {
559            let data = ServiceData {
560                service_name: svc.service_name.clone(),
561                events: {
562                    let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
563                    evts.sort_by(cmp_event_by_start);
564                    evts
565                },
566            };
567            let key_str = format!(
568                "{}-{}-{}",
569                key.original_network_id, key.transport_stream_id, key.service_id
570            );
571            m.serialize_entry(&key_str, &data)?;
572        }
573        m.end()
574    }
575}
576
577fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> core::cmp::Ordering {
578    match (a.start_time, b.start_time) {
579        (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
580        (Some(_), None) => core::cmp::Ordering::Less,
581        (None, Some(_)) => core::cmp::Ordering::Greater,
582        (None, None) => a.event_id.cmp(&b.event_id),
583    }
584}
585
586fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
587    let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
588    let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
589    let content_nibbles = extract_content(e.descriptors.descriptors());
590    let ratings = extract_ratings(e.descriptors.descriptors());
591    let crids = extract_crids(e.descriptors.descriptors());
592
593    EpgEvent {
594        event_id: e.event_id,
595        start_time: e.start_time(),
596        duration: e.duration(),
597        running_status: e.running_status,
598        free_ca_mode: e.free_ca_mode,
599        event_name,
600        event_text,
601        extended_text,
602        extended_items,
603        content_nibbles,
604        ratings,
605        crids,
606    }
607}
608
609fn extract_short_event(
610    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
611) -> (Option<String>, Option<String>) {
612    for desc in descriptors {
613        if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
614            return (
615                Some(se.event_name.decode().into_owned()),
616                Some(se.text.decode().into_owned()),
617            );
618        }
619    }
620    (None, None)
621}
622
623struct ExtendedFragment {
624    descriptor_number: u8,
625    text: String,
626    items: Vec<ExtendedItem>,
627}
628
629fn extract_extended(
630    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
631) -> (Option<String>, Vec<ExtendedItem>) {
632    use crate::descriptors::AnyDescriptor;
633
634    let mut fragments: Vec<ExtendedFragment> = descriptors
635        .iter()
636        .filter_map(|d| {
637            if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
638                let text = ee.text.decode().into_owned();
639                let items: Vec<ExtendedItem> = ee
640                    .items
641                    .iter()
642                    .map(|i| ExtendedItem {
643                        description: i.description.decode().into_owned(),
644                        item: i.value.decode().into_owned(),
645                    })
646                    .collect();
647                if !text.is_empty() || !items.is_empty() {
648                    Some(ExtendedFragment {
649                        descriptor_number: ee.descriptor_number,
650                        text,
651                        items,
652                    })
653                } else {
654                    None
655                }
656            } else {
657                None
658            }
659        })
660        .collect();
661
662    if fragments.is_empty() {
663        return (None, Vec::new());
664    }
665
666    // Sort by descriptor_number per EN 300 468 §6.2.15.
667    fragments.sort_by_key(|f| f.descriptor_number);
668
669    let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
670
671    let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
672
673    let text = if extended_text.is_empty() {
674        None
675    } else {
676        Some(extended_text)
677    };
678
679    (text, extended_items)
680}
681
682fn extract_content(
683    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
684) -> Vec<ContentNibble> {
685    for desc in descriptors {
686        if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
687            return ct
688                .entries
689                .iter()
690                .map(|e| ContentNibble {
691                    level_1: e.nibble_1,
692                    level_2: e.nibble_2,
693                    user: e.user_byte,
694                })
695                .collect();
696        }
697    }
698    Vec::new()
699}
700
701fn extract_ratings(
702    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
703) -> Vec<Rating> {
704    for desc in descriptors {
705        if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
706            return pr
707                .entries
708                .iter()
709                .map(|e| Rating {
710                    country: e.country_code.as_str().into_owned(),
711                    value: e.rating,
712                })
713                .collect();
714        }
715    }
716    Vec::new()
717}
718
719fn extract_crids(
720    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
721) -> Vec<Crid> {
722    use crate::descriptors::content_identifier::CridLocation;
723    for desc in descriptors {
724        if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
725            return ci
726                .entries
727                .iter()
728                .filter_map(|e| match e.location {
729                    CridLocation::Inline(bytes) => {
730                        let s = String::from_utf8_lossy(bytes).into_owned();
731                        Some(Crid {
732                            crid_type: e.crid_type,
733                            crid: s,
734                        })
735                    }
736                    _ => None,
737                })
738                .collect();
739        }
740    }
741    Vec::new()
742}
743
744fn extract_service_name(
745    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
746) -> Option<String> {
747    for desc in descriptors {
748        if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
749            return Some(svc.service_name.decode().into_owned());
750        }
751    }
752    None
753}
754
755#[cfg(test)]
756mod tests {
757    use super::*;
758    use chrono::{TimeZone, Utc};
759
760    // ------------------------------------------------------------------
761    // Helpers
762    // ------------------------------------------------------------------
763
764    /// Build the bytes of a minimal short_event_descriptor.
765    fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
766        let lang = b"eng";
767        let mut v = Vec::new();
768        v.push(0x4Du8); // tag
769        v.push((3 + 1 + name.len() + 1 + text.len()) as u8); // length
770        v.extend_from_slice(lang);
771        v.push(name.len() as u8);
772        v.extend_from_slice(name);
773        v.push(text.len() as u8);
774        v.extend_from_slice(text);
775        v
776    }
777
778    /// Build the bytes of a minimal EIT present/following section
779    /// with one event. Returns bytes formated as a complete TS section
780    /// (including CRC-32).
781    #[allow(clippy::too_many_arguments)]
782    fn eit_pf_section(
783        service_id: u16,
784        ts_id: u16,
785        on_id: u16,
786        event_id: u16,
787        version: u8,
788        start_raw: [u8; 5],
789        dur_raw: [u8; 3],
790        descriptors: &[u8],
791    ) -> Vec<u8> {
792        let table_id = 0x4Eu8;
793
794        // Header: 3 + ext_header(5) + post_ext(6) = 14
795        // Event: 12 + descriptors.len()
796        // CRC: 4
797        let ev_len = 12 + descriptors.len();
798        let section_length = 5 + 6 + ev_len + 4;
799        let total = 3 + section_length;
800
801        let mut buf = vec![0u8; total];
802        buf[0] = table_id;
803        buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
804        buf[2] = (section_length & 0xFF) as u8;
805        buf[3..5].copy_from_slice(&service_id.to_be_bytes());
806        // reserved(2)=0b11, version, current_next=1
807        buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
808        buf[6] = 0; // section_number
809        buf[7] = 0; // last_section_number
810        buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
811        buf[10..12].copy_from_slice(&on_id.to_be_bytes());
812        buf[12] = 0; // segment_last_section_number
813        buf[13] = 0x5F; // last_table_id
814
815        // Event
816        let ev_off = 14;
817        buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
818        buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
819        buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
820        let dll = descriptors.len() as u16;
821        buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
822        buf[ev_off + 11] = (dll & 0xFF) as u8;
823        buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
824
825        // CRC-32
826        let crc_pos = total - 4;
827        let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
828        buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
829        buf
830    }
831
832    /// Build start-time raw bytes (16-bit MJD + 24-bit BCD) for a given
833    /// year/month/day/hour.
834    fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
835        let mjd = mjd_approx(year, month, day);
836        let mjd_bytes = mjd.to_be_bytes();
837        let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
838        [
839            mjd_bytes[0],
840            mjd_bytes[1],
841            bcd_hour,
842            0, // minute BCD
843            0, // second BCD
844        ]
845    }
846
847    /// Quick MJD approximation for test dates (2026-06-10 = MJD 61785).
848    fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
849        assert!(
850            (year, month, day) == (2026, 6, 10),
851            "mjd_approx only supports 2026-06-10"
852        );
853        61785
854    }
855
856    /// Build content_descriptor (tag 0x54) wire bytes.
857    fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
858        let mut v = vec![0x54u8, (entries.len() * 2) as u8];
859        for &(n1, n2, u) in entries {
860            v.push((n1 << 4) | n2);
861            v.push(u);
862        }
863        v
864    }
865
866    /// Build parental_rating_descriptor (tag 0x55) wire bytes.
867    fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
868        let mut v = vec![0x55u8, (entries.len() * 4) as u8];
869        for (country, rating) in entries {
870            v.extend_from_slice(country);
871            v.push(*rating);
872        }
873        v
874    }
875
876    /// Build content_identifier_descriptor (tag 0x76) wire bytes with inline
877    /// CRIDs.
878    fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
879        let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
880        let mut v = vec![0x76u8, body_len as u8];
881        for (crid_type, data) in entries {
882            v.push(crid_type << 2); // location=0b00 inline
883            v.push(data.len() as u8);
884            v.extend_from_slice(data);
885        }
886        v
887    }
888
889    // ------------------------------------------------------------------
890    // Basic tests
891    // ------------------------------------------------------------------
892
893    #[test]
894    fn new_store_is_empty() {
895        let store = EpgStore::new();
896        assert_eq!(store.service_count(), 0);
897        assert_eq!(store.event_count(), 0);
898    }
899
900    #[test]
901    fn feed_empty_is_error() {
902        let mut store = EpgStore::new();
903        assert!(store.feed(&[]).is_err());
904    }
905
906    #[test]
907    fn now_and_next_no_data_returns_none() {
908        let store = EpgStore::new();
909        let now = Utc::now();
910        let key = ServiceKey {
911            original_network_id: 1,
912            transport_stream_id: 1,
913            service_id: 100,
914        };
915        assert_eq!(store.now_and_next(key, now), (None, None));
916    }
917
918    #[test]
919    fn service_key_ordering() {
920        let a = ServiceKey {
921            original_network_id: 1,
922            transport_stream_id: 2,
923            service_id: 100,
924        };
925        let b = ServiceKey {
926            original_network_id: 1,
927            transport_stream_id: 2,
928            service_id: 200,
929        };
930        assert!(a < b);
931    }
932
933    fn empty_event(
934        id: u16,
935        start: Option<chrono::DateTime<chrono::Utc>>,
936        dur: Option<core::time::Duration>,
937    ) -> EpgEvent {
938        EpgEvent {
939            event_id: id,
940            start_time: start,
941            duration: dur,
942            running_status: RunningStatus::Undefined,
943            free_ca_mode: false,
944            event_name: None,
945            event_text: None,
946            extended_text: None,
947            extended_items: Vec::new(),
948            content_nibbles: Vec::new(),
949            ratings: Vec::new(),
950            crids: Vec::new(),
951        }
952    }
953
954    #[test]
955    fn events_sorts_valid_before_invalid() {
956        let valid = empty_event(
957            1,
958            Some(Utc::now()),
959            Some(core::time::Duration::from_secs(3600)),
960        );
961        let invalid = empty_event(2, None, None);
962
963        let mut events = [&invalid, &valid];
964        events.sort_by(|a, b| cmp_event_by_start(a, b));
965        assert_eq!(events[0].event_id, 1);
966        assert_eq!(events[1].event_id, 2);
967    }
968
969    // ------------------------------------------------------------------
970    // §6.2.15 extended event text chaining
971    // ------------------------------------------------------------------
972
973    #[test]
974    fn extended_text_chaining_per_spec_6_2_15() {
975        use crate::descriptors::extended_event::ExtendedEventDescriptor;
976        use crate::descriptors::AnyDescriptor;
977        use crate::text::{DvbText, LangCode};
978
979        // Fragment 1: descriptor_number=2, last_descriptor_number=3
980        // "The quick " + item ("Director", "Alice")
981        let frag1 = ExtendedEventDescriptor {
982            descriptor_number: 2,
983            last_descriptor_number: 3,
984            language_code: LangCode(*b"eng"),
985            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
986                description: DvbText::new(b"Director"),
987                value: DvbText::new(b"Alice"),
988            }],
989            text: DvbText::new(b"The quick "),
990        };
991
992        // Fragment 2: descriptor_number=0, last_descriptor_number=3
993        // "brown fox" + item ("Year", "2026")
994        let frag2 = ExtendedEventDescriptor {
995            descriptor_number: 0,
996            last_descriptor_number: 3,
997            language_code: LangCode(*b"eng"),
998            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
999                description: DvbText::new(b"Year"),
1000                value: DvbText::new(b"2026"),
1001            }],
1002            text: DvbText::new(b"brown fox"),
1003        };
1004
1005        // Fragment 3: descriptor_number=3, last_descriptor_number=3
1006        // "jumps." + no items
1007        let frag3 = ExtendedEventDescriptor {
1008            descriptor_number: 3,
1009            last_descriptor_number: 3,
1010            language_code: LangCode(*b"eng"),
1011            items: vec![],
1012            text: DvbText::new(b"jumps."),
1013        };
1014
1015        // Fragment 4: descriptor_number=1, last_descriptor_number=3
1016        // empty text + item ("Genre", "Thriller") — dropped by the chaining
1017        // helper (text is empty but items present → included)
1018        let frag4 = ExtendedEventDescriptor {
1019            descriptor_number: 1,
1020            last_descriptor_number: 3,
1021            language_code: LangCode(*b"eng"),
1022            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
1023                description: DvbText::new(b"Genre"),
1024                value: DvbText::new(b"Thriller"),
1025            }],
1026            text: DvbText::new(b""),
1027        };
1028
1029        // Feed fragments out of order via AnyDescriptor.
1030        let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
1031            Ok(AnyDescriptor::ExtendedEvent(frag1)), // dn=2
1032            Ok(AnyDescriptor::ExtendedEvent(frag4)), // dn=1
1033            Ok(AnyDescriptor::ExtendedEvent(frag3)), // dn=3
1034            Ok(AnyDescriptor::ExtendedEvent(frag2)), // dn=0
1035        ];
1036
1037        let (text, items) = extract_extended(&descriptors);
1038
1039        // Text concatenated in descriptor_number order: 0,1,2,3
1040        assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1041
1042        // Items accumulated in descriptor_number order: dn=0 ("Year"/"2026"),
1043        // dn=1 ("Genre"/"Thriller"), dn=2 ("Director"/"Alice"), dn=3 (none)
1044        assert_eq!(items.len(), 3);
1045        assert_eq!(
1046            items[0],
1047            ExtendedItem {
1048                description: "Year".into(),
1049                item: "2026".into()
1050            }
1051        );
1052        assert_eq!(
1053            items[1],
1054            ExtendedItem {
1055                description: "Genre".into(),
1056                item: "Thriller".into()
1057            }
1058        );
1059        assert_eq!(
1060            items[2],
1061            ExtendedItem {
1062                description: "Director".into(),
1063                item: "Alice".into()
1064            }
1065        );
1066    }
1067
1068    // ------------------------------------------------------------------
1069    // now_and_next boundary correctness
1070    // ------------------------------------------------------------------
1071
1072    #[test]
1073    fn now_and_next_event_boundary() {
1074        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1075        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1076        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1077
1078        // Event 1: 10:00-11:00
1079        // Event 2: 12:00-13:00
1080        let sec = core::time::Duration::from_secs(3600);
1081        let ev1 = EpgEvent {
1082            event_id: 1,
1083            start_time: Some(t1000),
1084            duration: Some(sec),
1085            running_status: RunningStatus::Undefined,
1086            free_ca_mode: false,
1087            event_name: Some("Event 1".into()),
1088            event_text: None,
1089            extended_text: None,
1090            extended_items: vec![],
1091            content_nibbles: vec![],
1092            ratings: vec![],
1093            crids: vec![],
1094        };
1095        let ev2 = EpgEvent {
1096            event_id: 2,
1097            start_time: Some(t1200),
1098            duration: Some(sec),
1099            running_status: RunningStatus::Undefined,
1100            free_ca_mode: false,
1101            event_name: Some("Event 2".into()),
1102            event_text: None,
1103            extended_text: None,
1104            extended_items: vec![],
1105            content_nibbles: vec![],
1106            ratings: vec![],
1107            crids: vec![],
1108        };
1109
1110        // Set up store manually (bypass feed).
1111        let mut store = EpgStore::new();
1112        let key = ServiceKey {
1113            original_network_id: 1,
1114            transport_stream_id: 1,
1115            service_id: 100,
1116        };
1117        let svc = store.cache.entry(key).or_default();
1118        svc.events.insert(1, ev1);
1119        svc.events.insert(2, ev2);
1120
1121        // At 10:30 — now=Event 1, next=Event 2
1122        let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1123        let (now, next) = store.now_and_next(key, at);
1124        assert_eq!(now.unwrap().event_id, 1);
1125        assert_eq!(next.unwrap().event_id, 2);
1126
1127        // At 11:00 exactly — event 1 just ended (exclusive end),
1128        // now=None, next=Event 2
1129        let (now, next) = store.now_and_next(key, t1100);
1130        assert!(now.is_none(), "event ending at query time must NOT be now");
1131        assert_eq!(next.unwrap().event_id, 2);
1132
1133        // At 12:00 exactly — now=Event 2 (start == at, inclusive start),
1134        // next=None
1135        let (now, next) = store.now_and_next(key, t1200);
1136        assert_eq!(now.unwrap().event_id, 2);
1137        assert!(next.is_none());
1138    }
1139
1140    // ------------------------------------------------------------------
1141    // now_and_next: earliest-future-event selection (not arbitrary order)
1142    // ------------------------------------------------------------------
1143
1144    #[test]
1145    fn now_and_next_returns_earliest_future_event() {
1146        // Build a service with events out of insertion order:
1147        // Event 14:00 inserted first, Event 12:00 second, Event 16:00 third.
1148        // Query at 10:00 — "next" must be Event 12:00 (earliest future),
1149        // not Event 14:00 (which would win if the code used arbitrary
1150        // HashMap iteration order).
1151        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1152        let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1153        let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1154        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1155
1156        let sec = core::time::Duration::from_secs(3600);
1157
1158        fn named_event(
1159            id: u16,
1160            start: chrono::DateTime<chrono::Utc>,
1161            dur: core::time::Duration,
1162            name: &str,
1163        ) -> EpgEvent {
1164            EpgEvent {
1165                event_id: id,
1166                start_time: Some(start),
1167                duration: Some(dur),
1168                running_status: RunningStatus::Undefined,
1169                free_ca_mode: false,
1170                event_name: Some(name.into()),
1171                event_text: None,
1172                extended_text: None,
1173                extended_items: vec![],
1174                content_nibbles: vec![],
1175                ratings: vec![],
1176                crids: vec![],
1177            }
1178        }
1179
1180        let mut store = EpgStore::new();
1181        let key = ServiceKey {
1182            original_network_id: 1,
1183            transport_stream_id: 1,
1184            service_id: 100,
1185        };
1186        let svc = store.cache.entry(key).or_default();
1187        // Insert out of order — 14:00 first, 12:00 second, 16:00 third
1188        svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1189        svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1190        svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1191
1192        // "next" at 10:00 must be the earliest future — event 1 at 12:00
1193        let (_now, next) = store.now_and_next(key, t1000);
1194        let next = next.expect("next event must exist");
1195        assert_eq!(
1196            next.event_id, 1,
1197            "next must be earliest future event (12:00), not first in iteration order"
1198        );
1199    }
1200
1201    // ------------------------------------------------------------------
1202    // extract_content / extract_ratings / extract_crids through feed
1203    // ------------------------------------------------------------------
1204
1205    #[test]
1206    fn extract_content_ratings_crids_through_feed() {
1207        let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1208        let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1209        let crids = content_identifier_bytes(&[
1210            (0x01, b"crid://bbc.co.uk/prog123"),
1211            (0x03, b"crid://bbc.co.uk/rec456"),
1212        ]);
1213
1214        let mut descriptors = Vec::new();
1215        descriptors.extend_from_slice(&content);
1216        descriptors.extend_from_slice(&ratings);
1217        descriptors.extend_from_slice(&crids);
1218
1219        let sr = start_raw(2026, 6, 10, 10);
1220        let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1221
1222        let mut store = EpgStore::new();
1223        store.feed(&eit).unwrap();
1224
1225        let key = ServiceKey {
1226            original_network_id: 1,
1227            transport_stream_id: 1,
1228            service_id: 100,
1229        };
1230        let events = store.events(key).unwrap();
1231        assert_eq!(events.len(), 1);
1232        let ev = &events[0];
1233
1234        assert_eq!(ev.content_nibbles.len(), 2);
1235        assert_eq!(
1236            ev.content_nibbles[0],
1237            ContentNibble {
1238                level_1: 3,
1239                level_2: 1,
1240                user: 0xAA
1241            }
1242        );
1243        assert_eq!(
1244            ev.content_nibbles[1],
1245            ContentNibble {
1246                level_1: 4,
1247                level_2: 2,
1248                user: 0xBB
1249            }
1250        );
1251
1252        assert_eq!(ev.ratings.len(), 2);
1253        assert_eq!(ev.ratings[0].country, "FRA");
1254        assert_eq!(ev.ratings[0].value, 0x05);
1255        assert_eq!(ev.ratings[1].country, "GBR");
1256        assert_eq!(ev.ratings[1].value, 0x01);
1257
1258        assert_eq!(ev.crids.len(), 1 + 1); // one item of content + one recommendation
1259        assert_eq!(ev.crids[0].crid_type, CridType::ItemOfContent);
1260        assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1261        assert_eq!(ev.crids[1].crid_type, CridType::Recommendation);
1262        assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1263    }
1264
1265    #[test]
1266    fn extract_service_name_through_feed_sdt() {
1267        use crate::collect::SectionSetCollector;
1268
1269        // Build a service_descriptor (tag 0x48) with provider="BBC", service_name="BBC ONE HD"
1270        let svc_desc = {
1271            let provider = b"BBC";
1272            let name = b"BBC ONE HD";
1273            let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1274            v.push(0x01); // service_type = TV SD
1275            v.push(provider.len() as u8);
1276            v.extend_from_slice(provider);
1277            v.push(name.len() as u8);
1278            v.extend_from_slice(name);
1279            v
1280        };
1281
1282        // Build an SDT section (table_id 0x42) with one service.
1283        let sdt_bytes = {
1284            let dll = svc_desc.len() as u16;
1285            // Service entry: 5 bytes header + descriptors
1286            let svc_entry_len = 5 + dll as usize;
1287            // Section: 3 (header) + 5 (ext) + 3 (post_ext) = 11 + svc + 4 (crc)
1288            let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1289            let mut buf = vec![0u8; 3 + section_length as usize];
1290            buf[0] = 0x42; // SDT actual
1291            buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1292            buf[2] = (section_length & 0xFF) as u8;
1293            buf[3..5].copy_from_slice(&1u16.to_be_bytes()); // ts_id
1294            buf[5] = 0xC1; // version=0, cni=1
1295            buf[6] = 0; // section_number
1296            buf[7] = 0; // last_section_number
1297            buf[8..10].copy_from_slice(&1u16.to_be_bytes()); // original_network_id
1298            buf[10] = 0xFF; // reserved
1299
1300            // Service entry
1301            let off = 11;
1302            buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); // service_id=100
1303            buf[off + 2] = 0xFC; // flags
1304            buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1305            buf[off + 4] = (dll & 0xFF) as u8;
1306            buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1307
1308            // CRC
1309            let crc_off = buf.len() - 4;
1310            let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1311            buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1312            buf
1313        };
1314
1315        let mut collector = SectionSetCollector::new();
1316        let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1317        let sdt = complete.sdt().unwrap();
1318
1319        let mut store = EpgStore::new();
1320        store.feed_sdt(&sdt);
1321
1322        let key = ServiceKey {
1323            original_network_id: 1,
1324            transport_stream_id: 1,
1325            service_id: 100,
1326        };
1327        assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1328        assert_eq!(store.service_count(), 1);
1329    }
1330
1331    // ------------------------------------------------------------------
1332    // Version churn: bounded growth
1333    // ------------------------------------------------------------------
1334
1335    #[test]
1336    fn version_churn_bounded_growth() {
1337        // Feed an event, then feed the same event_id with updated data.
1338        // Store size must stay at 1 event.
1339        let s = |hh: u32| {
1340            let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1341            let days = 61785u16; // MJD for 2026-06-10
1342            let mjd_bytes = days.to_be_bytes();
1343            let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1344            (
1345                [
1346                    mjd_bytes[0],
1347                    mjd_bytes[1],
1348                    bcd_time[0],
1349                    bcd_time[1],
1350                    bcd_time[2],
1351                ],
1352                t,
1353            )
1354        };
1355
1356        let (start1, _) = s(10);
1357        let (start2, _) = s(14);
1358
1359        let desc1 = short_event_bytes(b"News at 10", b"");
1360        let desc2 = short_event_bytes(b"News at 14", b"");
1361
1362        let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1363        let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1364
1365        let mut store = EpgStore::new();
1366        store.feed(&eit1).unwrap();
1367        assert_eq!(store.event_count(), 1);
1368        store.feed(&eit2).unwrap();
1369        // Same event_id should overwrite, not duplicate
1370        assert_eq!(store.event_count(), 1);
1371
1372        let key = ServiceKey {
1373            original_network_id: 1,
1374            transport_stream_id: 1,
1375            service_id: 100,
1376        };
1377        let evts = store.events(key).unwrap();
1378        assert_eq!(evts.len(), 1);
1379        assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1380    }
1381
1382    // ------------------------------------------------------------------
1383    // schedule range query
1384    // ------------------------------------------------------------------
1385
1386    #[test]
1387    fn schedule_range_query() {
1388        let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1389        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1390        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1391        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1392
1393        let sec = core::time::Duration::from_secs(1800);
1394        let mut store = EpgStore::new();
1395        let key = ServiceKey {
1396            original_network_id: 1,
1397            transport_stream_id: 1,
1398            service_id: 100,
1399        };
1400        let svc = store.cache.entry(key).or_default();
1401        for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1402            svc.events.insert(
1403                id,
1404                EpgEvent {
1405                    event_id: id,
1406                    start_time: Some(t),
1407                    duration: Some(sec),
1408                    running_status: RunningStatus::Undefined,
1409                    free_ca_mode: false,
1410                    event_name: Some(format!("Event {id}")),
1411                    event_text: None,
1412                    extended_text: None,
1413                    extended_items: vec![],
1414                    content_nibbles: vec![],
1415                    ratings: vec![],
1416                    crids: vec![],
1417                },
1418            );
1419        }
1420
1421        // [10:00, 12:00) → events 2 and 3
1422        let events = store.schedule(key, t1000, t1200).unwrap();
1423        assert_eq!(events.len(), 2);
1424        assert_eq!(events[0].event_id, 2);
1425        assert_eq!(events[1].event_id, 3);
1426
1427        // [12:00, 13:00) → empty
1428        let events = store.schedule(key, t1200, t1100).unwrap();
1429        assert!(events.is_empty());
1430    }
1431
1432    // ------------------------------------------------------------------
1433    // Cap enforcement: max_services bounds service-count growth
1434    // ------------------------------------------------------------------
1435
1436    #[test]
1437    fn max_services_capped() {
1438        // Feed 3 distinct services with a cap of 2 — only the first 2 should
1439        // be retained; the third is skipped until space frees.
1440        let mut store = EpgStore::new().with_max_services(2);
1441
1442        let desc = short_event_bytes(b"Test", b"");
1443
1444        // Service 100
1445        let sr1 = start_raw(2026, 6, 10, 10);
1446        let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1447        store.feed(&eit1).unwrap();
1448        assert_eq!(store.service_count(), 1);
1449
1450        // Service 200
1451        let sr2 = start_raw(2026, 6, 10, 11);
1452        let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1453        store.feed(&eit2).unwrap();
1454        assert_eq!(store.service_count(), 2);
1455
1456        // Service 300 — should be skipped (cap 2 already hit, new key)
1457        let sr3 = start_raw(2026, 6, 10, 12);
1458        let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1459        store.feed(&eit3).unwrap();
1460        assert_eq!(
1461            store.service_count(),
1462            2,
1463            "third service must be rejected when cap is full"
1464        );
1465
1466        // Verify service 300 has no entry
1467        let key300 = ServiceKey {
1468            original_network_id: 1,
1469            transport_stream_id: 1,
1470            service_id: 300,
1471        };
1472        assert!(
1473            store.events(key300).is_none(),
1474            "rejected service must not appear"
1475        );
1476
1477        // Clearing frees space — service 300 can now be stored
1478        store.clear();
1479        store.feed(&eit3).unwrap();
1480        assert_eq!(store.service_count(), 1);
1481        assert!(store.events(key300).is_some());
1482    }
1483
1484    // ------------------------------------------------------------------
1485    // Cap enforcement: max_events_per_service bounds per-service events
1486    // ------------------------------------------------------------------
1487
1488    #[test]
1489    fn max_events_per_service_capped() {
1490        // Feed 4 distinct event_ids into one service with a cap of 3.
1491        // The 4th event must be skipped.
1492        let mut store = EpgStore::new().with_max_events_per_service(3);
1493
1494        let desc = short_event_bytes(b"Test", b"");
1495        let key = ServiceKey {
1496            original_network_id: 1,
1497            transport_stream_id: 1,
1498            service_id: 100,
1499        };
1500
1501        for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1502            .iter()
1503            .enumerate()
1504        {
1505            let sr = start_raw(2026, 6, 10, *hour);
1506            let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1507            store.feed(&eit).unwrap();
1508        }
1509
1510        assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1511
1512        // Version churn on existing event_id still works:
1513        let sr_v2 = start_raw(2026, 6, 10, 15);
1514        let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1515        store.feed(&eit_v2).unwrap();
1516        assert_eq!(
1517            store.event_count(),
1518            3,
1519            "version churn on existing event_id must not increase count"
1520        );
1521
1522        let evts = store.events(key).unwrap();
1523        let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1524        assert_eq!(
1525            ev10.event_name.as_deref(),
1526            Some("Test"),
1527            "existing event updated"
1528        );
1529    }
1530
1531    // ------------------------------------------------------------------
1532    // serde round-trip
1533    // ------------------------------------------------------------------
1534
1535    #[cfg(feature = "serde")]
1536    #[test]
1537    fn serde_serializes_store_as_json() {
1538        let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1539        let mut store = EpgStore::new();
1540        let key = ServiceKey {
1541            original_network_id: 1,
1542            transport_stream_id: 1,
1543            service_id: 100,
1544        };
1545        let svc = store.cache.entry(key).or_default();
1546        svc.service_name = Some("BBC One".into());
1547        svc.events.insert(
1548            1,
1549            EpgEvent {
1550                event_id: 1,
1551                start_time: Some(t),
1552                duration: Some(core::time::Duration::from_secs(3600)),
1553                running_status: RunningStatus::Running,
1554                free_ca_mode: false,
1555                event_name: Some("The News".into()),
1556                event_text: Some("Today's headlines".into()),
1557                extended_text: None,
1558                extended_items: vec![],
1559                content_nibbles: vec![ContentNibble {
1560                    level_1: 1,
1561                    level_2: 1,
1562                    user: 0,
1563                }],
1564                ratings: vec![],
1565                crids: vec![],
1566            },
1567        );
1568
1569        let json = serde_json::to_string(&store).unwrap();
1570        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1571        let svc_data = &v["1-1-100"];
1572        assert_eq!(svc_data["service_name"], "BBC One");
1573        assert_eq!(svc_data["events"][0]["event_name"], "The News");
1574        assert_eq!(
1575            svc_data["events"][0]["content_nibbles"][0],
1576            serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1577        );
1578    }
1579}