Skip to main content

dvb_si/
epg.rs

1//! EPG convenience layer.
2//!
3//! This module provides a high-level store for EIT (Event Information Table) data,
4//! making it easy to query the "now and next" events for a service and export
5//! a full EPG schedule.
6//!
7//! It wraps [`crate::collect::EitCollector`] to handle the multi-section
8//! reassembly of EIT tables and maintains a deduplicated, time-ordered event list
9//! for each service keyed by `(original_network_id, transport_stream_id, service_id)`.
10//!
11//! # Memory bounds
12//!
13//! The store is bounded by default to prevent memory-exhaustion from hostile
14//! or pathological EIT input — an attacker who controls
15//! `original_network_id`/`transport_stream_id`/`service_id`/`event_id` could
16//! otherwise grow the cache without bound. Two caps apply:
17//!
18//! * **`max_services`** (default [`DEFAULT_MAX_SERVICES`]) — caps the number of
19//!   distinct [`ServiceKey`] entries. When the cap is reached, incoming events
20//!   for new services are skipped until a service is removed via
21//!   [`EpgStore::retain_services`] or [`EpgStore::clear`].
22//! * **`max_events_per_service`** (default [`DEFAULT_MAX_EVENTS_PER_SERVICE`]) —
23//!   caps the number of events stored per service. When a service's cap is
24//!   reached, new events (by `event_id`) are skipped; existing event_ids are
25//!   still updated on version churn.
26//!
27//! The policy is *skip-until-space* — the same as
28//! [`crate::carousel::ModuleReassembler`] — so long-running consumers should
29//! call `retain_services` or `clear` periodically to free capacity.
30//!
31//! # Quickstart
32//!
33//! ```rust
34//! use dvb_si::epg::{EpgStore, ServiceKey};
35//! use dvb_si::collect::SectionSetCollector;
36//! use chrono::{TimeZone, Utc};
37//!
38//! let mut store = EpgStore::new();
39//!
40//! // Feed EIT sections (from a TS demux, file, etc.)
41//! // store.feed(&eit_section_bytes)?;
42//!
43//! let key = ServiceKey {
44//!     original_network_id: 1,
45//!     transport_stream_id: 1,
46//!     service_id: 100,
47//! };
48//!
49//! // Query now/next (requires EIT present/following data to be fed first)
50//! let now = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
51//! let (now_evt, next_evt) = store.now_and_next(key, now);
52//! if let Some(evt) = now_evt {
53//!     println!("Now:  {} (until {})",
54//!         evt.event_name.as_deref().unwrap_or("?"),
55//!         evt.start_time.map(|t| t + evt.duration.unwrap_or_default())
56//!             .map(|e| e.to_string()).unwrap_or_default());
57//! }
58//! if let Some(evt) = next_evt {
59//!     println!("Next: {} at {}",
60//!         evt.event_name.as_deref().unwrap_or("?"),
61//!         evt.start_time.map(|t| t.to_string()).unwrap_or_default());
62//! }
63//! // Print tonight's schedule (events from 20:00 to midnight)
64//! let tonight = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
65//! let midnight = Utc.with_ymd_and_hms(2026, 6, 11, 0, 0, 0).unwrap();
66//! if let Some(events) = store.schedule(key, tonight, midnight) {
67//!     for evt in &events {
68//!         println!("{:>5}  {}",
69//!             evt.start_time.map(|t| t.format("%H:%M").to_string()).unwrap_or_default(),
70//!             evt.event_name.as_deref().unwrap_or("?"));
71//!     }
72//! }
73//! ```
74//!
75//! # Pruning policy
76//!
77//! The store accumulates events within its configured caps. To bound growth
78//! under schedule churn, use [`EpgStore::retain_services`] to remove services
79//! that are no longer of interest and [`EpgStore::clear`] to reset all state at
80//! a carousel boundary.  The underlying [`crate::collect::EitCollector`] handles
81//! version-driven section-set replacement automatically — when the
82//! `version_number` on a sub-table changes, the old partial set is discarded and
83//! a new one begins.  Callers scanning a full carousel cycle can `clear()` and
84//! start fresh, or `retain_services` to keep only the active service list.
85
86use crate::collect::CollectResult;
87use crate::tables::RunningStatus;
88use std::collections::HashMap;
89
90/// Logical key identifying a service across the DVB network.
91#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
92#[cfg_attr(feature = "serde", derive(serde::Serialize))]
93pub struct ServiceKey {
94    /// original_network_id from the EIT/SDT.
95    pub original_network_id: u16,
96    /// transport_stream_id from the EIT/SDT.
97    pub transport_stream_id: u16,
98    /// service_id.
99    pub service_id: u16,
100}
101
102/// A parental rating entry from a
103/// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
104#[non_exhaustive]
105#[derive(Debug, Clone, PartialEq, Eq)]
106#[cfg_attr(feature = "serde", derive(serde::Serialize))]
107pub struct Rating {
108    /// Three-character ISO 3166 country code.
109    pub country: String,
110    /// Rating code per EN 300 468 §6.2.13 (0x01..=0x0F => minimum age = value+3).
111    pub value: u8,
112}
113
114impl Rating {
115    /// Minimum age if `value` falls in the range 0x01..=0x0F, else `None`.
116    #[must_use]
117    pub fn minimum_age(&self) -> Option<u8> {
118        match self.value {
119            0x01..=0x0F => Some(self.value + 3),
120            _ => None,
121        }
122    }
123}
124
125/// CRID type per TS 102 323 Table 117.
126///
127/// Re-exported from [`crate::descriptors::content_identifier::CridType`].
128pub use crate::descriptors::content_identifier::CridType;
129
130/// A content reference identifier entry from a
131/// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
132#[non_exhaustive]
133#[derive(Debug, Clone, PartialEq, Eq)]
134#[cfg_attr(feature = "serde", derive(serde::Serialize))]
135pub struct Crid {
136    /// CRID type (TS 102 323 Table 117: 0x01 = item of content, 0x02 = series, 0x03 = recommendation).
137    pub crid_type: CridType,
138    /// The CRID locator string.
139    pub crid: String,
140}
141
142/// An item (description-value pair) from an extended event descriptor fragment.
143#[non_exhaustive]
144#[derive(Debug, Clone, PartialEq, Eq)]
145#[cfg_attr(feature = "serde", derive(serde::Serialize))]
146pub struct ExtendedItem {
147    /// Item description.
148    pub description: String,
149    /// Item value.
150    pub item: String,
151}
152
153/// A content genre nibble triplet from a
154/// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
155#[non_exhaustive]
156#[derive(Debug, Clone, PartialEq, Eq)]
157#[cfg_attr(feature = "serde", derive(serde::Serialize))]
158pub struct ContentNibble {
159    /// Content nibble level 1 (category).
160    pub level_1: u8,
161    /// Content nibble level 2 (sub-category).
162    pub level_2: u8,
163    /// User-defined byte.
164    pub user: u8,
165}
166
167impl ContentNibble {
168    /// Level-1 broad category per EN 300 468 Table 29.
169    #[must_use]
170    pub fn genre(&self) -> crate::descriptors::content::ContentGenre {
171        crate::descriptors::content::ContentGenre::from_nibble_1(self.level_1)
172    }
173
174    /// Most specific genre name per EN 300 468 Table 29.
175    #[must_use]
176    pub fn genre_name(&self) -> &'static str {
177        crate::descriptors::content::content_genre_name(self.level_1, self.level_2)
178    }
179}
180
181/// A decoded view of an EPG event.
182///
183/// Extracted from [`crate::collect::CompleteEitEvent`] with commonly needed
184/// descriptor fields pre-decoded and extended text concatenated per
185/// EN 300 468 §6.2.15.
186///
187/// # Limitations
188///
189/// Only the first descriptor of each kind (short_event, content,
190/// parental_rating, content_identifier) is decoded per event; EIT events
191/// may carry multiple language variants and only the first is taken.
192#[non_exhaustive]
193#[derive(Debug, Clone, PartialEq, Eq)]
194#[cfg_attr(feature = "serde", derive(serde::Serialize))]
195pub struct EpgEvent {
196    /// 16-bit event_id.
197    pub event_id: u16,
198    /// Decoded start time (MJD + BCD UTC), if valid.
199    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
200    /// Decoded BCD duration, if valid.
201    pub duration: Option<core::time::Duration>,
202    /// 3-bit running status (EN 300 468 Table 6).
203    pub running_status: RunningStatus,
204    /// free_CA_mode.
205    pub free_ca_mode: bool,
206    /// Decoded short event name (from
207    /// [`ShortEventDescriptor`](crate::descriptors::short_event::ShortEventDescriptor)),
208    /// if present and decodeable.
209    pub event_name: Option<String>,
210    /// Decoded short event text, if present and decodeable.
211    pub event_text: Option<String>,
212    /// Concatenated extended event text from all
213    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
214    /// fragments, per EN 300 468 §6.2.15. Fragments are sorted by
215    /// `descriptor_number` and concatenated directly (no separator).
216    pub extended_text: Option<String>,
217    /// Accumulated extended event items (description, value) from all
218    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
219    /// fragments, sorted by `descriptor_number`.
220    #[cfg_attr(feature = "serde", serde(default))]
221    pub extended_items: Vec<ExtendedItem>,
222    /// Content genre entries from
223    /// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
224    #[cfg_attr(feature = "serde", serde(default))]
225    pub content_nibbles: Vec<ContentNibble>,
226    /// Parental rating entries from
227    /// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
228    #[cfg_attr(feature = "serde", serde(default))]
229    pub ratings: Vec<Rating>,
230    /// CRID entries from
231    /// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
232    #[cfg_attr(feature = "serde", serde(default))]
233    pub crids: Vec<Crid>,
234}
235
236/// Serialisable service data exposed by [`EpgStore`] serde export.
237#[derive(Debug, Clone)]
238#[cfg_attr(feature = "serde", derive(serde::Serialize))]
239struct ServiceData {
240    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
241    service_name: Option<String>,
242    events: Vec<EpgEvent>,
243}
244
245/// Default cap on distinct [`ServiceKey`] entries (services with cached EPG data).
246///
247/// 1024 services is generous — a single DVB transponder typically carries 20–50
248/// services — while bounding a hostile stream that rotates `service_id` /
249/// `transport_stream_id` / `original_network_id` to force unbounded map growth.
250pub const DEFAULT_MAX_SERVICES: usize = 1024;
251
252/// Default cap on events stored per service.
253///
254/// 8192 events (~28 days of 5-minute-granularity schedule entries) is far
255/// above real 7-day EPG depth while bounding per-service event accumulation
256/// from a hostile stream that rotates `event_id` without re-versioning.
257pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8192;
258
259/// A store for EIT data, providing high-level access to program events.
260///
261/// Wraps a [`crate::collect::EitCollector`] and maintains a deduplicated,
262/// event list per service. Optionally accepts SDT data to attach service names.
263///
264/// Serde export serializes the cache as a map of `ServiceKey` → service data.
265///
266/// # Memory bounds
267///
268/// The store is bounded by two configurable caps (see [module docs](self) for
269/// rationale and default values). Use [`with_max_services`](Self::with_max_services)
270/// and [`with_max_events_per_service`](Self::with_max_events_per_service) to
271/// tune them.
272///
273/// # Limitations
274///
275/// Events are deduplicated by `event_id` and stored within the configured
276/// per-service cap. Events removed from a re-versioned schedule, or events
277/// already in the past, remain in the store until evicted by
278/// [`retain_services()`](Self::retain_services) or [`clear()`](Self::clear).
279///
280/// # Example
281///
282/// ```no_run
283/// use dvb_si::epg::{EpgStore, ServiceKey};
284/// use chrono::Utc;
285///
286/// let mut store = EpgStore::new();
287/// // store.feed(&eit_section_bytes).unwrap();
288///
289/// let key = ServiceKey {
290///     original_network_id: 1,
291///     transport_stream_id: 1,
292///     service_id: 100,
293/// };
294///
295/// let (now_evt, _next) = store.now_and_next(key, Utc::now());
296/// if let Some(e) = now_evt {
297///     println!("Now playing: {:?}", e.event_name);
298/// }
299/// ```
300#[derive(Debug)]
301pub struct EpgStore {
302    collector: crate::collect::EitCollector,
303    cache: HashMap<ServiceKey, ServiceEpg>,
304    max_services: usize,
305    max_events_per_service: usize,
306}
307
308impl Default for EpgStore {
309    fn default() -> Self {
310        Self {
311            collector: crate::collect::EitCollector::default(),
312            cache: HashMap::new(),
313            max_services: DEFAULT_MAX_SERVICES,
314            max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
315        }
316    }
317}
318
319#[derive(Debug, Default)]
320struct ServiceEpg {
321    service_name: Option<String>,
322    /// Deduplicated by event_id. Latest version wins (later inserts overwrite).
323    events: HashMap<u16, EpgEvent>,
324}
325
326impl EpgStore {
327    /// Create a new, empty EPG store with default caps
328    /// ([`DEFAULT_MAX_SERVICES`], [`DEFAULT_MAX_EVENTS_PER_SERVICE`]).
329    #[must_use]
330    pub fn new() -> Self {
331        Self::default()
332    }
333
334    /// Replace the service-count cap (default [`DEFAULT_MAX_SERVICES`]).
335    /// When the cap is reached, events for new services are skipped until
336    /// [`retain_services`](Self::retain_services) or [`clear`](Self::clear)
337    /// frees capacity.
338    #[must_use]
339    pub fn with_max_services(mut self, max_services: usize) -> Self {
340        self.max_services = max_services;
341        self
342    }
343
344    /// Replace the per-service event cap (default
345    /// [`DEFAULT_MAX_EVENTS_PER_SERVICE`]). When a service reaches its cap, new
346    /// events (by `event_id`) are skipped; existing event_ids are still updated
347    /// on version churn.
348    #[must_use]
349    pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
350        self.max_events_per_service = max_events_per_service;
351        self
352    }
353
354    /// Replace the underlying collector's logical-key cap (default
355    /// [`crate::collect::DEFAULT_MAX_LOGICAL_KEYS`]). See
356    /// [`crate::collect::EitCollector::with_max_logical_keys`].
357    #[must_use]
358    pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
359        self.collector = self.collector.with_max_logical_keys(max_logical_keys);
360        self
361    }
362
363    /// Feed one EIT section into the store.
364    ///
365    /// If a table becomes complete, its events are merged into the cache
366    /// (deduplicated by `event_id`, later insertions overwrite).
367    ///
368    /// # Errors
369    ///
370    /// Returns a [`crate::collect::CollectError`] if the section is malformed.
371    pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
372        self.feed_with_pid(None, bytes)
373    }
374
375    /// Feed one EIT section with PID context into the store.
376    pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
377        if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
378            let tables = completed.tables()?;
379            for table in &tables {
380                let key = ServiceKey {
381                    original_network_id: table.original_network_id,
382                    transport_stream_id: table.transport_stream_id,
383                    service_id: table.service_id,
384                };
385                if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
386                    continue;
387                }
388                let svc = self.cache.entry(key).or_default();
389                for event in &table.events {
390                    if svc.events.len() >= self.max_events_per_service
391                        && !svc.events.contains_key(&event.event_id)
392                    {
393                        continue;
394                    }
395                    svc.events.insert(event.event_id, event_to_epg(event));
396                }
397            }
398        }
399        Ok(())
400    }
401
402    /// Feed completed SDT data to attach service names.
403    ///
404    /// Accepts a parsed [`crate::collect::CompleteSdt`] from a
405    /// [`crate::collect::SectionSetCollector`].
406    pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
407        for svc in &sdt.services {
408            let key = ServiceKey {
409                original_network_id: sdt.original_network_id,
410                transport_stream_id: sdt.transport_stream_id,
411                service_id: svc.service_id,
412            };
413            let entry = self.cache.entry(key).or_default();
414            entry.service_name = extract_service_name(svc.descriptors.descriptors());
415        }
416    }
417
418    /// Get the "now" and "next" events for a service.
419    ///
420    /// Searches the event list for the given service and returns the event
421    /// currently on-air ("now") and the next upcoming event ("next") based
422    /// on reference time `at`.
423    ///
424    /// "now" is the event where `at` falls within `[start, start + duration)`.
425    /// "next" is the event with the earliest `start_time` strictly after `at`
426    /// (not just the first such event in arbitrary iteration order).
427    ///
428    /// An event ending exactly at `at` is NOT considered "now" (exclusive end).
429    ///
430    /// Returns `(None, None)` when the service is unknown or no event matches.
431    pub fn now_and_next(
432        &self,
433        key: ServiceKey,
434        at: chrono::DateTime<chrono::Utc>,
435    ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
436        let Some(svc) = self.cache.get(&key) else {
437            return (None, None);
438        };
439
440        let now = svc.events.values().find(|e| {
441            if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
442                let end = start + dur;
443                return at >= start && at < end;
444            }
445            false
446        });
447
448        let next = svc
449            .events
450            .values()
451            .filter(|e| {
452                if let Some(start) = e.start_time {
453                    start > at
454                } else {
455                    false
456                }
457            })
458            .min_by_key(|e| e.start_time);
459
460        (now, next)
461    }
462
463    /// Query events with start times in the half-open range `[from, to)`.
464    ///
465    /// Returns events sorted by start time (valid times first, then by
466    /// event_id). Events without a decodable start time are excluded.
467    #[must_use]
468    pub fn schedule(
469        &self,
470        key: ServiceKey,
471        from: chrono::DateTime<chrono::Utc>,
472        to: chrono::DateTime<chrono::Utc>,
473    ) -> Option<Vec<&EpgEvent>> {
474        let svc = self.cache.get(&key)?;
475        let mut events: Vec<&EpgEvent> = svc
476            .events
477            .values()
478            .filter(|e| {
479                if let Some(start) = e.start_time {
480                    start >= from && start < to
481                } else {
482                    false
483                }
484            })
485            .collect();
486        events.sort_by(|a, b| cmp_event_by_start(a, b));
487        Some(events)
488    }
489
490    /// Return the service name for a given key, if SDT data was fed.
491    #[must_use]
492    pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
493        self.cache.get(&key).and_then(|s| s.service_name.as_deref())
494    }
495
496    /// Iterate the [`ServiceKey`]s of every service with cached EIT data, so
497    /// callers can walk the whole EPG (e.g. render a grid) without knowing the
498    /// service ids in advance. Order is unspecified.
499    pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
500        self.cache.keys().copied()
501    }
502
503    /// Return all events for a service, sorted by start time
504    /// (events without a valid start time sort last, then by event_id).
505    #[must_use]
506    pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
507        let svc = self.cache.get(&key)?;
508        let mut events: Vec<&EpgEvent> = svc.events.values().collect();
509        events.sort_by(|a, b| cmp_event_by_start(a, b));
510        Some(events)
511    }
512
513    /// Return the number of services with cached EIT data.
514    #[must_use]
515    pub fn service_count(&self) -> usize {
516        self.cache.len()
517    }
518
519    /// Return the total number of events across all services.
520    #[must_use]
521    pub fn event_count(&self) -> usize {
522        self.cache.values().map(|s| s.events.len()).sum()
523    }
524
525    /// Retain only services matching the given predicate.
526    ///
527    /// Both the event cache and the underlying collector partial state
528    /// for rejected keys are removed.
529    pub fn retain_services<F>(&mut self, mut keep: F)
530    where
531        F: FnMut(&ServiceKey) -> bool,
532    {
533        self.cache.retain(|key, _| keep(key));
534        self.collector.retain_logical(|lk| {
535            keep(&ServiceKey {
536                original_network_id: lk.original_network_id,
537                transport_stream_id: lk.transport_stream_id,
538                service_id: lk.service_id,
539            })
540        });
541    }
542
543    /// Clear all cached EIT data and reset the internal collector.
544    pub fn clear(&mut self) {
545        self.collector.clear();
546        self.cache.clear();
547    }
548}
549
550#[cfg(feature = "serde")]
551impl serde::Serialize for EpgStore {
552    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
553        use serde::ser::SerializeMap;
554        let mut m = s.serialize_map(Some(self.cache.len()))?;
555        for (key, svc) in &self.cache {
556            let data = ServiceData {
557                service_name: svc.service_name.clone(),
558                events: {
559                    let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
560                    evts.sort_by(cmp_event_by_start);
561                    evts
562                },
563            };
564            let key_str = format!(
565                "{}-{}-{}",
566                key.original_network_id, key.transport_stream_id, key.service_id
567            );
568            m.serialize_entry(&key_str, &data)?;
569        }
570        m.end()
571    }
572}
573
574fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> std::cmp::Ordering {
575    match (a.start_time, b.start_time) {
576        (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
577        (Some(_), None) => std::cmp::Ordering::Less,
578        (None, Some(_)) => std::cmp::Ordering::Greater,
579        (None, None) => a.event_id.cmp(&b.event_id),
580    }
581}
582
583fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
584    let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
585    let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
586    let content_nibbles = extract_content(e.descriptors.descriptors());
587    let ratings = extract_ratings(e.descriptors.descriptors());
588    let crids = extract_crids(e.descriptors.descriptors());
589
590    EpgEvent {
591        event_id: e.event_id,
592        start_time: e.start_time(),
593        duration: e.duration(),
594        running_status: e.running_status,
595        free_ca_mode: e.free_ca_mode,
596        event_name,
597        event_text,
598        extended_text,
599        extended_items,
600        content_nibbles,
601        ratings,
602        crids,
603    }
604}
605
606fn extract_short_event(
607    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
608) -> (Option<String>, Option<String>) {
609    for desc in descriptors {
610        if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
611            return (
612                Some(se.event_name.decode().into_owned()),
613                Some(se.text.decode().into_owned()),
614            );
615        }
616    }
617    (None, None)
618}
619
620struct ExtendedFragment {
621    descriptor_number: u8,
622    text: String,
623    items: Vec<ExtendedItem>,
624}
625
626fn extract_extended(
627    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
628) -> (Option<String>, Vec<ExtendedItem>) {
629    use crate::descriptors::AnyDescriptor;
630
631    let mut fragments: Vec<ExtendedFragment> = descriptors
632        .iter()
633        .filter_map(|d| {
634            if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
635                let text = ee.text.decode().into_owned();
636                let items: Vec<ExtendedItem> = ee
637                    .items
638                    .iter()
639                    .map(|i| ExtendedItem {
640                        description: i.description.decode().into_owned(),
641                        item: i.value.decode().into_owned(),
642                    })
643                    .collect();
644                if !text.is_empty() || !items.is_empty() {
645                    Some(ExtendedFragment {
646                        descriptor_number: ee.descriptor_number,
647                        text,
648                        items,
649                    })
650                } else {
651                    None
652                }
653            } else {
654                None
655            }
656        })
657        .collect();
658
659    if fragments.is_empty() {
660        return (None, Vec::new());
661    }
662
663    // Sort by descriptor_number per EN 300 468 §6.2.15.
664    fragments.sort_by_key(|f| f.descriptor_number);
665
666    let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
667
668    let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
669
670    let text = if extended_text.is_empty() {
671        None
672    } else {
673        Some(extended_text)
674    };
675
676    (text, extended_items)
677}
678
679fn extract_content(
680    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
681) -> Vec<ContentNibble> {
682    for desc in descriptors {
683        if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
684            return ct
685                .entries
686                .iter()
687                .map(|e| ContentNibble {
688                    level_1: e.nibble_1,
689                    level_2: e.nibble_2,
690                    user: e.user_byte,
691                })
692                .collect();
693        }
694    }
695    Vec::new()
696}
697
698fn extract_ratings(
699    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
700) -> Vec<Rating> {
701    for desc in descriptors {
702        if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
703            return pr
704                .entries
705                .iter()
706                .map(|e| Rating {
707                    country: e.country_code.as_str().into_owned(),
708                    value: e.rating,
709                })
710                .collect();
711        }
712    }
713    Vec::new()
714}
715
716fn extract_crids(
717    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
718) -> Vec<Crid> {
719    use crate::descriptors::content_identifier::CridLocation;
720    for desc in descriptors {
721        if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
722            return ci
723                .entries
724                .iter()
725                .filter_map(|e| match e.location {
726                    CridLocation::Inline(bytes) => {
727                        let s = String::from_utf8_lossy(bytes).into_owned();
728                        Some(Crid {
729                            crid_type: e.crid_type,
730                            crid: s,
731                        })
732                    }
733                    _ => None,
734                })
735                .collect();
736        }
737    }
738    Vec::new()
739}
740
741fn extract_service_name(
742    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
743) -> Option<String> {
744    for desc in descriptors {
745        if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
746            return Some(svc.service_name.decode().into_owned());
747        }
748    }
749    None
750}
751
752#[cfg(test)]
753mod tests {
754    use super::*;
755    use chrono::{TimeZone, Utc};
756
757    // ------------------------------------------------------------------
758    // Helpers
759    // ------------------------------------------------------------------
760
761    /// Build the bytes of a minimal short_event_descriptor.
762    fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
763        let lang = b"eng";
764        let mut v = Vec::new();
765        v.push(0x4Du8); // tag
766        v.push((3 + 1 + name.len() + 1 + text.len()) as u8); // length
767        v.extend_from_slice(lang);
768        v.push(name.len() as u8);
769        v.extend_from_slice(name);
770        v.push(text.len() as u8);
771        v.extend_from_slice(text);
772        v
773    }
774
775    /// Build the bytes of a minimal EIT present/following section
776    /// with one event. Returns bytes formated as a complete TS section
777    /// (including CRC-32).
778    #[allow(clippy::too_many_arguments)]
779    fn eit_pf_section(
780        service_id: u16,
781        ts_id: u16,
782        on_id: u16,
783        event_id: u16,
784        version: u8,
785        start_raw: [u8; 5],
786        dur_raw: [u8; 3],
787        descriptors: &[u8],
788    ) -> Vec<u8> {
789        let table_id = 0x4Eu8;
790
791        // Header: 3 + ext_header(5) + post_ext(6) = 14
792        // Event: 12 + descriptors.len()
793        // CRC: 4
794        let ev_len = 12 + descriptors.len();
795        let section_length = 5 + 6 + ev_len + 4;
796        let total = 3 + section_length;
797
798        let mut buf = vec![0u8; total];
799        buf[0] = table_id;
800        buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
801        buf[2] = (section_length & 0xFF) as u8;
802        buf[3..5].copy_from_slice(&service_id.to_be_bytes());
803        // reserved(2)=0b11, version, current_next=1
804        buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
805        buf[6] = 0; // section_number
806        buf[7] = 0; // last_section_number
807        buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
808        buf[10..12].copy_from_slice(&on_id.to_be_bytes());
809        buf[12] = 0; // segment_last_section_number
810        buf[13] = 0x5F; // last_table_id
811
812        // Event
813        let ev_off = 14;
814        buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
815        buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
816        buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
817        let dll = descriptors.len() as u16;
818        buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
819        buf[ev_off + 11] = (dll & 0xFF) as u8;
820        buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
821
822        // CRC-32
823        let crc_pos = total - 4;
824        let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
825        buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
826        buf
827    }
828
829    /// Build start-time raw bytes (16-bit MJD + 24-bit BCD) for a given
830    /// year/month/day/hour.
831    fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
832        let mjd = mjd_approx(year, month, day);
833        let mjd_bytes = mjd.to_be_bytes();
834        let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
835        [
836            mjd_bytes[0],
837            mjd_bytes[1],
838            bcd_hour,
839            0, // minute BCD
840            0, // second BCD
841        ]
842    }
843
844    /// Quick MJD approximation for test dates (2026-06-10 = MJD 61785).
845    fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
846        assert!(
847            (year, month, day) == (2026, 6, 10),
848            "mjd_approx only supports 2026-06-10"
849        );
850        61785
851    }
852
853    /// Build content_descriptor (tag 0x54) wire bytes.
854    fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
855        let mut v = vec![0x54u8, (entries.len() * 2) as u8];
856        for &(n1, n2, u) in entries {
857            v.push((n1 << 4) | n2);
858            v.push(u);
859        }
860        v
861    }
862
863    /// Build parental_rating_descriptor (tag 0x55) wire bytes.
864    fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
865        let mut v = vec![0x55u8, (entries.len() * 4) as u8];
866        for (country, rating) in entries {
867            v.extend_from_slice(country);
868            v.push(*rating);
869        }
870        v
871    }
872
873    /// Build content_identifier_descriptor (tag 0x76) wire bytes with inline
874    /// CRIDs.
875    fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
876        let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
877        let mut v = vec![0x76u8, body_len as u8];
878        for (crid_type, data) in entries {
879            v.push(crid_type << 2); // location=0b00 inline
880            v.push(data.len() as u8);
881            v.extend_from_slice(data);
882        }
883        v
884    }
885
886    // ------------------------------------------------------------------
887    // Basic tests
888    // ------------------------------------------------------------------
889
890    #[test]
891    fn new_store_is_empty() {
892        let store = EpgStore::new();
893        assert_eq!(store.service_count(), 0);
894        assert_eq!(store.event_count(), 0);
895    }
896
897    #[test]
898    fn feed_empty_is_error() {
899        let mut store = EpgStore::new();
900        assert!(store.feed(&[]).is_err());
901    }
902
903    #[test]
904    fn now_and_next_no_data_returns_none() {
905        let store = EpgStore::new();
906        let now = Utc::now();
907        let key = ServiceKey {
908            original_network_id: 1,
909            transport_stream_id: 1,
910            service_id: 100,
911        };
912        assert_eq!(store.now_and_next(key, now), (None, None));
913    }
914
915    #[test]
916    fn service_key_ordering() {
917        let a = ServiceKey {
918            original_network_id: 1,
919            transport_stream_id: 2,
920            service_id: 100,
921        };
922        let b = ServiceKey {
923            original_network_id: 1,
924            transport_stream_id: 2,
925            service_id: 200,
926        };
927        assert!(a < b);
928    }
929
930    fn empty_event(
931        id: u16,
932        start: Option<chrono::DateTime<chrono::Utc>>,
933        dur: Option<core::time::Duration>,
934    ) -> EpgEvent {
935        EpgEvent {
936            event_id: id,
937            start_time: start,
938            duration: dur,
939            running_status: RunningStatus::Undefined,
940            free_ca_mode: false,
941            event_name: None,
942            event_text: None,
943            extended_text: None,
944            extended_items: Vec::new(),
945            content_nibbles: Vec::new(),
946            ratings: Vec::new(),
947            crids: Vec::new(),
948        }
949    }
950
951    #[test]
952    fn events_sorts_valid_before_invalid() {
953        let valid = empty_event(
954            1,
955            Some(Utc::now()),
956            Some(core::time::Duration::from_secs(3600)),
957        );
958        let invalid = empty_event(2, None, None);
959
960        let mut events = [&invalid, &valid];
961        events.sort_by(|a, b| cmp_event_by_start(a, b));
962        assert_eq!(events[0].event_id, 1);
963        assert_eq!(events[1].event_id, 2);
964    }
965
966    // ------------------------------------------------------------------
967    // §6.2.15 extended event text chaining
968    // ------------------------------------------------------------------
969
970    #[test]
971    fn extended_text_chaining_per_spec_6_2_15() {
972        use crate::descriptors::extended_event::ExtendedEventDescriptor;
973        use crate::descriptors::AnyDescriptor;
974        use crate::text::{DvbText, LangCode};
975
976        // Fragment 1: descriptor_number=2, last_descriptor_number=3
977        // "The quick " + item ("Director", "Alice")
978        let frag1 = ExtendedEventDescriptor {
979            descriptor_number: 2,
980            last_descriptor_number: 3,
981            language_code: LangCode(*b"eng"),
982            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
983                description: DvbText::new(b"Director"),
984                value: DvbText::new(b"Alice"),
985            }],
986            text: DvbText::new(b"The quick "),
987        };
988
989        // Fragment 2: descriptor_number=0, last_descriptor_number=3
990        // "brown fox" + item ("Year", "2026")
991        let frag2 = ExtendedEventDescriptor {
992            descriptor_number: 0,
993            last_descriptor_number: 3,
994            language_code: LangCode(*b"eng"),
995            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
996                description: DvbText::new(b"Year"),
997                value: DvbText::new(b"2026"),
998            }],
999            text: DvbText::new(b"brown fox"),
1000        };
1001
1002        // Fragment 3: descriptor_number=3, last_descriptor_number=3
1003        // "jumps." + no items
1004        let frag3 = ExtendedEventDescriptor {
1005            descriptor_number: 3,
1006            last_descriptor_number: 3,
1007            language_code: LangCode(*b"eng"),
1008            items: vec![],
1009            text: DvbText::new(b"jumps."),
1010        };
1011
1012        // Fragment 4: descriptor_number=1, last_descriptor_number=3
1013        // empty text + item ("Genre", "Thriller") — dropped by the chaining
1014        // helper (text is empty but items present → included)
1015        let frag4 = ExtendedEventDescriptor {
1016            descriptor_number: 1,
1017            last_descriptor_number: 3,
1018            language_code: LangCode(*b"eng"),
1019            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
1020                description: DvbText::new(b"Genre"),
1021                value: DvbText::new(b"Thriller"),
1022            }],
1023            text: DvbText::new(b""),
1024        };
1025
1026        // Feed fragments out of order via AnyDescriptor.
1027        let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
1028            Ok(AnyDescriptor::ExtendedEvent(frag1)), // dn=2
1029            Ok(AnyDescriptor::ExtendedEvent(frag4)), // dn=1
1030            Ok(AnyDescriptor::ExtendedEvent(frag3)), // dn=3
1031            Ok(AnyDescriptor::ExtendedEvent(frag2)), // dn=0
1032        ];
1033
1034        let (text, items) = extract_extended(&descriptors);
1035
1036        // Text concatenated in descriptor_number order: 0,1,2,3
1037        assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1038
1039        // Items accumulated in descriptor_number order: dn=0 ("Year"/"2026"),
1040        // dn=1 ("Genre"/"Thriller"), dn=2 ("Director"/"Alice"), dn=3 (none)
1041        assert_eq!(items.len(), 3);
1042        assert_eq!(
1043            items[0],
1044            ExtendedItem {
1045                description: "Year".into(),
1046                item: "2026".into()
1047            }
1048        );
1049        assert_eq!(
1050            items[1],
1051            ExtendedItem {
1052                description: "Genre".into(),
1053                item: "Thriller".into()
1054            }
1055        );
1056        assert_eq!(
1057            items[2],
1058            ExtendedItem {
1059                description: "Director".into(),
1060                item: "Alice".into()
1061            }
1062        );
1063    }
1064
1065    // ------------------------------------------------------------------
1066    // now_and_next boundary correctness
1067    // ------------------------------------------------------------------
1068
1069    #[test]
1070    fn now_and_next_event_boundary() {
1071        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1072        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1073        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1074
1075        // Event 1: 10:00-11:00
1076        // Event 2: 12:00-13:00
1077        let sec = core::time::Duration::from_secs(3600);
1078        let ev1 = EpgEvent {
1079            event_id: 1,
1080            start_time: Some(t1000),
1081            duration: Some(sec),
1082            running_status: RunningStatus::Undefined,
1083            free_ca_mode: false,
1084            event_name: Some("Event 1".into()),
1085            event_text: None,
1086            extended_text: None,
1087            extended_items: vec![],
1088            content_nibbles: vec![],
1089            ratings: vec![],
1090            crids: vec![],
1091        };
1092        let ev2 = EpgEvent {
1093            event_id: 2,
1094            start_time: Some(t1200),
1095            duration: Some(sec),
1096            running_status: RunningStatus::Undefined,
1097            free_ca_mode: false,
1098            event_name: Some("Event 2".into()),
1099            event_text: None,
1100            extended_text: None,
1101            extended_items: vec![],
1102            content_nibbles: vec![],
1103            ratings: vec![],
1104            crids: vec![],
1105        };
1106
1107        // Set up store manually (bypass feed).
1108        let mut store = EpgStore::new();
1109        let key = ServiceKey {
1110            original_network_id: 1,
1111            transport_stream_id: 1,
1112            service_id: 100,
1113        };
1114        let svc = store.cache.entry(key).or_default();
1115        svc.events.insert(1, ev1);
1116        svc.events.insert(2, ev2);
1117
1118        // At 10:30 — now=Event 1, next=Event 2
1119        let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1120        let (now, next) = store.now_and_next(key, at);
1121        assert_eq!(now.unwrap().event_id, 1);
1122        assert_eq!(next.unwrap().event_id, 2);
1123
1124        // At 11:00 exactly — event 1 just ended (exclusive end),
1125        // now=None, next=Event 2
1126        let (now, next) = store.now_and_next(key, t1100);
1127        assert!(now.is_none(), "event ending at query time must NOT be now");
1128        assert_eq!(next.unwrap().event_id, 2);
1129
1130        // At 12:00 exactly — now=Event 2 (start == at, inclusive start),
1131        // next=None
1132        let (now, next) = store.now_and_next(key, t1200);
1133        assert_eq!(now.unwrap().event_id, 2);
1134        assert!(next.is_none());
1135    }
1136
1137    // ------------------------------------------------------------------
1138    // now_and_next: earliest-future-event selection (not arbitrary order)
1139    // ------------------------------------------------------------------
1140
1141    #[test]
1142    fn now_and_next_returns_earliest_future_event() {
1143        // Build a service with events out of insertion order:
1144        // Event 14:00 inserted first, Event 12:00 second, Event 16:00 third.
1145        // Query at 10:00 — "next" must be Event 12:00 (earliest future),
1146        // not Event 14:00 (which would win if the code used arbitrary
1147        // HashMap iteration order).
1148        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1149        let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1150        let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1151        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1152
1153        let sec = core::time::Duration::from_secs(3600);
1154
1155        fn named_event(
1156            id: u16,
1157            start: chrono::DateTime<chrono::Utc>,
1158            dur: core::time::Duration,
1159            name: &str,
1160        ) -> EpgEvent {
1161            EpgEvent {
1162                event_id: id,
1163                start_time: Some(start),
1164                duration: Some(dur),
1165                running_status: RunningStatus::Undefined,
1166                free_ca_mode: false,
1167                event_name: Some(name.into()),
1168                event_text: None,
1169                extended_text: None,
1170                extended_items: vec![],
1171                content_nibbles: vec![],
1172                ratings: vec![],
1173                crids: vec![],
1174            }
1175        }
1176
1177        let mut store = EpgStore::new();
1178        let key = ServiceKey {
1179            original_network_id: 1,
1180            transport_stream_id: 1,
1181            service_id: 100,
1182        };
1183        let svc = store.cache.entry(key).or_default();
1184        // Insert out of order — 14:00 first, 12:00 second, 16:00 third
1185        svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1186        svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1187        svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1188
1189        // "next" at 10:00 must be the earliest future — event 1 at 12:00
1190        let (_now, next) = store.now_and_next(key, t1000);
1191        let next = next.expect("next event must exist");
1192        assert_eq!(
1193            next.event_id, 1,
1194            "next must be earliest future event (12:00), not first in iteration order"
1195        );
1196    }
1197
1198    // ------------------------------------------------------------------
1199    // extract_content / extract_ratings / extract_crids through feed
1200    // ------------------------------------------------------------------
1201
1202    #[test]
1203    fn extract_content_ratings_crids_through_feed() {
1204        let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1205        let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1206        let crids = content_identifier_bytes(&[
1207            (0x01, b"crid://bbc.co.uk/prog123"),
1208            (0x03, b"crid://bbc.co.uk/rec456"),
1209        ]);
1210
1211        let mut descriptors = Vec::new();
1212        descriptors.extend_from_slice(&content);
1213        descriptors.extend_from_slice(&ratings);
1214        descriptors.extend_from_slice(&crids);
1215
1216        let sr = start_raw(2026, 6, 10, 10);
1217        let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1218
1219        let mut store = EpgStore::new();
1220        store.feed(&eit).unwrap();
1221
1222        let key = ServiceKey {
1223            original_network_id: 1,
1224            transport_stream_id: 1,
1225            service_id: 100,
1226        };
1227        let events = store.events(key).unwrap();
1228        assert_eq!(events.len(), 1);
1229        let ev = &events[0];
1230
1231        assert_eq!(ev.content_nibbles.len(), 2);
1232        assert_eq!(
1233            ev.content_nibbles[0],
1234            ContentNibble {
1235                level_1: 3,
1236                level_2: 1,
1237                user: 0xAA
1238            }
1239        );
1240        assert_eq!(
1241            ev.content_nibbles[1],
1242            ContentNibble {
1243                level_1: 4,
1244                level_2: 2,
1245                user: 0xBB
1246            }
1247        );
1248
1249        assert_eq!(ev.ratings.len(), 2);
1250        assert_eq!(ev.ratings[0].country, "FRA");
1251        assert_eq!(ev.ratings[0].value, 0x05);
1252        assert_eq!(ev.ratings[1].country, "GBR");
1253        assert_eq!(ev.ratings[1].value, 0x01);
1254
1255        assert_eq!(ev.crids.len(), 1 + 1); // one item of content + one recommendation
1256        assert_eq!(ev.crids[0].crid_type, CridType::ItemOfContent);
1257        assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1258        assert_eq!(ev.crids[1].crid_type, CridType::Recommendation);
1259        assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1260    }
1261
1262    #[test]
1263    fn extract_service_name_through_feed_sdt() {
1264        use crate::collect::SectionSetCollector;
1265
1266        // Build a service_descriptor (tag 0x48) with provider="BBC", service_name="BBC ONE HD"
1267        let svc_desc = {
1268            let provider = b"BBC";
1269            let name = b"BBC ONE HD";
1270            let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1271            v.push(0x01); // service_type = TV SD
1272            v.push(provider.len() as u8);
1273            v.extend_from_slice(provider);
1274            v.push(name.len() as u8);
1275            v.extend_from_slice(name);
1276            v
1277        };
1278
1279        // Build an SDT section (table_id 0x42) with one service.
1280        let sdt_bytes = {
1281            let dll = svc_desc.len() as u16;
1282            // Service entry: 5 bytes header + descriptors
1283            let svc_entry_len = 5 + dll as usize;
1284            // Section: 3 (header) + 5 (ext) + 3 (post_ext) = 11 + svc + 4 (crc)
1285            let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1286            let mut buf = vec![0u8; 3 + section_length as usize];
1287            buf[0] = 0x42; // SDT actual
1288            buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1289            buf[2] = (section_length & 0xFF) as u8;
1290            buf[3..5].copy_from_slice(&1u16.to_be_bytes()); // ts_id
1291            buf[5] = 0xC1; // version=0, cni=1
1292            buf[6] = 0; // section_number
1293            buf[7] = 0; // last_section_number
1294            buf[8..10].copy_from_slice(&1u16.to_be_bytes()); // original_network_id
1295            buf[10] = 0xFF; // reserved
1296
1297            // Service entry
1298            let off = 11;
1299            buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); // service_id=100
1300            buf[off + 2] = 0xFC; // flags
1301            buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1302            buf[off + 4] = (dll & 0xFF) as u8;
1303            buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1304
1305            // CRC
1306            let crc_off = buf.len() - 4;
1307            let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1308            buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1309            buf
1310        };
1311
1312        let mut collector = SectionSetCollector::new();
1313        let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1314        let sdt = complete.sdt().unwrap();
1315
1316        let mut store = EpgStore::new();
1317        store.feed_sdt(&sdt);
1318
1319        let key = ServiceKey {
1320            original_network_id: 1,
1321            transport_stream_id: 1,
1322            service_id: 100,
1323        };
1324        assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1325        assert_eq!(store.service_count(), 1);
1326    }
1327
1328    // ------------------------------------------------------------------
1329    // Version churn: bounded growth
1330    // ------------------------------------------------------------------
1331
1332    #[test]
1333    fn version_churn_bounded_growth() {
1334        // Feed an event, then feed the same event_id with updated data.
1335        // Store size must stay at 1 event.
1336        let s = |hh: u32| {
1337            let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1338            let days = 61785u16; // MJD for 2026-06-10
1339            let mjd_bytes = days.to_be_bytes();
1340            let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1341            (
1342                [
1343                    mjd_bytes[0],
1344                    mjd_bytes[1],
1345                    bcd_time[0],
1346                    bcd_time[1],
1347                    bcd_time[2],
1348                ],
1349                t,
1350            )
1351        };
1352
1353        let (start1, _) = s(10);
1354        let (start2, _) = s(14);
1355
1356        let desc1 = short_event_bytes(b"News at 10", b"");
1357        let desc2 = short_event_bytes(b"News at 14", b"");
1358
1359        let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1360        let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1361
1362        let mut store = EpgStore::new();
1363        store.feed(&eit1).unwrap();
1364        assert_eq!(store.event_count(), 1);
1365        store.feed(&eit2).unwrap();
1366        // Same event_id should overwrite, not duplicate
1367        assert_eq!(store.event_count(), 1);
1368
1369        let key = ServiceKey {
1370            original_network_id: 1,
1371            transport_stream_id: 1,
1372            service_id: 100,
1373        };
1374        let evts = store.events(key).unwrap();
1375        assert_eq!(evts.len(), 1);
1376        assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1377    }
1378
1379    // ------------------------------------------------------------------
1380    // schedule range query
1381    // ------------------------------------------------------------------
1382
1383    #[test]
1384    fn schedule_range_query() {
1385        let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1386        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1387        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1388        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1389
1390        let sec = core::time::Duration::from_secs(1800);
1391        let mut store = EpgStore::new();
1392        let key = ServiceKey {
1393            original_network_id: 1,
1394            transport_stream_id: 1,
1395            service_id: 100,
1396        };
1397        let svc = store.cache.entry(key).or_default();
1398        for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1399            svc.events.insert(
1400                id,
1401                EpgEvent {
1402                    event_id: id,
1403                    start_time: Some(t),
1404                    duration: Some(sec),
1405                    running_status: RunningStatus::Undefined,
1406                    free_ca_mode: false,
1407                    event_name: Some(format!("Event {id}")),
1408                    event_text: None,
1409                    extended_text: None,
1410                    extended_items: vec![],
1411                    content_nibbles: vec![],
1412                    ratings: vec![],
1413                    crids: vec![],
1414                },
1415            );
1416        }
1417
1418        // [10:00, 12:00) → events 2 and 3
1419        let events = store.schedule(key, t1000, t1200).unwrap();
1420        assert_eq!(events.len(), 2);
1421        assert_eq!(events[0].event_id, 2);
1422        assert_eq!(events[1].event_id, 3);
1423
1424        // [12:00, 13:00) → empty
1425        let events = store.schedule(key, t1200, t1100).unwrap();
1426        assert!(events.is_empty());
1427    }
1428
1429    // ------------------------------------------------------------------
1430    // Cap enforcement: max_services bounds service-count growth
1431    // ------------------------------------------------------------------
1432
1433    #[test]
1434    fn max_services_capped() {
1435        // Feed 3 distinct services with a cap of 2 — only the first 2 should
1436        // be retained; the third is skipped until space frees.
1437        let mut store = EpgStore::new().with_max_services(2);
1438
1439        let desc = short_event_bytes(b"Test", b"");
1440
1441        // Service 100
1442        let sr1 = start_raw(2026, 6, 10, 10);
1443        let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1444        store.feed(&eit1).unwrap();
1445        assert_eq!(store.service_count(), 1);
1446
1447        // Service 200
1448        let sr2 = start_raw(2026, 6, 10, 11);
1449        let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1450        store.feed(&eit2).unwrap();
1451        assert_eq!(store.service_count(), 2);
1452
1453        // Service 300 — should be skipped (cap 2 already hit, new key)
1454        let sr3 = start_raw(2026, 6, 10, 12);
1455        let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1456        store.feed(&eit3).unwrap();
1457        assert_eq!(
1458            store.service_count(),
1459            2,
1460            "third service must be rejected when cap is full"
1461        );
1462
1463        // Verify service 300 has no entry
1464        let key300 = ServiceKey {
1465            original_network_id: 1,
1466            transport_stream_id: 1,
1467            service_id: 300,
1468        };
1469        assert!(
1470            store.events(key300).is_none(),
1471            "rejected service must not appear"
1472        );
1473
1474        // Clearing frees space — service 300 can now be stored
1475        store.clear();
1476        store.feed(&eit3).unwrap();
1477        assert_eq!(store.service_count(), 1);
1478        assert!(store.events(key300).is_some());
1479    }
1480
1481    // ------------------------------------------------------------------
1482    // Cap enforcement: max_events_per_service bounds per-service events
1483    // ------------------------------------------------------------------
1484
1485    #[test]
1486    fn max_events_per_service_capped() {
1487        // Feed 4 distinct event_ids into one service with a cap of 3.
1488        // The 4th event must be skipped.
1489        let mut store = EpgStore::new().with_max_events_per_service(3);
1490
1491        let desc = short_event_bytes(b"Test", b"");
1492        let key = ServiceKey {
1493            original_network_id: 1,
1494            transport_stream_id: 1,
1495            service_id: 100,
1496        };
1497
1498        for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1499            .iter()
1500            .enumerate()
1501        {
1502            let sr = start_raw(2026, 6, 10, *hour);
1503            let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1504            store.feed(&eit).unwrap();
1505        }
1506
1507        assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1508
1509        // Version churn on existing event_id still works:
1510        let sr_v2 = start_raw(2026, 6, 10, 15);
1511        let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1512        store.feed(&eit_v2).unwrap();
1513        assert_eq!(
1514            store.event_count(),
1515            3,
1516            "version churn on existing event_id must not increase count"
1517        );
1518
1519        let evts = store.events(key).unwrap();
1520        let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1521        assert_eq!(
1522            ev10.event_name.as_deref(),
1523            Some("Test"),
1524            "existing event updated"
1525        );
1526    }
1527
1528    // ------------------------------------------------------------------
1529    // serde round-trip
1530    // ------------------------------------------------------------------
1531
1532    #[cfg(feature = "serde")]
1533    #[test]
1534    fn serde_serializes_store_as_json() {
1535        let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1536        let mut store = EpgStore::new();
1537        let key = ServiceKey {
1538            original_network_id: 1,
1539            transport_stream_id: 1,
1540            service_id: 100,
1541        };
1542        let svc = store.cache.entry(key).or_default();
1543        svc.service_name = Some("BBC One".into());
1544        svc.events.insert(
1545            1,
1546            EpgEvent {
1547                event_id: 1,
1548                start_time: Some(t),
1549                duration: Some(core::time::Duration::from_secs(3600)),
1550                running_status: RunningStatus::Running,
1551                free_ca_mode: false,
1552                event_name: Some("The News".into()),
1553                event_text: Some("Today's headlines".into()),
1554                extended_text: None,
1555                extended_items: vec![],
1556                content_nibbles: vec![ContentNibble {
1557                    level_1: 1,
1558                    level_2: 1,
1559                    user: 0,
1560                }],
1561                ratings: vec![],
1562                crids: vec![],
1563            },
1564        );
1565
1566        let json = serde_json::to_string(&store).unwrap();
1567        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1568        let svc_data = &v["1-1-100"];
1569        assert_eq!(svc_data["service_name"], "BBC One");
1570        assert_eq!(svc_data["events"][0]["event_name"], "The News");
1571        assert_eq!(
1572            svc_data["events"][0]["content_nibbles"][0],
1573            serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1574        );
1575    }
1576}