Skip to main content

dvb_si/
epg.rs

1//! EPG convenience layer.
2//!
3//! This module provides a high-level store for EIT (Event Information Table) data,
4//! making it easy to query the "now and next" events for a service and export
5//! a full EPG schedule.
6//!
7//! It wraps [`crate::collect::EitCollector`] to handle the multi-section
8//! reassembly of EIT tables and maintains a deduplicated, time-ordered event list
9//! for each service keyed by `(original_network_id, transport_stream_id, service_id)`.
10//!
11//! # Memory bounds
12//!
13//! The store is bounded by default to prevent memory-exhaustion from hostile
14//! or pathological EIT input — an attacker who controls
15//! `original_network_id`/`transport_stream_id`/`service_id`/`event_id` could
16//! otherwise grow the cache without bound. Two caps apply:
17//!
18//! * **`max_services`** (default [`DEFAULT_MAX_SERVICES`]) — caps the number of
19//!   distinct [`ServiceKey`] entries. When the cap is reached, incoming events
20//!   for new services are skipped until a service is removed via
21//!   [`EpgStore::retain_services`] or [`EpgStore::clear`].
22//! * **`max_events_per_service`** (default [`DEFAULT_MAX_EVENTS_PER_SERVICE`]) —
23//!   caps the number of events stored per service. When a service's cap is
24//!   reached, new events (by `event_id`) are skipped; existing event_ids are
25//!   still updated on version churn.
26//!
27//! The policy is *skip-until-space* — the same as
28//! [`crate::carousel::ModuleReassembler`] — so long-running consumers should
29//! call `retain_services` or `clear` periodically to free capacity.
30//!
31//! # Quickstart
32//!
33//! ```rust
34//! use dvb_si::epg::{EpgStore, ServiceKey};
35//! use dvb_si::collect::SectionSetCollector;
36//! use chrono::{TimeZone, Utc};
37//!
38//! let mut store = EpgStore::new();
39//!
40//! // Feed EIT sections (from a TS demux, file, etc.)
41//! // store.feed(&eit_section_bytes)?;
42//!
43//! let key = ServiceKey {
44//!     original_network_id: 1,
45//!     transport_stream_id: 1,
46//!     service_id: 100,
47//! };
48//!
49//! // Query now/next (requires EIT present/following data to be fed first)
50//! let now = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
51//! let (now_evt, next_evt) = store.now_and_next(key, now);
52//! if let Some(evt) = now_evt {
53//!     println!("Now:  {} (until {})",
54//!         evt.event_name.as_deref().unwrap_or("?"),
55//!         evt.start_time.map(|t| t + evt.duration.unwrap_or_default())
56//!             .map(|e| e.to_string()).unwrap_or_default());
57//! }
58//! if let Some(evt) = next_evt {
59//!     println!("Next: {} at {}",
60//!         evt.event_name.as_deref().unwrap_or("?"),
61//!         evt.start_time.map(|t| t.to_string()).unwrap_or_default());
62//! }
63//! // Print tonight's schedule (events from 20:00 to midnight)
64//! let tonight = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
65//! let midnight = Utc.with_ymd_and_hms(2026, 6, 11, 0, 0, 0).unwrap();
66//! if let Some(events) = store.schedule(key, tonight, midnight) {
67//!     for evt in &events {
68//!         println!("{:>5}  {}",
69//!             evt.start_time.map(|t| t.format("%H:%M").to_string()).unwrap_or_default(),
70//!             evt.event_name.as_deref().unwrap_or("?"));
71//!     }
72//! }
73//! ```
74//!
75//! # Pruning policy
76//!
77//! The store accumulates events within its configured caps. To bound growth
78//! under schedule churn, use [`EpgStore::retain_services`] to remove services
79//! that are no longer of interest and [`EpgStore::clear`] to reset all state at
80//! a carousel boundary.  The underlying [`crate::collect::EitCollector`] handles
81//! version-driven section-set replacement automatically — when the
82//! `version_number` on a sub-table changes, the old partial set is discarded and
83//! a new one begins.  Callers scanning a full carousel cycle can `clear()` and
84//! start fresh, or `retain_services` to keep only the active service list.
85
86use crate::collect::CollectResult;
87use std::collections::HashMap;
88
/// Logical key identifying a service across the DVB network.
///
/// The triplet is the key under which [`EpgStore`] caches events and service
/// names. The derived ordering compares the fields in declaration order:
/// `original_network_id`, then `transport_stream_id`, then `service_id`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ServiceKey {
    /// original_network_id from the EIT/SDT.
    pub original_network_id: u16,
    /// transport_stream_id from the EIT/SDT.
    pub transport_stream_id: u16,
    /// service_id.
    pub service_id: u16,
}
100
/// A parental rating entry from a
/// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
///
/// One `Rating` is produced per entry of the first parental rating descriptor
/// found on an event.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct Rating {
    /// Three-character ISO 3166 country code.
    pub country: String,
    /// Minimum recommended age.
    ///
    /// NOTE(review): this is copied verbatim from the descriptor entry's
    /// `rating` field — confirm whether that field is the raw wire value or
    /// already converted to an age.
    pub value: u8,
}
112
/// A content reference identifier entry from a
/// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
///
/// Only CRIDs carried inline in the descriptor are surfaced; entries with any
/// other location are dropped during extraction.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct Crid {
    /// CRID type (0x01 = series, 0x02 = programme, 0x03 = recommendation).
    pub crid_type: u8,
    /// The CRID locator string, decoded lossily as UTF-8 (invalid sequences
    /// become U+FFFD).
    pub crid: String,
}
124
/// An item (description-value pair) from an extended event descriptor fragment.
///
/// Both strings are decoded to owned text when the event is converted into an
/// [`EpgEvent`].
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ExtendedItem {
    /// Item description.
    pub description: String,
    /// Item value.
    pub item: String,
}
135
/// A content genre nibble triplet from a
/// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
///
/// One value is produced per entry of the first content descriptor found on
/// an event.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct ContentNibble {
    /// Content nibble level 1 (category).
    pub level_1: u8,
    /// Content nibble level 2 (sub-category).
    pub level_2: u8,
    /// User-defined byte.
    pub user: u8,
}
149
/// A decoded view of an EPG event.
///
/// Extracted from [`crate::collect::CompleteEitEvent`] with commonly needed
/// descriptor fields pre-decoded and extended text concatenated per
/// EN 300 468 §6.2.15.
///
/// # Limitations
///
/// Only the first descriptor of each kind (short_event, content,
/// parental_rating, content_identifier) is decoded per event; EIT events
/// may carry multiple language variants and only the first is taken.
/// Extended event fragments, by contrast, are all merged regardless of
/// which descriptor they came from.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize))]
pub struct EpgEvent {
    /// 16-bit event_id.
    pub event_id: u16,
    /// Decoded start time (MJD + BCD UTC), if valid.
    pub start_time: Option<chrono::DateTime<chrono::Utc>>,
    /// Decoded BCD duration, if valid.
    pub duration: Option<core::time::Duration>,
    /// 3-bit running status.
    pub running_status: u8,
    /// free_CA_mode.
    pub free_ca_mode: bool,
    /// Decoded short event name (from
    /// [`ShortEventDescriptor`](crate::descriptors::short_event::ShortEventDescriptor)),
    /// if present and decodable.
    pub event_name: Option<String>,
    /// Decoded short event text, if present and decodable.
    pub event_text: Option<String>,
    /// Concatenated extended event text from all
    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
    /// fragments, per EN 300 468 §6.2.15. Fragments are sorted by
    /// `descriptor_number` and concatenated directly (no separator).
    /// `None` when no fragment carried any text.
    pub extended_text: Option<String>,
    /// Accumulated extended event items (description, value) from all
    /// [`ExtendedEventDescriptor`](crate::descriptors::extended_event::ExtendedEventDescriptor)
    /// fragments, sorted by `descriptor_number`.
    #[cfg_attr(feature = "serde", serde(default))]
    pub extended_items: Vec<ExtendedItem>,
    /// Content genre entries from
    /// [`ContentDescriptor`](crate::descriptors::content::ContentDescriptor).
    #[cfg_attr(feature = "serde", serde(default))]
    pub content_nibbles: Vec<ContentNibble>,
    /// Parental rating entries from
    /// [`ParentalRatingDescriptor`](crate::descriptors::parental_rating::ParentalRatingDescriptor).
    #[cfg_attr(feature = "serde", serde(default))]
    pub ratings: Vec<Rating>,
    /// CRID entries from
    /// [`ContentIdentifierDescriptor`](crate::descriptors::content_identifier::ContentIdentifierDescriptor).
    /// Only inline CRIDs are included.
    #[cfg_attr(feature = "serde", serde(default))]
    pub crids: Vec<Crid>,
}
204
205/// Serialisable service data exposed by [`EpgStore`] serde export.
206#[derive(Debug, Clone)]
207#[cfg_attr(feature = "serde", derive(serde::Serialize))]
208struct ServiceData {
209    #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
210    service_name: Option<String>,
211    events: Vec<EpgEvent>,
212}
213
/// Default cap on distinct [`ServiceKey`] entries (services with cached EPG data).
///
/// 1024 services is generous — a single DVB transponder typically carries 20–50
/// services — while bounding a hostile stream that rotates `service_id` /
/// `transport_stream_id` / `original_network_id` to force unbounded map growth.
///
/// Override per store with [`EpgStore::with_max_services`].
pub const DEFAULT_MAX_SERVICES: usize = 1024;
220
/// Default cap on events stored per service.
///
/// 8192 events (~28 days of 5-minute-granularity schedule entries) is far
/// above real 7-day EPG depth while bounding per-service event accumulation
/// from a hostile stream that rotates `event_id` without re-versioning.
///
/// Override per store with [`EpgStore::with_max_events_per_service`].
pub const DEFAULT_MAX_EVENTS_PER_SERVICE: usize = 8192;
227
/// A store for EIT data, providing high-level access to program events.
///
/// Wraps a [`crate::collect::EitCollector`] and maintains a deduplicated
/// event list per service. Optionally accepts SDT data to attach service names.
///
/// Serde export serializes the cache as a map from a string key of the form
/// `"<original_network_id>-<transport_stream_id>-<service_id>"` to that
/// service's data.
///
/// # Memory bounds
///
/// The store is bounded by two configurable caps (see [module docs](self) for
/// rationale and default values). Use [`with_max_services`](Self::with_max_services)
/// and [`with_max_events_per_service`](Self::with_max_events_per_service) to
/// tune them.
///
/// # Limitations
///
/// Events are deduplicated by `event_id` and stored within the configured
/// per-service cap. Events removed from a re-versioned schedule, or events
/// already in the past, remain in the store until evicted by
/// [`retain_services()`](Self::retain_services) or [`clear()`](Self::clear).
///
/// # Example
///
/// ```no_run
/// use dvb_si::epg::{EpgStore, ServiceKey};
/// use chrono::Utc;
///
/// let mut store = EpgStore::new();
/// // store.feed(&eit_section_bytes).unwrap();
///
/// let key = ServiceKey {
///     original_network_id: 1,
///     transport_stream_id: 1,
///     service_id: 100,
/// };
///
/// let (now_evt, _next) = store.now_and_next(key, Utc::now());
/// if let Some(e) = now_evt {
///     println!("Now playing: {:?}", e.event_name);
/// }
/// ```
#[derive(Debug)]
pub struct EpgStore {
    /// Reassembles EIT sections into complete tables.
    collector: crate::collect::EitCollector,
    /// Per-service event and name cache.
    cache: HashMap<ServiceKey, ServiceEpg>,
    /// Maximum number of distinct services kept in `cache`.
    max_services: usize,
    /// Maximum number of events kept for any single service.
    max_events_per_service: usize,
}
276
277impl Default for EpgStore {
278    fn default() -> Self {
279        Self {
280            collector: crate::collect::EitCollector::default(),
281            cache: HashMap::new(),
282            max_services: DEFAULT_MAX_SERVICES,
283            max_events_per_service: DEFAULT_MAX_EVENTS_PER_SERVICE,
284        }
285    }
286}
287
/// Cached state for one service: its optional name and its events.
#[derive(Debug, Default)]
struct ServiceEpg {
    /// Service name taken from SDT data, if any was fed.
    service_name: Option<String>,
    /// Deduplicated by event_id. Latest version wins (later inserts overwrite).
    events: HashMap<u16, EpgEvent>,
}
294
295impl EpgStore {
296    /// Create a new, empty EPG store with default caps
297    /// ([`DEFAULT_MAX_SERVICES`], [`DEFAULT_MAX_EVENTS_PER_SERVICE`]).
298    #[must_use]
299    pub fn new() -> Self {
300        Self::default()
301    }
302
303    /// Replace the service-count cap (default [`DEFAULT_MAX_SERVICES`]).
304    /// When the cap is reached, events for new services are skipped until
305    /// [`retain_services`](Self::retain_services) or [`clear`](Self::clear)
306    /// frees capacity.
307    #[must_use]
308    pub fn with_max_services(mut self, max_services: usize) -> Self {
309        self.max_services = max_services;
310        self
311    }
312
313    /// Replace the per-service event cap (default
314    /// [`DEFAULT_MAX_EVENTS_PER_SERVICE`]). When a service reaches its cap, new
315    /// events (by `event_id`) are skipped; existing event_ids are still updated
316    /// on version churn.
317    #[must_use]
318    pub fn with_max_events_per_service(mut self, max_events_per_service: usize) -> Self {
319        self.max_events_per_service = max_events_per_service;
320        self
321    }
322
323    /// Replace the underlying collector's logical-key cap (default
324    /// [`crate::collect::DEFAULT_MAX_LOGICAL_KEYS`]). See
325    /// [`crate::collect::EitCollector::with_max_logical_keys`].
326    #[must_use]
327    pub fn with_collector_max_logical_keys(mut self, max_logical_keys: usize) -> Self {
328        self.collector = self.collector.with_max_logical_keys(max_logical_keys);
329        self
330    }
331
332    /// Feed one EIT section into the store.
333    ///
334    /// If a table becomes complete, its events are merged into the cache
335    /// (deduplicated by `event_id`, later insertions overwrite).
336    ///
337    /// # Errors
338    ///
339    /// Returns a [`crate::collect::CollectError`] if the section is malformed.
340    pub fn feed(&mut self, bytes: &[u8]) -> CollectResult<()> {
341        self.feed_with_pid(None, bytes)
342    }
343
344    /// Feed one EIT section with PID context into the store.
345    pub fn feed_with_pid(&mut self, pid: Option<u16>, bytes: &[u8]) -> CollectResult<()> {
346        if let Some(completed) = self.collector.push_section_with_pid(pid, bytes)? {
347            let tables = completed.tables()?;
348            for table in &tables {
349                let key = ServiceKey {
350                    original_network_id: table.original_network_id,
351                    transport_stream_id: table.transport_stream_id,
352                    service_id: table.service_id,
353                };
354                if self.cache.len() >= self.max_services && !self.cache.contains_key(&key) {
355                    continue;
356                }
357                let svc = self.cache.entry(key).or_default();
358                for event in &table.events {
359                    if svc.events.len() >= self.max_events_per_service
360                        && !svc.events.contains_key(&event.event_id)
361                    {
362                        continue;
363                    }
364                    svc.events.insert(event.event_id, event_to_epg(event));
365                }
366            }
367        }
368        Ok(())
369    }
370
371    /// Feed completed SDT data to attach service names.
372    ///
373    /// Accepts a parsed [`crate::collect::CompleteSdt`] from a
374    /// [`crate::collect::SectionSetCollector`].
375    pub fn feed_sdt(&mut self, sdt: &crate::collect::CompleteSdt<'_>) {
376        for svc in &sdt.services {
377            let key = ServiceKey {
378                original_network_id: sdt.original_network_id,
379                transport_stream_id: sdt.transport_stream_id,
380                service_id: svc.service_id,
381            };
382            let entry = self.cache.entry(key).or_default();
383            entry.service_name = extract_service_name(svc.descriptors.descriptors());
384        }
385    }
386
387    /// Get the "now" and "next" events for a service.
388    ///
389    /// Searches the event list for the given service and returns the event
390    /// currently on-air ("now") and the next upcoming event ("next") based
391    /// on reference time `at`.
392    ///
393    /// "now" is the event where `at` falls within `[start, start + duration)`.
394    /// "next" is the event with the earliest `start_time` strictly after `at`
395    /// (not just the first such event in arbitrary iteration order).
396    ///
397    /// An event ending exactly at `at` is NOT considered "now" (exclusive end).
398    ///
399    /// Returns `(None, None)` when the service is unknown or no event matches.
400    pub fn now_and_next(
401        &self,
402        key: ServiceKey,
403        at: chrono::DateTime<chrono::Utc>,
404    ) -> (Option<&EpgEvent>, Option<&EpgEvent>) {
405        let Some(svc) = self.cache.get(&key) else {
406            return (None, None);
407        };
408
409        let now = svc.events.values().find(|e| {
410            if let (Some(start), Some(dur)) = (e.start_time, e.duration) {
411                let end = start + dur;
412                return at >= start && at < end;
413            }
414            false
415        });
416
417        let next = svc
418            .events
419            .values()
420            .filter(|e| {
421                if let Some(start) = e.start_time {
422                    start > at
423                } else {
424                    false
425                }
426            })
427            .min_by_key(|e| e.start_time);
428
429        (now, next)
430    }
431
432    /// Query events with start times in the half-open range `[from, to)`.
433    ///
434    /// Returns events sorted by start time (valid times first, then by
435    /// event_id). Events without a decodable start time are excluded.
436    #[must_use]
437    pub fn schedule(
438        &self,
439        key: ServiceKey,
440        from: chrono::DateTime<chrono::Utc>,
441        to: chrono::DateTime<chrono::Utc>,
442    ) -> Option<Vec<&EpgEvent>> {
443        let svc = self.cache.get(&key)?;
444        let mut events: Vec<&EpgEvent> = svc
445            .events
446            .values()
447            .filter(|e| {
448                if let Some(start) = e.start_time {
449                    start >= from && start < to
450                } else {
451                    false
452                }
453            })
454            .collect();
455        events.sort_by(|a, b| cmp_event_by_start(a, b));
456        Some(events)
457    }
458
459    /// Return the service name for a given key, if SDT data was fed.
460    #[must_use]
461    pub fn service_name(&self, key: ServiceKey) -> Option<&str> {
462        self.cache.get(&key).and_then(|s| s.service_name.as_deref())
463    }
464
    /// Iterate the [`ServiceKey`]s of every cached service, so callers can
    /// walk the whole EPG (e.g. render a grid) without knowing the service
    /// ids in advance. Order is unspecified.
    ///
    /// Every cache entry is included: services populated from EIT data as
    /// well as services that so far only have an entry created by
    /// [`feed_sdt`](Self::feed_sdt).
    pub fn services(&self) -> impl Iterator<Item = ServiceKey> + '_ {
        self.cache.keys().copied()
    }
471
472    /// Return all events for a service, sorted by start time
473    /// (events without a valid start time sort last, then by event_id).
474    #[must_use]
475    pub fn events(&self, key: ServiceKey) -> Option<Vec<&EpgEvent>> {
476        let svc = self.cache.get(&key)?;
477        let mut events: Vec<&EpgEvent> = svc.events.values().collect();
478        events.sort_by(|a, b| cmp_event_by_start(a, b));
479        Some(events)
480    }
481
    /// Return the number of services in the cache.
    ///
    /// This counts every cache entry, including services that only have an
    /// entry created by [`feed_sdt`](Self::feed_sdt) and no events yet. It is
    /// the value compared against the `max_services` cap.
    #[must_use]
    pub fn service_count(&self) -> usize {
        self.cache.len()
    }
487
488    /// Return the total number of events across all services.
489    #[must_use]
490    pub fn event_count(&self) -> usize {
491        self.cache.values().map(|s| s.events.len()).sum()
492    }
493
494    /// Retain only services matching the given predicate.
495    ///
496    /// Both the event cache and the underlying collector partial state
497    /// for rejected keys are removed.
498    pub fn retain_services<F>(&mut self, mut keep: F)
499    where
500        F: FnMut(&ServiceKey) -> bool,
501    {
502        self.cache.retain(|key, _| keep(key));
503        self.collector.retain_logical(|lk| {
504            keep(&ServiceKey {
505                original_network_id: lk.original_network_id,
506                transport_stream_id: lk.transport_stream_id,
507                service_id: lk.service_id,
508            })
509        });
510    }
511
512    /// Clear all cached EIT data and reset the internal collector.
513    pub fn clear(&mut self) {
514        self.collector.clear();
515        self.cache.clear();
516    }
517}
518
#[cfg(feature = "serde")]
impl serde::Serialize for EpgStore {
    /// Serialize the cache as a map from
    /// `"<original_network_id>-<transport_stream_id>-<service_id>"` to the
    /// service's name and events.
    ///
    /// Services are emitted in ascending [`ServiceKey`] order and each
    /// service's events in start-time order, so the same store content always
    /// produces the same output regardless of hash-map iteration order.
    fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        use serde::ser::SerializeMap;

        // Fix the service order up front; HashMap iteration order is arbitrary.
        let mut services: Vec<(&ServiceKey, &ServiceEpg)> = self.cache.iter().collect();
        services.sort_unstable_by_key(|(key, _)| **key);

        let mut m = s.serialize_map(Some(services.len()))?;
        for (key, svc) in services {
            let data = ServiceData {
                service_name: svc.service_name.clone(),
                events: {
                    let mut evts: Vec<EpgEvent> = svc.events.values().cloned().collect();
                    evts.sort_by(cmp_event_by_start);
                    evts
                },
            };
            let key_str = format!(
                "{}-{}-{}",
                key.original_network_id, key.transport_stream_id, key.service_id
            );
            m.serialize_entry(&key_str, &data)?;
        }
        m.end()
    }
}
542
543fn cmp_event_by_start(a: &EpgEvent, b: &EpgEvent) -> std::cmp::Ordering {
544    match (a.start_time, b.start_time) {
545        (Some(at), Some(bt)) => at.cmp(&bt).then_with(|| a.event_id.cmp(&b.event_id)),
546        (Some(_), None) => std::cmp::Ordering::Less,
547        (None, Some(_)) => std::cmp::Ordering::Greater,
548        (None, None) => a.event_id.cmp(&b.event_id),
549    }
550}
551
552fn event_to_epg(e: &crate::collect::CompleteEitEvent<'_>) -> EpgEvent {
553    let (event_name, event_text) = extract_short_event(e.descriptors.descriptors());
554    let (extended_text, extended_items) = extract_extended(e.descriptors.descriptors());
555    let content_nibbles = extract_content(e.descriptors.descriptors());
556    let ratings = extract_ratings(e.descriptors.descriptors());
557    let crids = extract_crids(e.descriptors.descriptors());
558
559    EpgEvent {
560        event_id: e.event_id,
561        start_time: e.start_time(),
562        duration: e.duration(),
563        running_status: e.running_status,
564        free_ca_mode: e.free_ca_mode,
565        event_name,
566        event_text,
567        extended_text,
568        extended_items,
569        content_nibbles,
570        ratings,
571        crids,
572    }
573}
574
575fn extract_short_event(
576    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
577) -> (Option<String>, Option<String>) {
578    for desc in descriptors {
579        if let Ok(crate::descriptors::AnyDescriptor::ShortEvent(se)) = desc {
580            return (
581                Some(se.event_name.decode().into_owned()),
582                Some(se.text.decode().into_owned()),
583            );
584        }
585    }
586    (None, None)
587}
588
/// One decoded extended event descriptor, kept until all fragments of an
/// event have been gathered and can be ordered.
struct ExtendedFragment {
    /// Position of this fragment within the sequence; used as the sort key.
    descriptor_number: u8,
    /// Decoded free text of the fragment (may be empty).
    text: String,
    /// Decoded (description, value) items of the fragment (may be empty).
    items: Vec<ExtendedItem>,
}
594
595fn extract_extended(
596    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
597) -> (Option<String>, Vec<ExtendedItem>) {
598    use crate::descriptors::AnyDescriptor;
599
600    let mut fragments: Vec<ExtendedFragment> = descriptors
601        .iter()
602        .filter_map(|d| {
603            if let Ok(AnyDescriptor::ExtendedEvent(ee)) = d {
604                let text = ee.text.decode().into_owned();
605                let items: Vec<ExtendedItem> = ee
606                    .items
607                    .iter()
608                    .map(|i| ExtendedItem {
609                        description: i.description.decode().into_owned(),
610                        item: i.value.decode().into_owned(),
611                    })
612                    .collect();
613                if !text.is_empty() || !items.is_empty() {
614                    Some(ExtendedFragment {
615                        descriptor_number: ee.descriptor_number,
616                        text,
617                        items,
618                    })
619                } else {
620                    None
621                }
622            } else {
623                None
624            }
625        })
626        .collect();
627
628    if fragments.is_empty() {
629        return (None, Vec::new());
630    }
631
632    // Sort by descriptor_number per EN 300 468 §6.2.15.
633    fragments.sort_by_key(|f| f.descriptor_number);
634
635    let extended_text: String = fragments.iter().map(|f| f.text.as_str()).collect();
636
637    let extended_items: Vec<ExtendedItem> = fragments.into_iter().flat_map(|f| f.items).collect();
638
639    let text = if extended_text.is_empty() {
640        None
641    } else {
642        Some(extended_text)
643    };
644
645    (text, extended_items)
646}
647
648fn extract_content(
649    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
650) -> Vec<ContentNibble> {
651    for desc in descriptors {
652        if let Ok(crate::descriptors::AnyDescriptor::Content(ct)) = desc {
653            return ct
654                .entries
655                .iter()
656                .map(|e| ContentNibble {
657                    level_1: e.nibble_1,
658                    level_2: e.nibble_2,
659                    user: e.user_byte,
660                })
661                .collect();
662        }
663    }
664    Vec::new()
665}
666
667fn extract_ratings(
668    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
669) -> Vec<Rating> {
670    for desc in descriptors {
671        if let Ok(crate::descriptors::AnyDescriptor::ParentalRating(pr)) = desc {
672            return pr
673                .entries
674                .iter()
675                .map(|e| Rating {
676                    country: e.country_code.as_str().into_owned(),
677                    value: e.rating,
678                })
679                .collect();
680        }
681    }
682    Vec::new()
683}
684
685fn extract_crids(
686    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
687) -> Vec<Crid> {
688    use crate::descriptors::content_identifier::CridLocation;
689    for desc in descriptors {
690        if let Ok(crate::descriptors::AnyDescriptor::ContentIdentifier(ci)) = desc {
691            return ci
692                .entries
693                .iter()
694                .filter_map(|e| match e.location {
695                    CridLocation::Inline(bytes) => {
696                        let s = String::from_utf8_lossy(bytes).into_owned();
697                        Some(Crid {
698                            crid_type: e.crid_type,
699                            crid: s,
700                        })
701                    }
702                    _ => None,
703                })
704                .collect();
705        }
706    }
707    Vec::new()
708}
709
710fn extract_service_name(
711    descriptors: &[crate::Result<crate::descriptors::AnyDescriptor<'_>>],
712) -> Option<String> {
713    for desc in descriptors {
714        if let Ok(crate::descriptors::AnyDescriptor::Service(svc)) = desc {
715            return Some(svc.service_name.decode().into_owned());
716        }
717    }
718    None
719}
720
721#[cfg(test)]
722mod tests {
723    use super::*;
724    use chrono::{TimeZone, Utc};
725
726    // ------------------------------------------------------------------
727    // Helpers
728    // ------------------------------------------------------------------
729
730    /// Build the bytes of a minimal short_event_descriptor.
731    fn short_event_bytes(name: &[u8], text: &[u8]) -> Vec<u8> {
732        let lang = b"eng";
733        let mut v = Vec::new();
734        v.push(0x4Du8); // tag
735        v.push((3 + 1 + name.len() + 1 + text.len()) as u8); // length
736        v.extend_from_slice(lang);
737        v.push(name.len() as u8);
738        v.extend_from_slice(name);
739        v.push(text.len() as u8);
740        v.extend_from_slice(text);
741        v
742    }
743
744    /// Build the bytes of a minimal EIT present/following section
745    /// with one event. Returns bytes formated as a complete TS section
746    /// (including CRC-32).
747    #[allow(clippy::too_many_arguments)]
748    fn eit_pf_section(
749        service_id: u16,
750        ts_id: u16,
751        on_id: u16,
752        event_id: u16,
753        version: u8,
754        start_raw: [u8; 5],
755        dur_raw: [u8; 3],
756        descriptors: &[u8],
757    ) -> Vec<u8> {
758        let table_id = 0x4Eu8;
759
760        // Header: 3 + ext_header(5) + post_ext(6) = 14
761        // Event: 12 + descriptors.len()
762        // CRC: 4
763        let ev_len = 12 + descriptors.len();
764        let section_length = 5 + 6 + ev_len + 4;
765        let total = 3 + section_length;
766
767        let mut buf = vec![0u8; total];
768        buf[0] = table_id;
769        buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
770        buf[2] = (section_length & 0xFF) as u8;
771        buf[3..5].copy_from_slice(&service_id.to_be_bytes());
772        // reserved(2)=0b11, version, current_next=1
773        buf[5] = 0xC0 | ((version & 0x1F) << 1) | 0x01;
774        buf[6] = 0; // section_number
775        buf[7] = 0; // last_section_number
776        buf[8..10].copy_from_slice(&ts_id.to_be_bytes());
777        buf[10..12].copy_from_slice(&on_id.to_be_bytes());
778        buf[12] = 0; // segment_last_section_number
779        buf[13] = 0x5F; // last_table_id
780
781        // Event
782        let ev_off = 14;
783        buf[ev_off..ev_off + 2].copy_from_slice(&event_id.to_be_bytes());
784        buf[ev_off + 2..ev_off + 7].copy_from_slice(&start_raw);
785        buf[ev_off + 7..ev_off + 10].copy_from_slice(&dur_raw);
786        let dll = descriptors.len() as u16;
787        buf[ev_off + 10] = ((dll >> 8) as u8) & 0x0F;
788        buf[ev_off + 11] = (dll & 0xFF) as u8;
789        buf[ev_off + 12..ev_off + 12 + descriptors.len()].copy_from_slice(descriptors);
790
791        // CRC-32
792        let crc_pos = total - 4;
793        let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_pos]);
794        buf[crc_pos..].copy_from_slice(&crc.to_be_bytes());
795        buf
796    }
797
798    /// Build start-time raw bytes (16-bit MJD + 24-bit BCD) for a given
799    /// year/month/day/hour.
800    fn start_raw(year: i32, month: u32, day: u32, hour: u32) -> [u8; 5] {
801        let mjd = mjd_approx(year, month, day);
802        let mjd_bytes = mjd.to_be_bytes();
803        let bcd_hour = ((hour / 10 * 16) + (hour % 10)) as u8;
804        [
805            mjd_bytes[0],
806            mjd_bytes[1],
807            bcd_hour,
808            0, // minute BCD
809            0, // second BCD
810        ]
811    }
812
813    /// Quick MJD approximation for test dates (2026-06-10 = MJD 61785).
814    fn mjd_approx(year: i32, month: u32, day: u32) -> u16 {
815        assert!(
816            (year, month, day) == (2026, 6, 10),
817            "mjd_approx only supports 2026-06-10"
818        );
819        61785
820    }
821
822    /// Build content_descriptor (tag 0x54) wire bytes.
823    fn content_descriptor_bytes(entries: &[(u8, u8, u8)]) -> Vec<u8> {
824        let mut v = vec![0x54u8, (entries.len() * 2) as u8];
825        for &(n1, n2, u) in entries {
826            v.push((n1 << 4) | n2);
827            v.push(u);
828        }
829        v
830    }
831
832    /// Build parental_rating_descriptor (tag 0x55) wire bytes.
833    fn parental_rating_bytes(entries: &[([u8; 3], u8)]) -> Vec<u8> {
834        let mut v = vec![0x55u8, (entries.len() * 4) as u8];
835        for (country, rating) in entries {
836            v.extend_from_slice(country);
837            v.push(*rating);
838        }
839        v
840    }
841
842    /// Build content_identifier_descriptor (tag 0x76) wire bytes with inline
843    /// CRIDs.
844    fn content_identifier_bytes(entries: &[(u8, &[u8])]) -> Vec<u8> {
845        let body_len: usize = entries.iter().map(|(_, data)| 2 + data.len()).sum();
846        let mut v = vec![0x76u8, body_len as u8];
847        for (crid_type, data) in entries {
848            v.push(crid_type << 2); // location=0b00 inline
849            v.push(data.len() as u8);
850            v.extend_from_slice(data);
851        }
852        v
853    }
854
855    // ------------------------------------------------------------------
856    // Basic tests
857    // ------------------------------------------------------------------
858
859    #[test]
860    fn new_store_is_empty() {
861        let store = EpgStore::new();
862        assert_eq!(store.service_count(), 0);
863        assert_eq!(store.event_count(), 0);
864    }
865
866    #[test]
867    fn feed_empty_is_error() {
868        let mut store = EpgStore::new();
869        assert!(store.feed(&[]).is_err());
870    }
871
872    #[test]
873    fn now_and_next_no_data_returns_none() {
874        let store = EpgStore::new();
875        let now = Utc::now();
876        let key = ServiceKey {
877            original_network_id: 1,
878            transport_stream_id: 1,
879            service_id: 100,
880        };
881        assert_eq!(store.now_and_next(key, now), (None, None));
882    }
883
884    #[test]
885    fn service_key_ordering() {
886        let a = ServiceKey {
887            original_network_id: 1,
888            transport_stream_id: 2,
889            service_id: 100,
890        };
891        let b = ServiceKey {
892            original_network_id: 1,
893            transport_stream_id: 2,
894            service_id: 200,
895        };
896        assert!(a < b);
897    }
898
899    fn empty_event(
900        id: u16,
901        start: Option<chrono::DateTime<chrono::Utc>>,
902        dur: Option<core::time::Duration>,
903    ) -> EpgEvent {
904        EpgEvent {
905            event_id: id,
906            start_time: start,
907            duration: dur,
908            running_status: 0,
909            free_ca_mode: false,
910            event_name: None,
911            event_text: None,
912            extended_text: None,
913            extended_items: Vec::new(),
914            content_nibbles: Vec::new(),
915            ratings: Vec::new(),
916            crids: Vec::new(),
917        }
918    }
919
920    #[test]
921    fn events_sorts_valid_before_invalid() {
922        let valid = empty_event(
923            1,
924            Some(Utc::now()),
925            Some(core::time::Duration::from_secs(3600)),
926        );
927        let invalid = empty_event(2, None, None);
928
929        let mut events = [&invalid, &valid];
930        events.sort_by(|a, b| cmp_event_by_start(a, b));
931        assert_eq!(events[0].event_id, 1);
932        assert_eq!(events[1].event_id, 2);
933    }
934
935    // ------------------------------------------------------------------
936    // §6.2.15 extended event text chaining
937    // ------------------------------------------------------------------
938
939    #[test]
940    fn extended_text_chaining_per_spec_6_2_15() {
941        use crate::descriptors::extended_event::ExtendedEventDescriptor;
942        use crate::descriptors::AnyDescriptor;
943        use crate::text::{DvbText, LangCode};
944
945        // Fragment 1: descriptor_number=2, last_descriptor_number=3
946        // "The quick " + item ("Director", "Alice")
947        let frag1 = ExtendedEventDescriptor {
948            descriptor_number: 2,
949            last_descriptor_number: 3,
950            language_code: LangCode(*b"eng"),
951            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
952                description: DvbText::new(b"Director"),
953                value: DvbText::new(b"Alice"),
954            }],
955            text: DvbText::new(b"The quick "),
956        };
957
958        // Fragment 2: descriptor_number=0, last_descriptor_number=3
959        // "brown fox" + item ("Year", "2026")
960        let frag2 = ExtendedEventDescriptor {
961            descriptor_number: 0,
962            last_descriptor_number: 3,
963            language_code: LangCode(*b"eng"),
964            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
965                description: DvbText::new(b"Year"),
966                value: DvbText::new(b"2026"),
967            }],
968            text: DvbText::new(b"brown fox"),
969        };
970
971        // Fragment 3: descriptor_number=3, last_descriptor_number=3
972        // "jumps." + no items
973        let frag3 = ExtendedEventDescriptor {
974            descriptor_number: 3,
975            last_descriptor_number: 3,
976            language_code: LangCode(*b"eng"),
977            items: vec![],
978            text: DvbText::new(b"jumps."),
979        };
980
981        // Fragment 4: descriptor_number=1, last_descriptor_number=3
982        // empty text + item ("Genre", "Thriller") — dropped by the chaining
983        // helper (text is empty but items present → included)
984        let frag4 = ExtendedEventDescriptor {
985            descriptor_number: 1,
986            last_descriptor_number: 3,
987            language_code: LangCode(*b"eng"),
988            items: vec![crate::descriptors::extended_event::ExtendedEventItem {
989                description: DvbText::new(b"Genre"),
990                value: DvbText::new(b"Thriller"),
991            }],
992            text: DvbText::new(b""),
993        };
994
995        // Feed fragments out of order via AnyDescriptor.
996        let descriptors: Vec<crate::Result<AnyDescriptor<'_>>> = vec![
997            Ok(AnyDescriptor::ExtendedEvent(frag1)), // dn=2
998            Ok(AnyDescriptor::ExtendedEvent(frag4)), // dn=1
999            Ok(AnyDescriptor::ExtendedEvent(frag3)), // dn=3
1000            Ok(AnyDescriptor::ExtendedEvent(frag2)), // dn=0
1001        ];
1002
1003        let (text, items) = extract_extended(&descriptors);
1004
1005        // Text concatenated in descriptor_number order: 0,1,2,3
1006        assert_eq!(text.as_deref(), Some("brown foxThe quick jumps."));
1007
1008        // Items accumulated in descriptor_number order: dn=0 ("Year"/"2026"),
1009        // dn=1 ("Genre"/"Thriller"), dn=2 ("Director"/"Alice"), dn=3 (none)
1010        assert_eq!(items.len(), 3);
1011        assert_eq!(
1012            items[0],
1013            ExtendedItem {
1014                description: "Year".into(),
1015                item: "2026".into()
1016            }
1017        );
1018        assert_eq!(
1019            items[1],
1020            ExtendedItem {
1021                description: "Genre".into(),
1022                item: "Thriller".into()
1023            }
1024        );
1025        assert_eq!(
1026            items[2],
1027            ExtendedItem {
1028                description: "Director".into(),
1029                item: "Alice".into()
1030            }
1031        );
1032    }
1033
1034    // ------------------------------------------------------------------
1035    // now_and_next boundary correctness
1036    // ------------------------------------------------------------------
1037
1038    #[test]
1039    fn now_and_next_event_boundary() {
1040        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1041        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1042        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1043
1044        // Event 1: 10:00-11:00
1045        // Event 2: 12:00-13:00
1046        let sec = core::time::Duration::from_secs(3600);
1047        let ev1 = EpgEvent {
1048            event_id: 1,
1049            start_time: Some(t1000),
1050            duration: Some(sec),
1051            running_status: 0,
1052            free_ca_mode: false,
1053            event_name: Some("Event 1".into()),
1054            event_text: None,
1055            extended_text: None,
1056            extended_items: vec![],
1057            content_nibbles: vec![],
1058            ratings: vec![],
1059            crids: vec![],
1060        };
1061        let ev2 = EpgEvent {
1062            event_id: 2,
1063            start_time: Some(t1200),
1064            duration: Some(sec),
1065            running_status: 0,
1066            free_ca_mode: false,
1067            event_name: Some("Event 2".into()),
1068            event_text: None,
1069            extended_text: None,
1070            extended_items: vec![],
1071            content_nibbles: vec![],
1072            ratings: vec![],
1073            crids: vec![],
1074        };
1075
1076        // Set up store manually (bypass feed).
1077        let mut store = EpgStore::new();
1078        let key = ServiceKey {
1079            original_network_id: 1,
1080            transport_stream_id: 1,
1081            service_id: 100,
1082        };
1083        let svc = store.cache.entry(key).or_default();
1084        svc.events.insert(1, ev1);
1085        svc.events.insert(2, ev2);
1086
1087        // At 10:30 — now=Event 1, next=Event 2
1088        let at = Utc.with_ymd_and_hms(2026, 6, 10, 10, 30, 0).unwrap();
1089        let (now, next) = store.now_and_next(key, at);
1090        assert_eq!(now.unwrap().event_id, 1);
1091        assert_eq!(next.unwrap().event_id, 2);
1092
1093        // At 11:00 exactly — event 1 just ended (exclusive end),
1094        // now=None, next=Event 2
1095        let (now, next) = store.now_and_next(key, t1100);
1096        assert!(now.is_none(), "event ending at query time must NOT be now");
1097        assert_eq!(next.unwrap().event_id, 2);
1098
1099        // At 12:00 exactly — now=Event 2 (start == at, inclusive start),
1100        // next=None
1101        let (now, next) = store.now_and_next(key, t1200);
1102        assert_eq!(now.unwrap().event_id, 2);
1103        assert!(next.is_none());
1104    }
1105
1106    // ------------------------------------------------------------------
1107    // now_and_next: earliest-future-event selection (not arbitrary order)
1108    // ------------------------------------------------------------------
1109
1110    #[test]
1111    fn now_and_next_returns_earliest_future_event() {
1112        // Build a service with events out of insertion order:
1113        // Event 14:00 inserted first, Event 12:00 second, Event 16:00 third.
1114        // Query at 10:00 — "next" must be Event 12:00 (earliest future),
1115        // not Event 14:00 (which would win if the code used arbitrary
1116        // HashMap iteration order).
1117        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1118        let t1400 = Utc.with_ymd_and_hms(2026, 6, 10, 14, 0, 0).unwrap();
1119        let t1600 = Utc.with_ymd_and_hms(2026, 6, 10, 16, 0, 0).unwrap();
1120        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1121
1122        let sec = core::time::Duration::from_secs(3600);
1123
1124        fn named_event(
1125            id: u16,
1126            start: chrono::DateTime<chrono::Utc>,
1127            dur: core::time::Duration,
1128            name: &str,
1129        ) -> EpgEvent {
1130            EpgEvent {
1131                event_id: id,
1132                start_time: Some(start),
1133                duration: Some(dur),
1134                running_status: 0,
1135                free_ca_mode: false,
1136                event_name: Some(name.into()),
1137                event_text: None,
1138                extended_text: None,
1139                extended_items: vec![],
1140                content_nibbles: vec![],
1141                ratings: vec![],
1142                crids: vec![],
1143            }
1144        }
1145
1146        let mut store = EpgStore::new();
1147        let key = ServiceKey {
1148            original_network_id: 1,
1149            transport_stream_id: 1,
1150            service_id: 100,
1151        };
1152        let svc = store.cache.entry(key).or_default();
1153        // Insert out of order — 14:00 first, 12:00 second, 16:00 third
1154        svc.events.insert(3, named_event(3, t1400, sec, "Event 14"));
1155        svc.events.insert(1, named_event(1, t1200, sec, "Event 12"));
1156        svc.events.insert(2, named_event(2, t1600, sec, "Event 16"));
1157
1158        // "next" at 10:00 must be the earliest future — event 1 at 12:00
1159        let (_now, next) = store.now_and_next(key, t1000);
1160        let next = next.expect("next event must exist");
1161        assert_eq!(
1162            next.event_id, 1,
1163            "next must be earliest future event (12:00), not first in iteration order"
1164        );
1165    }
1166
1167    // ------------------------------------------------------------------
1168    // extract_content / extract_ratings / extract_crids through feed
1169    // ------------------------------------------------------------------
1170
1171    #[test]
1172    fn extract_content_ratings_crids_through_feed() {
1173        let content = content_descriptor_bytes(&[(3, 1, 0xAA), (4, 2, 0xBB)]);
1174        let ratings = parental_rating_bytes(&[(*b"FRA", 0x05), (*b"GBR", 0x01)]);
1175        let crids = content_identifier_bytes(&[
1176            (0x01, b"crid://bbc.co.uk/prog123"),
1177            (0x03, b"crid://bbc.co.uk/rec456"),
1178        ]);
1179
1180        let mut descriptors = Vec::new();
1181        descriptors.extend_from_slice(&content);
1182        descriptors.extend_from_slice(&ratings);
1183        descriptors.extend_from_slice(&crids);
1184
1185        let sr = start_raw(2026, 6, 10, 10);
1186        let eit = eit_pf_section(100, 1, 1, 1, 0, sr, [1, 0, 0], &descriptors);
1187
1188        let mut store = EpgStore::new();
1189        store.feed(&eit).unwrap();
1190
1191        let key = ServiceKey {
1192            original_network_id: 1,
1193            transport_stream_id: 1,
1194            service_id: 100,
1195        };
1196        let events = store.events(key).unwrap();
1197        assert_eq!(events.len(), 1);
1198        let ev = &events[0];
1199
1200        assert_eq!(ev.content_nibbles.len(), 2);
1201        assert_eq!(
1202            ev.content_nibbles[0],
1203            ContentNibble {
1204                level_1: 3,
1205                level_2: 1,
1206                user: 0xAA
1207            }
1208        );
1209        assert_eq!(
1210            ev.content_nibbles[1],
1211            ContentNibble {
1212                level_1: 4,
1213                level_2: 2,
1214                user: 0xBB
1215            }
1216        );
1217
1218        assert_eq!(ev.ratings.len(), 2);
1219        assert_eq!(ev.ratings[0].country, "FRA");
1220        assert_eq!(ev.ratings[0].value, 0x05);
1221        assert_eq!(ev.ratings[1].country, "GBR");
1222        assert_eq!(ev.ratings[1].value, 0x01);
1223
1224        assert_eq!(ev.crids.len(), 1 + 1); // one series + one recommendation
1225        assert_eq!(ev.crids[0].crid_type, 0x01);
1226        assert_eq!(ev.crids[0].crid, "crid://bbc.co.uk/prog123");
1227        assert_eq!(ev.crids[1].crid_type, 0x03);
1228        assert_eq!(ev.crids[1].crid, "crid://bbc.co.uk/rec456");
1229    }
1230
1231    #[test]
1232    fn extract_service_name_through_feed_sdt() {
1233        use crate::collect::SectionSetCollector;
1234
1235        // Build a service_descriptor (tag 0x48) with provider="BBC", service_name="BBC ONE HD"
1236        let svc_desc = {
1237            let provider = b"BBC";
1238            let name = b"BBC ONE HD";
1239            let mut v = vec![0x48u8, (1 + 1 + provider.len() + 1 + name.len()) as u8];
1240            v.push(0x01); // service_type = TV SD
1241            v.push(provider.len() as u8);
1242            v.extend_from_slice(provider);
1243            v.push(name.len() as u8);
1244            v.extend_from_slice(name);
1245            v
1246        };
1247
1248        // Build an SDT section (table_id 0x42) with one service.
1249        let sdt_bytes = {
1250            let dll = svc_desc.len() as u16;
1251            // Service entry: 5 bytes header + descriptors
1252            let svc_entry_len = 5 + dll as usize;
1253            // Section: 3 (header) + 5 (ext) + 3 (post_ext) = 11 + svc + 4 (crc)
1254            let section_length: u16 = 5 + 3 + svc_entry_len as u16 + 4;
1255            let mut buf = vec![0u8; 3 + section_length as usize];
1256            buf[0] = 0x42; // SDT actual
1257            buf[1] = 0xB0 | ((section_length >> 8) as u8 & 0x0F);
1258            buf[2] = (section_length & 0xFF) as u8;
1259            buf[3..5].copy_from_slice(&1u16.to_be_bytes()); // ts_id
1260            buf[5] = 0xC1; // version=0, cni=1
1261            buf[6] = 0; // section_number
1262            buf[7] = 0; // last_section_number
1263            buf[8..10].copy_from_slice(&1u16.to_be_bytes()); // original_network_id
1264            buf[10] = 0xFF; // reserved
1265
1266            // Service entry
1267            let off = 11;
1268            buf[off..off + 2].copy_from_slice(&100u16.to_be_bytes()); // service_id=100
1269            buf[off + 2] = 0xFC; // flags
1270            buf[off + 3] = ((dll >> 8) as u8) & 0x0F;
1271            buf[off + 4] = (dll & 0xFF) as u8;
1272            buf[off + 5..off + 5 + svc_desc.len()].copy_from_slice(&svc_desc);
1273
1274            // CRC
1275            let crc_off = buf.len() - 4;
1276            let crc = dvb_common::crc32_mpeg2::compute(&buf[..crc_off]);
1277            buf[crc_off..].copy_from_slice(&crc.to_be_bytes());
1278            buf
1279        };
1280
1281        let mut collector = SectionSetCollector::new();
1282        let complete = collector.push_section(&sdt_bytes).unwrap().unwrap();
1283        let sdt = complete.sdt().unwrap();
1284
1285        let mut store = EpgStore::new();
1286        store.feed_sdt(&sdt);
1287
1288        let key = ServiceKey {
1289            original_network_id: 1,
1290            transport_stream_id: 1,
1291            service_id: 100,
1292        };
1293        assert_eq!(store.service_name(key), Some("BBC ONE HD"));
1294        assert_eq!(store.service_count(), 1);
1295    }
1296
1297    // ------------------------------------------------------------------
1298    // Version churn: bounded growth
1299    // ------------------------------------------------------------------
1300
1301    #[test]
1302    fn version_churn_bounded_growth() {
1303        // Feed an event, then feed the same event_id with updated data.
1304        // Store size must stay at 1 event.
1305        let s = |hh: u32| {
1306            let t = Utc.with_ymd_and_hms(2026, 6, 10, hh, 0, 0).unwrap();
1307            let days = 61785u16; // MJD for 2026-06-10
1308            let mjd_bytes = days.to_be_bytes();
1309            let bcd_time = [(hh / 10 * 16 + hh % 10) as u8, 0, 0];
1310            (
1311                [
1312                    mjd_bytes[0],
1313                    mjd_bytes[1],
1314                    bcd_time[0],
1315                    bcd_time[1],
1316                    bcd_time[2],
1317                ],
1318                t,
1319            )
1320        };
1321
1322        let (start1, _) = s(10);
1323        let (start2, _) = s(14);
1324
1325        let desc1 = short_event_bytes(b"News at 10", b"");
1326        let desc2 = short_event_bytes(b"News at 14", b"");
1327
1328        let eit1 = eit_pf_section(100, 1, 1, 1, 0, start1, [1, 0, 0], &desc1);
1329        let eit2 = eit_pf_section(100, 1, 1, 1, 1, start2, [1, 0, 0], &desc2);
1330
1331        let mut store = EpgStore::new();
1332        store.feed(&eit1).unwrap();
1333        assert_eq!(store.event_count(), 1);
1334        store.feed(&eit2).unwrap();
1335        // Same event_id should overwrite, not duplicate
1336        assert_eq!(store.event_count(), 1);
1337
1338        let key = ServiceKey {
1339            original_network_id: 1,
1340            transport_stream_id: 1,
1341            service_id: 100,
1342        };
1343        let evts = store.events(key).unwrap();
1344        assert_eq!(evts.len(), 1);
1345        assert_eq!(evts[0].event_name.as_deref(), Some("News at 14"));
1346    }
1347
1348    // ------------------------------------------------------------------
1349    // schedule range query
1350    // ------------------------------------------------------------------
1351
1352    #[test]
1353    fn schedule_range_query() {
1354        let t0900 = Utc.with_ymd_and_hms(2026, 6, 10, 9, 0, 0).unwrap();
1355        let t1000 = Utc.with_ymd_and_hms(2026, 6, 10, 10, 0, 0).unwrap();
1356        let t1100 = Utc.with_ymd_and_hms(2026, 6, 10, 11, 0, 0).unwrap();
1357        let t1200 = Utc.with_ymd_and_hms(2026, 6, 10, 12, 0, 0).unwrap();
1358
1359        let sec = core::time::Duration::from_secs(1800);
1360        let mut store = EpgStore::new();
1361        let key = ServiceKey {
1362            original_network_id: 1,
1363            transport_stream_id: 1,
1364            service_id: 100,
1365        };
1366        let svc = store.cache.entry(key).or_default();
1367        for (id, t) in [(1, t0900), (2, t1000), (3, t1100)] {
1368            svc.events.insert(
1369                id,
1370                EpgEvent {
1371                    event_id: id,
1372                    start_time: Some(t),
1373                    duration: Some(sec),
1374                    running_status: 0,
1375                    free_ca_mode: false,
1376                    event_name: Some(format!("Event {id}")),
1377                    event_text: None,
1378                    extended_text: None,
1379                    extended_items: vec![],
1380                    content_nibbles: vec![],
1381                    ratings: vec![],
1382                    crids: vec![],
1383                },
1384            );
1385        }
1386
1387        // [10:00, 12:00) → events 2 and 3
1388        let events = store.schedule(key, t1000, t1200).unwrap();
1389        assert_eq!(events.len(), 2);
1390        assert_eq!(events[0].event_id, 2);
1391        assert_eq!(events[1].event_id, 3);
1392
1393        // [12:00, 13:00) → empty
1394        let events = store.schedule(key, t1200, t1100).unwrap();
1395        assert!(events.is_empty());
1396    }
1397
1398    // ------------------------------------------------------------------
1399    // Cap enforcement: max_services bounds service-count growth
1400    // ------------------------------------------------------------------
1401
1402    #[test]
1403    fn max_services_capped() {
1404        // Feed 3 distinct services with a cap of 2 — only the first 2 should
1405        // be retained; the third is skipped until space frees.
1406        let mut store = EpgStore::new().with_max_services(2);
1407
1408        let desc = short_event_bytes(b"Test", b"");
1409
1410        // Service 100
1411        let sr1 = start_raw(2026, 6, 10, 10);
1412        let eit1 = eit_pf_section(100, 1, 1, 1, 0, sr1, [1, 0, 0], &desc);
1413        store.feed(&eit1).unwrap();
1414        assert_eq!(store.service_count(), 1);
1415
1416        // Service 200
1417        let sr2 = start_raw(2026, 6, 10, 11);
1418        let eit2 = eit_pf_section(200, 1, 1, 3, 0, sr2, [1, 0, 0], &desc);
1419        store.feed(&eit2).unwrap();
1420        assert_eq!(store.service_count(), 2);
1421
1422        // Service 300 — should be skipped (cap 2 already hit, new key)
1423        let sr3 = start_raw(2026, 6, 10, 12);
1424        let eit3 = eit_pf_section(300, 1, 1, 5, 0, sr3, [1, 0, 0], &desc);
1425        store.feed(&eit3).unwrap();
1426        assert_eq!(
1427            store.service_count(),
1428            2,
1429            "third service must be rejected when cap is full"
1430        );
1431
1432        // Verify service 300 has no entry
1433        let key300 = ServiceKey {
1434            original_network_id: 1,
1435            transport_stream_id: 1,
1436            service_id: 300,
1437        };
1438        assert!(
1439            store.events(key300).is_none(),
1440            "rejected service must not appear"
1441        );
1442
1443        // Clearing frees space — service 300 can now be stored
1444        store.clear();
1445        store.feed(&eit3).unwrap();
1446        assert_eq!(store.service_count(), 1);
1447        assert!(store.events(key300).is_some());
1448    }
1449
1450    // ------------------------------------------------------------------
1451    // Cap enforcement: max_events_per_service bounds per-service events
1452    // ------------------------------------------------------------------
1453
1454    #[test]
1455    fn max_events_per_service_capped() {
1456        // Feed 4 distinct event_ids into one service with a cap of 3.
1457        // The 4th event must be skipped.
1458        let mut store = EpgStore::new().with_max_events_per_service(3);
1459
1460        let desc = short_event_bytes(b"Test", b"");
1461        let key = ServiceKey {
1462            original_network_id: 1,
1463            transport_stream_id: 1,
1464            service_id: 100,
1465        };
1466
1467        for (version, (event_id, hour)) in [(10, 10u32), (20, 11), (30, 12), (40, 13)]
1468            .iter()
1469            .enumerate()
1470        {
1471            let sr = start_raw(2026, 6, 10, *hour);
1472            let eit = eit_pf_section(100, 1, 1, *event_id, version as u8, sr, [1, 0, 0], &desc);
1473            store.feed(&eit).unwrap();
1474        }
1475
1476        assert_eq!(store.event_count(), 3, "4th event must be skipped at cap 3");
1477
1478        // Version churn on existing event_id still works:
1479        let sr_v2 = start_raw(2026, 6, 10, 15);
1480        let eit_v2 = eit_pf_section(100, 1, 1, 10, 1, sr_v2, [1, 0, 0], &desc);
1481        store.feed(&eit_v2).unwrap();
1482        assert_eq!(
1483            store.event_count(),
1484            3,
1485            "version churn on existing event_id must not increase count"
1486        );
1487
1488        let evts = store.events(key).unwrap();
1489        let ev10 = evts.iter().find(|e| e.event_id == 10).unwrap();
1490        assert_eq!(
1491            ev10.event_name.as_deref(),
1492            Some("Test"),
1493            "existing event updated"
1494        );
1495    }
1496
1497    // ------------------------------------------------------------------
    // serde serialization (JSON export; serialize-only, no deserialization)
1499    // ------------------------------------------------------------------
1500
1501    #[cfg(feature = "serde")]
1502    #[test]
1503    fn serde_serializes_store_as_json() {
1504        let t = Utc.with_ymd_and_hms(2026, 6, 10, 20, 0, 0).unwrap();
1505        let mut store = EpgStore::new();
1506        let key = ServiceKey {
1507            original_network_id: 1,
1508            transport_stream_id: 1,
1509            service_id: 100,
1510        };
1511        let svc = store.cache.entry(key).or_default();
1512        svc.service_name = Some("BBC One".into());
1513        svc.events.insert(
1514            1,
1515            EpgEvent {
1516                event_id: 1,
1517                start_time: Some(t),
1518                duration: Some(core::time::Duration::from_secs(3600)),
1519                running_status: 4,
1520                free_ca_mode: false,
1521                event_name: Some("The News".into()),
1522                event_text: Some("Today's headlines".into()),
1523                extended_text: None,
1524                extended_items: vec![],
1525                content_nibbles: vec![ContentNibble {
1526                    level_1: 1,
1527                    level_2: 1,
1528                    user: 0,
1529                }],
1530                ratings: vec![],
1531                crids: vec![],
1532            },
1533        );
1534
1535        let json = serde_json::to_string(&store).unwrap();
1536        let v: serde_json::Value = serde_json::from_str(&json).unwrap();
1537        let svc_data = &v["1-1-100"];
1538        assert_eq!(svc_data["service_name"], "BBC One");
1539        assert_eq!(svc_data["events"][0]["event_name"], "The News");
1540        assert_eq!(
1541            svc_data["events"][0]["content_nibbles"][0],
1542            serde_json::json!({"level_1": 1, "level_2": 1, "user": 0})
1543        );
1544    }
1545}